diff --git a/benches/value_log.rs b/benches/value_log.rs index ba9b75e..54c9649 100644 --- a/benches/value_log.rs +++ b/benches/value_log.rs @@ -2,9 +2,22 @@ use criterion::{criterion_group, criterion_main, Criterion}; use rand::{Rng, RngCore}; use std::sync::Arc; use value_log::{ - BlobCache, Config, IndexReader, IndexWriter, MockIndex, MockIndexWriter, ValueLog, + BlobCache, Compressor, Config, IndexReader, IndexWriter, MockIndex, MockIndexWriter, ValueLog, }; +#[derive(Clone, Default)] +struct NoCompressor; + +impl Compressor for NoCompressor { + fn compress(&self, bytes: &[u8]) -> value_log::Result> { + Ok(bytes.into()) + } + + fn decompress(&self, bytes: &[u8]) -> value_log::Result> { + Ok(bytes.into()) + } +} + fn prefetch(c: &mut Criterion) { let mut group = c.benchmark_group("prefetch range"); @@ -17,7 +30,7 @@ fn prefetch(c: &mut Criterion) { let folder = tempfile::tempdir().unwrap(); let vl_path = folder.path(); - let value_log = ValueLog::open(vl_path, Config::default()).unwrap(); + let value_log = ValueLog::open(vl_path, Config::::default()).unwrap(); let mut writer = value_log.get_writer().unwrap(); @@ -106,7 +119,8 @@ fn load_value(c: &mut Criterion) { let value_log = ValueLog::open( vl_path, - Config::default().blob_cache(Arc::new(BlobCache::with_capacity_bytes(0))), + Config::::default() + .blob_cache(Arc::new(BlobCache::with_capacity_bytes(0))), ) .unwrap(); @@ -154,7 +168,7 @@ fn load_value(c: &mut Criterion) { let value_log = ValueLog::open( vl_path, - Config::default() + Config::::default() .blob_cache(Arc::new(BlobCache::with_capacity_bytes(64 * 1_024 * 1_024))), ) .unwrap(); diff --git a/src/compression.rs b/src/compression.rs index 121ae29..c263661 100644 --- a/src/compression.rs +++ b/src/compression.rs @@ -1,11 +1,3 @@ -/// Compression error -#[derive(Debug)] -pub struct CompressError(pub String); - -/// Decompression error -#[derive(Debug)] -pub struct DecompressError(pub String); - /// Generic compression trait pub trait Compressor { /// Compresses a value @@ -13,12 +5,12 @@ pub trait Compressor { /// # Errors /// /// Will return `Err` if an IO error occurs. - fn compress(&self, bytes: &[u8]) -> Result, CompressError>; + fn compress(&self, bytes: &[u8]) -> crate::Result>; /// Decompresses a value /// /// # Errors /// /// Will return `Err` if an IO error occurs. - fn decompress(&self, bytes: &[u8]) -> Result, DecompressError>; + fn decompress(&self, bytes: &[u8]) -> crate::Result>; } diff --git a/src/config.rs b/src/config.rs index 622df32..ec05e9a 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,21 +1,8 @@ use crate::{blob_cache::BlobCache, compression::Compressor}; use std::sync::Arc; -/// No compression -pub struct NoCompressor; - -impl Compressor for NoCompressor { - fn compress(&self, bytes: &[u8]) -> Result, crate::compression::CompressError> { - Ok(bytes.into()) - } - - fn decompress(&self, bytes: &[u8]) -> Result, crate::compression::DecompressError> { - Ok(bytes.into()) - } -} - /// Value log configuration -pub struct Config { +pub struct Config { /// Target size of vLog segments pub(crate) segment_size_bytes: u64, @@ -23,31 +10,20 @@ pub struct Config { pub(crate) blob_cache: Arc, /// Compression to use - pub(crate) compression: Arc, + pub(crate) compression: C, } -impl Default for Config { +impl Default for Config { fn default() -> Self { Self { segment_size_bytes: 256 * 1_024 * 1_024, blob_cache: Arc::new(BlobCache::with_capacity_bytes(16 * 1_024 * 1_024)), - compression: Arc::new(NoCompressor), + compression: C::default(), } } } -impl Config { - /// Sets the compression type to use. - /// - /// Using compression is recommended. - /// - /// Default = none - #[must_use] - pub fn use_compression(mut self, compressor: Arc) -> Self { - self.compression = compressor; - self - } - +impl Config { /// Sets the blob cache. /// /// You can create a global [`BlobCache`] and share it between multiple diff --git a/src/error.rs b/src/error.rs index 0885545..9f8e1e7 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,5 +1,4 @@ use crate::{ - compression::{CompressError, DecompressError}, serde::{DeserializeError, SerializeError}, version::Version, }; @@ -20,10 +19,10 @@ pub enum Error { Deserialize(DeserializeError), /// Compression failed - Compress(CompressError), + Compress, /// Decompression failed - Decompress(DecompressError), + Decompress, // TODO: // /// Checksum check failed // ChecksumMismatch, @@ -55,17 +54,5 @@ impl From for Error { } } -impl From for Error { - fn from(value: CompressError) -> Self { - Self::Compress(value) - } -} - -impl From for Error { - fn from(value: DecompressError) -> Self { - Self::Decompress(value) - } -} - /// Value log result pub type Result = std::result::Result; diff --git a/src/lib.rs b/src/lib.rs index dcbb3ac..ddcfe6d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -44,8 +44,20 @@ //! # let index = MockIndex::default(); //! # let path = folder.path(); //! # +//! # #[derive(Clone, Default)] +//! # struct MyCompressor; +//! # +//! # impl value_log::Compressor for MyCompressor { +//! # fn compress(&self, bytes: &[u8]) -> value_log::Result> { +//! # Ok(bytes.into()) +//! # } +//! # +//! # fn decompress(&self, bytes: &[u8]) -> value_log::Result> { +//! # Ok(bytes.into()) +//! # } +//! # } //! // Open or recover value log from disk -//! let value_log = ValueLog::open(path, Config::default())?; +//! let value_log = ValueLog::open(path, Config::::default())?; //! //! // Write some data //! # let mut index_writer = MockIndexWriter(index.clone()); @@ -109,7 +121,7 @@ pub(crate) type HashMap = ahash::HashMap; pub use { blob_cache::BlobCache, - compression::{CompressError, Compressor, DecompressError}, + compression::Compressor, config::Config, error::{Error, Result}, handle::ValueHandle, @@ -120,9 +132,6 @@ pub use { version::Version, }; -#[doc(hidden)] -pub use config::NoCompressor; - #[doc(hidden)] pub use segment::{reader::Reader as SegmentReader, Segment}; diff --git a/src/manifest.rs b/src/manifest.rs index 04afbc5..a5e88ee 100644 --- a/src/manifest.rs +++ b/src/manifest.rs @@ -2,11 +2,12 @@ use crate::{ id::SegmentId, key_range::KeyRange, segment::{gc_stats::GcStats, meta::Metadata, trailer::SegmentFileTrailer}, - HashMap, Segment, SegmentWriter as MultiWriter, + Compressor, HashMap, Segment, SegmentWriter as MultiWriter, }; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use std::{ io::{Cursor, Write}, + marker::PhantomData, path::{Path, PathBuf}, sync::{Arc, RwLock}, }; @@ -36,24 +37,24 @@ fn rewrite_atomic>(path: P, content: &[u8]) -> std::io::Result<() } #[allow(clippy::module_name_repetitions)] -pub struct SegmentManifestInner { +pub struct SegmentManifestInner { path: PathBuf, - pub segments: RwLock>>, + pub segments: RwLock>>>, } #[allow(clippy::module_name_repetitions)] #[derive(Clone)] -pub struct SegmentManifest(Arc); +pub struct SegmentManifest(Arc>); -impl std::ops::Deref for SegmentManifest { - type Target = SegmentManifestInner; +impl std::ops::Deref for SegmentManifest { + type Target = SegmentManifestInner; fn deref(&self) -> &Self::Target { &self.0 } } -impl SegmentManifest { +impl SegmentManifest { fn remove_unfinished_segments>( folder: P, registered_ids: &[u64], @@ -129,6 +130,7 @@ impl SegmentManifest { path, meta: trailer.metadata, gc_stats: GcStats::default(), + _phantom: PhantomData, }), ); } @@ -155,7 +157,7 @@ impl SegmentManifest { } /// Modifies the level manifest atomically. - pub(crate) fn atomic_swap>)>( + pub(crate) fn atomic_swap>>)>( &self, f: F, ) -> crate::Result<()> { @@ -189,7 +191,7 @@ impl SegmentManifest { }) } - pub fn register(&self, writer: MultiWriter) -> crate::Result<()> { + pub fn register(&self, writer: MultiWriter) -> crate::Result<()> { let writers = writer.finish()?; self.atomic_swap(move |recipe| { @@ -235,6 +237,7 @@ impl SegmentManifest { )), }, gc_stats: GcStats::default(), + _phantom: PhantomData, }), ); @@ -272,7 +275,7 @@ impl SegmentManifest { /// Gets a segment #[must_use] - pub fn get_segment(&self, id: SegmentId) -> Option> { + pub fn get_segment(&self, id: SegmentId) -> Option>> { self.segments .read() .expect("lock is poisoned") @@ -294,7 +297,7 @@ impl SegmentManifest { /// Lists all segments #[must_use] - pub fn list_segments(&self) -> Vec> { + pub fn list_segments(&self) -> Vec>> { self.segments .read() .expect("lock is poisoned") diff --git a/src/scanner.rs b/src/scanner.rs index 7eb526b..b3823b9 100644 --- a/src/scanner.rs +++ b/src/scanner.rs @@ -1,4 +1,4 @@ -use crate::{id::SegmentId, ValueHandle, ValueLog}; +use crate::{id::SegmentId, Compressor, ValueHandle, ValueLog}; use std::{collections::BTreeMap, sync::MutexGuard}; #[derive(Debug, Default)] @@ -19,7 +19,7 @@ pub struct Scanner<'a, I: Iterator>> } impl<'a, I: Iterator>> Scanner<'a, I> { - pub fn new(vlog: &'a ValueLog, iter: I) -> Self { + pub fn new(vlog: &'a ValueLog, iter: I) -> Self { Self { iter, lock_guard: vlog.rollover_guard.lock().expect("lock is poisoned"), diff --git a/src/segment/merge.rs b/src/segment/merge.rs index 4a51758..621aad4 100644 --- a/src/segment/merge.rs +++ b/src/segment/merge.rs @@ -1,4 +1,4 @@ -use crate::{id::SegmentId, value::UserKey, SegmentReader, UserValue}; +use crate::{id::SegmentId, value::UserKey, Compressor, SegmentReader, UserValue}; use std::cmp::Reverse; // TODO: replace with MinHeap... @@ -36,14 +36,14 @@ impl Ord for IteratorValue { /// Interleaves multiple segment readers into a single, sorted stream #[allow(clippy::module_name_repetitions)] -pub struct MergeReader { - readers: Vec, +pub struct MergeReader { + readers: Vec>, heap: MinMaxHeap, } -impl MergeReader { +impl MergeReader { /// Initializes a new merging reader - pub fn new(readers: Vec) -> Self { + pub fn new(readers: Vec>) -> Self { Self { readers, heap: MinMaxHeap::new(), @@ -78,7 +78,7 @@ impl MergeReader { } } -impl Iterator for MergeReader { +impl Iterator for MergeReader { type Item = crate::Result<(UserKey, UserValue, SegmentId, u64)>; fn next(&mut self) -> Option { diff --git a/src/segment/mod.rs b/src/segment/mod.rs index 91f48a2..29becc9 100644 --- a/src/segment/mod.rs +++ b/src/segment/mod.rs @@ -6,15 +6,15 @@ pub mod reader; pub mod trailer; pub mod writer; -use crate::id::SegmentId; +use crate::{id::SegmentId, Compressor}; use gc_stats::GcStats; use meta::Metadata; -use std::path::PathBuf; +use std::{marker::PhantomData, path::PathBuf}; /// A disk segment is an immutable, sorted, contiguous file /// that contains key-value pairs. #[derive(Debug)] -pub struct Segment { +pub struct Segment { /// Segment ID pub id: SegmentId, @@ -26,15 +26,17 @@ pub struct Segment { /// Runtime stats for garbage collection pub gc_stats: GcStats, + + pub(crate) _phantom: PhantomData, } -impl Segment { +impl Segment { /// Returns a scanner that can iterate through the segment. /// /// # Errors /// /// Will return `Err` if an IO error occurs. - pub fn scan(&self) -> crate::Result { + pub fn scan(&self) -> crate::Result> { reader::Reader::new(&self.path, self.id) } diff --git a/src/segment/multi_writer.rs b/src/segment/multi_writer.rs index f19ac83..c2744c0 100644 --- a/src/segment/multi_writer.rs +++ b/src/segment/multi_writer.rs @@ -4,24 +4,21 @@ use crate::{ id::{IdGenerator, SegmentId}, ValueHandle, }; -use std::{ - path::{Path, PathBuf}, - sync::Arc, -}; +use std::path::{Path, PathBuf}; /// Segment writer, may write multiple segments -pub struct MultiWriter { +pub struct MultiWriter { folder: PathBuf, target_size: u64, - writers: Vec, + writers: Vec>, id_generator: IdGenerator, - compression: Option>, + compression: Option, } -impl MultiWriter { +impl MultiWriter { /// Initializes a new segment writer. /// /// # Errors @@ -52,7 +49,7 @@ impl MultiWriter { /// Sets the compression method #[must_use] #[doc(hidden)] - pub fn use_compression(mut self, compressor: Arc) -> Self { + pub fn use_compression(mut self, compressor: C) -> Self { self.compression = Some(compressor.clone()); self.get_active_writer_mut().compression = Some(compressor); self @@ -60,13 +57,13 @@ impl MultiWriter { #[doc(hidden)] #[must_use] - pub fn get_active_writer(&self) -> &Writer { + pub fn get_active_writer(&self) -> &Writer { // NOTE: initialized in constructor #[allow(clippy::expect_used)] self.writers.last().expect("should exist") } - fn get_active_writer_mut(&mut self) -> &mut Writer { + fn get_active_writer_mut(&mut self) -> &mut Writer { // NOTE: initialized in constructor #[allow(clippy::expect_used)] self.writers.last_mut().expect("should exist") @@ -101,11 +98,8 @@ impl MultiWriter { let new_segment_id = self.id_generator.next(); let segment_path = self.folder.join(new_segment_id.to_string()); - let mut new_writer = Writer::new(segment_path, new_segment_id)?; - - if let Some(compressor) = &self.compression { - new_writer = new_writer.use_compression(compressor.clone()); - } + let new_writer = + Writer::new(segment_path, new_segment_id)?.use_compression(self.compression.clone()); self.writers.push(new_writer); @@ -140,7 +134,7 @@ impl MultiWriter { Ok(bytes_written) } - pub(crate) fn finish(mut self) -> crate::Result> { + pub(crate) fn finish(mut self) -> crate::Result>> { let writer = self.get_active_writer_mut(); if writer.item_count > 0 { diff --git a/src/segment/reader.rs b/src/segment/reader.rs index bd971ea..8e1f66e 100644 --- a/src/segment/reader.rs +++ b/src/segment/reader.rs @@ -5,18 +5,17 @@ use std::{ fs::File, io::{BufReader, Read, Seek}, path::Path, - sync::Arc, }; /// Reads through a segment in order. -pub struct Reader { +pub struct Reader { pub(crate) segment_id: SegmentId, inner: BufReader, is_terminated: bool, - compression: Option>, + compression: Option, } -impl Reader { +impl Reader { /// Initializes a new segment reader. /// /// # Errors @@ -43,13 +42,13 @@ impl Reader { } } - pub(crate) fn use_compression(mut self, compressor: Arc) -> Self { + pub(crate) fn use_compression(mut self, compressor: C) -> Self { self.compression = Some(compressor); self } } -impl Iterator for Reader { +impl Iterator for Reader { type Item = crate::Result<(UserKey, UserValue, u64)>; fn next(&mut self) -> Option { @@ -119,7 +118,7 @@ impl Iterator for Reader { let val = match &self.compression { Some(compressor) => match compressor.decompress(&val) { Ok(val) => val, - Err(e) => return Some(Err(e.into())), + Err(e) => return Some(Err(e)), }, None => val, }; diff --git a/src/segment/writer.rs b/src/segment/writer.rs index 9352a28..db896a6 100644 --- a/src/segment/writer.rs +++ b/src/segment/writer.rs @@ -8,13 +8,12 @@ use std::{ fs::File, io::{BufWriter, Seek, Write}, path::{Path, PathBuf}, - sync::Arc, }; pub const BLOB_HEADER_MAGIC: &[u8] = &[b'V', b'L', b'G', b'B', b'L', b'O', b'B', b'1']; /// Segment writer -pub struct Writer { +pub struct Writer { pub path: PathBuf, pub(crate) segment_id: SegmentId, @@ -30,10 +29,10 @@ pub struct Writer { pub(crate) first_key: Option, pub(crate) last_key: Option, - pub(crate) compression: Option>, + pub(crate) compression: Option, } -impl Writer { +impl Writer { /// Initializes a new segment writer. /// /// # Errors @@ -61,8 +60,8 @@ impl Writer { }) } - pub fn use_compression(mut self, compressor: Arc) -> Self { - self.compression = Some(compressor); + pub fn use_compression(mut self, compressor: Option) -> Self { + self.compression = compressor; self } diff --git a/src/value_log.rs b/src/value_log.rs index e2f12d1..6976adf 100644 --- a/src/value_log.rs +++ b/src/value_log.rs @@ -8,7 +8,7 @@ use crate::{ segment::merge::MergeReader, value::UserValue, version::Version, - Config, IndexReader, SegmentReader, SegmentWriter, ValueHandle, + Compressor, Config, IndexReader, SegmentReader, SegmentWriter, ValueHandle, }; use std::{ fs::File, @@ -29,10 +29,10 @@ pub fn get_next_vlog_id() -> ValueLogId { /// A disk-resident value log #[derive(Clone)] -pub struct ValueLog(Arc); +pub struct ValueLog(Arc>); -impl std::ops::Deref for ValueLog { - type Target = ValueLogInner; +impl std::ops::Deref for ValueLog { + type Target = ValueLogInner; fn deref(&self) -> &Self::Target { &self.0 @@ -40,7 +40,7 @@ impl std::ops::Deref for ValueLog { } #[allow(clippy::module_name_repetitions)] -pub struct ValueLogInner { +pub struct ValueLogInner { /// Unique value log ID id: u64, @@ -48,14 +48,14 @@ pub struct ValueLogInner { pub path: PathBuf, /// Value log configuration - config: Config, + config: Config, /// In-memory blob cache blob_cache: Arc, /// Segment manifest #[doc(hidden)] - pub manifest: SegmentManifest, + pub manifest: SegmentManifest, /// Generator to get next segment ID id_generator: IdGenerator, @@ -65,7 +65,7 @@ pub struct ValueLogInner { pub(crate) rollover_guard: Mutex<()>, } -impl ValueLog { +impl ValueLog { /// Creates or recovers a value log in the given directory. /// /// # Errors @@ -73,7 +73,7 @@ impl ValueLog { /// Will return `Err` if an IO error occurs. pub fn open>( path: P, // TODO: move path into config? - config: Config, + config: Config, ) -> crate::Result { let path = path.into(); @@ -106,7 +106,7 @@ impl ValueLog { } /// Creates a new empty value log in a directory. - pub(crate) fn create_new>(path: P, config: Config) -> crate::Result { + pub(crate) fn create_new>(path: P, config: Config) -> crate::Result { let path = absolute_path(path.into()); log::trace!("Creating value-log at {}", path.display()); @@ -149,7 +149,7 @@ impl ValueLog { }))) } - pub(crate) fn recover>(path: P, config: Config) -> crate::Result { + pub(crate) fn recover>(path: P, config: Config) -> crate::Result { let path = path.into(); log::info!("Recovering vLog at {}", path.display()); @@ -193,7 +193,7 @@ impl ValueLog { /// # Errors /// /// Will return `Err` if an IO error occurs. - pub fn register_writer(&self, writer: SegmentWriter) -> crate::Result<()> { + pub fn register_writer(&self, writer: SegmentWriter) -> crate::Result<()> { let _lock = self.rollover_guard.lock().expect("lock is poisoned"); self.manifest.register(writer)?; Ok(()) @@ -269,7 +269,7 @@ impl ValueLog { /// # Errors /// /// Will return `Err` if an IO error occurs. - pub fn get_writer(&self) -> crate::Result { + pub fn get_writer(&self) -> crate::Result> { Ok(SegmentWriter::new( self.id_generator.clone(), self.config.segment_size_bytes, @@ -477,7 +477,7 @@ impl ValueLog { } #[doc(hidden)] - pub fn get_reader(&self) -> crate::Result { + pub fn get_reader(&self) -> crate::Result> { let segments = self.manifest.segments.read().expect("lock is poisoned"); let readers = segments diff --git a/tests/accidental_drop_rc.rs b/tests/accidental_drop_rc.rs index 95c5420..5b25e20 100644 --- a/tests/accidental_drop_rc.rs +++ b/tests/accidental_drop_rc.rs @@ -5,7 +5,20 @@ // count of 0. Then it would be dropped even though it was just created. use test_log::test; -use value_log::{Config, IndexWriter, MockIndex, MockIndexWriter, ValueLog}; +use value_log::{Compressor, Config, IndexWriter, MockIndex, MockIndexWriter, ValueLog}; + +#[derive(Clone, Default)] +struct NoCompressor; + +impl Compressor for NoCompressor { + fn compress(&self, bytes: &[u8]) -> value_log::Result> { + Ok(bytes.into()) + } + + fn decompress(&self, bytes: &[u8]) -> value_log::Result> { + Ok(bytes.into()) + } +} #[test] fn accidental_drop_rc() -> value_log::Result<()> { @@ -14,7 +27,7 @@ fn accidental_drop_rc() -> value_log::Result<()> { let index = MockIndex::default(); - let value_log = ValueLog::open(vl_path, Config::default())?; + let value_log = ValueLog::open(vl_path, Config::::default())?; for key in ["a", "b"] { let value = &key; diff --git a/tests/basic_gc.rs b/tests/basic_gc.rs index 34c1d49..73877a7 100644 --- a/tests/basic_gc.rs +++ b/tests/basic_gc.rs @@ -1,5 +1,18 @@ use test_log::test; -use value_log::{Config, IndexWriter, MockIndex, MockIndexWriter, ValueLog}; +use value_log::{Compressor, Config, IndexWriter, MockIndex, MockIndexWriter, ValueLog}; + +#[derive(Clone, Default)] +struct NoCompressor; + +impl Compressor for NoCompressor { + fn compress(&self, bytes: &[u8]) -> value_log::Result> { + Ok(bytes.into()) + } + + fn decompress(&self, bytes: &[u8]) -> value_log::Result> { + Ok(bytes.into()) + } +} #[test] fn basic_gc() -> value_log::Result<()> { @@ -8,7 +21,7 @@ fn basic_gc() -> value_log::Result<()> { let index = MockIndex::default(); - let value_log = ValueLog::open(vl_path, Config::default())?; + let value_log = ValueLog::open(vl_path, Config::::default())?; { let items = ["a", "b", "c", "d", "e"]; diff --git a/tests/basic_kv.rs b/tests/basic_kv.rs index 6f4af13..c98f71d 100644 --- a/tests/basic_kv.rs +++ b/tests/basic_kv.rs @@ -1,6 +1,19 @@ use std::sync::Arc; use test_log::test; -use value_log::{Config, IndexWriter, KeyRange, MockIndex, MockIndexWriter, ValueLog}; +use value_log::{Compressor, Config, IndexWriter, KeyRange, MockIndex, MockIndexWriter, ValueLog}; + +#[derive(Clone, Default)] +struct NoCompressor; + +impl Compressor for NoCompressor { + fn compress(&self, bytes: &[u8]) -> value_log::Result> { + Ok(bytes.into()) + } + + fn decompress(&self, bytes: &[u8]) -> value_log::Result> { + Ok(bytes.into()) + } +} #[test] fn basic_kv() -> value_log::Result<()> { @@ -9,7 +22,7 @@ fn basic_kv() -> value_log::Result<()> { let index = MockIndex::default(); - let value_log = ValueLog::open(vl_path, Config::default())?; + let value_log = ValueLog::open(vl_path, Config::::default())?; let items = ["a", "b", "c", "d", "e"]; diff --git a/tests/compression.rs b/tests/compression.rs index 00f79fa..cf8beb4 100644 --- a/tests/compression.rs +++ b/tests/compression.rs @@ -1,16 +1,15 @@ -use std::sync::Arc; use test_log::test; use value_log::{Compressor, Config, IndexWriter, MockIndex, MockIndexWriter, ValueLog}; +#[derive(Clone, Default)] struct Lz4Compressor; impl Compressor for Lz4Compressor { - fn compress(&self, bytes: &[u8]) -> Result, value_log::CompressError> { + fn compress(&self, bytes: &[u8]) -> value_log::Result> { Ok(lz4_flex::compress_prepend_size(bytes)) } - fn decompress(&self, bytes: &[u8]) -> Result, value_log::DecompressError> { - lz4_flex::decompress_size_prepended(bytes) - .map_err(|e| value_log::DecompressError(e.to_string())) + fn decompress(&self, bytes: &[u8]) -> value_log::Result> { + lz4_flex::decompress_size_prepended(bytes).map_err(|_| value_log::Error::Decompress) } } @@ -21,10 +20,7 @@ fn compression() -> value_log::Result<()> { let index = MockIndex::default(); - let value_log = ValueLog::open( - vl_path, - Config::default().use_compression(Arc::new(Lz4Compressor)), - )?; + let value_log = ValueLog::open(vl_path, Config::::default())?; let mut index_writer = MockIndexWriter(index.clone()); let mut writer = value_log.get_writer()?; diff --git a/tests/gc_space_amp.rs b/tests/gc_space_amp.rs index edeaac7..2dd4aa7 100644 --- a/tests/gc_space_amp.rs +++ b/tests/gc_space_amp.rs @@ -1,5 +1,18 @@ use test_log::test; -use value_log::{Config, IndexWriter, MockIndex, MockIndexWriter, ValueLog}; +use value_log::{Compressor, Config, IndexWriter, MockIndex, MockIndexWriter, ValueLog}; + +#[derive(Clone, Default)] +struct NoCompressor; + +impl Compressor for NoCompressor { + fn compress(&self, bytes: &[u8]) -> value_log::Result> { + Ok(bytes.into()) + } + + fn decompress(&self, bytes: &[u8]) -> value_log::Result> { + Ok(bytes.into()) + } +} #[test] fn gc_space_amp_target_1() -> value_log::Result<()> { @@ -8,7 +21,7 @@ fn gc_space_amp_target_1() -> value_log::Result<()> { let index = MockIndex::default(); - let value_log = ValueLog::open(vl_path, Config::default())?; + let value_log = ValueLog::open(vl_path, Config::::default())?; assert_eq!(0.0, value_log.space_amp()); assert_eq!(0.0, value_log.manifest.stale_ratio()); diff --git a/tests/recovery.rs b/tests/recovery.rs index 2f68e02..4191ce8 100644 --- a/tests/recovery.rs +++ b/tests/recovery.rs @@ -1,5 +1,18 @@ use test_log::test; -use value_log::{Config, IndexWriter, MockIndex, MockIndexWriter, ValueLog}; +use value_log::{Compressor, Config, IndexWriter, MockIndex, MockIndexWriter, ValueLog}; + +#[derive(Clone, Default)] +struct NoCompressor; + +impl Compressor for NoCompressor { + fn compress(&self, bytes: &[u8]) -> value_log::Result> { + Ok(bytes.into()) + } + + fn decompress(&self, bytes: &[u8]) -> value_log::Result> { + Ok(bytes.into()) + } +} #[test] fn basic_recovery() -> value_log::Result<()> { @@ -11,7 +24,7 @@ fn basic_recovery() -> value_log::Result<()> { let items = ["a", "b", "c", "d", "e"]; { - let value_log = ValueLog::open(vl_path, Config::default())?; + let value_log = ValueLog::open(vl_path, Config::::default())?; { let mut index_writer = MockIndexWriter(index.clone()); @@ -48,7 +61,7 @@ fn basic_recovery() -> value_log::Result<()> { } { - let value_log = ValueLog::open(vl_path, Config::default())?; + let value_log = ValueLog::open(vl_path, Config::::default())?; value_log.scan_for_stats(index.read().unwrap().values().cloned().map(Ok))?; @@ -82,12 +95,12 @@ fn delete_unfinished_segment_folders() -> value_log::Result<()> { assert!(mock_path.try_exists()?); { - let _value_log = ValueLog::open(vl_path, Config::default())?; + let _value_log = ValueLog::open(vl_path, Config::::default())?; assert!(mock_path.try_exists()?); } { - let _value_log = ValueLog::open(vl_path, Config::default())?; + let _value_log = ValueLog::open(vl_path, Config::::default())?; assert!(!mock_path.try_exists()?); } diff --git a/tests/rollover_index_fail_finish.rs b/tests/rollover_index_fail_finish.rs index 9452f73..6bc20ec 100644 --- a/tests/rollover_index_fail_finish.rs +++ b/tests/rollover_index_fail_finish.rs @@ -1,5 +1,20 @@ use test_log::test; -use value_log::{Config, IndexWriter, MockIndex, MockIndexWriter, ValueHandle, ValueLog}; +use value_log::{ + Compressor, Config, IndexWriter, MockIndex, MockIndexWriter, ValueHandle, ValueLog, +}; + +#[derive(Clone, Default)] +struct NoCompressor; + +impl Compressor for NoCompressor { + fn compress(&self, bytes: &[u8]) -> value_log::Result> { + Ok(bytes.into()) + } + + fn decompress(&self, bytes: &[u8]) -> value_log::Result> { + Ok(bytes.into()) + } +} #[allow(clippy::module_name_repetitions)] pub struct DebugIndexWriter; @@ -21,7 +36,7 @@ fn rollover_index_fail_finish() -> value_log::Result<()> { let index = MockIndex::default(); - let value_log = ValueLog::open(vl_path, Config::default())?; + let value_log = ValueLog::open(vl_path, Config::::default())?; let items = ["a", "b", "c", "d", "e"]; diff --git a/tests/space_amp.rs b/tests/space_amp.rs index 00a6efa..e3cf686 100644 --- a/tests/space_amp.rs +++ b/tests/space_amp.rs @@ -1,5 +1,18 @@ use test_log::test; -use value_log::{Config, IndexWriter, MockIndex, MockIndexWriter, ValueLog}; +use value_log::{Compressor, Config, IndexWriter, MockIndex, MockIndexWriter, ValueLog}; + +#[derive(Clone, Default)] +struct NoCompressor; + +impl Compressor for NoCompressor { + fn compress(&self, bytes: &[u8]) -> value_log::Result> { + Ok(bytes.into()) + } + + fn decompress(&self, bytes: &[u8]) -> value_log::Result> { + Ok(bytes.into()) + } +} #[test] fn worst_case_space_amp() -> value_log::Result<()> { @@ -8,7 +21,7 @@ fn worst_case_space_amp() -> value_log::Result<()> { let index = MockIndex::default(); - let value_log = ValueLog::open(vl_path, Config::default())?; + let value_log = ValueLog::open(vl_path, Config::::default())?; assert_eq!(0.0, value_log.space_amp()); assert_eq!(0.0, value_log.manifest.stale_ratio()); @@ -48,7 +61,7 @@ fn no_overlap_space_amp() -> value_log::Result<()> { let index = MockIndex::default(); - let value_log = ValueLog::open(vl_path, Config::default())?; + let value_log = ValueLog::open(vl_path, Config::::default())?; assert_eq!(0.0, value_log.manifest.stale_ratio()); assert_eq!(0.0, value_log.space_amp()); diff --git a/tests/vlog_load_fixture.rs b/tests/vlog_load_fixture.rs index 136f752..c20e0df 100644 --- a/tests/vlog_load_fixture.rs +++ b/tests/vlog_load_fixture.rs @@ -1,11 +1,24 @@ use test_log::test; -use value_log::{Config, ValueLog}; +use value_log::{Compressor, Config, ValueLog}; + +#[derive(Clone, Default)] +struct NoCompressor; + +impl Compressor for NoCompressor { + fn compress(&self, bytes: &[u8]) -> value_log::Result> { + Ok(bytes.into()) + } + + fn decompress(&self, bytes: &[u8]) -> value_log::Result> { + Ok(bytes.into()) + } +} #[test] fn vlog_load_v1() -> value_log::Result<()> { let path = std::path::Path::new("test_fixture/v1_vlog"); - let value_log = ValueLog::open(path, Config::default())?; + let value_log = ValueLog::open(path, Config::::default())?; let count = { let mut count = 0; @@ -29,7 +42,7 @@ fn vlog_load_v1() -> value_log::Result<()> { fn vlog_load_v1_corrupt() -> value_log::Result<()> { let path = std::path::Path::new("test_fixture/v1_vlog_corrupt"); - let value_log = ValueLog::open(path, Config::default())?; + let value_log = ValueLog::open(path, Config::::default())?; assert_eq!(2, value_log.verify()?);