From 43bc2c247b17da6fa224a2f3124cc0c44f2fd154 Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Mon, 20 Jan 2025 14:29:26 -0500 Subject: [PATCH] Refactor FetchedPart to avoid as much decoding as possible In particular, don't decode K/Vs when we have an override set, and make sure every K/V only gets decoded once. --- src/persist-client/src/fetch.rs | 80 +++++++++++++++++---------------- 1 file changed, 42 insertions(+), 38 deletions(-) diff --git a/src/persist-client/src/fetch.rs b/src/persist-client/src/fetch.rs index b8810e8125b82..7d294d6eab23d 100644 --- a/src/persist-client/src/fetch.rs +++ b/src/persist-client/src/fetch.rs @@ -736,6 +736,7 @@ pub struct FetchedPart { diffs: Int64Array, migration: PartMigration, filter_pushdown_audit: Option, + peek_stash: Option<((Result, Result), T, D)>, part_cursor: usize, key_storage: Option, val_storage: Option, @@ -864,6 +865,7 @@ impl FetchedPart, result_override: Option<(K, V)>, ) -> Option<((Result, Result), T, D)> { - while self.part_cursor < self.timestamps.len() { - let (k, v) = self.decode_kv(self.part_cursor, key, val); - let mut t = T::decode(self.timestamps.values()[self.part_cursor].to_le_bytes()); - let mut d = D::decode(self.diffs.values()[self.part_cursor].to_le_bytes()); - self.part_cursor += 1; - - if !self.ts_filter.filter_ts(&mut t) { - continue; - } - - // If `filter_ts` advances our timestamp, we may end up with the same K, V, T in successive - // records. If so, opportunistically consolidate those out. - while self.part_cursor < self.timestamps.len() { - let mut t_next = - T::decode(self.timestamps.values()[self.part_cursor].to_le_bytes()); - - if !self.ts_filter.filter_ts(&mut t_next) { - break; + let mut consolidated = self.peek_stash.take(); + loop { + // Fetch and decode the next tuple in the sequence. (Or break if there is none.) + let next = if self.part_cursor < self.timestamps.len() { + let next_idx = self.part_cursor; + self.part_cursor += 1; + let mut t = T::decode(self.timestamps.values()[next_idx].to_le_bytes()); + if !self.ts_filter.filter_ts(&mut t) { + continue; } - - if t != t_next { - break; + let d = D::decode(self.diffs.values()[next_idx].to_le_bytes()); + if d.is_zero() { + continue; } + let kv = if result_override.is_none() { + self.decode_kv(next_idx, key, val) + } else { + // This will be overridden later - just leave a placeholder here for now. + (Err("".to_string()), Err("".to_string())) + }; + (kv, t, d) + } else { + break; + }; - let (k_next, v_next) = self.decode_kv(self.part_cursor, key, val); - if (&k, &v) != (&k_next, &v_next) { + // Attempt to consolidate in the next tuple, stashing it if that's not possible. + if let Some((kv, t, d)) = &mut consolidated { + let (kv_next, t_next, d_next) = &next; + if kv == kv_next && t == t_next { + d.plus_equals(d_next); + if d.is_zero() { + consolidated = None; + } + } else { + self.peek_stash = Some(next); break; } - - // All equal... consolidate! - let d_next = D::decode(self.diffs.values()[self.part_cursor].to_le_bytes()); - self.part_cursor += 1; - d.plus_equals(&d_next); - } - - // If multiple updates consolidate out entirely, drop the record. - if d.is_zero() { - continue; + } else { + consolidated = Some(next); } + } - if let Some((key, val)) = result_override { - return Some(((Ok(key), Ok(val)), t, d)); - } + let (kv, t, d) = consolidated?; - return Some(((k, v), t, d)); + if let Some((key, val)) = result_override { + return Some(((Ok(key), Ok(val)), t, d)); } - None + + Some((kv, t, d)) } fn decode_kv(