Normalize away the encoded part
We built this for the streaming iterator, but it works well here too.
bkirwi committed Jan 21, 2025
1 parent 74c5052 commit 27438aa
Showing 3 changed files with 47 additions and 126 deletions.
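The diff replaces the hand-rolled `Cursor` iteration with `EncodedPart::normalize(&metrics.columnar)`: call sites now index the returned `ColumnarRecords` directly with a plain `usize` cursor instead of popping a cursor over the encoded part. A minimal, self-contained sketch of the new call-site shape — the `Records` type and sample data below are stand-ins for the crate's `ColumnarRecords`, not its real API:

// Stand-in for the records returned by `EncodedPart::normalize`: each entry is
// ((key bytes, val bytes), timestamp bytes, diff bytes), with t and d kept in
// their Codec64 little-endian [u8; 8] encodings.
struct Records(Vec<((Vec<u8>, Vec<u8>), [u8; 8], [u8; 8])>);

impl Records {
    fn get(&self, idx: usize) -> Option<((&[u8], &[u8]), [u8; 8], [u8; 8])> {
        self.0
            .get(idx)
            .map(|((k, v), t, d)| ((k.as_slice(), v.as_slice()), *t, *d))
    }

    fn len(&self) -> usize {
        self.0.len()
    }
}

fn main() {
    let records = Records(vec![(
        (b"k0".to_vec(), b"v0".to_vec()),
        1u64.to_le_bytes(),
        1i64.to_le_bytes(),
    )]);

    // New call-site shape: a plain usize cursor into the normalized records,
    // decoding t and d from their little-endian (Codec64) encodings.
    let mut cursor = 0;
    while let Some(((k, v), t, d)) = records.get(cursor) {
        cursor += 1;
        let t = u64::from_le_bytes(t);
        let d = i64::from_le_bytes(d);
        println!("{:?} {:?} {} {}", k, v, t, d);
    }
    assert_eq!(cursor, records.len());
}

Because timestamps and diffs stay Codec64-encoded, the updated call sites in `inspect.rs` and `machine.rs` decode them with `u64::from_le_bytes` or `Codec64::decode` as they iterate.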
17 changes: 11 additions & 6 deletions src/persist-client/src/cli/inspect.rs
@@ -37,7 +37,7 @@ use crate::async_runtime::IsolatedRuntime;
use crate::cache::StateCache;
use crate::cli::args::{make_blob, make_consensus, StateArgs, NO_COMMIT, READ_ALL_BUILD_INFO};
use crate::error::CodecConcreteType;
use crate::fetch::{Cursor, EncodedPart};
use crate::fetch::EncodedPart;
use crate::internal::encoding::{Rollup, UntypedState};
use crate::internal::paths::{
BlobKey, BlobKeyPrefix, PartialBatchKey, PartialBlobKey, PartialRollupKey, WriterKey,
@@ -361,15 +361,19 @@ pub async fn blob_batch_part(
desc,
updates: Vec::new(),
};
let mut cursor = Cursor::default();
while let Some(((k, v, t, d), _)) = cursor.pop(&encoded_part) {
let records = encoded_part.normalize(&metrics.columnar);
for ((k, v), t, d) in records
.records()
.expect("only implemented for records")
.iter()
{
if out.updates.len() > limit {
break;
}
out.updates.push(BatchPartUpdate {
k: format!("{:?}", PrettyBytes(k)),
v: format!("{:?}", PrettyBytes(v)),
t,
t: u64::from_le_bytes(t),
d: i64::from_le_bytes(d),
});
}
@@ -408,8 +412,9 @@ async fn consolidated_size(args: &StateArgs) -> Result<(), anyhow::Error> {
)
.await
.expect("part exists");
let mut cursor = Cursor::default();
while let Some(((k, v, mut t, d), _)) = cursor.pop(&encoded_part) {
let part = encoded_part.normalize(&state_versions.metrics.columnar);
for ((k, v), t, d) in part.records().expect("codec records").iter() {
let mut t = <u64 as Codec64>::decode(t);
t.advance_by(as_of);
let d = <i64 as Codec64>::decode(d);
updates.push(((k.to_owned(), v.to_owned()), t, d));
144 changes: 29 additions & 115 deletions src/persist-client/src/fetch.rs
@@ -722,7 +722,7 @@ impl<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> FetchedBlob<K, V,
pub struct FetchedPart<K: Codec, V: Codec, T, D> {
metrics: Arc<Metrics>,
ts_filter: FetchBatchFilter<T>,
part: EncodedPart<T>,
part: ColumnarRecords,
// If migration is Either, then the columnar one will have already been
// applied here.
structured_part: (
@@ -732,7 +732,7 @@ pub struct FetchedPart<K: Codec, V: Codec, T, D> {
part_decode_format: PartDecodeFormat,
migration: PartMigration<K, V>,
filter_pushdown_audit: Option<LazyPartStats>,
part_cursor: Cursor,
part_cursor: usize,
key_storage: Option<K::Storage>,
val_storage: Option<V::Storage>,

@@ -742,7 +742,7 @@ impl<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> FetchedPart<K, V, T, D> {
impl<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> FetchedPart<K, V, T, D> {
fn new(
metrics: Arc<Metrics>,
mut part: EncodedPart<T>,
part: EncodedPart<T>,
migration: PartMigration<K, V>,
ts_filter: FetchBatchFilter<T>,
filter_pushdown_audit: bool,
@@ -793,11 +793,13 @@ impl<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> FetchedPart<K, V,
} => validate_structured,
PartDecodeFormat::Arrow => true,
};
let structured_part = match (&part.part.updates, should_downcast) {

let mut updates = part.normalize(&metrics.columnar);
let structured_part = match (updates.structured(), should_downcast) {
// Only downcast and create decoders if we have structured data AND
// (an audit of the data is requested OR we'd like to decode
// directly from the structured data).
(BlobTraceUpdates::Both(_codec, structured), true) => {
(Some(structured), true) => {
fn decode<C: Codec>(
name: &str,
schema: &C::Schema,
@@ -865,19 +867,18 @@ impl<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> FetchedPart<K, V,
// Fill in the codec data, since decode expects it.
// TODO(structured): make reading the codec data optional.
let write_schemas = migration.structured_write();
part.part
.updates
let codec_part = updates
.get_or_make_codec::<K, V>(write_schemas.key.as_ref(), write_schemas.val.as_ref());

FetchedPart {
metrics,
ts_filter,
part,
part: codec_part.clone(),
structured_part,
part_decode_format,
migration,
filter_pushdown_audit,
part_cursor: Cursor::default(),
part_cursor: 0,
key_storage: None,
val_storage: None,
_phantom: PhantomData,
@@ -922,30 +923,33 @@ where
val: &mut Option<V>,
result_override: Option<(K, V)>,
) -> Option<((Result<K, String>, Result<V, String>), T, D)> {
while let Some(((k, v, mut t, d), idx)) = self.part_cursor.pop(&self.part) {
while let Some(((k, v), t, d)) = self.part.get(self.part_cursor) {
self.part_cursor += 1;
let mut t = T::decode(t);
let mut d = D::decode(d);

if !self.ts_filter.filter_ts(&mut t) {
continue;
}

let mut d = D::decode(d);

// If `filter_ts` advances our timestamp, we may end up with the same K, V, T in successive
// records. If so, opportunistically consolidate those out.
while let Some((k_next, v_next, mut t_next, d_next)) = self.part_cursor.peek(&self.part)
{
while let Some(((k_next, v_next), t_next, d_next)) = self.part.get(self.part_cursor) {
if (k, v) != (k_next, v_next) {
break;
}

let mut t_next = T::decode(t_next);
if !self.ts_filter.filter_ts(&mut t_next) {
break;
}

if t != t_next {
break;
}

// All equal... consolidate!
self.part_cursor.idx += 1;
self.part_cursor += 1;
d.plus_equals(&D::decode(d_next));
}

@@ -971,7 +975,7 @@ where
if let ((Some(keys), Some(vals)), PartDecodeFormat::Arrow) =
(&self.structured_part, self.part_decode_format)
{
let (k, v) = self.decode_structured(idx, keys, vals, key, val);
let (k, v) = self.decode_structured(self.part_cursor - 1, keys, vals, key, val);
return Some(((k, v), t, d));
}

@@ -988,7 +992,8 @@ where
// Note: We only provide structured columns, if they were originally written, and a
// dyncfg was specified to run validation.
if let (Some(keys), Some(vals)) = &self.structured_part {
let (k_s, v_s) = self.decode_structured(idx, keys, vals, &mut None, &mut None);
let (k_s, v_s) =
self.decode_structured(self.part_cursor - 1, keys, vals, &mut None, &mut None);

// Purposefully do not trace to prevent blowing up Sentry.
let is_valid = self
@@ -1085,7 +1090,7 @@ where

fn size_hint(&self) -> (usize, Option<usize>) {
// We don't know in advance how restrictive the filter will be.
let max_len = self.part.part.updates.len();
let max_len = self.part.len();
(0, Some(max_len))
}
}
@@ -1294,6 +1299,12 @@ where
timestamps = realloc_array(&timestamps, metrics);
}

if self.ts_rewrite.is_some() {
self.metrics
.ts_rewrite
.inc_by(u64::cast_from(timestamps.len()));
}

match (codec, structured) {
(Some((key, value)), None) => {
BlobTraceUpdates::Row(ColumnarRecords::new(key, value, timestamps, diffs))
@@ -1311,103 +1322,6 @@
}
}

/// A pointer into a particular encoded part, with methods for fetching an update and
/// scanning forward to the next. It is an error to use the same cursor for distinct
/// parts.
///
/// We avoid implementing copy to make it hard to accidentally duplicate a cursor. However,
/// clone is very cheap.
#[derive(Debug, Clone, Default)]
pub(crate) struct Cursor {
idx: usize,
}

impl Cursor {
/// Get the tuple at the specified pair of indices. If there is no such tuple,
/// either because we are out of range or because this tuple has been filtered out,
/// this returns `None`.
pub fn get<'a, T: Timestamp + Lattice + Codec64>(
&self,
encoded: &'a EncodedPart<T>,
) -> Option<(&'a [u8], &'a [u8], T, [u8; 8])> {
// TODO(structured): replace before allowing structured-only parts
let part = encoded
.part
.updates
.records()
.expect("created cursor for non-codec data");
let ((k, v), t, d) = part.get(self.idx)?;

let mut t = T::decode(t);
// We assert on the write side that at most one of rewrite or
// truncation is used, so it shouldn't matter which is run first.
//
// That said, my (Dan's) intuition here is that rewrite goes first,
// though I don't particularly have a justification for it.
if let Some(ts_rewrite) = encoded.ts_rewrite.as_ref() {
t.advance_by(ts_rewrite.borrow());
encoded.metrics.ts_rewrite.inc();
}

// This filtering is really subtle, see the comment above for
// what's going on here.
let truncated_t = encoded.needs_truncation && {
!encoded.registered_desc.lower().less_equal(&t)
|| encoded.registered_desc.upper().less_equal(&t)
};
if truncated_t {
return None;
}
Some((k, v, t, d))
}

/// A cursor points to a particular update in the backing part data.
/// If the update it points to is not valid, advance it to the next valid update
/// if there is one, and return the pointed-to data.
pub fn peek<'a, T: Timestamp + Lattice + Codec64>(
&mut self,
part: &'a EncodedPart<T>,
) -> Option<(&'a [u8], &'a [u8], T, [u8; 8])> {
while !self.is_exhausted(part) {
let current = self.get(part);
if current.is_some() {
return current;
}
self.advance(part);
}
None
}

/// Similar to peek, but advance the cursor just past the end of the most recent update.
/// Returns the update and the `(part_idx, idx)` that it was popped at.
pub fn pop<'a, T: Timestamp + Lattice + Codec64>(
&mut self,
part: &'a EncodedPart<T>,
) -> Option<((&'a [u8], &'a [u8], T, [u8; 8]), usize)> {
while !self.is_exhausted(part) {
let current = self.get(part);
let popped_idx = self.idx;
self.advance(part);
if current.is_some() {
return current.map(|p| (p, popped_idx));
}
}
None
}

/// Returns true if the cursor is past the end of the part data.
pub fn is_exhausted<T: Timestamp + Codec64>(&self, part: &EncodedPart<T>) -> bool {
self.idx >= part.part.updates.len()
}

/// Advance the cursor just past the end of the most recent update, if there is one.
pub fn advance<T: Timestamp + Codec64>(&mut self, part: &EncodedPart<T>) {
if !self.is_exhausted(part) {
self.idx += 1;
}
}
}

/// This represents the serde encoding for [`LeasedBatchPart`]. We expose the struct
/// itself (unlike other encodable structs) to attempt to provide stricter drop
/// semantics on `LeasedBatchPart`, i.e. `SerdeLeasedBatchPart` is exchangeable
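In the reworked `fetch_next` above, the cursor is now a plain index, and when the timestamp filter advances successive records to the same (key, value, time) the loop opportunistically sums their diffs before yielding. Below is a self-contained sketch of that adjacent-consolidation step over plain, already-decoded tuples; it omits the timestamp filtering and `Codec64` decoding that the real code performs:

// Consolidate adjacent records sharing (key, value, timestamp) by summing
// their diffs, mirroring the opportunistic consolidation in `fetch_next`.
fn consolidate_adjacent(
    records: &[((&str, &str), u64, i64)],
) -> Vec<((String, String), u64, i64)> {
    let mut out = Vec::new();
    let mut cursor = 0;
    while let Some(&((k, v), t, d)) = records.get(cursor) {
        cursor += 1;
        let mut d = d;
        // Peek ahead: while the next record matches on key, value, and time,
        // fold its diff in and advance past it.
        while let Some(&((k_next, v_next), t_next, d_next)) = records.get(cursor) {
            if (k, v) != (k_next, v_next) || t != t_next {
                break;
            }
            cursor += 1;
            d += d_next;
        }
        out.push(((k.to_owned(), v.to_owned()), t, d));
    }
    out
}

fn main() {
    let records = [(("a", "x"), 1u64, 1i64), (("a", "x"), 1, 1), (("b", "y"), 2, 1)];
    let consolidated = consolidate_adjacent(&records);
    // The two adjacent ("a", "x") @ 1 records collapse into one with diff 2.
    println!("{:?}", consolidated);
    assert_eq!(consolidated.len(), 2);
    assert_eq!(consolidated[0].2, 2);
}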
12 changes: 7 additions & 5 deletions src/persist-client/src/internal/machine.rs
@@ -1422,7 +1422,7 @@ pub mod datadriven {
BatchParts, BLOB_TARGET_SIZE, BUILDER_STRUCTURED, STRUCTURED_ORDER,
};
use crate::cfg::COMPACTION_MEMORY_BOUND_BYTES;
use crate::fetch::{Cursor, EncodedPart};
use crate::fetch::EncodedPart;
use crate::internal::compact::{CompactConfig, CompactReq, Compactor};
use crate::internal::datadriven::DirectiveArgs;
use crate::internal::encoding::Schemas;
@@ -1894,9 +1894,10 @@ pub mod datadriven {
)
.await
.expect("invalid batch part");
let mut cursor = Cursor::default();
while let Some(((k, _v, t, d), _)) = cursor.pop(&part) {
let part = part.normalize(&datadriven.client.metrics.columnar);
for ((k, _v), t, d) in part.records().expect("codec records").iter() {
let (k, d) = (String::decode(k, &StringSchema).unwrap(), i64::decode(d));
let t = u64::from_le_bytes(t);
write!(s, "{k} {t} {d}\n");
}
}
@@ -2148,10 +2149,11 @@
)
.await
.expect("invalid batch part");
let part = part.normalize(&datadriven.client.metrics.columnar);

let mut updates = Vec::new();
let mut cursor = Cursor::default();
while let Some(((k, _v, mut t, d), _)) = cursor.pop(&part) {
for ((k, _v), t, d) in part.records().expect("codec data").iter() {
let mut t = u64::decode(t);
t.advance_by(as_of.borrow());
updates.push((
String::decode(k, &StringSchema).unwrap(),