Skip to content

Commit

Permalink
Slow the rate of growth in ConsolidatingVec (#31077)
Browse files Browse the repository at this point in the history
The growth of `ConsolidatingVec` was by doubling, which meant that we
could end up with substantially more memory use than we really need. This
PR attempts to make this configurable, growing by a smaller factor
determined by a positive integer `slop`. If the result of consolidation
is (1 - 1/slop)-full, we grow the vector by a factor of 1 + 1/(slop-1).
These relationships were chosen to match the 2x behavior, but it's
possible that they are imperfect.

I'm not certain how to test the impact of this, and whether we have "MV
rehydration memory" tests, but that would be the right way to see if
this is helpful. It comes at the potential expense of CPU.

### Motivation

<!--
Which of the following best describes the motivation behind this PR?

  * This PR fixes a recognized bug.

    [Ensure issue is linked somewhere.]

  * This PR adds a known-desirable feature.

    [Ensure issue is linked somewhere.]

  * This PR fixes a previously unreported bug.

    [Describe the bug in detail, as if you were filing a bug report.]

  * This PR adds a feature that has not yet been specified.

[Write a brief specification for the feature, including justification
for its inclusion in Materialize, as if you were writing the original
     feature specification.]

   * This PR refactors existing code.

[Describe what was wrong with the existing code, if it is not obvious.]
-->

### Tips for reviewer

<!--
Leave some tips for your reviewer, like:

    * The diff is much smaller if viewed with whitespace hidden.
    * [Some function/module/file] deserves extra attention.
* [Some function/module/file] is pure code movement and only needs a
skim.

Delete this section if no tips.
-->

### Checklist

- [ ] This PR has adequate test coverage / QA involvement has been duly
considered. ([trigger-ci for additional test/nightly
runs](https://trigger-ci.dev.materialize.com/))
- [ ] This PR has an associated up-to-date [design
doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md),
is a design doc
([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)),
or is sufficiently small to not require a design.
  <!-- Reference the design in the description. -->
- [ ] If this PR evolves [an existing `$T ⇔ Proto$T`
mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md)
(possibly in a backwards-incompatible way), then it is tagged with a
`T-proto` label.
- [ ] If this PR will require changes to cloud orchestration or tests,
there is a companion cloud PR to account for those changes that is
tagged with the release-blocker label
([example](MaterializeInc/cloud#5021)).
<!-- Ask in #team-cloud on Slack if you need help preparing the cloud
PR. -->
- [ ] If this PR includes major [user-facing behavior
changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note),
I have pinged the relevant PM to schedule a changelog post.
  • Loading branch information
frankmcsherry authored Jan 23, 2025
1 parent 6df1923 commit b6c1b0a
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 17 deletions.
9 changes: 9 additions & 0 deletions src/compute-types/src/dyncfgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ pub const LGALLOC_SLOW_CLEAR_BYTES: Config<usize> = Config::new(
"Clear byte size per size class for every invocation",
);

/// The term `n` in the growth rate `1 + 1/(n + 1)` for `ConsolidatingVec`.
///
/// The smallest value `0` corresponds to the greatest allowed growth, of doubling
/// (a growth rate of `2`). Larger values grow the vector more slowly, trading
/// additional CPU (more frequent consolidation passes) for a lower peak memory
/// footprint.
pub const CONSOLIDATING_VEC_GROWTH_DAMPENER: Config<usize> = Config::new(
"consolidating_vec_growth_dampener",
0,
"Dampener in growth rate for consolidating vector size",
);

/// The number of dataflows that may hydrate concurrently.
pub const HYDRATION_CONCURRENCY: Config<usize> = Config::new(
"compute_hydration_concurrency",
Expand Down Expand Up @@ -179,4 +187,5 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&ENABLE_COMPUTE_REPLICA_EXPIRATION)
.add(&COMPUTE_REPLICA_EXPIRATION_OFFSET)
.add(&COMPUTE_APPLY_COLUMN_DEMANDS)
.add(&CONSOLIDATING_VEC_GROWTH_DAMPENER)
}
2 changes: 1 addition & 1 deletion src/compute/src/render/continual_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,7 @@ impl<D: Ord> SinkState<D, Timestamp> {
SinkState {
append_times: BTreeSet::new(),
append_times_progress: Antichain::from_elem(Timestamp::minimum()),
to_append: ConsolidatingVec::with_min_capacity(128),
to_append: ConsolidatingVec::new(128, 0),
to_append_progress: Antichain::from_elem(Timestamp::minimum()),
output_progress: Antichain::from_elem(Timestamp::minimum()),
}
Expand Down
45 changes: 34 additions & 11 deletions src/compute/src/sink/correction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,24 @@ pub(super) struct Correction<D> {
metrics: SinkMetrics,
/// Per-worker persist sink metrics.
worker_metrics: SinkWorkerMetrics,
/// Configuration for `ConsolidatingVec` driving the growth rate down from doubling.
growth_dampener: usize,
}

impl<D> Correction<D> {
/// Construct a new `Correction` instance.
pub fn new(metrics: SinkMetrics, worker_metrics: SinkWorkerMetrics) -> Self {
/// Construct a new, empty `Correction` instance reporting to the given metrics.
///
/// `growth_dampener` is forwarded to each per-time `ConsolidatingVec` this
/// instance creates, slowing the vectors' capacity growth; `0` corresponds to
/// the previous behavior of doubling.
pub fn new(
metrics: SinkMetrics,
worker_metrics: SinkWorkerMetrics,
growth_dampener: usize,
) -> Self {
Self {
updates: Default::default(),
since: Antichain::from_elem(Timestamp::MIN),
total_size: Default::default(),
metrics,
worker_metrics,
growth_dampener,
}
}

Expand Down Expand Up @@ -125,7 +132,8 @@ impl<D: Data> Correction<D> {
use std::collections::btree_map::Entry;
match self.updates.entry(time) {
Entry::Vacant(entry) => {
let vec: ConsolidatingVec<_> = data.collect();
let mut vec: ConsolidatingVec<_> = data.collect();
vec.growth_dampener = self.growth_dampener;
new_size += (vec.len(), vec.capacity());
entry.insert(vec);
}
Expand Down Expand Up @@ -304,13 +312,21 @@ pub(crate) struct ConsolidatingVec<D> {
/// A lower bound for how small we'll shrink the Vec's capacity. NB: The cap
/// might start smaller than this.
min_capacity: usize,
/// Dampener in the growth rate. 0 corresponds to doubling and in general `n` to `1+1/(n+1)`.
///
/// When consolidation does not free enough space, the capacity grows by the rate above.
/// Setting this to 0 results in doubling whenever the list is at least half full.
/// Larger numbers result in more conservative approaches that use more CPU, but less memory.
growth_dampener: usize,
}

impl<D: Ord> ConsolidatingVec<D> {
pub fn with_min_capacity(min_capacity: usize) -> Self {
/// Creates a new instance from the necessary configuration arguments.
pub fn new(min_capacity: usize, growth_dampener: usize) -> Self {
ConsolidatingVec {
data: Vec::new(),
min_capacity,
growth_dampener,
}
}

Expand All @@ -326,21 +342,27 @@ impl<D: Ord> ConsolidatingVec<D> {

/// Pushes `item` into the vector.
///
/// If the vector does not have sufficient capacity, we try to consolidate and/or double its
/// capacity.
/// If the vector does not have sufficient capacity, we'll first consolidate and then increase
/// its capacity if the consolidated results still occupy a significant fraction of the vector.
///
/// The worst-case cost of this function is O(n log n) in the number of items the vector stores,
/// but amortizes to O(1).
/// but amortizes to O(log n).
pub fn push(&mut self, item: (D, Diff)) {
let capacity = self.data.capacity();
if self.data.len() == capacity {
// The vector is full. First, consolidate to try to recover some space.
self.consolidate();

// If consolidation didn't free at least half the available capacity, double the
// capacity. This ensures we won't consolidate over and over again with small gains.
if self.data.len() > capacity / 2 {
self.data.reserve(capacity);
// We may need more capacity if our current capacity is within `1+1/(n+1)` of the length.
// This corresponds to `cap < len + len/(n+1)`, which is the logic we use.
let length = self.data.len();
let dampener = self.growth_dampener;
if capacity < length + length / (dampener + 1) {
// We would like to increase the capacity by a factor of `1+1/(n+1)`, which involves
// determining the target capacity, and then reserving an amount that achieves this
// while working around the existing length.
let new_cap = capacity + capacity / (dampener + 1);
self.data.reserve_exact(new_cap - length);
}
}

Expand All @@ -352,7 +374,7 @@ impl<D: Ord> ConsolidatingVec<D> {
consolidate(&mut self.data);

// We may have the opportunity to reclaim allocated memory.
// Given that `push` will double the capacity when the vector is more than half full, and
// Given that `push` will at most double the capacity when the vector is more than half full, and
// we want to avoid entering into a resizing cycle, we choose to only shrink if the
// vector's length is less than one fourth of its capacity.
if self.data.len() < self.data.capacity() / 4 {
Expand Down Expand Up @@ -388,6 +410,7 @@ impl<D> FromIterator<(D, Diff)> for ConsolidatingVec<D> {
Self {
data: Vec::from_iter(iter),
min_capacity: 0,
growth_dampener: 0,
}
}
}
Expand Down
14 changes: 12 additions & 2 deletions src/compute/src/sink/materialized_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::sync::Arc;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::{Collection, Hashable};
use futures::StreamExt;
use mz_compute_types::dyncfgs::CONSOLIDATING_VEC_GROWTH_DAMPENER;
use mz_compute_types::dyncfgs::ENABLE_MATERIALIZED_VIEW_SINK_V2;
use mz_compute_types::sinks::{ComputeSinkDesc, MaterializedViewSinkConnection};
use mz_ore::cast::CastFrom;
Expand Down Expand Up @@ -222,6 +223,8 @@ where
compute_state,
);

let growth_dampener = CONSOLIDATING_VEC_GROWTH_DAMPENER.get(&compute_state.worker_config);

let (written_batches, write_token) = write_batches(
sink_id.clone(),
operator_name.clone(),
Expand All @@ -233,6 +236,7 @@ where
&persist_errs,
Arc::clone(&persist_clients),
compute_state.read_only_rx.clone(),
growth_dampener,
);

let append_token = append_batches(
Expand Down Expand Up @@ -608,6 +612,7 @@ fn write_batches<G>(
persist_errs: &Stream<G, (DataflowError, Timestamp, Diff)>,
persist_clients: Arc<PersistClientCache>,
mut read_only: watch::Receiver<bool>,
growth_dampener: usize,
) -> (Stream<G, ProtoBatch>, Rc<dyn Any>)
where
G: Scope<Timestamp = Timestamp>,
Expand Down Expand Up @@ -667,8 +672,13 @@ where
// Contains `desired - persist`, reflecting the updates we would like to commit
// to `persist` in order to "correct" it to track `desired`. These collections are
// only modified by updates received from either the `desired` or `persist` inputs.
let mut correction_oks = Correction::new(sink_metrics.clone(), sink_worker_metrics.clone());
let mut correction_errs = Correction::new(sink_metrics, sink_worker_metrics);
let mut correction_oks = Correction::new(
sink_metrics.clone(),
sink_worker_metrics.clone(),
growth_dampener,
);
let mut correction_errs =
Correction::new(sink_metrics, sink_worker_metrics, growth_dampener);

// Contains descriptions of batches for which we know that we can
// write data. We got these from the "centralized" operator that
Expand Down
19 changes: 16 additions & 3 deletions src/compute/src/sink/materialized_view_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ use std::sync::Arc;

use differential_dataflow::{Collection, Hashable};
use futures::StreamExt;
use mz_compute_types::dyncfgs::CONSOLIDATING_VEC_GROWTH_DAMPENER;
use mz_ore::cast::CastFrom;
use mz_persist_client::batch::{Batch, ProtoBatch};
use mz_persist_client::cache::PersistClientCache;
Expand Down Expand Up @@ -204,13 +205,16 @@ where
&desired,
);

let growth_dampener = CONSOLIDATING_VEC_GROWTH_DAMPENER.get(&compute_state.worker_config);

let (batches, write_token) = write::render(
sink_id,
persist_api.clone(),
as_of,
&desired,
&persist,
&descs,
growth_dampener,
);

let append_token = append::render(sink_id, persist_api, active_worker_id, &descs, &batches);
Expand Down Expand Up @@ -668,6 +672,7 @@ mod write {
desired: &DesiredStreams<S>,
persist: &PersistStreams<S>,
descs: &Stream<S, BatchDescription>,
growth_dampener: usize,
) -> (BatchesStream<S>, PressOnDropButton)
where
S: Scope<Timestamp = Timestamp>,
Expand Down Expand Up @@ -702,7 +707,14 @@ mod write {

let writer = persist_api.open_writer().await;
let sink_metrics = persist_api.open_metrics().await;
let mut state = State::new(sink_id, worker_id, writer, sink_metrics, as_of);
let mut state = State::new(
sink_id,
worker_id,
writer,
sink_metrics,
as_of,
growth_dampener,
);

loop {
// Read from the inputs, extract `desired` updates as positive contributions to
Expand Down Expand Up @@ -821,6 +833,7 @@ mod write {
persist_writer: WriteHandle<SourceData, (), Timestamp, Diff>,
metrics: SinkMetrics,
as_of: Antichain<Timestamp>,
growth_dampener: usize,
) -> Self {
let worker_metrics = metrics.for_worker(worker_id);

Expand All @@ -833,8 +846,8 @@ mod write {
worker_id,
persist_writer,
corrections: OkErr::new(
Correction::new(metrics.clone(), worker_metrics.clone()),
Correction::new(metrics, worker_metrics),
Correction::new(metrics.clone(), worker_metrics.clone(), growth_dampener),
Correction::new(metrics, worker_metrics, growth_dampener),
),
desired_frontiers: OkErr::new_frontiers(),
persist_frontiers: OkErr::new_frontiers(),
Expand Down

0 comments on commit b6c1b0a

Please sign in to comment.