Skip to content

Commit

Permalink
Add standard storage mode and improve back-sync handling (30):
Browse files Browse the repository at this point in the history
Store modes and back-sync:

- Back-sync blocks to `Config::min_epochs_for_block_requests` in standard
  storage mode
- Back-sync blob sidecars to `Config::min_epochs_for_blob_sidecars_requests`
- Track & filter peers that don't serve blocks prior to
  `Config::min_epochs_for_block_requests` when performing full back-sync
- Remove `Feature::TrustBackSyncBlocks`
- Verify signatures of back-synced blocks
- Move back-sync status to `Store`
- Relocate received_blob_sidecars and received_block_roots caches from `p2p::Network` to `p2p::BlockSyncService`
- Extend `SyncBatch` with `retry_count` and `responses_received` fields
- Use smaller back-sync batches when syncing with blobs
- Don't validate signature of genesis block
- Track state archival progress in database to be able to resume it
  after restart
- Don't request data from busy peers

DB:

- Bump `libmdbx` bindings
- Add db-info command to inspect the Sync database
- Replace read-only boolean flag with more descriptive DatabaseMode enum

Other:

- Panic to trigger app-restart if network thread is down
- Handle exit signal in an archiver thread & batch archiver updates to db
- Rename `RequestType` to `RPCRequestType` as it conflicts with updated `eth2_libp2p`
- Log peer reporting in debug log
- Log minimal back-sync info when starting back-sync
- Don't log RPC received blocks to info logs (too much output during syncing)
  • Loading branch information
Tumas committed Jan 22, 2025
1 parent d556ebf commit 65c518d
Show file tree
Hide file tree
Showing 43 changed files with 1,655 additions and 695 deletions.
53 changes: 11 additions & 42 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ jemallocator = '0.5'
jwt-simple = { version = '0.12', default-features = false, features = ['pure-rust'] }
kzg = { git = 'https://github.com/grandinetech/rust-kzg.git' }
lazy_static = '1'
libmdbx = { git = 'https://github.com/paradigmxyz/reth.git', package = 'reth-libmdbx', rev = 'c228fe15808c3acbf18dc3af1a03ef5cbdcda07a' }
libmdbx = { git = 'https://github.com/paradigmxyz/reth.git', package = 'reth-libmdbx', rev = '15fac0873e91ea29ab2e605bfba17bedcd7a6084' }
libp2p = { version = '0.54', default-features = false, features = ['metrics', 'dns', 'ecdsa', 'identify', 'macros', 'noise', 'plaintext', 'secp256k1', 'serde', 'tcp', 'tokio', 'yamux', 'quic', 'upnp'] }
libp2p-mplex = '0.42'
log = '0.4'
Expand Down
1 change: 1 addition & 0 deletions benches/benches/fork_choice_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ impl Criterion {
anchor_block,
anchor_state,
false,
false,
);

for slot in (anchor_slot + 1)..=last_attestation_slot {
Expand Down
39 changes: 34 additions & 5 deletions database/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,44 @@ use unwrap_none::UnwrapNone as _;
const GROWTH_STEP: ByteSize = ByteSize::mib(256);
const MAX_NAMED_DATABASES: usize = 10;

#[derive(Clone, Copy)]
pub enum DatabaseMode {
ReadOnly,
ReadWrite,
}

impl DatabaseMode {
#[must_use]
pub const fn is_read_only(self) -> bool {
matches!(self, Self::ReadOnly)
}

#[must_use]
pub const fn permissions(self) -> u32 {
match self {
// <https://erthink.github.io/libmdbx/group__c__opening.html#gabb7dd3b10dd31639ba252df545e11768>
// The UNIX permissions to set on created files. Zero value means to open existing, but do not create.
Self::ReadOnly => 0,
Self::ReadWrite => 0o600,
}
}
}

pub struct Database(DatabaseKind);

impl Database {
pub fn persistent(
name: &str,
directory: impl AsRef<Path>,
max_size: ByteSize,
read_only: bool,
mode: DatabaseMode,
) -> Result<Self> {
// If a database with the legacy name exists, keep using it.
// Otherwise, create a new database with the specified name.
// This check will not force existing users to resync.
let legacy_name = directory.as_ref().to_str().ok_or(Error)?;

if !read_only {
if !mode.is_read_only() {
fs_err::create_dir_all(&directory)?;
}

Expand All @@ -48,14 +71,14 @@ impl Database {
shrink_threshold: None,
page_size: None,
})
.open_with_permissions(directory.as_ref(), 0o600)?;
.open_with_permissions(directory.as_ref(), mode.permissions())?;

let transaction = environment.begin_rw_txn()?;
let existing_db = transaction.open_db(Some(legacy_name));

let database_name = if existing_db.is_err() {
info!("database: {legacy_name} with name {name}");
if !read_only {
if !mode.is_read_only() {
transaction.create_db(Some(name), DatabaseFlags::default())?;
}

Expand Down Expand Up @@ -753,7 +776,13 @@ mod tests {
}

fn build_persistent_database() -> Result<Database> {
let database = Database::persistent("test_db", TempDir::new()?, ByteSize::mib(1), false)?;
let database = Database::persistent(
"test_db",
TempDir::new()?,
ByteSize::mib(1),
DatabaseMode::ReadWrite,
)?;

populate_database(&database)?;
Ok(database)
}
Expand Down
1 change: 0 additions & 1 deletion features/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ pub enum Feature {
SubscribeToAllAttestationSubnets,
SubscribeToAllSyncCommitteeSubnets,
TrackMetrics,
TrustBackSyncBlocks,
// By default we fully validate objects produced by the current instance of the application.
// This costs some resources but may help in case of bugs.
TrustOwnAttestationSignatures,
Expand Down
28 changes: 25 additions & 3 deletions fork_choice_control/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// The downside is that submitting the same object multiple times in quick succession will result in
// it being processed multiple times in parallel redundantly.

use core::panic::AssertUnwindSafe;
use core::{panic::AssertUnwindSafe, sync::atomic::AtomicBool};
use std::{
sync::{mpsc::Sender, Arc},
thread::{Builder, JoinHandle},
Expand Down Expand Up @@ -106,6 +106,7 @@ where
validator_tx: impl UnboundedSink<ValidatorMessage<P, W>>,
storage: Arc<Storage<P>>,
unfinalized_blocks: impl DoubleEndedIterator<Item = Result<Arc<SignedBeaconBlock<P>>>>,
finished_back_sync: bool,
) -> Result<(Arc<Self>, MutatorHandle<P, W>)> {
let finished_initial_forward_sync = anchor_block.message().slot() >= tick.slot;

Expand All @@ -115,6 +116,7 @@ where
anchor_block,
anchor_state,
finished_initial_forward_sync,
finished_back_sync,
);

store.apply_tick(tick)?;
Expand Down Expand Up @@ -198,6 +200,14 @@ where
.send(&self.mutator_tx)
}

pub fn on_back_sync_status(&self, is_back_synced: bool) {
MutatorMessage::BackSyncStatus {
wait_group: self.owned_wait_group(),
is_back_synced,
}
.send(&self.mutator_tx)
}

pub fn on_gossip_block(&self, block: Arc<SignedBeaconBlock<P>>, gossip_id: GossipId) {
self.spawn_block_task(block, BlockOrigin::Gossip(gossip_id))
}
Expand Down Expand Up @@ -435,6 +445,13 @@ where
})
}

pub fn store_back_sync_blob_sidecars(
&self,
blob_sidecars: impl IntoIterator<Item = Arc<BlobSidecar<P>>>,
) -> Result<()> {
self.storage.store_back_sync_blob_sidecars(blob_sidecars)
}

pub fn store_back_sync_blocks(
&self,
blocks: impl IntoIterator<Item = Arc<SignedBeaconBlock<P>>>,
Expand All @@ -447,9 +464,14 @@ where
start_slot: Slot,
end_slot: Slot,
anchor_checkpoint_provider: &AnchorCheckpointProvider<P>,
is_exiting: &Arc<AtomicBool>,
) -> Result<()> {
self.storage
.archive_back_sync_states(start_slot, end_slot, anchor_checkpoint_provider)
self.storage.archive_back_sync_states(
start_slot,
end_slot,
anchor_checkpoint_provider,
is_exiting,
)
}

fn spawn_blob_sidecar_task(
Expand Down
10 changes: 6 additions & 4 deletions fork_choice_control/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@ pub use crate::{
AttestationVerifierMessage, P2pMessage, PoolMessage, SubnetMessage, SyncMessage,
ValidatorMessage,
},
misc::{MutatorRejectionReason, VerifyAggregateAndProofResult, VerifyAttestationResult},
misc::{
MutatorRejectionReason, StorageMode, VerifyAggregateAndProofResult, VerifyAttestationResult,
},
queries::{BlockWithRoot, ForkChoiceContext, ForkTip, Snapshot},
specialized::{AdHocBenchController, BenchController},
storage::{
BlobSidecarByBlobId, BlockCheckpoint, BlockRootBySlot, FinalizedBlockByRoot, PrefixableKey,
SlotBlobId, SlotByStateRoot, StateByBlockRoot, StateCheckpoint, UnfinalizedBlockByRoot,
get, save, BlobSidecarByBlobId, BlockCheckpoint, BlockRootBySlot, FinalizedBlockByRoot,
PrefixableKey, SlotBlobId, SlotByStateRoot, StateByBlockRoot, StateCheckpoint,
StateLoadStrategy, Storage, UnfinalizedBlockByRoot, DEFAULT_ARCHIVAL_EPOCH_INTERVAL,
},
storage::{StateLoadStrategy, Storage, DEFAULT_ARCHIVAL_EPOCH_INTERVAL},
storage_tool::{export_state_and_blocks, replay_blocks},
wait::Wait,
};
Expand Down
4 changes: 4 additions & 0 deletions fork_choice_control/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ pub enum MutatorMessage<P: Preset, W> {
wait_group: W,
tick: Tick,
},
BackSyncStatus {
wait_group: W,
is_back_synced: bool,
},
Block {
wait_group: W,
result: Result<BlockAction<P>>,
Expand Down
19 changes: 19 additions & 0 deletions fork_choice_control/src/misc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,22 @@ pub enum MutatorRejectionReason {
InvalidBlock,
InvalidBlobSidecar,
}

#[derive(Clone, Copy, Debug)]
pub enum StorageMode {
Prune,
Standard,
Archive,
}

impl StorageMode {
#[must_use]
pub const fn is_prune(self) -> bool {
matches!(self, Self::Prune)
}

#[must_use]
pub const fn is_archive(self) -> bool {
matches!(self, Self::Archive)
}
}
Loading

0 comments on commit 65c518d

Please sign in to comment.