From 6ed5ceb4806f2560ba52a30134992f63d635c64a Mon Sep 17 00:00:00 2001 From: Igor Aleksanov Date: Mon, 5 Feb 2024 13:38:43 +0400 Subject: [PATCH] refactor: Framework interface improvements (#975) Renamed: - `IntoZkSyncTask` -> `WiringLayer` (as this trait now does more than just creating a single task) - `IntoZkSyncTask::create` -> `WiringLayer::wire` (layer doesn't have to create anything) - `ZkSyncTask` -> `Task` (was unnecessary verbose and inconsistent with simple `Resource`) - `ZkSyncNode` -> `ZkStackService` (1) not tied to zkSync; 2) several people told me that it doesn't have to represent the whole node) - `ZkSyncNode::add_task` -> `ZKStackService::add_layer` Moved: - `core/lib/node` -> `core/node/node_framework` (as we previously agreed to create a separate directory for things that define the node) Changed: - `WiringLayer` no longer returns a task, instead, it adds any number of tasks through the node context. --- Cargo.lock | 2 +- Cargo.toml | 3 +- core/lib/node/Cargo.toml | 31 ------ core/node/node_framework/Cargo.toml | 31 ++++++ .../node_framework}/examples/main_node.rs | 17 ++-- .../layers}/healtcheck_server.rs | 36 ++++--- .../layers}/metadata_calculator.rs | 48 ++++++---- .../src/implementations/layers}/mod.rs | 0 .../layers}/prometheus_exporter.rs | 36 ++++--- .../src/implementations/mod.rs | 4 +- .../implementations/resources}/healthcheck.rs | 0 .../src/implementations/resources}/mod.rs | 0 .../resources}/object_store.rs | 0 .../src/implementations/resources}/pools.rs | 0 .../node => node/node_framework}/src/lib.rs | 11 ++- .../src/resource/lazy_resource.rs | 2 +- .../node_framework}/src/resource/mod.rs | 0 .../src/resource/resource_collection.rs | 0 .../src/resource/resource_id.rs | 0 .../node_framework/src/service}/context.rs | 38 ++++---- .../node_framework/src/service}/mod.rs | 95 +++++++++---------- .../src/service}/stop_receiver.rs | 0 .../node_framework/src/task.rs} | 40 +------- core/node/node_framework/src/wiring_layer.rs | 26 +++++ 24 files changed, 225 insertions(+), 195 deletions(-) delete mode 100644 core/lib/node/Cargo.toml create mode 100644 core/node/node_framework/Cargo.toml rename core/{lib/node => node/node_framework}/examples/main_node.rs (88%) rename core/{lib/node/src/implementations/task => node/node_framework/src/implementations/layers}/healtcheck_server.rs (64%) rename core/{lib/node/src/implementations/task => node/node_framework/src/implementations/layers}/metadata_calculator.rs (63%) rename core/{lib/node/src/implementations/task => node/node_framework/src/implementations/layers}/mod.rs (100%) rename core/{lib/node/src/implementations/task => node/node_framework/src/implementations/layers}/prometheus_exporter.rs (61%) rename core/{lib/node => node/node_framework}/src/implementations/mod.rs (83%) rename core/{lib/node/src/implementations/resource => node/node_framework/src/implementations/resources}/healthcheck.rs (100%) rename core/{lib/node/src/implementations/resource => node/node_framework/src/implementations/resources}/mod.rs (100%) rename core/{lib/node/src/implementations/resource => node/node_framework/src/implementations/resources}/object_store.rs (100%) rename core/{lib/node/src/implementations/resource => node/node_framework/src/implementations/resources}/pools.rs (100%) rename core/{lib/node => node/node_framework}/src/lib.rs (64%) rename core/{lib/node => node/node_framework}/src/resource/lazy_resource.rs (98%) rename core/{lib/node => node/node_framework}/src/resource/mod.rs (100%) rename core/{lib/node => node/node_framework}/src/resource/resource_collection.rs (100%) rename core/{lib/node => node/node_framework}/src/resource/resource_id.rs (100%) rename core/{lib/node/src/node => node/node_framework/src/service}/context.rs (75%) rename core/{lib/node/src/node => node/node_framework/src/service}/mod.rs (71%) rename core/{lib/node/src/node => node/node_framework/src/service}/stop_receiver.rs (100%) rename core/{lib/node/src/task/mod.rs => node/node_framework/src/task.rs} (59%) create mode 100644 core/node/node_framework/src/wiring_layer.rs diff --git a/Cargo.lock b/Cargo.lock index f9152a0f6838..44e87b66230d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8766,7 +8766,7 @@ dependencies = [ ] [[package]] -name = "zksync_node" +name = "zksync_node_framework" version = "0.1.0" dependencies = [ "anyhow", diff --git a/Cargo.toml b/Cargo.toml index 8213e01170d2..a7dab6c4b367 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,8 @@ members = [ "core/bin/system-constants-generator", "core/bin/verified_sources_fetcher", "core/bin/zksync_server", + # Node services + "core/node/node_framework", # Libraries "core/lib/zksync_core", "core/lib/basic_types", @@ -27,7 +29,6 @@ members = [ "core/lib/mempool", "core/lib/merkle_tree", "core/lib/mini_merkle_tree", - "core/lib/node", "core/lib/object_store", "core/lib/prometheus_exporter", "core/lib/prover_interface", diff --git a/core/lib/node/Cargo.toml b/core/lib/node/Cargo.toml deleted file mode 100644 index bda2315f5361..000000000000 --- a/core/lib/node/Cargo.toml +++ /dev/null @@ -1,31 +0,0 @@ -[package] -name = "zksync_node" -version = "0.1.0" -edition = "2018" -authors = ["The Matter Labs Team "] -homepage = "https://zksync.io/" -repository = "https://github.com/matter-labs/zksync-era" -license = "MIT OR Apache-2.0" -keywords = ["blockchain", "zksync"] -categories = ["cryptography"] - -[dependencies] -prometheus_exporter = { path = "../prometheus_exporter" } -zksync_types = { path = "../types" } -zksync_health_check = { path = "../health_check" } -zksync_dal = { path = "../dal" } -zksync_config = { path = "../config" } -zksync_object_store = { path = "../object_store" } -zksync_core = { path = "../zksync_core" } -zksync_storage = { path = "../storage" } - -tracing = "0.1" -thiserror = "1" -async-trait = "0.1" -futures = "0.3" -anyhow = "1" -tokio = { version = "1", features = ["rt"] } - -[dev-dependencies] -zksync_env_config = { path = "../env_config" } -vlog = { path = "../vlog" } diff --git a/core/node/node_framework/Cargo.toml b/core/node/node_framework/Cargo.toml new file mode 100644 index 000000000000..d213afb8c5ee --- /dev/null +++ b/core/node/node_framework/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "zksync_node_framework" +version = "0.1.0" +edition = "2018" +authors = ["The Matter Labs Team "] +homepage = "https://zksync.io/" +repository = "https://github.com/matter-labs/zksync-era" +license = "MIT OR Apache-2.0" +keywords = ["blockchain", "zksync"] +categories = ["cryptography"] + +[dependencies] +prometheus_exporter = { path = "../../lib/prometheus_exporter" } +zksync_types = { path = "../../lib/types" } +zksync_health_check = { path = "../../lib/health_check" } +zksync_dal = { path = "../../lib/dal" } +zksync_config = { path = "../../lib/config" } +zksync_object_store = { path = "../../lib/object_store" } +zksync_core = { path = "../../lib/zksync_core" } +zksync_storage = { path = "../../lib/storage" } + +tracing = "0.1" +thiserror = "1" +async-trait = "0.1" +futures = "0.3" +anyhow = "1" +tokio = { version = "1", features = ["rt"] } + +[dev-dependencies] +zksync_env_config = { path = "../../lib/env_config" } +vlog = { path = "../../lib/vlog" } diff --git a/core/lib/node/examples/main_node.rs b/core/node/node_framework/examples/main_node.rs similarity index 88% rename from core/lib/node/examples/main_node.rs rename to core/node/node_framework/examples/main_node.rs index afe64139e0ba..6a3036cd7be5 100644 --- a/core/lib/node/examples/main_node.rs +++ b/core/node/node_framework/examples/main_node.rs @@ -6,16 +6,15 @@ use zksync_config::{configs::chain::OperationsManagerConfig, DBConfig, PostgresC use zksync_core::metadata_calculator::MetadataCalculatorConfig; use zksync_dal::ConnectionPool; use zksync_env_config::FromEnv; -use zksync_node::{ +use zksync_node_framework::{ implementations::{ - resource::pools::MasterPoolResource, - task::{ - healtcheck_server::HealthCheckTaskBuilder, - metadata_calculator::MetadataCalculatorTaskBuilder, + layers::{ + healtcheck_server::HealthCheckLayer, metadata_calculator::MetadataCalculatorLayer, }, + resources::pools::MasterPoolResource, }, - node::ZkSyncNode, resource::{Resource, ResourceId, ResourceProvider, StoredResource}, + service::ZkStackService, }; /// Resource provider for the main node. @@ -62,7 +61,7 @@ fn main() -> anyhow::Result<()> { // Create the node with specified resource provider. We don't need to add any resources explicitly, // the task will request what they actually need. The benefit here is that we won't instantiate resources // that are not used, which would be complex otherwise, since the task set is often dynamic. - let mut node = ZkSyncNode::new(MainNodeResourceProvider)?; + let mut node = ZkStackService::new(MainNodeResourceProvider)?; // Add the metadata calculator task. let merkle_tree_env_config = DBConfig::from_env()?.merkle_tree; @@ -71,11 +70,11 @@ fn main() -> anyhow::Result<()> { &merkle_tree_env_config, &operations_manager_env_config, ); - node.add_task(MetadataCalculatorTaskBuilder(metadata_calculator_config)); + node.add_layer(MetadataCalculatorLayer(metadata_calculator_config)); // Add the healthcheck server. let healthcheck_config = zksync_config::ApiConfig::from_env()?.healthcheck; - node.add_task(HealthCheckTaskBuilder(healthcheck_config)); + node.add_layer(HealthCheckLayer(healthcheck_config)); // Run the node until completion. node.run()?; diff --git a/core/lib/node/src/implementations/task/healtcheck_server.rs b/core/node/node_framework/src/implementations/layers/healtcheck_server.rs similarity index 64% rename from core/lib/node/src/implementations/task/healtcheck_server.rs rename to core/node/node_framework/src/implementations/layers/healtcheck_server.rs index 6350196da40e..8527d5323415 100644 --- a/core/lib/node/src/implementations/task/healtcheck_server.rs +++ b/core/node/node_framework/src/implementations/layers/healtcheck_server.rs @@ -4,29 +4,34 @@ use zksync_config::configs::api::HealthCheckConfig; use zksync_core::api_server::healthcheck::HealthCheckHandle; use crate::{ - implementations::resource::healthcheck::HealthCheckResource, - node::{NodeContext, StopReceiver}, + implementations::resources::healthcheck::HealthCheckResource, resource::ResourceCollection, - task::{IntoZkSyncTask, TaskInitError, ZkSyncTask}, + service::{ServiceContext, StopReceiver}, + task::Task, + wiring_layer::{WiringError, WiringLayer}, }; /// Builder for a health check server. /// /// Spawned task collects all the health checks added by different tasks to the /// corresponding resource collection and spawns an HTTP server exposing them. +/// +/// This layer expects other tasks to add health checks to the `ResourceCollection`. +/// +/// ## Effects +/// +/// - Resolves `ResourceCollection`. +/// - Adds `healthcheck_server` to the node. #[derive(Debug)] -pub struct HealthCheckTaskBuilder(pub HealthCheckConfig); +pub struct HealthCheckLayer(pub HealthCheckConfig); #[async_trait::async_trait] -impl IntoZkSyncTask for HealthCheckTaskBuilder { - fn task_name(&self) -> &'static str { - "healthcheck_server" +impl WiringLayer for HealthCheckLayer { + fn layer_name(&self) -> &'static str { + "healthcheck_layer" } - async fn create( - self: Box, - mut node: NodeContext<'_>, - ) -> Result, TaskInitError> { + async fn wire(self: Box, mut node: ServiceContext<'_>) -> Result<(), WiringError> { let healthchecks = node .get_resource_or_default::>() .await; @@ -36,7 +41,8 @@ impl IntoZkSyncTask for HealthCheckTaskBuilder { healthchecks, }; - Ok(Box::new(task)) + node.add_task(Box::new(task)); + Ok(()) } } @@ -54,7 +60,11 @@ impl fmt::Debug for HealthCheckTask { } #[async_trait::async_trait] -impl ZkSyncTask for HealthCheckTask { +impl Task for HealthCheckTask { + fn name(&self) -> &'static str { + "healthcheck_server" + } + async fn run(mut self: Box, mut stop_receiver: StopReceiver) -> anyhow::Result<()> { let healthchecks = self.healthchecks.resolve().await; diff --git a/core/lib/node/src/implementations/task/metadata_calculator.rs b/core/node/node_framework/src/implementations/layers/metadata_calculator.rs similarity index 63% rename from core/lib/node/src/implementations/task/metadata_calculator.rs rename to core/node/node_framework/src/implementations/layers/metadata_calculator.rs index 0a61ad00abb8..e3a3bdfe1a9b 100644 --- a/core/lib/node/src/implementations/task/metadata_calculator.rs +++ b/core/node/node_framework/src/implementations/layers/metadata_calculator.rs @@ -3,18 +3,26 @@ use zksync_dal::ConnectionPool; use zksync_storage::RocksDB; use crate::{ - implementations::resource::{ + implementations::resources::{ healthcheck::HealthCheckResource, object_store::ObjectStoreResource, pools::MasterPoolResource, }, - node::{NodeContext, StopReceiver}, resource::{Resource, ResourceCollection}, - task::{IntoZkSyncTask, TaskInitError, ZkSyncTask}, + service::{ServiceContext, StopReceiver}, + task::Task, + wiring_layer::{WiringError, WiringLayer}, }; /// Builder for a metadata calculator. +/// +/// ## Effects +/// +/// - Resolves `MasterPoolResource`. +/// - Resolves `ObjectStoreResource` (optional). +/// - Adds `tree_health_check` to the `ResourceCollection`. +/// - Adds `metadata_calculator` to the node. #[derive(Debug)] -pub struct MetadataCalculatorTaskBuilder(pub MetadataCalculatorConfig); +pub struct MetadataCalculatorLayer(pub MetadataCalculatorConfig); #[derive(Debug)] pub struct MetadataCalculatorTask { @@ -23,18 +31,18 @@ pub struct MetadataCalculatorTask { } #[async_trait::async_trait] -impl IntoZkSyncTask for MetadataCalculatorTaskBuilder { - fn task_name(&self) -> &'static str { - "metadata_calculator" +impl WiringLayer for MetadataCalculatorLayer { + fn layer_name(&self) -> &'static str { + "metadata_calculator_layer" } - async fn create( - self: Box, - mut node: NodeContext<'_>, - ) -> Result, TaskInitError> { - let pool = node.get_resource::().await.ok_or( - TaskInitError::ResourceLacking(MasterPoolResource::resource_id()), - )?; + async fn wire(self: Box, mut node: ServiceContext<'_>) -> Result<(), WiringError> { + let pool = + node.get_resource::() + .await + .ok_or(WiringError::ResourceLacking( + MasterPoolResource::resource_id(), + ))?; let main_pool = pool.get().await.unwrap(); let object_store = node.get_resource::().await; // OK to be None. @@ -56,15 +64,21 @@ impl IntoZkSyncTask for MetadataCalculatorTaskBuilder { )) .expect("Wiring stage"); - Ok(Box::new(MetadataCalculatorTask { + let task = Box::new(MetadataCalculatorTask { metadata_calculator, main_pool, - })) + }); + node.add_task(task); + Ok(()) } } #[async_trait::async_trait] -impl ZkSyncTask for MetadataCalculatorTask { +impl Task for MetadataCalculatorTask { + fn name(&self) -> &'static str { + "metadata_calculator" + } + async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { let result = self .metadata_calculator diff --git a/core/lib/node/src/implementations/task/mod.rs b/core/node/node_framework/src/implementations/layers/mod.rs similarity index 100% rename from core/lib/node/src/implementations/task/mod.rs rename to core/node/node_framework/src/implementations/layers/mod.rs diff --git a/core/lib/node/src/implementations/task/prometheus_exporter.rs b/core/node/node_framework/src/implementations/layers/prometheus_exporter.rs similarity index 61% rename from core/lib/node/src/implementations/task/prometheus_exporter.rs rename to core/node/node_framework/src/implementations/layers/prometheus_exporter.rs index 17923efde95b..5fca61119b94 100644 --- a/core/lib/node/src/implementations/task/prometheus_exporter.rs +++ b/core/node/node_framework/src/implementations/layers/prometheus_exporter.rs @@ -2,15 +2,21 @@ use prometheus_exporter::PrometheusExporterConfig; use zksync_health_check::{HealthStatus, HealthUpdater, ReactiveHealthCheck}; use crate::{ - implementations::resource::healthcheck::HealthCheckResource, - node::{NodeContext, StopReceiver}, + implementations::resources::healthcheck::HealthCheckResource, resource::ResourceCollection, - task::{IntoZkSyncTask, TaskInitError, ZkSyncTask}, + service::{ServiceContext, StopReceiver}, + task::Task, + wiring_layer::{WiringError, WiringLayer}, }; /// Builder for a prometheus exporter. +/// +/// ## Effects +/// +/// - Adds prometheus health check to the `ResourceCollection`. +/// - Adds `prometheus_exporter` to the node. #[derive(Debug)] -pub struct PrometheusExporterTaskBuilder(pub PrometheusExporterConfig); +pub struct PrometheusExporterLayer(pub PrometheusExporterConfig); #[derive(Debug)] pub struct PrometheusExporterTask { @@ -19,15 +25,12 @@ pub struct PrometheusExporterTask { } #[async_trait::async_trait] -impl IntoZkSyncTask for PrometheusExporterTaskBuilder { - fn task_name(&self) -> &'static str { +impl WiringLayer for PrometheusExporterLayer { + fn layer_name(&self) -> &'static str { "prometheus_exporter" } - async fn create( - self: Box, - mut node: NodeContext<'_>, - ) -> Result, TaskInitError> { + async fn wire(self: Box, mut node: ServiceContext<'_>) -> Result<(), WiringError> { let (prometheus_health_check, prometheus_health_updater) = ReactiveHealthCheck::new("prometheus_exporter"); @@ -38,15 +41,22 @@ impl IntoZkSyncTask for PrometheusExporterTaskBuilder { .push(HealthCheckResource::new(prometheus_health_check)) .expect("Wiring stage"); - Ok(Box::new(PrometheusExporterTask { + let task = Box::new(PrometheusExporterTask { config: self.0, prometheus_health_updater, - })) + }); + + node.add_task(task); + Ok(()) } } #[async_trait::async_trait] -impl ZkSyncTask for PrometheusExporterTask { +impl Task for PrometheusExporterTask { + fn name(&self) -> &'static str { + "prometheus_exporter" + } + async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { let prometheus_task = self.config.run(stop_receiver.0); self.prometheus_health_updater diff --git a/core/lib/node/src/implementations/mod.rs b/core/node/node_framework/src/implementations/mod.rs similarity index 83% rename from core/lib/node/src/implementations/mod.rs rename to core/node/node_framework/src/implementations/mod.rs index b381595d68f9..a7e589de2916 100644 --- a/core/lib/node/src/implementations/mod.rs +++ b/core/node/node_framework/src/implementations/mod.rs @@ -2,5 +2,5 @@ //! These are temporarily provided by the framework crate itself, but will be moved to the separate crates //! in the future. -pub mod resource; -pub mod task; +pub mod layers; +pub mod resources; diff --git a/core/lib/node/src/implementations/resource/healthcheck.rs b/core/node/node_framework/src/implementations/resources/healthcheck.rs similarity index 100% rename from core/lib/node/src/implementations/resource/healthcheck.rs rename to core/node/node_framework/src/implementations/resources/healthcheck.rs diff --git a/core/lib/node/src/implementations/resource/mod.rs b/core/node/node_framework/src/implementations/resources/mod.rs similarity index 100% rename from core/lib/node/src/implementations/resource/mod.rs rename to core/node/node_framework/src/implementations/resources/mod.rs diff --git a/core/lib/node/src/implementations/resource/object_store.rs b/core/node/node_framework/src/implementations/resources/object_store.rs similarity index 100% rename from core/lib/node/src/implementations/resource/object_store.rs rename to core/node/node_framework/src/implementations/resources/object_store.rs diff --git a/core/lib/node/src/implementations/resource/pools.rs b/core/node/node_framework/src/implementations/resources/pools.rs similarity index 100% rename from core/lib/node/src/implementations/resource/pools.rs rename to core/node/node_framework/src/implementations/resources/pools.rs diff --git a/core/lib/node/src/lib.rs b/core/node/node_framework/src/lib.rs similarity index 64% rename from core/lib/node/src/lib.rs rename to core/node/node_framework/src/lib.rs index c714f998a12a..de6744f34412 100644 --- a/core/lib/node/src/lib.rs +++ b/core/node/node_framework/src/lib.rs @@ -4,21 +4,22 @@ //! //! This crate provides core abstractions that allow one to compose a ZK Stack node. //! Main concepts used in this crate are: -//! - [`IntoZkSyncTask`](task::IntoZkSyncTask) - builder interface for tasks. -//! - [`ZkSyncTask`](task::ZkSyncTask) - a unit of work that can be executed by the node. +//! - [`WiringLayer`](wiring_layer::WiringLayer) - builder interface for tasks. +//! - [`Task`](task::Task) - a unit of work that can be executed by the node. //! - [`Resource`](resource::Resource) - a piece of logic that can be shared between tasks. Most resources are //! represented by generic interfaces and also serve as points of customization for tasks. //! - [`ResourceProvider`](resource::ResourceProvider) - a trait that allows one to provide resources to the node. -//! - [`ZkSyncNode`](node::ZkSyncNode) - a container for tasks and resources that takes care of initialization, running +//! - [`ZkStackService`](service::ZkStackService) - a container for tasks and resources that takes care of initialization, running //! and shutting down. //! //! The general flow to compose a node is as follows: //! - Create a [`ResourceProvider`](resource::ResourceProvider) that can provide all the resources that the node needs. -//! - Create a [`ZkSyncNode`](node::ZkSyncNode) with that [`ResourceProvider`](resource::ResourceProvider). +//! - Create a [`ZkStackService`](node::ZkStackService) with that [`ResourceProvider`](resource::ResourceProvider). //! - Add tasks to the node. //! - Run it. pub mod implementations; -pub mod node; pub mod resource; +pub mod service; pub mod task; +pub mod wiring_layer; diff --git a/core/lib/node/src/resource/lazy_resource.rs b/core/node/node_framework/src/resource/lazy_resource.rs similarity index 98% rename from core/lib/node/src/resource/lazy_resource.rs rename to core/node/node_framework/src/resource/lazy_resource.rs index 3ec6e3cdb0bd..abf5c3fb5f78 100644 --- a/core/lib/node/src/resource/lazy_resource.rs +++ b/core/node/node_framework/src/resource/lazy_resource.rs @@ -4,7 +4,7 @@ use thiserror::Error; use tokio::sync::watch; use super::{Resource, ResourceId}; -use crate::node::StopReceiver; +use crate::service::StopReceiver; /// A lazy resource represents a resource that isn't available at the time when the tasks start. /// diff --git a/core/lib/node/src/resource/mod.rs b/core/node/node_framework/src/resource/mod.rs similarity index 100% rename from core/lib/node/src/resource/mod.rs rename to core/node/node_framework/src/resource/mod.rs diff --git a/core/lib/node/src/resource/resource_collection.rs b/core/node/node_framework/src/resource/resource_collection.rs similarity index 100% rename from core/lib/node/src/resource/resource_collection.rs rename to core/node/node_framework/src/resource/resource_collection.rs diff --git a/core/lib/node/src/resource/resource_id.rs b/core/node/node_framework/src/resource/resource_id.rs similarity index 100% rename from core/lib/node/src/resource/resource_id.rs rename to core/node/node_framework/src/resource/resource_id.rs diff --git a/core/lib/node/src/node/context.rs b/core/node/node_framework/src/service/context.rs similarity index 75% rename from core/lib/node/src/node/context.rs rename to core/node/node_framework/src/service/context.rs index 897bc1c967ef..0f12daf12a41 100644 --- a/core/lib/node/src/node/context.rs +++ b/core/node/node_framework/src/service/context.rs @@ -1,35 +1,35 @@ use crate::{ - node::ZkSyncNode, resource::{Resource, StoredResource}, - task::IntoZkSyncTask, + service::ZkStackService, + task::Task, }; -/// An interface to the node's resources provided to the tasks during initialization. -/// Provides the ability to fetch required resources, and also gives access to the Tokio runtime handle used by the node. +/// An interface to the service's resources provided to the tasks during initialization. +/// Provides the ability to fetch required resources, and also gives access to the Tokio runtime handle. #[derive(Debug)] -pub struct NodeContext<'a> { - node: &'a mut ZkSyncNode, +pub struct ServiceContext<'a> { + service: &'a mut ZkStackService, } -impl<'a> NodeContext<'a> { - pub(super) fn new(node: &'a mut ZkSyncNode) -> Self { - Self { node } +impl<'a> ServiceContext<'a> { + pub(super) fn new(service: &'a mut ZkStackService) -> Self { + Self { service } } - /// Provides access to the runtime used by the node. + /// Provides access to the runtime used by the service. /// Can be used to spawn additional tasks within the same runtime. /// If some tasks stores the handle to spawn additional tasks, it is expected to do all the required /// cleanup. /// /// In most cases, however, it is recommended to use [`add_task`] method instead. pub fn runtime_handle(&self) -> &tokio::runtime::Handle { - self.node.runtime.handle() + self.service.runtime.handle() } - /// Adds an additional task to the node. - /// This may be used if some task or its resource requires an additional routine for maintenance. - pub fn add_task(&mut self, builder: T) -> &mut Self { - self.node.add_task(builder); + /// Adds a task to the service. + /// Added tasks will be launched after the wiring process will be finished. + pub fn add_task(&mut self, task: Box) -> &mut Self { + self.service.tasks.push(task); self } @@ -57,16 +57,16 @@ impl<'a> NodeContext<'a> { let name = T::resource_id(); // Check whether the resource is already available. - if let Some(resource) = self.node.resources.get(&name) { + if let Some(resource) = self.service.resources.get(&name) { return Some(downcast_clone(resource)); } // Try to fetch the resource from the provider. - if let Some(resource) = self.node.resource_provider.get_resource(&name).await { + if let Some(resource) = self.service.resource_provider.get_resource(&name).await { // First, ensure the type matches. let downcasted = downcast_clone(&resource); // Then, add it to the local resources. - self.node.resources.insert(name, resource); + self.service.resources.insert(name, resource); return Some(downcasted); } @@ -87,7 +87,7 @@ impl<'a> NodeContext<'a> { // No such resource, insert a new one. let resource = f(); - self.node + self.service .resources .insert(T::resource_id(), Box::new(resource.clone())); resource diff --git a/core/lib/node/src/node/mod.rs b/core/node/node_framework/src/service/mod.rs similarity index 71% rename from core/lib/node/src/node/mod.rs rename to core/node/node_framework/src/service/mod.rs index 42b9c9ac60a4..40e6328695cd 100644 --- a/core/lib/node/src/node/mod.rs +++ b/core/node/node_framework/src/service/mod.rs @@ -3,38 +3,40 @@ use std::{collections::HashMap, fmt}; use futures::{future::BoxFuture, FutureExt}; use tokio::{runtime::Runtime, sync::watch}; -pub use self::{context::NodeContext, stop_receiver::StopReceiver}; +pub use self::{context::ServiceContext, stop_receiver::StopReceiver}; use crate::{ resource::{ResourceId, ResourceProvider, StoredResource}, - task::{IntoZkSyncTask, TaskInitError}, + task::Task, + wiring_layer::{WiringError, WiringLayer}, }; mod context; mod stop_receiver; -/// "Manager" class of the node. Collects all the resources and tasks, +/// "Manager" class for a set of tasks. Collects all the resources and tasks, /// then runs tasks until completion. /// /// Initialization flow: -/// - Node instance is created with access to the resource provider. -/// - Task constructors are added to the node. At this step, tasks are not created yet. -/// - Optionally, a healthcheck task constructor is also added. -/// - Once the `run` method is invoked, node -/// - attempts to create every task. If there are no tasks, or at least task -/// constructor fails, the node will return an error. -/// - initializes the healthcheck task if it's provided. +/// - Service instance is created with access to the resource provider. +/// - Wiring layers are added to the service. At this step, tasks are not created yet. +/// - Once the `run` method is invoked, service +/// - invokes a `wire` method on each added wiring layer. If any of the layers fails, +/// the service will return an error. If no layers have added a task, the service will +/// also return an error. /// - waits for any of the tasks to finish. /// - sends stop signal to all the tasks. /// - waits for the remaining tasks to finish. /// - calls `after_node_shutdown` hook for every task that has provided it. /// - returns the result of the task that has finished. -pub struct ZkSyncNode { +pub struct ZkStackService { /// Primary source of resources for tasks. resource_provider: Box, /// Cache of resources that have been requested at least by one task. resources: HashMap>, - /// List of task builders. - task_builders: Vec>, + /// List of wiring layers. + layers: Vec>, + /// Tasks added to the service. + tasks: Vec>, /// Sender used to stop the tasks. stop_sender: watch::Sender, @@ -42,13 +44,13 @@ pub struct ZkSyncNode { runtime: Runtime, } -impl fmt::Debug for ZkSyncNode { +impl fmt::Debug for ZkStackService { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("ZkSyncNode").finish_non_exhaustive() } } -impl ZkSyncNode { +impl ZkStackService { pub fn new(resource_provider: R) -> anyhow::Result { if tokio::runtime::Handle::try_current().is_ok() { anyhow::bail!( @@ -64,7 +66,8 @@ impl ZkSyncNode { let self_ = Self { resource_provider: Box::new(resource_provider), resources: HashMap::default(), - task_builders: Vec::new(), + layers: Vec::new(), + tasks: Vec::new(), stop_sender, runtime, }; @@ -72,48 +75,32 @@ impl ZkSyncNode { Ok(self_) } - /// Adds a task to the node. - /// - /// The task is not created at this point, instead, the constructor is stored in the node - /// and will be invoked during [`ZkSyncNode::run`] method. Any error returned by the constructor - /// will prevent the node from starting and will be propagated by the [`ZkSyncNode::run`] method. - pub fn add_task(&mut self, builder: T) -> &mut Self { - self.task_builders.push(Box::new(builder)); + /// Adds a wiring layer. + /// During the [`run`](ZkStackService::run) call the service will invoke + /// `wire` method of every layer in the order they were added. + pub fn add_layer(&mut self, layer: T) -> &mut Self { + self.layers.push(Box::new(layer)); self } /// Runs the system. pub fn run(mut self) -> anyhow::Result<()> { // Initialize tasks. - let task_builders = std::mem::take(&mut self.task_builders); + let wiring_layers = std::mem::take(&mut self.layers); - let mut tasks = Vec::new(); - - let mut errors: Vec<(String, TaskInitError)> = Vec::new(); + let mut errors: Vec<(String, WiringError)> = Vec::new(); let runtime_handle = self.runtime.handle().clone(); - for task_builder in task_builders { - let name = task_builder.task_name().to_string(); - let task_result = - runtime_handle.block_on(task_builder.create(NodeContext::new(&mut self))); - let task = match task_result { - Ok(task) => task, - Err(err) => { - // We don't want to bail on the first error, since it'll provide worse DevEx: - // People likely want to fix as much problems as they can in one go, rather than have - // to fix them one by one. - errors.push((name, err)); - continue; - } - }; - let after_node_shutdown = task.after_node_shutdown(); - let task_future = Box::pin(task.run(self.stop_receiver())); - let task_repr = TaskRepr { - name, - task: Some(task_future), - after_node_shutdown, + for layer in wiring_layers { + let name = layer.layer_name().to_string(); + let task_result = runtime_handle.block_on(layer.wire(ServiceContext::new(&mut self))); + if let Err(err) = task_result { + // We don't want to bail on the first error, since it'll provide worse DevEx: + // People likely want to fix as much problems as they can in one go, rather than have + // to fix them one by one. + errors.push((name, err)); + continue; }; - tasks.push(task_repr); } // Report all the errors we've met during the init. @@ -124,6 +111,18 @@ impl ZkSyncNode { anyhow::bail!("One or more task weren't able to start"); } + let mut tasks = Vec::new(); + for task in std::mem::take(&mut self.tasks) { + let name = task.name().to_string(); + let after_node_shutdown = task.after_node_shutdown(); + let task_future = Box::pin(task.run(self.stop_receiver())); + let task_repr = TaskRepr { + name, + task: Some(task_future), + after_node_shutdown, + }; + tasks.push(task_repr); + } if tasks.is_empty() { anyhow::bail!("No tasks to run"); } diff --git a/core/lib/node/src/node/stop_receiver.rs b/core/node/node_framework/src/service/stop_receiver.rs similarity index 100% rename from core/lib/node/src/node/stop_receiver.rs rename to core/node/node_framework/src/service/stop_receiver.rs diff --git a/core/lib/node/src/task/mod.rs b/core/node/node_framework/src/task.rs similarity index 59% rename from core/lib/node/src/task/mod.rs rename to core/node/node_framework/src/task.rs index 43f332a0f4d9..0f0c45f7bc80 100644 --- a/core/lib/node/src/task/mod.rs +++ b/core/node/node_framework/src/task.rs @@ -1,36 +1,16 @@ //! Tasks define the "runnable" concept of the node, e.g. something that can be launched and runs until the node //! is stopped. -//! -//! Task is normally defined by two types, one implementing two traits [`IntoZkSyncTask`], which acts like a -//! constructor, and another one, which implements [`ZkSyncTask`], providing an interface for `ZkSyncNode` to -//! implement the task lifecycle. use futures::future::BoxFuture; -use crate::{ - node::{NodeContext, StopReceiver}, - resource::ResourceId, -}; +use crate::service::StopReceiver; -/// Factory that can create a task. +/// A task implementation. #[async_trait::async_trait] -pub trait IntoZkSyncTask: 'static + Send + Sync { +pub trait Task: 'static + Send + Sync { /// Unique name of the task. - fn task_name(&self) -> &'static str; - - /// Creates a new task. - /// - /// `NodeContext` provides an interface to the utilities that task may need, e.g. ability to get resources - /// or spawn additional tasks. - async fn create( - self: Box, - node: NodeContext<'_>, - ) -> Result, TaskInitError>; -} + fn name(&self) -> &'static str; -/// A task implementation. -#[async_trait::async_trait] -pub trait ZkSyncTask: 'static + Send + Sync { /// Runs the task. /// /// Once any of the task returns, the node will shutdown. @@ -45,7 +25,7 @@ pub trait ZkSyncTask: 'static + Send + Sync { async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()>; /// Asynchronous hook that will be called after *each task* has finished their cleanup. - /// It is guaranteed that no other task is running at this point, e.g. `ZkSyncNode` will invoke + /// It is guaranteed that no other task is running at this point, e.g. `ZkStackService` will invoke /// this hook sequentially for each task. /// /// This hook can be used to perform some cleanup that assumes exclusive access to the resources, e.g. @@ -58,13 +38,3 @@ pub trait ZkSyncTask: 'static + Send + Sync { None } } - -/// An error that can occur during the task initialization. -#[derive(thiserror::Error, Debug)] -#[non_exhaustive] -pub enum TaskInitError { - #[error("Resource {0} is not provided")] - ResourceLacking(ResourceId), - #[error(transparent)] - Internal(#[from] anyhow::Error), -} diff --git a/core/node/node_framework/src/wiring_layer.rs b/core/node/node_framework/src/wiring_layer.rs new file mode 100644 index 000000000000..6922b8850a26 --- /dev/null +++ b/core/node/node_framework/src/wiring_layer.rs @@ -0,0 +1,26 @@ +use crate::{resource::ResourceId, service::ServiceContext}; + +/// Wiring layer provides a way to customize the `ZkStackService` by +/// adding new tasks or resources to it. +/// +/// Structures that implement this trait are advised to specify in doc comments +/// which resources they use or add, and the list of tasks they add. +#[async_trait::async_trait] +pub trait WiringLayer: 'static + Send + Sync { + /// Identifier of the wiring layer. + fn layer_name(&self) -> &'static str; + + /// Performs the wiring process, e.g. adds tasks and resources to the node. + /// This method will be called once during the node initialization. + async fn wire(self: Box, node: ServiceContext<'_>) -> Result<(), WiringError>; +} + +/// An error that can occur during the wiring phase. +#[derive(thiserror::Error, Debug)] +#[non_exhaustive] +pub enum WiringError { + #[error("Resource {0} is not provided")] + ResourceLacking(ResourceId), + #[error(transparent)] + Internal(#[from] anyhow::Error), +}