Skip to content

Commit

Permalink
refactor: abstract synchronizer's functions into a trait
Browse files Browse the repository at this point in the history
  • Loading branch information
PJColombo committed Jun 21, 2024
1 parent 9dd0350 commit 7dfd24d
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 32 deletions.
6 changes: 3 additions & 3 deletions src/indexer/event_handlers/head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
common::ClientError,
},
context::CommonContext,
synchronizer::{error::SynchronizerError, Synchronizer},
synchronizer::{error::SynchronizerError, CommonSynchronizer},
};

#[derive(Debug, thiserror::Error)]
Expand All @@ -31,15 +31,15 @@ pub enum HeadEventHandlerError {

pub struct HeadEventHandler<T> {
context: Box<dyn CommonContext<T>>,
synchronizer: Synchronizer<T>,
synchronizer: Box<dyn CommonSynchronizer>,
start_block_id: BlockId,
last_block_hash: Option<H256>,
}

impl HeadEventHandler<HttpProvider> {
pub fn new(
context: Box<dyn CommonContext<HttpProvider>>,
synchronizer: Synchronizer<HttpProvider>,
synchronizer: Box<dyn CommonSynchronizer>,
start_block_id: BlockId,
) -> Self {
HeadEventHandler {
Expand Down
6 changes: 3 additions & 3 deletions src/indexer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
context::{CommonContext, Config as ContextConfig, Context},
env::Environment,
indexer::error::HistoricalIndexingError,
synchronizer::{CheckpointType, Synchronizer, SynchronizerBuilder},
synchronizer::{CheckpointType, CommonSynchronizer, SynchronizerBuilder},
};

use self::{
Expand Down Expand Up @@ -289,7 +289,7 @@ impl Indexer<HttpProvider> {
})
}

fn create_synchronizer(&self, checkpoint_type: CheckpointType) -> Synchronizer<HttpProvider> {
fn create_synchronizer(&self, checkpoint_type: CheckpointType) -> Box<dyn CommonSynchronizer> {
let mut synchronizer_builder = SynchronizerBuilder::new();

if let Some(checkpoint_slots) = self.checkpoint_slots {
Expand All @@ -302,6 +302,6 @@ impl Indexer<HttpProvider> {

synchronizer_builder.with_num_threads(self.num_threads);

synchronizer_builder.build(self.context.clone())
Box::new(synchronizer_builder.build(self.context.clone()))
}
}
67 changes: 41 additions & 26 deletions src/synchronizer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::fmt::Debug;

use anyhow::anyhow;
use async_trait::async_trait;
use ethers::providers::Http as HttpProvider;
use futures::future::join_all;
use tokio::task::JoinHandle;
Expand All @@ -14,6 +17,15 @@ use self::error::{SlotsChunksErrors, SynchronizerError};

pub mod error;

#[async_trait]
pub trait CommonSynchronizer: Send + Sync + Debug {
async fn run(
&self,
initial_block_id: &BlockId,
final_block_id: &BlockId,
) -> Result<(), SynchronizerError>;
}

#[derive(Debug)]
pub struct SynchronizerBuilder {
num_threads: u32,
Expand Down Expand Up @@ -86,32 +98,6 @@ impl SynchronizerBuilder {
}

impl Synchronizer<HttpProvider> {
pub async fn run(
&self,
initial_block_id: &BlockId,
final_block_id: &BlockId,
) -> Result<(), SynchronizerError> {
let initial_slot = self.resolve_to_slot(initial_block_id).await?;
let mut final_slot = self.resolve_to_slot(final_block_id).await?;

if initial_slot == final_slot {
return Ok(());
}

loop {
self.sync_slots_by_checkpoints(initial_slot, final_slot)
.await?;

let latest_final_slot = self.resolve_to_slot(final_block_id).await?;

if final_slot == latest_final_slot {
return Ok(());
}

final_slot = latest_final_slot;
}
}

async fn sync_slots(&self, from_slot: u32, to_slot: u32) -> Result<(), SynchronizerError> {
let is_reverse_sync = to_slot < from_slot;
let unprocessed_slots = to_slot.abs_diff(from_slot);
Expand Down Expand Up @@ -322,3 +308,32 @@ impl Synchronizer<HttpProvider> {
}
}
}

#[async_trait]
impl CommonSynchronizer for Synchronizer<HttpProvider> {
async fn run(
&self,
initial_block_id: &BlockId,
final_block_id: &BlockId,
) -> Result<(), SynchronizerError> {
let initial_slot = self.resolve_to_slot(initial_block_id).await?;
let mut final_slot = self.resolve_to_slot(final_block_id).await?;

if initial_slot == final_slot {
return Ok(());
}

loop {
self.sync_slots_by_checkpoints(initial_slot, final_slot)
.await?;

let latest_final_slot = self.resolve_to_slot(final_block_id).await?;

if final_slot == latest_final_slot {
return Ok(());
}

final_slot = latest_final_slot;
}
}
}

0 comments on commit 7dfd24d

Please sign in to comment.