diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f8b26657df..9885039c012 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] ### Added +- [2081](https://github.com/FuelLabs/fuel-core/pull/2081): Enable producer to include predefined blocks. - [2079](https://github.com/FuelLabs/fuel-core/pull/2079): Open unknown columns in the RocksDB for forward compatibility. ### Changed diff --git a/Cargo.lock b/Cargo.lock index 086b4b6e486..8901cc23ae3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3156,6 +3156,8 @@ dependencies = [ "fuel-core-types", "mockall", "rand", + "serde", + "serde_json", "test-case", "tokio", "tokio-stream", @@ -3174,6 +3176,7 @@ dependencies = [ "fuel-core-trace", "fuel-core-types", "mockall", + "proptest", "rand", "tokio", "tokio-rayon", diff --git a/crates/fuel-core/src/service/adapters/consensus_module/poa.rs b/crates/fuel-core/src/service/adapters/consensus_module/poa.rs index ff2c36d5261..cf5661e5b0f 100644 --- a/crates/fuel-core/src/service/adapters/consensus_module/poa.rs +++ b/crates/fuel-core/src/service/adapters/consensus_module/poa.rs @@ -24,6 +24,7 @@ use fuel_core_poa::{ use fuel_core_services::stream::BoxStream; use fuel_core_storage::transactional::Changes; use fuel_core_types::{ + blockchain::block::Block, fuel_tx::TxId, fuel_types::BlockHeight, services::{ @@ -120,6 +121,15 @@ impl fuel_core_poa::ports::BlockProducer for BlockProducerAdapter { } } } + + async fn produce_predefined_block( + &self, + block: &Block, + ) -> anyhow::Result> { + self.block_producer + .produce_and_execute_predefined(block) + .await + } } #[async_trait::async_trait] diff --git a/crates/fuel-core/src/service/config.rs b/crates/fuel-core/src/service/config.rs index 1c375a4af41..0e84142916a 100644 --- a/crates/fuel-core/src/service/config.rs +++ b/crates/fuel-core/src/service/config.rs @@ -204,6 +204,11 @@ impl From<&Config> for fuel_core_poa::Config { metrics: false, min_connected_reserved_peers: config.min_connected_reserved_peers, time_until_synced: config.time_until_synced, + chain_id: config + .snapshot_reader + .chain_config() + .consensus_parameters + .chain_id(), } } } diff --git a/crates/fuel-core/src/service/sub_services.rs b/crates/fuel-core/src/service/sub_services.rs index 136870874f6..d8f1bf2b27c 100644 --- a/crates/fuel-core/src/service/sub_services.rs +++ b/crates/fuel-core/src/service/sub_services.rs @@ -42,7 +42,10 @@ use fuel_core_gas_price_service::fuel_gas_price_updater::{ UpdaterMetadata, V0Metadata, }; -use fuel_core_poa::Trigger; +use fuel_core_poa::{ + ports::InMemoryPredefinedBlocks, + Trigger, +}; use fuel_core_services::{ RunnableService, ServiceRunner, @@ -53,13 +56,20 @@ use fuel_core_storage::{ }; #[cfg(feature = "relayer")] use fuel_core_types::blockchain::primitives::DaBlockHeight; -use std::sync::Arc; +use std::{ + collections::HashMap, + sync::Arc, +}; use tokio::sync::Mutex; mod algorithm_updater; -pub type PoAService = - fuel_core_poa::Service; +pub type PoAService = fuel_core_poa::Service< + TxPoolAdapter, + BlockProducerAdapter, + BlockImporterAdapter, + InMemoryPredefinedBlocks, +>; #[cfg(feature = "p2p")] pub type P2PService = fuel_core_p2p::service::Service; pub type TxPoolSharedState = fuel_core_txpool::service::SharedState< @@ -235,6 +245,7 @@ pub fn init_sub_services( tracing::info!("Enabled manual block production because of `debug` flag"); } + let predefined_blocks: InMemoryPredefinedBlocks = HashMap::new().into(); let poa = (production_enabled).then(|| { fuel_core_poa::new_service( &last_block_header, @@ -243,6 +254,7 @@ pub fn init_sub_services( producer_adapter.clone(), importer_adapter.clone(), p2p_adapter.clone(), + predefined_blocks, ) }); let poa_adapter = PoAAdapter::new(poa.as_ref().map(|service| service.shared.clone())); diff --git a/crates/services/consensus_module/poa/Cargo.toml b/crates/services/consensus_module/poa/Cargo.toml index 3a96a22e015..3ae865c7de2 100644 --- a/crates/services/consensus_module/poa/Cargo.toml +++ b/crates/services/consensus_module/poa/Cargo.toml @@ -16,12 +16,15 @@ fuel-core-chain-config = { workspace = true } fuel-core-services = { workspace = true } fuel-core-storage = { workspace = true } fuel-core-types = { workspace = true } +serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } tokio = { workspace = true, features = ["full"] } tokio-stream = { workspace = true } tracing = { workspace = true } [dev-dependencies] fuel-core-poa = { path = ".", features = ["test-helpers"] } +fuel-core-services = { workspace = true, features = ["test-helpers"] } fuel-core-storage = { path = "./../../../storage", features = ["test-helpers"] } fuel-core-types = { path = "./../../../types", features = ["test-helpers"] } mockall = { workspace = true } diff --git a/crates/services/consensus_module/poa/src/config.rs b/crates/services/consensus_module/poa/src/config.rs index 83f8596b23d..7da6cad5b80 100644 --- a/crates/services/consensus_module/poa/src/config.rs +++ b/crates/services/consensus_module/poa/src/config.rs @@ -1,5 +1,6 @@ use fuel_core_types::{ blockchain::primitives::SecretKeyWrapper, + fuel_types::ChainId, secrecy::Secret, }; use tokio::time::Duration; @@ -11,6 +12,7 @@ pub struct Config { pub metrics: bool, pub min_connected_reserved_peers: usize, pub time_until_synced: Duration, + pub chain_id: ChainId, } #[cfg(feature = "test-helpers")] @@ -22,6 +24,7 @@ impl Default for Config { metrics: false, min_connected_reserved_peers: 0, time_until_synced: Duration::ZERO, + chain_id: ChainId::default(), } } } diff --git a/crates/services/consensus_module/poa/src/ports.rs b/crates/services/consensus_module/poa/src/ports.rs index 41ed8ad4adb..e5fa26fa336 100644 --- a/crates/services/consensus_module/poa/src/ports.rs +++ b/crates/services/consensus_module/poa/src/ports.rs @@ -5,6 +5,7 @@ use fuel_core_storage::{ }; use fuel_core_types::{ blockchain::{ + block::Block, header::BlockHeader, primitives::DaBlockHeight, }, @@ -29,6 +30,7 @@ use fuel_core_types::{ }, tai64::Tai64, }; +use std::collections::HashMap; #[cfg_attr(test, mockall::automock)] pub trait TransactionPool: Send + Sync { @@ -59,6 +61,11 @@ pub trait BlockProducer: Send + Sync { block_time: Tai64, source: TransactionsSource, ) -> anyhow::Result>; + + async fn produce_predefined_block( + &self, + block: &Block, + ) -> anyhow::Result>; } #[cfg_attr(test, mockall::automock)] @@ -108,3 +115,29 @@ pub trait SyncPort: Send + Sync { /// await synchronization with the peers async fn sync_with_peers(&mut self) -> anyhow::Result<()>; } + +pub trait PredefinedBlocks: Send + Sync { + fn get_block(&self, height: &BlockHeight) -> Option; +} + +pub struct InMemoryPredefinedBlocks { + blocks: HashMap, +} + +impl From> for InMemoryPredefinedBlocks { + fn from(blocks: HashMap) -> Self { + Self::new(blocks) + } +} + +impl InMemoryPredefinedBlocks { + pub fn new(blocks: HashMap) -> Self { + Self { blocks } + } +} + +impl PredefinedBlocks for InMemoryPredefinedBlocks { + fn get_block(&self, height: &BlockHeight) -> Option { + self.blocks.get(height).cloned() + } +} diff --git a/crates/services/consensus_module/poa/src/service.rs b/crates/services/consensus_module/poa/src/service.rs index 8a3e01fcca0..43a9878b346 100644 --- a/crates/services/consensus_module/poa/src/service.rs +++ b/crates/services/consensus_module/poa/src/service.rs @@ -1,3 +1,21 @@ +use anyhow::{ + anyhow, + Context, +}; +use std::{ + ops::Deref, + sync::Arc, + time::Duration, +}; +use tokio::{ + sync::{ + mpsc, + oneshot, + }, + time::Instant, +}; +use tokio_stream::StreamExt; + use crate::{ deadline_clock::{ DeadlineClock, @@ -7,6 +25,7 @@ use crate::{ BlockImporter, BlockProducer, P2pPort, + PredefinedBlocks, TransactionPool, TransactionsSource, }, @@ -17,10 +36,6 @@ use crate::{ Config, Trigger, }; -use anyhow::{ - anyhow, - Context, -}; use fuel_core_services::{ stream::BoxStream, RunnableService, @@ -54,6 +69,7 @@ use fuel_core_types::{ services::{ block_importer::ImportResult, executor::{ + Error as ExecutorError, ExecutionResult, UncommittedResult as UncommittedExecutionResult, }, @@ -61,20 +77,9 @@ use fuel_core_types::{ }, tai64::Tai64, }; -use std::{ - ops::Deref, - time::Duration, -}; -use tokio::{ - sync::{ - mpsc, - oneshot, - }, - time::Instant, -}; -use tokio_stream::StreamExt; +use serde::Serialize; -pub type Service = ServiceRunner>; +pub type Service = ServiceRunner>; #[derive(Clone)] pub struct SharedState { request_sender: mpsc::Sender, @@ -128,7 +133,7 @@ pub(crate) enum RequestType { Trigger, } -pub struct MainTask { +pub struct MainTask { signing_key: Option>, block_producer: B, block_importer: I, @@ -139,16 +144,18 @@ pub struct MainTask { last_height: BlockHeight, last_timestamp: Tai64, last_block_created: Instant, + predefined_blocks: PB, trigger: Trigger, /// Deadline clock, used by the triggers timer: DeadlineClock, sync_task_handle: ServiceRunner, } -impl MainTask +impl MainTask where T: TransactionPool, I: BlockImporter, + PB: PredefinedBlocks, { pub fn new( last_block: &BlockHeader, @@ -157,6 +164,7 @@ where block_producer: B, block_importer: I, p2p_port: P, + predefined_blocks: PB, ) -> Self { let tx_status_update_stream = txpool.transaction_status_events(); let (request_sender, request_receiver) = mpsc::channel(1024); @@ -195,6 +203,7 @@ where last_height, last_timestamp, last_block_created, + predefined_blocks, trigger, timer: DeadlineClock::new(), sync_task_handle, @@ -203,10 +212,10 @@ where fn extract_block_info(last_block: &BlockHeader) -> (BlockHeight, Tai64, Instant) { let last_timestamp = last_block.time(); - let duration = + let duration_since_last_block = Duration::from_secs(Tai64::now().0.saturating_sub(last_timestamp.0)); let last_block_created = Instant::now() - .checked_sub(duration) + .checked_sub(duration_since_last_block) .unwrap_or(Instant::now()); let last_height = *last_block.height(); (last_height, last_timestamp, last_block_created) @@ -241,11 +250,12 @@ where } } -impl MainTask +impl MainTask where T: TransactionPool, B: BlockProducer, I: BlockImporter, + PB: PredefinedBlocks, { // Request the block producer to make a new block, and return it when ready async fn signal_produce_block( @@ -387,6 +397,65 @@ where Ok(()) } + async fn produce_predefined_block( + &mut self, + predefined_block: &Block, + ) -> anyhow::Result<()> { + tracing::info!("Producing predefined block"); + let last_block_created = Instant::now(); + // verify signing key is set + if self.signing_key.is_none() { + return Err(anyhow!("unable to produce blocks without a consensus key")) + } + + // Ask the block producer to create the block + let ( + ExecutionResult { + block, + skipped_transactions, + tx_status, + events, + }, + changes, + ) = self + .block_producer + .produce_predefined_block(predefined_block) + .await? + .into(); + + if !skipped_transactions.is_empty() { + let block_and_skipped = PredefinedBlockWithSkippedTransactions { + block: predefined_block.clone(), + skipped_transactions, + }; + let serialized = serde_json::to_string_pretty(&block_and_skipped)?; + tracing::error!( + "During block production got invalid transactions: BEGIN {} END", + serialized + ); + } + // Sign the block and seal it + let seal = seal_block(&self.signing_key, &block)?; + let sealed_block = SealedBlock { + entity: block, + consensus: seal, + }; + // Import the sealed block + self.block_importer + .commit_result(Uncommitted::new( + ImportResult::new_from_local(sealed_block.clone(), tx_status, events), + changes, + )) + .await?; + + // Update last block time + self.last_height = *sealed_block.entity.header().height(); + self.last_timestamp = sealed_block.entity.header().time(); + self.last_block_created = last_block_created; + + Ok(()) + } + pub(crate) async fn on_txpool_event(&mut self) -> anyhow::Result<()> { match self.trigger { Trigger::Instant => { @@ -413,17 +482,32 @@ where } } } + fn update_last_block_values(&mut self, block_header: &Arc) { + let (last_height, last_timestamp, last_block_created) = + Self::extract_block_info(block_header); + if last_height > self.last_height { + self.last_height = last_height; + self.last_timestamp = last_timestamp; + self.last_block_created = last_block_created; + } + } +} + +#[derive(Serialize)] +struct PredefinedBlockWithSkippedTransactions { + block: Block, + skipped_transactions: Vec<(TxId, ExecutorError)>, } #[async_trait::async_trait] -impl RunnableService for MainTask +impl RunnableService for MainTask where Self: RunnableTask, { const NAME: &'static str = "PoA"; type SharedData = SharedState; - type Task = MainTask; + type Task = MainTask; type TaskParams = (); fn shared_data(&self) -> Self::SharedData { @@ -451,24 +535,25 @@ where } #[async_trait::async_trait] -impl RunnableTask for MainTask +impl RunnableTask for MainTask where T: TransactionPool, B: BlockProducer, I: BlockImporter, + PB: PredefinedBlocks, { async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result { let should_continue; - let mut state = self.sync_task_handle.shared.clone(); + let mut sync_state = self.sync_task_handle.shared.clone(); // make sure we're synced first - while *state.borrow_and_update() == SyncState::NotSynced { + while *sync_state.borrow_and_update() == SyncState::NotSynced { tokio::select! { biased; result = watcher.while_started() => { should_continue = result?.started(); return Ok(should_continue); } - _ = state.changed() => { + _ = sync_state.changed() => { break; } _ = self.tx_status_update_stream.next() => { @@ -480,16 +565,17 @@ where } } - if let SyncState::Synced(block_header) = &*state.borrow_and_update() { - let (last_height, last_timestamp, last_block_created) = - Self::extract_block_info(block_header); - if last_height > self.last_height { - self.last_height = last_height; - self.last_timestamp = last_timestamp; - self.last_block_created = last_block_created; - } + if let SyncState::Synced(block_header) = &*sync_state.borrow_and_update() { + self.update_last_block_values(block_header); } + let next_height = self.next_height(); + let maybe_block = self.predefined_blocks.get_block(&next_height); + if let Some(block) = maybe_block { + self.produce_predefined_block(&block).await?; + should_continue = true; + return Ok(should_continue) + } tokio::select! { biased; _ = watcher.while_started() => { @@ -528,6 +614,7 @@ where should_continue = true; } } + Ok(should_continue) } @@ -538,18 +625,20 @@ where } } -pub fn new_service( +pub fn new_service( last_block: &BlockHeader, config: Config, txpool: T, block_producer: B, block_importer: I, p2p_port: P, -) -> Service + predefined_blocks: PB, +) -> Service where T: TransactionPool + 'static, B: BlockProducer + 'static, I: BlockImporter + 'static, + PB: PredefinedBlocks + 'static, P: P2pPort, { Service::new(MainTask::new( @@ -559,6 +648,7 @@ where block_producer, block_importer, p2p_port, + predefined_blocks, )) } diff --git a/crates/services/consensus_module/poa/src/service_test.rs b/crates/services/consensus_module/poa/src/service_test.rs index a9aacc3531f..4c0855d6a4e 100644 --- a/crates/services/consensus_module/poa/src/service_test.rs +++ b/crates/services/consensus_module/poa/src/service_test.rs @@ -1,26 +1,37 @@ #![allow(clippy::arithmetic_side_effects)] +#![allow(non_snake_case)] use crate::{ new_service, ports::{ + BlockProducer, + InMemoryPredefinedBlocks, MockBlockImporter, MockBlockProducer, MockP2pPort, MockTransactionPool, + TransactionsSource, }, service::MainTask, Config, Service, Trigger, }; +use async_trait::async_trait; use fuel_core_services::{ stream::pending, Service as StorageTrait, + ServiceRunner, State, }; +use fuel_core_storage::transactional::Changes; use fuel_core_types::{ blockchain::{ - header::BlockHeader, + block::Block, + header::{ + BlockHeader, + PartialBlockHeader, + }, primitives::SecretKeyWrapper, SealedBlock, }, @@ -47,7 +58,10 @@ use rand::{ SeedableRng, }; use std::{ - collections::HashSet, + collections::{ + HashMap, + HashSet, + }, sync::{ Arc, Mutex as StdMutex, @@ -148,6 +162,8 @@ impl TestContextBuilder { let p2p_port = generate_p2p_port(); + let predefined_blocks = HashMap::new().into(); + let service = new_service( &BlockHeader::new_block(BlockHeight::from(1u32), Tai64::now()), config, @@ -155,6 +171,7 @@ impl TestContextBuilder { producer, importer, p2p_port, + predefined_blocks, ); service.start().unwrap(); TestContext { service } @@ -162,7 +179,12 @@ impl TestContextBuilder { } struct TestContext { - service: Service, + service: Service< + MockTransactionPool, + MockBlockProducer, + MockBlockImporter, + InMemoryPredefinedBlocks, + >, } impl TestContext { @@ -332,6 +354,8 @@ async fn remove_skipped_transactions() { let p2p_port = generate_p2p_port(); + let predefined_blocks: InMemoryPredefinedBlocks = HashMap::new().into(); + let mut task = MainTask::new( &BlockHeader::new_block(BlockHeight::from(1u32), Tai64::now()), config, @@ -339,6 +363,7 @@ async fn remove_skipped_transactions() { block_producer, block_importer, p2p_port, + predefined_blocks, ); assert!(task.produce_next_block().await.is_ok()); @@ -379,6 +404,8 @@ async fn does_not_produce_when_txpool_empty_in_instant_mode() { let p2p_port = generate_p2p_port(); + let predefined_blocks: InMemoryPredefinedBlocks = HashMap::new().into(); + let mut task = MainTask::new( &BlockHeader::new_block(BlockHeight::from(1u32), Tai64::now()), config, @@ -386,6 +413,7 @@ async fn does_not_produce_when_txpool_empty_in_instant_mode() { block_producer, block_importer, p2p_port, + predefined_blocks, ); // simulate some txpool event to see if any block production is erroneously triggered @@ -397,3 +425,188 @@ fn test_signing_key() -> Secret { let secret_key = SecretKey::random(&mut rng); Secret::new(secret_key.into()) } + +#[derive(Debug, PartialEq)] +enum FakeProducedBlock { + Predefined(Block), + New(BlockHeight, Tai64), +} + +struct FakeBlockProducer { + block_sender: tokio::sync::mpsc::Sender, +} + +impl FakeBlockProducer { + fn new() -> (Self, tokio::sync::mpsc::Receiver) { + let (block_sender, receiver) = tokio::sync::mpsc::channel(100); + (Self { block_sender }, receiver) + } +} + +#[async_trait] +impl BlockProducer for FakeBlockProducer { + async fn produce_and_execute_block( + &self, + height: BlockHeight, + block_time: Tai64, + _source: TransactionsSource, + ) -> anyhow::Result> { + self.block_sender + .send(FakeProducedBlock::New(height, block_time)) + .await + .unwrap(); + Ok(UncommittedResult::new( + ExecutionResult { + block: Default::default(), + skipped_transactions: Default::default(), + tx_status: Default::default(), + events: Default::default(), + }, + Default::default(), + )) + } + + async fn produce_predefined_block( + &self, + block: &Block, + ) -> anyhow::Result> { + self.block_sender + .send(FakeProducedBlock::Predefined(block.clone())) + .await + .unwrap(); + Ok(UncommittedResult::new( + ExecutionResult { + block: block.clone(), + skipped_transactions: Default::default(), + tx_status: Default::default(), + events: Default::default(), + }, + Default::default(), + )) + } +} + +fn block_for_height(height: u32) -> Block { + let mut header = PartialBlockHeader::default(); + header.consensus.height = height.into(); + let transactions = vec![]; + Block::new(header, transactions, Default::default(), Default::default()).unwrap() +} + +#[tokio::test] +async fn consensus_service__run__will_include_sequential_predefined_blocks_before_new_blocks( +) { + // given + let blocks: [(BlockHeight, Block); 3] = [ + (2u32.into(), block_for_height(2)), + (3u32.into(), block_for_height(3)), + (4u32.into(), block_for_height(4)), + ]; + let blocks_map: HashMap<_, _> = blocks.clone().into_iter().collect(); + let (block_producer, mut block_receiver) = FakeBlockProducer::new(); + let last_block = BlockHeader::new_block(BlockHeight::from(1u32), Tai64::now()); + let config = Config { + trigger: Trigger::Interval { + block_time: Duration::from_millis(100), + }, + signing_key: Some(test_signing_key()), + metrics: false, + ..Default::default() + }; + let mut block_importer = MockBlockImporter::default(); + block_importer.expect_commit_result().returning(|_| Ok(())); + block_importer + .expect_block_stream() + .returning(|| Box::pin(tokio_stream::empty())); + let mut rng = StdRng::seed_from_u64(0); + let tx = make_tx(&mut rng); + let TxPoolContext { txpool, .. } = MockTransactionPool::new_with_txs(vec![tx]); + let task = MainTask::new( + &last_block, + config, + txpool, + block_producer, + block_importer, + generate_p2p_port(), + InMemoryPredefinedBlocks::new(blocks_map), + ); + + // when + let service = ServiceRunner::new(task); + service.start().unwrap(); + + // then + for (_, block) in blocks { + let expected = FakeProducedBlock::Predefined(block); + let actual = block_receiver.recv().await.unwrap(); + assert_eq!(expected, actual); + } + let maybe_produced_block = block_receiver.recv().await.unwrap(); + assert!(matches! { + maybe_produced_block, + FakeProducedBlock::New(_, _) + }); +} + +#[tokio::test] +async fn consensus_service__run__will_insert_predefined_blocks_in_correct_order() { + // given + let predefined_blocks: &[Option<(BlockHeight, Block)>] = &[ + None, + Some((3u32.into(), block_for_height(3))), + None, + Some((5u32.into(), block_for_height(5))), + None, + Some((7u32.into(), block_for_height(7))), + None, + ]; + let predefined_blocks_map: HashMap<_, _> = predefined_blocks + .iter() + .flat_map(|x| x.to_owned()) + .collect(); + let (block_producer, mut block_receiver) = FakeBlockProducer::new(); + let last_block = BlockHeader::new_block(BlockHeight::from(1u32), Tai64::now()); + let config = Config { + trigger: Trigger::Interval { + block_time: Duration::from_millis(100), + }, + signing_key: Some(test_signing_key()), + metrics: false, + ..Default::default() + }; + let mut block_importer = MockBlockImporter::default(); + block_importer.expect_commit_result().returning(|_| Ok(())); + block_importer + .expect_block_stream() + .returning(|| Box::pin(tokio_stream::empty())); + let mut rng = StdRng::seed_from_u64(0); + let tx = make_tx(&mut rng); + let TxPoolContext { txpool, .. } = MockTransactionPool::new_with_txs(vec![tx]); + let task = MainTask::new( + &last_block, + config, + txpool, + block_producer, + block_importer, + generate_p2p_port(), + InMemoryPredefinedBlocks::new(predefined_blocks_map), + ); + + // when + let service = ServiceRunner::new(task); + service.start().unwrap(); + + // then + for maybe_predefined in predefined_blocks { + let actual = block_receiver.recv().await.unwrap(); + if let Some((_, block)) = maybe_predefined { + let expected = FakeProducedBlock::Predefined(block.clone()); + assert_eq!(expected, actual); + } else { + assert!(matches! { + actual, + FakeProducedBlock::New(_, _) + }); + } + } +} diff --git a/crates/services/producer/Cargo.toml b/crates/services/producer/Cargo.toml index 51c7b3aee53..5a6e8b91fbc 100644 --- a/crates/services/producer/Cargo.toml +++ b/crates/services/producer/Cargo.toml @@ -24,6 +24,7 @@ tracing = { workspace = true } fuel-core-producer = { path = "", features = ["test-helpers"] } fuel-core-trace = { path = "../../trace" } fuel-core-types = { path = "../../types", features = ["test-helpers"] } +proptest = { workspace = true } rand = { workspace = true } [features] diff --git a/crates/services/producer/proptest-regressions/block_producer/tests.txt b/crates/services/producer/proptest-regressions/block_producer/tests.txt new file mode 100644 index 00000000000..bd2f192bf33 --- /dev/null +++ b/crates/services/producer/proptest-regressions/block_producer/tests.txt @@ -0,0 +1,7 @@ +# Seeds for failure cases proptest has generated in the past. It is +# automatically read and these particular cases re-run before any +# novel cases are generated. +# +# It is recommended to check this file in to source control so that +# everyone who runs the test benefits from these saved cases. +cc dd33a239ab8e44231c0bdfc62e5712d60afc5404211bdce6bd6404ea6ed143fc # shrinks to block = V1(BlockV1 { header: V1(BlockHeaderV1 { application: ApplicationHeader { da_height: DaBlockHeight(1), consensus_parameters_version: 0, state_transition_bytecode_version: 7, generated: GeneratedApplicationFields { transactions_count: 1, message_receipt_count: 0, transactions_root: 167ff38d512ce7cfc6a39f25bf541c65d35b05a50226ab5c43179efc9a3e92e0, message_outbox_root: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855, event_inbox_root: 0000000000000000000000000000000000000000000000000000000000000000 } }, consensus: ConsensusHeader { prev_root: 0000000000000000000000000000000000000000000000000000000000000000, height: 00000001, time: Tai64(4611686018427387914), generated: GeneratedConsensusFields { application_hash: 706ec8c8b182f5cc589bda36f4a218e0e4fc5e850bd21ea48194bc5d7b21e827 } }, metadata: Some(BlockHeaderMetadata { id: BlockId(f6e071429d32c8d90c22552c05d489e89d0c4a3d17d4a753efc16210b917882e) }) }), transactions: [Mint(Mint { tx_pointer: TxPointer { block_height: 00000000, tx_index: 0 }, input_contract: Contract { utxo_id: UtxoId { tx_id: 0000000000000000000000000000000000000000000000000000000000000000, output_index: 0 }, balance_root: 0000000000000000000000000000000000000000000000000000000000000000, state_root: 0000000000000000000000000000000000000000000000000000000000000000, tx_pointer: TxPointer { block_height: 00000000, tx_index: 0 }, contract_id: 0000000000000000000000000000000000000000000000000000000000000000 }, output_contract: Contract { input_index: 0, balance_root: 0000000000000000000000000000000000000000000000000000000000000000, state_root: 0000000000000000000000000000000000000000000000000000000000000000 }, mint_amount: 0, mint_asset_id: 0000000000000000000000000000000000000000000000000000000000000000, gas_price: 0, metadata: None })] }) diff --git a/crates/services/producer/src/block_producer.rs b/crates/services/producer/src/block_producer.rs index e86120473a2..4ccb7d57d7e 100644 --- a/crates/services/producer/src/block_producer.rs +++ b/crates/services/producer/src/block_producer.rs @@ -17,6 +17,7 @@ use fuel_core_storage::transactional::{ }; use fuel_core_types::{ blockchain::{ + block::Block, header::{ ApplicationHeader, ConsensusHeader, @@ -24,7 +25,13 @@ use fuel_core_types::{ }, primitives::DaBlockHeight, }, - fuel_tx::Transaction, + fuel_tx::{ + field::{ + InputContract, + MintGasPrice, + }, + Transaction, + }, fuel_types::{ BlockHeight, Bytes32, @@ -88,6 +95,64 @@ pub struct Producer + Producer +where + ViewProvider: AtomicView + 'static, + ViewProvider::LatestView: BlockProducerDatabase, + ConsensusProvider: ConsensusParametersProvider, +{ + pub async fn produce_and_execute_predefined( + &self, + predefined_block: &Block, + ) -> anyhow::Result> + where + Executor: ports::BlockProducer> + 'static, + { + let _production_guard = self.lock.lock().await; + + let mut transactions_source = predefined_block.transactions().to_vec(); + + let height = predefined_block.header().consensus().height; + + let block_time = predefined_block.header().consensus().time; + + let da_height = predefined_block.header().application().da_height; + + let header_to_produce = self + .new_header_with_da_height(height, block_time, da_height) + .await?; + + let maybe_mint_tx = transactions_source.pop(); + let mint_tx = + maybe_mint_tx + .and_then(|tx| tx.as_mint().cloned()) + .ok_or(anyhow!( + "The last transaction in the block should be a mint transaction" + ))?; + + let gas_price = *mint_tx.gas_price(); + let coinbase_recipient = mint_tx.input_contract().contract_id; + + let component = Components { + header_to_produce, + transactions_source, + coinbase_recipient, + gas_price, + }; + + let result = self + .executor + .produce_without_commit(component) + .map_err(Into::::into) + .with_context(|| { + format!("Failed to produce block {height:?} due to execution failure") + })?; + + debug!("Produced block with result: {:?}", result.result()); + Ok(result) + } +} impl Producer where @@ -296,7 +361,17 @@ where Ok(block_header) } - + /// Create the header for a new block at the provided height + async fn new_header_with_da_height( + &self, + height: BlockHeight, + block_time: Tai64, + da_height: DaBlockHeight, + ) -> anyhow::Result { + let mut block_header = self._new_header(height, block_time)?; + block_header.application.da_height = da_height; + Ok(block_header) + } async fn select_new_da_height( &self, gas_limit: u64, diff --git a/crates/services/producer/src/block_producer/tests.rs b/crates/services/producer/src/block_producer/tests.rs index fb2c7859f24..14cfdc63f2a 100644 --- a/crates/services/producer/src/block_producer/tests.rs +++ b/crates/services/producer/src/block_producer/tests.rs @@ -6,6 +6,7 @@ use crate::{ GasPriceProvider, MockConsensusParametersProvider, }, + Bytes32, Error, }, mocks::{ @@ -23,6 +24,7 @@ use fuel_core_producer as _; use fuel_core_types::{ blockchain::{ block::{ + Block, CompressedBlock, PartialFuelBlock, }, @@ -33,7 +35,14 @@ use fuel_core_types::{ }, primitives::DaBlockHeight, }, - fuel_tx::ConsensusParameters, + fuel_tx, + fuel_tx::{ + field::InputContract, + ConsensusParameters, + Mint, + Script, + Transaction, + }, fuel_types::BlockHeight, services::executor::Error as ExecutorError, tai64::Tai64, @@ -534,6 +543,157 @@ mod produce_and_execute_block_txpool { } } +use fuel_core_types::fuel_tx::field::MintGasPrice; +use proptest::{ + prop_compose, + proptest, +}; + +prop_compose! { + fn arb_block()(height in 1..255u8, da_height in 1..255u64, gas_price: u64, coinbase_recipient: [u8; 32], num_txs in 0..100u32) -> Block { + let mut txs : Vec<_> = (0..num_txs).map(|_| Transaction::Script(Script::default())).collect(); + let mut inner_mint = Mint::default(); + *inner_mint.gas_price_mut() = gas_price; + *inner_mint.input_contract_mut() = fuel_tx::input::contract::Contract{ + contract_id: coinbase_recipient.into(), + ..Default::default() + }; + + let mint = Transaction::Mint(inner_mint); + txs.push(mint); + let header = PartialBlockHeader { + consensus: ConsensusHeader { + height: (height as u32).into(), + ..Default::default() + }, + application: ApplicationHeader { + da_height: DaBlockHeight(da_height), + ..Default::default() + }, + }; + let outbox_message_ids = vec![]; + let event_inbox_root = Bytes32::default(); + Block::new(header, txs, &outbox_message_ids, event_inbox_root).unwrap() + } +} + +#[allow(clippy::arithmetic_side_effects)] +fn ctx_for_block( + block: &Block, + executor: MockExecutorWithCapture, +) -> TestContext> { + let prev_height = block.header().height().pred().unwrap(); + let prev_da_height = block.header().da_height.as_u64() - 1; + TestContextBuilder::new() + .with_prev_height(prev_height) + .with_prev_da_height(prev_da_height.into()) + .build_with_executor(executor) +} + +// gas_price +proptest! { + #[test] + fn produce_and_execute_predefined_block__contains_expected_gas_price(block in arb_block()) { + let rt = multithreaded_runtime(); + + // given + let executor = MockExecutorWithCapture::default(); + let ctx = ctx_for_block(&block, executor.clone()); + + //when + let _ = rt.block_on(ctx.producer().produce_and_execute_predefined(&block)).unwrap(); + + // then + let expected_gas_price = *block + .transactions().last().and_then(|tx| tx.as_mint()).unwrap().gas_price(); + let captured = executor.captured.lock().unwrap(); + let actual = captured.as_ref().unwrap().gas_price; + assert_eq!(expected_gas_price, actual); + } + + // time + #[test] + fn produce_and_execute_predefined_block__contains_expected_time(block in arb_block()) { + let rt = multithreaded_runtime(); + + // given + let executor = MockExecutorWithCapture::default(); + let ctx = ctx_for_block(&block, executor.clone()); + + //when + let _ = rt.block_on(ctx.producer().produce_and_execute_predefined(&block)).unwrap(); + + // then + let expected_time = block.header().consensus().time; + let captured = executor.captured.lock().unwrap(); + let actual = captured.as_ref().unwrap().header_to_produce.consensus.time; + assert_eq!(expected_time, actual); + } + + // coinbase + #[test] + fn produce_and_execute_predefined_block__contains_expected_coinbase_recipient(block in arb_block()) { + let rt = multithreaded_runtime(); + + // given + let executor = MockExecutorWithCapture::default(); + let ctx = ctx_for_block(&block, executor.clone()); + + //when + let _ = rt.block_on(ctx.producer().produce_and_execute_predefined(&block)).unwrap(); + + // then + let expected_coinbase = block.transactions().last().and_then(|tx| tx.as_mint()).unwrap().input_contract().contract_id; + let captured = executor.captured.lock().unwrap(); + let actual = captured.as_ref().unwrap().coinbase_recipient; + assert_eq!(expected_coinbase, actual); + } + + // DA height + #[test] + fn produce_and_execute_predefined_block__contains_expected_da_height(block in arb_block()) { + let rt = multithreaded_runtime(); + + // given + let executor = MockExecutorWithCapture::default(); + let ctx = ctx_for_block(&block, executor.clone()); + + //when + let _ = rt.block_on(ctx.producer().produce_and_execute_predefined(&block)).unwrap(); + + // then + let expected_da_height = block.header().application().da_height; + let captured = executor.captured.lock().unwrap(); + let actual = captured.as_ref().unwrap().header_to_produce.application.da_height; + assert_eq!(expected_da_height, actual); + } + + #[test] + fn produce_and_execute_predefined_block__do_not_include_original_mint_in_txs_source(block in arb_block()) { + let rt = multithreaded_runtime(); + + // given + let executor = MockExecutorWithCapture::default(); + let ctx = ctx_for_block(&block, executor.clone()); + + //when + let _ = rt.block_on(ctx.producer().produce_and_execute_predefined(&block)).unwrap(); + + // then + let captured = executor.captured.lock().unwrap(); + let txs_source = &captured.as_ref().unwrap().transactions_source; + let has_a_mint = txs_source.iter().any(|tx| matches!(tx, Transaction::Mint(_))); + assert!(!has_a_mint); + } +} + +fn multithreaded_runtime() -> tokio::runtime::Runtime { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() +} + struct TestContext { config: Config, db: MockDb, @@ -727,4 +887,46 @@ impl TestContextBuilder { ..TestContext::default_from_db(db) } } + + fn build_with_executor(&self, executor: Ex) -> TestContext { + let da_height = self.prev_da_height; + let previous_block = PartialFuelBlock { + header: PartialBlockHeader { + application: ApplicationHeader { + da_height, + ..Default::default() + }, + consensus: ConsensusHeader { + height: self.prev_height, + ..Default::default() + }, + }, + transactions: vec![], + } + .generate(&[], Default::default()) + .unwrap() + .compress(&Default::default()); + + let db = MockDb { + blocks: Arc::new(Mutex::new( + vec![(self.prev_height, previous_block)] + .into_iter() + .collect(), + )), + consensus_parameters_version: 0, + state_transition_bytecode_version: 0, + }; + + let mock_relayer = MockRelayer { + latest_block_height: self.latest_block_height, + latest_da_blocks_with_costs: self.blocks_with_gas_costs.clone(), + ..Default::default() + }; + + TestContext { + relayer: mock_relayer, + block_gas_limit: self.block_gas_limit.unwrap_or_default(), + ..TestContext::default_from_db_and_executor(db, executor) + } + } } diff --git a/crates/services/producer/src/mocks.rs b/crates/services/producer/src/mocks.rs index 95fe741c34c..191bfdb20ed 100644 --- a/crates/services/producer/src/mocks.rs +++ b/crates/services/producer/src/mocks.rs @@ -24,6 +24,7 @@ use fuel_core_types::{ }, primitives::DaBlockHeight, }, + fuel_tx::Transaction, fuel_types::{ Address, BlockHeight, @@ -50,7 +51,6 @@ use std::{ Mutex, }, }; - // TODO: Replace mocks with `mockall`. #[derive(Default, Clone)] @@ -107,7 +107,7 @@ impl AsRef for MockDb { } } -fn to_block(component: &Components>) -> Block { +fn arc_pool_tx_comp_to_block(component: &Components>) -> Block { let transactions = component .transactions_source .clone() @@ -123,12 +123,23 @@ fn to_block(component: &Components>) -> Block { .unwrap() } +fn tx_comp_to_block(component: &Components>) -> Block { + let transactions = component.transactions_source.clone(); + Block::new( + component.header_to_produce, + transactions, + &[], + Default::default(), + ) + .unwrap() +} + impl BlockProducer> for MockExecutor { fn produce_without_commit( &self, component: Components>, ) -> ExecutorResult> { - let block = to_block(&component); + let block = arc_pool_tx_comp_to_block(&component); // simulate executor inserting a block let mut block_db = self.0.blocks.lock().unwrap(); block_db.insert( @@ -159,7 +170,7 @@ impl BlockProducer> for FailingMockExecutor { if let Some(err) = err.take() { Err(err) } else { - let block = to_block(&component); + let block = arc_pool_tx_comp_to_block(&component); Ok(UncommittedResult::new( ExecutionResult { block, @@ -174,16 +185,35 @@ impl BlockProducer> for FailingMockExecutor { } #[derive(Clone)] -pub struct MockExecutorWithCapture { - pub captured: Arc>>>>, +pub struct MockExecutorWithCapture { + pub captured: Arc>>>>, } -impl BlockProducer> for MockExecutorWithCapture { +impl BlockProducer> for MockExecutorWithCapture { fn produce_without_commit( &self, component: Components>, ) -> ExecutorResult> { - let block = to_block(&component); + let block = arc_pool_tx_comp_to_block(&component); + *self.captured.lock().unwrap() = Some(component); + Ok(UncommittedResult::new( + ExecutionResult { + block, + skipped_transactions: vec![], + tx_status: vec![], + events: vec![], + }, + Default::default(), + )) + } +} + +impl BlockProducer> for MockExecutorWithCapture { + fn produce_without_commit( + &self, + component: Components>, + ) -> ExecutorResult> { + let block = tx_comp_to_block(&component); *self.captured.lock().unwrap() = Some(component); Ok(UncommittedResult::new( ExecutionResult { @@ -197,7 +227,7 @@ impl BlockProducer> for MockExecutorWithCapture { } } -impl Default for MockExecutorWithCapture { +impl Default for MockExecutorWithCapture { fn default() -> Self { Self { captured: Arc::new(Mutex::new(None)), diff --git a/tests/tests/snapshot.rs b/tests/tests/snapshot.rs index 56b6aaa74a3..33a2848dc4a 100644 --- a/tests/tests/snapshot.rs +++ b/tests/tests/snapshot.rs @@ -10,7 +10,10 @@ use fuel_core::{ FuelService, }, }; -use fuel_core_poa::ports::Database; +use fuel_core_poa::{ + ports::Database, + Trigger, +}; use fuel_core_storage::transactional::AtomicView; use fuel_core_types::blockchain::primitives::DaBlockHeight; use rand::{ @@ -36,7 +39,10 @@ async fn loads_snapshot() { }), ..StateConfig::randomize(&mut rng) }; - let config = Config::local_node_with_state_config(starting_state.clone()); + // Disable block production + let mut config = Config::local_node_with_state_config(starting_state.clone()); + config.debug = false; + config.block_production = Trigger::Never; // setup server & client let _ = FuelService::from_combined_database(db.clone(), config)