From 87c9579fcdc6ac287dbf9e59a1c92c7121be37a1 Mon Sep 17 00:00:00 2001 From: Green Baneling Date: Mon, 14 Oct 2024 23:58:10 +0200 Subject: [PATCH] Avoid long heavy tasks in the GraphQL service (#2340) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR adds chunking to the stream in GraphQL and requests data in batches. The current implementation is simple and executes batches in the same runtime. But at the end of batch fetching, it yields, allowing other tasks to be processed. ## Checklist - [x] New behavior is reflected in tests ### Before requesting review - [x] I have reviewed the code myself --------- Co-authored-by: Mårten Blankfors --- CHANGELOG.md | 1 + benches/benches/transaction_throughput.rs | 1 + bin/fuel-core/src/cli/run.rs | 1 + bin/fuel-core/src/cli/run/graphql.rs | 4 + crates/fuel-core/src/coins_query.rs | 13 +- crates/fuel-core/src/database/block.rs | 3 +- crates/fuel-core/src/graphql_api.rs | 1 + .../fuel-core/src/graphql_api/api_service.rs | 8 +- crates/fuel-core/src/graphql_api/database.rs | 57 +++--- crates/fuel-core/src/query/balance.rs | 2 + .../src/query/balance/asset_query.rs | 50 ++++-- crates/fuel-core/src/query/block.rs | 3 +- crates/fuel-core/src/query/coin.rs | 30 +++- crates/fuel-core/src/query/message.rs | 37 ++-- crates/fuel-core/src/query/message/test.rs | 13 -- crates/fuel-core/src/query/tx.rs | 29 ++-- crates/fuel-core/src/schema/block.rs | 29 +++- crates/fuel-core/src/schema/tx.rs | 40 ++++- crates/fuel-core/src/schema/tx/types.rs | 2 +- crates/fuel-core/src/service/config.rs | 1 + crates/services/Cargo.toml | 1 + crates/services/src/lib.rs | 1 + crates/services/src/yield_stream.rs | 164 ++++++++++++++++++ tests/test-helpers/src/builder.rs | 9 + tests/tests/dos.rs | 42 ++++- 25 files changed, 430 insertions(+), 112 deletions(-) create mode 100644 crates/services/src/yield_stream.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 197f4381b7f..3d7afae994a 100644 --- a/CHANGELOG.md +++ 
b/CHANGELOG.md @@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Changed - [2334](https://github.com/FuelLabs/fuel-core/pull/2334): Prepare the GraphQL service for the switching to `async` methods. - [2341](https://github.com/FuelLabs/fuel-core/pull/2341): Updated all pagination queries to work with the async stream instead of the sync iterator. +- [2340](https://github.com/FuelLabs/fuel-core/pull/2340): Avoid long heavy tasks in the GraphQL service by splitting work into batches. - [2350](https://github.com/FuelLabs/fuel-core/pull/2350): Limited the number of threads used by the GraphQL service. #### Breaking diff --git a/benches/benches/transaction_throughput.rs b/benches/benches/transaction_throughput.rs index 23454dda359..5d78818e8da 100644 --- a/benches/benches/transaction_throughput.rs +++ b/benches/benches/transaction_throughput.rs @@ -89,6 +89,7 @@ where test_builder.trigger = Trigger::Never; test_builder.utxo_validation = true; test_builder.gas_limit = Some(10_000_000_000); + test_builder.block_size_limit = Some(1_000_000_000_000); // spin up node let transactions: Vec = diff --git a/bin/fuel-core/src/cli/run.rs b/bin/fuel-core/src/cli/run.rs index 0b50c190bc2..7733e99b9a7 100644 --- a/bin/fuel-core/src/cli/run.rs +++ b/bin/fuel-core/src/cli/run.rs @@ -490,6 +490,7 @@ impl Command { graphql_config: GraphQLConfig { addr, number_of_threads: graphql.graphql_number_of_threads, + database_batch_size: graphql.database_batch_size, max_queries_depth: graphql.graphql_max_depth, max_queries_complexity: graphql.graphql_max_complexity, max_queries_recursive_depth: graphql.graphql_max_recursive_depth, diff --git a/bin/fuel-core/src/cli/run/graphql.rs b/bin/fuel-core/src/cli/run/graphql.rs index 5df36adce03..5816b2ecbc0 100644 --- a/bin/fuel-core/src/cli/run/graphql.rs +++ b/bin/fuel-core/src/cli/run/graphql.rs @@ -18,6 +18,10 @@ pub struct GraphQLArgs { #[clap(long = "graphql-number-of-threads", default_value = "2", env)] pub 
graphql_number_of_threads: usize, + /// The size of the batch fetched from the database by GraphQL service. + #[clap(long = "graphql-database-batch-size", default_value = "100", env)] + pub database_batch_size: usize, + /// The max depth of GraphQL queries. #[clap(long = "graphql-max-depth", default_value = "16", env)] pub graphql_max_depth: usize, diff --git a/crates/fuel-core/src/coins_query.rs b/crates/fuel-core/src/coins_query.rs index c878c071335..3d7abda01bc 100644 --- a/crates/fuel-core/src/coins_query.rs +++ b/crates/fuel-core/src/coins_query.rs @@ -280,10 +280,7 @@ mod tests { fuel_asm::Word, fuel_tx::*, }; - use futures::{ - StreamExt, - TryStreamExt, - }; + use futures::TryStreamExt; use itertools::Itertools; use rand::{ rngs::StdRng, @@ -988,7 +985,7 @@ mod tests { fn service_database(&self) -> ServiceDatabase { let on_chain = self.database.on_chain().clone(); let off_chain = self.database.off_chain().clone(); - ServiceDatabase::new(0u32.into(), on_chain, off_chain) + ServiceDatabase::new(100, 0u32.into(), on_chain, off_chain) } } @@ -1045,8 +1042,7 @@ mod tests { let query = self.service_database(); let query = query.test_view(); query - .owned_coins_ids(owner, None, IterDirection::Forward) - .map(|res| res.map(|id| query.coin(id).unwrap())) + .owned_coins(owner, None, IterDirection::Forward) .try_collect() .await .unwrap() @@ -1056,8 +1052,7 @@ mod tests { let query = self.service_database(); let query = query.test_view(); query - .owned_message_ids(owner, None, IterDirection::Forward) - .map(|res| res.map(|id| query.message(&id).unwrap())) + .owned_messages(owner, None, IterDirection::Forward) .try_collect() .await .unwrap() diff --git a/crates/fuel-core/src/database/block.rs b/crates/fuel-core/src/database/block.rs index c6295e62017..374a76a9663 100644 --- a/crates/fuel-core/src/database/block.rs +++ b/crates/fuel-core/src/database/block.rs @@ -67,7 +67,8 @@ impl OnChainIterableKeyValueView { let db_block = self.storage::().get(height)?; if let 
Some(block) = db_block { // fetch all the transactions - // TODO: optimize with multi-key get + // TODO: Use multiget when it's implemented. + // https://github.com/FuelLabs/fuel-core/issues/2344 let txs = block .transactions() .iter() diff --git a/crates/fuel-core/src/graphql_api.rs b/crates/fuel-core/src/graphql_api.rs index b7727a456b9..772bbc815ea 100644 --- a/crates/fuel-core/src/graphql_api.rs +++ b/crates/fuel-core/src/graphql_api.rs @@ -33,6 +33,7 @@ pub struct Config { pub struct ServiceConfig { pub addr: SocketAddr, pub number_of_threads: usize, + pub database_batch_size: usize, pub max_queries_depth: usize, pub max_queries_complexity: usize, pub max_queries_recursive_depth: usize, diff --git a/crates/fuel-core/src/graphql_api/api_service.rs b/crates/fuel-core/src/graphql_api/api_service.rs index 46395118be9..28a714f8b0e 100644 --- a/crates/fuel-core/src/graphql_api/api_service.rs +++ b/crates/fuel-core/src/graphql_api/api_service.rs @@ -236,8 +236,12 @@ where graphql_api::initialize_query_costs(config.config.costs.clone())?; let network_addr = config.config.addr; - let combined_read_database = - ReadDatabase::new(genesis_block_height, on_database, off_database); + let combined_read_database = ReadDatabase::new( + config.config.database_batch_size, + genesis_block_height, + on_database, + off_database, + ); let request_timeout = config.config.api_request_timeout; let concurrency_limit = config.config.max_concurrent_queries; let body_limit = config.config.request_body_bytes_limit; diff --git a/crates/fuel-core/src/graphql_api/database.rs b/crates/fuel-core/src/graphql_api/database.rs index 3f254032b0c..bf47c8d92a7 100644 --- a/crates/fuel-core/src/graphql_api/database.rs +++ b/crates/fuel-core/src/graphql_api/database.rs @@ -5,6 +5,7 @@ use crate::fuel_core_graphql_api::{ OnChainDatabase, }, }; +use fuel_core_services::yield_stream::StreamYieldExt; use fuel_core_storage::{ iter::{ BoxedIter, @@ -77,6 +78,8 @@ pub type OffChainView = Arc; /// The container 
of the on-chain and off-chain database view provides. /// It is used only by `ViewExtension` to create a [`ReadView`]. pub struct ReadDatabase { + /// The size of the batch during fetching from the database. + batch_size: usize, /// The height of the genesis block. genesis_height: BlockHeight, /// The on-chain database view provider. @@ -88,6 +91,7 @@ pub struct ReadDatabase { impl ReadDatabase { /// Creates a new [`ReadDatabase`] with the given on-chain and off-chain database view providers. pub fn new( + batch_size: usize, genesis_height: BlockHeight, on_chain: OnChain, off_chain: OffChain, @@ -99,6 +103,7 @@ impl ReadDatabase { OffChain::LatestView: OffChainDatabase, { Self { + batch_size, genesis_height, on_chain: Box::new(ArcWrapper::new(on_chain)), off_chain: Box::new(ArcWrapper::new(off_chain)), @@ -111,6 +116,7 @@ impl ReadDatabase { // It is not possible to implement until `view_at` is implemented for the `AtomicView`. // https://github.com/FuelLabs/fuel-core/issues/1582 Ok(ReadView { + batch_size: self.batch_size, genesis_height: self.genesis_height, on_chain: self.on_chain.latest_view()?, off_chain: self.off_chain.latest_view()?, @@ -125,6 +131,7 @@ impl ReadDatabase { #[derive(Clone)] pub struct ReadView { + pub(crate) batch_size: usize, pub(crate) genesis_height: BlockHeight, pub(crate) on_chain: OnChainView, pub(crate) off_chain: OffChainView, @@ -134,7 +141,7 @@ impl ReadView { pub fn transaction(&self, tx_id: &TxId) -> StorageResult { let result = self.on_chain.transaction(tx_id); if result.is_not_found() { - if let Some(tx) = self.old_transaction(tx_id)? { + if let Some(tx) = self.off_chain.old_transaction(tx_id)? { Ok(tx) } else { Err(not_found!(Transactions)) @@ -144,6 +151,21 @@ impl ReadView { } } + pub async fn transactions( + &self, + tx_ids: Vec, + ) -> Vec> { + // TODO: Use multiget when it's implemented. 
+ // https://github.com/FuelLabs/fuel-core/issues/2344 + let result = tx_ids + .iter() + .map(|tx_id| self.transaction(tx_id)) + .collect::>(); + // Give a chance to other tasks to run. + tokio::task::yield_now().await; + result + } + pub fn block(&self, height: &BlockHeight) -> StorageResult { if *height >= self.genesis_height { self.on_chain.block(height) @@ -252,6 +274,7 @@ impl ReadView { direction: IterDirection, ) -> impl Stream> + '_ { futures::stream::iter(self.on_chain.all_messages(start_message_id, direction)) + .yield_each(self.batch_size) } pub fn message_exists(&self, nonce: &Nonce) -> StorageResult { @@ -276,6 +299,7 @@ impl ReadView { start_asset, direction, )) + .yield_each(self.batch_size) } pub fn da_height(&self) -> StorageResult { @@ -316,12 +340,12 @@ impl ReadView { futures::stream::iter(iter) } - pub fn owned_message_ids<'a>( - &'a self, - owner: &'a Address, + pub fn owned_message_ids( + &self, + owner: &Address, start_message_id: Option, direction: IterDirection, - ) -> impl Stream> + 'a { + ) -> impl Stream> + '_ { futures::stream::iter(self.off_chain.owned_message_ids( owner, start_message_id, @@ -345,29 +369,6 @@ impl ReadView { self.off_chain.contract_salt(contract_id) } - pub fn old_block(&self, height: &BlockHeight) -> StorageResult { - self.off_chain.old_block(height) - } - - pub fn old_blocks( - &self, - height: Option, - direction: IterDirection, - ) -> BoxedIter<'_, StorageResult> { - self.off_chain.old_blocks(height, direction) - } - - pub fn old_block_consensus(&self, height: &BlockHeight) -> StorageResult { - self.off_chain.old_block_consensus(height) - } - - pub fn old_transaction( - &self, - id: &TxId, - ) -> StorageResult> { - self.off_chain.old_transaction(id) - } - pub fn relayed_tx_status( &self, id: Bytes32, diff --git a/crates/fuel-core/src/query/balance.rs b/crates/fuel-core/src/query/balance.rs index 1a715b74522..161fd64b87e 100644 --- a/crates/fuel-core/src/query/balance.rs +++ b/crates/fuel-core/src/query/balance.rs 
@@ -4,6 +4,7 @@ use asset_query::{ AssetSpendTarget, AssetsQuery, }; +use fuel_core_services::yield_stream::StreamYieldExt; use fuel_core_storage::{ iter::IterDirection, Result as StorageResult, @@ -106,5 +107,6 @@ impl ReadView { }) .map_ok(|stream| stream.map(Ok)) .try_flatten() + .yield_each(self.batch_size) } } diff --git a/crates/fuel-core/src/query/balance/asset_query.rs b/crates/fuel-core/src/query/balance/asset_query.rs index 5482f6b5272..13a289ec1e4 100644 --- a/crates/fuel-core/src/query/balance/asset_query.rs +++ b/crates/fuel-core/src/query/balance/asset_query.rs @@ -15,7 +15,10 @@ use fuel_core_types::{ AssetId, }, }; -use futures::Stream; +use futures::{ + Stream, + TryStreamExt, +}; use std::collections::HashSet; use tokio_stream::StreamExt; @@ -78,7 +81,9 @@ impl<'a> AssetsQuery<'a> { fn coins_iter(mut self) -> impl Stream> + 'a { let allowed_assets = self.allowed_assets.take(); - self.database + let database = self.database; + let stream = self + .database .owned_coins_ids(self.owner, None, IterDirection::Forward) .map(|id| id.map(CoinId::from)) .filter(move |result| { @@ -99,12 +104,25 @@ impl<'a> AssetsQuery<'a> { } else { return Err(anyhow::anyhow!("The coin is not UTXO").into()); }; - // TODO: Fetch coin in a separate thread - let coin = self.database.coin(id)?; - - Ok(CoinType::Coin(coin)) + Ok(id) }) + }); + + futures::stream::StreamExt::chunks(stream, database.batch_size) + .map(|chunk| { + use itertools::Itertools; + + let chunk = chunk.into_iter().try_collect::<_, Vec<_>, _>()?; + Ok::<_, StorageError>(chunk) }) + .try_filter_map(move |chunk| async move { + let chunk = database + .coins(chunk) + .await + .map(|result| result.map(CoinType::Coin)); + Ok(Some(futures::stream::iter(chunk))) + }) + .try_flatten() .filter(move |result| { if let Ok(CoinType::Coin(coin)) = result { allowed_asset(&allowed_assets, &coin.asset_id) @@ -117,7 +135,8 @@ impl<'a> AssetsQuery<'a> { fn messages_iter(&self) -> impl Stream> + 'a { let exclude = self.exclude; 
let database = self.database; - self.database + let stream = self + .database .owned_message_ids(self.owner, None, IterDirection::Forward) .map(|id| id.map(CoinId::from)) .filter(move |result| { @@ -138,11 +157,22 @@ impl<'a> AssetsQuery<'a> { } else { return Err(anyhow::anyhow!("The coin is not a message").into()); }; - // TODO: Fetch message in a separate thread (https://github.com/FuelLabs/fuel-core/pull/2340) - let message = database.message(&id)?; - Ok(message) + Ok(id) }) + }); + + futures::stream::StreamExt::chunks(stream, database.batch_size) + .map(|chunk| { + use itertools::Itertools; + + let chunk = chunk.into_iter().try_collect::<_, Vec<_>, _>()?; + Ok(chunk) + }) + .try_filter_map(move |chunk| async move { + let chunk = database.messages(chunk).await; + Ok::<_, StorageError>(Some(futures::stream::iter(chunk))) }) + .try_flatten() .filter(|result| { if let Ok(message) = result { message.data().is_empty() diff --git a/crates/fuel-core/src/query/block.rs b/crates/fuel-core/src/query/block.rs index 3b725ab8b49..f4b461639f0 100644 --- a/crates/fuel-core/src/query/block.rs +++ b/crates/fuel-core/src/query/block.rs @@ -1,4 +1,5 @@ use crate::fuel_core_graphql_api::database::ReadView; +use fuel_core_services::yield_stream::StreamYieldExt; use fuel_core_storage::{ iter::IterDirection, Result as StorageResult, @@ -23,6 +24,6 @@ impl ReadView { height: Option, direction: IterDirection, ) -> impl Stream> + '_ { - futures::stream::iter(self.blocks(height, direction)) + futures::stream::iter(self.blocks(height, direction)).yield_each(self.batch_size) } } diff --git a/crates/fuel-core/src/query/coin.rs b/crates/fuel-core/src/query/coin.rs index 94a2fbafe54..c487bdba23c 100644 --- a/crates/fuel-core/src/query/coin.rs +++ b/crates/fuel-core/src/query/coin.rs @@ -3,6 +3,7 @@ use fuel_core_storage::{ iter::IterDirection, not_found, tables::Coins, + Error as StorageError, Result as StorageResult, StorageAsRef, }; @@ -14,6 +15,7 @@ use fuel_core_types::{ use futures::{ 
Stream, StreamExt, + TryStreamExt, }; impl ReadView { @@ -29,6 +31,18 @@ impl ReadView { Ok(coin.uncompress(utxo_id)) } + pub async fn coins( + &self, + utxo_ids: Vec, + ) -> impl Iterator> + '_ { + // TODO: Use multiget when it's implemented. + // https://github.com/FuelLabs/fuel-core/issues/2344 + let coins = utxo_ids.into_iter().map(|id| self.coin(id)); + // Give a chance to other tasks to run. + tokio::task::yield_now().await; + coins + } + pub fn owned_coins( &self, owner: &Address, @@ -36,11 +50,17 @@ impl ReadView { direction: IterDirection, ) -> impl Stream> + '_ { self.owned_coins_ids(owner, start_coin, direction) - .map(|res| { - res.and_then(|id| { - // TODO: Move fetching of the coin to a separate thread (https://github.com/FuelLabs/fuel-core/pull/2340) - self.coin(id) - }) + .chunks(self.batch_size) + .map(|chunk| { + use itertools::Itertools; + + let chunk = chunk.into_iter().try_collect::<_, Vec<_>, _>()?; + Ok::<_, StorageError>(chunk) + }) + .try_filter_map(move |chunk| async move { + let chunk = self.coins(chunk).await; + Ok(Some(futures::stream::iter(chunk))) }) + .try_flatten() } } diff --git a/crates/fuel-core/src/query/message.rs b/crates/fuel-core/src/query/message.rs index 56c83431ee5..c98b2358b2c 100644 --- a/crates/fuel-core/src/query/message.rs +++ b/crates/fuel-core/src/query/message.rs @@ -25,7 +25,6 @@ use fuel_core_types::{ fuel_tx::{ input::message::compute_message_id, Receipt, - Transaction, TxId, }, fuel_types::{ @@ -40,6 +39,7 @@ use fuel_core_types::{ use futures::{ Stream, StreamExt, + TryStreamExt, }; use itertools::Itertools; use std::borrow::Cow; @@ -81,6 +81,18 @@ impl ReadView { .map(Cow::into_owned) } + pub async fn messages( + &self, + ids: Vec, + ) -> impl Iterator> + '_ { + // TODO: Use multiget when it's implemented. + // https://github.com/FuelLabs/fuel-core/issues/2344 + let messages = ids.into_iter().map(|id| self.message(&id)); + // Give a chance to other tasks to run. 
+ tokio::task::yield_now().await; + messages + } + pub fn owned_messages<'a>( &'a self, owner: &'a Address, @@ -88,12 +100,16 @@ impl ReadView { direction: IterDirection, ) -> impl Stream> + 'a { self.owned_message_ids(owner, start_message_id, direction) - .map(|result| { - result.and_then(|id| { - // TODO: Move `message` fetching to a separate thread (https://github.com/FuelLabs/fuel-core/pull/2340) - self.message(&id) - }) + .chunks(self.batch_size) + .map(|chunk| { + let chunk = chunk.into_iter().try_collect::<_, Vec<_>, _>()?; + Ok(chunk) + }) + .try_filter_map(move |chunk| async move { + let chunk = self.messages(chunk).await; + Ok::<_, StorageError>(Some(futures::stream::iter(chunk))) }) + .try_flatten() } } @@ -102,9 +118,6 @@ pub trait MessageProofData { /// Get the block. fn block(&self, id: &BlockHeight) -> StorageResult; - /// Get the transaction. - fn transaction(&self, transaction_id: &TxId) -> StorageResult; - /// Return all receipts in the given transaction. fn receipts(&self, transaction_id: &TxId) -> StorageResult>; @@ -128,10 +141,6 @@ impl MessageProofData for ReadView { self.block(id) } - fn transaction(&self, transaction_id: &TxId) -> StorageResult { - self.transaction(transaction_id) - } - fn receipts(&self, transaction_id: &TxId) -> StorageResult> { self.receipts(transaction_id) } @@ -140,7 +149,7 @@ impl MessageProofData for ReadView { &self, transaction_id: &TxId, ) -> StorageResult { - self.status(transaction_id) + self.tx_status(transaction_id) } fn block_history_proof( diff --git a/crates/fuel-core/src/query/message/test.rs b/crates/fuel-core/src/query/message/test.rs index d49f173370b..43d6ecbef1a 100644 --- a/crates/fuel-core/src/query/message/test.rs +++ b/crates/fuel-core/src/query/message/test.rs @@ -10,8 +10,6 @@ use fuel_core_types::{ fuel_tx::{ AssetId, ContractId, - Script, - Transaction, }, fuel_types::BlockHeight, tai64::Tai64, @@ -64,7 +62,6 @@ mockall::mock! 
{ message_block_height: &BlockHeight, commit_block_height: &BlockHeight, ) -> StorageResult; - fn transaction(&self, transaction_id: &TxId) -> StorageResult; fn receipts(&self, transaction_id: &TxId) -> StorageResult>; fn transaction_status(&self, transaction_id: &TxId) -> StorageResult; } @@ -107,16 +104,6 @@ async fn can_build_message_proof() { } }); - data.expect_transaction().returning(move |txn_id| { - let tx = TXNS - .iter() - .find(|t| *t == txn_id) - .map(|_| Script::default().into()) - .ok_or(not_found!("Transaction in `TXNS`"))?; - - Ok(tx) - }); - let commit_block_header = PartialBlockHeader { application: ApplicationHeader { da_height: 0u64.into(), diff --git a/crates/fuel-core/src/query/tx.rs b/crates/fuel-core/src/query/tx.rs index 6898e9155a4..0bceeef8809 100644 --- a/crates/fuel-core/src/query/tx.rs +++ b/crates/fuel-core/src/query/tx.rs @@ -3,6 +3,7 @@ use fuel_core_storage::{ iter::IterDirection, not_found, tables::Transactions, + Error as StorageError, Result as StorageResult, }; use fuel_core_types::{ @@ -18,11 +19,12 @@ use fuel_core_types::{ use futures::{ Stream, StreamExt, + TryStreamExt, }; impl ReadView { pub fn receipts(&self, tx_id: &TxId) -> StorageResult> { - let status = self.status(tx_id)?; + let status = self.tx_status(tx_id)?; let receipts = match status { TransactionStatus::Success { receipts, .. 
} @@ -32,10 +34,6 @@ impl ReadView { receipts.ok_or(not_found!(Transactions)) } - pub fn status(&self, tx_id: &TxId) -> StorageResult { - self.tx_status(tx_id) - } - pub fn owned_transactions( &self, owner: Address, @@ -43,13 +41,22 @@ impl ReadView { direction: IterDirection, ) -> impl Stream> + '_ { self.owned_transactions_ids(owner, start, direction) - .map(|result| { - result.and_then(|(tx_pointer, tx_id)| { - // TODO: Fetch transactions in a separate thread (https://github.com/FuelLabs/fuel-core/pull/2340) - let tx = self.transaction(&tx_id)?; + .chunks(self.batch_size) + .map(|chunk| { + use itertools::Itertools; - Ok((tx_pointer, tx)) - }) + let chunk = chunk.into_iter().try_collect::<_, Vec<_>, _>()?; + Ok::<_, StorageError>(chunk) + }) + .try_filter_map(move |chunk| async move { + let tx_ids = chunk.iter().map(|(_, tx_id)| *tx_id).collect::>(); + let txs = self.transactions(tx_ids).await; + let txs = txs + .into_iter() + .zip(chunk) + .map(|(result, (tx_pointer, _))| result.map(|tx| (tx_pointer, tx))); + Ok(Some(futures::stream::iter(txs))) }) + .try_flatten() } } diff --git a/crates/fuel-core/src/schema/block.rs b/crates/fuel-core/src/schema/block.rs index 335eb66ed92..7cdfeff2d5b 100644 --- a/crates/fuel-core/src/schema/block.rs +++ b/crates/fuel-core/src/schema/block.rs @@ -44,12 +44,14 @@ use fuel_core_types::{ block::CompressedBlock, header::BlockHeader, }, + fuel_tx::TxId, fuel_types, fuel_types::BlockHeight, }; use futures::{ Stream, StreamExt, + TryStreamExt, }; pub struct Block(pub(crate) CompressedBlock); @@ -135,14 +137,27 @@ impl Block { ctx: &Context<'_>, ) -> async_graphql::Result> { let query = ctx.read_view()?; - self.0 - .transactions() - .iter() - .map(|tx_id| { - let tx = query.transaction(tx_id)?; - Ok(Transaction::from_tx(*tx_id, tx)) + let tx_ids = futures::stream::iter(self.0.transactions().iter().copied()); + + let result = tx_ids + .chunks(query.batch_size) + .filter_map(move |tx_ids: Vec| { + let async_query = 
query.as_ref().clone(); + async move { + let txs = async_query.transactions(tx_ids.clone()).await; + let txs = txs + .into_iter() + .zip(tx_ids.into_iter()) + .map(|(r, tx_id)| r.map(|tx| Transaction::from_tx(tx_id, tx))); + + Some(futures::stream::iter(txs)) + } }) - .collect() + .flatten() + .try_collect() + .await?; + + Ok(result) } } diff --git a/crates/fuel-core/src/schema/tx.rs b/crates/fuel-core/src/schema/tx.rs index 9ea589771d7..8ee21edf972 100644 --- a/crates/fuel-core/src/schema/tx.rs +++ b/crates/fuel-core/src/schema/tx.rs @@ -71,7 +71,6 @@ use std::{ borrow::Cow, iter, }; -use tokio_stream::StreamExt; use types::{ DryRunTransactionExecutionStatus, Transaction, @@ -123,7 +122,9 @@ impl TxQuery { ) -> async_graphql::Result< Connection, > { + use futures::stream::StreamExt; let query = ctx.read_view()?; + let query_ref = query.as_ref(); crate::schema::query_pagination( after, before, @@ -156,16 +157,34 @@ impl TxQuery { false }; - async move { Ok(skip) } + async move { Ok::<_, StorageError>(skip) } }) - .map(|result: StorageResult| { - result.and_then(|sorted| { - // TODO: Request transactions in a separate thread - let tx = query.transaction(&sorted.tx_id.0)?; + .chunks(query_ref.batch_size) + .map(|chunk| { + use itertools::Itertools; - Ok((sorted, Transaction::from_tx(sorted.tx_id.0, tx))) - }) - }); + let chunk = chunk.into_iter().try_collect::<_, Vec<_>, _>()?; + Ok::<_, StorageError>(chunk) + }) + .try_filter_map(move |chunk| { + let async_query = query_ref.clone(); + async move { + let tx_ids = chunk + .iter() + .map(|sorted| sorted.tx_id.0) + .collect::>(); + let txs = async_query.transactions(tx_ids).await; + let txs = txs.into_iter().zip(chunk.into_iter()).map( + |(result, sorted)| { + result.map(|tx| { + (sorted, Transaction::from_tx(sorted.tx_id.0, tx)) + }) + }, + ); + Ok(Some(futures::stream::iter(txs))) + } + }) + .try_flatten(); Ok(all_txs) }, @@ -188,6 +207,7 @@ impl TxQuery { before: Option, ) -> async_graphql::Result> { + use 
futures::stream::StreamExt; let query = ctx.read_view()?; let params = ctx .data_unchecked::() @@ -382,6 +402,7 @@ impl TxStatusSubscription { ) -> async_graphql::Result< impl Stream> + 'a, > { + use tokio_stream::StreamExt; let subscription = submit_and_await_status(ctx, tx).await?; Ok(subscription @@ -410,6 +431,7 @@ async fn submit_and_await_status<'a>( ) -> async_graphql::Result< impl Stream> + 'a, > { + use tokio_stream::StreamExt; let txpool = ctx.data_unchecked::(); let params = ctx .data_unchecked::() diff --git a/crates/fuel-core/src/schema/tx/types.rs b/crates/fuel-core/src/schema/tx/types.rs index d237bb817ae..effbc463d0c 100644 --- a/crates/fuel-core/src/schema/tx/types.rs +++ b/crates/fuel-core/src/schema/tx/types.rs @@ -987,7 +987,7 @@ pub(crate) async fn get_tx_status( txpool: &TxPool, ) -> Result, StorageError> { match query - .status(&id) + .tx_status(&id) .into_api_result::()? { Some(status) => { diff --git a/crates/fuel-core/src/service/config.rs b/crates/fuel-core/src/service/config.rs index f3f3d1ca614..1e242c75e03 100644 --- a/crates/fuel-core/src/service/config.rs +++ b/crates/fuel-core/src/service/config.rs @@ -139,6 +139,7 @@ impl Config { 0, ), number_of_threads: 0, + database_batch_size: 100, max_queries_depth: 16, max_queries_complexity: 80000, max_queries_recursive_depth: 16, diff --git a/crates/services/Cargo.toml b/crates/services/Cargo.toml index 345bcd64287..34d0726cdca 100644 --- a/crates/services/Cargo.toml +++ b/crates/services/Cargo.toml @@ -15,6 +15,7 @@ async-trait = { workspace = true } fuel-core-metrics = { workspace = true } futures = { workspace = true } parking_lot = { workspace = true } +pin-project-lite = { workspace = true } rayon = { workspace = true, optional = true } tokio = { workspace = true, features = ["full"] } tracing = { workspace = true } diff --git a/crates/services/src/lib.rs b/crates/services/src/lib.rs index 10da0bc75f7..e7fa438631f 100644 --- a/crates/services/src/lib.rs +++ b/crates/services/src/lib.rs 
@@ -11,6 +11,7 @@ mod state; mod sync; #[cfg(feature = "sync-processor")] mod sync_processor; +pub mod yield_stream; /// Re-exports for streaming utilities pub mod stream { diff --git a/crates/services/src/yield_stream.rs b/crates/services/src/yield_stream.rs new file mode 100644 index 00000000000..60331dbf119 --- /dev/null +++ b/crates/services/src/yield_stream.rs @@ -0,0 +1,164 @@ +//! Stream that yields each `batch_size` items allowing other tasks to work. + +use futures::{ + ready, + stream::Fuse, + Stream, + StreamExt, +}; +use std::{ + pin::Pin, + task::{ + Context, + Poll, + }, +}; + +pin_project_lite::pin_project! { + /// Stream that yields each `batch_size` items. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct YieldStream { + #[pin] + stream: Fuse, + item: Option, + counter: usize, + batch_size: usize, + } +} + +impl YieldStream { + /// Create a new `YieldStream` with the given `batch_size`. + pub fn new(stream: St, batch_size: usize) -> Self { + assert!(batch_size > 0); + + Self { + stream: stream.fuse(), + item: None, + counter: 0, + batch_size, + } + } +} + +impl Stream for YieldStream { + type Item = St::Item; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let mut this = self.as_mut().project(); + + // If we have a cached item, return it because that means we were woken up. + if let Some(item) = this.item.take() { + *this.counter = 1; + return Poll::Ready(Some(item)); + } + + match ready!(this.stream.as_mut().poll_next(cx)) { + // Return items, unless we reached the batch size. + // after that, we want to yield before returning the next item. + Some(item) => { + if this.counter < this.batch_size { + *this.counter = this.counter.saturating_add(1); + + Poll::Ready(Some(item)) + } else { + *this.item = Some(item); + + cx.waker().wake_by_ref(); + + Poll::Pending + } + } + + // Underlying stream ran out of values, so finish this stream as well. 
+ None => Poll::Ready(None), + } + } + + fn size_hint(&self) -> (usize, Option) { + let cached_len = usize::from(self.item.is_some()); + let (lower, upper) = self.stream.size_hint(); + let lower = lower.saturating_add(cached_len); + let upper = match upper { + Some(x) => x.checked_add(cached_len), + None => None, + }; + (lower, upper) + } +} + +/// Extension trait for `Stream`. +pub trait StreamYieldExt: Stream { + /// Yields each `batch_size` items allowing other tasks to work. + fn yield_each(self, batch_size: usize) -> YieldStream + where + Self: Sized, + { + YieldStream::new(self, batch_size) + } +} + +impl StreamYieldExt for St where St: Stream {} + +#[cfg(test)] +#[allow(non_snake_case)] +mod tests { + use super::*; + + #[tokio::test] + async fn yield_stream__works_with_10_elements_loop() { + let stream = futures::stream::iter(0..10); + let mut yield_stream = YieldStream::new(stream, 3); + + let mut items = Vec::new(); + while let Some(item) = yield_stream.next().await { + items.push(item); + } + + assert_eq!(items, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); + } + + #[tokio::test] + async fn yield_stream__works_with_10_elements__collect() { + let stream = futures::stream::iter(0..10); + let yield_stream = stream.yield_each(3); + + let items = yield_stream.collect::>().await; + + assert_eq!(items, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); + } + + #[tokio::test] + async fn yield_stream__passed_control_to_another_future() { + let stream = futures::stream::iter(0..10); + let mut yield_stream = YieldStream::new(stream, 3); + + async fn second_future() -> i32 { + -1 + } + + let mut items = Vec::new(); + loop { + tokio::select! 
{ + biased; + + item = yield_stream.next() => { + if let Some(item) = item { + items.push(item); + } else { + break; + } + } + + item = second_future() => { + items.push(item); + } + } + } + + assert_eq!(items, vec![0, 1, 2, -1, 3, 4, 5, -1, 6, 7, 8, -1, 9]); + } +} diff --git a/tests/test-helpers/src/builder.rs b/tests/test-helpers/src/builder.rs index 91134ceb7c3..0d16bc48dc6 100644 --- a/tests/test-helpers/src/builder.rs +++ b/tests/test-helpers/src/builder.rs @@ -93,6 +93,7 @@ pub struct TestSetupBuilder { pub initial_coins: Vec, pub starting_gas_price: u64, pub gas_limit: Option, + pub block_size_limit: Option, pub starting_block: Option, pub utxo_validation: bool, pub privileged_address: Address, @@ -201,6 +202,13 @@ impl TestSetupBuilder { .set_block_gas_limit(gas_limit); } + if let Some(block_size_limit) = self.block_size_limit { + chain_conf + .consensus_parameters + .set_block_transaction_size_limit(block_size_limit) + .expect("Should set new block size limit"); + } + chain_conf .consensus_parameters .set_privileged_address(self.privileged_address); @@ -251,6 +259,7 @@ impl Default for TestSetupBuilder { initial_coins: vec![], starting_gas_price: 0, gas_limit: None, + block_size_limit: None, starting_block: None, utxo_validation: true, privileged_address: Default::default(), diff --git a/tests/tests/dos.rs b/tests/tests/dos.rs index ca4295024fc..05c0e0fc030 100644 --- a/tests/tests/dos.rs +++ b/tests/tests/dos.rs @@ -1,6 +1,9 @@ #![allow(non_snake_case)] -use std::time::Instant; +use std::time::{ + Duration, + Instant, +}; use fuel_core::service::{ Config, @@ -681,3 +684,40 @@ async fn schema_is_retrievable() { let result = send_graph_ql_query(&url, query).await; assert!(result.contains("__schema"), "{:?}", result); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 8)] +async fn heavy_tasks_doesnt_block_graphql() { + let mut config = Config::local_node(); + + const NUM_OF_BLOCKS: u32 = 4000; + config.graphql_config.max_queries_complexity = 
10_000_000; + + let query = FULL_BLOCK_QUERY.to_string(); + let query = query.replace("$NUMBER_OF_BLOCKS", NUM_OF_BLOCKS.to_string().as_str()); + + let node = FuelService::new_node(config).await.unwrap(); + let url = format!("http://{}/v1/graphql", node.bound_address); + let client = FuelClient::new(url.clone()).unwrap(); + client.produce_blocks(NUM_OF_BLOCKS, None).await.unwrap(); + + // Given + for _ in 0..50 { + let url = url.clone(); + let query = query.clone(); + tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(20)).await; + let result = send_graph_ql_query(&url, &query).await; + assert!(result.contains("transactions")); + }); + } + // Wait for all queries to start be processed on the node. + tokio::time::sleep(Duration::from_secs(1)).await; + + // When + let result = tokio::time::timeout(Duration::from_secs(5), client.health()).await; + + // Then + let result = result.expect("Health check timed out"); + let health = result.expect("Health check failed"); + assert!(health); +}