Skip to content

Commit

Permalink
chore: graceful shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
namn-grg committed Jan 19, 2025
1 parent feef2c2 commit aa11beb
Showing 1 changed file with 46 additions and 19 deletions.
65 changes: 46 additions & 19 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,25 @@
use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};

use clap::{arg, ArgGroup, Parser};
use dotenv::dotenv;
use error::Error;
use http::Uri;
use jsonrpsee::http_client::transport::HttpBackend;
use jsonrpsee::http_client::{HttpClient, HttpClientBuilder};
use jsonrpsee::server::Server;
use jsonrpsee::RpcModule;
use jsonrpsee::{
http_client::{transport::HttpBackend, HttpClient, HttpClientBuilder},
server::Server,
RpcModule,
};
use metrics::ServerMetrics;
use metrics_exporter_prometheus::PrometheusBuilder;
use metrics_util::layers::{PrefixLayer, Stack};
use opentelemetry::global;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use opentelemetry_sdk::trace::Config;
use opentelemetry_sdk::Resource;
use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::Config, Resource};
use proxy::ProxyLayer;
use reth_rpc_layer::{AuthClientLayer, AuthClientService, JwtSecret};
use server::{EngineApiServer, EthEngineApi, HttpClientWrapper};
use std::sync::Arc;
use std::time::Duration;
use std::{net::SocketAddr, path::PathBuf};
use tracing::error;
use tracing::{info, Level};
use tokio::signal::unix::{signal as unix_signal, SignalKind};
use tracing::{error, info, Level};
use tracing_subscriber::EnvFilter;

mod error;
Expand Down Expand Up @@ -123,7 +121,7 @@ async fn main() -> Result<()> {
(None, None)
};

// telemetry setup
// Telemetry setup
if args.tracing {
init_tracing(&args.otlp_endpoint);
}
Expand Down Expand Up @@ -163,6 +161,7 @@ async fn main() -> Result<()> {
let builder_client =
create_client(&args.builder_url, builder_jwt_secret, args.builder_timeout)?;

// Construct the RPC module
let eth_engine_api = EthEngineApi::new(
Arc::new(l2_client),
Arc::new(builder_client),
Expand All @@ -174,14 +173,15 @@ async fn main() -> Result<()> {
.merge(eth_engine_api.into_rpc())
.map_err(|e| Error::InitRPCServer(e.to_string()))?;

// server setup
// Build and start the server
info!("Starting server on :{}", args.rpc_port);
let service_builder = tower::ServiceBuilder::new().layer(ProxyLayer::new(
args.l2_url
.parse::<Uri>()
.map_err(|e| Error::InvalidArgs(e.to_string()))?,
handler,
));

let server = Server::builder()
.set_http_middleware(service_builder)
.build(
Expand All @@ -191,8 +191,34 @@ async fn main() -> Result<()> {
)
.await
.map_err(|e| Error::InitRPCServer(e.to_string()))?;

let handle = server.start(module);
handle.stopped().await;
let stop_handle = handle.clone();

// Graceful shutdown handling:
// 1. The RPC server finishing by itself (it errors out)
// 2. OS signal (SIGINT or SIGTERM)

// For Unix systems, you can capture both SIGINT and SIGTERM:
let mut sigint =
unix_signal(SignalKind::interrupt()).map_err(|e| Error::InitRPCServer(e.to_string()))?;
let mut sigterm =
unix_signal(SignalKind::terminate()).map_err(|e| Error::InitRPCServer(e.to_string()))?;

tokio::select! {
_ = handle.stopped() => {
// The server has already shut down by itself
info!("Server stopped");
}
_ = sigint.recv() => {
info!("Received SIGINT, shutting down gracefully...");
let _ = stop_handle.stop();
}
_ = sigterm.recv() => {
info!("Received SIGTERM, shutting down gracefully...");
let _ = stop_handle.stop();
}
}

Ok(())
}
Expand All @@ -211,12 +237,13 @@ fn create_client(
.request_timeout(Duration::from_millis(timeout))
.build(url)
.map_err(|e| Error::InitRPCClient(e.to_string()))?;

Ok(HttpClientWrapper::new(client, url.to_string()))
}

fn init_tracing(endpoint: &str) {
global::set_text_map_propagator(TraceContextPropagator::new());
let provider = opentelemetry_otlp::new_pipeline()
let pipeline = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter()
Expand All @@ -225,9 +252,9 @@ fn init_tracing(endpoint: &str) {
)
.with_trace_config(Config::default().with_resource(Resource::new(vec![
opentelemetry::KeyValue::new("service.name", "rollup-boost"),
])))
.install_batch(opentelemetry_sdk::runtime::Tokio);
match provider {
])));

match pipeline.install_batch(opentelemetry_sdk::runtime::Tokio) {
Ok(provider) => {
let _ = global::set_tracer_provider(provider);
}
Expand Down

0 comments on commit aa11beb

Please sign in to comment.