Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dnr] perf testing #31172

Merged
merged 10 commits into from
Jan 27, 2025
7 changes: 5 additions & 2 deletions misc/python/materialize/parallel_workload/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -1050,8 +1050,11 @@ def __init__(
BOOLEAN_FLAG_VALUES
)
self.flags_with_values["enable_eager_delta_joins"] = BOOLEAN_FLAG_VALUES
self.flags_with_values["persist_batch_columnar_format"] = ["row", "both_v2"]
self.flags_with_values["persist_record_schema_id"] = BOOLEAN_FLAG_VALUES
self.flags_with_values["persist_batch_columnar_format"] = [
"row",
"both_v2",
"structured",
]
self.flags_with_values["persist_batch_structured_order"] = BOOLEAN_FLAG_VALUES
self.flags_with_values["persist_batch_builder_structured"] = BOOLEAN_FLAG_VALUES
self.flags_with_values["persist_batch_structured_key_lower_len"] = [
Expand Down
31 changes: 6 additions & 25 deletions src/persist-client/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -688,13 +688,7 @@ where
.codecs
.val
.encode(|| V::encode(val, &mut self.val_buf));
validate_schema(
&self.builder.write_schemas,
&self.key_buf,
&self.val_buf,
Some(key),
Some(val),
);
validate_schema(&self.builder.write_schemas, key, val);

let update = (
(self.key_buf.as_slice(), self.val_buf.as_slice()),
Expand Down Expand Up @@ -881,32 +875,18 @@ where
// inline it at the two callers.
pub(crate) fn validate_schema<K: Codec, V: Codec>(
stats_schemas: &Schemas<K, V>,
key: &[u8],
val: &[u8],
decoded_key: Option<&K>,
decoded_val: Option<&V>,
decoded_key: &K,
decoded_val: &V,
) {
// Attempt to catch any bad schema usage in CI. This is probably too
// expensive to run in prod.
if !mz_ore::assert::SOFT_ASSERTIONS.load(Ordering::Relaxed) {
return;
}
let key_valid = match decoded_key {
Some(key) => K::validate(key, &stats_schemas.key),
None => {
let key = K::decode(key, &stats_schemas.key).expect("valid encoded key");
K::validate(&key, &stats_schemas.key)
}
};
let key_valid = K::validate(decoded_key, &stats_schemas.key);
let () = key_valid
.unwrap_or_else(|err| panic!("constructing batch with mismatched key schema: {}", err));
let val_valid = match decoded_val {
Some(val) => V::validate(val, &stats_schemas.val),
None => {
let val = V::decode(val, &stats_schemas.val).expect("valid encoded val");
V::validate(&val, &stats_schemas.val)
}
};
let val_valid = V::validate(decoded_val, &stats_schemas.val);
let () = val_valid
.unwrap_or_else(|err| panic!("constructing batch with mismatched val schema: {}", err));
}
Expand Down Expand Up @@ -1135,6 +1115,7 @@ impl<T: Timestamp + Codec64> BatchParts<T> {
BatchColumnarFormat::Both(_) => {
self.cfg.inline_writes_single_max_bytes.saturating_div(2)
}
BatchColumnarFormat::Structured => self.cfg.inline_writes_single_max_bytes,
};

let (name, write_future) = if updates.goodbytes() < inline_threshold {
Expand Down
17 changes: 11 additions & 6 deletions src/persist-client/src/cli/inspect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use crate::async_runtime::IsolatedRuntime;
use crate::cache::StateCache;
use crate::cli::args::{make_blob, make_consensus, StateArgs, NO_COMMIT, READ_ALL_BUILD_INFO};
use crate::error::CodecConcreteType;
use crate::fetch::{Cursor, EncodedPart};
use crate::fetch::EncodedPart;
use crate::internal::encoding::{Rollup, UntypedState};
use crate::internal::paths::{
BlobKey, BlobKeyPrefix, PartialBatchKey, PartialBlobKey, PartialRollupKey, WriterKey,
Expand Down Expand Up @@ -361,15 +361,19 @@ pub async fn blob_batch_part(
desc,
updates: Vec::new(),
};
let mut cursor = Cursor::default();
while let Some(((k, v, t, d), _)) = cursor.pop(&encoded_part) {
let records = encoded_part.normalize(&metrics.columnar);
for ((k, v), t, d) in records
.records()
.expect("only implemented for records")
.iter()
{
if out.updates.len() > limit {
break;
}
out.updates.push(BatchPartUpdate {
k: format!("{:?}", PrettyBytes(k)),
v: format!("{:?}", PrettyBytes(v)),
t,
t: u64::from_le_bytes(t),
d: i64::from_le_bytes(d),
});
}
Expand Down Expand Up @@ -408,8 +412,9 @@ async fn consolidated_size(args: &StateArgs) -> Result<(), anyhow::Error> {
)
.await
.expect("part exists");
let mut cursor = Cursor::default();
while let Some(((k, v, mut t, d), _)) = cursor.pop(&encoded_part) {
let part = encoded_part.normalize(&state_versions.metrics.columnar);
for ((k, v), t, d) in part.records().expect("codec records").iter() {
let mut t = <u64 as Codec64>::decode(t);
t.advance_by(as_of);
let d = <i64 as Codec64>::decode(d);
updates.push(((k.to_owned(), v.to_owned()), t, d));
Expand Down
Loading
Loading