feat(user-ops-indexer): indexing improvements (#1205)
* feat: better realtime polling

* feat: basic indexing status

* chore: update readme envs

* chore: validate settings
k1rill-fedoseev authored Jan 23, 2025
1 parent dff4888 commit aa54a20
Showing 13 changed files with 384 additions and 118 deletions.
51 changes: 32 additions & 19 deletions user-ops-indexer/README.md

Large diffs are not rendered by default.

135 changes: 98 additions & 37 deletions user-ops-indexer/user-ops-indexer-logic/src/indexer/base_indexer.rs
@@ -2,6 +2,7 @@ use crate::{
indexer::{
rpc_utils::{CallTracer, TraceClient, TraceType},
settings::IndexerSettings,
status::{EntryPointIndexerStatusMessage, IndexerStatusMessage},
},
repository,
types::user_op::UserOp,
@@ -11,22 +12,25 @@ use alloy::{
primitives::{Address, BlockHash, Bytes, TxHash, B256},
providers::Provider,
rpc::types::{Filter, Log, TransactionReceipt},
sol_types,
sol_types::SolEvent,
sol_types::{self, SolEvent},
transports::{TransportError, TransportErrorKind},
};
use anyhow::{anyhow, bail};
use futures::{
stream,
stream::{repeat_with, BoxStream},
Stream, StreamExt, TryStreamExt,
stream::{self, unfold, BoxStream},
FutureExt, Stream, StreamExt, TryStreamExt,
};
use sea_orm::DatabaseConnection;
use std::{future, num::NonZeroUsize, sync::Arc, time, time::Duration};
use tokio::time::sleep;
use std::{
future::{self, Future},
num::NonZeroUsize,
sync::Arc,
time::{self, Duration},
};
use tokio::{sync::mpsc, time::sleep};
use tracing::instrument;

#[derive(Hash, Eq, PartialEq)]
#[derive(Hash, Default, Eq, PartialEq)]
struct Job {
tx_hash: TxHash,
block_hash: BlockHash,
@@ -103,28 +107,32 @@ pub trait IndexerLogic {
}
}

pub struct Indexer<P: Provider, L: IndexerLogic + Sync> {
pub struct Indexer<P: Provider, L: IndexerLogic + Sync + Send> {
client: P,

db: Arc<DatabaseConnection>,

settings: IndexerSettings,

logic: L,

tx: mpsc::Sender<IndexerStatusMessage>,
}

impl<P: Provider, L: IndexerLogic + Sync> Indexer<P, L> {
impl<P: Provider, L: IndexerLogic + Sync + Send> Indexer<P, L> {
pub fn new(
client: P,
db: Arc<DatabaseConnection>,
settings: IndexerSettings,
logic: L,
tx: mpsc::Sender<IndexerStatusMessage>,
) -> Self {
Self {
client,
db,
settings,
logic,
tx,
}
}
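
The indexer now takes a `tokio::sync::mpsc::Sender<IndexerStatusMessage>` and reports milestones (indexer started, past DB logs done, past RPC logs done) to whichever task listens on the other end. The snippet below is a minimal, self-contained sketch of that producer/consumer pattern; the `Status` enum and the printing consumer are illustrative stand-ins for `IndexerStatusMessage`, `EntryPointIndexerStatusMessage`, and the real status aggregator.

```rust
// A minimal sketch of the status-reporting pattern (illustrative types, not the
// crate's real ones): the indexer owns an mpsc::Sender and emits progress
// messages, while a separate task consumes them to expose indexing status.
use tokio::sync::mpsc;

#[derive(Debug)]
enum Status {
    IndexerStarted,
    PastDbLogsIndexingFinished,
    PastRpcLogsIndexingFinished,
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Status>(100);

    // Consumer: aggregates status messages (e.g. behind an API endpoint).
    let reporter = tokio::spawn(async move {
        while let Some(msg) = rx.recv().await {
            println!("indexer status update: {msg:?}");
        }
    });

    // Producer side: the indexer reports milestones as it reaches them.
    tx.send(Status::IndexerStarted).await.unwrap();
    tx.send(Status::PastDbLogsIndexingFinished).await.unwrap();
    tx.send(Status::PastRpcLogsIndexingFinished).await.unwrap();
    drop(tx); // closing the channel lets the reporter task finish

    reporter.await.unwrap();
}
```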

@@ -140,6 +148,10 @@ impl<P: Provider, L: IndexerLogic + Sync> Indexer<P, L> {
}
};

tracing::debug!("fetching latest block number");
let block_number = self.client.get_block_number().await?;
tracing::info!(block_number, "latest block number");

let mut stream_jobs = stream::SelectAll::<BoxStream<Job>>::new();

if self.settings.realtime.enabled {
@@ -161,14 +173,10 @@ impl<P: Provider, L: IndexerLogic + Sync> Indexer<P, L> {
stream_jobs.push(Box::pin(realtime_stream_jobs));
} else {
tracing::info!("starting polling of past BeforeExecution logs from rpc");
stream_jobs.push(Box::pin(self.poll_for_jobs()));
stream_jobs.push(Box::pin(self.poll_for_realtime_jobs(block_number)));
}
}

tracing::debug!("fetching latest block number");
let block_number = self.client.get_block_number().await?;
tracing::info!(block_number, "latest block number");

let rpc_refetch_block_number =
block_number.saturating_sub(self.settings.past_rpc_logs_indexer.block_range as u64);
if self.settings.past_db_logs_indexer.enabled {
@@ -192,17 +200,28 @@ impl<P: Provider, L: IndexerLogic + Sync> Indexer<P, L> {
from_block,
to_block,
)
.await?;

stream_jobs.push(Box::pin(missed_txs.map(Job::from)));
.await?
.map(Job::from);

stream_jobs.push(Box::pin(missed_txs.do_after(self.tx.send(
IndexerStatusMessage::new(
L::VERSION,
EntryPointIndexerStatusMessage::PastDbLogsIndexingFinished,
),
))));
}

if self.settings.past_rpc_logs_indexer.enabled {
let jobs = self
.fetch_jobs_for_block_range(rpc_refetch_block_number + 1, block_number)
.await?;

stream_jobs.push(Box::pin(stream::iter(jobs)));
stream_jobs.push(Box::pin(stream::iter(jobs).do_after(self.tx.send(
IndexerStatusMessage::new(
L::VERSION,
EntryPointIndexerStatusMessage::PastRpcLogsIndexingFinished,
),
))));
}

let cache_size =
@@ -224,6 +243,13 @@ impl<P: Provider, L: IndexerLogic + Sync> Indexer<P, L> {
})
.filter_map(|tx_hash| async move { tx_hash });

self.tx
.send(IndexerStatusMessage::new(
L::VERSION,
EntryPointIndexerStatusMessage::IndexerStarted,
))
.await?;

stream_txs
.map(Ok)
.try_for_each_concurrent(Some(self.settings.concurrency as usize), |tx| async move {
@@ -279,26 +305,40 @@ impl<P: Provider, L: IndexerLogic + Sync> Indexer<P, L> {
Ok(jobs)
}

fn poll_for_jobs(&self) -> impl Stream<Item = Job> + '_ {
repeat_with(|| async {
sleep(self.settings.realtime.polling_interval).await;
tracing::debug!("fetching latest block number");
let block_number = self.client.get_block_number().await?;
tracing::info!(block_number, "latest block number");
fn poll_for_realtime_jobs(&self, start_block: u64) -> impl Stream<Item = Job> + '_ {
unfold(
(start_block, start_block),
move |(from_block, current_block)| async move {
async {
if current_block <= from_block {
sleep(self.settings.realtime.polling_interval).await;
tracing::debug!("fetching latest block number");
let current_block = self.client.get_block_number().await?;
tracing::info!(current_block, "latest block number");
return Ok((vec![], (from_block, current_block)));
}

let from_block =
block_number.saturating_sub(self.settings.realtime.polling_block_range as u64);
let jobs = self
.fetch_jobs_for_block_range(from_block, block_number)
.await?;
let from_block = from_block
.saturating_sub(self.settings.realtime.polling_block_range as u64);
let to_block = (from_block + self.settings.realtime.max_block_range as u64)
.min(current_block);

Ok::<Vec<Job>, TransportError>(jobs)
})
.filter_map(|fut| async {
fut.await
.map_err(|err| tracing::error!(error = ?err, "failed to poll for logs"))
.ok()
})
let jobs = self
.fetch_jobs_for_block_range(from_block, to_block)
.await?;

Ok((jobs, (to_block + 1, current_block)))
}
.await
.map_or_else(
|err: TransportError| {
tracing::error!(error = ?err, "failed to poll for logs");
Some((vec![], (from_block, current_block)))
},
Some,
)
},
)
.flat_map(stream::iter)
}
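
The rewritten `poll_for_realtime_jobs` replaces the old `repeat_with` loop with `futures::stream::unfold`: the stream state is `(from_block, current_block)`; when the indexer has caught up it sleeps and refreshes the chain head, otherwise it fetches a bounded block range and advances `from_block`, so no blocks are skipped between polls. The sketch below is a simplified, self-contained version of that pattern; `get_block_number` and `fetch_jobs` are hypothetical stand-ins for the RPC calls, and the error handling of the real code is omitted.

```rust
// Simplified sketch of the unfold-based polling stream; get_block_number and
// fetch_jobs stand in for the RPC calls, and error handling is omitted.
use futures::{stream, Stream, StreamExt};
use std::time::Duration;
use tokio::time::sleep;

async fn get_block_number() -> u64 {
    105 // pretend the chain head is at block 105
}

async fn fetch_jobs(from: u64, to: u64) -> Vec<u64> {
    (from..=to).collect() // pretend every block yields one job
}

fn poll_jobs(start_block: u64, max_range: u64) -> impl Stream<Item = u64> {
    stream::unfold(
        (start_block, start_block),
        move |(from_block, current_block)| async move {
            if current_block <= from_block {
                // Caught up with the head: wait, then refresh the chain head.
                sleep(Duration::from_millis(10)).await;
                let current_block = get_block_number().await;
                return Some((vec![], (from_block, current_block)));
            }
            // Behind the head: bound the block range indexed per iteration.
            let to_block = (from_block + max_range).min(current_block);
            let jobs = fetch_jobs(from_block, to_block).await;
            Some((jobs, (to_block + 1, current_block)))
        },
    )
    .flat_map(stream::iter)
}

#[tokio::main]
async fn main() {
    // Take a few jobs from the (otherwise endless) polling stream.
    let jobs: Vec<u64> = poll_jobs(100, 2).take(5).collect().await;
    println!("{jobs:?}"); // [100, 101, 102, 103, 104]
}
```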

@@ -398,6 +438,21 @@ impl<P: Provider, L: IndexerLogic + Sync> Indexer<P, L> {
}
}

trait StreamEndHook: Stream + Sized
where
Self::Item: Default,
{
fn do_after<Fut: Future>(self, f: Fut) -> impl Stream<Item = Self::Item> {
self.chain(
f.into_stream()
.map(|_| Self::Item::default())
.filter(|_| async { false }),
)
}
}

impl<S: Stream> StreamEndHook for S where S::Item: Default {}

#[cfg(test)]
mod tests {
use super::*;
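
The `StreamEndHook::do_after` helper added above chains a future onto the end of a stream: once the stream is exhausted the future runs, its output is mapped to `Self::Item::default()` (hence the `Default` bound) and immediately filtered out, so the item type is unchanged and no extra item is ever observed. Below is a small usage sketch, with the trait body copied from this diff and an async block that merely prints standing in for the status-channel send.

```rust
// Usage sketch of the do_after end-of-stream hook; the trait body is the one
// added in this diff, the async block stands in for the status-channel send.
use futures::{future::FutureExt, stream, Stream, StreamExt};
use std::future::Future;

trait StreamEndHook: Stream + Sized
where
    Self::Item: Default,
{
    fn do_after<Fut: Future>(self, f: Fut) -> impl Stream<Item = Self::Item> {
        self.chain(
            // Run `f` once the stream ends, then drop its placeholder item.
            f.into_stream()
                .map(|_| Self::Item::default())
                .filter(|_| async { false }),
        )
    }
}

impl<S: Stream> StreamEndHook for S where S::Item: Default {}

#[tokio::main]
async fn main() {
    let items: Vec<u32> = stream::iter([1, 2, 3])
        .do_after(async { println!("stream finished, send status message here") })
        .collect()
        .await;
    // The hook's output never shows up as a stream item.
    assert_eq!(items, vec![1, 2, 3]);
}
```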
Expand Down Expand Up @@ -432,11 +487,13 @@ mod tests {
let tx_hash = b256!("f9f60f6dc99663c6ce4912ef92fe6a122bb90585e47b5f213efca1705be26d6e");
let entry_point = EntrypointsSettings::default().v06_entry_point;

let (tx, _) = mpsc::channel(100);
let indexer = Indexer::new(
client,
db.clone(),
Default::default(),
v06::IndexerV06 { entry_point },
tx,
);
indexer
.handle_tx(tx_hash, TraceClient::Trace)
@@ -497,11 +554,13 @@ mod tests {
let entry_point = EntrypointsSettings::default().v06_entry_point;
let op_hash = b256!("e5df829d25b3b0a043a658eb460cf74898eb0ad72a526dba0cd509ed2b83f796");

let (tx, _) = mpsc::channel(100);
let indexer = Indexer::new(
client,
db.clone(),
Default::default(),
v06::IndexerV06 { entry_point },
tx,
);
for trace_client in [TraceClient::Debug, TraceClient::Trace] {
entity::user_operations::Entity::delete_by_id(op_hash.to_vec())
Expand Down Expand Up @@ -565,11 +624,13 @@ mod tests {
let tx_hash = b256!("4a6702f8ef5b7754f5b54dfb00ccba181603e3a6fff77c93e7d0d40148f09ad0");
let entry_point = EntrypointsSettings::default().v07_entry_point;

let (tx, _) = mpsc::channel(100);
let indexer = Indexer::new(
client,
db.clone(),
Default::default(),
v07::IndexerV07 { entry_point },
tx,
);
indexer
.handle_tx(tx_hash, TraceClient::Trace)
1 change: 1 addition & 0 deletions user-ops-indexer/user-ops-indexer-logic/src/indexer/mod.rs
@@ -2,6 +2,7 @@ mod base_indexer;
pub mod common;
pub mod rpc_utils;
pub mod settings;
pub mod status;
pub mod v06;
pub mod v07;

