diff --git a/node/bft/src/sync/mod.rs b/node/bft/src/sync/mod.rs index 9f4642124c..71876a0c40 100644 --- a/node/bft/src/sync/mod.rs +++ b/node/bft/src/sync/mod.rs @@ -316,9 +316,18 @@ impl Sync { // Try to advance the ledger *to tip* without updating the BFT. while let Some(block) = self.block_sync.process_next_block(current_height) { info!("Syncing the ledger to block {}...", block.height()); - self.sync_ledger_with_block_without_bft(block).await?; - // Update the current height. - current_height += 1; + // Sync the ledger with the block without BFT. + match self.sync_ledger_with_block_without_bft(block).await { + Ok(_) => { + // Update the current height if sync succeeds. + current_height += 1; + } + Err(e) => { + // Mark the current height as processed in block_sync. + self.block_sync.remove_block_response(current_height); + return Err(e); + } + } } // Sync the storage with the ledger if we should transition to the BFT sync. if current_height > max_gc_height { @@ -332,9 +341,17 @@ impl Sync { while let Some(block) = self.block_sync.process_next_block(current_height) { info!("Syncing the BFT to block {}...", block.height()); // Sync the storage with the block. - self.sync_storage_with_block(block).await?; - // Update the current height. - current_height += 1; + match self.sync_storage_with_block(block).await { + Ok(_) => { + // Update the current height if sync succeeds. + current_height += 1; + } + Err(e) => { + // Mark the current height as processed in block_sync. + self.block_sync.remove_block_response(current_height); + return Err(e); + } + } } Ok(()) } @@ -355,6 +372,8 @@ impl Sync { self_.storage.sync_height_with_block(block.height()); // Sync the round with the block. self_.storage.sync_round_with_block(block.round()); + // Mark the block height as processed in block_sync. + self_.block_sync.remove_block_response(block.height()); Ok(()) }) @@ -502,6 +521,8 @@ impl Sync { .await??; // Remove the block height from the latest block responses. latest_block_responses.remove(&block_height); + // Mark the block height as processed in block_sync. + self.block_sync.remove_block_response(block_height); } } else { debug!( diff --git a/node/sync/src/block_sync.rs b/node/sync/src/block_sync.rs index 00b405c9e9..e4a84ed9ba 100644 --- a/node/sync/src/block_sync.rs +++ b/node/sync/src/block_sync.rs @@ -325,8 +325,20 @@ impl BlockSync { /// Returns the next block to process, if one is ready. #[inline] pub fn process_next_block(&self, next_height: u32) -> Option> { - // Try to advance the ledger with a block from the sync pool. - self.remove_block_response(next_height) + // Acquire the requests write lock. + // Note: This lock must be held across the entire scope, due to asynchronous block responses + // from multiple peers that may be received concurrently. + let requests = self.requests.write(); + + // Determine if the request is complete. + let is_request_complete = requests.get(&next_height).map(|(_, _, peer_ips)| peer_ips.is_empty()).unwrap_or(true); + + // If the request is not complete, return early. + if !is_request_complete { + return None; + } + + self.responses.read().get(&next_height).cloned() } /// Attempts to advance with blocks from the sync pool. @@ -351,20 +363,33 @@ impl BlockSync { /// Handles the block responses from the sync pool. fn try_advancing_with_block_responses(&self, mut current_height: u32) { - while let Some(block) = self.remove_block_response(current_height + 1) { + while let Some(block) = self.process_next_block(current_height + 1) { // Ensure the block height matches. if block.height() != current_height + 1 { warn!("Block height mismatch: expected {}, found {}", current_height + 1, block.height()); break; } - // Check the next block. - if let Err(error) = self.canon.check_next_block(&block) { - warn!("The next block ({}) is invalid - {error}", block.height()); - break; - } - // Attempt to advance to the next block. - if let Err(error) = self.canon.advance_to_next_block(&block) { - warn!("{error}"); + + // Try to check the next block and advance to it. + let advanced = match self.canon.check_next_block(&block) { + Ok(_) => match self.canon.advance_to_next_block(&block) { + Ok(_) => true, + Err(err) => { + warn!("{err}"); + false + } + }, + Err(err) => { + warn!("{err}"); + false + } + }; + + // Remove the block response. + self.remove_block_response(current_height + 1); + + // If advancing failed, exit the loop. + if !advanced { break; } // Update the latest height. @@ -609,32 +634,25 @@ impl BlockSync { self.request_timestamps.write().remove(&height); } - /// Removes and returns the block response for the given height, if the request is complete. - fn remove_block_response(&self, height: u32) -> Option> { + /// Removes the block response for the given height + /// This may only be called after `process_next_block`, which checked if the request for the given height was complete. + pub fn remove_block_response(&self, height: u32) { // Acquire the requests write lock. // Note: This lock must be held across the entire scope, due to asynchronous block responses // from multiple peers that may be received concurrently. let mut requests = self.requests.write(); - - // Determine if the request is complete. - let is_request_complete = requests.get(&height).map(|(_, _, peer_ips)| peer_ips.is_empty()).unwrap_or(true); - - // If the request is not complete, return early. - if !is_request_complete { - return None; - } // Remove the request entry for the given height. requests.remove(&height); // Remove the request timestamp entry for the given height. self.request_timestamps.write().remove(&height); // Remove the response entry for the given height. - self.responses.write().remove(&height) + self.responses.write().remove(&height); } /// Removes the block request for the given peer IP, if it exists. #[allow(dead_code)] fn remove_block_request_to_peer(&self, peer_ip: &SocketAddr, height: u32) { - let mut can_revoke = self.responses.read().get(&height).is_none(); + let mut can_revoke = self.process_next_block(height).is_none(); // Remove the peer IP from the request entry. If the request entry is now empty, // and the response entry for this height is also empty, then remove the request entry altogether.