Skip to content

Commit

Permalink
chore: graceful shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
namn-grg committed Jan 23, 2025
1 parent 81a9216 commit 0e6a7c4
Showing 1 changed file with 38 additions and 15 deletions.
53 changes: 38 additions & 15 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,30 @@
use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};

use clap::{arg, ArgGroup, Parser};
use dotenv::dotenv;
use error::Error;
use http::{StatusCode, Uri};
use hyper::service::service_fn;
use hyper::{server::conn::http1, Request, Response};
use hyper_util::rt::TokioIo;
use jsonrpsee::http_client::transport::HttpBackend;
use jsonrpsee::http_client::{HttpBody, HttpClient, HttpClientBuilder};
use jsonrpsee::server::Server;
use jsonrpsee::RpcModule;
use jsonrpsee::{
http_client::{transport::HttpBackend, HttpBody, HttpClient, HttpClientBuilder},
server::Server,
RpcModule,
};
use metrics::ServerMetrics;
use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
use metrics_util::layers::{PrefixLayer, Stack};
use opentelemetry::global;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use opentelemetry_sdk::trace::Config;
use opentelemetry_sdk::Resource;
use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::Config, Resource};
use proxy::ProxyLayer;
use reth_rpc_layer::{AuthClientLayer, AuthClientService, JwtSecret};
use server::{EngineApiServer, EthEngineApi, HttpClientWrapper};
use std::net::AddrParseError;
use std::sync::Arc;
use std::time::Duration;
use std::{net::SocketAddr, path::PathBuf};
use tokio::net::TcpListener;
use tracing::error;
use tracing::{info, Level};
use tokio::signal::unix::{signal as unix_signal, SignalKind};
use tracing::{error, info, Level};
use tracing_subscriber::EnvFilter;

mod error;
Expand Down Expand Up @@ -159,7 +157,7 @@ async fn main() -> Result<()> {
None
};

// telemetry setup
// Telemetry setup
if args.tracing {
init_tracing(&args.otlp_endpoint);
}
Expand Down Expand Up @@ -199,6 +197,7 @@ async fn main() -> Result<()> {
let builder_client =
create_client(&args.builder_url, builder_jwt_secret, args.builder_timeout)?;

// Construct the RPC module
let eth_engine_api = EthEngineApi::new(
Arc::new(l2_client),
Arc::new(builder_client),
Expand All @@ -210,13 +209,14 @@ async fn main() -> Result<()> {
.merge(eth_engine_api.into_rpc())
.map_err(|e| Error::InitRPCServer(e.to_string()))?;

// server setup
// Build and start the server
info!("Starting server on :{}", args.rpc_port);
let service_builder = tower::ServiceBuilder::new().layer(ProxyLayer::new(
args.l2_url
.parse::<Uri>()
.map_err(|e| Error::InvalidArgs(e.to_string()))?,
));

let server = Server::builder()
.set_http_middleware(service_builder)
.build(
Expand All @@ -226,8 +226,30 @@ async fn main() -> Result<()> {
)
.await
.map_err(|e| Error::InitRPCServer(e.to_string()))?;

let handle = server.start(module);
handle.stopped().await;
let stop_handle = handle.clone();

// Capture SIGINT and SIGTERM
let mut sigint =
unix_signal(SignalKind::interrupt()).map_err(|e| Error::InitRPCServer(e.to_string()))?;
let mut sigterm =
unix_signal(SignalKind::terminate()).map_err(|e| Error::InitRPCServer(e.to_string()))?;

tokio::select! {
_ = handle.stopped() => {
// The server has already shut down by itself
info!("Server stopped");
}
_ = sigint.recv() => {
info!("Received SIGINT, shutting down gracefully...");
let _ = stop_handle.stop();
}
_ = sigterm.recv() => {
info!("Received SIGTERM, shutting down gracefully...");
let _ = stop_handle.stop();
}
}

Ok(())
}
Expand All @@ -246,6 +268,7 @@ fn create_client(
.request_timeout(Duration::from_millis(timeout))
.build(url)
.map_err(|e| Error::InitRPCClient(e.to_string()))?;

Ok(HttpClientWrapper::new(client, url.to_string()))
}

Expand Down

0 comments on commit 0e6a7c4

Please sign in to comment.