diff --git a/bin/fuel-core/src/cli/run.rs b/bin/fuel-core/src/cli/run.rs index bb5e56feac1..e6210d8330c 100644 --- a/bin/fuel-core/src/cli/run.rs +++ b/bin/fuel-core/src/cli/run.rs @@ -425,16 +425,12 @@ async fn shutdown_signal() -> anyhow::Result<()> { let mut sigint = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())?; - loop { - tokio::select! { - _ = sigterm.recv() => { - tracing::info!("sigterm received"); - break; - } - _ = sigint.recv() => { - tracing::info!("sigint received"); - break; - } + tokio::select! { + _ = sigterm.recv() => { + tracing::info!("sigterm received"); + } + _ = sigint.recv() => { + tracing::info!("sigint received"); } } } diff --git a/crates/fuel-core/src/database.rs b/crates/fuel-core/src/database.rs index b3a338afa23..f21d6bb45be 100644 --- a/crates/fuel-core/src/database.rs +++ b/crates/fuel-core/src/database.rs @@ -56,6 +56,7 @@ type DatabaseResult = Result; // TODO: Extract `Database` and all belongs into `fuel-core-database`. #[cfg(feature = "rocksdb")] use crate::state::rocks_db::RocksDb; +use crate::state::Value; #[cfg(feature = "rocksdb")] use std::path::Path; #[cfg(feature = "rocksdb")] @@ -84,7 +85,14 @@ pub mod transactions; /// Database tables column ids to the corresponding [`fuel_core_storage::Mappable`] table. #[repr(u32)] #[derive( - Copy, Clone, Debug, strum_macros::EnumCount, PartialEq, Eq, enum_iterator::Sequence, + Copy, + Clone, + Debug, + strum_macros::EnumCount, + strum_macros::IntoStaticStr, + PartialEq, + Eq, + enum_iterator::Sequence, )] pub enum Column { /// The column id of metadata about the blockchain @@ -152,6 +160,16 @@ impl Column { } } +impl crate::state::StorageColumn for Column { + fn name(&self) -> &'static str { + self.into() + } + + fn id(&self) -> u32 { + *self as u32 + } +} + #[derive(Clone, Debug)] pub struct Database { data: DataSource, @@ -253,13 +271,13 @@ impl Database { /// Mutable methods. // TODO: Add `&mut self` to them. impl Database { - fn insert, V: Serialize, R: DeserializeOwned>( + fn insert, V: Serialize + ?Sized, R: DeserializeOwned>( &self, key: K, column: Column, value: &V, ) -> DatabaseResult> { - let result = self.data.put( + let result = self.data.replace( key.as_ref(), column, Arc::new(postcard::to_stdvec(value).map_err(|_| DatabaseError::Codec)?), @@ -273,6 +291,16 @@ impl Database { } } + fn insert_raw, V: AsRef<[u8]>>( + &self, + key: K, + column: Column, + value: V, + ) -> DatabaseResult> { + self.data + .replace(key.as_ref(), column, Arc::new(value.as_ref().to_vec())) + } + fn batch_insert, V: Serialize, S>( &self, column: Column, @@ -299,36 +327,19 @@ impl Database { self.data.batch_write(&mut set.into_iter()) } - fn remove( + fn take( &self, key: &[u8], column: Column, ) -> DatabaseResult> { self.data - .delete(key, column)? + .take(key, column)? .map(|val| postcard::from_bytes(&val).map_err(|_| DatabaseError::Codec)) .transpose() } - fn write(&self, key: &[u8], column: Column, buf: &[u8]) -> DatabaseResult { - self.data.write(key, column, buf) - } - - fn replace( - &self, - key: &[u8], - column: Column, - buf: &[u8], - ) -> DatabaseResult<(usize, Option>)> { - self.data - .replace(key, column, buf) - .map(|(size, value)| (size, value.map(|value| value.deref().clone()))) - } - - fn take(&self, key: &[u8], column: Column) -> DatabaseResult>> { - self.data - .take(key, column) - .map(|value| value.map(|value| value.deref().clone())) + fn take_raw(&self, key: &[u8], column: Column) -> DatabaseResult> { + self.data.take(key, column) } } @@ -353,7 +364,7 @@ impl Database { fn read_alloc(&self, key: &[u8], column: Column) -> DatabaseResult>> { self.data - .read_alloc(key, column) + .get(key, column) .map(|value| value.map(|value| value.deref().clone())) } diff --git a/crates/fuel-core/src/database/balances.rs b/crates/fuel-core/src/database/balances.rs index 424345674d8..0c92179adf9 100644 --- a/crates/fuel-core/src/database/balances.rs +++ b/crates/fuel-core/src/database/balances.rs @@ -100,7 +100,7 @@ impl StorageMutate for Database { &mut self, key: &::Key, ) -> Result::OwnedValue>, Self::Error> { - let prev = Database::remove(self, key.as_ref(), Column::ContractsAssets) + let prev = Database::take(self, key.as_ref(), Column::ContractsAssets) .map_err(Into::into); // Get latest metadata entry for this contract id diff --git a/crates/fuel-core/src/database/block.rs b/crates/fuel-core/src/database/block.rs index 37c423f76d4..bada124d80a 100644 --- a/crates/fuel-core/src/database/block.rs +++ b/crates/fuel-core/src/database/block.rs @@ -106,7 +106,7 @@ impl StorageMutate for Database { fn remove(&mut self, key: &BlockId) -> Result, Self::Error> { let prev: Option = - Database::remove(self, key.as_slice(), Column::FuelBlocks)?; + Database::take(self, key.as_slice(), Column::FuelBlocks)?; if let Some(block) = &prev { let height = block.header().height(); diff --git a/crates/fuel-core/src/database/coin.rs b/crates/fuel-core/src/database/coin.rs index 1778cd6dc8f..c0b12bd3b6f 100644 --- a/crates/fuel-core/src/database/coin.rs +++ b/crates/fuel-core/src/database/coin.rs @@ -92,7 +92,7 @@ impl StorageMutate for Database { fn remove(&mut self, key: &UtxoId) -> Result, Self::Error> { let coin: Option = - Database::remove(self, &utxo_id_to_bytes(key), Column::Coins)?; + Database::take(self, &utxo_id_to_bytes(key), Column::Coins)?; // cleanup secondary index if let Some(coin) = &coin { diff --git a/crates/fuel-core/src/database/contracts.rs b/crates/fuel-core/src/database/contracts.rs index d092b3e07c4..bc061d6ea28 100644 --- a/crates/fuel-core/src/database/contracts.rs +++ b/crates/fuel-core/src/database/contracts.rs @@ -22,7 +22,6 @@ use fuel_core_storage::{ StorageMutate, StorageRead, StorageSize, - StorageWrite, }; use fuel_core_types::{ entities::contract::ContractUtxoInfo, @@ -74,19 +73,18 @@ impl StorageMutate for Database { key: &::Key, value: &::Value, ) -> Result::OwnedValue>, Self::Error> { - let existing = - Database::replace(self, key.as_ref(), Column::ContractsRawCode, value)?; - Ok(existing.1.map(Contract::from)) + let result = Database::insert_raw(self, key, Column::ContractsRawCode, value)?; + + Ok(result.map(|v| Contract::from(v.as_ref().clone()))) } fn remove( &mut self, key: &::Key, ) -> Result::OwnedValue>, Self::Error> { - Ok( - >::take(self, key)? - .map(Contract::from), - ) + let result = Database::take_raw(self, key.as_ref(), Column::ContractsRawCode)?; + + Ok(result.map(|v| Contract::from(v.as_ref().clone()))) } } @@ -110,44 +108,6 @@ impl StorageRead for Database { } } -impl StorageWrite for Database { - fn write(&mut self, key: &ContractId, buf: Vec) -> Result { - Ok(Database::write( - self, - key.as_ref(), - Column::ContractsRawCode, - &buf, - )?) - } - - fn replace( - &mut self, - key: &::Key, - buf: Vec, - ) -> Result<(usize, Option>), >::Error> - where - Self: StorageSize, - { - Ok(Database::replace( - self, - key.as_ref(), - Column::ContractsRawCode, - &buf, - )?) - } - - fn take( - &mut self, - key: &::Key, - ) -> Result>, Self::Error> { - Ok(Database::take( - self, - key.as_ref(), - Column::ContractsRawCode, - )?) - } -} - impl Database { pub fn get_contract_config_by_id( &self, diff --git a/crates/fuel-core/src/database/message.rs b/crates/fuel-core/src/database/message.rs index dc2c510ed04..308b7c155db 100644 --- a/crates/fuel-core/src/database/message.rs +++ b/crates/fuel-core/src/database/message.rs @@ -67,10 +67,10 @@ impl StorageMutate for Database { fn remove(&mut self, key: &Nonce) -> Result, Self::Error> { let result: Option = - Database::remove(self, key.database_key().as_ref(), Column::Messages)?; + Database::take(self, key.database_key().as_ref(), Column::Messages)?; if let Some(message) = &result { - Database::remove::( + Database::take::( self, &owner_msg_id_key(&message.recipient, key), Column::OwnedMessageIds, diff --git a/crates/fuel-core/src/database/state.rs b/crates/fuel-core/src/database/state.rs index 3dfb65335fb..d5af5db45d0 100644 --- a/crates/fuel-core/src/database/state.rs +++ b/crates/fuel-core/src/database/state.rs @@ -99,7 +99,7 @@ impl StorageMutate for Database { &mut self, key: &::Key, ) -> Result::OwnedValue>, Self::Error> { - let prev = Database::remove(self, key.as_ref(), Column::ContractsState) + let prev = Database::take(self, key.as_ref(), Column::ContractsState) .map_err(Into::into); // Get latest metadata entry for this contract id diff --git a/crates/fuel-core/src/database/storage.rs b/crates/fuel-core/src/database/storage.rs index 0dba6ceb4ea..6ceab3a776b 100644 --- a/crates/fuel-core/src/database/storage.rs +++ b/crates/fuel-core/src/database/storage.rs @@ -232,8 +232,7 @@ where } fn remove(&mut self, key: &T::Key) -> StorageResult> { - Database::remove(self, key.database_key().as_ref(), T::column()) - .map_err(Into::into) + Database::take(self, key.database_key().as_ref(), T::column()).map_err(Into::into) } } diff --git a/crates/fuel-core/src/state.rs b/crates/fuel-core/src/state.rs index 066f27eb0f3..2eaebb366fb 100644 --- a/crates/fuel-core/src/state.rs +++ b/crates/fuel-core/src/state.rs @@ -13,49 +13,98 @@ use std::{ sync::Arc, }; -pub type DataSource = Arc; +pub type DataSource = Arc>; pub type Value = Arc>; pub type KVItem = DatabaseResult<(Vec, Value)>; +/// A column of the storage. +pub trait StorageColumn: Clone { + /// Returns the name of the column. + fn name(&self) -> &'static str; + + /// Returns the id of the column. + fn id(&self) -> u32; +} + pub trait KeyValueStore { - fn put( + /// The type of the column. + type Column: StorageColumn; + + /// Inserts the `Value` into the storage. + fn put(&self, key: &[u8], column: Self::Column, value: Value) -> DatabaseResult<()> { + self.write(key, column, value.as_ref()).map(|_| ()) + } + + /// Put the `Value` into the storage and return the old value. + fn replace( &self, key: &[u8], - column: Column, + column: Self::Column, value: Value, - ) -> DatabaseResult>; - - fn write(&self, key: &[u8], column: Column, buf: &[u8]) -> DatabaseResult; + ) -> DatabaseResult> { + // FIXME: This is a race condition. We should use a transaction. + let old_value = self.get(key, column.clone())?; + self.put(key, column, value)?; + Ok(old_value) + } - fn replace( + /// Writes the `buf` into the storage and returns the number of written bytes. + fn write( &self, key: &[u8], - column: Column, + column: Self::Column, buf: &[u8], - ) -> DatabaseResult<(usize, Option)>; - - fn take(&self, key: &[u8], column: Column) -> DatabaseResult>; + ) -> DatabaseResult; + + /// Removes the value from the storage and returns it. + fn take(&self, key: &[u8], column: Self::Column) -> DatabaseResult> { + // FIXME: This is a race condition. We should use a transaction. + let old_value = self.get(key, column.clone())?; + self.delete(key, column)?; + Ok(old_value) + } - fn delete(&self, key: &[u8], column: Column) -> DatabaseResult>; + /// Removes the value from the storage. + fn delete(&self, key: &[u8], column: Self::Column) -> DatabaseResult<()>; - fn exists(&self, key: &[u8], column: Column) -> DatabaseResult; + /// Checks if the value exists in the storage. + fn exists(&self, key: &[u8], column: Self::Column) -> DatabaseResult { + Ok(self.size_of_value(key, column)?.is_some()) + } - fn size_of_value(&self, key: &[u8], column: Column) -> DatabaseResult>; + /// Returns the size of the value in the storage. + fn size_of_value( + &self, + key: &[u8], + column: Self::Column, + ) -> DatabaseResult> { + Ok(self.get(key, column.clone())?.map(|value| value.len())) + } - fn get(&self, key: &[u8], column: Column) -> DatabaseResult>; + /// Returns the value from the storage. + fn get(&self, key: &[u8], column: Self::Column) -> DatabaseResult>; + /// Reads the value from the storage into the `buf` and returns the number of read bytes. fn read( &self, key: &[u8], - column: Column, - buf: &mut [u8], - ) -> DatabaseResult>; - - fn read_alloc(&self, key: &[u8], column: Column) -> DatabaseResult>; + column: Self::Column, + mut buf: &mut [u8], + ) -> DatabaseResult> { + self.get(key, column.clone())? + .map(|value| { + let read = value.len(); + std::io::Write::write_all(&mut buf, value.as_ref()) + .map_err(|e| DatabaseError::Other(anyhow::anyhow!(e)))?; + Ok(read) + }) + .transpose() + } + /// Returns an iterator over the values in the storage. fn iter_all( &self, - column: Column, + column: Self::Column, prefix: Option<&[u8]>, start: Option<&[u8]>, direction: IterDirection, @@ -65,16 +114,15 @@ pub trait KeyValueStore { pub trait BatchOperations: KeyValueStore { fn batch_write( &self, - entries: &mut dyn Iterator, Column, WriteOperation)>, + entries: &mut dyn Iterator, Self::Column, WriteOperation)>, ) -> DatabaseResult<()> { for (key, column, op) in entries { match op { - // TODO: error handling WriteOperation::Insert(value) => { - let _ = self.put(&key, column, value); + self.put(&key, column, value)?; } WriteOperation::Remove => { - let _ = self.delete(&key, column); + self.delete(&key, column)?; } } } diff --git a/crates/fuel-core/src/state/in_memory/memory_store.rs b/crates/fuel-core/src/state/in_memory/memory_store.rs index 224929e0cb6..688ca0650f0 100644 --- a/crates/fuel-core/src/state/in_memory/memory_store.rs +++ b/crates/fuel-core/src/state/in_memory/memory_store.rs @@ -1,7 +1,6 @@ use crate::{ database::{ Column, - Error as DatabaseError, Result as DatabaseResult, }, state::{ @@ -100,15 +99,9 @@ impl MemoryStore { } impl KeyValueStore for MemoryStore { - fn get(&self, key: &[u8], column: Column) -> DatabaseResult> { - Ok(self.inner[column.as_usize()] - .lock() - .expect("poisoned") - .get(&key.to_vec()) - .cloned()) - } + type Column = Column; - fn put( + fn replace( &self, key: &[u8], column: Column, @@ -120,58 +113,27 @@ impl KeyValueStore for MemoryStore { .insert(key.to_vec(), value)) } - fn delete(&self, key: &[u8], column: Column) -> DatabaseResult> { - Ok(self.inner[column.as_usize()] + fn write(&self, key: &[u8], column: Column, buf: &[u8]) -> DatabaseResult { + let len = buf.len(); + self.inner[column.as_usize()] .lock() .expect("poisoned") - .remove(&key.to_vec())) + .insert(key.to_vec(), Arc::new(buf.to_vec())); + Ok(len) } - fn exists(&self, key: &[u8], column: Column) -> DatabaseResult { + fn take(&self, key: &[u8], column: Column) -> DatabaseResult> { Ok(self.inner[column.as_usize()] .lock() .expect("poisoned") - .contains_key(&key.to_vec())) - } - - fn iter_all( - &self, - column: Column, - prefix: Option<&[u8]>, - start: Option<&[u8]>, - direction: IterDirection, - ) -> BoxedIter { - self.iter_all(column, prefix, start, direction).into_boxed() + .remove(&key.to_vec())) } - fn size_of_value(&self, key: &[u8], column: Column) -> DatabaseResult> { - Ok(self.inner[column.as_usize()] - .lock() - .expect("poisoned") - .get(&key.to_vec()) - .map(|v| v.len())) + fn delete(&self, key: &[u8], column: Column) -> DatabaseResult<()> { + self.take(key, column).map(|_| ()) } - fn read( - &self, - key: &[u8], - column: Column, - mut buf: &mut [u8], - ) -> DatabaseResult> { - self.inner[column.as_usize()] - .lock() - .expect("poisoned") - .get(&key.to_vec()) - .map(|value| { - let read = value.len(); - std::io::Write::write_all(&mut buf, value.as_ref()) - .map_err(|e| DatabaseError::Other(anyhow::anyhow!(e)))?; - DatabaseResult::Ok(read) - }) - .transpose() - } - - fn read_alloc(&self, key: &[u8], column: Column) -> DatabaseResult> { + fn get(&self, key: &[u8], column: Column) -> DatabaseResult> { Ok(self.inner[column.as_usize()] .lock() .expect("poisoned") @@ -179,34 +141,14 @@ impl KeyValueStore for MemoryStore { .cloned()) } - fn write(&self, key: &[u8], column: Column, buf: &[u8]) -> DatabaseResult { - let len = buf.len(); - self.inner[column.as_usize()] - .lock() - .expect("poisoned") - .insert(key.to_vec(), Arc::new(buf.to_vec())); - Ok(len) - } - - fn replace( + fn iter_all( &self, - key: &[u8], column: Column, - buf: &[u8], - ) -> DatabaseResult<(usize, Option)> { - let len = buf.len(); - let existing = self.inner[column.as_usize()] - .lock() - .expect("poisoned") - .insert(key.to_vec(), Arc::new(buf.to_vec())); - Ok((len, existing)) - } - - fn take(&self, key: &[u8], column: Column) -> DatabaseResult> { - Ok(self.inner[column.as_usize()] - .lock() - .expect("poisoned") - .remove(&key.to_vec())) + prefix: Option<&[u8]>, + start: Option<&[u8]>, + direction: IterDirection, + ) -> BoxedIter { + self.iter_all(column, prefix, start, direction).into_boxed() } } @@ -246,10 +188,7 @@ mod tests { vec![(key.clone(), expected.clone())] ); - assert_eq!( - db.delete(&key, Column::Metadata).unwrap().unwrap(), - expected - ); + assert_eq!(db.take(&key, Column::Metadata).unwrap().unwrap(), expected); assert!(!db.exists(&key, Column::Metadata).unwrap()); } @@ -273,10 +212,7 @@ mod tests { vec![(key.clone(), expected.clone())] ); - assert_eq!( - db.delete(&key, Column::Metadata).unwrap().unwrap(), - expected - ); + assert_eq!(db.take(&key, Column::Metadata).unwrap().unwrap(), expected); assert!(!db.exists(&key, Column::Metadata).unwrap()); } @@ -300,10 +236,7 @@ mod tests { vec![(key.clone(), expected.clone())] ); - assert_eq!( - db.delete(&key, Column::Metadata).unwrap().unwrap(), - expected - ); + assert_eq!(db.take(&key, Column::Metadata).unwrap().unwrap(), expected); assert!(!db.exists(&key, Column::Metadata).unwrap()); } diff --git a/crates/fuel-core/src/state/in_memory/transaction.rs b/crates/fuel-core/src/state/in_memory/transaction.rs index 8c57edb87fe..8361c29e8ab 100644 --- a/crates/fuel-core/src/state/in_memory/transaction.rs +++ b/crates/fuel-core/src/state/in_memory/transaction.rs @@ -69,21 +69,9 @@ impl MemoryTransactionView { } impl KeyValueStore for MemoryTransactionView { - fn get(&self, key: &[u8], column: Column) -> DatabaseResult> { - // try to fetch data from View layer if any changes to the key - if self.changes[column.as_usize()] - .lock() - .expect("poisoned lock") - .contains_key(&key.to_vec()) - { - self.view_layer.get(key, column) - } else { - // fall-through to original data source - self.data_source.get(key, column) - } - } + type Column = Column; - fn put( + fn replace( &self, key: &[u8], column: Column, @@ -95,7 +83,7 @@ impl KeyValueStore for MemoryTransactionView { .expect("poisoned lock") .insert(key_vec, WriteOperation::Insert(value.clone())) .is_some(); - let res = self.view_layer.put(key, column, value); + let res = self.view_layer.replace(key, column, value); if contained_key { res } else { @@ -103,14 +91,24 @@ impl KeyValueStore for MemoryTransactionView { } } - fn delete(&self, key: &[u8], column: Column) -> DatabaseResult> { + fn write(&self, key: &[u8], column: Column, buf: &[u8]) -> DatabaseResult { let k = key.to_vec(); - let contained_key = self.changes[column.as_usize()] + self.changes[column.as_usize()] .lock() .expect("poisoned lock") - .insert(k, WriteOperation::Remove) - .is_some(); - let res = self.view_layer.delete(key, column); + .insert(k, WriteOperation::Insert(Arc::new(buf.to_vec()))); + self.view_layer.write(key, column, buf) + } + + fn take(&self, key: &[u8], column: Column) -> DatabaseResult> { + let k = key.to_vec(); + let contained_key = { + let mut lock = self.changes[column.as_usize()] + .lock() + .expect("poisoned lock"); + lock.insert(k, WriteOperation::Remove).is_some() + }; + let res = self.view_layer.take(key, column); if contained_key { res } else { @@ -118,16 +116,61 @@ impl KeyValueStore for MemoryTransactionView { } } - fn exists(&self, key: &[u8], column: Column) -> DatabaseResult { + fn delete(&self, key: &[u8], column: Column) -> DatabaseResult<()> { let k = key.to_vec(); + self.changes[column.as_usize()] + .lock() + .expect("poisoned lock") + .insert(k, WriteOperation::Remove); + self.view_layer.delete(key, column) + } + + fn size_of_value(&self, key: &[u8], column: Column) -> DatabaseResult> { + // try to fetch data from View layer if any changes to the key + if self.changes[column.as_usize()] + .lock() + .expect("poisoned lock") + .contains_key(&key.to_vec()) + { + self.view_layer.size_of_value(key, column) + } else { + // fall-through to original data source + // Note: The getting size from original database may be more performant than from `get` + self.data_source.size_of_value(key, column) + } + } + + fn get(&self, key: &[u8], column: Column) -> DatabaseResult> { + // try to fetch data from View layer if any changes to the key + if self.changes[column.as_usize()] + .lock() + .expect("poisoned lock") + .contains_key(&key.to_vec()) + { + self.view_layer.get(key, column) + } else { + // fall-through to original data source + self.data_source.get(key, column) + } + } + + fn read( + &self, + key: &[u8], + column: Column, + buf: &mut [u8], + ) -> DatabaseResult> { + // try to fetch data from View layer if any changes to the key if self.changes[column.as_usize()] .lock() .expect("poisoned lock") - .contains_key(&k) + .contains_key(&key.to_vec()) { - self.view_layer.exists(key, column) + self.view_layer.read(key, column, buf) } else { - self.data_source.exists(key, column) + // fall-through to original data source + // Note: The read from original database may be more performant than from `get` + self.data_source.read(key, column, buf) } } @@ -186,100 +229,6 @@ impl KeyValueStore for MemoryTransactionView { } }).into_boxed() } - - fn size_of_value(&self, key: &[u8], column: Column) -> DatabaseResult> { - // try to fetch data from View layer if any changes to the key - if self.changes[column.as_usize()] - .lock() - .expect("poisoned lock") - .contains_key(&key.to_vec()) - { - self.view_layer.size_of_value(key, column) - } else { - // fall-through to original data source - self.data_source.size_of_value(key, column) - } - } - - fn read( - &self, - key: &[u8], - column: Column, - buf: &mut [u8], - ) -> DatabaseResult> { - // try to fetch data from View layer if any changes to the key - if self.changes[column.as_usize()] - .lock() - .expect("poisoned lock") - .contains_key(&key.to_vec()) - { - self.view_layer.read(key, column, buf) - } else { - // fall-through to original data source - self.data_source.read(key, column, buf) - } - } - - fn read_alloc(&self, key: &[u8], column: Column) -> DatabaseResult> { - if self.changes[column.as_usize()] - .lock() - .expect("poisoned lock") - .contains_key(&key.to_vec()) - { - self.view_layer.read_alloc(key, column) - } else { - // fall-through to original data source - self.data_source.read_alloc(key, column) - } - } - - fn write(&self, key: &[u8], column: Column, buf: &[u8]) -> DatabaseResult { - let k = key.to_vec(); - self.changes[column.as_usize()] - .lock() - .expect("poisoned lock") - .insert(k, WriteOperation::Insert(Arc::new(buf.to_vec()))); - self.view_layer.write(key, column, buf) - } - - fn replace( - &self, - key: &[u8], - column: Column, - buf: &[u8], - ) -> DatabaseResult<(usize, Option)> { - let k = key.to_vec(); - let contained_key = { - let mut lock = self.changes[column.as_usize()] - .lock() - .expect("poisoned lock"); - lock.insert(k, WriteOperation::Insert(Arc::new(buf.to_vec()))) - .is_some() - }; - let res = self.view_layer.replace(key, column, buf)?; - let num_written = res.0; - if contained_key { - Ok(res) - } else { - Ok((num_written, self.data_source.read_alloc(key, column)?)) - } - } - - fn take(&self, key: &[u8], column: Column) -> DatabaseResult> { - let k = key.to_vec(); - let contained_key = { - let mut lock = self.changes[column.as_usize()] - .lock() - .expect("poisoned lock"); - lock.insert(k, WriteOperation::Remove).is_some() - }; - let res = self.view_layer.take(key, column); - if contained_key { - res - } else { - self.data_source.read_alloc(key, column) - } - } } impl BatchOperations for MemoryTransactionView {} @@ -352,10 +301,11 @@ mod tests { let store = Arc::new(MemoryStore::default()); let view = MemoryTransactionView::new(store); let expected = Arc::new(vec![1, 2, 3]); - let _ = view.put(&[0xA, 0xB, 0xC], Column::Metadata, expected.clone()); + view.put(&[0xA, 0xB, 0xC], Column::Metadata, expected.clone()) + .unwrap(); // test let ret = view - .put(&[0xA, 0xB, 0xC], Column::Metadata, Arc::new(vec![2, 4, 6])) + .replace(&[0xA, 0xB, 0xC], Column::Metadata, Arc::new(vec![2, 4, 6])) .unwrap(); // verify assert_eq!(ret, Some(expected)) @@ -370,7 +320,7 @@ mod tests { let expected = Arc::new(vec![1, 2, 3]); view.put(&key, Column::Metadata, expected.clone()).unwrap(); // test - let ret = view.delete(&key, Column::Metadata).unwrap(); + let ret = view.take(&key, Column::Metadata).unwrap(); let get = view.get(&key, Column::Metadata).unwrap(); // verify assert_eq!(ret, Some(expected)); @@ -386,7 +336,7 @@ mod tests { store.put(&key, Column::Metadata, expected.clone()).unwrap(); let view = MemoryTransactionView::new(store); // test - let ret = view.delete(&key, Column::Metadata).unwrap(); + let ret = view.take(&key, Column::Metadata).unwrap(); let get = view.get(&key, Column::Metadata).unwrap(); // verify assert_eq!(ret, Some(expected)); @@ -402,8 +352,8 @@ mod tests { store.put(&key, Column::Metadata, expected.clone()).unwrap(); let view = MemoryTransactionView::new(store); // test - let ret1 = view.delete(&key, Column::Metadata).unwrap(); - let ret2 = view.delete(&key, Column::Metadata).unwrap(); + let ret1 = view.take(&key, Column::Metadata).unwrap(); + let ret2 = view.take(&key, Column::Metadata).unwrap(); let get = view.get(&key, Column::Metadata).unwrap(); // verify assert_eq!(ret1, Some(expected)); @@ -578,8 +528,8 @@ mod tests { let view = MemoryTransactionView::new(store); // test - let _ = view.delete(&[0], Column::Metadata).unwrap(); - let _ = view.delete(&[6], Column::Metadata).unwrap(); + view.delete(&[0], Column::Metadata).unwrap(); + view.delete(&[6], Column::Metadata).unwrap(); let ret: Vec<_> = view .iter_all(Column::Metadata, None, None, IterDirection::Forward) @@ -611,10 +561,7 @@ mod tests { vec![(key.clone(), expected.clone())] ); - assert_eq!( - db.delete(&key, Column::Metadata).unwrap().unwrap(), - expected - ); + assert_eq!(db.take(&key, Column::Metadata).unwrap().unwrap(), expected); assert!(!db.exists(&key, Column::Metadata).unwrap()); @@ -653,10 +600,7 @@ mod tests { vec![(key.clone(), expected.clone())] ); - assert_eq!( - db.delete(&key, Column::Metadata).unwrap().unwrap(), - expected - ); + assert_eq!(db.take(&key, Column::Metadata).unwrap().unwrap(), expected); assert!(!db.exists(&key, Column::Metadata).unwrap()); @@ -695,10 +639,7 @@ mod tests { vec![(key.clone(), expected.clone())] ); - assert_eq!( - db.delete(&key, Column::Metadata).unwrap().unwrap(), - expected - ); + assert_eq!(db.take(&key, Column::Metadata).unwrap().unwrap(), expected); assert!(!db.exists(&key, Column::Metadata).unwrap()); diff --git a/crates/fuel-core/src/state/rocks_db.rs b/crates/fuel-core/src/state/rocks_db.rs index c6c6ca36190..54521524a03 100644 --- a/crates/fuel-core/src/state/rocks_db.rs +++ b/crates/fuel-core/src/state/rocks_db.rs @@ -307,55 +307,76 @@ impl RocksDb { } impl KeyValueStore for RocksDb { + type Column = Column; + + fn write(&self, key: &[u8], column: Column, buf: &[u8]) -> DatabaseResult { + let r = buf.len(); + self.db + .put_cf(&self.cf(column), key, buf) + .map_err(|e| DatabaseError::Other(e.into()))?; + + database_metrics().write_meter.inc(); + database_metrics().bytes_written.observe(r as f64); + + Ok(r) + } + + fn delete(&self, key: &[u8], column: Column) -> DatabaseResult<()> { + self.db + .delete_cf(&self.cf(column), key) + .map_err(|e| DatabaseError::Other(e.into())) + } + + fn size_of_value(&self, key: &[u8], column: Column) -> DatabaseResult> { + database_metrics().read_meter.inc(); + + Ok(self + .db + .get_pinned_cf(&self.cf(column), key) + .map_err(|e| DatabaseError::Other(e.into()))? + .map(|value| value.len())) + } + fn get(&self, key: &[u8], column: Column) -> DatabaseResult> { database_metrics().read_meter.inc(); + let value = self .db .get_cf(&self.cf(column), key) - .map_err(|e| DatabaseError::Other(e.into())); + .map_err(|e| DatabaseError::Other(e.into()))?; - if let Ok(Some(value)) = &value { + if let Some(value) = &value { database_metrics().bytes_read.observe(value.len() as f64); } - value.map(|value| value.map(Arc::new)) + Ok(value.map(Arc::new)) } - fn put( + fn read( &self, key: &[u8], column: Column, - value: Value, - ) -> DatabaseResult> { - database_metrics().write_meter.inc(); - database_metrics().bytes_written.observe(value.len() as f64); + mut buf: &mut [u8], + ) -> DatabaseResult> { + database_metrics().read_meter.inc(); - // FIXME: This is a race condition. We should use a transaction. - let prev = self.get(key, column)?; - // FIXME: This is a race condition. We should use a transaction. - self.db - .put_cf(&self.cf(column), key, value.as_ref()) - .map_err(|e| DatabaseError::Other(e.into())) - .map(|_| prev) - } + let r = self + .db + .get_pinned_cf(&self.cf(column), key) + .map_err(|e| DatabaseError::Other(e.into()))? + .map(|value| { + let read = value.len(); + std::io::Write::write_all(&mut buf, value.as_ref()) + .map_err(|e| DatabaseError::Other(anyhow::anyhow!(e)))?; + DatabaseResult::Ok(read) + }) + .transpose()?; - fn delete(&self, key: &[u8], column: Column) -> DatabaseResult> { - // FIXME: This is a race condition. We should use a transaction. - let prev = self.get(key, column)?; - // FIXME: This is a race condition. We should use a transaction. - self.db - .delete_cf(&self.cf(column), key) - .map_err(|e| DatabaseError::Other(e.into())) - .map(|_| prev) - } + if let Some(r) = &r { + database_metrics().bytes_read.observe(*r as f64); + } - fn exists(&self, key: &[u8], column: Column) -> DatabaseResult { - // use pinnable mem ref to avoid memcpy of values associated with the key - // since we're just checking for the existence of the key - self.db - .get_pinned_cf(&self.cf(column), key) - .map_err(|e| DatabaseError::Other(e.into())) - .map(|v| v.is_some()) + Ok(r) } fn iter_all( @@ -423,95 +444,6 @@ impl KeyValueStore for RocksDb { } } } - - fn size_of_value(&self, key: &[u8], column: Column) -> DatabaseResult> { - database_metrics().read_meter.inc(); - - Ok(self - .db - .get_pinned_cf(&self.cf(column), key) - .map_err(|e| DatabaseError::Other(e.into()))? - .map(|value| value.len())) - } - - fn read( - &self, - key: &[u8], - column: Column, - mut buf: &mut [u8], - ) -> DatabaseResult> { - database_metrics().read_meter.inc(); - - let r = self - .db - .get_pinned_cf(&self.cf(column), key) - .map_err(|e| DatabaseError::Other(e.into()))? - .map(|value| { - let read = value.len(); - std::io::Write::write_all(&mut buf, value.as_ref()) - .map_err(|e| DatabaseError::Other(anyhow::anyhow!(e)))?; - DatabaseResult::Ok(read) - }) - .transpose()?; - - if let Some(r) = &r { - database_metrics().bytes_read.observe(*r as f64); - } - - Ok(r) - } - - fn write(&self, key: &[u8], column: Column, buf: &[u8]) -> DatabaseResult { - database_metrics().write_meter.inc(); - database_metrics().bytes_written.observe(buf.len() as f64); - - let r = buf.len(); - self.db - .put_cf(&self.cf(column), key, buf) - .map_err(|e| DatabaseError::Other(e.into()))?; - - Ok(r) - } - - fn read_alloc(&self, key: &[u8], column: Column) -> DatabaseResult> { - database_metrics().read_meter.inc(); - - let r = self - .db - .get_pinned_cf(&self.cf(column), key) - .map_err(|e| DatabaseError::Other(e.into()))? - .map(|value| value.to_vec()); - - if let Some(r) = &r { - database_metrics().bytes_read.observe(r.len() as f64); - } - - Ok(r.map(Arc::new)) - } - - fn replace( - &self, - key: &[u8], - column: Column, - buf: &[u8], - ) -> DatabaseResult<(usize, Option)> { - // FIXME: This is a race condition. We should use a transaction. - let existing = self.read_alloc(key, column)?; - // FIXME: This is a race condition. We should use a transaction. - let r = self.write(key, column, buf)?; - - Ok((r, existing)) - } - - fn take(&self, key: &[u8], column: Column) -> DatabaseResult> { - // FIXME: This is a race condition. We should use a transaction. - let prev = self.read_alloc(key, column)?; - // FIXME: This is a race condition. We should use a transaction. - self.db - .delete_cf(&self.cf(column), key) - .map_err(|e| DatabaseError::Other(e.into())) - .map(|_| prev) - } } impl BatchOperations for RocksDb { @@ -609,7 +541,7 @@ mod tests { let expected = Arc::new(vec![1, 2, 3]); db.put(&key, Column::Metadata, expected.clone()).unwrap(); let prev = db - .put(&key, Column::Metadata, Arc::new(vec![2, 4, 6])) + .replace(&key, Column::Metadata, Arc::new(vec![2, 4, 6])) .unwrap(); assert_eq!(prev, Some(expected)); @@ -687,10 +619,7 @@ mod tests { (key.clone(), expected.clone()) ); - assert_eq!( - db.delete(&key, Column::Metadata).unwrap().unwrap(), - expected - ); + assert_eq!(db.take(&key, Column::Metadata).unwrap().unwrap(), expected); assert!(!db.exists(&key, Column::Metadata).unwrap()); } @@ -714,10 +643,7 @@ mod tests { (key.clone(), expected.clone()) ); - assert_eq!( - db.delete(&key, Column::Metadata).unwrap().unwrap(), - expected - ); + assert_eq!(db.take(&key, Column::Metadata).unwrap().unwrap(), expected); assert!(!db.exists(&key, Column::Metadata).unwrap()); } @@ -741,10 +667,7 @@ mod tests { (key.clone(), expected.clone()) ); - assert_eq!( - db.delete(&key, Column::Metadata).unwrap().unwrap(), - expected - ); + assert_eq!(db.take(&key, Column::Metadata).unwrap().unwrap(), expected); assert!(!db.exists(&key, Column::Metadata).unwrap()); }