feat: add miniz compression
compression is now behind feature flags
marvin-j97 committed Jun 10, 2024
1 parent 234c9ee commit 05a1f30
Showing 11 changed files with 149 additions and 20 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
@@ -19,8 +19,9 @@ name = "value_log"
path = "src/lib.rs"

[features]
default = ["lz4"]
default = []
lz4 = ["dep:lz4_flex"]
miniz = ["dep:miniz_oxide"]
serde = ["dep:serde"]

[dependencies]
@@ -29,6 +30,7 @@ crc32fast = "1.4.2"
log = "0.4.21"
lz4_flex = { version = "0.11.3", optional = true }
min-max-heap = "1.3.0"
miniz_oxide = { version = "0.7.3", optional = true }
path-absolutize = "3.1.1"
quick_cache = "0.5.1"
serde = { version = "1.0.200", optional = true, features = ["derive", "rc"] }
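Because the `default` feature set is now empty, downstream crates have to opt into a compression backend explicitly. A minimal sketch of such a dependency declaration; the package name and version requirement are assumptions, only the feature names (`lz4`, `miniz`, `serde`) come from the manifest above:

```toml
[dependencies]
# Enable DEFLATE/zlib support via miniz_oxide:
value-log = { version = "*", features = ["miniz"] }

# or, to restore the previously-default LZ4 support:
# value-log = { version = "*", features = ["lz4"] }
```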
8 changes: 7 additions & 1 deletion README.md
@@ -31,7 +31,13 @@ Keys are limited to 65536 bytes, values are limited to 2^32 bytes.

Allows using `LZ4` compression, powered by [`lz4_flex`](https://github.com/PSeitz/lz4_flex).

*Enabled by default.*
*Disabled by default.*

### miniz

Allows using `DEFLATE/zlib` compression, powered by [`miniz_oxide`](https://github.com/Frommi/miniz_oxide).

*Disabled by default.*

### serde

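Since both backends are opt-in and the corresponding variants of the `CompressionType` enum (added in `src/compression.rs` below) are feature-gated, selecting one in code has to be feature-gated as well. A minimal sketch, assuming it is compiled as part of a crate that defines or forwards the same `lz4`/`miniz` feature names:

```rust
use value_log::CompressionType;

fn main() {
    // Exactly one of these bindings survives cfg stripping.
    #[cfg(feature = "miniz")]
    let compression = CompressionType::Miniz;

    #[cfg(all(feature = "lz4", not(feature = "miniz")))]
    let compression = CompressionType::Lz4;

    #[cfg(not(any(feature = "lz4", feature = "miniz")))]
    let compression = CompressionType::None;

    // Uses the Display impl from src/compression.rs
    println!("using {compression}");
}
```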
67 changes: 67 additions & 0 deletions src/compression.rs
@@ -0,0 +1,67 @@
/// Compression type
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
#[allow(clippy::module_name_repetitions)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub enum CompressionType {
/// No compression
None,

/// LZ4 compression (speed-optimized)
#[cfg(feature = "lz4")]
Lz4,

// TODO: compression level
/// Zlib/DEFLATE compression (space-optimized)
#[cfg(feature = "miniz")]
Miniz,
}

impl From<CompressionType> for u8 {
fn from(val: CompressionType) -> Self {
match val {
CompressionType::None => 0,

#[cfg(feature = "lz4")]
CompressionType::Lz4 => 1,

#[cfg(feature = "miniz")]
CompressionType::Miniz => 2,
}
}
}

impl TryFrom<u8> for CompressionType {
type Error = ();

fn try_from(value: u8) -> Result<Self, Self::Error> {
match value {
0 => Ok(Self::None),

#[cfg(feature = "lz4")]
1 => Ok(Self::Lz4),

#[cfg(feature = "miniz")]
2 => Ok(Self::Miniz),

_ => Err(()),
}
}
}

impl std::fmt::Display for CompressionType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}",
match self {
Self::None => "no compression",

#[cfg(feature = "lz4")]
Self::Lz4 => "lz4",

#[cfg(feature = "miniz")]
Self::Miniz => "miniz",
}
)
}
}
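The `From`/`TryFrom` impls above define the one-byte tags that get persisted in the segment metadata (see `src/segment/meta.rs` below): 0 for `None`, 1 for `Lz4`, 2 for `Miniz`, with feature-gated variants only existing when their feature is enabled. A small round-trip sketch of those conversions:

```rust
use value_log::CompressionType;

fn main() {
    // Encode the variant to its on-disk tag.
    let tag: u8 = CompressionType::None.into();
    assert_eq!(tag, 0);

    // Decoding an unknown tag yields Err(()).
    assert!(CompressionType::try_from(255).is_err());

    // Round-trip and the human-readable form via Display.
    let back = CompressionType::try_from(tag).expect("known tag");
    println!("{back}"); // prints "no compression"
}
```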
7 changes: 6 additions & 1 deletion src/config.rs
@@ -1,18 +1,23 @@
use crate::blob_cache::BlobCache;
use crate::{blob_cache::BlobCache, CompressionType};
use std::sync::Arc;

/// Value log configuration
#[derive(Debug)]
pub struct Config {
pub(crate) segment_size_bytes: u64,
pub(crate) blob_cache: Arc<BlobCache>,

pub(crate) compression: CompressionType,
}

impl Default for Config {
fn default() -> Self {
Self {
segment_size_bytes: 256 * 1_024 * 1_024,
blob_cache: Arc::new(BlobCache::with_capacity_bytes(16 * 1_024 * 1_024)),

// TODO: setter method
compression: CompressionType::None,
}
}
}
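The new `compression` field defaults to `CompressionType::None` and the commit leaves a `// TODO: setter method`. A hypothetical sketch of what such an addition to `src/config.rs` could look like; this method does not exist in this commit and is only an illustration of the intended shape:

```rust
// Hypothetical addition to src/config.rs (not part of this commit),
// answering the `// TODO: setter method` note above.
impl Config {
    /// Sets the compression type to use for new segments.
    #[must_use]
    pub fn compression(mut self, compression: CompressionType) -> Self {
        self.compression = compression;
        self
    }
}
```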
4 changes: 4 additions & 0 deletions src/error.rs
@@ -1,6 +1,7 @@
use crate::{
serde::{DeserializeError, SerializeError},
version::Version,
CompressionType,
};

/// Represents errors that can occur in the value-log
@@ -17,6 +18,9 @@ pub enum Error {

/// Deserialization failed
Deserialize(DeserializeError),

/// Decompression failed
Decompress(CompressionType),
// TODO:
// /// CRC check failed
// CrcMismatch,
2 changes: 2 additions & 0 deletions src/lib.rs
@@ -89,6 +89,7 @@
#![allow(clippy::missing_const_for_fn)]

mod blob_cache;
mod compression;
mod config;
mod error;
mod handle;
@@ -106,6 +107,7 @@ mod version;

pub use {
blob_cache::BlobCache,
compression::CompressionType,
config::Config,
error::{Error, Result},
handle::ValueHandle,
1 change: 1 addition & 0 deletions src/manifest.rs
@@ -224,6 +224,7 @@ impl SegmentManifest {
.clone()
.expect("should have written at least 1 item"),
)),
compression: writer.compression,
},
gc_stats: GcStats::default(),
}),
13 changes: 10 additions & 3 deletions src/segment/meta.rs
@@ -1,6 +1,7 @@
use crate::{
key_range::KeyRange,
serde::{Deserializable, DeserializeError, Serializable, SerializeError},
CompressionType,
};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use std::io::{Read, Write};
@@ -19,9 +20,9 @@ pub struct Metadata {
/// true size in bytes (if no compression were used)
pub total_uncompressed_bytes: u64,

// TODO: 1.0.0
///// What type of compression is used
// pub compression: CompressionType,
/// What type of compression is used
pub compression: CompressionType,

/// Key range
pub key_range: KeyRange,
}
@@ -35,6 +36,8 @@ impl Serializable for Metadata {
writer.write_u64::<BigEndian>(self.compressed_bytes)?;
writer.write_u64::<BigEndian>(self.total_uncompressed_bytes)?;

writer.write_u8(self.compression.into())?;

self.key_range.serialize(writer)?;

Ok(())
@@ -55,13 +58,17 @@ impl Deserializable for Metadata {
let compressed_bytes = reader.read_u64::<BigEndian>()?;
let total_uncompressed_bytes = reader.read_u64::<BigEndian>()?;

let compression = reader.read_u8()?;
let compression = CompressionType::try_from(compression).expect("invalid compression type");

let key_range = KeyRange::deserialize(reader)?;

Ok(Self {
item_count,
compressed_bytes,
total_uncompressed_bytes,
key_range,
compression,
})
}
}
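With the new field, the serialized metadata gains a single byte directly after `total_uncompressed_bytes`. A minimal sketch of that encoding using the same `byteorder` calls as above; fields outside the hunks shown (and the key range) are omitted, and the concrete numbers are placeholders:

```rust
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use std::io::Cursor;

fn main() -> std::io::Result<()> {
    let mut buf = Vec::new();
    buf.write_u64::<BigEndian>(1_000)?; // compressed_bytes
    buf.write_u64::<BigEndian>(1_500)?; // total_uncompressed_bytes
    buf.write_u8(2)?; // compression tag (2 = Miniz)

    let mut rdr = Cursor::new(buf);
    assert_eq!(rdr.read_u64::<BigEndian>()?, 1_000);
    assert_eq!(rdr.read_u64::<BigEndian>()?, 1_500);
    assert_eq!(rdr.read_u8()?, 2);
    Ok(())
}
```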
13 changes: 10 additions & 3 deletions src/segment/multi_writer.rs
@@ -1,5 +1,8 @@
use super::writer::Writer;
use crate::id::{IdGenerator, SegmentId};
use crate::{
id::{IdGenerator, SegmentId},
CompressionType,
};
use std::path::{Path, PathBuf};

/// Segment writer, may write multiple segments
@@ -8,6 +11,8 @@ pub struct MultiWriter {
target_size: u64,
writers: Vec<Writer>,
id_generator: IdGenerator,

compression: CompressionType,
}

impl MultiWriter {
@@ -21,6 +26,7 @@ impl MultiWriter {
id_generator: IdGenerator,
target_size: u64,
folder: P,
compression: CompressionType,
) -> std::io::Result<Self> {
let folder = folder.as_ref();

@@ -31,7 +37,8 @@
id_generator,
folder: folder.into(),
target_size,
writers: vec![Writer::new(segment_path, segment_id)?],
writers: vec![Writer::new(segment_path, segment_id, compression)?],
compression,
})
}

@@ -67,7 +74,7 @@ impl MultiWriter {
let new_segment_id = self.id_generator.next();

self.writers
.push(Writer::new(&self.folder, new_segment_id)?);
.push(Writer::new(&self.folder, new_segment_id, self.compression)?);

Ok(())
}
30 changes: 24 additions & 6 deletions src/segment/writer.rs
@@ -1,5 +1,7 @@
use super::{meta::Metadata, trailer::SegmentFileTrailer};
use crate::{id::SegmentId, key_range::KeyRange, serde::Serializable, value::UserKey};
use crate::{
id::SegmentId, key_range::KeyRange, serde::Serializable, value::UserKey, CompressionType,
};
use byteorder::{BigEndian, WriteBytesExt};
use std::{
fs::File,
@@ -20,8 +22,10 @@ pub struct Writer {
pub(crate) written_blob_bytes: u64,
pub(crate) uncompressed_bytes: u64,

pub first_key: Option<UserKey>,
pub last_key: Option<UserKey>,
pub(crate) first_key: Option<UserKey>,
pub(crate) last_key: Option<UserKey>,

pub(crate) compression: CompressionType,
}

impl Writer {
@@ -31,7 +35,11 @@ impl Writer {
///
/// Will return `Err` if an IO error occurs.
#[doc(hidden)]
pub fn new<P: AsRef<Path>>(path: P, segment_id: SegmentId) -> std::io::Result<Self> {
pub fn new<P: AsRef<Path>>(
path: P,
segment_id: SegmentId,
compression: CompressionType,
) -> std::io::Result<Self> {
let path = path.as_ref();

let file = File::create(path)?;
@@ -47,6 +55,8 @@

first_key: None,
last_key: None,

compression,
})
}

@@ -85,8 +95,15 @@

self.uncompressed_bytes += value.len() as u64;

#[cfg(feature = "lz4")]
let value = lz4_flex::compress_prepend_size(value);
let value = match self.compression {
CompressionType::None => value.to_vec(),

#[cfg(feature = "lz4")]
CompressionType::Lz4 => lz4_flex::compress_prepend_size(&value),

#[cfg(feature = "miniz")]
CompressionType::Miniz => miniz_oxide::deflate::compress_to_vec(&value, 10),
};

let mut hasher = crc32fast::Hasher::new();
hasher.update(&value);
@@ -140,6 +157,7 @@ impl Writer {
.clone()
.expect("should have written at least 1 item"),
)),
compression: self.compression,
};
metadata.serialize(&mut self.writer)?;

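The write path now branches on the configured `CompressionType` instead of unconditionally compressing when `lz4` is enabled. A standalone round-trip sketch of the two backend calls used here and their read-side counterparts (see `src/value_log.rs` below), assuming both the `lz4` and `miniz` features are enabled; the compression level 10 matches the diff:

```rust
fn main() {
    let value = b"hello world hello world hello world".as_slice();

    // LZ4 with a length prefix, as used for CompressionType::Lz4.
    let lz4 = lz4_flex::compress_prepend_size(value);
    let lz4_out = lz4_flex::decompress_size_prepended(&lz4).expect("valid lz4 frame");
    assert_eq!(lz4_out, value);

    // DEFLATE via miniz_oxide, as used for CompressionType::Miniz.
    let miniz = miniz_oxide::deflate::compress_to_vec(value, 10);
    let miniz_out = miniz_oxide::inflate::decompress_to_vec(&miniz).expect("valid deflate stream");
    assert_eq!(miniz_out, value);
}
```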
20 changes: 15 additions & 5 deletions src/value_log.rs
@@ -197,15 +197,24 @@ impl ValueLog {

let val_len = reader.read_u32::<BigEndian>()?;

let mut val = vec![0; val_len as usize];
reader.read_exact(&mut val)?;
let mut value = vec![0; val_len as usize];
reader.read_exact(&mut value)?;

#[cfg(feature = "lz4")]
let val = lz4_flex::decompress_size_prepended(&val).expect("should decompress");
let value = match segment.meta.compression {
crate::CompressionType::None => value,

#[cfg(feature = "lz4")]
crate::CompressionType::Lz4 => lz4_flex::decompress_size_prepended(&value)
.map_err(|_| crate::Error::Decompress(segment.meta.compression))?,

#[cfg(feature = "miniz")]
crate::CompressionType::Miniz => miniz_oxide::inflate::decompress_to_vec(&value)
.map_err(|_| crate::Error::Decompress(segment.meta.compression))?,
};

// TODO: handle CRC

let val: UserValue = val.into();
let val: UserValue = value.into();

self.blob_cache
.insert((self.id, handle.clone()).into(), val.clone());
@@ -223,6 +232,7 @@
self.id_generator.clone(),
self.config.segment_size_bytes,
self.path.join(SEGMENTS_FOLDER),
self.config.compression,
)?)
}

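The read path still carries a `// TODO: handle CRC`. A hypothetical sketch of what that check could look like, reusing `crc32fast` the same way the writer already does (hashing the compressed bytes); neither `stored_crc` nor this helper exist in the crate, they are assumptions for illustration:

```rust
// Hypothetical CRC check for the TODO above: hash the still-compressed
// value bytes, exactly as the writer does, and compare against a
// checksum that would have been stored alongside the value.
fn verify_crc(compressed_value: &[u8], stored_crc: u32) -> bool {
    let mut hasher = crc32fast::Hasher::new();
    hasher.update(compressed_value);
    hasher.finalize() == stored_crc
}
```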
