Skip to content

Commit

Permalink
Add support for multiple builder and block selection
Browse files Browse the repository at this point in the history
  • Loading branch information
avalonche committed Oct 25, 2024
1 parent bbdab15 commit a2ba050
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 73 deletions.
37 changes: 19 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ hyper-util = { version = "0.1", features = ["full"] }
serde_json = "1.0.96"
reth-rpc-layer = { git = "https://github.com/paradigmxyz/reth.git", tag = "v1.0.7" }
reth-optimism-payload-builder = { git = "https://github.com/paradigmxyz/reth.git", tag = "v1.0.7", features = ["optimism"] }
futures = "0.3.31"

[dev-dependencies]
anyhow = "1.0"
Expand Down
9 changes: 5 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use tracing_subscriber::EnvFilter;
mod error;
mod proxy;
mod server;
mod selector;

#[derive(Parser, Debug)]
#[clap(author, version, about)]
Expand Down Expand Up @@ -45,7 +46,7 @@ struct Args {

/// URL of the builder execution engine
#[arg(long, env)]
builder_url: String,
builder_urls: Vec<String>,

/// Use the proposer to sync the builder node
#[arg(long, env, default_value = "false")]
Expand Down Expand Up @@ -112,12 +113,12 @@ async fn main() -> Result<()> {
// Initialize the l2 client
let l2_client = create_client(&args.l2_url, jwt_secret)?;

// Initialize the builder client
let builder_client = create_client(&args.builder_url, builder_jwt_secret)?;
// Initialize the builder clients
let builder_clients = args.builder_urls.iter().map(|url| create_client(url, builder_jwt_secret)).collect::<Result<Vec<_>>>()?;

let eth_engine_api = EthEngineApi::new(
Arc::new(l2_client),
Arc::new(builder_client),
builder_clients.iter().map(|c| Arc::new(c.clone())).collect(),
args.boost_sync,
);
let mut module: RpcModule<()> = RpcModule::new(());
Expand Down
28 changes: 28 additions & 0 deletions src/selector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use jsonrpsee::core::ClientError;
use op_alloy_rpc_types_engine::OptimismExecutionPayloadEnvelopeV3;

// Define a trait for choosing a payload
pub trait PayloadSelector {
fn select_payload(
&self,
local_payload: Result<OptimismExecutionPayloadEnvelopeV3, ClientError>,
builder_payloads: Vec<Result<OptimismExecutionPayloadEnvelopeV3, ClientError>>,
) -> Result<OptimismExecutionPayloadEnvelopeV3, ClientError>;
}

pub struct DefaultPayloadSelector;

impl PayloadSelector for DefaultPayloadSelector {
fn select_payload(
&self,
local_payload: Result<OptimismExecutionPayloadEnvelopeV3, ClientError>,
builder_payloads: Vec<Result<OptimismExecutionPayloadEnvelopeV3, ClientError>>,
) -> Result<OptimismExecutionPayloadEnvelopeV3, ClientError> {
builder_payloads
.iter()
.filter_map(|payload| payload.as_ref().ok())
.max_by_key(|p| p.block_value)
.map(|p| Ok(p.clone()))
.unwrap_or(local_payload)
}
}
117 changes: 66 additions & 51 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use alloy_rpc_types_engine::{
ExecutionPayload, ExecutionPayloadV3, ForkchoiceState, ForkchoiceUpdated, PayloadId,
PayloadStatus,
};
use futures::future::join_all;
use jsonrpsee::core::{async_trait, ClientError, RpcResult};
use jsonrpsee::http_client::transport::HttpBackend;
use jsonrpsee::http_client::HttpClient;
Expand All @@ -16,6 +17,8 @@ use reth_rpc_layer::AuthClientService;
use std::sync::Arc;
use tracing::{error, info};

use crate::selector::{DefaultPayloadSelector, PayloadSelector};

#[rpc(server, client, namespace = "engine")]
pub trait EngineApi {
#[method(name = "forkchoiceUpdatedV3")]
Expand All @@ -42,20 +45,22 @@ pub trait EngineApi {

pub struct EthEngineApi<S = AuthClientService<HttpBackend>> {
l2_client: Arc<HttpClient<S>>,
builder_client: Arc<HttpClient<S>>,
builder_clients: Vec<Arc<HttpClient<S>>>,
payload_selector: Arc<dyn PayloadSelector + Send + Sync>,
boost_sync: bool,
}

impl<S> EthEngineApi<S> {
pub fn new(
l2_client: Arc<HttpClient<S>>,
builder_client: Arc<HttpClient<S>>,
builder_clients: Vec<Arc<HttpClient<S>>>,
boost_sync: bool,
) -> Self {
Self {
l2_client,
builder_client,
builder_clients,
boost_sync,
payload_selector: Arc::new(DefaultPayloadSelector),
}
}
}
Expand Down Expand Up @@ -85,11 +90,12 @@ impl EngineApiServer for EthEngineApi {
};

if should_send_to_builder {
// async call to builder to trigger payload building and sync
let builder = self.builder_client.clone();
let attr = payload_attributes.clone();
tokio::spawn(async move {
builder.fork_choice_updated_v3(fork_choice_state, attr).await.map(|response| {
// async call to each builder to trigger payload building and sync
for builder in self.builder_clients.iter() {
let builder = builder.clone();
let attr = payload_attributes.clone();
tokio::spawn(async move {
builder.fork_choice_updated_v3(fork_choice_state, attr).await.map(|response| {
let payload_id_str = response.payload_id.map(|id| id.to_string()).unwrap_or_default();
if response.is_invalid() {
error!(message = "builder rejected fork_choice_updated_v3 with attributes", "payload_id" = payload_id_str, "validation_error" = %response.payload_status.status);
Expand All @@ -99,7 +105,8 @@ impl EngineApiServer for EthEngineApi {
}).map_err(|e| {
error!(message = "error calling fork_choice_updated_v3 to builder", "error" = %e, "head_block_hash" = %fork_choice_state.head_block_hash);
})
});
});
}
} else {
info!(message = "no payload attributes provided or no_tx_pool is set", "head_block_hash" = %fork_choice_state.head_block_hash);
}
Expand All @@ -126,47 +133,53 @@ impl EngineApiServer for EthEngineApi {
) -> RpcResult<OptimismExecutionPayloadEnvelopeV3> {
info!(message = "received get_payload_v3", "payload_id" = %payload_id);
let l2_client_future = self.l2_client.get_payload_v3(payload_id);
let builder_client_future = Box::pin(async {
let payload = self.builder_client.get_payload_v3(payload_id).await.map_err(|e| {
error!(message = "error calling get_payload_v3 from builder", "error" = %e, "payload_id" = %payload_id);
e
})?;

info!(message = "received payload from builder", "payload_id" = %payload_id, "block_hash" = %payload.as_v1_payload().block_hash);

// Send the payload to the local execution engine with engine_newPayload to validate the block from the builder.
// Otherwise, we do not want to risk the network to a halt since op-node will not be able to propose the block.
// If validation fails, return the local block since that one has already been validated.
let payload_status = self.l2_client.new_payload_v3(payload.execution_payload.clone(), vec![], payload.parent_beacon_block_root).await.map_err(|e| {
error!(message = "error calling new_payload_v3 to validate builder payload", "error" = %e, "payload_id" = %payload_id);
e
})?;
if payload_status.is_invalid() {
let builder_client_futures = self.builder_clients.iter().map(|builder| {
let builder = builder.clone();
Box::pin(async move {
let payload = builder.get_payload_v3(payload_id).await.map_err(|e| {
error!(message = "error calling get_payload_v3 from builder", "error" = %e, "payload_id" = %payload_id);
e
})?;

info!(message = "received payload from builder", "payload_id" = %payload_id, "block_hash" = %payload.as_v1_payload().block_hash);

// Send the payload to the local execution engine with engine_newPayload to validate the block from the builder.
// Otherwise, we do not want to risk the network to a halt since op-node will not be able to propose the block.
// If validation fails, return the local block since that one has already been validated.
let payload_status = self.l2_client.new_payload_v3(payload.execution_payload.clone(), vec![], payload.parent_beacon_block_root).await.map_err(|e| {
error!(message = "error calling new_payload_v3 to validate builder payload", "error" = %e, "payload_id" = %payload_id);
e
})?;
if payload_status.is_invalid() {
error!(message = "builder payload was not valid", "payload_status" = %payload_status.status, "payload_id" = %payload_id);
Err(ClientError::Call(ErrorObject::owned(
INVALID_REQUEST_CODE,
"Builder payload was not valid",
None::<String>,
)))
} else {
info!(message = "received payload status from local execution engine validating builder payload", "payload_id" = %payload_id);
Ok(payload)
}
});

let (l2_payload, builder_payload) = tokio::join!(l2_client_future, builder_client_future);

builder_payload.or(l2_payload).map_err(|e| match e {
ClientError::Call(err) => err, // Already an ErrorObjectOwned, so just return it
other_error => {
error!(
message = "error calling get_payload_v3",
"error" = %other_error,
"payload_id" = %payload_id
);
ErrorCode::InternalError.into()
}
})
)))
} else {
info!(message = "received payload status from local execution engine validating builder payload", "payload_id" = %payload_id);
Ok(payload)
}
})
}).collect::<Vec<_>>();

let (l2_payload, builder_payloads) =
tokio::join!(l2_client_future, join_all(builder_client_futures));

self.payload_selector
.select_payload(l2_payload, builder_payloads)
.map_err(|e| match e {
ClientError::Call(err) => err, // Already an ErrorObjectOwned, so just return it
other_error => {
error!(
message = "error calling get_payload_v3",
"error" = %other_error,
"payload_id" = %payload_id
);
ErrorCode::InternalError.into()
}
})
}

async fn new_payload_v3(
Expand All @@ -180,11 +193,12 @@ impl EngineApiServer for EthEngineApi {

// async call to builder to sync the builder node
if self.boost_sync {
let builder = self.builder_client.clone();
let builder_payload = payload.clone();
let builder_versioned_hashes = versioned_hashes.clone();
tokio::spawn(async move {
builder.new_payload_v3(builder_payload, builder_versioned_hashes, parent_beacon_block_root).await
for builder in self.builder_clients.iter() {
let builder = builder.clone();
let builder_payload = payload.clone();
let builder_versioned_hashes = versioned_hashes.clone();
tokio::spawn(async move {
builder.new_payload_v3(builder_payload, builder_versioned_hashes, parent_beacon_block_root).await
.map(|response: PayloadStatus| {
if response.is_invalid() {
error!(message = "builder rejected new_payload_v3", "block_hash" = %block_hash);
Expand All @@ -195,7 +209,8 @@ impl EngineApiServer for EthEngineApi {
error!(message = "error calling new_payload_v3 to builder", "error" = %e, "block_hash" = %block_hash);
e
})
});
});
}
}

self.l2_client
Expand Down

0 comments on commit a2ba050

Please sign in to comment.