diff --git a/CHANGELOG.md b/CHANGELOG.md index 201435abcaa..11971d144e6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ Description of the upcoming release here. ### Changed - [#1591](https://github.com/FuelLabs/fuel-core/pull/1591): Simplify libp2p dependencies and not depend on all sub modules directly. +- [#1590](https://github.com/FuelLabs/fuel-core/pull/1590): Use `AtomicView` in the `TxPool` to read the state of the database during insertion of the transactions. - [#1587](https://github.com/FuelLabs/fuel-core/pull/1587): Use `BlockHeight` as a primary key for the `FuelsBlock` table. - [#1585](https://github.com/FuelLabs/fuel-core/pull/1585): Let `NetworkBehaviour` macro generate `FuelBehaviorEvent` in p2p - [#1579](https://github.com/FuelLabs/fuel-core/pull/1579): The change extracts the off-chain-related logic from the executor and moves it to the GraphQL off-chain worker. It creates two new concepts - Off-chain and On-chain databases where the GraphQL worker has exclusive ownership of the database and may modify it without intersecting with the On-chain database. diff --git a/crates/fuel-core/src/graphql_api/api_service.rs b/crates/fuel-core/src/graphql_api/api_service.rs index 15023a5995f..6f4e26c2fbb 100644 --- a/crates/fuel-core/src/graphql_api/api_service.rs +++ b/crates/fuel-core/src/graphql_api/api_service.rs @@ -1,13 +1,11 @@ use crate::{ fuel_core_graphql_api::{ - database::{ - OffChainView, - OnChainView, - }, metrics_extension::MetricsExtension, ports::{ BlockProducerPort, ConsensusModulePort, + OffChainDatabase, + OnChainDatabase, P2pPort, TxPoolPort, }, @@ -178,8 +176,10 @@ pub fn new_service( request_timeout: Duration, ) -> anyhow::Result where - OnChain: AtomicView + 'static, - OffChain: AtomicView + 'static, + OnChain: AtomicView + 'static, + OffChain: AtomicView + 'static, + OnChain::View: OnChainDatabase, + OffChain::View: OffChainDatabase, { let network_addr = config.addr; let combined_read_database = ReadDatabase::new(on_database, off_database); diff --git a/crates/fuel-core/src/graphql_api/database.rs b/crates/fuel-core/src/graphql_api/database.rs index eb0a3c00f93..3b59cfb7723 100644 --- a/crates/fuel-core/src/graphql_api/database.rs +++ b/crates/fuel-core/src/graphql_api/database.rs @@ -1,11 +1,16 @@ -use crate::fuel_core_graphql_api::ports::{ - DatabaseBlocks, - DatabaseChain, - DatabaseContracts, - DatabaseMessageProof, - DatabaseMessages, - OffChainDatabase, - OnChainDatabase, +mod arc_wrapper; + +use crate::fuel_core_graphql_api::{ + database::arc_wrapper::ArcWrapper, + ports::{ + DatabaseBlocks, + DatabaseChain, + DatabaseContracts, + DatabaseMessageProof, + DatabaseMessages, + OffChainDatabase, + OnChainDatabase, + }, }; use fuel_core_storage::{ iter::{ @@ -64,21 +69,23 @@ pub type OffChainView = Arc; /// It is used only by `ViewExtension` to create a [`ReadView`]. pub struct ReadDatabase { /// The on-chain database view provider. - on_chain: Box>, + on_chain: Box>, /// The off-chain database view provider. - off_chain: Box>, + off_chain: Box>, } impl ReadDatabase { /// Creates a new [`ReadDatabase`] with the given on-chain and off-chain database view providers. pub fn new(on_chain: OnChain, off_chain: OffChain) -> Self where - OnChain: AtomicView + 'static, - OffChain: AtomicView + 'static, + OnChain: AtomicView + 'static, + OffChain: AtomicView + 'static, + OnChain::View: OnChainDatabase, + OffChain::View: OffChainDatabase, { Self { - on_chain: Box::new(on_chain), - off_chain: Box::new(off_chain), + on_chain: Box::new(ArcWrapper::new(on_chain)), + off_chain: Box::new(ArcWrapper::new(off_chain)), } } diff --git a/crates/fuel-core/src/graphql_api/database/arc_wrapper.rs b/crates/fuel-core/src/graphql_api/database/arc_wrapper.rs new file mode 100644 index 00000000000..470e7e9b81a --- /dev/null +++ b/crates/fuel-core/src/graphql_api/database/arc_wrapper.rs @@ -0,0 +1,66 @@ +use crate::fuel_core_graphql_api::{ + database::{ + OffChainView, + OnChainView, + }, + ports::{ + OffChainDatabase, + OnChainDatabase, + }, +}; +use fuel_core_storage::{ + transactional::AtomicView, + Result as StorageResult, +}; +use fuel_core_types::fuel_types::BlockHeight; +use std::sync::Arc; + +/// The GraphQL can't work with the generics in [`async_graphql::Context::data_unchecked`] and requires a known type. +/// It is an `Arc` wrapper around the generic for on-chain and off-chain databases. +pub struct ArcWrapper { + inner: Provider, + _marker: core::marker::PhantomData, +} + +impl ArcWrapper { + pub fn new(inner: Provider) -> Self { + Self { + inner, + _marker: core::marker::PhantomData, + } + } +} + +impl AtomicView for ArcWrapper +where + Provider: AtomicView, + View: OnChainDatabase + 'static, +{ + type View = OnChainView; + + fn view_at(&self, height: BlockHeight) -> StorageResult { + let view = self.inner.view_at(height)?; + Ok(Arc::new(view)) + } + + fn latest_view(&self) -> Self::View { + Arc::new(self.inner.latest_view()) + } +} + +impl AtomicView for ArcWrapper +where + Provider: AtomicView, + View: OffChainDatabase + 'static, +{ + type View = OffChainView; + + fn view_at(&self, height: BlockHeight) -> StorageResult { + let view = self.inner.view_at(height)?; + Ok(Arc::new(view)) + } + + fn latest_view(&self) -> Self::View { + Arc::new(self.inner.latest_view()) + } +} diff --git a/crates/fuel-core/src/service/adapters/graphql_api.rs b/crates/fuel-core/src/service/adapters/graphql_api.rs index e83efc44e08..b6f303a9b89 100644 --- a/crates/fuel-core/src/service/adapters/graphql_api.rs +++ b/crates/fuel-core/src/service/adapters/graphql_api.rs @@ -18,7 +18,10 @@ use crate::{ }; use async_trait::async_trait; use fuel_core_services::stream::BoxStream; -use fuel_core_storage::Result as StorageResult; +use fuel_core_storage::{ + transactional::AtomicView, + Result as StorageResult, +}; use fuel_core_txpool::{ service::TxStatusMessage, types::TxId, @@ -145,3 +148,18 @@ impl worker::BlockImporter for BlockImporterAdapter { ) } } + +impl AtomicView for Database { + type View = Database; + + fn view_at(&self, _: BlockHeight) -> StorageResult { + unimplemented!( + "Unimplemented until of the https://github.com/FuelLabs/fuel-core/issues/451" + ) + } + + fn latest_view(&self) -> Self::View { + // TODO: https://github.com/FuelLabs/fuel-core/issues/1581 + self.clone() + } +} diff --git a/crates/fuel-core/src/service/adapters/graphql_api/off_chain.rs b/crates/fuel-core/src/service/adapters/graphql_api/off_chain.rs index 86fc7002a02..a892b84c2bf 100644 --- a/crates/fuel-core/src/service/adapters/graphql_api/off_chain.rs +++ b/crates/fuel-core/src/service/adapters/graphql_api/off_chain.rs @@ -3,12 +3,9 @@ use crate::{ transactions::OwnedTransactionIndexCursor, Database, }, - fuel_core_graphql_api::{ - database::OffChainView, - ports::{ - worker, - OffChainDatabase, - }, + fuel_core_graphql_api::ports::{ + worker, + OffChainDatabase, }, }; use fuel_core_storage::{ @@ -18,7 +15,6 @@ use fuel_core_storage::{ IterDirection, }, not_found, - transactional::AtomicView, Error as StorageError, Result as StorageResult, }; @@ -36,7 +32,6 @@ use fuel_core_types::{ }, services::txpool::TransactionStatus, }; -use std::sync::Arc; impl OffChainDatabase for Database { fn owned_message_ids( @@ -83,19 +78,6 @@ impl OffChainDatabase for Database { } } -impl AtomicView for Database { - fn view_at(&self, _: BlockHeight) -> StorageResult { - unimplemented!( - "Unimplemented until of the https://github.com/FuelLabs/fuel-core/issues/451" - ) - } - - fn latest_view(&self) -> OffChainView { - // TODO: https://github.com/FuelLabs/fuel-core/issues/1581 - Arc::new(self.clone()) - } -} - impl worker::OffChainDatabase for Database { fn record_tx_id_owner( &mut self, diff --git a/crates/fuel-core/src/service/adapters/graphql_api/on_chain.rs b/crates/fuel-core/src/service/adapters/graphql_api/on_chain.rs index d09f045cfb0..09ec40a9897 100644 --- a/crates/fuel-core/src/service/adapters/graphql_api/on_chain.rs +++ b/crates/fuel-core/src/service/adapters/graphql_api/on_chain.rs @@ -1,14 +1,11 @@ use crate::{ database::Database, - fuel_core_graphql_api::{ - database::OnChainView, - ports::{ - DatabaseBlocks, - DatabaseChain, - DatabaseContracts, - DatabaseMessages, - OnChainDatabase, - }, + fuel_core_graphql_api::ports::{ + DatabaseBlocks, + DatabaseChain, + DatabaseContracts, + DatabaseMessages, + OnChainDatabase, }, }; use fuel_core_importer::ports::ImporterDatabase; @@ -20,7 +17,6 @@ use fuel_core_storage::{ }, not_found, tables::FuelBlocks, - transactional::AtomicView, Error as StorageError, Result as StorageResult, }; @@ -41,7 +37,6 @@ use fuel_core_types::{ }, services::graphql_api::ContractBalance, }; -use std::sync::Arc; impl DatabaseBlocks for Database { fn block_height(&self, id: &BlockId) -> StorageResult { @@ -130,16 +125,3 @@ impl DatabaseChain for Database { } impl OnChainDatabase for Database {} - -impl AtomicView for Database { - fn view_at(&self, _: BlockHeight) -> StorageResult { - unimplemented!( - "Unimplemented until of the https://github.com/FuelLabs/fuel-core/issues/451" - ) - } - - fn latest_view(&self) -> OnChainView { - // TODO: https://github.com/FuelLabs/fuel-core/issues/1581 - Arc::new(self.clone()) - } -} diff --git a/crates/fuel-core/src/service/adapters/txpool.rs b/crates/fuel-core/src/service/adapters/txpool.rs index ccd33474df6..d06fc1face0 100644 --- a/crates/fuel-core/src/service/adapters/txpool.rs +++ b/crates/fuel-core/src/service/adapters/txpool.rs @@ -27,7 +27,6 @@ use fuel_core_types::{ UtxoId, }, fuel_types::{ - BlockHeight, ContractId, Nonce, }, @@ -139,8 +138,4 @@ impl fuel_core_txpool::ports::TxPoolDb for Database { fn is_message_spent(&self, id: &Nonce) -> StorageResult { self.storage::().contains_key(id) } - - fn current_block_height(&self) -> StorageResult { - self.latest_height() - } } diff --git a/crates/fuel-core/src/service/sub_services.rs b/crates/fuel-core/src/service/sub_services.rs index ba8dc05e93a..84e941e15f2 100644 --- a/crates/fuel-core/src/service/sub_services.rs +++ b/crates/fuel-core/src/service/sub_services.rs @@ -51,6 +51,7 @@ pub fn init_sub_services( let last_block = database.get_current_block()?.ok_or(anyhow::anyhow!( "The blockchain is not initialized with any block" ))?; + let last_height = *last_block.header().height(); #[cfg(feature = "relayer")] let relayer_service = if let Some(config) = &config.relayer { Some(fuel_core_relayer::new_service( @@ -140,6 +141,7 @@ pub fn init_sub_services( database.clone(), importer_adapter.clone(), p2p_adapter.clone(), + last_height, ); let tx_pool_adapter = TxPoolAdapter::new(txpool.shared.clone()); diff --git a/crates/services/txpool/Cargo.toml b/crates/services/txpool/Cargo.toml index 9c07108646c..fa0cee10746 100644 --- a/crates/services/txpool/Cargo.toml +++ b/crates/services/txpool/Cargo.toml @@ -28,6 +28,7 @@ tracing = { workspace = true } [dev-dependencies] fuel-core-trace = { path = "./../../trace" } fuel-core-txpool = { path = "", features = ["test-helpers"] } +fuel-core-types = { path = "../../types", features = ["test-helpers"] } itertools = { workspace = true } mockall = { workspace = true } proptest = { workspace = true } diff --git a/crates/services/txpool/src/mock_db.rs b/crates/services/txpool/src/mock_db.rs index 5435585a3f1..b12c1c1fd9a 100644 --- a/crates/services/txpool/src/mock_db.rs +++ b/crates/services/txpool/src/mock_db.rs @@ -1,5 +1,8 @@ use crate::ports::TxPoolDb; -use fuel_core_storage::Result as StorageResult; +use fuel_core_storage::{ + transactional::AtomicView, + Result as StorageResult, +}; use fuel_core_types::{ entities::{ coins::coin::{ @@ -91,8 +94,18 @@ impl TxPoolDb for MockDb { fn is_message_spent(&self, id: &Nonce) -> StorageResult { Ok(self.data.lock().unwrap().spent_messages.contains(id)) } +} + +pub struct MockDBProvider(pub MockDb); + +impl AtomicView for MockDBProvider { + type View = MockDb; + + fn view_at(&self, _: BlockHeight) -> StorageResult { + Ok(self.latest_view()) + } - fn current_block_height(&self) -> StorageResult { - Ok(Default::default()) + fn latest_view(&self) -> Self::View { + self.0.clone() } } diff --git a/crates/services/txpool/src/ports.rs b/crates/services/txpool/src/ports.rs index 375d7066982..7a32746c7ef 100644 --- a/crates/services/txpool/src/ports.rs +++ b/crates/services/txpool/src/ports.rs @@ -10,7 +10,6 @@ use fuel_core_types::{ UtxoId, }, fuel_types::{ - BlockHeight, ContractId, Nonce, }, @@ -55,6 +54,4 @@ pub trait TxPoolDb: Send + Sync { fn message(&self, message_id: &Nonce) -> StorageResult>; fn is_message_spent(&self, message_id: &Nonce) -> StorageResult; - - fn current_block_height(&self) -> StorageResult; } diff --git a/crates/services/txpool/src/service.rs b/crates/services/txpool/src/service.rs index 38ac9b75929..50e61fab098 100644 --- a/crates/services/txpool/src/service.rs +++ b/crates/services/txpool/src/service.rs @@ -51,6 +51,7 @@ use fuel_core_types::{ }; use anyhow::anyhow; +use fuel_core_storage::transactional::AtomicView; use fuel_core_types::services::block_importer::SharedImportResult; use parking_lot::Mutex as ParkingMutex; use std::{ @@ -119,45 +120,46 @@ impl TxStatusChange { } } -pub struct SharedState { +pub struct SharedState { tx_status_sender: TxStatusChange, - txpool: Arc>>, + txpool: Arc>>, p2p: Arc, consensus_params: ConsensusParameters, - db: DB, + current_height: Arc>, config: Config, } -impl Clone for SharedState { +impl Clone for SharedState { fn clone(&self) -> Self { Self { tx_status_sender: self.tx_status_sender.clone(), txpool: self.txpool.clone(), p2p: self.p2p.clone(), consensus_params: self.consensus_params.clone(), - db: self.db.clone(), + current_height: self.current_height.clone(), config: self.config.clone(), } } } -pub struct Task { +pub struct Task { gossiped_tx_stream: BoxStream, committed_block_stream: BoxStream, - shared: SharedState, + shared: SharedState, ttl_timer: tokio::time::Interval, } #[async_trait::async_trait] -impl RunnableService for Task +impl RunnableService for Task where - P2P: PeerToPeer + Send + Sync, - DB: TxPoolDb + Clone, + P2P: PeerToPeer, + ViewProvider: AtomicView, + View: TxPoolDb, { const NAME: &'static str = "TxPool"; - type SharedData = SharedState; - type Task = Task; + type SharedData = SharedState; + type Task = Task; type TaskParams = (); fn shared_data(&self) -> Self::SharedData { @@ -175,10 +177,11 @@ where } #[async_trait::async_trait] -impl RunnableTask for Task +impl RunnableTask for Task where - P2P: PeerToPeer + Send + Sync, - DB: TxPoolDb, + P2P: PeerToPeer, + ViewProvider: AtomicView, + View: TxPoolDb, { async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result { let should_continue; @@ -201,14 +204,22 @@ where result = self.committed_block_stream.next() => { if let Some(result) = result { + let new_height = *result + .sealed_block + .entity.header().height(); + let block = &result .sealed_block .entity; - self.shared.txpool.lock().block_update( - &self.shared.tx_status_sender, - block, - &result.tx_status, - ); + { + let mut lock = self.shared.txpool.lock(); + lock.block_update( + &self.shared.tx_status_sender, + block, + &result.tx_status, + ); + *self.shared.current_height.lock() = new_height; + } should_continue = true; } else { should_continue = false; @@ -218,7 +229,7 @@ where new_transaction = self.gossiped_tx_stream.next() => { if let Some(GossipData { data: Some(tx), message_id, peer_id }) = new_transaction { let id = tx.id(&self.shared.consensus_params.chain_id); - let current_height = self.shared.db.current_block_height()?; + let current_height = *self.shared.current_height.lock(); // verify tx let checked_tx = check_single_tx(tx, current_height, &self.shared.config).await; @@ -282,10 +293,7 @@ where // Instead, `fuel-core` can create a `DatabaseWithTxPool` that aggregates `TxPool` and // storage `Database` together. GraphQL will retrieve data from this `DatabaseWithTxPool` via // `StorageInspect` trait. -impl SharedState -where - DB: TxPoolDb, -{ +impl SharedState { pub fn pending_number(&self) -> usize { self.txpool.lock().pending_number() } @@ -337,10 +345,11 @@ where } } -impl SharedState +impl SharedState where P2P: PeerToPeer, - DB: TxPoolDb, + ViewProvider: AtomicView, + View: TxPoolDb, { #[tracing::instrument(name = "insert_submitted_txn", skip_all)] pub async fn insert( @@ -348,11 +357,7 @@ where txs: Vec>, ) -> Vec> { // verify txs - let block_height = self.db.current_block_height(); - let current_height = match block_height { - Ok(val) => val, - Err(e) => return vec![Err(e.into())], - }; + let current_height = *self.current_height.lock(); let checked_txs = check_transactions(&txs, current_height, &self.config).await; @@ -430,16 +435,18 @@ pub enum TxStatusMessage { FailedStatus, } -pub fn new_service( +pub fn new_service( config: Config, - db: DB, + provider: ViewProvider, importer: Importer, p2p: P2P, -) -> Service + current_height: BlockHeight, +) -> Service where Importer: BlockImporter, P2P: PeerToPeer + 'static, - DB: TxPoolDb + Clone + 'static, + ViewProvider: AtomicView, + ViewProvider::View: TxPoolDb, { let p2p = Arc::new(p2p); let gossiped_tx_stream = p2p.gossiped_transaction_events(); @@ -448,7 +455,7 @@ where ttl_timer.set_missed_tick_behavior(MissedTickBehavior::Skip); let consensus_params = config.chain_config.consensus_parameters.clone(); let number_of_active_subscription = config.number_of_active_subscription; - let txpool = Arc::new(ParkingMutex::new(TxPool::new(config.clone(), db.clone()))); + let txpool = Arc::new(ParkingMutex::new(TxPool::new(config.clone(), provider))); let task = Task { gossiped_tx_stream, committed_block_stream, @@ -464,7 +471,7 @@ where txpool, p2p, consensus_params, - db, + current_height: Arc::new(ParkingMutex::new(current_height)), config, }, ttl_timer, diff --git a/crates/services/txpool/src/service/test_helpers.rs b/crates/services/txpool/src/service/test_helpers.rs index 3cf532bfa8b..3aea0044ff2 100644 --- a/crates/services/txpool/src/service/test_helpers.rs +++ b/crates/services/txpool/src/service/test_helpers.rs @@ -1,5 +1,6 @@ use super::*; use crate::{ + mock_db::MockDBProvider, ports::BlockImporter, MockDb, }; @@ -31,7 +32,7 @@ use std::cell::RefCell; type GossipedTransaction = GossipData; pub struct TestContext { - pub(crate) service: Service, + pub(crate) service: Service, mock_db: MockDb, rng: RefCell, } @@ -41,7 +42,7 @@ impl TestContext { TestContextBuilder::new().build_and_start().await } - pub fn service(&self) -> &Service { + pub fn service(&self) -> &Service { &self.service } @@ -193,7 +194,13 @@ impl TestContextBuilder { .importer .unwrap_or_else(|| MockImporter::with_blocks(vec![])); - let service = new_service(config, mock_db.clone(), importer, p2p); + let service = new_service( + config, + MockDBProvider(mock_db.clone()), + importer, + p2p, + Default::default(), + ); TestContext { service, diff --git a/crates/services/txpool/src/service/update_sender/tests/test_e2e.rs b/crates/services/txpool/src/service/update_sender/tests/test_e2e.rs index b3871b06e86..482839b6679 100644 --- a/crates/services/txpool/src/service/update_sender/tests/test_e2e.rs +++ b/crates/services/txpool/src/service/update_sender/tests/test_e2e.rs @@ -144,7 +144,7 @@ fn test_update_sender_inner(ops: Vec) { Op::DropRecv(i) => { // Real if i < receivers.len() { - receivers.remove(i); + let _ = receivers.remove(i); } // Model if i < model_receivers.len() { diff --git a/crates/services/txpool/src/service/update_sender/tests/test_subscribe.rs b/crates/services/txpool/src/service/update_sender/tests/test_subscribe.rs index 4c0795be410..936cbadaae9 100644 --- a/crates/services/txpool/src/service/update_sender/tests/test_subscribe.rs +++ b/crates/services/txpool/src/service/update_sender/tests/test_subscribe.rs @@ -23,7 +23,7 @@ fn test_subscriber(input: Input) { let Input { tx_id, senders } = input; let mut senders = box_senders(senders); let len_before = senders.values().map(|v| v.len()).sum::(); - subscribe::<_, MockCreateChannel>( + let _ = subscribe::<_, MockCreateChannel>( Bytes32::from([tx_id; 32]), &mut senders, Box::new(()), diff --git a/crates/services/txpool/src/test_helpers.rs b/crates/services/txpool/src/test_helpers.rs index 3c487ccb5c5..5586abee542 100644 --- a/crates/services/txpool/src/test_helpers.rs +++ b/crates/services/txpool/src/test_helpers.rs @@ -1,7 +1,12 @@ // Rust isn't smart enough to detect cross module test deps #![allow(dead_code)] -use crate::MockDb; +use crate::{ + mock_db::MockDBProvider, + Config, + MockDb, + TxPool, +}; use fuel_core_types::{ entities::coins::coin::{ Coin, @@ -11,6 +16,7 @@ use fuel_core_types::{ fuel_crypto::rand::{ rngs::StdRng, Rng, + SeedableRng, }, fuel_tx::{ field::Inputs, @@ -39,6 +45,85 @@ use fuel_core_types::{ // the byte and gas price fees. pub const TEST_COIN_AMOUNT: u64 = 100_000_000u64; +pub(crate) struct TextContext { + mock_db: MockDb, + rng: StdRng, + config: Option, +} + +impl Default for TextContext { + fn default() -> Self { + Self { + mock_db: MockDb::default(), + rng: StdRng::seed_from_u64(0), + config: None, + } + } +} + +impl TextContext { + pub(crate) fn database_mut(&mut self) -> &mut MockDb { + &mut self.mock_db + } + + pub(crate) fn config(self, config: Config) -> Self { + Self { + config: Some(config), + ..self + } + } + + pub(crate) fn build(self) -> TxPool { + TxPool::new( + self.config.unwrap_or_default(), + MockDBProvider(self.mock_db), + ) + } + + pub(crate) fn setup_coin(&mut self) -> (Coin, Input) { + setup_coin(&mut self.rng, Some(&self.mock_db)) + } + + pub(crate) fn create_output_and_input( + &mut self, + amount: Word, + ) -> (Output, UnsetInput) { + let input = self.random_predicate(AssetId::BASE, amount, None); + let output = Output::coin(*input.input_owner().unwrap(), amount, AssetId::BASE); + (output, UnsetInput(input)) + } + + pub(crate) fn random_predicate( + &mut self, + asset_id: AssetId, + amount: Word, + utxo_id: Option, + ) -> Input { + random_predicate(&mut self.rng, asset_id, amount, utxo_id) + } + + pub(crate) fn custom_predicate( + &mut self, + asset_id: AssetId, + amount: Word, + code: Vec, + utxo_id: Option, + ) -> Input { + let owner = Input::predicate_owner(&code); + Input::coin_predicate( + utxo_id.unwrap_or_else(|| self.rng.gen()), + owner, + amount, + asset_id, + Default::default(), + Default::default(), + Default::default(), + code, + vec![], + ) + } +} + pub(crate) fn setup_coin(rng: &mut StdRng, mock_db: Option<&MockDb>) -> (Coin, Input) { let input = random_predicate(rng, AssetId::BASE, TEST_COIN_AMOUNT, None); add_coin_to_state(input, mock_db) @@ -64,32 +149,6 @@ pub(crate) fn add_coin_to_state(input: Input, mock_db: Option<&MockDb>) -> (Coin (coin.uncompress(utxo_id), input) } -pub(crate) fn create_output_and_input( - rng: &mut StdRng, - amount: Word, -) -> (Output, UnsetInput) { - let input = random_predicate(rng, AssetId::BASE, amount, None); - let output = Output::coin(*input.input_owner().unwrap(), amount, AssetId::BASE); - (output, UnsetInput(input)) -} - -pub struct UnsetInput(Input); - -impl UnsetInput { - pub fn into_input(self, new_utxo_id: UtxoId) -> Input { - let mut input = self.0; - match &mut input { - Input::CoinSigned(CoinSigned { utxo_id, .. }) - | Input::CoinPredicate(CoinPredicate { utxo_id, .. }) - | Input::Contract(Contract { utxo_id, .. }) => { - *utxo_id = new_utxo_id; - } - _ => {} - } - input - } -} - pub(crate) fn random_predicate( rng: &mut StdRng, asset_id: AssetId, @@ -115,25 +174,21 @@ pub(crate) fn random_predicate( .into_default_estimated() } -pub(crate) fn custom_predicate( - rng: &mut StdRng, - asset_id: AssetId, - amount: Word, - code: Vec, - utxo_id: Option, -) -> Input { - let owner = Input::predicate_owner(&code); - Input::coin_predicate( - utxo_id.unwrap_or_else(|| rng.gen()), - owner, - amount, - asset_id, - Default::default(), - Default::default(), - Default::default(), - code, - vec![], - ) +pub struct UnsetInput(Input); + +impl UnsetInput { + pub fn into_input(self, new_utxo_id: UtxoId) -> Input { + let mut input = self.0; + match &mut input { + Input::CoinSigned(CoinSigned { utxo_id, .. }) + | Input::CoinPredicate(CoinPredicate { utxo_id, .. }) + | Input::Contract(Contract { utxo_id, .. }) => { + *utxo_id = new_utxo_id; + } + _ => {} + } + input + } } pub trait IntoEstimated { diff --git a/crates/services/txpool/src/txpool.rs b/crates/services/txpool/src/txpool.rs index 1c3c0376e8d..63a84a803b5 100644 --- a/crates/services/txpool/src/txpool.rs +++ b/crates/services/txpool/src/txpool.rs @@ -37,6 +37,7 @@ use fuel_core_types::{ use crate::service::TxStatusMessage; use fuel_core_metrics::txpool_metrics::txpool_metrics; +use fuel_core_storage::transactional::AtomicView; use fuel_core_types::{ blockchain::block::Block, fuel_vm::checked_transaction::CheckPredicateParams, @@ -54,20 +55,17 @@ use std::{ use tokio_rayon::AsyncRayonHandle; #[derive(Debug, Clone)] -pub struct TxPool { +pub struct TxPool { by_hash: HashMap, by_gas_price: PriceSort, by_time: TimeSort, by_dependency: Dependency, config: Config, - database: DB, + database: ViewProvider, } -impl TxPool -where - DB: TxPoolDb, -{ - pub fn new(config: Config, database: DB) -> Self { +impl TxPool { + pub fn new(config: Config, database: ViewProvider) -> Self { let max_depth = config.max_depth; Self { @@ -93,94 +91,6 @@ where &self.by_dependency } - #[tracing::instrument(level = "info", skip_all, fields(tx_id = %tx.id()), ret, err)] - // this is atomic operation. Return removed(pushed out/replaced) transactions - fn insert_inner( - &mut self, - tx: Checked, - ) -> anyhow::Result { - let tx: CheckedTransaction = tx.into(); - - let tx = Arc::new(match tx { - CheckedTransaction::Script(script) => PoolTransaction::Script(script), - CheckedTransaction::Create(create) => PoolTransaction::Create(create), - CheckedTransaction::Mint(_) => { - return Err(anyhow::anyhow!("Mint transactions is not supported")) - } - }); - - if !tx.is_computed() { - return Err(Error::NoMetadata.into()) - } - - // verify max gas is less than block limit - if tx.max_gas() > self.config.chain_config.block_gas_limit { - return Err(Error::NotInsertedMaxGasLimit { - tx_gas: tx.max_gas(), - block_limit: self.config.chain_config.block_gas_limit, - } - .into()) - } - - if self.by_hash.contains_key(&tx.id()) { - return Err(Error::NotInsertedTxKnown.into()) - } - - let mut max_limit_hit = false; - // check if we are hitting limit of pool - if self.by_hash.len() >= self.config.max_tx { - max_limit_hit = true; - // limit is hit, check if we can push out lowest priced tx - let lowest_price = self.by_gas_price.lowest_value().unwrap_or_default(); - if lowest_price >= tx.price() { - return Err(Error::NotInsertedLimitHit.into()) - } - } - if self.config.metrics { - txpool_metrics() - .gas_price_histogram - .observe(tx.price() as f64); - - txpool_metrics() - .tx_size_histogram - .observe(tx.metered_bytes_size() as f64); - } - // check and insert dependency - let rem = self - .by_dependency - .insert(&self.by_hash, &self.database, &tx)?; - let info = TxInfo::new(tx.clone()); - let submitted_time = info.submitted_time(); - self.by_gas_price.insert(&info); - self.by_time.insert(&info); - self.by_hash.insert(tx.id(), info); - - // if some transaction were removed so we don't need to check limit - let removed = if rem.is_empty() { - if max_limit_hit { - // remove last tx from sort - let rem_tx = self.by_gas_price.lowest_tx().unwrap(); // safe to unwrap limit is hit - self.remove_inner(&rem_tx); - vec![rem_tx] - } else { - Vec::new() - } - } else { - // remove ret from by_hash and from by_price - for rem in rem.iter() { - self.remove_tx(&rem.id()); - } - - rem - }; - - Ok(InsertionResult { - inserted: tx, - submitted_time, - removed, - }) - } - /// Return all sorted transactions that are includable in next block. pub fn sorted_includable(&self) -> impl Iterator + '_ { self.by_gas_price @@ -228,47 +138,6 @@ where self.remove_by_tx_id(tx_id) } - #[tracing::instrument(level = "info", skip_all)] - /// Import a set of transactions from network gossip or GraphQL endpoints. - pub fn insert( - &mut self, - tx_status_sender: &TxStatusChange, - txs: Vec>, - ) -> Vec> { - // Check if that data is okay (witness match input/output, and if recovered signatures ara valid). - // should be done before transaction comes to txpool, or before it enters RwLocked region. - let mut res = Vec::new(); - - for tx in txs.into_iter() { - res.push(self.insert_inner(tx)); - } - - // announce to subscribers - for ret in res.iter() { - match ret { - Ok(InsertionResult { - removed, - inserted, - submitted_time, - }) => { - for removed in removed { - // small todo there is possibility to have removal reason (ReplacedByHigherGas, DependencyRemoved) - // but for now it is okay to just use Error::Removed. - tx_status_sender.send_squeezed_out(removed.id(), Error::Removed); - } - tx_status_sender.send_submitted( - inserted.id(), - Tai64::from_unix(submitted_time.as_secs() as i64), - ); - } - Err(_) => { - // @dev should not broadcast tx if error occurred - } - } - } - res - } - /// find all tx by its hash pub fn find(&self, hashes: &[TxId]) -> Vec> { let mut res = Vec::with_capacity(hashes.len()); @@ -385,6 +254,150 @@ where } } +impl TxPool +where + ViewProvider: AtomicView, + View: TxPoolDb, +{ + #[cfg(test)] + fn insert_single( + &mut self, + tx: Checked, + ) -> anyhow::Result { + let view = self.database.latest_view(); + self.insert_inner(tx, &view) + } + + #[tracing::instrument(level = "info", skip_all, fields(tx_id = %tx.id()), ret, err)] + // this is atomic operation. Return removed(pushed out/replaced) transactions + fn insert_inner( + &mut self, + tx: Checked, + view: &View, + ) -> anyhow::Result { + let tx: CheckedTransaction = tx.into(); + + let tx = Arc::new(match tx { + CheckedTransaction::Script(script) => PoolTransaction::Script(script), + CheckedTransaction::Create(create) => PoolTransaction::Create(create), + CheckedTransaction::Mint(_) => { + return Err(anyhow::anyhow!("Mint transactions is not supported")) + } + }); + + if !tx.is_computed() { + return Err(Error::NoMetadata.into()) + } + + // verify max gas is less than block limit + if tx.max_gas() > self.config.chain_config.block_gas_limit { + return Err(Error::NotInsertedMaxGasLimit { + tx_gas: tx.max_gas(), + block_limit: self.config.chain_config.block_gas_limit, + } + .into()) + } + + if self.by_hash.contains_key(&tx.id()) { + return Err(Error::NotInsertedTxKnown.into()) + } + + let mut max_limit_hit = false; + // check if we are hitting limit of pool + if self.by_hash.len() >= self.config.max_tx { + max_limit_hit = true; + // limit is hit, check if we can push out lowest priced tx + let lowest_price = self.by_gas_price.lowest_value().unwrap_or_default(); + if lowest_price >= tx.price() { + return Err(Error::NotInsertedLimitHit.into()) + } + } + if self.config.metrics { + txpool_metrics() + .gas_price_histogram + .observe(tx.price() as f64); + + txpool_metrics() + .tx_size_histogram + .observe(tx.metered_bytes_size() as f64); + } + // check and insert dependency + let rem = self.by_dependency.insert(&self.by_hash, view, &tx)?; + let info = TxInfo::new(tx.clone()); + let submitted_time = info.submitted_time(); + self.by_gas_price.insert(&info); + self.by_time.insert(&info); + self.by_hash.insert(tx.id(), info); + + // if some transaction were removed so we don't need to check limit + let removed = if rem.is_empty() { + if max_limit_hit { + // remove last tx from sort + let rem_tx = self.by_gas_price.lowest_tx().unwrap(); // safe to unwrap limit is hit + self.remove_inner(&rem_tx); + vec![rem_tx] + } else { + Vec::new() + } + } else { + // remove ret from by_hash and from by_price + for rem in rem.iter() { + self.remove_tx(&rem.id()); + } + + rem + }; + + Ok(InsertionResult { + inserted: tx, + submitted_time, + removed, + }) + } + + #[tracing::instrument(level = "info", skip_all)] + /// Import a set of transactions from network gossip or GraphQL endpoints. + pub fn insert( + &mut self, + tx_status_sender: &TxStatusChange, + txs: Vec>, + ) -> Vec> { + // Check if that data is okay (witness match input/output, and if recovered signatures ara valid). + // should be done before transaction comes to txpool, or before it enters RwLocked region. + let mut res = Vec::new(); + let view = self.database.latest_view(); + + for tx in txs.into_iter() { + res.push(self.insert_inner(tx, &view)); + } + + // announce to subscribers + for ret in res.iter() { + match ret { + Ok(InsertionResult { + removed, + inserted, + submitted_time, + }) => { + for removed in removed { + // small todo there is possibility to have removal reason (ReplacedByHigherGas, DependencyRemoved) + // but for now it is okay to just use Error::Removed. + tx_status_sender.send_squeezed_out(removed.id(), Error::Removed); + } + tx_status_sender.send_submitted( + inserted.id(), + Tai64::from_unix(submitted_time.as_secs() as i64), + ); + } + Err(_) => { + // @dev should not broadcast tx if error occurred + } + } + } + res + } +} + pub async fn check_transactions( txs: &[Arc], current_height: BlockHeight, diff --git a/crates/services/txpool/src/txpool/tests.rs b/crates/services/txpool/src/txpool/tests.rs index 2e4c7706d56..8e572c2abd5 100644 --- a/crates/services/txpool/src/txpool/tests.rs +++ b/crates/services/txpool/src/txpool/tests.rs @@ -1,12 +1,7 @@ use crate::{ - ports::TxPoolDb, test_helpers::{ - add_coin_to_state, - create_output_and_input, - custom_predicate, - random_predicate, - setup_coin, IntoEstimated, + TextContext, TEST_COIN_AMOUNT, }, txpool::test_helpers::{ @@ -17,8 +12,6 @@ use crate::{ }, Config, Error, - MockDb, - TxPool, }; use fuel_core_types::{ fuel_asm::{ @@ -26,10 +19,6 @@ use fuel_core_types::{ RegId, Word, }, - fuel_crypto::rand::{ - rngs::StdRng, - SeedableRng, - }, fuel_tx::{ input::coin::CoinPredicate, Address, @@ -45,7 +34,6 @@ use fuel_core_types::{ fuel_types::ChainId, fuel_vm::checked_transaction::Checked, }; - use std::{ cmp::Reverse, collections::HashMap, @@ -56,51 +44,43 @@ use super::check_single_tx; const GAS_LIMIT: Word = 1000; -async fn check_unwrap_tx( - tx: Transaction, - db: MockDb, - config: &Config, -) -> Checked { - check_single_tx(tx, db.current_block_height().unwrap(), config) +async fn check_unwrap_tx(tx: Transaction, config: &Config) -> Checked { + check_single_tx(tx, Default::default(), config) .await .expect("Transaction should be checked") } async fn check_tx( tx: Transaction, - db: MockDb, config: &Config, ) -> anyhow::Result> { - check_single_tx(tx, db.current_block_height().unwrap(), config).await + check_single_tx(tx, Default::default(), config).await } #[tokio::test] async fn insert_simple_tx_succeeds() { - let mut rng = StdRng::seed_from_u64(0); - let db = MockDb::default(); - let mut txpool = TxPool::new(Default::default(), db.clone()); + let mut context = TextContext::default(); - let (_, gas_coin) = setup_coin(&mut rng, Some(&txpool.database)); + let (_, gas_coin) = context.setup_coin(); let tx = TransactionBuilder::script(vec![], vec![]) .script_gas_limit(GAS_LIMIT) .add_input(gas_coin) .finalize_as_transaction(); - let tx = check_unwrap_tx(tx, db.clone(), &txpool.config).await; + let mut txpool = context.build(); + let tx = check_unwrap_tx(tx, &txpool.config).await; txpool - .insert_inner(tx) + .insert_single(tx) .expect("Transaction should be OK, got Err"); } #[tokio::test] async fn insert_simple_tx_dependency_chain_succeeds() { - let mut rng = StdRng::seed_from_u64(0); - let db = MockDb::default(); - let mut txpool = TxPool::new(Default::default(), db.clone()); + let mut context = TextContext::default(); - let (_, gas_coin) = setup_coin(&mut rng, Some(&txpool.database)); - let (output, unset_input) = create_output_and_input(&mut rng, 1); + let (_, gas_coin) = context.setup_coin(); + let (output, unset_input) = context.create_output_and_input(1); let tx1 = TransactionBuilder::script(vec![], vec![]) .gas_price(1) .script_gas_limit(GAS_LIMIT) @@ -108,7 +88,7 @@ async fn insert_simple_tx_dependency_chain_succeeds() { .add_output(output) .finalize_as_transaction(); - let (_, gas_coin) = setup_coin(&mut rng, Some(&txpool.database)); + let (_, gas_coin) = context.setup_coin(); let input = unset_input.into_input(UtxoId::new(tx1.id(&Default::default()), 0)); let tx2 = TransactionBuilder::script(vec![], vec![]) .gas_price(1) @@ -117,26 +97,27 @@ async fn insert_simple_tx_dependency_chain_succeeds() { .add_input(gas_coin) .finalize_as_transaction(); - let tx1 = check_unwrap_tx(tx1, db.clone(), &txpool.config).await; - let tx2 = check_unwrap_tx(tx2, db.clone(), &txpool.config).await; + let mut txpool = context.build(); + let tx1 = check_unwrap_tx(tx1, &txpool.config).await; + let tx2 = check_unwrap_tx(tx2, &txpool.config).await; - txpool.insert_inner(tx1).expect("Tx1 should be OK, got Err"); txpool - .insert_inner(tx2) + .insert_single(tx1) + .expect("Tx1 should be OK, got Err"); + txpool + .insert_single(tx2) .expect("Tx2 dependent should be OK, got Err"); } #[tokio::test] async fn faulty_t2_collided_on_contract_id_from_tx1() { - let mut rng = StdRng::seed_from_u64(0); - let db = MockDb::default(); - let mut txpool = TxPool::new(Default::default(), db.clone()); + let mut context = TextContext::default(); let contract_id = Contract::EMPTY_CONTRACT_ID; // contract creation tx - let (_, gas_coin) = setup_coin(&mut rng, Some(&txpool.database)); - let (output, unset_input) = create_output_and_input(&mut rng, 10); + let (_, gas_coin) = context.setup_coin(); + let (output, unset_input) = context.create_output_and_input(10); let tx = TransactionBuilder::create( Default::default(), Default::default(), @@ -148,7 +129,7 @@ async fn faulty_t2_collided_on_contract_id_from_tx1() { .add_output(output) .finalize_as_transaction(); - let (_, gas_coin) = setup_coin(&mut rng, Some(&txpool.database)); + let (_, gas_coin) = context.setup_coin(); let input = unset_input.into_input(UtxoId::new(tx.id(&Default::default()), 1)); // attempt to insert a different creation tx with a valid dependency on the first tx, @@ -165,13 +146,14 @@ async fn faulty_t2_collided_on_contract_id_from_tx1() { .add_output(output) .finalize_as_transaction(); - let tx = check_unwrap_tx(tx, db.clone(), &txpool.config).await; - txpool.insert_inner(tx).expect("Tx1 should be Ok, got Err"); + let mut txpool = context.build(); + let tx = check_unwrap_tx(tx, &txpool.config).await; + txpool.insert_single(tx).expect("Tx1 should be Ok, got Err"); - let tx_faulty = check_unwrap_tx(tx_faulty, db.clone(), &txpool.config).await; + let tx_faulty = check_unwrap_tx(tx_faulty, &txpool.config).await; let err = txpool - .insert_inner(tx_faulty) + .insert_single(tx_faulty) .expect_err("Tx2 should be Err, got Ok"); assert!(matches!( err.downcast_ref::(), @@ -181,12 +163,10 @@ async fn faulty_t2_collided_on_contract_id_from_tx1() { #[tokio::test] async fn fail_to_insert_tx_with_dependency_on_invalid_utxo_type() { - let mut rng = StdRng::seed_from_u64(0); - let db = MockDb::default(); - let mut txpool = TxPool::new(Default::default(), db.clone()); + let mut context = TextContext::default(); let contract_id = Contract::EMPTY_CONTRACT_ID; - let (_, gas_coin) = setup_coin(&mut rng, Some(&txpool.database)); + let (_, gas_coin) = context.setup_coin(); let tx_faulty = TransactionBuilder::create( Default::default(), Default::default(), @@ -201,25 +181,25 @@ async fn fail_to_insert_tx_with_dependency_on_invalid_utxo_type() { let tx = TransactionBuilder::script(vec![], vec![]) .gas_price(1) .script_gas_limit(GAS_LIMIT) - .add_input(random_predicate( - &mut rng, + .add_input(context.random_predicate( AssetId::BASE, TEST_COIN_AMOUNT, Some(UtxoId::new(tx_faulty.id(&Default::default()), 0)), )) .finalize_as_transaction(); + let mut txpool = context.build(); let tx_faulty_id = tx_faulty.id(&ChainId::default()); - let tx_faulty = check_unwrap_tx(tx_faulty, db.clone(), &txpool.config).await; + let tx_faulty = check_unwrap_tx(tx_faulty, &txpool.config).await; txpool - .insert_inner(tx_faulty.clone()) + .insert_single(tx_faulty.clone()) .expect("Tx1 should be Ok, got Err"); - let tx = check_unwrap_tx(tx, db.clone(), &txpool.config).await; + let tx = check_unwrap_tx(tx, &txpool.config).await; let err = txpool - .insert_inner(tx) + .insert_single(tx) .expect_err("Tx2 should be Err, got Ok"); assert!(matches!( err.downcast_ref::(), @@ -233,18 +213,18 @@ async fn not_inserted_known_tx() { utxo_validation: false, ..Default::default() }; - let db = MockDb::default(); - let mut txpool = TxPool::new(config, db.clone()); + let context = TextContext::default().config(config); + let mut txpool = context.build(); let tx = Transaction::default_test_tx(); - let tx = check_unwrap_tx(tx, db.clone(), &txpool.config).await; + let tx = check_unwrap_tx(tx, &txpool.config).await; txpool - .insert_inner(tx.clone()) + .insert_single(tx.clone()) .expect("Tx1 should be Ok, got Err"); let err = txpool - .insert_inner(tx) + .insert_single(tx) .expect_err("Second insertion of Tx1 should be Err, got Ok"); assert!(matches!( err.downcast_ref::(), @@ -254,20 +234,20 @@ async fn not_inserted_known_tx() { #[tokio::test] async fn try_to_insert_tx2_missing_utxo() { - let mut rng = StdRng::seed_from_u64(0); - let mut txpool = TxPool::new(Default::default(), MockDb::default()); + let mut context = TextContext::default(); - let (_, input) = setup_coin(&mut rng, None); + let input = context.random_predicate(AssetId::BASE, TEST_COIN_AMOUNT, None); let tx = TransactionBuilder::script(vec![], vec![]) .gas_price(10) .script_gas_limit(GAS_LIMIT) .add_input(input) .finalize_as_transaction(); - let tx = check_unwrap_tx(tx, txpool.database.clone(), &txpool.config).await; + let mut txpool = context.build(); + let tx = check_unwrap_tx(tx, &txpool.config).await; let err = txpool - .insert_inner(tx) + .insert_single(tx) .expect_err("Tx should be Err, got Ok"); assert!(matches!( err.downcast_ref::(), @@ -277,11 +257,9 @@ async fn try_to_insert_tx2_missing_utxo() { #[tokio::test] async fn higher_priced_tx_removes_lower_priced_tx() { - let mut rng = StdRng::seed_from_u64(0); - let db = MockDb::default(); - let mut txpool = TxPool::new(Default::default(), db.clone()); + let mut context = TextContext::default(); - let (_, coin_input) = setup_coin(&mut rng, Some(&txpool.database)); + let (_, coin_input) = context.setup_coin(); let tx1 = TransactionBuilder::script(vec![], vec![]) .gas_price(10) @@ -296,26 +274,27 @@ async fn higher_priced_tx_removes_lower_priced_tx() { .finalize_as_transaction(); let tx1_id = tx1.id(&ChainId::default()); - let tx1 = check_unwrap_tx(tx1, db.clone(), &txpool.config).await; + let mut txpool = context.build(); + let tx1 = check_unwrap_tx(tx1, &txpool.config).await; txpool - .insert_inner(tx1.clone()) + .insert_single(tx1.clone()) .expect("Tx1 should be Ok, got Err"); - let tx2 = check_unwrap_tx(tx2, db.clone(), &txpool.config).await; + let tx2 = check_unwrap_tx(tx2, &txpool.config).await; - let vec = txpool.insert_inner(tx2).expect("Tx2 should be Ok, got Err"); + let vec = txpool + .insert_single(tx2) + .expect("Tx2 should be Ok, got Err"); assert_eq!(vec.removed[0].id(), tx1_id, "Tx1 id should be removed"); } #[tokio::test] async fn underpriced_tx1_not_included_coin_collision() { - let mut rng = StdRng::seed_from_u64(0); - let db = MockDb::default(); - let mut txpool = TxPool::new(Default::default(), db.clone()); + let mut context = TextContext::default(); - let (_, gas_coin) = setup_coin(&mut rng, Some(&txpool.database)); - let (output, unset_input) = create_output_and_input(&mut rng, 10); + let (_, gas_coin) = context.setup_coin(); + let (output, unset_input) = context.create_output_and_input(10); let tx1 = TransactionBuilder::script(vec![], vec![]) .gas_price(20) .script_gas_limit(GAS_LIMIT) @@ -337,19 +316,20 @@ async fn underpriced_tx1_not_included_coin_collision() { .add_input(input) .finalize_as_transaction(); - let tx1_checked = check_unwrap_tx(tx1.clone(), db.clone(), txpool.config()).await; + let mut txpool = context.build(); + let tx1_checked = check_unwrap_tx(tx1.clone(), txpool.config()).await; txpool - .insert_inner(tx1_checked) + .insert_single(tx1_checked) .expect("Tx1 should be Ok, got Err"); - let tx2_checked = check_unwrap_tx(tx2.clone(), db.clone(), txpool.config()).await; + let tx2_checked = check_unwrap_tx(tx2.clone(), txpool.config()).await; txpool - .insert_inner(tx2_checked) + .insert_single(tx2_checked) .expect("Tx2 should be Ok, got Err"); - let tx3_checked = check_unwrap_tx(tx3, db.clone(), txpool.config()).await; + let tx3_checked = check_unwrap_tx(tx3, txpool.config()).await; let err = txpool - .insert_inner(tx3_checked) + .insert_single(tx3_checked) .expect_err("Tx3 should be Err, got Ok"); assert!(matches!( err.downcast_ref::(), @@ -359,12 +339,10 @@ async fn underpriced_tx1_not_included_coin_collision() { #[tokio::test] async fn overpriced_tx_contract_input_not_inserted() { - let mut rng = StdRng::seed_from_u64(0); - let db = MockDb::default(); - let mut txpool = TxPool::new(Default::default(), db.clone()); + let mut context = TextContext::default(); let contract_id = Contract::EMPTY_CONTRACT_ID; - let (_, gas_funds) = setup_coin(&mut rng, Some(&txpool.database)); + let (_, gas_funds) = context.setup_coin(); let tx1 = TransactionBuilder::create( Default::default(), Default::default(), @@ -375,7 +353,7 @@ async fn overpriced_tx_contract_input_not_inserted() { .add_output(create_contract_output(contract_id)) .finalize_as_transaction(); - let (_, gas_funds) = setup_coin(&mut rng, Some(&txpool.database)); + let (_, gas_funds) = context.setup_coin(); let tx2 = TransactionBuilder::script(vec![], vec![]) .gas_price(11) .script_gas_limit(GAS_LIMIT) @@ -388,12 +366,15 @@ async fn overpriced_tx_contract_input_not_inserted() { .add_output(Output::contract(1, Default::default(), Default::default())) .finalize_as_transaction(); - let tx1 = check_unwrap_tx(tx1, db.clone(), &txpool.config).await; - txpool.insert_inner(tx1).expect("Tx1 should be Ok, got err"); + let mut txpool = context.build(); + let tx1 = check_unwrap_tx(tx1, &txpool.config).await; + txpool + .insert_single(tx1) + .expect("Tx1 should be Ok, got err"); - let tx2 = check_unwrap_tx(tx2, db.clone(), &txpool.config).await; + let tx2 = check_unwrap_tx(tx2, &txpool.config).await; let err = txpool - .insert_inner(tx2) + .insert_single(tx2) .expect_err("Tx2 should be Err, got Ok"); assert!( matches!( @@ -406,12 +387,10 @@ async fn overpriced_tx_contract_input_not_inserted() { #[tokio::test] async fn dependent_contract_input_inserted() { - let mut rng = StdRng::seed_from_u64(0); - let db = MockDb::default(); - let mut txpool = TxPool::new(Default::default(), db.clone()); + let mut context = TextContext::default(); let contract_id = Contract::EMPTY_CONTRACT_ID; - let (_, gas_funds) = setup_coin(&mut rng, Some(&txpool.database)); + let (_, gas_funds) = context.setup_coin(); let tx1 = TransactionBuilder::create( Default::default(), Default::default(), @@ -422,7 +401,7 @@ async fn dependent_contract_input_inserted() { .add_output(create_contract_output(contract_id)) .finalize_as_transaction(); - let (_, gas_funds) = setup_coin(&mut rng, Some(&txpool.database)); + let (_, gas_funds) = context.setup_coin(); let tx2 = TransactionBuilder::script(vec![], vec![]) .gas_price(10) .script_gas_limit(GAS_LIMIT) @@ -435,21 +414,24 @@ async fn dependent_contract_input_inserted() { .add_output(Output::contract(1, Default::default(), Default::default())) .finalize_as_transaction(); - let tx1 = check_unwrap_tx(tx1, db.clone(), &txpool.config).await; - let tx2 = check_unwrap_tx(tx2, db.clone(), &txpool.config).await; - txpool.insert_inner(tx1).expect("Tx1 should be Ok, got Err"); - txpool.insert_inner(tx2).expect("Tx2 should be Ok, got Err"); + let mut txpool = context.build(); + let tx1 = check_unwrap_tx(tx1, &txpool.config).await; + let tx2 = check_unwrap_tx(tx2, &txpool.config).await; + txpool + .insert_single(tx1) + .expect("Tx1 should be Ok, got Err"); + txpool + .insert_single(tx2) + .expect("Tx2 should be Ok, got Err"); } #[tokio::test] async fn more_priced_tx3_removes_tx1_and_dependent_tx2() { - let mut rng = StdRng::seed_from_u64(0); - let db = MockDb::default(); - let mut txpool = TxPool::new(Default::default(), db.clone()); + let mut context = TextContext::default(); - let (_, gas_coin) = setup_coin(&mut rng, Some(&txpool.database)); + let (_, gas_coin) = context.setup_coin(); - let (output, unset_input) = create_output_and_input(&mut rng, 10); + let (output, unset_input) = context.create_output_and_input(10); let tx1 = TransactionBuilder::script(vec![], vec![]) .gas_price(10) .script_gas_limit(GAS_LIMIT) @@ -473,17 +455,20 @@ async fn more_priced_tx3_removes_tx1_and_dependent_tx2() { let tx1_id = tx1.id(&ChainId::default()); let tx2_id = tx2.id(&ChainId::default()); - let tx1 = check_unwrap_tx(tx1, db.clone(), &txpool.config).await; - let tx2 = check_unwrap_tx(tx2, db.clone(), &txpool.config).await; - let tx3 = check_unwrap_tx(tx3, db.clone(), &txpool.config).await; + let mut txpool = context.build(); + let tx1 = check_unwrap_tx(tx1, &txpool.config).await; + let tx2 = check_unwrap_tx(tx2, &txpool.config).await; + let tx3 = check_unwrap_tx(tx3, &txpool.config).await; txpool - .insert_inner(tx1.clone()) + .insert_single(tx1.clone()) .expect("Tx1 should be OK, got Err"); txpool - .insert_inner(tx2.clone()) + .insert_single(tx2.clone()) .expect("Tx2 should be OK, got Err"); - let vec = txpool.insert_inner(tx3).expect("Tx3 should be OK, got Err"); + let vec = txpool + .insert_single(tx3) + .expect("Tx3 should be OK, got Err"); assert_eq!( vec.removed.len(), 2, @@ -495,11 +480,9 @@ async fn more_priced_tx3_removes_tx1_and_dependent_tx2() { #[tokio::test] async fn more_priced_tx2_removes_tx1_and_more_priced_tx3_removes_tx2() { - let mut rng = StdRng::seed_from_u64(0); - let db = MockDb::default(); - let mut txpool = TxPool::new(Default::default(), db.clone()); + let mut context = TextContext::default(); - let (_, gas_coin) = setup_coin(&mut rng, Some(&txpool.database)); + let (_, gas_coin) = context.setup_coin(); let tx1 = TransactionBuilder::script(vec![], vec![]) .gas_price(10) @@ -519,14 +502,21 @@ async fn more_priced_tx2_removes_tx1_and_more_priced_tx3_removes_tx2() { .add_input(gas_coin) .finalize_as_transaction(); - let tx1 = check_unwrap_tx(tx1, db.clone(), &txpool.config).await; - let tx2 = check_unwrap_tx(tx2, db.clone(), &txpool.config).await; - let tx3 = check_unwrap_tx(tx3, db.clone(), &txpool.config).await; + let mut txpool = context.build(); + let tx1 = check_unwrap_tx(tx1, &txpool.config).await; + let tx2 = check_unwrap_tx(tx2, &txpool.config).await; + let tx3 = check_unwrap_tx(tx3, &txpool.config).await; - txpool.insert_inner(tx1).expect("Tx1 should be OK, got Err"); - let squeezed = txpool.insert_inner(tx2).expect("Tx2 should be OK, got Err"); + txpool + .insert_single(tx1) + .expect("Tx1 should be OK, got Err"); + let squeezed = txpool + .insert_single(tx2) + .expect("Tx2 should be OK, got Err"); assert_eq!(squeezed.removed.len(), 1); - let squeezed = txpool.insert_inner(tx3).expect("Tx3 should be OK, got Err"); + let squeezed = txpool + .insert_single(tx3) + .expect("Tx3 should be OK, got Err"); assert_eq!( squeezed.removed.len(), 1, @@ -536,35 +526,33 @@ async fn more_priced_tx2_removes_tx1_and_more_priced_tx3_removes_tx2() { #[tokio::test] async fn tx_limit_hit() { - let mut rng = StdRng::seed_from_u64(0); - let db = MockDb::default(); - let mut txpool = TxPool::new( - Config { - max_tx: 1, - ..Default::default() - }, - db.clone(), - ); + let mut context = TextContext::default().config(Config { + max_tx: 1, + ..Default::default() + }); - let (_, gas_coin) = setup_coin(&mut rng, Some(&txpool.database)); + let (_, gas_coin) = context.setup_coin(); let tx1 = TransactionBuilder::script(vec![], vec![]) .script_gas_limit(GAS_LIMIT) .add_input(gas_coin) .add_output(create_coin_output()) .finalize_as_transaction(); - let (_, gas_coin) = setup_coin(&mut rng, Some(&txpool.database)); + let (_, gas_coin) = context.setup_coin(); let tx2 = TransactionBuilder::script(vec![], vec![]) .script_gas_limit(GAS_LIMIT) .add_input(gas_coin) .finalize_as_transaction(); - let tx1 = check_unwrap_tx(tx1, db.clone(), &txpool.config).await; - let tx2 = check_unwrap_tx(tx2, db.clone(), &txpool.config).await; - txpool.insert_inner(tx1).expect("Tx1 should be Ok, got Err"); + let mut txpool = context.build(); + let tx1 = check_unwrap_tx(tx1, &txpool.config).await; + let tx2 = check_unwrap_tx(tx2, &txpool.config).await; + txpool + .insert_single(tx1) + .expect("Tx1 should be Ok, got Err"); let err = txpool - .insert_inner(tx2) + .insert_single(tx2) .expect_err("Tx2 should be Err, got Ok"); assert!(matches!( err.downcast_ref::(), @@ -574,18 +562,13 @@ async fn tx_limit_hit() { #[tokio::test] async fn tx_depth_hit() { - let mut rng = StdRng::seed_from_u64(0); - let db = MockDb::default(); - let mut txpool = TxPool::new( - Config { - max_depth: 2, - ..Default::default() - }, - db.clone(), - ); + let mut context = TextContext::default().config(Config { + max_depth: 2, + ..Default::default() + }); - let (_, gas_coin) = setup_coin(&mut rng, Some(&txpool.database)); - let (output, unset_input) = create_output_and_input(&mut rng, 10_000); + let (_, gas_coin) = context.setup_coin(); + let (output, unset_input) = context.create_output_and_input(10_000); let tx1 = TransactionBuilder::script(vec![], vec![]) .script_gas_limit(GAS_LIMIT) .add_input(gas_coin) @@ -593,7 +576,7 @@ async fn tx_depth_hit() { .finalize_as_transaction(); let input = unset_input.into_input(UtxoId::new(tx1.id(&Default::default()), 0)); - let (output, unset_input) = create_output_and_input(&mut rng, 5_000); + let (output, unset_input) = context.create_output_and_input(5_000); let tx2 = TransactionBuilder::script(vec![], vec![]) .script_gas_limit(GAS_LIMIT) .add_input(input) @@ -606,15 +589,20 @@ async fn tx_depth_hit() { .add_input(input) .finalize_as_transaction(); - let tx1 = check_unwrap_tx(tx1, db.clone(), &txpool.config).await; - let tx2 = check_unwrap_tx(tx2, db.clone(), &txpool.config).await; - let tx3 = check_unwrap_tx(tx3, db.clone(), &txpool.config).await; + let mut txpool = context.build(); + let tx1 = check_unwrap_tx(tx1, &txpool.config).await; + let tx2 = check_unwrap_tx(tx2, &txpool.config).await; + let tx3 = check_unwrap_tx(tx3, &txpool.config).await; - txpool.insert_inner(tx1).expect("Tx1 should be OK, got Err"); - txpool.insert_inner(tx2).expect("Tx2 should be OK, got Err"); + txpool + .insert_single(tx1) + .expect("Tx1 should be OK, got Err"); + txpool + .insert_single(tx2) + .expect("Tx2 should be OK, got Err"); let err = txpool - .insert_inner(tx3) + .insert_single(tx3) .expect_err("Tx3 should be Err, got Ok"); assert!(matches!( err.downcast_ref::(), @@ -624,25 +612,23 @@ async fn tx_depth_hit() { #[tokio::test] async fn sorted_out_tx1_2_4() { - let mut rng = StdRng::seed_from_u64(0); - let db = MockDb::default(); - let mut txpool = TxPool::new(Default::default(), db.clone()); + let mut context = TextContext::default(); - let (_, gas_coin) = setup_coin(&mut rng, Some(&txpool.database)); + let (_, gas_coin) = context.setup_coin(); let tx1 = TransactionBuilder::script(vec![], vec![]) .gas_price(10) .script_gas_limit(GAS_LIMIT) .add_input(gas_coin) .finalize_as_transaction(); - let (_, gas_coin) = setup_coin(&mut rng, Some(&txpool.database)); + let (_, gas_coin) = context.setup_coin(); let tx2 = TransactionBuilder::script(vec![], vec![]) .gas_price(9) .script_gas_limit(GAS_LIMIT) .add_input(gas_coin) .finalize_as_transaction(); - let (_, gas_coin) = setup_coin(&mut rng, Some(&txpool.database)); + let (_, gas_coin) = context.setup_coin(); let tx3 = TransactionBuilder::script(vec![], vec![]) .gas_price(20) .script_gas_limit(GAS_LIMIT) @@ -653,13 +639,20 @@ async fn sorted_out_tx1_2_4() { let tx2_id = tx2.id(&ChainId::default()); let tx3_id = tx3.id(&ChainId::default()); - let tx1 = check_unwrap_tx(tx1, db.clone(), &txpool.config).await; - let tx2 = check_unwrap_tx(tx2, db.clone(), &txpool.config).await; - let tx3 = check_unwrap_tx(tx3, db.clone(), &txpool.config).await; + let mut txpool = context.build(); + let tx1 = check_unwrap_tx(tx1, &txpool.config).await; + let tx2 = check_unwrap_tx(tx2, &txpool.config).await; + let tx3 = check_unwrap_tx(tx3, &txpool.config).await; - txpool.insert_inner(tx1).expect("Tx1 should be Ok, got Err"); - txpool.insert_inner(tx2).expect("Tx2 should be Ok, got Err"); - txpool.insert_inner(tx3).expect("Tx4 should be Ok, got Err"); + txpool + .insert_single(tx1) + .expect("Tx1 should be Ok, got Err"); + txpool + .insert_single(tx2) + .expect("Tx2 should be Ok, got Err"); + txpool + .insert_single(tx3) + .expect("Tx4 should be Ok, got Err"); let txs = txpool.sorted_includable().collect::>(); @@ -671,12 +664,10 @@ async fn sorted_out_tx1_2_4() { #[tokio::test] async fn find_dependent_tx1_tx2() { - let mut rng = StdRng::seed_from_u64(0); - let db = MockDb::default(); - let mut txpool = TxPool::new(Default::default(), db.clone()); + let mut context = TextContext::default(); - let (_, gas_coin) = setup_coin(&mut rng, Some(&txpool.database)); - let (output, unset_input) = create_output_and_input(&mut rng, 10_000); + let (_, gas_coin) = context.setup_coin(); + let (output, unset_input) = context.create_output_and_input(10_000); let tx1 = TransactionBuilder::script(vec![], vec![]) .gas_price(11) .script_gas_limit(GAS_LIMIT) @@ -685,7 +676,7 @@ async fn find_dependent_tx1_tx2() { .finalize_as_transaction(); let input = unset_input.into_input(UtxoId::new(tx1.id(&Default::default()), 0)); - let (output, unset_input) = create_output_and_input(&mut rng, 7_500); + let (output, unset_input) = context.create_output_and_input(7_500); let tx2 = TransactionBuilder::script(vec![], vec![]) .gas_price(10) .script_gas_limit(GAS_LIMIT) @@ -704,13 +695,20 @@ async fn find_dependent_tx1_tx2() { let tx2_id = tx2.id(&ChainId::default()); let tx3_id = tx3.id(&ChainId::default()); - let tx1 = check_unwrap_tx(tx1, db.clone(), &txpool.config).await; - let tx2 = check_unwrap_tx(tx2, db.clone(), &txpool.config).await; - let tx3 = check_unwrap_tx(tx3, db.clone(), &txpool.config).await; + let mut txpool = context.build(); + let tx1 = check_unwrap_tx(tx1, &txpool.config).await; + let tx2 = check_unwrap_tx(tx2, &txpool.config).await; + let tx3 = check_unwrap_tx(tx3, &txpool.config).await; - txpool.insert_inner(tx1).expect("Tx0 should be Ok, got Err"); - txpool.insert_inner(tx2).expect("Tx1 should be Ok, got Err"); - let tx3_result = txpool.insert_inner(tx3).expect("Tx2 should be Ok, got Err"); + txpool + .insert_single(tx1) + .expect("Tx0 should be Ok, got Err"); + txpool + .insert_single(tx2) + .expect("Tx1 should be Ok, got Err"); + let tx3_result = txpool + .insert_single(tx3) + .expect("Tx2 should be Ok, got Err"); let mut seen = HashMap::new(); txpool @@ -728,33 +726,28 @@ async fn find_dependent_tx1_tx2() { #[tokio::test] async fn tx_at_least_min_gas_price_is_insertable() { - let mut rng = StdRng::seed_from_u64(0); - let db = MockDb::default(); - let mut txpool = TxPool::new( - Config { - min_gas_price: 10, - ..Default::default() - }, - db.clone(), - ); + let mut context = TextContext::default().config(Config { + min_gas_price: 10, + ..Default::default() + }); - let (_, gas_coin) = setup_coin(&mut rng, Some(&txpool.database)); + let (_, gas_coin) = context.setup_coin(); let tx = TransactionBuilder::script(vec![], vec![]) .gas_price(10) .script_gas_limit(GAS_LIMIT) .add_input(gas_coin) .finalize_as_transaction(); - let tx = check_unwrap_tx(tx, txpool.database.clone(), &txpool.config).await; - txpool.insert_inner(tx).expect("Tx should be Ok, got Err"); + let mut txpool = context.build(); + let tx = check_unwrap_tx(tx, &txpool.config).await; + txpool.insert_single(tx).expect("Tx should be Ok, got Err"); } #[tokio::test] async fn tx_below_min_gas_price_is_not_insertable() { - let mut rng = StdRng::seed_from_u64(0); - let db = MockDb::default(); + let mut context = TextContext::default(); - let (_, gas_coin) = setup_coin(&mut rng, Some(&db)); + let gas_coin = context.random_predicate(AssetId::BASE, TEST_COIN_AMOUNT, None); let tx = TransactionBuilder::script(vec![], vec![]) .gas_price(10) .script_gas_limit(GAS_LIMIT) @@ -763,7 +756,6 @@ async fn tx_below_min_gas_price_is_not_insertable() { let err = check_tx( tx, - db, &Config { min_gas_price: 11, ..Default::default() @@ -780,6 +772,7 @@ async fn tx_below_min_gas_price_is_not_insertable() { #[tokio::test] async fn tx_inserted_into_pool_when_input_message_id_exists_in_db() { + let mut context = TextContext::default(); let (message, input) = create_message_predicate_from_message(5000, 0); let tx = TransactionBuilder::script(vec![], vec![]) @@ -787,14 +780,13 @@ async fn tx_inserted_into_pool_when_input_message_id_exists_in_db() { .add_input(input) .finalize_as_transaction(); - let db = MockDb::default(); - db.insert_message(message); + context.database_mut().insert_message(message); let tx1_id = tx.id(&ChainId::default()); - let mut txpool = TxPool::new(Default::default(), db.clone()); + let mut txpool = context.build(); - let tx = check_unwrap_tx(tx, db.clone(), &txpool.config).await; - txpool.insert_inner(tx).expect("should succeed"); + let tx = check_unwrap_tx(tx, &txpool.config).await; + txpool.insert_single(tx).expect("should succeed"); let tx_info = txpool.find_one(&tx1_id).unwrap(); assert_eq!(tx_info.tx().id(), tx1_id); @@ -802,6 +794,7 @@ async fn tx_inserted_into_pool_when_input_message_id_exists_in_db() { #[tokio::test] async fn tx_rejected_when_input_message_id_is_spent() { + let mut context = TextContext::default(); let (message, input) = create_message_predicate_from_message(5_000, 0); let tx = TransactionBuilder::script(vec![], vec![]) @@ -809,13 +802,12 @@ async fn tx_rejected_when_input_message_id_is_spent() { .add_input(input) .finalize_as_transaction(); - let db = MockDb::default(); - db.insert_message(message.clone()); - db.spend_message(*message.id()); - let mut txpool = TxPool::new(Default::default(), db.clone()); + context.database_mut().insert_message(message.clone()); + context.database_mut().spend_message(*message.id()); + let mut txpool = context.build(); - let tx = check_unwrap_tx(tx, db.clone(), &txpool.config).await; - let err = txpool.insert_inner(tx).expect_err("should fail"); + let tx = check_unwrap_tx(tx, &txpool.config).await; + let err = txpool.insert_single(tx).expect_err("should fail"); // check error assert!(matches!( @@ -826,18 +818,18 @@ async fn tx_rejected_when_input_message_id_is_spent() { #[tokio::test] async fn tx_rejected_from_pool_when_input_message_id_does_not_exist_in_db() { + let context = TextContext::default(); let (message, input) = create_message_predicate_from_message(5000, 0); let tx = TransactionBuilder::script(vec![], vec![]) .script_gas_limit(GAS_LIMIT) .add_input(input) .finalize_as_transaction(); - let db = MockDb::default(); // Do not insert any messages into the DB to ensure there is no matching message for the // tx. - let mut txpool = TxPool::new(Default::default(), db.clone()); - let tx = check_unwrap_tx(tx, db.clone(), &txpool.config).await; - let err = txpool.insert_inner(tx).expect_err("should fail"); + let mut txpool = context.build(); + let tx = check_unwrap_tx(tx, &txpool.config).await; + let err = txpool.insert_single(tx).expect_err("should fail"); // check error assert!(matches!( @@ -849,6 +841,7 @@ async fn tx_rejected_from_pool_when_input_message_id_does_not_exist_in_db() { #[tokio::test] async fn tx_rejected_from_pool_when_gas_price_is_lower_than_another_tx_with_same_message_id( ) { + let mut context = TextContext::default(); let message_amount = 10_000; let gas_price_high = 2u64; let gas_price_low = 1u64; @@ -867,25 +860,24 @@ async fn tx_rejected_from_pool_when_gas_price_is_lower_than_another_tx_with_same .add_input(conflicting_message_input) .finalize_as_transaction(); - let db = MockDb::default(); - db.insert_message(message.clone()); + context.database_mut().insert_message(message.clone()); - let mut txpool = TxPool::new(Default::default(), db.clone()); + let mut txpool = context.build(); let tx_high_id = tx_high.id(&ChainId::default()); - let tx_high = check_unwrap_tx(tx_high, db.clone(), &txpool.config).await; + let tx_high = check_unwrap_tx(tx_high, &txpool.config).await; // Insert a tx for the message id with a high gas amount txpool - .insert_inner(tx_high) + .insert_single(tx_high) .expect("expected successful insertion"); - let tx_low = check_unwrap_tx(tx_low, db.clone(), &txpool.config).await; + let tx_low = check_unwrap_tx(tx_low, &txpool.config).await; // Insert a tx for the message id with a low gas amount // Because the new transaction's id matches an existing transaction, we compare the gas // prices of both the new and existing transactions. Since the existing transaction's gas // price is higher, we must now reject the new transaction. - let err = txpool.insert_inner(tx_low).expect_err("expected failure"); + let err = txpool.insert_single(tx_low).expect_err("expected failure"); // check error assert!(matches!( @@ -896,6 +888,7 @@ async fn tx_rejected_from_pool_when_gas_price_is_lower_than_another_tx_with_same #[tokio::test] async fn higher_priced_tx_squeezes_out_lower_priced_tx_with_same_message_id() { + let mut context = TextContext::default(); let message_amount = 10_000; let gas_price_high = 2u64; let gas_price_low = 1u64; @@ -909,13 +902,12 @@ async fn higher_priced_tx_squeezes_out_lower_priced_tx_with_same_message_id() { .add_input(conflicting_message_input.clone()) .finalize_as_transaction(); - let db = MockDb::default(); - db.insert_message(message); + context.database_mut().insert_message(message); - let mut txpool = TxPool::new(Default::default(), db.clone()); + let mut txpool = context.build(); let tx_low_id = tx_low.id(&ChainId::default()); - let tx_low = check_unwrap_tx(tx_low, db.clone(), &txpool.config).await; - txpool.insert_inner(tx_low).expect("should succeed"); + let tx_low = check_unwrap_tx(tx_low, &txpool.config).await; + txpool.insert_single(tx_low).expect("should succeed"); // Insert a tx for the message id with a high gas amount // Because the new transaction's id matches an existing transaction, we compare the gas @@ -926,8 +918,8 @@ async fn higher_priced_tx_squeezes_out_lower_priced_tx_with_same_message_id() { .script_gas_limit(GAS_LIMIT) .add_input(conflicting_message_input) .finalize_as_transaction(); - let tx_high = check_unwrap_tx(tx_high, db.clone(), &txpool.config).await; - let squeezed_out_txs = txpool.insert_inner(tx_high).expect("should succeed"); + let tx_high = check_unwrap_tx(tx_high, &txpool.config).await; + let squeezed_out_txs = txpool.insert_single(tx_high).expect("should succeed"); assert_eq!(squeezed_out_txs.removed.len(), 1); assert_eq!(squeezed_out_txs.removed[0].id(), tx_low_id,); @@ -941,6 +933,7 @@ async fn message_of_squeezed_out_tx_can_be_resubmitted_at_lower_gas_price() { // tx3 (message 2) gas_price 1 // works since tx1 is no longer part of txpool state even though gas price is less + let mut context = TextContext::default(); let (message_1, message_input_1) = create_message_predicate_from_message(10_000, 0); let (message_2, message_input_2) = create_message_predicate_from_message(20_000, 1); @@ -964,38 +957,35 @@ async fn message_of_squeezed_out_tx_can_be_resubmitted_at_lower_gas_price() { .add_input(message_input_2) .finalize_as_transaction(); - let db = MockDb::default(); - db.insert_message(message_1); - db.insert_message(message_2); - let mut txpool = TxPool::new(Default::default(), db.clone()); + context.database_mut().insert_message(message_1); + context.database_mut().insert_message(message_2); + let mut txpool = context.build(); - let tx1 = check_unwrap_tx(tx1, db.clone(), &txpool.config).await; - let tx2 = check_unwrap_tx(tx2, db.clone(), &txpool.config).await; - let tx3 = check_unwrap_tx(tx3, db.clone(), &txpool.config).await; + let tx1 = check_unwrap_tx(tx1, &txpool.config).await; + let tx2 = check_unwrap_tx(tx2, &txpool.config).await; + let tx3 = check_unwrap_tx(tx3, &txpool.config).await; - txpool.insert_inner(tx1).expect("should succeed"); + txpool.insert_single(tx1).expect("should succeed"); - txpool.insert_inner(tx2).expect("should succeed"); + txpool.insert_single(tx2).expect("should succeed"); - txpool.insert_inner(tx3).expect("should succeed"); + txpool.insert_single(tx3).expect("should succeed"); } #[tokio::test] async fn predicates_with_incorrect_owner_fails() { - let mut rng = StdRng::seed_from_u64(0); - let db = MockDb::default(); - let mut coin = random_predicate(&mut rng, AssetId::BASE, TEST_COIN_AMOUNT, None); + let mut context = TextContext::default(); + let mut coin = context.random_predicate(AssetId::BASE, TEST_COIN_AMOUNT, None); if let Input::CoinPredicate(CoinPredicate { owner, .. }) = &mut coin { *owner = Address::zeroed(); } - let (_, gas_coin) = add_coin_to_state(coin, Some(&db.clone())); let tx = TransactionBuilder::script(vec![], vec![]) .script_gas_limit(GAS_LIMIT) - .add_input(gas_coin) + .add_input(coin) .finalize_as_transaction(); - let err = check_tx(tx, db.clone(), &Default::default()) + let err = check_tx(tx, &Default::default()) .await .expect_err("Transaction should be err, got ok"); @@ -1007,8 +997,7 @@ async fn predicates_with_incorrect_owner_fails() { #[tokio::test] async fn predicate_without_enough_gas_returns_out_of_gas() { - let mut rng = StdRng::seed_from_u64(0); - let db = MockDb::default(); + let mut context = TextContext::default(); let mut config = Config::default(); config .chain_config @@ -1020,23 +1009,22 @@ async fn predicate_without_enough_gas_returns_out_of_gas() { .consensus_parameters .tx_params .max_gas_per_tx = 10000; - let coin = custom_predicate( - &mut rng, - AssetId::BASE, - TEST_COIN_AMOUNT, - // forever loop - vec![op::jmp(RegId::ZERO)].into_iter().collect(), - None, - ) - .into_estimated(&config.chain_config.consensus_parameters); + let coin = context + .custom_predicate( + AssetId::BASE, + TEST_COIN_AMOUNT, + // forever loop + vec![op::jmp(RegId::ZERO)].into_iter().collect(), + None, + ) + .into_estimated(&config.chain_config.consensus_parameters); - let (_, gas_coin) = add_coin_to_state(coin, Some(&db.clone())); let tx = TransactionBuilder::script(vec![], vec![]) .script_gas_limit(GAS_LIMIT) - .add_input(gas_coin) + .add_input(coin) .finalize_as_transaction(); - let err = check_tx(tx, db.clone(), &Default::default()) + let err = check_tx(tx, &Default::default()) .await .expect_err("Transaction should be err, got ok"); @@ -1049,25 +1037,23 @@ async fn predicate_without_enough_gas_returns_out_of_gas() { #[tokio::test] async fn predicate_that_returns_false_is_invalid() { - let mut rng = StdRng::seed_from_u64(0); - let db = MockDb::default(); - let coin = custom_predicate( - &mut rng, - AssetId::BASE, - TEST_COIN_AMOUNT, - // forever loop - vec![op::ret(RegId::ZERO)].into_iter().collect(), - None, - ) - .into_default_estimated(); + let mut context = TextContext::default(); + let coin = context + .custom_predicate( + AssetId::BASE, + TEST_COIN_AMOUNT, + // forever loop + vec![op::ret(RegId::ZERO)].into_iter().collect(), + None, + ) + .into_default_estimated(); - let (_, gas_coin) = add_coin_to_state(coin, Some(&db.clone())); let tx = TransactionBuilder::script(vec![], vec![]) .script_gas_limit(GAS_LIMIT) - .add_input(gas_coin) + .add_input(coin) .finalize_as_transaction(); - let err = check_tx(tx, db.clone(), &Default::default()) + let err = check_tx(tx, &Default::default()) .await .expect_err("Transaction should be err, got ok"); diff --git a/crates/storage/src/transactional.rs b/crates/storage/src/transactional.rs index 854557bd117..31b4ac51fe3 100644 --- a/crates/storage/src/transactional.rs +++ b/crates/storage/src/transactional.rs @@ -79,10 +79,13 @@ impl StorageTransaction { /// Provides a view of the storage at the given height. /// It guarantees to be atomic, meaning the view is immutable to outside modifications. -pub trait AtomicView: Send + Sync { +pub trait AtomicView: Send + Sync { + /// The type of the storage view. + type View; + /// Returns the view of the storage at the given `height`. - fn view_at(&self, height: BlockHeight) -> StorageResult; + fn view_at(&self, height: BlockHeight) -> StorageResult; /// Returns the view of the storage for the latest block height. - fn latest_view(&self) -> View; + fn latest_view(&self) -> Self::View; }