From 9f546d8e93592d64fade51c0cc1d96c96ae1845b Mon Sep 17 00:00:00 2001 From: Konstantin Pandl <16907747+kpandl@users.noreply.github.com> Date: Thu, 3 Oct 2024 15:01:06 +0200 Subject: [PATCH] update syncing logic to fix duplicate block requests --- node/bft/src/sync/mod.rs | 33 +++++++++++++++---- node/sync/src/block_sync.rs | 64 ++++++++++++++++++++++++------------- 2 files changed, 68 insertions(+), 29 deletions(-) diff --git a/node/bft/src/sync/mod.rs b/node/bft/src/sync/mod.rs index 46cb269a54..5204c85415 100644 --- a/node/bft/src/sync/mod.rs +++ b/node/bft/src/sync/mod.rs @@ -315,9 +315,18 @@ impl Sync { // Try to advance the ledger *to tip* without updating the BFT. while let Some(block) = self.block_sync.process_next_block(current_height) { info!("Syncing the ledger to block {}...", block.height()); - self.sync_ledger_with_block_without_bft(block).await?; - // Update the current height. - current_height += 1; + // Sync the ledger with the block without BFT. + match self.sync_ledger_with_block_without_bft(block).await { + Ok(_) => { + // Update the current height if sync succeeds. + current_height += 1; + } + Err(e) => { + // Mark the current height as processed in block_sync. + self.block_sync.remove_block_response(current_height); + return Err(e); + } + } } // Sync the storage with the ledger if we should transition to the BFT sync. if current_height > max_gc_height { @@ -331,9 +340,17 @@ impl Sync { while let Some(block) = self.block_sync.process_next_block(current_height) { info!("Syncing the BFT to block {}...", block.height()); // Sync the storage with the block. - self.sync_storage_with_block(block).await?; - // Update the current height. - current_height += 1; + match self.sync_storage_with_block(block).await { + Ok(_) => { + // Update the current height if sync succeeds. + current_height += 1; + } + Err(e) => { + // Mark the current height as processed in block_sync. 
+ self.block_sync.remove_block_response(current_height); + return Err(e); + } + } } Ok(()) } @@ -354,6 +371,8 @@ impl Sync { self_.storage.sync_height_with_block(block.height()); // Sync the round with the block. self_.storage.sync_round_with_block(block.round()); + // Mark the block height as processed in block_sync. + self_.block_sync.remove_block_response(block.height()); Ok(()) }) @@ -501,6 +520,8 @@ impl Sync { .await??; // Remove the block height from the latest block responses. latest_block_responses.remove(&block_height); + // Mark the block height as processed in block_sync. + self.block_sync.remove_block_response(block_height); } } else { debug!( diff --git a/node/sync/src/block_sync.rs b/node/sync/src/block_sync.rs index d512e7478c..0cab5f1e68 100644 --- a/node/sync/src/block_sync.rs +++ b/node/sync/src/block_sync.rs @@ -321,8 +321,20 @@ impl BlockSync { /// Returns the next block to process, if one is ready. #[inline] pub fn process_next_block(&self, next_height: u32) -> Option<Block<N>> { - // Try to advance the ledger with a block from the sync pool. - self.remove_block_response(next_height) + // Acquire the requests write lock. + // Note: This lock must be held across the entire scope, due to asynchronous block responses + // from multiple peers that may be received concurrently. + let requests = self.requests.write(); + + // Determine if the request is complete. + let is_request_complete = requests.get(&next_height).map(|(_, _, peer_ips)| peer_ips.is_empty()).unwrap_or(true); + + // If the request is not complete, return early. + if !is_request_complete { + return None; + } + + self.responses.read().get(&next_height).cloned() } /// Attempts to advance with blocks from the sync pool. @@ -347,20 +359,33 @@ impl BlockSync { /// Handles the block responses from the sync pool. 
fn try_advancing_with_block_responses(&self, mut current_height: u32) { - while let Some(block) = self.remove_block_response(current_height + 1) { + while let Some(block) = self.process_next_block(current_height + 1) { // Ensure the block height matches. if block.height() != current_height + 1 { warn!("Block height mismatch: expected {}, found {}", current_height + 1, block.height()); break; } - // Check the next block. - if let Err(error) = self.canon.check_next_block(&block) { - warn!("The next block ({}) is invalid - {error}", block.height()); - break; - } - // Attempt to advance to the next block. - if let Err(error) = self.canon.advance_to_next_block(&block) { - warn!("{error}"); + + // Try to check the next block and advance to it. + let advanced = match self.canon.check_next_block(&block) { + Ok(_) => match self.canon.advance_to_next_block(&block) { + Ok(_) => true, + Err(err) => { + warn!("{err}"); + false + } + }, + Err(err) => { + warn!("{err}"); + false + } + }; + + // Remove the block response. + self.remove_block_response(current_height + 1); + + // If advancing failed, exit the loop. + if !advanced { break; } // Update the latest height. @@ -605,32 +630,25 @@ impl BlockSync { self.request_timestamps.write().remove(&height); } - /// Removes and returns the block response for the given height, if the request is complete. - fn remove_block_response(&self, height: u32) -> Option<Block<N>> { + /// Removes the block response for the given height + /// This may only be called after `process_next_block`, which checked if the request for the given height was complete. + pub fn remove_block_response(&self, height: u32) { // Acquire the requests write lock. // Note: This lock must be held across the entire scope, due to asynchronous block responses // from multiple peers that may be received concurrently. let mut requests = self.requests.write(); - - // Determine if the request is complete. 
- let is_request_complete = requests.get(&height).map(|(_, _, peer_ips)| peer_ips.is_empty()).unwrap_or(true); - - // If the request is not complete, return early. - if !is_request_complete { - return None; - } // Remove the request entry for the given height. requests.remove(&height); // Remove the request timestamp entry for the given height. self.request_timestamps.write().remove(&height); // Remove the response entry for the given height. - self.responses.write().remove(&height) + self.responses.write().remove(&height); } /// Removes the block request for the given peer IP, if it exists. #[allow(dead_code)] fn remove_block_request_to_peer(&self, peer_ip: &SocketAddr, height: u32) { - let mut can_revoke = self.responses.read().get(&height).is_none(); + let mut can_revoke = self.process_next_block(height).is_none(); // Remove the peer IP from the request entry. If the request entry is now empty, // and the response entry for this height is also empty, then remove the request entry altogether.