Consolidate produces batch of data #31155

Merged 1 commit on Jan 27, 2025
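The change below makes `consolidate_pact` emit a `Vec` of sorted chunks (a chain) per timestamp instead of a single flat container, so downstream operators now iterate over the output with `data.iter().flatten().flat_map(|chunk| chunk.iter())` rather than `data.drain()`. A minimal sketch of that consumption pattern, using plain `Vec`s as a stand-in for the region-allocated `TimelyStack` chunks:

```rust
// Stand-in types: in the PR an update is `((Row, ()), Timestamp, Diff)` and a
// chunk is a `TimelyStack`; plain `Vec`s keep the sketch self-contained.
type Update = ((String, ()), u64, i64);
type Chunk = Vec<Update>;
type Chain = Vec<Chunk>; // one chain of sorted chunks per timestamp

fn consume(data: &[Chain]) {
    // The iteration idiom used throughout this PR: flatten the buffered chains,
    // then flatten each chain's chunks, visiting every update by reference.
    for ((row, ()), time, diff) in data.iter().flatten().flat_map(|chunk| chunk.iter()) {
        println!("{row} @ {time}: {diff:+}");
    }
}

fn main() {
    let data: Vec<Chain> = vec![vec![
        vec![(("a".to_string(), ()), 0, 1), (("b".to_string(), ()), 0, 2)],
        vec![(("c".to_string(), ()), 0, 1)],
    ]];
    consume(&data);
}
```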
4 changes: 2 additions & 2 deletions src/compute/src/logging.rs
@@ -236,7 +236,7 @@ where
CB: ContainerBuilder,
L: Into<LogVariant>,
F: for<'a> FnMut(
<B::Output as Container>::Item<'a>,
<B::Output as Container>::ItemRef<'a>,
&mut PermutedRowPacker,
&mut OutputSession<CB>,
) + 'static,
@@ -251,7 +251,7 @@ where
move |input, output| {
while let Some((time, data)) = input.next() {
let mut session = output.session_with_builder(&time);
for item in data.drain() {
for item in data.iter().flatten().flat_map(|chunk| chunk.iter()) {
logic(item, &mut packer, &mut session);
}
}
43 changes: 18 additions & 25 deletions src/compute/src/sink/copy_to_s3_oneshot.rs
@@ -11,7 +11,7 @@ use std::any::Any;
use std::cell::RefCell;
use std::rc::Rc;

use differential_dataflow::{AsCollection, Collection, Hashable};
use differential_dataflow::{Collection, Hashable};
use mz_compute_client::protocol::response::CopyToResponse;
use mz_compute_types::dyncfgs::{
COPY_TO_S3_ARROW_BUILDER_BUFFER_RATIO, COPY_TO_S3_MULTIPART_PART_SIZE_BYTES,
@@ -23,7 +23,6 @@ use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use mz_timely_util::operator::consolidate_pact;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::core::Map;
use timely::dataflow::operators::Operator;
use timely::dataflow::Scope;
use timely::progress::Antichain;
@@ -63,32 +62,20 @@
// files based on the user provided `MAX_FILE_SIZE`.
let batch_count = self.output_batch_count;

// This relies on an assumption the output order after the Exchange is deterministic, which
// is necessary to ensure the files written from each compute replica are identical.
// While this is not technically guaranteed, the current implementation uses a FIFO channel.
// In the storage copy_to operator we assert the ordering of rows to detect any regressions.
// We exchange the data according to batch, but we don't want to send the batch ID to the
// sink. The sink can re-compute the batch ID from the data.
let input = consolidate_pact::<KeyBatcher<_, _, _>, _, _>(
&sinked_collection
.map(move |row| {
let batch = row.hashed() % batch_count;
((row, batch), ())
})
.inner,
Exchange::new(move |(((_, batch), _), _, _)| *batch),
&sinked_collection.map(move |row| (row, ())).inner,
Exchange::new(move |((row, ()), _, _): &((Row, _), _, _)| row.hashed() % batch_count),
"Consolidated COPY TO S3 input",
);
// TODO: We're converting a stream of region-allocated data to a stream of vectors.
let input = input.map(Clone::clone).as_collection();

// We need to consolidate the error collection to ensure we don't act on retracted errors.
let error = consolidate_pact::<KeyBatcher<_, _, _>, _, _>(
&err_collection
.map(move |row| {
let batch = row.hashed() % batch_count;
((row, batch), ())
})
.inner,
Exchange::new(move |(((_, batch), _), _, _)| *batch),
&err_collection.map(move |err| (err, ())).inner,
Exchange::new(move |((err, _), _, _): &((DataflowError, _), _, _)| {
err.hashed() % batch_count
}),
"Consolidated COPY TO S3 errors",
);

@@ -104,9 +91,14 @@
while let Some((time, data)) = input.next() {
if !up_to.less_equal(time.time()) && !received_one {
received_one = true;
output
.session(&time)
.give_iterator(data.iter().next().cloned().into_iter());
output.session(&time).give_iterator(
data.iter()
.flatten()
.flat_map(|chunk| chunk.iter().cloned())
.next()
.map(|((err, ()), time, diff)| (err, time, diff))
.into_iter(),
);
}
}
}
@@ -132,6 +124,7 @@ where
self.connection_id,
params,
result_callback,
self.output_batch_count,
);

Some(token)
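The sink input above now carries plain `(row, ())` pairs: the batch ID is no longer shipped with the data, because both the exchange contract and the downstream sink can recompute it as `row.hashed() % batch_count`. A minimal sketch of that idea, using `std::hash`'s `DefaultHasher` as a hypothetical stand-in for differential's `Hashable`:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical stand-in for `row.hashed() % batch_count`: any deterministic
/// hash works, as long as the exchange contract and the sink agree on it.
fn batch_of<T: Hash>(row: &T, batch_count: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    row.hash(&mut hasher);
    hasher.finish() % batch_count
}

fn main() {
    let batch_count = 4;
    let row = "some row";

    // The exchange routes the update based on the row's hash ...
    let routed_to = batch_of(&row, batch_count);
    // ... and the sink recomputes the same batch ID from the data alone,
    // so the ID never needs to travel with the update.
    let recomputed = batch_of(&row, batch_count);

    assert_eq!(routed_to, recomputed);
    println!("row lands in batch {routed_to}");
}
```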
65 changes: 30 additions & 35 deletions src/storage-operators/src/s3_oneshot_sink.rs
@@ -16,7 +16,7 @@ use std::rc::Rc;

use anyhow::anyhow;
use aws_types::sdk_config::SdkConfig;
use differential_dataflow::{Collection, Hashable};
use differential_dataflow::Hashable;
use futures::StreamExt;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
@@ -31,6 +31,7 @@ use mz_storage_types::sinks::{S3SinkFormat, S3UploadInfo};
use mz_timely_util::builder_async::{
Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use timely::container::columnation::TimelyStack;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::Broadcast;
use timely::dataflow::{Scope, Stream};
@@ -54,9 +55,12 @@ mod pgcopy;
/// - completion: removes the sentinel file and calls the `worker_callback`
///
/// Returns a token that should be held to keep the sink alive.
///
/// The `input_collection` must be a stream of chains, partitioned and exchanged by the row's hash
/// modulo the number of batches.
pub fn copy_to<G, F>(
input_collection: Collection<G, ((Row, u64), ()), Diff>,
err_stream: Stream<G, (((DataflowError, u64), ()), G::Timestamp, Diff)>,
input_collection: Stream<G, Vec<TimelyStack<((Row, ()), G::Timestamp, Diff)>>>,
err_stream: Stream<G, (DataflowError, G::Timestamp, Diff)>,
up_to: Antichain<G::Timestamp>,
connection_details: S3UploadInfo,
connection_context: ConnectionContext,
@@ -65,6 +69,7 @@ pub fn copy_to<G, F>(
connection_id: CatalogItemId,
params: CopyToParameters,
worker_callback: F,
output_batch_count: u64,
) -> Rc<dyn Any>
where
G: Scope<Timestamp = Timestamp>,
@@ -89,6 +94,7 @@ where
up_to,
start_stream,
params,
output_batch_count,
),
S3SinkFormat::Parquet => render_upload_operator::<G, parquet::ParquetUploader>(
scope.clone(),
@@ -101,6 +107,7 @@ where
up_to,
start_stream,
params,
output_batch_count,
),
};

@@ -130,7 +137,7 @@ fn render_initialization_operator<G>(
scope: G,
sink_id: GlobalId,
up_to: Antichain<G::Timestamp>,
err_stream: Stream<G, (((DataflowError, u64), ()), G::Timestamp, Diff)>,
err_stream: Stream<G, (DataflowError, G::Timestamp, Diff)>,
) -> (Stream<G, Result<(), String>>, PressOnDropButton)
where
G: Scope<Timestamp = Timestamp>,
@@ -160,7 +167,7 @@ where
while let Some(event) = error_handle.next().await {
match event {
AsyncEvent::Data(cap, data) => {
for (((error, _), _), ts, _) in data {
for (error, ts, _) in data {
if !up_to.less_equal(&ts) {
start_handle.give(&cap, Err(error.to_string()));
return;
@@ -281,27 +288,30 @@ where
/// Returns a `completion_stream` which contains 1 event per worker of
/// the result of the upload operation, either an error or the number of rows
/// uploaded by the worker.
///
/// The `input_collection` must be a stream of chains, partitioned and exchanged by the row's hash
/// modulo the number of batches.
fn render_upload_operator<G, T>(
scope: G,
connection_context: ConnectionContext,
aws_connection: AwsConnection,
connection_id: CatalogItemId,
connection_details: S3UploadInfo,
sink_id: GlobalId,
input_collection: Collection<G, ((Row, u64), ()), Diff>,
input_collection: Stream<G, Vec<TimelyStack<((Row, ()), G::Timestamp, Diff)>>>,
up_to: Antichain<G::Timestamp>,
start_stream: Stream<G, Result<(), String>>,
params: CopyToParameters,
output_batch_count: u64,
) -> (Stream<G, Result<u64, String>>, PressOnDropButton)
where
G: Scope<Timestamp = Timestamp>,
T: CopyToS3Uploader,
{
let worker_id = scope.index();
let num_workers = scope.peers();
let mut builder = AsyncOperatorBuilder::new("CopyToS3-uploader".to_string(), scope.clone());

let mut input_handle = builder.new_disconnected_input(&input_collection.inner, Pipeline);
let mut input_handle = builder.new_disconnected_input(&input_collection, Pipeline);
let (completion_handle, completion_stream) = builder.new_output();
let mut start_handle = builder.new_input_for(&start_stream, Pipeline, &completion_handle);

@@ -351,31 +361,24 @@ where
}

let mut row_count = 0;
let mut last_row = None;
while let Some(event) = input_handle.next().await {
match event {
AsyncEvent::Data(_ts, data) => {
for (((row, batch), ()), ts, diff) in data {
// Check our assumption above that batches are
// always assigned to the worker with ID `batch %
// num_workers`.
if usize::cast_from(batch) % num_workers != worker_id {
anyhow::bail!(
"internal error: batch {} assigned to worker {} (expected worker {})",
batch,
worker_id,
usize::cast_from(batch) % num_workers
);
}
if !up_to.less_equal(&ts) {
if diff < 0 {
for ((row, ()), ts, diff) in
data.iter().flatten().flat_map(|chunk| chunk.iter())
{
// We're consuming a batch of data, and the upstream operator has to ensure
// that the data is exchanged according to the batch.
let batch = row.hashed() % output_batch_count;
if !up_to.less_equal(ts) {
if *diff < 0 {
anyhow::bail!(
"Invalid data in source errors, saw retractions ({}) for \
row that does not exist",
diff * -1,
*diff * -1,
)
}
row_count += u64::try_from(diff).unwrap();
row_count += u64::try_from(*diff).unwrap();
let uploader = match s3_uploaders.entry(batch) {
Entry::Occupied(entry) => entry.into_mut(),
Entry::Vacant(entry) => {
@@ -389,18 +392,10 @@ where
)?)
}
};
for _ in 0..diff {
uploader.append_row(&row).await?;
for _ in 0..*diff {
uploader.append_row(row).await?;
}
}
// A very crude way to detect if there is ever a regression in the deterministic
// ordering of rows in our input, since we are depending on an implementation
// detail of timely communication (FIFO ordering over an exchange).
let cur = (row, batch);
if let Some(last) = last_row {
assert!(&last < &cur, "broken fifo ordering!");
}
last_row = Some(cur);
}
}
AsyncEvent::Progress(frontier) => {
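The upload loop above keeps one uploader per batch, creating it lazily through the map's entry API and appending each row once per unit of `diff`. A minimal sketch of that bookkeeping, with a hypothetical `Uploader` standing in for `CopyToS3Uploader`:

```rust
use std::collections::btree_map::{BTreeMap, Entry};

/// Hypothetical uploader standing in for `CopyToS3Uploader`.
struct Uploader {
    rows: u64,
}

impl Uploader {
    fn new() -> Self {
        Uploader { rows: 0 }
    }
    fn append_row(&mut self, _row: &str) {
        self.rows += 1;
    }
}

fn main() {
    let mut s3_uploaders: BTreeMap<u64, Uploader> = BTreeMap::new();
    // (row, batch, diff) triples; in the real operator `batch` is derived from
    // the row's hash and `diff` is the update's multiplicity.
    let updates = [("a", 0u64, 2i64), ("b", 1, 1)];

    for (row, batch, diff) in updates {
        // Lazily create the uploader for this batch, mirroring `s3_uploaders.entry(batch)`.
        let uploader = match s3_uploaders.entry(batch) {
            Entry::Occupied(entry) => entry.into_mut(),
            Entry::Vacant(entry) => entry.insert(Uploader::new()),
        };
        // Append the row once per unit of `diff`.
        for _ in 0..diff {
            uploader.append_row(row);
        }
    }

    for (batch, uploader) in &s3_uploaders {
        println!("batch {batch}: {} rows", uploader.rows);
    }
}
```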
39 changes: 29 additions & 10 deletions src/timely-util/src/operator.rs
@@ -25,12 +25,11 @@ use timely::container::{ContainerBuilder, PushInto};
use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline};
use timely::dataflow::channels::pushers::Tee;
use timely::dataflow::channels::ContainerBytes;
use timely::dataflow::operators::core::Map;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc;
use timely::dataflow::operators::generic::operator::{self, Operator};
use timely::dataflow::operators::generic::{InputHandleCore, OperatorInfo, OutputHandleCore};
use timely::dataflow::operators::Capability;
use timely::dataflow::{Scope, StreamCore};
use timely::dataflow::{Scope, Stream, StreamCore};
use timely::progress::{Antichain, Timestamp};
use timely::{Container, Data, PartialOrder};

Expand Down Expand Up @@ -708,7 +707,18 @@ where
h.finish()
});
consolidate_pact::<Ba, _, _>(&self.map(|k| (k, ())).inner, exchange, name)
.map(|((k, ()), time, diff)| (k.clone(), time.clone(), diff.clone()))
.unary(Pipeline, "unpack consolidated", |_, _| {
|input, output| {
input.for_each(|time, data| {
let mut session = output.session(&time);
for ((k, ()), t, d) in
data.iter().flatten().flat_map(|chunk| chunk.iter())
{
session.give((k.clone(), t.clone(), d.clone()))
}
})
}
})
.as_collection()
} else {
self
@@ -730,7 +740,17 @@ where
Exchange::new(move |update: &((D1, ()), G::Timestamp, R)| (update.0).0.hashed());

consolidate_pact::<Ba, _, _>(&self.map(|k| (k, ())).inner, exchange, name)
.map(|((k, ()), time, diff)| (k.clone(), time.clone(), diff.clone()))
.unary(Pipeline, &format!("Unpack {name}"), |_, _| {
|input, output| {
input.for_each(|time, data| {
let mut session = output.session(&time);
for ((k, ()), t, d) in data.iter().flatten().flat_map(|chunk| chunk.iter())
{
session.give((k.clone(), t.clone(), d.clone()))
}
})
}
})
.as_collection()
}
}
@@ -770,14 +790,15 @@ where

/// Aggregates the weights of equal records into at most one record.
///
/// The data are accumulated in place, each held back until their timestamp has completed.
/// Produces a stream of chains of records, partitioned according to `pact`. The
/// data is sorted according to `Ba`. For each timestamp, it produces at most one chain.
///
/// This serves as a low-level building-block for more user-friendly functions.
/// The data are accumulated in place, each held back until their timestamp has completed.
pub fn consolidate_pact<Ba, P, G>(
stream: &StreamCore<G, Ba::Input>,
pact: P,
name: &str,
) -> StreamCore<G, Ba::Output>
) -> Stream<G, Vec<Ba::Output>>
where
G: Scope,
Ba: Batcher<Time = G::Timestamp> + 'static,
Expand Down Expand Up @@ -833,9 +854,7 @@ where
// Extract updates not in advance of `upper`.
let output =
batcher.seal::<ConsolidateBuilder<_, Ba::Output>>(upper.clone());
for mut batch in output {
session.give_container(&mut batch);
}
session.give(output);
}
}

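For reference, the consolidation the batcher performs before sealing a chain is the usual differential-dataflow idiom: sort the updates and merge equal `(data, time)` pairs by summing their diffs, dropping anything that accumulates to zero. A small self-contained sketch of that aggregation (not the batcher's actual in-place implementation):

```rust
/// Aggregate the weights of equal records: sort by (data, time), sum the diffs
/// of equal pairs, and drop anything that accumulates to zero.
fn consolidate<D: Ord, T: Ord>(mut updates: Vec<(D, T, i64)>) -> Vec<(D, T, i64)> {
    updates.sort_by(|a, b| (&a.0, &a.1).cmp(&(&b.0, &b.1)));
    let mut result: Vec<(D, T, i64)> = Vec::new();
    for (data, time, diff) in updates {
        if let Some((d, t, acc)) = result.last_mut() {
            if *d == data && *t == time {
                // Same record and time as the previous update: fold the diff in.
                *acc += diff;
                continue;
            }
        }
        result.push((data, time, diff));
    }
    // Records whose weights cancel out disappear entirely.
    result.retain(|(_, _, diff)| *diff != 0);
    result
}

fn main() {
    let updates = vec![("a", 0, 1), ("b", 0, 1), ("a", 0, 1), ("b", 0, -1)];
    // "a" accumulates to 2, "b" cancels out.
    assert_eq!(consolidate(updates), vec![("a", 0, 2)]);
}
```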