Skip to content

Commit

Permalink
Merge pull request #31080 from bkirwi/structured-on-disk-2
Browse files Browse the repository at this point in the history
[persist] Structured file format
  • Loading branch information
bkirwi authored Jan 27, 2025
2 parents ed513e1 + 41d0aea commit 59ba5f2
Show file tree
Hide file tree
Showing 12 changed files with 374 additions and 513 deletions.
7 changes: 5 additions & 2 deletions misc/python/materialize/parallel_workload/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -1050,8 +1050,11 @@ def __init__(
BOOLEAN_FLAG_VALUES
)
self.flags_with_values["enable_eager_delta_joins"] = BOOLEAN_FLAG_VALUES
self.flags_with_values["persist_batch_columnar_format"] = ["row", "both_v2"]
self.flags_with_values["persist_record_schema_id"] = BOOLEAN_FLAG_VALUES
self.flags_with_values["persist_batch_columnar_format"] = [
"row",
"both_v2",
"structured",
]
self.flags_with_values["persist_batch_structured_order"] = BOOLEAN_FLAG_VALUES
self.flags_with_values["persist_batch_builder_structured"] = BOOLEAN_FLAG_VALUES
self.flags_with_values["persist_batch_structured_key_lower_len"] = [
Expand Down
31 changes: 6 additions & 25 deletions src/persist-client/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -688,13 +688,7 @@ where
.codecs
.val
.encode(|| V::encode(val, &mut self.val_buf));
validate_schema(
&self.builder.write_schemas,
&self.key_buf,
&self.val_buf,
Some(key),
Some(val),
);
validate_schema(&self.builder.write_schemas, key, val);

let update = (
(self.key_buf.as_slice(), self.val_buf.as_slice()),
Expand Down Expand Up @@ -881,32 +875,18 @@ where
// inline it at the two callers.
pub(crate) fn validate_schema<K: Codec, V: Codec>(
stats_schemas: &Schemas<K, V>,
key: &[u8],
val: &[u8],
decoded_key: Option<&K>,
decoded_val: Option<&V>,
decoded_key: &K,
decoded_val: &V,
) {
// Attempt to catch any bad schema usage in CI. This is probably too
// expensive to run in prod.
if !mz_ore::assert::SOFT_ASSERTIONS.load(Ordering::Relaxed) {
return;
}
let key_valid = match decoded_key {
Some(key) => K::validate(key, &stats_schemas.key),
None => {
let key = K::decode(key, &stats_schemas.key).expect("valid encoded key");
K::validate(&key, &stats_schemas.key)
}
};
let key_valid = K::validate(decoded_key, &stats_schemas.key);
let () = key_valid
.unwrap_or_else(|err| panic!("constructing batch with mismatched key schema: {}", err));
let val_valid = match decoded_val {
Some(val) => V::validate(val, &stats_schemas.val),
None => {
let val = V::decode(val, &stats_schemas.val).expect("valid encoded val");
V::validate(&val, &stats_schemas.val)
}
};
let val_valid = V::validate(decoded_val, &stats_schemas.val);
let () = val_valid
.unwrap_or_else(|err| panic!("constructing batch with mismatched val schema: {}", err));
}
Expand Down Expand Up @@ -1135,6 +1115,7 @@ impl<T: Timestamp + Codec64> BatchParts<T> {
BatchColumnarFormat::Both(_) => {
self.cfg.inline_writes_single_max_bytes.saturating_div(2)
}
BatchColumnarFormat::Structured => self.cfg.inline_writes_single_max_bytes,
};

let (name, write_future) = if updates.goodbytes() < inline_threshold {
Expand Down
17 changes: 11 additions & 6 deletions src/persist-client/src/cli/inspect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use crate::async_runtime::IsolatedRuntime;
use crate::cache::StateCache;
use crate::cli::args::{make_blob, make_consensus, StateArgs, NO_COMMIT, READ_ALL_BUILD_INFO};
use crate::error::CodecConcreteType;
use crate::fetch::{Cursor, EncodedPart};
use crate::fetch::EncodedPart;
use crate::internal::encoding::{Rollup, UntypedState};
use crate::internal::paths::{
BlobKey, BlobKeyPrefix, PartialBatchKey, PartialBlobKey, PartialRollupKey, WriterKey,
Expand Down Expand Up @@ -361,15 +361,19 @@ pub async fn blob_batch_part(
desc,
updates: Vec::new(),
};
let mut cursor = Cursor::default();
while let Some(((k, v, t, d), _)) = cursor.pop(&encoded_part) {
let records = encoded_part.normalize(&metrics.columnar);
for ((k, v), t, d) in records
.records()
.expect("only implemented for records")
.iter()
{
if out.updates.len() > limit {
break;
}
out.updates.push(BatchPartUpdate {
k: format!("{:?}", PrettyBytes(k)),
v: format!("{:?}", PrettyBytes(v)),
t,
t: u64::from_le_bytes(t),
d: i64::from_le_bytes(d),
});
}
Expand Down Expand Up @@ -408,8 +412,9 @@ async fn consolidated_size(args: &StateArgs) -> Result<(), anyhow::Error> {
)
.await
.expect("part exists");
let mut cursor = Cursor::default();
while let Some(((k, v, mut t, d), _)) = cursor.pop(&encoded_part) {
let part = encoded_part.normalize(&state_versions.metrics.columnar);
for ((k, v), t, d) in part.records().expect("codec records").iter() {
let mut t = <u64 as Codec64>::decode(t);
t.advance_by(as_of);
let d = <i64 as Codec64>::decode(d);
updates.push(((k.to_owned(), v.to_owned()), t, d));
Expand Down
Loading

0 comments on commit 59ba5f2

Please sign in to comment.