Skip to content

Commit

Permalink
refactor:
Browse files Browse the repository at this point in the history
- Modularize env and context initialization logic
- Include db manager creation logic into struct
  • Loading branch information
PJColombo committed Mar 19, 2023
1 parent 7a3e3d7 commit 509c7f9
Show file tree
Hide file tree
Showing 10 changed files with 129 additions and 78 deletions.
6 changes: 4 additions & 2 deletions .env.example
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
MONGODB_URI=
MONGODB_DB=
CONNECTION_URI=
DB_NAME=

# Optional variables
EXECUTION_NODE_URL=
BEACON_NODE_RPC=
LOGGER=
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ edition = "2021"
[dependencies]
async-trait = "0.1.66"
dotenv = "0.15.0"
envy = "0.4.2"
ethers = "1.0.2"
futures = "0.3.25"
hex = "0.4.3"
Expand Down
56 changes: 56 additions & 0 deletions src/context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use ethers::prelude::*;
use serde::Deserialize;
use std::error;

use crate::{
beacon_chain::BeaconChainAPI,
db::{blob_db_manager::DBManager, mongodb::MongoDBManager},
};

fn default_execution_node_rpc() -> String {
"http://localhost:8545".to_string()
}

fn default_beacon_node_rpc() -> String {
"http://localhost:3500".to_string()
}

fn default_logger() -> String {
"indexer".to_string()
}

#[derive(Deserialize, Debug)]
struct Environment {
connection_uri: String,
db_name: String,
#[serde(default = "default_execution_node_rpc")]
execution_node_rpc: String,
#[serde(default = "default_beacon_node_rpc")]
beacon_node_rpc: String,
#[serde(default = "default_logger")]
logger: String,
}

pub struct Context {
pub beacon_api: BeaconChainAPI,
pub db_manager: MongoDBManager,
pub provider: Provider<Http>,
pub logger: String,
}

pub async fn create_context() -> Result<Context, Box<dyn error::Error>> {
let Environment {
beacon_node_rpc,
connection_uri,
db_name,
execution_node_rpc,
logger,
} = envy::from_env::<Environment>()?;

Ok(Context {
beacon_api: BeaconChainAPI::new(beacon_node_rpc),
db_manager: MongoDBManager::new(&connection_uri, &db_name).await?,
provider: Provider::<Http>::try_from(execution_node_rpc)?,
logger,
})
}
4 changes: 4 additions & 0 deletions src/db/blob_db_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ use super::types::IndexerMetadata;
pub trait DBManager {
type Options;

async fn new(connection_uri: &String, db_name: &String) -> Result<Self, Box<dyn Error>>
where
Self: Sized;

async fn commit_transaction(
&mut self,
options: Option<Self::Options>,
Expand Down
29 changes: 14 additions & 15 deletions src/db/mongodb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,24 @@ pub struct MongoDBManagerOptions {

const INDEXER_METADATA_ID: &str = "indexer_metadata";

pub async fn connect() -> Result<MongoDBManager, Box<dyn Error>> {
let connection_url = std::env::var("MONGODB_URI").unwrap();
let database_name = std::env::var("MONGODB_DB").unwrap();

let mut client_options = ClientOptions::parse(connection_url).await?;
#[async_trait]
impl DBManager for MongoDBManager {
type Options = MongoDBManagerOptions;

client_options.app_name = Some("Blobscan".to_string());
async fn new(connection_uri: &String, db_name: &String) -> Result<Self, Box<dyn Error>>
where
Self: Sized,
{
let mut client_options = ClientOptions::parse(connection_uri).await?;

let client = Client::with_options(client_options)?;
let session = client.start_session(None).await?;
let db = client.database(&database_name);
client_options.app_name = Some("Blobscan".to_string());

Ok(MongoDBManager { session, db })
}
let client = Client::with_options(client_options)?;
let session = client.start_session(None).await?;
let db = client.database(db_name);

#[async_trait]
impl DBManager for MongoDBManager {
type Options = MongoDBManagerOptions;
Ok(MongoDBManager { session, db })
}

async fn commit_transaction(
&mut self,
Expand All @@ -62,7 +62,6 @@ impl DBManager for MongoDBManager {
let result = self.session.commit_transaction().await;

if let Err(ref error) = result {
println!("Commit result: {:?}", error);
if error.contains_label(UNKNOWN_TRANSACTION_COMMIT_RESULT) {
continue;
}
Expand Down
30 changes: 9 additions & 21 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
use std::{
env,
error::{self},
thread,
time::Duration,
};

use beacon_chain::BeaconChainAPI;
use ethers::prelude::*;
use slots::{process_slots, Config as SlotConfig};
use context::create_context;
use slots::process_slots;

use crate::db::{blob_db_manager::DBManager, mongodb::connect};
use crate::db::blob_db_manager::DBManager;

mod beacon_chain;
mod context;
mod db;
mod slots;
mod utils;
Expand All @@ -20,35 +19,24 @@ type StdErr = Box<dyn error::Error>;

#[tokio::main]
async fn main() -> Result<(), StdErr> {
dotenv::dotenv()?;

let execution_node_rpc = env::var("EXECUTION_NODE_RPC")?;
let beacon_node_rpc = env::var("BEACON_NODE_RPC")?;
dotenv::dotenv().expect("Failed to read .env file");

log4rs::init_file("log4rs.yml", Default::default()).unwrap();

let beacon_api = BeaconChainAPI::new(beacon_node_rpc);
let db_manager = connect().await?;
let provider = Provider::<Http>::try_from(execution_node_rpc)?;

let mut config = SlotConfig {
beacon_api,
db_manager,
provider,
};
let mut context = create_context().await?;

let mut current_slot = match config.db_manager.read_metadata(None).await? {
let mut current_slot = match context.db_manager.read_metadata(None).await? {
Some(metadata) => metadata.last_slot + 1,
None => 0,
};

loop {
match config.beacon_api.get_block(None).await? {
match context.beacon_api.get_block(None).await? {
Some(latest_beacon_block) => {
let latest_slot: u32 = latest_beacon_block.slot.parse()?;

if current_slot < latest_slot {
process_slots(current_slot, latest_slot, &mut config).await;
process_slots(current_slot, latest_slot, &mut context).await;

current_slot = latest_slot;
}
Expand Down
69 changes: 31 additions & 38 deletions src/slots/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,38 +5,25 @@ use futures::future::join_all;
use log::{error, info};

use crate::{
beacon_chain::BeaconChainAPI,
db::{
blob_db_manager::DBManager,
mongodb::{MongoDBManager, MongoDBManagerOptions},
types::Blob,
},
utils::{
logs::INDEXER_LOGGER,
web3::{calculate_versioned_hash, get_eip_4844_tx, get_tx_versioned_hashes},
},
context::Context,
db::{blob_db_manager::DBManager, mongodb::MongoDBManagerOptions, types::Blob},
utils::web3::{calculate_versioned_hash, get_eip_4844_tx, get_tx_versioned_hashes},
};

type StdErr = Box<dyn error::Error>;

pub struct Config {
pub db_manager: MongoDBManager,
pub beacon_api: BeaconChainAPI,
pub provider: Provider<Http>,
}

pub async fn process_slots(start_slot: u32, end_slot: u32, config: &mut Config) {
pub async fn process_slots(start_slot: u32, end_slot: u32, context: &mut Context) {
let mut current_slot = start_slot;

while current_slot < end_slot {
let result = process_slot(current_slot, config).await;
let result = process_slot(current_slot, context).await;

// TODO: implement exponential backoff for proper error handling. If X intents have been made, then notify and stop process
if let Err(e) = result {
save_slot(current_slot, config).await;
save_slot(current_slot - 1, context).await;

error!(
target: INDEXER_LOGGER,
target: context.logger.as_str(),
"[Slot {}] Couldn't process slot: {}", current_slot, e
);

Expand All @@ -46,20 +33,23 @@ pub async fn process_slots(start_slot: u32, end_slot: u32, config: &mut Config)
current_slot = current_slot + 1;
}

save_slot(current_slot, config).await
save_slot(current_slot, context).await
}

async fn process_slot(slot: u32, config: &mut Config) -> Result<(), StdErr> {
let provider = &config.provider;
let db_manager = &mut config.db_manager;
let beacon_api = &config.beacon_api;
async fn process_slot(slot: u32, context: &mut Context) -> Result<(), StdErr> {
let Context {
beacon_api,
db_manager,
provider,
logger,
} = context;

let start = Instant::now();
let beacon_block = match beacon_api.get_block(Some(slot)).await? {
Some(block) => block,
None => {
info!(
target: INDEXER_LOGGER,
target: logger,
"[Slot {}] Skipping as there is no beacon block", slot
);

Expand All @@ -71,7 +61,7 @@ async fn process_slot(slot: u32, config: &mut Config) -> Result<(), StdErr> {
Some(payload) => payload,
None => {
info!(
target: INDEXER_LOGGER,
target: logger,
"[Slot {}] Skipping as beacon block doesn't contain execution payload", slot
);

Expand All @@ -83,7 +73,7 @@ async fn process_slot(slot: u32, config: &mut Config) -> Result<(), StdErr> {
Some(commitments) => commitments,
None => {
info!(
target: INDEXER_LOGGER,
target: logger,
"[Slot {}] Skipping as beacon block doesn't contain blob kzg commitments", slot
);

Expand All @@ -93,7 +83,7 @@ async fn process_slot(slot: u32, config: &mut Config) -> Result<(), StdErr> {
let execution_block_hash = execution_payload.block_hash;
let execution_block_hash = H256::from_str(execution_block_hash.as_str())?;

let execution_block = match config.provider.get_block(execution_block_hash).await? {
let execution_block = match provider.get_block(execution_block_hash).await? {
Some(block) => block,
None => {
let error_msg = format!("Execution block {} not found", execution_block_hash);
Expand All @@ -120,7 +110,7 @@ async fn process_slot(slot: u32, config: &mut Config) -> Result<(), StdErr> {

if blob_txs.len() == 0 {
info!(
target: INDEXER_LOGGER,
target: logger,
"[Slot {}] Skipping as execution block doesn't contain blob txs", slot
);

Expand All @@ -131,7 +121,7 @@ async fn process_slot(slot: u32, config: &mut Config) -> Result<(), StdErr> {
Some(blobs_sidecar) => {
if blobs_sidecar.blobs.len() == 0 {
info!(
target: INDEXER_LOGGER,
target: logger,
"[Slot {}] Skipping as blobs sidecar is empty", slot
);

Expand All @@ -142,7 +132,7 @@ async fn process_slot(slot: u32, config: &mut Config) -> Result<(), StdErr> {
}
None => {
info!(
target: INDEXER_LOGGER,
target: logger,
"[Slot {}] Skipping as there is no blobs sidecar", slot
);

Expand Down Expand Up @@ -185,12 +175,12 @@ async fn process_slot(slot: u32, config: &mut Config) -> Result<(), StdErr> {
db_manager.insert_blob(blob, blob_tx.hash, None).await?;
}

config.db_manager.commit_transaction(None).await?;
db_manager.commit_transaction(None).await?;

let duration = start.elapsed();

info!(
target: INDEXER_LOGGER,
target: logger,
"[Slot {}] Blobs indexed correctly (elapsed time: {:?}s)",
slot,
duration.as_secs()
Expand All @@ -199,14 +189,17 @@ async fn process_slot(slot: u32, config: &mut Config) -> Result<(), StdErr> {
Ok(())
}

async fn save_slot(slot: u32, config: &mut Config) {
let result = config
.db_manager
async fn save_slot(slot: u32, context: &mut Context) {
let Context {
db_manager, logger, ..
} = context;

let result = db_manager
.update_last_slot(slot, Some(MongoDBManagerOptions { use_session: false }))
.await;

if let Err(e) = result {
error!(target: INDEXER_LOGGER, "Couldn't update last slot: {}", e);
error!(target: logger, "Couldn't update last slot: {}", e);
panic!();
}
}
1 change: 0 additions & 1 deletion src/utils/logs.rs

This file was deleted.

1 change: 0 additions & 1 deletion src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
pub mod logs;
pub mod web3;

0 comments on commit 509c7f9

Please sign in to comment.