use xxh3 as checksum
marvin-j97 committed Aug 6, 2024
1 parent e8b304e · commit ca451e0
Showing 8 changed files with 41 additions and 96 deletions.

Cargo.toml (2 changes: 1 addition & 1 deletion)
@@ -25,7 +25,6 @@ serde = ["dep:serde"]
 [dependencies]
 ahash = "0.8.11"
 byteorder = "1.5.0"
-crc32fast = "1.4.2"
 log = "0.4.21"
 lz4_flex = { version = "0.11.3", optional = true }
 min-max-heap = "1.3.0"
@@ -36,6 +35,7 @@ quick_cache = { version = "0.6.0", default-features = false, features = [
 ] }
 serde = { version = "1.0.200", optional = true, features = ["derive", "rc"] }
 tempfile = "3.10.1"
+xxhash-rust = { version = "0.8.12", features = ["xxh3"] }

 [dev-dependencies]
 criterion = "0.5.1"
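
The practical effect of the dependency swap is that the stored checksum widens from a 32-bit CRC to a 64-bit XXH3 digest, which is also why the header offset bookkeeping later in this commit grows by four bytes. A minimal sketch of the two crates' one-shot APIs (standalone, not part of this diff):

```rust
fn main() {
    let payload = b"keyvalue";

    // Old: CRC32 produces a u32.
    let crc: u32 = crc32fast::hash(payload);

    // New: XXH3 produces a u64 (needs the "xxh3" feature of xxhash-rust).
    let xxh: u64 = xxhash_rust::xxh3::xxh3_64(payload);

    println!("crc32: {crc:#010x}");
    println!("xxh3:  {xxh:#018x}");
}
```

The commit itself uses the streaming form of the same hasher (Xxh3::new / update / digest), as seen in src/segment/writer.rs below.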

benches/value_log.rs (75 changes: 1 addition & 74 deletions)
@@ -198,78 +198,5 @@ fn load_value(c: &mut Criterion) {
     }
 }

-/* fn compression(c: &mut Criterion) {
-    let mut group = c.benchmark_group("compression");
-    let index = MockIndex::default();
-    let mut index_writer = MockIndexWriter(index.clone());
-    let folder = tempfile::tempdir().unwrap();
-    let vl_path = folder.path();
-    let value_log = ValueLog::open(
-        vl_path,
-        Config::default().blob_cache(Arc::new(BlobCache::with_capacity_bytes(0))),
-    )
-    .unwrap();
-    let mut writer = value_log.get_writer().unwrap();
-    let mut rng = rand::thread_rng();
-    let size_mb = 16;
-    {
-        let key = "random";
-        let mut data = vec![0u8; size_mb * 1_024 * 1_024];
-        rng.fill_bytes(&mut data);
-        index_writer
-            .insert_indirect(
-                key.as_bytes(),
-                writer.get_next_value_handle(),
-                data.len() as u32,
-            )
-            .unwrap();
-        writer.write(key.as_bytes(), &data).unwrap();
-    }
-    {
-        let key = "good_compression";
-        let dummy = b"abcdefgh";
-        let data = dummy.repeat(size_mb * 1_024 * 1_024 / dummy.len());
-        index_writer
-            .insert_indirect(
-                key.as_bytes(),
-                writer.get_next_value_handle(),
-                data.len() as u32,
-            )
-            .unwrap();
-        writer.write(key.as_bytes(), &data).unwrap();
-    }
-    value_log.register_writer(writer).unwrap();
-    let handle_random = index.get(b"random").unwrap().unwrap();
-    let handle_good_compression = index.get(b"good_compression").unwrap().unwrap();
-    group.bench_function("no compression", |b| {
-        b.iter(|| {
-            value_log.get(&handle_random).unwrap().unwrap();
-        })
-    });
-    group.bench_function("good compression", |b| {
-        b.iter(|| {
-            value_log.get(&handle_good_compression).unwrap().unwrap();
-        })
-    });
-} */
-
-criterion_group!(benches, hashmap, load_value, prefetch /* , compression */);
+criterion_group!(benches, load_value, prefetch);
 criterion_main!(benches);

src/error.rs (4 changes: 2 additions & 2 deletions)
@@ -25,8 +25,8 @@ pub enum Error {
     /// Decompression failed
     Decompress(DecompressError),
     // TODO:
-    // /// CRC check failed
-    // CrcMismatch,
+    // /// Checksum check failed
+    // ChecksumMismatch,
 }

 impl std::fmt::Display for Error {

src/segment/merge.rs (10 changes: 5 additions & 5 deletions)
@@ -12,7 +12,7 @@ struct IteratorValue {
     key: UserKey,
     value: UserValue,
     segment_id: SegmentId,
-    crc: u32,
+    checksum: u64,
 }

 impl PartialEq for IteratorValue {
@@ -54,15 +54,15 @@ impl MergeReader {
         let reader = self.readers.get_mut(idx).expect("iter should exist");

         if let Some(value) = reader.next() {
-            let (k, v, crc) = value?;
+            let (k, v, checksum) = value?;
             let segment_id = reader.segment_id;

             self.heap.push(IteratorValue {
                 index: idx,
                 key: k,
                 value: v,
                 segment_id,
-                crc,
+                checksum,
             });
         }

@@ -79,7 +79,7 @@
 }

 impl Iterator for MergeReader {
-    type Item = crate::Result<(UserKey, UserValue, SegmentId, u32)>;
+    type Item = crate::Result<(UserKey, UserValue, SegmentId, u64)>;

     fn next(&mut self) -> Option<Self::Item> {
         if self.heap.is_empty() {
@@ -107,7 +107,7 @@ impl Iterator for MergeReader {
                 }
             }

-            return Some(Ok((head.key, head.value, head.segment_id, head.crc)));
+            return Some(Ok((head.key, head.value, head.segment_id, head.checksum)));
         }

         None
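
For orientation, MergeReader performs a k-way merge: it keeps at most one pending entry per underlying segment reader in a heap and always yields the smallest key next, refilling from the reader it just consumed from. A self-contained sketch of that pattern using std's BinaryHeap; the tuple layout and names are illustrative, not this crate's IteratorValue:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// K-way merge over iterators that each yield (key, value, checksum) in key order.
fn merge<I>(mut readers: Vec<I>) -> Vec<(String, Vec<u8>, u64)>
where
    I: Iterator<Item = (String, Vec<u8>, u64)>,
{
    // Reverse(...) turns std's max-heap into a min-heap ordered by key first.
    let mut heap = BinaryHeap::new();

    // Seed the heap with the head item of every reader.
    for (idx, reader) in readers.iter_mut().enumerate() {
        if let Some((key, value, checksum)) = reader.next() {
            heap.push(Reverse((key, idx, value, checksum)));
        }
    }

    let mut out = Vec::new();
    while let Some(Reverse((key, idx, value, checksum))) = heap.pop() {
        // Refill from the reader whose head we just consumed.
        if let Some((k, v, c)) = readers[idx].next() {
            heap.push(Reverse((k, idx, v, c)));
        }
        out.push((key, value, checksum));
    }
    out
}

fn main() {
    let a = vec![
        ("a".to_string(), b"1".to_vec(), 1),
        ("c".to_string(), b"3".to_vec(), 3),
    ];
    let b = vec![("b".to_string(), b"2".to_vec(), 2)];

    let merged = merge(vec![a.into_iter(), b.into_iter()]);
    let keys: Vec<_> = merged.iter().map(|(k, _, _)| k.as_str()).collect();
    assert_eq!(keys, ["a", "b", "c"]);
}
```

The crate's version additionally carries segment IDs, wraps items in crate::Result, and has extra logic in next() that this sketch leaves out; only the heap-driven merge shape is the same.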

src/segment/reader.rs (6 changes: 3 additions & 3 deletions)
@@ -50,7 +50,7 @@ impl Reader {
 }

 impl Iterator for Reader {
-    type Item = crate::Result<(UserKey, UserValue, u32)>;
+    type Item = crate::Result<(UserKey, UserValue, u64)>;

     fn next(&mut self) -> Option<Self::Item> {
         if self.is_terminated {
@@ -76,7 +76,7 @@ impl Iterator for Reader {
             }
         }

-        let crc = match self.inner.read_u32::<BigEndian>() {
+        let checksum = match self.inner.read_u64::<BigEndian>() {
             Ok(v) => v,
             Err(e) => {
                 if e.kind() == std::io::ErrorKind::UnexpectedEof {
@@ -124,6 +124,6 @@ impl Iterator for Reader {
             None => val,
         };

-        Some(Ok((key.into(), val.into(), crc)))
+        Some(Ok((key.into(), val.into(), checksum)))
     }
 }
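
On the read side the only functional change is the width of the checksum field: it is now read back as a big-endian u64 instead of a u32. A standalone sketch of that byteorder read, including an UnexpectedEof branch like the one the iterator checks for above (here treated as a clean end of stream; names and values are illustrative):

```rust
use std::io::Read;

use byteorder::{BigEndian, ReadBytesExt};

/// Try to read one 8-byte big-endian checksum; `Ok(None)` means the stream ended cleanly.
fn read_checksum<R: Read>(reader: &mut R) -> std::io::Result<Option<u64>> {
    match reader.read_u64::<BigEndian>() {
        Ok(v) => Ok(Some(v)),
        Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => Ok(None),
        Err(e) => Err(e),
    }
}

fn main() -> std::io::Result<()> {
    let bytes = 0xDEAD_BEEF_CAFE_F00D_u64.to_be_bytes();
    let mut cursor = std::io::Cursor::new(bytes.to_vec());

    assert_eq!(read_checksum(&mut cursor)?, Some(0xDEAD_BEEF_CAFE_F00D));
    assert_eq!(read_checksum(&mut cursor)?, None); // stream exhausted

    Ok(())
}
```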

src/segment/writer.rs (12 changes: 6 additions & 6 deletions)
@@ -106,16 +106,16 @@ impl Writer {
             None => value.to_vec(),
         };

-        let mut hasher = crc32fast::Hasher::new();
+        let mut hasher = xxhash_rust::xxh3::Xxh3::new();
         hasher.update(key);
         hasher.update(&value);
-        let crc = hasher.finalize();
+        let checksum = hasher.digest();

         // Write header
         self.active_writer.write_all(BLOB_HEADER_MAGIC)?;

-        // Write CRC
-        self.active_writer.write_u32::<BigEndian>(crc)?;
+        // Write checksum
+        self.active_writer.write_u64::<BigEndian>(checksum)?;

         // Write key

@@ -136,8 +136,8 @@
         // Header
         self.offset += BLOB_HEADER_MAGIC.len() as u64;

-        // CRC
-        self.offset += std::mem::size_of::<u32>() as u64;
+        // Checksum
+        self.offset += std::mem::size_of::<u64>() as u64;

         // Key
         self.offset += std::mem::size_of::<u16>() as u64;
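
Putting the writer change together: each blob record now starts with the magic bytes followed by an 8-byte big-endian XXH3 digest of key plus value, so the fixed header grows by four bytes. A hedged sketch of that framing; the magic constant is a placeholder, the u16 key-length field matches the offset bookkeeping above, and the u32 value-length field is an assumption not confirmed by this diff:

```rust
use std::io::Write;

use byteorder::{BigEndian, WriteBytesExt};

// Placeholder; the crate defines its own BLOB_HEADER_MAGIC value.
const BLOB_HEADER_MAGIC: &[u8] = b"VLOG";

/// Write one record: magic | u64 checksum | u16 key len | key | u32 value len | value.
/// The length-field widths are illustrative assumptions.
fn write_blob<W: Write>(out: &mut W, key: &[u8], value: &[u8]) -> std::io::Result<()> {
    // The checksum covers key and value, exactly as in Writer::write above.
    let mut hasher = xxhash_rust::xxh3::Xxh3::new();
    hasher.update(key);
    hasher.update(value);
    let checksum = hasher.digest();

    out.write_all(BLOB_HEADER_MAGIC)?;
    out.write_u64::<BigEndian>(checksum)?;

    out.write_u16::<BigEndian>(key.len() as u16)?;
    out.write_all(key)?;

    out.write_u32::<BigEndian>(value.len() as u32)?;
    out.write_all(value)?;

    Ok(())
}

fn main() -> std::io::Result<()> {
    let mut buf = Vec::new();
    write_blob(&mut buf, b"hello", b"world")?;

    // magic (4) + checksum (8) + key len (2) + key (5) + value len (4) + value (5)
    assert_eq!(buf.len(), 4 + 8 + 2 + 5 + 4 + 5);
    Ok(())
}
```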

src/value_log.rs (10 changes: 5 additions & 5 deletions)
@@ -91,13 +91,13 @@ impl ValueLog {
         let mut sum = 0;

         for item in self.get_reader()? {
-            let (k, v, _, crc) = item?;
+            let (k, v, _, expected_checksum) = item?;

-            let mut hasher = crc32fast::Hasher::new();
+            let mut hasher = xxhash_rust::xxh3::Xxh3::new();
             hasher.update(&k);
             hasher.update(&v);

-            if hasher.finalize() != crc {
+            if hasher.digest() != expected_checksum {
                 sum += 1;
             }
         }
@@ -241,7 +241,7 @@ impl ValueLog {
         let Some(item) = reader.next() else {
             return Ok(None);
         };
-        let (_key, val, _crc) = item?;
+        let (_key, val, _checksum) = item?;

         self.blob_cache
             .insert((self.id, handle.clone()).into(), val.clone());
@@ -252,7 +252,7 @@ impl ValueLog {
             let Some(item) = reader.next() else {
                 break;
             };
-            let (_key, val, _crc) = item?;
+            let (_key, val, _checksum) = item?;

             let value_handle = ValueHandle {
                 segment_id: handle.segment_id,
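
The verify pass above recomputes the digest over key and value and counts records whose stored checksum no longer matches. Two properties that check relies on, shown as a standalone snippet (xxh3_64 is the crate's one-shot helper; the diff itself only uses the streaming API): feeding key then value into a streaming hasher equals hashing their concatenation, and a single corrupted byte changes the 64-bit digest.

```rust
use xxhash_rust::xxh3::{xxh3_64, Xxh3};

fn main() {
    let key = b"hello";
    let value = b"world";

    // Streaming digest, as the writer and the verify loop compute it.
    let mut hasher = Xxh3::new();
    hasher.update(key);
    hasher.update(value);
    let stored = hasher.digest();

    // Equivalent one-shot digest over the concatenated bytes.
    let concatenated = [key.as_slice(), value.as_slice()].concat();
    assert_eq!(stored, xxh3_64(&concatenated));

    // Flip one bit, as on-disk corruption would: the comparison must fail.
    let mut corrupted = concatenated.clone();
    corrupted[0] ^= 0x01;
    assert_ne!(stored, xxh3_64(&corrupted));
}
```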

tests/vlog_load_fixture.rs (18 changes: 18 additions & 0 deletions)
@@ -35,3 +35,21 @@ fn vlog_load_v1_corrupt() -> value_log::Result<()> {

     Ok(())
 }
+
+/* {
+    let mut writer = value_log.get_writer()?;
+    writer.write("a", "")?;
+    writer.write("b", "")?;
+    writer.write("c", "")?;
+    writer.write("d", "")?;
+    value_log.register_writer(writer)?;
+}
+{
+    let mut writer = value_log.get_writer()?;
+    writer.write("a", "We're caught between")?;
+    writer.write("b", "This life and dream")?;
+    writer.write("c", "But you and me we're bigger")?;
+    writer.write("d", "Let's try to figure this out")?;
+    value_log.register_writer(writer)?;
+} */
+