Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[copy_from] Proper cancelation via CancelOneshotIngestion message #31136

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/adapter/src/active_compute_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;

use crate::coord::peek::PeekResponseUnary;
use crate::{AdapterError, ExecuteResponse};
use crate::{AdapterError, ExecuteContext, ExecuteResponse};

#[derive(Debug)]
/// A description of an active compute sink from the coordinator's perspective.
Expand Down Expand Up @@ -435,3 +435,12 @@ impl ActiveCopyTo {
let _ = self.tx.send(message);
}
}

/// State we keep in the `Coordinator` to track active `COPY FROM` statements.
#[derive(Debug)]
pub(crate) struct ActiveCopyFrom {
/// ID of the ingestion running in clusterd.
pub ingestion_id: uuid::Uuid,
/// Context of the SQL session that ran the statement.
pub ctx: ExecuteContext,
}
4 changes: 2 additions & 2 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ use tracing::{debug, info, info_span, span, warn, Instrument, Level, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use uuid::Uuid;

use crate::active_compute_sink::ActiveComputeSink;
use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyFrom};
use crate::catalog::{BuiltinTableUpdate, Catalog, OpenCatalogResult};
use crate::client::{Client, Handle};
use crate::command::{Command, ExecuteResponse};
Expand Down Expand Up @@ -1667,7 +1667,7 @@ pub struct Coordinator {
active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>,
/// A map of active `COPY FROM` statements. The Coordinator waits for `clusterd`
/// to stage Batches in Persist that we will then link into the shard.
active_copies: BTreeMap<ConnectionId, ExecuteContext>,
active_copies: BTreeMap<ConnectionId, ActiveCopyFrom>,

/// A map from connection ids to a watch channel that is set to `true` if the connection
/// received a cancel request.
Expand Down
24 changes: 19 additions & 5 deletions src/adapter/src/coord/sequencer/inner/copy_from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use url::Url;
use uuid::Uuid;

use crate::coord::sequencer::inner::return_if_err;
use crate::coord::{Coordinator, TargetCluster};
use crate::coord::{ActiveCopyFrom, Coordinator, TargetCluster};
use crate::optimize::dataflows::{prep_scalar_expr, EvalTime, ExprPrepStyle};
use crate::session::{TransactionOps, WriteOp};
use crate::{AdapterError, ExecuteContext, ExecuteResponse};
Expand Down Expand Up @@ -108,8 +108,9 @@ impl Coordinator {
});
});
// Stash the execute context so we can cancel the COPY.
let conn_id = ctx.session().conn_id().clone();
self.active_copies
.insert(ctx.session().conn_id().clone(), ctx);
.insert(conn_id, ActiveCopyFrom { ingestion_id, ctx });

let _result = self
.controller
Expand All @@ -124,11 +125,17 @@ impl Coordinator {
table_id: CatalogItemId,
batches: Vec<Result<ProtoBatch, String>>,
) {
let Some(mut ctx) = self.active_copies.remove(&conn_id) else {
let Some(active_copy) = self.active_copies.remove(&conn_id) else {
tracing::warn!(?conn_id, "got response for canceled COPY FROM");
return;
};

let ActiveCopyFrom {
ingestion_id,
mut ctx,
} = active_copy;
tracing::info!(%ingestion_id, num_batches = ?batches.len(), "received batches to append");

let mut all_batches = SmallVec::with_capacity(batches.len());
let mut all_errors = SmallVec::<[String; 1]>::with_capacity(batches.len());
let mut row_count = 0u64;
Expand Down Expand Up @@ -179,8 +186,15 @@ impl Coordinator {
/// Cancel any active `COPY FROM` statements/oneshot ingestions.
#[mz_ore::instrument(level = "debug")]
pub(crate) fn cancel_pending_copy(&mut self, conn_id: &ConnectionId) {
// TODO(cf1): Also cancel the dataflow running on clusterd.
if let Some(ctx) = self.active_copies.remove(conn_id) {
if let Some(ActiveCopyFrom { ingestion_id, ctx }) = self.active_copies.remove(conn_id) {
let cancel_result = self
.controller
.storage
.cancel_oneshot_ingestion(ingestion_id);
if let Err(err) = cancel_result {
tracing::error!(?err, "failed to cancel OneshotIngestion");
}

ctx.retire(Err(AdapterError::Canceled));
}
}
Expand Down
13 changes: 11 additions & 2 deletions src/storage-client/src/client.proto
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,21 @@ message ProtoRunIngestionCommand {
mz_storage_types.sources.ProtoIngestionDescription description = 2;
}

message ProtoRunOneshotIngestionCommand {
message ProtoRunOneshotIngestion {
mz_proto.ProtoU128 ingestion_id = 1;
mz_repr.global_id.ProtoGlobalId collection_id = 2;
mz_storage_types.controller.ProtoCollectionMetadata storage_metadata = 3;
mz_storage_types.oneshot_sources.ProtoOneshotIngestionRequest request = 4;
}

message ProtoRunOneshotIngestionsCommand {
repeated ProtoRunOneshotIngestion ingestions = 1;
}

message ProtoCancelOneshotIngestionsCommand {
repeated mz_proto.ProtoU128 ingestions = 1;
}

message ProtoCreateSources {
repeated ProtoRunIngestionCommand sources = 1;
}
Expand Down Expand Up @@ -94,7 +102,8 @@ message ProtoStorageCommand {
google.protobuf.Empty allow_writes = 7;
ProtoRunSinks run_sinks = 4;
mz_storage_types.parameters.ProtoStorageParameters update_configuration = 5;
ProtoRunOneshotIngestionCommand oneshot_ingestion = 10;
ProtoRunOneshotIngestionsCommand run_oneshot_ingestions = 10;
ProtoCancelOneshotIngestionsCommand cancel_oneshot_ingestions = 11;
}
}

Expand Down
80 changes: 57 additions & 23 deletions src/storage-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use smallvec::SmallVec;
use timely::progress::frontier::{Antichain, MutableAntichain};
use timely::PartialOrder;
use tonic::{Request, Status as TonicStatus, Streaming};
use uuid::Uuid;

use crate::client::proto_storage_server::ProtoStorage;
use crate::metrics::ReplicaMetrics;
Expand Down Expand Up @@ -123,18 +124,29 @@ pub enum StorageCommand<T = mz_repr::Timestamp> {
UpdateConfiguration(StorageParameters),
/// Run the enumerated sources, each associated with its identifier.
RunIngestions(Vec<RunIngestionCommand>),
/// Run a dataflow which will ingest data from an external source and only __stage__ it in
/// Persist.
///
/// Unlike regular ingestions/sources, some other component (e.g. `environmentd`) is
/// responsible for linking the staged data into a shard.
RunOneshotIngestion(RunOneshotIngestionCommand),
/// Enable compaction in storage-managed collections.
///
/// Each entry in the vector names a collection and provides a frontier after which
/// accumulations must be correct.
AllowCompaction(Vec<(GlobalId, Antichain<T>)>),
RunSinks(Vec<RunSinkCommand<T>>),
/// Run a dataflow which will ingest data from an external source and only __stage__ it in
/// Persist.
///
/// Unlike regular ingestions/sources, some other component (e.g. `environmentd`) is
/// responsible for linking the staged data into a shard.
RunOneshotIngestion(Vec<RunOneshotIngestion>),
/// `CancelOneshotIngestion` instructs the replica to cancel the identified oneshot ingestions.
///
/// It is invalid to send a [`CancelOneshotIngestion`] command that references a oneshot
/// ingestion that was not created by a corresponding [`RunOneshotIngestion`] command before.
/// Doing so may cause the replica to exhibit undefined behavior.
///
/// [`CancelOneshotIngestion`]: crate::client::StorageCommand::CancelOneshotIngestion
/// [`RunOneshotIngestion`]: crate::client::StorageCommand::RunOneshotIngestion
CancelOneshotIngestion {
ingestions: Vec<Uuid>,
},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason to make these batched commands? In compute we at one point transformed all commands into unbatched ones because the batching made various things more cumbersome (mainly keeping statistics about the number of commands in the history) and it didn't provide any benefits wrt. protobuf encoding size. I think there are plans for also moving to unbatched commands for storage (either @aljoscha or @petrosagg mentioned that), so if that's still the case it'd make sense to introduce new commands as unbatched immediately.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mentioned it, yeah. If possible we should use a flattened field here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason I made these batched commands is because the loop in fn reconcile(...) doesn't remove commands, instead of mutates the batch and removes relevant ones, so I decided to stick with this existing pattern.

Chatted with @petrosagg about this today though and I'll first try to refactor the loop and actually remove commands instead of just draining batched ones.

}

impl<T> StorageCommand<T> {
Expand All @@ -146,7 +158,8 @@ impl<T> StorageCommand<T> {
| InitializationComplete
| AllowWrites
| UpdateConfiguration(_)
| AllowCompaction(_) => false,
| AllowCompaction(_)
| CancelOneshotIngestion { .. } => false,
// TODO(cf2): multi-replica oneshot ingestions. At the moment returning
// true here means we can't run `COPY FROM` on multi-replica clusters, this
// should be easy enough to support though.
Expand Down Expand Up @@ -199,7 +212,7 @@ impl RustType<ProtoRunIngestionCommand> for RunIngestionCommand {

/// A command that starts ingesting the given ingestion description
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunOneshotIngestionCommand {
pub struct RunOneshotIngestion {
/// The ID of the ingestion dataflow.
pub ingestion_id: uuid::Uuid,
/// The ID of collection we'll stage batches for.
Expand All @@ -210,30 +223,30 @@ pub struct RunOneshotIngestionCommand {
pub request: OneshotIngestionRequest,
}

impl RustType<ProtoRunOneshotIngestionCommand> for RunOneshotIngestionCommand {
fn into_proto(&self) -> ProtoRunOneshotIngestionCommand {
ProtoRunOneshotIngestionCommand {
impl RustType<ProtoRunOneshotIngestion> for RunOneshotIngestion {
fn into_proto(&self) -> ProtoRunOneshotIngestion {
ProtoRunOneshotIngestion {
ingestion_id: Some(self.ingestion_id.into_proto()),
collection_id: Some(self.collection_id.into_proto()),
storage_metadata: Some(self.collection_meta.into_proto()),
request: Some(self.request.into_proto()),
}
}

fn from_proto(proto: ProtoRunOneshotIngestionCommand) -> Result<Self, TryFromProtoError> {
Ok(RunOneshotIngestionCommand {
fn from_proto(proto: ProtoRunOneshotIngestion) -> Result<Self, TryFromProtoError> {
Ok(RunOneshotIngestion {
ingestion_id: proto
.ingestion_id
.into_rust_if_some("ProtoRunOneshotIngestionCommand::ingestion_id")?,
.into_rust_if_some("ProtoRunOneshotIngestion::ingestion_id")?,
collection_id: proto
.collection_id
.into_rust_if_some("ProtoRunOneshotIngestionCommand::collection_id")?,
.into_rust_if_some("ProtoRunOneshotIngestion::collection_id")?,
collection_meta: proto
.storage_metadata
.into_rust_if_some("ProtoRunOneshotIngestionCommand::storage_metadata")?,
.into_rust_if_some("ProtoRunOneshotIngestion::storage_metadata")?,
request: proto
.request
.into_rust_if_some("ProtoRunOneshotIngestionCommand::request")?,
.into_rust_if_some("ProtoRunOneshotIngestion::request")?,
})
}
}
Expand Down Expand Up @@ -300,12 +313,19 @@ impl RustType<ProtoStorageCommand> for StorageCommand<mz_repr::Timestamp> {
StorageCommand::RunIngestions(sources) => CreateSources(ProtoCreateSources {
sources: sources.into_proto(),
}),
StorageCommand::RunOneshotIngestion(oneshot) => {
OneshotIngestion(oneshot.into_proto())
}
StorageCommand::RunSinks(sinks) => RunSinks(ProtoRunSinks {
sinks: sinks.into_proto(),
}),
StorageCommand::RunOneshotIngestion(ingestions) => {
RunOneshotIngestions(ProtoRunOneshotIngestionsCommand {
ingestions: ingestions.iter().map(|cmd| cmd.into_proto()).collect(),
})
}
StorageCommand::CancelOneshotIngestion { ingestions } => {
CancelOneshotIngestions(ProtoCancelOneshotIngestionsCommand {
ingestions: ingestions.iter().map(|uuid| uuid.into_proto()).collect(),
})
}
}),
}
}
Expand Down Expand Up @@ -334,8 +354,21 @@ impl RustType<ProtoStorageCommand> for StorageCommand<mz_repr::Timestamp> {
Some(RunSinks(ProtoRunSinks { sinks })) => {
Ok(StorageCommand::RunSinks(sinks.into_rust()?))
}
Some(OneshotIngestion(oneshot)) => {
Ok(StorageCommand::RunOneshotIngestion(oneshot.into_rust()?))
Some(RunOneshotIngestions(oneshot)) => {
let ingestions = oneshot
.ingestions
.into_iter()
.map(|cmd| cmd.into_rust())
.collect::<Result<_, _>>()?;
Ok(StorageCommand::RunOneshotIngestion(ingestions))
}
Some(CancelOneshotIngestions(oneshot)) => {
let ingestions = oneshot
.ingestions
.into_iter()
.map(|uuid| uuid.into_rust())
.collect::<Result<_, _>>()?;
Ok(StorageCommand::CancelOneshotIngestion { ingestions })
}
None => Err(TryFromProtoError::missing_field(
"ProtoStorageCommand::kind",
Expand Down Expand Up @@ -802,7 +835,8 @@ where
| StorageCommand::AllowWrites
| StorageCommand::UpdateConfiguration(_)
| StorageCommand::AllowCompaction(_)
| StorageCommand::RunOneshotIngestion(_) => {}
| StorageCommand::RunOneshotIngestion(_)
| StorageCommand::CancelOneshotIngestion { .. } => {}
};
}

Expand Down
6 changes: 6 additions & 0 deletions src/storage-client/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,12 @@ pub trait StorageController: Debug {
result_tx: OneshotResultCallback<ProtoBatch>,
) -> Result<(), StorageError<Self::Timestamp>>;

/// Cancel a oneshot ingestion.
fn cancel_oneshot_ingestion(
&mut self,
ingestion_id: uuid::Uuid,
) -> Result<(), StorageError<Self::Timestamp>>;

/// Alter the sink identified by the given id to match the provided `ExportDescription`.
async fn alter_export(
&mut self,
Expand Down
34 changes: 18 additions & 16 deletions src/storage-controller/src/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl<T: std::fmt::Debug> CommandHistory<T> {
RunIngestions(x) => metrics.run_ingestions_count.add(x.len().cast_into()),
RunSinks(x) => metrics.run_sinks_count.add(x.len().cast_into()),
AllowCompaction(x) => metrics.allow_compaction_count.add(x.len().cast_into()),
RunOneshotIngestion(_) => {
RunOneshotIngestion(_) | CancelOneshotIngestion { .. } => {
// TODO(cf2): Add metrics for oneshot ingestions.
}
}
Expand Down Expand Up @@ -115,15 +115,21 @@ impl<T: std::fmt::Debug> CommandHistory<T> {
final_sinks.extend(cmds.into_iter().map(|c| (c.id, c)));
}
AllowCompaction(updates) => final_compactions.extend(updates),
RunOneshotIngestion(oneshot) => {
final_oneshot_ingestions.insert(oneshot.ingestion_id, oneshot);
RunOneshotIngestion(oneshots) => {
for oneshot in oneshots {
final_oneshot_ingestions.insert(oneshot.ingestion_id, oneshot);
}
}
CancelOneshotIngestion { ingestions } => {
for ingestion in ingestions {
final_oneshot_ingestions.remove(&ingestion);
}
}
}
}

let mut run_ingestions = Vec::new();
let mut run_sinks = Vec::new();
let mut run_oneshot_ingestions = Vec::new();
let mut allow_compaction = Vec::new();

// Discard ingestions that have been dropped, keep the rest.
Expand Down Expand Up @@ -155,10 +161,6 @@ impl<T: std::fmt::Debug> CommandHistory<T> {
run_sinks.push(sink);
}

// TODO(cf1): Add a CancelOneshotIngestion command similar to CancelPeek
// that will compact/reduce away the RunOneshotIngestion.
run_oneshot_ingestions.extend(final_oneshot_ingestions.into_values());

// Reconstitute the commands as a compact history.
//
// When we update `metrics`, we need to be careful to not transiently report incorrect
Expand Down Expand Up @@ -192,14 +194,14 @@ impl<T: std::fmt::Debug> CommandHistory<T> {
self.commands.push(StorageCommand::RunSinks(run_sinks));
}

// TODO(cf1): Add a CancelOneshotIngestion command, make sure we prevent
// re-sending commands for ingestions that we've already responded to.
if !run_oneshot_ingestions.is_empty() {
self.commands.extend(
run_oneshot_ingestions
.into_iter()
.map(|oneshot| StorageCommand::RunOneshotIngestion(oneshot)),
);
// Note: RunOneshotIngestion commands are reduced, as we receive
// CancelOneshotIngestion commands.
//
// TODO(cf2): Record metrics on the number of OneshotIngestion commands.
if !final_oneshot_ingestions.is_empty() {
let oneshots = final_oneshot_ingestions.into_values().collect();
self.commands
.push(StorageCommand::RunOneshotIngestion(oneshots));
}

let count = u64::cast_from(allow_compaction.len());
Expand Down
Loading
Loading