diff --git a/Cargo.lock b/Cargo.lock index 073507145178..fa92d3a3cf92 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9247,6 +9247,28 @@ dependencies = [ "zksync_crypto", ] +[[package]] +name = "zksync_node" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "futures 0.3.28", + "prometheus_exporter", + "thiserror", + "tokio", + "tracing", + "vlog", + "zksync_config", + "zksync_core", + "zksync_dal", + "zksync_env_config", + "zksync_health_check", + "zksync_object_store", + "zksync_storage", + "zksync_types", +] + [[package]] name = "zksync_object_store" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 3487716a8d3e..1f8f52a7991a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ members = [ "core/lib/mempool", "core/lib/merkle_tree", "core/lib/mini_merkle_tree", + "core/lib/node", "core/lib/object_store", "core/lib/prometheus_exporter", "core/lib/queued_job_processor", diff --git a/checks-config/era.dic b/checks-config/era.dic index f0ec14591b1f..15f14a6e5f8c 100644 --- a/checks-config/era.dic +++ b/checks-config/era.dic @@ -881,3 +881,11 @@ plookup shivini EIP4844 KZG +healthcheck +healthchecks +after_node_shutdown +runnable +downcasting +parameterized +reimplementation +composability diff --git a/core/lib/dal/src/connection/mod.rs b/core/lib/dal/src/connection/mod.rs index dba7098174e5..c689ddf8bc18 100644 --- a/core/lib/dal/src/connection/mod.rs +++ b/core/lib/dal/src/connection/mod.rs @@ -11,6 +11,7 @@ use crate::{metrics::CONNECTION_METRICS, StorageProcessor}; pub mod holder; /// Builder for [`ConnectionPool`]s. +#[derive(Clone)] pub struct ConnectionPoolBuilder { database_url: String, max_size: u32, @@ -65,6 +66,15 @@ impl ConnectionPoolBuilder { max_size: self.max_size, }) } + + /// Builds a connection pool that has a single connection. 
+ pub async fn build_singleton(&self) -> anyhow::Result { + let singleton_builder = Self { + max_size: 1, + ..self.clone() + }; + singleton_builder.build().await + } } #[derive(Clone)] diff --git a/core/lib/health_check/src/lib.rs b/core/lib/health_check/src/lib.rs index 12bb292bc850..15a0d2945493 100644 --- a/core/lib/health_check/src/lib.rs +++ b/core/lib/health_check/src/lib.rs @@ -80,10 +80,13 @@ pub struct AppHealth { impl AppHealth { /// Aggregates health info from the provided checks. - pub async fn new(health_checks: &[Box]) -> Self { + pub async fn new>(health_checks: &[T]) -> Self { let check_futures = health_checks.iter().map(|check| { - let check_name = check.name(); - check.check_health().map(move |health| (check_name, health)) + let check_name = check.as_ref().name(); + check + .as_ref() + .check_health() + .map(move |health| (check_name, health)) }); let components: HashMap<_, _> = future::join_all(check_futures).await.into_iter().collect(); diff --git a/core/lib/node/Cargo.toml b/core/lib/node/Cargo.toml new file mode 100644 index 000000000000..bda2315f5361 --- /dev/null +++ b/core/lib/node/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "zksync_node" +version = "0.1.0" +edition = "2018" +authors = ["The Matter Labs Team "] +homepage = "https://zksync.io/" +repository = "https://github.com/matter-labs/zksync-era" +license = "MIT OR Apache-2.0" +keywords = ["blockchain", "zksync"] +categories = ["cryptography"] + +[dependencies] +prometheus_exporter = { path = "../prometheus_exporter" } +zksync_types = { path = "../types" } +zksync_health_check = { path = "../health_check" } +zksync_dal = { path = "../dal" } +zksync_config = { path = "../config" } +zksync_object_store = { path = "../object_store" } +zksync_core = { path = "../zksync_core" } +zksync_storage = { path = "../storage" } + +tracing = "0.1" +thiserror = "1" +async-trait = "0.1" +futures = "0.3" +anyhow = "1" +tokio = { version = "1", features = ["rt"] } + +[dev-dependencies] 
+zksync_env_config = { path = "../env_config" } +vlog = { path = "../vlog" } diff --git a/core/lib/node/examples/main_node.rs b/core/lib/node/examples/main_node.rs new file mode 100644 index 000000000000..afe64139e0ba --- /dev/null +++ b/core/lib/node/examples/main_node.rs @@ -0,0 +1,84 @@ +//! An incomplete example of how node initialization looks like. +//! This example defines a `ResourceProvider` that works using the main node env config, and +//! initializes a single task with a health check server. + +use zksync_config::{configs::chain::OperationsManagerConfig, DBConfig, PostgresConfig}; +use zksync_core::metadata_calculator::MetadataCalculatorConfig; +use zksync_dal::ConnectionPool; +use zksync_env_config::FromEnv; +use zksync_node::{ + implementations::{ + resource::pools::MasterPoolResource, + task::{ + healtcheck_server::HealthCheckTaskBuilder, + metadata_calculator::MetadataCalculatorTaskBuilder, + }, + }, + node::ZkSyncNode, + resource::{Resource, ResourceId, ResourceProvider, StoredResource}, +}; + +/// Resource provider for the main node. +/// It defines which resources the tasks will receive. This particular provider is stateless, e.g. it always uses +/// the main node env config, and always knows which resources to provide. +/// The resource provider can be dynamic, however. For example, we can define a resource provider which may use +/// different config load scheme (e.g. env variables / protobuf / yaml / toml), and which resources to provide +/// (e.g. decide whether we need MempoolIO or ExternalIO depending on some config). 
+#[derive(Debug)] +struct MainNodeResourceProvider; + +impl MainNodeResourceProvider { + fn master_pool_resource() -> anyhow::Result { + let config = PostgresConfig::from_env()?; + let mut master_pool = + ConnectionPool::builder(config.master_url()?, config.max_connections()?); + master_pool.set_statement_timeout(config.statement_timeout()); + + Ok(MasterPoolResource::new(master_pool)) + } +} + +#[async_trait::async_trait] +impl ResourceProvider for MainNodeResourceProvider { + async fn get_resource(&self, name: &ResourceId) -> Option> { + match name { + name if name == &MasterPoolResource::resource_id() => { + let resource = + Self::master_pool_resource().expect("Failed to create pools resource"); + Some(Box::new(resource)) + } + _ => None, + } + } +} + +fn main() -> anyhow::Result<()> { + #[allow(deprecated)] // TODO (QIT-21): Use centralized configuration approach. + let log_format = vlog::log_format_from_env(); + let _guard = vlog::ObservabilityBuilder::new() + .with_log_format(log_format) + .build(); + + // Create the node with specified resource provider. We don't need to add any resources explicitly, + // the task will request what they actually need. The benefit here is that we won't instantiate resources + // that are not used, which would be complex otherwise, since the task set is often dynamic. + let mut node = ZkSyncNode::new(MainNodeResourceProvider)?; + + // Add the metadata calculator task. + let merkle_tree_env_config = DBConfig::from_env()?.merkle_tree; + let operations_manager_env_config = OperationsManagerConfig::from_env()?; + let metadata_calculator_config = MetadataCalculatorConfig::for_main_node( + &merkle_tree_env_config, + &operations_manager_env_config, + ); + node.add_task(MetadataCalculatorTaskBuilder(metadata_calculator_config)); + + // Add the healthcheck server. 
+ let healthcheck_config = zksync_config::ApiConfig::from_env()?.healthcheck; + node.add_task(HealthCheckTaskBuilder(healthcheck_config)); + + // Run the node until completion. + node.run()?; + + Ok(()) +} diff --git a/core/lib/node/src/implementations/mod.rs b/core/lib/node/src/implementations/mod.rs new file mode 100644 index 000000000000..b381595d68f9 --- /dev/null +++ b/core/lib/node/src/implementations/mod.rs @@ -0,0 +1,6 @@ +//! Implementations of resources and tasks. +//! These are temporarily provided by the framework crate itself, but will be moved to the separate crates +//! in the future. + +pub mod resource; +pub mod task; diff --git a/core/lib/node/src/implementations/resource/healthcheck.rs b/core/lib/node/src/implementations/resource/healthcheck.rs new file mode 100644 index 000000000000..5b752e33d59b --- /dev/null +++ b/core/lib/node/src/implementations/resource/healthcheck.rs @@ -0,0 +1,36 @@ +use std::{fmt, sync::Arc}; + +// Public re-exports from external crate to minimize the required dependencies. +pub use zksync_health_check::{CheckHealth, ReactiveHealthCheck}; + +use crate::resource::Resource; + +/// Wrapper for a generic health check. 
+#[derive(Clone)] +pub struct HealthCheckResource(Arc); + +impl HealthCheckResource { + pub fn new(check: impl CheckHealth + 'static) -> Self { + Self(Arc::new(check)) + } +} + +impl fmt::Debug for HealthCheckResource { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("HealthCheckResource") + .field("name", &self.0.name()) + .finish_non_exhaustive() + } +} + +impl Resource for HealthCheckResource { + fn resource_id() -> crate::resource::ResourceId { + "common/health_check".into() + } +} + +impl AsRef for HealthCheckResource { + fn as_ref(&self) -> &dyn CheckHealth { + self.0.as_ref() + } +} diff --git a/core/lib/node/src/implementations/resource/mod.rs b/core/lib/node/src/implementations/resource/mod.rs new file mode 100644 index 000000000000..7042d58ef59e --- /dev/null +++ b/core/lib/node/src/implementations/resource/mod.rs @@ -0,0 +1,3 @@ +pub mod healthcheck; +pub mod object_store; +pub mod pools; diff --git a/core/lib/node/src/implementations/resource/object_store.rs b/core/lib/node/src/implementations/resource/object_store.rs new file mode 100644 index 000000000000..093061a9b2d4 --- /dev/null +++ b/core/lib/node/src/implementations/resource/object_store.rs @@ -0,0 +1,15 @@ +use std::sync::Arc; + +use zksync_object_store::ObjectStore; + +use crate::resource::Resource; + +/// Wrapper for the object store. +#[derive(Debug, Clone)] +pub struct ObjectStoreResource(pub Arc); + +impl Resource for ObjectStoreResource { + fn resource_id() -> crate::resource::ResourceId { + "common/object_store".into() + } +} diff --git a/core/lib/node/src/implementations/resource/pools.rs b/core/lib/node/src/implementations/resource/pools.rs new file mode 100644 index 000000000000..c2194bddc72e --- /dev/null +++ b/core/lib/node/src/implementations/resource/pools.rs @@ -0,0 +1,75 @@ +use zksync_dal::{connection::ConnectionPoolBuilder, ConnectionPool}; + +use crate::resource::Resource; + +/// Represents a connection pool to the master database. 
+#[derive(Debug, Clone)] +pub struct MasterPoolResource(ConnectionPoolBuilder); + +impl Resource for MasterPoolResource { + fn resource_id() -> crate::resource::ResourceId { + "common/master_pool".into() + } +} + +impl MasterPoolResource { + pub fn new(builder: ConnectionPoolBuilder) -> Self { + Self(builder) + } + + pub async fn get(&self) -> anyhow::Result { + self.0.build().await + } + + pub async fn get_singleton(&self) -> anyhow::Result { + self.0.build_singleton().await + } +} + +/// Represents a connection pool to the replica database. +#[derive(Debug, Clone)] +pub struct ReplicaPoolResource(ConnectionPoolBuilder); + +impl Resource for ReplicaPoolResource { + fn resource_id() -> crate::resource::ResourceId { + "common/replica_pool".into() + } +} + +impl ReplicaPoolResource { + pub fn new(builder: ConnectionPoolBuilder) -> Self { + Self(builder) + } + + pub async fn get(&self) -> anyhow::Result { + self.0.build().await + } + + pub async fn get_singleton(&self) -> anyhow::Result { + self.0.build_singleton().await + } +} + +/// Represents a connection pool to the prover database. 
+#[derive(Debug, Clone)] +pub struct ProverPoolResource(ConnectionPoolBuilder); + +impl Resource for ProverPoolResource { + fn resource_id() -> crate::resource::ResourceId { + "common/prover_pool".into() + } +} + +impl ProverPoolResource { + pub fn new(builder: ConnectionPoolBuilder) -> Self { + Self(builder) + } + + pub async fn get(&self) -> anyhow::Result { + self.0.build().await + } + + pub async fn get_singleton(&self) -> anyhow::Result { + self.0.build_singleton().await + } +} diff --git a/core/lib/node/src/implementations/task/healtcheck_server.rs b/core/lib/node/src/implementations/task/healtcheck_server.rs new file mode 100644 index 000000000000..6350196da40e --- /dev/null +++ b/core/lib/node/src/implementations/task/healtcheck_server.rs @@ -0,0 +1,67 @@ +use std::fmt; + +use zksync_config::configs::api::HealthCheckConfig; +use zksync_core::api_server::healthcheck::HealthCheckHandle; + +use crate::{ + implementations::resource::healthcheck::HealthCheckResource, + node::{NodeContext, StopReceiver}, + resource::ResourceCollection, + task::{IntoZkSyncTask, TaskInitError, ZkSyncTask}, +}; + +/// Builder for a health check server. +/// +/// Spawned task collects all the health checks added by different tasks to the +/// corresponding resource collection and spawns an HTTP server exposing them. 
+#[derive(Debug)] +pub struct HealthCheckTaskBuilder(pub HealthCheckConfig); + +#[async_trait::async_trait] +impl IntoZkSyncTask for HealthCheckTaskBuilder { + fn task_name(&self) -> &'static str { + "healthcheck_server" + } + + async fn create( + self: Box, + mut node: NodeContext<'_>, + ) -> Result, TaskInitError> { + let healthchecks = node + .get_resource_or_default::>() + .await; + + let task = HealthCheckTask { + config: self.0, + healthchecks, + }; + + Ok(Box::new(task)) + } +} + +struct HealthCheckTask { + config: HealthCheckConfig, + healthchecks: ResourceCollection, +} + +impl fmt::Debug for HealthCheckTask { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("HealthCheckTask") + .field("config", &self.config) + .finish_non_exhaustive() + } +} + +#[async_trait::async_trait] +impl ZkSyncTask for HealthCheckTask { + async fn run(mut self: Box, mut stop_receiver: StopReceiver) -> anyhow::Result<()> { + let healthchecks = self.healthchecks.resolve().await; + + let handle = HealthCheckHandle::spawn_server(self.config.bind_addr(), healthchecks); + stop_receiver.0.changed().await?; + handle.stop().await; + + Ok(()) + } +} diff --git a/core/lib/node/src/implementations/task/metadata_calculator.rs b/core/lib/node/src/implementations/task/metadata_calculator.rs new file mode 100644 index 000000000000..4bfe18167b40 --- /dev/null +++ b/core/lib/node/src/implementations/task/metadata_calculator.rs @@ -0,0 +1,81 @@ +use zksync_core::metadata_calculator::{MetadataCalculator, MetadataCalculatorConfig}; +use zksync_dal::ConnectionPool; +use zksync_storage::RocksDB; + +use crate::{ + implementations::resource::{ + healthcheck::HealthCheckResource, object_store::ObjectStoreResource, + pools::MasterPoolResource, + }, + node::{NodeContext, StopReceiver}, + resource::{Resource, ResourceCollection}, + task::{IntoZkSyncTask, TaskInitError, ZkSyncTask}, +}; + +/// Builder for a metadata calculator. 
+#[derive(Debug)] +pub struct MetadataCalculatorTaskBuilder(pub MetadataCalculatorConfig); + +#[derive(Debug)] +pub struct MetadataCalculatorTask { + metadata_calculator: MetadataCalculator, + main_pool: ConnectionPool, +} + +#[async_trait::async_trait] +impl IntoZkSyncTask for MetadataCalculatorTaskBuilder { + fn task_name(&self) -> &'static str { + "metadata_calculator" + } + + async fn create( + self: Box, + mut node: NodeContext<'_>, + ) -> Result, TaskInitError> { + let pool = node.get_resource::().await.ok_or( + TaskInitError::ResourceLacking(MasterPoolResource::resource_id()), + )?; + let main_pool = pool.get().await.unwrap(); + let object_store = node.get_resource::().await; // OK to be None. + + if object_store.is_none() { + tracing::info!( + "Object store is not provided, metadata calculator will run without it." + ); + } + + let metadata_calculator = + MetadataCalculator::new(self.0, object_store.map(|os| os.0)).await; + + let healthchecks = node + .get_resource_or_default::>() + .await; + healthchecks + .push(HealthCheckResource::new( + metadata_calculator.tree_health_check(), + )) + .expect("Wiring stage"); + + Ok(Box::new(MetadataCalculatorTask { + metadata_calculator, + main_pool, + })) + } +} + +#[async_trait::async_trait] +impl ZkSyncTask for MetadataCalculatorTask { + async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { + let result = self + .metadata_calculator + .run(self.main_pool, stop_receiver.0) + .await; + + // Wait for all the instances of RocksDB to be destroyed. 
+ tokio::task::spawn_blocking(RocksDB::await_rocksdb_termination) + .await + .unwrap(); + + result + } +} diff --git a/core/lib/node/src/implementations/task/mod.rs b/core/lib/node/src/implementations/task/mod.rs new file mode 100644 index 000000000000..d56e496cfb01 --- /dev/null +++ b/core/lib/node/src/implementations/task/mod.rs @@ -0,0 +1,3 @@ +pub mod healtcheck_server; +pub mod metadata_calculator; +pub mod prometheus_exporter; diff --git a/core/lib/node/src/implementations/task/prometheus_exporter.rs b/core/lib/node/src/implementations/task/prometheus_exporter.rs new file mode 100644 index 000000000000..17923efde95b --- /dev/null +++ b/core/lib/node/src/implementations/task/prometheus_exporter.rs @@ -0,0 +1,58 @@ +use prometheus_exporter::PrometheusExporterConfig; +use zksync_health_check::{HealthStatus, HealthUpdater, ReactiveHealthCheck}; + +use crate::{ + implementations::resource::healthcheck::HealthCheckResource, + node::{NodeContext, StopReceiver}, + resource::ResourceCollection, + task::{IntoZkSyncTask, TaskInitError, ZkSyncTask}, +}; + +/// Builder for a prometheus exporter. 
+#[derive(Debug)] +pub struct PrometheusExporterTaskBuilder(pub PrometheusExporterConfig); + +#[derive(Debug)] +pub struct PrometheusExporterTask { + config: PrometheusExporterConfig, + prometheus_health_updater: HealthUpdater, +} + +#[async_trait::async_trait] +impl IntoZkSyncTask for PrometheusExporterTaskBuilder { + fn task_name(&self) -> &'static str { + "prometheus_exporter" + } + + async fn create( + self: Box, + mut node: NodeContext<'_>, + ) -> Result, TaskInitError> { + let (prometheus_health_check, prometheus_health_updater) = + ReactiveHealthCheck::new("prometheus_exporter"); + + let healthchecks = node + .get_resource_or_default::>() + .await; + healthchecks + .push(HealthCheckResource::new(prometheus_health_check)) + .expect("Wiring stage"); + + Ok(Box::new(PrometheusExporterTask { + config: self.0, + prometheus_health_updater, + })) + } +} + +#[async_trait::async_trait] +impl ZkSyncTask for PrometheusExporterTask { + async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { + let prometheus_task = self.config.run(stop_receiver.0); + self.prometheus_health_updater + .update(HealthStatus::Ready.into()); + let res = prometheus_task.await; + drop(self.prometheus_health_updater); + res + } +} diff --git a/core/lib/node/src/lib.rs b/core/lib/node/src/lib.rs new file mode 100644 index 000000000000..c714f998a12a --- /dev/null +++ b/core/lib/node/src/lib.rs @@ -0,0 +1,24 @@ +//! # ZK Stack node initialization framework. +//! +//! ## Introduction +//! +//! This crate provides core abstractions that allow one to compose a ZK Stack node. +//! Main concepts used in this crate are: +//! - [`IntoZkSyncTask`](task::IntoZkSyncTask) - builder interface for tasks. +//! - [`ZkSyncTask`](task::ZkSyncTask) - a unit of work that can be executed by the node. +//! - [`Resource`](resource::Resource) - a piece of logic that can be shared between tasks. Most resources are +//! 
represented by generic interfaces and also serve as points of customization for tasks. +//! - [`ResourceProvider`](resource::ResourceProvider) - a trait that allows one to provide resources to the node. +//! - [`ZkSyncNode`](node::ZkSyncNode) - a container for tasks and resources that takes care of initialization, running +//! and shutting down. +//! +//! The general flow to compose a node is as follows: +//! - Create a [`ResourceProvider`](resource::ResourceProvider) that can provide all the resources that the node needs. +//! - Create a [`ZkSyncNode`](node::ZkSyncNode) with that [`ResourceProvider`](resource::ResourceProvider). +//! - Add tasks to the node. +//! - Run it. + +pub mod implementations; +pub mod node; +pub mod resource; +pub mod task; diff --git a/core/lib/node/src/node/context.rs b/core/lib/node/src/node/context.rs new file mode 100644 index 000000000000..897bc1c967ef --- /dev/null +++ b/core/lib/node/src/node/context.rs @@ -0,0 +1,101 @@ +use crate::{ + node::ZkSyncNode, + resource::{Resource, StoredResource}, + task::IntoZkSyncTask, +}; + +/// An interface to the node's resources provided to the tasks during initialization. +/// Provides the ability to fetch required resources, and also gives access to the Tokio runtime handle used by the node. +#[derive(Debug)] +pub struct NodeContext<'a> { + node: &'a mut ZkSyncNode, +} + +impl<'a> NodeContext<'a> { + pub(super) fn new(node: &'a mut ZkSyncNode) -> Self { + Self { node } + } + + /// Provides access to the runtime used by the node. + /// Can be used to spawn additional tasks within the same runtime. + /// If some tasks stores the handle to spawn additional tasks, it is expected to do all the required + /// cleanup. + /// + /// In most cases, however, it is recommended to use [`add_task`] method instead. + pub fn runtime_handle(&self) -> &tokio::runtime::Handle { + self.node.runtime.handle() + } + + /// Adds an additional task to the node. 
+ /// This may be used if some task or its resource requires an additional routine for maintenance. + pub fn add_task(&mut self, builder: T) -> &mut Self { + self.node.add_task(builder); + self + } + + /// Attempts to retrieve the resource with the specified name. + /// Internally the resources are stored as [`std::any::Any`], and this method does the downcasting + /// on behalf of the caller. + /// + /// ## Panics + /// + /// Panics if the resource with the specified name exists, but is not of the requested type. + pub async fn get_resource(&mut self) -> Option { + #[allow(clippy::borrowed_box)] + let downcast_clone = |resource: &Box| { + resource + .downcast_ref::() + .unwrap_or_else(|| { + panic!( + "Resource {} is not of type {}", + T::resource_id(), + std::any::type_name::() + ) + }) + .clone() + }; + + let name = T::resource_id(); + // Check whether the resource is already available. + if let Some(resource) = self.node.resources.get(&name) { + return Some(downcast_clone(resource)); + } + + // Try to fetch the resource from the provider. + if let Some(resource) = self.node.resource_provider.get_resource(&name).await { + // First, ensure the type matches. + let downcasted = downcast_clone(&resource); + // Then, add it to the local resources. + self.node.resources.insert(name, resource); + return Some(downcasted); + } + + // No such resource. + // The requester is allowed to decide whether this is an error or not. + None + } + + /// Attempts to retrieve the resource with the specified name. + /// If the resource is not available, it is created using the provided closure. + pub async fn get_resource_or_insert_with T>( + &mut self, + f: F, + ) -> T { + if let Some(resource) = self.get_resource::().await { + return resource; + } + + // No such resource, insert a new one. + let resource = f(); + self.node + .resources + .insert(T::resource_id(), Box::new(resource.clone())); + resource + } + + /// Attempts to retrieve the resource with the specified name. 
+ /// If the resource is not available, it is created using `T::default()`. + pub async fn get_resource_or_default(&mut self) -> T { + self.get_resource_or_insert_with(T::default).await + } +} diff --git a/core/lib/node/src/node/mod.rs b/core/lib/node/src/node/mod.rs new file mode 100644 index 000000000000..42b9c9ac60a4 --- /dev/null +++ b/core/lib/node/src/node/mod.rs @@ -0,0 +1,207 @@ +use std::{collections::HashMap, fmt}; + +use futures::{future::BoxFuture, FutureExt}; +use tokio::{runtime::Runtime, sync::watch}; + +pub use self::{context::NodeContext, stop_receiver::StopReceiver}; +use crate::{ + resource::{ResourceId, ResourceProvider, StoredResource}, + task::{IntoZkSyncTask, TaskInitError}, +}; + +mod context; +mod stop_receiver; + +/// "Manager" class of the node. Collects all the resources and tasks, +/// then runs tasks until completion. +/// +/// Initialization flow: +/// - Node instance is created with access to the resource provider. +/// - Task constructors are added to the node. At this step, tasks are not created yet. +/// - Optionally, a healthcheck task constructor is also added. +/// - Once the `run` method is invoked, node +/// - attempts to create every task. If there are no tasks, or at least task +/// constructor fails, the node will return an error. +/// - initializes the healthcheck task if it's provided. +/// - waits for any of the tasks to finish. +/// - sends stop signal to all the tasks. +/// - waits for the remaining tasks to finish. +/// - calls `after_node_shutdown` hook for every task that has provided it. +/// - returns the result of the task that has finished. +pub struct ZkSyncNode { + /// Primary source of resources for tasks. + resource_provider: Box, + /// Cache of resources that have been requested at least by one task. + resources: HashMap>, + /// List of task builders. + task_builders: Vec>, + + /// Sender used to stop the tasks. + stop_sender: watch::Sender, + /// Tokio runtime used to spawn tasks. 
+ runtime: Runtime, +} + +impl fmt::Debug for ZkSyncNode { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ZkSyncNode").finish_non_exhaustive() + } +} + +impl ZkSyncNode { + pub fn new(resource_provider: R) -> anyhow::Result { + if tokio::runtime::Handle::try_current().is_ok() { + anyhow::bail!( + "Detected a Tokio Runtime. ZkSyncNode manages its own runtime and does not support nested runtimes" + ); + } + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + + let (stop_sender, _stop_receiver) = watch::channel(false); + let self_ = Self { + resource_provider: Box::new(resource_provider), + resources: HashMap::default(), + task_builders: Vec::new(), + stop_sender, + runtime, + }; + + Ok(self_) + } + + /// Adds a task to the node. + /// + /// The task is not created at this point, instead, the constructor is stored in the node + /// and will be invoked during [`ZkSyncNode::run`] method. Any error returned by the constructor + /// will prevent the node from starting and will be propagated by the [`ZkSyncNode::run`] method. + pub fn add_task(&mut self, builder: T) -> &mut Self { + self.task_builders.push(Box::new(builder)); + self + } + + /// Runs the system. + pub fn run(mut self) -> anyhow::Result<()> { + // Initialize tasks. + let task_builders = std::mem::take(&mut self.task_builders); + + let mut tasks = Vec::new(); + + let mut errors: Vec<(String, TaskInitError)> = Vec::new(); + + let runtime_handle = self.runtime.handle().clone(); + for task_builder in task_builders { + let name = task_builder.task_name().to_string(); + let task_result = + runtime_handle.block_on(task_builder.create(NodeContext::new(&mut self))); + let task = match task_result { + Ok(task) => task, + Err(err) => { + // We don't want to bail on the first error, since it'll provide worse DevEx: + // People likely want to fix as much problems as they can in one go, rather than have + // to fix them one by one. 
+ errors.push((name, err)); + continue; + } + }; + let after_node_shutdown = task.after_node_shutdown(); + let task_future = Box::pin(task.run(self.stop_receiver())); + let task_repr = TaskRepr { + name, + task: Some(task_future), + after_node_shutdown, + }; + tasks.push(task_repr); + } + + // Report all the errors we've met during the init. + if !errors.is_empty() { + for (task, error) in errors { + tracing::error!("Task {task} can't be initialized: {error}"); + } + anyhow::bail!("One or more task weren't able to start"); + } + + if tasks.is_empty() { + anyhow::bail!("No tasks to run"); + } + + // Wiring is now complete. + for resource in self.resources.values_mut() { + resource.stored_resource_wired(); + } + + // Prepare tasks for running. + let rt_handle = self.runtime.handle().clone(); + let join_handles: Vec<_> = tasks + .iter_mut() + .map(|task| { + let task = task.task.take().expect( + "Tasks are created by the node and must be Some prior to calling this method", + ); + rt_handle.spawn(task).fuse() + }) + .collect(); + + // Run the tasks until one of them exits. + // TODO (QIT-24): wrap every task into a timeout to prevent hanging. + let (resolved, idx, remaining) = self + .runtime + .block_on(futures::future::select_all(join_handles)); + let task_name = tasks[idx].name.clone(); + let failure = match resolved { + Ok(Ok(())) => { + tracing::info!("Task {task_name} completed"); + false + } + Ok(Err(err)) => { + tracing::error!("Task {task_name} exited with an error: {err}"); + true + } + Err(_) => { + tracing::error!("Task {task_name} panicked"); + true + } + }; + + // Send stop signal to remaining tasks and wait for them to finish. + // Given that we are shutting down, we do not really care about returned values. + self.stop_sender.send(true).ok(); + self.runtime.block_on(futures::future::join_all(remaining)); + + // Call after_node_shutdown hooks. 
+ let local_set = tokio::task::LocalSet::new(); + let join_handles = tasks.iter_mut().filter_map(|task| { + task.after_node_shutdown + .take() + .map(|task| local_set.spawn_local(task)) + }); + local_set.block_on(&self.runtime, futures::future::join_all(join_handles)); + + if failure { + anyhow::bail!("Task {task_name} failed"); + } else { + Ok(()) + } + } + + pub(crate) fn stop_receiver(&self) -> StopReceiver { + StopReceiver(self.stop_sender.subscribe()) + } +} + +struct TaskRepr { + name: String, + task: Option>>, + after_node_shutdown: Option>, +} + +impl fmt::Debug for TaskRepr { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TaskRepr") + .field("name", &self.name) + .finish_non_exhaustive() + } +} diff --git a/core/lib/node/src/node/stop_receiver.rs b/core/lib/node/src/node/stop_receiver.rs new file mode 100644 index 000000000000..7a181b49a80d --- /dev/null +++ b/core/lib/node/src/node/stop_receiver.rs @@ -0,0 +1,16 @@ +use tokio::sync::watch; + +/// Represents a receiver for the stop signal. +/// This signal is sent when the node is shutting down. +/// Every task is expected to listen to this signal and stop its execution when it is received. +/// +/// This structure exists as a first-class entity instead of being a resource to make it more visible +/// and prevent tasks from hanging by accident. 
+#[derive(Debug, Clone)] +pub struct StopReceiver(pub watch::Receiver); + +impl StopReceiver { + pub fn new(receiver: watch::Receiver) -> Self { + Self(receiver) + } +} diff --git a/core/lib/node/src/resource/lazy_resource.rs b/core/lib/node/src/resource/lazy_resource.rs new file mode 100644 index 000000000000..3ec6e3cdb0bd --- /dev/null +++ b/core/lib/node/src/resource/lazy_resource.rs @@ -0,0 +1,92 @@ +use std::sync::Arc; + +use thiserror::Error; +use tokio::sync::watch; + +use super::{Resource, ResourceId}; +use crate::node::StopReceiver; + +/// A lazy resource represents a resource that isn't available at the time when the tasks start. +/// +/// Normally it's used to represent the resources that should be provided by one task to another one. +/// Lazy resources are aware of the node lifecycle, so attempt to resolve the resource won't hang +/// if the resource is never provided: the resolve future will fail once the stop signal is sent by the node. +#[derive(Debug)] +pub struct LazyResource { + resolve_sender: Arc>>, + stop_receiver: StopReceiver, +} + +impl Resource for LazyResource { + fn resource_id() -> ResourceId { + ResourceId::new("lazy") + T::resource_id() + } +} + +impl Clone for LazyResource { + fn clone(&self) -> Self { + Self { + resolve_sender: self.resolve_sender.clone(), + stop_receiver: self.stop_receiver.clone(), + } + } +} + +impl LazyResource { + /// Creates a new lazy resource. + /// Provided stop receiver will be used to prevent resolving from hanging if the resource is never provided. + pub fn new(stop_receiver: StopReceiver) -> Self { + let (resolve_sender, _resolve_receiver) = watch::channel(None); + + Self { + resolve_sender: Arc::new(resolve_sender), + stop_receiver, + } + } + + /// Returns a future that resolves to the resource once it is provided. + /// If the resource is never provided, the method will return an error once the node is shutting down. 
+ pub async fn resolve(mut self) -> Result { + let mut resolve_receiver = self.resolve_sender.subscribe(); + if let Some(resource) = resolve_receiver.borrow().as_ref() { + return Ok(resource.clone()); + } + + tokio::select! { + _ = self.stop_receiver.0.changed() => { + Err(LazyResourceError::NodeShutdown) + } + _ = resolve_receiver.changed() => { + // ^ we can ignore the error on `changed`, since we hold a strong reference to the sender. + let resource = resolve_receiver.borrow().as_ref().expect("Can only change if provided").clone(); + Ok(resource) + } + } + } + + /// Provides the resource. + /// May be called at most once. Subsequent calls will return an error. + pub async fn provide(&mut self, resource: T) -> Result<(), LazyResourceError> { + let sent = self.resolve_sender.send_if_modified(|current| { + if current.is_some() { + return false; + } + *current = Some(resource.clone()); + true + }); + + if !sent { + return Err(LazyResourceError::ResourceAlreadyProvided); + } + + Ok(()) + } +} + +#[derive(Debug, Error)] +pub enum LazyResourceError { + #[error("Node is shutting down")] + NodeShutdown, + #[error("Resource is already provided")] + ResourceAlreadyProvided, +} diff --git a/core/lib/node/src/resource/mod.rs b/core/lib/node/src/resource/mod.rs new file mode 100644 index 000000000000..2ed10d56416d --- /dev/null +++ b/core/lib/node/src/resource/mod.rs @@ -0,0 +1,83 @@ +use std::{any::TypeId, fmt}; + +pub use self::{ + lazy_resource::LazyResource, resource_collection::ResourceCollection, resource_id::ResourceId, +}; + +mod lazy_resource; +mod resource_collection; +mod resource_id; + +/// A trait for anything that can be stored (and retrieved) as a resource. +/// Typically, the type that implements this trait also should implement `Clone` +/// since the same resource may be requested by several tasks and thus it would be an additional +/// bound on most methods that work with [`Resource`]. 
+pub trait Resource: 'static + Send + Sync + std::any::Any {
+    /// Unique identifier of the resource.
+    /// Used to fetch the resource from the provider.
+    ///
+    /// It is recommended to name resources in form of `/`, where `` is the name of the task
+    /// that will use this resource, or 'common' in case it is used by several tasks, and `` is the name
+    /// of the resource itself.
+    fn resource_id() -> ResourceId;
+
+    fn on_resource_wired(&mut self) {}
+}
+
+/// Internal, object-safe version of [`Resource`].
+/// Used to store resources in the node without knowing their exact type.
+///
+/// This trait is implemented for any type that implements [`Resource`], so there is no need to
+/// implement it manually.
+pub trait StoredResource: 'static + std::any::Any + Send + Sync {
+    /// An object-safe version of [`Resource::resource_id`].
+    fn stored_resource_id(&self) -> ResourceId;
+
+    /// An object-safe version of [`Resource::on_resource_wired`].
+    fn stored_resource_wired(&mut self);
+}
+
+impl StoredResource for T {
+    fn stored_resource_id(&self) -> ResourceId {
+        T::resource_id()
+    }
+
+    fn stored_resource_wired(&mut self) {
+        Resource::on_resource_wired(self);
+    }
+}
+
+impl dyn StoredResource {
+    /// Reimplementation of `Any::downcast_ref`.
+    /// Returns `Some` if the type is correct, and `None` otherwise.
+    // Note: This method is required as we cannot store objects as, for example, `dyn StoredResource + Any`,
+    // so we don't have access to `Any::downcast_ref` within the node.
+    pub(crate) fn downcast_ref(&self) -> Option<&T> {
+        if self.type_id() == TypeId::of::() {
+            // SAFETY: We just checked that the type is correct.
+            unsafe { Some(&*(self as *const dyn StoredResource as *const T)) }
+        } else {
+            None
+        }
+    }
+}
+
+/// An entity that knows how to initialize resources.
+/// +/// It exists to simplify the initialization process, as both tasks and *resources* can depend on other resources, +/// and by having an entity that can initialize the resource on demand we can avoid the need to provide resources +/// in any particular order. +/// +/// Node will only call `get_resource` method once per resource, and will cache the result. This guarantees that +/// all the resource consumers will interact with the same resource instance, which may be important for getting +/// the consistent state (e.g. to make sure that L1 gas price is the same for all the tasks). +#[async_trait::async_trait] +pub trait ResourceProvider: 'static + Send + Sync + fmt::Debug { + /// Returns a resource with the given name. + /// + /// In case it isn't possible to obtain the resource (for example, if some error occurred during initialization), + /// the provider is free to either return `None` (if it assumes that the node can continue without this resource), + /// or to panic. + // Note: we have to use `Box` here, since we can't use `Box` due to it not being object-safe. + async fn get_resource(&self, resource: &ResourceId) -> Option>; +} diff --git a/core/lib/node/src/resource/resource_collection.rs b/core/lib/node/src/resource/resource_collection.rs new file mode 100644 index 000000000000..05c12ad40662 --- /dev/null +++ b/core/lib/node/src/resource/resource_collection.rs @@ -0,0 +1,106 @@ +use std::{ + fmt, + sync::{Arc, Mutex}, +}; + +use thiserror::Error; +use tokio::sync::watch; + +use super::{Resource, ResourceId}; + +/// Collection of resources that can be extended during the initialization phase, and then resolved once +/// the wiring is complete. +/// +/// During component initialization, resource collections can be requested by the components in order to push new +/// elements there. Once the initialization is complete, it is no longer possible to push new elements, and the +/// collection can be resolved into a vector of resources. 
+/// +/// Collections implement `Clone`, so they can be consumed by several tasks. Every task that resolves the collection +/// is guaranteed to have the same set of resources. +/// +/// The purpose of this container is to allow different tasks to register their resource in a single place for some +/// other task to consume. For example, tasks may register their healthchecks, and then healthcheck task will observe +/// all the provided healthchecks. +pub struct ResourceCollection { + /// Collection of the resources. + resources: Arc>>, + /// Sender indicating that the wiring is complete. + wiring_complete_sender: Arc>, + /// Flag indicating that the collection has been resolved. + wired: watch::Receiver, +} + +impl Resource for ResourceCollection { + fn resource_id() -> ResourceId { + ResourceId::new("collection") + T::resource_id() + } + + fn on_resource_wired(&mut self) { + self.wiring_complete_sender.send(true).ok(); + } +} + +impl Default for ResourceCollection { + fn default() -> Self { + Self::new() + } +} + +impl Clone for ResourceCollection { + fn clone(&self) -> Self { + Self { + resources: self.resources.clone(), + wiring_complete_sender: self.wiring_complete_sender.clone(), + wired: self.wired.clone(), + } + } +} + +impl fmt::Debug for ResourceCollection { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ResourceCollection") + .field("resources", &"{..}") + .finish_non_exhaustive() + } +} + +#[derive(Debug, Error)] +pub enum ResourceCollectionError { + #[error("Adding resources to the collection is not allowed after the wiring is complete")] + AlreadyWired, +} + +impl ResourceCollection { + pub(crate) fn new() -> Self { + let (wiring_complete_sender, wired) = watch::channel(false); + Self { + resources: Arc::default(), + wiring_complete_sender: Arc::new(wiring_complete_sender), + wired, + } + } + + /// Adds a new element to the resource collection. + /// Returns an error if the wiring is already complete. 
+    pub fn push(&self, resource: T) -> Result<(), ResourceCollectionError> {
+        // This check is sufficient, since no task is guaranteed to be running when the value changes.
+        if *self.wired.borrow() {
+            return Err(ResourceCollectionError::AlreadyWired);
+        }
+
+        let mut handle = self.resources.lock().unwrap();
+        handle.push(resource);
+        Ok(())
+    }
+
+    /// Waits until the wiring is complete, and resolves the collection into a vector of resources.
+    pub async fn resolve(mut self) -> Vec {
+        // Guaranteed not to hang on server shutdown, since the node will invoke the `on_resource_wired` hook
+        // before any task is actually spawned (per framework rules). For most cases, this check will resolve
+        // immediately, unless some tasks would spawn something from the `IntoZkSyncTask` impl.
+        self.wired.changed().await.expect("Sender can't be dropped");
+
+        let handle = self.resources.lock().unwrap();
+        (*handle).clone()
+    }
+}
diff --git a/core/lib/node/src/resource/resource_id.rs b/core/lib/node/src/resource/resource_id.rs
new file mode 100644
index 000000000000..73f7f43c58a4
--- /dev/null
+++ b/core/lib/node/src/resource/resource_id.rs
@@ -0,0 +1,47 @@
+use std::{
+    fmt,
+    ops::{Add, AddAssign},
+};
+
+/// A unique identifier of the resource.
+/// Typically, represented as a path-like string, e.g. `common/master_pool`.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct ResourceId {
+    /// Path-like representation of the resource identifier.
+    /// Represented as a `Vec` for ID composability (e.g. collection IDs can be defined as
+    /// `ResourceId::from("collection") + Resource::resource_id()`).
+    id: Vec<&'static str>,
+}
+
+impl ResourceId {
+    pub fn new(id: &'static str) -> Self {
+        Self { id: vec![id] }
+    }
+}
+
+impl Add for ResourceId {
+    type Output = Self;
+
+    fn add(mut self, rhs: ResourceId) -> Self::Output {
+        self.id.extend(rhs.id);
+        self
+    }
+}
+
+impl AddAssign for ResourceId {
+    fn add_assign(&mut self, rhs: ResourceId) {
+        self.id.extend(rhs.id);
+    }
+}
+
+impl From<&'static str> for ResourceId {
+    fn from(id: &'static str) -> Self {
+        Self { id: vec![id] }
+    }
+}
+
+impl fmt::Display for ResourceId {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}", self.id.join("/"))
+    }
+}
diff --git a/core/lib/node/src/task/mod.rs b/core/lib/node/src/task/mod.rs
new file mode 100644
index 000000000000..43f332a0f4d9
--- /dev/null
+++ b/core/lib/node/src/task/mod.rs
@@ -0,0 +1,70 @@
+//! Tasks define the "runnable" concept of the node, e.g. something that can be launched and runs until the node
+//! is stopped.
+//!
+//! Task is normally defined by two types: one implementing the [`IntoZkSyncTask`] trait, which acts like a
+//! constructor, and another one, which implements [`ZkSyncTask`], providing an interface for `ZkSyncNode` to
+//! implement the task lifecycle.
+
+use futures::future::BoxFuture;
+
+use crate::{
+    node::{NodeContext, StopReceiver},
+    resource::ResourceId,
+};
+
+/// Factory that can create a task.
+#[async_trait::async_trait]
+pub trait IntoZkSyncTask: 'static + Send + Sync {
+    /// Unique name of the task.
+    fn task_name(&self) -> &'static str;
+
+    /// Creates a new task.
+    ///
+    /// `NodeContext` provides an interface to the utilities that the task may need, e.g. ability to get resources
+    /// or spawn additional tasks.
+    async fn create(
+        self: Box,
+        node: NodeContext<'_>,
+    ) -> Result, TaskInitError>;
+}
+
+/// A task implementation.
+#[async_trait::async_trait]
+pub trait ZkSyncTask: 'static + Send + Sync {
+    /// Runs the task.
+    ///
+    /// Once any of the tasks returns, the node will shut down.
+    /// If the task returns an error, the node will emit an error-level log message and will return a non-zero
+    /// exit code.
+    ///
+    /// `stop_receiver` argument contains a channel receiver that will change its value once the node requests
+    /// a shutdown. Every task is expected to either await or periodically check the state of the channel and stop
+    /// its execution once the channel is changed.
+    ///
+    /// Each task is expected to perform the required cleanup after receiving the stop signal.
+    async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()>;
+
+    /// Asynchronous hook that will be called after *each task* has finished its cleanup.
+    /// It is guaranteed that no other task is running at this point, e.g. `ZkSyncNode` will invoke
+    /// this hook sequentially for each task.
+    ///
+    /// This hook can be used to perform some cleanup that assumes exclusive access to the resources, e.g.
+    /// to rollback some state.
+    ///
+    /// *Note*: This hook **should not** be used to perform trivial task cleanup, e.g. to wait for the spawned
+    /// server to stop. By the time this hook is invoked, every component of the node is expected to stop. Not
+    /// following this rule may cause the tasks that properly implement this hook to malfunction.
+    fn after_node_shutdown(&self) -> Option> {
+        None
+    }
+}
+
+/// An error that can occur during the task initialization.
+#[derive(thiserror::Error, Debug)] +#[non_exhaustive] +pub enum TaskInitError { + #[error("Resource {0} is not provided")] + ResourceLacking(ResourceId), + #[error(transparent)] + Internal(#[from] anyhow::Error), +} diff --git a/core/lib/zksync_core/src/api_server/healthcheck.rs b/core/lib/zksync_core/src/api_server/healthcheck.rs index 7010d29fb4b5..cbf8c9d6faf4 100644 --- a/core/lib/zksync_core/src/api_server/healthcheck.rs +++ b/core/lib/zksync_core/src/api_server/healthcheck.rs @@ -4,9 +4,9 @@ use axum::{extract::State, http::StatusCode, routing::get, Json, Router}; use tokio::sync::watch; use zksync_health_check::{AppHealth, CheckHealth}; -type SharedHealthchecks = Arc<[Box]>; - -async fn check_health(health_checks: State) -> (StatusCode, Json) { +async fn check_health>( + health_checks: State>, +) -> (StatusCode, Json) { let response = AppHealth::new(&health_checks).await; let response_code = if response.is_ready() { StatusCode::OK @@ -16,14 +16,16 @@ async fn check_health(health_checks: State) -> (StatusCode, (response_code, Json(response)) } -async fn run_server( +async fn run_server( bind_address: &SocketAddr, - health_checks: Vec>, + health_checks: Vec, mut stop_receiver: watch::Receiver, -) { +) where + T: AsRef + Send + Sync + 'static, +{ let mut health_check_names = HashSet::with_capacity(health_checks.len()); for check in &health_checks { - let health_check_name = check.name(); + let health_check_name = check.as_ref().name(); if !health_check_names.insert(health_check_name) { tracing::warn!( "Health check with name `{health_check_name}` is defined multiple times; only the last mention \ @@ -35,7 +37,7 @@ async fn run_server( "Starting healthcheck server with checks {health_check_names:?} on {bind_address}" ); - let health_checks = SharedHealthchecks::from(health_checks); + let health_checks = Arc::from(health_checks); let app = Router::new() .route("/health", get(check_health)) .with_state(health_checks); @@ -60,7 +62,10 @@ pub struct 
HealthCheckHandle { } impl HealthCheckHandle { - pub fn spawn_server(addr: SocketAddr, healthchecks: Vec>) -> Self { + pub fn spawn_server(addr: SocketAddr, healthchecks: Vec) -> Self + where + T: AsRef + Send + Sync + 'static, + { let (stop_sender, stop_receiver) = watch::channel(false); let server = tokio::spawn(async move { run_server(&addr, healthchecks, stop_receiver).await; diff --git a/core/lib/zksync_core/src/metadata_calculator/mod.rs b/core/lib/zksync_core/src/metadata_calculator/mod.rs index e5a93d7d3de2..36c8d7eb5a64 100644 --- a/core/lib/zksync_core/src/metadata_calculator/mod.rs +++ b/core/lib/zksync_core/src/metadata_calculator/mod.rs @@ -61,7 +61,7 @@ pub struct MetadataCalculatorConfig { } impl MetadataCalculatorConfig { - pub(crate) fn for_main_node( + pub fn for_main_node( merkle_tree_config: &MerkleTreeConfig, operation_config: &OperationsManagerConfig, ) -> Self {