From 2dd69d4277774407f6052b1e54f724b34845fedf Mon Sep 17 00:00:00 2001 From: Shahar Kaminsky Date: Mon, 29 Jan 2024 10:36:59 +0200 Subject: [PATCH] chore(cross checker): Cross The Checker (#856) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ This PR removes the cross checker binary and all its references. ## Why ❔ The checker's scope is covered by other components. ## Checklist - [x] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [x] Tests for the changes have been added / updated. - [ ] Documentation comments have been added / updated. - [x] Code has been formatted via `zk fmt` and `zk lint`. - [ ] Spellcheck has been run via `zk spellcheck`. --- .github/workflows/build-core-template.yml | 3 - .github/workflows/ci-core-reusable.yml | 3 - .github/workflows/ci.yml | 1 - Cargo.lock | 18 - Cargo.toml | 1 - .../cross_external_nodes_checker/Cargo.toml | 26 - .../cross_external_nodes_checker/README.md | 44 - .../src/checker.rs | 906 ------------------ .../src/config.rs | 171 ---- .../src/divergence.rs | 89 -- .../src/helpers.rs | 326 ------- .../cross_external_nodes_checker/src/main.rs | 65 -- .../src/pubsub_checker.rs | 311 ------ .../cross-external-nodes-checker/Dockerfile | 27 - etc/env/base/rust.toml | 1 - etc/env/ext-node-docker.toml | 1 - etc/env/ext-node.toml | 1 - infrastructure/zk/src/docker.ts | 2 - infrastructure/zk/src/run/run.ts | 110 --- 19 files changed, 2106 deletions(-) delete mode 100644 core/tests/cross_external_nodes_checker/Cargo.toml delete mode 100644 core/tests/cross_external_nodes_checker/README.md delete mode 100644 core/tests/cross_external_nodes_checker/src/checker.rs delete mode 100644 core/tests/cross_external_nodes_checker/src/config.rs delete mode 100644 core/tests/cross_external_nodes_checker/src/divergence.rs delete mode 100644 core/tests/cross_external_nodes_checker/src/helpers.rs delete mode 100644 core/tests/cross_external_nodes_checker/src/main.rs delete mode 100644 core/tests/cross_external_nodes_checker/src/pubsub_checker.rs delete mode 100644 docker/cross-external-nodes-checker/Dockerfile diff --git a/.github/workflows/build-core-template.yml b/.github/workflows/build-core-template.yml index 377279415b16..7853867e426f 100644 --- a/.github/workflows/build-core-template.yml +++ b/.github/workflows/build-core-template.yml @@ -30,7 +30,6 @@ jobs: - server-v2 - external-node - contract-verifier - - cross-external-nodes-checker - snapshots-creator platforms: - linux/amd64 @@ -109,8 +108,6 @@ jobs: platform: linux/amd64,linux/arm64 - name: contract-verifier platform: linux/amd64 - - name: cross-external-nodes-checker - platform: linux/amd64 - name: snapshots-creator platform: linux/amd64 env: diff --git a/.github/workflows/ci-core-reusable.yml b/.github/workflows/ci-core-reusable.yml index 3bbb1bbf5d2e..6a0fc7a07377 100644 --- a/.github/workflows/ci-core-reusable.yml +++ b/.github/workflows/ci-core-reusable.yml @@ -317,9 +317,6 @@ jobs: - name: Integration tests run: ci_run zk test i server --testPathIgnorePatterns 'contract-verification|custom-erc20-bridge|snapshots-creator' - - name: Run Cross EN Checker - run: ci_run zk run cross-en-checker - - name: Run revert test run: | ci_run zk env diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f15efa8a6a35..4acc990d0f3a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -49,7 +49,6 @@ jobs: - 'core/**' - '!core/CHANGELOG.md' - 'docker/contract-verifier/**' - - 'docker/cross-external-nodes-checker/**' - 'docker/external-node/**' - 'docker/server/**' - '.github/workflows/build-core-template.yml' diff --git a/Cargo.lock b/Cargo.lock index 7669c99d77bb..2dc03f5cc230 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1526,24 +1526,6 @@ dependencies = [ "itertools 0.10.5", ] -[[package]] -name = "cross_external_nodes_checker" -version = "0.1.0" -dependencies = [ - "anyhow", - "ctrlc", - "envy", - "futures 0.3.28", - "serde", - "serde_json", - "tokio", - "tracing", - "vlog", - "zksync_types", - "zksync_utils", - "zksync_web3_decl", -] - [[package]] name = "crossbeam" version = "0.7.3" diff --git a/Cargo.toml b/Cargo.toml index e377cee289c3..0481e5d96223 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,7 +43,6 @@ members = [ # Test infrastructure "core/tests/test_account", - "core/tests/cross_external_nodes_checker", "core/tests/loadnext", "core/tests/vm-benchmark", "core/tests/vm-benchmark/harness", diff --git a/core/tests/cross_external_nodes_checker/Cargo.toml b/core/tests/cross_external_nodes_checker/Cargo.toml deleted file mode 100644 index 4f8285aef5a5..000000000000 --- a/core/tests/cross_external_nodes_checker/Cargo.toml +++ /dev/null @@ -1,26 +0,0 @@ -[package] -name = "cross_external_nodes_checker" -version = "0.1.0" -edition = "2018" -authors = ["The Matter Labs Team "] -homepage = "https://zksync.io/" -repository = "https://github.com/matter-labs/zksync-era" -license = "MIT OR Apache-2.0" -keywords = ["blockchain", "zksync"] -categories = ["cryptography"] -publish = false # We don't want to publish our binaries. - -[dependencies] -zksync_types = { path = "../../lib/types" } -zksync_web3_decl = { path = "../../lib/web3_decl" } -zksync_utils = { path = "../../lib/utils" } -vlog = { path = "../../lib/vlog" } -serde_json = "1.0" - -anyhow = "1.0" -tokio = { version = "1", features = ["time"] } -futures = "0.3" -envy = "0.4" -serde = { version = "1.0" } -ctrlc = { version = "3.1" } -tracing = "0.1" diff --git a/core/tests/cross_external_nodes_checker/README.md b/core/tests/cross_external_nodes_checker/README.md deleted file mode 100644 index 78c1fe48b40e..000000000000 --- a/core/tests/cross_external_nodes_checker/README.md +++ /dev/null @@ -1,44 +0,0 @@ -# zkSync Cross External Nodes Consistency Checker - -This tool is used to check the consistency of external node instances against the main node. The tool has two main -checkers: - -1. RPC Checker, which checks the consistency of the RPC API of the external node against the main node. -2. PubSub Checker, which checks the consistency of the PubSub API of the external node against the main node. - -Without any arguments, the tool will run both checkers. The RPC Checker will run in Triggered mode, checking all -available blocks, and the PubSub Checker will run for as long as the RPC Checker is working. - -Do note that for the PubSub Checker to properly check the consistency between the nodes, enough time needs to pass. That -is because the PubSub clients may start out of sync. Minimal recommended amount of time for the PubSub Checker is 80 -seconds, which would guarantee at least 20 miniblocks checked. - -## Running locally - -Run the server - -``` -zk init -zk server --components api,tree,eth,state_keeper -``` - -Run the EN - -``` -zk env ext-node -zk clean --database -zk db setup -zk external-node -``` - -Run integration tests to populate the main node with data. - -``` -zk test i server -``` - -Run the checker - -``` -zk run cross-en-checker -``` diff --git a/core/tests/cross_external_nodes_checker/src/checker.rs b/core/tests/cross_external_nodes_checker/src/checker.rs deleted file mode 100644 index 0ddd179c2667..000000000000 --- a/core/tests/cross_external_nodes_checker/src/checker.rs +++ /dev/null @@ -1,906 +0,0 @@ -use std::{ - cmp::Ordering::{Equal, Greater, Less}, - collections::HashMap, - fmt::Debug, - time::Duration, -}; - -use serde_json::Value; -use tokio::{sync::watch::Receiver, time::sleep}; -use zksync_types::{ - api::{BlockDetails, BlockNumber, L1BatchDetails}, - web3::types::U64, - L1BatchNumber, MiniblockNumber, H256, -}; -use zksync_utils::wait_for_tasks::wait_for_tasks; -use zksync_web3_decl::{ - jsonrpsee::{ - core::ClientError, - http_client::{HttpClient, HttpClientBuilder}, - }, - namespaces::{EnNamespaceClient, EthNamespaceClient, ZksNamespaceClient}, - types::FilterBuilder, - RpcResult, -}; - -use crate::{ - config::{CheckerConfig, RpcMode}, - divergence::{Divergence, DivergenceDetails}, - helpers::compare_json, -}; - -#[derive(Debug, Clone)] -pub struct Checker { - /// 'Triggered' to run once. 'Continuous' to run forever. - mode: RpcMode, - /// Client for interacting with the main node. - main_node_client: HttpClient, - /// Client for interacting with the instance nodes. - instance_clients: Vec, - /// Check all miniblocks starting from this. If 'None' then check from genesis. Inclusive. - start_miniblock: Option, - /// For Triggered mode. If 'None' then check all available miniblocks. Inclusive. - finish_miniblock: Option, - /// In seconds, how often to poll the instance node for new miniblocks. - instance_poll_period: u64, - /// Maps instance URL to a list of its divergences. - divergences: HashMap>, - /// How often should blocks logs be checked. - log_check_interval: u32, - /// Next batch number to check for each instance. - next_batch_to_check: HashMap, - /// The maximum number of transactions to be checked at random in each miniblock. - max_transactions_to_check: Option, -} - -#[derive(Debug, Clone)] -pub struct InstanceHttpClient { - pub url: String, - pub client: HttpClient, -} - -impl Checker { - pub fn new(config: &CheckerConfig) -> Self { - let (main_node_client, instance_clients) = Self::setup_clients( - config - .main_node_http_url - .clone() - .expect("An RPC URL for the main node has to be provided for RPC mode."), - config - .instances_http_urls - .clone() - .expect("RPC URLs for the EN instances have to be provided for RPC mode."), - ); - - let last_checked_batch = instance_clients - .iter() - .map(|instance| (instance.url.clone(), L1BatchNumber(0))) - .collect(); - - let mode = config - .rpc_mode - .expect("The RPC Checker has to be provided an RPC mode"); - - Self { - mode, - main_node_client, - instance_clients, - start_miniblock: config.start_miniblock.map(|n| n.into()), - finish_miniblock: config.finish_miniblock.map(|n| n.into()), - instance_poll_period: config.instance_poll_period.unwrap_or(10), - divergences: HashMap::new(), - log_check_interval: 1, // TODO (BFT-192): make configurable if we want to keep it. - next_batch_to_check: last_checked_batch, - max_transactions_to_check: config.max_transactions_to_check, - } - } - - // Set up clients for the main node and all EN instances we want to check. - fn setup_clients( - main_node_url: String, - instances_urls: Vec, - ) -> (HttpClient, Vec) { - let main_node_client = HttpClientBuilder::default() - .build(main_node_url) - .expect("Failed to create an HTTP client for the main node"); - - let mut instance_clients: Vec = Vec::new(); - for url in instances_urls { - let client = HttpClientBuilder::default() - .build(url.clone()) - .expect("Failed to create an HTTP client for an instance of the external node"); - instance_clients.push(InstanceHttpClient { url, client }); - } - - (main_node_client, instance_clients) - } - - pub async fn run(mut self, stop_receiver: Receiver) -> anyhow::Result<()> { - match self.mode { - RpcMode::Triggered => { - tracing::info!("Starting Checker in Triggered mode"); - if let Err(e) = self.run_triggered().await { - self.log_divergences(); - tracing::error!("Error running in Triggered mode: {:?}", e); - } - // Ensure CI will fail if any divergences were found. - assert!(self.divergences.is_empty(), "Divergences found"); - } - RpcMode::Continuous => { - tracing::info!("Starting Checker in Continuous mode"); - if let Err(e) = self.run_continuous(stop_receiver).await { - tracing::error!("Error running in Continuous mode: {:?}", e); - } - } - } - Ok(()) - } - - // For each instance, spawn a task that will continuously poll the instance for new miniblocks - // and compare them with corresponding main node miniblocks. - // - // Errors in task loops exist the loop, stop the tasks, and cause all other tasks to exit too. - async fn run_continuous(&mut self, mut stop_receiver: Receiver) -> RpcResult<()> { - let mut join_handles = Vec::new(); - - for instance in &self.instance_clients { - let main_node_client = self.main_node_client.clone(); - let instance_client = instance.clone(); - let instance_stop_receiver = stop_receiver.clone(); - let mut checker = self.clone(); - - let handle = tokio::spawn(async move { - tracing::info!("Started a task to check instance {}", instance_client.url); - if let Err(e) = checker.run_node_level_checkers(&instance_client).await { - tracing::error!("Error checking instance {}: {:?}", instance_client.url, e); - }; - let mut next_block_to_check = checker.start_miniblock.unwrap_or(MiniblockNumber(0)); - - // - Get the next block the instance has to be checked. - // - Get the corresponding block from the main node. - // - Run the checkers through the blocks. - // - Maybe check batches. - loop { - tracing::debug!( - "entered loop to check miniblock #({}) for instance: {}", - next_block_to_check, - instance_client.url - ); - - if *instance_stop_receiver.borrow() { - break; - } - - let instance_miniblock = match instance_client - .client - .get_block_details(next_block_to_check) - .await - { - Ok(Some(miniblock)) => miniblock, - Ok(None) => { - tracing::debug!( - "No miniblock found for miniblock #({}). Sleeping for {} seconds", - next_block_to_check, - checker.instance_poll_period - ); - // The instance doesn't have a next block to check yet. For now, we wait until it does. - // TODO(BFT-165): Implement miniblock existence divergence checker. - sleep(Duration::from_secs(checker.instance_poll_period)).await; - continue; - } - Err(e) => { - tracing::error!( - "Error getting miniblock #({}) from instance: {}: {:?}", - next_block_to_check, - instance_client.url, - e - ); - break; - } - }; - - let main_node_miniblock = match main_node_client - .get_block_details(next_block_to_check) - .await - { - Ok(Some(miniblock)) => miniblock, - Ok(None) => { - tracing::error!( - "Miniblock #({}), which exists in external node instance {}, was not found in the main node", - next_block_to_check, instance_client.url - ); - break; - } - Err(e) => { - tracing::error!("Error getting miniblock from main node while checking instance {}: {:?}", instance_client.url, e); - break; - } - }; - - let main_node_miniblock_txs = match checker - .create_tx_map(&main_node_client, main_node_miniblock.number) - .await - { - Ok(tx_map) => tx_map, - Err(e) => { - tracing::error!("Error creating tx map for main node miniblock while checking instance {}: {}", instance_client.url, e); - break; - } - }; - - match checker - .compare_miniblocks( - &instance_client, - &main_node_miniblock_txs, - &main_node_miniblock, - &instance_miniblock, - ) - .await - { - Ok(_) => { - tracing::info!( - "successfully checked miniblock #({}) for instance: {}", - next_block_to_check, - instance_client.url - ); - } - Err(e) => { - tracing::error!( - "Error checking miniblock #({}) for instance {}: {:?}. Skipping this miniblock", - next_block_to_check, - instance_client.url, - e - ); - } - } - next_block_to_check += 1; - - if let Err(e) = checker - .maybe_check_batches(&instance_client, instance_miniblock.l1_batch_number) - .await - { - tracing::error!( - "Error comparing batch {} for instance {}: {:?}", - instance_miniblock.l1_batch_number, - instance_client.url, - e - ); - } - } - Ok(()) - }); - join_handles.push(handle); - } - - // Wait for either all tasks to finish or a stop signal. - tokio::select! { - _ = wait_for_tasks(join_handles, None, None::>, false) => {}, - _ = stop_receiver.changed() => { - tracing::info!("Stop signal received, shutting down"); - }, - } - - Ok(()) - } - - // Iterate through all miniblocks to be checked. For each, run the checkers through every given instance. - async fn run_triggered(&mut self) -> RpcResult<()> { - let start_miniblock = self.start_miniblock.unwrap_or(MiniblockNumber(0)); - let finish_miniblock = match self.finish_miniblock { - Some(finish_miniblock) => finish_miniblock, - None => { - let highest_main_node_miniblock = self.main_node_client.get_block_number().await?; - MiniblockNumber(highest_main_node_miniblock.as_u32()) - } - }; - - for instance_client in self.instance_clients.clone() { - self.run_node_level_checkers(&instance_client).await?; - } - - for miniblock_num_to_check in start_miniblock.0..=finish_miniblock.0 { - let main_node_miniblock = match self - .main_node_client - .get_block_details(MiniblockNumber(miniblock_num_to_check)) - .await - { - Ok(Some(miniblock)) => miniblock, - Ok(None) => panic!("No miniblock found for existing miniblock number {:?}", miniblock_num_to_check), - Err(e) => panic!("Couldn't fetch existing main node miniblock header for miniblock {:?} due to error: {:?}", miniblock_num_to_check, e), - }; - - let main_node_miniblock_txs = self - .create_tx_map(&self.main_node_client, main_node_miniblock.number) - .await?; - - for instance_client in self.instance_clients.clone() { - let instance_miniblock = match instance_client - .client - .get_block_details(MiniblockNumber(miniblock_num_to_check)) - .await? - { - Some(miniblock) => miniblock, - None => { - // TODO(BFT-165): Implement Miniblock Existence Checker - tracing::warn!( - "No miniblock found for miniblock #({}) in instance {}. skipping checking it for now.", - miniblock_num_to_check, - instance_client.url - ); - continue; - } - }; - - self.compare_miniblocks( - &instance_client, - &main_node_miniblock_txs, - &main_node_miniblock, - &instance_miniblock, - ) - .await?; - - self.maybe_check_batches(&instance_client, main_node_miniblock.l1_batch_number) - .await?; - - tracing::info!( - "successfully checked miniblock #({}) for instance: {}", - miniblock_num_to_check, - instance_client.url - ); - } - } - - self.log_divergences(); - - Ok(()) - } - - async fn maybe_check_batches( - &mut self, - instance_client: &InstanceHttpClient, - miniblock_batch_number: L1BatchNumber, - ) -> RpcResult<()> { - let instance_batch_to_check = self - .next_batch_to_check - .get(instance_client.url.as_str()) - .expect("All instance URLs must exists in next_batch_to_check"); - tracing::debug!("Maybe checking batch {}", miniblock_batch_number); - - // We should check batches only the first time we encounter them per instance - // (i.e., `next_instance_batch_to_check == miniblock_batch_number`) - match instance_batch_to_check.cmp(&miniblock_batch_number) { - Greater => return Ok(()), // This batch has already been checked. - Less => { - // Either somehow a batch wasn't checked or a non-genesis miniblock was set as the start - // miniblock. In the latter case, update the `next_batch_to_check` map and check the batch. - if self.start_miniblock == Some(MiniblockNumber(0)) { - return Err(ClientError::Custom(format!( - "the next batch number to check (#{}) is less than current miniblock batch number (#{}) for instance {}", - instance_batch_to_check, - miniblock_batch_number, - instance_client.url - ))); - } - *self - .next_batch_to_check - .get_mut(instance_client.url.as_str()) - .unwrap() = miniblock_batch_number; - } - Equal => {} - } - - let main_node_batch = match self - .main_node_client - .get_l1_batch_details(miniblock_batch_number) - .await - { - Ok(Some(batch)) => batch, - Ok(None) => panic!( - "No batch found for existing batch with batch number {}", - miniblock_batch_number - ), - Err(e) => panic!( - "Couldn't fetch existing main node batch for batch number {} due to error: {:?}", - miniblock_batch_number, e - ), - }; - - let instance_batch = match instance_client - .client - .get_l1_batch_details(miniblock_batch_number) - .await? - { - Some(batch) => batch, - None => { - // TODO(BFT-165): Implement batch existence checker. - tracing::warn!( - "No batch found for batch #({}) in instance {}. skipping checking it for now.", - miniblock_batch_number, - instance_client.url - ); - return Ok(()); - } - }; - - self.check_batch_details(main_node_batch, instance_batch, &instance_client.url); - - *self - .next_batch_to_check - .get_mut(instance_client.url.as_str()) - .unwrap() += 1; - - Ok(()) - } - - // Check divergences using all checkers for every given pair of miniblocks. - async fn compare_miniblocks( - &mut self, - instance_client: &InstanceHttpClient, - main_node_tx_map: &HashMap, - main_node_miniblock: &BlockDetails, - instance_miniblock: &BlockDetails, - ) -> RpcResult<()> { - self.check_miniblock_details( - &instance_client.url, - main_node_miniblock, - instance_miniblock, - ); - - // Also checks tx receipts and tx details - self.check_transactions(main_node_tx_map, instance_miniblock, instance_client) - .await?; - - self.check_logs(instance_client, main_node_miniblock.number) - .await?; - - Ok(()) - } - - // Run all the checkers that ought to be run once per instance (the non block-dependent checkers.) - async fn run_node_level_checkers( - &mut self, - instance_client: &InstanceHttpClient, - ) -> RpcResult<()> { - self.check_chain_id(instance_client).await?; - self.check_main_contract(instance_client).await?; - self.check_bridge_contracts(instance_client).await?; - self.check_l1_chain_id(instance_client).await?; - Ok(()) - } - - // Add a divergence in Triggered mode; log it in Continuous mode. - fn communicate_divergence(&mut self, url: &str, divergence: Divergence) { - match self.mode { - RpcMode::Triggered => { - // Add a divergence to the list of divergences for the given EN instance. - let divergences = self.divergences.entry(url.to_string()).or_default(); - divergences.push(divergence.clone()); - tracing::error!("{}", divergence); - } - RpcMode::Continuous => { - // Simply log for now. TODO(BFT-177): Add grafana metrics. - tracing::error!("{}", divergence); - } - } - } - - // Create a mapping from the tx hash to a json representation of the tx. - async fn create_tx_map( - &self, - client: &HttpClient, - miniblock_num: MiniblockNumber, - ) -> RpcResult> { - let txs = client - .sync_l2_block(miniblock_num, true) - .await? - .and_then(|block| block.transactions) - .unwrap_or_default(); - - let mut tx_map = HashMap::new(); - for tx in txs { - tx_map.insert( - tx.hash(), - serde_json::to_value(tx).expect("tx serialization fail"), - ); - } - - Ok(tx_map) - } - - fn log_divergences(&mut self) { - if self.divergences.is_empty() { - tracing::info!("No divergences found"); - return; - } - for (url, divergences) in &self.divergences { - tracing::error!("Divergences found for URL: {}", url); - for divergence in divergences { - tracing::error!("{}", divergence); - } - } - } -} - -// Separate impl for the checkers. -impl Checker { - fn check_batch_details( - &mut self, - main_node_batch: L1BatchDetails, - instance_batch: L1BatchDetails, - instance_url: &str, - ) { - tracing::debug!( - "Checking batch details for batch #({})", - main_node_batch.number - ); - let batch_differences = compare_json(&main_node_batch, &instance_batch, "".to_string()); - for (key, (main_node_val, instance_val)) in batch_differences { - self.communicate_divergence( - instance_url, - Divergence::BatchDetails(DivergenceDetails { - en_instance_url: instance_url.to_string(), - main_node_value: Some(format!("{}: {:?}", key, main_node_val)), - en_instance_value: Some(format!("{}: {:?}", key, instance_val)), - entity_id: Some(format!("Batch Number: {}", main_node_batch.number)), - miniblock_number: None, - }), - ); - } - } - - // TODO: What if when we checked the miniblock when the status was Sealed but not Verified? - fn check_miniblock_details( - &mut self, - instance_url: &str, - main_node_miniblock: &BlockDetails, - instance_miniblock: &BlockDetails, - ) { - tracing::debug!( - "Checking miniblock details for miniblock #({})", - main_node_miniblock.number - ); - let details_differences = - compare_json(main_node_miniblock, instance_miniblock, "".to_string()); - for (key, (main_node_val, instance_val)) in details_differences { - self.communicate_divergence( - instance_url, - Divergence::MiniblockDetails(DivergenceDetails { - en_instance_url: instance_url.to_string(), - main_node_value: Some(format!("{}: {:?}", key, main_node_val)), - en_instance_value: Some(format!("{}: {:?}", key, instance_val)), - entity_id: None, - miniblock_number: Some(main_node_miniblock.number), - }), - ); - } - } - - // Looks for txs existing in one node's miniblock and not the other, for - // discrepancies in the content of txs, and runs the individual transaction checkers. - async fn check_transactions( - &mut self, - main_node_tx_map: &HashMap, - instance_miniblock: &BlockDetails, - instance_client: &InstanceHttpClient, - ) -> RpcResult<()> { - let mut instance_tx_map = self - .create_tx_map(&instance_client.client, instance_miniblock.number) - .await?; - - tracing::debug!( - "Checking transactions for miniblock #({}) that has {} transactions", - instance_miniblock.number, - instance_tx_map.len(), - ); - - for (i, (tx_hash, main_node_tx)) in main_node_tx_map.iter().enumerate() { - if let Some(max_num) = self.max_transactions_to_check { - if i >= max_num { - return Ok(()); - } - } - match instance_tx_map.remove(tx_hash) { - Some(instance_tx) => { - if *main_node_tx != instance_tx { - let tx_differences = - compare_json(main_node_tx, &instance_tx, "".to_string()); - for (key, (main_node_val, instance_val)) in tx_differences { - self.communicate_divergence( - &instance_client.url, - Divergence::Transaction(DivergenceDetails { - en_instance_url: instance_client.url.to_string(), - main_node_value: Some(format!("{}: {:?}", key, main_node_val)), - en_instance_value: Some(format!("{}: {:?}", key, instance_val)), - entity_id: Some(format!("Tx Hash: {}", tx_hash)), - miniblock_number: Some(instance_miniblock.number), - }), - ); - } - } else { - self.check_transaction_receipt( - instance_client, - tx_hash, - instance_miniblock.number, - ) - .await?; - - self.check_transaction_details( - instance_client, - tx_hash, - instance_miniblock.number, - ) - .await?; - } - } - None => { - self.communicate_divergence( - &instance_client.url, - Divergence::Transaction(DivergenceDetails { - en_instance_url: instance_client.url.to_string(), - main_node_value: Some(tx_hash.to_string()), - en_instance_value: None, - entity_id: Some(format!("Tx Hash: {}", tx_hash)), - miniblock_number: Some(instance_miniblock.number), - }), - ); - tracing::debug!( - "Added divergence for a tx that is in main node but not in instance: {:?}", - tx_hash - ); - } - } - } - - // If there are txs left in the instance tx map, then they don't exist in the main node. - for tx_hash in instance_tx_map.keys() { - self.communicate_divergence( - &instance_client.url, - Divergence::Transaction(DivergenceDetails { - en_instance_url: instance_client.url.to_string(), - main_node_value: None, - en_instance_value: Some(tx_hash.to_string()), - entity_id: Some(format!("Tx Hash: {}", tx_hash)), - miniblock_number: Some(instance_miniblock.number), - }), - ); - tracing::debug!( - "Added divergence for a tx that is in instance but not in main node: {:?}", - tx_hash - ); - } - - Ok(()) - } - - async fn check_transaction_receipt( - &mut self, - instance_client: &InstanceHttpClient, - tx_hash: &H256, - miniblock_number: MiniblockNumber, - ) -> RpcResult<()> { - tracing::debug!( - "Checking receipts for a tx in miniblock {}", - miniblock_number - ); - - let main_node_receipt = self - .main_node_client - .get_transaction_receipt(*tx_hash) - .await?; - let instance_receipt = instance_client - .client - .get_transaction_receipt(*tx_hash) - .await?; - - let receipt_differences = - compare_json(&main_node_receipt, &instance_receipt, "".to_string()); - for (key, (main_node_val, instance_val)) in receipt_differences { - self.communicate_divergence( - &instance_client.url, - Divergence::TransactionReceipt(DivergenceDetails { - en_instance_url: instance_client.url.to_string(), - main_node_value: Some(format!("{}: {:?}", key, main_node_val)), - en_instance_value: Some(format!("{}: {:?}", key, instance_val)), - entity_id: Some(format!("Tx Hash: {}", tx_hash)), - miniblock_number: Some(miniblock_number), - }), - ); - } - - Ok(()) - } - - async fn check_transaction_details( - &mut self, - instance_client: &InstanceHttpClient, - tx_hash: &H256, - miniblock_number: MiniblockNumber, - ) -> RpcResult<()> { - tracing::debug!( - "Checking transaction details for a tx in miniblock {}", - miniblock_number - ); - - let main_node_tx_details = self - .main_node_client - .get_transaction_details(*tx_hash) - .await?; - let instance_tx_details = instance_client - .client - .get_transaction_details(*tx_hash) - .await?; - - let tx_details_differences = - compare_json(&main_node_tx_details, &instance_tx_details, "".to_string()); - for (key, (main_node_val, instance_val)) in tx_details_differences { - self.communicate_divergence( - &instance_client.url, - Divergence::TransactionDetails(DivergenceDetails { - en_instance_url: instance_client.url.to_string(), - main_node_value: Some(format!("{}: {:?}", key, main_node_val)), - en_instance_value: Some(format!("{}: {:?}", key, instance_val)), - entity_id: Some(format!("Tx Hash: {}", tx_hash)), - miniblock_number: Some(miniblock_number), - }), - ); - } - - Ok(()) - } - - async fn check_logs( - &mut self, - instance_client: &InstanceHttpClient, - current_miniblock_block_num: MiniblockNumber, - ) -> RpcResult<()> { - let from_block = current_miniblock_block_num - .0 - .checked_sub(self.log_check_interval); - let to_block = current_miniblock_block_num.0; - - if from_block < Some(0) || to_block % self.log_check_interval != 0 { - tracing::debug!("Skipping log check for miniblock {}", to_block); - return Ok(()); - } - tracing::debug!( - "Checking logs for miniblocks {}-{}", - from_block.unwrap(), - to_block - 1 - ); - - let filter = FilterBuilder::default() - .set_from_block(BlockNumber::Number(U64::from(from_block.unwrap()))) - .set_to_block(BlockNumber::Number(U64::from(&to_block - 1))) - .build(); - - let main_node_logs = match self.main_node_client.get_logs(filter.clone()).await { - Ok(logs) => logs, - Err(e) => { - // TODO(BFT-192): Be more specific with checking logs - tracing::error!("Failed to get logs from main node: {}", e); - return Ok(()); - } - }; - let instance_logs = match instance_client.client.get_logs(filter).await { - Ok(logs) => logs, - Err(e) => { - // TODO(BFT-192): Be more specific with checking logs - tracing::error!("Failed to get logs from instance: {}", e); - return Ok(()); - } - }; - - for (main_node_log, instance_log) in main_node_logs.iter().zip(instance_logs.iter()) { - let log_differences = compare_json(&main_node_log, &instance_log, "".to_string()); - for (key, (main_node_val, instance_val)) in log_differences { - self.communicate_divergence( - &instance_client.url, - Divergence::Log(DivergenceDetails { - en_instance_url: instance_client.url.to_string(), - main_node_value: Some(format!("{}: {:?}", key, main_node_val)), - en_instance_value: Some(format!("{}: {:?}", key, instance_val)), - entity_id: None, - miniblock_number: Some(MiniblockNumber( - main_node_log.block_number.unwrap().as_u32(), - )), - }), - ); - } - } - - Ok(()) - } - - async fn check_main_contract(&mut self, instance_client: &InstanceHttpClient) -> RpcResult<()> { - let main_node_main_contract = self.main_node_client.get_main_contract().await?; - let instance_main_contract = instance_client.client.get_main_contract().await?; - - let contract_differences = compare_json( - &main_node_main_contract, - &instance_main_contract, - "".to_string(), - ); - for (key, (main_node_val, instance_val)) in contract_differences { - self.communicate_divergence( - &instance_client.url, - Divergence::MainContracts(DivergenceDetails { - en_instance_url: instance_client.url.to_string(), - main_node_value: Some(format!("{} {:?}", key, main_node_val)), - en_instance_value: Some(format!("{} {:?}", key, instance_val)), - entity_id: None, - miniblock_number: None, - }), - ); - } - - Ok(()) - } - - async fn check_chain_id(&mut self, instance_client: &InstanceHttpClient) -> RpcResult<()> { - let main_node_chain_id = self.main_node_client.chain_id().await?; - let instance_chain_id = instance_client.client.chain_id().await?; - - if main_node_chain_id != instance_chain_id { - self.communicate_divergence( - &instance_client.url, - Divergence::ChainID(DivergenceDetails { - en_instance_url: instance_client.url.to_string(), - main_node_value: Some(main_node_chain_id), - en_instance_value: Some(instance_chain_id), - entity_id: None, - miniblock_number: None, - }), - ); - } - - Ok(()) - } - - async fn check_l1_chain_id(&mut self, instance_client: &InstanceHttpClient) -> RpcResult<()> { - let main_node_chain_id = self.main_node_client.l1_chain_id().await?; - let instance_chain_id = instance_client.client.l1_chain_id().await?; - - if main_node_chain_id != instance_chain_id { - self.communicate_divergence( - &instance_client.url, - Divergence::L1ChainID(DivergenceDetails { - en_instance_url: instance_client.url.to_string(), - main_node_value: Some(main_node_chain_id), - en_instance_value: Some(instance_chain_id), - entity_id: None, - miniblock_number: None, - }), - ); - } - - Ok(()) - } - - async fn check_bridge_contracts( - &mut self, - instance_client: &InstanceHttpClient, - ) -> RpcResult<()> { - let main_node_bridge_contracts = self.main_node_client.get_bridge_contracts().await?; - let instance_bridge_contracts = instance_client.client.get_bridge_contracts().await?; - - let receipt_differences = compare_json( - &main_node_bridge_contracts, - &instance_bridge_contracts, - "".to_string(), - ); - for (key, (main_node_val, instance_val)) in receipt_differences { - self.communicate_divergence( - &instance_client.url, - Divergence::BridgeContracts(DivergenceDetails { - en_instance_url: instance_client.url.to_string(), - main_node_value: Some(format!("{}: {:?}", key, main_node_val)), - en_instance_value: Some(format!("{}: {:?}", key, instance_val)), - entity_id: None, - miniblock_number: None, - }), - ); - } - - Ok(()) - } -} diff --git a/core/tests/cross_external_nodes_checker/src/config.rs b/core/tests/cross_external_nodes_checker/src/config.rs deleted file mode 100644 index 636a4fd9ae58..000000000000 --- a/core/tests/cross_external_nodes_checker/src/config.rs +++ /dev/null @@ -1,171 +0,0 @@ -use envy::prefixed; -use serde::Deserialize; - -#[derive(Debug, Deserialize, PartialEq)] -pub struct CheckerConfig { - #[serde(default = "default_mode")] - pub mode: Mode, - - #[serde(default = "default_rpc_mode")] - pub rpc_mode: Option, - - #[serde(default = "default_start_miniblock")] - pub start_miniblock: Option, - - #[serde(default = "default_finish_miniblock")] - pub finish_miniblock: Option, - - #[serde(default = "default_main_node_http_url")] - pub main_node_http_url: Option, - - #[serde(default = "default_instances_http_urls")] - pub instances_http_urls: Option>, - - #[serde(default = "default_main_node_ws_url")] - pub main_node_ws_url: Option, - - #[serde(default = "default_instances_ws_urls")] - pub instances_ws_urls: Option>, - - #[serde(default = "default_max_transactions_to_check")] - pub max_transactions_to_check: Option, - - #[serde(default = "default_instance_poll_period")] - pub instance_poll_period: Option, - - #[serde(default = "default_subscription_duration")] - pub subscription_duration: Option, -} - -#[derive(Copy, Clone, Debug, Deserialize, PartialEq)] -pub enum Mode { - Rpc, - PubSub, - All, -} - -impl Mode { - pub fn run_rpc(&self) -> bool { - matches!(self, Mode::Rpc | Mode::All) - } - - pub fn run_pubsub(&self) -> bool { - matches!(self, Mode::PubSub | Mode::All) - } -} - -#[derive(Copy, Clone, Debug, Deserialize, PartialEq)] -pub enum RpcMode { - Triggered, - Continuous, -} - -impl CheckerConfig { - pub fn from_env() -> Self { - prefixed("CHECKER_") - .from_env() - .unwrap_or_else(|err| panic!("Failed to load the checker config with error: {}", err)) - } -} - -// Default functions for each of the fields - -fn default_mode() -> Mode { - Mode::All -} - -fn default_rpc_mode() -> Option { - Some(RpcMode::Triggered) -} - -fn default_start_miniblock() -> Option { - None -} - -fn default_finish_miniblock() -> Option { - None -} - -fn default_main_node_http_url() -> Option { - Some("http://127.0.0.1:3050".to_string()) -} - -fn default_instances_http_urls() -> Option> { - Some(vec!["http://127.0.0.1:3060".to_string()]) -} - -fn default_main_node_ws_url() -> Option { - Some("ws://127.0.0.1:3051".to_string()) -} - -fn default_instances_ws_urls() -> Option> { - Some(vec!["ws://127.0.0.1:3061".to_string()]) -} - -fn default_max_transactions_to_check() -> Option { - Some(3) -} - -fn default_instance_poll_period() -> Option { - Some(10) -} - -fn default_subscription_duration() -> Option { - None -} - -#[cfg(test)] -mod tests { - use std::env; - - use super::*; - - #[test] - fn success() { - let config = r#" - CHECKER_MODE="Rpc" - CHECKER_RPC_MODE="Continuous" - CHECKER_START_MINIBLOCK="2" - CHECKER_FINISH_MINIBLOCK="4" - CHECKER_MAIN_NODE_HTTP_URL="http://127.0.0.1:1020" - CHECKER_INSTANCES_HTTP_URLS="http://127.0.0.1:1030,http://127.0.0.1:1020" - CHECKER_INSTANCE_POLL_PERIOD="60" - CHECKER_MAX_TRANSACTIONS_TO_CHECK="10" - CHECKER_SUBSCRIPTION_DURATION="120" - "#; - - set_env(config); - - let actual = CheckerConfig::from_env(); - let want = CheckerConfig { - mode: Mode::Rpc, - rpc_mode: Some(RpcMode::Continuous), - start_miniblock: Some(2), - finish_miniblock: Some(4), - main_node_http_url: Some("http://127.0.0.1:1020".into()), - instances_http_urls: Some(vec![ - "http://127.0.0.1:1030".into(), - "http://127.0.0.1:1020".into(), - ]), - main_node_ws_url: Some("ws://127.0.0.1:3051".into()), - instances_ws_urls: Some(vec!["ws://127.0.0.1:3061".into()]), - instance_poll_period: Some(60), - max_transactions_to_check: Some(10), - subscription_duration: Some(120), - }; - assert_eq!(actual, want); - } - - pub fn set_env(fixture: &str) { - for line in fixture.split('\n').map(str::trim) { - if line.is_empty() { - continue; - } - let elements: Vec<_> = line.split('=').collect(); - let variable_name = elements[0]; - let variable_value = elements[1].trim_matches('"'); - - env::set_var(variable_name, variable_value); - } - } -} diff --git a/core/tests/cross_external_nodes_checker/src/divergence.rs b/core/tests/cross_external_nodes_checker/src/divergence.rs deleted file mode 100644 index 18c910349f79..000000000000 --- a/core/tests/cross_external_nodes_checker/src/divergence.rs +++ /dev/null @@ -1,89 +0,0 @@ -use std::fmt; - -use zksync_types::{web3::types::U64, MiniblockNumber}; - -#[derive(Debug, Clone)] -pub(crate) enum Divergence { - BatchDetails(DivergenceDetails>), - MiniblockDetails(DivergenceDetails>), - Transaction(DivergenceDetails>), - TransactionReceipt(DivergenceDetails>), - TransactionDetails(DivergenceDetails>), - Log(DivergenceDetails>), - MainContracts(DivergenceDetails>), - BridgeContracts(DivergenceDetails>), - ChainID(DivergenceDetails>), - L1ChainID(DivergenceDetails>), - PubSubHeader(DivergenceDetails>), -} - -impl fmt::Display for Divergence { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - Divergence::BatchDetails(details) => { - write!(f, "Batch Details divergence found: {}", details) - } - Divergence::MiniblockDetails(details) => { - write!(f, "Miniblock Details divergence found: {}", details) - } - Divergence::Transaction(details) => { - write!(f, "Transaction divergence found: {}", details) - } - Divergence::TransactionReceipt(details) => { - write!(f, "TransactionReceipt divergence found: {}", details) - } - Divergence::TransactionDetails(details) => { - write!(f, "TransactionDetails divergence found: {}", details) - } - Divergence::Log(details) => write!(f, "Log divergence found: {}", details), - Divergence::MainContracts(details) => { - write!(f, "MainContracts divergence found: {}", details) - } - Divergence::BridgeContracts(details) => { - write!(f, "BridgeContracts divergence found: {}", details) - } - Divergence::ChainID(details) => write!(f, "ChainID divergence found: {}", details), - Divergence::L1ChainID(details) => { - write!(f, "L1ChainID divergence found: {}", details) - } - Divergence::PubSubHeader(details) => { - write!(f, "PubSubHeader divergence found: {}", details) - } - } - } -} - -#[derive(Debug, Clone)] -pub(crate) struct DivergenceDetails { - pub(crate) en_instance_url: String, - pub(crate) main_node_value: T, - pub(crate) en_instance_value: T, - pub(crate) entity_id: Option, - pub(crate) miniblock_number: Option, -} - -impl fmt::Display for DivergenceDetails> { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let main_node_value = match &self.main_node_value { - Some(value) => format!("{}", value), - None => String::from("None"), - }; - let en_instance_value = match &self.en_instance_value { - Some(value) => format!("{}", value), - None => String::from("None"), - }; - let entity_info = match self.entity_id { - Some(ref entity_id) => format!(", Entity ID: {}", entity_id), - None => String::from(""), - }; - let miniblock_number = match self.miniblock_number { - Some(ref number) => format!(", Miniblock number: {}", number), - None => String::from(""), - }; - write!( - f, - "Main node value: {}, EN instance value: {}{} in EN instance: {}{}", - main_node_value, en_instance_value, miniblock_number, self.en_instance_url, entity_info - ) - } -} diff --git a/core/tests/cross_external_nodes_checker/src/helpers.rs b/core/tests/cross_external_nodes_checker/src/helpers.rs deleted file mode 100644 index 6247b5e8c8ad..000000000000 --- a/core/tests/cross_external_nodes_checker/src/helpers.rs +++ /dev/null @@ -1,326 +0,0 @@ -use std::{collections::HashMap, future::Future, time::Duration}; - -use futures::channel::oneshot; -use serde_json::{Map, Value}; -use tokio::time::sleep; - -/// Sets up an interrupt handler and returns a future that resolves once an interrupt signal is received. -pub fn setup_sigint_handler() -> oneshot::Receiver<()> { - let (sigint_sender, sigint_receiver) = oneshot::channel(); - let mut sigint_sender = Some(sigint_sender); - ctrlc::set_handler(move || { - if let Some(sigint_sender) = sigint_sender.take() { - sigint_sender.send(()).ok(); - // ^ The send fails if `sigint_receiver` is dropped. We're OK with this, - // since at this point the node should be stopping anyway, or is not interested - // in listening to interrupt signals. - } - }) - .expect("Error setting Ctrl+C handler"); - - sigint_receiver -} - -pub fn compare_json( - a: &T, - b: &T, - path: String, -) -> HashMap, Option)> { - let a = serde_json::to_value(a).expect("serialization failure"); - let b = serde_json::to_value(b).expect("serialization failure"); - - if a == b { - return HashMap::new(); - } - - match (a, b) { - (Value::Object(ref a), Value::Object(ref b)) => compare_json_object(a, b, path), - (Value::Array(ref a), Value::Array(ref b)) => compare_json_array(a, b, path), - (a, b) => { - let mut res = HashMap::new(); - let a_val = if a.is_null() { None } else { Some(a) }; - let b_val = if b.is_null() { None } else { Some(b) }; - res.insert(path, (a_val, b_val)); - res - } - } -} - -fn compare_json_object( - a: &Map, - b: &Map, - path: String, -) -> HashMap, Option)> { - let mut differences = HashMap::new(); - - for (k, v) in a.iter() { - let new_path = if path.is_empty() { - k.clone() - } else { - format!("{}.{}", path, k) - }; - - differences.extend(compare_json(v, b.get(k).unwrap_or(&Value::Null), new_path)); - } - - for (k, v) in b.iter() { - if !a.contains_key(k) { - let new_path = if path.is_empty() { - k.clone() - } else { - format!("{}.{}", path, k) - }; - differences.insert(new_path, (None, Some(v.clone()))); - } - } - - differences -} - -fn compare_json_array( - a: &Vec, - b: &Vec, - path: String, -) -> HashMap, Option)> { - let mut differences = HashMap::new(); - - let len = a.len().max(b.len()); - for i in 0..len { - let new_path = format!("{}[{}]", path, i); - differences.extend(compare_json( - a.get(i).unwrap_or(&Value::Null), - b.get(i).unwrap_or(&Value::Null), - new_path, - )); - } - - differences -} - -#[derive(Debug, Clone)] -pub struct ExponentialBackoff { - pub max_retries: u32, - pub base_delay: Duration, - pub retry_message: String, -} - -impl ExponentialBackoff { - // Keep retrying until the operation returns Some or we reach the max number of retries. - pub async fn retry(&self, mut operation: F) -> Option - where - F: FnMut() -> Fut, - Fut: Future>, - { - for retry in 1..=self.max_retries { - if let Some(result) = operation().await { - return Some(result); - } - if retry == self.max_retries { - break; - } - let delay = self.base_delay * retry; - tracing::warn!( - "{} Retrying in {} seconds", - self.retry_message, - delay.as_secs() - ); - sleep(delay).await; - } - None - } -} - -#[cfg(test)] -mod tests { - use serde_json::json; - - use super::*; - - #[test] - fn test_same_json() { - let json1 = json!({ - "key1": "value1", - "key2": 2, - "key3": [ - "value2", - "+value3" - ] - }); - - let differences = compare_json(&json1, &json1, "".to_string()); - assert_eq!(differences.len(), 0); - } - - #[test] - fn test_deeply_nested_objects() { - let a = json!({ - "key1": { - "subkey1": { - "subsubkey1": "value1", - "subsubkey2": "value2" - }, - "subkey2": "value3" - }, - "key2": "value4" - }); - - let b = json!({ - "key1": { - "subkey1": { - "subsubkey1": "value1", - "subsubkey2": "value5" - }, - "subkey2": "value6" - }, - "key2": "value4" - }); - - let differences = compare_json(&a, &b, "".to_string()); - - assert_eq!(differences.len(), 2); - assert_eq!( - differences.get("key1.subkey1.subsubkey2"), - Some(&(Some(json!("value2")), Some(json!("value5")))) - ); - assert_eq!( - differences.get("key1.subkey2"), - Some(&(Some(json!("value3")), Some(json!("value6")))) - ); - } - - #[test] - fn test_diff_different_keys() { - let a = json!({ - "key1": "value1", - "key2": "value2" - }); - - let b = json!({ - "key1": "value1", - "key3": "value3" - }); - - let differences = compare_json(&a, &b, "".to_string()); - - assert_eq!(differences.len(), 2); - assert_eq!( - differences.get("key2"), - Some(&(Some(json!("value2")), None)) - ); - assert_eq!( - differences.get("key3"), - Some(&(None, Some(json!("value3")))) - ); - } - - #[test] - fn test_diff_different_types() { - let a = json!({ - "key1": true, - "key2": 123, - "key3": "value1" - }); - - let b = json!({ - "key1": false, - "key2": "123", - "key3": "value2" - }); - - let differences = compare_json(&a, &b, "".to_string()); - - assert_eq!(differences.len(), 3); - assert_eq!( - differences.get("key1"), - Some(&(Some(json!(true)), Some(json!(false)))) - ); - assert_eq!( - differences.get("key2"), - Some(&(Some(json!(123)), Some(json!("123")))) - ); - assert_eq!( - differences.get("key3"), - Some(&(Some(json!("value1")), Some(json!("value2")))) - ); - } - - #[test] - fn test_empty_jsons() { - let json1 = json!({}); - let json2 = json!([]); - - let differences = compare_json(&json1, &json1, "".to_string()); - assert_eq!(differences.len(), 0); - - let differences = compare_json(&json2, &json2, "".to_string()); - assert_eq!(differences.len(), 0); - - let differences = compare_json(&json1, &json2, "".to_string()); - assert_eq!(differences.len(), 1); - } - - #[test] - fn test_one_empty_json() { - let json1 = json!({}); - let json2 = json!({ - "key1": "value1", - "key2": 2, - }); - - let differences = compare_json(&json1, &json2, "".to_string()); - assert_eq!(differences.len(), 2); - - let differences = compare_json(&json2, &json1, "".to_string()); - assert_eq!(differences.len(), 2); - } - - #[test] - fn test_json_with_null() { - let a = json!({ - "key1": null, - "key2": "value2" - }); - - let b = json!({ - "key1": "value1", - "key2": null - }); - - let differences = compare_json(&a, &b, "".to_string()); - - assert_eq!(differences.len(), 2); - assert_eq!( - differences.get("key1"), - Some(&(None, Some(json!("value1")))) - ); - assert_eq!( - differences.get("key2"), - Some(&(Some(json!("value2")), None)) - ); - } - - #[test] - fn test_arrays_different_lengths() { - let a = json!([1, 2, 3]); - let b = json!([1, 2, 3, 4]); - - let differences = compare_json(&a, &b, "".to_string()); - - assert_eq!(differences.len(), 1); - assert_eq!(differences.get("[3]"), Some(&(None, Some(json!(4))))); - } - - #[test] - fn test_arrays_with_nested_objects() { - let a = json!([{"key1": "value1"}, {"key2": "value2"}]); - let b = json!([{"key1": "value1"}, {"key2": "value3"}]); - - let differences = compare_json(&a, &b, "".to_string()); - - assert_eq!(differences.len(), 1); - assert_eq!( - differences.get("[1].key2"), - Some(&(Some(json!("value2")), Some(json!("value3")))) - ); - } -} diff --git a/core/tests/cross_external_nodes_checker/src/main.rs b/core/tests/cross_external_nodes_checker/src/main.rs deleted file mode 100644 index 7199c1cbd32c..000000000000 --- a/core/tests/cross_external_nodes_checker/src/main.rs +++ /dev/null @@ -1,65 +0,0 @@ -use tokio::sync::watch; -use zksync_utils::wait_for_tasks::wait_for_tasks; - -use self::{checker::Checker, pubsub_checker::PubSubChecker}; -use crate::{config::CheckerConfig, helpers::setup_sigint_handler}; - -mod checker; -mod config; -mod divergence; -mod helpers; -mod pubsub_checker; - -#[tokio::main] -async fn main() -> anyhow::Result<()> { - #[allow(deprecated)] // TODO (QIT-21): Use centralized configuration approach. - let log_format = vlog::log_format_from_env(); - #[allow(deprecated)] // TODO (QIT-21): Use centralized configuration approach. - let sentry_url = vlog::sentry_url_from_env(); - #[allow(deprecated)] // TODO (QIT-21): Use centralized configuration approach. - let environment = vlog::environment_from_env(); - - let mut builder = vlog::ObservabilityBuilder::new().with_log_format(log_format); - if let Some(sentry_url) = sentry_url { - builder = builder - .with_sentry_url(&sentry_url) - .expect("Invalid Sentry URL") - .with_sentry_environment(environment); - } - let _guard = builder.build(); - - tracing::info!("Started the Cross Node Checker"); - - let config = CheckerConfig::from_env(); - tracing::info!("Loaded the checker config: {:?}", config); - - let mut join_handles = Vec::new(); - let sigint_receiver = setup_sigint_handler(); - let (stop_sender, stop_receiver) = watch::channel::(false); - - if config.mode.run_rpc() { - let cross_node_checker = Checker::new(&config); - let checker_stop_receiver = stop_receiver.clone(); - let checker_handle = - tokio::spawn(async move { cross_node_checker.run(checker_stop_receiver).await }); - join_handles.push(checker_handle); - } - - if config.mode.run_pubsub() { - let pubsub_checker = PubSubChecker::new(config).await; - let pubsub_stop_receiver = stop_receiver.clone(); - let pubsub_handle = - tokio::spawn(async move { pubsub_checker.run(pubsub_stop_receiver).await }); - join_handles.push(pubsub_handle); - } - - tokio::select! { - _ = wait_for_tasks(join_handles, None, None::>, false) => {}, - _ = sigint_receiver => { - let _ = stop_sender.send(true); - tracing::info!("Stop signal received, shutting down the cross EN Checker"); - }, - } - - Ok(()) -} diff --git a/core/tests/cross_external_nodes_checker/src/pubsub_checker.rs b/core/tests/cross_external_nodes_checker/src/pubsub_checker.rs deleted file mode 100644 index 3e000e83d8f5..000000000000 --- a/core/tests/cross_external_nodes_checker/src/pubsub_checker.rs +++ /dev/null @@ -1,311 +0,0 @@ -use std::{ - collections::HashMap, - sync::Arc, - time::{Duration, Instant}, -}; - -use anyhow::Context as _; -use tokio::{ - select, spawn, - sync::{watch::Receiver, Mutex as TokioMutex}, - time::timeout, -}; -use zksync_types::{web3::types::U64, MiniblockNumber}; -use zksync_utils::wait_for_tasks::wait_for_tasks; -use zksync_web3_decl::{ - jsonrpsee::{ - core::{ - client::{Subscription, SubscriptionClientT}, - ClientError, - }, - rpc_params, - ws_client::{WsClient, WsClientBuilder}, - }, - types::{BlockHeader, PubSubResult}, -}; - -use crate::{ - config::CheckerConfig, - divergence::{Divergence, DivergenceDetails}, - helpers::{compare_json, ExponentialBackoff}, -}; - -const MAX_RETRIES: u32 = 6; -const GRACE_PERIOD: Duration = Duration::from_secs(60); -const SUBSCRIPTION_TIMEOUT: Duration = Duration::from_secs(120); - -#[derive(Debug, Clone)] -pub struct PubSubChecker { - main_node_url: String, - instance_urls: Vec, - /// Time in seconds for a subscription to be active. If `None`, the subscription will run forever. - subscription_duration: Option, - /// Mapping of block numbers to the block header and the number of instances that still need to - /// check the corresponding header. This Hashmap is shared between all threads. - /// The number of instances is used to determine when to remove the block from the hashmap. - pub blocks: Arc>>, -} - -impl PubSubChecker { - pub async fn new(config: CheckerConfig) -> Self { - let duration = config.subscription_duration.map(Duration::from_secs); - Self { - main_node_url: config - .main_node_ws_url - .expect("WS URL for the main node has to be provided for PubSub mode."), - instance_urls: config - .instances_ws_urls - .expect("WS URLs for the EN instances have to be provided for PubSub mode."), - subscription_duration: duration, - blocks: Arc::new(TokioMutex::new(HashMap::new())), - } - } - - pub async fn run(&self, mut stop_receiver: Receiver) -> anyhow::Result<()> { - tracing::info!("Started pubsub checker"); - - let mut join_handles = Vec::new(); - - let this = self.clone(); - let main_stop_receiver = stop_receiver.clone(); - let handle = spawn(async move { - tracing::info!("Started a task to subscribe to the main node"); - if let Err(e) = this.subscribe_main(main_stop_receiver).await { - tracing::error!("Error in main node subscription task: {}", e); - } - Ok(()) - }); - join_handles.push(handle); - - let instance_urls = self.instance_urls.clone(); - for instance_url in &instance_urls { - let this = self.clone(); - let instance_stop_receiver = stop_receiver.clone(); - let url = instance_url.clone(); - let handle = spawn(async move { - tracing::info!("Started a task to subscribe to instance {}", url); - this.subscribe_instance(&url, instance_stop_receiver) - .await - .with_context(|| format!("Error in instance {} subscription task", url)) - }); - join_handles.push(handle); - } - - select! { - _ = wait_for_tasks(join_handles, None, None::>, false) => {}, - _ = stop_receiver.changed() => { - tracing::info!("Stop signal received, shutting down pubsub checker"); - }, - } - Ok(()) - } - - // Setup a client for the main node, subscribe, and insert incoming pubsub results into the shared hashmap. - async fn subscribe_main(&self, stop_receiver: Receiver) -> anyhow::Result<()> { - let client = self.setup_client(&self.main_node_url).await; - let params = rpc_params!["newHeads"]; - - let mut subscription: Subscription = client - .subscribe("eth_subscribe", params, "eth_unsubscribe") - .await?; - - let start = Instant::now(); - loop { - if self.check_if_loop_should_break(&stop_receiver, &start, &self.main_node_url) { - break; - } - - let no_res_timeout_duration = self.get_timeout_duration(&start); - let stream_res = timeout(no_res_timeout_duration, subscription.next()) - .await - .map_err(|_| - anyhow::anyhow!( - "OperationTimeout: Haven't gotten an item for over {} seconds subscribing to the main node", - no_res_timeout_duration.as_secs() - ) - )?; - let pubsub_res = stream_res.ok_or_else(|| anyhow::anyhow!("Stream has ended"))?; - - let (block_header, block_number) = self.extract_block_info(pubsub_res).await?; - - // Secure the lock for the map and insert the new header. - let mut blocks = self.blocks.lock().await; - blocks.insert(block_number, (block_header, self.instance_urls.len())); - tracing::debug!("Inserted block {} to main node map", block_number); - } - - Ok(()) - } - - // Setup a client for the instance node, subscribe, and compare incoming pubsub results to the main node's. - async fn subscribe_instance( - &self, - url: &str, - stop_receiver: Receiver, - ) -> anyhow::Result<()> { - let client = self.setup_client(url).await; - let params = rpc_params!["newHeads"]; - - let mut subscription: Subscription = client - .subscribe("eth_subscribe", params, "eth_unsubscribe") - .await?; - - let start = Instant::now(); - loop { - if self.check_if_loop_should_break(&stop_receiver, &start, url) { - break; - } - - let no_res_timeout_duration = self.get_timeout_duration(&start); - let stream_res = timeout(no_res_timeout_duration, subscription.next()) - .await - .map_err(|_| - anyhow::anyhow!( - "OperationTimeout: Haven't gotten an item for over {} seconds subscribing to instance {}", - no_res_timeout_duration.as_secs(), url - ) - )?; - let pubsub_res = stream_res.ok_or_else(|| anyhow::anyhow!("Stream has ended"))?; - let (instance_block_header, block_number) = self.extract_block_info(pubsub_res).await?; - tracing::debug!("Got block {} from instance {}", block_number, url); - - // Get the main node block header from the map and update its count. - // This should be retried because the map not having the block the instance does might - // just mean the main node subscriber is lagging. - let backoff = ExponentialBackoff { - max_retries: MAX_RETRIES, - base_delay: Duration::from_secs(1), - retry_message: format!( - "block {} is still not present in main node map for instance {}.", - block_number, url, - ), - }; - let main_node_value = backoff - // Wait for the block to appear in the main node map. - .retry(|| { - async move { - let mut blocks = self.blocks.lock().await; - let main_node_value = blocks.get(&block_number).cloned(); - match main_node_value { - Some((header, count)) => { - if count > 1 { - blocks.insert(block_number, (header.clone(), count - 1)); - } else { - blocks.remove(&block_number); - } - tracing::debug!("Updated blocks map: {:?}", blocks.keys()); - Some((header, count)) - } - None => None, // Retry - } - } - }) - .await; - - // If main node map contained the header, compare main & instance headers. - match main_node_value { - Some((main_node_header, _)) => { - self.check_headers(&main_node_header, &instance_block_header, url); - } - None => { - // If the main subscriber starts ahead of an instance subscriber, the map may - // start with block X while instance is looking for block X-1, which will never - // be in the map. We don't want to log an error for this case. - if start.elapsed() > GRACE_PERIOD { - tracing::error!( - "block {} has not been found in the main node map for instance {} after {} retries", - block_number, - url, - MAX_RETRIES - ); - } - } - }; - } - - Ok(()) - } - - fn get_timeout_duration(&self, start: &Instant) -> Duration { - match self.subscription_duration { - Some(duration) => std::cmp::min( - duration.checked_sub(start.elapsed()), - Some(SUBSCRIPTION_TIMEOUT), - ) - .unwrap(), - None => SUBSCRIPTION_TIMEOUT, - } - } - - fn check_if_loop_should_break( - &self, - stop_receiver: &Receiver, - start: &Instant, - url: &str, - ) -> bool { - if *stop_receiver.borrow() { - tracing::info!("Stop signal received, shutting down pubsub checker"); - return true; - } - if let Some(duration) = self.subscription_duration { - if start.elapsed() > duration { - tracing::info!("Client {} reached its subscription duration", url); - return true; - } - } - false - } - - async fn setup_client(&self, url: &str) -> WsClient { - WsClientBuilder::default() - .build(url) - .await - .expect("Failed to create a WS client") - } - - // Extract the block header and block number from the pubsub result that is expected to be a header. - async fn extract_block_info( - &self, - pubsub_res: Result, - ) -> Result<(BlockHeader, U64), anyhow::Error> { - let PubSubResult::Header(header) = pubsub_res? else { - return Err(anyhow::anyhow!("Received non-header pubsub result")); - }; - - let Some(block_number) = header.number else { - return Err(anyhow::anyhow!("Received header without block number.")); - }; - - Ok((header, block_number)) - } - - fn check_headers( - &self, - main_node_header: &BlockHeader, - instance_header: &BlockHeader, - instance_url: &str, - ) { - let header_differences = compare_json(&main_node_header, &instance_header, "".to_string()); - if header_differences.is_empty() { - tracing::info!( - "No divergences found in header for block {} for instance {}", - instance_header.number.unwrap().as_u64(), - instance_url - ); - } - for (key, (main_node_val, instance_val)) in header_differences { - tracing::error!( - "{}", - Divergence::PubSubHeader(DivergenceDetails { - en_instance_url: instance_url.to_string(), - main_node_value: Some(format!("{}: {:?}", key, main_node_val)), - en_instance_value: Some(format!("{}: {:?}", key, instance_val)), - entity_id: None, - miniblock_number: Some(MiniblockNumber( - main_node_header.number.unwrap().as_u32() - )), - }), - ); - } - } -} diff --git a/docker/cross-external-nodes-checker/Dockerfile b/docker/cross-external-nodes-checker/Dockerfile deleted file mode 100644 index 87b5d67d719d..000000000000 --- a/docker/cross-external-nodes-checker/Dockerfile +++ /dev/null @@ -1,27 +0,0 @@ -# syntax=docker/dockerfile:experimental -FROM debian:bookworm-slim as builder - -RUN apt-get update && apt-get install -y curl clang openssl libssl-dev gcc g++ \ - pkg-config build-essential libclang-dev && \ - rm -rf /var/lib/apt/lists/* - -ENV RUSTUP_HOME=/usr/local/rustup \ - CARGO_HOME=/usr/local/cargo \ - PATH=/usr/local/cargo/bin:$PATH - -RUN curl https://sh.rustup.rs -sSf | bash -s -- -y && \ - rustup install nightly-2023-08-21 && \ - rustup default nightly-2023-08-21 - -WORKDIR /usr/src/zksync -COPY . . - -RUN cargo build --release - -FROM debian:bookworm-slim - -RUN apt-get update && apt-get install -y curl ca-certificates && rm -rf /var/lib/apt/lists/* - -COPY --from=builder /usr/src/zksync/target/release/cross_external_nodes_checker /usr/bin - -ENTRYPOINT ["cross_external_nodes_checker"] diff --git a/etc/env/base/rust.toml b/etc/env/base/rust.toml index adc79ff05e6a..8eef7700067a 100644 --- a/etc/env/base/rust.toml +++ b/etc/env/base/rust.toml @@ -27,7 +27,6 @@ block_sizes_test=info,\ zksync_object_store=info,\ en_playground=info,\ zksync_external_node=info,\ -cross_nodes_checker=debug,\ zksync_witness_generator=info,\ zksync_prover_fri=info,\ zksync_witness_vector_generator=info,\ diff --git a/etc/env/ext-node-docker.toml b/etc/env/ext-node-docker.toml index be32233311f2..129b41a41816 100644 --- a/etc/env/ext-node-docker.toml +++ b/etc/env/ext-node-docker.toml @@ -64,7 +64,6 @@ zksync_types=info,\ loadnext=info,\ vm=info,\ zksync_external_node=info,\ -cross_nodes_checker=debug,\ """ # `RUST_BACKTRACE` variable diff --git a/etc/env/ext-node.toml b/etc/env/ext-node.toml index fab7b90564f5..58298a5501bd 100644 --- a/etc/env/ext-node.toml +++ b/etc/env/ext-node.toml @@ -64,7 +64,6 @@ zksync_types=info,\ loadnext=info,\ vm=info,\ zksync_external_node=info,\ -cross_nodes_checker=debug,\ """ # `RUST_BACKTRACE` variable diff --git a/infrastructure/zk/src/docker.ts b/infrastructure/zk/src/docker.ts index 7b571ebcc344..143a41bd154f 100644 --- a/infrastructure/zk/src/docker.ts +++ b/infrastructure/zk/src/docker.ts @@ -4,7 +4,6 @@ import * as utils from './utils'; const IMAGES = [ 'server-v2', 'external-node', - 'cross-external-nodes-checker', 'contract-verifier', 'prover-v2', 'geth', @@ -70,7 +69,6 @@ function defaultTagList(image: string, imageTagSha: string, imageTagShaTS: strin const tagList = [ 'server-v2', 'external-node', - 'cross-external-nodes-checker', 'prover', 'contract-verifier', 'prover-v2', diff --git a/infrastructure/zk/src/run/run.ts b/infrastructure/zk/src/run/run.ts index e4132e64b204..da5b66d8d687 100644 --- a/infrastructure/zk/src/run/run.ts +++ b/infrastructure/zk/src/run/run.ts @@ -113,12 +113,6 @@ export async function readVariable(address: string, contractName: string, variab ); } -export async function cross_en_checker() { - let logLevel = 'RUST_LOG=cross_external_nodes_checker=debug'; - let suffix = 'cargo run --release --bin cross_external_nodes_checker'; - await utils.spawn(`${logLevel} ${suffix}`); -} - export async function snapshots_creator() { process.chdir(`${process.env.ZKSYNC_HOME}`); let logLevel = 'RUST_LOG=snapshots_creator=debug'; @@ -194,107 +188,3 @@ command }); command.command('snapshots-creator').action(snapshots_creator); - -command - .command('cross-en-checker') - .description('run the cross external nodes checker. See Checker Readme the default run mode and configuration.') - .option( - '--mode ', - '`Rpc` to run only the RPC checker; `PubSub` to run only the PubSub checker; `All` to run both.' - ) - .option( - '--env ', - `Provide the env the checker will test in to use the default urls for that env. 'Local', 'Stage, 'Testnet', or 'Mainnet'` - ) - .option('--main_node_http_url ', 'Manually provide the HTTP URL of the main node') - .option('--instances_http_urls ', 'Manually provide the HTTP URLs of the instances to check') - .option('--main_node_ws_url ', 'Manually provide the WS URL of the main node') - .option('--instances_ws_urls ', 'Manually provide the WS URLs of the instances to check') - .option( - '--rpc_mode ', - 'The mode to run the RPC checker in. `Triggered` to run once; `Continuous` to run forever.' - ) - .option( - '--start_miniblock ', - 'Check all miniblocks starting from this. If not set, then check from genesis. Inclusive.' - ) - .option( - '--finish_miniblock ', - 'For Triggered mode. If not set, then check all available miniblocks. Inclusive.' - ) - .option( - '--max_transactions_to_check ', - 'The maximum number of transactions to be checked at random in each miniblock.' - ) - .option( - '--instance_poll_period ', - 'For RPC mode. In seconds, how often to poll the instance node for new miniblocks.' - ) - .option( - '--subscription_duration ', - 'For PubSub mode. Time in seconds for a subscription to be active. If not set, then the subscription will run forever.' - ) - .action(async (cmd: Command) => { - interface Environment { - httpMain: string; - httpInstances: string; - wsMain: string; - wsInstances: string; - } - - const nodeUrls: Record = { - Local: { - httpMain: 'http://127.0.0.1:3050', - httpInstances: 'http://127.0.0.1:3060', - wsMain: 'ws://127.0.0.1:3051', - wsInstances: 'ws://127.0.0.1:3061' - }, - Stage: { - httpMain: 'https://z2-dev-api.zksync.dev:443', - httpInstances: 'https://external-node-dev.zksync.dev:443', - wsMain: 'wss://z2-dev-api.zksync.dev:443/ws', - wsInstances: 'wss://external-node-dev.zksync.dev:443/ws' - }, - Testnet: { - httpMain: 'https://zksync2-testnet.zksync.dev:443', - httpInstances: 'https://external-node-testnet.zksync.dev:443', - wsMain: 'wss://zksync2-testnet.zksync.dev:443/ws', - wsInstances: 'wss://external-node-testnet.zksync.dev:443/ws' - }, - Mainnet: { - httpMain: 'https://zksync2-mainnet.zksync.io:443', - httpInstances: 'https://external-node-mainnet.zksync.dev:443', - wsMain: 'wss://zksync2-mainnet.zksync.io:443/ws', - wsInstances: 'wss://external-node-mainnet.zksync.dev:443/ws' - } - }; - - if (cmd.env && nodeUrls[cmd.env]) { - process.env.CHECKER_MAIN_NODE_HTTP_URL = nodeUrls[cmd.env].httpMain; - process.env.CHECKER_INSTANCES_HTTP_URLS = nodeUrls[cmd.env].httpInstances; - process.env.CHECKER_MAIN_NODE_WS_URL = nodeUrls[cmd.env].wsMain; - process.env.CHECKER_INSTANCES_WS_URLS = nodeUrls[cmd.env].wsInstances; - } - - const envVarMap = { - mode: 'CHECKER_MODE', - rpc_mode: 'CHECKER_RPC_MODE', - main_node_http_url: 'CHECKER_MAIN_NODE_HTTP_URL', - instances_http_urls: 'CHECKER_INSTANCES_HTTP_URLS', - main_node_ws_url: 'CHECKER_MAIN_NODE_WS_URL', - instances_ws_urls: 'CHECKER_INSTANCES_WS_URLS', - start_miniblock: 'CHECKER_START_MINIBLOCK', - finish_miniblock: 'CHECKER_FINISH_MINIBLOCK', - max_transactions_to_check: 'CHECKER_MAX_TRANSACTIONS_TO_CHECK', - instance_poll_period: 'CHECKER_INSTANCE_POLL_PERIOD', - subscription_duration: 'CHECKER_SUBSCRIPTION_DURATION' - }; - - for (const [cmdOption, envVar] of Object.entries(envVarMap)) { - if (cmd[cmdOption]) { - process.env[envVar] = cmd[cmdOption]; - } - } - - await cross_en_checker(); - });