diff --git a/src/persist-client/src/fetch.rs b/src/persist-client/src/fetch.rs
index 916b74c7d9816..b8810e8125b82 100644
--- a/src/persist-client/src/fetch.rs
+++ b/src/persist-client/src/fetch.rs
@@ -15,11 +15,12 @@ use std::sync::Arc;
 use std::time::Instant;
 
 use anyhow::anyhow;
-use arrow::array::{Array, AsArray, BooleanArray};
+use arrow::array::{Array, AsArray, BooleanArray, Int64Array};
 use arrow::compute::FilterBuilder;
 use differential_dataflow::difference::Semigroup;
 use differential_dataflow::lattice::Lattice;
 use differential_dataflow::trace::Description;
+use itertools::EitherOrBoth;
 use mz_dyncfg::{Config, ConfigSet, ConfigValHandle};
 use mz_ore::bytes::SegmentedBytes;
 use mz_ore::cast::CastFrom;
@@ -722,14 +723,17 @@ impl<K, V, T, D> FetchedBlob<K, V, T, D>
     metrics: Arc<Metrics>,
     ts_filter: FetchBatchFilter<T>,
-    part: ColumnarRecords,
     // If migration is Either, then the columnar one will have already been
-    // applied here.
-    structured_part: (
-        Option<<K::Schema as Schema2<K>>::Decoder>,
-        Option<<V::Schema as Schema2<V>>::Decoder>,
-    ),
-    part_decode_format: PartDecodeFormat,
+    // applied here on the structured data only.
+    part: EitherOrBoth<
+        ColumnarRecords,
+        (
+            <K::Schema as Schema2<K>>::Decoder,
+            <V::Schema as Schema2<V>>::Decoder,
+        ),
+    >,
+    timestamps: Int64Array,
+    diffs: Int64Array,
     migration: PartMigration<K, V>,
     filter_pushdown_audit: Option<LazyPartStats>,
     part_cursor: usize,
@@ -781,101 +785,87 @@ impl<K, V, T, D> FetchedPart<K, V, T, D>
-        let should_downcast = match part_decode_format {
-            PartDecodeFormat::Row {
-                validate_structured,
-            } => validate_structured,
-            PartDecodeFormat::Arrow => true,
-        };
+        let downcast_structured = |structured: ColumnarRecordsStructuredExt| {
+            let key_size_before = ArrayOrd::new(&structured.key).goodbytes();
 
-        let mut updates = part.normalize(&metrics.columnar);
-        let structured_part = match (updates.structured(), should_downcast) {
-            // Only downcast and create decoders if we have structured data AND
-            // (an audit of the data is requested OR we'd like to decode
-            // directly from the structured data).
-            (Some(structured), true) => {
-                fn decode<C: Codec>(
-                    name: &str,
-                    schema: &C::Schema,
-                    array: &Arc<dyn Array>,
-                ) -> Option<<C::Schema as Schema2<C>>::Decoder> {
-                    match Schema2::decoder_any(schema, array) {
-                        Ok(x) => Some(x),
-                        Err(err) => {
-                            tracing::error!(?err, "failed to create {} decoder", name);
-                            None
-                        }
-                    }
+            let structured = match &migration {
+                PartMigration::SameSchema { .. } => structured,
+                PartMigration::Codec { .. } => {
+                    return None;
                 }
-
-                match &migration {
-                    PartMigration::SameSchema { both } => {
-                        let key_size_before = ArrayOrd::new(&structured.key).goodbytes();
-
-                        let key = decode::<K>("key", &*both.key, &structured.key);
-                        let val = decode::<V>("val", &*both.val, &structured.val);
-
-                        if let Some(key_decoder) = key.as_ref() {
-                            let key_size_after = key_decoder.goodbytes();
-                            let key_diff = key_size_before.saturating_sub(key_size_after);
-                            metrics
-                                .pushdown
-                                .parts_projection_trimmed_bytes
-                                .inc_by(u64::cast_from(key_diff));
-                        }
-
-                        (key, val)
-                    }
-                    PartMigration::Codec { .. } => (None, None),
-                    PartMigration::Either {
-                        write: _write,
-                        read,
-                        key_migration,
-                        val_migration,
-                    } => {
-                        let start = Instant::now();
-                        let key = key_migration.migrate(Arc::clone(&structured.key));
-                        let val = val_migration.migrate(Arc::clone(&structured.val));
-                        metrics
-                            .schema
-                            .migration_migrate_seconds
-                            .inc_by(start.elapsed().as_secs_f64());
-
-                        let key_before_size = ArrayOrd::new(&structured.key).goodbytes();
-                        let key_after_size = ArrayOrd::new(&key).goodbytes();
-                        let key_diff = key_before_size.saturating_sub(key_after_size);
-                        metrics
-                            .pushdown
-                            .parts_projection_trimmed_bytes
-                            .inc_by(u64::cast_from(key_diff));
-
-                        (
-                            decode::<K>("key", &*read.key, &key),
-                            decode::<V>("val", &*read.val, &val),
-                        )
-                    }
+                PartMigration::Either {
+                    write: _,
+                    read: _,
+                    key_migration,
+                    val_migration,
+                } => {
+                    let start = Instant::now();
+                    let key = key_migration.migrate(structured.key);
+                    let val = val_migration.migrate(structured.val);
+                    metrics
+                        .schema
+                        .migration_migrate_seconds
+                        .inc_by(start.elapsed().as_secs_f64());
+                    ColumnarRecordsStructuredExt { key, val }
+                }
+            };
+
+            let read_schema = migration.codec_read();
+            let key = K::Schema::decoder_any(&*read_schema.key, &*structured.key);
+            let val = V::Schema::decoder_any(&*read_schema.val, &*structured.val);
+
+            match &key {
+                Ok(key_decoder) => {
+                    let key_size_after = key_decoder.goodbytes();
+                    let key_diff = key_size_before.saturating_sub(key_size_after);
+                    metrics
+                        .pushdown
+                        .parts_projection_trimmed_bytes
+                        .inc_by(u64::cast_from(key_diff));
+                }
+                Err(e) => {
+                    soft_panic_or_log!("failed to create decoder: {e:#?}");
                 }
             }
-            _ => (None, None),
+
+            Some((key.ok()?, val.ok()?))
         };
 
-        // Fill in the codec data, since decode expects it.
-        // TODO(structured): make reading the codec data optional.
-        let write_schemas = migration.structured_write();
-        let codec_part = updates
-            .get_or_make_codec::<K, V>(write_schemas.key.as_ref(), write_schemas.val.as_ref());
+        let updates = part.normalize(&metrics.columnar);
+        let timestamps = updates.timestamps().clone();
+        let diffs = updates.diffs().clone();
+        let part = match updates {
+            // If only one encoding is available, decode via that encoding.
+            BlobTraceUpdates::Row(records) => EitherOrBoth::Left(records),
+            BlobTraceUpdates::Structured { key_values, .. } => EitherOrBoth::Right(
+                // The structured-only data format was added after schema ids were recorded everywhere,
+                // so we expect this data to be present.
+                downcast_structured(key_values).expect("valid schemas for structured data"),
+            ),
+            // If both are available, respect the specified part decode format.
+            BlobTraceUpdates::Both(records, ext) => match part_decode_format {
+                PartDecodeFormat::Row {
+                    validate_structured: false,
+                } => EitherOrBoth::Left(records),
+                PartDecodeFormat::Row {
+                    validate_structured: true,
+                } => match downcast_structured(ext) {
+                    Some(decoders) => EitherOrBoth::Both(records, decoders),
+                    None => EitherOrBoth::Left(records),
+                },
+                PartDecodeFormat::Arrow => match downcast_structured(ext) {
+                    Some(decoders) => EitherOrBoth::Right(decoders),
+                    None => EitherOrBoth::Left(records),
+                },
+            },
+        };
 
         FetchedPart {
             metrics,
             ts_filter,
-            part: codec_part.clone(),
-            structured_part,
-            part_decode_format,
+            part,
+            timestamps,
+            diffs,
             migration,
             filter_pushdown_audit,
             part_cursor: 0,
@@ -923,10 +913,11 @@ where
         val: &mut Option<V>,
         result_override: Option<(K, V)>,
     ) -> Option<((Result<K, String>, Result<V, String>), T, D)> {
-        while let Some(((k, v), t, d)) = self.part.get(self.part_cursor) {
+        while self.part_cursor < self.timestamps.len() {
+            let (k, v) = self.decode_kv(self.part_cursor, key, val);
+            let mut t = T::decode(self.timestamps.values()[self.part_cursor].to_le_bytes());
+            let mut d = D::decode(self.diffs.values()[self.part_cursor].to_le_bytes());
             self.part_cursor += 1;
-            let mut t = T::decode(t);
-            let mut d = D::decode(d);
 
             if !self.ts_filter.filter_ts(&mut t) {
                 continue;
@@ -934,12 +925,10 @@ where
 
             // If `filter_ts` advances our timestamp, we may end up with the same K, V, T in successive
             // records. If so, opportunistically consolidate those out.
-            while let Some(((k_next, v_next), t_next, d_next)) = self.part.get(self.part_cursor) {
-                if (k, v) != (k_next, v_next) {
-                    break;
-                }
+            while self.part_cursor < self.timestamps.len() {
+                let mut t_next =
+                    T::decode(self.timestamps.values()[self.part_cursor].to_le_bytes());
 
-                let mut t_next = T::decode(t_next);
                 if !self.ts_filter.filter_ts(&mut t_next) {
                     break;
                 }
@@ -948,9 +937,15 @@ where
                     break;
                 }
 
+                let (k_next, v_next) = self.decode_kv(self.part_cursor, key, val);
+                if (&k, &v) != (&k_next, &v_next) {
+                    break;
+                }
+
                 // All equal... consolidate!
+                let d_next = D::decode(self.diffs.values()[self.part_cursor].to_le_bytes());
                 self.part_cursor += 1;
-                d.plus_equals(&D::decode(d_next));
+                d.plus_equals(&d_next);
             }
 
             // If multiple updates consolidate out entirely, drop the record.
@@ -962,39 +957,39 @@ where
                 return Some(((Ok(key), Ok(val)), t, d));
             }
 
-            // TODO: Putting this here relies on the Codec data still being
-            // populated (i.e. for the consolidate optimization above).
-            // Eventually we'll have to rewrite this path to work entirely
-            // without Codec data, but in the meantime, putting in here allows
-            // us to see the performance impact of decoding from arrow instead
-            // of Codec.
-            //
-            // Plus, it'll likely be easier to port all the logic here to work
-            // solely on arrow data once we finish migrating things like the
-            // ConsolidatingIter.
-            if let ((Some(keys), Some(vals)), PartDecodeFormat::Arrow) =
-                (&self.structured_part, self.part_decode_format)
-            {
-                let (k, v) = self.decode_structured(self.part_cursor - 1, keys, vals, key, val);
-                return Some(((k, v), t, d));
-            }
+            return Some(((k, v), t, d));
+        }
+        None
+    }
 
-            let (k, v) = Self::decode_codec(
-                &self.metrics,
-                self.migration.codec_read(),
-                k,
-                v,
-                key,
-                val,
-                &mut self.key_storage,
-                &mut self.val_storage,
-            );
-            // Note: We only provide structured columns, if they were originally written, and a
-            // dyncfg was specified to run validation.
-            if let (Some(keys), Some(vals)) = &self.structured_part {
-                let (k_s, v_s) =
-                    self.decode_structured(self.part_cursor - 1, keys, vals, &mut None, &mut None);
+    fn decode_kv(
+        &mut self,
+        index: usize,
+        key: &mut Option<K>,
+        val: &mut Option<V>,
+    ) -> (Result<K, String>, Result<V, String>) {
+        let decoded = self
+            .part
+            .as_ref()
+            .map_left(|codec| {
+                let ((ck, cv), _, _) = codec.get(index).expect("valid index");
+                Self::decode_codec(
+                    &*self.metrics,
+                    self.migration.codec_read(),
+                    ck,
+                    cv,
+                    key,
+                    val,
+                    &mut self.key_storage,
+                    &mut self.val_storage,
+                )
+            })
+            .map_right(|(structured_key, structured_val)| {
+                self.decode_structured(index, structured_key, structured_val, key, val)
+            });
+        match decoded {
+            EitherOrBoth::Both((k, v), (k_s, v_s)) => {
                 // Purposefully do not trace to prevent blowing up Sentry.
                 let is_valid = self
                     .metrics
@@ -1015,11 +1010,12 @@ where
                 if !is_valid {
                     soft_panic_no_log!("structured val did not match, {v_s:?} != {v:?}");
                 }
-            }
 
-            return Some(((k, v), t, d));
+                (k, v)
+            }
+            EitherOrBoth::Left(kv) => kv,
+            EitherOrBoth::Right(kv) => kv,
         }
-        None
     }
 
     fn decode_codec(
@@ -1090,7 +1086,7 @@ where
     fn size_hint(&self) -> (usize, Option<usize>) {
         // We don't know in advance how restrictive the filter will be.
-        let max_len = self.part.len();
+        let max_len = self.timestamps.len();
         (0, Some(max_len))
     }
 }
 
diff --git a/src/persist-client/src/schema.rs b/src/persist-client/src/schema.rs
index 8d6aa5e43b80a..fed263f07167b 100644
--- a/src/persist-client/src/schema.rs
+++ b/src/persist-client/src/schema.rs
@@ -402,16 +402,6 @@ impl<K: Codec, V: Codec> PartMigration<K, V> {
             PartMigration::Either { read, .. } => read,
         }
     }
-
-    pub(crate) fn structured_write(&self) -> &Schemas<K, V> {
-        match self {
-            PartMigration::SameSchema { both } => both,
-            // We should have a write schema set for all parts that are written with the structured-
-            // only parquet format.
-            PartMigration::Codec { read } => read,
-            PartMigration::Either { write, .. } => write,
-        }
-    }
 }
 
 /// Returns if `new` is at least as nullable as `old`.
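A note on the central pattern: `decode_kv` may have the codec-encoded records, the structured decoders, or both, and `itertools::EitherOrBoth` models exactly those three states. Below is a minimal, self-contained sketch of that dispatch, not part of the patch; the `Vec<u8>`/`Vec<i64>` columns and the inline closures stand in for the real `decode_codec`/`decode_structured` paths.

```rust
use itertools::EitherOrBoth;

// Decode the value at `index`, using whichever encodings are present.
fn decode(part: &EitherOrBoth<Vec<u8>, Vec<i64>>, index: usize) -> i64 {
    let decoded = part
        .as_ref()
        // Stand-in for `decode_codec`: read from the row-oriented encoding.
        .map_left(|codec| i64::from(codec[index]))
        // Stand-in for `decode_structured`: read from the columnar encoding.
        .map_right(|structured| structured[index]);
    match decoded {
        // Both encodings present: validate one against the other.
        EitherOrBoth::Both(row, col) => {
            assert_eq!(row, col, "structured data did not match");
            row
        }
        EitherOrBoth::Left(row) => row,
        EitherOrBoth::Right(col) => col,
    }
}

fn main() {
    let part = EitherOrBoth::Both(vec![1u8, 2, 3], vec![1i64, 2, 3]);
    assert_eq!(decode(&part, 1), 2);
}
```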
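Timestamps and diffs are now read directly out of the part's arrow `Int64Array`s rather than out of `ColumnarRecords`. A small sketch of that access pattern, with `u64::from_le_bytes` standing in for the `Codec64::decode` call the patch uses:

```rust
use arrow::array::{Array, Int64Array};

fn main() {
    // Timestamps as they would appear in a part's columnar data.
    let timestamps = Int64Array::from(vec![10i64, 20, 30]);
    let mut cursor = 0;
    while cursor < timestamps.len() {
        // Mirrors `T::decode(self.timestamps.values()[self.part_cursor].to_le_bytes())`:
        // reinterpret the raw i64 as the little-endian bytes of the timestamp type.
        let t = u64::from_le_bytes(timestamps.values()[cursor].to_le_bytes());
        println!("timestamp: {t}");
        cursor += 1;
    }
}
```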
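The opportunistic-consolidation loop in `next` keeps its old behavior: consecutive updates with an equal key/value and (post-filter) timestamp have their diffs summed with `Semigroup::plus_equals`, and records whose diffs sum to zero are dropped. A hypothetical standalone version of that logic over already-decoded `(key, time, diff)` triples:

```rust
use differential_dataflow::difference::Semigroup;

// Consolidate consecutive updates that share a key and timestamp.
fn consolidate(updates: &[(&'static str, u64, i64)]) -> Vec<(&'static str, u64, i64)> {
    let mut out = Vec::new();
    let mut cursor = 0;
    while cursor < updates.len() {
        let (k, t, mut d) = updates[cursor];
        cursor += 1;
        // Sum the diffs of any immediately-following equal (key, time) pairs.
        while cursor < updates.len() {
            let (k_next, t_next, d_next) = updates[cursor];
            if (k, t) != (k_next, t_next) {
                break;
            }
            cursor += 1;
            d.plus_equals(&d_next);
        }
        // If the updates consolidate out entirely, drop the record.
        if d != 0 {
            out.push((k, t, d));
        }
    }
    out
}

fn main() {
    let updates = [("a", 1, 1), ("a", 1, 1), ("b", 1, 1), ("b", 1, -1)];
    assert_eq!(consolidate(&updates), vec![("a", 1, 2)]);
}
```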