diff --git a/src/indexer/event_handlers/head.rs b/src/indexer/event_handlers/head.rs index 55062c9..e1ec695 100644 --- a/src/indexer/event_handlers/head.rs +++ b/src/indexer/event_handlers/head.rs @@ -10,7 +10,7 @@ use crate::{ common::ClientError, }, context::CommonContext, - synchronizer::{error::SynchronizerError, Synchronizer}, + synchronizer::{error::SynchronizerError, CommonSynchronizer}, }; #[derive(Debug, thiserror::Error)] @@ -31,7 +31,7 @@ pub enum HeadEventHandlerError { pub struct HeadEventHandler { context: Box>, - synchronizer: Synchronizer, + synchronizer: Box, start_block_id: BlockId, last_block_hash: Option, } @@ -39,7 +39,7 @@ pub struct HeadEventHandler { impl HeadEventHandler { pub fn new( context: Box>, - synchronizer: Synchronizer, + synchronizer: Box, start_block_id: BlockId, ) -> Self { HeadEventHandler { diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index eebec8a..0236076 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -14,7 +14,7 @@ use crate::{ context::{CommonContext, Config as ContextConfig, Context}, env::Environment, indexer::error::HistoricalIndexingError, - synchronizer::{CheckpointType, Synchronizer, SynchronizerBuilder}, + synchronizer::{CheckpointType, CommonSynchronizer, SynchronizerBuilder}, }; use self::{ @@ -289,7 +289,7 @@ impl Indexer { }) } - fn create_synchronizer(&self, checkpoint_type: CheckpointType) -> Synchronizer { + fn create_synchronizer(&self, checkpoint_type: CheckpointType) -> Box { let mut synchronizer_builder = SynchronizerBuilder::new(); if let Some(checkpoint_slots) = self.checkpoint_slots { @@ -302,6 +302,6 @@ impl Indexer { synchronizer_builder.with_num_threads(self.num_threads); - synchronizer_builder.build(self.context.clone()) + Box::new(synchronizer_builder.build(self.context.clone())) } } diff --git a/src/synchronizer/mod.rs b/src/synchronizer/mod.rs index 36b7df3..167dfe9 100644 --- a/src/synchronizer/mod.rs +++ b/src/synchronizer/mod.rs @@ -1,4 +1,7 @@ +use std::fmt::Debug; + use anyhow::anyhow; +use async_trait::async_trait; use ethers::providers::Http as HttpProvider; use futures::future::join_all; use tokio::task::JoinHandle; @@ -14,6 +17,15 @@ use self::error::{SlotsChunksErrors, SynchronizerError}; pub mod error; +#[async_trait] +pub trait CommonSynchronizer: Send + Sync + Debug { + async fn run( + &self, + initial_block_id: &BlockId, + final_block_id: &BlockId, + ) -> Result<(), SynchronizerError>; +} + #[derive(Debug)] pub struct SynchronizerBuilder { num_threads: u32, @@ -86,32 +98,6 @@ impl SynchronizerBuilder { } impl Synchronizer { - pub async fn run( - &self, - initial_block_id: &BlockId, - final_block_id: &BlockId, - ) -> Result<(), SynchronizerError> { - let initial_slot = self.resolve_to_slot(initial_block_id).await?; - let mut final_slot = self.resolve_to_slot(final_block_id).await?; - - if initial_slot == final_slot { - return Ok(()); - } - - loop { - self.sync_slots_by_checkpoints(initial_slot, final_slot) - .await?; - - let latest_final_slot = self.resolve_to_slot(final_block_id).await?; - - if final_slot == latest_final_slot { - return Ok(()); - } - - final_slot = latest_final_slot; - } - } - async fn sync_slots(&self, from_slot: u32, to_slot: u32) -> Result<(), SynchronizerError> { let is_reverse_sync = to_slot < from_slot; let unprocessed_slots = to_slot.abs_diff(from_slot); @@ -322,3 +308,32 @@ impl Synchronizer { } } } + +#[async_trait] +impl CommonSynchronizer for Synchronizer { + async fn run( + &self, + initial_block_id: &BlockId, + final_block_id: &BlockId, + ) -> Result<(), SynchronizerError> { + let initial_slot = self.resolve_to_slot(initial_block_id).await?; + let mut final_slot = self.resolve_to_slot(final_block_id).await?; + + if initial_slot == final_slot { + return Ok(()); + } + + loop { + self.sync_slots_by_checkpoints(initial_slot, final_slot) + .await?; + + let latest_final_slot = self.resolve_to_slot(final_block_id).await?; + + if final_slot == latest_final_slot { + return Ok(()); + } + + final_slot = latest_final_slot; + } + } +}