diff --git a/Cargo.lock b/Cargo.lock index 026c828f..16778682 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -735,7 +735,7 @@ dependencies = [ "bitflags 2.6.0", "cexpr", "clang-sys", - "itertools 0.12.1", + "itertools 0.10.5", "lazy_static", "lazycell", "proc-macro2 1.0.92", @@ -4130,15 +4130,6 @@ dependencies = [ "either", ] -[[package]] -name = "itertools" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" -dependencies = [ - "either", -] - [[package]] name = "itertools" version = "0.13.0" @@ -5562,6 +5553,7 @@ dependencies = [ "eth2_libp2p", "features", "fork_choice_control", + "fork_choice_store", "futures", "genesis", "helper_functions", @@ -5569,6 +5561,7 @@ dependencies = [ "itertools 0.13.0", "log", "logging", + "lru", "operation_pools", "prometheus-client", "prometheus_metrics", @@ -5585,6 +5578,7 @@ dependencies = [ "thiserror 1.0.69", "tokio", "tokio-stream", + "transition_functions", "tynm", "typenum", "types", diff --git a/benches/benches/fork_choice_store.rs b/benches/benches/fork_choice_store.rs index 6d63e200..404fe825 100644 --- a/benches/benches/fork_choice_store.rs +++ b/benches/benches/fork_choice_store.rs @@ -69,6 +69,7 @@ impl Criterion { anchor_block, anchor_state, false, + false, ); for slot in (anchor_slot + 1)..=last_attestation_slot { diff --git a/database/src/lib.rs b/database/src/lib.rs index 575fd853..3568591a 100644 --- a/database/src/lib.rs +++ b/database/src/lib.rs @@ -20,6 +20,41 @@ use unwrap_none::UnwrapNone as _; const GROWTH_STEP: ByteSize = ByteSize::mib(256); const MAX_NAMED_DATABASES: usize = 10; +#[derive(Clone, Copy)] +pub enum DatabaseMode { + ReadOnly, + ReadWrite, +} + +impl DatabaseMode { + #[must_use] + pub const fn is_read_only(self) -> bool { + matches!(self, Self::ReadOnly) + } + + #[must_use] + pub const fn mode_permissions(self) -> u16 { + match self { + // + // The UNIX permissions to set on created files. Zero value means to open existing, but do not create. + Self::ReadOnly => 0, + Self::ReadWrite => 0o600, + } + } + + #[must_use] + #[cfg(not(target_os = "macos"))] + pub fn permissions(self) -> u32 { + self.mode_permissions().into() + } + + #[must_use] + #[cfg(target_os = "macos")] + pub const fn permissions(self) -> u16 { + self.mode_permissions() + } +} + pub struct Database(DatabaseKind); impl Database { @@ -27,14 +62,14 @@ impl Database { name: &str, directory: impl AsRef, max_size: ByteSize, - read_only: bool, + mode: DatabaseMode, ) -> Result { // If a database with the legacy name exists, keep using it. // Otherwise, create a new database with the specified name. // This check will not force existing users to resync. 
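A minimal sketch of the intended DatabaseMode call pattern (the function name and directory path here are hypothetical illustrations; the permission values come from mode_permissions above, and Database::persistent is the constructor shown in this hunk):

use bytesize::ByteSize;
use database::{Database, DatabaseMode};

fn open_read_only() -> anyhow::Result<Database> {
    // Per the comment on mode_permissions: zero permission bits mean
    // "open an existing database, but do not create one".
    assert_eq!(DatabaseMode::ReadOnly.mode_permissions(), 0);
    assert_eq!(DatabaseMode::ReadWrite.mode_permissions(), 0o600);

    // Offline tooling such as db-info and db-stats opens databases
    // read-only, so it never creates files as a side effect.
    Database::persistent(
        "beacon_fork_choice",
        "/home/user/.grandine/holesky/beacon/beacon_fork_choice", // hypothetical path
        ByteSize::mib(256),
        DatabaseMode::ReadOnly,
    )
}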
let legacy_name = directory.as_ref().to_str().ok_or(Error)?; - if !read_only { + if !mode.is_read_only() { fs_err::create_dir_all(&directory)?; } @@ -48,14 +83,14 @@ impl Database { shrink_threshold: None, page_size: None, }) - .open_with_permissions(directory.as_ref(), 0o600)?; + .open_with_permissions(directory.as_ref(), mode.permissions())?; let transaction = environment.begin_rw_txn()?; let existing_db = transaction.open_db(Some(legacy_name)); let database_name = if existing_db.is_err() { info!("database: {legacy_name} with name {name}"); - if !read_only { + if !mode.is_read_only() { transaction.create_db(Some(name), DatabaseFlags::default())?; } @@ -753,7 +788,13 @@ mod tests { } fn build_persistent_database() -> Result { - let database = Database::persistent("test_db", TempDir::new()?, ByteSize::mib(1), false)?; + let database = Database::persistent( + "test_db", + TempDir::new()?, + ByteSize::mib(1), + DatabaseMode::ReadWrite, + )?; + populate_database(&database)?; Ok(database) } diff --git a/features/src/lib.rs b/features/src/lib.rs index 345ada8b..91f8415e 100644 --- a/features/src/lib.rs +++ b/features/src/lib.rs @@ -41,7 +41,6 @@ pub enum Feature { SubscribeToAllAttestationSubnets, SubscribeToAllSyncCommitteeSubnets, TrackMetrics, - TrustBackSyncBlocks, // By default we fully validate objects produced by the current instance of the application. // This costs some resources but may help in case of bugs. TrustOwnAttestationSignatures, diff --git a/fork_choice_control/src/controller.rs b/fork_choice_control/src/controller.rs index 12226b38..7825d6bd 100644 --- a/fork_choice_control/src/controller.rs +++ b/fork_choice_control/src/controller.rs @@ -8,7 +8,7 @@ // The downside is that submitting the same object multiple times in quick succession will result in // it being processed multiple times in parallel redundantly. -use core::panic::AssertUnwindSafe; +use core::{panic::AssertUnwindSafe, sync::atomic::AtomicBool}; use std::{ sync::{mpsc::Sender, Arc}, thread::{Builder, JoinHandle}, @@ -106,6 +106,7 @@ where validator_tx: impl UnboundedSink>, storage: Arc>, unfinalized_blocks: impl DoubleEndedIterator>>>, + finished_back_sync: bool, ) -> Result<(Arc, MutatorHandle)> { let finished_initial_forward_sync = anchor_block.message().slot() >= tick.slot; @@ -115,6 +116,7 @@ where anchor_block, anchor_state, finished_initial_forward_sync, + finished_back_sync, ); store.apply_tick(tick)?; @@ -198,6 +200,14 @@ where .send(&self.mutator_tx) } + pub fn on_back_sync_status(&self, is_back_synced: bool) { + MutatorMessage::BackSyncStatus { + wait_group: self.owned_wait_group(), + is_back_synced, + } + .send(&self.mutator_tx) + } + pub fn on_gossip_block(&self, block: Arc>, gossip_id: GossipId) { self.spawn_block_task(block, BlockOrigin::Gossip(gossip_id)) } @@ -435,6 +445,13 @@ where }) } + pub fn store_back_sync_blob_sidecars( + &self, + blob_sidecars: impl IntoIterator>>, + ) -> Result<()> { + self.storage.store_back_sync_blob_sidecars(blob_sidecars) + } + pub fn store_back_sync_blocks( &self, blocks: impl IntoIterator>>, @@ -447,9 +464,14 @@ where start_slot: Slot, end_slot: Slot, anchor_checkpoint_provider: &AnchorCheckpointProvider
<P>
, + is_exiting: &Arc, ) -> Result<()> { - self.storage - .archive_back_sync_states(start_slot, end_slot, anchor_checkpoint_provider) + self.storage.archive_back_sync_states( + start_slot, + end_slot, + anchor_checkpoint_provider, + is_exiting, + ) } fn spawn_blob_sidecar_task( diff --git a/fork_choice_control/src/lib.rs b/fork_choice_control/src/lib.rs index 508a601b..542ae359 100644 --- a/fork_choice_control/src/lib.rs +++ b/fork_choice_control/src/lib.rs @@ -21,14 +21,16 @@ pub use crate::{ AttestationVerifierMessage, P2pMessage, PoolMessage, SubnetMessage, SyncMessage, ValidatorMessage, }, - misc::{MutatorRejectionReason, VerifyAggregateAndProofResult, VerifyAttestationResult}, + misc::{ + MutatorRejectionReason, StorageMode, VerifyAggregateAndProofResult, VerifyAttestationResult, + }, queries::{BlockWithRoot, ForkChoiceContext, ForkTip, Snapshot}, specialized::{AdHocBenchController, BenchController}, storage::{ - BlobSidecarByBlobId, BlockCheckpoint, BlockRootBySlot, FinalizedBlockByRoot, PrefixableKey, - SlotBlobId, SlotByStateRoot, StateByBlockRoot, StateCheckpoint, UnfinalizedBlockByRoot, + get, save, BlobSidecarByBlobId, BlockCheckpoint, BlockRootBySlot, FinalizedBlockByRoot, + PrefixableKey, SlotBlobId, SlotByStateRoot, StateByBlockRoot, StateCheckpoint, + StateLoadStrategy, Storage, UnfinalizedBlockByRoot, DEFAULT_ARCHIVAL_EPOCH_INTERVAL, }, - storage::{StateLoadStrategy, Storage, DEFAULT_ARCHIVAL_EPOCH_INTERVAL}, storage_tool::{export_state_and_blocks, replay_blocks}, wait::Wait, }; diff --git a/fork_choice_control/src/messages.rs b/fork_choice_control/src/messages.rs index f37b7160..5d4e1566 100644 --- a/fork_choice_control/src/messages.rs +++ b/fork_choice_control/src/messages.rs @@ -67,6 +67,10 @@ pub enum MutatorMessage { wait_group: W, tick: Tick, }, + BackSyncStatus { + wait_group: W, + is_back_synced: bool, + }, Block { wait_group: W, result: Result>, diff --git a/fork_choice_control/src/misc.rs b/fork_choice_control/src/misc.rs index 39d5323d..3db40b9c 100644 --- a/fork_choice_control/src/misc.rs +++ b/fork_choice_control/src/misc.rs @@ -120,3 +120,22 @@ pub enum MutatorRejectionReason { InvalidBlock, InvalidBlobSidecar, } + +#[derive(Clone, Copy, Debug)] +pub enum StorageMode { + Prune, + Standard, + Archive, +} + +impl StorageMode { + #[must_use] + pub const fn is_prune(self) -> bool { + matches!(self, Self::Prune) + } + + #[must_use] + pub const fn is_archive(self) -> bool { + matches!(self, Self::Archive) + } +} diff --git a/fork_choice_control/src/mutator.rs b/fork_choice_control/src/mutator.rs index 9a448d98..73302447 100644 --- a/fork_choice_control/src/mutator.rs +++ b/fork_choice_control/src/mutator.rs @@ -190,6 +190,10 @@ where .expect("sender in Controller is not dropped until mutator thread exits") { MutatorMessage::Tick { wait_group, tick } => self.handle_tick(&wait_group, tick)?, + MutatorMessage::BackSyncStatus { + wait_group, + is_back_synced, + } => self.handle_back_sync_status(wait_group, is_back_synced), MutatorMessage::Block { wait_group, result, @@ -439,7 +443,10 @@ where self.spawn_preprocess_head_state_for_next_slot_task(); } - if self.store.is_forward_synced() && misc::slots_since_epoch_start::
<P>
(tick.slot) == 0 { + if self.store.is_forward_synced() + && self.store.is_back_synced() + && misc::slots_since_epoch_start::
<P>
(tick.slot) == 0 + { if tick.kind == TickKind::AttestFourth { self.prune_old_records()?; } @@ -467,6 +474,15 @@ where Ok(()) } + fn handle_back_sync_status(&mut self, wait_group: W, is_back_synced: bool) { + if self.store.is_back_synced() != is_back_synced { + self.store_mut().set_back_synced(is_back_synced); + self.update_store_snapshot(); + } + + drop(wait_group); + } + #[expect(clippy::too_many_lines)] fn handle_block( &mut self, @@ -1636,13 +1652,15 @@ where self.event_channels .send_blob_sidecar_event(block_root, blob_sidecar); - self.spawn(PersistBlobSidecarsTask { - store_snapshot: self.owned_store(), - storage: self.storage.clone_arc(), - mutator_tx: self.owned_mutator_tx(), - wait_group: wait_group.clone(), - metrics: self.metrics.clone(), - }); + if !self.storage.prune_storage_enabled() { + self.spawn(PersistBlobSidecarsTask { + store_snapshot: self.owned_store(), + storage: self.storage.clone_arc(), + mutator_tx: self.owned_mutator_tx(), + wait_group: wait_group.clone(), + metrics: self.metrics.clone(), + }); + } self.handle_potential_head_change(wait_group, &old_head, head_was_optimistic); } @@ -2337,6 +2355,10 @@ where } fn prune_old_records(&self) -> Result<()> { + if self.storage.archive_storage_enabled() { + return Ok(()); + } + let storage = self.storage.clone_arc(); let blobs_up_to_epoch = self.store.min_checked_data_availability_epoch(); let blobs_up_to_slot = misc::compute_start_slot_at_epoch::
<P>
(blobs_up_to_epoch); @@ -2376,7 +2398,9 @@ where match storage.prune_old_state_roots(blocks_up_to_slot) { Ok(()) => { - debug!("pruned old state roots from storage up to slot {blocks_up_to_slot}"); + debug!( + "pruned old state roots from storage up to slot {blocks_up_to_slot}" + ); } Err(error) => { error!("pruning old state roots from storage failed: {error:?}") diff --git a/fork_choice_control/src/queries.rs b/fork_choice_control/src/queries.rs index bac7e082..4d6878a5 100644 --- a/fork_choice_control/src/queries.rs +++ b/fork_choice_control/src/queries.rs @@ -6,7 +6,8 @@ use arc_swap::Guard; use eth2_libp2p::GossipId; use execution_engine::ExecutionEngine; use fork_choice_store::{ - AggregateAndProofOrigin, AttestationItem, ChainLink, StateCacheProcessor, Store, + AggregateAndProofOrigin, AttestationItem, BlobSidecarAction, BlobSidecarOrigin, ChainLink, + StateCacheProcessor, Store, }; use helper_functions::misc; use itertools::Itertools as _; @@ -272,6 +273,11 @@ where } } + #[must_use] + pub fn is_back_synced(&self) -> bool { + self.store_snapshot().is_back_synced() + } + #[must_use] pub fn is_forward_synced(&self) -> bool { self.store_snapshot().is_forward_synced() @@ -562,6 +568,23 @@ where pub fn min_checked_data_availability_epoch(&self) -> Epoch { self.store_snapshot().min_checked_data_availability_epoch() } + + pub fn validate_blob_sidecar_with_state( + &self, + blob_sidecar: Arc>, + block_seen: bool, + origin: &BlobSidecarOrigin, + parent_fn: impl FnOnce() -> Option<(Arc>, PayloadStatus)>, + state_fn: impl FnOnce() -> Result>>, + ) -> Result> { + self.store_snapshot().validate_blob_sidecar_with_state( + blob_sidecar, + block_seen, + origin, + parent_fn, + state_fn, + ) + } } #[cfg(test)] @@ -761,6 +784,11 @@ impl Snapshot<'_, P> { .try_state_at_slot(&self.store_snapshot, root, slot) } + #[must_use] + pub fn is_back_synced(&self) -> bool { + self.store_snapshot.is_back_synced() + } + #[must_use] pub fn finalized_epoch(&self) -> Epoch { self.store_snapshot.finalized_epoch() diff --git a/fork_choice_control/src/specialized.rs b/fork_choice_control/src/specialized.rs index 2b033e95..ec037c1e 100644 --- a/fork_choice_control/src/specialized.rs +++ b/fork_choice_control/src/specialized.rs @@ -23,6 +23,7 @@ use crate::{ messages::{AttestationVerifierMessage, P2pMessage}, storage::{Storage, DEFAULT_ARCHIVAL_EPOCH_INTERVAL}, unbounded_sink::UnboundedSink, + StorageMode, }; #[cfg(test)] @@ -102,7 +103,7 @@ where chain_config.clone_arc(), Database::in_memory(), DEFAULT_ARCHIVAL_EPOCH_INTERVAL, - false, + StorageMode::Standard, )); let event_channels = Arc::new(EventChannels::default()); @@ -124,6 +125,7 @@ where futures::sink::drain(), storage, core::iter::empty(), + true, ) .expect("Controller::new should not fail in tests and benchmarks") } diff --git a/fork_choice_control/src/storage.rs b/fork_choice_control/src/storage.rs index 3fbf78ab..3be7a8a6 100644 --- a/fork_choice_control/src/storage.rs +++ b/fork_choice_control/src/storage.rs @@ -12,7 +12,7 @@ use itertools::Itertools as _; use log::{debug, info, warn}; use nonzero_ext::nonzero; use reqwest::Client; -use ssz::{Ssz, SszRead, SszReadDefault as _, SszWrite}; +use ssz::{Ssz, SszRead, SszReadDefault, SszWrite}; use std_ext::ArcExt as _; use thiserror::Error; use transition_functions::combined; @@ -33,7 +33,7 @@ use types::{ traits::{BeaconState as _, SignedBeaconBlock as _}, }; -use crate::checkpoint_sync; +use crate::{checkpoint_sync, StorageMode}; pub const DEFAULT_ARCHIVAL_EPOCH_INTERVAL: NonZeroU64 = nonzero!(32_u64); @@ -57,7 
+57,7 @@ pub struct Storage
<P: Preset> { config: Arc<Config>, pub(crate) database: Database, pub(crate) archival_epoch_interval: NonZeroU64, - prune_storage: bool, + storage_mode: StorageMode, phantom: PhantomData<P>
, } @@ -67,13 +67,13 @@ impl Storage
<P> { config: Arc<Config>, database: Database, archival_epoch_interval: NonZeroU64, - prune_storage: bool, + storage_mode: StorageMode, ) -> Self { Self { config, database, archival_epoch_interval, - prune_storage, + storage_mode, phantom: PhantomData, } } @@ -83,6 +83,16 @@ impl<P: Preset> Storage<P>
{ &self.config } + #[must_use] + pub const fn archive_storage_enabled(&self) -> bool { + self.storage_mode.is_archive() + } + + #[must_use] + pub const fn prune_storage_enabled(&self) -> bool { + self.storage_mode.is_prune() + } + #[expect(clippy::too_many_lines)] pub async fn load( &self, @@ -254,7 +264,7 @@ impl Storage
<P>
{ let state = chain_link.state(store); let state_slot = chain_link.slot(); - if !self.prune_storage { + if !self.prune_storage_enabled() { if finalized { slots.finalized.push(state_slot); batch.push(serialize(FinalizedBlockByRoot(block_root), block)?); @@ -267,7 +277,7 @@ impl Storage
<P>
{ } if finalized { - if !self.prune_storage { + if !self.prune_storage_enabled() { batch.push(serialize( SlotByStateRoot(block.message().state_root()), state_slot, @@ -300,7 +310,7 @@ impl Storage
<P>
{ } } - if !(archival_state_appended || self.prune_storage) { + if !(archival_state_appended || self.prune_storage_enabled()) { let state_epoch = Self::epoch_at_slot(state_slot); let append_state = misc::is_epoch_start::
<P>
(state_slot) && state_epoch.is_multiple_of(self.archival_epoch_interval); @@ -827,7 +837,7 @@ impl Storage
<P>
{ itertools::process_results(results, |pairs| { pairs - .filter(|(key_bytes, _)| SlotBlobId::has_prefix(key_bytes)) + .take_while(|(key_bytes, _)| SlotBlobId::has_prefix(key_bytes)) .count() }) } @@ -1036,13 +1046,34 @@ pub enum Error { IncorrectPrefix { bytes: Vec }, } +pub fn save(database: &Database, key: impl Display, value: impl SszWrite) -> Result<()> { + database.put(serialize_key(key), serialize_value(value)?) +} + +pub fn get(database: &Database, key: impl Display) -> Result> { + database + .get(serialize_key(key))? + .map(V::from_ssz_default) + .transpose() + .map_err(Into::into) +} + +fn serialize_key(key: impl Display) -> String { + key.to_string() +} + +fn serialize_value(value: impl SszWrite) -> Result> { + value.to_ssz().map_err(Into::into) +} + pub fn serialize(key: impl Display, value: impl SszWrite) -> Result<(String, Vec)> { - Ok((key.to_string(), value.to_ssz()?)) + Ok((serialize_key(key), serialize_value(value)?)) } #[cfg(test)] mod tests { use bytesize::ByteSize; + use database::DatabaseMode; use tempfile::TempDir; use types::{ phase0::containers::SignedBeaconBlock as Phase0SignedBeaconBlock, preset::Mainnet, @@ -1052,7 +1083,13 @@ mod tests { #[test] fn test_prune_old_blocks_and_states() -> Result<()> { - let database = Database::persistent("test_db", TempDir::new()?, ByteSize::mib(10), false)?; + let database = Database::persistent( + "test_db", + TempDir::new()?, + ByteSize::mib(10), + DatabaseMode::ReadWrite, + )?; + let block = SignedBeaconBlock::::Phase0(Phase0SignedBeaconBlock::default()); database.put_batch(vec![ @@ -1084,7 +1121,7 @@ mod tests { Arc::new(Config::mainnet()), database, nonzero!(64_u64), - true, + StorageMode::Standard, ); assert_eq!(storage.finalized_block_count()?, 2); @@ -1111,13 +1148,18 @@ mod tests { #[test] #[expect(clippy::similar_names)] fn test_prune_old_blob_sidecars() -> Result<()> { - let database = Database::persistent("test_db", TempDir::new()?, ByteSize::mib(10), false)?; + let database = Database::persistent( + "test_db", + TempDir::new()?, + ByteSize::mib(10), + DatabaseMode::ReadWrite, + )?; let storage = Storage::::new( Arc::new(Config::mainnet()), database, nonzero!(64_u64), - true, + StorageMode::Standard, ); let blob_id_0 = BlobIdentifier { diff --git a/fork_choice_control/src/storage_back_sync.rs b/fork_choice_control/src/storage_back_sync.rs index 28681161..5fea5ac9 100644 --- a/fork_choice_control/src/storage_back_sync.rs +++ b/fork_choice_control/src/storage_back_sync.rs @@ -1,16 +1,18 @@ +use core::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use anyhow::Result; +use anyhow::{bail, Error as AnyhowError, Result}; use arithmetic::U64Ext as _; -use features::Feature; +use database::Database; use genesis::AnchorCheckpointProvider; use helper_functions::misc; -use log::{info, warn}; +use log::{debug, info, warn}; use ssz::SszHash as _; use std_ext::ArcExt as _; use transition_functions::combined; use types::{ combined::SignedBeaconBlock, + deneb::containers::BlobSidecar, nonstandard::{FinalizedCheckpoint, WithOrigin}, phase0::primitives::Slot, preset::Preset, @@ -19,17 +21,27 @@ use types::{ use crate::{ storage::{ - serialize, BlockRootBySlot, Error, FinalizedBlockByRoot, SlotByStateRoot, StateByBlockRoot, + get, serialize, BlockRootBySlot, Error, FinalizedBlockByRoot, SlotByStateRoot, + StateByBlockRoot, }, Storage, }; +const ARCHIVER_CHECKPOINT_KEY: &str = "carchiver"; + +// Retain archival data in memory until the number of ready beacon states +// reaches `ARCHIVED_STATES_BEFORE_FLUSH`. 
This approach minimizes unnecessary +// transactions and significantly reduces memory usage during the archiving +// of back-synced data. +const ARCHIVED_STATES_BEFORE_FLUSH: u64 = 5; + impl Storage
<P>
{ pub(crate) fn archive_back_sync_states( &self, - start_slot: Slot, + mut start_slot: Slot, end_slot: Slot, anchor_checkpoint_provider: &AnchorCheckpointProvider
<P>, + is_exiting: &Arc<AtomicBool>, ) -> Result<()> { let WithOrigin { value, origin } = anchor_checkpoint_provider.checkpoint(); @@ -41,9 +53,17 @@ impl<P: Preset> Storage<P> { let anchor_block_slot = anchor_block.message().slot(); let anchor_block_root = anchor_block.message().hash_tree_root(); + // check whether archiving was interrupted + if let Some(slot) = get_latest_archived_slot(&self.database)? { + if self.stored_state(slot)?.is_some() && slot > start_slot && slot <= end_slot { + start_slot = slot; + info!("resuming back-sync archival from {slot} slot"); + } + } + let mut state = if start_slot == anchor_block_slot { if origin.is_checkpoint_sync() { - warn!("unable to back sync to genesis state as it not available"); + warn!("unable to back-sync to genesis state as it is not available"); } anchor_state @@ -53,22 +73,21 @@ impl<P: Preset> Storage<P>
{ })? }; - let mut batch = vec![]; let mut previous_block = None; - - let state_transition = if Feature::TrustBackSyncBlocks.is_enabled() { - combined::trusted_state_transition - } else { - combined::untrusted_state_transition - }; + let mut batch = vec![]; + let mut states_in_batch = 0; if start_slot == anchor_block_slot { batch.push(serialize(StateByBlockRoot(anchor_block_root), &state)?); } for slot in (start_slot + 1)..=end_slot { + if is_exiting.load(Ordering::Relaxed) { + bail!(AnyhowError::msg("received a termination signal")); + } + if let Some((block, _)) = self.finalized_block_by_slot(slot)? { - state_transition(self.config(), state.make_mut(), &block)?; + combined::untrusted_state_transition(self.config(), state.make_mut(), &block)?; previous_block = Some(block); } else { combined::process_slots(self.config(), state.make_mut(), slot)?; @@ -82,10 +101,23 @@ impl Storage
<P>
{ if let Some(block) = previous_block.as_ref() { if append_state { - info!("archiving back sync state in slot {slot}"); + debug!("back-synced state in {slot} is ready for storage"); let block_root = block.message().hash_tree_root(); + batch.push(serialize(StateByBlockRoot(block_root), &state)?); + batch.push(serialize(ARCHIVER_CHECKPOINT_KEY, slot)?); + + states_in_batch += 1; + + if states_in_batch == ARCHIVED_STATES_BEFORE_FLUSH { + info!("archiving back-sync data up to {slot} slot"); + + self.database.put_batch(batch)?; + + batch = vec![]; + states_in_batch = 0; + } } } } @@ -93,12 +125,20 @@ impl Storage
<P>
{ self.database.put_batch(batch)?; info!( - "back sync state archival completed (start_slot: {start_slot}, end_slot: {end_slot})", + "back-synced state archival completed (start_slot: {start_slot}, end_slot: {end_slot})", ); Ok(()) } + pub(crate) fn store_back_sync_blob_sidecars( + &self, + blob_sidecars: impl IntoIterator>>, + ) -> Result<()> { + self.append_blob_sidecars(blob_sidecars.into_iter().map(Into::into))?; + Ok(()) + } + pub(crate) fn store_back_sync_blocks( &self, blocks: impl IntoIterator>>, @@ -117,6 +157,10 @@ impl Storage
<P>
{ } } +fn get_latest_archived_slot(database: &Database) -> Result> { + get(database, ARCHIVER_CHECKPOINT_KEY) +} + #[cfg(test)] #[cfg(feature = "eth2-cache")] mod tests { @@ -128,6 +172,8 @@ mod tests { use itertools::{EitherOrBoth, Itertools as _}; use types::phase0::consts::GENESIS_SLOT; + use crate::StorageMode; + use super::*; #[test] @@ -189,6 +235,7 @@ mod tests { 0, 128, &AnchorCheckpointProvider::custom_from_genesis(genesis_state), + &Arc::new(AtomicBool::new(false)), )?; // Assert that the mappings from state root to slot are stored. @@ -215,7 +262,7 @@ mod tests { Arc::new(P::default_config()), Database::in_memory(), NonZeroU64::MIN, - false, + StorageMode::Standard, ) } } diff --git a/fork_choice_store/src/misc.rs b/fork_choice_store/src/misc.rs index 894d6dde..66614eb8 100644 --- a/fork_choice_store/src/misc.rs +++ b/fork_choice_store/src/misc.rs @@ -494,6 +494,7 @@ impl AttesterSlashingOrigin { #[derive(Debug)] pub enum BlobSidecarOrigin { Api(Option>>), + BackSync, ExecutionLayer, Gossip(SubnetId, GossipId), Requested(PeerId), @@ -511,7 +512,7 @@ impl BlobSidecarOrigin { match self { Self::Gossip(_, gossip_id) => (Some(gossip_id), None), Self::Api(sender) => (None, sender), - Self::ExecutionLayer | Self::Own | Self::Requested(_) => (None, None), + Self::BackSync | Self::ExecutionLayer | Self::Own | Self::Requested(_) => (None, None), } } @@ -519,7 +520,11 @@ impl BlobSidecarOrigin { pub fn gossip_id(self) -> Option { match self { Self::Gossip(_, gossip_id) => Some(gossip_id), - Self::Api(_) | Self::ExecutionLayer | Self::Own | Self::Requested(_) => None, + Self::Api(_) + | Self::BackSync + | Self::ExecutionLayer + | Self::Own + | Self::Requested(_) => None, } } @@ -528,7 +533,7 @@ impl BlobSidecarOrigin { match self { Self::Gossip(_, gossip_id) => Some(gossip_id.source), Self::Requested(peer_id) => Some(*peer_id), - Self::Api(_) | Self::ExecutionLayer | Self::Own => None, + Self::Api(_) | Self::BackSync | Self::ExecutionLayer | Self::Own => None, } } @@ -536,7 +541,11 @@ impl BlobSidecarOrigin { pub const fn subnet_id(&self) -> Option { match self { Self::Gossip(subnet_id, _) => Some(*subnet_id), - Self::Api(_) | Self::ExecutionLayer | Self::Own | Self::Requested(_) => None, + Self::Api(_) + | Self::BackSync + | Self::ExecutionLayer + | Self::Own + | Self::Requested(_) => None, } } @@ -544,6 +553,11 @@ impl BlobSidecarOrigin { pub const fn is_from_el(&self) -> bool { matches!(self, Self::ExecutionLayer) } + + #[must_use] + pub const fn is_from_back_sync(&self) -> bool { + matches!(self, Self::BackSync) + } } pub enum BlockAction { @@ -601,6 +615,7 @@ impl AttestationAction { } } +#[derive(Debug)] pub enum BlobSidecarAction { Accept(Arc>), Ignore(Publishable), @@ -608,6 +623,13 @@ pub enum BlobSidecarAction { DelayUntilSlot(Arc>), } +impl BlobSidecarAction
<P>
{ + #[must_use] + pub const fn accepted(&self) -> bool { + matches!(self, Self::Accept(_)) + } +} + pub enum PartialBlockAction { Accept, Ignore, diff --git a/fork_choice_store/src/store.rs b/fork_choice_store/src/store.rs index f6f5a2ba..4081af49 100644 --- a/fork_choice_store/src/store.rs +++ b/fork_choice_store/src/store.rs @@ -207,6 +207,7 @@ pub struct Store { state_cache: Arc>, rejected_block_roots: HashSet, finished_initial_forward_sync: bool, + finished_back_sync: bool, } impl Store
<P>
{ @@ -218,6 +219,7 @@ impl Store
<P> { anchor_block: Arc<SignedBeaconBlock<P>>, anchor_state: Arc<BeaconState<P>>, finished_initial_forward_sync: bool, + finished_back_sync: bool, ) -> Self { let block_root = anchor_block.message().hash_tree_root(); let state_root = anchor_state.hash_tree_root(); @@ -279,6 +281,7 @@ impl<P: Preset> Store<P>
{ )), rejected_block_roots: HashSet::default(), finished_initial_forward_sync, + finished_back_sync, } } @@ -1696,11 +1699,13 @@ impl Store
<P> { } // TODO(feature/deneb): Format quotes and log message like everything else. - pub fn validate_blob_sidecar( + pub fn validate_blob_sidecar_with_state( &self, blob_sidecar: Arc<BlobSidecar<P>>, block_seen: bool, origin: &BlobSidecarOrigin, + parent_info: impl FnOnce() -> Option<(Arc<SignedBeaconBlock<P>>, PayloadStatus)>, + state_fn: impl FnOnce() -> Result<Arc<BeaconState<P>>>, ) -> Result<BlobSidecarAction<P>> { let block_header = blob_sidecar.signed_block_header.message; @@ -1732,7 +1737,7 @@ impl<P: Preset> Store<P>
{ } // [IGNORE] The sidecar is from a slot greater than the latest finalized slot -- i.e. validate that block_header.slot > compute_start_slot_at_epoch(state.finalized_checkpoint.epoch) - if block_header.slot <= self.finalized_slot() { + if !origin.is_from_back_sync() && block_header.slot <= self.finalized_slot() { return Ok(BlobSidecarAction::Ignore(false)); } @@ -1747,15 +1752,7 @@ impl Store
<P>
{ return Ok(BlobSidecarAction::Ignore(true)); } - let state = self - .state_cache - .try_state_at_slot(self, block_header.parent_root, block_header.slot)? - .unwrap_or_else(|| { - self.chain_link(block_header.parent_root) - .or_else(|| self.chain_link_before_or_at(block_header.slot)) - .map(|chain_link| chain_link.state(self)) - .unwrap_or_else(|| self.head().state(self)) - }); + let state = state_fn()?; // [REJECT] The proposer signature of blob_sidecar.signed_block_header, is valid with respect to the block_header.proposer_index pubkey. SingleVerifier.verify_singular( @@ -1781,19 +1778,19 @@ impl Store
<P>
{ // [IGNORE] The sidecar's block's parent (defined by block_header.parent_root) has been seen (via both gossip and non-gossip sources) // (a client MAY queue sidecars for processing once the parent block is retrieved). - let Some(parent) = self.chain_link(block_header.parent_root) else { + let Some((parent, parent_payload_status)) = parent_info() else { return Ok(BlobSidecarAction::DelayUntilParent(blob_sidecar)); }; // [REJECT] The sidecar's block's parent (defined by block_header.parent_root) passes validation. // Part 2/2: ensure!( - !parent.is_invalid(), + !parent_payload_status.is_invalid(), Error::BlobSidecarInvalidParentOfBlock { blob_sidecar }, ); // [REJECT] The sidecar is from a higher slot than the sidecar's block's parent (defined by block_header.parent_root). - let parent_slot = parent.slot(); + let parent_slot = parent.message().slot(); ensure!( block_header.slot > parent_slot, @@ -1803,16 +1800,20 @@ impl Store
<P>
{ } ); - // [REJECT] The current finalized_checkpoint is an ancestor of the sidecar's block - // -- i.e. get_checkpoint_block(store, block_header.parent_root, store.finalized_checkpoint.epoch) == store.finalized_checkpoint.root. - let ancestor_at_finalized_slot = self - .ancestor(block_header.parent_root, self.finalized_slot()) - .expect("every block in the store should have an ancestor at the last finalized slot"); + if !origin.is_from_back_sync() { + // [REJECT] The current finalized_checkpoint is an ancestor of the sidecar's block + // -- i.e. get_checkpoint_block(store, block_header.parent_root, store.finalized_checkpoint.epoch) == store.finalized_checkpoint.root. + let ancestor_at_finalized_slot = self + .ancestor(block_header.parent_root, self.finalized_slot()) + .expect( + "every block in the store should have an ancestor at the last finalized slot", + ); - ensure!( - ancestor_at_finalized_slot == self.finalized_checkpoint.root, - Error::BlobSidecarBlockNotADescendantOfFinalized { blob_sidecar }, - ); + ensure!( + ancestor_at_finalized_slot == self.finalized_checkpoint.root, + Error::BlobSidecarBlockNotADescendantOfFinalized { blob_sidecar }, + ); + } // > _[REJECT]_ The sidecar's inclusion proof is valid as // > verified by `verify_blob_sidecar_inclusion_proof(blob_sidecar)`. @@ -1832,24 +1833,56 @@ impl Store
<P>
{ Error::BlobSidecarInvalid { blob_sidecar } ); - // [REJECT] The sidecar is proposed by the expected proposer_index for the block's slot in the context of the current shuffling - // (defined by block_header.parent_root/block_header.slot). - // If the proposer_index cannot immediately be verified against the expected shuffling, - // the sidecar MAY be queued for later processing while proposers for the block's branch are calculated -- - // in such a case do not REJECT, instead IGNORE this message. - let computed = accessors::get_beacon_proposer_index(&state)?; + if !origin.is_from_back_sync() { + // [REJECT] The sidecar is proposed by the expected proposer_index for the block's slot in the context of the current shuffling + // (defined by block_header.parent_root/block_header.slot). + // If the proposer_index cannot immediately be verified against the expected shuffling, + // the sidecar MAY be queued for later processing while proposers for the block's branch are calculated -- + // in such a case do not REJECT, instead IGNORE this message. + let computed = accessors::get_beacon_proposer_index(&state)?; - ensure!( - block_header.proposer_index == computed, - Error::BlobSidecarProposerIndexMismatch { - blob_sidecar, - computed, - } - ); + ensure!( + block_header.proposer_index == computed, + Error::BlobSidecarProposerIndexMismatch { + blob_sidecar, + computed, + } + ); + } Ok(BlobSidecarAction::Accept(blob_sidecar)) } + pub fn validate_blob_sidecar( + &self, + blob_sidecar: Arc>, + block_seen: bool, + origin: &BlobSidecarOrigin, + ) -> Result> { + let block_header = blob_sidecar.signed_block_header.message; + + self.validate_blob_sidecar_with_state( + blob_sidecar, + block_seen, + origin, + || { + self.chain_link(block_header.parent_root) + .map(|chain_link| (chain_link.block.clone_arc(), chain_link.payload_status)) + }, + || { + Ok(self + .state_cache + .try_state_at_slot(self, block_header.parent_root, block_header.slot)? + .unwrap_or_else(|| { + self.chain_link(block_header.parent_root) + .or_else(|| self.chain_link_before_or_at(block_header.slot)) + .map(|chain_link| chain_link.state(self)) + .unwrap_or_else(|| self.head().state(self)) + })) + }, + ) + } + /// [`on_tick`](https://github.com/ethereum/consensus-specs/blob/v1.3.0/specs/phase0/fork-choice.md#on_tick) pub fn apply_tick(&mut self, new_tick: Tick) -> Result>> { let old_tick = self.tick; @@ -2876,6 +2909,15 @@ impl Store
<P>
{ && self.finished_initial_forward_sync } + #[must_use] + pub const fn is_back_synced(&self) -> bool { + self.finished_back_sync + } + + pub fn set_back_synced(&mut self, finished_back_sync: bool) { + self.finished_back_sync = finished_back_sync; + } + fn set_block_payload_status( &mut self, block_hash: ExecutionBlockHash, diff --git a/grandine/src/commands.rs b/grandine/src/commands.rs index 695552ce..c098b99f 100644 --- a/grandine/src/commands.rs +++ b/grandine/src/commands.rs @@ -1,15 +1,36 @@ use std::path::PathBuf; use clap::Subcommand; +use strum::EnumString; use types::phase0::primitives::Slot; +#[derive(Copy, Clone, Debug, PartialEq, Eq, EnumString)] +#[strum(serialize_all = "snake_case")] +pub enum AppDatabase { + Sync, +} + #[derive(Clone, Subcommand)] #[cfg_attr(test, derive(PartialEq, Eq, Debug))] pub enum GrandineCommand { + /// Show information about database records + /// (example: grandine db-info --database sync) + DbInfo { + /// Type of the database + #[clap(short, long)] + database: AppDatabase, + /// Path to a custom directory where database files are stored + /// (example: grandine --network holesky db-info -d sync -p ~/.grandine/holesky/beacon/sync) + #[clap(short, long)] + path: Option, + }, + /// Show `beacon_fork_choice` database element sizes /// (example: grandine db-stats) DbStats { - /// Custom database path + /// Path to a custom directory where `beacon_fork_choice` database files are stored + #[expect(clippy::doc_markdown)] + /// (example: grandine --network holesky db-stats -p ~/.grandine/holesky/beacon/beacon_fork_choice) #[clap(short, long)] path: Option, }, diff --git a/grandine/src/db_info.rs b/grandine/src/db_info.rs new file mode 100644 index 00000000..2f493275 --- /dev/null +++ b/grandine/src/db_info.rs @@ -0,0 +1,22 @@ +use std::path::PathBuf; + +use anyhow::Result; +use database::DatabaseMode; +use runtime::StorageConfig; + +use crate::commands::AppDatabase; + +pub fn print( + storage_config: &StorageConfig, + database: AppDatabase, + custom_path: Option, +) -> Result<()> { + match database { + AppDatabase::Sync => { + let database = storage_config.sync_database(custom_path, DatabaseMode::ReadOnly)?; + p2p::print_sync_database_info(&database)?; + } + }; + + Ok(()) +} diff --git a/grandine/src/db_stats.rs b/grandine/src/db_stats.rs index 4a813ac7..5804d9b6 100644 --- a/grandine/src/db_stats.rs +++ b/grandine/src/db_stats.rs @@ -2,6 +2,7 @@ use std::path::PathBuf; use anyhow::Result; use bytesize::ByteSize; +use database::DatabaseMode; use fork_choice_control::{ BlobSidecarByBlobId, BlockCheckpoint, BlockRootBySlot, FinalizedBlockByRoot, PrefixableKey as _, SlotBlobId, SlotByStateRoot, StateByBlockRoot, StateCheckpoint, @@ -55,7 +56,8 @@ pub fn print( storage_config: &StorageConfig, custom_path: Option, ) -> Result<()> { - let storage_database = storage_config.beacon_fork_choice_database(custom_path, true)?; + let storage_database = + storage_config.beacon_fork_choice_database(custom_path, DatabaseMode::ReadOnly)?; let mut total_size = 0; let mut finalized_block_root_entries = EntriesInfo::new("finalized_block_roots"); diff --git a/grandine/src/grandine_args.rs b/grandine/src/grandine_args.rs index 75de2f9b..912a7838 100644 --- a/grandine/src/grandine_args.rs +++ b/grandine/src/grandine_args.rs @@ -30,7 +30,7 @@ use eth2_libp2p::{ PeerIdSerialized, }; use features::Feature; -use fork_choice_control::DEFAULT_ARCHIVAL_EPOCH_INTERVAL; +use fork_choice_control::{StorageMode, DEFAULT_ARCHIVAL_EPOCH_INTERVAL}; use fork_choice_store::{StoreConfig, 
DEFAULT_CACHE_LOCK_TIMEOUT_MILLIS}; use grandine_version::{APPLICATION_NAME, APPLICATION_NAME_AND_VERSION, APPLICATION_VERSION}; use http_api::HttpApiConfig; @@ -285,9 +285,14 @@ struct BeaconNodeOptions { #[clap(long, default_value_t = DEFAULT_ARCHIVAL_EPOCH_INTERVAL)] archival_epoch_interval: NonZeroU64, - /// Enable prune mode where only single checkpoint state & block are stored in the DB + /// Enable archival storage mode, where all blocks, states (every --archival-epoch-interval epochs) and blobs are stored in the database /// [default: disabled] - #[clap(long)] + #[clap(long, conflicts_with("prune_storage"))] + archive_storage: bool, + + /// Enable prune storage mode, where only a single checkpoint state and block are stored in the database + /// [default: disabled] + #[clap(long, conflicts_with("archive_storage"))] prune_storage: bool, /// Number of unfinalized states to keep in memory. @@ -346,9 +351,11 @@ struct BeaconNodeOptions { #[clap(long = "back_sync")] back_sync: bool, - /// Enable syncing historical data + /// Enable syncing historical data. + /// When used with --archive-storage, it will back-sync to genesis and reconstruct historical states. + /// When used without --archive-storage, it will back-sync blocks to the MIN_EPOCHS_FOR_BLOCK_REQUESTS epoch. /// [default: disabled] - #[clap(long = "back-sync")] + #[clap(long = "back-sync", conflicts_with("prune_storage"))] back_sync_enabled: bool, /// Collect Prometheus metrics @@ -902,6 +909,7 @@ impl GrandineArgs { database_size, eth1_database_size, archival_epoch_interval, + archive_storage, prune_storage, unfinalized_states_in_memory, request_timeout, @@ -1238,13 +1246,21 @@ impl GrandineArgs { urls: web3signer_urls, }; + let storage_mode = if prune_storage { + StorageMode::Prune + } else if archive_storage { + StorageMode::Archive + } else { + StorageMode::Standard + }; + let storage_config = StorageConfig { in_memory, db_size: database_size, directories: directories.clone_arc(), eth1_db_size: eth1_database_size, archival_epoch_interval, - prune_storage, + storage_mode, }; network_config_options.print_upnp_warning(); @@ -1862,6 +1878,18 @@ mod tests { .expect_err("parse_graffiti should fail"); } + #[test] + fn incompatible_back_sync_and_storage_option() { + try_config_from_args(["--back-sync", "--prune-storage"]) + .expect_err("incompatible back-sync and storage options should fail"); + } + + #[test] + fn incompatible_storage_options() { + try_config_from_args(["--archive-storage", "--prune-storage"]) + .expect_err("incompatible storage options should fail"); + } + #[test] fn interchange_import_subcommand() { let config = config_from_args(["interchange", "import", "test.json"]); diff --git a/grandine/src/grandine_config.rs b/grandine/src/grandine_config.rs index a75fe1ea..3d42db34 100644 --- a/grandine/src/grandine_config.rs +++ b/grandine/src/grandine_config.rs @@ -108,6 +108,7 @@ impl GrandineConfig { ), } + info!("storage mode: {:?}", storage_config.storage_mode); info!("data directory: {data_dir:?}"); self.storage_config.print_db_sizes(); @@ -170,7 +171,7 @@ impl GrandineConfig { } info!("suggested fee recipient: {suggested_fee_recipient}"); - info!("back sync enabled: {back_sync_enabled}"); + info!("back-sync enabled: {back_sync_enabled}"); if *use_validator_key_cache { info!("using validator key cache"); diff --git a/grandine/src/main.rs b/grandine/src/main.rs index 68785df1..d613def5 100644 --- a/grandine/src/main.rs +++ b/grandine/src/main.rs @@ -10,7 +10,7 @@ use allocator as _; use anyhow::{bail, ensure, Context 
as _, Result}; use builder_api::BuilderConfig; use clap::{Error as ClapError, Parser as _}; -use database::Database; +use database::{Database, DatabaseMode}; use eth1::{Eth1Chain, Eth1Config}; use eth1_api::Auth; use features::Feature; @@ -57,6 +57,7 @@ use types::preset::Minimal; mod commands; mod config_dir; mod consts; +mod db_info; mod db_stats; mod grandine_args; mod grandine_config; @@ -651,6 +652,7 @@ fn ensure_ports_not_in_use( Ok(()) } +#[expect(clippy::too_many_lines)] fn handle_command( chain_config: Arc, storage_config: &StorageConfig, @@ -660,20 +662,30 @@ fn handle_command( ) -> Result<()> { Feature::InhibitApplicationRestart.enable(); + let StorageConfig { + archival_epoch_interval, + storage_mode, + .. + } = storage_config; + match command { + GrandineCommand::DbInfo { database, path } => { + db_info::print(storage_config, database, path)? + } GrandineCommand::DbStats { path } => db_stats::print::
<P>
(storage_config, path)?, GrandineCommand::Export { from, to, output_dir, } => { - let storage_database = storage_config.beacon_fork_choice_database(None, true)?; + let storage_database = + storage_config.beacon_fork_choice_database(None, DatabaseMode::ReadOnly)?; let storage = Storage::new( chain_config, storage_database, - storage_config.archival_epoch_interval, - false, + *archival_epoch_interval, + *storage_mode, ); let output_dir = output_dir.unwrap_or(std::env::current_dir()?); diff --git a/grandine/src/predefined_network.rs b/grandine/src/predefined_network.rs index 1b9331a7..dfb7b152 100644 --- a/grandine/src/predefined_network.rs +++ b/grandine/src/predefined_network.rs @@ -15,6 +15,7 @@ use types::{ nonstandard::{FinalizedCheckpoint, WithOrigin}, preset::Preset, redacting_url::RedactingUrl, + traits::BeaconState as _, }; #[cfg(any(feature = "network-mainnet", test))] @@ -308,6 +309,8 @@ async fn load_or_download_genesis_checkpoint( let state = Arc::from_ssz(config, ssz_bytes)?; let block = Arc::new(genesis::beacon_block(&state)); + info!("genesis state loaded at slot: {}", state.slot()); + Ok(WithOrigin::new_from_genesis(FinalizedCheckpoint { block, state, diff --git a/helper_functions/src/misc.rs b/helper_functions/src/misc.rs index b9868256..476da389 100644 --- a/helper_functions/src/misc.rs +++ b/helper_functions/src/misc.rs @@ -124,7 +124,8 @@ pub fn compute_fork_digest(current_version: Version, genesis_validators_root: H2 ForkDigest::from_slice(&root[..ForkDigest::len_bytes()]) } -pub(crate) fn compute_domain( +#[must_use] +pub fn compute_domain( config: &Config, domain_type: DomainType, fork_version: Option, diff --git a/http_api/src/context.rs b/http_api/src/context.rs index a5089420..8c9d491d 100644 --- a/http_api/src/context.rs +++ b/http_api/src/context.rs @@ -16,7 +16,7 @@ use eth1_api::{Eth1Api, Eth1ExecutionEngine, ExecutionService}; use eth2_cache_utils::mainnet; use features::Feature; use fork_choice_control::{ - Controller, StateLoadStrategy, Storage, DEFAULT_ARCHIVAL_EPOCH_INTERVAL, + Controller, StateLoadStrategy, Storage, StorageMode, DEFAULT_ARCHIVAL_EPOCH_INTERVAL, }; use fork_choice_store::StoreConfig; use futures::{future::FutureExt as _, lock::Mutex, select_biased}; @@ -147,7 +147,7 @@ impl Context
<P>
{ chain_config.clone_arc(), Database::in_memory(), DEFAULT_ARCHIVAL_EPOCH_INTERVAL, - false, + StorageMode::Standard, )); let state_load_strategy = StateLoadStrategy::Anchor { @@ -190,6 +190,7 @@ impl Context
<P>
{ fc_to_validator_tx, storage, core::iter::empty(), + true, )?; for block in extra_blocks { @@ -385,7 +386,6 @@ impl Context
<P>
{ let submit_requests = case.run(should_update_responses(), actual_address); SyncToApi::SyncStatus(true).send(&sync_to_api_tx); - SyncToApi::BackSyncStatus(true).send(&sync_to_api_tx); // Poll the HTTP API first to ensure it handles the messages sent above before any requests. // This could also be done by polling it once using `core::future::poll_fn`. diff --git a/http_api/src/misc.rs b/http_api/src/misc.rs index 30185a3f..149fb688 100644 --- a/http_api/src/misc.rs +++ b/http_api/src/misc.rs @@ -59,19 +59,6 @@ impl SyncedStatus { } } -#[derive(Default)] -pub struct BackSyncedStatus(AtomicBool); - -impl BackSyncedStatus { - pub fn get(&self) -> bool { - self.0.load(ORDERING) - } - - pub fn set(&self, value: bool) { - self.0.store(value, ORDERING); - } -} - pub type SignedBeaconBlockWithBlobsAndProofs
<P: Preset>
= ( SignedBeaconBlock
<P>
, ContiguousList::MaxBlobsPerBlock>, diff --git a/http_api/src/routing.rs b/http_api/src/routing.rs index 3d7626f9..6b8ffc91 100644 --- a/http_api/src/routing.rs +++ b/http_api/src/routing.rs @@ -26,7 +26,7 @@ use validator::{ApiToValidator, ValidatorConfig}; use crate::{ error::Error, gui, middleware, - misc::{BackSyncedStatus, SyncedStatus}, + misc::SyncedStatus, standard::{ beacon_events, beacon_heads, beacon_state, blob_sidecars, block, block_attestations, block_headers, block_id_headers, block_rewards, block_root, config_spec, debug_fork_choice, @@ -76,7 +76,6 @@ pub struct NormalState { pub sync_committee_agg_pool: Arc>, pub bls_to_execution_change_pool: Arc, pub is_synced: Arc, - pub is_back_synced: Arc, pub event_channels: Arc, pub api_to_liveness_tx: Option>, pub api_to_p2p_tx: UnboundedSender>, @@ -158,12 +157,6 @@ impl FromRef> for Arc { } } -impl FromRef> for Arc { - fn from_ref(state: &NormalState) -> Self { - state.is_back_synced.clone_arc() - } -} - impl FromRef> for Arc { fn from_ref(state: &NormalState) -> Self { state.event_channels.clone_arc() diff --git a/http_api/src/standard.rs b/http_api/src/standard.rs index cb5286ed..b23e8132 100644 --- a/http_api/src/standard.rs +++ b/http_api/src/standard.rs @@ -89,7 +89,7 @@ use crate::{ error::{Error, IndexedError}, extractors::{EthJson, EthJsonOrSsz, EthPath, EthQuery}, full_config::FullConfig, - misc::{APIBlock, BackSyncedStatus, BroadcastValidation, SignedAPIBlock, SyncedStatus}, + misc::{APIBlock, BroadcastValidation, SignedAPIBlock, SyncedStatus}, response::{EthResponse, JsonOrSsz}, state_id, validator_status::{ @@ -1700,12 +1700,11 @@ pub async fn node_syncing_status( State(controller): State>, State(eth1_api): State>, State(is_synced): State>, - State(is_back_synced): State>, ) -> EthResponse { let snapshot = controller.snapshot(); let head_slot = snapshot.head_slot(); let is_synced = is_synced.get(); - let is_back_synced = is_back_synced.get(); + let is_back_synced = snapshot.is_back_synced(); let el_offline = eth1_api.el_offline(); EthResponse::json(NodeSyncingResponse { @@ -1720,11 +1719,14 @@ pub async fn node_syncing_status( } /// `GET /eth/v1/node/health` -pub async fn node_health( +pub async fn node_health( + State(controller): State>, State(is_synced): State>, - State(is_back_synced): State>, ) -> StatusCode { - if is_synced.get() && is_back_synced.get() { + let snapshot = controller.snapshot(); + let is_back_synced = snapshot.is_back_synced(); + + if is_synced.get() && is_back_synced { StatusCode::OK } else { StatusCode::PARTIAL_CONTENT diff --git a/http_api/src/task.rs b/http_api/src/task.rs index 9c1a5e64..97a8326f 100644 --- a/http_api/src/task.rs +++ b/http_api/src/task.rs @@ -28,7 +28,7 @@ use validator::{ApiToValidator, ValidatorConfig}; use crate::{ error::Error, http_api_config::HttpApiConfig, - misc::{BackSyncedStatus, SyncedStatus}, + misc::SyncedStatus, routing::{self, NormalState}, }; @@ -105,7 +105,6 @@ impl HttpApi { } = channels; let is_synced = Arc::new(SyncedStatus::new(controller.is_forward_synced())); - let is_back_synced = Arc::new(BackSyncedStatus::default()); let state = NormalState { chain_config: controller.chain_config().clone_arc(), @@ -121,7 +120,6 @@ impl HttpApi { sync_committee_agg_pool, bls_to_execution_change_pool, is_synced: is_synced.clone_arc(), - is_back_synced: is_back_synced.clone_arc(), event_channels, api_to_liveness_tx, api_to_p2p_tx, @@ -143,7 +141,7 @@ impl HttpApi { .into_future() .map_err(AnyhowError::new); - let handle_sync_statuses = handle_sync_statuses(is_synced, 
is_back_synced, sync_to_api_rx); + let handle_sync_statuses = handle_sync_statuses(is_synced, sync_to_api_rx); info!("HTTP server listening on {address}"); @@ -156,7 +154,6 @@ impl HttpApi { async fn handle_sync_statuses( is_synced: Arc, - is_back_synced: Arc, mut sync_to_api_rx: UnboundedReceiver, ) -> Result<()> { loop { @@ -164,7 +161,6 @@ async fn handle_sync_statuses( message = sync_to_api_rx.select_next_some() => { match message { SyncToApi::SyncStatus(status) => is_synced.set(status), - SyncToApi::BackSyncStatus(status) => is_back_synced.set(status), } } diff --git a/keymanager/src/proposer_configs.rs b/keymanager/src/proposer_configs.rs index 26b142c0..08f4efcd 100644 --- a/keymanager/src/proposer_configs.rs +++ b/keymanager/src/proposer_configs.rs @@ -3,7 +3,7 @@ use std::{path::Path, str}; use anyhow::{ensure, Result}; use bls::PublicKeyBytes; use bytesize::ByteSize; -use database::Database; +use database::{Database, DatabaseMode}; use derive_more::Display; use serde::{de::DeserializeOwned, Serialize}; use types::{ @@ -45,8 +45,12 @@ impl ProposerConfigs { default_gas_limit: Gas, default_graffiti: H256, ) -> Result { - let database = - Database::persistent("proposer-configs", validator_directory, DB_MAX_SIZE, false)?; + let database = Database::persistent( + "proposer-configs", + validator_directory, + DB_MAX_SIZE, + DatabaseMode::ReadWrite, + )?; Ok(Self { database, diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 839d3526..9c75ca3a 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -19,6 +19,7 @@ eth1_api = { workspace = true } eth2_libp2p = { workspace = true } features = { workspace = true } fork_choice_control = { workspace = true } +fork_choice_store = { workspace = true } futures = { workspace = true } genesis = { workspace = true } helper_functions = { workspace = true } @@ -26,6 +27,7 @@ igd-next = { workspace = true } itertools = { workspace = true } log = { workspace = true } logging = { workspace = true } +lru = { workspace = true } operation_pools = { workspace = true } prometheus_metrics = { workspace = true } prometheus-client = { workspace = true } @@ -41,6 +43,7 @@ strum = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } tokio-stream = { workspace = true } +transition_functions = { workspace = true } tynm = { workspace = true } typenum = { workspace = true } types = { workspace = true } diff --git a/p2p/src/back_sync.rs b/p2p/src/back_sync.rs index 8afce626..a258c48c 100644 --- a/p2p/src/back_sync.rs +++ b/p2p/src/back_sync.rs @@ -1,26 +1,42 @@ -use std::{collections::BTreeMap, sync::Arc, thread::Builder}; +use core::sync::atomic::AtomicBool; +use std::{ + collections::{BTreeMap, HashMap}, + sync::Arc, + thread::Builder, +}; -use anyhow::{ensure, Result}; +use anyhow::{bail, ensure, Result}; use database::Database; use derive_more::Display; use eth1_api::RealController; +use fork_choice_control::PrefixableKey; +use fork_choice_store::{BlobSidecarAction, BlobSidecarOrigin}; use futures::channel::mpsc::UnboundedSender; use genesis::AnchorCheckpointProvider; +use helper_functions::misc; use log::{debug, info, warn}; use ssz::{Ssz, SszReadDefault as _, SszWrite as _}; +use std_ext::ArcExt as _; use thiserror::Error; use types::{ combined::SignedBeaconBlock, + config::Config, + deneb::{ + containers::{BlobIdentifier, BlobSidecar}, + primitives::BlobIndex, + }, + nonstandard::PayloadStatus, phase0::{ consts::GENESIS_SLOT, primitives::{Slot, H256}, }, preset::Preset, - traits::SignedBeaconBlock as _, + traits::{BeaconState as _, 
SignedBeaconBlock as _}, }; use crate::messages::ArchiverToSync; +#[derive(Debug)] pub struct BackSync { batch: Batch
<P>
, data: Data, @@ -31,7 +47,14 @@ impl BackSync
<P> { pub fn load(database: &Database) -> Result<Option<Self>> { let data = Data::find(database)?; - debug!("loaded back sync: {data:?}"); + debug!("loaded back-sync: {data:?}"); + + if let Some(data) = data.as_ref() { + info!( + "starting back-sync from {} to {} slot", + data.current.slot, data.low.slot + ); + } Ok(data.map(Self::new)) } @@ -60,21 +83,40 @@ impl<P: Preset> BackSync<P> { self.data.low.slot } - pub fn is_finished(&self) -> bool { + pub fn low_slot_with_parent(&self) -> Slot { + self.data.low.slot.checked_sub(1).unwrap_or(GENESIS_SLOT) + } + + pub const fn is_finished(&self) -> bool { self.data.is_finished() } - pub fn finish(&self, database: &Database) -> Result<()> { + pub fn remove(&self, database: &Database) -> Result<()> { self.data.remove(database) } + pub fn reset_batch(&mut self) { + self.batch = Batch::default(); + } + + pub fn push_blob_sidecar(&mut self, blob_sidecar: Arc<BlobSidecar<P>>) { + let slot = blob_sidecar.signed_block_header.message.slot; + + if slot <= self.high_slot() && !self.is_finished() { + self.batch.push_blob_sidecar(blob_sidecar); + } else { + let blob_identifier: BlobIdentifier = blob_sidecar.as_ref().into(); + debug!("ignoring blob sidecar: {blob_identifier:?}, slot: {slot}"); + } + } + pub fn push_block(&mut self, block: Arc<SignedBeaconBlock<P>>) { let slot = block.message().slot(); - if slot >= self.low_slot() && slot <= self.high_slot() && !self.is_finished() { - self.batch.push(block); + if slot <= self.high_slot() && !self.is_finished() { + self.batch.push_block(block); } else { - debug!("ignoring network block during back sync: {slot}"); + debug!("ignoring block: {slot}"); } } @@ -86,17 +128,16 @@ impl<P: Preset> BackSync<P>
{ &mut self, controller: RealController
<P>
, anchor_checkpoint_provider: AnchorCheckpointProvider
<P>, + is_exiting: Arc<AtomicBool>, sync_tx: UnboundedSender<ArchiverToSync>, ) -> Result<()> { if !self.is_finished() { - debug!("not spawning state archiver: back sync not yet finished"); - + debug!("not spawning state archiver: back-sync not yet finished"); return Ok(()); } if self.archiving { debug!("not spawning state archiver: state archiver already started"); - return Ok(()); } @@ -106,15 +147,16 @@ impl<P: Preset> BackSync<P> { Builder::new() .name("state-archiver".to_owned()) .spawn(move || { - debug!("archiving back sync states from {start_slot} to {end_slot}"); + info!("archiving back-synced states from {start_slot} to {end_slot}"); match controller.archive_back_sync_states( start_slot, end_slot, &anchor_checkpoint_provider, + &is_exiting, ) { - Ok(()) => info!("back sync state archiver thread finished successfully"), - Err(error) => warn!("back sync state archiver thread failed: {error:?}"), + Ok(()) => info!("back-sync state archiver thread finished successfully"), + Err(error) => warn!("unable to archive back-sync states: {error:?}"), }; ArchiverToSync::BackSyncStatesArchived.send(&sync_tx); @@ -127,88 +169,191 @@ impl<P: Preset> BackSync<P>
{ pub fn verify_blocks( &mut self, + config: &Config, database: &Database, controller: &RealController
<P>
, ) -> Result<()> { let last_block_checkpoint = self.data.current; - match self.batch.verify_from_checkpoint(last_block_checkpoint) { - Ok((checkpoint, blocks)) => { - debug!("back sync batch verified: {checkpoint:?}"); + let (checkpoint, blocks, blob_sidecars) = + self.batch + .verify_from_checkpoint(config, controller, last_block_checkpoint)?; - if checkpoint.slot == self.low_slot() { - let expected = self.data.low; - let actual = checkpoint; - - ensure!( - actual == expected, - Error::FinalCheckpointMismatch { expected, actual }, - ); - } + info!("back-synced to {} slot", checkpoint.slot); - // Store back synced blocks in fork choice store. - controller.store_back_sync_blocks(blocks)?; + if checkpoint.slot == self.low_slot() { + let expected = self.data.low; + let actual = checkpoint; - // Update back sync progress in sync database. - self.data.current = checkpoint; - self.save(database)?; - - debug!("back sync batch saved {checkpoint:?}"); + if !expected.block_root.is_zero() { + ensure!( + actual == expected, + Error::FinalCheckpointMismatch::
<P> { expected, actual }, ); } + // Store back-synced blocks in fork choice db. + controller.store_back_sync_blocks(blocks)?; + controller.store_back_sync_blob_sidecars(blob_sidecars)?; + + // Update back-sync progress in sync database. + self.data.current = checkpoint; + self.save(database)?; + + debug!("back-sync batch saved {checkpoint:?}"); + Ok(()) } } -#[derive(Default)] +#[derive(Default, Debug)] struct Batch<P: Preset> { blocks: BTreeMap<Slot, Arc<SignedBeaconBlock<P>>>, + blob_sidecars: HashMap<BlobIdentifier, Arc<BlobSidecar<P>>>, } impl<P: Preset> Batch<P> { - fn push(&mut self, block: Arc<SignedBeaconBlock<P>>) { + fn push_blob_sidecar(&mut self, blob_sidecar: Arc<BlobSidecar<P>>) { + self.blob_sidecars + .insert(blob_sidecar.as_ref().into(), blob_sidecar); + } + + fn push_block(&mut self, block: Arc<SignedBeaconBlock<P>>) { self.blocks.insert(block.message().slot(), block); } + pub fn valid_blob_sidecars_for( + &self, + config: &Config, + controller: &RealController<P>, + block: &Arc<SignedBeaconBlock<P>>, + parent: &Arc<SignedBeaconBlock<P>>, + ) -> Result<Vec<Arc<BlobSidecar<P>>>> { + let block = block.message(); + + let Some(body) = block.body().post_deneb() else { + return Ok(vec![]); + }; + + let head_state = controller.head_state().value; + let block_root = block.hash_tree_root(); + let slot = block.slot(); + let head_slot = head_state.slot(); + + if slot < misc::blob_serve_range_slot::<P>
(config, head_slot) { + return Ok(vec![]); + } + + body.blob_kzg_commitments() + .into_iter() + .zip(0..) + .map(|(_block_commitment, index)| { + let Some(blob_sidecar) = self + .blob_sidecars + .get(&BlobIdentifier { block_root, index }) + else { + bail!(Error::BlobMissing::
<P>
{ + block_root, + slot, + index, + }) + }; + + let action = controller.validate_blob_sidecar_with_state( + blob_sidecar.clone_arc(), + true, + &BlobSidecarOrigin::BackSync, + || Some((parent.clone_arc(), PayloadStatus::Optimistic)), + || Ok(head_state.clone_arc()), + )?; + + if !action.accepted() { + bail!(Error::BlobNotAccepted::
<P>
{ + action, + block_root, + slot, + index + }) + } + + Ok(blob_sidecar.clone_arc()) + }) + .collect() + } + fn verify_from_checkpoint( - &mut self, + &self, + config: &Config, + controller: &RealController
<P>
, mut checkpoint: SyncCheckpoint, ) -> Result<( SyncCheckpoint, impl Iterator<Item = Arc<SignedBeaconBlock<P>>>, + impl Iterator<Item = Arc<BlobSidecar<P>>>, )> { - debug!("verify back sync batch from: {checkpoint:?}"); + debug!("verify back-sync batch from: {checkpoint:?}"); let mut next_parent_root = checkpoint.parent_root; - - for block in self.blocks.values().rev() { + let mut verified_blob_sidecars = vec![]; + let mut verified_blocks = vec![]; + let head_state = controller.head_state().value(); + + let mut blocks = self + .blocks + .values() + .rev() + .skip_while(|block| block.message().slot() >= checkpoint.slot) + .peekable(); + + while let Some(block) = blocks.next() { let message = block.message(); - let expected = next_parent_root; let actual = message.hash_tree_root(); - ensure!( - actual == expected, - Error::BlockRootMismatch { - slot: message.slot(), - expected, - actual, - }, - ); + if block.message().slot() == GENESIS_SLOT { + // If it's a genesis block, keep it as is. It will be validated + // against our own genesis block during final checkpoint validation. + verified_blocks.push(block.clone_arc()); + } else if let Some(parent) = blocks.peek() { + debug!("back-sync batch block: {} {:?}", message.slot(), actual); + + ensure!( + actual == next_parent_root, + Error::BlockRootMismatch::
<P>
{ + actual, + expected: next_parent_root, + slot: message.slot(), + }, + ); + + let mut blobs = self.valid_blob_sidecars_for(config, controller, block, parent)?; + + verified_blob_sidecars.append(&mut blobs); + + transition_functions::combined::verify_base_signature_with_head_state( + config, + &head_state, + block, + )?; - next_parent_root = message.parent_root(); + verified_blocks.push(block.clone_arc()); + + next_parent_root = message.parent_root(); + } } - if let Some((_, earliest_block)) = self.blocks.first_key_value() { + if let Some(earliest_block) = verified_blocks.last() { checkpoint = earliest_block.as_ref().into(); } debug!("next batch checkpoint: {checkpoint:?}"); - let blocks = core::mem::take(&mut self.blocks).into_values(); - - Ok((checkpoint, blocks)) + Ok(( + checkpoint, + verified_blocks.into_iter(), + verified_blob_sidecars.into_iter(), + )) } } @@ -216,18 +361,14 @@ impl Batch
<P>
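
The rewritten `verify_from_checkpoint` above walks a batch from its highest slot downward, requiring each block's hash tree root to equal the parent root of the block verified before it, and only counting a block once its parent is available for inspection. A simplified sketch of that invariant, with toy `u64` roots standing in for `H256` values and real hashing:

```rust
use std::collections::BTreeMap;

type Slot = u64;
type Root = u64; // stand-in for H256

#[derive(Clone, Copy)]
struct Block {
    slot: Slot,
    root: Root, // hash_tree_root() in the real code
    parent_root: Root,
}

// Walk a back-sync batch from its highest slot down, checking that every
// block is the parent of the one verified before it. `expected_root` starts
// as the parent root recorded in the current sync checkpoint.
fn verify_chain(
    batch: &BTreeMap<Slot, Block>,
    mut expected_root: Root,
) -> Result<Vec<Block>, String> {
    let mut verified = Vec::new();
    let mut blocks = batch.values().rev().peekable();

    while let Some(block) = blocks.next() {
        if block.root != expected_root {
            return Err(format!(
                "root mismatch at slot {}: expected {}, got {}",
                block.slot, expected_root, block.root
            ));
        }

        // Only count a block as verified once its parent is available
        // (the real diff also peeks at the parent for blob validation).
        if blocks.peek().is_some() {
            verified.push(*block);
            expected_root = block.parent_root;
        }
    }

    Ok(verified)
}

fn main() {
    let mut batch = BTreeMap::new();
    batch.insert(10, Block { slot: 10, root: 7, parent_root: 6 });
    batch.insert(11, Block { slot: 11, root: 8, parent_root: 7 });
    assert_eq!(verify_chain(&batch, 8).unwrap().len(), 1);
}
```
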
{ #[ssz(derive_hash = false)] #[cfg_attr(test, derive(PartialEq, Eq))] pub struct Data { - current: SyncCheckpoint, - high: SyncCheckpoint, - low: SyncCheckpoint, + pub current: SyncCheckpoint, + pub high: SyncCheckpoint, + pub low: SyncCheckpoint, } impl Data { - pub const fn new(current: SyncCheckpoint, high: SyncCheckpoint, low: SyncCheckpoint) -> Self { - Self { current, high, low } - } - - fn is_finished(&self) -> bool { - self.current == self.low + const fn is_finished(&self) -> bool { + self.current.slot <= self.low.slot } fn save(&self, database: &Database) -> Result<()> { @@ -240,7 +381,7 @@ impl Data { fn find(database: &Database) -> Result<Option<Self>> { database - .next(BackSyncDataBySlot(GENESIS_SLOT).to_string())? + .prev(BackSyncDataBySlot(Slot::MAX).to_string())? .filter(|(key_bytes, _)| key_bytes.starts_with(BackSyncDataBySlot::PREFIX.as_bytes())) .map(|(_, value_bytes)| Self::from_ssz_default(value_bytes)) .transpose() @@ -255,9 +396,9 @@ impl Data { #[derive(Clone, Copy, PartialEq, Eq, Debug, Ssz)] #[cfg_attr(test, derive(Default))] pub struct SyncCheckpoint { - slot: Slot, - block_root: H256, - parent_root: H256, + pub slot: Slot, + pub block_root: H256, + pub parent_root: H256, } impl From<&SignedBeaconBlock
<P>
> for SyncCheckpoint { @@ -272,26 +413,40 @@ impl From<&SignedBeaconBlock
<P>
> for SyncCheckpoint { } } +#[expect(clippy::module_name_repetitions)] #[derive(Display)] #[display("{}{_0:020}", Self::PREFIX)] -struct BackSyncDataBySlot(Slot); +pub struct BackSyncDataBySlot(pub Slot); -impl BackSyncDataBySlot { +impl PrefixableKey for BackSyncDataBySlot { const PREFIX: &'static str = "b"; } #[derive(Debug, Error)] -pub enum Error { +pub enum Error { + #[error("blob {index} for block {block_root:?} in slot {slot} not accepted: {action:?}")] + BlobNotAccepted { + action: BlobSidecarAction
<P>
, + block_root: H256, + slot: Slot, + index: BlobIndex, + }, + #[error("missing blob {index} for block {block_root:?} in slot {slot}")] + BlobMissing { + block_root: H256, + slot: Slot, + index: BlobIndex, + }, #[error( "invalid block batch: block root mismatch \ (slot: {slot}, expected: {expected:?}, actual: {actual:?})" )] BlockRootMismatch { - slot: Slot, - expected: H256, actual: H256, + expected: H256, + slot: Slot, }, - #[error("final back sync checkpoint mismatch (expected: {expected:?}, actual: {actual:?})")] + #[error("final back-sync checkpoint mismatch (expected: {expected:?}, actual: {actual:?})")] FinalCheckpointMismatch { expected: SyncCheckpoint, actual: SyncCheckpoint, @@ -317,7 +472,7 @@ mod tests { let selected = Data::find(&database)?; - assert_eq!(selected, Some(build_sync_data(140, 0))); + assert_eq!(selected, Some(build_sync_data(200, 160))); Ok(()) } @@ -333,7 +488,7 @@ mod tests { assert_eq!(selected, Some(build_sync_data(120, 0))); selected - .expect("back sync data is saved earlier in the test") + .expect("back-sync data is saved earlier in the test") .remove(&database)?; assert_eq!(Data::find(&database)?, None); diff --git a/p2p/src/block_sync_service.rs b/p2p/src/block_sync_service.rs index a735d250..4df5082f 100644 --- a/p2p/src/block_sync_service.rs +++ b/p2p/src/block_sync_service.rs @@ -1,11 +1,16 @@ -use core::{convert::Infallible as Never, fmt::Debug, time::Duration}; -use std::sync::Arc; +use core::{ + convert::Infallible as Never, + fmt::Debug, + sync::atomic::{AtomicBool, Ordering}, + time::Duration, +}; +use std::{collections::HashMap, sync::Arc}; use anyhow::Result; use database::Database; use eth1_api::RealController; use eth2_libp2p::{PeerAction, PeerId, ReportSource}; -use fork_choice_control::SyncMessage; +use fork_choice_control::{PrefixableKey as _, StorageMode, SyncMessage}; use futures::{ channel::mpsc::{UnboundedReceiver, UnboundedSender}, future::Either, @@ -13,23 +18,27 @@ use futures::{ }; use genesis::AnchorCheckpointProvider; use helper_functions::misc; -use log::{debug, error, info}; +use log::{debug, info, warn}; use prometheus_metrics::Metrics; -use ssz::{SszReadDefault, SszWrite as _}; +use ssz::SszReadDefault; use std_ext::ArcExt as _; use thiserror::Error; use tokio::select; use tokio_stream::wrappers::IntervalStream; use types::{ + config::Config, deneb::containers::BlobIdentifier, - phase0::primitives::{Slot, H256}, + phase0::primitives::{Epoch, Slot, H256}, preset::Preset, + traits::SignedBeaconBlock as _, }; use crate::{ - back_sync::{BackSync, Data as BackSyncData, Error as BackSyncError, SyncCheckpoint}, + back_sync::{ + BackSync, BackSyncDataBySlot, Data as BackSyncData, Error as BackSyncError, SyncCheckpoint, + }, messages::{ArchiverToSync, P2pToSync, SyncToApi, SyncToMetrics, SyncToP2p}, - misc::{PeerReportReason, RequestId}, + misc::{PeerReportReason, RPCRequestType, RequestId}, sync_manager::{SyncBatch, SyncManager, SyncTarget}, }; @@ -55,6 +64,7 @@ pub struct Channels { } pub struct BlockSyncService { + config: Arc, database: Option, sync_direction: SyncDirection, back_sync: Option>, @@ -66,6 +76,9 @@ pub struct BlockSyncService { slot: Slot, is_back_synced: bool, is_forward_synced: bool, + is_exiting: Arc, + received_blob_sidecars: HashMap, + received_block_roots: HashMap, fork_choice_to_sync_rx: Option>>, p2p_to_sync_rx: UnboundedReceiver>, sync_to_p2p_tx: UnboundedSender, @@ -75,8 +88,16 @@ pub struct BlockSyncService { archiver_to_sync_rx: Option>, } +impl Drop for BlockSyncService
<P>
{ + fn drop(&mut self) { + self.is_exiting.store(true, Ordering::Relaxed) + } +} + impl BlockSyncService
<P>
{ + #[expect(clippy::too_many_arguments)] pub fn new( + config: Arc, db: Database, anchor_checkpoint_provider: AnchorCheckpointProvider
<P>
, controller: RealController
<P>
, @@ -84,6 +105,8 @@ impl BlockSyncService
<P>
{ channels: Channels
<P>
, back_sync_enabled: bool, loaded_from_remote: bool, + storage_mode: StorageMode, + target_peers: usize, ) -> Result { let database; let back_sync; @@ -94,27 +117,48 @@ impl BlockSyncService
<P>
{ if loaded_from_remote { let anchor_checkpoint = controller.anchor_block().as_ref().into(); - // Checkpoint sync happened, so we need to back sync to - // previously stored latest finalized checkpoint or genesis. - let back_sync_checkpoint = get_latest_finalized_back_sync_checkpoint(&db)? - .unwrap_or_else(|| { - anchor_checkpoint_provider - .checkpoint() - .value - .block - .as_ref() - .into() + let latest_finalized_back_sync_checkpoint = + get_latest_finalized_back_sync_checkpoint(&db)?; + + // Checkpoint sync completed. Now we need to back-sync to one of the following: + // - The previously stored latest finalized checkpoint (if the node was offline) + // - Genesis, if the storage mode is set to Archive + // - Config::min_epochs_for_block_request back, if the storage mode is Standard + let back_sync_terminus = + latest_finalized_back_sync_checkpoint.unwrap_or_else(|| { + if storage_mode.is_archive() { + anchor_checkpoint_provider + .checkpoint() + .value + .block + .as_ref() + .into() + } else { + let terminus_epoch = controller.min_checked_block_availability_epoch(); + + SyncCheckpoint { + slot: misc::compute_start_slot_at_epoch::
<P>
(terminus_epoch), + // Options don't go along well with our SSZ implementation + // And also for compatibility reasons + block_root: H256::zero(), + parent_root: H256::zero(), + } + } }); - let back_sync_process = BackSync::
<P>
::new(BackSyncData::new( - anchor_checkpoint, - anchor_checkpoint, - back_sync_checkpoint, - )); + let back_sync_process = BackSync::
<P>
::new(BackSyncData { + current: anchor_checkpoint, + high: anchor_checkpoint, + low: back_sync_terminus, + }); if !back_sync_process.is_finished() { back_sync_process.save(&db)?; } + + if latest_finalized_back_sync_checkpoint.is_none() { + save_latest_finalized_back_sync_checkpoint(&db, anchor_checkpoint)?; + } } back_sync = BackSync::load(&db)?; @@ -131,8 +175,6 @@ impl BlockSyncService
<P>
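
To make the terminus selection above concrete, here is a hedged sketch of the decision: a previously stored checkpoint always wins; otherwise archive nodes back-sync to genesis, while standard nodes stop at the minimum block-availability window. `MIN_EPOCHS_FOR_BLOCK_REQUESTS` and `SLOTS_PER_EPOCH` use illustrative mainnet-like values, and the helper name is made up:

```rust
type Slot = u64;
type Epoch = u64;

const SLOTS_PER_EPOCH: u64 = 32;
// Illustrative: roughly five months of epochs that peers must serve
// (MIN_EPOCHS_FOR_BLOCK_REQUESTS on mainnet).
const MIN_EPOCHS_FOR_BLOCK_REQUESTS: Epoch = 33_024;

enum StorageMode {
    Standard,
    Archive,
}

fn back_sync_terminus_slot(
    stored_checkpoint_slot: Option<Slot>,
    storage_mode: StorageMode,
    current_epoch: Epoch,
) -> Slot {
    if let Some(slot) = stored_checkpoint_slot {
        return slot;
    }

    match storage_mode {
        StorageMode::Archive => 0, // genesis
        StorageMode::Standard => {
            let terminus_epoch = current_epoch.saturating_sub(MIN_EPOCHS_FOR_BLOCK_REQUESTS);
            terminus_epoch * SLOTS_PER_EPOCH
        }
    }
}

fn main() {
    assert_eq!(back_sync_terminus_slot(Some(1234), StorageMode::Archive, 40_000), 1234);
    assert_eq!(back_sync_terminus_slot(None, StorageMode::Archive, 40_000), 0);
    assert_eq!(
        back_sync_terminus_slot(None, StorageMode::Standard, 40_000),
        (40_000 - 33_024) * 32,
    );
}
```

The zeroed `block_root`/`parent_root` in the standard-mode checkpoint is why the final-checkpoint check earlier in this diff only runs when the expected root is nonzero.
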
{ archiver_to_sync_rx = None; }; - let slot = controller.slot(); - let Channels { fork_choice_to_sync_rx, p2p_to_sync_rx, @@ -141,18 +183,20 @@ impl BlockSyncService
<P>
{ sync_to_metrics_tx, } = channels; - // `is_back_synced` is set correctly only when back sync is enabled. Otherwise it is set + // `is_back_synced` is set correctly only when back-sync is enabled. Otherwise it is set // to `true` and users can attempt to query historical data even after checkpoint sync. let is_back_synced = back_sync.is_none(); let is_forward_synced = controller.is_forward_synced(); + let slot = controller.slot(); let mut service = Self { + config, database, sync_direction: SyncDirection::Forward, back_sync, anchor_checkpoint_provider, controller, - sync_manager: SyncManager::default(), + sync_manager: SyncManager::new(target_peers), metrics, next_request_id: 0, slot, @@ -160,6 +204,9 @@ impl BlockSyncService
<P>
{ // Initialize `is_forward_synced` to `false`. This is needed to make // `BlockSyncService::set_forward_synced` subscribe to core topics on startup. is_forward_synced: false, + is_exiting: Arc::new(AtomicBool::new(false)), + received_blob_sidecars: HashMap::new(), + received_block_roots: HashMap::new(), fork_choice_to_sync_rx, p2p_to_sync_rx, sync_to_p2p_tx, @@ -191,16 +238,17 @@ impl BlockSyncService
<P>
{ None => Either::Right(futures::future::pending()), }, if self.archiver_to_sync_rx.is_some() => match message { ArchiverToSync::BackSyncStatesArchived => { - debug!("received back sync states archived message"); + debug!("received back-sync states archived message"); if let Some(back_sync) = self.back_sync.as_mut() { if let Some(database) = self.database.as_ref() { - back_sync.finish(database)?; + back_sync.remove(database)?; - debug!("finishing back sync: {:?}", back_sync.data()); + debug!("finishing back-sync: {:?}", back_sync.data()); if let Some(sync) = BackSync::load(database)? { self.back_sync = Some(sync); + self.try_to_spawn_back_sync_states_archiver()?; self.request_blobs_and_blocks_if_ready()?; } else { self.set_back_synced(true); @@ -218,7 +266,7 @@ impl BlockSyncService
<P>
{ if let Some(database) = &self.database { let checkpoint = block.as_ref().into(); - debug!("saving latest finalized back sync checkpoint: {checkpoint:?}"); + debug!("saving latest finalized back-sync checkpoint: {checkpoint:?}"); save_latest_finalized_back_sync_checkpoint(database, checkpoint)?; } @@ -229,6 +277,7 @@ impl BlockSyncService
<P>
{ match message { P2pToSync::Slot(slot) => { self.slot = slot; + self.track_collection_metrics(); if let Some(metrics) = self.metrics.as_ref() { self.sync_manager.track_collection_metrics(metrics); @@ -243,7 +292,7 @@ impl BlockSyncService
<P>
{ self.retry_sync_batches(batches_to_retry)?; } P2pToSync::RequestFailed(peer_id) => { - if !self.is_forward_synced { + if !self.is_forward_synced || !self.controller.is_back_synced() { let batches_to_retry = self.sync_manager.remove_peer(&peer_id); self.retry_sync_batches(batches_to_retry)?; } @@ -257,17 +306,95 @@ impl BlockSyncService
<P>
{ P2pToSync::BlockNeeded(block_root, peer_id) => { self.request_needed_block(block_root, peer_id)?; } - P2pToSync::RequestedBlobSidecar(blob_sidecar, block_seen, peer_id) => { - self.controller.on_requested_blob_sidecar(blob_sidecar, block_seen, peer_id); + P2pToSync::GossipBlobSidecar(blob_sidecar, subnet_id, gossip_id) => { + let blob_identifier: BlobIdentifier = blob_sidecar.as_ref().into(); + let block_seen = self + .received_block_roots + .contains_key(&blob_identifier.block_root); + + self.controller.on_gossip_blob_sidecar( + blob_sidecar, + subnet_id, + gossip_id, + block_seen, + ); } - P2pToSync::RequestedBlock((block, peer_id, request_id)) => { - match self - .sync_manager - .request_direction(request_id) - .unwrap_or(self.sync_direction) - { + P2pToSync::RequestedBlobSidecar(blob_sidecar, peer_id, request_id, request_type) => { + let blob_identifier = blob_sidecar.as_ref().into(); + + self.sync_manager.record_received_blob_sidecar_response(blob_identifier, peer_id, request_id); + + // Back sync does not issue BlobSidecarsByRoot requests + let request_direction = match request_type { + RPCRequestType::Root => SyncDirection::Forward, + RPCRequestType::Range => self + .sync_manager + .request_direction(request_id) + .unwrap_or(self.sync_direction), + }; + + match request_direction { SyncDirection::Forward => { - self.controller.on_requested_block(block, Some(peer_id)); + let blob_sidecar_slot = blob_sidecar.signed_block_header.message.slot; + + if self.register_new_received_blob_sidecar(blob_identifier, blob_sidecar_slot) { + let block_seen = self + .received_block_roots + .contains_key(&blob_identifier.block_root); + + self.controller.on_requested_blob_sidecar(blob_sidecar, block_seen, peer_id); + } + } + SyncDirection::Back => { + if let Some(back_sync) = self.back_sync.as_mut() { + back_sync.push_blob_sidecar(blob_sidecar); + } + } + } + } + P2pToSync::GossipBlock(beacon_block, peer_id, gossip_id) => { + let block_root = beacon_block.message().hash_tree_root(); + let block_slot = beacon_block.message().slot(); + + if self.register_new_received_block(block_root, block_slot) { + let block_slot_timestamp = misc::compute_timestamp_at_slot( + self.controller.chain_config(), + &self.controller.head_state().value(), + block_slot, + ); + + if let Some(metrics) = self.metrics.as_ref() { + metrics.observe_block_duration_to_slot(block_slot_timestamp); + } + + info!( + "received beacon block as gossip (slot: {block_slot}, root: {block_root:?}, \ + peer_id: {peer_id})" + ); + + self.controller + .on_gossip_block(beacon_block, gossip_id); + } + } + P2pToSync::RequestedBlock(block, peer_id, request_id, request_type) => { + let block_root = block.message().hash_tree_root(); + + self.sync_manager.record_received_block_response(block_root, peer_id, request_id); + + // Back sync does not issue BeaconBlocksByRoot requests + let request_direction = match request_type { + RPCRequestType::Root => SyncDirection::Forward, + RPCRequestType::Range => self + .sync_manager + .request_direction(request_id) + .unwrap_or(self.sync_direction), + }; + + match request_direction { + SyncDirection::Forward => { + if self.register_new_received_block(block_root, block.message().slot()) { + self.controller.on_requested_block(block, Some(peer_id)); + } } SyncDirection::Back => { if let Some(back_sync) = self.back_sync.as_mut() { @@ -277,51 +404,35 @@ impl BlockSyncService
<P>
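
The `register_new_received_block`/`register_new_received_blob_sidecar` checks above implement a first-sighting filter: `HashMap::insert` returns `None` only for new keys, so gossip and RPC copies of the same object are processed once, and entries are pruned once finality passes them. A minimal sketch:

```rust
use std::collections::HashMap;

type Slot = u64;
type Root = u64; // stand-in for H256

#[derive(Default)]
struct ReceivedBlocks {
    roots: HashMap<Root, Slot>,
}

impl ReceivedBlocks {
    // Returns true only for the first sighting of a root.
    fn register(&mut self, root: Root, slot: Slot) -> bool {
        self.roots.insert(root, slot).is_none()
    }

    // Drop entries older than the finalized epoch boundary; anything
    // before it can no longer appear on a viable fork.
    fn prune(&mut self, start_of_finalized_epoch: Slot) {
        self.roots.retain(|_, slot| *slot >= start_of_finalized_epoch);
    }
}

fn main() {
    let mut received = ReceivedBlocks::default();
    assert!(received.register(0xabc, 100));
    assert!(!received.register(0xabc, 100)); // duplicate is ignored
    received.prune(101);
    assert!(received.register(0xabc, 101)); // pruned, so seen as new again
}
```

Moving this bookkeeping from `Network` into `BlockSyncService` (later in this diff) keeps a single owner for the caches that both gossip and RPC paths consult.
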
{ } } P2pToSync::BlobsByRangeRequestFinished(request_id) => { - self.sync_manager.blobs_by_range_request_finished(request_id); - self.request_blobs_and_blocks_if_ready()?; - } - P2pToSync::BlobsByRootChunkReceived(identifier, peer_id, request_id) => { - self.sync_manager.received_blob_sidecar_chunk(identifier, peer_id, request_id); + let request_direction = self.sync_manager.request_direction(request_id); + + self.sync_manager.blobs_by_range_request_finished(request_id, request_direction); + + if request_direction == Some(SyncDirection::Back) { + self.check_back_sync_progress()?; + } + self.request_blobs_and_blocks_if_ready()?; } - P2pToSync::BlocksByRangeRequestFinished(request_id) => { + P2pToSync::BlocksByRangeRequestFinished(peer_id, request_id) => { let request_direction = self.sync_manager.request_direction(request_id); - self.sync_manager.blocks_by_range_request_finished(request_id); + self.sync_manager.blocks_by_range_request_finished( + &self.controller, + peer_id, + request_id, + request_direction, + ); if request_direction == Some(SyncDirection::Back) { - // aka batch finished - if self.sync_manager.ready_to_request_by_range() { - if let Some(back_sync) = self.back_sync.as_mut() { - if let Some(database) = self.database.as_ref() { - if let Err(error) = back_sync.verify_blocks( - database, - &self.controller, - ) { - error!( - "error while verifying back sync blocks: \ - {error:?}", - ); - - if let Some( - BackSyncError::FinalCheckpointMismatch { .. } - ) = error.downcast_ref() { - back_sync.finish(database)?; - self.back_sync = BackSync::load(database)?; - } - } - } - - self.try_to_spawn_back_sync_states_archiver()?; - } - } + self.check_back_sync_progress()?; } self.request_blobs_and_blocks_if_ready()?; } - P2pToSync::BlockByRootRequestFinished(block_root) => { - self.sync_manager.block_by_root_request_finished(block_root); - self.request_blobs_and_blocks_if_ready()?; + P2pToSync::FinalizedCheckpoint(finalized_checkpoint) => { + self.prune_received_blob_sidecars(finalized_checkpoint.epoch); + self.prune_received_block_roots(finalized_checkpoint.epoch); } } } @@ -329,12 +440,46 @@ impl BlockSyncService
<P>
{ } } + pub fn check_back_sync_progress(&mut self) -> Result<()> { + self.request_expired_blob_range_requests()?; + self.request_expired_block_range_requests()?; + + // Check if batch has finished + if !self.sync_manager.ready_to_request_by_range() { + return Ok(()); + } + + let Some(back_sync) = self.back_sync.as_mut() else { + return Ok(()); + }; + + let Some(database) = self.database.as_ref() else { + return Ok(()); + }; + + if let Err(error) = back_sync.verify_blocks(&self.config, database, &self.controller) { + warn!("error occurred while verifying back-sync blocks: {error:?}"); + + if let Some(BackSyncError::FinalCheckpointMismatch::
<P>
{ .. }) = error.downcast_ref() { + back_sync.remove(database)?; + self.back_sync = BackSync::load(database)?; + } + } + + if let Some(back_sync) = self.back_sync.as_mut() { + back_sync.reset_batch(); + } + + self.try_to_spawn_back_sync_states_archiver() + } + pub fn try_to_spawn_back_sync_states_archiver(&mut self) -> Result<()> { if let Some(back_sync) = self.back_sync.as_mut() { if let Some(archiver_to_sync_tx) = self.archiver_to_sync_tx.as_ref() { back_sync.try_to_spawn_state_archiver( self.controller.clone_arc(), self.anchor_checkpoint_provider.clone(), + self.is_exiting.clone_arc(), archiver_to_sync_tx.clone(), )?; } @@ -348,6 +493,7 @@ impl BlockSyncService
<P>
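
`check_back_sync_progress` above distinguishes a fatal `FinalCheckpointMismatch` from retryable verification errors by downcasting. The real code goes through `anyhow::Error::downcast_ref`; the same pattern with std's `Box<dyn Error>` looks like this:

```rust
use std::{error::Error, fmt};

#[derive(Debug)]
struct FinalCheckpointMismatch;

impl fmt::Display for FinalCheckpointMismatch {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "final back-sync checkpoint mismatch")
    }
}

impl Error for FinalCheckpointMismatch {}

fn verify() -> Result<(), Box<dyn Error>> {
    Err(Box::new(FinalCheckpointMismatch))
}

fn main() {
    if let Err(error) = verify() {
        eprintln!("error occurred while verifying back-sync blocks: {error}");

        // Only a checkpoint mismatch discards the current back-sync record;
        // any other error leaves it in place so the batch can be retried.
        if error.downcast_ref::<FinalCheckpointMismatch>().is_some() {
            // back_sync.remove(database) and reload in the real code
        }
    }
}
```
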
{ let request_id = self.request_id()?; let SyncBatch { target, + direction, peer_id, start_slot, count, @@ -362,7 +508,9 @@ impl BlockSyncService
<P>
{ ) .send(&self.sync_to_p2p_tx); - let peer = self.sync_manager.retry_batch(request_id, &batch); + let peer = + self.sync_manager + .retry_batch(request_id, batch, direction == SyncDirection::Back); if let Some(peer_id) = peer { match target { @@ -434,18 +582,23 @@ impl BlockSyncService
<P>
{ local_finalized_slot, )? } - SyncDirection::Back => self - .back_sync - .as_ref() - .filter(|back_sync| !back_sync.is_finished()) - .map(|back_sync| { - let current_slot = back_sync.current_slot(); - let low_slot = back_sync.low_slot(); - - self.sync_manager - .build_back_sync_batches::
<P>
(current_slot, low_slot) - }) - .unwrap_or_default(), + SyncDirection::Back => { + let blob_serve_range_slot = + misc::blob_serve_range_slot::
<P>
(self.controller.chain_config(), self.slot); + + self.back_sync + .as_ref() + .filter(|back_sync| !back_sync.is_finished()) + .map(|back_sync| { + self.sync_manager.build_back_sync_batches::
<P>
( + blob_serve_range_slot, + back_sync.current_slot(), + // download one extra block for parent validation + back_sync.low_slot_with_parent(), + ) + }) + .unwrap_or_default() + } }; self.request_batches(batches) @@ -501,9 +654,22 @@ impl BlockSyncService
<P>
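
`blob_serve_range_slot` is what splits back-sync into blob-and-block batches versus block-only batches above. Assuming it follows the spec's blob retention rule (peers serve blobs for `MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS` epochs, clamped to the Deneb fork), a sketch with illustrative mainnet constants:

```rust
type Slot = u64;
type Epoch = u64;

const SLOTS_PER_EPOCH: u64 = 32;
// Illustrative mainnet values: blobs exist since Deneb and peers must
// retain them for MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS epochs.
const DENEB_FORK_EPOCH: Epoch = 269_568;
const MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS: Epoch = 4096;

// First slot for which peers are still expected to serve blob sidecars.
// Back-sync only builds blob batches above this slot; below it, only
// block batches are requested.
fn blob_serve_range_slot(current_slot: Slot) -> Slot {
    let current_epoch = current_slot / SLOTS_PER_EPOCH;
    let earliest_epoch = current_epoch
        .saturating_sub(MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS)
        .max(DENEB_FORK_EPOCH);

    earliest_epoch * SLOTS_PER_EPOCH
}

fn main() {
    let slot = 280_000 * SLOTS_PER_EPOCH;
    assert_eq!(blob_serve_range_slot(slot), (280_000 - 4096) * 32);
    // Close to the fork, the range is clamped to the Deneb activation slot.
    assert_eq!(blob_serve_range_slot(DENEB_FORK_EPOCH * 32 + 1), 269_568 * 32);
}
```
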
{ return Ok(()); } + let identifiers = identifiers + .into_iter() + .filter(|blob_identifier| !self.received_blob_sidecars.contains_key(blob_identifier)) + .collect::>(); + + if identifiers.is_empty() { + debug!( + "cannot request BlobSidecarsByRoot: all requested blob sidecars have been received", + ); + + return Ok(()); + } + let request_id = self.request_id()?; - let Some(peer_id) = peer_id.or_else(|| self.sync_manager.random_peer()) else { + let Some(peer_id) = peer_id.or_else(|| self.sync_manager.random_peer(false)) else { return Ok(()); }; @@ -530,9 +696,18 @@ impl BlockSyncService
<P>
{ return Ok(()); } + if self.received_block_roots.contains_key(&block_root) { + debug!( + "cannot request BeaconBlocksByRoot: requested block has been received:\ + {block_root:?}" + ); + + return Ok(()); + } + let request_id = self.request_id()?; - let Some(peer_id) = peer_id.or_else(|| self.sync_manager.random_peer()) else { + let Some(peer_id) = peer_id.or_else(|| self.sync_manager.random_peer(false)) else { return Ok(()); }; @@ -559,19 +734,19 @@ impl BlockSyncService
<P>
{ } fn set_back_synced(&mut self, is_back_synced: bool) { - debug!("set back synced: {is_back_synced}"); + debug!("set back-synced: {is_back_synced}"); let was_back_synced = self.is_back_synced; self.is_back_synced = is_back_synced; if was_back_synced != is_back_synced && is_back_synced { - info!("back sync completed"); + info!("back-sync completed"); self.sync_manager.cache_clear(); self.sync_direction = SyncDirection::Forward; } - SyncToApi::BackSyncStatus(is_back_synced).send(&self.sync_to_api_tx); + self.controller.on_back_sync_status(is_back_synced); } fn set_forward_synced(&mut self, is_forward_synced: bool) -> Result<()> { @@ -581,7 +756,7 @@ impl BlockSyncService
<P>
{ self.is_forward_synced = is_forward_synced; if was_forward_synced && !is_forward_synced { - // Stop back sync and sync forward. + // Stop back-sync and sync forward. if self.sync_direction == SyncDirection::Back { self.sync_direction = SyncDirection::Forward; self.sync_manager.cache_clear(); @@ -593,7 +768,8 @@ impl BlockSyncService
<P>
{ SyncToP2p::SubscribeToCoreTopics.send(&self.sync_to_p2p_tx); if self.back_sync.is_some() { - SyncToP2p::PruneReceivedBlocks.send(&self.sync_to_p2p_tx); + self.received_block_roots = HashMap::new(); + self.received_blob_sidecars = HashMap::new(); self.sync_direction = SyncDirection::Back; self.sync_manager.cache_clear(); self.request_blobs_and_blocks_if_ready()?; @@ -610,26 +786,92 @@ impl BlockSyncService
<P>
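
The `set_forward_synced` transition above is easiest to read as a small state machine: losing forward sync always forces the direction back to `Forward`, and regaining it hands control to back-sync only if one is still pending. A sketch under those assumptions (field names are illustrative):

```rust
#[derive(Clone, Copy, PartialEq, Debug)]
enum SyncDirection {
    Forward,
    Back,
}

struct SyncState {
    direction: SyncDirection,
    is_forward_synced: bool,
    back_sync_pending: bool,
}

impl SyncState {
    fn set_forward_synced(&mut self, synced: bool) {
        let was_synced = self.is_forward_synced;
        self.is_forward_synced = synced;

        // Losing forward sync preempts any ongoing back-sync.
        if was_synced && !synced {
            self.direction = SyncDirection::Forward;
        }

        // Regaining it resumes back-sync if one is still pending.
        if !was_synced && synced && self.back_sync_pending {
            self.direction = SyncDirection::Back;
        }
    }
}

fn main() {
    let mut state = SyncState {
        direction: SyncDirection::Forward,
        is_forward_synced: false,
        back_sync_pending: true,
    };

    state.set_forward_synced(true);
    assert_eq!(state.direction, SyncDirection::Back);

    state.set_forward_synced(false);
    assert_eq!(state.direction, SyncDirection::Forward);
}
```
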
{ Ok(()) } -} -fn get(database: &Database, key: impl AsRef<[u8]>) -> Result> { - database - .get(key)? - .map(V::from_ssz_default) - .transpose() - .map_err(Into::into) + fn prune_received_blob_sidecars(&mut self, epoch: Epoch) { + let start_of_epoch = misc::compute_start_slot_at_epoch::
<P>
(epoch); + + self.received_blob_sidecars + .retain(|_, slot| *slot >= start_of_epoch); + } + + fn prune_received_block_roots(&mut self, epoch: Epoch) { + let start_of_epoch = misc::compute_start_slot_at_epoch::
<P>
(epoch); + + self.received_block_roots + .retain(|_, slot| *slot >= start_of_epoch); + } + + fn register_new_received_block(&mut self, block_root: H256, slot: Slot) -> bool { + self.received_block_roots.insert(block_root, slot).is_none() + } + + fn register_new_received_blob_sidecar( + &mut self, + blob_identifier: BlobIdentifier, + slot: Slot, + ) -> bool { + self.received_blob_sidecars + .insert(blob_identifier, slot) + .is_none() + } + + fn track_collection_metrics(&self) { + if let Some(metrics) = self.metrics.as_ref() { + let type_name = tynm::type_name::(); + + metrics.set_collection_length( + module_path!(), + &type_name, + "received_blob_sidecars", + self.received_blob_sidecars.len(), + ); + + metrics.set_collection_length( + module_path!(), + &type_name, + "received_block_roots", + self.received_block_roots.len(), + ); + } + } } fn get_latest_finalized_back_sync_checkpoint( database: &Database, ) -> Result> { - get(database, LATEST_FINALIZED_BACK_SYNC_CHECKPOINT_KEY) + fork_choice_control::get(database, LATEST_FINALIZED_BACK_SYNC_CHECKPOINT_KEY) } fn save_latest_finalized_back_sync_checkpoint( database: &Database, checkpoint: SyncCheckpoint, ) -> Result<()> { - let bytes = checkpoint.to_ssz()?; - database.put(LATEST_FINALIZED_BACK_SYNC_CHECKPOINT_KEY, bytes) + fork_choice_control::save( + database, + LATEST_FINALIZED_BACK_SYNC_CHECKPOINT_KEY, + checkpoint, + ) +} + +pub fn print_sync_database_info(database: &Database) -> Result<()> { + info!( + "latest finalized back-sync checkpoint: {:#?}", + get_latest_finalized_back_sync_checkpoint(database)?, + ); + + let results = database.iterator_descending(..=BackSyncDataBySlot(Slot::MAX).to_string())?; + + for result in results { + let (key_bytes, value_bytes) = result?; + + if !BackSyncDataBySlot::has_prefix(&key_bytes) { + break; + } + + let back_sync = BackSyncData::from_ssz_default(value_bytes)?; + + info!("{} : {back_sync:#?}", String::from_utf8_lossy(&key_bytes)); + } + + Ok(()) } diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index a5eb514d..4a4d5d01 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -1,7 +1,9 @@ pub use eth2_libp2p::{metrics, Enr, ListenAddr, Multiaddr, NetworkConfig}; pub use crate::{ - block_sync_service::{BlockSyncService, Channels as BlockSyncServiceChannels}, + block_sync_service::{ + print_sync_database_info, BlockSyncService, Channels as BlockSyncServiceChannels, + }, messages::{ ApiToP2p, P2pToSlasher, P2pToValidator, SubnetServiceToP2p, SyncToApi, SyncToMetrics, ToSubnetService, ValidatorToP2p, diff --git a/p2p/src/messages.rs b/p2p/src/messages.rs index 8dfe84f9..54d084e3 100644 --- a/p2p/src/messages.rs +++ b/p2p/src/messages.rs @@ -18,7 +18,7 @@ use types::{ deneb::containers::{BlobIdentifier, BlobSidecar}, nonstandard::Phase, phase0::{ - containers::{ProposerSlashing, SignedVoluntaryExit}, + containers::{Checkpoint, ProposerSlashing, SignedVoluntaryExit}, primitives::{Epoch, ForkDigest, Slot, SubnetId, ValidatorIndex, H256}, }, preset::Preset, @@ -26,8 +26,8 @@ use types::{ use crate::{ misc::{ - AttestationSubnetActions, BeaconCommitteeSubscription, PeerReportReason, RequestId, - SyncCommitteeSubnetAction, SyncCommitteeSubscription, + AttestationSubnetActions, BeaconCommitteeSubscription, PeerReportReason, RPCRequestType, + RequestId, SyncCommitteeSubnetAction, SyncCommitteeSubscription, }, network_api::{NodeIdentity, NodePeer, NodePeerCount, NodePeersQuery}, }; @@ -39,13 +39,14 @@ pub enum P2pToSync { StatusPeer(PeerId), BlobsNeeded(Vec, Slot, Option), BlockNeeded(H256, Option), - 
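
Both `Data::find` and `print_sync_database_info` above rely on the same trick: back-sync records are stored under "b" plus a zero-padded slot, so a descending scan from the maximal key with that prefix yields the most recent record first. A hedged sketch with a `BTreeMap` standing in for the database:

```rust
use std::collections::BTreeMap;

// Keys are "b" + zero-padded slot, so the latest back-sync record is the
// last key at or below the maximal key with that prefix.
fn find_latest(db: &BTreeMap<String, Vec<u8>>) -> Option<(&String, &Vec<u8>)> {
    let max_key = format!("b{:020}", u64::MAX);

    db.range(..=max_key)
        .next_back()
        .filter(|(key, _)| key.starts_with('b'))
}

fn main() {
    let mut db = BTreeMap::new();
    db.insert(format!("b{:020}", 120_u64), vec![1]);
    db.insert(format!("b{:020}", 200_u64), vec![2]);

    let (key, value) = find_latest(&db).expect("two records were inserted");
    assert_eq!(key, &format!("b{:020}", 200_u64));
    assert_eq!(value, &vec![2]);
}
```

This is consistent with the test update earlier in the diff, where `Data::find` now selects the record for the highest slot rather than the lowest.
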
RequestedBlobSidecar(Arc<BlobSidecar<P>>, bool, PeerId), - RequestedBlock((Arc<SignedBeaconBlock<P>>, PeerId, RequestId)), + RequestedBlobSidecar(Arc<BlobSidecar<P>>, PeerId, RequestId, RPCRequestType), + RequestedBlock(Arc<SignedBeaconBlock<P>>, PeerId, RequestId, RPCRequestType), BlobsByRangeRequestFinished(RequestId), - BlobsByRootChunkReceived(BlobIdentifier, PeerId, RequestId), - BlocksByRangeRequestFinished(RequestId), - BlockByRootRequestFinished(H256), + BlocksByRangeRequestFinished(PeerId, RequestId), RequestFailed(PeerId), + FinalizedCheckpoint(Checkpoint), + GossipBlobSidecar(Arc<BlobSidecar<P>>, SubnetId, GossipId), + GossipBlock(Arc<SignedBeaconBlock<P>>, PeerId, GossipId), } impl P2pToSync
<P>
{ @@ -86,7 +87,6 @@ impl ApiToP2p
<P>
{ pub enum SyncToApi { SyncStatus(bool), - BackSyncStatus(bool), } impl SyncToApi { @@ -110,7 +110,6 @@ impl SyncToMetrics { } pub enum SyncToP2p { - PruneReceivedBlocks, ReportPeer(PeerId, PeerAction, ReportSource, PeerReportReason), RequestBlobsByRange(RequestId, PeerId, Slot, u64), RequestBlobsByRoot(RequestId, PeerId, Vec), @@ -209,9 +208,9 @@ pub enum ServiceInboundMessage { impl ServiceInboundMessage
<P>
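
The hunk that follows changes `ServiceInboundMessage::send` from logging a failed send to panicking, on the theory that a dropped network-service receiver means the process is already beyond recovery, and crashing early surfaces the bug. A std-only sketch of the pattern:

```rust
use std::sync::mpsc;
use std::thread;

// A closed channel is treated as a fatal bug rather than a silently
// dropped message: if the receiving thread is gone, continuing to run
// would only hide the failure.
fn send_or_panic<T>(tx: &mpsc::Sender<T>, message: T) {
    tx.send(message)
        .expect("send to network service failed because the receiver was dropped");
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || rx.recv().unwrap());

    send_or_panic(&tx, "subscribe");
    assert_eq!(handle.join().unwrap(), "subscribe");
}
```
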
{ pub fn send(self, tx: &UnboundedSender) { - if tx.unbounded_send(self).is_err() { - debug!("send to network service failed because the receiver was dropped"); - } + // panic if network thread is no longer running + tx.unbounded_send(self) + .expect("send to network service failed because the receiver was dropped"); } } diff --git a/p2p/src/misc.rs b/p2p/src/misc.rs index a83e64ec..3fad5159 100644 --- a/p2p/src/misc.rs +++ b/p2p/src/misc.rs @@ -7,6 +7,12 @@ use types::phase0::primitives::{CommitteeIndex, Epoch, Slot, SubnetId, Validator pub type RequestId = usize; +#[derive(PartialEq, Eq)] +pub enum RPCRequestType { + Range, + Root, +} + #[derive(Debug, Serialize)] pub struct AttestationSubnetActions { pub discoveries: Vec, diff --git a/p2p/src/network.rs b/p2p/src/network.rs index 725f31d6..db355388 100644 --- a/p2p/src/network.rs +++ b/p2p/src/network.rs @@ -1,6 +1,6 @@ use core::{cmp::Ordering, convert::Infallible as Never, time::Duration}; use std::{ - collections::{BTreeMap, HashMap, HashSet}, + collections::{BTreeMap, HashSet}, sync::Arc, time::Instant, }; @@ -60,7 +60,10 @@ use crate::{ ApiToP2p, P2pToSlasher, P2pToSync, P2pToValidator, ServiceInboundMessage, ServiceOutboundMessage, SubnetServiceToP2p, SyncToP2p, ValidatorToP2p, }, - misc::{AttestationSubnetActions, RequestId, SubnetPeerDiscovery, SyncCommitteeSubnetAction}, + misc::{ + AttestationSubnetActions, RPCRequestType, RequestId, SubnetPeerDiscovery, + SyncCommitteeSubnetAction, + }, upnp::PortMappings, }; @@ -102,8 +105,6 @@ pub struct Channels { #[expect(clippy::struct_field_names)] pub struct Network { network_globals: Arc, - received_blob_sidecars: HashMap, - received_block_roots: HashMap, controller: RealController
<P>
, channels: Channels
<P>
, dedicated_executor: Arc, @@ -187,8 +188,6 @@ impl Network
<P>
{ let network = Self { network_globals, - received_blob_sidecars: HashMap::new(), - received_block_roots: HashMap::new(), controller, channels, dedicated_executor, @@ -304,7 +303,6 @@ impl Network
<P>
{ match message { P2pMessage::Slot(slot) => { self.on_slot(slot); - self.track_collection_metrics(); } P2pMessage::Accept(gossip_id) => { self.report_outcome(gossip_id, MessageAcceptance::Accept); @@ -349,8 +347,8 @@ impl Network
<P>
{ .send(&self.channels.p2p_to_sync_tx); } P2pMessage::FinalizedCheckpoint(finalized_checkpoint) => { - self.prune_received_blob_sidecars(finalized_checkpoint.epoch); - self.prune_received_block_roots(finalized_checkpoint.epoch); + P2pToSync::FinalizedCheckpoint(finalized_checkpoint) + .send(&self.channels.p2p_to_sync_tx); } P2pMessage::HeadState(_state) => { // This message is only used in tests @@ -398,9 +396,6 @@ impl Network
<P>
{ message = self.channels.sync_to_p2p_rx.select_next_some() => { match message { - SyncToP2p::PruneReceivedBlocks => { - self.received_block_roots = HashMap::new(); - } SyncToP2p::ReportPeer(peer_id, peer_action, report_source, reason) => { self.report_peer( peer_id, @@ -800,7 +795,7 @@ impl Network
<P>
{ } } - fn handle_network_event(&mut self, network_event: NetworkEvent) { + fn handle_network_event(&self, network_event: NetworkEvent) { match network_event { NetworkEvent::PeerConnectedIncoming(peer_id) => { debug!("peer {peer_id} connected incoming"); @@ -1192,7 +1187,7 @@ impl Network
<P>
{ } #[expect(clippy::too_many_lines)] - fn handle_response(&mut self, peer_id: PeerId, request_id: RequestId, response: Response
<P>
) { + fn handle_response(&self, peer_id: PeerId, request_id: RequestId, response: Response
<P>
) { match response { Response::Status(remote) => { debug!("received Status response (peer_id: {peer_id}, remote: {remote:?})"); @@ -1204,7 +1199,7 @@ impl Network
<P>
{ // > blob sidecar is well-formatted, has valid inclusion proof, and is correct w.r.t. the expected KZG commitments // > through `verify_blob_kzg_proof``. Response::BlobsByRange(Some(blob_sidecar)) => { - let blob_identifier = blob_sidecar.as_ref().into(); + let blob_identifier: BlobIdentifier = blob_sidecar.as_ref().into(); debug!( "received BlobsByRange response chunk \ @@ -1213,19 +1208,13 @@ impl Network
<P>
{ blob_sidecar.slot(), ); - info!( - "received blob sidecar from RPC slot: {}, id: {blob_identifier:?}", - blob_sidecar.slot() - ); - - if self.register_new_received_blob_sidecar(blob_identifier, blob_sidecar.slot()) { - let block_seen = self - .received_block_roots - .contains_key(&blob_identifier.block_root); - - P2pToSync::RequestedBlobSidecar(blob_sidecar, block_seen, peer_id) - .send(&self.channels.p2p_to_sync_tx); - } + P2pToSync::RequestedBlobSidecar( + blob_sidecar, + peer_id, + request_id, + RPCRequestType::Range, + ) + .send(&self.channels.p2p_to_sync_tx); } Response::BlobsByRange(None) => { debug!( @@ -1237,7 +1226,7 @@ impl Network
<P>
{ .send(&self.channels.p2p_to_sync_tx); } Response::BlobsByRoot(Some(blob_sidecar)) => { - let blob_identifier = blob_sidecar.as_ref().into(); + let blob_identifier: BlobIdentifier = blob_sidecar.as_ref().into(); debug!( "received BlobsByRoot response chunk \ @@ -1246,22 +1235,13 @@ impl Network
<P>
{ blob_sidecar.slot(), ); - info!( - "received blob sidecar from RPC slot: {}, id: {blob_identifier:?}", - blob_sidecar.slot() - ); - - if self.register_new_received_blob_sidecar(blob_identifier, blob_sidecar.slot()) { - let block_seen = self - .received_block_roots - .contains_key(&blob_identifier.block_root); - - P2pToSync::RequestedBlobSidecar(blob_sidecar, block_seen, peer_id) - .send(&self.channels.p2p_to_sync_tx); - } - - P2pToSync::BlobsByRootChunkReceived(blob_identifier, peer_id, request_id) - .send(&self.channels.p2p_to_sync_tx); + P2pToSync::RequestedBlobSidecar( + blob_sidecar, + peer_id, + request_id, + RPCRequestType::Root, + ) + .send(&self.channels.p2p_to_sync_tx); } Response::BlobsByRoot(None) => { debug!( @@ -1279,12 +1259,8 @@ impl Network
<P>
{ slot: {block_slot}, root: {block_root:?})", ); - info!("received beacon block from RPC slot: {block_slot}, root: {block_root:?}"); - - if self.register_new_received_block(block_root, block.message().slot()) { - P2pToSync::RequestedBlock((block, peer_id, request_id)) - .send(&self.channels.p2p_to_sync_tx); - } + P2pToSync::RequestedBlock(block, peer_id, request_id, RPCRequestType::Range) + .send(&self.channels.p2p_to_sync_tx); } Response::BlocksByRange(None) => { debug!( @@ -1292,7 +1268,7 @@ impl Network
<P>
{ request_id: {request_id}", ); - P2pToSync::BlocksByRangeRequestFinished(request_id) + P2pToSync::BlocksByRangeRequestFinished(peer_id, request_id) .send(&self.channels.p2p_to_sync_tx); } Response::BlocksByRoot(Some(block)) => { @@ -1305,18 +1281,16 @@ impl Network
<P>
{ slot: {block_slot}, root: {block_root:?})", ); - info!("received beacon block from RPC slot: {block_slot}, root: {block_root:?}"); - - P2pToSync::BlockByRootRequestFinished(block_root) - .send(&self.channels.p2p_to_sync_tx); - - if self.register_new_received_block(block_root, block.message().slot()) { - self.controller - .on_requested_block(block.clone_arc(), Some(peer_id)); + P2pToSync::RequestedBlock( + block.clone_arc(), + peer_id, + request_id, + RPCRequestType::Root, + ) + .send(&self.channels.p2p_to_sync_tx); - if let Some(network_to_slasher_tx) = &self.channels.network_to_slasher_tx { - P2pToSlasher::Block(block).send(network_to_slasher_tx); - } + if let Some(network_to_slasher_tx) = &self.channels.network_to_slasher_tx { + P2pToSlasher::Block(block).send(network_to_slasher_tx); } } Response::BlocksByRoot(None) => { @@ -1348,7 +1322,7 @@ impl Network
<P>
{ #[expect(clippy::cognitive_complexity)] #[expect(clippy::too_many_lines)] fn handle_pubsub_message( - &mut self, + &self, message_id: MessageId, source: PeerId, message: PubsubMessage
<P>
, @@ -1359,34 +1333,12 @@ impl Network
<P>
{ metrics.register_gossip_object(&["beacon_block"]); } - let block_root = beacon_block.message().hash_tree_root(); - let block_slot = beacon_block.message().slot(); - - if !self.register_new_received_block(block_root, block_slot) { - return; - } - - let block_slot_timestamp = misc::compute_timestamp_at_slot( - self.controller.chain_config(), - &self.controller.head_state().value(), - block_slot, - ); - - if let Some(metrics) = self.metrics.as_ref() { - metrics.observe_block_duration_to_slot(block_slot_timestamp); - } - - info!( - "received beacon block as gossip (slot: {block_slot}, root: {block_root:?}, \ - peer_id: {source})" - ); - if let Some(network_to_slasher_tx) = &self.channels.network_to_slasher_tx { P2pToSlasher::Block(beacon_block.clone_arc()).send(network_to_slasher_tx); } - self.controller - .on_gossip_block(beacon_block, GossipId { source, message_id }); + P2pToSync::GossipBlock(beacon_block, source, GossipId { source, message_id }) + .send(&self.channels.p2p_to_sync_tx); } PubsubMessage::BlobSidecar(data) => { if let Some(metrics) = self.metrics.as_ref() { @@ -1401,16 +1353,12 @@ impl Network
<P>
{ from {source}", ); - let block_seen = self - .received_block_roots - .contains_key(&blob_identifier.block_root); - - self.controller.on_gossip_blob_sidecar( + P2pToSync::GossipBlobSidecar( blob_sidecar, subnet_id, GossipId { source, message_id }, - block_seen, - ); + ) + .send(&self.channels.p2p_to_sync_tx); } PubsubMessage::DataColumnSidecar(_) => {} PubsubMessage::AggregateAndProofAttestation(aggregate_and_proof) => { @@ -1710,19 +1658,6 @@ impl Network
<P>
{ peer_id: PeerId, blob_identifiers: Vec, ) { - let blob_identifiers = blob_identifiers - .into_iter() - .filter(|blob_identifier| !self.received_blob_sidecars.contains_key(blob_identifier)) - .collect::>(); - - if blob_identifiers.is_empty() { - debug!( - "cannot request BlobSidecarsByRoot: all requested blob sidecars have been received", - ); - - return; - } - let request = BlobsByRootRequest::new(self.controller.chain_config(), blob_identifiers.into_iter()); @@ -1752,10 +1687,6 @@ impl Network
<P>
{ } fn request_block_by_root(&self, request_id: RequestId, peer_id: PeerId, block_root: H256) { - if self.received_block_roots.contains_key(&block_root) { - return; - } - let request = BlocksByRootRequest::new( self.controller.chain_config(), self.controller.phase(), @@ -1805,7 +1736,11 @@ impl Network
<P>
{ source: ReportSource, reason: impl Into<&'static str>, ) { - ServiceInboundMessage::ReportPeer(peer_id, peer_action, source, reason.into()) + let reason = reason.into(); + + debug!("reporting peer: {peer_id} {peer_action} {source:?} {reason}"); + + ServiceInboundMessage::ReportPeer(peer_id, peer_action, source, reason) .send(&self.network_to_service_tx); } @@ -1842,34 +1777,6 @@ impl Network
<P>
{ .map(|digest| GossipTopic::new(subnet.into(), GossipEncoding::default(), digest)) } - fn prune_received_blob_sidecars(&mut self, epoch: Epoch) { - let start_of_epoch = Self::start_of_epoch(epoch); - - self.received_blob_sidecars - .retain(|_, slot| *slot >= start_of_epoch); - } - - fn prune_received_block_roots(&mut self, epoch: Epoch) { - let start_of_epoch = Self::start_of_epoch(epoch); - - self.received_block_roots - .retain(|_, slot| *slot >= start_of_epoch); - } - - fn register_new_received_block(&mut self, block_root: H256, slot: Slot) -> bool { - self.received_block_roots.insert(block_root, slot).is_none() - } - - fn register_new_received_blob_sidecar( - &mut self, - blob_identifier: BlobIdentifier, - slot: Slot, - ) -> bool { - self.received_blob_sidecars - .insert(blob_identifier, slot) - .is_none() - } - fn update_peer_count(&self) { PEER_LOG_METRICS.set_connected_peer_count(self.network_globals.connected_peers()) } @@ -1884,26 +1791,6 @@ impl Network
<P>
{ }) } - fn track_collection_metrics(&self) { - if let Some(metrics) = self.metrics.as_ref() { - let type_name = tynm::type_name::(); - - metrics.set_collection_length( - module_path!(), - &type_name, - "received_blob_sidecars", - self.received_blob_sidecars.len(), - ); - - metrics.set_collection_length( - module_path!(), - &type_name, - "received_block_roots", - self.received_block_roots.len(), - ); - } - } - const fn start_of_epoch(epoch: Epoch) -> Slot { misc::compute_start_slot_at_epoch::
<P>
(epoch) } diff --git a/p2p/src/range_and_root_requests.rs b/p2p/src/range_and_root_requests.rs index d0ef9b19..a639a337 100644 --- a/p2p/src/range_and_root_requests.rs +++ b/p2p/src/range_and_root_requests.rs @@ -8,14 +8,12 @@ use prometheus_metrics::Metrics; use crate::{block_sync_service::SyncDirection, misc::RequestId, sync_manager::SyncBatch}; -type RequestKey = usize; - const MAX_ROOT_REQUESTS_PER_KEY: usize = 3; const REQUEST_BY_RANGE_TIMEOUT: Duration = Duration::from_secs(15); const REQUEST_BY_ROOT_TIMEOUT_IN_SECONDS: u64 = 5; pub struct RangeAndRootRequests { - requests_by_range: SizedCache, + requests_by_range: SizedCache, requests_by_root: TimedSizedCache>, } @@ -32,6 +30,35 @@ impl Default for RangeAndRootRequests { } impl RangeAndRootRequests { + pub fn busy_peers(&self) -> impl Iterator + '_ { + self.busy_range_peers().chain(self.busy_root_peers()) + } + + pub fn busy_root_peers(&self) -> impl Iterator + '_ { + self.requests_by_root + .value_order() + .flat_map(|(_, peers)| peers) + .copied() + } + + pub fn busy_range_peers(&self) -> impl Iterator + '_ { + self.requests_by_range + .value_order() + .filter(|(_, time)| time.elapsed() < REQUEST_BY_RANGE_TIMEOUT) + .map(|(sync_batch, _)| sync_batch.peer_id) + } + + pub fn record_received_response(&mut self, k: &K, peer_id: &PeerId, request_id: RequestId) { + if let Some((batch, _)) = self.requests_by_range.cache_get_mut(&request_id) { + batch.response_received = true; + return; + } + + self.requests_by_root + .cache_get_mut(k) + .map(|requests| requests.remove(peer_id)); + } + pub fn add_request_by_range(&mut self, request_id: RequestId, batch: SyncBatch) { self.requests_by_range .cache_set(request_id, (batch, Instant::now())); @@ -120,17 +147,14 @@ impl RangeAndRootRequests { .count() } - pub fn request_by_range_finished(&mut self, request_id: RequestId) { - self.requests_by_range.cache_remove(&request_id); - } - - pub fn chunk_by_root_received(&mut self, k: &K, peer_id: &PeerId) { - self.requests_by_root - .cache_get_mut(k) - .map(|requests| requests.remove(peer_id)); + pub fn request_by_range_finished( + &mut self, + request_id: RequestId, + ) -> Option<(SyncBatch, Instant)> { + self.requests_by_range.cache_remove(&request_id) } - pub fn requests_by_range_keys(&self) -> Vec { + pub fn requests_by_range_keys(&self) -> Vec { self.requests_by_range.key_order().copied().collect() } diff --git a/p2p/src/sync_manager.rs b/p2p/src/sync_manager.rs index 5f33d9d7..6bac06d3 100644 --- a/p2p/src/sync_manager.rs +++ b/p2p/src/sync_manager.rs @@ -1,13 +1,24 @@ -use core::{fmt::Display, hash::Hash, ops::Range, time::Duration}; -use std::{collections::HashMap, sync::Arc, time::Instant}; +#![allow( + clippy::multiple_inherent_impl, + reason = "https://github.com/rust-lang/rust-clippy/issues/13040" +)] + +use core::{fmt::Display, hash::Hash, num::NonZeroUsize, ops::Range, time::Duration}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, + time::Instant, +}; use anyhow::Result; use arithmetic::NonZeroExt as _; use cached::{Cached as _, TimedSizedCache}; +use eth1_api::RealController; use eth2_libp2p::{rpc::StatusMessage, PeerId}; use helper_functions::misc; use itertools::Itertools as _; use log::{log, Level}; +use lru::LruCache; use prometheus_metrics::Metrics; use rand::{prelude::SliceRandom, seq::IteratorRandom as _, thread_rng}; use typenum::Unsigned as _; @@ -53,13 +64,15 @@ pub enum SyncTarget { Block, } -#[derive(Debug)] +#[derive(Clone, Copy, Debug)] pub struct SyncBatch { pub target: SyncTarget, pub direction: SyncDirection, 
pub peer_id: PeerId, pub start_slot: Slot, pub count: u64, + pub retry_count: usize, + pub response_received: bool, } pub struct SyncManager { @@ -72,10 +85,13 @@ pub struct SyncManager { status_updates_cache: TimedSizedCache, not_enough_peers_message_shown_at: Option<Instant>, sync_from_finalized: bool, + // Store peers that don't serve blocks prior to MIN_EPOCHS_FOR_BLOCK_REQUESTS + // so that we can filter them out when back-syncing. + back_sync_black_list: LruCache<PeerId, ()>, } -impl Default for SyncManager { - fn default() -> Self { +impl SyncManager { + pub fn new(target_peers: usize) -> Self { Self { peers: HashMap::new(), blob_requests: RangeAndRootRequests::<BlobIdentifier>::default(), @@ -89,13 +105,36 @@ impl SyncManager { ), not_enough_peers_message_shown_at: None, sync_from_finalized: false, + back_sync_black_list: LruCache::new( + NonZeroUsize::new(target_peers).expect("target_peers must be a nonzero value"), + ), } } -} -impl SyncManager { + pub fn record_received_blob_sidecar_response( + &mut self, + blob_identifier: BlobIdentifier, + peer_id: PeerId, + request_id: RequestId, + ) { + self.blob_requests + .record_received_response(&blob_identifier, &peer_id, request_id); + } + + pub fn record_received_block_response( + &mut self, + block_root: H256, + peer_id: PeerId, + request_id: RequestId, + ) { + self.block_requests + .record_received_response(&block_root, &peer_id, request_id); + } + pub fn request_direction(&mut self, request_id: RequestId) -> Option<SyncDirection> { - self.block_requests.request_direction(request_id) + self.block_requests + .request_direction(request_id) + .or_else(|| self.blob_requests.request_direction(request_id)) } pub fn add_peer(&mut self, peer_id: PeerId, status: StatusMessage) { @@ -107,6 +146,18 @@ impl SyncManager { self.peers.insert(peer_id, status); } + pub fn add_peer_to_back_sync_black_list(&mut self, peer_id: PeerId) { + self.log( + Level::Debug, + format_args!( + "adding peer to the back-sync blacklist: {peer_id}, blacklisted peers: {}", + self.back_sync_black_list.len() + ), + ); + + self.back_sync_black_list.put(peer_id, ()); + } + pub fn remove_peer(&mut self, peer_id: &PeerId) -> Vec<SyncBatch> { self.log( Level::Debug, @@ -114,6 +165,7 @@ impl SyncManager { ); self.peers.remove(peer_id); + self.back_sync_black_list.pop(peer_id); self.block_requests .remove_peer(peer_id) @@ -121,8 +173,13 @@ impl SyncManager { .collect_vec() } - pub fn retry_batch(&mut self, request_id: RequestId, batch: &SyncBatch) -> Option<PeerId> { - let peer = self.random_peer(); + pub fn retry_batch( + &mut self, + request_id: RequestId, + batch: SyncBatch, + use_black_list: bool, + ) -> Option<PeerId> { + let peer = self.random_peer(use_black_list); self.log( Level::Debug, @@ -137,6 +194,8 @@ impl SyncManager { peer_id, start_slot: batch.start_slot, count: batch.count, + retry_count: batch.retry_count + 1, + response_received: false, }; match batch.target { @@ -152,7 +211,7 @@ impl SyncManager { { self.log( Level::Warn, - format_args!("not enough peers to retry batch: {batch:?}"), + format_args!("not enough non-busy peers to retry batch: {batch:?}"), ); self.not_enough_peers_message_shown_at = Some(Instant::now()); } @@ -164,53 +223,97 @@ impl SyncManager { pub fn build_back_sync_batches<P: Preset>( &mut self, - state_slot: Slot, + blob_serve_range_slot: Slot, + mut current_back_sync_slot: Slot, low_slot: Slot, ) -> Vec<SyncBatch> { - let Some(peers_to_sync) = self.find_peers_to_sync() else { + let Some(peers_to_sync) = self.find_peers_to_sync(true) else { return vec![]; }; - let slots_per_request = P::SlotsPerEpoch::non_zero().get() *
EPOCHS_PER_REQUEST; - + // Use half of the available peers for back-sync batches. + let max_sync_batches = peers_to_sync.len() / 2; + let mut peers = peers_to_sync.iter(); let mut sync_batches = vec![]; - for (peer_id, index) in Self::peer_sync_batch_assignments(&peers_to_sync).zip(0..) { - let start_slot = state_slot - .saturating_sub(slots_per_request * (index + 1)) - .max(low_slot); - - let end_slot = state_slot.saturating_sub(slots_per_request * index); + while let Some(peer) = peers.next() { + let should_batch_blobs = current_back_sync_slot > blob_serve_range_slot; - let count = if start_slot == low_slot { - end_slot - low_slot + let count = if should_batch_blobs { + P::SlotsPerEpoch::non_zero().get() / 4 } else { - slots_per_request + P::SlotsPerEpoch::non_zero().get() * EPOCHS_PER_REQUEST }; + let start_slot = current_back_sync_slot.saturating_sub(count); + + if should_batch_blobs { + match peers.next() { + Some(next_peer) => { + // test if there is enough space for both blobs and blocks batches + if sync_batches.len() + 2 > max_sync_batches { + break; + } + + let mut start_slot = start_slot; + let mut count = count; + + if start_slot < blob_serve_range_slot { + count = (start_slot + count) + .checked_sub(blob_serve_range_slot) + .unwrap_or(1); + + start_slot = blob_serve_range_slot; + }; + + let batch = SyncBatch { + target: SyncTarget::BlobSidecar, + direction: SyncDirection::Back, + peer_id: *next_peer, + start_slot, + count, + response_received: false, + retry_count: 0, + }; + + self.log( + Level::Debug, + format_args!("back-sync batch built: {batch:?})"), + ); + + sync_batches.push(batch); + } + None => break, + } + } + let batch = SyncBatch { target: SyncTarget::Block, direction: SyncDirection::Back, - peer_id, + peer_id: *peer, start_slot, count, + response_received: false, + retry_count: 0, }; self.log( Level::Debug, - format_args!("back sync batch built: {batch:?})"), + format_args!("back-sync batch built: {batch:?})"), ); sync_batches.push(batch); - if start_slot == low_slot { + if start_slot <= low_slot || sync_batches.len() >= max_sync_batches { break; } + + current_back_sync_slot = start_slot; } self.log( Level::Debug, - format_args!("new back sync batches count: {}", sync_batches.len(),), + format_args!("new back-sync batches count: {}", sync_batches.len(),), ); sync_batches @@ -224,7 +327,7 @@ impl SyncManager { local_head_slot: Slot, local_finalized_slot: Slot, ) -> Result> { - let Some(peers_to_sync) = self.find_peers_to_sync() else { + let Some(peers_to_sync) = self.find_peers_to_sync(false) else { return Ok(vec![]); }; @@ -337,6 +440,8 @@ impl SyncManager { peer_id, start_slot, count, + response_received: false, + retry_count: 0, }); } @@ -346,6 +451,8 @@ impl SyncManager { peer_id, start_slot, count, + response_received: false, + retry_count: 0, }); } @@ -377,10 +484,11 @@ impl SyncManager { self.log( Level::Debug, format_args!( - "add blob request by range (request_id: {}, peer_id: {}, range: {:?})", + "add blob request by range (request_id: {}, peer_id: {}, range: {:?}, retries: {})", request_id, batch.peer_id, (batch.start_slot..(batch.start_slot + batch.count)), + batch.retry_count, ), ); @@ -406,10 +514,11 @@ impl SyncManager { self.log( Level::Debug, format_args!( - "add block request by range (request_id: {}, peer_id: {}, range: {:?})", + "add block request by range (request_id: {}, peer_id: {}, range: {:?}, retries: {})", request_id, batch.peer_id, (batch.start_slot..(batch.start_slot + batch.count)), + batch.retry_count, ), ); @@ -427,57 +536,81 @@ impl 
SyncManager { self.block_requests.add_request_by_root(block_root, peer_id) } - pub fn random_peer(&self) -> Option { - let chain_id = self.chain_with_max_peer_count()?; + pub fn random_peer(&self, use_black_list: bool) -> Option { + let chain_id = self.chain_with_max_peer_count(use_black_list)?; - self.peers - .iter() - .filter(|(_, status)| ChainId::from(*status) == chain_id) + let busy_peers = self + .blob_requests + .busy_peers() + .chain(self.block_requests.busy_peers()) + .collect::>(); + + self.peers(use_black_list) + .filter(|(peer_id, status)| { + ChainId::from(*status) == chain_id && !busy_peers.contains(peer_id) + }) .map(|(&peer_id, _)| peer_id) .choose(&mut thread_rng()) } - pub fn blobs_by_range_request_finished(&mut self, request_id: RequestId) { + pub fn blobs_by_range_request_finished( + &mut self, + request_id: RequestId, + request_direction: Option, + ) { self.log( Level::Debug, format_args!("request blob sidecars by range finished (request_id: {request_id})",), ); - self.blob_requests.request_by_range_finished(request_id) + if let Some((sync_batch, _)) = self.blob_requests.request_by_range_finished(request_id) { + self.log( + Level::Debug, + format_args!( + "blob sidecars by range request stats: responses received: {}, count: {}, \ + direction {request_direction:?}, retries: {}", + sync_batch.response_received, sync_batch.count, sync_batch.retry_count, + ), + ); + + if request_direction == Some(SyncDirection::Back) && !sync_batch.response_received { + self.retry_batch(request_id, sync_batch, true); + } + } } - pub fn received_blob_sidecar_chunk( + pub fn blocks_by_range_request_finished( &mut self, - blob_identifier: BlobIdentifier, + controller: &RealController
<P>
, peer_id: PeerId, request_id: RequestId, + request_direction: Option, ) { - self.log( - Level::Debug, - format_args!( - "received blob sidecar by root (blob_identifier: {blob_identifier:?}, \ - request_id: {request_id}, peer_id: {peer_id})", - ), - ); - - self.blob_requests - .chunk_by_root_received(&blob_identifier, &peer_id) - } - - pub fn blocks_by_range_request_finished(&mut self, request_id: RequestId) { self.log( Level::Debug, format_args!("request blocks by range finished (request_id: {request_id})"), ); - self.block_requests.request_by_range_finished(request_id) - } + if let Some((sync_batch, _)) = self.block_requests.request_by_range_finished(request_id) { + self.log( + Level::Debug, + format_args!( + "blocks by range request stats: responses received: {}, count: {}, \ + direction {request_direction:?}, retries: {}", + sync_batch.response_received, sync_batch.count, sync_batch.retry_count, + ), + ); - pub fn block_by_root_request_finished(&self, block_root: H256) { - self.log( - Level::Debug, - format_args!("request block by root finished (block_root: {block_root:?})"), - ); + if request_direction == Some(SyncDirection::Back) && !sync_batch.response_received { + if misc::compute_epoch_at_slot::
<P>
(sync_batch.start_slot + sync_batch.count) + < controller.min_checked_block_availability_epoch() + { + self.add_peer_to_back_sync_black_list(peer_id); + } + + self.retry_batch(request_id, sync_batch, true); + } + } } /// Log a message with peer count information. @@ -485,15 +618,27 @@ impl SyncManager { log!( level, "[Sync Peers: {}/{}] {}", - self.most_peers(), + self.most_peers(false), self.total_peers(), message ); } - fn find_peers_to_sync(&mut self) -> Option> { - self.find_chain_to_sync().map(|chain_id| { - let peers_to_sync = self.chain_peers_shuffled(&chain_id); + fn find_peers_to_sync(&mut self, use_black_list: bool) -> Option> { + self.find_chain_to_sync(use_black_list).map(|chain_id| { + let peers_to_sync = self.chain_peers_shuffled(&chain_id, use_black_list); + + let busy_peers = self + .blob_requests + .busy_peers() + .chain(self.block_requests.busy_peers()) + .collect::>(); + + let peers_to_sync = peers_to_sync + .iter() + .filter(|peer_id| !busy_peers.contains(peer_id)) + .copied() + .collect::>(); self.log( Level::Debug, @@ -504,8 +649,8 @@ impl SyncManager { }) } - fn find_chain_to_sync(&mut self) -> Option { - match self.chain_with_max_peer_count() { + fn find_chain_to_sync(&mut self, use_black_list: bool) -> Option { + match self.chain_with_max_peer_count(use_black_list) { Some(chain_id) => { self.log( Level::Debug, @@ -532,33 +677,41 @@ impl SyncManager { } } - fn chain_peers(&self, chain_id: &ChainId) -> Vec { - self.peers - .iter() + fn chain_peers(&self, chain_id: &ChainId, use_black_list: bool) -> Vec { + self.peers(use_black_list) .filter(|(_, status)| &ChainId::from(*status) == chain_id) .map(|(&peer_id, _)| peer_id) .collect() } - fn chain_peers_shuffled(&self, chain_id: &ChainId) -> Vec { - let mut peers = self.chain_peers(chain_id); + fn chain_peers_shuffled(&self, chain_id: &ChainId, use_black_list: bool) -> Vec { + let mut peers = self.chain_peers(chain_id, use_black_list); peers.shuffle(&mut thread_rng()); peers } - fn chain_with_max_peer_count(&self) -> Option { - self.chains_with_peer_counts() + fn chain_with_max_peer_count(&self, use_black_list: bool) -> Option { + self.chains_with_peer_counts(use_black_list) .into_iter() .max_by_key(|(_, peer_count)| *peer_count) .map(|(chain_id, _)| chain_id) } - fn chains_with_peer_counts(&self) -> HashMap { - self.peers.iter().counts_by(|(_, status)| status.into()) + fn peers(&self, use_black_list: bool) -> impl Iterator { + self.peers.iter().filter(move |(&peer_id, _)| { + use_black_list + .then(|| !self.back_sync_black_list.contains(&peer_id)) + .unwrap_or(true) + }) + } + + fn chains_with_peer_counts(&self, use_black_list: bool) -> HashMap { + self.peers(use_black_list) + .counts_by(|(_, status)| status.into()) } - fn most_peers(&self) -> usize { - self.chains_with_peer_counts() + fn most_peers(&self, use_black_list: bool) -> usize { + self.chains_with_peer_counts(use_black_list) .values() .max() .copied() @@ -624,6 +777,54 @@ impl SyncManager { } } +#[cfg(test)] +impl SyncManager { + pub fn add_blobs_by_range_busy_peer(&mut self, peer_id: PeerId) { + self.blob_requests.add_request_by_range( + 1, + SyncBatch { + target: SyncTarget::BlobSidecar, + direction: SyncDirection::Back, + peer_id, + start_slot: 0, + count: 64, + retry_count: 0, + response_received: false, + }, + ); + } + + pub fn add_blobs_by_root_busy_peer(&mut self, peer_id: PeerId) { + self.blob_requests.add_request_by_root( + BlobIdentifier { + block_root: H256::zero(), + index: 0, + }, + peer_id, + ); + } + + pub fn add_blocks_by_range_busy_peer(&mut 
@@ -624,6 +777,54 @@ impl SyncManager {
     }
 }
 
+#[cfg(test)]
+impl SyncManager {
+    pub fn add_blobs_by_range_busy_peer(&mut self, peer_id: PeerId) {
+        self.blob_requests.add_request_by_range(
+            1,
+            SyncBatch {
+                target: SyncTarget::BlobSidecar,
+                direction: SyncDirection::Back,
+                peer_id,
+                start_slot: 0,
+                count: 64,
+                retry_count: 0,
+                response_received: false,
+            },
+        );
+    }
+
+    pub fn add_blobs_by_root_busy_peer(&mut self, peer_id: PeerId) {
+        self.blob_requests.add_request_by_root(
+            BlobIdentifier {
+                block_root: H256::zero(),
+                index: 0,
+            },
+            peer_id,
+        );
+    }
+
+    pub fn add_blocks_by_range_busy_peer(&mut self, peer_id: PeerId) {
+        self.block_requests.add_request_by_range(
+            2,
+            SyncBatch {
+                target: SyncTarget::Block,
+                direction: SyncDirection::Back,
+                peer_id,
+                start_slot: 0,
+                count: 64,
+                retry_count: 0,
+                response_received: false,
+            },
+        );
+    }
+
+    pub fn add_blocks_by_root_busy_peer(&mut self, peer_id: PeerId) {
+        self.block_requests
+            .add_request_by_root(H256::zero(), peer_id);
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use test_case::test_case;
@@ -634,62 +835,117 @@ mod tests {
     use super::*;
 
-    // `SyncBatch.count` is 16 because the test cases use `Minimal`.
+    // `SyncBatch.count` is either 2 (blocks & blobs) or 16 (blocks only) because the test cases use `Minimal`.
     // `Minimal::SlotsPerEpoch::U64` × `EPOCHS_PER_REQUEST` = 8 × 2 = 16.
+    // `Minimal::SlotsPerEpoch::U64` / 4 = 8 / 4 = 2
     #[test_case(
-        0,
+        Slot::MAX,
         128,
         [
-            (112, 16),
-            (96, 16),
-            (80, 16),
-            (64, 16),
-            (48, 16),
-            (32, 16),
+            (112, 16, SyncTarget::Block),
+            (96, 16, SyncTarget::Block),
+            (80, 16, SyncTarget::Block),
+            (64, 16, SyncTarget::Block),
+            (48, 16, SyncTarget::Block),
         ]
     )]
     #[test_case(
-        0,
+        Slot::MAX,
         64,
         [
-            (48, 16),
-            (32, 16),
-            (16, 16),
-            (0, 16),
+            (48, 16, SyncTarget::Block),
+            (32, 16, SyncTarget::Block),
+            (16, 16, SyncTarget::Block),
+            (0, 16, SyncTarget::Block),
         ]
     )]
     #[test_case(
-        2,
+        Slot::MAX,
         30,
         [
-            (14, 16),
-            (2, 12),
+            (14, 16, SyncTarget::Block),
+            (0, 16, SyncTarget::Block),
+        ]
+    )]
+    #[test_case(
+        0,
+        64,
+        [
+            (62, 2, SyncTarget::BlobSidecar),
+            (62, 2, SyncTarget::Block),
+            (60, 2, SyncTarget::BlobSidecar),
+            (60, 2, SyncTarget::Block),
+        ]
+    )]
+    #[test_case(
+        62,
+        64,
+        [
+            (62, 2, SyncTarget::BlobSidecar),
+            (62, 2, SyncTarget::Block),
+            (46, 16, SyncTarget::Block),
+            (30, 16, SyncTarget::Block),
+            (14, 16, SyncTarget::Block),
+        ]
+    )]
+    #[test_case(
+        59,
+        62,
+        [
+            (60, 2, SyncTarget::BlobSidecar),
+            (60, 2, SyncTarget::Block),
+            (59, 1, SyncTarget::BlobSidecar),
+            (58, 2, SyncTarget::Block),
+            (42, 16, SyncTarget::Block),
+        ]
+    )]
+    #[test_case(
+        0,
+        3,
+        [
+            (1, 2, SyncTarget::BlobSidecar),
+            (1, 2, SyncTarget::Block),
+            (0, 2, SyncTarget::BlobSidecar),
+            (0, 2, SyncTarget::Block),
         ]
     )]
     fn build_back_sync_batches(
-        low_slot: Slot,
-        state_slot: Slot,
-        resulting_batches: impl IntoIterator<Item = (Slot, u64)>,
+        blob_serve_start_slot: Slot,
+        head_slot: Slot,
+        resulting_batches: impl IntoIterator<Item = (Slot, u64, SyncTarget)>,
     ) {
         let peer_status = StatusMessage {
             fork_digest: H32::default(),
             finalized_root: H256::default(),
-            finalized_epoch: 6,
+            finalized_epoch: 0,
             head_root: H256::default(),
-            head_slot: 8 * 32,
+            head_slot,
         };
 
-        let mut sync_manager = SyncManager::default();
+        let mut sync_manager = SyncManager::new(100);
 
-        sync_manager.add_peer(PeerId::random(), peer_status);
-        sync_manager.add_peer(PeerId::random(), peer_status);
+        // Add 10 valid peers.
+        // This will indirectly test that half of them are used for back-syncing (5 batches).
+        for _ in 0..10 {
+            sync_manager.add_peer(PeerId::random(), peer_status);
+        }
+
+        // Add one peer to a blacklist
+        sync_manager.add_peer_to_back_sync_black_list(PeerId::random());
+
+        // Have some peers busy
+        sync_manager.add_blobs_by_range_busy_peer(PeerId::random());
+        sync_manager.add_blobs_by_root_busy_peer(PeerId::random());
+        sync_manager.add_blocks_by_range_busy_peer(PeerId::random());
+        sync_manager.add_blocks_by_root_busy_peer(PeerId::random());
 
-        let batches = sync_manager.build_back_sync_batches::<Minimal>(state_slot, low_slot);
+        let batches =
+            sync_manager.build_back_sync_batches::<Minimal>(blob_serve_start_slot, head_slot, 0);
 
         itertools::assert_equal(
             batches
                 .into_iter()
-                .map(|batch| (batch.start_slot, batch.count)),
+                .map(|batch| (batch.start_slot, batch.count, batch.target)),
             resulting_batches,
         );
     }
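The expected tuples follow from the `Minimal` arithmetic in the comment above: block-only batches span 8 × 2 = 16 slots, while in the blob-serving range each step emits a 2-slot (8 / 4) blob batch paired with a 2-slot block batch, and ten peers cap the run at five batches. A hedged sketch of how the blob-era pairing could be enumerated; `paired_back_sync_batches` and its shape are assumptions for illustration, not the crate's implementation:

```rust
// Illustrative only: walk back from `head_slot` in `slots_per_epoch / 4`
// steps, emitting a blob batch and a block batch per step while they fit
// under `max_batches`.
fn paired_back_sync_batches(
    head_slot: u64,
    slots_per_epoch: u64,
    max_batches: usize,
) -> Vec<(u64, u64, &'static str)> {
    let count = slots_per_epoch / 4; // 8 / 4 = 2 under `Minimal`
    let mut batches = Vec::new();
    let mut start = head_slot.saturating_sub(count);

    while batches.len() + 2 <= max_batches {
        batches.push((start, count, "BlobSidecar"));
        batches.push((start, count, "Block"));

        if start == 0 {
            break;
        }

        start = start.saturating_sub(count);
    }

    batches
}

fn main() {
    // Mirrors the fourth test case: head slot 64 yields
    // (62, 2, BlobSidecar), (62, 2, Block), (60, 2, BlobSidecar), (60, 2, Block).
    assert_eq!(
        paired_back_sync_batches(64, 8, 5),
        [
            (62, 2, "BlobSidecar"),
            (62, 2, "Block"),
            (60, 2, "BlobSidecar"),
            (60, 2, "Block"),
        ]
    );
}
```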
@@ -709,7 +965,7 @@ mod tests {
             head_slot: 20_000,
         };
 
-        let mut sync_manager = SyncManager::default();
+        let mut sync_manager = SyncManager::new(100);
 
         sync_manager.add_peer(PeerId::random(), peer_status);
@@ -761,7 +1017,7 @@ mod tests {
             head_slot: 20_000,
         };
 
-        let mut sync_manager = SyncManager::default();
+        let mut sync_manager = SyncManager::new(100);
 
         sync_manager.add_peer(PeerId::random(), peer_status);
diff --git a/runtime/src/misc.rs b/runtime/src/misc.rs
index c4faf743..b27f1159 100644
--- a/runtime/src/misc.rs
+++ b/runtime/src/misc.rs
@@ -3,8 +3,9 @@ use std::{path::PathBuf, sync::Arc};
 
 use anyhow::{ensure, Result};
 use bytesize::ByteSize;
-use database::Database;
+use database::{Database, DatabaseMode};
 use directories::Directories;
+use fork_choice_control::StorageMode;
 use fs_err::PathExt as _;
 use log::info;
 use metrics::{MetricsServerConfig, MetricsServiceConfig};
@@ -24,7 +25,7 @@ pub struct StorageConfig {
     pub directories: Arc<Directories>,
     pub eth1_db_size: ByteSize,
     pub archival_epoch_interval: NonZeroU64,
-    pub prune_storage: bool,
+    pub storage_mode: StorageMode,
 }
 
 impl StorageConfig {
@@ -37,14 +38,14 @@ impl StorageConfig {
                 .unwrap_or_default()
                 .join("eth1_cache"),
             self.eth1_db_size,
-            false,
+            DatabaseMode::ReadWrite,
         )
     }
 
     pub fn beacon_fork_choice_database(
         &self,
         custom_path: Option<PathBuf>,
-        read_only: bool,
+        mode: DatabaseMode,
     ) -> Result<Database> {
         let path = custom_path.unwrap_or_else(|| {
             self.directories
@@ -54,27 +55,37 @@ impl StorageConfig {
                 .join("beacon_fork_choice")
         });
 
-        if read_only {
+        if mode.is_read_only() {
             ensure!(
                 path.fs_err_try_exists()?,
                 "beacon_fork_choice database path does not exist: {path:?}",
             );
         }
 
-        Database::persistent("beacon_fork_choice", path, self.db_size, read_only)
+        Database::persistent("beacon_fork_choice", path, self.db_size, mode)
     }
 
-    pub fn sync_database(&self) -> Result<Database> {
-        Database::persistent(
-            "sync",
+    pub fn sync_database(
+        &self,
+        custom_path: Option<PathBuf>,
+        mode: DatabaseMode,
+    ) -> Result<Database> {
+        let path = custom_path.unwrap_or_else(|| {
             self.directories
                 .store_directory
                 .clone()
                 .unwrap_or_default()
-                .join("sync"),
-            self.db_size,
-            false,
-        )
+                .join("sync")
+        });
+
+        if mode.is_read_only() {
+            ensure!(
+                path.fs_err_try_exists()?,
+                "sync database path does not exist: {path:?}",
+            );
+        }
+
+        Database::persistent("sync", path, self.db_size, mode)
     }
 
     #[must_use]
@@ -85,7 +96,7 @@ impl StorageConfig {
             directories,
             eth1_db_size,
             archival_epoch_interval,
-            prune_storage,
+            storage_mode,
         } = self;
 
         let new_db_size = ByteSize::b(
@@ -108,7 +119,7 @@ impl StorageConfig {
             directories,
             eth1_db_size: new_eth1_db_size,
             archival_epoch_interval,
-            prune_storage,
+            storage_mode,
         }
     }
@@ -139,7 +150,7 @@ mod tests {
             directories: Arc::new(Directories::default()),
             eth1_db_size: ByteSize::gb(2),
             archival_epoch_interval: nonzero!(1_u64),
-            prune_storage: true,
+            storage_mode: StorageMode::Standard,
         };
 
         let StorageConfig {
@@ -160,7 +171,7 @@ mod tests {
             directories: Arc::new(Directories::default()),
             eth1_db_size: ByteSize::b(u64::MAX),
             archival_epoch_interval: nonzero!(1_u64),
-            prune_storage: true,
+            storage_mode: StorageMode::Standard,
         };
 
         assert_eq!(storage_config.db_size, ByteSize::b(u64::MAX));
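Both database getters now apply the same guard: opening read-only must not create anything on disk, so the path has to exist up front. A minimal standalone restatement of that check, with std's `Path::try_exists` standing in for the `fs_err_try_exists` used in the diff:

```rust
use std::path::Path;

use anyhow::{ensure, Result};

// Sketch of the read-only guard shared by `beacon_fork_choice_database`
// and `sync_database`: fail fast instead of creating an empty database.
fn ensure_database_exists(path: &Path) -> Result<()> {
    ensure!(
        path.try_exists()?,
        "sync database path does not exist: {path:?}",
    );

    Ok(())
}
```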
diff --git a/runtime/src/runtime.rs b/runtime/src/runtime.rs
index d240d533..a3a8a7cf 100644
--- a/runtime/src/runtime.rs
+++ b/runtime/src/runtime.rs
@@ -7,7 +7,7 @@ use block_producer::BlockProducer;
 use builder_api::{BuilderApi, BuilderConfig};
 use bytesize::ByteSize;
 use clock::Tick;
-use database::Database;
+use database::{Database, DatabaseMode};
 use dedicated_executor::DedicatedExecutor;
 use doppelganger_protection::DoppelgangerProtection;
 use eth1::{Eth1Chain, Eth1Config};
@@ -41,7 +41,12 @@ use slasher::{Databases, Slasher, SlasherConfig};
 use slashing_protection::SlashingProtector;
 use std_ext::ArcExt as _;
 use tokio::select;
-use types::{config::Config as ChainConfig, preset::Preset, traits::BeaconState as _};
+use types::{
+    config::Config as ChainConfig,
+    phase0::consts::GENESIS_SLOT,
+    preset::Preset,
+    traits::{BeaconState as _, SignedBeaconBlock as _},
+};
 use validator::{
     run_validator_api, Validator, ValidatorApiConfig, ValidatorChannels, ValidatorConfig,
 };
@@ -103,7 +108,7 @@ pub async fn run_after_genesis(
         in_memory,
         ref directories,
         archival_epoch_interval,
-        prune_storage,
+        storage_mode,
         ..
     } = storage_config;
@@ -183,20 +188,22 @@ pub async fn run_after_genesis(
     let storage_database = if in_memory {
         Database::in_memory()
     } else {
-        storage_config.beacon_fork_choice_database(None, false)?
+        storage_config.beacon_fork_choice_database(None, DatabaseMode::ReadWrite)?
     };
 
     let storage = Arc::new(Storage::new(
         chain_config.clone_arc(),
         storage_database,
         archival_epoch_interval,
-        prune_storage,
+        storage_mode,
     ));
 
     let ((anchor_state, anchor_block, unfinalized_blocks), loaded_from_remote) = storage
         .load(signer_snapshot.client(), state_load_strategy)
         .await?;
 
+    let is_anchor_genesis = anchor_block.message().slot() == GENESIS_SLOT;
+
     let mut slashing_protector = if in_memory {
         SlashingProtector::in_memory(slashing_protection_history_limit)?
     } else {
@@ -243,6 +250,7 @@ pub async fn run_after_genesis(
         fork_choice_to_validator_tx,
         storage.clone_arc(),
         unfinalized_blocks,
+        !back_sync_enabled || is_anchor_genesis,
     )?;
 
     let execution_service = ExecutionService::new(
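The new argument passed into the controller collapses to a simple rule: back sync counts as already finished when it is disabled outright, or when the anchor is the genesis block, since nothing older than genesis can be fetched. A sketch of that predicate in isolation (the function name is illustrative):

```rust
// Restatement of `!back_sync_enabled || is_anchor_genesis` from the diff.
const GENESIS_SLOT: u64 = 0;

fn finished_back_sync(back_sync_enabled: bool, anchor_slot: u64) -> bool {
    !back_sync_enabled || anchor_slot == GENESIS_SLOT
}

fn main() {
    // A node anchored at genesis has nothing to back-fill.
    assert!(finished_back_sync(true, 0));
    // A checkpoint-synced node with back sync enabled still has work to do.
    assert!(!finished_back_sync(true, 123_456));
}
```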
@@ -339,10 +347,11 @@ pub async fn run_after_genesis(
     let block_sync_database = if in_memory {
         Database::in_memory()
     } else {
-        storage_config.sync_database()?
+        storage_config.sync_database(None, DatabaseMode::ReadWrite)?
     };
 
     let mut block_sync_service = BlockSyncService::new(
+        chain_config.clone_arc(),
         block_sync_database,
         anchor_checkpoint_provider.clone(),
         controller.clone_arc(),
@@ -350,6 +359,8 @@ pub async fn run_after_genesis(
         block_sync_service_channels,
         back_sync_enabled,
         loaded_from_remote,
+        storage_config.storage_mode,
+        network_config.target_peers,
     )?;
 
     block_sync_service.try_to_spawn_back_sync_states_archiver()?;
@@ -386,7 +397,7 @@ pub async fn run_after_genesis(
                 .unwrap_or_default()
                 .join(format!("slasher_attestation_votes_{fork_version:?}_db")),
             db_size,
-            false,
+            DatabaseMode::ReadWrite,
         )?,
         attestations_db: Database::persistent(
             "SLASHER_INDEXED_ATTESTATIONS",
@@ -396,7 +407,7 @@ pub async fn run_after_genesis(
                 .unwrap_or_default()
                 .join(format!("slasher_indexed_attestations_{fork_version:?}_db")),
             db_size,
-            false,
+            DatabaseMode::ReadWrite,
         )?,
         min_targets_db: Database::persistent(
             "SLASHER_MIN_TARGETS",
@@ -406,7 +417,7 @@ pub async fn run_after_genesis(
                .unwrap_or_default()
                .join(format!("slasher_min_targets_{fork_version:?}_db")),
            db_size,
-            false,
+            DatabaseMode::ReadWrite,
        )?,
        max_targets_db: Database::persistent(
            "SLASHER_MAX_TARGETS",
@@ -416,7 +427,7 @@ pub async fn run_after_genesis(
                .unwrap_or_default()
                .join(format!("slasher_max_targets_{fork_version:?}_db")),
            db_size,
-            false,
+            DatabaseMode::ReadWrite,
        )?,
        blocks_db: Database::persistent(
            "SLASHER_BLOCKS",
@@ -426,7 +437,7 @@ pub async fn run_after_genesis(
                .unwrap_or_default()
                .join(format!("slasher_blocks_{fork_version:?}_db")),
            db_size,
-            false,
+            DatabaseMode::ReadWrite,
        )?,
    }
 };
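The helper added in the next file derives the proposer domain before verifying a single block signature. For reference, a self-contained sketch of the consensus-spec formula that a `compute_domain`-style helper implements (this illustrates the spec, using the `sha2` crate; it is not Grandine's `misc::compute_domain`):

```rust
use sha2::{Digest, Sha256};

// Spec-level sketch: domain = domain_type ++ fork_data_root[..28], where
// fork_data_root is the SSZ hash tree root of
// ForkData(current_version, genesis_validators_root).
fn compute_domain_sketch(
    domain_type: [u8; 4],
    fork_version: [u8; 4],
    genesis_validators_root: [u8; 32],
) -> [u8; 32] {
    // Both ForkData fields fit in a single 32-byte chunk, so the hash tree
    // root reduces to sha256(padded_version ++ genesis_validators_root).
    let mut version_chunk = [0_u8; 32];
    version_chunk[..4].copy_from_slice(&fork_version);

    let fork_data_root = Sha256::new()
        .chain_update(version_chunk)
        .chain_update(genesis_validators_root)
        .finalize();

    let mut domain = [0_u8; 32];
    domain[..4].copy_from_slice(&domain_type);
    domain[4..].copy_from_slice(&fork_data_root[..28]);
    domain
}
```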

diff --git a/transition_functions/src/combined.rs b/transition_functions/src/combined.rs
index 73ffe9fa..64e4467a 100644
--- a/transition_functions/src/combined.rs
+++ b/transition_functions/src/combined.rs
@@ -3,9 +3,11 @@ use derive_more::From;
 use enum_iterator::Sequence as _;
 use execution_engine::{ExecutionEngine, NullExecutionEngine};
 use helper_functions::{
-    accessors, fork, misc,
+    accessors,
+    error::SignatureKind,
+    fork, misc,
     slot_report::{NullSlotReport, RealSlotReport, SlotReport},
-    verifier::{MultiVerifier, NullVerifier, Verifier, VerifierOption},
+    verifier::{MultiVerifier, NullVerifier, SingleVerifier, Verifier, VerifierOption},
 };
 use static_assertions::const_assert_eq;
 use thiserror::Error;
@@ -14,6 +16,7 @@ use types::{
     config::Config,
     nonstandard::{Phase, Toption},
     phase0::{
+        consts::DOMAIN_BEACON_PROPOSER,
         containers::DepositData,
         primitives::{Slot, ValidatorIndex},
     },
@@ -193,6 +196,32 @@ pub fn custom_state_transition(
     }
 }
 
+pub fn verify_base_signature_with_head_state<P: Preset>(
+    config: &Config,
+    head_state: &BeaconState<P>,
+    block: &SignedBeaconBlock<P>,
+) -> Result<()> {
+    let phase = config.phase_at_slot::<P>(block.message().slot());
+    let fork_version = config.version(phase);
+
+    // Block signature
+    let domain = misc::compute_domain(
+        config,
+        DOMAIN_BEACON_PROPOSER,
+        Some(fork_version),
+        Some(head_state.genesis_validators_root()),
+    );
+
+    let signing_root = misc::compute_signing_root(block.message(), domain);
+
+    SingleVerifier.verify_singular(
+        signing_root,
+        block.signature(),
+        accessors::public_key(head_state, block.message().proposer_index())?,
+        SignatureKind::Block,
+    )
+}
+
 pub fn verify_signatures(
     config: &Config,
     state: &BeaconState<P>,
diff --git a/types/src/nonstandard.rs b/types/src/nonstandard.rs
index 19bf26b3..1152d24a 100644
--- a/types/src/nonstandard.rs
+++ b/types/src/nonstandard.rs
@@ -188,6 +188,17 @@ pub struct BlobSidecarWithId {
     pub blob_id: BlobIdentifier,
 }
 
+impl<P: Preset> From<Arc<BlobSidecar<P>>> for BlobSidecarWithId<P> {
+    fn from(blob_sidecar: Arc<BlobSidecar<P>>) -> Self {
+        let blob_id = blob_sidecar.as_ref().into();
+
+        Self {
+            blob_sidecar,
+            blob_id,
+        }
+    }
+}
+
 #[derive(Clone, Copy, PartialEq, Eq, Default, Debug)]
 pub struct BlockRewards {
     pub total: Gwei,
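The conversion added above follows a small pattern worth noting: derive the identifier from a borrow first, then move the `Arc` into the pairing struct, so no clone is needed. A generic analog with stand-in types (`Sidecar`, `Id`, and `SidecarWithId` are illustrative, not the crate's types):

```rust
use std::sync::Arc;

struct Sidecar {
    index: u64,
}

#[derive(Debug, PartialEq)]
struct Id {
    index: u64,
}

impl From<&Sidecar> for Id {
    fn from(sidecar: &Sidecar) -> Self {
        Self {
            index: sidecar.index,
        }
    }
}

struct SidecarWithId {
    sidecar: Arc<Sidecar>,
    id: Id,
}

impl From<Arc<Sidecar>> for SidecarWithId {
    fn from(sidecar: Arc<Sidecar>) -> Self {
        // Borrow ends before the `Arc` is moved, so this compiles without
        // cloning the sidecar.
        let id = sidecar.as_ref().into();

        Self { sidecar, id }
    }
}

fn main() {
    let with_id = SidecarWithId::from(Arc::new(Sidecar { index: 3 }));
    assert_eq!(with_id.id, Id { index: 3 });
    assert_eq!(with_id.sidecar.index, 3);
}
```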