Skip to content

Commit

Permalink
Add standard storage mode and improve back-sync handling (30):
Browse files Browse the repository at this point in the history
Store modes and back-sync:

- Back-sync blocks to `Config::min_epochs_for_block_requests` in standard
  storage mode
- Back-sync blob sidecars to `Config::min_epochs_for_blob_sidecars_requests`
- Track & filter peers that don't serve blocks prior to
  `Config::min_epochs_for_block_requests` when performing full back-sync
- Remove `Feature::TrustBackSyncBlocks`
- Verify signatures of back-synced blocks
- Move back-sync status to `Store`
- Relocate `received_blob_sidecars` and `received_block_roots` caches from `p2p::Network` to `p2p::BlockSyncService`
- Extend `SyncBatch` with `retry_count` and `responses_received` fields
- Use smaller back-sync batches when syncing with blobs
- Don't validate signature of genesis block
- Track state archival progress in database to be able to resume it
  after restart
- Don't request data from busy peers

DB:

- Add db-info command to inspect the Sync database
- Replace read-only boolean flag with more descriptive `DatabaseMode` enum

Other:

- Panic to trigger app-restart if network thread is down
- Handle exit signal in an archiver thread & batch archiver updates to db
- Rename `RequestType` to `RPCRequestType` as it conflicts with updated `eth2_libp2p`
- Log peer reporting in debug log
- Log minimal back-sync info when starting back-sync
- Don't log RPC received blocks to info logs (too much output during syncing)
  • Loading branch information
Tumas committed Jan 23, 2025
1 parent c0678b0 commit 8c6e54c
Show file tree
Hide file tree
Showing 42 changed files with 1,659 additions and 662 deletions.
14 changes: 4 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions benches/benches/fork_choice_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ impl Criterion {
anchor_block,
anchor_state,
false,
false,
);

for slot in (anchor_slot + 1)..=last_attestation_slot {
Expand Down
51 changes: 46 additions & 5 deletions database/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,56 @@ use unwrap_none::UnwrapNone as _;
const GROWTH_STEP: ByteSize = ByteSize::mib(256);
const MAX_NAMED_DATABASES: usize = 10;

/// Whether a database environment is opened for reading only or for reading
/// and writing. Replaces the previous, less descriptive `read_only: bool` flag.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DatabaseMode {
    ReadOnly,
    ReadWrite,
}

impl DatabaseMode {
    /// Returns `true` if the database must not be created or written to.
    #[must_use]
    pub const fn is_read_only(self) -> bool {
        matches!(self, Self::ReadOnly)
    }

    /// UNIX permissions to set on files created by libmdbx.
    ///
    /// <https://erthink.github.io/libmdbx/group__c__opening.html#gabb7dd3b10dd31639ba252df545e11768>
    /// A zero value means to open an existing database but not create one.
    #[must_use]
    pub const fn mode_permissions(self) -> u16 {
        match self {
            Self::ReadOnly => 0,
            Self::ReadWrite => 0o600,
        }
    }

    // The permission parameter expected by the libmdbx bindings is `u32` on
    // Linux and `u16` elsewhere, hence the platform-specific signatures.
    #[must_use]
    #[cfg(target_os = "linux")]
    pub fn permissions(self) -> u32 {
        self.mode_permissions().into()
    }

    #[must_use]
    #[cfg(not(target_os = "linux"))]
    pub const fn permissions(self) -> u16 {
        self.mode_permissions()
    }
}

pub struct Database(DatabaseKind);

impl Database {
pub fn persistent(
name: &str,
directory: impl AsRef<Path>,
max_size: ByteSize,
read_only: bool,
mode: DatabaseMode,
) -> Result<Self> {
// If a database with the legacy name exists, keep using it.
// Otherwise, create a new database with the specified name.
// This check will not force existing users to resync.
let legacy_name = directory.as_ref().to_str().ok_or(Error)?;

if !read_only {
if !mode.is_read_only() {
fs_err::create_dir_all(&directory)?;
}

Expand All @@ -48,14 +83,14 @@ impl Database {
shrink_threshold: None,
page_size: None,
})
.open_with_permissions(directory.as_ref(), 0o600)?;
.open_with_permissions(directory.as_ref(), mode.permissions())?;

let transaction = environment.begin_rw_txn()?;
let existing_db = transaction.open_db(Some(legacy_name));

let database_name = if existing_db.is_err() {
info!("database: {legacy_name} with name {name}");
if !read_only {
if !mode.is_read_only() {
transaction.create_db(Some(name), DatabaseFlags::default())?;
}

Expand Down Expand Up @@ -753,7 +788,13 @@ mod tests {
}

// Builds a writable on-disk database in a fresh temporary directory and
// fills it with the shared test fixture via `populate_database`.
fn build_persistent_database() -> Result<Database> {
    let database = Database::persistent(
        "test_db",
        TempDir::new()?,
        ByteSize::mib(1),
        DatabaseMode::ReadWrite,
    )?;

    populate_database(&database)?;
    Ok(database)
}
Expand Down
1 change: 0 additions & 1 deletion features/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ pub enum Feature {
SubscribeToAllAttestationSubnets,
SubscribeToAllSyncCommitteeSubnets,
TrackMetrics,
TrustBackSyncBlocks,
// By default we fully validate objects produced by the current instance of the application.
// This costs some resources but may help in case of bugs.
TrustOwnAttestationSignatures,
Expand Down
28 changes: 25 additions & 3 deletions fork_choice_control/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// The downside is that submitting the same object multiple times in quick succession will result in
// it being processed multiple times in parallel redundantly.

use core::panic::AssertUnwindSafe;
use core::{panic::AssertUnwindSafe, sync::atomic::AtomicBool};
use std::{
sync::{mpsc::Sender, Arc},
thread::{Builder, JoinHandle},
Expand Down Expand Up @@ -106,6 +106,7 @@ where
validator_tx: impl UnboundedSink<ValidatorMessage<P, W>>,
storage: Arc<Storage<P>>,
unfinalized_blocks: impl DoubleEndedIterator<Item = Result<Arc<SignedBeaconBlock<P>>>>,
finished_back_sync: bool,
) -> Result<(Arc<Self>, MutatorHandle<P, W>)> {
let finished_initial_forward_sync = anchor_block.message().slot() >= tick.slot;

Expand All @@ -115,6 +116,7 @@ where
anchor_block,
anchor_state,
finished_initial_forward_sync,
finished_back_sync,
);

store.apply_tick(tick)?;
Expand Down Expand Up @@ -198,6 +200,14 @@ where
.send(&self.mutator_tx)
}

/// Forwards a change of the back-sync status to the mutator thread.
pub fn on_back_sync_status(&self, is_back_synced: bool) {
    let message = MutatorMessage::BackSyncStatus {
        wait_group: self.owned_wait_group(),
        is_back_synced,
    };

    message.send(&self.mutator_tx)
}

pub fn on_gossip_block(&self, block: Arc<SignedBeaconBlock<P>>, gossip_id: GossipId) {
self.spawn_block_task(block, BlockOrigin::Gossip(gossip_id))
}
Expand Down Expand Up @@ -435,6 +445,13 @@ where
})
}

/// Persists back-synced blob sidecars by delegating to the underlying storage.
///
/// # Errors
///
/// Returns an error if the storage write fails.
pub fn store_back_sync_blob_sidecars(
    &self,
    blob_sidecars: impl IntoIterator<Item = Arc<BlobSidecar<P>>>,
) -> Result<()> {
    self.storage.store_back_sync_blob_sidecars(blob_sidecars)
}

pub fn store_back_sync_blocks(
&self,
blocks: impl IntoIterator<Item = Arc<SignedBeaconBlock<P>>>,
Expand All @@ -447,9 +464,14 @@ where
start_slot: Slot,
end_slot: Slot,
anchor_checkpoint_provider: &AnchorCheckpointProvider<P>,
is_exiting: &Arc<AtomicBool>,
) -> Result<()> {
self.storage
.archive_back_sync_states(start_slot, end_slot, anchor_checkpoint_provider)
self.storage.archive_back_sync_states(
start_slot,
end_slot,
anchor_checkpoint_provider,
is_exiting,
)
}

fn spawn_blob_sidecar_task(
Expand Down
10 changes: 6 additions & 4 deletions fork_choice_control/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@ pub use crate::{
AttestationVerifierMessage, P2pMessage, PoolMessage, SubnetMessage, SyncMessage,
ValidatorMessage,
},
misc::{MutatorRejectionReason, VerifyAggregateAndProofResult, VerifyAttestationResult},
misc::{
MutatorRejectionReason, StorageMode, VerifyAggregateAndProofResult, VerifyAttestationResult,
},
queries::{BlockWithRoot, ForkChoiceContext, ForkTip, Snapshot},
specialized::{AdHocBenchController, BenchController},
storage::{
BlobSidecarByBlobId, BlockCheckpoint, BlockRootBySlot, FinalizedBlockByRoot, PrefixableKey,
SlotBlobId, SlotByStateRoot, StateByBlockRoot, StateCheckpoint, UnfinalizedBlockByRoot,
get, save, BlobSidecarByBlobId, BlockCheckpoint, BlockRootBySlot, FinalizedBlockByRoot,
PrefixableKey, SlotBlobId, SlotByStateRoot, StateByBlockRoot, StateCheckpoint,
StateLoadStrategy, Storage, UnfinalizedBlockByRoot, DEFAULT_ARCHIVAL_EPOCH_INTERVAL,
},
storage::{StateLoadStrategy, Storage, DEFAULT_ARCHIVAL_EPOCH_INTERVAL},
storage_tool::{export_state_and_blocks, replay_blocks},
wait::Wait,
};
Expand Down
4 changes: 4 additions & 0 deletions fork_choice_control/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ pub enum MutatorMessage<P: Preset, W> {
wait_group: W,
tick: Tick,
},
BackSyncStatus {
wait_group: W,
is_back_synced: bool,
},
Block {
wait_group: W,
result: Result<BlockAction<P>>,
Expand Down
19 changes: 19 additions & 0 deletions fork_choice_control/src/misc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,22 @@ pub enum MutatorRejectionReason {
InvalidBlock,
InvalidBlobSidecar,
}

/// How much historical data the node keeps on disk.
///
/// - `Prune`: keep only recent data.
/// - `Standard`: keep data back to the minimum epochs required for serving
///   block and blob-sidecar requests.
/// - `Archive`: keep everything.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StorageMode {
    Prune,
    Standard,
    Archive,
}

impl StorageMode {
    /// Returns `true` if only recent data is retained.
    #[must_use]
    pub const fn is_prune(self) -> bool {
        matches!(self, Self::Prune)
    }

    /// Returns `true` if all historical data is retained.
    #[must_use]
    pub const fn is_archive(self) -> bool {
        matches!(self, Self::Archive)
    }
}
42 changes: 33 additions & 9 deletions fork_choice_control/src/mutator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,10 @@ where
.expect("sender in Controller is not dropped until mutator thread exits")
{
MutatorMessage::Tick { wait_group, tick } => self.handle_tick(&wait_group, tick)?,
MutatorMessage::BackSyncStatus {
wait_group,
is_back_synced,
} => self.handle_back_sync_status(wait_group, is_back_synced),
MutatorMessage::Block {
wait_group,
result,
Expand Down Expand Up @@ -439,7 +443,10 @@ where
self.spawn_preprocess_head_state_for_next_slot_task();
}

if self.store.is_forward_synced() && misc::slots_since_epoch_start::<P>(tick.slot) == 0 {
if self.store.is_forward_synced()
&& self.store.is_back_synced()
&& misc::slots_since_epoch_start::<P>(tick.slot) == 0
{
if tick.kind == TickKind::AttestFourth {
self.prune_old_records()?;
}
Expand Down Expand Up @@ -467,6 +474,15 @@ where
Ok(())
}

/// Handles `MutatorMessage::BackSyncStatus`: records the new back-sync status
/// in the store and publishes an updated store snapshot, but only when the
/// status actually changed, to avoid needless snapshot churn.
fn handle_back_sync_status(&mut self, wait_group: W, is_back_synced: bool) {
    if self.store.is_back_synced() != is_back_synced {
        self.store_mut().set_back_synced(is_back_synced);
        self.update_store_snapshot();
    }

    // Consume the wait group only after the snapshot update; dropping it
    // presumably signals completion of this message to waiters — NOTE(review):
    // inferred from the wait-group plumbing, confirm against `Wait` semantics.
    drop(wait_group);
}

#[expect(clippy::too_many_lines)]
fn handle_block(
&mut self,
Expand Down Expand Up @@ -1636,13 +1652,15 @@ where
self.event_channels
.send_blob_sidecar_event(block_root, blob_sidecar);

self.spawn(PersistBlobSidecarsTask {
store_snapshot: self.owned_store(),
storage: self.storage.clone_arc(),
mutator_tx: self.owned_mutator_tx(),
wait_group: wait_group.clone(),
metrics: self.metrics.clone(),
});
if !self.storage.prune_storage_enabled() {
self.spawn(PersistBlobSidecarsTask {
store_snapshot: self.owned_store(),
storage: self.storage.clone_arc(),
mutator_tx: self.owned_mutator_tx(),
wait_group: wait_group.clone(),
metrics: self.metrics.clone(),
});
}

self.handle_potential_head_change(wait_group, &old_head, head_was_optimistic);
}
Expand Down Expand Up @@ -2337,6 +2355,10 @@ where
}

fn prune_old_records(&self) -> Result<()> {
if self.storage.archive_storage_enabled() {
return Ok(());
}

let storage = self.storage.clone_arc();
let blobs_up_to_epoch = self.store.min_checked_data_availability_epoch();
let blobs_up_to_slot = misc::compute_start_slot_at_epoch::<P>(blobs_up_to_epoch);
Expand Down Expand Up @@ -2376,7 +2398,9 @@ where

match storage.prune_old_state_roots(blocks_up_to_slot) {
Ok(()) => {
debug!("pruned old state roots from storage up to slot {blocks_up_to_slot}");
debug!(
"pruned old state roots from storage up to slot {blocks_up_to_slot}"
);
}
Err(error) => {
error!("pruning old state roots from storage failed: {error:?}")
Expand Down
Loading

0 comments on commit 8c6e54c

Please sign in to comment.