Skip to content

Commit

Permalink
make compression generic param
Browse files Browse the repository at this point in the history
  • Loading branch information
marvin-j97 committed Aug 10, 2024
1 parent 4bb5eff commit 1c3d3a9
Show file tree
Hide file tree
Showing 22 changed files with 238 additions and 161 deletions.
22 changes: 18 additions & 4 deletions benches/value_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,22 @@ use criterion::{criterion_group, criterion_main, Criterion};
use rand::{Rng, RngCore};
use std::sync::Arc;
use value_log::{
BlobCache, Config, IndexReader, IndexWriter, MockIndex, MockIndexWriter, ValueLog,
BlobCache, Compressor, Config, IndexReader, IndexWriter, MockIndex, MockIndexWriter, ValueLog,
};

#[derive(Clone, Default)]
struct NoCompressor;

impl Compressor for NoCompressor {
fn compress(&self, bytes: &[u8]) -> value_log::Result<Vec<u8>> {
Ok(bytes.into())
}

fn decompress(&self, bytes: &[u8]) -> value_log::Result<Vec<u8>> {
Ok(bytes.into())
}
}

fn prefetch(c: &mut Criterion) {
let mut group = c.benchmark_group("prefetch range");

Expand All @@ -17,7 +30,7 @@ fn prefetch(c: &mut Criterion) {
let folder = tempfile::tempdir().unwrap();
let vl_path = folder.path();

let value_log = ValueLog::open(vl_path, Config::default()).unwrap();
let value_log = ValueLog::open(vl_path, Config::<NoCompressor>::default()).unwrap();

let mut writer = value_log.get_writer().unwrap();

Expand Down Expand Up @@ -106,7 +119,8 @@ fn load_value(c: &mut Criterion) {

let value_log = ValueLog::open(
vl_path,
Config::default().blob_cache(Arc::new(BlobCache::with_capacity_bytes(0))),
Config::<NoCompressor>::default()
.blob_cache(Arc::new(BlobCache::with_capacity_bytes(0))),
)
.unwrap();

Expand Down Expand Up @@ -154,7 +168,7 @@ fn load_value(c: &mut Criterion) {

let value_log = ValueLog::open(
vl_path,
Config::default()
Config::<NoCompressor>::default()
.blob_cache(Arc::new(BlobCache::with_capacity_bytes(64 * 1_024 * 1_024))),
)
.unwrap();
Expand Down
12 changes: 2 additions & 10 deletions src/compression.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,16 @@
/// Compression error
#[derive(Debug)]
pub struct CompressError(pub String);

/// Decompression error
#[derive(Debug)]
pub struct DecompressError(pub String);

/// Generic compression trait
pub trait Compressor {
/// Compresses a value
///
/// # Errors
///
/// Will return `Err` if an IO error occurs.
fn compress(&self, bytes: &[u8]) -> Result<Vec<u8>, CompressError>;
fn compress(&self, bytes: &[u8]) -> crate::Result<Vec<u8>>;

/// Decompresses a value
///
/// # Errors
///
/// Will return `Err` if an IO error occurs.
fn decompress(&self, bytes: &[u8]) -> Result<Vec<u8>, DecompressError>;
fn decompress(&self, bytes: &[u8]) -> crate::Result<Vec<u8>>;
}
34 changes: 5 additions & 29 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,53 +1,29 @@
use crate::{blob_cache::BlobCache, compression::Compressor};
use std::sync::Arc;

/// No compression
pub struct NoCompressor;

impl Compressor for NoCompressor {
fn compress(&self, bytes: &[u8]) -> Result<Vec<u8>, crate::compression::CompressError> {
Ok(bytes.into())
}

fn decompress(&self, bytes: &[u8]) -> Result<Vec<u8>, crate::compression::DecompressError> {
Ok(bytes.into())
}
}

/// Value log configuration
pub struct Config {
pub struct Config<C: Compressor + Clone> {
/// Target size of vLog segments
pub(crate) segment_size_bytes: u64,

/// Blob cache to use
pub(crate) blob_cache: Arc<BlobCache>,

/// Compression to use
pub(crate) compression: Arc<dyn Compressor + Send + Sync>,
pub(crate) compression: C,
}

impl Default for Config {
impl<C: Compressor + Clone + Default> Default for Config<C> {
fn default() -> Self {
Self {
segment_size_bytes: 256 * 1_024 * 1_024,
blob_cache: Arc::new(BlobCache::with_capacity_bytes(16 * 1_024 * 1_024)),
compression: Arc::new(NoCompressor),
compression: C::default(),
}
}
}

impl Config {
/// Sets the compression type to use.
///
/// Using compression is recommended.
///
/// Default = none
#[must_use]
pub fn use_compression(mut self, compressor: Arc<dyn Compressor + Send + Sync>) -> Self {
self.compression = compressor;
self
}

impl<C: Compressor + Clone> Config<C> {
/// Sets the blob cache.
///
/// You can create a global [`BlobCache`] and share it between multiple
Expand Down
17 changes: 2 additions & 15 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::{
compression::{CompressError, DecompressError},
serde::{DeserializeError, SerializeError},
version::Version,
};
Expand All @@ -20,10 +19,10 @@ pub enum Error {
Deserialize(DeserializeError),

/// Compression failed
Compress(CompressError),
Compress,

/// Decompression failed
Decompress(DecompressError),
Decompress,
// TODO:
// /// Checksum check failed
// ChecksumMismatch,
Expand Down Expand Up @@ -55,17 +54,5 @@ impl From<DeserializeError> for Error {
}
}

impl From<CompressError> for Error {
fn from(value: CompressError) -> Self {
Self::Compress(value)
}
}

impl From<DecompressError> for Error {
fn from(value: DecompressError) -> Self {
Self::Decompress(value)
}
}

/// Value log result
pub type Result<T> = std::result::Result<T, Error>;
19 changes: 14 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,20 @@
//! # let index = MockIndex::default();
//! # let path = folder.path();
//! #
//! # #[derive(Clone, Default)]
//! # struct MyCompressor;
//! #
//! # impl value_log::Compressor for MyCompressor {
//! # fn compress(&self, bytes: &[u8]) -> value_log::Result<Vec<u8>> {
//! # Ok(bytes.into())
//! # }
//! #
//! # fn decompress(&self, bytes: &[u8]) -> value_log::Result<Vec<u8>> {
//! # Ok(bytes.into())
//! # }
//! # }
//! // Open or recover value log from disk
//! let value_log = ValueLog::open(path, Config::default())?;
//! let value_log = ValueLog::open(path, Config::<MyCompressor>::default())?;
//!
//! // Write some data
//! # let mut index_writer = MockIndexWriter(index.clone());
Expand Down Expand Up @@ -109,7 +121,7 @@ pub(crate) type HashMap<K, V> = ahash::HashMap<K, V>;

pub use {
blob_cache::BlobCache,
compression::{CompressError, Compressor, DecompressError},
compression::Compressor,
config::Config,
error::{Error, Result},
handle::ValueHandle,
Expand All @@ -120,9 +132,6 @@ pub use {
version::Version,
};

#[doc(hidden)]
pub use config::NoCompressor;

#[doc(hidden)]
pub use segment::{reader::Reader as SegmentReader, Segment};

Expand Down
25 changes: 14 additions & 11 deletions src/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ use crate::{
id::SegmentId,
key_range::KeyRange,
segment::{gc_stats::GcStats, meta::Metadata, trailer::SegmentFileTrailer},
HashMap, Segment, SegmentWriter as MultiWriter,
Compressor, HashMap, Segment, SegmentWriter as MultiWriter,
};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use std::{
io::{Cursor, Write},
marker::PhantomData,
path::{Path, PathBuf},
sync::{Arc, RwLock},
};
Expand Down Expand Up @@ -36,24 +37,24 @@ fn rewrite_atomic<P: AsRef<Path>>(path: P, content: &[u8]) -> std::io::Result<()
}

#[allow(clippy::module_name_repetitions)]
pub struct SegmentManifestInner {
pub struct SegmentManifestInner<C: Compressor + Clone> {
path: PathBuf,
pub segments: RwLock<HashMap<SegmentId, Arc<Segment>>>,
pub segments: RwLock<HashMap<SegmentId, Arc<Segment<C>>>>,
}

#[allow(clippy::module_name_repetitions)]
#[derive(Clone)]
pub struct SegmentManifest(Arc<SegmentManifestInner>);
pub struct SegmentManifest<C: Compressor + Clone>(Arc<SegmentManifestInner<C>>);

impl std::ops::Deref for SegmentManifest {
type Target = SegmentManifestInner;
impl<C: Compressor + Clone> std::ops::Deref for SegmentManifest<C> {
type Target = SegmentManifestInner<C>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl SegmentManifest {
impl<C: Compressor + Clone> SegmentManifest<C> {
fn remove_unfinished_segments<P: AsRef<Path>>(
folder: P,
registered_ids: &[u64],
Expand Down Expand Up @@ -129,6 +130,7 @@ impl SegmentManifest {
path,
meta: trailer.metadata,
gc_stats: GcStats::default(),
_phantom: PhantomData,
}),
);
}
Expand All @@ -155,7 +157,7 @@ impl SegmentManifest {
}

/// Modifies the level manifest atomically.
pub(crate) fn atomic_swap<F: FnOnce(&mut HashMap<SegmentId, Arc<Segment>>)>(
pub(crate) fn atomic_swap<F: FnOnce(&mut HashMap<SegmentId, Arc<Segment<C>>>)>(
&self,
f: F,
) -> crate::Result<()> {
Expand Down Expand Up @@ -189,7 +191,7 @@ impl SegmentManifest {
})
}

pub fn register(&self, writer: MultiWriter) -> crate::Result<()> {
pub fn register(&self, writer: MultiWriter<C>) -> crate::Result<()> {
let writers = writer.finish()?;

self.atomic_swap(move |recipe| {
Expand Down Expand Up @@ -235,6 +237,7 @@ impl SegmentManifest {
)),
},
gc_stats: GcStats::default(),
_phantom: PhantomData,
}),
);

Expand Down Expand Up @@ -272,7 +275,7 @@ impl SegmentManifest {

/// Gets a segment
#[must_use]
pub fn get_segment(&self, id: SegmentId) -> Option<Arc<Segment>> {
pub fn get_segment(&self, id: SegmentId) -> Option<Arc<Segment<C>>> {
self.segments
.read()
.expect("lock is poisoned")
Expand All @@ -294,7 +297,7 @@ impl SegmentManifest {

/// Lists all segments
#[must_use]
pub fn list_segments(&self) -> Vec<Arc<Segment>> {
pub fn list_segments(&self) -> Vec<Arc<Segment<C>>> {
self.segments
.read()
.expect("lock is poisoned")
Expand Down
4 changes: 2 additions & 2 deletions src/scanner.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{id::SegmentId, ValueHandle, ValueLog};
use crate::{id::SegmentId, Compressor, ValueHandle, ValueLog};
use std::{collections::BTreeMap, sync::MutexGuard};

#[derive(Debug, Default)]
Expand All @@ -19,7 +19,7 @@ pub struct Scanner<'a, I: Iterator<Item = std::io::Result<(ValueHandle, u32)>>>
}

impl<'a, I: Iterator<Item = std::io::Result<(ValueHandle, u32)>>> Scanner<'a, I> {
pub fn new(vlog: &'a ValueLog, iter: I) -> Self {
pub fn new<C: Compressor + Clone>(vlog: &'a ValueLog<C>, iter: I) -> Self {
Self {
iter,
lock_guard: vlog.rollover_guard.lock().expect("lock is poisoned"),
Expand Down
12 changes: 6 additions & 6 deletions src/segment/merge.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{id::SegmentId, value::UserKey, SegmentReader, UserValue};
use crate::{id::SegmentId, value::UserKey, Compressor, SegmentReader, UserValue};
use std::cmp::Reverse;

// TODO: replace with MinHeap...
Expand Down Expand Up @@ -36,14 +36,14 @@ impl Ord for IteratorValue {

/// Interleaves multiple segment readers into a single, sorted stream
#[allow(clippy::module_name_repetitions)]
pub struct MergeReader {
readers: Vec<SegmentReader>,
pub struct MergeReader<C: Compressor + Clone> {
readers: Vec<SegmentReader<C>>,
heap: MinMaxHeap<IteratorValue>,
}

impl MergeReader {
impl<C: Compressor + Clone> MergeReader<C> {
/// Initializes a new merging reader
pub fn new(readers: Vec<SegmentReader>) -> Self {
pub fn new(readers: Vec<SegmentReader<C>>) -> Self {
Self {
readers,
heap: MinMaxHeap::new(),
Expand Down Expand Up @@ -78,7 +78,7 @@ impl MergeReader {
}
}

impl Iterator for MergeReader {
impl<C: Compressor + Clone> Iterator for MergeReader<C> {
type Item = crate::Result<(UserKey, UserValue, SegmentId, u64)>;

fn next(&mut self) -> Option<Self::Item> {
Expand Down
Loading

0 comments on commit 1c3d3a9

Please sign in to comment.