From b6c1b0a415d4f3e601f72b3b7dbd8fa012945fa9 Mon Sep 17 00:00:00 2001
From: Frank McSherry
Date: Thu, 23 Jan 2025 09:12:12 -0500
Subject: [PATCH] Slow the rate of growth in `ConsolidatingVec` (#31077)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The growth of `ConsolidatingVec` was by doubling, which meant that we could
end up with substantially more memory use than we really need. This PR makes
the growth configurable: the vector grows by a smaller factor determined by a
positive integer `slop`. If the result of consolidation is (1 - 1/slop)-full,
we grow the vector by a factor of 1 + 1/(slop - 1). These relationships were
chosen to match the 2x behavior, but it's possible that they are imperfect.
In the code this is exposed as the dyncfg `consolidating_vec_growth_dampener`,
where `slop = growth_dampener + 2`; the default of `0` keeps the doubling
behavior. A small self-contained sketch of the growth rule appears after the
diff below.

I'm not certain how to test the impact of this, or whether we have
"MV rehydration memory" tests, but those would be the right way to see whether
this is helpful. It comes at the potential expense of CPU.

### Motivation

### Tips for reviewer

### Checklist

- [ ] This PR has adequate test coverage / QA involvement has been duly considered. ([trigger-ci for additional test/nightly runs](https://trigger-ci.dev.materialize.com/))
- [ ] This PR has an associated up-to-date [design doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md), is a design doc ([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)), or is sufficiently small to not require a design.
- [ ] If this PR evolves [an existing `$T ⇔ Proto$T` mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md) (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label.
- [ ] If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label ([example](https://github.com/MaterializeInc/cloud/pull/5021)).
- [ ] If this PR includes major [user-facing behavior changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note), I have pinged the relevant PM to schedule a changelog post.
---
 src/compute-types/src/dyncfgs.rs             |  9 ++++
 src/compute/src/render/continual_task.rs     |  2 +-
 src/compute/src/sink/correction.rs           | 45 +++++++++++++++-----
 src/compute/src/sink/materialized_view.rs    | 14 +++++-
 src/compute/src/sink/materialized_view_v2.rs | 19 +++++++--
 5 files changed, 72 insertions(+), 17 deletions(-)

diff --git a/src/compute-types/src/dyncfgs.rs b/src/compute-types/src/dyncfgs.rs
index d862efadcae86..b952c5528ea9f 100644
--- a/src/compute-types/src/dyncfgs.rs
+++ b/src/compute-types/src/dyncfgs.rs
@@ -99,6 +99,14 @@ pub const LGALLOC_SLOW_CLEAR_BYTES: Config<usize> = Config::new(
     "Clear byte size per size class for every invocation",
 );
 
+/// The term `n` in the growth rate `1 + 1/(n + 1)` for `ConsolidatingVec`.
+/// The smallest value `0` corresponds to the greatest allowed growth, of doubling.
+pub const CONSOLIDATING_VEC_GROWTH_DAMPENER: Config<usize> = Config::new(
+    "consolidating_vec_growth_dampener",
+    0,
+    "Dampener in growth rate for consolidating vector size",
+);
+
 /// The number of dataflows that may hydrate concurrently.
 pub const HYDRATION_CONCURRENCY: Config<usize> = Config::new(
     "compute_hydration_concurrency",
@@ -179,4 +187,5 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
         .add(&ENABLE_COMPUTE_REPLICA_EXPIRATION)
         .add(&COMPUTE_REPLICA_EXPIRATION_OFFSET)
         .add(&COMPUTE_APPLY_COLUMN_DEMANDS)
+        .add(&CONSOLIDATING_VEC_GROWTH_DAMPENER)
 }
diff --git a/src/compute/src/render/continual_task.rs b/src/compute/src/render/continual_task.rs
index 16be4f1557024..7c336c0cfd8f9 100644
--- a/src/compute/src/render/continual_task.rs
+++ b/src/compute/src/render/continual_task.rs
@@ -670,7 +670,7 @@ impl SinkState {
         SinkState {
             append_times: BTreeSet::new(),
             append_times_progress: Antichain::from_elem(Timestamp::minimum()),
-            to_append: ConsolidatingVec::with_min_capacity(128),
+            to_append: ConsolidatingVec::new(128, 0),
             to_append_progress: Antichain::from_elem(Timestamp::minimum()),
             output_progress: Antichain::from_elem(Timestamp::minimum()),
         }
diff --git a/src/compute/src/sink/correction.rs b/src/compute/src/sink/correction.rs
index aff9b8e433e9a..a59a9a68bc32c 100644
--- a/src/compute/src/sink/correction.rs
+++ b/src/compute/src/sink/correction.rs
@@ -47,17 +47,24 @@ pub(super) struct Correction {
     metrics: SinkMetrics,
     /// Per-worker persist sink metrics.
     worker_metrics: SinkWorkerMetrics,
+    /// Configuration for `ConsolidatingVec`, driving the growth rate down from doubling.
+    growth_dampener: usize,
 }
 
 impl Correction {
     /// Construct a new `Correction` instance.
-    pub fn new(metrics: SinkMetrics, worker_metrics: SinkWorkerMetrics) -> Self {
+    pub fn new(
+        metrics: SinkMetrics,
+        worker_metrics: SinkWorkerMetrics,
+        growth_dampener: usize,
+    ) -> Self {
         Self {
             updates: Default::default(),
             since: Antichain::from_elem(Timestamp::MIN),
             total_size: Default::default(),
             metrics,
             worker_metrics,
+            growth_dampener,
         }
     }
 
@@ -125,7 +132,8 @@ impl Correction {
         use std::collections::btree_map::Entry;
         match self.updates.entry(time) {
             Entry::Vacant(entry) => {
-                let vec: ConsolidatingVec<_> = data.collect();
+                let mut vec: ConsolidatingVec<_> = data.collect();
+                vec.growth_dampener = self.growth_dampener;
                 new_size += (vec.len(), vec.capacity());
                 entry.insert(vec);
             }
@@ -304,13 +312,21 @@ pub(crate) struct ConsolidatingVec {
     /// A lower bound for how small we'll shrink the Vec's capacity. NB: The cap
     /// might start smaller than this.
     min_capacity: usize,
+    /// Dampener in the growth rate. 0 corresponds to doubling and in general `n` to `1 + 1/(n+1)`.
+    ///
+    /// If consolidation didn't free enough space (at least a linear amount), we increase the capacity.
+    /// Setting this to 0 results in doubling whenever the list is at least half full.
+    /// Larger numbers result in more conservative approaches that use more CPU, but less memory.
+    growth_dampener: usize,
 }
 
 impl ConsolidatingVec {
-    pub fn with_min_capacity(min_capacity: usize) -> Self {
+    /// Creates a new instance from the necessary configuration arguments.
+    pub fn new(min_capacity: usize, growth_dampener: usize) -> Self {
         ConsolidatingVec {
             data: Vec::new(),
             min_capacity,
+            growth_dampener,
         }
     }
 
@@ -326,21 +342,27 @@ impl ConsolidatingVec {
     /// Pushes `item` into the vector.
     ///
-    /// If the vector does not have sufficient capacity, we try to consolidate and/or double its
-    /// capacity.
+    /// If the vector does not have sufficient capacity, we'll first consolidate and then increase
+    /// its capacity if the consolidated results still occupy a significant fraction of the vector.
     ///
     /// The worst-case cost of this function is O(n log n) in the number of items the vector stores,
-    /// but amortizes to O(1).
+    /// but amortizes to O(log n).
     pub fn push(&mut self, item: (D, Diff)) {
         let capacity = self.data.capacity();
         if self.data.len() == capacity {
             // The vector is full. First, consolidate to try to recover some space.
             self.consolidate();
 
-            // If consolidation didn't free at least half the available capacity, double the
-            // capacity. This ensures we won't consolidate over and over again with small gains.
-            if self.data.len() > capacity / 2 {
-                self.data.reserve(capacity);
+            // We may need more capacity if our current capacity is within `1+1/(n+1)` of the length.
+            // This corresponds to `cap < len + len/(n+1)`, which is the logic we use.
+            let length = self.data.len();
+            let dampener = self.growth_dampener;
+            if capacity < length + length / (dampener + 1) {
+                // We would like to increase the capacity by a factor of `1+1/(n+1)`, which involves
+                // determining the target capacity, and then reserving an amount that achieves this
+                // while working around the existing length.
+                let new_cap = capacity + capacity / (dampener + 1);
+                self.data.reserve_exact(new_cap - length);
             }
         }
 
@@ -352,7 +374,7 @@ impl ConsolidatingVec {
         consolidate(&mut self.data);
 
         // We may have the opportunity to reclaim allocated memory.
-        // Given that `push` will double the capacity when the vector is more than half full, and
+        // Given that `push` will at most double the capacity when the vector is more than half full, and
         // we want to avoid entering into a resizing cycle, we choose to only shrink if the
         // vector's length is less than one fourth of its capacity.
         if self.data.len() < self.data.capacity() / 4 {
@@ -388,6 +410,7 @@ impl FromIterator<(D, Diff)> for ConsolidatingVec {
         Self {
             data: Vec::from_iter(iter),
             min_capacity: 0,
+            growth_dampener: 0,
         }
     }
 }
diff --git a/src/compute/src/sink/materialized_view.rs b/src/compute/src/sink/materialized_view.rs
index f4f71e1fc91d2..a38b9ff4a73c8 100644
--- a/src/compute/src/sink/materialized_view.rs
+++ b/src/compute/src/sink/materialized_view.rs
@@ -17,6 +17,7 @@ use std::sync::Arc;
 use differential_dataflow::lattice::Lattice;
 use differential_dataflow::{Collection, Hashable};
 use futures::StreamExt;
+use mz_compute_types::dyncfgs::CONSOLIDATING_VEC_GROWTH_DAMPENER;
 use mz_compute_types::dyncfgs::ENABLE_MATERIALIZED_VIEW_SINK_V2;
 use mz_compute_types::sinks::{ComputeSinkDesc, MaterializedViewSinkConnection};
 use mz_ore::cast::CastFrom;
@@ -222,6 +223,8 @@ where
         compute_state,
     );
 
+    let growth_dampener = CONSOLIDATING_VEC_GROWTH_DAMPENER.get(&compute_state.worker_config);
+
     let (written_batches, write_token) = write_batches(
         sink_id.clone(),
         operator_name.clone(),
@@ -233,6 +236,7 @@ where
         &persist_errs,
         Arc::clone(&persist_clients),
         compute_state.read_only_rx.clone(),
+        growth_dampener,
     );
 
     let append_token = append_batches(
@@ -608,6 +612,7 @@ fn write_batches(
     persist_errs: &Stream,
     persist_clients: Arc,
     mut read_only: watch::Receiver,
+    growth_dampener: usize,
 ) -> (Stream, Rc)
 where
     G: Scope,
@@ -667,8 +672,13 @@ where
         // Contains `desired - persist`, reflecting the updates we would like to commit
         // to `persist` in order to "correct" it to track `desired`. These collections are
        // only modified by updates received from either the `desired` or `persist` inputs.
-        let mut correction_oks = Correction::new(sink_metrics.clone(), sink_worker_metrics.clone());
-        let mut correction_errs = Correction::new(sink_metrics, sink_worker_metrics);
+        let mut correction_oks = Correction::new(
+            sink_metrics.clone(),
+            sink_worker_metrics.clone(),
+            growth_dampener,
+        );
+        let mut correction_errs =
+            Correction::new(sink_metrics, sink_worker_metrics, growth_dampener);
 
         // Contains descriptions of batches for which we know that we can
         // write data. We got these from the "centralized" operator that
diff --git a/src/compute/src/sink/materialized_view_v2.rs b/src/compute/src/sink/materialized_view_v2.rs
index 6f849d4cda3b6..6d2e603fa1e8a 100644
--- a/src/compute/src/sink/materialized_view_v2.rs
+++ b/src/compute/src/sink/materialized_view_v2.rs
@@ -115,6 +115,7 @@ use std::sync::Arc;
 
 use differential_dataflow::{Collection, Hashable};
 use futures::StreamExt;
+use mz_compute_types::dyncfgs::CONSOLIDATING_VEC_GROWTH_DAMPENER;
 use mz_ore::cast::CastFrom;
 use mz_persist_client::batch::{Batch, ProtoBatch};
 use mz_persist_client::cache::PersistClientCache;
@@ -204,6 +205,8 @@ where
         &desired,
     );
 
+    let growth_dampener = CONSOLIDATING_VEC_GROWTH_DAMPENER.get(&compute_state.worker_config);
+
     let (batches, write_token) = write::render(
         sink_id,
         persist_api.clone(),
@@ -211,6 +214,7 @@ where
         &desired,
         &persist,
         &descs,
+        growth_dampener,
     );
 
     let append_token = append::render(sink_id, persist_api, active_worker_id, &descs, &batches);
@@ -668,6 +672,7 @@ mod write {
         desired: &DesiredStreams,
         persist: &PersistStreams,
         descs: &Stream,
+        growth_dampener: usize,
     ) -> (BatchesStream, PressOnDropButton)
     where
         S: Scope,
@@ -702,7 +707,14 @@ mod write {
             let writer = persist_api.open_writer().await;
             let sink_metrics = persist_api.open_metrics().await;
 
-            let mut state = State::new(sink_id, worker_id, writer, sink_metrics, as_of);
+            let mut state = State::new(
+                sink_id,
+                worker_id,
+                writer,
+                sink_metrics,
+                as_of,
+                growth_dampener,
+            );
 
             loop {
                 // Read from the inputs, extract `desired` updates as positive contributions to
@@ -821,6 +833,7 @@ mod write {
            persist_writer: WriteHandle,
            metrics: SinkMetrics,
            as_of: Antichain,
+           growth_dampener: usize,
        ) -> Self {
            let worker_metrics = metrics.for_worker(worker_id);
 
@@ -833,8 +846,8 @@ mod write {
                worker_id,
                persist_writer,
                corrections: OkErr::new(
-                   Correction::new(metrics.clone(), worker_metrics.clone()),
-                   Correction::new(metrics, worker_metrics),
+                   Correction::new(metrics.clone(), worker_metrics.clone(), growth_dampener),
+                   Correction::new(metrics, worker_metrics, growth_dampener),
                ),
                desired_frontiers: OkErr::new_frontiers(),
                persist_frontiers: OkErr::new_frontiers(),
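
### Appendix: a standalone sketch of the growth rule

For reviewers who want to experiment with the growth rule outside the compute crate, below is a minimal, self-contained Rust sketch of the consolidate-then-grow behavior that `ConsolidatingVec::push` adopts in this diff. It is not the patched code itself: the key type, the free functions, and the simplified in-place consolidation are illustrative stand-ins; only the growth condition `cap < len + len/(dampener + 1)` and the target capacity `cap + cap/(dampener + 1)` mirror the patch. With `dampener = 3` (i.e. `slop = 5` in the terms above) the vector grows by 1.25x once it is about 4/5 full after consolidation, while `dampener = 0` reproduces the previous doubling.

```rust
/// Simplified in-place consolidation: sort by key, sum diffs of equal keys,
/// and drop entries whose diffs cancel to zero. Keeps the existing allocation.
fn consolidate(data: &mut Vec<(u64, i64)>) {
    data.sort_by_key(|(d, _)| *d);
    data.dedup_by(|a, b| {
        if a.0 == b.0 {
            b.1 += a.1;
            true
        } else {
            false
        }
    });
    data.retain(|(_, diff)| *diff != 0);
}

/// Push with the dampened growth rule: consolidate when full, and grow only if
/// consolidation left the vector more than (dampener + 1)/(dampener + 2) full.
fn push(data: &mut Vec<(u64, i64)>, dampener: usize, item: (u64, i64)) {
    let capacity = data.capacity();
    if data.len() == capacity {
        // The vector is full: first try to recover space by consolidating.
        consolidate(data);
        // Grow only if `cap < len + len/(dampener + 1)`, i.e. consolidation did
        // not free a linear fraction of the vector.
        let length = data.len();
        if capacity < length + length / (dampener + 1) {
            // The target capacity is roughly `cap * (1 + 1/(dampener + 1))`.
            let new_cap = capacity + capacity / (dampener + 1);
            data.reserve_exact(new_cap - length);
        }
    }
    data.push(item);
}

fn main() {
    // Few distinct keys: consolidation alone keeps the vector at its initial capacity.
    let mut few = Vec::with_capacity(128);
    for i in 0..10_000u64 {
        push(&mut few, 3, (i % 100, 1));
    }
    // All-distinct keys: the vector must grow, but only by ~1.25x per step instead of 2x.
    let mut many = Vec::with_capacity(128);
    for i in 0..10_000u64 {
        push(&mut many, 3, (i, 1));
    }
    println!("few distinct keys: len={} cap={}", few.len(), few.capacity());
    println!("all distinct keys: len={} cap={}", many.len(), many.capacity());
}
```

For comparison, running the all-distinct loop with `dampener = 0` doubles the capacity all the way to roughly 16k slots for the same 10k items, which illustrates the memory overhead this PR aims to reduce.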