Skip to content

Commit

Permalink
Merge pull request #3410 from ProvableHQ/fix-sync-duplicate-block-req…
Browse files Browse the repository at this point in the history
…uest

Update syncing logic to fix duplicate block requests
  • Loading branch information
raychu86 authored Jan 17, 2025
2 parents 25ed580 + dd29ea5 commit d9ef855
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 33 deletions.
37 changes: 29 additions & 8 deletions node/bft/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,11 +314,20 @@ impl<N: Network> Sync<N> {
// Determine if we can sync the ledger without updating the BFT first.
if current_height <= max_gc_height {
// Try to advance the ledger *to tip* without updating the BFT.
while let Some(block) = self.block_sync.process_next_block(current_height) {
while let Some(block) = self.block_sync.peek_next_block(current_height) {
info!("Syncing the ledger to block {}...", block.height());
self.sync_ledger_with_block_without_bft(block).await?;
// Update the current height.
current_height += 1;
// Sync the ledger with the block without BFT.
match self.sync_ledger_with_block_without_bft(block).await {
Ok(_) => {
// Update the current height if sync succeeds.
current_height += 1;
}
Err(e) => {
// Mark the current height as processed in block_sync.
self.block_sync.remove_block_response(current_height);
return Err(e);
}
}
}
// Sync the storage with the ledger if we should transition to the BFT sync.
if current_height > max_gc_height {
Expand All @@ -329,12 +338,20 @@ impl<N: Network> Sync<N> {
}

// Try to advance the ledger with sync blocks.
while let Some(block) = self.block_sync.process_next_block(current_height) {
while let Some(block) = self.block_sync.peek_next_block(current_height) {
info!("Syncing the BFT to block {}...", block.height());
// Sync the storage with the block.
self.sync_storage_with_block(block).await?;
// Update the current height.
current_height += 1;
match self.sync_storage_with_block(block).await {
Ok(_) => {
// Update the current height if sync succeeds.
current_height += 1;
}
Err(e) => {
// Mark the current height as processed in block_sync.
self.block_sync.remove_block_response(current_height);
return Err(e);
}
}
}
Ok(())
}
Expand All @@ -355,6 +372,8 @@ impl<N: Network> Sync<N> {
self_.storage.sync_height_with_block(block.height());
// Sync the round with the block.
self_.storage.sync_round_with_block(block.round());
// Mark the block height as processed in block_sync.
self_.block_sync.remove_block_response(block.height());

Ok(())
})
Expand Down Expand Up @@ -502,6 +521,8 @@ impl<N: Network> Sync<N> {
.await??;
// Remove the block height from the latest block responses.
latest_block_responses.remove(&block_height);
// Mark the block height as processed in block_sync.
self.block_sync.remove_block_response(block_height);
}
} else {
debug!(
Expand Down
70 changes: 45 additions & 25 deletions node/sync/src/block_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,11 +322,25 @@ impl<N: Network> BlockSync<N> {
Ok(())
}

/// Returns the next block to process, if one is ready.
/// Returns the next block for the given `next_height` if the request is complete,
/// or `None` otherwise. This does not remove the block from the `responses` map.
#[inline]
pub fn process_next_block(&self, next_height: u32) -> Option<Block<N>> {
// Try to advance the ledger with a block from the sync pool.
self.remove_block_response(next_height)
pub fn peek_next_block(&self, next_height: u32) -> Option<Block<N>> {
// Acquire the requests write lock.
// Note: This lock must be held across the entire scope, due to asynchronous block responses
// from multiple peers that may be received concurrently.
let requests = self.requests.read();

// Determine if the request is complete.
let is_request_complete =
requests.get(&next_height).map(|(_, _, peer_ips)| peer_ips.is_empty()).unwrap_or(true);

// If the request is not complete, return early.
if !is_request_complete {
return None;
}

self.responses.read().get(&next_height).cloned()
}

/// Attempts to advance with blocks from the sync pool.
Expand All @@ -351,20 +365,33 @@ impl<N: Network> BlockSync<N> {

/// Handles the block responses from the sync pool.
fn try_advancing_with_block_responses(&self, mut current_height: u32) {
while let Some(block) = self.remove_block_response(current_height + 1) {
while let Some(block) = self.peek_next_block(current_height + 1) {
// Ensure the block height matches.
if block.height() != current_height + 1 {
warn!("Block height mismatch: expected {}, found {}", current_height + 1, block.height());
break;
}
// Check the next block.
if let Err(error) = self.canon.check_next_block(&block) {
warn!("The next block ({}) is invalid - {error}", block.height());
break;
}
// Attempt to advance to the next block.
if let Err(error) = self.canon.advance_to_next_block(&block) {
warn!("{error}");

// Try to check the next block and advance to it.
let advanced = match self.canon.check_next_block(&block) {
Ok(_) => match self.canon.advance_to_next_block(&block) {
Ok(_) => true,
Err(err) => {
warn!("Failed to advance to next block ({}): {err}", block.height());
false
}
},
Err(err) => {
warn!("The next block ({}) is invalid - {err}", block.height());
false
}
};

// Remove the block response.
self.remove_block_response(current_height + 1);

// If advancing failed, exit the loop.
if !advanced {
break;
}
// Update the latest height.
Expand Down Expand Up @@ -609,32 +636,25 @@ impl<N: Network> BlockSync<N> {
self.request_timestamps.write().remove(&height);
}

/// Removes and returns the block response for the given height, if the request is complete.
fn remove_block_response(&self, height: u32) -> Option<Block<N>> {
/// Removes the block response for the given height
/// This may only be called after `peek_next_block`, which checked if the request for the given height was complete.
pub fn remove_block_response(&self, height: u32) {
// Acquire the requests write lock.
// Note: This lock must be held across the entire scope, due to asynchronous block responses
// from multiple peers that may be received concurrently.
let mut requests = self.requests.write();

// Determine if the request is complete.
let is_request_complete = requests.get(&height).map(|(_, _, peer_ips)| peer_ips.is_empty()).unwrap_or(true);

// If the request is not complete, return early.
if !is_request_complete {
return None;
}
// Remove the request entry for the given height.
requests.remove(&height);
// Remove the request timestamp entry for the given height.
self.request_timestamps.write().remove(&height);
// Remove the response entry for the given height.
self.responses.write().remove(&height)
self.responses.write().remove(&height);
}

/// Removes the block request for the given peer IP, if it exists.
#[allow(dead_code)]
fn remove_block_request_to_peer(&self, peer_ip: &SocketAddr, height: u32) {
let mut can_revoke = self.responses.read().get(&height).is_none();
let mut can_revoke = self.peek_next_block(height).is_none();

// Remove the peer IP from the request entry. If the request entry is now empty,
// and the response entry for this height is also empty, then remove the request entry altogether.
Expand Down

0 comments on commit d9ef855

Please sign in to comment.