From 4144f4bb4d670e973a18f10033d6dddefcd98868 Mon Sep 17 00:00:00 2001 From: PJColombo Date: Mon, 10 Jun 2024 15:18:49 +0200 Subject: [PATCH] refactor(indexer): check for reorgs by verifying parent and current block root hashes instead of listening to `chain_reorg` sse event --- src/indexer/error.rs | 73 +++---- .../event_handlers/finalized_checkpoint.rs | 85 ++++++++ src/indexer/event_handlers/head.rs | 139 ++++++++++++ src/indexer/event_handlers/mod.rs | 2 + src/indexer/mod.rs | 202 ++++-------------- src/indexer/types.rs | 4 +- src/synchronizer/mod.rs | 20 +- 7 files changed, 315 insertions(+), 210 deletions(-) create mode 100644 src/indexer/event_handlers/finalized_checkpoint.rs create mode 100644 src/indexer/event_handlers/head.rs create mode 100644 src/indexer/event_handlers/mod.rs diff --git a/src/indexer/error.rs b/src/indexer/error.rs index 143e3cd..b707e59 100644 --- a/src/indexer/error.rs +++ b/src/indexer/error.rs @@ -2,82 +2,69 @@ use tokio::sync::mpsc::error::SendError; use crate::{clients::common::ClientError, synchronizer::error::SynchronizerError}; -use super::types::IndexerTaskMessage; +use super::{ + event_handlers::{ + finalized_checkpoint::FinalizedCheckpointEventHandlerError, head::HeadEventHandlerError, + }, + types::IndexerTaskMessage, +}; #[derive(Debug, thiserror::Error)] pub enum IndexerError { #[error("failed to create indexer")] CreationFailure(#[source] anyhow::Error), #[error(transparent)] - SyncingTaskError(#[from] SyncingTaskError), + SyncingTaskError(#[from] IndexingError), #[error("failed to retrieve blobscan's sync state")] BlobscanSyncStateRetrievalError(#[source] ClientError), - #[error("sync task message send failure")] + #[error("failed to send syncing task message")] SyncingTaskMessageSendFailure(#[from] SendError), } #[derive(Debug, thiserror::Error)] -pub enum SyncingTaskError { - #[error("an error ocurred while syncing historical data")] - HistoricalSyncingTaskError(#[from] HistoricalSyncingError), - #[error("an error occurred while syncing realtime data")] - RealtimeSyncingTaskError(#[from] RealtimeSyncingError), +pub enum IndexingError { + #[error(transparent)] + HistoricalIndexingFailure(#[from] HistoricalIndexingError), + #[error(transparent)] + LiveIndexingError(#[from] LiveIndexingError), } #[derive(Debug, thiserror::Error)] -pub enum HistoricalSyncingError { +pub enum HistoricalIndexingError { #[error(transparent)] SynchronizerError(#[from] SynchronizerError), } #[derive(Debug, thiserror::Error)] -pub enum RealtimeSyncingError { +pub enum LiveIndexingError { #[error("an error ocurred while receiving beacon events")] BeaconEventsConnectionFailure(#[from] reqwest_eventsource::Error), #[error("failed to subscribe to beacon events")] BeaconEventsSubscriptionError(#[source] ClientError), #[error("unexpected event \"{0}\" received")] UnexpectedBeaconEvent(String), - #[error(transparent)] - BeaconEventProcessingError(#[from] BeaconEventError), -} - -#[derive(Debug, thiserror::Error)] -pub enum BeaconEventError { - #[error("failed to handle \"chain_reorged\" event")] - ChainReorged(#[from] ChainReorgedEventHandlingError), - #[error("failed to handle \"head\" event")] - HeadBlock(#[from] HeadBlockEventHandlingError), - #[error("failed to handle \"finalized_checkpoint\" event")] - FinalizedCheckpoint(#[from] FinalizedBlockEventHandlingError), + #[error("failed to handle beacon event")] + BeaconEventHandlingError(#[from] EventHandlerError), } #[derive(Debug, thiserror::Error)] -pub enum FinalizedBlockEventHandlingError { +pub enum EventHandlerError { #[error(transparent)] - EventDeserializationFailure(#[from] serde_json::Error), - #[error("failed to retrieve finalized block {0}")] - BlockRetrievalError(String, #[source] ClientError), + HeadEventHandlerError(#[from] HeadEventHandlerError), #[error(transparent)] - Other(#[from] anyhow::Error), - #[error("failed to update blobscan's last finalized block")] - BlobscanSyncStateUpdateError(#[source] ClientError), + FinalizedCheckpointHandlerError(#[from] FinalizedCheckpointEventHandlerError), } -#[derive(Debug, thiserror::Error)] -pub enum ChainReorgedEventHandlingError { - #[error(transparent)] - EventDeserializationFailure(#[from] serde_json::Error), - #[error("failed to retrieve reorged block {0}")] - BlockRetrievalError(String, #[source] ClientError), - #[error("failed to handle reorged of depth {0} starting at block {1}")] - ReorgedHandlingFailure(u32, String, #[source] ClientError), +impl From for LiveIndexingError { + fn from(err: HeadEventHandlerError) -> Self { + LiveIndexingError::BeaconEventHandlingError(EventHandlerError::HeadEventHandlerError(err)) + } } -#[derive(Debug, thiserror::Error)] -pub enum HeadBlockEventHandlingError { - #[error(transparent)] - EventDeserializationFailure(#[from] serde_json::Error), - #[error(transparent)] - SynchronizerError(#[from] SynchronizerError), +impl From for LiveIndexingError { + fn from(err: FinalizedCheckpointEventHandlerError) -> Self { + LiveIndexingError::BeaconEventHandlingError( + EventHandlerError::FinalizedCheckpointHandlerError(err), + ) + } } diff --git a/src/indexer/event_handlers/finalized_checkpoint.rs b/src/indexer/event_handlers/finalized_checkpoint.rs new file mode 100644 index 0000000..2e37290 --- /dev/null +++ b/src/indexer/event_handlers/finalized_checkpoint.rs @@ -0,0 +1,85 @@ +use tracing::info; + +use crate::{ + clients::{ + beacon::types::{BlockId, FinalizedCheckpointEventData}, + blobscan::types::BlockchainSyncState, + common::ClientError, + }, + context::Context, + utils::web3::get_full_hash, +}; + +#[derive(Debug, thiserror::Error)] +pub enum FinalizedCheckpointEventHandlerError { + #[error(transparent)] + EventDeserializationFailure(#[from] serde_json::Error), + #[error("failed to retrieve block {0}")] + BlockRetrievalError(String, #[source] ClientError), + #[error("block \"{0}\" not found")] + BlockNotFound(String), + #[error("failed to update last finalized block")] + BlobscanFinalizedBlockUpdateFailure(#[source] ClientError), +} + +pub struct FinalizedCheckpointHandler { + context: Context, +} + +impl FinalizedCheckpointHandler { + pub fn new(context: Context) -> Self { + FinalizedCheckpointHandler { context } + } + + pub async fn handle( + &self, + event_data: String, + ) -> Result<(), FinalizedCheckpointEventHandlerError> { + let finalized_checkpoint_data = + serde_json::from_str::(&event_data)?; + let block_hash = finalized_checkpoint_data.block; + let full_block_hash = get_full_hash(&block_hash); + let last_finalized_block_number = match self + .context + .beacon_client() + .get_block(&BlockId::Hash(block_hash)) + .await + .map_err(|err| { + FinalizedCheckpointEventHandlerError::BlockRetrievalError( + full_block_hash.clone(), + err, + ) + })? { + Some(block) => match block.message.body.execution_payload { + Some(execution_payload) => execution_payload.block_number, + None => { + return Err(FinalizedCheckpointEventHandlerError::BlockNotFound( + full_block_hash, + )) + } + }, + None => { + return Err(FinalizedCheckpointEventHandlerError::BlockNotFound( + full_block_hash, + )) + } + }; + + self.context + .blobscan_client() + .update_sync_state(BlockchainSyncState { + last_lower_synced_slot: None, + last_upper_synced_slot: None, + last_finalized_block: Some(last_finalized_block_number), + }) + .await + .map_err(FinalizedCheckpointEventHandlerError::BlobscanFinalizedBlockUpdateFailure)?; + + info!( + finalized_execution_block = last_finalized_block_number, + "Finalized checkpoint event received. Updated last finalized block number" + ); + + Ok(()) + } +} diff --git a/src/indexer/event_handlers/head.rs b/src/indexer/event_handlers/head.rs new file mode 100644 index 0000000..faad56d --- /dev/null +++ b/src/indexer/event_handlers/head.rs @@ -0,0 +1,139 @@ +use std::cmp; + +use ethers::types::H256; +use tracing::info; + +use crate::{ + clients::{ + beacon::types::{BlockHeader, BlockId, HeadEventData}, + blobscan::types::BlockchainSyncState, + common::ClientError, + }, + context::Context, + synchronizer::{error::SynchronizerError, Synchronizer}, +}; + +#[derive(Debug, thiserror::Error)] +pub enum HeadEventHandlerError { + #[error(transparent)] + EventDeserializationFailure(#[from] serde_json::Error), + #[error("failed to retrieve header for block \"{0}\"")] + BlockHeaderRetrievalError(BlockId, #[source] ClientError), + #[error("header for block \"{0}\" not found")] + BlockHeaderNotFound(BlockId), + #[error("failed to index head block")] + BlockSyncedError(#[from] SynchronizerError), + #[error("failed to handle reorged slots")] + BlobscanReorgedSlotsFailure(#[source] ClientError), + #[error("failed to update blobscan's sync state")] + BlobscanSyncStateUpdateError(#[source] ClientError), +} + +pub struct HeadEventHandler { + context: Context, + synchronizer: Synchronizer, + start_block_id: BlockId, + last_block_hash: Option, +} + +impl HeadEventHandler { + pub fn new(context: Context, synchronizer: Synchronizer, start_block_id: BlockId) -> Self { + HeadEventHandler { + context, + synchronizer, + start_block_id, + last_block_hash: None, + } + } + + pub async fn handle(&mut self, event_data: String) -> Result<(), HeadEventHandlerError> { + let head_block_data = serde_json::from_str::(&event_data)?; + + let head_block_slot = head_block_data.slot; + let head_block_hash = head_block_data.block; + + let head_block_id = BlockId::Slot(head_block_data.slot); + let initial_block_id = if self.last_block_hash.is_none() { + self.start_block_id.clone() + } else { + head_block_id.clone() + }; + + let head_block_header = self.get_block_header(&head_block_id).await?.header; + + if let Some(last_block_hash) = self.last_block_hash { + if last_block_hash != head_block_header.message.parent_root { + let parent_block_header = self + .get_block_header(&BlockId::Hash(head_block_header.message.parent_root)) + .await? + .header; + let parent_block_slot = parent_block_header.message.slot; + let reorg_start_slot = parent_block_slot + 1; + let reorg_final_slot = head_block_slot; + let reorged_slots = (reorg_start_slot..reorg_final_slot).collect::>(); + + let result: Result<(), HeadEventHandlerError> = async { + let total_updated_slots = self.context + .blobscan_client() + .handle_reorged_slots(reorged_slots.as_slice()) + .await + .map_err(HeadEventHandlerError::BlobscanReorgedSlotsFailure)?; + + + info!(slot=head_block_slot, "Reorganization detected. Found the following reorged slots: {:#?}. Total slots marked as reorged: {total_updated_slots}", reorged_slots); + + // Re-index parent block as it may be mark as reorged and not indexed + self.synchronizer + .run( + &BlockId::Slot(parent_block_slot), + &BlockId::Slot(parent_block_slot + 1), + ) + .await?; + + Ok(()) + } + .await; + + if let Err(err) = result { + // If an error occurred while handling the reorg try to update the latest synced slot to the last known slot before the reorg + self.context + .blobscan_client() + .update_sync_state(BlockchainSyncState { + last_finalized_block: None, + last_lower_synced_slot: None, + last_upper_synced_slot: Some(cmp::max(parent_block_slot - 1, 0)), + }) + .await + .map_err(HeadEventHandlerError::BlobscanSyncStateUpdateError)?; + + return Err(err); + } + } + } + + self.synchronizer + .run(&initial_block_id, &BlockId::Slot(head_block_slot + 1)) + .await?; + + self.last_block_hash = Some(head_block_hash); + + Ok(()) + } + + async fn get_block_header( + &self, + block_id: &BlockId, + ) -> Result { + match self + .context + .beacon_client() + .get_block_header(block_id) + .await + .map_err(|err| { + HeadEventHandlerError::BlockHeaderRetrievalError(block_id.clone(), err) + })? { + Some(block) => Ok(block), + None => Err(HeadEventHandlerError::BlockHeaderNotFound(block_id.clone())), + } + } +} diff --git a/src/indexer/event_handlers/mod.rs b/src/indexer/event_handlers/mod.rs new file mode 100644 index 0000000..dea7c11 --- /dev/null +++ b/src/indexer/event_handlers/mod.rs @@ -0,0 +1,2 @@ +pub mod finalized_checkpoint; +pub mod head; diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index ceca713..02ef0e7 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -1,36 +1,29 @@ use std::thread; -use anyhow::{anyhow, Context as AnyhowContext}; +use anyhow::anyhow; +use event_handlers::{finalized_checkpoint::FinalizedCheckpointHandler, head::HeadEventHandler}; use futures::StreamExt; use reqwest_eventsource::Event; use tokio::{sync::mpsc, task::JoinHandle}; -use tracing::{debug, error, info, warn, Instrument}; +use tracing::{debug, error, info, Instrument}; use crate::{ args::Args, - clients::{ - beacon::types::{ - BlockId, ChainReorgEventData, FinalizedCheckpointEventData, HeadEventData, Topic, - }, - blobscan::types::BlockchainSyncState, - }, + clients::beacon::types::{BlockId, Topic}, context::{Config as ContextConfig, Context}, env::Environment, - indexer::error::{ - ChainReorgedEventHandlingError, FinalizedBlockEventHandlingError, - HeadBlockEventHandlingError, HistoricalSyncingError, - }, + indexer::error::HistoricalIndexingError, synchronizer::{CheckpointType, Synchronizer, SynchronizerBuilder}, - utils::web3::get_full_hash, }; use self::{ - error::{IndexerError, RealtimeSyncingError}, + error::{IndexerError, LiveIndexingError}, types::{IndexerResult, IndexerTaskMessage}, }; pub mod error; +pub mod event_handlers; pub mod types; pub struct Indexer { @@ -142,7 +135,7 @@ impl Indexer { let mut total_tasks = 0; if end_block_id.is_none() { - self._start_realtime_syncing_task(tx, current_upper_block_id); + self.start_live_indexing_task(tx, current_upper_block_id); total_tasks += 1; } @@ -152,7 +145,7 @@ impl Indexer { matches!(current_lower_block_id, BlockId::Slot(slot) if slot < self.dencun_fork_slot); if !self.disable_sync_historical && !historical_sync_completed { - self._start_historical_syncing_task(tx1, current_lower_block_id, end_block_id); + self.start_historical_indexing_task(tx1, current_lower_block_id, end_block_id); total_tasks += 1; } @@ -179,23 +172,23 @@ impl Indexer { Ok(()) } - fn _start_historical_syncing_task( + fn start_historical_indexing_task( &self, tx: mpsc::Sender, start_block_id: BlockId, end_block_id: BlockId, ) -> JoinHandle> { - let mut synchronizer = self._create_synchronizer(CheckpointType::Lower); + let synchronizer = self.create_synchronizer(CheckpointType::Lower); tokio::spawn(async move { - let historical_syc_thread_span = tracing::info_span!("sync:historical"); + let historical_syc_thread_span = tracing::info_span!("indexer:historical"); let result: Result<(), IndexerError> = async move { let result = synchronizer.run(&start_block_id, &end_block_id).await; if let Err(error) = result { tx.send(IndexerTaskMessage::Error( - HistoricalSyncingError::SynchronizerError(error).into(), + HistoricalIndexingError::SynchronizerError(error).into(), )) .await?; } else { @@ -215,34 +208,33 @@ impl Indexer { }) } - fn _start_realtime_syncing_task( + fn start_live_indexing_task( &self, tx: mpsc::Sender, start_block_id: BlockId, ) -> JoinHandle> { let task_context = self.context.clone(); - let mut synchronizer = self._create_synchronizer(CheckpointType::Upper); + let synchronizer = self.create_synchronizer(CheckpointType::Upper); tokio::spawn(async move { - let realtime_sync_task_span = tracing::info_span!("sync:realtime"); - - let result: Result<(), RealtimeSyncingError> = async { - let beacon_client = task_context.beacon_client(); - let blobscan_client = task_context.blobscan_client(); - let topics = vec![ - Topic::ChainReorg, - Topic::Head, - Topic::FinalizedCheckpoint, - ]; + let realtime_sync_task_span = tracing::info_span!("indexer:live"); + + let result: Result<(), LiveIndexingError> = async { + let topics = vec![Topic::Head, Topic::FinalizedCheckpoint]; let mut event_source = task_context .beacon_client() - .subscribe_to_events(&topics).map_err(RealtimeSyncingError::BeaconEventsSubscriptionError)?; - let mut is_initial_sync_to_head = true; + .subscribe_to_events(&topics) + .map_err(LiveIndexingError::BeaconEventsSubscriptionError)?; let events = topics - .iter() - .map(|topic| topic.into()) - .collect::>() - .join(", "); + .iter() + .map(|topic| topic.into()) + .collect::>() + .join(", "); + + let mut head_event_handler = + HeadEventHandler::new(task_context.clone(), synchronizer, start_block_id); + let finalized_checkpoint_event_handler = + FinalizedCheckpointHandler::new(task_context); info!("Subscribed to beacon events: {events}"); @@ -255,125 +247,25 @@ impl Indexer { let event_name = event.event.as_str(); match event_name { - "chain_reorg" => { - let chain_reorg_span = tracing::info_span!("chain_reorg"); - - let result: Result<(), ChainReorgedEventHandlingError> = async { - let reorg_block_data = - serde_json::from_str::(&event.data)?; - let slot = reorg_block_data.slot; - let old_head_block = reorg_block_data.old_head_block; - let target_depth = reorg_block_data.depth; - - let mut current_reorged_block = old_head_block; - let mut reorged_slots: Vec = vec![]; - - for current_depth in 1..=target_depth { - let reorged_block_head = match beacon_client.get_block_header(&BlockId::Hash(current_reorged_block)).await.map_err(|err| ChainReorgedEventHandlingError::BlockRetrievalError(get_full_hash(¤t_reorged_block), err))? { - Some(block) => block, - None => { - warn!(event=event_name, slot=slot, "Found {current_depth} out of {target_depth} reorged blocks only"); - break - } - }; - - reorged_slots.push(reorged_block_head.header.message.slot); - current_reorged_block = reorged_block_head.header.message.parent_root; - } - - let total_updated_slots = blobscan_client.handle_reorged_slots(&reorged_slots).await.map_err(|err| ChainReorgedEventHandlingError::ReorgedHandlingFailure(target_depth, get_full_hash(&old_head_block), err))?; - - info!(event=event_name, slot=slot, "Reorganization of depth {target_depth} detected. Found the following reorged slots: {:#?}. Total slots marked as reorged: {total_updated_slots}", reorged_slots); - - Ok(()) - }.instrument(chain_reorg_span).await; - - if let Err(error) = result { - // If an error occurred while processing the event, try to update the latest synced slot to the last known slot before the reorg - if let Ok(reorg_block_data) = serde_json::from_str::(&event.data) { - let slot = reorg_block_data.slot; - - let _ = blobscan_client.update_sync_state(BlockchainSyncState { - last_finalized_block: None, - last_lower_synced_slot: None, - last_upper_synced_slot: Some(slot -1) - }).await; - } - - return Err(RealtimeSyncingError::BeaconEventProcessingError(error.into())); - } - }, - "head" => { - let head_span = tracing::info_span!("head_block"); - - let result: Result<(), HeadBlockEventHandlingError> = async { - let head_block_data = - serde_json::from_str::(&event.data)?; - - - let head_block_id = &BlockId::Slot(head_block_data.slot); - let initial_block_id = if is_initial_sync_to_head { - is_initial_sync_to_head = false; - - &start_block_id - } else { - head_block_id - }; - - synchronizer.run(initial_block_id, &BlockId::Slot(head_block_data.slot + 1)).await?; - - Ok(()) - }.instrument(head_span).await; - - if let Err(error) = result { - return Err(RealtimeSyncingError::BeaconEventProcessingError(error.into())); - } - }, + "head" => { + head_event_handler + .handle(event.data) + .instrument(tracing::info_span!("head_block")) + .await?; + } "finalized_checkpoint" => { - let finalized_checkpoint_span = tracing::info_span!("finalized_checkpoint"); - - let result: Result<(), FinalizedBlockEventHandlingError> = async move { - let finalized_checkpoint_data = - serde_json::from_str::( - &event.data, - )?; - let block_hash = finalized_checkpoint_data.block; - let last_finalized_block_number = beacon_client - .get_block(&BlockId::Hash(block_hash)) - .await.map_err(|err| FinalizedBlockEventHandlingError::BlockRetrievalError(get_full_hash(&block_hash), err))? - .with_context(|| { - anyhow!("Finalized block not found") - })? - .message.body.execution_payload - .with_context(|| { - anyhow!("Finalized block has no execution payload") - })?.block_number; - - blobscan_client - .update_sync_state(BlockchainSyncState { - last_lower_synced_slot: None, - last_upper_synced_slot: None, - last_finalized_block: Some( - last_finalized_block_number - ), - }) - .await.map_err(FinalizedBlockEventHandlingError::BlobscanSyncStateUpdateError)?; - - info!(finalized_execution_block=last_finalized_block_number, "Finalized checkpoint event received. Updated last finalized block number"); - - Ok(()) - }.instrument(finalized_checkpoint_span).await; - - if let Err(error) = result { - return Err(RealtimeSyncingError::BeaconEventProcessingError(error.into())); - } - - }, + finalized_checkpoint_event_handler + .handle(event.data) + .instrument(tracing::info_span!("finalized_checkpoint")) + .await?; + } unexpected_event_id => { - return Err(RealtimeSyncingError::UnexpectedBeaconEvent(unexpected_event_id.to_string())); - }, + return Err(LiveIndexingError::UnexpectedBeaconEvent( + unexpected_event_id.to_string(), + )); + } } - }, + } Err(error) => { event_source.close(); @@ -397,7 +289,7 @@ impl Indexer { }) } - fn _create_synchronizer(&self, checkpoint_type: CheckpointType) -> Synchronizer { + fn create_synchronizer(&self, checkpoint_type: CheckpointType) -> Synchronizer { let mut synchronizer_builder = SynchronizerBuilder::new(); if let Some(checkpoint_slots) = self.checkpoint_slots { diff --git a/src/indexer/types.rs b/src/indexer/types.rs index bde0077..36189b1 100644 --- a/src/indexer/types.rs +++ b/src/indexer/types.rs @@ -1,8 +1,8 @@ -use super::error::{IndexerError, SyncingTaskError}; +use super::error::{IndexerError, IndexingError}; pub type IndexerResult = Result; pub enum IndexerTaskMessage { Done, - Error(SyncingTaskError), + Error(IndexingError), } diff --git a/src/synchronizer/mod.rs b/src/synchronizer/mod.rs index 8487d5f..856320e 100644 --- a/src/synchronizer/mod.rs +++ b/src/synchronizer/mod.rs @@ -83,22 +83,22 @@ impl SynchronizerBuilder { impl Synchronizer { pub async fn run( - &mut self, + &self, initial_block_id: &BlockId, final_block_id: &BlockId, ) -> Result<(), SynchronizerError> { - let initial_slot = self._resolve_to_slot(initial_block_id).await?; - let mut final_slot = self._resolve_to_slot(final_block_id).await?; + let initial_slot = self.resolve_to_slot(initial_block_id).await?; + let mut final_slot = self.resolve_to_slot(final_block_id).await?; if initial_slot == final_slot { return Ok(()); } loop { - self._sync_slots_by_checkpoints(initial_slot, final_slot) + self.sync_slots_by_checkpoints(initial_slot, final_slot) .await?; - let latest_final_slot = self._resolve_to_slot(final_block_id).await?; + let latest_final_slot = self.resolve_to_slot(final_block_id).await?; if final_slot == latest_final_slot { return Ok(()); @@ -108,7 +108,7 @@ impl Synchronizer { } } - async fn _sync_slots(&mut self, from_slot: u32, to_slot: u32) -> Result<(), SynchronizerError> { + async fn sync_slots(&self, from_slot: u32, to_slot: u32) -> Result<(), SynchronizerError> { let is_reverse_sync = to_slot < from_slot; let unprocessed_slots = to_slot.abs_diff(from_slot); let min_slots_per_thread = std::cmp::min(unprocessed_slots, self.min_slots_per_thread); @@ -190,8 +190,8 @@ impl Synchronizer { } } - async fn _sync_slots_by_checkpoints( - &mut self, + async fn sync_slots_by_checkpoints( + &self, initial_slot: u32, final_slot: u32, ) -> Result<(), SynchronizerError> { @@ -222,7 +222,7 @@ impl Synchronizer { checkpoint_final_slot = final_chunk_slot ); - self._sync_slots(initial_chunk_slot, final_chunk_slot) + self.sync_slots(initial_chunk_slot, final_chunk_slot) .instrument(sync_slots_chunk_span) .await?; @@ -293,7 +293,7 @@ impl Synchronizer { Ok(()) } - async fn _resolve_to_slot(&self, block_id: &BlockId) -> Result { + async fn resolve_to_slot(&self, block_id: &BlockId) -> Result { let beacon_client = self.context.beacon_client(); let resolved_block_id: Result = match block_id {