Skip to content

Commit

Permalink
refactor: abstract all clients and context functions into traits
Browse files Browse the repository at this point in the history
  • Loading branch information
PJColombo committed Jun 21, 2024
1 parent 0d2b059 commit 9dd0350
Show file tree
Hide file tree
Showing 10 changed files with 145 additions and 61 deletions.
11 changes: 9 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-trait = "0.1.66"
async-trait = "0.1.80"
dyn-clone = "1.0.17"
dotenv = "0.15.0"
envy = "0.4.2"
ethers = "1.0.2"
Expand Down
23 changes: 19 additions & 4 deletions src/clients/beacon/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use std::fmt::Debug;

use anyhow::Context as AnyhowContext;
use async_trait::async_trait;
use backoff::ExponentialBackoff;

use reqwest::{Client, Url};
use reqwest_eventsource::EventSource;

Expand All @@ -24,6 +28,14 @@ pub struct Config {
pub exp_backoff: Option<ExponentialBackoff>,
}

#[async_trait]
pub trait CommonBeaconClient: Send + Sync + Debug {
async fn get_block(&self, block_id: &BlockId) -> ClientResult<Option<Block>>;
async fn get_block_header(&self, block_id: &BlockId) -> ClientResult<Option<BlockHeader>>;
async fn get_blobs(&self, block_id: &BlockId) -> ClientResult<Option<Vec<Blob>>>;
fn subscribe_to_events(&self, topics: &[Topic]) -> ClientResult<EventSource>;
}

impl BeaconClient {
pub fn try_with_client(client: Client, config: Config) -> ClientResult<Self> {
let base_url = Url::parse(&format!("{}/eth/", config.base_url))
Expand All @@ -36,8 +48,11 @@ impl BeaconClient {
exp_backoff,
})
}
}

pub async fn get_block(&self, block_id: &BlockId) -> ClientResult<Option<Block>> {
#[async_trait]
impl CommonBeaconClient for BeaconClient {
async fn get_block(&self, block_id: &BlockId) -> ClientResult<Option<Block>> {
let path = format!("v2/beacon/blocks/{}", { block_id.to_detailed_string() });
let url = self.base_url.join(path.as_str())?;

Expand All @@ -47,7 +62,7 @@ impl BeaconClient {
})
}

pub async fn get_block_header(&self, block_id: &BlockId) -> ClientResult<Option<BlockHeader>> {
async fn get_block_header(&self, block_id: &BlockId) -> ClientResult<Option<BlockHeader>> {
let path = format!("v1/beacon/headers/{}", { block_id.to_detailed_string() });
let url = self.base_url.join(path.as_str())?;

Expand All @@ -63,7 +78,7 @@ impl BeaconClient {
})
}

pub async fn get_blobs(&self, block_id: &BlockId) -> ClientResult<Option<Vec<Blob>>> {
async fn get_blobs(&self, block_id: &BlockId) -> ClientResult<Option<Vec<Blob>>> {
let path = format!("v1/beacon/blob_sidecars/{}", {
block_id.to_detailed_string()
});
Expand All @@ -75,7 +90,7 @@ impl BeaconClient {
})
}

pub fn subscribe_to_events(&self, topics: &[Topic]) -> ClientResult<EventSource> {
fn subscribe_to_events(&self, topics: &[Topic]) -> ClientResult<EventSource> {
let topics = topics
.iter()
.map(|topic| topic.into())
Expand Down
33 changes: 27 additions & 6 deletions src/clients/blobscan/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use std::fmt::Debug;

use async_trait::async_trait;
use backoff::ExponentialBackoff;
use chrono::TimeDelta;
use reqwest::{Client, Url};
Expand All @@ -18,6 +21,23 @@ use self::{
mod jwt_manager;

pub mod types;

#[async_trait]
pub trait CommonBlobscanClient: Send + Sync + Debug {
fn try_with_client(client: Client, config: Config) -> ClientResult<Self>
where
Self: Sized;
async fn index(
&self,
block: Block,
transactions: Vec<Transaction>,
blobs: Vec<Blob>,
) -> ClientResult<()>;
async fn handle_reorged_slots(&self, slots: &[u32]) -> ClientResult<u32>;
async fn update_sync_state(&self, sync_state: BlockchainSyncState) -> ClientResult<()>;
async fn get_sync_state(&self) -> ClientResult<Option<BlockchainSyncState>>;
}

#[derive(Debug, Clone)]
pub struct BlobscanClient {
base_url: Url,
Expand All @@ -32,8 +52,9 @@ pub struct Config {
pub exp_backoff: Option<ExponentialBackoff>,
}

impl BlobscanClient {
pub fn try_with_client(client: Client, config: Config) -> ClientResult<Self> {
#[async_trait]
impl CommonBlobscanClient for BlobscanClient {
fn try_with_client(client: Client, config: Config) -> ClientResult<Self> {
let base_url = Url::parse(&format!("{}/", config.base_url))?;
let jwt_manager = JWTManager::new(JWTManagerConfig {
secret_key: config.secret_key,
Expand All @@ -50,7 +71,7 @@ impl BlobscanClient {
})
}

pub async fn index(
async fn index(
&self,
block: Block,
transactions: Vec<Transaction>,
Expand All @@ -67,7 +88,7 @@ impl BlobscanClient {
json_put!(&self.client, url, token, &req).map(|_: Option<()>| ())
}

pub async fn handle_reorged_slots(&self, slots: &[u32]) -> ClientResult<u32> {
async fn handle_reorged_slots(&self, slots: &[u32]) -> ClientResult<u32> {
let url = self.base_url.join("indexer/reorged-slots")?;
let token = self.jwt_manager.get_token()?;
let req = ReorgedSlotsRequest {
Expand All @@ -78,15 +99,15 @@ impl BlobscanClient {
.map(|res: Option<ReorgedSlotsResponse>| res.unwrap().total_updated_slots)
}

pub async fn update_sync_state(&self, sync_state: BlockchainSyncState) -> ClientResult<()> {
async fn update_sync_state(&self, sync_state: BlockchainSyncState) -> ClientResult<()> {
let url = self.base_url.join("blockchain-sync-state")?;
let token = self.jwt_manager.get_token()?;
let req: BlockchainSyncStateRequest = sync_state.into();

json_put!(&self.client, url, token, &req).map(|_: Option<()>| ())
}

pub async fn get_sync_state(&self) -> ClientResult<Option<BlockchainSyncState>> {
async fn get_sync_state(&self) -> ClientResult<Option<BlockchainSyncState>> {
let url = self.base_url.join("blockchain-sync-state")?;
json_get!(
&self.client,
Expand Down
47 changes: 30 additions & 17 deletions src/context.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
use std::{sync::Arc, time::Duration};
use std::{fmt::Debug, sync::Arc, time::Duration};

use anyhow::Result as AnyhowResult;
use backoff::ExponentialBackoffBuilder;
use ethers::prelude::*;
use dyn_clone::DynClone;
use ethers::providers::{Http as HttpProvider, Provider};

use crate::{
clients::beacon::{BeaconClient, Config as BeaconClientConfig},
clients::blobscan::{BlobscanClient, Config as BlobscanClientConfig},
clients::{
beacon::{BeaconClient, CommonBeaconClient, Config as BeaconClientConfig},
blobscan::{BlobscanClient, CommonBlobscanClient, Config as BlobscanClientConfig},
},
env::Environment,
};

#[derive(Debug, Clone)]
struct ContextRef {
pub beacon_client: BeaconClient,
pub blobscan_client: BlobscanClient,
pub provider: Provider<Http>,
dyn_clone::clone_trait_object!(CommonContext<HttpProvider>);

pub trait CommonContext<T>: Send + Sync + Debug + DynClone {
fn beacon_client(&self) -> &Box<dyn CommonBeaconClient>;
fn blobscan_client(&self) -> &Box<dyn CommonBlobscanClient>;
fn provider(&self) -> &Provider<T>;
}

pub struct Config {
Expand All @@ -24,6 +28,13 @@ pub struct Config {
pub secret_key: String,
}

#[derive(Debug)]
struct ContextRef {
pub beacon_client: Box<dyn CommonBeaconClient>,
pub blobscan_client: Box<dyn CommonBlobscanClient>,
pub provider: Provider<HttpProvider>,
}

#[derive(Debug, Clone)]
pub struct Context {
inner: Arc<ContextRef>,
Expand All @@ -45,35 +56,37 @@ impl Context {

Ok(Self {
inner: Arc::new(ContextRef {
blobscan_client: BlobscanClient::try_with_client(
blobscan_client: Box::new(BlobscanClient::try_with_client(
client.clone(),
BlobscanClientConfig {
base_url: blobscan_api_endpoint,
secret_key,
exp_backoff: exp_backoff.clone(),
},
)?,
beacon_client: BeaconClient::try_with_client(
)?),
beacon_client: Box::new(BeaconClient::try_with_client(
client,
BeaconClientConfig {
base_url: beacon_node_url,
exp_backoff,
},
)?,
provider: Provider::<Http>::try_from(execution_node_endpoint)?,
)?),
provider: Provider::<HttpProvider>::try_from(execution_node_endpoint)?,
}),
})
}
}

pub fn beacon_client(&self) -> &BeaconClient {
impl CommonContext<HttpProvider> for Context {
fn beacon_client(&self) -> &Box<dyn CommonBeaconClient> {
&self.inner.beacon_client
}

pub fn blobscan_client(&self) -> &BlobscanClient {
fn blobscan_client(&self) -> &Box<dyn CommonBlobscanClient> {
&self.inner.blobscan_client
}

pub fn provider(&self) -> &Provider<Http> {
fn provider(&self) -> &Provider<HttpProvider> {
&self.inner.provider
}
}
Expand Down
11 changes: 6 additions & 5 deletions src/indexer/event_handlers/finalized_checkpoint.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use ethers::providers::Http as HttpProvider;
use tracing::info;

use crate::{
Expand All @@ -6,7 +7,7 @@ use crate::{
blobscan::types::BlockchainSyncState,
common::ClientError,
},
context::Context,
context::CommonContext,
utils::web3::get_full_hash,
};

Expand All @@ -22,12 +23,12 @@ pub enum FinalizedCheckpointEventHandlerError {
BlobscanFinalizedBlockUpdateFailure(#[source] ClientError),
}

pub struct FinalizedCheckpointHandler {
context: Context,
pub struct FinalizedCheckpointHandler<T> {
context: Box<dyn CommonContext<T>>,
}

impl FinalizedCheckpointHandler {
pub fn new(context: Context) -> Self {
impl FinalizedCheckpointHandler<HttpProvider> {
pub fn new(context: Box<dyn CommonContext<HttpProvider>>) -> Self {
FinalizedCheckpointHandler { context }
}

Expand Down
18 changes: 11 additions & 7 deletions src/indexer/event_handlers/head.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::cmp;

use ethers::types::H256;
use ethers::{providers::Http as HttpProvider, types::H256};
use tracing::info;

use crate::{
Expand All @@ -9,7 +9,7 @@ use crate::{
blobscan::types::BlockchainSyncState,
common::ClientError,
},
context::Context,
context::CommonContext,
synchronizer::{error::SynchronizerError, Synchronizer},
};

Expand All @@ -29,15 +29,19 @@ pub enum HeadEventHandlerError {
BlobscanSyncStateUpdateError(#[source] ClientError),
}

pub struct HeadEventHandler {
context: Context,
synchronizer: Synchronizer,
pub struct HeadEventHandler<T> {
context: Box<dyn CommonContext<T>>,
synchronizer: Synchronizer<T>,
start_block_id: BlockId,
last_block_hash: Option<H256>,
}

impl HeadEventHandler {
pub fn new(context: Context, synchronizer: Synchronizer, start_block_id: BlockId) -> Self {
impl HeadEventHandler<HttpProvider> {
pub fn new(
context: Box<dyn CommonContext<HttpProvider>>,
synchronizer: Synchronizer<HttpProvider>,
start_block_id: BlockId,
) -> Self {
HeadEventHandler {
context,
synchronizer,
Expand Down
Loading

0 comments on commit 9dd0350

Please sign in to comment.