From 6ca34d2b3ce0e5224616c0f094528c017232b343 Mon Sep 17 00:00:00 2001 From: PJColombo Date: Thu, 8 Feb 2024 13:34:49 +0100 Subject: [PATCH] feat: support reverse sync --- src/clients/beacon/types.rs | 12 ++ src/clients/blobscan/mod.rs | 33 +++-- src/clients/blobscan/types.rs | 43 ++++++- src/indexer.rs | 66 +++++----- src/slots_processor/error.rs | 2 + src/slots_processor/mod.rs | 153 +++++++++++++++--------- src/synchronizer/mod.rs | 218 +++++++++++++++------------------- 7 files changed, 301 insertions(+), 226 deletions(-) diff --git a/src/clients/beacon/types.rs b/src/clients/beacon/types.rs index cf7a54e..a328e53 100644 --- a/src/clients/beacon/types.rs +++ b/src/clients/beacon/types.rs @@ -3,6 +3,8 @@ use std::{fmt, str::FromStr}; use ethers::types::{Bytes, H256}; use serde::{Deserialize, Serialize}; +use crate::slots_processor::BlockData; + #[derive(Serialize, Debug, Clone)] pub enum BlockId { Head, @@ -73,6 +75,7 @@ pub struct InnerBlockHeader { #[derive(Deserialize, Debug)] pub struct BlockHeaderMessage { + pub parent_root: H256, #[serde(deserialize_with = "deserialize_slot")] pub slot: u32, } @@ -128,3 +131,12 @@ impl From<&Topic> for String { } } } + +impl From for BlockData { + fn from(event_data: HeadBlockEventData) -> Self { + Self { + root: event_data.block, + slot: event_data.slot, + } + } +} diff --git a/src/clients/blobscan/mod.rs b/src/clients/blobscan/mod.rs index 1da443e..035ab5a 100644 --- a/src/clients/blobscan/mod.rs +++ b/src/clients/blobscan/mod.rs @@ -6,7 +6,8 @@ use crate::{clients::common::ClientResult, json_get, json_put}; use self::{ jwt_manager::{Config as JWTManagerConfig, JWTManager}, types::{ - Blob, Block, IndexRequest, ReorgedSlotRequest, SlotRequest, SlotResponse, Transaction, + Blob, Block, BlockchainSyncState, BlockchainSyncStateRequest, BlockchainSyncStateResponse, + IndexRequest, ReorgedSlotRequest, Transaction, }, }; @@ -29,7 +30,7 @@ pub struct Config { impl BlobscanClient { pub fn try_with_client(client: Client, config: Config) -> ClientResult { - let base_url = Url::parse(&format!("{}/api/indexer/", config.base_url))?; + let base_url = Url::parse(&format!("{}/api/", config.base_url))?; let jwt_manager = JWTManager::new(JWTManagerConfig { secret_key: config.secret_key, refresh_interval: chrono::Duration::hours(1), @@ -51,7 +52,7 @@ impl BlobscanClient { transactions: Vec, blobs: Vec, ) -> ClientResult<()> { - let url = self.base_url.join("block-txs-blobs")?; + let url = self.base_url.join("indexer/block-txs-blobs")?; let token = self.jwt_manager.get_token()?; let req = IndexRequest { block, @@ -63,25 +64,31 @@ impl BlobscanClient { } pub async fn handle_reorged_slot(&self, slot: u32) -> ClientResult<()> { - let url = self.base_url.join("reorged-slot")?; + let url = self.base_url.join("indexer/reorged-slot")?; let token = self.jwt_manager.get_token()?; - let req = ReorgedSlotRequest { slot }; + let req = ReorgedSlotRequest { + new_head_slot: slot, + }; json_put!(&self.client, url, token, &req).map(|_: Option<()>| ()) } - pub async fn update_slot(&self, slot: u32) -> ClientResult<()> { - let url = self.base_url.join("slot")?; + pub async fn update_sync_state(&self, sync_state: BlockchainSyncState) -> ClientResult<()> { + let url = self.base_url.join("blockchain-sync-state")?; let token = self.jwt_manager.get_token()?; - let req = SlotRequest { slot }; + let req: BlockchainSyncStateRequest = sync_state.into(); json_put!(&self.client, url, token, &req).map(|_: Option<()>| ()) } - pub async fn get_slot(&self) -> ClientResult> { - let url = self.base_url.join("slot")?; - - json_get!(&self.client, url, SlotResponse, self.exp_backoff.clone()) - .map(|res: Option| Some(res.unwrap().slot)) + pub async fn get_synced_state(&self) -> ClientResult> { + let url = self.base_url.join("blockchain-sync-state")?; + json_get!( + &self.client, + url, + BlockchainSyncStateResponse, + self.exp_backoff.clone() + ) + .map(|res: Option| Some(res.unwrap().into())) } } diff --git a/src/clients/blobscan/types.rs b/src/clients/blobscan/types.rs index af79fdd..b8d2e37 100644 --- a/src/clients/blobscan/types.rs +++ b/src/clients/blobscan/types.rs @@ -51,13 +51,27 @@ pub struct FailedSlotsChunk { } #[derive(Serialize, Debug)] -pub struct SlotRequest { - pub slot: u32, +#[serde(rename_all = "camelCase")] +pub struct BlockchainSyncStateRequest { + #[serde(default, skip_serializing_if = "Option::is_none")] + pub last_lower_synced_slot: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub last_upper_synced_slot: Option, } #[derive(Deserialize, Debug)] -pub struct SlotResponse { - pub slot: u32, +pub struct BlockchainSyncStateResponse { + #[serde(default, skip_serializing_if = "Option::is_none")] + pub last_lower_synced_slot: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub last_upper_synced_slot: Option, +} + +#[derive(Debug)] + +pub struct BlockchainSyncState { + pub last_lower_synced_slot: Option, + pub last_upper_synced_slot: Option, } #[derive(Serialize, Debug)] @@ -68,8 +82,9 @@ pub struct IndexRequest { } #[derive(Serialize, Debug)] +#[serde(rename_all = "camelCase")] pub struct ReorgedSlotRequest { - pub slot: u32, + pub new_head_slot: u32, } impl fmt::Debug for Blob { @@ -213,3 +228,21 @@ impl<'a> From<(&'a BeaconBlob, &'a H256, usize, &'a H256)> for Blob { } } } + +impl From for BlockchainSyncState { + fn from(response: BlockchainSyncStateResponse) -> Self { + Self { + last_lower_synced_slot: response.last_lower_synced_slot, + last_upper_synced_slot: response.last_upper_synced_slot, + } + } +} + +impl From for BlockchainSyncStateRequest { + fn from(sync_state: BlockchainSyncState) -> Self { + Self { + last_lower_synced_slot: sync_state.last_lower_synced_slot, + last_upper_synced_slot: sync_state.last_upper_synced_slot, + } + } +} diff --git a/src/indexer.rs b/src/indexer.rs index 7985976..8ed019f 100644 --- a/src/indexer.rs +++ b/src/indexer.rs @@ -5,7 +5,10 @@ use tracing::{debug, error}; use crate::{ args::Args, - clients::beacon::types::{BlockId, HeadBlockEventData, Topic}, + clients::{ + beacon::types::{BlockId, HeadBlockEventData, Topic}, + blobscan::types::BlockchainSyncState, + }, context::{Config as ContextConfig, Context}, env::Environment, slots_processor::SlotsProcessor, @@ -50,39 +53,39 @@ impl Indexer { let blobscan_client = self.context.blobscan_client(); let mut event_source = beacon_client.subscribe_to_events(vec![Topic::Head])?; - let current_block_id = match start_block_id { - Some(start_slot) => start_slot, - None => match blobscan_client.get_slot().await { - Err(error) => { - error!(target = "indexer", ?error, "Failed to fetch latest slot"); + let sync_state = match blobscan_client.get_synced_state().await { + Ok(state) => state, + Err(error) => { + error!(target = "indexer", ?error, "Failed to fetch sync state"); - return Err(error.into()); - } - Ok(res) => BlockId::Slot(match res { - Some(latest_slot) => latest_slot + 1, - None => 0, - }), + return Err(error.into()); + } + }; + + let current_lower_block_id = match &sync_state { + Some(state) => match state.last_lower_synced_slot { + Some(slot) => BlockId::Slot(slot - 1), + None => BlockId::Head, + }, + None => BlockId::Head, + }; + let current_upper_block_id = match &sync_state { + Some(state) => match state.last_upper_synced_slot { + Some(slot) => BlockId::Slot(slot + 1), + None => BlockId::Head, }, + None => BlockId::Head, }; - let finalized_block_header = self - .synchronizer - .run(¤t_block_id, &BlockId::Finalized) + self.synchronizer + .run(¤t_lower_block_id, &BlockId::Slot(0)) .await?; - // We disable parallel processing for better handling of possible reorgs - self.synchronizer.enable_parallel_processing(false); - - let head_block_header = self - .synchronizer - .run( - &BlockId::Slot(finalized_block_header.header.message.slot), - &BlockId::Head, - ) + self.synchronizer + .run(¤t_upper_block_id, &BlockId::Head) .await?; - let mut last_indexed_block_root = head_block_header.root; - let slots_processor = SlotsProcessor::new(self.context.clone()); + let mut slots_processor = SlotsProcessor::new(self.context.clone()); while let Some(event) = event_source.next().await { match event { @@ -91,11 +94,14 @@ impl Indexer { let head_block_data = serde_json::from_str::(&event.data)?; slots_processor - .process_slot(head_block_data.slot, Some(last_indexed_block_root)) + .process_slot(head_block_data.slot, Some(true)) + .await?; + blobscan_client + .update_sync_state(BlockchainSyncState { + last_lower_synced_slot: None, + last_upper_synced_slot: Some(head_block_data.slot), + }) .await?; - blobscan_client.update_slot(head_block_data.slot).await?; - - last_indexed_block_root = head_block_data.block; } Err(error) => { error!( diff --git a/src/slots_processor/error.rs b/src/slots_processor/error.rs index 53438ed..8686d3e 100644 --- a/src/slots_processor/error.rs +++ b/src/slots_processor/error.rs @@ -20,5 +20,7 @@ pub enum SlotsProcessorError { error: SlotProcessingError, }, #[error(transparent)] + ClientError(#[from] crate::clients::common::ClientError), + #[error(transparent)] Other(#[from] anyhow::Error), } diff --git a/src/slots_processor/mod.rs b/src/slots_processor/mod.rs index df88dd8..9059869 100644 --- a/src/slots_processor/mod.rs +++ b/src/slots_processor/mod.rs @@ -5,7 +5,7 @@ use tracing::{debug, info}; use crate::{ clients::{ - beacon::types::{Block as BeaconBlock, BlockId}, + beacon::types::{BlockHeader, BlockId}, blobscan::types::{Blob, Block, Transaction}, }, context::Context, @@ -19,67 +19,85 @@ mod helpers; pub struct SlotsProcessor { context: Context, + last_block: Option, +} + +#[derive(Debug, Clone)] +pub struct BlockData { + pub root: H256, + pub slot: u32, +} + +impl From for BlockData { + fn from(block_header: BlockHeader) -> Self { + Self { + root: block_header.root.into(), + slot: block_header.header.message.slot, + } + } } impl SlotsProcessor { pub fn new(context: Context) -> SlotsProcessor { - Self { context } + Self { + context, + last_block: None, + } } pub async fn process_slots( - &self, + &mut self, initial_slot: u32, final_slot: u32, ) -> Result<(), SlotsProcessorError> { - let beacon_client = self.context.beacon_client(); - let mut last_block_root: Option = None; - - for current_slot in initial_slot..final_slot { - let beacon_block_header = match beacon_client - .get_block_header(&BlockId::Slot(current_slot)) - .await - .map_err(|error| SlotsProcessorError::FailedSlotsProcessing { - initial_slot, - final_slot, - failed_slot: current_slot, - error: error.into(), - })? { - Some(block_header) => block_header, - None => { - debug!( - target = "slots_processor", - slot = current_slot, - "Skipping as there is no beacon block header" - ); - - continue; + let is_reverse_processing = initial_slot > final_slot; + + if is_reverse_processing { + for current_slot in (final_slot..=initial_slot).rev() { + let result = self.process_slot(current_slot, Some(false)).await; + + if let Err(error) = result { + return Err(SlotsProcessorError::FailedSlotsProcessing { + initial_slot, + final_slot, + failed_slot: current_slot, + error, + }); + } + } + } else { + for current_slot in initial_slot..=final_slot { + let result = self.process_slot(current_slot, Some(true)).await; + + if let Err(error) = result { + return Err(SlotsProcessorError::FailedSlotsProcessing { + initial_slot, + final_slot, + failed_slot: current_slot, + error, + }); } - }; - - let result = self.process_slot(current_slot, last_block_root).await; - - if let Err(error) = result { - return Err(SlotsProcessorError::FailedSlotsProcessing { - initial_slot, - final_slot, - failed_slot: current_slot, - error, - }); } - - last_block_root = Some(beacon_block_header.root); } Ok(()) } pub async fn process_slot( - &self, + &mut self, slot: u32, - last_block_root: Option, + enable_reorg_detection: Option, ) -> Result<(), SlotProcessingError> { + if let Some(enable_reorg_detection) = enable_reorg_detection { + if enable_reorg_detection { + self._detect_and_handle_reorg(slot).await?; + } + } + let beacon_client = self.context.beacon_client(); let blobscan_client = self.context.blobscan_client(); + let provider = self.context.provider(); + let beacon_block = match beacon_client.get_block(&BlockId::Slot(slot)).await? { Some(block) => block, None => { @@ -93,24 +111,7 @@ impl SlotsProcessor { } }; - if let Some(last_block_root) = last_block_root { - if beacon_block.message.parent_root != last_block_root { - info!(target = "slots_processor", slot, "Block reorg detected"); - - blobscan_client.handle_reorged_slot(slot).await?; - } - } - - self.process_block(beacon_block).await - } - - async fn process_block(&self, block: BeaconBlock) -> Result<(), SlotProcessingError> { - let beacon_client = self.context.beacon_client(); - let blobscan_client = self.context.blobscan_client(); - let provider = self.context.provider(); - let slot = block.message.slot; - - let execution_payload = match block.message.body.execution_payload { + let execution_payload = match beacon_block.message.body.execution_payload { Some(payload) => payload, None => { debug!( @@ -122,7 +123,7 @@ impl SlotsProcessor { } }; - let has_kzg_blob_commitments = match block.message.body.blob_kzg_commitments { + let has_kzg_blob_commitments = match beacon_block.message.body.blob_kzg_commitments { Some(commitments) => !commitments.is_empty(), None => false, }; @@ -228,4 +229,38 @@ impl SlotsProcessor { Ok(()) } + + pub fn get_last_block(&self) -> Option { + self.last_block.clone() + } + + async fn _detect_and_handle_reorg(&mut self, slot: u32) -> Result<(), SlotProcessingError> { + let beacon_client = self.context.beacon_client(); + let blobscan_client = self.context.blobscan_client(); + + let beacon_block_header = match beacon_client.get_block_header(&BlockId::Slot(slot)).await? + { + Some(block_header) => block_header, + None => { + debug!( + target = "slots_processor", + slot, "Skipping as there is no beacon block header" + ); + + return Ok(()); + } + }; + + if let Some(block) = &self.last_block { + if beacon_block_header.header.message.parent_root != block.root { + info!(target = "slots_processor", slot, "Block reorg detected"); + + blobscan_client.handle_reorged_slot(slot).await?; + } + } + + self.last_block = Some(beacon_block_header.into()); + + Ok(()) + } } diff --git a/src/synchronizer/mod.rs b/src/synchronizer/mod.rs index c157a76..3572ab5 100644 --- a/src/synchronizer/mod.rs +++ b/src/synchronizer/mod.rs @@ -1,4 +1,4 @@ -use std::{cmp::Ordering, thread}; +use std::thread; use anyhow::anyhow; use futures::future::join_all; @@ -6,14 +6,14 @@ use tokio::task::JoinHandle; use tracing::{debug, debug_span, error, info, Instrument}; use crate::{ - clients::beacon::types::{BlockHeader, BlockId}, + clients::{beacon::types::BlockId, blobscan::types::BlockchainSyncState}, context::Context, - slots_processor::{error::SlotsProcessorError, SlotsProcessor}, + slots_processor::{error::SlotsProcessorError, BlockData, SlotsProcessor}, }; use self::error::{SlotsChunksErrors, SynchronizerError}; -mod error; +pub mod error; #[derive(Debug)] pub struct SynchronizerBuilder { @@ -21,6 +21,13 @@ pub struct SynchronizerBuilder { slots_checkpoint: u32, } +pub struct Synchronizer { + context: Context, + num_threads: u32, + slots_checkpoint: u32, + last_synced_block: Option, +} + impl SynchronizerBuilder { pub fn new() -> Result { SynchronizerBuilder::default() @@ -51,98 +58,41 @@ impl SynchronizerBuilder { context, num_threads: self.num_threads, slots_checkpoint: self.slots_checkpoint, - enable_parallel_processing: self.num_threads > 1, + last_synced_block: None, } } } -pub struct Synchronizer { - context: Context, - num_threads: u32, - slots_checkpoint: u32, - enable_parallel_processing: bool, -} - impl Synchronizer { pub async fn run( - &self, + &mut self, initial_block_id: &BlockId, final_block_id: &BlockId, - ) -> Result { - let initial_block_slot = match initial_block_id { - BlockId::Slot(slot) => *slot, - _ => { - self._fetch_block_header(initial_block_id) - .await? - .header - .message - .slot - } - }; - let mut final_block_header = self._fetch_block_header(final_block_id).await?; - let final_block_slot = final_block_header.header.message.slot; + ) -> Result<(), SynchronizerError> { + let initial_slot = self._resolve_to_slot(initial_block_id).await?; + let mut final_slot = self._resolve_to_slot(final_block_id).await?; loop { - match initial_block_slot.cmp(&final_block_slot) { - Ordering::Equal => { - return Ok(final_block_header); - } - Ordering::Less => { - self._sync_slots_by_checkpoints(initial_block_slot, final_block_slot) - .await?; - } - Ordering::Greater => { - let err = anyhow!("Initial block slot is greater than final one"); + self._sync_slots_by_checkpoints(initial_slot, final_slot) + .await?; - error!( - target = "synchronizer", - initial_block_slot, - final_block_slot, - "{}", - err.to_string() - ); + let latest_final_slot = self._resolve_to_slot(final_block_id).await?; - return Err(err.into()); - } + if final_slot == latest_final_slot { + return Ok(()); } - /* - * If provided final block ID is a slot, we can stop syncing once we reach it. Otherwise, - * we need to keep fetching the latest block header to check if the final block slot has been - * reached. - */ - match final_block_id { - BlockId::Slot(_) => return Ok(final_block_header), - _ => { - let latest_final_block_header = - self._fetch_block_header(final_block_id).await?; - - if latest_final_block_header.header.message.slot == final_block_slot { - return Ok(final_block_header); - } - - final_block_header = latest_final_block_header; - } - } + final_slot = latest_final_slot; } } - pub fn enable_parallel_processing(&mut self, enable_parallel_processing: bool) -> &mut Self { - self.enable_parallel_processing = enable_parallel_processing; - - self - } - async fn _sync_slots_in_parallel( - &self, + &mut self, from_slot: u32, to_slot: u32, ) -> Result<(), SynchronizerError> { - if from_slot == to_slot { - return Ok(()); - } - - let unprocessed_slots = to_slot - from_slot; + let is_reverse_sync = to_slot < from_slot; + let unprocessed_slots = to_slot.abs_diff(from_slot) + 1; let num_threads = std::cmp::min(self.num_threads, unprocessed_slots); let slots_per_thread = unprocessed_slots / num_threads; let remaining_slots = unprocessed_slots % num_threads; @@ -152,18 +102,26 @@ impl Synchronizer { unprocessed_slots }; - let mut handles: Vec>> = vec![]; + let mut handles: Vec, SlotsProcessorError>>> = vec![]; for i in 0..num_threads { - let slots_in_current_thread = if i == num_threads - 1 { - slots_per_thread + remaining_slots + let mut slots_processor = SlotsProcessor::new(self.context.clone()); + let thread_total_slots = slots_per_thread + + if i == num_threads - 1 { + remaining_slots + } else { + 0 + }; + let thread_initial_slot = if is_reverse_sync { + from_slot - i * slots_per_thread } else { - slots_per_thread + from_slot + i * slots_per_thread + }; + let thread_final_slot = if is_reverse_sync { + thread_initial_slot - thread_total_slots + 1 + } else { + thread_initial_slot + thread_total_slots - 1 }; - - let slots_processor = SlotsProcessor::new(self.context.clone()); - let thread_initial_slot = from_slot + i * slots_per_thread; - let thread_final_slot = thread_initial_slot + slots_in_current_thread; let synchronizer_thread_span = tracing::trace_span!( "synchronizer_thread", @@ -175,7 +133,9 @@ impl Synchronizer { async move { slots_processor .process_slots(thread_initial_slot, thread_final_slot) - .await + .await?; + + Ok(slots_processor.get_last_block()) } .instrument(synchronizer_thread_span), ); @@ -186,11 +146,14 @@ impl Synchronizer { let handle_outputs = join_all(handles).await; let mut errors = vec![]; + let mut last_synced_block: Option = None; for handle in handle_outputs { match handle { Ok(thread_result) => match thread_result { - Ok(_) => (), + Ok(thread_last_block) => { + last_synced_block = thread_last_block; + } Err(error) => errors.push(error), }, Err(error) => { @@ -208,6 +171,8 @@ impl Synchronizer { } if errors.is_empty() { + self.last_synced_block = last_synced_block; + Ok(()) } else { Err(SynchronizerError::FailedParallelSlotsProcessing { @@ -219,24 +184,30 @@ impl Synchronizer { } async fn _sync_slots_by_checkpoints( - &self, + &mut self, initial_slot: u32, final_slot: u32, ) -> Result<(), SynchronizerError> { - let blobscan_client = self.context.blobscan_client(); - + let is_reverse_sync = final_slot < initial_slot; let mut current_slot = initial_slot; - let mut unprocessed_slots = final_slot - current_slot; + let mut unprocessed_slots = final_slot.abs_diff(current_slot) + 1; info!( target = "synchronizer", - initial_slot, final_slot, "Syncing {unprocessed_slots} slots…" + reverse_sync = is_reverse_sync, + initial_slot, + final_slot, + "Syncing {unprocessed_slots} slots…" ); while unprocessed_slots > 0 { let slots_chunk = std::cmp::min(unprocessed_slots, self.slots_checkpoint); let initial_chunk_slot = current_slot; - let final_chunk_slot = current_slot + slots_chunk; + let final_chunk_slot = if is_reverse_sync { + current_slot - slots_chunk + 1 + } else { + current_slot + slots_chunk - 1 + }; let sync_slots_chunk_span = debug_span!( "synchronizer", @@ -244,25 +215,29 @@ impl Synchronizer { final_slot = final_chunk_slot ); - if self.enable_parallel_processing { - self._sync_slots_in_parallel(initial_chunk_slot, final_chunk_slot) - .instrument(sync_slots_chunk_span) - .await?; - } else { - let slots_processor = SlotsProcessor::new(self.context.clone()); + self._sync_slots_in_parallel(initial_chunk_slot, final_chunk_slot) + .instrument(sync_slots_chunk_span) + .await?; - slots_processor - .process_slots(initial_chunk_slot, final_chunk_slot) - .instrument(sync_slots_chunk_span) - .await?; - } + let last_slot = Some(final_chunk_slot); + let last_lower_synced_slot = if is_reverse_sync { last_slot } else { None }; + let last_upper_synced_slot = if is_reverse_sync { None } else { last_slot }; + + let blobscan_client = self.context.blobscan_client(); - if let Err(error) = blobscan_client.update_slot(final_chunk_slot - 1).await { + if let Err(error) = blobscan_client + .update_sync_state(BlockchainSyncState { + last_lower_synced_slot, + last_upper_synced_slot, + }) + .await + { error!( target = "synchronizer", - new_latest_slot = final_chunk_slot - 1, + new_last_lower_synced_slot = last_lower_synced_slot, + new_last_upper_synced_slot = last_upper_synced_slot, ?error, - "Failed to update indexer's latest slot" + "Failed to update sync state after processing slots chunk" ); return Err(error.into()); @@ -270,32 +245,37 @@ impl Synchronizer { debug!( target = "synchronizer", - latest_slot = final_chunk_slot - 1, - "Checkpoint reached. Latest indexed slot updated" + new_last_lower_synced_slot = last_lower_synced_slot, + new_last_upper_synced_slot = last_upper_synced_slot, + "Checkpoint reached. Last synced slots updated" ); - current_slot += slots_chunk; + current_slot = if is_reverse_sync { + current_slot - slots_chunk + } else { + current_slot + slots_chunk + }; unprocessed_slots -= slots_chunk; } Ok(()) } - async fn _fetch_block_header( - &self, - block_id: &BlockId, - ) -> Result { + async fn _resolve_to_slot(&self, block_id: &BlockId) -> Result { let beacon_client = self.context.beacon_client(); - match beacon_client.get_block_header(block_id).await? { - Some(block_header) => Ok(block_header), - None => { - let err = anyhow!("Block header not found for block ID {}", block_id); + match block_id { + BlockId::Slot(slot) => Ok(*slot), + _ => match beacon_client.get_block_header(block_id).await? { + Some(block_header) => Ok(block_header.header.message.slot), + None => { + let err = anyhow!("Slot not found for block ID {}", block_id); - error!(target = "synchronizer", "{}", err.to_string()); + error!(target = "synchronizer", "{}", err.to_string()); - Err(err.into()) - } + Err(err.into()) + } + }, } } }