Skip to content

Commit

Permalink
feat: allow to specify the type of checkpoint slot to update
Browse files Browse the repository at this point in the history
  • Loading branch information
PJColombo committed May 29, 2024
1 parent a5bacaa commit a5bc871
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 25 deletions.
48 changes: 32 additions & 16 deletions src/indexer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
ChainReorgedEventHandlingError, FinalizedBlockEventHandlingError,
HeadBlockEventHandlingError, HistoricalSyncingError,
},
synchronizer::SynchronizerBuilder,
synchronizer::{CheckpointType, Synchronizer, SynchronizerBuilder},
utils::web3::get_full_hash,
};

Expand All @@ -35,9 +35,12 @@ pub mod types;

pub struct Indexer {
context: Context,
synchronizer_builder: SynchronizerBuilder,
dencun_fork_slot: u32,
disable_sync_historical: bool,

checkpoint_slots: Option<u32>,
disabled_checkpoint: Option<CheckpointType>,
num_threads: u32,
}

impl Indexer {
Expand All @@ -54,7 +57,12 @@ impl Indexer {
}
};

let slots_checkpoint = args.slots_per_save;
let checkpoint_slots = args.slots_per_save;
let disabled_checkpoint = if args.disable_sync_checkpoint_save {
Some(CheckpointType::Disabled)
} else {
None
};
let num_threads = match args.num_threads {
Some(num_threads) => num_threads,
None => thread::available_parallelism()
Expand All @@ -66,27 +74,19 @@ impl Indexer {
})?
.get() as u32,
};
let disable_sync_checkpoint_save = args.disable_sync_checkpoint_save;
let disable_sync_historical = args.disable_sync_historical;

let dencun_fork_slot = env
.dencun_fork_slot
.unwrap_or(env.network_name.dencun_fork_slot());

let mut synchronizer_builder = SynchronizerBuilder::new();

synchronizer_builder.with_disable_checkpoint_save(disable_sync_checkpoint_save);
synchronizer_builder.with_num_threads(num_threads);

if let Some(slots_checkpoint) = slots_checkpoint {
synchronizer_builder.with_slots_checkpoint(slots_checkpoint);
}

Ok(Self {
context,
synchronizer_builder,
dencun_fork_slot,
disable_sync_historical,
checkpoint_slots,
disabled_checkpoint,
num_threads,
})
}

Expand Down Expand Up @@ -185,7 +185,7 @@ impl Indexer {
start_block_id: BlockId,
end_block_id: BlockId,
) -> JoinHandle<IndexerResult<()>> {
let mut synchronizer = self.synchronizer_builder.build(self.context.clone());
let mut synchronizer = self._create_synchronizer(CheckpointType::Lower);

tokio::spawn(async move {
let historical_syc_thread_span = tracing::info_span!("sync:historical");
Expand Down Expand Up @@ -221,7 +221,7 @@ impl Indexer {
start_block_id: BlockId,
) -> JoinHandle<IndexerResult<()>> {
let task_context = self.context.clone();
let mut synchronizer = self.synchronizer_builder.build(self.context.clone());
let mut synchronizer = self._create_synchronizer(CheckpointType::Upper);

tokio::spawn(async move {
let realtime_sync_task_span = tracing::info_span!("sync:realtime");
Expand Down Expand Up @@ -396,4 +396,20 @@ impl Indexer {
Ok(())
})
}

fn _create_synchronizer(&self, checkpoint_type: CheckpointType) -> Synchronizer {
let mut synchronizer_builder = SynchronizerBuilder::new();

if let Some(checkpoint_slots) = self.checkpoint_slots {
synchronizer_builder.with_slots_checkpoint(checkpoint_slots);
}

let checkpoint_type = self.disabled_checkpoint.unwrap_or(checkpoint_type);

synchronizer_builder.with_checkpoint_type(checkpoint_type);

synchronizer_builder.with_num_threads(self.num_threads);

synchronizer_builder.build(self.context.clone())
}
}
35 changes: 26 additions & 9 deletions src/synchronizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,23 @@ pub struct SynchronizerBuilder {
num_threads: u32,
min_slots_per_thread: u32,
slots_checkpoint: u32,
disable_checkpoint_save: bool,
checkpoint_type: CheckpointType,
}

#[derive(Debug)]
pub struct Synchronizer {
context: Context,
num_threads: u32,
min_slots_per_thread: u32,
slots_checkpoint: u32,
disable_checkpoint_save: bool,
checkpoint_type: CheckpointType,
}

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum CheckpointType {
Disabled,
Lower,
Upper,
}

impl Default for SynchronizerBuilder {
Expand All @@ -35,7 +43,7 @@ impl Default for SynchronizerBuilder {
num_threads: 1,
min_slots_per_thread: 50,
slots_checkpoint: 1000,
disable_checkpoint_save: false,
checkpoint_type: CheckpointType::Upper,
}
}
}
Expand All @@ -45,8 +53,8 @@ impl SynchronizerBuilder {
SynchronizerBuilder::default()
}

pub fn with_disable_checkpoint_save(&mut self, disable_checkpoint_save: bool) -> &mut Self {
self.disable_checkpoint_save = disable_checkpoint_save;
pub fn with_checkpoint_type(&mut self, checkpoint_type: CheckpointType) -> &mut Self {
self.checkpoint_type = checkpoint_type;

self
}
Expand All @@ -68,7 +76,7 @@ impl SynchronizerBuilder {
num_threads: self.num_threads,
min_slots_per_thread: self.min_slots_per_thread,
slots_checkpoint: self.slots_checkpoint,
disable_checkpoint_save: self.disable_checkpoint_save,
checkpoint_type: self.checkpoint_type,
}
}
}
Expand Down Expand Up @@ -223,10 +231,19 @@ impl Synchronizer {
} else {
final_chunk_slot - 1
});
let last_lower_synced_slot = if is_reverse_sync { last_slot } else { None };
let last_upper_synced_slot = if is_reverse_sync { None } else { last_slot };

if !self.disable_checkpoint_save {
if self.checkpoint_type != CheckpointType::Disabled {
let last_lower_synced_slot = if self.checkpoint_type == CheckpointType::Lower {
last_slot
} else {
None
};
let last_upper_synced_slot = if self.checkpoint_type == CheckpointType::Upper {
last_slot
} else {
None
};

if let Err(error) = self
.context
.blobscan_client()
Expand Down

0 comments on commit a5bc871

Please sign in to comment.