From ef6aa6ac1e875a853eda2fc344a4e67d090ba28f Mon Sep 17 00:00:00 2001 From: xgreenx Date: Tue, 19 Dec 2023 16:38:00 +0100 Subject: [PATCH] Move `KeyValueStore` to the `fuel-core-storage` crate --- crates/database/src/lib.rs | 7 - crates/fuel-core/src/database.rs | 52 +++---- crates/fuel-core/src/database/block.rs | 7 +- crates/fuel-core/src/database/coin.rs | 13 +- crates/fuel-core/src/database/contracts.rs | 25 ++-- crates/fuel-core/src/database/message.rs | 5 +- crates/fuel-core/src/database/metadata.rs | 12 +- crates/fuel-core/src/database/transaction.rs | 2 +- crates/fuel-core/src/database/transactions.rs | 12 +- .../src/service/adapters/executor.rs | 4 +- .../src/service/adapters/graphql_api.rs | 10 +- .../fuel-core/src/service/adapters/txpool.rs | 5 +- crates/fuel-core/src/state.rs | 134 ++---------------- .../src/state/in_memory/memory_store.rs | 28 ++-- .../src/state/in_memory/transaction.rs | 34 +++-- crates/fuel-core/src/state/rocks_db.rs | 40 +++--- crates/storage/src/iter.rs | 19 ++- crates/storage/src/kv_store.rs | 128 +++++++++++++++++ crates/storage/src/lib.rs | 8 ++ 19 files changed, 293 insertions(+), 252 deletions(-) create mode 100644 crates/storage/src/kv_store.rs diff --git a/crates/database/src/lib.rs b/crates/database/src/lib.rs index c980ebca420..578f8c48010 100644 --- a/crates/database/src/lib.rs +++ b/crates/database/src/lib.rs @@ -13,7 +13,6 @@ use fuel_core_storage::Error as StorageError; use fuel_core_types::services::executor::Error as ExecutorError; -use std::array::TryFromSliceError; /// The error occurred during work with any of databases. #[derive(Debug, derive_more::Display, derive_more::From)] @@ -56,12 +55,6 @@ impl From for StorageError { } } -impl From for Error { - fn from(e: TryFromSliceError) -> Self { - Self::Other(anyhow::anyhow!(e)) - } -} - impl From for ExecutorError { fn from(e: Error) -> Self { ExecutorError::StorageError(anyhow::anyhow!(StorageError::from(e))) diff --git a/crates/fuel-core/src/database.rs b/crates/fuel-core/src/database.rs index f21d6bb45be..d2fb65cfddd 100644 --- a/crates/fuel-core/src/database.rs +++ b/crates/fuel-core/src/database.rs @@ -3,7 +3,6 @@ use crate::{ state::{ in_memory::memory_store::MemoryStore, DataSource, - WriteOperation, }, }; use fuel_core_chain_config::{ @@ -14,6 +13,11 @@ use fuel_core_chain_config::{ }; use fuel_core_storage::{ iter::IterDirection, + kv_store::{ + StorageColumn, + Value, + WriteOperation, + }, transactional::{ StorageTransaction, Transactional, @@ -50,13 +54,11 @@ use strum::EnumCount; pub use fuel_core_database::Error; pub type Result = core::result::Result; -type DatabaseError = Error; type DatabaseResult = Result; // TODO: Extract `Database` and all belongs into `fuel-core-database`. #[cfg(feature = "rocksdb")] use crate::state::rocks_db::RocksDb; -use crate::state::Value; #[cfg(feature = "rocksdb")] use std::path::Path; #[cfg(feature = "rocksdb")] @@ -160,7 +162,7 @@ impl Column { } } -impl crate::state::StorageColumn for Column { +impl StorageColumn for Column { fn name(&self) -> &'static str { self.into() } @@ -276,15 +278,15 @@ impl Database { key: K, column: Column, value: &V, - ) -> DatabaseResult> { + ) -> StorageResult> { let result = self.data.replace( key.as_ref(), column, - Arc::new(postcard::to_stdvec(value).map_err(|_| DatabaseError::Codec)?), + Arc::new(postcard::to_stdvec(value).map_err(|_| StorageError::Codec)?), )?; if let Some(previous) = result { Ok(Some( - postcard::from_bytes(&previous).map_err(|_| DatabaseError::Codec)?, + postcard::from_bytes(&previous).map_err(|_| StorageError::Codec)?, )) } else { Ok(None) @@ -296,7 +298,7 @@ impl Database { key: K, column: Column, value: V, - ) -> DatabaseResult> { + ) -> StorageResult> { self.data .replace(key.as_ref(), column, Arc::new(value.as_ref().to_vec())) } @@ -305,14 +307,14 @@ impl Database { &self, column: Column, set: S, - ) -> DatabaseResult<()> + ) -> StorageResult<()> where S: Iterator, { let set: Vec<_> = set .map(|(key, value)| { let value = - postcard::to_stdvec(&value).map_err(|_| DatabaseError::Codec)?; + postcard::to_stdvec(&value).map_err(|_| StorageError::Codec)?; let tuple = ( key.as_ref().to_vec(), @@ -320,7 +322,7 @@ impl Database { WriteOperation::Insert(Arc::new(value)), ); - Ok::<_, DatabaseError>(tuple) + Ok::<_, StorageError>(tuple) }) .try_collect()?; @@ -331,25 +333,25 @@ impl Database { &self, key: &[u8], column: Column, - ) -> DatabaseResult> { + ) -> StorageResult> { self.data .take(key, column)? - .map(|val| postcard::from_bytes(&val).map_err(|_| DatabaseError::Codec)) + .map(|val| postcard::from_bytes(&val).map_err(|_| StorageError::Codec)) .transpose() } - fn take_raw(&self, key: &[u8], column: Column) -> DatabaseResult> { + fn take_raw(&self, key: &[u8], column: Column) -> StorageResult> { self.data.take(key, column) } } /// Read-only methods. impl Database { - fn contains_key(&self, key: &[u8], column: Column) -> DatabaseResult { + fn contains_key(&self, key: &[u8], column: Column) -> StorageResult { self.data.exists(key, column) } - fn size_of_value(&self, key: &[u8], column: Column) -> DatabaseResult> { + fn size_of_value(&self, key: &[u8], column: Column) -> StorageResult> { self.data.size_of_value(key, column) } @@ -358,11 +360,11 @@ impl Database { key: &[u8], column: Column, buf: &mut [u8], - ) -> DatabaseResult> { + ) -> StorageResult> { self.data.read(key, column, buf) } - fn read_alloc(&self, key: &[u8], column: Column) -> DatabaseResult>> { + fn read_alloc(&self, key: &[u8], column: Column) -> StorageResult>> { self.data .get(key, column) .map(|value| value.map(|value| value.deref().clone())) @@ -372,10 +374,10 @@ impl Database { &self, key: &[u8], column: Column, - ) -> DatabaseResult> { + ) -> StorageResult> { self.data .get(key, column)? - .map(|val| postcard::from_bytes(&val).map_err(|_| DatabaseError::Codec)) + .map(|val| postcard::from_bytes(&val).map_err(|_| StorageError::Codec)) .transpose() } @@ -383,7 +385,7 @@ impl Database { &self, column: Column, direction: Option, - ) -> impl Iterator> + '_ + ) -> impl Iterator> + '_ where K: From>, V: DeserializeOwned, @@ -395,7 +397,7 @@ impl Database { &self, column: Column, prefix: Option

, - ) -> impl Iterator> + '_ + ) -> impl Iterator> + '_ where K: From>, V: DeserializeOwned, @@ -409,7 +411,7 @@ impl Database { column: Column, start: Option, direction: Option, - ) -> impl Iterator> + '_ + ) -> impl Iterator> + '_ where K: From>, V: DeserializeOwned, @@ -424,7 +426,7 @@ impl Database { prefix: Option

, start: Option, direction: Option, - ) -> impl Iterator> + '_ + ) -> impl Iterator> + '_ where K: From>, V: DeserializeOwned, @@ -442,7 +444,7 @@ impl Database { val.and_then(|(key, value)| { let key = K::from(key); let value: V = - postcard::from_bytes(&value).map_err(|_| DatabaseError::Codec)?; + postcard::from_bytes(&value).map_err(|_| StorageError::Codec)?; Ok((key, value)) }) }) diff --git a/crates/fuel-core/src/database/block.rs b/crates/fuel-core/src/database/block.rs index bada124d80a..f4fbbe3342d 100644 --- a/crates/fuel-core/src/database/block.rs +++ b/crates/fuel-core/src/database/block.rs @@ -9,7 +9,6 @@ use crate::database::{ Column, Database, Error as DatabaseError, - Result as DatabaseResult, }; use fuel_core_storage::{ iter::IterDirection, @@ -161,7 +160,7 @@ impl Database { &self, start: Option, direction: IterDirection, - ) -> impl Iterator> + '_ { + ) -> impl Iterator> + '_ { let start = start.map(|b| b.to_bytes()); self.iter_all_by_start::, BlockId, _>( Column::FuelBlockSecondaryKeyBlockHeights, @@ -178,7 +177,7 @@ impl Database { }) } - pub fn ids_of_genesis_block(&self) -> DatabaseResult<(BlockHeight, BlockId)> { + pub fn ids_of_genesis_block(&self) -> StorageResult<(BlockHeight, BlockId)> { self.iter_all( Column::FuelBlockSecondaryKeyBlockHeights, Some(IterDirection::Forward), @@ -192,7 +191,7 @@ impl Database { }) } - pub fn ids_of_latest_block(&self) -> DatabaseResult> { + pub fn ids_of_latest_block(&self) -> StorageResult> { let ids = self .iter_all::, BlockId>( Column::FuelBlockSecondaryKeyBlockHeights, diff --git a/crates/fuel-core/src/database/coin.rs b/crates/fuel-core/src/database/coin.rs index c0b12bd3b6f..b56ca30daf3 100644 --- a/crates/fuel-core/src/database/coin.rs +++ b/crates/fuel-core/src/database/coin.rs @@ -2,8 +2,6 @@ use crate::database::{ storage::DatabaseColumn, Column, Database, - Error as DatabaseError, - Result as DatabaseResult, }; use fuel_core_chain_config::CoinConfig; use fuel_core_storage::{ @@ -110,7 +108,7 @@ impl Database { owner: &Address, start_coin: Option, direction: Option, - ) -> impl Iterator> + '_ { + ) -> impl Iterator> + '_ { self.iter_all_filtered::, bool, _, _>( Column::OwnedCoins, Some(*owner), @@ -138,14 +136,13 @@ impl Database { Ok(coin) } - pub fn get_coin_config(&self) -> DatabaseResult>> { + pub fn get_coin_config(&self) -> StorageResult>> { let configs = self .iter_all::, CompressedCoin>(Column::Coins, None) - .map(|raw_coin| -> DatabaseResult { + .map(|raw_coin| -> StorageResult { let coin = raw_coin?; - let byte_id = - Bytes32::new(coin.0[..32].try_into().map_err(DatabaseError::from)?); + let byte_id = Bytes32::new(coin.0[..32].try_into()?); let output_index = coin.0[32]; Ok(CoinConfig { @@ -159,7 +156,7 @@ impl Database { asset_id: coin.1.asset_id, }) }) - .collect::>>()?; + .collect::>>()?; Ok(Some(configs)) } diff --git a/crates/fuel-core/src/database/contracts.rs b/crates/fuel-core/src/database/contracts.rs index bc061d6ea28..48cbb1a7809 100644 --- a/crates/fuel-core/src/database/contracts.rs +++ b/crates/fuel-core/src/database/contracts.rs @@ -2,8 +2,6 @@ use crate::database::{ storage::DatabaseColumn, Column, Database, - Error as DatabaseError, - Result as DatabaseResult, }; use fuel_core_chain_config::ContractConfig; use fuel_core_storage::{ @@ -90,7 +88,7 @@ impl StorageMutate for Database { impl StorageSize for Database { fn size_of_value(&self, key: &ContractId) -> Result, Self::Error> { - Ok(self.size_of_value(key.as_ref(), Column::ContractsRawCode)?) + self.size_of_value(key.as_ref(), Column::ContractsRawCode) } } @@ -100,11 +98,11 @@ impl StorageRead for Database { key: &ContractId, buf: &mut [u8], ) -> Result, Self::Error> { - Ok(self.read(key.as_ref(), Column::ContractsRawCode, buf)?) + self.read(key.as_ref(), Column::ContractsRawCode, buf) } fn read_alloc(&self, key: &ContractId) -> Result>, Self::Error> { - Ok(self.read_alloc(key.as_ref(), Column::ContractsRawCode)?) + self.read_alloc(key.as_ref(), Column::ContractsRawCode) } } @@ -142,7 +140,7 @@ impl Database { Column::ContractsState, Some(contract_id.as_ref()), ) - .map(|res| -> DatabaseResult<(Bytes32, Bytes32)> { + .map(|res| -> StorageResult<(Bytes32, Bytes32)> { let safe_res = res?; // We don't need to store ContractId which is the first 32 bytes of this @@ -152,7 +150,7 @@ impl Database { Ok((state_key, safe_res.1)) }) .filter(|val| val.is_ok()) - .collect::>>()?, + .collect::>>()?, ); let balances = Some( @@ -163,9 +161,7 @@ impl Database { .map(|res| { let safe_res = res?; - let asset_id = AssetId::new( - safe_res.0[32..].try_into().map_err(DatabaseError::from)?, - ); + let asset_id = AssetId::new(safe_res.0[32..].try_into()?); Ok((asset_id, safe_res.1)) }) @@ -191,7 +187,7 @@ impl Database { contract: ContractId, start_asset: Option, direction: Option, - ) -> impl Iterator> + '_ { + ) -> impl Iterator> + '_ { self.iter_all_filtered::, Word, _, _>( Column::ContractsAssets, Some(contract), @@ -209,11 +205,8 @@ impl Database { let configs = self .iter_all::, Word>(Column::ContractsRawCode, None) .map(|raw_contract_id| -> StorageResult { - let contract_id = ContractId::new( - raw_contract_id.unwrap().0[..32] - .try_into() - .map_err(DatabaseError::from)?, - ); + let contract_id = + ContractId::new(raw_contract_id.unwrap().0[..32].try_into()?); self.get_contract_config_by_id(contract_id) }) .collect::>>()?; diff --git a/crates/fuel-core/src/database/message.rs b/crates/fuel-core/src/database/message.rs index 308b7c155db..cccbf8abb1c 100644 --- a/crates/fuel-core/src/database/message.rs +++ b/crates/fuel-core/src/database/message.rs @@ -2,7 +2,6 @@ use crate::database::{ storage::ToDatabaseKey, Column, Database, - Result as DatabaseResult, }; use fuel_core_chain_config::MessageConfig; use fuel_core_storage::{ @@ -93,7 +92,7 @@ impl Database { owner: &Address, start_message_id: Option, direction: Option, - ) -> impl Iterator> + '_ { + ) -> impl Iterator> + '_ { self.iter_all_filtered::, bool, _, _>( Column::OwnedMessageIds, Some(*owner), @@ -112,7 +111,7 @@ impl Database { &self, start: Option, direction: Option, - ) -> impl Iterator> + '_ { + ) -> impl Iterator> + '_ { let start = start.map(|v| v.deref().to_vec()); self.iter_all_by_start::, Message, _>(Column::Messages, start, direction) .map(|res| res.map(|(_, message)| message)) diff --git a/crates/fuel-core/src/database/metadata.rs b/crates/fuel-core/src/database/metadata.rs index 88ae9391ba1..5239e58401e 100644 --- a/crates/fuel-core/src/database/metadata.rs +++ b/crates/fuel-core/src/database/metadata.rs @@ -2,9 +2,9 @@ use crate::database::{ Column, Database, Error as DatabaseError, - Result as DatabaseResult, }; use fuel_core_chain_config::ChainConfig; +use fuel_core_storage::Result as StorageResult; pub(crate) const DB_VERSION_KEY: &[u8] = b"version"; pub(crate) const CHAIN_NAME_KEY: &[u8] = b"chain_name"; @@ -17,13 +17,13 @@ pub(crate) const DB_VERSION: u32 = 0x00; impl Database { /// Ensures the database is initialized and that the database version is correct - pub fn init(&self, config: &ChainConfig) -> DatabaseResult<()> { + pub fn init(&self, config: &ChainConfig) -> StorageResult<()> { // initialize chain name if not set if self.get_chain_name()?.is_none() { self.insert(CHAIN_NAME_KEY, Column::Metadata, &config.chain_name) .and_then(|v: Option| { if v.is_some() { - Err(DatabaseError::ChainAlreadyInitialized) + Err(DatabaseError::ChainAlreadyInitialized.into()) } else { Ok(()) } @@ -45,11 +45,11 @@ impl Database { Ok(()) } - pub fn get_chain_name(&self) -> DatabaseResult> { + pub fn get_chain_name(&self) -> StorageResult> { self.get(CHAIN_NAME_KEY, Column::Metadata) } - pub fn increase_tx_count(&self, new_txs: u64) -> DatabaseResult { + pub fn increase_tx_count(&self, new_txs: u64) -> StorageResult { // TODO: how should tx count be initialized after regenesis? let current_tx_count: u64 = self.get(TX_COUNT, Column::Metadata)?.unwrap_or_default(); @@ -59,7 +59,7 @@ impl Database { Ok(new_tx_count) } - pub fn get_tx_count(&self) -> DatabaseResult { + pub fn get_tx_count(&self) -> StorageResult { self.get(TX_COUNT, Column::Metadata) .map(|v| v.unwrap_or_default()) } diff --git a/crates/fuel-core/src/database/transaction.rs b/crates/fuel-core/src/database/transaction.rs index b9c47d954bf..2f8829ab406 100644 --- a/crates/fuel-core/src/database/transaction.rs +++ b/crates/fuel-core/src/database/transaction.rs @@ -58,7 +58,7 @@ impl Default for DatabaseTransaction { impl Transaction for DatabaseTransaction { fn commit(&mut self) -> StorageResult<()> { // TODO: should commit be fallible if this api is meant to be atomic? - Ok(self.changes.commit()?) + self.changes.commit() } } diff --git a/crates/fuel-core/src/database/transactions.rs b/crates/fuel-core/src/database/transactions.rs index a6c99a7c588..e41e84b7ece 100644 --- a/crates/fuel-core/src/database/transactions.rs +++ b/crates/fuel-core/src/database/transactions.rs @@ -2,11 +2,11 @@ use crate::database::{ storage::DatabaseColumn, Column, Database, - Result as DatabaseResult, }; use fuel_core_storage::{ iter::IterDirection, tables::Transactions, + Result as StorageResult, }; use fuel_core_types::{ self, @@ -37,7 +37,7 @@ impl Database { &self, start: Option<&Bytes32>, direction: Option, - ) -> impl Iterator> + '_ { + ) -> impl Iterator> + '_ { let start = start.map(|b| b.as_ref().to_vec()); self.iter_all_by_start::, Transaction, _>( Column::Transactions, @@ -56,7 +56,7 @@ impl Database { owner: Address, start: Option, direction: Option, - ) -> impl Iterator> + '_ { + ) -> impl Iterator> + '_ { let start = start .map(|cursor| owned_tx_index_key(&owner, cursor.block_height, cursor.tx_idx)); self.iter_all_filtered::( @@ -76,7 +76,7 @@ impl Database { block_height: BlockHeight, tx_idx: TransactionIndex, tx_id: &Bytes32, - ) -> DatabaseResult> { + ) -> StorageResult> { self.insert( owned_tx_index_key(owner, block_height, tx_idx), Column::TransactionsByOwnerBlockIdx, @@ -88,14 +88,14 @@ impl Database { &self, id: &Bytes32, status: TransactionStatus, - ) -> DatabaseResult> { + ) -> StorageResult> { self.insert(id, Column::TransactionStatus, &status) } pub fn get_tx_status( &self, id: &Bytes32, - ) -> DatabaseResult> { + ) -> StorageResult> { self.get(&id.deref()[..], Column::TransactionStatus) } } diff --git a/crates/fuel-core/src/service/adapters/executor.rs b/crates/fuel-core/src/service/adapters/executor.rs index 83d74991ace..bb6f27083f3 100644 --- a/crates/fuel-core/src/service/adapters/executor.rs +++ b/crates/fuel-core/src/service/adapters/executor.rs @@ -99,7 +99,7 @@ impl fuel_core_executor::ports::TxIdOwnerRecorder for Database { tx_idx: u16, tx_id: &Bytes32, ) -> Result, Self::Error> { - Ok(self.record_tx_id_owner(owner, block_height, tx_idx, tx_id)?) + self.record_tx_id_owner(owner, block_height, tx_idx, tx_id) } fn update_tx_status( @@ -107,7 +107,7 @@ impl fuel_core_executor::ports::TxIdOwnerRecorder for Database { id: &Bytes32, status: TransactionStatus, ) -> Result, Self::Error> { - Ok(self.update_tx_status(id, status)?) + self.update_tx_status(id, status) } } diff --git a/crates/fuel-core/src/service/adapters/graphql_api.rs b/crates/fuel-core/src/service/adapters/graphql_api.rs index 39ec466be1b..4faea60040a 100644 --- a/crates/fuel-core/src/service/adapters/graphql_api.rs +++ b/crates/fuel-core/src/service/adapters/graphql_api.rs @@ -95,19 +95,17 @@ impl DatabaseBlocks for Database { } fn ids_of_latest_block(&self) -> StorageResult<(BlockHeight, BlockId)> { - Ok(self - .ids_of_latest_block() + self.ids_of_latest_block() .transpose() - .ok_or(not_found!("BlockId"))??) + .ok_or(not_found!("BlockId"))? } } impl DatabaseTransactions for Database { fn tx_status(&self, tx_id: &TxId) -> StorageResult { - Ok(self - .get_tx_status(tx_id) + self.get_tx_status(tx_id) .transpose() - .ok_or(not_found!("TransactionId"))??) + .ok_or(not_found!("TransactionId"))? } fn owned_transactions_ids( diff --git a/crates/fuel-core/src/service/adapters/txpool.rs b/crates/fuel-core/src/service/adapters/txpool.rs index 3dd9ba9c089..6f1593f6d77 100644 --- a/crates/fuel-core/src/service/adapters/txpool.rs +++ b/crates/fuel-core/src/service/adapters/txpool.rs @@ -149,9 +149,8 @@ impl fuel_core_txpool::ports::TxPoolDb for Database { &self, tx_id: &fuel_core_types::fuel_types::Bytes32, ) -> StorageResult { - Ok(self - .get_tx_status(tx_id) + self.get_tx_status(tx_id) .transpose() - .ok_or(not_found!("TransactionId"))??) + .ok_or(not_found!("TransactionId"))? } } diff --git a/crates/fuel-core/src/state.rs b/crates/fuel-core/src/state.rs index 2eaebb366fb..49ca2b7a73a 100644 --- a/crates/fuel-core/src/state.rs +++ b/crates/fuel-core/src/state.rs @@ -4,9 +4,12 @@ use crate::database::{ Error as DatabaseError, Result as DatabaseResult, }; -use fuel_core_storage::iter::{ - BoxedIter, - IterDirection, +use fuel_core_storage::{ + iter::{ + IterDirection, + IteratorableStore, + }, + kv_store::BatchOperations, }; use std::{ fmt::Debug, @@ -14,129 +17,10 @@ use std::{ }; pub type DataSource = Arc>; -pub type Value = Arc>; -pub type KVItem = DatabaseResult<(Vec, Value)>; -/// A column of the storage. -pub trait StorageColumn: Clone { - /// Returns the name of the column. - fn name(&self) -> &'static str; - - /// Returns the id of the column. - fn id(&self) -> u32; -} - -pub trait KeyValueStore { - /// The type of the column. - type Column: StorageColumn; - - /// Inserts the `Value` into the storage. - fn put(&self, key: &[u8], column: Self::Column, value: Value) -> DatabaseResult<()> { - self.write(key, column, value.as_ref()).map(|_| ()) - } - - /// Put the `Value` into the storage and return the old value. - fn replace( - &self, - key: &[u8], - column: Self::Column, - value: Value, - ) -> DatabaseResult> { - // FIXME: This is a race condition. We should use a transaction. - let old_value = self.get(key, column.clone())?; - self.put(key, column, value)?; - Ok(old_value) - } - - /// Writes the `buf` into the storage and returns the number of written bytes. - fn write( - &self, - key: &[u8], - column: Self::Column, - buf: &[u8], - ) -> DatabaseResult; - - /// Removes the value from the storage and returns it. - fn take(&self, key: &[u8], column: Self::Column) -> DatabaseResult> { - // FIXME: This is a race condition. We should use a transaction. - let old_value = self.get(key, column.clone())?; - self.delete(key, column)?; - Ok(old_value) - } - - /// Removes the value from the storage. - fn delete(&self, key: &[u8], column: Self::Column) -> DatabaseResult<()>; - - /// Checks if the value exists in the storage. - fn exists(&self, key: &[u8], column: Self::Column) -> DatabaseResult { - Ok(self.size_of_value(key, column)?.is_some()) - } - - /// Returns the size of the value in the storage. - fn size_of_value( - &self, - key: &[u8], - column: Self::Column, - ) -> DatabaseResult> { - Ok(self.get(key, column.clone())?.map(|value| value.len())) - } - - /// Returns the value from the storage. - fn get(&self, key: &[u8], column: Self::Column) -> DatabaseResult>; - - /// Reads the value from the storage into the `buf` and returns the number of read bytes. - fn read( - &self, - key: &[u8], - column: Self::Column, - mut buf: &mut [u8], - ) -> DatabaseResult> { - self.get(key, column.clone())? - .map(|value| { - let read = value.len(); - std::io::Write::write_all(&mut buf, value.as_ref()) - .map_err(|e| DatabaseError::Other(anyhow::anyhow!(e)))?; - Ok(read) - }) - .transpose() - } - - /// Returns an iterator over the values in the storage. - fn iter_all( - &self, - column: Self::Column, - prefix: Option<&[u8]>, - start: Option<&[u8]>, - direction: IterDirection, - ) -> BoxedIter; -} - -pub trait BatchOperations: KeyValueStore { - fn batch_write( - &self, - entries: &mut dyn Iterator, Self::Column, WriteOperation)>, - ) -> DatabaseResult<()> { - for (key, column, op) in entries { - match op { - WriteOperation::Insert(value) => { - self.put(&key, column, value)?; - } - WriteOperation::Remove => { - self.delete(&key, column)?; - } - } - } - Ok(()) - } -} - -#[derive(Debug)] -pub enum WriteOperation { - Insert(Value), - Remove, -} - -pub trait TransactableStorage: BatchOperations + Debug + Send + Sync { +pub trait TransactableStorage: + IteratorableStore + BatchOperations + Debug + Send + Sync +{ fn checkpoint(&self) -> DatabaseResult { Err(DatabaseError::Other(anyhow::anyhow!( "Checkpoint is not supported" diff --git a/crates/fuel-core/src/state/in_memory/memory_store.rs b/crates/fuel-core/src/state/in_memory/memory_store.rs index 688ca0650f0..bcab81cb7f0 100644 --- a/crates/fuel-core/src/state/in_memory/memory_store.rs +++ b/crates/fuel-core/src/state/in_memory/memory_store.rs @@ -6,15 +6,21 @@ use crate::{ state::{ BatchOperations, IterDirection, + TransactableStorage, + }, +}; +use fuel_core_storage::{ + iter::{ + BoxedIter, + IntoBoxedIter, + IteratorableStore, + }, + kv_store::{ KVItem, KeyValueStore, - TransactableStorage, Value, }, -}; -use fuel_core_storage::iter::{ - BoxedIter, - IntoBoxedIter, + Result as StorageResult, }; use std::{ collections::BTreeMap, @@ -106,14 +112,14 @@ impl KeyValueStore for MemoryStore { key: &[u8], column: Column, value: Value, - ) -> DatabaseResult> { + ) -> StorageResult> { Ok(self.inner[column.as_usize()] .lock() .expect("poisoned") .insert(key.to_vec(), value)) } - fn write(&self, key: &[u8], column: Column, buf: &[u8]) -> DatabaseResult { + fn write(&self, key: &[u8], column: Column, buf: &[u8]) -> StorageResult { let len = buf.len(); self.inner[column.as_usize()] .lock() @@ -122,25 +128,27 @@ impl KeyValueStore for MemoryStore { Ok(len) } - fn take(&self, key: &[u8], column: Column) -> DatabaseResult> { + fn take(&self, key: &[u8], column: Column) -> StorageResult> { Ok(self.inner[column.as_usize()] .lock() .expect("poisoned") .remove(&key.to_vec())) } - fn delete(&self, key: &[u8], column: Column) -> DatabaseResult<()> { + fn delete(&self, key: &[u8], column: Column) -> StorageResult<()> { self.take(key, column).map(|_| ()) } - fn get(&self, key: &[u8], column: Column) -> DatabaseResult> { + fn get(&self, key: &[u8], column: Column) -> StorageResult> { Ok(self.inner[column.as_usize()] .lock() .expect("poisoned") .get(&key.to_vec()) .cloned()) } +} +impl IteratorableStore for MemoryStore { fn iter_all( &self, column: Column, diff --git a/crates/fuel-core/src/state/in_memory/transaction.rs b/crates/fuel-core/src/state/in_memory/transaction.rs index 8361c29e8ab..e249a3b5c78 100644 --- a/crates/fuel-core/src/state/in_memory/transaction.rs +++ b/crates/fuel-core/src/state/in_memory/transaction.rs @@ -8,16 +8,22 @@ use crate::{ BatchOperations, DataSource, IterDirection, + TransactableStorage, + }, +}; +use fuel_core_storage::{ + iter::{ + BoxedIter, + IntoBoxedIter, + IteratorableStore, + }, + kv_store::{ KVItem, KeyValueStore, - TransactableStorage, Value, WriteOperation, }, -}; -use fuel_core_storage::iter::{ - BoxedIter, - IntoBoxedIter, + Result as StorageResult, }; use itertools::{ EitherOrBoth, @@ -52,7 +58,7 @@ impl MemoryTransactionView { } } - pub fn commit(&self) -> DatabaseResult<()> { + pub fn commit(&self) -> StorageResult<()> { let mut iter = self .changes .iter() @@ -76,7 +82,7 @@ impl KeyValueStore for MemoryTransactionView { key: &[u8], column: Column, value: Value, - ) -> DatabaseResult> { + ) -> StorageResult> { let key_vec = key.to_vec(); let contained_key = self.changes[column.as_usize()] .lock() @@ -91,7 +97,7 @@ impl KeyValueStore for MemoryTransactionView { } } - fn write(&self, key: &[u8], column: Column, buf: &[u8]) -> DatabaseResult { + fn write(&self, key: &[u8], column: Column, buf: &[u8]) -> StorageResult { let k = key.to_vec(); self.changes[column.as_usize()] .lock() @@ -100,7 +106,7 @@ impl KeyValueStore for MemoryTransactionView { self.view_layer.write(key, column, buf) } - fn take(&self, key: &[u8], column: Column) -> DatabaseResult> { + fn take(&self, key: &[u8], column: Column) -> StorageResult> { let k = key.to_vec(); let contained_key = { let mut lock = self.changes[column.as_usize()] @@ -116,7 +122,7 @@ impl KeyValueStore for MemoryTransactionView { } } - fn delete(&self, key: &[u8], column: Column) -> DatabaseResult<()> { + fn delete(&self, key: &[u8], column: Column) -> StorageResult<()> { let k = key.to_vec(); self.changes[column.as_usize()] .lock() @@ -125,7 +131,7 @@ impl KeyValueStore for MemoryTransactionView { self.view_layer.delete(key, column) } - fn size_of_value(&self, key: &[u8], column: Column) -> DatabaseResult> { + fn size_of_value(&self, key: &[u8], column: Column) -> StorageResult> { // try to fetch data from View layer if any changes to the key if self.changes[column.as_usize()] .lock() @@ -140,7 +146,7 @@ impl KeyValueStore for MemoryTransactionView { } } - fn get(&self, key: &[u8], column: Column) -> DatabaseResult> { + fn get(&self, key: &[u8], column: Column) -> StorageResult> { // try to fetch data from View layer if any changes to the key if self.changes[column.as_usize()] .lock() @@ -159,7 +165,7 @@ impl KeyValueStore for MemoryTransactionView { key: &[u8], column: Column, buf: &mut [u8], - ) -> DatabaseResult> { + ) -> StorageResult> { // try to fetch data from View layer if any changes to the key if self.changes[column.as_usize()] .lock() @@ -173,7 +179,9 @@ impl KeyValueStore for MemoryTransactionView { self.data_source.read(key, column, buf) } } +} +impl IteratorableStore for MemoryTransactionView { fn iter_all( &self, column: Column, diff --git a/crates/fuel-core/src/state/rocks_db.rs b/crates/fuel-core/src/state/rocks_db.rs index 54521524a03..85b37faab3a 100644 --- a/crates/fuel-core/src/state/rocks_db.rs +++ b/crates/fuel-core/src/state/rocks_db.rs @@ -9,17 +9,23 @@ use crate::{ state::{ BatchOperations, IterDirection, + TransactableStorage, + }, +}; +use fuel_core_metrics::core_metrics::database_metrics; +use fuel_core_storage::{ + iter::{ + BoxedIter, + IntoBoxedIter, + IteratorableStore, + }, + kv_store::{ KVItem, KeyValueStore, - TransactableStorage, Value, WriteOperation, }, -}; -use fuel_core_metrics::core_metrics::database_metrics; -use fuel_core_storage::iter::{ - BoxedIter, - IntoBoxedIter, + Result as StorageResult, }; use rand::RngCore; use rocksdb::{ @@ -301,7 +307,7 @@ impl RocksDb { (key_as_vec, Arc::new(value_as_vec)) }) - .map_err(|e| DatabaseError::Other(e.into())) + .map_err(|e| DatabaseError::Other(e.into()).into()) }) } } @@ -309,7 +315,7 @@ impl RocksDb { impl KeyValueStore for RocksDb { type Column = Column; - fn write(&self, key: &[u8], column: Column, buf: &[u8]) -> DatabaseResult { + fn write(&self, key: &[u8], column: Column, buf: &[u8]) -> StorageResult { let r = buf.len(); self.db .put_cf(&self.cf(column), key, buf) @@ -321,13 +327,13 @@ impl KeyValueStore for RocksDb { Ok(r) } - fn delete(&self, key: &[u8], column: Column) -> DatabaseResult<()> { + fn delete(&self, key: &[u8], column: Column) -> StorageResult<()> { self.db .delete_cf(&self.cf(column), key) - .map_err(|e| DatabaseError::Other(e.into())) + .map_err(|e| DatabaseError::Other(e.into()).into()) } - fn size_of_value(&self, key: &[u8], column: Column) -> DatabaseResult> { + fn size_of_value(&self, key: &[u8], column: Column) -> StorageResult> { database_metrics().read_meter.inc(); Ok(self @@ -337,7 +343,7 @@ impl KeyValueStore for RocksDb { .map(|value| value.len())) } - fn get(&self, key: &[u8], column: Column) -> DatabaseResult> { + fn get(&self, key: &[u8], column: Column) -> StorageResult> { database_metrics().read_meter.inc(); let value = self @@ -357,7 +363,7 @@ impl KeyValueStore for RocksDb { key: &[u8], column: Column, mut buf: &mut [u8], - ) -> DatabaseResult> { + ) -> StorageResult> { database_metrics().read_meter.inc(); let r = self @@ -368,7 +374,7 @@ impl KeyValueStore for RocksDb { let read = value.len(); std::io::Write::write_all(&mut buf, value.as_ref()) .map_err(|e| DatabaseError::Other(anyhow::anyhow!(e)))?; - DatabaseResult::Ok(read) + StorageResult::Ok(read) }) .transpose()?; @@ -378,7 +384,9 @@ impl KeyValueStore for RocksDb { Ok(r) } +} +impl IteratorableStore for RocksDb { fn iter_all( &self, column: Column, @@ -450,7 +458,7 @@ impl BatchOperations for RocksDb { fn batch_write( &self, entries: &mut dyn Iterator, Column, WriteOperation)>, - ) -> DatabaseResult<()> { + ) -> StorageResult<()> { let mut batch = WriteBatch::default(); for (key, column, op) in entries { @@ -471,7 +479,7 @@ impl BatchOperations for RocksDb { self.db .write(batch) - .map_err(|e| DatabaseError::Other(e.into())) + .map_err(|e| DatabaseError::Other(e.into()).into()) } } diff --git a/crates/storage/src/iter.rs b/crates/storage/src/iter.rs index 380aaf83342..271fb48ebae 100644 --- a/crates/storage/src/iter.rs +++ b/crates/storage/src/iter.rs @@ -1,4 +1,9 @@ -//! Iterators returned by the storage. +//! The module defines primitives that allow iterating of the storage. + +use crate::kv_store::{ + KVItem, + KeyValueStore, +}; /// A boxed variant of the iterator that can be used as a return type of the traits. pub struct BoxedIter<'a, T> { @@ -44,3 +49,15 @@ impl Default for IterDirection { Self::Forward } } + +/// A trait for iterating over the storage of [`KeyValueStore`]. +pub trait IteratorableStore: KeyValueStore { + /// Returns an iterator over the values in the storage. + fn iter_all( + &self, + column: Self::Column, + prefix: Option<&[u8]>, + start: Option<&[u8]>, + direction: IterDirection, + ) -> BoxedIter; +} diff --git a/crates/storage/src/kv_store.rs b/crates/storage/src/kv_store.rs new file mode 100644 index 00000000000..430d50f426a --- /dev/null +++ b/crates/storage/src/kv_store.rs @@ -0,0 +1,128 @@ +//! The module provides plain abstract definition of the key-value store. + +use crate::{ + Error as StorageError, + Result as StorageResult, +}; +use std::sync::Arc; + +/// The value of the storage. It is wrapped into the `Arc` to provide less cloning of massive objects. +pub type Value = Arc>; +/// The pair of key and value from the storage. +pub type KVItem = StorageResult<(Vec, Value)>; + +/// A column of the storage. +pub trait StorageColumn: Clone { + /// Returns the name of the column. + fn name(&self) -> &'static str; + + /// Returns the id of the column. + fn id(&self) -> u32; +} + +/// The definition of the key-value store. +pub trait KeyValueStore { + /// The type of the column. + type Column: StorageColumn; + + /// Inserts the `Value` into the storage. + fn put(&self, key: &[u8], column: Self::Column, value: Value) -> StorageResult<()> { + self.write(key, column, value.as_ref()).map(|_| ()) + } + + /// Put the `Value` into the storage and return the old value. + fn replace( + &self, + key: &[u8], + column: Self::Column, + value: Value, + ) -> StorageResult> { + // FIXME: This is a race condition. We should use a transaction. + let old_value = self.get(key, column.clone())?; + self.put(key, column, value)?; + Ok(old_value) + } + + /// Writes the `buf` into the storage and returns the number of written bytes. + fn write(&self, key: &[u8], column: Self::Column, buf: &[u8]) + -> StorageResult; + + /// Removes the value from the storage and returns it. + fn take(&self, key: &[u8], column: Self::Column) -> StorageResult> { + // FIXME: This is a race condition. We should use a transaction. + let old_value = self.get(key, column.clone())?; + self.delete(key, column)?; + Ok(old_value) + } + + /// Removes the value from the storage. + fn delete(&self, key: &[u8], column: Self::Column) -> StorageResult<()>; + + /// Checks if the value exists in the storage. + fn exists(&self, key: &[u8], column: Self::Column) -> StorageResult { + Ok(self.size_of_value(key, column)?.is_some()) + } + + /// Returns the size of the value in the storage. + fn size_of_value( + &self, + key: &[u8], + column: Self::Column, + ) -> StorageResult> { + Ok(self.get(key, column.clone())?.map(|value| value.len())) + } + + /// Returns the value from the storage. + fn get(&self, key: &[u8], column: Self::Column) -> StorageResult>; + + /// Reads the value from the storage into the `buf` and returns the number of read bytes. + fn read( + &self, + key: &[u8], + column: Self::Column, + buf: &mut [u8], + ) -> StorageResult> { + self.get(key, column.clone())? + .map(|value| { + let read = value.len(); + if read != buf.len() { + return Err(StorageError::Other(anyhow::anyhow!( + "Buffer size is not equal to the value size" + ))); + } + buf.copy_from_slice(value.as_ref()); + Ok(read) + }) + .transpose() + } +} + +/// The operation to write into the storage. +#[derive(Debug)] +pub enum WriteOperation { + /// Insert the value into the storage. + Insert(Value), + /// Remove the value from the storage. + Remove, +} + +/// The definition of the key-value store with batch operations. +pub trait BatchOperations: KeyValueStore { + /// Writes the batch of the entries into the storage. + fn batch_write( + &self, + entries: &mut dyn Iterator, Self::Column, WriteOperation)>, + ) -> StorageResult<()> { + for (key, column, op) in entries { + match op { + WriteOperation::Insert(value) => { + self.put(&key, column, value)?; + } + WriteOperation::Remove => { + self.delete(&key, column)?; + } + } + } + Ok(()) + } +} diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index 40db60fccc4..e6a345a1ce5 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -10,6 +10,7 @@ #![deny(missing_docs)] #![deny(warnings)] +use core::array::TryFromSliceError; use fuel_core_types::services::executor::Error as ExecutorError; pub use fuel_vm_private::{ @@ -21,6 +22,7 @@ pub use fuel_vm_private::{ }; pub mod iter; +pub mod kv_store; pub mod tables; #[cfg(feature = "test-helpers")] pub mod test_helpers; @@ -78,6 +80,12 @@ impl From for fuel_vm_private::prelude::RuntimeError { } } +impl From for Error { + fn from(e: TryFromSliceError) -> Self { + Self::Other(anyhow::anyhow!(e)) + } +} + /// The helper trait to work with storage errors. pub trait IsNotFound { /// Return `true` if the error is [`Error::NotFound`].