From 3e5c528767e907e116e29310460019e2bf9161d1 Mon Sep 17 00:00:00 2001 From: Igor Aleksanov Date: Tue, 23 Jan 2024 16:00:44 +0400 Subject: [PATCH] feat: ZK Stack framework MVP (#880) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ⚠️ Please read the description before reviewing. ⚠️ This is the ZK Stack framework MVP that represents the core functionality of the new framework. A lot of things are not polished yet: it'll need better logging/reporting, interfaces can be better, naming is off at some places (and I haven't decided on the usage of ZK Stack / zkSync there), we need more tests and examples, no README.md, etc. Additionally, only a few resources and tasks are implemented right now. The reasons for that: - Adapting the existing code to get a single interface takes time, as most of the tasks/resources were created in an ad-hoc manner. - I wanted to focus on the framework functionality here, the implementations may come later as separate PRs. - I didn't want the diff to be big. In the current form, the framework contains a showcase of what interacting with it looks like, and generally, it seems to work fine. For more details on motivation and goals, please read the spec. With that in mind, *any* feedback is appreciated, but I don't guarantee that I'll address it in this PR. Here I'll focus on making sure that the framework works and there is no major disagreement w.r.t. its design, and then there will be N follow-up PRs bringing the missing functionality and polishing the code. 
--- Cargo.lock | 22 ++ Cargo.toml | 1 + checks-config/era.dic | 8 + core/lib/dal/src/connection/mod.rs | 10 + core/lib/health_check/src/lib.rs | 9 +- core/lib/node/Cargo.toml | 31 +++ core/lib/node/examples/main_node.rs | 84 +++++++ core/lib/node/src/implementations/mod.rs | 6 + .../implementations/resource/healthcheck.rs | 36 +++ .../node/src/implementations/resource/mod.rs | 3 + .../implementations/resource/object_store.rs | 15 ++ .../src/implementations/resource/pools.rs | 75 +++++++ .../implementations/task/healtcheck_server.rs | 67 ++++++ .../task/metadata_calculator.rs | 81 +++++++ core/lib/node/src/implementations/task/mod.rs | 3 + .../task/prometheus_exporter.rs | 58 +++++ core/lib/node/src/lib.rs | 24 ++ core/lib/node/src/node/context.rs | 101 +++++++++ core/lib/node/src/node/mod.rs | 207 ++++++++++++++++++ core/lib/node/src/node/stop_receiver.rs | 16 ++ core/lib/node/src/resource/lazy_resource.rs | 92 ++++++++ core/lib/node/src/resource/mod.rs | 83 +++++++ .../node/src/resource/resource_collection.rs | 106 +++++++++ core/lib/node/src/resource/resource_id.rs | 47 ++++ core/lib/node/src/task/mod.rs | 70 ++++++ .../zksync_core/src/api_server/healthcheck.rs | 23 +- .../src/metadata_calculator/mod.rs | 2 +- 27 files changed, 1267 insertions(+), 13 deletions(-) create mode 100644 core/lib/node/Cargo.toml create mode 100644 core/lib/node/examples/main_node.rs create mode 100644 core/lib/node/src/implementations/mod.rs create mode 100644 core/lib/node/src/implementations/resource/healthcheck.rs create mode 100644 core/lib/node/src/implementations/resource/mod.rs create mode 100644 core/lib/node/src/implementations/resource/object_store.rs create mode 100644 core/lib/node/src/implementations/resource/pools.rs create mode 100644 core/lib/node/src/implementations/task/healtcheck_server.rs create mode 100644 core/lib/node/src/implementations/task/metadata_calculator.rs create mode 100644 core/lib/node/src/implementations/task/mod.rs create mode 100644 
core/lib/node/src/implementations/task/prometheus_exporter.rs create mode 100644 core/lib/node/src/lib.rs create mode 100644 core/lib/node/src/node/context.rs create mode 100644 core/lib/node/src/node/mod.rs create mode 100644 core/lib/node/src/node/stop_receiver.rs create mode 100644 core/lib/node/src/resource/lazy_resource.rs create mode 100644 core/lib/node/src/resource/mod.rs create mode 100644 core/lib/node/src/resource/resource_collection.rs create mode 100644 core/lib/node/src/resource/resource_id.rs create mode 100644 core/lib/node/src/task/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 073507145178..fa92d3a3cf92 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9247,6 +9247,28 @@ dependencies = [ "zksync_crypto", ] +[[package]] +name = "zksync_node" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "futures 0.3.28", + "prometheus_exporter", + "thiserror", + "tokio", + "tracing", + "vlog", + "zksync_config", + "zksync_core", + "zksync_dal", + "zksync_env_config", + "zksync_health_check", + "zksync_object_store", + "zksync_storage", + "zksync_types", +] + [[package]] name = "zksync_object_store" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 3487716a8d3e..1f8f52a7991a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ members = [ "core/lib/mempool", "core/lib/merkle_tree", "core/lib/mini_merkle_tree", + "core/lib/node", "core/lib/object_store", "core/lib/prometheus_exporter", "core/lib/queued_job_processor", diff --git a/checks-config/era.dic b/checks-config/era.dic index f0ec14591b1f..15f14a6e5f8c 100644 --- a/checks-config/era.dic +++ b/checks-config/era.dic @@ -881,3 +881,11 @@ plookup shivini EIP4844 KZG +healthcheck +healthchecks +after_node_shutdown +runnable +downcasting +parameterized +reimplementation +composability diff --git a/core/lib/dal/src/connection/mod.rs b/core/lib/dal/src/connection/mod.rs index dba7098174e5..c689ddf8bc18 100644 --- a/core/lib/dal/src/connection/mod.rs +++ 
b/core/lib/dal/src/connection/mod.rs @@ -11,6 +11,7 @@ use crate::{metrics::CONNECTION_METRICS, StorageProcessor}; pub mod holder; /// Builder for [`ConnectionPool`]s. +#[derive(Clone)] pub struct ConnectionPoolBuilder { database_url: String, max_size: u32, @@ -65,6 +66,15 @@ impl ConnectionPoolBuilder { max_size: self.max_size, }) } + + /// Builds a connection pool that has a single connection. + pub async fn build_singleton(&self) -> anyhow::Result { + let singleton_builder = Self { + max_size: 1, + ..self.clone() + }; + singleton_builder.build().await + } } #[derive(Clone)] diff --git a/core/lib/health_check/src/lib.rs b/core/lib/health_check/src/lib.rs index 12bb292bc850..15a0d2945493 100644 --- a/core/lib/health_check/src/lib.rs +++ b/core/lib/health_check/src/lib.rs @@ -80,10 +80,13 @@ pub struct AppHealth { impl AppHealth { /// Aggregates health info from the provided checks. - pub async fn new(health_checks: &[Box]) -> Self { + pub async fn new>(health_checks: &[T]) -> Self { let check_futures = health_checks.iter().map(|check| { - let check_name = check.name(); - check.check_health().map(move |health| (check_name, health)) + let check_name = check.as_ref().name(); + check + .as_ref() + .check_health() + .map(move |health| (check_name, health)) }); let components: HashMap<_, _> = future::join_all(check_futures).await.into_iter().collect(); diff --git a/core/lib/node/Cargo.toml b/core/lib/node/Cargo.toml new file mode 100644 index 000000000000..bda2315f5361 --- /dev/null +++ b/core/lib/node/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "zksync_node" +version = "0.1.0" +edition = "2018" +authors = ["The Matter Labs Team "] +homepage = "https://zksync.io/" +repository = "https://github.com/matter-labs/zksync-era" +license = "MIT OR Apache-2.0" +keywords = ["blockchain", "zksync"] +categories = ["cryptography"] + +[dependencies] +prometheus_exporter = { path = "../prometheus_exporter" } +zksync_types = { path = "../types" } +zksync_health_check = { path = 
"../health_check" } +zksync_dal = { path = "../dal" } +zksync_config = { path = "../config" } +zksync_object_store = { path = "../object_store" } +zksync_core = { path = "../zksync_core" } +zksync_storage = { path = "../storage" } + +tracing = "0.1" +thiserror = "1" +async-trait = "0.1" +futures = "0.3" +anyhow = "1" +tokio = { version = "1", features = ["rt"] } + +[dev-dependencies] +zksync_env_config = { path = "../env_config" } +vlog = { path = "../vlog" } diff --git a/core/lib/node/examples/main_node.rs b/core/lib/node/examples/main_node.rs new file mode 100644 index 000000000000..afe64139e0ba --- /dev/null +++ b/core/lib/node/examples/main_node.rs @@ -0,0 +1,84 @@ +//! An incomplete example of how node initialization looks like. +//! This example defines a `ResourceProvider` that works using the main node env config, and +//! initializes a single task with a health check server. + +use zksync_config::{configs::chain::OperationsManagerConfig, DBConfig, PostgresConfig}; +use zksync_core::metadata_calculator::MetadataCalculatorConfig; +use zksync_dal::ConnectionPool; +use zksync_env_config::FromEnv; +use zksync_node::{ + implementations::{ + resource::pools::MasterPoolResource, + task::{ + healtcheck_server::HealthCheckTaskBuilder, + metadata_calculator::MetadataCalculatorTaskBuilder, + }, + }, + node::ZkSyncNode, + resource::{Resource, ResourceId, ResourceProvider, StoredResource}, +}; + +/// Resource provider for the main node. +/// It defines which resources the tasks will receive. This particular provider is stateless, e.g. it always uses +/// the main node env config, and always knows which resources to provide. +/// The resource provider can be dynamic, however. For example, we can define a resource provider which may use +/// different config load scheme (e.g. env variables / protobuf / yaml / toml), and which resources to provide +/// (e.g. decide whether we need MempoolIO or ExternalIO depending on some config). 
+#[derive(Debug)] +struct MainNodeResourceProvider; + +impl MainNodeResourceProvider { + fn master_pool_resource() -> anyhow::Result { + let config = PostgresConfig::from_env()?; + let mut master_pool = + ConnectionPool::builder(config.master_url()?, config.max_connections()?); + master_pool.set_statement_timeout(config.statement_timeout()); + + Ok(MasterPoolResource::new(master_pool)) + } +} + +#[async_trait::async_trait] +impl ResourceProvider for MainNodeResourceProvider { + async fn get_resource(&self, name: &ResourceId) -> Option> { + match name { + name if name == &MasterPoolResource::resource_id() => { + let resource = + Self::master_pool_resource().expect("Failed to create pools resource"); + Some(Box::new(resource)) + } + _ => None, + } + } +} + +fn main() -> anyhow::Result<()> { + #[allow(deprecated)] // TODO (QIT-21): Use centralized configuration approach. + let log_format = vlog::log_format_from_env(); + let _guard = vlog::ObservabilityBuilder::new() + .with_log_format(log_format) + .build(); + + // Create the node with specified resource provider. We don't need to add any resources explicitly, + // the task will request what they actually need. The benefit here is that we won't instantiate resources + // that are not used, which would be complex otherwise, since the task set is often dynamic. + let mut node = ZkSyncNode::new(MainNodeResourceProvider)?; + + // Add the metadata calculator task. + let merkle_tree_env_config = DBConfig::from_env()?.merkle_tree; + let operations_manager_env_config = OperationsManagerConfig::from_env()?; + let metadata_calculator_config = MetadataCalculatorConfig::for_main_node( + &merkle_tree_env_config, + &operations_manager_env_config, + ); + node.add_task(MetadataCalculatorTaskBuilder(metadata_calculator_config)); + + // Add the healthcheck server. 
+ let healthcheck_config = zksync_config::ApiConfig::from_env()?.healthcheck; + node.add_task(HealthCheckTaskBuilder(healthcheck_config)); + + // Run the node until completion. + node.run()?; + + Ok(()) +} diff --git a/core/lib/node/src/implementations/mod.rs b/core/lib/node/src/implementations/mod.rs new file mode 100644 index 000000000000..b381595d68f9 --- /dev/null +++ b/core/lib/node/src/implementations/mod.rs @@ -0,0 +1,6 @@ +//! Implementations of resources and tasks. +//! These are temporarily provided by the framework crate itself, but will be moved to the separate crates +//! in the future. + +pub mod resource; +pub mod task; diff --git a/core/lib/node/src/implementations/resource/healthcheck.rs b/core/lib/node/src/implementations/resource/healthcheck.rs new file mode 100644 index 000000000000..5b752e33d59b --- /dev/null +++ b/core/lib/node/src/implementations/resource/healthcheck.rs @@ -0,0 +1,36 @@ +use std::{fmt, sync::Arc}; + +// Public re-exports from external crate to minimize the required dependencies. +pub use zksync_health_check::{CheckHealth, ReactiveHealthCheck}; + +use crate::resource::Resource; + +/// Wrapper for a generic health check. 
+#[derive(Clone)] +pub struct HealthCheckResource(Arc); + +impl HealthCheckResource { + pub fn new(check: impl CheckHealth + 'static) -> Self { + Self(Arc::new(check)) + } +} + +impl fmt::Debug for HealthCheckResource { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("HealthCheckResource") + .field("name", &self.0.name()) + .finish_non_exhaustive() + } +} + +impl Resource for HealthCheckResource { + fn resource_id() -> crate::resource::ResourceId { + "common/health_check".into() + } +} + +impl AsRef for HealthCheckResource { + fn as_ref(&self) -> &dyn CheckHealth { + self.0.as_ref() + } +} diff --git a/core/lib/node/src/implementations/resource/mod.rs b/core/lib/node/src/implementations/resource/mod.rs new file mode 100644 index 000000000000..7042d58ef59e --- /dev/null +++ b/core/lib/node/src/implementations/resource/mod.rs @@ -0,0 +1,3 @@ +pub mod healthcheck; +pub mod object_store; +pub mod pools; diff --git a/core/lib/node/src/implementations/resource/object_store.rs b/core/lib/node/src/implementations/resource/object_store.rs new file mode 100644 index 000000000000..093061a9b2d4 --- /dev/null +++ b/core/lib/node/src/implementations/resource/object_store.rs @@ -0,0 +1,15 @@ +use std::sync::Arc; + +use zksync_object_store::ObjectStore; + +use crate::resource::Resource; + +/// Wrapper for the object store. +#[derive(Debug, Clone)] +pub struct ObjectStoreResource(pub Arc); + +impl Resource for ObjectStoreResource { + fn resource_id() -> crate::resource::ResourceId { + "common/object_store".into() + } +} diff --git a/core/lib/node/src/implementations/resource/pools.rs b/core/lib/node/src/implementations/resource/pools.rs new file mode 100644 index 000000000000..c2194bddc72e --- /dev/null +++ b/core/lib/node/src/implementations/resource/pools.rs @@ -0,0 +1,75 @@ +use zksync_dal::{connection::ConnectionPoolBuilder, ConnectionPool}; + +use crate::resource::Resource; + +/// Represents a connection pool to the master database. 
+#[derive(Debug, Clone)] +pub struct MasterPoolResource(ConnectionPoolBuilder); + +impl Resource for MasterPoolResource { + fn resource_id() -> crate::resource::ResourceId { + "common/master_pool".into() + } +} + +impl MasterPoolResource { + pub fn new(builder: ConnectionPoolBuilder) -> Self { + Self(builder) + } + + pub async fn get(&self) -> anyhow::Result { + self.0.build().await + } + + pub async fn get_singleton(&self) -> anyhow::Result { + self.0.build_singleton().await + } +} + +/// Represents a connection pool to the replica database. +#[derive(Debug, Clone)] +pub struct ReplicaPoolResource(ConnectionPoolBuilder); + +impl Resource for ReplicaPoolResource { + fn resource_id() -> crate::resource::ResourceId { + "common/replica_pool".into() + } +} + +impl ReplicaPoolResource { + pub fn new(builder: ConnectionPoolBuilder) -> Self { + Self(builder) + } + + pub async fn get(&self) -> anyhow::Result { + self.0.build().await + } + + pub async fn get_singleton(&self) -> anyhow::Result { + self.0.build_singleton().await + } +} + +/// Represents a connection pool to the prover database. 
+#[derive(Debug, Clone)] +pub struct ProverPoolResource(ConnectionPoolBuilder); + +impl Resource for ProverPoolResource { + fn resource_id() -> crate::resource::ResourceId { + "common/prover_pool".into() + } +} + +impl ProverPoolResource { + pub fn new(builder: ConnectionPoolBuilder) -> Self { + Self(builder) + } + + pub async fn get(&self) -> anyhow::Result { + self.0.build().await + } + + pub async fn get_singleton(&self) -> anyhow::Result { + self.0.build_singleton().await + } +} diff --git a/core/lib/node/src/implementations/task/healtcheck_server.rs b/core/lib/node/src/implementations/task/healtcheck_server.rs new file mode 100644 index 000000000000..6350196da40e --- /dev/null +++ b/core/lib/node/src/implementations/task/healtcheck_server.rs @@ -0,0 +1,67 @@ +use std::fmt; + +use zksync_config::configs::api::HealthCheckConfig; +use zksync_core::api_server::healthcheck::HealthCheckHandle; + +use crate::{ + implementations::resource::healthcheck::HealthCheckResource, + node::{NodeContext, StopReceiver}, + resource::ResourceCollection, + task::{IntoZkSyncTask, TaskInitError, ZkSyncTask}, +}; + +/// Builder for a health check server. +/// +/// Spawned task collects all the health checks added by different tasks to the +/// corresponding resource collection and spawns an HTTP server exposing them. 
+#[derive(Debug)] +pub struct HealthCheckTaskBuilder(pub HealthCheckConfig); + +#[async_trait::async_trait] +impl IntoZkSyncTask for HealthCheckTaskBuilder { + fn task_name(&self) -> &'static str { + "healthcheck_server" + } + + async fn create( + self: Box, + mut node: NodeContext<'_>, + ) -> Result, TaskInitError> { + let healthchecks = node + .get_resource_or_default::>() + .await; + + let task = HealthCheckTask { + config: self.0, + healthchecks, + }; + + Ok(Box::new(task)) + } +} + +struct HealthCheckTask { + config: HealthCheckConfig, + healthchecks: ResourceCollection, +} + +impl fmt::Debug for HealthCheckTask { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("HealthCheckTask") + .field("config", &self.config) + .finish_non_exhaustive() + } +} + +#[async_trait::async_trait] +impl ZkSyncTask for HealthCheckTask { + async fn run(mut self: Box, mut stop_receiver: StopReceiver) -> anyhow::Result<()> { + let healthchecks = self.healthchecks.resolve().await; + + let handle = HealthCheckHandle::spawn_server(self.config.bind_addr(), healthchecks); + stop_receiver.0.changed().await?; + handle.stop().await; + + Ok(()) + } +} diff --git a/core/lib/node/src/implementations/task/metadata_calculator.rs b/core/lib/node/src/implementations/task/metadata_calculator.rs new file mode 100644 index 000000000000..4bfe18167b40 --- /dev/null +++ b/core/lib/node/src/implementations/task/metadata_calculator.rs @@ -0,0 +1,81 @@ +use zksync_core::metadata_calculator::{MetadataCalculator, MetadataCalculatorConfig}; +use zksync_dal::ConnectionPool; +use zksync_storage::RocksDB; + +use crate::{ + implementations::resource::{ + healthcheck::HealthCheckResource, object_store::ObjectStoreResource, + pools::MasterPoolResource, + }, + node::{NodeContext, StopReceiver}, + resource::{Resource, ResourceCollection}, + task::{IntoZkSyncTask, TaskInitError, ZkSyncTask}, +}; + +/// Builder for a metadata calculator. 
+#[derive(Debug)] +pub struct MetadataCalculatorTaskBuilder(pub MetadataCalculatorConfig); + +#[derive(Debug)] +pub struct MetadataCalculatorTask { + metadata_calculator: MetadataCalculator, + main_pool: ConnectionPool, +} + +#[async_trait::async_trait] +impl IntoZkSyncTask for MetadataCalculatorTaskBuilder { + fn task_name(&self) -> &'static str { + "metadata_calculator" + } + + async fn create( + self: Box, + mut node: NodeContext<'_>, + ) -> Result, TaskInitError> { + let pool = node.get_resource::().await.ok_or( + TaskInitError::ResourceLacking(MasterPoolResource::resource_id()), + )?; + let main_pool = pool.get().await.unwrap(); + let object_store = node.get_resource::().await; // OK to be None. + + if object_store.is_none() { + tracing::info!( + "Object store is not provided, metadata calculator will run without it." + ); + } + + let metadata_calculator = + MetadataCalculator::new(self.0, object_store.map(|os| os.0)).await; + + let healthchecks = node + .get_resource_or_default::>() + .await; + healthchecks + .push(HealthCheckResource::new( + metadata_calculator.tree_health_check(), + )) + .expect("Wiring stage"); + + Ok(Box::new(MetadataCalculatorTask { + metadata_calculator, + main_pool, + })) + } +} + +#[async_trait::async_trait] +impl ZkSyncTask for MetadataCalculatorTask { + async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { + let result = self + .metadata_calculator + .run(self.main_pool, stop_receiver.0) + .await; + + // Wait for all the instances of RocksDB to be destroyed. 
+ tokio::task::spawn_blocking(RocksDB::await_rocksdb_termination) + .await + .unwrap(); + + result + } +} diff --git a/core/lib/node/src/implementations/task/mod.rs b/core/lib/node/src/implementations/task/mod.rs new file mode 100644 index 000000000000..d56e496cfb01 --- /dev/null +++ b/core/lib/node/src/implementations/task/mod.rs @@ -0,0 +1,3 @@ +pub mod healtcheck_server; +pub mod metadata_calculator; +pub mod prometheus_exporter; diff --git a/core/lib/node/src/implementations/task/prometheus_exporter.rs b/core/lib/node/src/implementations/task/prometheus_exporter.rs new file mode 100644 index 000000000000..17923efde95b --- /dev/null +++ b/core/lib/node/src/implementations/task/prometheus_exporter.rs @@ -0,0 +1,58 @@ +use prometheus_exporter::PrometheusExporterConfig; +use zksync_health_check::{HealthStatus, HealthUpdater, ReactiveHealthCheck}; + +use crate::{ + implementations::resource::healthcheck::HealthCheckResource, + node::{NodeContext, StopReceiver}, + resource::ResourceCollection, + task::{IntoZkSyncTask, TaskInitError, ZkSyncTask}, +}; + +/// Builder for a prometheus exporter. 
+#[derive(Debug)] +pub struct PrometheusExporterTaskBuilder(pub PrometheusExporterConfig); + +#[derive(Debug)] +pub struct PrometheusExporterTask { + config: PrometheusExporterConfig, + prometheus_health_updater: HealthUpdater, +} + +#[async_trait::async_trait] +impl IntoZkSyncTask for PrometheusExporterTaskBuilder { + fn task_name(&self) -> &'static str { + "prometheus_exporter" + } + + async fn create( + self: Box, + mut node: NodeContext<'_>, + ) -> Result, TaskInitError> { + let (prometheus_health_check, prometheus_health_updater) = + ReactiveHealthCheck::new("prometheus_exporter"); + + let healthchecks = node + .get_resource_or_default::>() + .await; + healthchecks + .push(HealthCheckResource::new(prometheus_health_check)) + .expect("Wiring stage"); + + Ok(Box::new(PrometheusExporterTask { + config: self.0, + prometheus_health_updater, + })) + } +} + +#[async_trait::async_trait] +impl ZkSyncTask for PrometheusExporterTask { + async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { + let prometheus_task = self.config.run(stop_receiver.0); + self.prometheus_health_updater + .update(HealthStatus::Ready.into()); + let res = prometheus_task.await; + drop(self.prometheus_health_updater); + res + } +} diff --git a/core/lib/node/src/lib.rs b/core/lib/node/src/lib.rs new file mode 100644 index 000000000000..c714f998a12a --- /dev/null +++ b/core/lib/node/src/lib.rs @@ -0,0 +1,24 @@ +//! # ZK Stack node initialization framework. +//! +//! ## Introduction +//! +//! This crate provides core abstractions that allow one to compose a ZK Stack node. +//! Main concepts used in this crate are: +//! - [`IntoZkSyncTask`](task::IntoZkSyncTask) - builder interface for tasks. +//! - [`ZkSyncTask`](task::ZkSyncTask) - a unit of work that can be executed by the node. +//! - [`Resource`](resource::Resource) - a piece of logic that can be shared between tasks. Most resources are +//! 
represented by generic interfaces and also serve as points of customization for tasks. +//! - [`ResourceProvider`](resource::ResourceProvider) - a trait that allows one to provide resources to the node. +//! - [`ZkSyncNode`](node::ZkSyncNode) - a container for tasks and resources that takes care of initialization, running +//! and shutting down. +//! +//! The general flow to compose a node is as follows: +//! - Create a [`ResourceProvider`](resource::ResourceProvider) that can provide all the resources that the node needs. +//! - Create a [`ZkSyncNode`](node::ZkSyncNode) with that [`ResourceProvider`](resource::ResourceProvider). +//! - Add tasks to the node. +//! - Run it. + +pub mod implementations; +pub mod node; +pub mod resource; +pub mod task; diff --git a/core/lib/node/src/node/context.rs b/core/lib/node/src/node/context.rs new file mode 100644 index 000000000000..897bc1c967ef --- /dev/null +++ b/core/lib/node/src/node/context.rs @@ -0,0 +1,101 @@ +use crate::{ + node::ZkSyncNode, + resource::{Resource, StoredResource}, + task::IntoZkSyncTask, +}; + +/// An interface to the node's resources provided to the tasks during initialization. +/// Provides the ability to fetch required resources, and also gives access to the Tokio runtime handle used by the node. +#[derive(Debug)] +pub struct NodeContext<'a> { + node: &'a mut ZkSyncNode, +} + +impl<'a> NodeContext<'a> { + pub(super) fn new(node: &'a mut ZkSyncNode) -> Self { + Self { node } + } + + /// Provides access to the runtime used by the node. + /// Can be used to spawn additional tasks within the same runtime. + /// If some tasks stores the handle to spawn additional tasks, it is expected to do all the required + /// cleanup. + /// + /// In most cases, however, it is recommended to use [`add_task`] method instead. + pub fn runtime_handle(&self) -> &tokio::runtime::Handle { + self.node.runtime.handle() + } + + /// Adds an additional task to the node. 
+ /// This may be used if some task or its resource requires an additional routine for maintenance. + pub fn add_task(&mut self, builder: T) -> &mut Self { + self.node.add_task(builder); + self + } + + /// Attempts to retrieve the resource with the specified name. + /// Internally the resources are stored as [`std::any::Any`], and this method does the downcasting + /// on behalf of the caller. + /// + /// ## Panics + /// + /// Panics if the resource with the specified name exists, but is not of the requested type. + pub async fn get_resource(&mut self) -> Option { + #[allow(clippy::borrowed_box)] + let downcast_clone = |resource: &Box| { + resource + .downcast_ref::() + .unwrap_or_else(|| { + panic!( + "Resource {} is not of type {}", + T::resource_id(), + std::any::type_name::() + ) + }) + .clone() + }; + + let name = T::resource_id(); + // Check whether the resource is already available. + if let Some(resource) = self.node.resources.get(&name) { + return Some(downcast_clone(resource)); + } + + // Try to fetch the resource from the provider. + if let Some(resource) = self.node.resource_provider.get_resource(&name).await { + // First, ensure the type matches. + let downcasted = downcast_clone(&resource); + // Then, add it to the local resources. + self.node.resources.insert(name, resource); + return Some(downcasted); + } + + // No such resource. + // The requester is allowed to decide whether this is an error or not. + None + } + + /// Attempts to retrieve the resource with the specified name. + /// If the resource is not available, it is created using the provided closure. + pub async fn get_resource_or_insert_with T>( + &mut self, + f: F, + ) -> T { + if let Some(resource) = self.get_resource::().await { + return resource; + } + + // No such resource, insert a new one. + let resource = f(); + self.node + .resources + .insert(T::resource_id(), Box::new(resource.clone())); + resource + } + + /// Attempts to retrieve the resource with the specified name. 
+ /// If the resource is not available, it is created using `T::default()`. + pub async fn get_resource_or_default(&mut self) -> T { + self.get_resource_or_insert_with(T::default).await + } +} diff --git a/core/lib/node/src/node/mod.rs b/core/lib/node/src/node/mod.rs new file mode 100644 index 000000000000..42b9c9ac60a4 --- /dev/null +++ b/core/lib/node/src/node/mod.rs @@ -0,0 +1,207 @@ +use std::{collections::HashMap, fmt}; + +use futures::{future::BoxFuture, FutureExt}; +use tokio::{runtime::Runtime, sync::watch}; + +pub use self::{context::NodeContext, stop_receiver::StopReceiver}; +use crate::{ + resource::{ResourceId, ResourceProvider, StoredResource}, + task::{IntoZkSyncTask, TaskInitError}, +}; + +mod context; +mod stop_receiver; + +/// "Manager" class of the node. Collects all the resources and tasks, +/// then runs tasks until completion. +/// +/// Initialization flow: +/// - Node instance is created with access to the resource provider. +/// - Task constructors are added to the node. At this step, tasks are not created yet. +/// - Optionally, a healthcheck task constructor is also added. +/// - Once the `run` method is invoked, node +/// - attempts to create every task. If there are no tasks, or at least task +/// constructor fails, the node will return an error. +/// - initializes the healthcheck task if it's provided. +/// - waits for any of the tasks to finish. +/// - sends stop signal to all the tasks. +/// - waits for the remaining tasks to finish. +/// - calls `after_node_shutdown` hook for every task that has provided it. +/// - returns the result of the task that has finished. +pub struct ZkSyncNode { + /// Primary source of resources for tasks. + resource_provider: Box, + /// Cache of resources that have been requested at least by one task. + resources: HashMap>, + /// List of task builders. + task_builders: Vec>, + + /// Sender used to stop the tasks. + stop_sender: watch::Sender, + /// Tokio runtime used to spawn tasks. 
+ runtime: Runtime, +} + +impl fmt::Debug for ZkSyncNode { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ZkSyncNode").finish_non_exhaustive() + } +} + +impl ZkSyncNode { + pub fn new(resource_provider: R) -> anyhow::Result { + if tokio::runtime::Handle::try_current().is_ok() { + anyhow::bail!( + "Detected a Tokio Runtime. ZkSyncNode manages its own runtime and does not support nested runtimes" + ); + } + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + + let (stop_sender, _stop_receiver) = watch::channel(false); + let self_ = Self { + resource_provider: Box::new(resource_provider), + resources: HashMap::default(), + task_builders: Vec::new(), + stop_sender, + runtime, + }; + + Ok(self_) + } + + /// Adds a task to the node. + /// + /// The task is not created at this point, instead, the constructor is stored in the node + /// and will be invoked during [`ZkSyncNode::run`] method. Any error returned by the constructor + /// will prevent the node from starting and will be propagated by the [`ZkSyncNode::run`] method. + pub fn add_task(&mut self, builder: T) -> &mut Self { + self.task_builders.push(Box::new(builder)); + self + } + + /// Runs the system. + pub fn run(mut self) -> anyhow::Result<()> { + // Initialize tasks. + let task_builders = std::mem::take(&mut self.task_builders); + + let mut tasks = Vec::new(); + + let mut errors: Vec<(String, TaskInitError)> = Vec::new(); + + let runtime_handle = self.runtime.handle().clone(); + for task_builder in task_builders { + let name = task_builder.task_name().to_string(); + let task_result = + runtime_handle.block_on(task_builder.create(NodeContext::new(&mut self))); + let task = match task_result { + Ok(task) => task, + Err(err) => { + // We don't want to bail on the first error, since it'll provide worse DevEx: + // People likely want to fix as much problems as they can in one go, rather than have + // to fix them one by one. 
+ errors.push((name, err)); + continue; + } + }; + let after_node_shutdown = task.after_node_shutdown(); + let task_future = Box::pin(task.run(self.stop_receiver())); + let task_repr = TaskRepr { + name, + task: Some(task_future), + after_node_shutdown, + }; + tasks.push(task_repr); + } + + // Report all the errors we've met during the init. + if !errors.is_empty() { + for (task, error) in errors { + tracing::error!("Task {task} can't be initialized: {error}"); + } + anyhow::bail!("One or more task weren't able to start"); + } + + if tasks.is_empty() { + anyhow::bail!("No tasks to run"); + } + + // Wiring is now complete. + for resource in self.resources.values_mut() { + resource.stored_resource_wired(); + } + + // Prepare tasks for running. + let rt_handle = self.runtime.handle().clone(); + let join_handles: Vec<_> = tasks + .iter_mut() + .map(|task| { + let task = task.task.take().expect( + "Tasks are created by the node and must be Some prior to calling this method", + ); + rt_handle.spawn(task).fuse() + }) + .collect(); + + // Run the tasks until one of them exits. + // TODO (QIT-24): wrap every task into a timeout to prevent hanging. + let (resolved, idx, remaining) = self + .runtime + .block_on(futures::future::select_all(join_handles)); + let task_name = tasks[idx].name.clone(); + let failure = match resolved { + Ok(Ok(())) => { + tracing::info!("Task {task_name} completed"); + false + } + Ok(Err(err)) => { + tracing::error!("Task {task_name} exited with an error: {err}"); + true + } + Err(_) => { + tracing::error!("Task {task_name} panicked"); + true + } + }; + + // Send stop signal to remaining tasks and wait for them to finish. + // Given that we are shutting down, we do not really care about returned values. + self.stop_sender.send(true).ok(); + self.runtime.block_on(futures::future::join_all(remaining)); + + // Call after_node_shutdown hooks. 
+ let local_set = tokio::task::LocalSet::new(); + let join_handles = tasks.iter_mut().filter_map(|task| { + task.after_node_shutdown + .take() + .map(|task| local_set.spawn_local(task)) + }); + local_set.block_on(&self.runtime, futures::future::join_all(join_handles)); + + if failure { + anyhow::bail!("Task {task_name} failed"); + } else { + Ok(()) + } + } + + pub(crate) fn stop_receiver(&self) -> StopReceiver { + StopReceiver(self.stop_sender.subscribe()) + } +} + +struct TaskRepr { + name: String, + task: Option>>, + after_node_shutdown: Option>, +} + +impl fmt::Debug for TaskRepr { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TaskRepr") + .field("name", &self.name) + .finish_non_exhaustive() + } +} diff --git a/core/lib/node/src/node/stop_receiver.rs b/core/lib/node/src/node/stop_receiver.rs new file mode 100644 index 000000000000..7a181b49a80d --- /dev/null +++ b/core/lib/node/src/node/stop_receiver.rs @@ -0,0 +1,16 @@ +use tokio::sync::watch; + +/// Represents a receiver for the stop signal. +/// This signal is sent when the node is shutting down. +/// Every task is expected to listen to this signal and stop its execution when it is received. +/// +/// This structure exists as a first-class entity instead of being a resource to make it more visible +/// and prevent tasks from hanging by accident. 
+#[derive(Debug, Clone)] +pub struct StopReceiver(pub watch::Receiver); + +impl StopReceiver { + pub fn new(receiver: watch::Receiver) -> Self { + Self(receiver) + } +} diff --git a/core/lib/node/src/resource/lazy_resource.rs b/core/lib/node/src/resource/lazy_resource.rs new file mode 100644 index 000000000000..3ec6e3cdb0bd --- /dev/null +++ b/core/lib/node/src/resource/lazy_resource.rs @@ -0,0 +1,92 @@ +use std::sync::Arc; + +use thiserror::Error; +use tokio::sync::watch; + +use super::{Resource, ResourceId}; +use crate::node::StopReceiver; + +/// A lazy resource represents a resource that isn't available at the time when the tasks start. +/// +/// Normally it's used to represent the resources that should be provided by one task to another one. +/// Lazy resources are aware of the node lifecycle, so attempt to resolve the resource won't hang +/// if the resource is never provided: the resolve future will fail once the stop signal is sent by the node. +#[derive(Debug)] +pub struct LazyResource { + resolve_sender: Arc>>, + stop_receiver: StopReceiver, +} + +impl Resource for LazyResource { + fn resource_id() -> ResourceId { + ResourceId::new("lazy") + T::resource_id() + } +} + +impl Clone for LazyResource { + fn clone(&self) -> Self { + Self { + resolve_sender: self.resolve_sender.clone(), + stop_receiver: self.stop_receiver.clone(), + } + } +} + +impl LazyResource { + /// Creates a new lazy resource. + /// Provided stop receiver will be used to prevent resolving from hanging if the resource is never provided. + pub fn new(stop_receiver: StopReceiver) -> Self { + let (resolve_sender, _resolve_receiver) = watch::channel(None); + + Self { + resolve_sender: Arc::new(resolve_sender), + stop_receiver, + } + } + + /// Returns a future that resolves to the resource once it is provided. + /// If the resource is never provided, the method will return an error once the node is shutting down. 
+ pub async fn resolve(mut self) -> Result { + let mut resolve_receiver = self.resolve_sender.subscribe(); + if let Some(resource) = resolve_receiver.borrow().as_ref() { + return Ok(resource.clone()); + } + + tokio::select! { + _ = self.stop_receiver.0.changed() => { + Err(LazyResourceError::NodeShutdown) + } + _ = resolve_receiver.changed() => { + // ^ we can ignore the error on `changed`, since we hold a strong reference to the sender. + let resource = resolve_receiver.borrow().as_ref().expect("Can only change if provided").clone(); + Ok(resource) + } + } + } + + /// Provides the resource. + /// May be called at most once. Subsequent calls will return an error. + pub async fn provide(&mut self, resource: T) -> Result<(), LazyResourceError> { + let sent = self.resolve_sender.send_if_modified(|current| { + if current.is_some() { + return false; + } + *current = Some(resource.clone()); + true + }); + + if !sent { + return Err(LazyResourceError::ResourceAlreadyProvided); + } + + Ok(()) + } +} + +#[derive(Debug, Error)] +pub enum LazyResourceError { + #[error("Node is shutting down")] + NodeShutdown, + #[error("Resource is already provided")] + ResourceAlreadyProvided, +} diff --git a/core/lib/node/src/resource/mod.rs b/core/lib/node/src/resource/mod.rs new file mode 100644 index 000000000000..2ed10d56416d --- /dev/null +++ b/core/lib/node/src/resource/mod.rs @@ -0,0 +1,83 @@ +use std::{any::TypeId, fmt}; + +pub use self::{ + lazy_resource::LazyResource, resource_collection::ResourceCollection, resource_id::ResourceId, +}; + +mod lazy_resource; +mod resource_collection; +mod resource_id; + +/// A trait for anything that can be stored (and retrieved) as a resource. +/// Typically, the type that implements this trait also should implement `Clone` +/// since the same resource may be requested by several tasks and thus it would be an additional +/// bound on most methods that work with [`Resource`]. 
+pub trait Resource: 'static + Send + Sync + std::any::Any { + /// Unique identifier of the resource. + /// Used to fetch the resource from the provider. + /// + /// It is recommended to name resources in form of `/`, where `` is the name of the task + /// that will use this resource, or 'common' in case it is used by several tasks, and `` is the name + /// of the resource itself. + fn resource_id() -> ResourceId; + + fn on_resource_wired(&mut self) {} +} + +/// Internal, object-safe version of [`Resource`]. +/// Used to store resources in the node without knowing their exact type. +/// +/// This trait is implemented for any type that implements [`Resource`], so there is no need to +/// implement it manually. +pub trait StoredResource: 'static + std::any::Any + Send + Sync { + /// An object-safe version of [`Resource::resource_id`]. + fn stored_resource_id(&self) -> ResourceId; + + /// An object-safe version of [`Resource::on_resoure_wired`]. + fn stored_resource_wired(&mut self); +} + +impl StoredResource for T { + fn stored_resource_id(&self) -> ResourceId { + T::resource_id() + } + + fn stored_resource_wired(&mut self) { + Resource::on_resource_wired(self); + } +} + +impl dyn StoredResource { + /// Reimplementation of `Any::downcast_ref`. + /// Returns `Some` if the type is correct, and `None` otherwise. + // Note: This method is required as we cannot store objects as, for example, `dyn StoredResource + Any`, + // so we don't have access to `Any::downcast_ref` within the node. + pub(crate) fn downcast_ref(&self) -> Option<&T> { + if self.type_id() == TypeId::of::() { + // SAFETY: We just checked that the type is correct. + unsafe { Some(&*(self as *const dyn StoredResource as *const T)) } + } else { + None + } + } +} + +/// An entity that knows how to initialize resources. 
+/// +/// It exists to simplify the initialization process, as both tasks and *resources* can depend on other resources, +/// and by having an entity that can initialize the resource on demand we can avoid the need to provide resources +/// in any particular order. +/// +/// Node will only call `get_resource` method once per resource, and will cache the result. This guarantees that +/// all the resource consumers will interact with the same resource instance, which may be important for getting +/// the consistent state (e.g. to make sure that L1 gas price is the same for all the tasks). +#[async_trait::async_trait] +pub trait ResourceProvider: 'static + Send + Sync + fmt::Debug { + /// Returns a resource with the given name. + /// + /// In case it isn't possible to obtain the resource (for example, if some error occurred during initialization), + /// the provider is free to either return `None` (if it assumes that the node can continue without this resource), + /// or to panic. + // Note: we have to use `Box` here, since we can't use `Box` due to it not being object-safe. + async fn get_resource(&self, resource: &ResourceId) -> Option>; +} diff --git a/core/lib/node/src/resource/resource_collection.rs b/core/lib/node/src/resource/resource_collection.rs new file mode 100644 index 000000000000..05c12ad40662 --- /dev/null +++ b/core/lib/node/src/resource/resource_collection.rs @@ -0,0 +1,106 @@ +use std::{ + fmt, + sync::{Arc, Mutex}, +}; + +use thiserror::Error; +use tokio::sync::watch; + +use super::{Resource, ResourceId}; + +/// Collection of resources that can be extended during the initialization phase, and then resolved once +/// the wiring is complete. +/// +/// During component initialization, resource collections can be requested by the components in order to push new +/// elements there. Once the initialization is complete, it is no longer possible to push new elements, and the +/// collection can be resolved into a vector of resources. 
+/// +/// Collections implement `Clone`, so they can be consumed by several tasks. Every task that resolves the collection +/// is guaranteed to have the same set of resources. +/// +/// The purpose of this container is to allow different tasks to register their resource in a single place for some +/// other task to consume. For example, tasks may register their healthchecks, and then healthcheck task will observe +/// all the provided healthchecks. +pub struct ResourceCollection { + /// Collection of the resources. + resources: Arc>>, + /// Sender indicating that the wiring is complete. + wiring_complete_sender: Arc>, + /// Flag indicating that the collection has been resolved. + wired: watch::Receiver, +} + +impl Resource for ResourceCollection { + fn resource_id() -> ResourceId { + ResourceId::new("collection") + T::resource_id() + } + + fn on_resource_wired(&mut self) { + self.wiring_complete_sender.send(true).ok(); + } +} + +impl Default for ResourceCollection { + fn default() -> Self { + Self::new() + } +} + +impl Clone for ResourceCollection { + fn clone(&self) -> Self { + Self { + resources: self.resources.clone(), + wiring_complete_sender: self.wiring_complete_sender.clone(), + wired: self.wired.clone(), + } + } +} + +impl fmt::Debug for ResourceCollection { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ResourceCollection") + .field("resources", &"{..}") + .finish_non_exhaustive() + } +} + +#[derive(Debug, Error)] +pub enum ResourceCollectionError { + #[error("Adding resources to the collection is not allowed after the wiring is complete")] + AlreadyWired, +} + +impl ResourceCollection { + pub(crate) fn new() -> Self { + let (wiring_complete_sender, wired) = watch::channel(false); + Self { + resources: Arc::default(), + wiring_complete_sender: Arc::new(wiring_complete_sender), + wired, + } + } + + /// Adds a new element to the resource collection. + /// Returns an error if the wiring is already complete. 
+ pub fn push(&self, resource: T) -> Result<(), ResourceCollectionError> { + // This check is sufficient, since no task is guaranteed to be running when the value changes. + if *self.wired.borrow() { + return Err(ResourceCollectionError::AlreadyWired); + } + + let mut handle = self.resources.lock().unwrap(); + handle.push(resource); + Ok(()) + } + + /// Waits until the wiring is complete, and resolves the collection into a vector of resources. + pub async fn resolve(mut self) -> Vec { + // Guaranteed not to hang on server shutdown, since the node will invoke the `on_wiring_complete` before any task + // is actually spawned (per framework rules). For most cases, this check will resolve immediately, unless + // some tasks would spawn something from the `IntoZkSyncTask` impl. + self.wired.changed().await.expect("Sender can't be dropped"); + + let handle = self.resources.lock().unwrap(); + (*handle).clone() + } +} diff --git a/core/lib/node/src/resource/resource_id.rs b/core/lib/node/src/resource/resource_id.rs new file mode 100644 index 000000000000..73f7f43c58a4 --- /dev/null +++ b/core/lib/node/src/resource/resource_id.rs @@ -0,0 +1,47 @@ +use std::{ + fmt, + ops::{Add, AddAssign}, +}; + +/// A unique identifier of the resource. +/// Typically, represented as a path-like string, e.g. `common/master_pool`. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct ResourceId { + /// Path-like representation of the resource identifier. + /// Represented as a `Vec` for ID composability (e.g. collection IDs can be defined as + /// `ResourceId::from("collection") + Resource::resource_id()`). 
+ id: Vec<&'static str>, +} + +impl ResourceId { + pub fn new(id: &'static str) -> Self { + Self { id: vec![id] } + } +} + +impl Add for ResourceId { + type Output = Self; + + fn add(mut self, rhs: ResourceId) -> Self::Output { + self.id.extend(rhs.id); + self + } +} + +impl AddAssign for ResourceId { + fn add_assign(&mut self, rhs: ResourceId) { + self.id.extend(rhs.id); + } +} + +impl From<&'static str> for ResourceId { + fn from(id: &'static str) -> Self { + Self { id: vec![id] } + } +} + +impl fmt::Display for ResourceId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.id.join("/")) + } +} diff --git a/core/lib/node/src/task/mod.rs b/core/lib/node/src/task/mod.rs new file mode 100644 index 000000000000..43f332a0f4d9 --- /dev/null +++ b/core/lib/node/src/task/mod.rs @@ -0,0 +1,70 @@ +//! Tasks define the "runnable" concept of the node, e.g. something that can be launched and runs until the node +//! is stopped. +//! +//! Task is normally defined by two types, one implementing two traits [`IntoZkSyncTask`], which acts like a +//! constructor, and another one, which implements [`ZkSyncTask`], providing an interface for `ZkSyncNode` to +//! implement the task lifecycle. + +use futures::future::BoxFuture; + +use crate::{ + node::{NodeContext, StopReceiver}, + resource::ResourceId, +}; + +/// Factory that can create a task. +#[async_trait::async_trait] +pub trait IntoZkSyncTask: 'static + Send + Sync { + /// Unique name of the task. + fn task_name(&self) -> &'static str; + + /// Creates a new task. + /// + /// `NodeContext` provides an interface to the utilities that task may need, e.g. ability to get resources + /// or spawn additional tasks. + async fn create( + self: Box, + node: NodeContext<'_>, + ) -> Result, TaskInitError>; +} + +/// A task implementation. +#[async_trait::async_trait] +pub trait ZkSyncTask: 'static + Send + Sync { + /// Runs the task. + /// + /// Once any of the task returns, the node will shutdown. 
+ /// If the task returns an error, the node will spawn an error-level log message and will return a non-zero + /// exit code. + /// + /// `stop_receiver` argument contains a channel receiver that will change its value once the node requests + /// a shutdown. Every task is expected to either await or periodically check the state of channel and stop + /// its execution once the channel is changed. + /// + /// Each task is expected to perform the required cleanup after receiving the stop signal. + async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()>; + + /// Asynchronous hook that will be called after *each task* has finished their cleanup. + /// It is guaranteed that no other task is running at this point, e.g. `ZkSyncNode` will invoke + /// this hook sequentially for each task. + /// + /// This hook can be used to perform some cleanup that assumes exclusive access to the resources, e.g. + /// to rollback some state. + /// + /// *Note*: This hook **should not** be used to perform trivial task cleanup, e.g. to wait for the spawned + /// server to stop. By the time this hook is invoked, every component of the node is expected to stop. Not + /// following this rule may cause the tasks that properly implement this hook to malfunction. + fn after_node_shutdown(&self) -> Option> { + None + } +} + +/// An error that can occur during the task initialization. 
+#[derive(thiserror::Error, Debug)] +#[non_exhaustive] +pub enum TaskInitError { + #[error("Resource {0} is not provided")] + ResourceLacking(ResourceId), + #[error(transparent)] + Internal(#[from] anyhow::Error), +} diff --git a/core/lib/zksync_core/src/api_server/healthcheck.rs b/core/lib/zksync_core/src/api_server/healthcheck.rs index 7010d29fb4b5..cbf8c9d6faf4 100644 --- a/core/lib/zksync_core/src/api_server/healthcheck.rs +++ b/core/lib/zksync_core/src/api_server/healthcheck.rs @@ -4,9 +4,9 @@ use axum::{extract::State, http::StatusCode, routing::get, Json, Router}; use tokio::sync::watch; use zksync_health_check::{AppHealth, CheckHealth}; -type SharedHealthchecks = Arc<[Box]>; - -async fn check_health(health_checks: State) -> (StatusCode, Json) { +async fn check_health>( + health_checks: State>, +) -> (StatusCode, Json) { let response = AppHealth::new(&health_checks).await; let response_code = if response.is_ready() { StatusCode::OK @@ -16,14 +16,16 @@ async fn check_health(health_checks: State) -> (StatusCode, (response_code, Json(response)) } -async fn run_server( +async fn run_server( bind_address: &SocketAddr, - health_checks: Vec>, + health_checks: Vec, mut stop_receiver: watch::Receiver, -) { +) where + T: AsRef + Send + Sync + 'static, +{ let mut health_check_names = HashSet::with_capacity(health_checks.len()); for check in &health_checks { - let health_check_name = check.name(); + let health_check_name = check.as_ref().name(); if !health_check_names.insert(health_check_name) { tracing::warn!( "Health check with name `{health_check_name}` is defined multiple times; only the last mention \ @@ -35,7 +37,7 @@ async fn run_server( "Starting healthcheck server with checks {health_check_names:?} on {bind_address}" ); - let health_checks = SharedHealthchecks::from(health_checks); + let health_checks = Arc::from(health_checks); let app = Router::new() .route("/health", get(check_health)) .with_state(health_checks); @@ -60,7 +62,10 @@ pub struct 
HealthCheckHandle { } impl HealthCheckHandle { - pub fn spawn_server(addr: SocketAddr, healthchecks: Vec>) -> Self { + pub fn spawn_server(addr: SocketAddr, healthchecks: Vec) -> Self + where + T: AsRef + Send + Sync + 'static, + { let (stop_sender, stop_receiver) = watch::channel(false); let server = tokio::spawn(async move { run_server(&addr, healthchecks, stop_receiver).await; diff --git a/core/lib/zksync_core/src/metadata_calculator/mod.rs b/core/lib/zksync_core/src/metadata_calculator/mod.rs index e5a93d7d3de2..36c8d7eb5a64 100644 --- a/core/lib/zksync_core/src/metadata_calculator/mod.rs +++ b/core/lib/zksync_core/src/metadata_calculator/mod.rs @@ -61,7 +61,7 @@ pub struct MetadataCalculatorConfig { } impl MetadataCalculatorConfig { - pub(crate) fn for_main_node( + pub fn for_main_node( merkle_tree_config: &MerkleTreeConfig, operation_config: &OperationsManagerConfig, ) -> Self {