Consolidate produces batches of data (#31155)
Instead of flattening the outputs of consolidate, provide the output as a whole
batch, encoded as a `Stream<_, Vec<C>>`. This allows downstream operators to act
on a whole batch at once, without relying on undocumented properties of how
Timely channels behave. Operators that need to flatten the output need slightly
different logic.
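
As a rough sketch of the new consumption pattern (not code from this PR), a downstream
operator that still needs individual updates can unpack each batch as below. The function
name `flatten_consolidated` and the `Vec<Vec<(D, T, R)>>` batch representation are
illustrative stand-ins for the concrete batcher output types used in the repository:

```rust
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::Operator;
use timely::dataflow::{Scope, Stream};

/// Flattens a stream of consolidated batches back into individual updates.
/// Each stream record is one batch: a chain of containers of updates.
fn flatten_consolidated<G, D, R>(
    batched: &Stream<G, Vec<Vec<(D, G::Timestamp, R)>>>,
) -> Stream<G, (D, G::Timestamp, R)>
where
    G: Scope,
    D: timely::Data,
    R: timely::Data,
{
    batched.unary(Pipeline, "FlattenConsolidated", |_capability, _info| {
        |input, output| {
            input.for_each(|time, data| {
                let mut session = output.session(&time);
                // `data` holds whole batches; walk each batch's containers and
                // forward every (data, time, diff) update on its own.
                for (d, t, r) in data.iter().flatten().flat_map(|chunk| chunk.iter()) {
                    session.give((d.clone(), t.clone(), r.clone()));
                }
            })
        }
    })
}
```

The operators touched by this change (for example the `unpack consolidated` unary in
`src/timely-util/src/operator.rs` below) use the same
`iter().flatten().flat_map(|chunk| chunk.iter())` idiom.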

### Checklist

- [ ] This PR has adequate test coverage / QA involvement has been duly
considered. ([trigger-ci for additional test/nightly
runs](https://trigger-ci.dev.materialize.com/))
- [ ] This PR has an associated up-to-date [design
doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md),
is a design doc
([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)),
or is sufficiently small to not require a design.
  <!-- Reference the design in the description. -->
- [ ] If this PR evolves [an existing `$T ⇔ Proto$T`
mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md)
(possibly in a backwards-incompatible way), then it is tagged with a
`T-proto` label.
- [ ] If this PR will require changes to cloud orchestration or tests,
there is a companion cloud PR to account for those changes that is
tagged with the release-blocker label
([example](MaterializeInc/cloud#5021)).
<!-- Ask in #team-cloud on Slack if you need help preparing the cloud
PR. -->
- [ ] If this PR includes major [user-facing behavior
changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note),
I have pinged the relevant PM to schedule a changelog post.

Signed-off-by: Moritz Hoffmann <[email protected]>
antiguru authored Jan 27, 2025
1 parent 59ba5f2 commit 4242bb3
Showing 4 changed files with 79 additions and 72 deletions.
4 changes: 2 additions & 2 deletions src/compute/src/logging.rs
@@ -236,7 +236,7 @@ where
CB: ContainerBuilder,
L: Into<LogVariant>,
F: for<'a> FnMut(
<B::Output as Container>::Item<'a>,
<B::Output as Container>::ItemRef<'a>,
&mut PermutedRowPacker,
&mut OutputSession<CB>,
) + 'static,
@@ -251,7 +251,7 @@ where
move |input, output| {
while let Some((time, data)) = input.next() {
let mut session = output.session_with_builder(&time);
for item in data.drain() {
for item in data.iter().flatten().flat_map(|chunk| chunk.iter()) {
logic(item, &mut packer, &mut session);
}
}
43 changes: 18 additions & 25 deletions src/compute/src/sink/copy_to_s3_oneshot.rs
@@ -11,7 +11,7 @@ use std::any::Any;
use std::cell::RefCell;
use std::rc::Rc;

use differential_dataflow::{AsCollection, Collection, Hashable};
use differential_dataflow::{Collection, Hashable};
use mz_compute_client::protocol::response::CopyToResponse;
use mz_compute_types::dyncfgs::{
COPY_TO_S3_ARROW_BUILDER_BUFFER_RATIO, COPY_TO_S3_MULTIPART_PART_SIZE_BYTES,
@@ -23,7 +23,6 @@ use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use mz_timely_util::operator::consolidate_pact;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::core::Map;
use timely::dataflow::operators::Operator;
use timely::dataflow::Scope;
use timely::progress::Antichain;
@@ -63,32 +62,20 @@ where
// files based on the user provided `MAX_FILE_SIZE`.
let batch_count = self.output_batch_count;

// This relies on an assumption the output order after the Exchange is deterministic, which
// is necessary to ensure the files written from each compute replica are identical.
// While this is not technically guaranteed, the current implementation uses a FIFO channel.
// In the storage copy_to operator we assert the ordering of rows to detect any regressions.
// We exchange the data according to batch, but we don't want to send the batch ID to the
// sink. The sink can re-compute the batch ID from the data.
let input = consolidate_pact::<KeyBatcher<_, _, _>, _, _>(
&sinked_collection
.map(move |row| {
let batch = row.hashed() % batch_count;
((row, batch), ())
})
.inner,
Exchange::new(move |(((_, batch), _), _, _)| *batch),
&sinked_collection.map(move |row| (row, ())).inner,
Exchange::new(move |((row, ()), _, _): &((Row, _), _, _)| row.hashed() % batch_count),
"Consolidated COPY TO S3 input",
);
// TODO: We're converting a stream of region-allocated data to a stream of vectors.
let input = input.map(Clone::clone).as_collection();

// We need to consolidate the error collection to ensure we don't act on retracted errors.
let error = consolidate_pact::<KeyBatcher<_, _, _>, _, _>(
&err_collection
.map(move |row| {
let batch = row.hashed() % batch_count;
((row, batch), ())
})
.inner,
Exchange::new(move |(((_, batch), _), _, _)| *batch),
&err_collection.map(move |err| (err, ())).inner,
Exchange::new(move |((err, _), _, _): &((DataflowError, _), _, _)| {
err.hashed() % batch_count
}),
"Consolidated COPY TO S3 errors",
);

@@ -104,9 +91,14 @@
while let Some((time, data)) = input.next() {
if !up_to.less_equal(time.time()) && !received_one {
received_one = true;
output
.session(&time)
.give_iterator(data.iter().next().cloned().into_iter());
output.session(&time).give_iterator(
data.iter()
.flatten()
.flat_map(|chunk| chunk.iter().cloned())
.next()
.map(|((err, ()), time, diff)| (err, time, diff))
.into_iter(),
);
}
}
}
@@ -132,6 +124,7 @@
self.connection_id,
params,
result_callback,
self.output_batch_count,
);

Some(token)
65 changes: 30 additions & 35 deletions src/storage-operators/src/s3_oneshot_sink.rs
@@ -16,7 +16,7 @@ use std::rc::Rc;

use anyhow::anyhow;
use aws_types::sdk_config::SdkConfig;
use differential_dataflow::{Collection, Hashable};
use differential_dataflow::Hashable;
use futures::StreamExt;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
@@ -31,6 +31,7 @@ use mz_storage_types::sinks::{S3SinkFormat, S3UploadInfo};
use mz_timely_util::builder_async::{
Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use timely::container::columnation::TimelyStack;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::Broadcast;
use timely::dataflow::{Scope, Stream};
@@ -54,9 +55,12 @@ mod pgcopy;
/// - completion: removes the sentinel file and calls the `worker_callback`
///
/// Returns a token that should be held to keep the sink alive.
///
/// The `input_collection` must be a stream of chains, partitioned and exchanged by the row's hash
/// modulo the number of batches.
pub fn copy_to<G, F>(
input_collection: Collection<G, ((Row, u64), ()), Diff>,
err_stream: Stream<G, (((DataflowError, u64), ()), G::Timestamp, Diff)>,
input_collection: Stream<G, Vec<TimelyStack<((Row, ()), G::Timestamp, Diff)>>>,
err_stream: Stream<G, (DataflowError, G::Timestamp, Diff)>,
up_to: Antichain<G::Timestamp>,
connection_details: S3UploadInfo,
connection_context: ConnectionContext,
@@ -65,6 +69,7 @@
connection_id: CatalogItemId,
params: CopyToParameters,
worker_callback: F,
output_batch_count: u64,
) -> Rc<dyn Any>
where
G: Scope<Timestamp = Timestamp>,
@@ -89,6 +94,7 @@ where
up_to,
start_stream,
params,
output_batch_count,
),
S3SinkFormat::Parquet => render_upload_operator::<G, parquet::ParquetUploader>(
scope.clone(),
@@ -101,6 +107,7 @@
up_to,
start_stream,
params,
output_batch_count,
),
};

@@ -130,7 +137,7 @@ fn render_initialization_operator<G>(
scope: G,
sink_id: GlobalId,
up_to: Antichain<G::Timestamp>,
err_stream: Stream<G, (((DataflowError, u64), ()), G::Timestamp, Diff)>,
err_stream: Stream<G, (DataflowError, G::Timestamp, Diff)>,
) -> (Stream<G, Result<(), String>>, PressOnDropButton)
where
G: Scope<Timestamp = Timestamp>,
@@ -160,7 +167,7 @@ where
while let Some(event) = error_handle.next().await {
match event {
AsyncEvent::Data(cap, data) => {
for (((error, _), _), ts, _) in data {
for (error, ts, _) in data {
if !up_to.less_equal(&ts) {
start_handle.give(&cap, Err(error.to_string()));
return;
@@ -281,27 +288,30 @@ where
/// Returns a `completion_stream` which contains 1 event per worker of
/// the result of the upload operation, either an error or the number of rows
/// uploaded by the worker.
///
/// The `input_collection` must be a stream of chains, partitioned and exchanged by the row's hash
/// modulo the number of batches.
fn render_upload_operator<G, T>(
scope: G,
connection_context: ConnectionContext,
aws_connection: AwsConnection,
connection_id: CatalogItemId,
connection_details: S3UploadInfo,
sink_id: GlobalId,
input_collection: Collection<G, ((Row, u64), ()), Diff>,
input_collection: Stream<G, Vec<TimelyStack<((Row, ()), G::Timestamp, Diff)>>>,
up_to: Antichain<G::Timestamp>,
start_stream: Stream<G, Result<(), String>>,
params: CopyToParameters,
output_batch_count: u64,
) -> (Stream<G, Result<u64, String>>, PressOnDropButton)
where
G: Scope<Timestamp = Timestamp>,
T: CopyToS3Uploader,
{
let worker_id = scope.index();
let num_workers = scope.peers();
let mut builder = AsyncOperatorBuilder::new("CopyToS3-uploader".to_string(), scope.clone());

let mut input_handle = builder.new_disconnected_input(&input_collection.inner, Pipeline);
let mut input_handle = builder.new_disconnected_input(&input_collection, Pipeline);
let (completion_handle, completion_stream) = builder.new_output();
let mut start_handle = builder.new_input_for(&start_stream, Pipeline, &completion_handle);

@@ -351,31 +361,24 @@ where
}

let mut row_count = 0;
let mut last_row = None;
while let Some(event) = input_handle.next().await {
match event {
AsyncEvent::Data(_ts, data) => {
for (((row, batch), ()), ts, diff) in data {
// Check our assumption above that batches are
// always assigned to the worker with ID `batch %
// num_workers`.
if usize::cast_from(batch) % num_workers != worker_id {
anyhow::bail!(
"internal error: batch {} assigned to worker {} (expected worker {})",
batch,
worker_id,
usize::cast_from(batch) % num_workers
);
}
if !up_to.less_equal(&ts) {
if diff < 0 {
for ((row, ()), ts, diff) in
data.iter().flatten().flat_map(|chunk| chunk.iter())
{
// We're consuming a batch of data, and the upstream operator has to ensure
// that the data is exchanged according to the batch.
let batch = row.hashed() % output_batch_count;
if !up_to.less_equal(ts) {
if *diff < 0 {
anyhow::bail!(
"Invalid data in source errors, saw retractions ({}) for \
row that does not exist",
diff * -1,
*diff * -1,
)
}
row_count += u64::try_from(diff).unwrap();
row_count += u64::try_from(*diff).unwrap();
let uploader = match s3_uploaders.entry(batch) {
Entry::Occupied(entry) => entry.into_mut(),
Entry::Vacant(entry) => {
@@ -389,18 +392,10 @@
)?)
}
};
for _ in 0..diff {
uploader.append_row(&row).await?;
for _ in 0..*diff {
uploader.append_row(row).await?;
}
}
// A very crude way to detect if there is ever a regression in the deterministic
// ordering of rows in our input, since we are depending on an implementation
// detail of timely communication (FIFO ordering over an exchange).
let cur = (row, batch);
if let Some(last) = last_row {
assert!(&last < &cur, "broken fifo ordering!");
}
last_row = Some(cur);
}
}
AsyncEvent::Progress(frontier) => {
39 changes: 29 additions & 10 deletions src/timely-util/src/operator.rs
@@ -25,12 +25,11 @@ use timely::container::{ContainerBuilder, PushInto};
use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline};
use timely::dataflow::channels::pushers::Tee;
use timely::dataflow::channels::ContainerBytes;
use timely::dataflow::operators::core::Map;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc;
use timely::dataflow::operators::generic::operator::{self, Operator};
use timely::dataflow::operators::generic::{InputHandleCore, OperatorInfo, OutputHandleCore};
use timely::dataflow::operators::Capability;
use timely::dataflow::{Scope, StreamCore};
use timely::dataflow::{Scope, Stream, StreamCore};
use timely::progress::{Antichain, Timestamp};
use timely::{Container, Data, PartialOrder};

@@ -708,7 +707,18 @@
h.finish()
});
consolidate_pact::<Ba, _, _>(&self.map(|k| (k, ())).inner, exchange, name)
.map(|((k, ()), time, diff)| (k.clone(), time.clone(), diff.clone()))
.unary(Pipeline, "unpack consolidated", |_, _| {
|input, output| {
input.for_each(|time, data| {
let mut session = output.session(&time);
for ((k, ()), t, d) in
data.iter().flatten().flat_map(|chunk| chunk.iter())
{
session.give((k.clone(), t.clone(), d.clone()))
}
})
}
})
.as_collection()
} else {
self
@@ -730,7 +740,17 @@
Exchange::new(move |update: &((D1, ()), G::Timestamp, R)| (update.0).0.hashed());

consolidate_pact::<Ba, _, _>(&self.map(|k| (k, ())).inner, exchange, name)
.map(|((k, ()), time, diff)| (k.clone(), time.clone(), diff.clone()))
.unary(Pipeline, &format!("Unpack {name}"), |_, _| {
|input, output| {
input.for_each(|time, data| {
let mut session = output.session(&time);
for ((k, ()), t, d) in data.iter().flatten().flat_map(|chunk| chunk.iter())
{
session.give((k.clone(), t.clone(), d.clone()))
}
})
}
})
.as_collection()
}
}
@@ -770,14 +790,15 @@

/// Aggregates the weights of equal records into at most one record.
///
/// The data are accumulated in place, each held back until their timestamp has completed.
/// Produces a stream of chains of records, partitioned according to `pact`. The
/// data is sorted according to `Ba`. For each timestamp, it produces at most one chain.
///
/// This serves as a low-level building-block for more user-friendly functions.
/// The data are accumulated in place, each held back until their timestamp has completed.
pub fn consolidate_pact<Ba, P, G>(
stream: &StreamCore<G, Ba::Input>,
pact: P,
name: &str,
) -> StreamCore<G, Ba::Output>
) -> Stream<G, Vec<Ba::Output>>
where
G: Scope,
Ba: Batcher<Time = G::Timestamp> + 'static,
Expand Down Expand Up @@ -833,9 +854,7 @@ where
// Extract updates not in advance of `upper`.
let output =
batcher.seal::<ConsolidateBuilder<_, Ba::Output>>(upper.clone());
for mut batch in output {
session.give_container(&mut batch);
}
session.give(output);
}
}
