Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
marvin-j97 committed Jul 25, 2024
1 parent f42efa6 commit 576c843
Show file tree
Hide file tree
Showing 20 changed files with 83 additions and 47 deletions.
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
//!
//! let key = key.as_bytes();
//!
//! let handle = writer.get_next_value_handle(key);
//! let handle = writer.get_next_value_handle();
//! index_writer.insert_indirect(key, handle, value.len() as u32)?;
//!
//! writer.write(key, value)?;
Expand Down
8 changes: 5 additions & 3 deletions src/segment/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ struct IteratorValue {
key: Arc<[u8]>,
value: Arc<[u8]>,
segment_id: SegmentId,
crc: u32,
}

impl PartialEq for IteratorValue {
Expand Down Expand Up @@ -53,14 +54,15 @@ impl MergeReader {
let reader = self.readers.get_mut(idx).expect("iter should exist");

if let Some(value) = reader.next() {
let (k, v) = value?;
let (k, v, crc) = value?;
let segment_id = reader.segment_id;

self.heap.push(IteratorValue {
index: idx,
key: k,
value: v,
segment_id,
crc,
});
}

Expand All @@ -77,7 +79,7 @@ impl MergeReader {
}

impl Iterator for MergeReader {
type Item = crate::Result<(Arc<[u8]>, Arc<[u8]>, SegmentId)>;
type Item = crate::Result<(Arc<[u8]>, Arc<[u8]>, SegmentId, u32)>;

fn next(&mut self) -> Option<Self::Item> {
if self.heap.is_empty() {
Expand Down Expand Up @@ -105,7 +107,7 @@ impl Iterator for MergeReader {
}
}

return Some(Ok((head.key, head.value, head.segment_id)));
return Some(Ok((head.key, head.value, head.segment_id, head.crc)));
}

None
Expand Down
14 changes: 4 additions & 10 deletions src/segment/multi_writer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::writer::{Writer, BLOB_HEADER_MAGIC};
use super::writer::Writer;
use crate::{
compression::Compressor,
id::{IdGenerator, SegmentId},
Expand Down Expand Up @@ -75,22 +75,16 @@ impl MultiWriter {
///
/// This can be used to index an item into an external `Index`.
#[must_use]
pub fn get_next_value_handle(&self, key: &[u8]) -> ValueHandle {
pub fn get_next_value_handle(&self) -> ValueHandle {
ValueHandle {
offset: self.offset(key),
offset: self.offset(),
segment_id: self.segment_id(),
}
}

#[must_use]
fn offset(&self, key: &[u8]) -> u64 {
fn offset(&self) -> u64 {
self.get_active_writer().offset()
// NOTE: Point to the value record, not the key
// The key is not really needed when dereferencing a value handle
+ (BLOB_HEADER_MAGIC.len()
+ std::mem::size_of::<u16>()
+ key.len()
) as u64
}

#[must_use]
Expand Down
19 changes: 9 additions & 10 deletions src/segment/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl Reader {
}

impl Iterator for Reader {
type Item = crate::Result<(Arc<[u8]>, Arc<[u8]>)>;
type Item = crate::Result<(Arc<[u8]>, Arc<[u8]>, u32)>;

fn next(&mut self) -> Option<Self::Item> {
if self.is_terminated {
Expand All @@ -60,7 +60,7 @@ impl Iterator for Reader {
}
}

let key_len = match self.inner.read_u16::<BigEndian>() {
let crc = match self.inner.read_u32::<BigEndian>() {
Ok(v) => v,
Err(e) => {
if e.kind() == std::io::ErrorKind::UnexpectedEof {
Expand All @@ -70,13 +70,7 @@ impl Iterator for Reader {
}
};

let mut key = vec![0; key_len.into()];
if let Err(e) = self.inner.read_exact(&mut key) {
return Some(Err(e.into()));
};

// TODO: handle crc
let _crc = match self.inner.read_u32::<BigEndian>() {
let key_len = match self.inner.read_u16::<BigEndian>() {
Ok(v) => v,
Err(e) => {
if e.kind() == std::io::ErrorKind::UnexpectedEof {
Expand All @@ -86,6 +80,11 @@ impl Iterator for Reader {
}
};

let mut key = vec![0; key_len.into()];
if let Err(e) = self.inner.read_exact(&mut key) {
return Some(Err(e.into()));
};

let val_len = match self.inner.read_u32::<BigEndian>() {
Ok(v) => v,
Err(e) => {
Expand All @@ -101,6 +100,6 @@ impl Iterator for Reader {
return Some(Err(e.into()));
};

Some(Ok((key.into(), val.into())))
Some(Ok((key.into(), val.into(), crc)))
}
}
7 changes: 4 additions & 3 deletions src/segment/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,16 @@ impl Writer {
};

let mut hasher = crc32fast::Hasher::new();
hasher.update(key);
hasher.update(&value);
let crc = hasher.finalize();

// Write header
self.active_writer.write_all(BLOB_HEADER_MAGIC)?;

// Write CRC
self.active_writer.write_u32::<BigEndian>(crc)?;

// Write key

// NOTE: Truncation is okay and actually needed
Expand All @@ -121,9 +125,6 @@ impl Writer {
.write_u16::<BigEndian>(key.len() as u16)?;
self.active_writer.write_all(key)?;

// Write CRC
self.active_writer.write_u32::<BigEndian>(crc)?;

// Write value

// NOTE: Truncation is okay and actually needed
Expand Down
32 changes: 25 additions & 7 deletions src/value_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
manifest::{SegmentManifest, SEGMENTS_FOLDER, VLOG_MARKER},
path::absolute_path,
scanner::{Scanner, SizeMap},
segment::merge::MergeReader,
segment::{merge::MergeReader, writer::BLOB_HEADER_MAGIC},
value::UserValue,
version::Version,
Config, IndexReader, SegmentWriter, ValueHandle,
Expand Down Expand Up @@ -89,7 +89,21 @@ impl ValueLog {
pub fn verify(&self) -> crate::Result<usize> {
let _lock = self.rollover_guard.lock().expect("lock is poisoned");

Ok(0)
let mut sum = 0;

for item in self.get_reader()? {
let (k, v, _, crc) = item?;

let mut hasher = crc32fast::Hasher::new();
hasher.update(&k);
hasher.update(&v);

if hasher.finalize() != crc {
sum += 1;
}
}

Ok(sum)
}

/// Creates a new empty value log in a directory.
Expand Down Expand Up @@ -208,19 +222,23 @@ impl ValueLog {
}

let mut reader = BufReader::new(File::open(&segment.path)?);
reader.seek(std::io::SeekFrom::Start(handle.offset))?;
reader.seek(std::io::SeekFrom::Start(
handle.offset + BLOB_HEADER_MAGIC.len() as u64,
))?;

// TODO: handle CRC
let _crc = reader.read_u32::<BigEndian>()?;

let key_len = reader.read_u16::<BigEndian>()?;
reader.seek_relative(key_len.into())?;

let val_len = reader.read_u32::<BigEndian>()?;

let mut value = vec![0; val_len as usize];
reader.read_exact(&mut value)?;

let value = self.config.compression.decompress(&value)?;

// TODO: handle CRC

let val: UserValue = value.into();

self.blob_cache
Expand Down Expand Up @@ -497,7 +515,7 @@ impl ValueLog {
.use_compression(self.config.compression.clone());

for item in reader {
let (k, v, segment_id) = item?;
let (k, v, segment_id, _) = item?;

match index_reader.get(&k)? {
// If this value is in an older segment, we can discard it
Expand All @@ -506,7 +524,7 @@ impl ValueLog {
_ => {}
}

let vhandle = writer.get_next_value_handle(&k);
let vhandle = writer.get_next_value_handle();
index_writer.insert_indirect(&k, vhandle, v.len() as u32)?;

writer.write(&k, &v)?;
Expand Down
Binary file added test_fixture/v1_vlog/segments/0
Binary file not shown.
Binary file modified test_fixture/v1_vlog/segments/1
Binary file not shown.
Binary file modified test_fixture/v1_vlog/vlog_manifest
Binary file not shown.
Binary file added test_fixture/v1_vlog_corrupt/segments/0
Binary file not shown.
Binary file modified test_fixture/v1_vlog_corrupt/segments/1
Binary file not shown.
Binary file modified test_fixture/v1_vlog_corrupt/vlog_manifest
Binary file not shown.
4 changes: 2 additions & 2 deletions tests/accidental_drop_rc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ fn accidental_drop_rc() -> value_log::Result<()> {
let mut index_writer = MockIndexWriter(index.clone());
let mut writer = value_log.get_writer()?;

let handle = writer.get_next_value_handle(key.as_bytes());
let handle = writer.get_next_value_handle();
index_writer.insert_indirect(key.as_bytes(), handle, value.len() as u32)?;

writer.write(key.as_bytes(), value.as_bytes())?;
Expand Down Expand Up @@ -53,7 +53,7 @@ fn accidental_drop_rc() -> value_log::Result<()> {
let mut index_writer = MockIndexWriter(index.clone());
let mut writer = value_log.get_writer()?;

let handle = writer.get_next_value_handle(key.as_bytes());
let handle = writer.get_next_value_handle();
index_writer.insert_indirect(key.as_bytes(), handle, value.len() as u32)?;

writer.write(key.as_bytes(), value.as_bytes())?;
Expand Down
4 changes: 2 additions & 2 deletions tests/basic_gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ fn basic_gc() -> value_log::Result<()> {

let key = key.as_bytes();

let handle = writer.get_next_value_handle(key);
let handle = writer.get_next_value_handle();
index_writer.insert_indirect(key, handle, value.len() as u32)?;

writer.write(key, value)?;
Expand Down Expand Up @@ -57,7 +57,7 @@ fn basic_gc() -> value_log::Result<()> {

let key = key.as_bytes();

let handle = writer.get_next_value_handle(key);
let handle = writer.get_next_value_handle();
index_writer.insert_indirect(key, handle, value.len() as u32)?;

writer.write(key, value)?;
Expand Down
2 changes: 1 addition & 1 deletion tests/basic_kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ fn basic_kv() -> value_log::Result<()> {

let key = key.as_bytes();

let handle = writer.get_next_value_handle(key);
let handle = writer.get_next_value_handle();
index_writer.insert_indirect(key, handle, value.len() as u32)?;

writer.write(key, value)?;
Expand Down
4 changes: 2 additions & 2 deletions tests/gc_space_amp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ fn gc_space_amp_target_1() -> value_log::Result<()> {
let mut index_writer = MockIndexWriter(index.clone());
let mut writer = value_log.get_writer()?;

let handle = writer.get_next_value_handle(key);
let handle = writer.get_next_value_handle();
index_writer.insert_indirect(key, handle, value.len() as u32)?;

writer.write(key, value.as_bytes())?;
Expand All @@ -33,7 +33,7 @@ fn gc_space_amp_target_1() -> value_log::Result<()> {

let key = key.as_bytes();

let handle = writer.get_next_value_handle(key);
let handle = writer.get_next_value_handle();
index_writer.insert_indirect(key, handle, value.len() as u32)?;

writer.write(key, value.as_bytes())?;
Expand Down
2 changes: 1 addition & 1 deletion tests/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ fn basic_recovery() -> value_log::Result<()> {

let key = key.as_bytes();

let handle = writer.get_next_value_handle(key);
let handle = writer.get_next_value_handle();
index_writer.insert_indirect(key, handle, value.len() as u32)?;

writer.write(key, value)?;
Expand Down
2 changes: 1 addition & 1 deletion tests/rollover_index_fail_finish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ fn rollover_index_fail_finish() -> value_log::Result<()> {
let value = key.repeat(10_000);
let value = value.as_bytes();

let handle = writer.get_next_value_handle(key.as_bytes());
let handle = writer.get_next_value_handle();
index_writer.insert_indirect(key.as_bytes(), handle, value.len() as u32)?;

writer.write(key.as_bytes(), value)?;
Expand Down
4 changes: 2 additions & 2 deletions tests/space_amp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ fn worst_case_space_amp() -> value_log::Result<()> {
let mut index_writer = MockIndexWriter(index.clone());
let mut writer = value_log.get_writer()?;

let handle = writer.get_next_value_handle(key);
let handle = writer.get_next_value_handle();
index_writer.insert_indirect(key, handle, value.len() as u32)?;

writer.write(key, value.as_bytes())?;
Expand Down Expand Up @@ -61,7 +61,7 @@ fn no_overlap_space_amp() -> value_log::Result<()> {
let mut index_writer = MockIndexWriter(index.clone());
let mut writer = value_log.get_writer()?;

let handle = writer.get_next_value_handle(key.as_bytes());
let handle = writer.get_next_value_handle();
index_writer.insert_indirect(key.as_bytes(), handle, value.len() as u32)?;

writer.write(key.as_bytes(), value.as_bytes())?;
Expand Down
26 changes: 24 additions & 2 deletions tests/vlog_load_fixture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,31 @@ fn vlog_load_v1() -> value_log::Result<()> {

let value_log = ValueLog::open(path, Config::default())?;

assert_eq!(4, value_log.get_reader()?.count());
let count = {
let mut count = 0;

for kv in value_log.get_reader()? {
let _ = kv?;
count += 1;
}

count
};

assert_eq!(4, count);
assert_eq!(2, value_log.segment_count());
assert_eq!(0, value_log.verify()?);

Ok(())
}

// TODO: corrupt
#[test]
fn vlog_load_v1_corrupt() -> value_log::Result<()> {
let path = std::path::Path::new("test_fixture/v1_vlog_corrupt");

let value_log = ValueLog::open(path, Config::default())?;

assert_eq!(2, value_log.verify()?);

Ok(())
}

0 comments on commit 576c843

Please sign in to comment.