Skip to content

Commit

Permalink
Refactor FetchedPart to avoid as much decoding as possible
Browse files Browse the repository at this point in the history
In particular, don't decode K/Vs when we have an override set, and make
sure every K/V only gets decoded once.
  • Loading branch information
bkirwi committed Jan 21, 2025
1 parent 6078362 commit e60157d
Showing 1 changed file with 42 additions and 38 deletions.
80 changes: 42 additions & 38 deletions src/persist-client/src/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,7 @@ pub struct FetchedPart<K: Codec, V: Codec, T, D> {
diffs: Int64Array,
migration: PartMigration<K, V>,
filter_pushdown_audit: Option<LazyPartStats>,
peek_stash: Option<((Result<K, String>, Result<V, String>), T, D)>,
part_cursor: usize,
key_storage: Option<K::Storage>,
val_storage: Option<V::Storage>,
Expand Down Expand Up @@ -864,6 +865,7 @@ impl<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> FetchedPart<K, V,
metrics,
ts_filter,
part,
peek_stash: None,
timestamps,
diffs,
migration,
Expand Down Expand Up @@ -913,53 +915,55 @@ where
val: &mut Option<V>,
result_override: Option<(K, V)>,
) -> Option<((Result<K, String>, Result<V, String>), T, D)> {
while self.part_cursor < self.timestamps.len() {
let (k, v) = self.decode_kv(self.part_cursor, key, val);
let mut t = T::decode(self.timestamps.values()[self.part_cursor].to_le_bytes());
let mut d = D::decode(self.diffs.values()[self.part_cursor].to_le_bytes());
self.part_cursor += 1;

if !self.ts_filter.filter_ts(&mut t) {
continue;
}

// If `filter_ts` advances our timestamp, we may end up with the same K, V, T in successive
// records. If so, opportunistically consolidate those out.
while self.part_cursor < self.timestamps.len() {
let mut t_next =
T::decode(self.timestamps.values()[self.part_cursor].to_le_bytes());

if !self.ts_filter.filter_ts(&mut t_next) {
break;
let mut consolidated = self.peek_stash.take();
loop {
// Fetch and decode the next tuple in the sequence. (Or break if there is none.)
let next = if self.part_cursor < self.timestamps.len() {
let next_idx = self.part_cursor;
self.part_cursor += 1;
let mut t = T::decode(self.timestamps.values()[next_idx].to_le_bytes());
if !self.ts_filter.filter_ts(&mut t) {
continue;
}

if t != t_next {
break;
let d = D::decode(self.diffs.values()[next_idx].to_le_bytes());
if d.is_zero() {
continue;
}
let kv = if result_override.is_none() {
self.decode_kv(next_idx, key, val)
} else {
// This will be overridden later - just leave a placeholder here for now.
(Err("".to_string()), Err("".to_string()))
};
(kv, t, d)
} else {
break;
};

let (k_next, v_next) = self.decode_kv(self.part_cursor, key, val);
if (&k, &v) != (&k_next, &v_next) {
// Attempt to consolidate in the next tuple, stashing it if that's not possible.
if let Some((kv, t, d)) = &mut consolidated {
let (kv_next, t_next, d_next) = &next;
if kv == kv_next && t == t_next {
d.plus_equals(d_next);
if d.is_zero() {
consolidated = None;
}
} else {
self.peek_stash = Some(next);
break;
}

// All equal... consolidate!
let d_next = D::decode(self.diffs.values()[self.part_cursor].to_le_bytes());
self.part_cursor += 1;
d.plus_equals(&d_next);
}

// If multiple updates consolidate out entirely, drop the record.
if d.is_zero() {
continue;
} else {
consolidated = Some(next);
}
}

if let Some((key, val)) = result_override {
return Some(((Ok(key), Ok(val)), t, d));
}
let (kv, t, d) = consolidated?;

return Some(((k, v), t, d));
if let Some((key, val)) = result_override {
return Some(((Ok(key), Ok(val)), t, d));
}
None

Some((kv, t, d))
}

fn decode_kv(
Expand Down

0 comments on commit e60157d

Please sign in to comment.