Make decoding codec data optional in FetchedPart
bkirwi committed Jan 17, 2025
1 parent a40cdba commit ef767c6
Showing 2 changed files with 133 additions and 147 deletions.
270 changes: 133 additions & 137 deletions src/persist-client/src/fetch.rs
@@ -15,11 +15,12 @@ use std::sync::Arc;
use std::time::Instant;

use anyhow::anyhow;
use arrow::array::{Array, AsArray, BooleanArray};
use arrow::array::{Array, AsArray, BooleanArray, Int64Array};
use arrow::compute::FilterBuilder;
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use itertools::EitherOrBoth;
use mz_dyncfg::{Config, ConfigSet, ConfigValHandle};
use mz_ore::bytes::SegmentedBytes;
use mz_ore::cast::CastFrom;
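
The new itertools::EitherOrBoth import carries the weight of this change: a fetched part can now hold codec-encoded records, structured decoders, or both. A toy sketch of its semantics (illustrative values only, not the real persist types):

```rust
use itertools::EitherOrBoth;

// "Left, right, or both" in one enum. Below, Left stands in for codec
// records and Right for structured decoders.
let part: EitherOrBoth<&str, i32> = EitherOrBoth::Both("codec", 7);
assert_eq!(part.as_ref().left(), Some(&"codec")); // present in Left and Both
assert_eq!(part.as_ref().right(), Some(&7)); // present in Right and Both
assert_eq!(EitherOrBoth::<&str, i32>::Left("codec").right(), None);
```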
@@ -722,14 +723,17 @@ impl<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> FetchedBlob<K, V,
pub struct FetchedPart<K: Codec, V: Codec, T, D> {
metrics: Arc<Metrics>,
ts_filter: FetchBatchFilter<T>,
part: ColumnarRecords,
// If migration is Either, then the columnar one will have already been
// applied here.
structured_part: (
Option<<K::Schema as Schema2<K>>::Decoder>,
Option<<V::Schema as Schema2<V>>::Decoder>,
),
part_decode_format: PartDecodeFormat,
// applied here on the structured data only.
part: EitherOrBoth<
ColumnarRecords,
(
<K::Schema as Schema2<K>>::Decoder,
<V::Schema as Schema2<V>>::Decoder,
),
>,
timestamps: Int64Array,
diffs: Int64Array,
migration: PartMigration<K, V>,
filter_pushdown_audit: Option<LazyPartStats>,
part_cursor: usize,
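
With timestamps and diffs now stored on the struct as raw arrow Int64Arrays, per-row values are recovered through Codec64's fixed eight-byte round trip, exactly as the cursor loop later in this diff does with `T::decode(self.timestamps.values()[..].to_le_bytes())`. A minimal sketch, assuming the mz_persist_types::Codec64 trait whose `decode([u8; 8])` the diff itself calls; `col_at` is a hypothetical helper:

```rust
use arrow::array::Int64Array;
use mz_persist_types::Codec64;

// Hypothetical helper: recover the i-th timestamp or diff from a column.
// Codec64 types round-trip through exactly eight little-endian bytes.
fn col_at<T: Codec64>(col: &Int64Array, i: usize) -> T {
    T::decode(col.values()[i].to_le_bytes())
}
```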
@@ -781,101 +785,87 @@ impl<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> FetchedPart<K, V,
None
};

// TODO(parkmycar): We should probably refactor this since these columns are duplicated
// (via a smart pointer) in EncodedPart.
//
// For structured columnar data we need to downcast from `dyn Array`s to concrete types.
// Downcasting is relatively expensive so we want to do this once, which is why we do it
// when creating a FetchedPart.
let should_downcast = match part_decode_format {
PartDecodeFormat::Row {
validate_structured,
} => validate_structured,
PartDecodeFormat::Arrow => true,
};
let downcast_structured = |structured: ColumnarRecordsStructuredExt| {
let key_size_before = ArrayOrd::new(&structured.key).goodbytes();

let mut updates = part.normalize(&metrics.columnar);
let structured_part = match (updates.structured(), should_downcast) {
// Only downcast and create decoders if we have structured data AND
// (an audit of the data is requested OR we'd like to decode
// directly from the structured data).
(Some(structured), true) => {
fn decode<C: Codec>(
name: &str,
schema: &C::Schema,
array: &Arc<dyn Array>,
) -> Option<<C::Schema as Schema2<C>>::Decoder> {
match Schema2::decoder_any(schema, array) {
Ok(x) => Some(x),
Err(err) => {
tracing::error!(?err, "failed to create {} decoder", name);
None
}
}
let structured = match &migration {
PartMigration::SameSchema { .. } => structured,
PartMigration::Codec { .. } => {
return None;
}
match &migration {
PartMigration::SameSchema { both } => {
let key_size_before = ArrayOrd::new(&structured.key).goodbytes();

let key = decode::<K>("key", &*both.key, &structured.key);
let val = decode::<V>("val", &*both.val, &structured.val);

if let Some(key_decoder) = key.as_ref() {
let key_size_after = key_decoder.goodbytes();
let key_diff = key_size_before.saturating_sub(key_size_after);
metrics
.pushdown
.parts_projection_trimmed_bytes
.inc_by(u64::cast_from(key_diff));
}

(key, val)
}
PartMigration::Codec { .. } => (None, None),
PartMigration::Either {
write: _write,
read,
key_migration,
val_migration,
} => {
let start = Instant::now();
let key = key_migration.migrate(Arc::clone(&structured.key));
let val = val_migration.migrate(Arc::clone(&structured.val));
metrics
.schema
.migration_migrate_seconds
.inc_by(start.elapsed().as_secs_f64());

let key_before_size = ArrayOrd::new(&structured.key).goodbytes();
let key_after_size = ArrayOrd::new(&key).goodbytes();
let key_diff = key_before_size.saturating_sub(key_after_size);
metrics
.pushdown
.parts_projection_trimmed_bytes
.inc_by(u64::cast_from(key_diff));

(
decode::<K>("key", &*read.key, &key),
decode::<V>("val", &*read.val, &val),
)
}
PartMigration::Either {
write: _,
read: _,
key_migration,
val_migration,
} => {
let start = Instant::now();
let key = key_migration.migrate(structured.key);
let val = val_migration.migrate(structured.val);
metrics
.schema
.migration_migrate_seconds
.inc_by(start.elapsed().as_secs_f64());
ColumnarRecordsStructuredExt { key, val }
}
};

let read_schema = migration.codec_read();
let key = K::Schema::decoder_any(&*read_schema.key, &*structured.key);
let val = V::Schema::decoder_any(&*read_schema.val, &*structured.val);

match &key {
Ok(key_decoder) => {
let key_size_after = key_decoder.goodbytes();
let key_diff = key_size_before.saturating_sub(key_size_after);
metrics
.pushdown
.parts_projection_trimmed_bytes
.inc_by(u64::cast_from(key_diff));
}
Err(e) => {
soft_panic_or_log!("failed to create decoder: {e:#?}");
}
}
_ => (None, None),

Some((key.ok()?, val.ok()?))
};

// Fill in the codec data, since decode expects it.
// TODO(structured): make reading the codec data optional.
let write_schemas = migration.structured_write();
let codec_part = updates
.get_or_make_codec::<K, V>(write_schemas.key.as_ref(), write_schemas.val.as_ref());
let updates = part.normalize(&metrics.columnar);
let timestamps = updates.timestamps().clone();
let diffs = updates.diffs().clone();
let part = match updates {
// If only one encoding is available, decode via that encoding.
BlobTraceUpdates::Row(records) => EitherOrBoth::Left(records),
BlobTraceUpdates::Structured { key_values, .. } => EitherOrBoth::Right(
// The structured-only data format was added after schema ids were recorded everywhere,
// so we expect this data to be present.
downcast_structured(key_values).expect("valid schemas for structured data"),
),
// If both are available, respect the specified part decode format.
BlobTraceUpdates::Both(records, ext) => match part_decode_format {
PartDecodeFormat::Row {
validate_structured: false,
} => EitherOrBoth::Left(records),
PartDecodeFormat::Row {
validate_structured: true,
} => match downcast_structured(ext) {
Some(decoders) => EitherOrBoth::Both(records, decoders),
None => EitherOrBoth::Left(records),
},
PartDecodeFormat::Arrow => match downcast_structured(ext) {
Some(decoders) => EitherOrBoth::Right(decoders),
None => EitherOrBoth::Left(records),
},
},
};

FetchedPart {
metrics,
ts_filter,
part: codec_part.clone(),
structured_part,
part_decode_format,
part,
timestamps,
diffs,
migration,
filter_pushdown_audit,
part_cursor: 0,
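
The `let part = match updates { ... }` above picks a representation from the part's contents plus the configured PartDecodeFormat. A distilled sketch of the fallback rules for parts that carry both encodings (`choose` and its parameters are hypothetical, not the real API):

```rust
use itertools::EitherOrBoth;

// Prefer the requested representation, but fall back to codec records
// whenever the structured decoders could not be built.
fn choose<R, S>(
    records: R,
    decoders: Option<S>,
    prefer_arrow: bool,
    validate: bool,
) -> EitherOrBoth<R, S> {
    match (decoders, prefer_arrow, validate) {
        (Some(s), true, _) => EitherOrBoth::Right(s), // decode from arrow only
        (Some(s), false, true) => EitherOrBoth::Both(records, s), // decode rows, validate
        _ => EitherOrBoth::Left(records), // codec records only
    }
}
```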
@@ -923,23 +913,22 @@ where
val: &mut Option<V>,
result_override: Option<(K, V)>,
) -> Option<((Result<K, String>, Result<V, String>), T, D)> {
while let Some(((k, v), t, d)) = self.part.get(self.part_cursor) {
while self.part_cursor < self.timestamps.len() {
let (k, v) = self.decode_kv(self.part_cursor, key, val);
let mut t = T::decode(self.timestamps.values()[self.part_cursor].to_le_bytes());
let mut d = D::decode(self.diffs.values()[self.part_cursor].to_le_bytes());
self.part_cursor += 1;
let mut t = T::decode(t);
let mut d = D::decode(d);

if !self.ts_filter.filter_ts(&mut t) {
continue;
}

// If `filter_ts` advances our timestamp, we may end up with the same K, V, T in successive
// records. If so, opportunistically consolidate those out.
while let Some(((k_next, v_next), t_next, d_next)) = self.part.get(self.part_cursor) {
if (k, v) != (k_next, v_next) {
break;
}
while self.part_cursor < self.timestamps.len() {
let mut t_next =
T::decode(self.timestamps.values()[self.part_cursor].to_le_bytes());

let mut t_next = T::decode(t_next);
if !self.ts_filter.filter_ts(&mut t_next) {
break;
}
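
The consolidation loop beginning above merges runs of equal (key, value, time) triples by summing their diffs, relying on the diff type's Semigroup impl (the `plus_equals` call in the next hunk). In miniature, with i64 as the diff type, one of differential-dataflow's stock implementations:

```rust
use differential_dataflow::difference::Semigroup;

// Two otherwise-identical updates collapse into one whose diff is the sum.
let mut d: i64 = 1;
d.plus_equals(&1);
assert_eq!(d, 2);
```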
@@ -948,9 +937,15 @@
break;
}

let (k_next, v_next) = self.decode_kv(self.part_cursor, key, val);
if (&k, &v) != (&k_next, &v_next) {
break;
}

// All equal... consolidate!
let d_next = D::decode(self.diffs.values()[self.part_cursor].to_le_bytes());
self.part_cursor += 1;
d.plus_equals(&D::decode(d_next));
d.plus_equals(&d_next);
}

// If multiple updates consolidate out entirely, drop the record.
Expand All @@ -962,39 +957,39 @@ where
return Some(((Ok(key), Ok(val)), t, d));
}

// TODO: Putting this here relies on the Codec data still being
// populated (i.e. for the consolidate optimization above).
// Eventually we'll have to rewrite this path to work entirely
// without Codec data, but in the meantime, putting in here allows
// us to see the performance impact of decoding from arrow instead
// of Codec.
//
// Plus, it'll likely be easier to port all the logic here to work
// solely on arrow data once we finish migrating things like the
// ConsolidatingIter.
if let ((Some(keys), Some(vals)), PartDecodeFormat::Arrow) =
(&self.structured_part, self.part_decode_format)
{
let (k, v) = self.decode_structured(self.part_cursor - 1, keys, vals, key, val);
return Some(((k, v), t, d));
}
return Some(((k, v), t, d));
}
None
}

let (k, v) = Self::decode_codec(
&self.metrics,
self.migration.codec_read(),
k,
v,
key,
val,
&mut self.key_storage,
&mut self.val_storage,
);
// Note: We only provide structured columns if they were originally written and a
// dyncfg was specified to run validation.
if let (Some(keys), Some(vals)) = &self.structured_part {
let (k_s, v_s) =
self.decode_structured(self.part_cursor - 1, keys, vals, &mut None, &mut None);
fn decode_kv(
&mut self,
index: usize,
key: &mut Option<K>,
val: &mut Option<V>,
) -> (Result<K, String>, Result<V, String>) {
let decoded = self
.part
.as_ref()
.map_left(|codec| {
let ((ck, cv), _, _) = codec.get(index).expect("valid index");
Self::decode_codec(
&*self.metrics,
self.migration.codec_read(),
ck,
cv,
key,
val,
&mut self.key_storage,
&mut self.val_storage,
)
})
.map_right(|(structured_key, structured_val)| {
self.decode_structured(index, structured_key, structured_val, key, val)
});

match decoded {
EitherOrBoth::Both((k, v), (k_s, v_s)) => {
// Purposefully do not trace to prevent blowing up Sentry.
let is_valid = self
.metrics
@@ -1015,11 +1010,12 @@
if !is_valid {
soft_panic_no_log!("structured val did not match, {v_s:?} != {v:?}");
}
}

return Some(((k, v), t, d));
(k, v)
}
EitherOrBoth::Left(kv) => kv,
EitherOrBoth::Right(kv) => kv,
}
None
}
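
The decode_kv above leans on EitherOrBoth's map_left/map_right touching only the sides that are present, so a single traversal serves codec-only, structured-only, and both-with-validation reads. A toy illustration of that shape:

```rust
use itertools::EitherOrBoth;

// Only the populated sides are decoded; Both keeps the pair for validation.
let decoded = EitherOrBoth::Both("raw-codec", "raw-structured")
    .map_left(|c| c.len()) // stand-in for codec decoding
    .map_right(|s| s.len()); // stand-in for structured decoding
assert_eq!(decoded.both(), Some((9, 14)));
```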

fn decode_codec(
@@ -1090,7 +1086,7 @@

fn size_hint(&self) -> (usize, Option<usize>) {
// We don't know in advance how restrictive the filter will be.
let max_len = self.part.len();
let max_len = self.timestamps.len();
(0, Some(max_len))
}
}
10 changes: 0 additions & 10 deletions src/persist-client/src/schema.rs
@@ -402,16 +402,6 @@ impl<K: Codec, V: Codec> PartMigration<K, V> {
PartMigration::Either { read, .. } => read,
}
}

pub(crate) fn structured_write(&self) -> &Schemas<K, V> {
match self {
PartMigration::SameSchema { both } => both,
// We should have a write schema set for all parts that are written with the structured-
// only parquet format.
PartMigration::Codec { read } => read,
PartMigration::Either { write, .. } => write,
}
}
}

/// Returns if `new` is at least as nullable as `old`.
