diff --git a/Cargo.toml b/Cargo.toml index 6e3b68b..bfbc43b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,11 +24,11 @@ lz4 = ["dep:lz4_flex"] [dependencies] byteorder = "1.5.0" -chrono = "0.4.34" crc32fast = "1.4.0" log = "0.4.20" lz4_flex = { version = "0.11.2", optional = true } min-max-heap = "1.3.0" +path-absolutize = "3.1.1" quick_cache = "0.4.1" rand = "0.8.5" serde = { version = "1.0.197", default-features = false, features = [ @@ -37,11 +37,11 @@ serde = { version = "1.0.197", default-features = false, features = [ "derive", ] } serde_json = { version = "1.0.114" } +tempfile = "3.10.0" [dev-dependencies] criterion = "0.5.1" env_logger = "0.11.2" -tempfile = "3.10.0" test-log = "0.2.15" [[bench]] diff --git a/benches/value_log.rs b/benches/value_log.rs index 5a64781..9438e52 100644 --- a/benches/value_log.rs +++ b/benches/value_log.rs @@ -9,11 +9,7 @@ use value_log::{BlobCache, Config, Index, ValueHandle, ValueLog}; #[derive(Default)] pub struct DebugIndex(RwLock, ValueHandle>>); -impl Index for DebugIndex { - fn get(&self, key: &[u8]) -> std::io::Result> { - Ok(self.0.read().expect("lock is poisoned").get(key).cloned()) - } - +impl DebugIndex { fn insert_indirection(&self, key: &[u8], value: ValueHandle) -> std::io::Result<()> { self.0 .write() @@ -24,6 +20,12 @@ impl Index for DebugIndex { } } +impl Index for DebugIndex { + fn get(&self, key: &[u8]) -> std::io::Result> { + Ok(self.0.read().expect("lock is poisoned").get(key).cloned()) + } +} + fn load_value(c: &mut Criterion) { let mut group = c.benchmark_group("load blob"); @@ -48,7 +50,7 @@ fn load_value(c: &mut Criterion) { let folder = tempfile::tempdir().unwrap(); let vl_path = folder.path(); - let value_log = ValueLog::new( + let value_log = ValueLog::open( vl_path, Config::default().blob_cache(Arc::new(BlobCache::with_capacity_bytes(0))), index.clone(), @@ -65,13 +67,7 @@ fn load_value(c: &mut Criterion) { let offset = writer.offset(key.as_bytes()); index - .insert_indirection( - key.as_bytes(), - ValueHandle { - offset, - segment_id: segment_id.clone(), - }, - ) + .insert_indirection(key.as_bytes(), ValueHandle { offset, segment_id }) .unwrap(); let mut data = vec![0u8; size]; @@ -101,7 +97,7 @@ fn load_value(c: &mut Criterion) { let folder = tempfile::tempdir().unwrap(); let vl_path = folder.path(); - let value_log = ValueLog::new( + let value_log = ValueLog::open( vl_path, Config::default() .blob_cache(Arc::new(BlobCache::with_capacity_bytes(64 * 1_024 * 1_024))), @@ -119,13 +115,7 @@ fn load_value(c: &mut Criterion) { let offset = writer.offset(key.as_bytes()); index - .insert_indirection( - key.as_bytes(), - ValueHandle { - offset, - segment_id: segment_id.clone(), - }, - ) + .insert_indirection(key.as_bytes(), ValueHandle { offset, segment_id }) .unwrap(); let mut data = vec![0u8; size]; @@ -161,7 +151,7 @@ fn compression(c: &mut Criterion) { let folder = tempfile::tempdir().unwrap(); let vl_path = folder.path(); - let value_log = ValueLog::new( + let value_log = ValueLog::open( vl_path, Config::default().blob_cache(Arc::new(BlobCache::with_capacity_bytes(0))), index.clone(), @@ -180,13 +170,7 @@ fn compression(c: &mut Criterion) { let offset = writer.offset(key.as_bytes()); index - .insert_indirection( - key.as_bytes(), - ValueHandle { - offset, - segment_id: segment_id.clone(), - }, - ) + .insert_indirection(key.as_bytes(), ValueHandle { offset, segment_id }) .unwrap(); let mut data = vec![0u8; size_mb * 1_024 * 1_024]; @@ -200,13 +184,7 @@ fn compression(c: &mut Criterion) { let offset = writer.offset(key.as_bytes()); index - .insert_indirection( - key.as_bytes(), - ValueHandle { - offset, - segment_id: segment_id.clone(), - }, - ) + .insert_indirection(key.as_bytes(), ValueHandle { offset, segment_id }) .unwrap(); let dummy = b"abcdefgh"; diff --git a/src/blob_cache.rs b/src/blob_cache.rs index a339222..e12f6b6 100644 --- a/src/blob_cache.rs +++ b/src/blob_cache.rs @@ -9,8 +9,10 @@ type Item = Arc<[u8]>; struct BlobWeighter; impl Weighter for BlobWeighter { + // NOTE: quick_cache only supports u32 as weight, but that's fine + // 4 GB blobs are too big anyway + #[allow(clippy::cast_possible_truncation)] fn weight(&self, _: &CacheKey, blob: &Item) -> u32 { - // TODO: quick_cache only supports u32 as weight...? blob.len() as u32 } } diff --git a/src/handle.rs b/src/handle.rs index 851f3ef..a7900c7 100644 --- a/src/handle.rs +++ b/src/handle.rs @@ -1,13 +1,13 @@ +use crate::id::SegmentId; use serde::{Deserialize, Serialize}; use std::hash::Hash; -use std::sync::Arc; /// A value handle points into the value log. #[allow(clippy::module_name_repetitions)] #[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)] pub struct ValueHandle { /// Segment ID - pub segment_id: Arc, + pub segment_id: SegmentId, /// Offset in file pub offset: u64, diff --git a/src/id.rs b/src/id.rs index a187dae..4bcebbd 100644 --- a/src/id.rs +++ b/src/id.rs @@ -1,79 +1,22 @@ -use chrono::{Datelike, Timelike}; -use rand::Rng; -use std::sync::Arc; +use std::sync::{atomic::AtomicU64, Arc}; -const BASE_36_RADIX: u32 = 36; - -fn to_base36(mut x: u32) -> String { - let mut result = vec![]; - - loop { - let m = x % BASE_36_RADIX; - x /= BASE_36_RADIX; - - result.push(std::char::from_digit(m, BASE_36_RADIX).expect("should be hex digit")); - - if x == 0 { - break; - } - } - - result.into_iter().rev().collect() -} - -/// Generates an ID for a segment -/// -/// Like #[allow(clippy::module_name_repetitions)] -#[doc(hidden)] -#[must_use] -pub fn generate_segment_id() -> Arc { - let now = chrono::Utc::now(); +pub type SegmentId = u64; - let year = now.year().unsigned_abs(); - let month = now.month() as u8; - let day = (now.day() - 1) as u8; - - let hour = now.hour() as u8; - let min = now.minute() as u8; - - let sec = now.second() as u8; - let nano = now.timestamp_subsec_nanos(); +#[allow(clippy::module_name_repetitions)] +#[derive(Clone, Default)] +pub struct IdGenerator(Arc); - let mut rng = rand::thread_rng(); - let random = rng.gen::(); +impl std::ops::Deref for IdGenerator { + type Target = Arc; - format!( - "{:0>4}_{}{}{:0>2}{:0>2}_{:0>2}{:0>8}_{:0>4}", - to_base36(year), - // - to_base36(u32::from(month)), - to_base36(u32::from(day)), - to_base36(u32::from(hour)), - to_base36(u32::from(min)), - // - to_base36(u32::from(sec)), - to_base36(nano), - // - to_base36(u32::from(random)), - ) - .into() + fn deref(&self) -> &Self::Target { + &self.0 + } } -#[cfg(test)] -mod tests { - use super::*; - use test_log::test; - - #[test] - pub fn id_monotonic_order() { - for _ in 0..1_000 { - let ids = (0..100).map(|_| generate_segment_id()).collect::>(); - - let mut sorted = ids.clone(); - sorted.sort(); - - assert_eq!(ids, sorted, "ID is not monotonic"); - } +impl IdGenerator { + pub fn next(&self) -> SegmentId { + self.fetch_add(1, std::sync::atomic::Ordering::SeqCst) } } diff --git a/src/index.rs b/src/index.rs index 2345c59..7851ae7 100644 --- a/src/index.rs +++ b/src/index.rs @@ -13,18 +13,6 @@ pub trait Index { /// /// Will return `Err` if an IO error occurs. fn get(&self, key: &[u8]) -> std::io::Result>; - - // TODO: shouldn'be part of Index... remove - // TODO: flushing to value log should use `Writer` (atomic) - - /// Inserts an value handle into the index. - /// - /// This method is called during value log garbage collection. - /// - /// # Errors - /// - /// Will return `Err` if an IO error occurs. - fn insert_indirection(&self, key: &[u8], value: ValueHandle) -> std::io::Result<()>; } /// Trait that allows writing into an external index diff --git a/src/lib.rs b/src/lib.rs index 28de0b4..dfc3896 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -44,6 +44,8 @@ mod error; mod handle; mod id; mod index; +mod manifest; +mod path; mod segment; mod value_log; mod version; diff --git a/src/main.rs b/src/main._rs similarity index 98% rename from src/main.rs rename to src/main._rs index 160f4da..85dc7e6 100644 --- a/src/main.rs +++ b/src/main._rs @@ -46,7 +46,7 @@ fn main() -> value_log::Result<()> { } std::fs::create_dir_all(vl_path)?; - let value_log = ValueLog::new(vl_path, Config::default(), index.clone())?; + let value_log = ValueLog::open(vl_path, Config::default(), index.clone())?; { let mut writer = value_log.get_writer()?; @@ -296,7 +296,7 @@ fn main() -> value_log::Result<()> { ); for id in value_log.segments.read().unwrap().keys() { - value_log.refresh_stats(id)?; + value_log.refresh_stats(*id)?; } eprintln!("=== after refresh ==="); diff --git a/src/manifest.rs b/src/manifest.rs new file mode 100644 index 0000000..3e947fe --- /dev/null +++ b/src/manifest.rs @@ -0,0 +1,266 @@ +use crate::{id::SegmentId, segment::stats::Stats, Segment, SegmentWriter as MultiWriter}; +use std::{ + collections::HashMap, + fs::File, + io::Write, + path::{Path, PathBuf}, + sync::{atomic::AtomicU64, Arc}, +}; + +pub const VLOG_MARKER: &str = ".vlog"; +pub const SEGMENTS_FOLDER: &str = "segments"; + +/// Atomically rewrites a file +fn rewrite_atomic>(path: P, content: &[u8]) -> std::io::Result<()> { + let path = path.as_ref(); + let folder = path.parent().expect("should have a parent"); + + let mut temp_file = tempfile::NamedTempFile::new_in(folder)?; + temp_file.write_all(content)?; + temp_file.persist(path)?; + + #[cfg(not(target_os = "windows"))] + { + // TODO: Not sure if the fsync is really required, but just for the sake of it... + // TODO: also not sure why it fails on Windows... + let file = File::open(path)?; + file.sync_all()?; + } + + Ok(()) +} + +#[allow(clippy::module_name_repetitions)] +pub struct SegmentManifest { + path: PathBuf, + pub(crate) segments: HashMap>, +} + +impl SegmentManifest { + fn remove_unfinished_segments>( + folder: P, + registered_ids: &[u64], + ) -> crate::Result<()> { + // TODO: + Ok(()) + } + + pub(crate) fn recover>(folder: P) -> crate::Result { + let folder = folder.as_ref(); + let path = folder.join("segments.json"); + log::debug!("Loading value log manifest from {}", path.display()); + + let str = std::fs::read_to_string(&path)?; + let ids: Vec = serde_json::from_str(&str).expect("deserialize error"); + + Self::remove_unfinished_segments(folder, &ids)?; + + let segments = { + let mut map = HashMap::default(); + + for id in ids { + map.insert( + id, + Arc::new(Segment { + id, + path: folder.join(SEGMENTS_FOLDER).join(id.to_string()), + stats: Stats::default(), + }), + ); + } + + map + }; + + Ok(Self { path, segments }) + } + + pub(crate) fn create_new>(folder: P) -> crate::Result { + let path = folder.as_ref().join("segments.json"); + + let mut m = Self { + path, + segments: HashMap::default(), + }; + m.write_to_disk()?; + + Ok(m) + } + + pub fn drop_segments(&mut self, ids: &[u64]) -> crate::Result<()> { + self.segments.retain(|x, _| !ids.contains(x)); + self.write_to_disk() + } + + pub fn register(&mut self, writer: MultiWriter) -> crate::Result<()> { + let writers = writer.finish()?; + + for writer in writers { + let segment_id = writer.segment_id; + let segment_folder = writer.folder.clone(); + + self.segments.insert( + segment_id, + Arc::new(Segment { + id: segment_id, + path: segment_folder, + stats: Stats { + item_count: writer.item_count.into(), + total_bytes: writer.written_blob_bytes.into(), + stale_items: AtomicU64::default(), + stale_bytes: AtomicU64::default(), + }, + }), + ); + } + + self.write_to_disk() + } + + pub(crate) fn write_to_disk(&mut self) -> crate::Result<()> { + log::trace!("Writing segment manifest to {}", self.path.display()); + + let keys: Vec = self.segments.keys().copied().collect(); + + // NOTE: Serialization can't fail here + #[allow(clippy::expect_used)] + let json = serde_json::to_string_pretty(&keys).expect("should serialize"); + rewrite_atomic(&self.path, json.as_bytes())?; + + Ok(()) + } + + /// Gets a segment + #[must_use] + pub fn get_segment(&self, id: SegmentId) -> Option> { + self.segments.get(&id).cloned() + } + + /// Lists all segment IDs + #[must_use] + pub fn list_segment_ids(&self) -> Vec { + self.segments.keys().copied().collect() + } + + /// Lists all segments + #[must_use] + pub fn list_segments(&self) -> Vec> { + self.segments.values().cloned().collect() + } + + /// Returns the amount of bytes on disk that are occupied by blobs. + /// + /// This value may not be fresh, as it is only set after running [`ValueLog::refresh_stats`]. + #[must_use] + pub fn disk_space_used(&self) -> u64 { + self.segments + .values() + .map(|x| x.stats.total_bytes()) + .sum::() + } + + /// Returns the amount of bytes that can be freed on disk + /// if all segments were to be defragmented + /// + /// This value may not be fresh, as it is only set after running [`ValueLog::refresh_stats`]. + #[must_use] + pub fn reclaimable_bytes(&self) -> u64 { + self.segments + .values() + .map(|x| x.stats.get_stale_bytes()) + .sum::() + } + + /// Returns the amount of stale items + /// + /// This value may not be fresh, as it is only set after running [`ValueLog::refresh_stats`]. + #[must_use] + pub fn stale_items_count(&self) -> u64 { + self.segments + .values() + .map(|x| x.stats.get_stale_items()) + .sum::() + } + + /// Returns the percent of dead bytes in the value log + /// + /// This value may not be fresh, as it is only set after running [`ValueLog::refresh_stats`]. + #[must_use] + pub fn stale_ratio(&self) -> f32 { + let used_bytes = self + .segments + .values() + .map(|x| x.stats.total_bytes()) + .sum::(); + if used_bytes == 0 { + return 0.0; + } + + let stale_bytes = self + .segments + .values() + .map(|x| x.stats.get_stale_bytes()) + .sum::(); + if stale_bytes == 0 { + return 0.0; + } + + stale_bytes as f32 / used_bytes as f32 + } + + /// Returns the approximate space amplification + /// + /// Returns 0.0 if there are no items. + /// + /// This value may not be fresh, as it is only set after running [`ValueLog::refresh_stats`]. + #[must_use] + pub fn space_amp(&self) -> f32 { + let used_bytes = self + .segments + .values() + .map(|x| x.stats.total_bytes()) + .sum::(); + if used_bytes == 0 { + return 0.0; + } + + let stale_bytes = self + .segments + .values() + .map(|x| x.stats.get_stale_bytes()) + .sum::(); + + let alive_bytes = used_bytes - stale_bytes; + if alive_bytes == 0 { + return 0.0; + } + + used_bytes as f32 / alive_bytes as f32 + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::fs::File; + use std::io::Write; + use test_log::test; + + #[test] + fn test_atomic_rewrite() -> crate::Result<()> { + let dir = tempfile::tempdir()?; + + let path = dir.path().join("test.txt"); + { + let mut file = File::create(&path)?; + write!(file, "asdasdasdasdasd")?; + } + + rewrite_atomic(&path, b"newcontent")?; + + let content = std::fs::read_to_string(&path)?; + assert_eq!("newcontent", content); + + Ok(()) + } +} diff --git a/src/path.rs b/src/path.rs new file mode 100644 index 0000000..b5778c1 --- /dev/null +++ b/src/path.rs @@ -0,0 +1,11 @@ +use path_absolutize::Absolutize; +use std::path::{Path, PathBuf}; + +#[allow(clippy::module_name_repetitions)] +pub fn absolute_path>(path: P) -> PathBuf { + // TODO: replace with https://doc.rust-lang.org/std/path/fn.absolute.html once stable + path.as_ref() + .absolutize() + .expect("should be absolute path") + .into() +} diff --git a/src/segment/merge.rs b/src/segment/merge.rs index 0ae0907..16bc5e3 100644 --- a/src/segment/merge.rs +++ b/src/segment/merge.rs @@ -1,5 +1,5 @@ -use crate::SegmentReader; -use std::{cmp::Reverse, sync::Arc}; +use crate::{id::SegmentId, SegmentReader}; +use std::cmp::Reverse; // TODO: replace with MinHeap use min_max_heap::MinMaxHeap; @@ -11,7 +11,7 @@ struct IteratorValue { index: IteratorIndex, key: Vec, value: Vec, - segment_id: Arc, + segment_id: SegmentId, } impl PartialEq for IteratorValue { @@ -33,6 +33,7 @@ impl Ord for IteratorValue { } } +/// Interleaves multiple segment readers into a single, sorted stream #[allow(clippy::module_name_repetitions)] pub struct MergeReader { readers: Vec, @@ -53,7 +54,7 @@ impl MergeReader { if let Some(value) = reader.next() { let (k, v) = value?; - let segment_id = reader.segment_id.clone(); + let segment_id = reader.segment_id; self.heap.push(IteratorValue { index: idx, @@ -76,7 +77,7 @@ impl MergeReader { } impl Iterator for MergeReader { - type Item = crate::Result<(Vec, Vec, Arc)>; + type Item = crate::Result<(Vec, Vec, SegmentId)>; fn next(&mut self) -> Option { if self.heap.is_empty() { diff --git a/src/segment/mod.rs b/src/segment/mod.rs index fe26627..15c28e1 100644 --- a/src/segment/mod.rs +++ b/src/segment/mod.rs @@ -1,12 +1,13 @@ -use self::stats::Stats; -use std::{path::PathBuf, sync::Arc}; - pub mod merge; pub mod multi_writer; pub mod reader; pub mod stats; pub mod writer; +use self::stats::Stats; +use crate::id::SegmentId; +use std::path::PathBuf; + /// A disk segment is an immutable, sorted, contiguous file /// that contains key-value pairs. /// @@ -18,7 +19,7 @@ pub mod writer; #[derive(Debug)] pub struct Segment { /// Segment ID - pub id: Arc, + pub id: SegmentId, /// Segment path (folder) pub path: PathBuf, @@ -35,7 +36,7 @@ impl Segment { /// Will return `Err` if an IO error occurs. pub fn scan(&self) -> std::io::Result { let path = self.path.join("data"); - reader::Reader::new(path, self.id.clone()) + reader::Reader::new(path, self.id) } /// Always returns `false` @@ -45,6 +46,8 @@ impl Segment { /// Returns the amount of items in the segment pub fn len(&self) -> u64 { - self.stats.item_count + self.stats + .item_count + .load(std::sync::atomic::Ordering::Acquire) } } diff --git a/src/segment/multi_writer.rs b/src/segment/multi_writer.rs index 6e2737a..3e6721b 100644 --- a/src/segment/multi_writer.rs +++ b/src/segment/multi_writer.rs @@ -1,15 +1,13 @@ use super::writer::Writer; -use crate::id::generate_segment_id; -use std::{ - path::{Path, PathBuf}, - sync::Arc, -}; +use crate::id::{IdGenerator, SegmentId}; +use std::path::{Path, PathBuf}; /// Segment writer, may write multiple segments pub struct MultiWriter { root_folder: PathBuf, target_size: u64, writers: Vec, + id_generator: IdGenerator, } impl MultiWriter { @@ -19,12 +17,21 @@ impl MultiWriter { /// /// Will return `Err` if an IO error occurs. #[doc(hidden)] - pub fn new>(target_size: u64, folder: P) -> std::io::Result { + pub fn new>( + id_generator: IdGenerator, + target_size: u64, + folder: P, + ) -> std::io::Result { + let segment_id = id_generator.next(); + let folder = folder.as_ref(); - let segment_id = generate_segment_id(); - let path = folder.join("segments").join(&*segment_id).join("data"); + let path = folder + .join("segments") + .join(segment_id.to_string()) + .join("data"); Ok(Self { + id_generator, root_folder: folder.into(), target_size, writers: vec![Writer::new(segment_id, path)?], @@ -52,7 +59,7 @@ impl MultiWriter { /// Returns the segment ID #[must_use] - pub fn segment_id(&self) -> Arc { + pub fn segment_id(&self) -> SegmentId { self.get_active_writer().segment_id() } @@ -60,12 +67,12 @@ impl MultiWriter { fn rotate(&mut self) -> crate::Result<()> { log::debug!("Rotating segment writer"); - let new_segment_id = generate_segment_id(); + let new_segment_id = self.id_generator.next(); let path = self .root_folder .join("segments") - .join(&*new_segment_id) + .join(new_segment_id.to_string()) .join("data"); self.writers.push(Writer::new(new_segment_id, path)?); diff --git a/src/segment/reader.rs b/src/segment/reader.rs index a102285..f7168ac 100644 --- a/src/segment/reader.rs +++ b/src/segment/reader.rs @@ -1,14 +1,14 @@ +use crate::id::SegmentId; use byteorder::{BigEndian, ReadBytesExt}; use std::{ fs::File, io::{BufReader, Read}, path::PathBuf, - sync::Arc, }; /// Reads through a segment in order. pub struct Reader { - pub(crate) segment_id: Arc, + pub(crate) segment_id: SegmentId, inner: BufReader, } @@ -18,7 +18,7 @@ impl Reader { /// # Errors /// /// Will return `Err` if an IO error occurs. - pub fn new>(path: P, segment_id: Arc) -> std::io::Result { + pub fn new>(path: P, segment_id: SegmentId) -> std::io::Result { let path = path.into(); let file_reader = BufReader::new(File::open(path)?); diff --git a/src/segment/stats.rs b/src/segment/stats.rs index 9965d95..ef27ae5 100644 --- a/src/segment/stats.rs +++ b/src/segment/stats.rs @@ -1,17 +1,25 @@ use serde::{Deserialize, Serialize}; use std::sync::atomic::AtomicU64; -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize)] pub struct Stats { - pub(crate) item_count: u64, + pub(crate) item_count: AtomicU64, pub(crate) stale_items: AtomicU64, - pub total_bytes: u64, + pub total_bytes: AtomicU64, pub(crate) stale_bytes: AtomicU64, // TODO: key range } impl Stats { + pub fn item_count(&self) -> u64 { + self.item_count.load(std::sync::atomic::Ordering::Acquire) + } + + pub fn total_bytes(&self) -> u64 { + self.total_bytes.load(std::sync::atomic::Ordering::Acquire) + } + /// Returns the percent of dead items in the segment pub fn stale_ratio(&self) -> f32 { let dead = self.get_stale_items() as f32; @@ -19,7 +27,7 @@ impl Stats { return 0.0; } - dead / self.item_count as f32 + dead / self.item_count() as f32 } /// Returns the amount of dead items in the segment diff --git a/src/segment/writer.rs b/src/segment/writer.rs index 92b0867..358a85c 100644 --- a/src/segment/writer.rs +++ b/src/segment/writer.rs @@ -1,15 +1,15 @@ +use crate::id::SegmentId; use byteorder::{BigEndian, WriteBytesExt}; use std::{ fs::File, io::{BufWriter, Write}, path::{Path, PathBuf}, - sync::Arc, }; /// Segment writer pub struct Writer { pub(crate) folder: PathBuf, - pub(crate) segment_id: Arc, + pub(crate) segment_id: SegmentId, inner: BufWriter, @@ -26,7 +26,7 @@ impl Writer { /// /// Will return `Err` if an IO error occurs. #[doc(hidden)] - pub fn new>(segment_id: Arc, path: P) -> std::io::Result { + pub fn new>(segment_id: SegmentId, path: P) -> std::io::Result { let path = path.as_ref(); let folder = path.parent().expect("should have parent directory"); @@ -46,27 +46,6 @@ impl Writer { /// Returns the current offset in the file. /// /// This can be used to index an item into an external `Index`. - /// - /// # Examples - /// - /// ``` - /// # use value_log::SegmentWriter; - /// # use std::collections::HashMap; - /// # - /// # let folder = tempfile::tempdir()?; - /// # std::fs::create_dir_all(folder.path().join("segments"))?; - /// # let mut writer = SegmentWriter::new(1_000, folder)?; - /// # let mut index = HashMap::new(); - /// # - /// # let items = [(b"1", b"1"), (b"2", b"2")]; - /// # - /// for (key, value) in items { - /// index.insert(key, writer.offset(key)); - /// writer.write(key, value)?; - /// } - /// # - /// # Ok::<(), value_log::Error>(()) - /// ``` #[must_use] pub fn offset(&self) -> u64 { self.offset @@ -74,8 +53,8 @@ impl Writer { /// Returns the segment ID #[must_use] - pub fn segment_id(&self) -> Arc { - self.segment_id.clone() + pub fn segment_id(&self) -> SegmentId { + self.segment_id } /// Writes an item into the file diff --git a/src/value_log.rs b/src/value_log.rs index 815b8ca..69cac52 100644 --- a/src/value_log.rs +++ b/src/value_log.rs @@ -1,17 +1,19 @@ use crate::{ blob_cache::BlobCache, + id::{IdGenerator, SegmentId}, index::Writer as IndexWriter, - segment::{merge::MergeReader, multi_writer::MultiWriter, stats::Stats}, + manifest::{SegmentManifest, SEGMENTS_FOLDER, VLOG_MARKER}, + path::absolute_path, + segment::merge::MergeReader, version::Version, - Config, Index, Segment, SegmentWriter, ValueHandle, + Config, Index, SegmentWriter, ValueHandle, }; use byteorder::{BigEndian, ReadBytesExt}; use std::{ - collections::BTreeMap, fs::File, io::{BufReader, Read, Seek}, path::PathBuf, - sync::{atomic::AtomicU64, Arc, Mutex, RwLock}, + sync::{Arc, Mutex, RwLock}, }; /// A disk-resident value log @@ -39,37 +41,48 @@ pub struct ValueLogInner { blob_cache: Arc, /// Segment manifest - pub segments: RwLock, Arc>>, + pub manifest: RwLock, + + id_generator: IdGenerator, rollover_guard: Mutex<()>, } impl ValueLog { - /// Creates or recovers a value log - pub fn new>( + /// Creates or recovers a value log in the given directory. + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub fn open>( path: P, config: Config, index: Arc, ) -> crate::Result { - Self::create_new(path, config, index) - // TODO: recover if exists + let path = path.into(); + + if path.join(VLOG_MARKER).try_exists()? { + Self::recover(path, config, index) + } else { + Self::create_new(path, config, index) + } } - /// Creates a new empty value log in a folder + /// Creates a new empty value log in a directory. pub(crate) fn create_new>( path: P, config: Config, index: Arc, ) -> crate::Result { - let path = path.into(); + let path = absolute_path(path.into()); log::trace!("Creating value-log at {}", path.display()); std::fs::create_dir_all(&path)?; - let marker_path = path.join(".vlog"); + let marker_path = path.join(VLOG_MARKER); assert!(!marker_path.try_exists()?); - std::fs::create_dir_all(path.join("segments"))?; + std::fs::create_dir_all(path.join(SEGMENTS_FOLDER))?; // NOTE: Lastly, fsync .vlog marker, which contains the version // -> the V-log is fully initialized @@ -82,7 +95,7 @@ impl ValueLog { { // fsync folders on Unix - let folder = std::fs::File::open(path.join("segments"))?; + let folder = std::fs::File::open(path.join(SEGMENTS_FOLDER))?; folder.sync_all()?; let folder = std::fs::File::open(&path)?; @@ -90,58 +103,29 @@ impl ValueLog { } let blob_cache = config.blob_cache.clone(); + let manifest = SegmentManifest::create_new(&path)?; Ok(Self(Arc::new(ValueLogInner { config, - path, // TODO: absolute path + path, blob_cache, index, - segments: RwLock::new(BTreeMap::default()), + manifest: RwLock::new(manifest), + id_generator: IdGenerator::default(), rollover_guard: Mutex::new(()), }))) } - /// Gets a segment - #[must_use] - pub fn get_segment(&self, id: &Arc) -> Option> { - self.segments - .read() - .expect("lock is poisoned") - .get(id) - .cloned() - } - - /// Lists all segment IDs - #[must_use] - pub fn list_segment_ids(&self) -> Vec> { - self.segments - .read() - .expect("lock is poisoned") - .keys() - .cloned() - .collect() - } - - /// Lists all segments - #[must_use] - pub fn list_segments(&self) -> Vec> { - self.segments - .read() - .expect("lock is poisoned") - .values() - .cloned() - .collect() - } - pub(crate) fn recover>( path: P, - _index: Arc, - ) -> crate::Result<()> { + config: Config, + index: Arc, + ) -> crate::Result { let path = path.into(); log::info!("Recovering value-log at {}", path.display()); { - let bytes = std::fs::read(path.join(".vlog"))?; + let bytes = std::fs::read(path.join(VLOG_MARKER))?; if let Some(version) = Version::parse_file_header(&bytes) { if version != Version::V1 { @@ -152,7 +136,35 @@ impl ValueLog { } } - todo!() + let blob_cache = config.blob_cache.clone(); + let manifest = SegmentManifest::recover(&path)?; + + Ok(Self(Arc::new(ValueLogInner { + config, + path, + blob_cache, + index, + manifest: RwLock::new(manifest), + id_generator: IdGenerator::default(), + rollover_guard: Mutex::new(()), + }))) + } + + /// Registers writer + pub fn register(&self, writer: SegmentWriter) -> crate::Result<()> { + self.manifest + .write() + .expect("lock is poisoned") + .register(writer) + } + + /// Returns segment count + pub fn segment_count(&self) -> usize { + self.manifest + .read() + .expect("lock is poisoned") + .segments + .len() } /// Resolves a value handle @@ -162,11 +174,10 @@ impl ValueLog { /// Will return `Err` if an IO error occurs. pub fn get(&self, handle: &ValueHandle) -> crate::Result>> { let Some(segment) = self - .segments + .manifest .read() .expect("lock is poisoned") - .get(&handle.segment_id) - .cloned() + .get_segment(handle.segment_id) else { return Ok(None); }; @@ -204,137 +215,12 @@ impl ValueLog { /// Will return `Err` if an IO error occurs. pub fn get_writer(&self) -> crate::Result { Ok(SegmentWriter::new( + self.id_generator.clone(), self.config.segment_size_bytes, &self.path, )?) } - /// Registers a new segment (blob file) by consuming a writer - /// - /// # Errors - /// - /// Will return `Err` if an IO error occurs. - pub fn register(&self, writer: MultiWriter) -> crate::Result<()> { - let writers = writer.finish()?; - - let mut lock = self.segments.write().expect("lock is poisoned"); - - for writer in writers { - let segment_id = writer.segment_id.clone(); - let segment_folder = writer.folder.clone(); - - lock.insert( - segment_id.clone(), - Arc::new(Segment { - id: segment_id, - path: segment_folder, - stats: Stats { - item_count: writer.item_count, - total_bytes: writer.written_blob_bytes, - stale_items: AtomicU64::default(), - stale_bytes: AtomicU64::default(), - }, - }), - ); - } - - Ok(()) - } - - /// Returns the amount of bytes on disk that are occupied by blobs. - /// - /// This value may not be fresh, as it is only set after running [`ValueLog::refresh_stats`]. - #[must_use] - pub fn disk_space_used(&self) -> u64 { - self.segments - .read() - .expect("lock is poisoned") - .values() - .map(|x| x.stats.total_bytes) - .sum::() - } - - /// Returns the amount of bytes that can be freed on disk - /// if all segments were to be defragmented - /// - /// This value may not be fresh, as it is only set after running [`ValueLog::refresh_stats`]. - #[must_use] - pub fn reclaimable_bytes(&self) -> u64 { - self.segments - .read() - .expect("lock is poisoned") - .values() - .map(|x| x.stats.get_stale_bytes()) - .sum::() - } - - /// Returns the amount of stale items - /// - /// This value may not be fresh, as it is only set after running [`ValueLog::refresh_stats`]. - #[must_use] - pub fn stale_items_count(&self) -> u64 { - self.segments - .read() - .expect("lock is poisoned") - .values() - .map(|x| x.stats.get_stale_items()) - .sum::() - } - - /// Returns the percent of dead bytes in the value log - /// - /// This value may not be fresh, as it is only set after running [`ValueLog::refresh_stats`]. - #[must_use] - pub fn stale_ratio(&self) -> f32 { - let segments = self.segments.read().expect("lock is poisoned"); - - let used_bytes = segments.values().map(|x| x.stats.total_bytes).sum::(); - if used_bytes == 0 { - return 0.0; - } - - let stale_bytes = segments - .values() - .map(|x| x.stats.get_stale_bytes()) - .sum::(); - if stale_bytes == 0 { - return 0.0; - } - - drop(segments); - - stale_bytes as f32 / used_bytes as f32 - } - - /// Returns the approximate space amplification - /// - /// Returns 0.0 if there are no items. - /// - /// This value may not be fresh, as it is only set after running [`ValueLog::refresh_stats`]. - #[must_use] - pub fn space_amp(&self) -> f32 { - let segments = self.segments.read().expect("lock is poisoned"); - - let used_bytes = segments.values().map(|x| x.stats.total_bytes).sum::(); - if used_bytes == 0 { - return 0.0; - } - - let stale_bytes = segments - .values() - .map(|x| x.stats.get_stale_bytes()) - .sum::(); - - drop(segments); - - let alive_bytes = used_bytes - stale_bytes; - if alive_bytes == 0 { - return 0.0; - } - - used_bytes as f32 / alive_bytes as f32 - } - /// Scans through a segment, refreshing its statistics /// /// This function is blocking. @@ -342,13 +228,12 @@ impl ValueLog { /// # Errors /// /// Will return `Err` if an IO error occurs. - pub fn refresh_stats(&self, segment_id: &Arc) -> std::io::Result<()> { + pub fn refresh_stats(&self, segment_id: SegmentId) -> std::io::Result<()> { let Some(segment) = self - .segments + .manifest .read() .expect("lock is poisoned") - .get(segment_id) - .cloned() + .get_segment(segment_id) else { return Ok(()); }; @@ -356,15 +241,20 @@ impl ValueLog { // Scan segment let scanner = segment.scan()?; + let mut item_count = 0; + let mut total_bytes = 0; + let mut stale_items = 0; let mut stale_bytes = 0; for item in scanner { let (key, val) = item?; + item_count += 1; + total_bytes += val.len() as u64; if let Some(item) = self.index.get(&key)? { // NOTE: Segment IDs are monotonically increasing - if item.segment_id > *segment_id { + if item.segment_id > segment_id { stale_items += 1; stale_bytes += val.len() as u64; } @@ -374,6 +264,16 @@ impl ValueLog { } } + segment + .stats + .item_count + .store(item_count, std::sync::atomic::Ordering::Release); + + segment + .stats + .total_bytes + .store(total_bytes, std::sync::atomic::Ordering::Release); + segment .stats .stale_items @@ -384,6 +284,9 @@ impl ValueLog { .stale_bytes .store(stale_bytes, std::sync::atomic::Ordering::Release); + // TODO: need to store stats atomically, to make recovery fast + // TODO: changing stats doesn't happen **too** often, so the I/O is fine + Ok(()) } @@ -395,17 +298,17 @@ impl ValueLog { /// Will return `Err` if an IO error occurs. pub fn rollover( &self, - ids: &[Arc], + ids: &[u64], index_writer: &W, ) -> crate::Result<()> { // IMPORTANT: Only allow 1 rollover at any given time let _guard = self.rollover_guard.lock().expect("lock is poisoned"); - let lock = self.segments.read().expect("lock is poisoned"); + let lock = self.manifest.read().expect("lock is poisoned"); let segments = ids .iter() - .map(|x| lock.get(&**x).cloned()) + .map(|x| lock.segments.get(x).cloned()) .collect::>>(); drop(lock); @@ -438,14 +341,11 @@ impl ValueLog { writer.write(&k, &v)?; } - self.register(writer)?; - index_writer.finish()?; + let mut manifest = self.manifest.write().expect("lock is poisoned"); - let mut lock = self.segments.write().expect("lock is poisoned"); - for id in ids { - std::fs::remove_dir_all(self.path.join("segments").join(&**id))?; - lock.remove(id); - } + manifest.register(writer)?; + index_writer.finish()?; + manifest.drop_segments(ids)?; Ok(()) } diff --git a/tests/basic_gc.rs b/tests/basic_gc.rs index f8673ff..e1f9fce 100644 --- a/tests/basic_gc.rs +++ b/tests/basic_gc.rs @@ -18,11 +18,7 @@ impl std::ops::Deref for DebugIndex { } } -impl Index for DebugIndex { - fn get(&self, key: &[u8]) -> std::io::Result> { - Ok(self.read().expect("lock is poisoned").get(key).cloned()) - } - +impl DebugIndex { fn insert_indirection(&self, key: &[u8], value: ValueHandle) -> std::io::Result<()> { self.write() .expect("lock is poisoned") @@ -31,6 +27,12 @@ impl Index for DebugIndex { } } +impl Index for DebugIndex { + fn get(&self, key: &[u8]) -> std::io::Result> { + Ok(self.read().expect("lock is poisoned").get(key).cloned()) + } +} + struct DebugIndexWriter(Arc); impl IndexWriter for DebugIndexWriter { @@ -52,7 +54,7 @@ fn basic_gc() -> value_log::Result<()> { let vl_path = folder.path(); std::fs::create_dir_all(vl_path)?; - let value_log = ValueLog::new(vl_path, Config::default(), index.clone())?; + let value_log = ValueLog::open(vl_path, Config::default(), index.clone())?; { let items = ["a", "b", "c", "d", "e"]; @@ -64,13 +66,7 @@ fn basic_gc() -> value_log::Result<()> { for key in &items { let offset = writer.offset(key.as_bytes()); - index.insert_indirection( - key.as_bytes(), - ValueHandle { - offset, - segment_id: segment_id.clone(), - }, - )?; + index.insert_indirection(key.as_bytes(), ValueHandle { offset, segment_id })?; writer.write(key.as_bytes(), key.repeat(500).as_bytes())?; } @@ -79,10 +75,13 @@ fn basic_gc() -> value_log::Result<()> { } { - let lock = value_log.segments.read().unwrap(); - assert_eq!(1, lock.len()); - assert_eq!(5, lock.values().next().unwrap().len()); - assert_eq!(0, lock.values().next().unwrap().stats.get_stale_items()); + assert_eq!(1, value_log.segment_count()); + + let segments = value_log.manifest.read().expect("lock is poisoned"); + let segments = segments.list_segments(); + + assert_eq!(5, segments.first().unwrap().len()); + assert_eq!(0, segments.first().unwrap().stats.get_stale_items()); } for (key, handle) in index.0.read().unwrap().iter() { @@ -100,13 +99,7 @@ fn basic_gc() -> value_log::Result<()> { for key in &items { let offset = writer.offset(key.as_bytes()); - index.insert_indirection( - key.as_bytes(), - ValueHandle { - offset, - segment_id: segment_id.clone(), - }, - )?; + index.insert_indirection(key.as_bytes(), ValueHandle { offset, segment_id })?; writer.write(key.as_bytes(), key.repeat(1_000).as_bytes())?; } @@ -115,10 +108,13 @@ fn basic_gc() -> value_log::Result<()> { } { - let lock = value_log.segments.read().unwrap(); - assert_eq!(2, lock.len()); - assert_eq!(5, lock.values().next().unwrap().len()); - assert_eq!(0, lock.values().next().unwrap().stats.get_stale_items()); + assert_eq!(2, value_log.segment_count()); + + let segments = value_log.manifest.read().expect("lock is poisoned"); + let segments = segments.list_segments(); + + assert_eq!(5, segments.first().unwrap().len()); + assert_eq!(0, segments.first().unwrap().stats.get_stale_items()); } for (key, handle) in index.0.read().unwrap().iter() { @@ -126,16 +122,22 @@ fn basic_gc() -> value_log::Result<()> { assert_eq!(item, key.repeat(1_000).into()); } - value_log.rollover( - &value_log.list_segment_ids(), - &DebugIndexWriter(index.clone()), - )?; + let ids = value_log + .manifest + .read() + .expect("lock is poisoned") + .list_segment_ids(); + + value_log.rollover(&ids, &DebugIndexWriter(index.clone()))?; { - let lock = value_log.segments.read().unwrap(); - assert_eq!(1, lock.len()); - assert_eq!(5, lock.values().next().unwrap().len()); - assert_eq!(0, lock.values().next().unwrap().stats.get_stale_items()); + assert_eq!(1, value_log.segment_count()); + + let segments = value_log.manifest.read().expect("lock is poisoned"); + let segments = segments.list_segments(); + + assert_eq!(5, segments.first().unwrap().len()); + assert_eq!(0, segments.first().unwrap().stats.get_stale_items()); } Ok(()) diff --git a/tests/basic_kv.rs b/tests/basic_kv.rs index 46602e0..0425dac 100644 --- a/tests/basic_kv.rs +++ b/tests/basic_kv.rs @@ -18,11 +18,7 @@ impl std::ops::Deref for DebugIndex { } } -impl Index for DebugIndex { - fn get(&self, key: &[u8]) -> std::io::Result> { - Ok(self.read().expect("lock is poisoned").get(key).cloned()) - } - +impl DebugIndex { fn insert_indirection(&self, key: &[u8], value: ValueHandle) -> std::io::Result<()> { self.write() .expect("lock is poisoned") @@ -32,6 +28,12 @@ impl Index for DebugIndex { } } +impl Index for DebugIndex { + fn get(&self, key: &[u8]) -> std::io::Result> { + Ok(self.read().expect("lock is poisoned").get(key).cloned()) + } +} + #[test] fn basic_kv() -> value_log::Result<()> { let folder = tempfile::tempdir()?; @@ -41,7 +43,7 @@ fn basic_kv() -> value_log::Result<()> { let vl_path = folder.path(); std::fs::create_dir_all(vl_path)?; - let value_log = ValueLog::new(vl_path, Config::default(), index.clone())?; + let value_log = ValueLog::open(vl_path, Config::default(), index.clone())?; let items = ["a", "b", "c", "d", "e"]; @@ -53,13 +55,7 @@ fn basic_kv() -> value_log::Result<()> { for key in &items { let offset = writer.offset(key.as_bytes()); - index.insert_indirection( - key.as_bytes(), - ValueHandle { - offset, - segment_id: segment_id.clone(), - }, - )?; + index.insert_indirection(key.as_bytes(), ValueHandle { offset, segment_id })?; writer.write(key.as_bytes(), key.repeat(1_000).as_bytes())?; } @@ -68,12 +64,13 @@ fn basic_kv() -> value_log::Result<()> { } { - let lock = value_log.segments.read().unwrap(); - assert_eq!(1, lock.len()); + assert_eq!(1, value_log.segment_count()); + + let segments = value_log.manifest.read().expect("lock is poisoned"); + let segments = segments.list_segments(); - let segment = lock.values().next().unwrap(); - assert_eq!(items.len() as u64, segment.len()); - assert_eq!(0, segment.stats.get_stale_items()); + assert_eq!(items.len() as u64, segments.first().unwrap().len()); + assert_eq!(0, segments.first().unwrap().stats.get_stale_items()); } for (key, handle) in index.0.read().unwrap().iter() { diff --git a/tests/recovery.rs b/tests/recovery.rs new file mode 100644 index 0000000..b0712df --- /dev/null +++ b/tests/recovery.rs @@ -0,0 +1,104 @@ +use std::{ + collections::BTreeMap, + sync::{Arc, RwLock}, +}; +use test_log::test; +use value_log::{Config, Index, ValueHandle, ValueLog}; + +type Inner = RwLock, ValueHandle>>; + +#[derive(Default)] +pub struct DebugIndex(Inner); + +impl std::ops::Deref for DebugIndex { + type Target = Inner; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DebugIndex { + fn insert_indirection(&self, key: &[u8], value: ValueHandle) -> std::io::Result<()> { + self.write() + .expect("lock is poisoned") + .insert(key.into(), value); + + Ok(()) + } +} + +impl Index for DebugIndex { + fn get(&self, key: &[u8]) -> std::io::Result> { + Ok(self.read().expect("lock is poisoned").get(key).cloned()) + } +} + +#[test] +fn basic_kv() -> value_log::Result<()> { + let folder = tempfile::tempdir()?; + + let index = DebugIndex(RwLock::new(BTreeMap::, ValueHandle>::default())); + let index = Arc::new(index); + + let vl_path = folder.path(); + std::fs::create_dir_all(vl_path)?; + + let items = ["a", "b", "c", "d", "e"]; + + { + let value_log = ValueLog::open(vl_path, Config::default(), index.clone())?; + + { + let mut writer = value_log.get_writer()?; + + let segment_id = writer.segment_id(); + + for key in &items { + let offset = writer.offset(key.as_bytes()); + + index.insert_indirection(key.as_bytes(), ValueHandle { offset, segment_id })?; + + writer.write(key.as_bytes(), key.repeat(1_000).as_bytes())?; + } + + value_log.register(writer)?; + } + + { + assert_eq!(1, value_log.segment_count()); + + let segments = value_log.manifest.read().expect("lock is poisoned"); + let segments = segments.list_segments(); + + assert_eq!(items.len() as u64, segments.first().unwrap().len()); + assert_eq!(0, segments.first().unwrap().stats.get_stale_items()); + } + + for (key, handle) in index.0.read().unwrap().iter() { + let item = value_log.get(handle)?.unwrap(); + assert_eq!(item, key.repeat(1_000).into()); + } + } + + { + let value_log = ValueLog::open(vl_path, Config::default(), index.clone())?; + + { + assert_eq!(1, value_log.segment_count()); + + let segments = value_log.manifest.read().expect("lock is poisoned"); + let segments = segments.list_segments(); + + assert_eq!(items.len() as u64, segments.first().unwrap().len()); + assert_eq!(0, segments.first().unwrap().stats.get_stale_items()); + } + + for (key, handle) in index.0.read().unwrap().iter() { + let item = value_log.get(handle)?.unwrap(); + assert_eq!(item, key.repeat(1_000).into()); + } + } + + Ok(()) +} diff --git a/tests/space_amp.rs b/tests/space_amp.rs index a2ff274..f6c5e5f 100644 --- a/tests/space_amp.rs +++ b/tests/space_amp.rs @@ -18,11 +18,7 @@ impl std::ops::Deref for DebugIndex { } } -impl Index for DebugIndex { - fn get(&self, key: &[u8]) -> std::io::Result> { - Ok(self.read().expect("lock is poisoned").get(key).cloned()) - } - +impl DebugIndex { fn insert_indirection(&self, key: &[u8], value: ValueHandle) -> std::io::Result<()> { self.write() .expect("lock is poisoned") @@ -32,6 +28,12 @@ impl Index for DebugIndex { } } +impl Index for DebugIndex { + fn get(&self, key: &[u8]) -> std::io::Result> { + Ok(self.read().expect("lock is poisoned").get(key).cloned()) + } +} + #[test] fn worst_case_space_amp() -> value_log::Result<()> { let folder = tempfile::tempdir()?; @@ -41,10 +43,24 @@ fn worst_case_space_amp() -> value_log::Result<()> { let vl_path = folder.path(); std::fs::create_dir_all(vl_path)?; - let value_log = ValueLog::new(vl_path, Config::default(), index.clone())?; + let value_log = ValueLog::open(vl_path, Config::default(), index.clone())?; - assert_eq!(0.0, value_log.space_amp()); - assert_eq!(0.0, value_log.stale_ratio()); + assert_eq!( + 0.0, + value_log + .manifest + .read() + .expect("lock is poisoned") + .space_amp() + ); + assert_eq!( + 0.0, + value_log + .manifest + .read() + .expect("lock is poisoned") + .stale_ratio() + ); let key = "key"; let value = "value"; @@ -57,25 +73,39 @@ fn worst_case_space_amp() -> value_log::Result<()> { let offset = writer.offset(key.as_bytes()); - index.insert_indirection( - key.as_bytes(), - ValueHandle { - offset, - segment_id: segment_id.clone(), - }, - )?; + index.insert_indirection(key.as_bytes(), ValueHandle { offset, segment_id })?; writer.write(key.as_bytes(), value.as_bytes())?; value_log.register(writer)?; - for id in value_log.list_segment_ids() { - value_log.refresh_stats(&id)?; + for id in value_log + .manifest + .read() + .expect("lock is poisoned") + .list_segment_ids() + { + value_log.refresh_stats(id)?; } - assert_eq!(x as f32, value_log.space_amp()); + assert_eq!( + x as f32, + value_log + .manifest + .read() + .expect("lock is poisoned") + .space_amp() + ); if x > 1 { - assert!((1.0 - (1.0 / x as f32) - value_log.stale_ratio()) < 0.00001); + assert!( + (1.0 - (1.0 / x as f32) + - value_log + .manifest + .read() + .expect("lock is poisoned") + .stale_ratio()) + < 0.00001 + ); } } @@ -91,10 +121,24 @@ fn no_overlap_space_amp() -> value_log::Result<()> { let vl_path = folder.path(); std::fs::create_dir_all(vl_path)?; - let value_log = ValueLog::new(vl_path, Config::default(), index.clone())?; + let value_log = ValueLog::open(vl_path, Config::default(), index.clone())?; - assert_eq!(0.0, value_log.stale_ratio()); - assert_eq!(0.0, value_log.space_amp()); + assert_eq!( + 0.0, + value_log + .manifest + .read() + .expect("lock is poisoned") + .stale_ratio() + ); + assert_eq!( + 0.0, + value_log + .manifest + .read() + .expect("lock is poisoned") + .space_amp() + ); // NOTE: No blobs overlap, so there are no dead blobs => space amp = 1.0 => perfect space amp for i in 0..100 { @@ -106,23 +150,36 @@ fn no_overlap_space_amp() -> value_log::Result<()> { let offset = writer.offset(key.as_bytes()); - index.insert_indirection( - key.as_bytes(), - ValueHandle { - offset, - segment_id: segment_id.clone(), - }, - )?; + index.insert_indirection(key.as_bytes(), ValueHandle { offset, segment_id })?; writer.write(key.as_bytes(), value.as_bytes())?; value_log.register(writer)?; - for id in value_log.list_segment_ids() { - value_log.refresh_stats(&id)?; + for id in value_log + .manifest + .read() + .expect("lock is poisoned") + .list_segment_ids() + { + value_log.refresh_stats(id)?; } - assert_eq!(1.0, value_log.space_amp()); - assert_eq!(0.0, value_log.stale_ratio()); + assert_eq!( + 1.0, + value_log + .manifest + .read() + .expect("lock is poisoned") + .space_amp() + ); + assert_eq!( + 0.0, + value_log + .manifest + .read() + .expect("lock is poisoned") + .stale_ratio() + ); } Ok(())