diff --git a/src/compute/src/logging.rs b/src/compute/src/logging.rs
index 17c9d04279a49..23a9a18f02f6f 100644
--- a/src/compute/src/logging.rs
+++ b/src/compute/src/logging.rs
@@ -236,7 +236,7 @@ where
     CB: ContainerBuilder,
     L: Into,
     F: for<'a> FnMut(
-            ::Item<'a>,
+            ::ItemRef<'a>,
            &mut PermutedRowPacker,
            &mut OutputSession,
        ) + 'static,
@@ -251,7 +251,7 @@ where
         move |input, output| {
             while let Some((time, data)) = input.next() {
                 let mut session = output.session_with_builder(&time);
-                for item in data.drain() {
+                for item in data.iter().flatten().flat_map(|chunk| chunk.iter()) {
                     logic(item, &mut packer, &mut session);
                 }
             }
diff --git a/src/compute/src/sink/copy_to_s3_oneshot.rs b/src/compute/src/sink/copy_to_s3_oneshot.rs
index e338e4ebf628a..8e638c330a2d0 100644
--- a/src/compute/src/sink/copy_to_s3_oneshot.rs
+++ b/src/compute/src/sink/copy_to_s3_oneshot.rs
@@ -11,7 +11,7 @@ use std::any::Any;
 use std::cell::RefCell;
 use std::rc::Rc;
 
-use differential_dataflow::{AsCollection, Collection, Hashable};
+use differential_dataflow::{Collection, Hashable};
 use mz_compute_client::protocol::response::CopyToResponse;
 use mz_compute_types::dyncfgs::{
     COPY_TO_S3_ARROW_BUILDER_BUFFER_RATIO, COPY_TO_S3_MULTIPART_PART_SIZE_BYTES,
@@ -23,7 +23,6 @@ use mz_storage_types::controller::CollectionMetadata;
 use mz_storage_types::errors::DataflowError;
 use mz_timely_util::operator::consolidate_pact;
 use timely::dataflow::channels::pact::{Exchange, Pipeline};
-use timely::dataflow::operators::core::Map;
 use timely::dataflow::operators::Operator;
 use timely::dataflow::Scope;
 use timely::progress::Antichain;
@@ -63,32 +62,20 @@ where
         // files based on the user provided `MAX_FILE_SIZE`.
         let batch_count = self.output_batch_count;
 
-        // This relies on an assumption the output order after the Exchange is deterministic, which
-        // is necessary to ensure the files written from each compute replica are identical.
-        // While this is not technically guaranteed, the current implementation uses a FIFO channel.
-        // In the storage copy_to operator we assert the ordering of rows to detect any regressions.
+        // We exchange the data according to batch, but we don't want to send the batch ID to the
+        // sink. The sink can re-compute the batch ID from the data.
         let input = consolidate_pact::, _, _>(
-            &sinked_collection
-                .map(move |row| {
-                    let batch = row.hashed() % batch_count;
-                    ((row, batch), ())
-                })
-                .inner,
-            Exchange::new(move |(((_, batch), _), _, _)| *batch),
+            &sinked_collection.map(move |row| (row, ())).inner,
+            Exchange::new(move |((row, ()), _, _): &((Row, _), _, _)| row.hashed() % batch_count),
             "Consolidated COPY TO S3 input",
         );
-        // TODO: We're converting a stream of region-allocated data to a stream of vectors.
-        let input = input.map(Clone::clone).as_collection();
 
         // We need to consolidate the error collection to ensure we don't act on retracted errors.
         let error = consolidate_pact::, _, _>(
-            &err_collection
-                .map(move |row| {
-                    let batch = row.hashed() % batch_count;
-                    ((row, batch), ())
-                })
-                .inner,
-            Exchange::new(move |(((_, batch), _), _, _)| *batch),
+            &err_collection.map(move |err| (err, ())).inner,
+            Exchange::new(move |((err, _), _, _): &((DataflowError, _), _, _)| {
+                err.hashed() % batch_count
+            }),
             "Consolidated COPY TO S3 errors",
         );
 
@@ -104,9 +91,14 @@ where
         while let Some((time, data)) = input.next() {
             if !up_to.less_equal(time.time()) && !received_one {
                 received_one = true;
-                output
-                    .session(&time)
-                    .give_iterator(data.iter().next().cloned().into_iter());
+                output.session(&time).give_iterator(
+                    data.iter()
+                        .flatten()
+                        .flat_map(|chunk| chunk.iter().cloned())
+                        .next()
+                        .map(|((err, ()), time, diff)| (err, time, diff))
+                        .into_iter(),
+                );
             }
         }
     }
@@ -132,6 +124,7 @@ where
             self.connection_id,
             params,
             result_callback,
+            self.output_batch_count,
         );
 
         Some(token)
diff --git a/src/storage-operators/src/s3_oneshot_sink.rs b/src/storage-operators/src/s3_oneshot_sink.rs
index ca5dec42984a3..67f60776d8a5d 100644
--- a/src/storage-operators/src/s3_oneshot_sink.rs
+++ b/src/storage-operators/src/s3_oneshot_sink.rs
@@ -16,7 +16,7 @@ use std::rc::Rc;
 
 use anyhow::anyhow;
 use aws_types::sdk_config::SdkConfig;
-use differential_dataflow::{Collection, Hashable};
+use differential_dataflow::Hashable;
 use futures::StreamExt;
 use mz_ore::cast::CastFrom;
 use mz_ore::error::ErrorExt;
@@ -31,6 +31,7 @@ use mz_storage_types::sinks::{S3SinkFormat, S3UploadInfo};
 use mz_timely_util::builder_async::{
     Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
 };
+use timely::container::columnation::TimelyStack;
 use timely::dataflow::channels::pact::{Exchange, Pipeline};
 use timely::dataflow::operators::Broadcast;
 use timely::dataflow::{Scope, Stream};
@@ -54,9 +55,12 @@ mod pgcopy;
 /// - completion: removes the sentinel file and calls the `worker_callback`
 ///
 /// Returns a token that should be held to keep the sink alive.
+///
+/// The `input_collection` must be a stream of chains, partitioned and exchanged by the row's hash
+/// modulo the number of batches.
 pub fn copy_to(
-    input_collection: Collection,
-    err_stream: Stream,
+    input_collection: Stream>>,
+    err_stream: Stream,
     up_to: Antichain,
     connection_details: S3UploadInfo,
     connection_context: ConnectionContext,
@@ -65,6 +69,7 @@
     connection_id: CatalogItemId,
     params: CopyToParameters,
     worker_callback: F,
+    output_batch_count: u64,
 ) -> Rc
 where
     G: Scope,
@@ -89,6 +94,7 @@ where
             up_to,
             start_stream,
             params,
+            output_batch_count,
         ),
         S3SinkFormat::Parquet => render_upload_operator::(
             scope.clone(),
@@ -101,6 +107,7 @@ where
             up_to,
             start_stream,
             params,
+            output_batch_count,
         ),
     };
 
@@ -130,7 +137,7 @@ fn render_initialization_operator(
     scope: G,
     sink_id: GlobalId,
     up_to: Antichain,
-    err_stream: Stream,
+    err_stream: Stream,
 ) -> (Stream>, PressOnDropButton)
 where
     G: Scope,
@@ -160,7 +167,7 @@ where
         while let Some(event) = error_handle.next().await {
             match event {
                 AsyncEvent::Data(cap, data) => {
-                    for (((error, _), _), ts, _) in data {
+                    for (error, ts, _) in data {
                         if !up_to.less_equal(&ts) {
                             start_handle.give(&cap, Err(error.to_string()));
                             return;
@@ -281,6 +288,9 @@ where
 /// Returns a `completion_stream` which contains 1 event per worker of
 /// the result of the upload operation, either an error or the number of rows
 /// uploaded by the worker.
+///
+/// The `input_collection` must be a stream of chains, partitioned and exchanged by the row's hash
+/// modulo the number of batches.
 fn render_upload_operator(
     scope: G,
     connection_context: ConnectionContext,
@@ -288,20 +298,20 @@
     connection_id: CatalogItemId,
     connection_details: S3UploadInfo,
     sink_id: GlobalId,
-    input_collection: Collection,
+    input_collection: Stream>>,
     up_to: Antichain,
     start_stream: Stream>,
     params: CopyToParameters,
+    output_batch_count: u64,
 ) -> (Stream>, PressOnDropButton)
 where
     G: Scope,
     T: CopyToS3Uploader,
 {
     let worker_id = scope.index();
-    let num_workers = scope.peers();
     let mut builder = AsyncOperatorBuilder::new("CopyToS3-uploader".to_string(), scope.clone());
 
-    let mut input_handle = builder.new_disconnected_input(&input_collection.inner, Pipeline);
+    let mut input_handle = builder.new_disconnected_input(&input_collection, Pipeline);
     let (completion_handle, completion_stream) = builder.new_output();
     let mut start_handle = builder.new_input_for(&start_stream, Pipeline, &completion_handle);
@@ -351,31 +361,24 @@ where
         }
 
         let mut row_count = 0;
-        let mut last_row = None;
         while let Some(event) = input_handle.next().await {
             match event {
                 AsyncEvent::Data(_ts, data) => {
-                    for (((row, batch), ()), ts, diff) in data {
-                        // Check our assumption above that batches are
-                        // always assigned to the worker with ID `batch %
-                        // num_workers`.
-                        if usize::cast_from(batch) % num_workers != worker_id {
-                            anyhow::bail!(
-                                "internal error: batch {} assigned to worker {} (expected worker {})",
-                                batch,
-                                worker_id,
-                                usize::cast_from(batch) % num_workers
-                            );
-                        }
-                        if !up_to.less_equal(&ts) {
-                            if diff < 0 {
+                    for ((row, ()), ts, diff) in
+                        data.iter().flatten().flat_map(|chunk| chunk.iter())
+                    {
+                        // We're consuming a batch of data, and the upstream operator has to ensure
+                        // that the data is exchanged according to the batch.
+                        let batch = row.hashed() % output_batch_count;
+                        if !up_to.less_equal(ts) {
+                            if *diff < 0 {
                                 anyhow::bail!(
                                     "Invalid data in source errors, saw retractions ({}) for \
                                     row that does not exist",
-                                    diff * -1,
+                                    *diff * -1,
                                 )
                             }
-                            row_count += u64::try_from(diff).unwrap();
+                            row_count += u64::try_from(*diff).unwrap();
                             let uploader = match s3_uploaders.entry(batch) {
                                 Entry::Occupied(entry) => entry.into_mut(),
                                 Entry::Vacant(entry) => {
@@ -389,18 +392,10 @@ where
                                     )?)
                                 }
                             };
-                            for _ in 0..*diff {
-                                uploader.append_row(&row).await?;
+                            for _ in 0..*diff {
+                                uploader.append_row(row).await?;
                             }
                         }
-                        // A very crude way to detect if there is ever a regression in the deterministic
-                        // ordering of rows in our input, since we are depending on an implementation
-                        // detail of timely communication (FIFO ordering over an exchange).
-                        let cur = (row, batch);
-                        if let Some(last) = last_row {
-                            assert!(&last < &cur, "broken fifo ordering!");
-                        }
-                        last_row = Some(cur);
                     }
                 }
                 AsyncEvent::Progress(frontier) => {
diff --git a/src/timely-util/src/operator.rs b/src/timely-util/src/operator.rs
index 8254974de00af..cc6894d7a28c2 100644
--- a/src/timely-util/src/operator.rs
+++ b/src/timely-util/src/operator.rs
@@ -25,12 +25,11 @@ use timely::container::{ContainerBuilder, PushInto};
 use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline};
 use timely::dataflow::channels::pushers::Tee;
 use timely::dataflow::channels::ContainerBytes;
-use timely::dataflow::operators::core::Map;
 use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc;
 use timely::dataflow::operators::generic::operator::{self, Operator};
 use timely::dataflow::operators::generic::{InputHandleCore, OperatorInfo, OutputHandleCore};
 use timely::dataflow::operators::Capability;
-use timely::dataflow::{Scope, StreamCore};
+use timely::dataflow::{Scope, Stream, StreamCore};
 use timely::progress::{Antichain, Timestamp};
 use timely::{Container, Data, PartialOrder};
 
@@ -708,7 +707,18 @@ where
             h.finish()
         });
         consolidate_pact::(&self.map(|k| (k, ())).inner, exchange, name)
-            .map(|((k, ()), time, diff)| (k.clone(), time.clone(), diff.clone()))
+            .unary(Pipeline, "unpack consolidated", |_, _| {
+                |input, output| {
+                    input.for_each(|time, data| {
+                        let mut session = output.session(&time);
+                        for ((k, ()), t, d) in
+                            data.iter().flatten().flat_map(|chunk| chunk.iter())
+                        {
+                            session.give((k.clone(), t.clone(), d.clone()))
+                        }
+                    })
+                }
+            })
             .as_collection()
     } else {
         self
@@ -730,7 +740,17 @@ where
             Exchange::new(move |update: &((D1, ()), G::Timestamp, R)| (update.0).0.hashed());
         consolidate_pact::(&self.map(|k| (k, ())).inner, exchange, name)
-            .map(|((k, ()), time, diff)| (k.clone(), time.clone(), diff.clone()))
+            .unary(Pipeline, &format!("Unpack {name}"), |_, _| {
+                |input, output| {
+                    input.for_each(|time, data| {
+                        let mut session = output.session(&time);
+                        for ((k, ()), t, d) in data.iter().flatten().flat_map(|chunk| chunk.iter())
+                        {
+                            session.give((k.clone(), t.clone(), d.clone()))
+                        }
+                    })
+                }
+            })
             .as_collection()
     }
 }
 
@@ -770,14 +790,15 @@ where
 /// Aggregates the weights of equal records into at most one record.
 ///
-/// The data are accumulated in place, each held back until their timestamp has completed.
+/// Produces a stream of chains of records, partitioned according to `pact`. The
+/// data is sorted according to `Ba`. For each timestamp, it produces at most one chain.
 ///
-/// This serves as a low-level building-block for more user-friendly functions.
+/// The data are accumulated in place, each held back until their timestamp has completed.
 pub fn consolidate_pact(
     stream: &StreamCore,
     pact: P,
     name: &str,
-) -> StreamCore
+) -> Stream>
 where
     G: Scope,
     Ba: Batcher
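
A minimal standalone sketch of the chain-iteration pattern that recurs in the hunks above (`data.iter().flatten().flat_map(|chunk| chunk.iter())`), assuming the consolidated container holds chains of sorted chunks. Plain nested `Vec`s stand in for the columnated `TimelyStack` chunks, and the simplified update type `((String, ()), u64, i64)` stands in for `((Row, ()), Timestamp, Diff)`; all names here are illustrative only.

// Sketch only: `Vec<Vec<Vec<_>>>` models "a container holding chains of chunks";
// in the real dataflow the chunks are columnated `TimelyStack`s and the updates
// are `((Row, ()), Timestamp, Diff)`.
fn main() {
    let data: Vec<Vec<Vec<((String, ()), u64, i64)>>> = vec![vec![
        vec![(("a".into(), ()), 0, 1), (("b".into(), ()), 0, 2)],
        vec![(("c".into(), ()), 1, 1)],
    ]];

    // Flatten the chains into chunks, then walk each chunk by reference,
    // cloning only what needs to outlive the borrow (here nothing does).
    for ((key, ()), time, diff) in data.iter().flatten().flat_map(|chunk| chunk.iter()) {
        println!("{key} @ {time}: {diff:+}");
    }
}

This is the same shape the new `unary` operators in `operator.rs` unpack (cloning each update) before calling `as_collection()`.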