Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add standard storage mode and improve back-sync handling (30): #94

Merged
merged 1 commit into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 4 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions benches/benches/fork_choice_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ impl Criterion {
anchor_block,
anchor_state,
false,
false,
);

for slot in (anchor_slot + 1)..=last_attestation_slot {
Expand Down
51 changes: 46 additions & 5 deletions database/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,56 @@ use unwrap_none::UnwrapNone as _;
const GROWTH_STEP: ByteSize = ByteSize::mib(256);
const MAX_NAMED_DATABASES: usize = 10;

#[derive(Clone, Copy)]
pub enum DatabaseMode {
ReadOnly,
ReadWrite,
}

impl DatabaseMode {
#[must_use]
pub const fn is_read_only(self) -> bool {
matches!(self, Self::ReadOnly)
}

#[must_use]
pub const fn mode_permissions(self) -> u16 {
match self {
// <https://erthink.github.io/libmdbx/group__c__opening.html#gabb7dd3b10dd31639ba252df545e11768>
// The UNIX permissions to set on created files. Zero value means to open existing, but do not create.
Self::ReadOnly => 0,
Self::ReadWrite => 0o600,
}
}

#[must_use]
#[cfg(target_os = "linux")]
pub fn permissions(self) -> u32 {
self.mode_permissions().into()
}

#[must_use]
#[cfg(not(target_os = "linux"))]
povi marked this conversation as resolved.
Show resolved Hide resolved
pub const fn permissions(self) -> u16 {
self.mode_permissions()
}
}

pub struct Database(DatabaseKind);

impl Database {
pub fn persistent(
name: &str,
directory: impl AsRef<Path>,
max_size: ByteSize,
read_only: bool,
mode: DatabaseMode,
) -> Result<Self> {
// If a database with the legacy name exists, keep using it.
// Otherwise, create a new database with the specified name.
// This check will not force existing users to resync.
let legacy_name = directory.as_ref().to_str().ok_or(Error)?;

if !read_only {
if !mode.is_read_only() {
fs_err::create_dir_all(&directory)?;
}

Expand All @@ -48,14 +83,14 @@ impl Database {
shrink_threshold: None,
page_size: None,
})
.open_with_permissions(directory.as_ref(), 0o600)?;
.open_with_permissions(directory.as_ref(), mode.permissions())?;

let transaction = environment.begin_rw_txn()?;
let existing_db = transaction.open_db(Some(legacy_name));

let database_name = if existing_db.is_err() {
info!("database: {legacy_name} with name {name}");
if !read_only {
if !mode.is_read_only() {
transaction.create_db(Some(name), DatabaseFlags::default())?;
}

Expand Down Expand Up @@ -753,7 +788,13 @@ mod tests {
}

fn build_persistent_database() -> Result<Database> {
let database = Database::persistent("test_db", TempDir::new()?, ByteSize::mib(1), false)?;
let database = Database::persistent(
"test_db",
TempDir::new()?,
ByteSize::mib(1),
DatabaseMode::ReadWrite,
)?;

populate_database(&database)?;
Ok(database)
}
Expand Down
1 change: 0 additions & 1 deletion features/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ pub enum Feature {
SubscribeToAllAttestationSubnets,
SubscribeToAllSyncCommitteeSubnets,
TrackMetrics,
TrustBackSyncBlocks,
// By default we fully validate objects produced by the current instance of the application.
// This costs some resources but may help in case of bugs.
TrustOwnAttestationSignatures,
Expand Down
28 changes: 25 additions & 3 deletions fork_choice_control/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// The downside is that submitting the same object multiple times in quick succession will result in
// it being processed multiple times in parallel redundantly.

use core::panic::AssertUnwindSafe;
use core::{panic::AssertUnwindSafe, sync::atomic::AtomicBool};
use std::{
sync::{mpsc::Sender, Arc},
thread::{Builder, JoinHandle},
Expand Down Expand Up @@ -106,6 +106,7 @@ where
validator_tx: impl UnboundedSink<ValidatorMessage<P, W>>,
storage: Arc<Storage<P>>,
unfinalized_blocks: impl DoubleEndedIterator<Item = Result<Arc<SignedBeaconBlock<P>>>>,
finished_back_sync: bool,
) -> Result<(Arc<Self>, MutatorHandle<P, W>)> {
let finished_initial_forward_sync = anchor_block.message().slot() >= tick.slot;

Expand All @@ -115,6 +116,7 @@ where
anchor_block,
anchor_state,
finished_initial_forward_sync,
finished_back_sync,
);

store.apply_tick(tick)?;
Expand Down Expand Up @@ -198,6 +200,14 @@ where
.send(&self.mutator_tx)
}

pub fn on_back_sync_status(&self, is_back_synced: bool) {
MutatorMessage::BackSyncStatus {
wait_group: self.owned_wait_group(),
is_back_synced,
}
.send(&self.mutator_tx)
}

pub fn on_gossip_block(&self, block: Arc<SignedBeaconBlock<P>>, gossip_id: GossipId) {
self.spawn_block_task(block, BlockOrigin::Gossip(gossip_id))
}
Expand Down Expand Up @@ -435,6 +445,13 @@ where
})
}

pub fn store_back_sync_blob_sidecars(
&self,
blob_sidecars: impl IntoIterator<Item = Arc<BlobSidecar<P>>>,
) -> Result<()> {
self.storage.store_back_sync_blob_sidecars(blob_sidecars)
}

pub fn store_back_sync_blocks(
&self,
blocks: impl IntoIterator<Item = Arc<SignedBeaconBlock<P>>>,
Expand All @@ -447,9 +464,14 @@ where
start_slot: Slot,
end_slot: Slot,
anchor_checkpoint_provider: &AnchorCheckpointProvider<P>,
is_exiting: &Arc<AtomicBool>,
) -> Result<()> {
self.storage
.archive_back_sync_states(start_slot, end_slot, anchor_checkpoint_provider)
self.storage.archive_back_sync_states(
start_slot,
end_slot,
anchor_checkpoint_provider,
is_exiting,
)
}

fn spawn_blob_sidecar_task(
Expand Down
10 changes: 6 additions & 4 deletions fork_choice_control/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@ pub use crate::{
AttestationVerifierMessage, P2pMessage, PoolMessage, SubnetMessage, SyncMessage,
ValidatorMessage,
},
misc::{MutatorRejectionReason, VerifyAggregateAndProofResult, VerifyAttestationResult},
misc::{
MutatorRejectionReason, StorageMode, VerifyAggregateAndProofResult, VerifyAttestationResult,
},
queries::{BlockWithRoot, ForkChoiceContext, ForkTip, Snapshot},
specialized::{AdHocBenchController, BenchController},
storage::{
BlobSidecarByBlobId, BlockCheckpoint, BlockRootBySlot, FinalizedBlockByRoot, PrefixableKey,
SlotBlobId, SlotByStateRoot, StateByBlockRoot, StateCheckpoint, UnfinalizedBlockByRoot,
get, save, BlobSidecarByBlobId, BlockCheckpoint, BlockRootBySlot, FinalizedBlockByRoot,
PrefixableKey, SlotBlobId, SlotByStateRoot, StateByBlockRoot, StateCheckpoint,
StateLoadStrategy, Storage, UnfinalizedBlockByRoot, DEFAULT_ARCHIVAL_EPOCH_INTERVAL,
},
storage::{StateLoadStrategy, Storage, DEFAULT_ARCHIVAL_EPOCH_INTERVAL},
storage_tool::{export_state_and_blocks, replay_blocks},
wait::Wait,
};
Expand Down
4 changes: 4 additions & 0 deletions fork_choice_control/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ pub enum MutatorMessage<P: Preset, W> {
wait_group: W,
tick: Tick,
},
BackSyncStatus {
wait_group: W,
is_back_synced: bool,
},
Block {
wait_group: W,
result: Result<BlockAction<P>>,
Expand Down
19 changes: 19 additions & 0 deletions fork_choice_control/src/misc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,22 @@ pub enum MutatorRejectionReason {
InvalidBlock,
InvalidBlobSidecar,
}

#[derive(Clone, Copy, Debug)]
pub enum StorageMode {
Prune,
Standard,
Archive,
}

impl StorageMode {
#[must_use]
pub const fn is_prune(self) -> bool {
matches!(self, Self::Prune)
}

#[must_use]
pub const fn is_archive(self) -> bool {
matches!(self, Self::Archive)
}
}
39 changes: 30 additions & 9 deletions fork_choice_control/src/mutator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,10 @@ where
.expect("sender in Controller is not dropped until mutator thread exits")
{
MutatorMessage::Tick { wait_group, tick } => self.handle_tick(&wait_group, tick)?,
MutatorMessage::BackSyncStatus {
wait_group,
is_back_synced,
} => self.handle_back_sync_status(wait_group, is_back_synced),
MutatorMessage::Block {
wait_group,
result,
Expand Down Expand Up @@ -440,7 +444,7 @@ where
}

if self.store.is_forward_synced() && misc::slots_since_epoch_start::<P>(tick.slot) == 0 {
if tick.kind == TickKind::AttestFourth {
if tick.kind == TickKind::AttestFourth && self.store.is_back_synced() {
self.prune_old_records()?;
}

Expand All @@ -467,6 +471,15 @@ where
Ok(())
}

fn handle_back_sync_status(&mut self, wait_group: W, is_back_synced: bool) {
if self.store.is_back_synced() != is_back_synced {
self.store_mut().set_back_synced(is_back_synced);
self.update_store_snapshot();
}

drop(wait_group);
}

#[expect(clippy::too_many_lines)]
fn handle_block(
&mut self,
Expand Down Expand Up @@ -1636,13 +1649,15 @@ where
self.event_channels
.send_blob_sidecar_event(block_root, blob_sidecar);

self.spawn(PersistBlobSidecarsTask {
store_snapshot: self.owned_store(),
storage: self.storage.clone_arc(),
mutator_tx: self.owned_mutator_tx(),
wait_group: wait_group.clone(),
metrics: self.metrics.clone(),
});
if !self.storage.prune_storage_enabled() {
self.spawn(PersistBlobSidecarsTask {
store_snapshot: self.owned_store(),
storage: self.storage.clone_arc(),
mutator_tx: self.owned_mutator_tx(),
wait_group: wait_group.clone(),
metrics: self.metrics.clone(),
});
}

self.handle_potential_head_change(wait_group, &old_head, head_was_optimistic);
}
Expand Down Expand Up @@ -2337,6 +2352,10 @@ where
}

fn prune_old_records(&self) -> Result<()> {
if self.storage.archive_storage_enabled() {
return Ok(());
}

let storage = self.storage.clone_arc();
let blobs_up_to_epoch = self.store.min_checked_data_availability_epoch();
let blobs_up_to_slot = misc::compute_start_slot_at_epoch::<P>(blobs_up_to_epoch);
Expand Down Expand Up @@ -2376,7 +2395,9 @@ where

match storage.prune_old_state_roots(blocks_up_to_slot) {
Ok(()) => {
debug!("pruned old state roots from storage up to slot {blocks_up_to_slot}");
debug!(
"pruned old state roots from storage up to slot {blocks_up_to_slot}"
);
}
Err(error) => {
error!("pruning old state roots from storage failed: {error:?}")
Expand Down
Loading
Loading