diff --git a/.github/actions/sccache/action.yaml b/.github/actions/sccache/action.yaml index 14954b1f202..700c47e2f96 100644 --- a/.github/actions/sccache/action.yaml +++ b/.github/actions/sccache/action.yaml @@ -34,7 +34,7 @@ inputs: default: "true" version: description: "sccache version" - default: "0.8.2" + default: "0.9.1" required: false outputs: env_vars: diff --git a/Cargo.lock b/Cargo.lock index e8c68108b7d..36c90110bcc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1547,6 +1547,7 @@ dependencies = [ "ciborium 0.2.0", "clap", "console-subscriber", + "crc", "dapi-grpc", "dashcore-rpc", "delegate", @@ -2098,8 +2099,7 @@ dependencies = [ [[package]] name = "grovedb" version = "2.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ebf36cc41af86d8ccb8b7f64fd15006cd83ec979c49cd2ad30628bf855c54d7d" +source = "git+https://github.com/dashpay/grovedb?branch=feat%2Fchunk_packing_master_fix#ee83e00aef4fbb253d584cb57450428a2264b552" dependencies = [ "axum", "bincode", @@ -2130,8 +2130,7 @@ dependencies = [ [[package]] name = "grovedb-costs" version = "2.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cc526a58bdca58cb86340632081e27264e3557a73608cf29f6738ad9bfab316" +source = "git+https://github.com/dashpay/grovedb?branch=feat%2Fchunk_packing_master_fix#ee83e00aef4fbb253d584cb57450428a2264b552" dependencies = [ "integer-encoding", "intmap", @@ -2141,8 +2140,7 @@ dependencies = [ [[package]] name = "grovedb-epoch-based-storage-flags" version = "2.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "abd5f01eb50ff57b2c24e856377684d42dacdbd04de7c0189bf2e0e0cd109692" +source = "git+https://github.com/dashpay/grovedb?branch=feat%2Fchunk_packing_master_fix#ee83e00aef4fbb253d584cb57450428a2264b552" dependencies = [ "grovedb-costs", "hex", @@ -2154,8 +2152,7 @@ dependencies = [ [[package]] name = "grovedb-merk" version = "2.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ce3b133a76e9935f3a57e08598769849d79df582244e1ac99509177e23c2605" +source = "git+https://github.com/dashpay/grovedb?branch=feat%2Fchunk_packing_master_fix#ee83e00aef4fbb253d584cb57450428a2264b552" dependencies = [ "bincode", "blake3", @@ -2177,14 +2174,12 @@ dependencies = [ [[package]] name = "grovedb-path" version = "2.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67dc8bc00b9be473f7b25670d1422daadd706c9b09ed6aa5cf2caf8722a487ac" +source = "git+https://github.com/dashpay/grovedb?branch=feat%2Fchunk_packing_master_fix#ee83e00aef4fbb253d584cb57450428a2264b552" [[package]] name = "grovedb-storage" version = "2.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "905cff776de89b9ee1e96979861e254b0912170b35d89678ad782f487a22a2e3" +source = "git+https://github.com/dashpay/grovedb?branch=feat%2Fchunk_packing_master_fix#ee83e00aef4fbb253d584cb57450428a2264b552" dependencies = [ "blake3", "grovedb-costs", @@ -2203,8 +2198,7 @@ dependencies = [ [[package]] name = "grovedb-version" version = "2.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a987e051c8c9cf8fa381b29b243d4951f8c1f24f9c90ceed52afca3ac460986c" +source = "git+https://github.com/dashpay/grovedb?branch=feat%2Fchunk_packing_master_fix#ee83e00aef4fbb253d584cb57450428a2264b552" dependencies = [ "thiserror 2.0.11", "versioned-feature-core 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2213,8 +2207,7 @@ dependencies = [ [[package]] name = "grovedb-visualize" version = "2.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56eee6f57d324505611de0042af2b6b9933235e704a1a6542ff7ba5b5c56f64e" +source = "git+https://github.com/dashpay/grovedb?branch=feat%2Fchunk_packing_master_fix#ee83e00aef4fbb253d584cb57450428a2264b552" dependencies = [ "hex", "itertools 0.14.0", @@ -2223,8 +2216,7 @@ dependencies = [ [[package]] name = "grovedbg-types" version = "2.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7cfa37a90579ba2c71e074d6047c1dcfc19caa458fbcefd4e8e31a02f6a9fe38" +source = "git+https://github.com/dashpay/grovedb?branch=feat%2Fchunk_packing_master_fix#ee83e00aef4fbb253d584cb57450428a2264b552" dependencies = [ "serde", "serde_with 3.9.0", @@ -4393,9 +4385,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.126" +version = "1.0.138" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3b863381a05ffefbc82571a2d893edf47b27fb0ebedbf582c39640e51abebef" +checksum = "d434192e7da787e94a6ea7e9670b26a036d0ca41e0b7efb2676dd32bae872949" dependencies = [ "indexmap 2.7.0", "itoa", @@ -4836,7 +4828,7 @@ dependencies = [ [[package]] name = "tenderdash-abci" version = "1.2.1+1.3.0" -source = "git+https://github.com/dashpay/rs-tenderdash-abci?tag=v1.2.1%2B1.3.0#aad72f4d25816bdf0f584ee4ba3cd383addf8a33" +source = "git+https://github.com/dashpay/rs-tenderdash-abci?rev=b55bed9f574b68f2b6c96cbc80da41072056781d#b55bed9f574b68f2b6c96cbc80da41072056781d" dependencies = [ "bytes", "futures", @@ -4845,7 +4837,7 @@ dependencies = [ "semver", "serde_json", "tenderdash-proto", - "thiserror 1.0.64", + "thiserror 2.0.11", "tokio", "tokio-util", "tracing", @@ -4857,7 +4849,7 @@ dependencies = [ [[package]] name = "tenderdash-proto" version = "1.2.1+1.3.0" -source = "git+https://github.com/dashpay/rs-tenderdash-abci?tag=v1.2.1%2B1.3.0#aad72f4d25816bdf0f584ee4ba3cd383addf8a33" +source = "git+https://github.com/dashpay/rs-tenderdash-abci?rev=b55bed9f574b68f2b6c96cbc80da41072056781d#b55bed9f574b68f2b6c96cbc80da41072056781d" dependencies = [ "bytes", "chrono", @@ -4876,7 +4868,7 @@ dependencies = [ [[package]] name = "tenderdash-proto-compiler" version = "1.2.1+1.3.0" -source = "git+https://github.com/dashpay/rs-tenderdash-abci?tag=v1.2.1%2B1.3.0#aad72f4d25816bdf0f584ee4ba3cd383addf8a33" +source = "git+https://github.com/dashpay/rs-tenderdash-abci?rev=b55bed9f574b68f2b6c96cbc80da41072056781d#b55bed9f574b68f2b6c96cbc80da41072056781d" dependencies = [ "fs_extra", "prost-build", diff --git a/Dockerfile b/Dockerfile index 0050ed916a1..b7ab465628a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -147,7 +147,7 @@ ENV NODE_ENV=${NODE_ENV} FROM deps-base AS deps-sccache # SCCACHE_VERSION must be the same as in github actions, to avoid cache incompatibility -ARG SCCHACHE_VERSION=0.8.2 +ARG SCCHACHE_VERSION=0.9.1 # Install sccache for caching RUN if [[ "$TARGETARCH" == "arm64" ]] ; then export SCC_ARCH=aarch64; else export SCC_ARCH=x86_64; fi; \ @@ -552,12 +552,9 @@ LABEL description="Drive ABCI Rust" RUN apk add --no-cache libgcc libstdc++ ENV DB_PATH=/var/lib/dash/rs-drive-abci/db +ENV CHECKPOINTS_PATH=/var/lib/dash/rs-drive-abci/db-checkpoints ENV REJECTIONS_PATH=/var/log/dash/rejected -RUN mkdir -p /var/log/dash \ - /var/lib/dash/rs-drive-abci/db \ - ${REJECTIONS_PATH} - COPY --from=build-drive-abci /artifacts/drive-abci /usr/bin/drive-abci COPY packages/rs-drive-abci/.env.mainnet /var/lib/dash/rs-drive-abci/.env @@ -565,6 +562,14 @@ COPY packages/rs-drive-abci/.env.mainnet /var/lib/dash/rs-drive-abci/.env VOLUME /var/lib/dash/rs-drive-abci/db VOLUME /var/log/dash +# Ensure required paths do exist +# TODO: remove /var/lib/dash-platform/data/checkpoints when drive-abci is fixed +RUN mkdir -p /var/log/dash \ + ${DB_PATH} \ + ${CHECKPOINTS_PATH} \ + ${REJECTIONS_PATH} \ + /var/lib/dash-platform/data/checkpoints + # Double-check that we don't have missing deps RUN ldd /usr/bin/drive-abci @@ -574,9 +579,10 @@ RUN ldd /usr/bin/drive-abci ARG USERNAME=dash ARG USER_UID=1000 ARG USER_GID=$USER_UID +# TODO: remove /var/lib/dash-platform/data/checkpoints when drive-abci is fixed RUN addgroup -g $USER_GID $USERNAME && \ adduser -D -u $USER_UID -G $USERNAME -h /var/lib/dash/rs-drive-abci $USERNAME && \ - chown -R $USER_UID:$USER_GID /var/lib/dash/rs-drive-abci /var/log/dash + chown -R $USER_UID:$USER_GID /var/lib/dash/rs-drive-abci /var/log/dash /var/lib/dash-platform/data/checkpoints USER $USERNAME diff --git a/packages/dapi-grpc/Cargo.toml b/packages/dapi-grpc/Cargo.toml index 2b4fe8e531e..db46c81cd47 100644 --- a/packages/dapi-grpc/Cargo.toml +++ b/packages/dapi-grpc/Cargo.toml @@ -42,8 +42,9 @@ tonic = { version = "0.12.3", features = [ serde = { version = "1.0.197", optional = true, features = ["derive"] } serde_bytes = { version = "0.11.12", optional = true } serde_json = { version = "1.0", optional = true } -tenderdash-proto = { git = "https://github.com/dashpay/rs-tenderdash-abci", version = "1.2.1", tag = "v1.2.1+1.3.0", default-features = false, features = [ +tenderdash-proto = { git = "https://github.com/dashpay/rs-tenderdash-abci", rev = "b55bed9f574b68f2b6c96cbc80da41072056781d", default-features = false, features = [ "grpc", + "serde", ] } dapi-grpc-macros = { path = "../rs-dapi-grpc-macros" } platform-version = { path = "../rs-platform-version" } diff --git a/packages/dashmate/configs/defaults/getBaseConfigFactory.js b/packages/dashmate/configs/defaults/getBaseConfigFactory.js index 1614216ad2a..dcae8ddd765 100644 --- a/packages/dashmate/configs/defaults/getBaseConfigFactory.js +++ b/packages/dashmate/configs/defaults/getBaseConfigFactory.js @@ -309,7 +309,7 @@ export default function getBaseConfigFactory() { tenderdash: { mode: 'full', docker: { - image: 'dashpay/tenderdash:1', + image: 'dashpay/tenderdash:feat-statesync-integration', }, p2p: { host: '0.0.0.0', diff --git a/packages/dashmate/templates/platform/drive/tenderdash/config.toml.dot b/packages/dashmate/templates/platform/drive/tenderdash/config.toml.dot index a818a356409..c9c55267791 100644 --- a/packages/dashmate/templates/platform/drive/tenderdash/config.toml.dot +++ b/packages/dashmate/templates/platform/drive/tenderdash/config.toml.dot @@ -81,7 +81,7 @@ filter-peers = false # Example for routed multi-app setup: # abci = "routed" # address = "Info:socket:unix:///tmp/socket.1,Info:socket:unix:///tmp/socket.2,CheckTx:socket:unix:///tmp/socket.1,*:socket:unix:///tmp/socket.3" -address = "CheckTx:grpc:drive_abci:26670,*:socket:tcp://drive_abci:26658" +address = "ListSnapshots:grpc:drive_abci:26670,LoadSnapshotChunk:grpc:drive_abci:26670,CheckTx:grpc:drive_abci:26670,*:socket:tcp://drive_abci:26658" # Transport mechanism to connect to the ABCI application: socket | grpc | routed transport = "routed" # Maximum number of simultaneous connections to the ABCI application @@ -97,6 +97,10 @@ transport = "routed" #] grpc-concurrency = [ { "check_tx" = {{= it.platform.drive.tenderdash.mempool.maxConcurrentCheckTx }} }, + { "list_snapshots" = {{= it.platform.drive.tenderdash.mempool.maxConcurrentCheckTx }} }, + { "load_snapshot_chunk" = {{= it.platform.drive.tenderdash.mempool.maxConcurrentCheckTx }} }, + { "offer_snapshot" = 1 }, + { "apply_snapshot_chunk" = 1 }, ] @@ -414,26 +418,17 @@ ttl-num-blocks = {{=it.platform.drive.tenderdash.mempool.ttlNumBlocks}} # the network to take and serve state machine snapshots. State sync is not attempted if the node # has any local state (LastBlockHeight > 0). The node will have a truncated block history, # starting from the height of the snapshot. -enable = false +enable = true # State sync uses light client verification to verify state. This can be done either through the # P2P layer or RPC layer. Set this to true to use the P2P layer. If false (default), RPC layer # will be used. -use-p2p = false +use-p2p = true # If using RPC, at least two addresses need to be provided. They should be compatible with net.Dial, # for example: "host.example.com:2125" rpc-servers = "" -# The hash and height of a trusted block. Must be within the trust-period. -trust-height = 0 -trust-hash = "" - -# The trust period should be set so that Tendermint can detect and gossip misbehavior before -# it is considered expired. For chains based on the Cosmos SDK, one day less than the unbonding -# period should suffice. -trust-period = "168h0m0s" - # Time to spend discovering snapshots before initiating a restore. discovery-time = "15s" diff --git a/packages/rs-dapi-client/Cargo.toml b/packages/rs-dapi-client/Cargo.toml index f48715b6dd6..21568f85247 100644 --- a/packages/rs-dapi-client/Cargo.toml +++ b/packages/rs-dapi-client/Cargo.toml @@ -35,7 +35,7 @@ sha2 = { version = "0.10", optional = true } hex = { version = "0.4.3", optional = true } lru = { version = "0.12.3" } serde = { version = "1.0.197", optional = true, features = ["derive"] } -serde_json = { version = "1.0.120", optional = true } +serde_json = { version = "1.0", optional = true } chrono = { version = "0.4.38", features = ["serde"] } [dev-dependencies] diff --git a/packages/rs-drive-abci/.env.local b/packages/rs-drive-abci/.env.local index c0e3ac3347a..a5f6eed657d 100644 --- a/packages/rs-drive-abci/.env.local +++ b/packages/rs-drive-abci/.env.local @@ -12,6 +12,12 @@ ABCI_LOG_STDOUT_FORMAT=pretty ABCI_LOG_STDOUT_COLOR=true DB_PATH=/tmp/db + +CHECKPOINTS_PATH=${DB_PATH}/checkpoints + +# GroveDB database file +GROVEDB_LATEST_FILE=${DB_PATH}/latest_state + REJECTIONS_PATH=/tmp/rejected # Cache size for Data Contracts diff --git a/packages/rs-drive-abci/Cargo.toml b/packages/rs-drive-abci/Cargo.toml index 2060d6fa6c9..4981f73d821 100644 --- a/packages/rs-drive-abci/Cargo.toml +++ b/packages/rs-drive-abci/Cargo.toml @@ -16,6 +16,7 @@ license = "MIT" arc-swap = "1.7.0" bincode = { version = "2.0.0-rc.3", features = ["serde"] } ciborium = { git = "https://github.com/qrayven/ciborium", branch = "feat-ser-null-as-undefined" } +crc = { version = "3.2.1" } chrono = "0.4.35" serde = { version = "1.0.197", features = ["derive"] } serde_json = { version = "1.0", features = ["preserve_order"] } @@ -49,8 +50,10 @@ tracing-subscriber = { version = "0.3.16", default-features = false, features = "registry", "tracing-log", ], optional = false } -tenderdash-abci = { git = "https://github.com/dashpay/rs-tenderdash-abci", version = "1.2.1", tag = "v1.2.1+1.3.0", features = [ +tenderdash-abci = { git = "https://github.com/dashpay/rs-tenderdash-abci", rev = "b55bed9f574b68f2b6c96cbc80da41072056781d", features = [ + "crypto", "grpc", + "serde", ] } lazy_static = "1.4.0" itertools = { version = "0.13" } diff --git a/packages/rs-drive-abci/src/abci/app/check_tx.rs b/packages/rs-drive-abci/src/abci/app/check_tx.rs index 2f4a39c1b33..31ed6cacd36 100644 --- a/packages/rs-drive-abci/src/abci/app/check_tx.rs +++ b/packages/rs-drive-abci/src/abci/app/check_tx.rs @@ -1,7 +1,8 @@ -use crate::abci::app::PlatformApplication; +use crate::abci::app::{PlatformApplication, SnapshotManagerApplication}; use crate::abci::handler; use crate::error::Error; use crate::platform_types::platform::Platform; +use crate::platform_types::snapshot::SnapshotManager; use crate::rpc::core::CoreRPCLike; use crate::utils::spawn_blocking_task_with_name_if_supported; use async_trait::async_trait; @@ -22,6 +23,8 @@ where /// Platform platform: Arc>, core_rpc: Arc, + /// Snapshot manager + snapshot_manager: SnapshotManager, } impl PlatformApplication for CheckTxAbciApplication @@ -33,13 +36,31 @@ where } } +impl SnapshotManagerApplication for CheckTxAbciApplication +where + C: CoreRPCLike + Send + Sync + 'static, +{ + fn snapshot_manager(&self) -> &SnapshotManager { + &self.snapshot_manager + } +} + impl CheckTxAbciApplication where C: CoreRPCLike + Send + Sync + 'static, { /// Create new ABCI app pub fn new(platform: Arc>, core_rpc: Arc) -> Self { - Self { platform, core_rpc } + let snapshot_manager = SnapshotManager::new( + platform.config.state_sync_config.checkpoints_path.clone(), + platform.config.state_sync_config.max_num_snapshots, + platform.config.state_sync_config.snapshots_frequency, + ); + Self { + platform, + core_rpc, + snapshot_manager, + } } } @@ -92,6 +113,24 @@ where .await .map_err(|error| tonic::Status::internal(format!("check tx panics: {}", error)))? } + + async fn list_snapshots( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + handler::list_snapshots(self, request.into_inner()) + .map(tonic::Response::new) + .map_err(|e| tonic::Status::internal(format!("list_snapshots failed: {}", e))) + } + + async fn load_snapshot_chunk( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + handler::load_snapshot_chunk(self, request.into_inner()) + .map(tonic::Response::new) + .map_err(|e| tonic::Status::internal(format!("load_snapshot_chunk failed: {}", e))) + } } pub fn error_into_status(error: Error) -> tonic::Status { diff --git a/packages/rs-drive-abci/src/abci/app/consensus.rs b/packages/rs-drive-abci/src/abci/app/consensus.rs index a1f08429847..f79aeddbf24 100644 --- a/packages/rs-drive-abci/src/abci/app/consensus.rs +++ b/packages/rs-drive-abci/src/abci/app/consensus.rs @@ -1,11 +1,16 @@ -use crate::abci::app::{BlockExecutionApplication, PlatformApplication, TransactionalApplication}; +use crate::abci::app::{ + BlockExecutionApplication, PlatformApplication, SnapshotFetchingApplication, + SnapshotManagerApplication, TransactionalApplication, +}; use crate::abci::handler; use crate::abci::handler::error::error_into_exception; use crate::error::execution::ExecutionError; use crate::error::Error; use crate::execution::types::block_execution_context::BlockExecutionContext; use crate::platform_types::platform::Platform; +use crate::platform_types::snapshot::{SnapshotFetchingSession, SnapshotManager}; use crate::rpc::core::CoreRPCLike; +use dapi_grpc::tonic; use dpp::version::PlatformVersion; use drive::grovedb::Transaction; use std::fmt::Debug; @@ -16,46 +21,74 @@ use tenderdash_abci::proto::abci as proto; /// /// AbciApp implements logic that should be triggered when Tenderdash performs various operations, like /// creating new proposal or finalizing new block. -pub struct ConsensusAbciApplication<'a, C> { +/// 'p: 'tx, means that Platform must outlive the transaction +pub struct ConsensusAbciApplication<'p, C> { /// Platform - platform: &'a Platform, + platform: &'p Platform, /// The current GroveDb transaction - transaction: RwLock>>, + transaction: RwLock>>, /// The current block execution context block_execution_context: RwLock>, + /// The State sync session + snapshot_fetching_session: RwLock>>, + /// The snapshot manager + snapshot_manager: SnapshotManager, } -impl<'a, C> ConsensusAbciApplication<'a, C> { +impl<'p, C> ConsensusAbciApplication<'p, C> { /// Create new ABCI app - pub fn new(platform: &'a Platform) -> Self { + pub fn new(platform: &'p Platform) -> Self { + let snapshot_manager = SnapshotManager::new( + platform.config.state_sync_config.checkpoints_path.clone(), + platform.config.state_sync_config.max_num_snapshots, + platform.config.state_sync_config.snapshots_frequency, + ); Self { platform, transaction: Default::default(), block_execution_context: Default::default(), + snapshot_fetching_session: Default::default(), + snapshot_manager, } } } -impl<'a, C> PlatformApplication for ConsensusAbciApplication<'a, C> { +impl<'p, C> PlatformApplication for ConsensusAbciApplication<'p, C> { fn platform(&self) -> &Platform { self.platform } } -impl<'a, C> BlockExecutionApplication for ConsensusAbciApplication<'a, C> { +impl<'p, C> SnapshotManagerApplication for ConsensusAbciApplication<'p, C> { + fn snapshot_manager(&self) -> &SnapshotManager { + &self.snapshot_manager + } +} + +impl<'p, C> SnapshotFetchingApplication<'p, C> for ConsensusAbciApplication<'p, C> { + fn snapshot_fetching_session(&self) -> &RwLock>> { + &self.snapshot_fetching_session + } + + fn platform(&self) -> &'p Platform { + self.platform + } +} + +impl<'p, C> BlockExecutionApplication for ConsensusAbciApplication<'p, C> { fn block_execution_context(&self) -> &RwLock> { &self.block_execution_context } } -impl<'a, C> TransactionalApplication<'a> for ConsensusAbciApplication<'a, C> { +impl<'p, C> TransactionalApplication<'p> for ConsensusAbciApplication<'p, C> { /// create and store a new transaction fn start_transaction(&self) { let transaction = self.platform.drive.grove.start_transaction(); self.transaction.write().unwrap().replace(transaction); } - fn transaction(&self) -> &RwLock>> { + fn transaction(&self) -> &RwLock>> { &self.transaction } @@ -77,13 +110,13 @@ impl<'a, C> TransactionalApplication<'a> for ConsensusAbciApplication<'a, C> { } } -impl<'a, C> Debug for ConsensusAbciApplication<'a, C> { +impl<'p, C> Debug for ConsensusAbciApplication<'p, C> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "") } } -impl<'a, C> tenderdash_abci::Application for ConsensusAbciApplication<'a, C> +impl<'p, C> tenderdash_abci::Application for ConsensusAbciApplication<'p, C> where C: CoreRPCLike, { @@ -149,4 +182,25 @@ where ) -> Result { handler::verify_vote_extension(self, request).map_err(error_into_exception) } + + fn offer_snapshot( + &self, + request: proto::RequestOfferSnapshot, + ) -> Result { + handler::offer_snapshot(self, request).map_err(error_into_exception) + } + + fn apply_snapshot_chunk( + &self, + request: proto::RequestApplySnapshotChunk, + ) -> Result { + handler::apply_snapshot_chunk(self, request).map_err(error_into_exception) + } + + fn finalize_snapshot( + &self, + request: proto::RequestFinalizeSnapshot, + ) -> Result { + handler::finalize_snapshot(self, request).map_err(error_into_exception) + } } diff --git a/packages/rs-drive-abci/src/abci/app/full.rs b/packages/rs-drive-abci/src/abci/app/full.rs index 55771162ad8..585c0424dd0 100644 --- a/packages/rs-drive-abci/src/abci/app/full.rs +++ b/packages/rs-drive-abci/src/abci/app/full.rs @@ -1,10 +1,14 @@ -use crate::abci::app::{BlockExecutionApplication, PlatformApplication, TransactionalApplication}; +use crate::abci::app::{ + BlockExecutionApplication, PlatformApplication, SnapshotFetchingApplication, + SnapshotManagerApplication, TransactionalApplication, +}; use crate::abci::handler; use crate::abci::handler::error::error_into_exception; use crate::error::execution::ExecutionError; use crate::error::Error; use crate::execution::types::block_execution_context::BlockExecutionContext; use crate::platform_types::platform::Platform; +use crate::platform_types::snapshot::{SnapshotFetchingSession, SnapshotManager}; use crate::rpc::core::CoreRPCLike; use dpp::version::PlatformVersion; use drive::grovedb::Transaction; @@ -23,15 +27,26 @@ pub struct FullAbciApplication<'a, C> { pub transaction: RwLock>>, /// The current block execution context pub block_execution_context: RwLock>, + /// The State sync session + pub snapshot_fetching_session: RwLock>>, + /// The snapshot manager + pub snapshot_manager: SnapshotManager, } impl<'a, C> FullAbciApplication<'a, C> { /// Create new ABCI app pub fn new(platform: &'a Platform) -> Self { + let snapshot_manager = SnapshotManager::new( + platform.config.state_sync_config.checkpoints_path.clone(), + platform.config.state_sync_config.max_num_snapshots, + platform.config.state_sync_config.snapshots_frequency, + ); Self { platform, transaction: Default::default(), block_execution_context: Default::default(), + snapshot_fetching_session: Default::default(), + snapshot_manager, } } } @@ -42,6 +57,22 @@ impl<'a, C> PlatformApplication for FullAbciApplication<'a, C> { } } +impl<'a, C> SnapshotManagerApplication for FullAbciApplication<'a, C> { + fn snapshot_manager(&self) -> &SnapshotManager { + &self.snapshot_manager + } +} + +impl<'a, C> SnapshotFetchingApplication<'a, C> for FullAbciApplication<'a, C> { + fn snapshot_fetching_session(&self) -> &RwLock>> { + &self.snapshot_fetching_session + } + + fn platform(&self) -> &'a Platform { + self.platform + } +} + impl<'a, C> BlockExecutionApplication for FullAbciApplication<'a, C> { fn block_execution_context(&self) -> &RwLock> { &self.block_execution_context @@ -150,4 +181,32 @@ where ) -> Result { handler::verify_vote_extension(self, request).map_err(error_into_exception) } + + fn offer_snapshot( + &self, + request: proto::RequestOfferSnapshot, + ) -> Result { + handler::offer_snapshot(self, request).map_err(error_into_exception) + } + + fn apply_snapshot_chunk( + &self, + request: proto::RequestApplySnapshotChunk, + ) -> Result { + handler::apply_snapshot_chunk(self, request).map_err(error_into_exception) + } + + fn list_snapshots( + &self, + request: proto::RequestListSnapshots, + ) -> Result { + handler::list_snapshots(self, request).map_err(error_into_exception) + } + + fn load_snapshot_chunk( + &self, + request: proto::RequestLoadSnapshotChunk, + ) -> Result { + handler::load_snapshot_chunk(self, request).map_err(error_into_exception) + } } diff --git a/packages/rs-drive-abci/src/abci/app/mod.rs b/packages/rs-drive-abci/src/abci/app/mod.rs index d86290b566b..572379b173d 100644 --- a/packages/rs-drive-abci/src/abci/app/mod.rs +++ b/packages/rs-drive-abci/src/abci/app/mod.rs @@ -10,6 +10,7 @@ pub mod execution_result; mod full; use crate::execution::types::block_execution_context::BlockExecutionContext; +use crate::platform_types::snapshot::{SnapshotFetchingSession, SnapshotManager}; use crate::rpc::core::DefaultCoreRPC; pub use check_tx::CheckTxAbciApplication; pub use consensus::ConsensusAbciApplication; @@ -22,13 +23,19 @@ pub trait PlatformApplication { fn platform(&self) -> &Platform; } +/// Platform-based ABCI application +pub trait SnapshotManagerApplication { + /// Returns Platform + fn snapshot_manager(&self) -> &SnapshotManager; +} + /// Transactional ABCI application -pub trait TransactionalApplication<'a> { +pub trait TransactionalApplication<'p> { /// Creates and keeps a new transaction fn start_transaction(&self); /// Returns the current transaction - fn transaction(&self) -> &RwLock>>; + fn transaction(&self) -> &RwLock>>; /// Commits created transaction fn commit_transaction(&self, platform_version: &PlatformVersion) -> Result<(), Error>; @@ -39,3 +46,12 @@ pub trait BlockExecutionApplication { /// Returns the current block execution context fn block_execution_context(&self) -> &RwLock>; } + +/// Application that can maintain state sync +pub trait SnapshotFetchingApplication<'p, C> { + /// Returns the current snapshot fetching session + fn snapshot_fetching_session(&self) -> &RwLock>>; + + /// Returns platform reference + fn platform(&self) -> &'p Platform; +} diff --git a/packages/rs-drive-abci/src/abci/config.rs b/packages/rs-drive-abci/src/abci/config.rs index 62779edd6b7..5b7f3250eaa 100644 --- a/packages/rs-drive-abci/src/abci/config.rs +++ b/packages/rs-drive-abci/src/abci/config.rs @@ -2,6 +2,7 @@ use crate::utils::from_opt_str_or_number; use serde::{Deserialize, Serialize}; +use std::path::PathBuf; // We allow changes in the ABCI configuration, but there should be a social process // involved in making this change. @@ -61,3 +62,62 @@ impl Default for AbciConfig { } } } + +/// Configuration for StateSync feature +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct StateSyncAbciConfig { + /// Enable snapshot + pub snapshots_enabled: bool, + + /// Path to checkpoints + #[serde(default = "StateSyncAbciConfig::default_checkpoints_path")] + pub checkpoints_path: PathBuf, + + /// Frequency of snapshot creation (in blocks) + pub snapshots_frequency: i64, + + /// Maximum number of snapshots to keep + pub max_num_snapshots: usize, +} + +impl Default for StateSyncAbciConfig { + fn default() -> Self { + Self::default_mainnet() + } +} + +#[allow(missing_docs)] +impl StateSyncAbciConfig { + pub fn default_local() -> Self { + Self { + snapshots_enabled: true, + checkpoints_path: PathBuf::from("/var/lib/dash-platform/data/checkpoints"), + snapshots_frequency: 10, + max_num_snapshots: 100, + } + } + + pub fn default_testnet() -> Self { + Self { + snapshots_enabled: true, + checkpoints_path: PathBuf::from("/var/lib/dash-platform/data/checkpoints"), + snapshots_frequency: 10, + max_num_snapshots: 100, + } + } + + pub fn default_mainnet() -> Self { + Self { + snapshots_enabled: true, + checkpoints_path: PathBuf::from("/var/lib/dash-platform/data/checkpoints"), + snapshots_frequency: 10, + max_num_snapshots: 100, + } + } + + fn default_checkpoints_path() -> PathBuf { + std::env::var("CHECKPOINTS_PATH") + .map(PathBuf::from) + .unwrap_or_else(|_| PathBuf::from("/var/lib/dash-platform/data/checkpoints")) + } +} diff --git a/packages/rs-drive-abci/src/abci/error.rs b/packages/rs-drive-abci/src/abci/error.rs index 857321a16e8..56cd78deafd 100644 --- a/packages/rs-drive-abci/src/abci/error.rs +++ b/packages/rs-drive-abci/src/abci/error.rs @@ -54,6 +54,14 @@ pub enum AbciError { #[error("bad commit signature: {0}")] BadCommitSignature(String), + /// Client State sync bad request + #[error("bad request state sync: {0}")] + StateSyncBadRequest(String), + + /// Server State sync bad request + #[error("internal error state sync: {0}")] + StateSyncInternalError(String), + /// The chain lock received was invalid #[error("invalid chain lock: {0}")] InvalidChainLock(String), diff --git a/packages/rs-drive-abci/src/abci/handler/apply_snapshot_chunk.rs b/packages/rs-drive-abci/src/abci/handler/apply_snapshot_chunk.rs new file mode 100644 index 00000000000..62b5844ad03 --- /dev/null +++ b/packages/rs-drive-abci/src/abci/handler/apply_snapshot_chunk.rs @@ -0,0 +1,122 @@ +use crate::abci::app::{SnapshotFetchingApplication, SnapshotManagerApplication}; +use crate::abci::handler::load_snapshot_chunk::ChunkData; +use crate::abci::AbciError; +use crate::error::Error; +use dpp::version::PlatformVersion; +use tenderdash_abci::proto::abci as proto; + +pub fn apply_snapshot_chunk<'a, 'db: 'a, A, C: 'db>( + app: &'a A, + request: proto::RequestApplySnapshotChunk, +) -> Result +where + A: SnapshotManagerApplication + SnapshotFetchingApplication<'db, C> + 'db, +{ + tracing::trace!( + "[state_sync] api apply_snapshot_chunk chunk_id:{}", + hex::encode(&request.chunk_id) + ); + let mut is_state_sync_completed: bool = false; + // Lock first the RwLock + let mut session_write_guard = app.snapshot_fetching_session().write().map_err(|_| { + AbciError::StateSyncInternalError( + "apply_snapshot_chunk unable to lock session (poisoned)".to_string(), + ) + })?; + { + let session = session_write_guard + .as_mut() + .ok_or(AbciError::StateSyncInternalError( + "apply_snapshot_chunk unable to lock session".to_string(), + ))?; + + let chunk_data = ChunkData::deserialize(&request.chunk).map_err(|e| { + AbciError::StateSyncInternalError(format!( + "apply_snapshot_chunk unable to deserialize chunk: {}", + e + )) + })?; + let chunk = chunk_data.chunk(); + + let next_chunk_ids = session + .state_sync_info + .apply_chunk( + &app.platform().drive.grove, + &request.chunk_id, + chunk, + 1u16, + &PlatformVersion::latest().drive.grove_version, + ) + .map_err(|e| { + tracing::error!( + chunk_id = ?request.chunk_id, + chunk = ?request.chunk, + "state_sync apply_chunk_error", + ); + AbciError::StateSyncInternalError(format!( + "apply_snapshot_chunk unable to apply chunk:{}", + e + )) + })?; + if next_chunk_ids.is_empty() && session.state_sync_info.is_sync_completed() { + is_state_sync_completed = true; + } + tracing::debug!(is_state_sync_completed, "state_sync apply_snapshot_chunk",); + if !is_state_sync_completed { + return Ok(proto::ResponseApplySnapshotChunk { + result: proto::response_apply_snapshot_chunk::Result::Accept.into(), + refetch_chunks: vec![], // TODO: Check when this is needed + reject_senders: vec![], // TODO: Check when this is needed + next_chunks: next_chunk_ids, + }); + } + } + { + // State sync is completed, consume session and commit it + let session = session_write_guard + .take() + .ok_or(AbciError::StateSyncInternalError( + "apply_snapshot_chunk unable to lock session (poisoned)".to_string(), + ))?; + let state_sync_info = session.state_sync_info; + app.platform() + .drive + .grove + .commit_session(state_sync_info) + .map_err(|e| { + AbciError::StateSyncInternalError(format!( + "apply_snapshot_chunk unable to commit session: {}", + e + )) + })?; + tracing::trace!("[state_sync] state sync completed. verifying"); + let incorrect_hashes = app + .platform() + .drive + .grove + .verify_grovedb( + None, + true, + false, + &PlatformVersion::latest().drive.grove_version, + ) + .map_err(|e| { + AbciError::StateSyncInternalError(format!( + "apply_snapshot_chunk unable to verify grovedb: {}", + e + )) + })?; + if !incorrect_hashes.is_empty() { + Err(AbciError::StateSyncInternalError(format!( + "apply_snapshot_chunk grovedb verification failed with {} incorrect hashes", + incorrect_hashes.len() + )))?; + } + Ok(proto::ResponseApplySnapshotChunk { + result: proto::response_apply_snapshot_chunk::Result::CompleteSnapshot.into(), + refetch_chunks: vec![], + reject_senders: vec![], + next_chunks: vec![], + }) + } +} diff --git a/packages/rs-drive-abci/src/abci/handler/finalize_block.rs b/packages/rs-drive-abci/src/abci/handler/finalize_block.rs index 852f85cc6b8..b010803818a 100644 --- a/packages/rs-drive-abci/src/abci/handler/finalize_block.rs +++ b/packages/rs-drive-abci/src/abci/handler/finalize_block.rs @@ -1,4 +1,7 @@ -use crate::abci::app::{BlockExecutionApplication, PlatformApplication, TransactionalApplication}; +use crate::abci::app::{ + BlockExecutionApplication, PlatformApplication, SnapshotManagerApplication, + TransactionalApplication, +}; use crate::error::execution::ExecutionError; use crate::error::Error; use crate::execution::types::block_execution_context::v0::BlockExecutionContextV0Getters; @@ -14,7 +17,10 @@ pub fn finalize_block<'a, A, C>( request: proto::RequestFinalizeBlock, ) -> Result where - A: PlatformApplication + TransactionalApplication<'a> + BlockExecutionApplication, + A: PlatformApplication + + SnapshotManagerApplication + + TransactionalApplication<'a> + + BlockExecutionApplication, C: CoreRPCLike, { let _timer = crate::metrics::abci_request_duration("finalize_block"); @@ -94,7 +100,28 @@ where app.platform() .committed_block_height_guard - .store(block_height, Ordering::Relaxed); + .store(block_height.clone(), Ordering::Relaxed); + + if (app.platform().config.state_sync_config.snapshots_enabled) { + app.snapshot_manager() + .create_snapshot(&app.platform().drive.grove, block_height as i64) + .map_err(|e| { + Error::Execution(ExecutionError::CorruptedDriveResponse(format!( + "Unable to create snapshot:{}", + e + ))) + })?; + } + + let platform_state = app.platform().state.load(); + + let block_height = platform_state.last_committed_block_height(); + + tracing::trace!( + block_height, + platform_state = ?platform_state, + "state_finalize_block" + ); Ok(proto::ResponseFinalizeBlock { retain_height: 0 }) } diff --git a/packages/rs-drive-abci/src/abci/handler/finalize_snapshot.rs b/packages/rs-drive-abci/src/abci/handler/finalize_snapshot.rs new file mode 100644 index 00000000000..219657e2438 --- /dev/null +++ b/packages/rs-drive-abci/src/abci/handler/finalize_snapshot.rs @@ -0,0 +1,684 @@ +use std::collections::BTreeMap; +use std::sync::Arc; +use dashcore_rpc::dashcore::hashes::Hash; +use dashcore_rpc::dashcore::{PubkeyHash, QuorumHash}; +use dashcore_rpc::dashcore::blsful::Bls12381G2Impl; +use dashcore_rpc::dashcore_rpc_json::{ExtendedQuorumListResult, MasternodeListDiff, MasternodeListItem, MasternodeType, QuorumType}; +use indexmap::IndexMap; +use itertools::Itertools; +use tenderdash_abci::proto::{abci as proto, ToMillis}; +use tenderdash_abci::proto::crypto::PublicKey; +use tenderdash_abci::proto::google::protobuf::Timestamp; +use tenderdash_abci::proto::tenderdash_grpc::crypto::public_key::Sum::Bls12381; +use tenderdash_abci::proto::tenderdash_nostd::types::LightBlock; +use tenderdash_abci::proto::types::ValidatorSet; +use tenderdash_abci::signatures::Hashable; +use dpp::block::block_info::BlockInfo; +use dpp::block::epoch::{Epoch, EPOCH_0}; +use dpp::block::extended_block_info::ExtendedBlockInfo; +use dpp::block::extended_block_info::v0::ExtendedBlockInfoV0; +use dpp::core_types::validator::v0::ValidatorV0; +use dpp::core_types::validator_set::v0::{ValidatorSetV0, ValidatorSetV0Getters}; +use dpp::bls_signatures::PublicKey as BlsPublicKey; +use dpp::dashcore::ProTxHash; +use dpp::platform_value::Bytes32; +use dpp::version::version::ProtocolVersion; +use dpp::version::PlatformVersion; +use dpp::core_types::validator_set::ValidatorSet as CoreValidatorSet; +use crate::abci::AbciError; +use crate::abci::app::{PlatformApplication}; +use crate::error::Error; +use crate::error::execution::ExecutionError; +use crate::execution::types::block_state_info::v0::BlockStateInfoV0; +use crate::platform_types::epoch_info::EpochInfo; +use crate::platform_types::epoch_info::v0::EpochInfoV0; +use crate::platform_types::platform_state::PlatformState; +use crate::platform_types::platform_state::v0::{PlatformStateForSavingV1, PlatformStateV0, PlatformStateV0Methods}; +use crate::platform_types::signature_verification_quorum_set::{SignatureVerificationQuorumSet, SignatureVerificationQuorumSetForSaving, SignatureVerificationQuorumSetV0Methods, ThresholdBlsPublicKey, VerificationQuorum}; +use crate::rpc::core::CoreRPCLike; +use crate::platform_types::signature_verification_quorum_set::SignatureVerificationQuorumSetForSaving::V1; +use crate::platform_types::validator_set::v0::ValidatorSetMethodsV0; +use crate::execution::platform_events::core_based_updates::update_quorum_info::v0::QuorumSetType; +use crate::execution::types::block_execution_context::BlockExecutionContext; +use crate::platform_types::validator_set::ValidatorSetExt; + +pub fn finalize_snapshot( + app: &A, + request: proto::RequestFinalizeSnapshot, +) -> Result +where + A: PlatformApplication, + C: CoreRPCLike, +{ + let config = &app.platform().config; + + let snapshot_block = + request + .snapshot_block + .as_ref() + .ok_or(Error::Abci(AbciError::BadRequest( + "Empty Snapshot Block".to_string(), + )))?; + + let snapshot_signed_header = + snapshot_block + .signed_header + .as_ref() + .ok_or(Error::Abci(AbciError::BadRequest( + "Empty Snapshot Signed Header".to_string(), + )))?; + + let snapshot_header = + snapshot_signed_header + .header + .as_ref() + .ok_or(Error::Abci(AbciError::BadRequest( + "Empty Snapshot Header".to_string(), + )))?; + + if snapshot_header.proposer_pro_tx_hash.len() != 32 { + return Err(Error::Abci(AbciError::BadRequestDataSize( + "Invalid Proposer Tx Hash Size".to_string(), + ))); + } + let mut snapshot_proposer_pro_tx_hash_32 = [0u8; 32]; + snapshot_proposer_pro_tx_hash_32.copy_from_slice(&snapshot_header.proposer_pro_tx_hash[..32]); + + let snapshot_header_version = + snapshot_header + .version + .as_ref() + .ok_or(Error::Abci(AbciError::BadRequest( + "Empty Snapshot Header Version".to_string(), + )))?; + + let snapshot_header_time = + snapshot_header + .time + .as_ref() + .ok_or(Error::Abci(AbciError::BadRequest( + "Empty Snapshot Header Timestamp".to_string(), + )))?; + + let snapshot_block_time = snapshot_header_time.to_millis().map_err(|_| { + Error::Abci(AbciError::BadRequest( + "Invalid Snapshot Header Timestamp".to_string(), + )) + })?; + + let snapshot_header_last_block_id = + snapshot_header + .last_block_id + .as_ref() + .ok_or(Error::Abci(AbciError::BadRequest( + "Empty Snapshot Header Last BlockId".to_string(), + )))?; + + if snapshot_header.app_hash.len() != 32 { + return Err(Error::Abci(AbciError::BadRequestDataSize( + "Invalid Snapshot Header App Hash Size".to_string(), + ))); + } + let mut snapshot_header_app_hash_32 = [0u8; 32]; + snapshot_header_app_hash_32.copy_from_slice(&snapshot_header.app_hash[..32]); + + if snapshot_header.proposer_pro_tx_hash.len() != 32 { + return Err(Error::Abci(AbciError::BadRequestDataSize( + "Invalid Snapshot Header Proposer ProTx Hash Size".to_string(), + ))); + } + let mut snapshot_header_proposer_pro_tx_hash_32 = [0u8; 32]; + snapshot_header_proposer_pro_tx_hash_32 + .copy_from_slice(&snapshot_header.proposer_pro_tx_hash[..32]); + + if snapshot_header_last_block_id.hash.len() != 32 { + return Err(Error::Abci(AbciError::BadRequestDataSize( + "Invalid Snapshot Header Last Block Hash Size".to_string(), + ))); + } + let mut snapshot_header_last_block_id_hash_32 = [0u8; 32]; + snapshot_header_last_block_id_hash_32 + .copy_from_slice(&snapshot_header_last_block_id.hash[..32]); + + if snapshot_header.validators_hash.len() != 32 { + return Err(Error::Abci(AbciError::BadRequestDataSize( + "Invalid Snapshot Header Validator Hash Size".to_string(), + ))); + } + let mut snapshot_header_validator_hash_32 = [0u8; 32]; + snapshot_header_validator_hash_32.copy_from_slice(&snapshot_header.validators_hash[..32]); + + if snapshot_header.next_validators_hash.len() != 32 { + return Err(Error::Abci(AbciError::BadRequestDataSize( + "Invalid Snapshot Header Next Validator Hash Size".to_string(), + ))); + } + let mut snapshot_header_next_validator_hash_32 = [0u8; 32]; + snapshot_header_next_validator_hash_32 + .copy_from_slice(&snapshot_header.next_validators_hash[..32]); + + let snapshot_commit = + snapshot_signed_header + .commit + .as_ref() + .ok_or(Error::Abci(AbciError::BadRequest( + "Empty Snapshot Commit".to_string(), + )))?; + + let snapshot_commit_block_id = + snapshot_commit + .block_id + .as_ref() + .ok_or(Error::Abci(AbciError::BadRequest( + "Empty Snapshot Commit".to_string(), + )))?; + + if snapshot_commit_block_id.hash.len() != 32 { + return Err(Error::Abci(AbciError::BadRequestDataSize( + "Invalid Snapshot Commit Block Hash Size".to_string(), + ))); + } + let mut snapshot_commit_block_hash_32 = [0u8; 32]; + snapshot_commit_block_hash_32.copy_from_slice(&snapshot_commit_block_id.hash[..32]); + + if snapshot_commit.quorum_hash.len() != 32 { + return Err(Error::Abci(AbciError::BadRequestDataSize( + "Invalid Snapshot Commit Quorum Hash Size".to_string(), + ))); + } + let mut snapshot_commit_quorum_hash_32 = [0u8; 32]; + snapshot_commit_quorum_hash_32.copy_from_slice(&snapshot_commit.quorum_hash[..32]); + + if snapshot_commit.threshold_block_signature.len() != 96 { + return Err(Error::Abci(AbciError::BadRequestDataSize( + "Invalid Snapshot Commit Threshold Block Signature Size".to_string(), + ))); + } + let mut snapshot_commit_threshold_block_sig_96 = [0u8; 96]; + snapshot_commit_threshold_block_sig_96 + .copy_from_slice(&snapshot_commit.threshold_block_signature[..96]); + + let genesis_block = + request + .genesis_block + .as_ref() + .ok_or(Error::Abci(AbciError::BadRequest( + "Empty Genesis Block".to_string(), + )))?; + + let genesis_signed_header = + genesis_block + .signed_header + .as_ref() + .ok_or(Error::Abci(AbciError::BadRequest( + "Empty Genesis Signed Header".to_string(), + )))?; + + let genesis_header = + genesis_signed_header + .header + .as_ref() + .ok_or(Error::Abci(AbciError::BadRequest( + "Empty Genesis Header".to_string(), + )))?; + + let genesis_header_time = + genesis_header + .time + .as_ref() + .ok_or(Error::Abci(AbciError::BadRequest( + "Empty Genesis Header Timestamp".to_string(), + )))?; + + let genesis_block_time = genesis_header_time.to_millis().map_err(|_| { + Error::Abci(AbciError::BadRequest( + "Invalid Genesis Header Timestamp".to_string(), + )) + })?; + + let genesis_block_info = BlockInfo { + time_ms: genesis_block_time, + height: 1, + core_height: genesis_header.core_chain_locked_height, + epoch: EPOCH_0, + }; + + let snapshot_block_state_info = BlockStateInfoV0 { + height: snapshot_header.height as u64, + round: snapshot_commit.round as u32, + block_time_ms: snapshot_block_time, + previous_block_time_ms: None, + proposer_pro_tx_hash: snapshot_proposer_pro_tx_hash_32, + core_chain_locked_height: snapshot_header.core_chain_locked_height, + block_hash: Some(snapshot_commit_block_hash_32), + app_hash: None, + }; + + let current_epoch_info = EpochInfoV0::from_genesis_time_and_block_info( + genesis_block_time, + &snapshot_block_state_info, + config.execution.epoch_time_length_s, + )?; + + let current_protocol_version_in_consensus = snapshot_header_version.app as u32; + let next_epoch_protocol_version = snapshot_header.proposed_app_version as u32; + let current_validator_set_quorum_hash = + QuorumHash::from_byte_array(snapshot_commit_quorum_hash_32); + + let mut platform_state = PlatformState::V0(PlatformStateV0 { + genesis_block_info: Some(genesis_block_info), + last_committed_block_info: Some(ExtendedBlockInfo::from(ExtendedBlockInfoV0 { + basic_info: BlockInfo { + time_ms: snapshot_block_time, + height: snapshot_header.height as u64, + core_height: snapshot_header.core_chain_locked_height, + epoch: Epoch::new(current_epoch_info.current_epoch_index)?, + }, + app_hash: snapshot_header_app_hash_32, + quorum_hash: snapshot_commit_quorum_hash_32, + block_id_hash: snapshot_header_last_block_id_hash_32, + proposer_pro_tx_hash: snapshot_header_proposer_pro_tx_hash_32, + signature: snapshot_commit_threshold_block_sig_96, + round: snapshot_commit.round as u32, + })), + current_protocol_version_in_consensus, + next_epoch_protocol_version, + current_validator_set_quorum_hash, + next_validator_set_quorum_hash: None, + patched_platform_version: None, + validator_sets: Default::default(), + chain_lock_validating_quorums: SignatureVerificationQuorumSet::from( + SignatureVerificationQuorumSet::new( + &config.chain_lock, + PlatformVersion::get(current_protocol_version_in_consensus)?, + )?, + ), + instant_lock_validating_quorums: SignatureVerificationQuorumSet::from( + SignatureVerificationQuorumSet::new( + &config.instant_lock, + PlatformVersion::get(current_protocol_version_in_consensus)?, + )?, + ), + full_masternode_list: BTreeMap::new(), + hpmn_masternode_list: BTreeMap::new(), + previous_fee_versions: Default::default(), + }); + + build_masternode_lists( + app, + &mut platform_state, + snapshot_header.core_chain_locked_height, + )?; + + let mut extended_quorum_list = app + .platform() + .core_rpc + .get_quorum_listextended(Some(snapshot_header.core_chain_locked_height))?; + build_quorum_verification_set( + app, + &extended_quorum_list, + QuorumSetType::ChainLock(config.chain_lock.quorum_type), + platform_state.chain_lock_validating_quorums_mut(), + )?; + build_quorum_verification_set( + app, + &extended_quorum_list, + QuorumSetType::InstantLock(config.instant_lock.quorum_type), + platform_state.instant_lock_validating_quorums_mut(), + )?; + + build_validators_list( + app, + &mut platform_state, + &mut extended_quorum_list, + config.validator_set.quorum_type, + )?; + + build_next_validator_set_quorum_hash( + snapshot_header_proposer_pro_tx_hash_32, + &mut platform_state, + )?; + + let block_height = platform_state.last_committed_block_height(); + + tracing::info!( + block_height, + platform_state = ?platform_state, + "state_finalize_snapshot", + ); + + let tx = app.platform().drive.grove.start_transaction(); + + app.platform() + .store_platform_state(&platform_state, Some(&tx), &PlatformVersion::latest())?; + + let _ = app.platform().drive.grove.commit_transaction(tx); + + app.platform().state.store(Arc::new(platform_state)); + + Ok(Default::default()) +} + +fn build_masternode_lists( + app: &A, + platform_state: &mut PlatformState, + core_block_height: u32, +) -> Result<(), Error> +where + A: PlatformApplication, + C: CoreRPCLike, +{ + let mn_list_diff = app + .platform() + .core_rpc + .get_protx_diff_with_masternodes(Some(1), core_block_height)?; + + let MasternodeListDiff { added_mns, .. } = &mn_list_diff; + + let added_hpmns = added_mns.iter().filter_map(|masternode| { + if masternode.node_type == MasternodeType::Evo { + Some((masternode.pro_tx_hash, masternode.clone())) + } else { + None + } + }); + + let added_masternodes = added_mns + .iter() + .map(|masternode| (masternode.pro_tx_hash, masternode.clone())); + + platform_state + .full_masternode_list_mut() + .extend(added_masternodes); + platform_state + .hpmn_masternode_list_mut() + .extend(added_hpmns); + + Ok(()) +} + +fn build_quorum_verification_set( + app: &A, + extended_quorum_list: &ExtendedQuorumListResult, + quorum_set_type: QuorumSetType, + quorum_set: &mut SignatureVerificationQuorumSet, +) -> Result<(), Error> +where + A: PlatformApplication, + C: CoreRPCLike, +{ + let quorums_list: BTreeMap<_, _> = extended_quorum_list + .quorums_by_type + .get(&quorum_set_type.quorum_type()) + .ok_or(Error::Execution(ExecutionError::DashCoreBadResponseError( + format!( + "expected quorums {}, but did not receive any from Dash Core", + quorum_set_type + ), + )))? + .iter() + .map(|(quorum_hash, extended_quorum_details)| { + (quorum_hash, extended_quorum_details.quorum_index) + }) + .collect(); + + // Fetch quorum info and their keys from the RPC for new quorums + // and then create VerificationQuorum instances + let new_quorums = quorums_list + .into_iter() + .map(|(quorum_hash, index)| { + let quorum_info = app.platform().core_rpc.get_quorum_info( + quorum_set_type.quorum_type(), + quorum_hash, + None, + )?; + + let public_key = match BlsPublicKey::try_from(quorum_info.quorum_public_key.as_slice()) + .map_err(ExecutionError::BlsErrorFromDashCoreResponse) + { + Ok(public_key) => public_key, + Err(e) => return Err(e.into()), + }; + + Ok((*quorum_hash, VerificationQuorum { public_key, index })) + }) + .collect::, Error>>()?; + + quorum_set.current_quorums_mut().extend(new_quorums); + + Ok(()) +} + +fn build_validators_list( + app: &A, + platform_state: &mut PlatformState, + extended_quorum_list: &mut ExtendedQuorumListResult, + validator_set_quorum_type: QuorumType, +) -> Result<(), Error> +where + A: PlatformApplication, + C: CoreRPCLike, +{ + let validator_quorums_list: BTreeMap<_, _> = extended_quorum_list + .quorums_by_type + .remove(&validator_set_quorum_type) + .ok_or(Error::Execution(ExecutionError::DashCoreBadResponseError( + format!( + "expected quorums of type {}, but did not receive any from Dash Core", + validator_set_quorum_type + ), + )))? + .into_iter() + .collect(); + + // Fetch quorum info and their keys from the RPC for new quorums + let mut quorum_infos = validator_quorums_list + .into_iter() + .map(|(key, _)| { + let quorum_info_result = + app.platform() + .core_rpc + .get_quorum_info(validator_set_quorum_type, &key, None)?; + Ok((key, quorum_info_result)) + }) + .collect::, Error>>()?; + + // Sort by height and then by hash + quorum_infos.sort_by(|a, b| { + let height_cmp = a.1.height.cmp(&b.1.height); + if height_cmp == std::cmp::Ordering::Equal { + a.0.cmp(&b.0) // Compare hashes if heights are equal + } else { + height_cmp + } + }); + + // Map to validator sets + let new_validator_sets = quorum_infos + .into_iter() + .map(|(quorum_hash, info_result)| { + let validator_set = CoreValidatorSet::V0(ValidatorSetV0::try_from_quorum_info_result( + info_result, + &platform_state, + )?); + Ok((quorum_hash, validator_set)) + }) + .collect::, Error>>()?; + + platform_state + .validator_sets_mut() + .extend(new_validator_sets); + + // Sort all validator sets into deterministic order by core block height of creation + platform_state + .validator_sets_mut() + .sort_by(|_, quorum_a, _, quorum_b| { + let primary_comparison = quorum_b.core_height().cmp(&quorum_a.core_height()); + if primary_comparison == std::cmp::Ordering::Equal { + quorum_b + .quorum_hash() + .cmp(quorum_a.quorum_hash()) + .then_with(|| quorum_b.core_height().cmp(&quorum_a.core_height())) + } else { + primary_comparison + } + }); + + Ok(()) +} + +fn build_next_validator_set_quorum_hash( + proposer_pro_tx_hash: [u8; 32], + platform_state: &mut PlatformState, +) -> Result<(), Error> { + let mut perform_rotation = false; + + if let Some(validator_set) = platform_state + .validator_sets() + .get(&platform_state.current_validator_set_quorum_hash()) + { + if let Some((last_member_pro_tx_hash, _)) = validator_set.members().last_key_value() { + // we should also perform a rotation if the validator set went through all quorum members + // this means we are at the last member of the quorum + if last_member_pro_tx_hash.as_byte_array() == &proposer_pro_tx_hash { + tracing::debug!( + method = "build_next_validator_set_quorum_hash", + "rotation: quorum finished as we hit last member {} of quorum {}. All known quorums are: [{}]. quorum rotation expected", + hex::encode(proposer_pro_tx_hash), + hex::encode(platform_state.current_validator_set_quorum_hash().as_byte_array()), + platform_state + .validator_sets() + .keys() + .map(hex::encode).collect::>().join(" | "), + ); + perform_rotation = true; + } + } else { + // the validator set has no members, very weird, but let's just perform a rotation + tracing::debug!( + method = "build_next_validator_set_quorum_hash", + "rotation: validator set has no members", + ); + perform_rotation = true; + } + + // We should also perform a rotation if there are more than one quorum in the system + // and that the new proposer is on the same quorum and the last proposer but is before + // them in the list of proposers. + // This only works if Tenderdash goes through proposers properly + if &platform_state.last_committed_quorum_hash() + == platform_state + .current_validator_set_quorum_hash() + .as_byte_array() + && platform_state.last_committed_block_proposer_pro_tx_hash() > proposer_pro_tx_hash + && platform_state.validator_sets().len() > 1 + { + // 1 - We haven't changed quorums + // 2 - The new proposer is before the old proposer + // 3 - There are more than one quorum in the system + tracing::debug!( + method = "build_next_validator_set_quorum_hash", + "rotation: quorum finished as we hit last an earlier member {} than last block proposer {} for quorum {}. All known quorums are: [{}]. quorum rotation expected", + hex::encode(proposer_pro_tx_hash), + hex::encode(platform_state.last_committed_block_proposer_pro_tx_hash()), + hex::encode(platform_state.current_validator_set_quorum_hash().as_byte_array()), + platform_state + .validator_sets() + .keys() + .map(hex::encode).collect::>().join(" | "), + ); + perform_rotation = true; + } + } else { + // we also need to perform a rotation if the validator set is being removed + tracing::debug!( + method = "build_next_validator_set_quorum_hash", + "rotation: new quorums not containing current quorum current {:?}, {}. quorum rotation expected", + platform_state + .validator_sets() + .keys() + .map(|quorum_hash| format!("{}", quorum_hash)), + &platform_state.current_validator_set_quorum_hash() + ); + perform_rotation = true; + } + + //todo: (maybe) perform a rotation if quorum health is low + + if perform_rotation { + // get the index of the previous quorum + let mut index = platform_state + .validator_sets() + .get_index_of(&platform_state.current_validator_set_quorum_hash()) + .ok_or(Error::Execution(ExecutionError::CorruptedCachedState( + format!("perform_rotation: current validator set quorum hash {} not in current known validator sets [{}] processing block {}", platform_state.current_validator_set_quorum_hash(), platform_state + .validator_sets().keys().map(|quorum_hash| quorum_hash.to_string()).join(" | "), + platform_state.last_committed_block_height() + 1, + ))))?; + // we should rotate the quorum + let quorum_count = platform_state.validator_sets().len(); + match quorum_count { + 0 => Err(Error::Execution(ExecutionError::CorruptedCachedState( + "no current quorums".to_string(), + ))), + 1 => Ok(()), // no rotation as we are the only quorum + count => { + let start_index = index; + let oldest_quorum_index_we_can_go_to = if count > 10 { + // if we have a lot of quorums (like on testnet and mainnet) + // we shouldn't start using the last ones as they could cycle out + count - 2 + } else { + count + }; + index = if index + 1 >= oldest_quorum_index_we_can_go_to { + 0 + } else { + index + 1 + }; + // We can't just take the next item because it might no longer be in the state + for _i in 0..oldest_quorum_index_we_can_go_to { + let (quorum_hash, _) = platform_state + .validator_sets() + .get_index(index) + .expect("expected next validator set"); + + // We still have it in the state + if let Some(new_validator_set) = + platform_state.validator_sets().get(quorum_hash) + { + tracing::debug!( + method = "build_next_validator_set_quorum_hash", + "rotation: to new quorum: {} with {} members", + &quorum_hash, + new_validator_set.members().len() + ); + *platform_state.current_validator_set_quorum_hash_mut() = *quorum_hash; + return Ok(()); + } + index = (index + 1) % oldest_quorum_index_we_can_go_to; + if index == start_index { + break; + } + } + // All quorums changed + if let Some((quorum_hash, new_validator_set)) = + platform_state.validator_sets().first() + { + tracing::debug!( + method = "build_next_validator_set_quorum_hash", + "rotation: all quorums changed, rotation to new quorum: {}", + &quorum_hash + ); + *platform_state.current_validator_set_quorum_hash_mut() = *quorum_hash; + return Ok(()); + } + tracing::debug!("no new quorums to choose from"); + Ok(()) + } + } + } else { + tracing::debug!("no rotation"); + Ok(()) + } +} diff --git a/packages/rs-drive-abci/src/abci/handler/info.rs b/packages/rs-drive-abci/src/abci/handler/info.rs index 9ac9d316267..bb3005d6b54 100644 --- a/packages/rs-drive-abci/src/abci/handler/info.rs +++ b/packages/rs-drive-abci/src/abci/handler/info.rs @@ -22,6 +22,14 @@ where let platform_state = app.platform().state.load(); + let block_height = platform_state.last_committed_block_height(); + + tracing::trace!( + block_height, + platform_state = ?platform_state, + "state_info" + ); + let last_block_height = platform_state.last_committed_block_height() as i64; // Verify that Platform State corresponds to Drive commited state diff --git a/packages/rs-drive-abci/src/abci/handler/list_snapshots.rs b/packages/rs-drive-abci/src/abci/handler/list_snapshots.rs new file mode 100644 index 00000000000..9744d98cacb --- /dev/null +++ b/packages/rs-drive-abci/src/abci/handler/list_snapshots.rs @@ -0,0 +1,48 @@ +use crate::abci::app::{PlatformApplication, SnapshotManagerApplication}; +use crate::abci::handler::error::error_into_exception; +use crate::abci::AbciError; +use crate::error::Error; +use crate::platform_types::snapshot::Snapshot; +use crate::rpc::core::CoreRPCLike; +use drive::grovedb::GroveDb; +use std::path::Path; +use tenderdash_abci::proto::abci as proto; + +pub fn list_snapshots( + app: &A, + _: proto::RequestListSnapshots, +) -> Result +where + A: SnapshotManagerApplication + PlatformApplication, + C: CoreRPCLike, +{ + println!("[state_sync] api list_snapshots called"); + tracing::trace!("[state_sync] api list_snapshots called"); + let snapshots = app + .snapshot_manager() + .get_snapshots(&*app.platform().drive.grove) + .map_err(|e| { + AbciError::StateSyncInternalError(format!( + "list_snapshots unable to get snapshots: {}", + e + )) + })?; + + let mut response: proto::ResponseListSnapshots = Default::default(); + let convert_snapshots = |s: Snapshot| -> proto::Snapshot { + proto::Snapshot { + height: s.height as u64, + version: s.version as u32, + hash: s.hash.to_vec(), + metadata: s.metadata, + } + }; + let checkpoint_exists = |s: &Snapshot| -> bool { Path::new(&s.path).exists() }; + + response.snapshots = snapshots + .into_iter() + .filter(checkpoint_exists) + .map(convert_snapshots) + .collect(); + Ok(response) +} diff --git a/packages/rs-drive-abci/src/abci/handler/load_snapshot_chunk.rs b/packages/rs-drive-abci/src/abci/handler/load_snapshot_chunk.rs new file mode 100644 index 00000000000..4f10e447805 --- /dev/null +++ b/packages/rs-drive-abci/src/abci/handler/load_snapshot_chunk.rs @@ -0,0 +1,197 @@ +use crate::abci::app::{PlatformApplication, SnapshotManagerApplication}; +use bincode::{Decode, Encode}; +use drive::grovedb::GroveDb; +use tenderdash_abci::proto::abci as proto; +//use platform_version::version::PlatformVersion; +use crate::abci::AbciError; +use crate::error::Error; +use crate::rpc::core::CoreRPCLike; +use dpp::version::PlatformVersion; + +pub fn load_snapshot_chunk( + app: &A, + request: proto::RequestLoadSnapshotChunk, +) -> Result +where + A: SnapshotManagerApplication + PlatformApplication, + C: CoreRPCLike, +{ + tracing::trace!( + "[state_sync] api load_snapshot_chunk height:{} chunk_id:{}", + request.height, + hex::encode(&request.chunk_id) + ); + let matched_snapshot = app + .snapshot_manager() + .get_snapshot_at_height(&app.platform().drive.grove, request.height as i64) + .map_err(|_| { + AbciError::StateSyncInternalError( + "load_snapshot_chunk failed: error matched snapshot".to_string(), + ) + })? + .ok_or_else(|| { + AbciError::StateSyncInternalError( + "load_snapshot_chunk failed: empty matched snapshot".to_string(), + ) + })?; + let db = GroveDb::open(&matched_snapshot.path).map_err(|e| { + AbciError::StateSyncInternalError(format!( + "load_snapshot_chunk failed: error opening grove:{}", + e + )) + })?; + let chunk = db + .fetch_chunk( + &request.chunk_id, + None, + request.version as u16, + &PlatformVersion::latest().drive.grove_version, + ) + .map_err(|e| { + AbciError::StateSyncInternalError(format!( + "load_snapshot_chunk failed: error fetching chunk{}", + e + )) + })?; + + // wrap chunk data with some metadata + let chunk_data = ChunkData::new(&chunk).serialize()?; + let response = proto::ResponseLoadSnapshotChunk { chunk: chunk_data }; + Ok(response) +} + +fn crc32(data: &[u8]) -> [u8; 4] { + let crc32 = crc::Crc::::new(&crc::CRC_32_CKSUM); + let mut digest = crc32.digest(); + digest.update(data); + digest.finalize().to_le_bytes() +} + +// ChunkData wraps binary chunk data with additional metadata, like checksum and size. +// +// There is no way to update the chunk - create new ChunkData instead. +// +// TODO: Use Platform encoding instead of raw binencode? +#[derive(Debug, Clone, Encode, Decode)] +pub(crate) struct ChunkData { + version: u8, + crc32: [u8; 4], + size: u64, + chunk: Vec, +} + +const CHUNK_VERSION: u8 = 1; + +impl ChunkData { + pub fn new(chunk_data: &[u8]) -> Self { + let crc32 = crc32(chunk_data); + let size = chunk_data.len() as u64; + ChunkData { + chunk: chunk_data.to_vec(), + crc32, + size, + version: CHUNK_VERSION, + } + } + + pub fn chunk(&self) -> &[u8] { + &self.chunk + } + + // serialize ChunkData to bytes to send to Tenderdash. + pub fn serialize(&mut self) -> Result, Error> { + tracing::trace!( + checksum = hex::encode(self.crc32), + size = self.size, + "state_sync crc32 checksum calculated" + ); + let data: &ChunkData = self; + + bincode::encode_to_vec(data, bincode::config::standard()).map_err(|e| { + tracing::error!(error = ?e, "state_sync failed to encode chunk data"); + Error::Abci(AbciError::StateSyncInternalError(format!( + "failed to encode chunk data: {}", + e + ))) + }) + } + + // verify chunk checksums, etc. + pub fn verify(&self) -> Result<(), Error> { + if self.version != CHUNK_VERSION { + return Err(Error::Abci(AbciError::StateSyncInternalError(format!( + "state_sync chunk version mismatch: expected {}, got {}", + CHUNK_VERSION, self.version + )))); + } + + if self.size != self.chunk.len() as u64 { + return Err(Error::Abci(AbciError::StateSyncInternalError(format!( + "state_sync chunk size mismatch: expected {}, got {}", + self.size, + self.chunk.len() + )))); + } + + let checksum = crc32(&self.chunk); + if self.crc32 != checksum { + tracing::error!( + checksum = hex::encode(checksum), + received = hex::encode(self.crc32), + "state_sync crc32 checksum mismatch", + ); + return Err(Error::Abci(AbciError::StateSyncInternalError(format!( + "state_sync crc32 checksum mismatch: expected {}, got {}", + hex::encode(self.crc32), + hex::encode(checksum), + )))); + } + tracing::trace!( + checksum = hex::encode(checksum), + "state_sync crc32 checksum verified" + ); + + Ok(()) + } + + // deserialize ChunkData from bytes received from Tenderdash and verifies it. + pub fn deserialize(data: &[u8]) -> Result { + let (data, _): (ChunkData, _) = + bincode::decode_from_slice(data, bincode::config::standard()).map_err(|e| { + tracing::error!(error = ?e, "state_sync failed to decode chunk data"); + Error::Abci(AbciError::StateSyncInternalError( + "failed to decode chunk data".to_string(), + )) + })?; + + data.verify()?; + Ok(data) + } +} + +#[cfg(test)] +mod tests { + use super::*; + #[test] + fn test_chunk_data_match() { + let chunk = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; + let serialized_chunk_data = ChunkData::new(&chunk).serialize().unwrap(); + assert_ne!(chunk, serialized_chunk_data); + + let deserialized_chunk_data = ChunkData::deserialize(&serialized_chunk_data).unwrap(); + let deserialized_chunk = deserialized_chunk_data.chunk(); + assert_eq!(chunk, deserialized_chunk); + } + + #[test] + fn test_chunk_data_mismatch() { + let chunk = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; + let serialized_chunk_data = ChunkData::new(&chunk).serialize().unwrap(); + assert_ne!(chunk, serialized_chunk_data); + + let mut deserialized_chunk_data = ChunkData::deserialize(&serialized_chunk_data).unwrap(); + deserialized_chunk_data.chunk[7] = 0; + let deserialized_chunk = deserialized_chunk_data.chunk(); + assert_ne!(chunk, deserialized_chunk); + } +} diff --git a/packages/rs-drive-abci/src/abci/handler/mod.rs b/packages/rs-drive-abci/src/abci/handler/mod.rs index 8acd0737ebe..ac385e9444f 100644 --- a/packages/rs-drive-abci/src/abci/handler/mod.rs +++ b/packages/rs-drive-abci/src/abci/handler/mod.rs @@ -35,23 +35,33 @@ //! can only make changes that are backwards compatible. Otherwise new calls must be made instead. //! +mod apply_snapshot_chunk; mod check_tx; mod echo; pub mod error; mod extend_vote; mod finalize_block; +mod finalize_snapshot; mod info; mod init_chain; +mod list_snapshots; +mod load_snapshot_chunk; +mod offer_snapshot; mod prepare_proposal; mod process_proposal; mod verify_vote_extension; +pub use apply_snapshot_chunk::apply_snapshot_chunk; pub use check_tx::check_tx; pub use echo::echo; pub use extend_vote::extend_vote; pub use finalize_block::finalize_block; +pub use finalize_snapshot::finalize_snapshot; pub use info::info; pub use init_chain::init_chain; +pub use list_snapshots::list_snapshots; +pub use load_snapshot_chunk::load_snapshot_chunk; +pub use offer_snapshot::offer_snapshot; pub use prepare_proposal::prepare_proposal; pub use process_proposal::process_proposal; pub use verify_vote_extension::verify_vote_extension; diff --git a/packages/rs-drive-abci/src/abci/handler/offer_snapshot.rs b/packages/rs-drive-abci/src/abci/handler/offer_snapshot.rs new file mode 100644 index 00000000000..b47da6a1fe6 --- /dev/null +++ b/packages/rs-drive-abci/src/abci/handler/offer_snapshot.rs @@ -0,0 +1,103 @@ +use crate::abci::app::{SnapshotFetchingApplication, SnapshotManagerApplication}; +use crate::abci::AbciError; +use crate::error::Error; +use crate::platform_types::snapshot::SnapshotFetchingSession; +use dpp::version::PlatformVersion; +use drive::grovedb::replication::CURRENT_STATE_SYNC_VERSION; +use tenderdash_abci::proto::abci as proto; +use tenderdash_abci::proto::abci::response_offer_snapshot; + +pub fn offer_snapshot<'a, 'db: 'a, A, C: 'db>( + app: &'a A, + request: proto::RequestOfferSnapshot, +) -> Result +where + A: SnapshotManagerApplication + SnapshotFetchingApplication<'db, C> + 'db, +{ + let request_app_hash: [u8; 32] = request.app_hash.try_into().map_err(|_| { + AbciError::StateSyncBadRequest("offer_snapshot invalid app_hash length".to_string()) + })?; + let offered_snapshot = request.snapshot.ok_or(AbciError::StateSyncBadRequest( + "offer_snapshot empty snapshot in request".to_string(), + ))?; + tracing::trace!( + "[state_sync] api offer_snapshot height:{}", + offered_snapshot.height + ); + let mut session_write_guard = app.snapshot_fetching_session().write().map_err(|_| { + AbciError::StateSyncInternalError( + "offer_snapshot unable to lock session (poisoned)".to_string(), + ) + })?; + if session_write_guard.is_none() { + // No session currently, start a new one. + app.platform().drive.grove.wipe().map_err(|e| { + AbciError::StateSyncInternalError(format!( + "offer_snapshot unable to wipe grovedb:{}", + e + )) + })?; + let state_sync_info = app + .platform() + .drive + .grove + .start_snapshot_syncing( + request_app_hash, + CURRENT_STATE_SYNC_VERSION, + &PlatformVersion::latest().drive.grove_version, + ) + .map_err(|e| { + AbciError::StateSyncInternalError(format!( + "offer_snapshot unable to start snapshot syncing session:{}", + e + )) + })?; + let session = SnapshotFetchingSession::new( + offered_snapshot, + request_app_hash.to_vec(), + state_sync_info, + ); + *session_write_guard = Some(session); + let mut response = proto::ResponseOfferSnapshot::default(); + response.result = i32::from(response_offer_snapshot::Result::Accept); + Ok(response) + } else { + // Already syncing another snapshot session + let session = session_write_guard + .as_mut() + .ok_or(AbciError::StateSyncInternalError( + "offer_snapshot unable to lock session".to_string(), + ))?; + if offered_snapshot.height <= session.snapshot.height { + return Err(Error::Abci(AbciError::StateSyncBadRequest( + "offer_snapshot already syncing newest height".to_string(), + ))); + } + app.platform().drive.grove.wipe().map_err(|e| { + AbciError::StateSyncInternalError(format!( + "offer_snapshot unable to wipe grovedb:{}", + e + )) + })?; + let state_sync_info = app + .platform() + .drive + .grove + .start_snapshot_syncing( + request_app_hash, + CURRENT_STATE_SYNC_VERSION, + &PlatformVersion::latest().drive.grove_version, + ) + .map_err(|e| { + AbciError::StateSyncInternalError(format!( + "offer_snapshot unable to start snapshot syncing session:{}", + e + )) + })?; + session.snapshot = offered_snapshot; + session.app_hash = request_app_hash.to_vec(); + session.state_sync_info = state_sync_info; + let response = proto::ResponseOfferSnapshot::default(); + Ok(response) + } +} diff --git a/packages/rs-drive-abci/src/config.rs b/packages/rs-drive-abci/src/config.rs index 1e8f5f3c264..d76365b7322 100644 --- a/packages/rs-drive-abci/src/config.rs +++ b/packages/rs-drive-abci/src/config.rs @@ -1,3 +1,4 @@ +use crate::abci::config::StateSyncAbciConfig; use crate::logging::LogConfigs; use crate::utils::from_str_or_number; use crate::{abci::config::AbciConfig, error::Error}; @@ -183,6 +184,9 @@ pub struct PlatformConfig { /// Path to data storage pub db_path: PathBuf, + /// State sync configuration + pub state_sync_config: StateSyncAbciConfig, + /// Path to store rejected / invalid items (like transactions). /// Used mainly for debugging. /// @@ -274,6 +278,7 @@ impl<'de> Deserialize<'de> for PlatformConfig { instant_lock: config.instant_lock, block_spacing_ms: config.block_spacing_ms, db_path: config.db_path, + state_sync_config: StateSyncAbciConfig::default(), rejections_path: config.rejections_path, #[cfg(feature = "testing-config")] testing_configs: config.testing_configs, @@ -724,6 +729,7 @@ impl PlatformConfig { core: Default::default(), execution: Default::default(), db_path: PathBuf::from("/var/lib/dash-platform/data"), + state_sync_config: StateSyncAbciConfig::default_local(), rejections_path: Some(PathBuf::from("/var/log/dash/rejected")), #[cfg(feature = "testing-config")] testing_configs: PlatformTestConfig::default(), @@ -767,6 +773,7 @@ impl PlatformConfig { execution: Default::default(), db_path: PathBuf::from("/var/lib/dash-platform/data"), rejections_path: Some(PathBuf::from("/var/log/dash/rejected")), + state_sync_config: StateSyncAbciConfig::default_local(), #[cfg(feature = "testing-config")] testing_configs: PlatformTestConfig::default(), tokio_console_enabled: false, @@ -808,6 +815,7 @@ impl PlatformConfig { core: Default::default(), execution: Default::default(), db_path: PathBuf::from("/var/lib/dash-platform/data"), + state_sync_config: StateSyncAbciConfig::default_testnet(), rejections_path: Some(PathBuf::from("/var/log/dash/rejected")), #[cfg(feature = "testing-config")] testing_configs: PlatformTestConfig::default(), @@ -850,6 +858,7 @@ impl PlatformConfig { core: Default::default(), execution: Default::default(), db_path: PathBuf::from("/var/lib/dash-platform/data"), + state_sync_config: StateSyncAbciConfig::default_mainnet(), rejections_path: Some(PathBuf::from("/var/log/dash/rejected")), #[cfg(feature = "testing-config")] testing_configs: PlatformTestConfig::default(), diff --git a/packages/rs-drive-abci/src/execution/mod.rs b/packages/rs-drive-abci/src/execution/mod.rs index 0e4b73c3f00..c3b315725d8 100644 --- a/packages/rs-drive-abci/src/execution/mod.rs +++ b/packages/rs-drive-abci/src/execution/mod.rs @@ -3,7 +3,7 @@ mod check_tx; /// Engine module pub mod engine; /// platform execution events -pub(in crate::execution) mod platform_events; +pub(crate) mod platform_events; /// Storage implementation for the execution state pub mod storage; /// Types needed in execution diff --git a/packages/rs-drive-abci/src/execution/platform_events/core_based_updates/mod.rs b/packages/rs-drive-abci/src/execution/platform_events/core_based_updates/mod.rs index f0a8e18930a..3251e226209 100644 --- a/packages/rs-drive-abci/src/execution/platform_events/core_based_updates/mod.rs +++ b/packages/rs-drive-abci/src/execution/platform_events/core_based_updates/mod.rs @@ -1,4 +1,4 @@ mod update_core_info; mod update_masternode_identities; mod update_masternode_list; -mod update_quorum_info; +pub(crate) mod update_quorum_info; diff --git a/packages/rs-drive-abci/src/execution/platform_events/core_based_updates/update_quorum_info/mod.rs b/packages/rs-drive-abci/src/execution/platform_events/core_based_updates/update_quorum_info/mod.rs index 87510c71171..ff567593874 100644 --- a/packages/rs-drive-abci/src/execution/platform_events/core_based_updates/update_quorum_info/mod.rs +++ b/packages/rs-drive-abci/src/execution/platform_events/core_based_updates/update_quorum_info/mod.rs @@ -1,4 +1,4 @@ -mod v0; +pub(crate) mod v0; use crate::error::execution::ExecutionError; use crate::error::Error; diff --git a/packages/rs-drive-abci/src/execution/platform_events/core_based_updates/update_quorum_info/v0/mod.rs b/packages/rs-drive-abci/src/execution/platform_events/core_based_updates/update_quorum_info/v0/mod.rs index 14320ad1d25..58ccfd60b33 100644 --- a/packages/rs-drive-abci/src/execution/platform_events/core_based_updates/update_quorum_info/v0/mod.rs +++ b/packages/rs-drive-abci/src/execution/platform_events/core_based_updates/update_quorum_info/v0/mod.rs @@ -21,13 +21,13 @@ use dpp::dashcore::QuorumHash; use tracing::Level; #[derive(Copy, Clone)] -enum QuorumSetType { +pub enum QuorumSetType { ChainLock(QuorumType), InstantLock(QuorumType), } impl QuorumSetType { - fn quorum_type(&self) -> QuorumType { + pub(crate) fn quorum_type(&self) -> QuorumType { match self { QuorumSetType::ChainLock(quorum_type) => *quorum_type, QuorumSetType::InstantLock(quorum_type) => *quorum_type, diff --git a/packages/rs-drive-abci/src/execution/platform_events/mod.rs b/packages/rs-drive-abci/src/execution/platform_events/mod.rs index 41b64d63b18..67c6ca6bb28 100644 --- a/packages/rs-drive-abci/src/execution/platform_events/mod.rs +++ b/packages/rs-drive-abci/src/execution/platform_events/mod.rs @@ -5,7 +5,7 @@ pub(in crate::execution) mod block_fee_processing; /// Events happening what starting to process a block pub(in crate::execution) mod block_start; /// Update from core such as a masternode list update or quorums being updated -pub(in crate::execution) mod core_based_updates; +pub(crate) mod core_based_updates; /// Verify the chain lock pub(in crate::execution) mod core_chain_lock; /// Instant lock methods diff --git a/packages/rs-drive-abci/src/platform_types/mod.rs b/packages/rs-drive-abci/src/platform_types/mod.rs index b973b9051eb..92ae30b58c2 100644 --- a/packages/rs-drive-abci/src/platform_types/mod.rs +++ b/packages/rs-drive-abci/src/platform_types/mod.rs @@ -31,3 +31,6 @@ pub mod validator_set; pub mod verify_chain_lock_result; /// Withdrawal types pub mod withdrawal; + +/// Snapshots +pub mod snapshot; diff --git a/packages/rs-drive-abci/src/platform_types/snapshot/mod.rs b/packages/rs-drive-abci/src/platform_types/snapshot/mod.rs new file mode 100644 index 00000000000..5ac5942fe41 --- /dev/null +++ b/packages/rs-drive-abci/src/platform_types/snapshot/mod.rs @@ -0,0 +1,230 @@ +use std::{ + fs, + path::{Path, PathBuf}, + pin::Pin, +}; + +use bincode::{config, Decode, Encode}; +use dapi_grpc::tonic; +use drive::error::drive::DriveError; +use drive::error::Error::{Drive, GroveDB}; +use drive::grovedb::replication::MultiStateSyncSession; +use drive::grovedb::GroveDb; +use prost::Message; +use tenderdash_abci::proto::abci; +//use platform_version::version::PlatformVersion; +use crate::error::Error; +use dpp::version::PlatformVersion; + +const SNAPSHOT_KEY: &[u8] = b"snapshots"; + +const CHUNK_SIZE_16MB: usize = 16 * 1024 * 1024; + +const SNAPSHOT_VERSION: u16 = 1; + +/// Snapshot entity +#[derive(Clone, Encode, Decode, PartialEq, Debug)] +pub struct Snapshot { + /// Block height + pub height: i64, + /// Version + pub version: u16, + /// Path to the checkpoint + pub path: String, + /// Root hash of the checkpoint + pub hash: [u8; 32], + /// Metadata + pub metadata: Vec, +} + +/// Snapshot manager is responsible for creating and managing snapshots to keep only the certain +/// number of snapshots and remove the old ones +#[derive(Default, Clone)] +pub struct SnapshotManager { + freq: i64, + number_stored_snapshots: usize, + checkpoints_path: PathBuf, +} + +/// Snapshot manager is responsible for creating and managing snapshots to keep only the certain +/// number of snapshots and remove the old ones +pub struct SnapshotFetchingSession<'db> { + /// Snapshot accepted + pub snapshot: abci::Snapshot, + /// Snapshot accepted + pub app_hash: Vec, + /// Snapshot accepted + pub state_sync_info: Pin>>, +} + +impl<'db> SnapshotFetchingSession<'db> { + /// Creates a new `SnapshotFetchingSession`. + /// + /// # Parameters + /// + /// - `snapshot`: The accepted snapshot. + /// - `app_hash`: The application hash associated with the snapshot. + /// - `state_sync_info`: The state sync information. + /// + /// # Returns + /// + /// A new instance of `SnapshotFetchingSession`. + pub fn new( + snapshot: abci::Snapshot, + app_hash: Vec, + state_sync_info: Pin>>, + ) -> Self { + SnapshotFetchingSession { + snapshot, + app_hash, + state_sync_info, + } + } +} + +impl SnapshotManager { + /// Create a new instance of snapshot manager + pub fn new(checkpoints_path: PathBuf, number_stored_snapshots: usize, freq: i64) -> Self { + if let Err(e) = fs::create_dir_all(checkpoints_path.clone()) { + tracing::error!( + "Failed to create directory {}: {}", + checkpoints_path.display(), + e + ); + } + Self { + freq, + number_stored_snapshots, + checkpoints_path, + } + } + + /// Return a persisted list of snapshots + pub fn get_snapshots(&self, grove: &GroveDb) -> Result, Error> { + let data = grove + .get_aux(SNAPSHOT_KEY, None) + .value + .map_err(|e| Error::Drive(GroveDB(e)))?; + + match data { + Some(data) => { + let conf = config::standard(); + let (mut decoded, _): (Vec, usize) = + bincode::decode_from_slice(data.as_slice(), conf) + .map_err(|e| Error::Drive(Drive(DriveError::Snapshot(e.to_string()))))?; + decoded.sort_by(|a, b| a.height.cmp(&b.height)); + Ok(decoded) + } + None => Ok(vec![]), + } + } + + /// Return the snapshot a requested height + pub fn get_snapshot_at_height( + &self, + grove: &GroveDb, + height: i64, + ) -> Result, Error> { + let snapshots = self.get_snapshots(&grove)?; + let matched_snapshot = snapshots + .iter() + .find(|&snapshot| snapshot.height == height) + .cloned(); + Ok(matched_snapshot) + } + + /// Create a new snapshot for the given height, if a height is not a multiple of N, + /// it will be skipped. + pub fn create_snapshot(&self, grove: &GroveDb, height: i64) -> Result<(), Error> { + if height == 0 || height % self.freq != 0 { + return Ok(()); + } + let checkpoints_path_base = self.checkpoints_path.clone(); + let checkpoint_path: PathBuf = checkpoints_path_base.join(height.to_string()); + grove + .create_checkpoint(&checkpoint_path) + .map_err(|e| Error::Drive(GroveDB(e)))?; + + let root_hash = grove + .root_hash(None, &PlatformVersion::latest().drive.grove_version) + .value + .map_err(|e| Error::Drive(Drive(DriveError::Snapshot(e.to_string()))))?; + + let snapshot = Snapshot { + height, + version: SNAPSHOT_VERSION, + path: checkpoint_path.to_string_lossy().into_owned(), + hash: root_hash as [u8; 32], + metadata: vec![], + }; + + let mut snapshots = self.get_snapshots(grove)?; + snapshots.push(snapshot); + snapshots = self.prune_excess_snapshots(snapshots)?; + self.save_snapshots(grove, snapshots) + } + + fn prune_excess_snapshots(&self, snapshots: Vec) -> Result, Error> { + if snapshots.len() <= self.number_stored_snapshots { + return Ok(snapshots); + } + let separator = snapshots.len() - self.number_stored_snapshots; + for snapshot in &snapshots[0..separator] { + if Path::new(&snapshot.path).is_dir() { + fs::remove_dir_all(&snapshot.path) + .map_err(|e| Error::Drive(Drive(DriveError::Snapshot(e.to_string()))))?; + } + } + Ok(snapshots[separator..].to_vec()) + } + + fn save_snapshots(&self, grove: &GroveDb, snapshots: Vec) -> Result<(), Error> { + let conf = config::standard(); + let data: Vec = bincode::encode_to_vec(snapshots, conf) + .map_err(|e| Error::Drive(Drive(DriveError::Snapshot(e.to_string()))))?; + grove + .put_aux(SNAPSHOT_KEY, data.as_slice(), None, None) + .value + .map_err(|e| Error::Drive(GroveDB(e)))?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::fs; + + #[test] + fn test_create_snapshot() { + let test_cases = vec![ + (1000, 1000, vec![1000]), + (1000, 1001, vec![1000, 1001]), + (1000, 1002, vec![1000, 1001, 1002]), + (1000, 1004, vec![1002, 1003, 1004]), + (1000, 1005, vec![1003, 1004, 1005]), + ]; + for (start, end, want) in test_cases { + let grove_dir = tempfile::tempdir().unwrap(); + let checkpoints_dir = tempfile::tempdir().unwrap(); + let grove = GroveDb::open(grove_dir.path()).unwrap(); + let manager = SnapshotManager::new( + checkpoints_dir.path().to_str().unwrap().to_string().into(), + 3, + 1, + ); + for height in start..=end { + manager.create_snapshot(&grove, height).unwrap(); + } + let snapshots = manager.get_snapshots(&grove).unwrap(); + let res: Vec = snapshots.iter().map(|s| s.height).collect(); + assert_eq!(want, res); + + let paths: Vec = snapshots.iter().map(|s| s.path.to_string()).collect(); + for path in paths { + assert!(Path::new(&path).exists()); + } + fs::remove_dir_all(grove_dir.path()).unwrap(); + } + } +} diff --git a/packages/rs-drive-abci/tests/strategy_tests/main.rs b/packages/rs-drive-abci/tests/strategy_tests/main.rs index 4194e306896..fa73bff9a88 100644 --- a/packages/rs-drive-abci/tests/strategy_tests/main.rs +++ b/packages/rs-drive-abci/tests/strategy_tests/main.rs @@ -26,6 +26,7 @@ mod masternode_list_item_helpers; mod masternodes; mod patch_platform_tests; mod query; +mod state_sync; mod strategy; mod upgrade_fork_tests; mod verify_state_transitions; diff --git a/packages/rs-drive-abci/tests/strategy_tests/state_sync.rs b/packages/rs-drive-abci/tests/strategy_tests/state_sync.rs new file mode 100644 index 00000000000..a47267ea383 --- /dev/null +++ b/packages/rs-drive-abci/tests/strategy_tests/state_sync.rs @@ -0,0 +1,305 @@ +#[cfg(test)] +mod tests { + use crate::execution::run_chain_for_strategy; + use crate::strategy::NetworkStrategy; + use dpp::data_contract::accessors::v0::DataContractV0Getters; + use dpp::data_contract::document_type::random_document::{ + DocumentFieldFillSize, DocumentFieldFillType, + }; + use dpp::tests::json_document::json_document_to_created_contract; + use drive_abci::abci::app::{FullAbciApplication, PlatformApplication}; + use drive_abci::abci::config::StateSyncAbciConfig; + use drive_abci::config::{ExecutionConfig, PlatformConfig}; + use drive_abci::test::helpers::setup::TestPlatformBuilder; + use platform_version::version::PlatformVersion; + use rand::distributions::Alphanumeric; + use rand::Rng; + use std::collections::VecDeque; + use std::fs; + use std::path::PathBuf; + use std::time::{Duration, Instant}; + use strategy_tests::frequency::Frequency; + use strategy_tests::operations::{DocumentAction, DocumentOp, Operation, OperationType}; + use strategy_tests::{IdentityInsertInfo, StartIdentities, Strategy}; + use tenderdash_abci::proto::abci::{ + RequestApplySnapshotChunk, RequestInfo, RequestListSnapshots, RequestLoadSnapshotChunk, + RequestOfferSnapshot, + }; + use tenderdash_abci::Application; + + fn generate_random_path(prefix: &str, suffix: &str, len: usize) -> String { + let random_string: String = rand::thread_rng() + .sample_iter(&Alphanumeric) + .take(len) + .map(char::from) + .collect(); + format!("{}/{}{}", prefix, random_string, suffix) + } + + fn create_dir_if_not_exists(path: &PathBuf) -> std::io::Result<()> { + if !path.exists() { + fs::create_dir_all(path)?; + } + Ok(()) + } + + fn remove_dir(path: &PathBuf) -> std::io::Result<()> { + if path.exists() { + fs::remove_dir_all(path)?; + } + Ok(()) + } + + fn get_target_folder() -> PathBuf { + // Use the environment variable `CARGO_MANIFEST_DIR` to locate the current package + let manifest_dir = env!("CARGO_MANIFEST_DIR"); + + // Traverse up the directory tree to find the workspace root + let mut current_dir = PathBuf::from(manifest_dir); + while !current_dir.join("Cargo.lock").exists() { + current_dir.pop(); // Go up one level + } + + // The `target` folder is located at the workspace root + current_dir.join("target") + } + + fn get_target_tmp_folder() -> PathBuf { + get_target_folder().join("tmp") + } + + #[test] + fn run_state_sync() { + let platform_version = PlatformVersion::latest(); + let created_contract = json_document_to_created_contract( + "tests/supporting_files/contract/dashpay/dashpay-contract-all-mutable.json", + 1, + true, + platform_version, + ) + .expect("expected to get contract from a json document"); + + let contract = created_contract.data_contract(); + + let document_insertion_op = DocumentOp { + contract: contract.clone(), + action: DocumentAction::DocumentActionInsertRandom( + DocumentFieldFillType::FillIfNotRequired, + DocumentFieldFillSize::AnyDocumentFillSize, + ), + document_type: contract + .document_type_for_name("contactRequest") + .expect("expected a profile document type") + .to_owned_document_type(), + }; + + let strategy = NetworkStrategy { + strategy: Strategy { + start_contracts: vec![(created_contract, None)], + operations: vec![Operation { + op_type: OperationType::Document(document_insertion_op), + frequency: Frequency { + times_per_block_range: 50000..50002, + chance_per_block: None, + }, + }], + start_identities: StartIdentities::default(), + identity_inserts: IdentityInsertInfo { + frequency: Frequency { + times_per_block_range: 1..2, + chance_per_block: None, + }, + ..Default::default() + }, + + identity_contract_nonce_gaps: None, + signer: None, + }, + total_hpmns: 100, + extra_normal_mns: 0, + validator_quorum_count: 24, + chain_lock_quorum_count: 24, + upgrading_info: None, + + proposer_strategy: Default::default(), + rotate_quorums: false, + failure_testing: None, + query_testing: None, + verify_state_transition_results: true, + ..Default::default() + }; + let day_in_ms = 1000 * 60 * 60 * 24; + + let base_test_directory = PathBuf::from(generate_random_path( + get_target_tmp_folder().to_str().unwrap(), + "", + 12, + )); + + let mut checkpoint_test_directory = base_test_directory.clone(); + checkpoint_test_directory.push("checkpoints"); + + create_dir_if_not_exists(&checkpoint_test_directory) + .expect("should create checkpoint directory"); + println!( + "checkpoint_test_directory: {}", + checkpoint_test_directory.to_str().unwrap() + ); + + let mut db_test_directory = base_test_directory.clone(); + db_test_directory.push("db"); + + create_dir_if_not_exists(&db_test_directory).expect("should create db directory"); + println!("db_test_directory: {}", db_test_directory.to_str().unwrap()); + + let local_state_sync_config = StateSyncAbciConfig { + snapshots_enabled: true, + checkpoints_path: checkpoint_test_directory, + snapshots_frequency: 10, + max_num_snapshots: 3, + }; + + let config = PlatformConfig { + execution: ExecutionConfig { + verify_sum_trees: true, + ..Default::default() + }, + block_spacing_ms: day_in_ms, + db_path: db_test_directory, + state_sync_config: local_state_sync_config, + ..Default::default() + }; + + let block_count = 50; + let mut platform = TestPlatformBuilder::new() + .with_config(config.clone()) + .build_with_mock_rpc(); + + let source_outcome = run_chain_for_strategy( + &mut platform, + block_count, + strategy, + config.clone(), + 15, + &mut None, + ); + let source_snapshots = source_outcome + .abci_app + .list_snapshots(RequestListSnapshots::default()) + .expect("should expected snapshots"); + + for s in &source_snapshots.snapshots { + println!( + "snapshot height:{} app_hash:{}", + s.height, + hex::encode(&s.hash) + ); + } + let best_snapshot = match source_snapshots.snapshots.iter().max_by_key(|s| s.height) { + Some(s) => s, + None => { + println!("no snapshots available. exit"); + return; // Return early if no item is found + } + }; + println!( + "best_snapshot height:{} app_hash:{}", + best_snapshot.height, + hex::encode(&best_snapshot.hash) + ); + + let target_platform = TestPlatformBuilder::new() + .with_config(config.clone()) + .build_with_mock_rpc(); + let target_abci_app = FullAbciApplication::new(&target_platform); + + let offer_snapshot_request = RequestOfferSnapshot { + snapshot: Some(best_snapshot.clone()), + app_hash: best_snapshot.hash.to_vec(), + }; + + let _ = target_abci_app + .offer_snapshot(offer_snapshot_request) + .expect("should offer_snapshot succeed"); + + let mut chunk_queue: VecDeque> = VecDeque::new(); + chunk_queue.push_back(best_snapshot.hash.to_vec()); + + let start_time = Instant::now(); + + let mut duration_sum_fetch: Duration = Duration::ZERO; + let mut duration_sum_apply: Duration = Duration::ZERO; + + let mut chunk_counter = 0; + let mut ops_counter = 0; + while let Some(chunk_id) = chunk_queue.pop_front() { + let request_load_chunk = RequestLoadSnapshotChunk { + height: best_snapshot.height, + version: best_snapshot.version, + chunk_id: chunk_id.clone(), + }; + let start_time_fetch = Instant::now(); + let load_chunk_response = source_outcome + .abci_app + .load_snapshot_chunk(request_load_chunk) + .expect("should fetch chunk"); + duration_sum_fetch += start_time_fetch.elapsed(); + + let request_apply_chunk = RequestApplySnapshotChunk { + chunk_id, + chunk: load_chunk_response.chunk, + ..Default::default() + }; + let request_apply_num_ops = request_apply_chunk.chunk.len(); + ops_counter += request_apply_num_ops; + + let elapsed = start_time.elapsed(); + let chunk_id_hex = hex::encode(&request_apply_chunk.chunk_id); + let start_time_apply = Instant::now(); + let apply_chunk_response = target_abci_app + .apply_snapshot_chunk(request_apply_chunk) + .expect("should apply chunk succeed"); + duration_sum_apply += start_time_apply.elapsed(); + println!( + "#{} apply:{} num_ops:{} returned:{} queue:{} {:.2?}", + chunk_counter, + chunk_id_hex, + request_apply_num_ops, + apply_chunk_response.next_chunks.len(), + chunk_queue.len(), + elapsed + ); + chunk_queue.extend(apply_chunk_response.next_chunks); + chunk_counter += 1; + } + println!("total chunks:{} ops:{}", chunk_counter, ops_counter); + println!("duration_sum_fetch: {}", duration_sum_fetch.as_secs_f64()); + println!("duration_sum_apply: {}", duration_sum_apply.as_secs_f64()); + + println!( + "source app_hash:{}", + hex::encode( + source_outcome + .abci_app + .platform + .drive + .grove + .root_hash(None, &PlatformVersion::latest().drive.grove_version) + .unwrap() + .unwrap() + ) + ); + println!( + "target app_hash:{}", + hex::encode( + target_abci_app + .platform() + .drive + .grove + .root_hash(None, &PlatformVersion::latest().drive.grove_version) + .unwrap() + .unwrap() + ) + ); + } +} diff --git a/packages/rs-drive-proof-verifier/Cargo.toml b/packages/rs-drive-proof-verifier/Cargo.toml index 00082eed054..08cbdeef47d 100644 --- a/packages/rs-drive-proof-verifier/Cargo.toml +++ b/packages/rs-drive-proof-verifier/Cargo.toml @@ -34,14 +34,13 @@ dpp = { path = "../rs-dpp", features = [ bincode = { version = "2.0.0-rc.3", features = ["serde"], optional = true } platform-serialization-derive = { path = "../rs-platform-serialization-derive", optional = true } platform-serialization = { path = "../rs-platform-serialization", optional = true } -tenderdash-abci = { git = "https://github.com/dashpay/rs-tenderdash-abci", version = "1.2.1", tag = "v1.2.1+1.3.0", features = [ +tenderdash-abci = { git = "https://github.com/dashpay/rs-tenderdash-abci", rev = "b55bed9f574b68f2b6c96cbc80da41072056781d", features = [ "crypto", + "serde", ], default-features = false } tracing = { version = "0.1.37" } serde = { version = "1.0.197", default-features = false, optional = true } -serde_json = { version = "1.0.103", features = [ - "preserve_order", -], optional = true } +serde_json = { version = "1.0", features = ["preserve_order"], optional = true } hex = { version = "0.4.3" } indexmap = { version = "2.6.0" } derive_more = { version = "1.0", features = ["from"] } diff --git a/packages/rs-drive/Cargo.toml b/packages/rs-drive/Cargo.toml index 6bc09663429..e7aa1870b32 100644 --- a/packages/rs-drive/Cargo.toml +++ b/packages/rs-drive/Cargo.toml @@ -52,12 +52,12 @@ enum-map = { version = "2.0.3", optional = true } intmap = { version = "3.0.1", features = ["serde"], optional = true } chrono = { version = "0.4.35", optional = true } itertools = { version = "0.13", optional = true } -grovedb = { version = "2.2.1", optional = true, default-features = false } -grovedb-costs = { version = "2.2.1", optional = true } -grovedb-path = { version = "2.2.1" } -grovedb-storage = { version = "2.2.1", optional = true } -grovedb-version = { version = "2.2.1" } -grovedb-epoch-based-storage-flags = { version = "2.2.1" } +grovedb = { git = "https://github.com/dashpay/grovedb", branch = "feat/chunk_packing_master_fix", optional = true, default-features = false } +grovedb-costs = { git = "https://github.com/dashpay/grovedb", branch = "feat/chunk_packing_master_fix", optional = true } +grovedb-path = { git = "https://github.com/dashpay/grovedb", branch = "feat/chunk_packing_master_fix" } +grovedb-storage = { git = "https://github.com/dashpay/grovedb", branch = "feat/chunk_packing_master_fix", optional = true } +grovedb-version = { git = "https://github.com/dashpay/grovedb", branch = "feat/chunk_packing_master_fix" } +grovedb-epoch-based-storage-flags = { git = "https://github.com/dashpay/grovedb", branch = "feat/chunk_packing_master_fix" } [dev-dependencies] criterion = "0.5" diff --git a/packages/rs-drive/src/error/drive.rs b/packages/rs-drive/src/error/drive.rs index 922cc5b8e4b..f5e8c8b657c 100644 --- a/packages/rs-drive/src/error/drive.rs +++ b/packages/rs-drive/src/error/drive.rs @@ -188,4 +188,8 @@ pub enum DriveError { /// Data Contract not found #[error("data contract not found: {0}")] DataContractNotFound(String), + + /// Error todo + #[error("snapshot error")] + Snapshot(String), } diff --git a/packages/rs-json-schema-compatibility-validator/Cargo.toml b/packages/rs-json-schema-compatibility-validator/Cargo.toml index a4211f6d181..aa61bb62c22 100644 --- a/packages/rs-json-schema-compatibility-validator/Cargo.toml +++ b/packages/rs-json-schema-compatibility-validator/Cargo.toml @@ -7,7 +7,7 @@ authors = ["Ivan Shumkov "] [dependencies] json-patch = "1.4" -serde_json = "1.0.115" +serde_json = { version = "1.0" } thiserror = "1.0.64" once_cell = "1.19.0" diff --git a/packages/rs-platform-version/Cargo.toml b/packages/rs-platform-version/Cargo.toml index 308329b22a8..373e4db23b2 100644 --- a/packages/rs-platform-version/Cargo.toml +++ b/packages/rs-platform-version/Cargo.toml @@ -11,7 +11,7 @@ license = "MIT" thiserror = { version = "1.0.63" } bincode = { version = "2.0.0-rc.3" } versioned-feature-core = { git = "https://github.com/dashpay/versioned-feature-core", version = "1.0.0" } -grovedb-version = { version = "2.2.1" } +grovedb-version = { git = "https://github.com/dashpay/grovedb", branch = "feat/chunk_packing_master_fix" } once_cell = "1.19.0" [features]