diff --git a/src/adapter/src/active_compute_sink.rs b/src/adapter/src/active_compute_sink.rs
index 7f151abb78e34..74b2cff630e04 100644
--- a/src/adapter/src/active_compute_sink.rs
+++ b/src/adapter/src/active_compute_sink.rs
@@ -29,7 +29,7 @@ use tokio::sync::{mpsc, oneshot};
 use uuid::Uuid;
 
 use crate::coord::peek::PeekResponseUnary;
-use crate::{AdapterError, ExecuteResponse};
+use crate::{AdapterError, ExecuteContext, ExecuteResponse};
 
 #[derive(Debug)]
 /// A description of an active compute sink from the coordinator's perspective.
@@ -435,3 +435,12 @@ impl ActiveCopyTo {
         let _ = self.tx.send(message);
     }
 }
+
+/// State we keep in the `Coordinator` to track active `COPY FROM` statements.
+#[derive(Debug)]
+pub(crate) struct ActiveCopyFrom {
+    /// ID of the ingestion running in clusterd.
+    pub ingestion_id: uuid::Uuid,
+    /// Context of the SQL session that ran the statement.
+    pub ctx: ExecuteContext,
+}
diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs
index 2f26cff656e02..6c36a1dc00b29 100644
--- a/src/adapter/src/coord.rs
+++ b/src/adapter/src/coord.rs
@@ -170,7 +170,7 @@ use tracing::{debug, info, info_span, span, warn, Instrument, Level, Span};
 use tracing_opentelemetry::OpenTelemetrySpanExt;
 use uuid::Uuid;
 
-use crate::active_compute_sink::ActiveComputeSink;
+use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyFrom};
 use crate::catalog::{BuiltinTableUpdate, Catalog, OpenCatalogResult};
 use crate::client::{Client, Handle};
 use crate::command::{Command, ExecuteResponse};
@@ -1667,7 +1667,7 @@ pub struct Coordinator {
     active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>,
     /// A map of active `COPY FROM` statements. The Coordinator waits for `clusterd`
     /// to stage Batches in Persist that we will then link into the shard.
-    active_copies: BTreeMap<ConnectionId, ExecuteContext>,
+    active_copies: BTreeMap<ConnectionId, ActiveCopyFrom>,
 
     /// A map from connection ids to a watch channel that is set to `true` if the connection
     /// received a cancel request.
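The adapter-side change above is pure bookkeeping: `active_copies` now stores an `ActiveCopyFrom` instead of a bare `ExecuteContext`, so a cancellation has both the context to retire and the ingestion ID needed to tear down the cluster-side dataflow. A minimal, self-contained sketch of that pattern, using toy stand-ins (`u32` for the connection ID, `String` for the execute context) rather than the real adapter types:

```rust
use std::collections::BTreeMap;
use uuid::Uuid; // assumes uuid = { version = "1", features = ["v4"] }

/// Toy stand-in for the real `ActiveCopyFrom`.
#[derive(Debug)]
struct ActiveCopyFrom {
    ingestion_id: Uuid,
    ctx: String, // stands in for `ExecuteContext`
}

#[derive(Default)]
struct Coord {
    active_copies: BTreeMap<u32, ActiveCopyFrom>,
}

impl Coord {
    /// Stash both halves when sequencing a COPY FROM.
    fn start_copy(&mut self, conn_id: u32, ingestion_id: Uuid, ctx: String) {
        self.active_copies
            .insert(conn_id, ActiveCopyFrom { ingestion_id, ctx });
    }

    /// Cancellation retires the SQL context *and* returns the ingestion ID
    /// so the caller can tell the storage controller to stop the dataflow.
    fn cancel_pending_copy(&mut self, conn_id: &u32) -> Option<Uuid> {
        self.active_copies.remove(conn_id).map(|active| {
            println!("retiring {:?} with a canceled error", active.ctx);
            active.ingestion_id
        })
    }
}

fn main() {
    let mut coord = Coord::default();
    coord.start_copy(1, Uuid::new_v4(), "COPY ctx".into());
    assert!(coord.cancel_pending_copy(&1).is_some());
    assert!(coord.cancel_pending_copy(&1).is_none()); // repeat cancel is a no-op
}
```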
diff --git a/src/adapter/src/coord/sequencer/inner/copy_from.rs b/src/adapter/src/coord/sequencer/inner/copy_from.rs
index 58b80e3e378e6..721ae071f4455 100644
--- a/src/adapter/src/coord/sequencer/inner/copy_from.rs
+++ b/src/adapter/src/coord/sequencer/inner/copy_from.rs
@@ -20,7 +20,7 @@ use url::Url;
 use uuid::Uuid;
 
 use crate::coord::sequencer::inner::return_if_err;
-use crate::coord::{Coordinator, TargetCluster};
+use crate::coord::{ActiveCopyFrom, Coordinator, TargetCluster};
 use crate::optimize::dataflows::{prep_scalar_expr, EvalTime, ExprPrepStyle};
 use crate::session::{TransactionOps, WriteOp};
 use crate::{AdapterError, ExecuteContext, ExecuteResponse};
@@ -108,8 +108,9 @@ impl Coordinator {
             });
         });
 
         // Stash the execute context so we can cancel the COPY.
+        let conn_id = ctx.session().conn_id().clone();
         self.active_copies
-            .insert(ctx.session().conn_id().clone(), ctx);
+            .insert(conn_id, ActiveCopyFrom { ingestion_id, ctx });
 
         let _result = self
             .controller
@@ -124,11 +125,17 @@ impl Coordinator {
         table_id: CatalogItemId,
         batches: Vec<Result<ProtoBatch, String>>,
     ) {
-        let Some(mut ctx) = self.active_copies.remove(&conn_id) else {
+        let Some(active_copy) = self.active_copies.remove(&conn_id) else {
            tracing::warn!(?conn_id, "got response for canceled COPY FROM");
             return;
         };
+        let ActiveCopyFrom {
+            ingestion_id,
+            mut ctx,
+        } = active_copy;
+        tracing::info!(%ingestion_id, num_batches = ?batches.len(), "received batches to append");
+
         let mut all_batches = SmallVec::with_capacity(batches.len());
         let mut all_errors = SmallVec::<[String; 1]>::with_capacity(batches.len());
         let mut row_count = 0u64;
@@ -179,8 +186,15 @@ impl Coordinator {
     /// Cancel any active `COPY FROM` statements/oneshot ingestions.
     #[mz_ore::instrument(level = "debug")]
     pub(crate) fn cancel_pending_copy(&mut self, conn_id: &ConnectionId) {
-        // TODO(cf1): Also cancel the dataflow running on clusterd.
-        if let Some(ctx) = self.active_copies.remove(conn_id) {
+        if let Some(ActiveCopyFrom { ingestion_id, ctx }) = self.active_copies.remove(conn_id) {
+            let cancel_result = self
+                .controller
+                .storage
+                .cancel_oneshot_ingestion(ingestion_id);
+            if let Err(err) = cancel_result {
+                tracing::error!(?err, "failed to cancel OneshotIngestion");
+            }
+
             ctx.retire(Err(AdapterError::Canceled));
         }
     }
diff --git a/src/storage-client/src/client.proto b/src/storage-client/src/client.proto
index e837c27647bcb..725304e76d4f7 100644
--- a/src/storage-client/src/client.proto
+++ b/src/storage-client/src/client.proto
@@ -48,13 +48,21 @@ message ProtoRunIngestionCommand {
   mz_storage_types.sources.ProtoIngestionDescription description = 2;
 }
 
-message ProtoRunOneshotIngestionCommand {
+message ProtoRunOneshotIngestion {
   mz_proto.ProtoU128 ingestion_id = 1;
   mz_repr.global_id.ProtoGlobalId collection_id = 2;
   mz_storage_types.controller.ProtoCollectionMetadata storage_metadata = 3;
   mz_storage_types.oneshot_sources.ProtoOneshotIngestionRequest request = 4;
 }
 
+message ProtoRunOneshotIngestionsCommand {
+  repeated ProtoRunOneshotIngestion ingestions = 1;
+}
+
+message ProtoCancelOneshotIngestionsCommand {
+  repeated mz_proto.ProtoU128 ingestions = 1;
+}
+
 message ProtoCreateSources {
   repeated ProtoRunIngestionCommand sources = 1;
 }
@@ -94,7 +102,8 @@ message ProtoStorageCommand {
     google.protobuf.Empty allow_writes = 7;
     ProtoRunSinks run_sinks = 4;
     mz_storage_types.parameters.ProtoStorageParameters update_configuration = 5;
-    ProtoRunOneshotIngestionCommand oneshot_ingestion = 10;
+    ProtoRunOneshotIngestionsCommand run_oneshot_ingestions = 10;
+    ProtoCancelOneshotIngestionsCommand cancel_oneshot_ingestions = 11;
   }
 }
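On the wire, a cancellation is nothing more than a list of ingestion IDs (`repeated mz_proto.ProtoU128`). The diff doesn't show how `mz_proto` lays out a `ProtoU128`, but the usual approach for 128-bit values in protobuf is two `u64` halves; a hedged sketch of that round trip, with invented field names:

```rust
// Hypothetical mirror of a 128-bit proto message. mz_proto's actual ProtoU128
// layout isn't shown in this diff; splitting into two u64 halves is simply the
// common scheme for 128-bit values in protobuf.
#[derive(Debug, PartialEq)]
struct ProtoU128Like {
    hi: u64,
    lo: u64,
}

fn uuid_to_proto(id: uuid::Uuid) -> ProtoU128Like {
    let bits = id.as_u128();
    ProtoU128Like {
        hi: (bits >> 64) as u64, // most significant 64 bits
        lo: bits as u64,         // least significant 64 bits
    }
}

fn uuid_from_proto(p: ProtoU128Like) -> uuid::Uuid {
    uuid::Uuid::from_u128(((p.hi as u128) << 64) | p.lo as u128)
}

fn main() {
    let id = uuid::Uuid::new_v4();
    assert_eq!(id, uuid_from_proto(uuid_to_proto(id)));
}
```

Whatever the exact field layout, the property `CancelOneshotIngestion` relies on is that the UUID round-trips losslessly.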
diff --git a/src/storage-client/src/client.rs b/src/storage-client/src/client.rs
index f653269d20df0..682072232aa83 100644
--- a/src/storage-client/src/client.rs
+++ b/src/storage-client/src/client.rs
@@ -40,6 +40,7 @@ use smallvec::SmallVec;
 use timely::progress::frontier::{Antichain, MutableAntichain};
 use timely::PartialOrder;
 use tonic::{Request, Status as TonicStatus, Streaming};
+use uuid::Uuid;
 
 use crate::client::proto_storage_server::ProtoStorage;
 use crate::metrics::ReplicaMetrics;
@@ -123,18 +124,29 @@ pub enum StorageCommand<T = mz_repr::Timestamp> {
     UpdateConfiguration(StorageParameters),
     /// Run the enumerated sources, each associated with its identifier.
     RunIngestions(Vec<RunIngestionCommand>),
-    /// Run a dataflow which will ingest data from an external source and only __stage__ it in
-    /// Persist.
-    ///
-    /// Unlike regular ingestions/sources, some other component (e.g. `environmentd`) is
-    /// responsible for linking the staged data into a shard.
-    RunOneshotIngestion(RunOneshotIngestionCommand),
     /// Enable compaction in storage-managed collections.
     ///
     /// Each entry in the vector names a collection and provides a frontier after which
     /// accumulations must be correct.
     AllowCompaction(Vec<(GlobalId, Antichain<T>)>),
     RunSinks(Vec<RunSinkCommand<T>>),
+    /// Run a dataflow which will ingest data from an external source and only __stage__ it in
+    /// Persist.
+    ///
+    /// Unlike regular ingestions/sources, some other component (e.g. `environmentd`) is
+    /// responsible for linking the staged data into a shard.
+    RunOneshotIngestion(Vec<RunOneshotIngestion>),
+    /// `CancelOneshotIngestion` instructs the replica to cancel the identified oneshot ingestions.
+    ///
+    /// It is invalid to send a [`CancelOneshotIngestion`] command that references a oneshot
+    /// ingestion that was not created by a corresponding [`RunOneshotIngestion`] command before.
+    /// Doing so may cause the replica to exhibit undefined behavior.
+    ///
+    /// [`CancelOneshotIngestion`]: crate::client::StorageCommand::CancelOneshotIngestion
+    /// [`RunOneshotIngestion`]: crate::client::StorageCommand::RunOneshotIngestion
+    CancelOneshotIngestion {
+        ingestions: Vec<Uuid>,
+    },
 }
 
 impl<T> StorageCommand<T> {
@@ -146,7 +158,8 @@ impl<T> StorageCommand<T> {
             | InitializationComplete
             | AllowWrites
             | UpdateConfiguration(_)
-            | AllowCompaction(_) => false,
+            | AllowCompaction(_)
+            | CancelOneshotIngestion { .. } => false,
             // TODO(cf2): multi-replica oneshot ingestions. At the moment returning
             // true here means we can't run `COPY FROM` on multi-replica clusters, this
             // should be easy enough to support though.
@@ -199,7 +212,7 @@ impl RustType<ProtoRunIngestionCommand> for RunIngestionCommand {
 
 /// A command that starts ingesting the given ingestion description
 #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
-pub struct RunOneshotIngestionCommand {
+pub struct RunOneshotIngestion {
     /// The ID of the ingestion dataflow.
     pub ingestion_id: uuid::Uuid,
     /// The ID of collection we'll stage batches for.
@@ -210,9 +223,9 @@ pub struct RunOneshotIngestionCommand {
     pub request: OneshotIngestionRequest,
 }
 
-impl RustType<ProtoRunOneshotIngestionCommand> for RunOneshotIngestionCommand {
-    fn into_proto(&self) -> ProtoRunOneshotIngestionCommand {
-        ProtoRunOneshotIngestionCommand {
+impl RustType<ProtoRunOneshotIngestion> for RunOneshotIngestion {
+    fn into_proto(&self) -> ProtoRunOneshotIngestion {
+        ProtoRunOneshotIngestion {
             ingestion_id: Some(self.ingestion_id.into_proto()),
             collection_id: Some(self.collection_id.into_proto()),
             storage_metadata: Some(self.collection_meta.into_proto()),
@@ -220,20 +233,20 @@ impl RustType<ProtoRunOneshotIngestion> for RunOneshotIngestion {
         }
     }
 
-    fn from_proto(proto: ProtoRunOneshotIngestionCommand) -> Result<Self, TryFromProtoError> {
-        Ok(RunOneshotIngestionCommand {
+    fn from_proto(proto: ProtoRunOneshotIngestion) -> Result<Self, TryFromProtoError> {
+        Ok(RunOneshotIngestion {
             ingestion_id: proto
                 .ingestion_id
-                .into_rust_if_some("ProtoRunOneshotIngestionCommand::ingestion_id")?,
+                .into_rust_if_some("ProtoRunOneshotIngestion::ingestion_id")?,
             collection_id: proto
                 .collection_id
-                .into_rust_if_some("ProtoRunOneshotIngestionCommand::collection_id")?,
+                .into_rust_if_some("ProtoRunOneshotIngestion::collection_id")?,
             collection_meta: proto
                 .storage_metadata
-                .into_rust_if_some("ProtoRunOneshotIngestionCommand::storage_metadata")?,
+                .into_rust_if_some("ProtoRunOneshotIngestion::storage_metadata")?,
             request: proto
                 .request
-                .into_rust_if_some("ProtoRunOneshotIngestionCommand::request")?,
+                .into_rust_if_some("ProtoRunOneshotIngestion::request")?,
         })
     }
 }
@@ -300,12 +313,19 @@ impl RustType<ProtoStorageCommand> for StorageCommand<mz_repr::Timestamp> {
             StorageCommand::RunIngestions(sources) => CreateSources(ProtoCreateSources {
                 sources: sources.into_proto(),
             }),
-            StorageCommand::RunOneshotIngestion(oneshot) => {
-                OneshotIngestion(oneshot.into_proto())
-            }
             StorageCommand::RunSinks(sinks) => RunSinks(ProtoRunSinks {
                 sinks: sinks.into_proto(),
             }),
+            StorageCommand::RunOneshotIngestion(ingestions) => {
+                RunOneshotIngestions(ProtoRunOneshotIngestionsCommand {
+                    ingestions: ingestions.iter().map(|cmd| cmd.into_proto()).collect(),
+                })
+            }
+            StorageCommand::CancelOneshotIngestion { ingestions } => {
+                CancelOneshotIngestions(ProtoCancelOneshotIngestionsCommand {
+                    ingestions: ingestions.iter().map(|uuid| uuid.into_proto()).collect(),
+                })
+            }
         }),
     }
 }
@@ -334,8 +354,21 @@ impl RustType<ProtoStorageCommand> for StorageCommand<mz_repr::Timestamp> {
             Some(RunSinks(ProtoRunSinks { sinks })) => {
                 Ok(StorageCommand::RunSinks(sinks.into_rust()?))
             }
-            Some(OneshotIngestion(oneshot)) => {
-                Ok(StorageCommand::RunOneshotIngestion(oneshot.into_rust()?))
+            Some(RunOneshotIngestions(oneshot)) => {
+                let ingestions = oneshot
+                    .ingestions
+                    .into_iter()
+                    .map(|cmd| cmd.into_rust())
+                    .collect::<Result<_, _>>()?;
+                Ok(StorageCommand::RunOneshotIngestion(ingestions))
+            }
+            Some(CancelOneshotIngestions(oneshot)) => {
+                let ingestions = oneshot
+                    .ingestions
+                    .into_iter()
+                    .map(|uuid| uuid.into_rust())
+                    .collect::<Result<_, _>>()?;
+                Ok(StorageCommand::CancelOneshotIngestion { ingestions })
             }
             None => Err(TryFromProtoError::missing_field(
                 "ProtoStorageCommand::kind",
@@ -802,7 +835,8 @@ where
                 | StorageCommand::AllowWrites
                 | StorageCommand::UpdateConfiguration(_)
                 | StorageCommand::AllowCompaction(_)
-                | StorageCommand::RunOneshotIngestion(_) => {}
+                | StorageCommand::RunOneshotIngestion(_)
+                | StorageCommand::CancelOneshotIngestion { .. } => {}
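The `RustType` implementations above follow the codebase's usual shape: `into_proto` wraps every field in `Some`, and `from_proto` unwraps with `into_rust_if_some`, failing with the fully-qualified field name. A toy version of that contract — the `*Like` types and the `String` error are invented for the sketch; the real trait returns `TryFromProtoError`:

```rust
// Toy mirror of the RustType round-trip contract from mz_proto.
#[derive(Debug, Clone, PartialEq)]
struct RunOneshotIngestionLike {
    ingestion_id: u128,
    collection_id: u64,
}

#[derive(Debug, Default)]
struct ProtoRunOneshotIngestionLike {
    ingestion_id: Option<u128>,
    collection_id: Option<u64>,
}

impl RunOneshotIngestionLike {
    fn into_proto(&self) -> ProtoRunOneshotIngestionLike {
        ProtoRunOneshotIngestionLike {
            ingestion_id: Some(self.ingestion_id),
            collection_id: Some(self.collection_id),
        }
    }

    fn from_proto(proto: ProtoRunOneshotIngestionLike) -> Result<Self, String> {
        Ok(RunOneshotIngestionLike {
            // Mirrors `into_rust_if_some`: a missing optional field is an
            // error reported with the field's fully-qualified name.
            ingestion_id: proto
                .ingestion_id
                .ok_or("ProtoRunOneshotIngestion::ingestion_id")?,
            collection_id: proto
                .collection_id
                .ok_or("ProtoRunOneshotIngestion::collection_id")?,
        })
    }
}

fn main() {
    let cmd = RunOneshotIngestionLike { ingestion_id: 42, collection_id: 7 };
    let roundtripped = RunOneshotIngestionLike::from_proto(cmd.into_proto()).unwrap();
    assert_eq!(cmd, roundtripped);
}
```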
         };
     }
diff --git a/src/storage-client/src/controller.rs b/src/storage-client/src/controller.rs
index 25e28ce8b11d1..7ef77254a567c 100644
--- a/src/storage-client/src/controller.rs
+++ b/src/storage-client/src/controller.rs
@@ -501,6 +501,12 @@ pub trait StorageController: Debug {
         result_tx: OneshotResultCallback<ProtoBatch>,
     ) -> Result<(), StorageError<Self::Timestamp>>;
 
+    /// Cancel a oneshot ingestion.
+    fn cancel_oneshot_ingestion(
+        &mut self,
+        ingestion_id: uuid::Uuid,
+    ) -> Result<(), StorageError<Self::Timestamp>>;
+
     /// Alter the sink identified by the given id to match the provided `ExportDescription`.
     async fn alter_export(
         &mut self,
diff --git a/src/storage-controller/src/history.rs b/src/storage-controller/src/history.rs
index 549fde90a845c..8a8f0b5a5c151 100644
--- a/src/storage-controller/src/history.rs
+++ b/src/storage-controller/src/history.rs
@@ -71,7 +71,7 @@ impl<T> CommandHistory<T> {
                 RunIngestions(x) => metrics.run_ingestions_count.add(x.len().cast_into()),
                 RunSinks(x) => metrics.run_sinks_count.add(x.len().cast_into()),
                 AllowCompaction(x) => metrics.allow_compaction_count.add(x.len().cast_into()),
-                RunOneshotIngestion(_) => {
+                RunOneshotIngestion(_) | CancelOneshotIngestion { .. } => {
                     // TODO(cf2): Add metrics for oneshot ingestions.
                 }
             }
@@ -115,15 +115,21 @@
                     final_sinks.extend(cmds.into_iter().map(|c| (c.id, c)));
                 }
                 AllowCompaction(updates) => final_compactions.extend(updates),
-                RunOneshotIngestion(oneshot) => {
-                    final_oneshot_ingestions.insert(oneshot.ingestion_id, oneshot);
+                RunOneshotIngestion(oneshots) => {
+                    for oneshot in oneshots {
+                        final_oneshot_ingestions.insert(oneshot.ingestion_id, oneshot);
+                    }
+                }
+                CancelOneshotIngestion { ingestions } => {
+                    for ingestion in ingestions {
+                        final_oneshot_ingestions.remove(&ingestion);
+                    }
                 }
             }
         }
 
         let mut run_ingestions = Vec::new();
         let mut run_sinks = Vec::new();
-        let mut run_oneshot_ingestions = Vec::new();
         let mut allow_compaction = Vec::new();
 
         // Discard ingestions that have been dropped, keep the rest.
@@ -155,10 +161,6 @@
             run_sinks.push(sink);
         }
 
-        // TODO(cf1): Add a CancelOneshotIngestion command similar to CancelPeek
-        // that will compact/reduce away the RunOneshotIngestion.
-        run_oneshot_ingestions.extend(final_oneshot_ingestions.into_values());
-
         // Reconstitute the commands as a compact history.
         //
         // When we update `metrics`, we need to be careful to not transiently report incorrect
@@ -192,14 +194,14 @@
             self.commands.push(StorageCommand::RunSinks(run_sinks));
         }
 
-        // TODO(cf1): Add a CancelOneshotIngestion command, make sure we prevent
-        // re-sending commands for ingestions that we've already responded to.
-        if !run_oneshot_ingestions.is_empty() {
-            self.commands.extend(
-                run_oneshot_ingestions
-                    .into_iter()
-                    .map(|oneshot| StorageCommand::RunOneshotIngestion(oneshot)),
-            );
+        // Note: RunOneshotIngestion commands are reduced, as we receive
+        // CancelOneshotIngestion commands.
+        //
+        // TODO(cf2): Record metrics on the number of OneshotIngestion commands.
+        if !final_oneshot_ingestions.is_empty() {
+            let oneshots = final_oneshot_ingestions.into_values().collect();
+            self.commands
+                .push(StorageCommand::RunOneshotIngestion(oneshots));
         }
 
         let count = u64::cast_from(allow_compaction.len());
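The history change is the heart of the PR: because `CancelOneshotIngestion` now exists, `CommandHistory::reduce` can net out a run/cancel pair entirely, so a reconnecting replica never sees an ingestion that has already finished or been canceled. A self-contained sketch of that reduction over toy commands (`u32` IDs instead of UUIDs):

```rust
use std::collections::BTreeMap;

// Toy commands mirroring the reduction above: names and shapes are
// simplified, but the insert/remove bookkeeping is the same idea.
#[derive(Debug, Clone)]
enum Cmd {
    Run(u32),    // RunOneshotIngestion, keyed by ingestion ID
    Cancel(u32), // CancelOneshotIngestion
}

fn reduce(history: Vec<Cmd>) -> Vec<Cmd> {
    let mut live = BTreeMap::new();
    for cmd in history {
        match cmd {
            Cmd::Run(id) => {
                live.insert(id, Cmd::Run(id));
            }
            // A cancel erases the matching run, so neither is replayed.
            Cmd::Cancel(id) => {
                live.remove(&id);
            }
        }
    }
    live.into_values().collect()
}

fn main() {
    let history = vec![Cmd::Run(1), Cmd::Run(2), Cmd::Cancel(1)];
    let reduced = reduce(history);
    // Only ingestion 2 is still live, so only its Run survives.
    assert_eq!(reduced.len(), 1);
}
```

In this sketch a stray `Cancel` is simply a no-op; the real protocol instead documents such a command as invalid, per the doc comment on the enum variant.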
diff --git a/src/storage-controller/src/lib.rs b/src/storage-controller/src/lib.rs
index c66097383f96f..1da2a27f1f4dc 100644
--- a/src/storage-controller/src/lib.rs
+++ b/src/storage-controller/src/lib.rs
@@ -49,7 +49,7 @@ use mz_repr::adt::interval::Interval;
 use mz_repr::adt::timestamp::CheckedTimestamp;
 use mz_repr::{Datum, Diff, GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulation};
 use mz_storage_client::client::{
-    ProtoStorageCommand, ProtoStorageResponse, RunIngestionCommand, RunOneshotIngestionCommand,
+    ProtoStorageCommand, ProtoStorageResponse, RunIngestionCommand, RunOneshotIngestion,
     RunSinkCommand, Status, StatusUpdate, StorageCommand, StorageResponse, TableData,
 };
 use mz_storage_client::controller::{
@@ -118,6 +118,16 @@ struct PendingCompactionCommand<T> {
     cluster_id: Option<StorageInstanceId>,
 }
 
+#[derive(Derivative)]
+#[derivative(Debug)]
+struct PendingOneshotIngestion {
+    /// Callback used to provide results of the ingestion.
+    #[derivative(Debug = "ignore")]
+    result_tx: OneshotResultCallback<ProtoBatch>,
+    /// Cluster currently running this ingestion.
+    cluster_id: StorageInstanceId,
+}
+
 /// A storage controller for a storage instance.
 #[derive(Derivative)]
 #[derivative(Debug)]
@@ -162,7 +172,7 @@ pub struct Controller<T: Timestamp + Lattice + Codec64 + From<EpochMillis> + Tim
     pending_table_handle_drops_rx: mpsc::UnboundedReceiver<GlobalId>,
     /// Closures that can be used to send responses from oneshot ingestions.
     #[derivative(Debug = "ignore")]
-    pending_oneshot_ingestions: BTreeMap<uuid::Uuid, OneshotResultCallback<ProtoBatch>>,
+    pending_oneshot_ingestions: BTreeMap<uuid::Uuid, PendingOneshotIngestion>,
 
     /// Interface for managed collections
     pub(crate) collection_manager: collection_mgmt::CollectionManager<T>,
@@ -1403,7 +1413,7 @@
             // TODO(cf2): Refine this error.
             StorageError::Generic(anyhow::anyhow!("missing cluster {instance_id}"))
         })?;
-        let oneshot_cmd = RunOneshotIngestionCommand {
+        let oneshot_cmd = RunOneshotIngestion {
             ingestion_id,
             collection_id,
             collection_meta,
@@ -1411,10 +1421,14 @@
 
         if !self.read_only {
-            instance.send(StorageCommand::RunOneshotIngestion(oneshot_cmd));
+            instance.send(StorageCommand::RunOneshotIngestion(vec![oneshot_cmd]));
+            let pending = PendingOneshotIngestion {
+                result_tx,
+                cluster_id: instance_id,
+            };
             let novel = self
                 .pending_oneshot_ingestions
-                .insert(ingestion_id, result_tx);
+                .insert(ingestion_id, pending);
             assert!(novel.is_none());
             Ok(())
         } else {
@@ -1422,6 +1436,33 @@
         }
     }
 
+    fn cancel_oneshot_ingestion(
+        &mut self,
+        ingestion_id: uuid::Uuid,
+    ) -> Result<(), StorageError<Self::Timestamp>> {
+        if self.read_only {
+            return Err(StorageError::ReadOnly);
+        }
+
+        let pending = self
+            .pending_oneshot_ingestions
+            .remove(&ingestion_id)
+            .ok_or_else(|| {
+                // TODO(cf2): Refine this error.
+                StorageError::Generic(anyhow::anyhow!("missing oneshot ingestion {ingestion_id}"))
+            })?;
+
+        let instance = self.instances.get_mut(&pending.cluster_id).ok_or_else(|| {
+            // TODO(cf2): Refine this error.
+            StorageError::Generic(anyhow::anyhow!("missing cluster {}", pending.cluster_id))
+        })?;
+        instance.send(StorageCommand::CancelOneshotIngestion {
+            ingestions: vec![ingestion_id],
+        });
+
+        Ok(())
+    }
+
     async fn alter_export(
         &mut self,
         id: GlobalId,
@@ -2016,12 +2057,21 @@
                     self.record_status_updates(updates);
                 }
                 Some(StorageResponse::StagedBatches(batches)) => {
-                    for (collection_id, batches) in batches {
-                        match self.pending_oneshot_ingestions.remove(&collection_id) {
-                            Some(sender) => (sender)(batches),
+                    for (ingestion_id, batches) in batches {
+                        match self.pending_oneshot_ingestions.remove(&ingestion_id) {
+                            Some(pending) => {
+                                // Send a cancel command so our command history is correct.
+                                if let Some(instance) = self.instances.get_mut(&pending.cluster_id) {
+                                    instance.send(StorageCommand::CancelOneshotIngestion {
+                                        ingestions: vec![ingestion_id],
+                                    });
+                                }
+                                // Send the results down our channel.
+                                (pending.result_tx)(batches)
+                            }
                             // TODO(cf2): When we support running COPY FROM on multiple
                             // replicas we can probably just ignore the case of `None`.
-                            None => mz_ore::soft_panic_or_log!("no sender for {collection_id}!"),
+                            None => mz_ore::soft_panic_or_log!("no sender for {ingestion_id}!"),
                         }
                     }
                 }
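Note the subtlety in the `StagedBatches` handling: on success the controller sends a `CancelOneshotIngestion` for its own completed ingestion, purely so the command history reduces away the `RunOneshotIngestion` and a restarted replica doesn't re-run it. A simplified model of that pending-map bookkeeping — the callback and cluster types here are invented stand-ins, not the real controller state:

```rust
use std::collections::BTreeMap;

type ResultCallback = Box<dyn FnOnce(Vec<String>)>;

struct PendingOneshotIngestion {
    result_tx: ResultCallback,
    cluster_id: u32,
}

#[derive(Default)]
struct StorageCtl {
    pending: BTreeMap<u128, PendingOneshotIngestion>,
    sent_cancels: Vec<(u32, u128)>, // (cluster, ingestion) cancels issued
}

impl StorageCtl {
    fn on_staged_batches(&mut self, ingestion_id: u128, batches: Vec<String>) {
        if let Some(pending) = self.pending.remove(&ingestion_id) {
            // Cancel first so the command history reduces away the Run
            // command, then hand the results to whoever started the ingestion.
            self.sent_cancels.push((pending.cluster_id, ingestion_id));
            (pending.result_tx)(batches);
        }
    }
}

fn main() {
    let mut ctl = StorageCtl::default();
    ctl.pending.insert(
        7,
        PendingOneshotIngestion {
            result_tx: Box::new(|batches| println!("got {} batches", batches.len())),
            cluster_id: 1,
        },
    );
    ctl.on_staged_batches(7, vec!["batch-a".into()]);
    assert_eq!(ctl.sent_cancels, vec![(1, 7)]);
}
```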
diff --git a/src/storage/src/storage_state.rs b/src/storage/src/storage_state.rs
index 114544f22a02a..ece9a4dd81912 100644
--- a/src/storage/src/storage_state.rs
+++ b/src/storage/src/storage_state.rs
@@ -110,6 +110,7 @@ use timely::worker::Worker as TimelyWorker;
 use tokio::sync::{mpsc, watch};
 use tokio::time::Instant;
 use tracing::{info, warn};
+use uuid::Uuid;
 
 use crate::internal_control::{
     self, DataflowParameters, InternalCommandSender, InternalStorageCommand,
@@ -919,6 +920,9 @@
         let mut running_ingestion_descriptions = self.storage_state.ingestions.clone();
         let mut running_exports_descriptions = self.storage_state.exports.clone();
 
+        let mut create_oneshot_ingestions: BTreeSet<Uuid> = BTreeSet::new();
+        let mut cancel_oneshot_ingestions: BTreeSet<Uuid> = BTreeSet::new();
+
         for command in &mut commands {
             match command {
                 StorageCommand::CreateTimely { .. } => {
@@ -954,12 +958,6 @@
                         }
                     }
                 }
-                StorageCommand::RunOneshotIngestion(oneshot) => {
-                    info!(%worker_id, ?oneshot, "reconcile: received RunOneshotIngestion command");
-                    // TODO(cf1): Handle CancelOneshotIngestion, clean out stale oneshot
-                    // ingestions from our state. Possibly here we respond to the client
-                    // with a cancelation to make sure the client doesn't wait forever.
-                }
                 StorageCommand::RunSinks(exports) => {
                     info!(%worker_id, ?exports, "reconcile: received RunSinks command");
 
@@ -975,6 +973,15 @@
                         }
                     }
                 }
+                StorageCommand::RunOneshotIngestion(ingestions) => {
+                    info!(%worker_id, ?ingestions, "reconcile: received RunOneshotIngestion command");
+                    create_oneshot_ingestions
+                        .extend(ingestions.iter().map(|ingestion| ingestion.ingestion_id));
+                }
+                StorageCommand::CancelOneshotIngestion { ingestions } => {
+                    info!(%worker_id, ?ingestions, "reconcile: received CancelOneshotIngestion command");
+                    cancel_oneshot_ingestions.extend(ingestions.iter());
+                }
                 StorageCommand::InitializationComplete
                 | StorageCommand::AllowWrites
                 | StorageCommand::UpdateConfiguration(_) => (),
@@ -1082,11 +1089,28 @@
                     }
                 })
             }
+            StorageCommand::RunOneshotIngestion(ingestions) => ingestions.retain(|ingestion| {
+                let already_running = self
+                    .storage_state
+                    .oneshot_ingestions
+                    .contains_key(&ingestion.ingestion_id);
+                let was_canceled = cancel_oneshot_ingestions.contains(&ingestion.ingestion_id);
+
+                !already_running && !was_canceled
+            }),
+            StorageCommand::CancelOneshotIngestion { ingestions } => {
+                ingestions.retain(|ingestion_id| {
+                    let already_running = self
+                        .storage_state
+                        .oneshot_ingestions
+                        .contains_key(ingestion_id);
+                    already_running
+                });
+            }
             StorageCommand::InitializationComplete
             | StorageCommand::AllowWrites
             | StorageCommand::UpdateConfiguration(_)
-            | StorageCommand::AllowCompaction(_)
-            | StorageCommand::RunOneshotIngestion(_) => (),
+            | StorageCommand::AllowCompaction(_) => (),
         }
     }
 
@@ -1111,15 +1135,29 @@
             // Objects are considered stale if we did not see them re-created.
             .filter(|id| !expected_objects.contains(id))
             .collect::<Vec<_>>();
+        let stale_oneshot_ingestions = self
+            .storage_state
+            .oneshot_ingestions
+            .keys()
+            .filter(|ingestion_id| {
+                let created = create_oneshot_ingestions.contains(ingestion_id);
+                let dropped = cancel_oneshot_ingestions.contains(ingestion_id);
+                !created && !dropped
+            })
+            .copied()
+            .collect::<Vec<_>>();
 
         info!(
-            %worker_id, ?expected_objects, ?stale_objects,
+            %worker_id, ?expected_objects, ?stale_objects, ?stale_oneshot_ingestions,
             "reconcile: modifing storage state to match expected objects",
         );
 
         for id in stale_objects {
             self.storage_state.drop_collection(id);
         }
+        for id in stale_oneshot_ingestions {
+            self.storage_state.drop_oneshot_ingestion(id);
+        }
 
         // Do not report dropping any objects that do not belong to expected
         // objects.
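Reconciliation now treats oneshot ingestions like other dataflows: keep a proposed `RunOneshotIngestion` only if it is neither already running nor canceled within the same command stream, and keep a `CancelOneshotIngestion` only if the ingestion is actually running. The first of those retains is easy to get backwards; a tiny executable check of the intended truth table, using toy `u32` IDs:

```rust
use std::collections::BTreeSet;

// Toy reconciliation mirroring the retain logic above: `running` is the
// worker's current state, `canceled` the cancellations seen in the new
// command stream.
fn retain_new_runs(
    proposed: &mut Vec<u32>,
    running: &BTreeSet<u32>,
    canceled: &BTreeSet<u32>,
) {
    proposed.retain(|id| !running.contains(id) && !canceled.contains(id));
}

fn main() {
    let running: BTreeSet<u32> = [1, 2].into();
    let canceled: BTreeSet<u32> = [3].into();
    let mut proposed = vec![1, 3, 4];
    retain_new_runs(&mut proposed, &running, &canceled);
    // Ingestion 1 is already running and 3 was canceled; only 4 should start.
    assert_eq!(proposed, vec![4]);
}
```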
@@ -1223,16 +1261,23 @@ impl StorageState {
                     }
                 }
             }
-            StorageCommand::RunOneshotIngestion(oneshot) => {
+            StorageCommand::RunOneshotIngestion(oneshots) => {
                 if self.timely_worker_index == 0 {
-                    self.internal_cmd_tx.borrow_mut().broadcast(
-                        InternalStorageCommand::RunOneshotIngestion {
-                            ingestion_id: oneshot.ingestion_id,
-                            collection_id: oneshot.collection_id,
-                            collection_meta: oneshot.collection_meta,
-                            request: oneshot.request,
-                        },
-                    );
+                    for oneshot in oneshots {
+                        self.internal_cmd_tx.borrow_mut().broadcast(
+                            InternalStorageCommand::RunOneshotIngestion {
+                                ingestion_id: oneshot.ingestion_id,
+                                collection_id: oneshot.collection_id,
+                                collection_meta: oneshot.collection_meta,
+                                request: oneshot.request,
+                            },
+                        );
+                    }
+                }
+            }
+            StorageCommand::CancelOneshotIngestion { ingestions } => {
+                for id in ingestions {
+                    self.drop_oneshot_ingestion(id);
                 }
             }
             StorageCommand::RunSinks(exports) => {
@@ -1322,4 +1367,10 @@ impl StorageState {
                 .broadcast(InternalStorageCommand::DropDataflow(vec![id]));
         }
     }
+
+    /// Drop the identified oneshot ingestion from the storage state.
+    fn drop_oneshot_ingestion(&mut self, ingestion_id: uuid::Uuid) {
+        let prev = self.oneshot_ingestions.remove(&ingestion_id);
+        tracing::info!(%ingestion_id, existed = %prev.is_some(), "dropping oneshot ingestion");
+    }
 }
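Taken together, the diff gives a oneshot ingestion an explicit lifecycle: created by `RunOneshotIngestion`, finished either by `StagedBatches` (after which the controller cancels it itself) or by a user-initiated `cancel_oneshot_ingestion`, and in both cases eventually dropped from every map via `drop_oneshot_ingestion`. A sketch of those states — the enum and event names are invented summaries, not types from the codebase:

```rust
// Illustrative lifecycle of a oneshot ingestion after this change.
#[derive(Debug, PartialEq)]
enum OneshotState {
    Pending,   // RunOneshotIngestion sent; dataflow is staging batches
    Completed, // StagedBatches received; controller issues its own cancel
    Canceled,  // user canceled; coordinator calls cancel_oneshot_ingestion
}

fn on_event(state: OneshotState, event: &str) -> OneshotState {
    match (state, event) {
        (OneshotState::Pending, "staged_batches") => OneshotState::Completed,
        (OneshotState::Pending, "cancel") => OneshotState::Canceled,
        // Completed/Canceled are terminal: the replica drops the dataflow and
        // the ingestion ID disappears from every map.
        (terminal, _) => terminal,
    }
}

fn main() {
    let s = on_event(OneshotState::Pending, "cancel");
    assert_eq!(s, OneshotState::Canceled);
}
```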