Skip to content

Commit

Permalink
refactor(indexer): check for reorgs by verifying parent and current b…
Browse files Browse the repository at this point in the history
…lock root hashes instead of listening to `chain_reorg` sse event
  • Loading branch information
PJColombo committed Jun 10, 2024
1 parent a5bc871 commit 4144f4b
Show file tree
Hide file tree
Showing 7 changed files with 315 additions and 210 deletions.
73 changes: 30 additions & 43 deletions src/indexer/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,82 +2,69 @@ use tokio::sync::mpsc::error::SendError;

use crate::{clients::common::ClientError, synchronizer::error::SynchronizerError};

use super::types::IndexerTaskMessage;
use super::{
event_handlers::{
finalized_checkpoint::FinalizedCheckpointEventHandlerError, head::HeadEventHandlerError,
},
types::IndexerTaskMessage,
};

#[derive(Debug, thiserror::Error)]
pub enum IndexerError {
#[error("failed to create indexer")]
CreationFailure(#[source] anyhow::Error),
#[error(transparent)]
SyncingTaskError(#[from] SyncingTaskError),
SyncingTaskError(#[from] IndexingError),
#[error("failed to retrieve blobscan's sync state")]
BlobscanSyncStateRetrievalError(#[source] ClientError),
#[error("sync task message send failure")]
#[error("failed to send syncing task message")]
SyncingTaskMessageSendFailure(#[from] SendError<IndexerTaskMessage>),
}

#[derive(Debug, thiserror::Error)]
pub enum SyncingTaskError {
#[error("an error ocurred while syncing historical data")]
HistoricalSyncingTaskError(#[from] HistoricalSyncingError),
#[error("an error occurred while syncing realtime data")]
RealtimeSyncingTaskError(#[from] RealtimeSyncingError),
pub enum IndexingError {
#[error(transparent)]
HistoricalIndexingFailure(#[from] HistoricalIndexingError),
#[error(transparent)]
LiveIndexingError(#[from] LiveIndexingError),
}

#[derive(Debug, thiserror::Error)]
pub enum HistoricalSyncingError {
pub enum HistoricalIndexingError {
#[error(transparent)]
SynchronizerError(#[from] SynchronizerError),
}

#[derive(Debug, thiserror::Error)]
pub enum RealtimeSyncingError {
pub enum LiveIndexingError {
#[error("an error ocurred while receiving beacon events")]
BeaconEventsConnectionFailure(#[from] reqwest_eventsource::Error),
#[error("failed to subscribe to beacon events")]
BeaconEventsSubscriptionError(#[source] ClientError),
#[error("unexpected event \"{0}\" received")]
UnexpectedBeaconEvent(String),
#[error(transparent)]
BeaconEventProcessingError(#[from] BeaconEventError),
}

#[derive(Debug, thiserror::Error)]
pub enum BeaconEventError {
#[error("failed to handle \"chain_reorged\" event")]
ChainReorged(#[from] ChainReorgedEventHandlingError),
#[error("failed to handle \"head\" event")]
HeadBlock(#[from] HeadBlockEventHandlingError),
#[error("failed to handle \"finalized_checkpoint\" event")]
FinalizedCheckpoint(#[from] FinalizedBlockEventHandlingError),
#[error("failed to handle beacon event")]
BeaconEventHandlingError(#[from] EventHandlerError),
}

#[derive(Debug, thiserror::Error)]
pub enum FinalizedBlockEventHandlingError {
pub enum EventHandlerError {
#[error(transparent)]
EventDeserializationFailure(#[from] serde_json::Error),
#[error("failed to retrieve finalized block {0}")]
BlockRetrievalError(String, #[source] ClientError),
HeadEventHandlerError(#[from] HeadEventHandlerError),
#[error(transparent)]
Other(#[from] anyhow::Error),
#[error("failed to update blobscan's last finalized block")]
BlobscanSyncStateUpdateError(#[source] ClientError),
FinalizedCheckpointHandlerError(#[from] FinalizedCheckpointEventHandlerError),
}

#[derive(Debug, thiserror::Error)]
pub enum ChainReorgedEventHandlingError {
#[error(transparent)]
EventDeserializationFailure(#[from] serde_json::Error),
#[error("failed to retrieve reorged block {0}")]
BlockRetrievalError(String, #[source] ClientError),
#[error("failed to handle reorged of depth {0} starting at block {1}")]
ReorgedHandlingFailure(u32, String, #[source] ClientError),
impl From<HeadEventHandlerError> for LiveIndexingError {
fn from(err: HeadEventHandlerError) -> Self {
LiveIndexingError::BeaconEventHandlingError(EventHandlerError::HeadEventHandlerError(err))
}
}

#[derive(Debug, thiserror::Error)]
pub enum HeadBlockEventHandlingError {
#[error(transparent)]
EventDeserializationFailure(#[from] serde_json::Error),
#[error(transparent)]
SynchronizerError(#[from] SynchronizerError),
impl From<FinalizedCheckpointEventHandlerError> for LiveIndexingError {
fn from(err: FinalizedCheckpointEventHandlerError) -> Self {
LiveIndexingError::BeaconEventHandlingError(
EventHandlerError::FinalizedCheckpointHandlerError(err),
)
}
}
85 changes: 85 additions & 0 deletions src/indexer/event_handlers/finalized_checkpoint.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use tracing::info;

use crate::{
clients::{
beacon::types::{BlockId, FinalizedCheckpointEventData},
blobscan::types::BlockchainSyncState,
common::ClientError,
},
context::Context,
utils::web3::get_full_hash,
};

#[derive(Debug, thiserror::Error)]
pub enum FinalizedCheckpointEventHandlerError {
#[error(transparent)]
EventDeserializationFailure(#[from] serde_json::Error),
#[error("failed to retrieve block {0}")]
BlockRetrievalError(String, #[source] ClientError),
#[error("block \"{0}\" not found")]
BlockNotFound(String),
#[error("failed to update last finalized block")]
BlobscanFinalizedBlockUpdateFailure(#[source] ClientError),
}

pub struct FinalizedCheckpointHandler {
context: Context,
}

impl FinalizedCheckpointHandler {
pub fn new(context: Context) -> Self {
FinalizedCheckpointHandler { context }
}

pub async fn handle(
&self,
event_data: String,
) -> Result<(), FinalizedCheckpointEventHandlerError> {
let finalized_checkpoint_data =
serde_json::from_str::<FinalizedCheckpointEventData>(&event_data)?;
let block_hash = finalized_checkpoint_data.block;
let full_block_hash = get_full_hash(&block_hash);
let last_finalized_block_number = match self
.context
.beacon_client()
.get_block(&BlockId::Hash(block_hash))
.await
.map_err(|err| {
FinalizedCheckpointEventHandlerError::BlockRetrievalError(
full_block_hash.clone(),
err,
)
})? {
Some(block) => match block.message.body.execution_payload {
Some(execution_payload) => execution_payload.block_number,
None => {
return Err(FinalizedCheckpointEventHandlerError::BlockNotFound(
full_block_hash,
))
}
},
None => {
return Err(FinalizedCheckpointEventHandlerError::BlockNotFound(
full_block_hash,
))
}
};

self.context
.blobscan_client()
.update_sync_state(BlockchainSyncState {
last_lower_synced_slot: None,
last_upper_synced_slot: None,
last_finalized_block: Some(last_finalized_block_number),
})
.await
.map_err(FinalizedCheckpointEventHandlerError::BlobscanFinalizedBlockUpdateFailure)?;

info!(
finalized_execution_block = last_finalized_block_number,
"Finalized checkpoint event received. Updated last finalized block number"
);

Ok(())
}
}
139 changes: 139 additions & 0 deletions src/indexer/event_handlers/head.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
use std::cmp;

use ethers::types::H256;
use tracing::info;

use crate::{
clients::{
beacon::types::{BlockHeader, BlockId, HeadEventData},
blobscan::types::BlockchainSyncState,
common::ClientError,
},
context::Context,
synchronizer::{error::SynchronizerError, Synchronizer},
};

#[derive(Debug, thiserror::Error)]
pub enum HeadEventHandlerError {
#[error(transparent)]
EventDeserializationFailure(#[from] serde_json::Error),
#[error("failed to retrieve header for block \"{0}\"")]
BlockHeaderRetrievalError(BlockId, #[source] ClientError),
#[error("header for block \"{0}\" not found")]
BlockHeaderNotFound(BlockId),
#[error("failed to index head block")]
BlockSyncedError(#[from] SynchronizerError),
#[error("failed to handle reorged slots")]
BlobscanReorgedSlotsFailure(#[source] ClientError),
#[error("failed to update blobscan's sync state")]
BlobscanSyncStateUpdateError(#[source] ClientError),
}

pub struct HeadEventHandler {
context: Context,
synchronizer: Synchronizer,
start_block_id: BlockId,
last_block_hash: Option<H256>,
}

impl HeadEventHandler {
pub fn new(context: Context, synchronizer: Synchronizer, start_block_id: BlockId) -> Self {
HeadEventHandler {
context,
synchronizer,
start_block_id,
last_block_hash: None,
}
}

pub async fn handle(&mut self, event_data: String) -> Result<(), HeadEventHandlerError> {
let head_block_data = serde_json::from_str::<HeadEventData>(&event_data)?;

let head_block_slot = head_block_data.slot;
let head_block_hash = head_block_data.block;

let head_block_id = BlockId::Slot(head_block_data.slot);
let initial_block_id = if self.last_block_hash.is_none() {
self.start_block_id.clone()
} else {
head_block_id.clone()
};

let head_block_header = self.get_block_header(&head_block_id).await?.header;

if let Some(last_block_hash) = self.last_block_hash {
if last_block_hash != head_block_header.message.parent_root {
let parent_block_header = self
.get_block_header(&BlockId::Hash(head_block_header.message.parent_root))
.await?
.header;
let parent_block_slot = parent_block_header.message.slot;
let reorg_start_slot = parent_block_slot + 1;
let reorg_final_slot = head_block_slot;
let reorged_slots = (reorg_start_slot..reorg_final_slot).collect::<Vec<u32>>();

let result: Result<(), HeadEventHandlerError> = async {
let total_updated_slots = self.context
.blobscan_client()
.handle_reorged_slots(reorged_slots.as_slice())
.await
.map_err(HeadEventHandlerError::BlobscanReorgedSlotsFailure)?;


info!(slot=head_block_slot, "Reorganization detected. Found the following reorged slots: {:#?}. Total slots marked as reorged: {total_updated_slots}", reorged_slots);

// Re-index parent block as it may be mark as reorged and not indexed
self.synchronizer
.run(
&BlockId::Slot(parent_block_slot),
&BlockId::Slot(parent_block_slot + 1),
)
.await?;

Ok(())
}
.await;

if let Err(err) = result {
// If an error occurred while handling the reorg try to update the latest synced slot to the last known slot before the reorg
self.context
.blobscan_client()
.update_sync_state(BlockchainSyncState {
last_finalized_block: None,
last_lower_synced_slot: None,
last_upper_synced_slot: Some(cmp::max(parent_block_slot - 1, 0)),
})
.await
.map_err(HeadEventHandlerError::BlobscanSyncStateUpdateError)?;

return Err(err);
}
}
}

self.synchronizer
.run(&initial_block_id, &BlockId::Slot(head_block_slot + 1))
.await?;

self.last_block_hash = Some(head_block_hash);

Ok(())
}

async fn get_block_header(
&self,
block_id: &BlockId,
) -> Result<BlockHeader, HeadEventHandlerError> {
match self
.context
.beacon_client()
.get_block_header(block_id)
.await
.map_err(|err| {
HeadEventHandlerError::BlockHeaderRetrievalError(block_id.clone(), err)
})? {
Some(block) => Ok(block),
None => Err(HeadEventHandlerError::BlockHeaderNotFound(block_id.clone())),
}
}
}
2 changes: 2 additions & 0 deletions src/indexer/event_handlers/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod finalized_checkpoint;
pub mod head;
Loading

0 comments on commit 4144f4b

Please sign in to comment.