Skip to content

Commit

Permalink
update syncing logic to fix duplicate block requests
Browse files Browse the repository at this point in the history
  • Loading branch information
kpandl committed Oct 3, 2024
1 parent 2349f3e commit 9f546d8
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 29 deletions.
33 changes: 27 additions & 6 deletions node/bft/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,18 @@ impl<N: Network> Sync<N> {
// Try to advance the ledger *to tip* without updating the BFT.
while let Some(block) = self.block_sync.process_next_block(current_height) {
info!("Syncing the ledger to block {}...", block.height());
self.sync_ledger_with_block_without_bft(block).await?;
// Update the current height.
current_height += 1;
// Sync the ledger with the block without BFT.
match self.sync_ledger_with_block_without_bft(block).await {
Ok(_) => {
// Update the current height if sync succeeds.
current_height += 1;
}
Err(e) => {
// Sync failed: remove the request, request timestamp, and response entries for the current height from block_sync.
self.block_sync.remove_block_response(current_height);
return Err(e);
}
}
}
// Sync the storage with the ledger if we should transition to the BFT sync.
if current_height > max_gc_height {
Expand All @@ -331,9 +340,17 @@ impl<N: Network> Sync<N> {
while let Some(block) = self.block_sync.process_next_block(current_height) {
info!("Syncing the BFT to block {}...", block.height());
// Sync the storage with the block.
self.sync_storage_with_block(block).await?;
// Update the current height.
current_height += 1;
match self.sync_storage_with_block(block).await {
Ok(_) => {
// Update the current height if sync succeeds.
current_height += 1;
}
Err(e) => {
// Sync failed: remove the request, request timestamp, and response entries for the current height from block_sync.
self.block_sync.remove_block_response(current_height);
return Err(e);
}
}
}
Ok(())
}
Expand All @@ -354,6 +371,8 @@ impl<N: Network> Sync<N> {
self_.storage.sync_height_with_block(block.height());
// Sync the round with the block.
self_.storage.sync_round_with_block(block.round());
// Remove the request, request timestamp, and response entries for the block height from block_sync.
self_.block_sync.remove_block_response(block.height());

Ok(())
})
Expand Down Expand Up @@ -501,6 +520,8 @@ impl<N: Network> Sync<N> {
.await??;
// Remove the block height from the latest block responses.
latest_block_responses.remove(&block_height);
// Remove the request, request timestamp, and response entries for the block height from block_sync.
self.block_sync.remove_block_response(block_height);
}
} else {
debug!(
Expand Down
64 changes: 41 additions & 23 deletions node/sync/src/block_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,20 @@ impl<N: Network> BlockSync<N> {
/// Returns the next block to process, if one is ready.
#[inline]
pub fn process_next_block(&self, next_height: u32) -> Option<Block<N>> {
// Try to advance the ledger with a block from the sync pool.
self.remove_block_response(next_height)
// Acquire the requests write lock.
// Note: This lock must be held across the entire scope, due to asynchronous block responses
// from multiple peers that may be received concurrently.
let requests = self.requests.write();

// Determine if the request is complete.
let is_request_complete = requests.get(&next_height).map(|(_, _, peer_ips)| peer_ips.is_empty()).unwrap_or(true);

// If the request is not complete, return early.
if !is_request_complete {
return None;
}

self.responses.read().get(&next_height).cloned()
}

/// Attempts to advance with blocks from the sync pool.
Expand All @@ -347,20 +359,33 @@ impl<N: Network> BlockSync<N> {

/// Handles the block responses from the sync pool.
fn try_advancing_with_block_responses(&self, mut current_height: u32) {
while let Some(block) = self.remove_block_response(current_height + 1) {
while let Some(block) = self.process_next_block(current_height + 1) {
// Ensure the block height matches.
if block.height() != current_height + 1 {
warn!("Block height mismatch: expected {}, found {}", current_height + 1, block.height());
break;
}
// Check the next block.
if let Err(error) = self.canon.check_next_block(&block) {
warn!("The next block ({}) is invalid - {error}", block.height());
break;
}
// Attempt to advance to the next block.
if let Err(error) = self.canon.advance_to_next_block(&block) {
warn!("{error}");

// Try to check the next block and advance to it.
let advanced = match self.canon.check_next_block(&block) {
Ok(_) => match self.canon.advance_to_next_block(&block) {
Ok(_) => true,
Err(err) => {
warn!("{err}");
false
}
},
Err(err) => {
warn!("{err}");
false
}
};

// Remove the block response.
self.remove_block_response(current_height + 1);

// If advancing failed, exit the loop.
if !advanced {
break;
}
// Update the latest height.
Expand Down Expand Up @@ -605,32 +630,25 @@ impl<N: Network> BlockSync<N> {
self.request_timestamps.write().remove(&height);
}

/// Removes and returns the block response for the given height, if the request is complete.
fn remove_block_response(&self, height: u32) -> Option<Block<N>> {
/// Removes the block response for the given height, along with its request and request timestamp entries.
/// This may only be called after `process_next_block`, which checks whether the request for the given height is complete.
pub fn remove_block_response(&self, height: u32) {
// Acquire the requests write lock.
// Note: This lock must be held across the entire scope, due to asynchronous block responses
// from multiple peers that may be received concurrently.
let mut requests = self.requests.write();

// Determine if the request is complete.
let is_request_complete = requests.get(&height).map(|(_, _, peer_ips)| peer_ips.is_empty()).unwrap_or(true);

// If the request is not complete, return early.
if !is_request_complete {
return None;
}
// Remove the request entry for the given height.
requests.remove(&height);
// Remove the request timestamp entry for the given height.
self.request_timestamps.write().remove(&height);
// Remove the response entry for the given height.
self.responses.write().remove(&height)
self.responses.write().remove(&height);
}

/// Removes the block request for the given peer IP, if it exists.
#[allow(dead_code)]
fn remove_block_request_to_peer(&self, peer_ip: &SocketAddr, height: u32) {
let mut can_revoke = self.responses.read().get(&height).is_none();
let mut can_revoke = self.process_next_block(height).is_none();

// Remove the peer IP from the request entry. If the request entry is now empty,
// and the response entry for this height is also empty, then remove the request entry altogether.
Expand Down

0 comments on commit 9f546d8

Please sign in to comment.