From 88d557425e21853159c342dda0d9a43fc6f4dca9 Mon Sep 17 00:00:00 2001 From: Konstantin Pandl <16907747+kpandl@users.noreply.github.com> Date: Thu, 3 Oct 2024 15:01:06 +0200 Subject: [PATCH 1/5] update syncing logic to fix duplicate block requests --- node/bft/src/sync/mod.rs | 33 +++++++++++++++---- node/sync/src/block_sync.rs | 64 ++++++++++++++++++++++++------------- 2 files changed, 68 insertions(+), 29 deletions(-) diff --git a/node/bft/src/sync/mod.rs b/node/bft/src/sync/mod.rs index 9f4642124c..71876a0c40 100644 --- a/node/bft/src/sync/mod.rs +++ b/node/bft/src/sync/mod.rs @@ -316,9 +316,18 @@ impl Sync { // Try to advance the ledger *to tip* without updating the BFT. while let Some(block) = self.block_sync.process_next_block(current_height) { info!("Syncing the ledger to block {}...", block.height()); - self.sync_ledger_with_block_without_bft(block).await?; - // Update the current height. - current_height += 1; + // Sync the ledger with the block without BFT. + match self.sync_ledger_with_block_without_bft(block).await { + Ok(_) => { + // Update the current height if sync succeeds. + current_height += 1; + } + Err(e) => { + // Mark the current height as processed in block_sync. + self.block_sync.remove_block_response(current_height); + return Err(e); + } + } } // Sync the storage with the ledger if we should transition to the BFT sync. if current_height > max_gc_height { @@ -332,9 +341,17 @@ impl Sync { while let Some(block) = self.block_sync.process_next_block(current_height) { info!("Syncing the BFT to block {}...", block.height()); // Sync the storage with the block. - self.sync_storage_with_block(block).await?; - // Update the current height. - current_height += 1; + match self.sync_storage_with_block(block).await { + Ok(_) => { + // Update the current height if sync succeeds. + current_height += 1; + } + Err(e) => { + // Mark the current height as processed in block_sync. + self.block_sync.remove_block_response(current_height); + return Err(e); + } + } } Ok(()) } @@ -355,6 +372,8 @@ impl Sync { self_.storage.sync_height_with_block(block.height()); // Sync the round with the block. self_.storage.sync_round_with_block(block.round()); + // Mark the block height as processed in block_sync. + self_.block_sync.remove_block_response(block.height()); Ok(()) }) @@ -502,6 +521,8 @@ impl Sync { .await??; // Remove the block height from the latest block responses. latest_block_responses.remove(&block_height); + // Mark the block height as processed in block_sync. + self.block_sync.remove_block_response(block_height); } } else { debug!( diff --git a/node/sync/src/block_sync.rs b/node/sync/src/block_sync.rs index 00b405c9e9..e4a84ed9ba 100644 --- a/node/sync/src/block_sync.rs +++ b/node/sync/src/block_sync.rs @@ -325,8 +325,20 @@ impl BlockSync { /// Returns the next block to process, if one is ready. #[inline] pub fn process_next_block(&self, next_height: u32) -> Option> { - // Try to advance the ledger with a block from the sync pool. - self.remove_block_response(next_height) + // Acquire the requests write lock. + // Note: This lock must be held across the entire scope, due to asynchronous block responses + // from multiple peers that may be received concurrently. + let requests = self.requests.write(); + + // Determine if the request is complete. + let is_request_complete = requests.get(&next_height).map(|(_, _, peer_ips)| peer_ips.is_empty()).unwrap_or(true); + + // If the request is not complete, return early. + if !is_request_complete { + return None; + } + + self.responses.read().get(&next_height).cloned() } /// Attempts to advance with blocks from the sync pool. @@ -351,20 +363,33 @@ impl BlockSync { /// Handles the block responses from the sync pool. fn try_advancing_with_block_responses(&self, mut current_height: u32) { - while let Some(block) = self.remove_block_response(current_height + 1) { + while let Some(block) = self.process_next_block(current_height + 1) { // Ensure the block height matches. if block.height() != current_height + 1 { warn!("Block height mismatch: expected {}, found {}", current_height + 1, block.height()); break; } - // Check the next block. - if let Err(error) = self.canon.check_next_block(&block) { - warn!("The next block ({}) is invalid - {error}", block.height()); - break; - } - // Attempt to advance to the next block. - if let Err(error) = self.canon.advance_to_next_block(&block) { - warn!("{error}"); + + // Try to check the next block and advance to it. + let advanced = match self.canon.check_next_block(&block) { + Ok(_) => match self.canon.advance_to_next_block(&block) { + Ok(_) => true, + Err(err) => { + warn!("{err}"); + false + } + }, + Err(err) => { + warn!("{err}"); + false + } + }; + + // Remove the block response. + self.remove_block_response(current_height + 1); + + // If advancing failed, exit the loop. + if !advanced { break; } // Update the latest height. @@ -609,32 +634,25 @@ impl BlockSync { self.request_timestamps.write().remove(&height); } - /// Removes and returns the block response for the given height, if the request is complete. - fn remove_block_response(&self, height: u32) -> Option> { + /// Removes the block response for the given height + /// This may only be called after `process_next_block`, which checked if the request for the given height was complete. + pub fn remove_block_response(&self, height: u32) { // Acquire the requests write lock. // Note: This lock must be held across the entire scope, due to asynchronous block responses // from multiple peers that may be received concurrently. let mut requests = self.requests.write(); - - // Determine if the request is complete. - let is_request_complete = requests.get(&height).map(|(_, _, peer_ips)| peer_ips.is_empty()).unwrap_or(true); - - // If the request is not complete, return early. - if !is_request_complete { - return None; - } // Remove the request entry for the given height. requests.remove(&height); // Remove the request timestamp entry for the given height. self.request_timestamps.write().remove(&height); // Remove the response entry for the given height. - self.responses.write().remove(&height) + self.responses.write().remove(&height); } /// Removes the block request for the given peer IP, if it exists. #[allow(dead_code)] fn remove_block_request_to_peer(&self, peer_ip: &SocketAddr, height: u32) { - let mut can_revoke = self.responses.read().get(&height).is_none(); + let mut can_revoke = self.process_next_block(height).is_none(); // Remove the peer IP from the request entry. If the request entry is now empty, // and the response entry for this height is also empty, then remove the request entry altogether. From 9f7c47fba12e0599d5d346299cfe1c606d828210 Mon Sep 17 00:00:00 2001 From: Konstantin Pandl Date: Tue, 17 Dec 2024 09:48:37 +0100 Subject: [PATCH 2/5] fmt --- node/bft/src/sync/mod.rs | 10 +++++----- node/sync/src/block_sync.rs | 9 +++++---- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/node/bft/src/sync/mod.rs b/node/bft/src/sync/mod.rs index 71876a0c40..0e008d5469 100644 --- a/node/bft/src/sync/mod.rs +++ b/node/bft/src/sync/mod.rs @@ -323,9 +323,9 @@ impl Sync { current_height += 1; } Err(e) => { - // Mark the current height as processed in block_sync. + // Mark the current height as processed in block_sync. self.block_sync.remove_block_response(current_height); - return Err(e); + return Err(e); } } } @@ -347,7 +347,7 @@ impl Sync { current_height += 1; } Err(e) => { - // Mark the current height as processed in block_sync. + // Mark the current height as processed in block_sync. self.block_sync.remove_block_response(current_height); return Err(e); } @@ -372,7 +372,7 @@ impl Sync { self_.storage.sync_height_with_block(block.height()); // Sync the round with the block. self_.storage.sync_round_with_block(block.round()); - // Mark the block height as processed in block_sync. + // Mark the block height as processed in block_sync. self_.block_sync.remove_block_response(block.height()); Ok(()) @@ -521,7 +521,7 @@ impl Sync { .await??; // Remove the block height from the latest block responses. latest_block_responses.remove(&block_height); - // Mark the block height as processed in block_sync. + // Mark the block height as processed in block_sync. self.block_sync.remove_block_response(block_height); } } else { diff --git a/node/sync/src/block_sync.rs b/node/sync/src/block_sync.rs index e4a84ed9ba..fbe25f6d0a 100644 --- a/node/sync/src/block_sync.rs +++ b/node/sync/src/block_sync.rs @@ -331,7 +331,8 @@ impl BlockSync { let requests = self.requests.write(); // Determine if the request is complete. - let is_request_complete = requests.get(&next_height).map(|(_, _, peer_ips)| peer_ips.is_empty()).unwrap_or(true); + let is_request_complete = + requests.get(&next_height).map(|(_, _, peer_ips)| peer_ips.is_empty()).unwrap_or(true); // If the request is not complete, return early. if !is_request_complete { @@ -369,7 +370,7 @@ impl BlockSync { warn!("Block height mismatch: expected {}, found {}", current_height + 1, block.height()); break; } - + // Try to check the next block and advance to it. let advanced = match self.canon.check_next_block(&block) { Ok(_) => match self.canon.advance_to_next_block(&block) { @@ -384,10 +385,10 @@ impl BlockSync { false } }; - + // Remove the block response. self.remove_block_response(current_height + 1); - + // If advancing failed, exit the loop. if !advanced { break; From 88c5c6a16eb810b9904127bc7fd66b16b676876a Mon Sep 17 00:00:00 2001 From: Konstantin Pandl Date: Wed, 18 Dec 2024 16:16:37 +0100 Subject: [PATCH 3/5] Update node/sync/src/block_sync.rs Co-authored-by: ljedrz <3750347+ljedrz@users.noreply.github.com> --- node/sync/src/block_sync.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/sync/src/block_sync.rs b/node/sync/src/block_sync.rs index fbe25f6d0a..f1a91a2927 100644 --- a/node/sync/src/block_sync.rs +++ b/node/sync/src/block_sync.rs @@ -328,7 +328,7 @@ impl BlockSync { // Acquire the requests write lock. // Note: This lock must be held across the entire scope, due to asynchronous block responses // from multiple peers that may be received concurrently. - let requests = self.requests.write(); + let requests = self.requests.read(); // Determine if the request is complete. let is_request_complete = From d6e4f737e99361dd2ffdff6c8abf5621689545ca Mon Sep 17 00:00:00 2001 From: Konstantin Pandl Date: Wed, 18 Dec 2024 17:19:05 +0100 Subject: [PATCH 4/5] more expressive warnings --- node/sync/src/block_sync.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/sync/src/block_sync.rs b/node/sync/src/block_sync.rs index f1a91a2927..330373777b 100644 --- a/node/sync/src/block_sync.rs +++ b/node/sync/src/block_sync.rs @@ -376,12 +376,12 @@ impl BlockSync { Ok(_) => match self.canon.advance_to_next_block(&block) { Ok(_) => true, Err(err) => { - warn!("{err}"); + warn!("Failed to advance to next block ({}): {err}", block.height()); false } }, Err(err) => { - warn!("{err}"); + warn!("The next block ({}) is invalid - {err}", block.height()); false } }; From 778961a1b61ade1c28bfa9b34b3c127418a3d170 Mon Sep 17 00:00:00 2001 From: Konstantin Pandl Date: Thu, 2 Jan 2025 12:17:20 +0200 Subject: [PATCH 5/5] process_next_block -> peek_next_block --- node/bft/src/sync/mod.rs | 4 ++-- node/sync/src/block_sync.rs | 11 ++++++----- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/node/bft/src/sync/mod.rs b/node/bft/src/sync/mod.rs index 0e008d5469..a99ef26999 100644 --- a/node/bft/src/sync/mod.rs +++ b/node/bft/src/sync/mod.rs @@ -314,7 +314,7 @@ impl Sync { // Determine if we can sync the ledger without updating the BFT first. if current_height <= max_gc_height { // Try to advance the ledger *to tip* without updating the BFT. - while let Some(block) = self.block_sync.process_next_block(current_height) { + while let Some(block) = self.block_sync.peek_next_block(current_height) { info!("Syncing the ledger to block {}...", block.height()); // Sync the ledger with the block without BFT. match self.sync_ledger_with_block_without_bft(block).await { @@ -338,7 +338,7 @@ impl Sync { } // Try to advance the ledger with sync blocks. - while let Some(block) = self.block_sync.process_next_block(current_height) { + while let Some(block) = self.block_sync.peek_next_block(current_height) { info!("Syncing the BFT to block {}...", block.height()); // Sync the storage with the block. match self.sync_storage_with_block(block).await { diff --git a/node/sync/src/block_sync.rs b/node/sync/src/block_sync.rs index 330373777b..dd8663485a 100644 --- a/node/sync/src/block_sync.rs +++ b/node/sync/src/block_sync.rs @@ -322,9 +322,10 @@ impl BlockSync { Ok(()) } - /// Returns the next block to process, if one is ready. + /// Returns the next block for the given `next_height` if the request is complete, + /// or `None` otherwise. This does not remove the block from the `responses` map. #[inline] - pub fn process_next_block(&self, next_height: u32) -> Option> { + pub fn peek_next_block(&self, next_height: u32) -> Option> { // Acquire the requests write lock. // Note: This lock must be held across the entire scope, due to asynchronous block responses // from multiple peers that may be received concurrently. @@ -364,7 +365,7 @@ impl BlockSync { /// Handles the block responses from the sync pool. fn try_advancing_with_block_responses(&self, mut current_height: u32) { - while let Some(block) = self.process_next_block(current_height + 1) { + while let Some(block) = self.peek_next_block(current_height + 1) { // Ensure the block height matches. if block.height() != current_height + 1 { warn!("Block height mismatch: expected {}, found {}", current_height + 1, block.height()); @@ -636,7 +637,7 @@ impl BlockSync { } /// Removes the block response for the given height - /// This may only be called after `process_next_block`, which checked if the request for the given height was complete. + /// This may only be called after `peek_next_block`, which checked if the request for the given height was complete. pub fn remove_block_response(&self, height: u32) { // Acquire the requests write lock. // Note: This lock must be held across the entire scope, due to asynchronous block responses @@ -653,7 +654,7 @@ impl BlockSync { /// Removes the block request for the given peer IP, if it exists. #[allow(dead_code)] fn remove_block_request_to_peer(&self, peer_ip: &SocketAddr, height: u32) { - let mut can_revoke = self.process_next_block(height).is_none(); + let mut can_revoke = self.peek_next_block(height).is_none(); // Remove the peer IP from the request entry. If the request entry is now empty, // and the response entry for this height is also empty, then remove the request entry altogether.