From a2ba05038f2745176f40f51effae716db1ccce9f Mon Sep 17 00:00:00 2001 From: avalonche Date: Fri, 25 Oct 2024 17:35:13 +1100 Subject: [PATCH] Add support for multiple builder and block selection --- Cargo.lock | 37 +++++++-------- Cargo.toml | 1 + src/main.rs | 9 ++-- src/selector.rs | 28 ++++++++++++ src/server.rs | 117 +++++++++++++++++++++++++++--------------------- 5 files changed, 119 insertions(+), 73 deletions(-) create mode 100644 src/selector.rs diff --git a/Cargo.lock b/Cargo.lock index e0aa343..c92a029 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1733,9 +1733,9 @@ checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" [[package]] name = "futures" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" dependencies = [ "futures-channel", "futures-core", @@ -1748,9 +1748,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", "futures-sink", @@ -1758,15 +1758,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" [[package]] name = "futures-executor" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" dependencies = [ "futures-core", "futures-task", @@ -1775,15 +1775,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" [[package]] name = "futures-macro" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", @@ -1792,21 +1792,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" [[package]] name = "futures-task" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" [[package]] name = "futures-util" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures-channel", "futures-core", @@ -4572,6 +4572,7 @@ dependencies = [ "assert_cmd", "clap", "dotenv", + "futures", "http 1.1.0", "http-body 0.4.6", "http-body-util", diff --git a/Cargo.toml b/Cargo.toml index 7fc589e..9ffbd2d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ hyper-util = { version = "0.1", features = ["full"] } serde_json = "1.0.96" reth-rpc-layer = { git = "https://github.com/paradigmxyz/reth.git", tag = "v1.0.7" } reth-optimism-payload-builder = { git = "https://github.com/paradigmxyz/reth.git", tag = "v1.0.7", features = ["optimism"] } +futures = "0.3.31" [dev-dependencies] anyhow = "1.0" diff --git a/src/main.rs b/src/main.rs index 09137b3..e8ffdad 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,6 +18,7 @@ use tracing_subscriber::EnvFilter; mod error; mod proxy; mod server; +mod selector; #[derive(Parser, Debug)] #[clap(author, version, about)] @@ -45,7 +46,7 @@ struct Args { /// URL of the builder execution engine #[arg(long, env)] - builder_url: String, + builder_urls: Vec, /// Use the proposer to sync the builder node #[arg(long, env, default_value = "false")] @@ -112,12 +113,12 @@ async fn main() -> Result<()> { // Initialize the l2 client let l2_client = create_client(&args.l2_url, jwt_secret)?; - // Initialize the builder client - let builder_client = create_client(&args.builder_url, builder_jwt_secret)?; + // Initialize the builder clients + let builder_clients = args.builder_urls.iter().map(|url| create_client(url, builder_jwt_secret)).collect::>>()?; let eth_engine_api = EthEngineApi::new( Arc::new(l2_client), - Arc::new(builder_client), + builder_clients.iter().map(|c| Arc::new(c.clone())).collect(), args.boost_sync, ); let mut module: RpcModule<()> = RpcModule::new(()); diff --git a/src/selector.rs b/src/selector.rs new file mode 100644 index 0000000..1be0645 --- /dev/null +++ b/src/selector.rs @@ -0,0 +1,28 @@ +use jsonrpsee::core::ClientError; +use op_alloy_rpc_types_engine::OptimismExecutionPayloadEnvelopeV3; + +// Define a trait for choosing a payload +pub trait PayloadSelector { + fn select_payload( + &self, + local_payload: Result, + builder_payloads: Vec>, + ) -> Result; +} + +pub struct DefaultPayloadSelector; + +impl PayloadSelector for DefaultPayloadSelector { + fn select_payload( + &self, + local_payload: Result, + builder_payloads: Vec>, + ) -> Result { + builder_payloads + .iter() + .filter_map(|payload| payload.as_ref().ok()) + .max_by_key(|p| p.block_value) + .map(|p| Ok(p.clone())) + .unwrap_or(local_payload) + } +} \ No newline at end of file diff --git a/src/server.rs b/src/server.rs index 3190e2f..a0efe6b 100644 --- a/src/server.rs +++ b/src/server.rs @@ -3,6 +3,7 @@ use alloy_rpc_types_engine::{ ExecutionPayload, ExecutionPayloadV3, ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus, }; +use futures::future::join_all; use jsonrpsee::core::{async_trait, ClientError, RpcResult}; use jsonrpsee::http_client::transport::HttpBackend; use jsonrpsee::http_client::HttpClient; @@ -16,6 +17,8 @@ use reth_rpc_layer::AuthClientService; use std::sync::Arc; use tracing::{error, info}; +use crate::selector::{DefaultPayloadSelector, PayloadSelector}; + #[rpc(server, client, namespace = "engine")] pub trait EngineApi { #[method(name = "forkchoiceUpdatedV3")] @@ -42,20 +45,22 @@ pub trait EngineApi { pub struct EthEngineApi> { l2_client: Arc>, - builder_client: Arc>, + builder_clients: Vec>>, + payload_selector: Arc, boost_sync: bool, } impl EthEngineApi { pub fn new( l2_client: Arc>, - builder_client: Arc>, + builder_clients: Vec>>, boost_sync: bool, ) -> Self { Self { l2_client, - builder_client, + builder_clients, boost_sync, + payload_selector: Arc::new(DefaultPayloadSelector), } } } @@ -85,11 +90,12 @@ impl EngineApiServer for EthEngineApi { }; if should_send_to_builder { - // async call to builder to trigger payload building and sync - let builder = self.builder_client.clone(); - let attr = payload_attributes.clone(); - tokio::spawn(async move { - builder.fork_choice_updated_v3(fork_choice_state, attr).await.map(|response| { + // async call to each builder to trigger payload building and sync + for builder in self.builder_clients.iter() { + let builder = builder.clone(); + let attr = payload_attributes.clone(); + tokio::spawn(async move { + builder.fork_choice_updated_v3(fork_choice_state, attr).await.map(|response| { let payload_id_str = response.payload_id.map(|id| id.to_string()).unwrap_or_default(); if response.is_invalid() { error!(message = "builder rejected fork_choice_updated_v3 with attributes", "payload_id" = payload_id_str, "validation_error" = %response.payload_status.status); @@ -99,7 +105,8 @@ impl EngineApiServer for EthEngineApi { }).map_err(|e| { error!(message = "error calling fork_choice_updated_v3 to builder", "error" = %e, "head_block_hash" = %fork_choice_state.head_block_hash); }) - }); + }); + } } else { info!(message = "no payload attributes provided or no_tx_pool is set", "head_block_hash" = %fork_choice_state.head_block_hash); } @@ -126,47 +133,53 @@ impl EngineApiServer for EthEngineApi { ) -> RpcResult { info!(message = "received get_payload_v3", "payload_id" = %payload_id); let l2_client_future = self.l2_client.get_payload_v3(payload_id); - let builder_client_future = Box::pin(async { - let payload = self.builder_client.get_payload_v3(payload_id).await.map_err(|e| { - error!(message = "error calling get_payload_v3 from builder", "error" = %e, "payload_id" = %payload_id); - e - })?; - - info!(message = "received payload from builder", "payload_id" = %payload_id, "block_hash" = %payload.as_v1_payload().block_hash); - - // Send the payload to the local execution engine with engine_newPayload to validate the block from the builder. - // Otherwise, we do not want to risk the network to a halt since op-node will not be able to propose the block. - // If validation fails, return the local block since that one has already been validated. - let payload_status = self.l2_client.new_payload_v3(payload.execution_payload.clone(), vec![], payload.parent_beacon_block_root).await.map_err(|e| { - error!(message = "error calling new_payload_v3 to validate builder payload", "error" = %e, "payload_id" = %payload_id); - e - })?; - if payload_status.is_invalid() { + let builder_client_futures = self.builder_clients.iter().map(|builder| { + let builder = builder.clone(); + Box::pin(async move { + let payload = builder.get_payload_v3(payload_id).await.map_err(|e| { + error!(message = "error calling get_payload_v3 from builder", "error" = %e, "payload_id" = %payload_id); + e + })?; + + info!(message = "received payload from builder", "payload_id" = %payload_id, "block_hash" = %payload.as_v1_payload().block_hash); + + // Send the payload to the local execution engine with engine_newPayload to validate the block from the builder. + // Otherwise, we do not want to risk the network to a halt since op-node will not be able to propose the block. + // If validation fails, return the local block since that one has already been validated. + let payload_status = self.l2_client.new_payload_v3(payload.execution_payload.clone(), vec![], payload.parent_beacon_block_root).await.map_err(|e| { + error!(message = "error calling new_payload_v3 to validate builder payload", "error" = %e, "payload_id" = %payload_id); + e + })?; + if payload_status.is_invalid() { error!(message = "builder payload was not valid", "payload_status" = %payload_status.status, "payload_id" = %payload_id); Err(ClientError::Call(ErrorObject::owned( INVALID_REQUEST_CODE, "Builder payload was not valid", None::, - ))) - } else { - info!(message = "received payload status from local execution engine validating builder payload", "payload_id" = %payload_id); - Ok(payload) - } - }); - - let (l2_payload, builder_payload) = tokio::join!(l2_client_future, builder_client_future); - - builder_payload.or(l2_payload).map_err(|e| match e { - ClientError::Call(err) => err, // Already an ErrorObjectOwned, so just return it - other_error => { - error!( - message = "error calling get_payload_v3", - "error" = %other_error, - "payload_id" = %payload_id - ); - ErrorCode::InternalError.into() - } - }) + ))) + } else { + info!(message = "received payload status from local execution engine validating builder payload", "payload_id" = %payload_id); + Ok(payload) + } + }) + }).collect::>(); + + let (l2_payload, builder_payloads) = + tokio::join!(l2_client_future, join_all(builder_client_futures)); + + self.payload_selector + .select_payload(l2_payload, builder_payloads) + .map_err(|e| match e { + ClientError::Call(err) => err, // Already an ErrorObjectOwned, so just return it + other_error => { + error!( + message = "error calling get_payload_v3", + "error" = %other_error, + "payload_id" = %payload_id + ); + ErrorCode::InternalError.into() + } + }) } async fn new_payload_v3( @@ -180,11 +193,12 @@ impl EngineApiServer for EthEngineApi { // async call to builder to sync the builder node if self.boost_sync { - let builder = self.builder_client.clone(); - let builder_payload = payload.clone(); - let builder_versioned_hashes = versioned_hashes.clone(); - tokio::spawn(async move { - builder.new_payload_v3(builder_payload, builder_versioned_hashes, parent_beacon_block_root).await + for builder in self.builder_clients.iter() { + let builder = builder.clone(); + let builder_payload = payload.clone(); + let builder_versioned_hashes = versioned_hashes.clone(); + tokio::spawn(async move { + builder.new_payload_v3(builder_payload, builder_versioned_hashes, parent_beacon_block_root).await .map(|response: PayloadStatus| { if response.is_invalid() { error!(message = "builder rejected new_payload_v3", "block_hash" = %block_hash); @@ -195,7 +209,8 @@ impl EngineApiServer for EthEngineApi { error!(message = "error calling new_payload_v3 to builder", "error" = %e, "block_hash" = %block_hash); e }) - }); + }); + } } self.l2_client