From 576c84371217c590e71a5cb4479fbfed388cfb2f Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Thu, 25 Jul 2024 15:51:15 +0200 Subject: [PATCH] fix tests --- src/lib.rs | 2 +- src/segment/merge.rs | 8 ++++-- src/segment/multi_writer.rs | 14 +++------ src/segment/reader.rs | 19 ++++++------ src/segment/writer.rs | 7 +++-- src/value_log.rs | 32 ++++++++++++++++----- test_fixture/v1_vlog/segments/0 | Bin 0 -> 370 bytes test_fixture/v1_vlog/segments/1 | Bin 432 -> 464 bytes test_fixture/v1_vlog/vlog_manifest | Bin 16 -> 24 bytes test_fixture/v1_vlog_corrupt/segments/0 | Bin 0 -> 370 bytes test_fixture/v1_vlog_corrupt/segments/1 | Bin 432 -> 464 bytes test_fixture/v1_vlog_corrupt/vlog_manifest | Bin 16 -> 24 bytes tests/accidental_drop_rc.rs | 4 +-- tests/basic_gc.rs | 4 +-- tests/basic_kv.rs | 2 +- tests/gc_space_amp.rs | 4 +-- tests/recovery.rs | 2 +- tests/rollover_index_fail_finish.rs | 2 +- tests/space_amp.rs | 4 +-- tests/vlog_load_fixture.rs | 26 +++++++++++++++-- 20 files changed, 83 insertions(+), 47 deletions(-) create mode 100644 test_fixture/v1_vlog/segments/0 create mode 100644 test_fixture/v1_vlog_corrupt/segments/0 diff --git a/src/lib.rs b/src/lib.rs index 56f36db..232ac38 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -57,7 +57,7 @@ //! //! let key = key.as_bytes(); //! -//! let handle = writer.get_next_value_handle(key); +//! let handle = writer.get_next_value_handle(); //! index_writer.insert_indirect(key, handle, value.len() as u32)?; //! //! writer.write(key, value)?; diff --git a/src/segment/merge.rs b/src/segment/merge.rs index 34576a4..f9b9fe2 100644 --- a/src/segment/merge.rs +++ b/src/segment/merge.rs @@ -12,6 +12,7 @@ struct IteratorValue { key: Arc<[u8]>, value: Arc<[u8]>, segment_id: SegmentId, + crc: u32, } impl PartialEq for IteratorValue { @@ -53,7 +54,7 @@ impl MergeReader { let reader = self.readers.get_mut(idx).expect("iter should exist"); if let Some(value) = reader.next() { - let (k, v) = value?; + let (k, v, crc) = value?; let segment_id = reader.segment_id; self.heap.push(IteratorValue { @@ -61,6 +62,7 @@ impl MergeReader { key: k, value: v, segment_id, + crc, }); } @@ -77,7 +79,7 @@ impl MergeReader { } impl Iterator for MergeReader { - type Item = crate::Result<(Arc<[u8]>, Arc<[u8]>, SegmentId)>; + type Item = crate::Result<(Arc<[u8]>, Arc<[u8]>, SegmentId, u32)>; fn next(&mut self) -> Option { if self.heap.is_empty() { @@ -105,7 +107,7 @@ impl Iterator for MergeReader { } } - return Some(Ok((head.key, head.value, head.segment_id))); + return Some(Ok((head.key, head.value, head.segment_id, head.crc))); } None diff --git a/src/segment/multi_writer.rs b/src/segment/multi_writer.rs index 49db050..62a5777 100644 --- a/src/segment/multi_writer.rs +++ b/src/segment/multi_writer.rs @@ -1,4 +1,4 @@ -use super::writer::{Writer, BLOB_HEADER_MAGIC}; +use super::writer::Writer; use crate::{ compression::Compressor, id::{IdGenerator, SegmentId}, @@ -75,22 +75,16 @@ impl MultiWriter { /// /// This can be used to index an item into an external `Index`. #[must_use] - pub fn get_next_value_handle(&self, key: &[u8]) -> ValueHandle { + pub fn get_next_value_handle(&self) -> ValueHandle { ValueHandle { - offset: self.offset(key), + offset: self.offset(), segment_id: self.segment_id(), } } #[must_use] - fn offset(&self, key: &[u8]) -> u64 { + fn offset(&self) -> u64 { self.get_active_writer().offset() - // NOTE: Point to the value record, not the key - // The key is not really needed when dereferencing a value handle - + (BLOB_HEADER_MAGIC.len() - + std::mem::size_of::() - + key.len() - ) as u64 } #[must_use] diff --git a/src/segment/reader.rs b/src/segment/reader.rs index 9eeab89..1b09558 100644 --- a/src/segment/reader.rs +++ b/src/segment/reader.rs @@ -34,7 +34,7 @@ impl Reader { } impl Iterator for Reader { - type Item = crate::Result<(Arc<[u8]>, Arc<[u8]>)>; + type Item = crate::Result<(Arc<[u8]>, Arc<[u8]>, u32)>; fn next(&mut self) -> Option { if self.is_terminated { @@ -60,7 +60,7 @@ impl Iterator for Reader { } } - let key_len = match self.inner.read_u16::() { + let crc = match self.inner.read_u32::() { Ok(v) => v, Err(e) => { if e.kind() == std::io::ErrorKind::UnexpectedEof { @@ -70,13 +70,7 @@ impl Iterator for Reader { } }; - let mut key = vec![0; key_len.into()]; - if let Err(e) = self.inner.read_exact(&mut key) { - return Some(Err(e.into())); - }; - - // TODO: handle crc - let _crc = match self.inner.read_u32::() { + let key_len = match self.inner.read_u16::() { Ok(v) => v, Err(e) => { if e.kind() == std::io::ErrorKind::UnexpectedEof { @@ -86,6 +80,11 @@ impl Iterator for Reader { } }; + let mut key = vec![0; key_len.into()]; + if let Err(e) = self.inner.read_exact(&mut key) { + return Some(Err(e.into())); + }; + let val_len = match self.inner.read_u32::() { Ok(v) => v, Err(e) => { @@ -101,6 +100,6 @@ impl Iterator for Reader { return Some(Err(e.into())); }; - Some(Ok((key.into(), val.into()))) + Some(Ok((key.into(), val.into(), crc))) } } diff --git a/src/segment/writer.rs b/src/segment/writer.rs index d6c8825..5ae3116 100644 --- a/src/segment/writer.rs +++ b/src/segment/writer.rs @@ -107,12 +107,16 @@ impl Writer { }; let mut hasher = crc32fast::Hasher::new(); + hasher.update(key); hasher.update(&value); let crc = hasher.finalize(); // Write header self.active_writer.write_all(BLOB_HEADER_MAGIC)?; + // Write CRC + self.active_writer.write_u32::(crc)?; + // Write key // NOTE: Truncation is okay and actually needed @@ -121,9 +125,6 @@ impl Writer { .write_u16::(key.len() as u16)?; self.active_writer.write_all(key)?; - // Write CRC - self.active_writer.write_u32::(crc)?; - // Write value // NOTE: Truncation is okay and actually needed diff --git a/src/value_log.rs b/src/value_log.rs index f54bcca..60ce813 100644 --- a/src/value_log.rs +++ b/src/value_log.rs @@ -5,7 +5,7 @@ use crate::{ manifest::{SegmentManifest, SEGMENTS_FOLDER, VLOG_MARKER}, path::absolute_path, scanner::{Scanner, SizeMap}, - segment::merge::MergeReader, + segment::{merge::MergeReader, writer::BLOB_HEADER_MAGIC}, value::UserValue, version::Version, Config, IndexReader, SegmentWriter, ValueHandle, @@ -89,7 +89,21 @@ impl ValueLog { pub fn verify(&self) -> crate::Result { let _lock = self.rollover_guard.lock().expect("lock is poisoned"); - Ok(0) + let mut sum = 0; + + for item in self.get_reader()? { + let (k, v, _, crc) = item?; + + let mut hasher = crc32fast::Hasher::new(); + hasher.update(&k); + hasher.update(&v); + + if hasher.finalize() != crc { + sum += 1; + } + } + + Ok(sum) } /// Creates a new empty value log in a directory. @@ -208,10 +222,16 @@ impl ValueLog { } let mut reader = BufReader::new(File::open(&segment.path)?); - reader.seek(std::io::SeekFrom::Start(handle.offset))?; + reader.seek(std::io::SeekFrom::Start( + handle.offset + BLOB_HEADER_MAGIC.len() as u64, + ))?; + // TODO: handle CRC let _crc = reader.read_u32::()?; + let key_len = reader.read_u16::()?; + reader.seek_relative(key_len.into())?; + let val_len = reader.read_u32::()?; let mut value = vec![0; val_len as usize]; @@ -219,8 +239,6 @@ impl ValueLog { let value = self.config.compression.decompress(&value)?; - // TODO: handle CRC - let val: UserValue = value.into(); self.blob_cache @@ -497,7 +515,7 @@ impl ValueLog { .use_compression(self.config.compression.clone()); for item in reader { - let (k, v, segment_id) = item?; + let (k, v, segment_id, _) = item?; match index_reader.get(&k)? { // If this value is in an older segment, we can discard it @@ -506,7 +524,7 @@ impl ValueLog { _ => {} } - let vhandle = writer.get_next_value_handle(&k); + let vhandle = writer.get_next_value_handle(); index_writer.insert_indirect(&k, vhandle, v.len() as u32)?; writer.write(&k, &v)?; diff --git a/test_fixture/v1_vlog/segments/0 b/test_fixture/v1_vlog/segments/0 new file mode 100644 index 0000000000000000000000000000000000000000..41abcf7fcf03ea1c3f7d599c43bfa6ae6878a953 GIT binary patch literal 370 zcmWIZad-0ZcQSmjeV;P}VQy@bA k?!mq;h9EW&us|pz5@%*Wly$KT0NGi>Ka2F64N1_qJvRP~}%h2+H2^o$aPq|}n~)YLqfGFihD zS`3UyKxM)q8JWcjIhkpx3W<3s3MoaYiMcRUKjpjm7#Nd*s-&GtOB5>eOTlV#Qx(d= tW+!E)r>7QyRC9yu-k{g2!@!s_G0T^6)x?9(Ctl#>2=noG4+-)y1OO60GZFv* delta 84 zcmcb>yn&gWfiZD*36H}>ehF0u#w1-enMei(2H}v5%wmO{%(PU6#Jm)Rl%mwcTn5Hu nD{kM(6Jz9odQ#4qmam)G<;&PL@!|7{A2>O}eEi)*f_w}C$J!a1 diff --git a/test_fixture/v1_vlog/vlog_manifest b/test_fixture/v1_vlog/vlog_manifest index c9153d40bb1eff9d30517cf631198fc93568a105..315a6ac6dc940cf16123ff280fa7e31309922c61 100644 GIT binary patch literal 24 PcmZQz00Sln1t%E+06PE# literal 16 PcmZQz00Tw{#lQ#v02%-T diff --git a/test_fixture/v1_vlog_corrupt/segments/0 b/test_fixture/v1_vlog_corrupt/segments/0 new file mode 100644 index 0000000000000000000000000000000000000000..41abcf7fcf03ea1c3f7d599c43bfa6ae6878a953 GIT binary patch literal 370 zcmWIZad-0ZcQSmjeV;P}VQy@bA k?!mq;h9EW&us|pz5@%*Wly$KT0NGi>Ka2F64N1_qJvRP~}%h2+H2^o$aPq|}n~)YLqfGFihD zS`3UyKxM)q8JWcjIhkpx3W<3s3MoaYiMcRUKjpjm7#Nd*s-&GtOB5>eOTlV#Qx(d= zW+!F3r>7QyRLg_x-k{g2!@!sVR4(I_TB2U8P*PN>P?E2ZmgzpR&XRG}#DmW#Uf|>i M^YM2N3Gy)n01XW|nE(I) delta 108 zcmcb>yn&gWfiZD*36H}>ehF0u#w1-enMei(2H}v5%wmO{%(PU6#Jm)Rl%mwcTn5Hu zD{kM(6Jw<1)6-Ln7#LH|n3k^tDwgp{Em1F4C@HE`D9Kky%S@j*&yulg;=|_?KX7t{ M`S`ns1o;>O09OSe1^@s6 diff --git a/test_fixture/v1_vlog_corrupt/vlog_manifest b/test_fixture/v1_vlog_corrupt/vlog_manifest index c9153d40bb1eff9d30517cf631198fc93568a105..315a6ac6dc940cf16123ff280fa7e31309922c61 100644 GIT binary patch literal 24 PcmZQz00Sln1t%E+06PE# literal 16 PcmZQz00Tw{#lQ#v02%-T diff --git a/tests/accidental_drop_rc.rs b/tests/accidental_drop_rc.rs index 050d2b8..95c5420 100644 --- a/tests/accidental_drop_rc.rs +++ b/tests/accidental_drop_rc.rs @@ -22,7 +22,7 @@ fn accidental_drop_rc() -> value_log::Result<()> { let mut index_writer = MockIndexWriter(index.clone()); let mut writer = value_log.get_writer()?; - let handle = writer.get_next_value_handle(key.as_bytes()); + let handle = writer.get_next_value_handle(); index_writer.insert_indirect(key.as_bytes(), handle, value.len() as u32)?; writer.write(key.as_bytes(), value.as_bytes())?; @@ -53,7 +53,7 @@ fn accidental_drop_rc() -> value_log::Result<()> { let mut index_writer = MockIndexWriter(index.clone()); let mut writer = value_log.get_writer()?; - let handle = writer.get_next_value_handle(key.as_bytes()); + let handle = writer.get_next_value_handle(); index_writer.insert_indirect(key.as_bytes(), handle, value.len() as u32)?; writer.write(key.as_bytes(), value.as_bytes())?; diff --git a/tests/basic_gc.rs b/tests/basic_gc.rs index 4b2ef80..34c1d49 100644 --- a/tests/basic_gc.rs +++ b/tests/basic_gc.rs @@ -22,7 +22,7 @@ fn basic_gc() -> value_log::Result<()> { let key = key.as_bytes(); - let handle = writer.get_next_value_handle(key); + let handle = writer.get_next_value_handle(); index_writer.insert_indirect(key, handle, value.len() as u32)?; writer.write(key, value)?; @@ -57,7 +57,7 @@ fn basic_gc() -> value_log::Result<()> { let key = key.as_bytes(); - let handle = writer.get_next_value_handle(key); + let handle = writer.get_next_value_handle(); index_writer.insert_indirect(key, handle, value.len() as u32)?; writer.write(key, value)?; diff --git a/tests/basic_kv.rs b/tests/basic_kv.rs index 029582f..6f4af13 100644 --- a/tests/basic_kv.rs +++ b/tests/basic_kv.rs @@ -23,7 +23,7 @@ fn basic_kv() -> value_log::Result<()> { let key = key.as_bytes(); - let handle = writer.get_next_value_handle(key); + let handle = writer.get_next_value_handle(); index_writer.insert_indirect(key, handle, value.len() as u32)?; writer.write(key, value)?; diff --git a/tests/gc_space_amp.rs b/tests/gc_space_amp.rs index f93ad75..edeaac7 100644 --- a/tests/gc_space_amp.rs +++ b/tests/gc_space_amp.rs @@ -22,7 +22,7 @@ fn gc_space_amp_target_1() -> value_log::Result<()> { let mut index_writer = MockIndexWriter(index.clone()); let mut writer = value_log.get_writer()?; - let handle = writer.get_next_value_handle(key); + let handle = writer.get_next_value_handle(); index_writer.insert_indirect(key, handle, value.len() as u32)?; writer.write(key, value.as_bytes())?; @@ -33,7 +33,7 @@ fn gc_space_amp_target_1() -> value_log::Result<()> { let key = key.as_bytes(); - let handle = writer.get_next_value_handle(key); + let handle = writer.get_next_value_handle(); index_writer.insert_indirect(key, handle, value.len() as u32)?; writer.write(key, value.as_bytes())?; diff --git a/tests/recovery.rs b/tests/recovery.rs index 36bcd7f..2f68e02 100644 --- a/tests/recovery.rs +++ b/tests/recovery.rs @@ -23,7 +23,7 @@ fn basic_recovery() -> value_log::Result<()> { let key = key.as_bytes(); - let handle = writer.get_next_value_handle(key); + let handle = writer.get_next_value_handle(); index_writer.insert_indirect(key, handle, value.len() as u32)?; writer.write(key, value)?; diff --git a/tests/rollover_index_fail_finish.rs b/tests/rollover_index_fail_finish.rs index 6af0059..9452f73 100644 --- a/tests/rollover_index_fail_finish.rs +++ b/tests/rollover_index_fail_finish.rs @@ -33,7 +33,7 @@ fn rollover_index_fail_finish() -> value_log::Result<()> { let value = key.repeat(10_000); let value = value.as_bytes(); - let handle = writer.get_next_value_handle(key.as_bytes()); + let handle = writer.get_next_value_handle(); index_writer.insert_indirect(key.as_bytes(), handle, value.len() as u32)?; writer.write(key.as_bytes(), value)?; diff --git a/tests/space_amp.rs b/tests/space_amp.rs index 6822d50..00a6efa 100644 --- a/tests/space_amp.rs +++ b/tests/space_amp.rs @@ -22,7 +22,7 @@ fn worst_case_space_amp() -> value_log::Result<()> { let mut index_writer = MockIndexWriter(index.clone()); let mut writer = value_log.get_writer()?; - let handle = writer.get_next_value_handle(key); + let handle = writer.get_next_value_handle(); index_writer.insert_indirect(key, handle, value.len() as u32)?; writer.write(key, value.as_bytes())?; @@ -61,7 +61,7 @@ fn no_overlap_space_amp() -> value_log::Result<()> { let mut index_writer = MockIndexWriter(index.clone()); let mut writer = value_log.get_writer()?; - let handle = writer.get_next_value_handle(key.as_bytes()); + let handle = writer.get_next_value_handle(); index_writer.insert_indirect(key.as_bytes(), handle, value.len() as u32)?; writer.write(key.as_bytes(), value.as_bytes())?; diff --git a/tests/vlog_load_fixture.rs b/tests/vlog_load_fixture.rs index afe2748..136f752 100644 --- a/tests/vlog_load_fixture.rs +++ b/tests/vlog_load_fixture.rs @@ -7,9 +7,31 @@ fn vlog_load_v1() -> value_log::Result<()> { let value_log = ValueLog::open(path, Config::default())?; - assert_eq!(4, value_log.get_reader()?.count()); + let count = { + let mut count = 0; + + for kv in value_log.get_reader()? { + let _ = kv?; + count += 1; + } + + count + }; + + assert_eq!(4, count); + assert_eq!(2, value_log.segment_count()); + assert_eq!(0, value_log.verify()?); Ok(()) } -// TODO: corrupt +#[test] +fn vlog_load_v1_corrupt() -> value_log::Result<()> { + let path = std::path::Path::new("test_fixture/v1_vlog_corrupt"); + + let value_log = ValueLog::open(path, Config::default())?; + + assert_eq!(2, value_log.verify()?); + + Ok(()) +}