Skip to content

Commit

Permalink
Use AtomicView in the TxPool (#1590)
Browse files Browse the repository at this point in the history
The change is related to
#1589.

The idea of the change is to start using the `AtomicView` inside of the
`TxPool` to generate consistent database representation during the
insertion of the transactions.
  • Loading branch information
xgreenx authored Jan 19, 2024
1 parent 6e7c7bf commit 7de49ae
Show file tree
Hide file tree
Showing 20 changed files with 711 additions and 576 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Description of the upcoming release here.
### Changed

- [#1591](https://github.com/FuelLabs/fuel-core/pull/1591): Simplify libp2p dependencies and not depend on all sub modules directly.
- [#1590](https://github.com/FuelLabs/fuel-core/pull/1590): Use `AtomicView` in the `TxPool` to read the state of the database during insertion of the transactions.
- [#1587](https://github.com/FuelLabs/fuel-core/pull/1587): Use `BlockHeight` as a primary key for the `FuelsBlock` table.
- [#1585](https://github.com/FuelLabs/fuel-core/pull/1585): Let `NetworkBehaviour` macro generate `FuelBehaviorEvent` in p2p
- [#1579](https://github.com/FuelLabs/fuel-core/pull/1579): The change extracts the off-chain-related logic from the executor and moves it to the GraphQL off-chain worker. It creates two new concepts - Off-chain and On-chain databases where the GraphQL worker has exclusive ownership of the database and may modify it without intersecting with the On-chain database.
Expand Down
12 changes: 6 additions & 6 deletions crates/fuel-core/src/graphql_api/api_service.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
use crate::{
fuel_core_graphql_api::{
database::{
OffChainView,
OnChainView,
},
metrics_extension::MetricsExtension,
ports::{
BlockProducerPort,
ConsensusModulePort,
OffChainDatabase,
OnChainDatabase,
P2pPort,
TxPoolPort,
},
Expand Down Expand Up @@ -178,8 +176,10 @@ pub fn new_service<OnChain, OffChain>(
request_timeout: Duration,
) -> anyhow::Result<Service>
where
OnChain: AtomicView<OnChainView> + 'static,
OffChain: AtomicView<OffChainView> + 'static,
OnChain: AtomicView + 'static,
OffChain: AtomicView + 'static,
OnChain::View: OnChainDatabase,
OffChain::View: OffChainDatabase,
{
let network_addr = config.addr;
let combined_read_database = ReadDatabase::new(on_database, off_database);
Expand Down
35 changes: 21 additions & 14 deletions crates/fuel-core/src/graphql_api/database.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
use crate::fuel_core_graphql_api::ports::{
DatabaseBlocks,
DatabaseChain,
DatabaseContracts,
DatabaseMessageProof,
DatabaseMessages,
OffChainDatabase,
OnChainDatabase,
mod arc_wrapper;

use crate::fuel_core_graphql_api::{
database::arc_wrapper::ArcWrapper,
ports::{
DatabaseBlocks,
DatabaseChain,
DatabaseContracts,
DatabaseMessageProof,
DatabaseMessages,
OffChainDatabase,
OnChainDatabase,
},
};
use fuel_core_storage::{
iter::{
Expand Down Expand Up @@ -64,21 +69,23 @@ pub type OffChainView = Arc<dyn OffChainDatabase>;
/// It is used only by `ViewExtension` to create a [`ReadView`].
pub struct ReadDatabase {
/// The on-chain database view provider.
on_chain: Box<dyn AtomicView<OnChainView>>,
on_chain: Box<dyn AtomicView<View = OnChainView>>,
/// The off-chain database view provider.
off_chain: Box<dyn AtomicView<OffChainView>>,
off_chain: Box<dyn AtomicView<View = OffChainView>>,
}

impl ReadDatabase {
/// Creates a new [`ReadDatabase`] with the given on-chain and off-chain database view providers.
pub fn new<OnChain, OffChain>(on_chain: OnChain, off_chain: OffChain) -> Self
where
OnChain: AtomicView<OnChainView> + 'static,
OffChain: AtomicView<OffChainView> + 'static,
OnChain: AtomicView + 'static,
OffChain: AtomicView + 'static,
OnChain::View: OnChainDatabase,
OffChain::View: OffChainDatabase,
{
Self {
on_chain: Box::new(on_chain),
off_chain: Box::new(off_chain),
on_chain: Box::new(ArcWrapper::new(on_chain)),
off_chain: Box::new(ArcWrapper::new(off_chain)),
}
}

Expand Down
66 changes: 66 additions & 0 deletions crates/fuel-core/src/graphql_api/database/arc_wrapper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use crate::fuel_core_graphql_api::{
database::{
OffChainView,
OnChainView,
},
ports::{
OffChainDatabase,
OnChainDatabase,
},
};
use fuel_core_storage::{
transactional::AtomicView,
Result as StorageResult,
};
use fuel_core_types::fuel_types::BlockHeight;
use std::sync::Arc;

/// The GraphQL can't work with the generics in [`async_graphql::Context::data_unchecked`] and requires a known type.
/// It is an `Arc` wrapper around the generic for on-chain and off-chain databases.
pub struct ArcWrapper<Provider, ArcView> {
inner: Provider,
_marker: core::marker::PhantomData<ArcView>,
}

impl<Provider, ArcView> ArcWrapper<Provider, ArcView> {
pub fn new(inner: Provider) -> Self {
Self {
inner,
_marker: core::marker::PhantomData,
}
}
}

impl<Provider, View> AtomicView for ArcWrapper<Provider, OnChainView>
where
Provider: AtomicView<View = View>,
View: OnChainDatabase + 'static,
{
type View = OnChainView;

fn view_at(&self, height: BlockHeight) -> StorageResult<Self::View> {
let view = self.inner.view_at(height)?;
Ok(Arc::new(view))
}

fn latest_view(&self) -> Self::View {
Arc::new(self.inner.latest_view())
}
}

impl<Provider, View> AtomicView for ArcWrapper<Provider, OffChainView>
where
Provider: AtomicView<View = View>,
View: OffChainDatabase + 'static,
{
type View = OffChainView;

fn view_at(&self, height: BlockHeight) -> StorageResult<Self::View> {
let view = self.inner.view_at(height)?;
Ok(Arc::new(view))
}

fn latest_view(&self) -> Self::View {
Arc::new(self.inner.latest_view())
}
}
20 changes: 19 additions & 1 deletion crates/fuel-core/src/service/adapters/graphql_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ use crate::{
};
use async_trait::async_trait;
use fuel_core_services::stream::BoxStream;
use fuel_core_storage::Result as StorageResult;
use fuel_core_storage::{
transactional::AtomicView,
Result as StorageResult,
};
use fuel_core_txpool::{
service::TxStatusMessage,
types::TxId,
Expand Down Expand Up @@ -145,3 +148,18 @@ impl worker::BlockImporter for BlockImporterAdapter {
)
}
}

impl AtomicView for Database {
type View = Database;

fn view_at(&self, _: BlockHeight) -> StorageResult<Self::View> {
unimplemented!(
"Unimplemented until of the https://github.com/FuelLabs/fuel-core/issues/451"
)
}

fn latest_view(&self) -> Self::View {
// TODO: https://github.com/FuelLabs/fuel-core/issues/1581
self.clone()
}
}
24 changes: 3 additions & 21 deletions crates/fuel-core/src/service/adapters/graphql_api/off_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@ use crate::{
transactions::OwnedTransactionIndexCursor,
Database,
},
fuel_core_graphql_api::{
database::OffChainView,
ports::{
worker,
OffChainDatabase,
},
fuel_core_graphql_api::ports::{
worker,
OffChainDatabase,
},
};
use fuel_core_storage::{
Expand All @@ -18,7 +15,6 @@ use fuel_core_storage::{
IterDirection,
},
not_found,
transactional::AtomicView,
Error as StorageError,
Result as StorageResult,
};
Expand All @@ -36,7 +32,6 @@ use fuel_core_types::{
},
services::txpool::TransactionStatus,
};
use std::sync::Arc;

impl OffChainDatabase for Database {
fn owned_message_ids(
Expand Down Expand Up @@ -83,19 +78,6 @@ impl OffChainDatabase for Database {
}
}

impl AtomicView<OffChainView> for Database {
fn view_at(&self, _: BlockHeight) -> StorageResult<OffChainView> {
unimplemented!(
"Unimplemented until of the https://github.com/FuelLabs/fuel-core/issues/451"
)
}

fn latest_view(&self) -> OffChainView {
// TODO: https://github.com/FuelLabs/fuel-core/issues/1581
Arc::new(self.clone())
}
}

impl worker::OffChainDatabase for Database {
fn record_tx_id_owner(
&mut self,
Expand Down
30 changes: 6 additions & 24 deletions crates/fuel-core/src/service/adapters/graphql_api/on_chain.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
use crate::{
database::Database,
fuel_core_graphql_api::{
database::OnChainView,
ports::{
DatabaseBlocks,
DatabaseChain,
DatabaseContracts,
DatabaseMessages,
OnChainDatabase,
},
fuel_core_graphql_api::ports::{
DatabaseBlocks,
DatabaseChain,
DatabaseContracts,
DatabaseMessages,
OnChainDatabase,
},
};
use fuel_core_importer::ports::ImporterDatabase;
Expand All @@ -20,7 +17,6 @@ use fuel_core_storage::{
},
not_found,
tables::FuelBlocks,
transactional::AtomicView,
Error as StorageError,
Result as StorageResult,
};
Expand All @@ -41,7 +37,6 @@ use fuel_core_types::{
},
services::graphql_api::ContractBalance,
};
use std::sync::Arc;

impl DatabaseBlocks for Database {
fn block_height(&self, id: &BlockId) -> StorageResult<BlockHeight> {
Expand Down Expand Up @@ -130,16 +125,3 @@ impl DatabaseChain for Database {
}

impl OnChainDatabase for Database {}

impl AtomicView<OnChainView> for Database {
fn view_at(&self, _: BlockHeight) -> StorageResult<OnChainView> {
unimplemented!(
"Unimplemented until of the https://github.com/FuelLabs/fuel-core/issues/451"
)
}

fn latest_view(&self) -> OnChainView {
// TODO: https://github.com/FuelLabs/fuel-core/issues/1581
Arc::new(self.clone())
}
}
5 changes: 0 additions & 5 deletions crates/fuel-core/src/service/adapters/txpool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use fuel_core_types::{
UtxoId,
},
fuel_types::{
BlockHeight,
ContractId,
Nonce,
},
Expand Down Expand Up @@ -139,8 +138,4 @@ impl fuel_core_txpool::ports::TxPoolDb for Database {
fn is_message_spent(&self, id: &Nonce) -> StorageResult<bool> {
self.storage::<SpentMessages>().contains_key(id)
}

fn current_block_height(&self) -> StorageResult<BlockHeight> {
self.latest_height()
}
}
2 changes: 2 additions & 0 deletions crates/fuel-core/src/service/sub_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub fn init_sub_services(
let last_block = database.get_current_block()?.ok_or(anyhow::anyhow!(
"The blockchain is not initialized with any block"
))?;
let last_height = *last_block.header().height();
#[cfg(feature = "relayer")]
let relayer_service = if let Some(config) = &config.relayer {
Some(fuel_core_relayer::new_service(
Expand Down Expand Up @@ -140,6 +141,7 @@ pub fn init_sub_services(
database.clone(),
importer_adapter.clone(),
p2p_adapter.clone(),
last_height,
);
let tx_pool_adapter = TxPoolAdapter::new(txpool.shared.clone());

Expand Down
1 change: 1 addition & 0 deletions crates/services/txpool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ tracing = { workspace = true }
[dev-dependencies]
fuel-core-trace = { path = "./../../trace" }
fuel-core-txpool = { path = "", features = ["test-helpers"] }
fuel-core-types = { path = "../../types", features = ["test-helpers"] }
itertools = { workspace = true }
mockall = { workspace = true }
proptest = { workspace = true }
Expand Down
19 changes: 16 additions & 3 deletions crates/services/txpool/src/mock_db.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use crate::ports::TxPoolDb;
use fuel_core_storage::Result as StorageResult;
use fuel_core_storage::{
transactional::AtomicView,
Result as StorageResult,
};
use fuel_core_types::{
entities::{
coins::coin::{
Expand Down Expand Up @@ -91,8 +94,18 @@ impl TxPoolDb for MockDb {
fn is_message_spent(&self, id: &Nonce) -> StorageResult<bool> {
Ok(self.data.lock().unwrap().spent_messages.contains(id))
}
}

pub struct MockDBProvider(pub MockDb);

impl AtomicView for MockDBProvider {
type View = MockDb;

fn view_at(&self, _: BlockHeight) -> StorageResult<Self::View> {
Ok(self.latest_view())
}

fn current_block_height(&self) -> StorageResult<BlockHeight> {
Ok(Default::default())
fn latest_view(&self) -> Self::View {
self.0.clone()
}
}
3 changes: 0 additions & 3 deletions crates/services/txpool/src/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use fuel_core_types::{
UtxoId,
},
fuel_types::{
BlockHeight,
ContractId,
Nonce,
},
Expand Down Expand Up @@ -55,6 +54,4 @@ pub trait TxPoolDb: Send + Sync {
fn message(&self, message_id: &Nonce) -> StorageResult<Option<Message>>;

fn is_message_spent(&self, message_id: &Nonce) -> StorageResult<bool>;

fn current_block_height(&self) -> StorageResult<BlockHeight>;
}
Loading

0 comments on commit 7de49ae

Please sign in to comment.