Skip to content

Commit

Permalink
recovery wip
Browse files Browse the repository at this point in the history
  • Loading branch information
marvin-j97 committed Mar 20, 2024
1 parent 94a19ea commit 5bca0eb
Show file tree
Hide file tree
Showing 21 changed files with 711 additions and 463 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ lz4 = ["dep:lz4_flex"]

[dependencies]
byteorder = "1.5.0"
chrono = "0.4.34"
crc32fast = "1.4.0"
log = "0.4.20"
lz4_flex = { version = "0.11.2", optional = true }
min-max-heap = "1.3.0"
path-absolutize = "3.1.1"
quick_cache = "0.4.1"
rand = "0.8.5"
serde = { version = "1.0.197", default-features = false, features = [
Expand All @@ -37,11 +37,11 @@ serde = { version = "1.0.197", default-features = false, features = [
"derive",
] }
serde_json = { version = "1.0.114" }
tempfile = "3.10.0"

[dev-dependencies]
criterion = "0.5.1"
env_logger = "0.11.2"
tempfile = "3.10.0"
test-log = "0.2.15"

[[bench]]
Expand Down
50 changes: 14 additions & 36 deletions benches/value_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,7 @@ use value_log::{BlobCache, Config, Index, ValueHandle, ValueLog};
/// In-memory index used by the benchmarks: a `BTreeMap` from key bytes to
/// [`ValueHandle`], guarded by an `RwLock`.
#[derive(Default)]
pub struct DebugIndex(RwLock<BTreeMap<Arc<[u8]>, ValueHandle>>);

impl Index for DebugIndex {
fn get(&self, key: &[u8]) -> std::io::Result<Option<ValueHandle>> {
Ok(self.0.read().expect("lock is poisoned").get(key).cloned())
}

impl DebugIndex {
fn insert_indirection(&self, key: &[u8], value: ValueHandle) -> std::io::Result<()> {
self.0
.write()
Expand All @@ -24,6 +20,12 @@ impl Index for DebugIndex {
}
}

impl Index for DebugIndex {
    /// Looks up `key` in the in-memory map and returns a clone of its value
    /// handle, or `None` if the key is not present.
    ///
    /// # Panics
    ///
    /// Panics if the lock guarding the map is poisoned.
    fn get(&self, key: &[u8]) -> std::io::Result<Option<ValueHandle>> {
        let guard = self.0.read().expect("lock is poisoned");
        let handle = guard.get(key).cloned();
        Ok(handle)
    }
}

fn load_value(c: &mut Criterion) {
let mut group = c.benchmark_group("load blob");

Expand All @@ -48,7 +50,7 @@ fn load_value(c: &mut Criterion) {
let folder = tempfile::tempdir().unwrap();
let vl_path = folder.path();

let value_log = ValueLog::new(
let value_log = ValueLog::open(
vl_path,
Config::default().blob_cache(Arc::new(BlobCache::with_capacity_bytes(0))),
index.clone(),
Expand All @@ -65,13 +67,7 @@ fn load_value(c: &mut Criterion) {
let offset = writer.offset(key.as_bytes());

index
.insert_indirection(
key.as_bytes(),
ValueHandle {
offset,
segment_id: segment_id.clone(),
},
)
.insert_indirection(key.as_bytes(), ValueHandle { offset, segment_id })
.unwrap();

let mut data = vec![0u8; size];
Expand Down Expand Up @@ -101,7 +97,7 @@ fn load_value(c: &mut Criterion) {
let folder = tempfile::tempdir().unwrap();
let vl_path = folder.path();

let value_log = ValueLog::new(
let value_log = ValueLog::open(
vl_path,
Config::default()
.blob_cache(Arc::new(BlobCache::with_capacity_bytes(64 * 1_024 * 1_024))),
Expand All @@ -119,13 +115,7 @@ fn load_value(c: &mut Criterion) {
let offset = writer.offset(key.as_bytes());

index
.insert_indirection(
key.as_bytes(),
ValueHandle {
offset,
segment_id: segment_id.clone(),
},
)
.insert_indirection(key.as_bytes(), ValueHandle { offset, segment_id })
.unwrap();

let mut data = vec![0u8; size];
Expand Down Expand Up @@ -161,7 +151,7 @@ fn compression(c: &mut Criterion) {
let folder = tempfile::tempdir().unwrap();
let vl_path = folder.path();

let value_log = ValueLog::new(
let value_log = ValueLog::open(
vl_path,
Config::default().blob_cache(Arc::new(BlobCache::with_capacity_bytes(0))),
index.clone(),
Expand All @@ -180,13 +170,7 @@ fn compression(c: &mut Criterion) {
let offset = writer.offset(key.as_bytes());

index
.insert_indirection(
key.as_bytes(),
ValueHandle {
offset,
segment_id: segment_id.clone(),
},
)
.insert_indirection(key.as_bytes(), ValueHandle { offset, segment_id })
.unwrap();

let mut data = vec![0u8; size_mb * 1_024 * 1_024];
Expand All @@ -200,13 +184,7 @@ fn compression(c: &mut Criterion) {
let offset = writer.offset(key.as_bytes());

index
.insert_indirection(
key.as_bytes(),
ValueHandle {
offset,
segment_id: segment_id.clone(),
},
)
.insert_indirection(key.as_bytes(), ValueHandle { offset, segment_id })
.unwrap();

let dummy = b"abcdefgh";
Expand Down
4 changes: 3 additions & 1 deletion src/blob_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ type Item = Arc<[u8]>;
struct BlobWeighter;

impl Weighter<CacheKey, Item> for BlobWeighter {
    /// Returns the cache weight of a blob: its length in bytes, capped at
    /// `u32::MAX`.
    ///
    /// NOTE: quick_cache only supports u32 as weight, but that's fine
    /// 4 GB blobs are too big anyway. The length is still clamped so that an
    /// oversized blob reports the maximum weight instead of wrapping around
    /// to a small one.
    // The cast cannot truncate because the value is clamped to `u32::MAX` first.
    #[allow(clippy::cast_possible_truncation)]
    fn weight(&self, _: &CacheKey, blob: &Item) -> u32 {
        blob.len().min(u32::MAX as usize) as u32
    }
}
Expand Down
4 changes: 2 additions & 2 deletions src/handle.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use crate::id::SegmentId;
use serde::{Deserialize, Serialize};
use std::hash::Hash;
use std::sync::Arc;

/// A value handle points into the value log.
#[allow(clippy::module_name_repetitions)]
#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)]
pub struct ValueHandle {
/// Segment ID
pub segment_id: Arc<str>,
pub segment_id: SegmentId,

/// Offset in file
pub offset: u64,
Expand Down
83 changes: 13 additions & 70 deletions src/id.rs
Original file line number Diff line number Diff line change
@@ -1,79 +1,22 @@
use chrono::{Datelike, Timelike};
use rand::Rng;
use std::sync::Arc;
use std::sync::{atomic::AtomicU64, Arc};

const BASE_36_RADIX: u32 = 36;

/// Renders `x` in base 36 using the digits `0-9` followed by lowercase `a-z`.
///
/// The most significant digit comes first, no padding is applied, and zero
/// is rendered as `"0"`.
fn to_base36(mut x: u32) -> String {
    // Digits come out least-significant first; they are reversed at the end.
    let mut digits: Vec<char> = Vec::new();
    let mut pending = true;

    // Always run once so that an input of 0 still yields a single digit.
    while pending {
        let remainder = x % BASE_36_RADIX;
        let digit = std::char::from_digit(remainder, BASE_36_RADIX).expect("should be hex digit");
        digits.push(digit);

        x /= BASE_36_RADIX;
        pending = x != 0;
    }

    digits.iter().rev().collect()
}

/// Generates an ID for a segment
///
/// Like <https://cassandra.apache.org/_/blog/Apache-Cassandra-4.1-New-SSTable-Identifiers.html>
#[allow(clippy::module_name_repetitions)]
#[doc(hidden)]
#[must_use]
pub fn generate_segment_id() -> Arc<str> {
let now = chrono::Utc::now();
pub type SegmentId = u64;

let year = now.year().unsigned_abs();
let month = now.month() as u8;
let day = (now.day() - 1) as u8;

let hour = now.hour() as u8;
let min = now.minute() as u8;

let sec = now.second() as u8;
let nano = now.timestamp_subsec_nanos();
#[allow(clippy::module_name_repetitions)]
#[derive(Clone, Default)]
pub struct IdGenerator(Arc<AtomicU64>);

let mut rng = rand::thread_rng();
let random = rng.gen::<u16>();
impl std::ops::Deref for IdGenerator {
type Target = Arc<AtomicU64>;

format!(
"{:0>4}_{}{}{:0>2}{:0>2}_{:0>2}{:0>8}_{:0>4}",
to_base36(year),
//
to_base36(u32::from(month)),
to_base36(u32::from(day)),
to_base36(u32::from(hour)),
to_base36(u32::from(min)),
//
to_base36(u32::from(sec)),
to_base36(nano),
//
to_base36(u32::from(random)),
)
.into()
fn deref(&self) -> &Self::Target {
&self.0
}
}

#[cfg(test)]
mod tests {
use super::*;
use test_log::test;

#[test]
pub fn id_monotonic_order() {
for _ in 0..1_000 {
let ids = (0..100).map(|_| generate_segment_id()).collect::<Vec<_>>();

let mut sorted = ids.clone();
sorted.sort();

assert_eq!(ids, sorted, "ID is not monotonic");
}
impl IdGenerator {
    /// Hands out the next segment ID.
    ///
    /// Atomically adds one to the shared counter and returns the value it
    /// held before the increment.
    pub fn next(&self) -> SegmentId {
        use std::sync::atomic::Ordering;

        self.0.fetch_add(1, Ordering::SeqCst)
    }
}
12 changes: 0 additions & 12 deletions src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,6 @@ pub trait Index {
///
/// Will return `Err` if an IO error occurs.
fn get(&self, key: &[u8]) -> std::io::Result<Option<ValueHandle>>;

// TODO: shouldn't be part of Index... remove
// TODO: flushing to value log should use `Writer` (atomic)

/// Inserts a value handle into the index.
///
/// This method is called during value log garbage collection.
///
/// # Errors
///
/// Will return `Err` if an IO error occurs.
fn insert_indirection(&self, key: &[u8], value: ValueHandle) -> std::io::Result<()>;
}

/// Trait that allows writing into an external index
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ mod error;
mod handle;
mod id;
mod index;
mod manifest;
mod path;
mod segment;
mod value_log;
mod version;
Expand Down
4 changes: 2 additions & 2 deletions src/main.rs → src/main._rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ fn main() -> value_log::Result<()> {
}
std::fs::create_dir_all(vl_path)?;

let value_log = ValueLog::new(vl_path, Config::default(), index.clone())?;
let value_log = ValueLog::open(vl_path, Config::default(), index.clone())?;

{
let mut writer = value_log.get_writer()?;
Expand Down Expand Up @@ -296,7 +296,7 @@ fn main() -> value_log::Result<()> {
);

for id in value_log.segments.read().unwrap().keys() {
value_log.refresh_stats(id)?;
value_log.refresh_stats(*id)?;
}

eprintln!("=== after refresh ===");
Expand Down
Loading

0 comments on commit 5bca0eb

Please sign in to comment.