Skip to content

Commit

Permalink
feat: support reverse sync
Browse files Browse the repository at this point in the history
  • Loading branch information
PJColombo committed Feb 9, 2024
1 parent 3dc2c9a commit 6ca34d2
Show file tree
Hide file tree
Showing 7 changed files with 301 additions and 226 deletions.
12 changes: 12 additions & 0 deletions src/clients/beacon/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use std::{fmt, str::FromStr};
use ethers::types::{Bytes, H256};
use serde::{Deserialize, Serialize};

use crate::slots_processor::BlockData;

#[derive(Serialize, Debug, Clone)]
pub enum BlockId {
Head,
Expand Down Expand Up @@ -73,6 +75,7 @@ pub struct InnerBlockHeader {

#[derive(Deserialize, Debug)]
pub struct BlockHeaderMessage {
pub parent_root: H256,
#[serde(deserialize_with = "deserialize_slot")]
pub slot: u32,
}
Expand Down Expand Up @@ -128,3 +131,12 @@ impl From<&Topic> for String {
}
}
}

impl From<HeadBlockEventData> for BlockData {
fn from(event_data: HeadBlockEventData) -> Self {
Self {
root: event_data.block,
slot: event_data.slot,
}
}
}
33 changes: 20 additions & 13 deletions src/clients/blobscan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use crate::{clients::common::ClientResult, json_get, json_put};
use self::{
jwt_manager::{Config as JWTManagerConfig, JWTManager},
types::{
Blob, Block, IndexRequest, ReorgedSlotRequest, SlotRequest, SlotResponse, Transaction,
Blob, Block, BlockchainSyncState, BlockchainSyncStateRequest, BlockchainSyncStateResponse,
IndexRequest, ReorgedSlotRequest, Transaction,
},
};

Expand All @@ -29,7 +30,7 @@ pub struct Config {

impl BlobscanClient {
pub fn try_with_client(client: Client, config: Config) -> ClientResult<Self> {
let base_url = Url::parse(&format!("{}/api/indexer/", config.base_url))?;
let base_url = Url::parse(&format!("{}/api/", config.base_url))?;
let jwt_manager = JWTManager::new(JWTManagerConfig {
secret_key: config.secret_key,
refresh_interval: chrono::Duration::hours(1),
Expand All @@ -51,7 +52,7 @@ impl BlobscanClient {
transactions: Vec<Transaction>,
blobs: Vec<Blob>,
) -> ClientResult<()> {
let url = self.base_url.join("block-txs-blobs")?;
let url = self.base_url.join("indexer/block-txs-blobs")?;
let token = self.jwt_manager.get_token()?;
let req = IndexRequest {
block,
Expand All @@ -63,25 +64,31 @@ impl BlobscanClient {
}

pub async fn handle_reorged_slot(&self, slot: u32) -> ClientResult<()> {
let url = self.base_url.join("reorged-slot")?;
let url = self.base_url.join("indexer/reorged-slot")?;
let token = self.jwt_manager.get_token()?;
let req = ReorgedSlotRequest { slot };
let req = ReorgedSlotRequest {
new_head_slot: slot,
};

json_put!(&self.client, url, token, &req).map(|_: Option<()>| ())
}

pub async fn update_slot(&self, slot: u32) -> ClientResult<()> {
let url = self.base_url.join("slot")?;
pub async fn update_sync_state(&self, sync_state: BlockchainSyncState) -> ClientResult<()> {
let url = self.base_url.join("blockchain-sync-state")?;
let token = self.jwt_manager.get_token()?;
let req = SlotRequest { slot };
let req: BlockchainSyncStateRequest = sync_state.into();

json_put!(&self.client, url, token, &req).map(|_: Option<()>| ())
}

pub async fn get_slot(&self) -> ClientResult<Option<u32>> {
let url = self.base_url.join("slot")?;

json_get!(&self.client, url, SlotResponse, self.exp_backoff.clone())
.map(|res: Option<SlotResponse>| Some(res.unwrap().slot))
pub async fn get_synced_state(&self) -> ClientResult<Option<BlockchainSyncState>> {
let url = self.base_url.join("blockchain-sync-state")?;
json_get!(
&self.client,
url,
BlockchainSyncStateResponse,
self.exp_backoff.clone()
)
.map(|res: Option<BlockchainSyncStateResponse>| Some(res.unwrap().into()))
}
}
43 changes: 38 additions & 5 deletions src/clients/blobscan/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,27 @@ pub struct FailedSlotsChunk {
}

#[derive(Serialize, Debug)]
pub struct SlotRequest {
pub slot: u32,
#[serde(rename_all = "camelCase")]
pub struct BlockchainSyncStateRequest {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub last_lower_synced_slot: Option<u32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub last_upper_synced_slot: Option<u32>,
}

#[derive(Deserialize, Debug)]
pub struct SlotResponse {
pub slot: u32,
pub struct BlockchainSyncStateResponse {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub last_lower_synced_slot: Option<u32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub last_upper_synced_slot: Option<u32>,
}

#[derive(Debug)]

pub struct BlockchainSyncState {
pub last_lower_synced_slot: Option<u32>,
pub last_upper_synced_slot: Option<u32>,
}

#[derive(Serialize, Debug)]
Expand All @@ -68,8 +82,9 @@ pub struct IndexRequest {
}

#[derive(Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct ReorgedSlotRequest {
pub slot: u32,
pub new_head_slot: u32,
}

impl fmt::Debug for Blob {
Expand Down Expand Up @@ -213,3 +228,21 @@ impl<'a> From<(&'a BeaconBlob, &'a H256, usize, &'a H256)> for Blob {
}
}
}

impl From<BlockchainSyncStateResponse> for BlockchainSyncState {
fn from(response: BlockchainSyncStateResponse) -> Self {
Self {
last_lower_synced_slot: response.last_lower_synced_slot,
last_upper_synced_slot: response.last_upper_synced_slot,
}
}
}

impl From<BlockchainSyncState> for BlockchainSyncStateRequest {
fn from(sync_state: BlockchainSyncState) -> Self {
Self {
last_lower_synced_slot: sync_state.last_lower_synced_slot,
last_upper_synced_slot: sync_state.last_upper_synced_slot,
}
}
}
66 changes: 36 additions & 30 deletions src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use tracing::{debug, error};

use crate::{
args::Args,
clients::beacon::types::{BlockId, HeadBlockEventData, Topic},
clients::{
beacon::types::{BlockId, HeadBlockEventData, Topic},
blobscan::types::BlockchainSyncState,
},
context::{Config as ContextConfig, Context},
env::Environment,
slots_processor::SlotsProcessor,
Expand Down Expand Up @@ -50,39 +53,39 @@ impl Indexer {
let blobscan_client = self.context.blobscan_client();
let mut event_source = beacon_client.subscribe_to_events(vec![Topic::Head])?;

let current_block_id = match start_block_id {
Some(start_slot) => start_slot,
None => match blobscan_client.get_slot().await {
Err(error) => {
error!(target = "indexer", ?error, "Failed to fetch latest slot");
let sync_state = match blobscan_client.get_synced_state().await {
Ok(state) => state,
Err(error) => {
error!(target = "indexer", ?error, "Failed to fetch sync state");

return Err(error.into());
}
Ok(res) => BlockId::Slot(match res {
Some(latest_slot) => latest_slot + 1,
None => 0,
}),
return Err(error.into());
}
};

let current_lower_block_id = match &sync_state {
Some(state) => match state.last_lower_synced_slot {
Some(slot) => BlockId::Slot(slot - 1),
None => BlockId::Head,
},
None => BlockId::Head,
};
let current_upper_block_id = match &sync_state {
Some(state) => match state.last_upper_synced_slot {
Some(slot) => BlockId::Slot(slot + 1),
None => BlockId::Head,
},
None => BlockId::Head,
};

let finalized_block_header = self
.synchronizer
.run(&current_block_id, &BlockId::Finalized)
self.synchronizer
.run(&current_lower_block_id, &BlockId::Slot(0))
.await?;

// We disable parallel processing for better handling of possible reorgs
self.synchronizer.enable_parallel_processing(false);

let head_block_header = self
.synchronizer
.run(
&BlockId::Slot(finalized_block_header.header.message.slot),
&BlockId::Head,
)
self.synchronizer
.run(&current_upper_block_id, &BlockId::Head)
.await?;

let mut last_indexed_block_root = head_block_header.root;
let slots_processor = SlotsProcessor::new(self.context.clone());
let mut slots_processor = SlotsProcessor::new(self.context.clone());

while let Some(event) = event_source.next().await {
match event {
Expand All @@ -91,11 +94,14 @@ impl Indexer {
let head_block_data = serde_json::from_str::<HeadBlockEventData>(&event.data)?;

slots_processor
.process_slot(head_block_data.slot, Some(last_indexed_block_root))
.process_slot(head_block_data.slot, Some(true))
.await?;
blobscan_client
.update_sync_state(BlockchainSyncState {
last_lower_synced_slot: None,
last_upper_synced_slot: Some(head_block_data.slot),
})
.await?;
blobscan_client.update_slot(head_block_data.slot).await?;

last_indexed_block_root = head_block_data.block;
}
Err(error) => {
error!(
Expand Down
2 changes: 2 additions & 0 deletions src/slots_processor/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,7 @@ pub enum SlotsProcessorError {
error: SlotProcessingError,
},
#[error(transparent)]
ClientError(#[from] crate::clients::common::ClientError),
#[error(transparent)]
Other(#[from] anyhow::Error),
}
Loading

0 comments on commit 6ca34d2

Please sign in to comment.