[copy_from] Proper cancelation via CancelOneshotIngestion message #31136

base: main
Conversation
* add CancelOneshotIngestion message to the storage controller
* handle new message in 'reduce' and 'reconcile' in the storage-controller
* emit a CancelOneshotIngestion whenever an ingestion completes
Looks good overall. The thing I'm not clear about is what happens to oneshot ingestions when their target cluster is dropped. There are two places that have to deal with the possibility of a missing replica, and they handle it differently (one returns an error, the other gracefully ignores it). I think we should handle this consistently, but the right thing to do depends on whether we drop the controller state for pending oneshot ingestions when we drop an instance.
```rust
/// [`RunOneshotIngestion`]: crate::client::StorageCommand::RunOneshotIngestion
CancelOneshotIngestion {
    ingestions: Vec<Uuid>,
},
```
Is there a reason to make these batched commands? In compute we at one point transformed all commands into unbatched ones, because the batching made various things more cumbersome (mainly keeping statistics about the number of commands in the history) and didn't provide any benefit w.r.t. protobuf encoding size. I think there are plans to also move storage to unbatched commands (either @aljoscha or @petrosagg mentioned that), so if that's still the case it'd make sense to introduce new commands as unbatched immediately.
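For concreteness, a minimal sketch of the two shapes under discussion (simplified, hypothetical definitions, not the real StorageCommand enum):

```rust
use uuid::Uuid;

// Batched, as in this PR: one command can cancel many ingestions.
enum BatchedCommand {
    CancelOneshotIngestion { ingestions: Vec<Uuid> },
    // Stand-in for the remaining command variants.
    Other,
}

// Unbatched, as compute converged on: one command per ingestion, which
// keeps history bookkeeping (e.g. counting commands) straightforward.
enum UnbatchedCommand {
    CancelOneshotIngestion { ingestion_id: Uuid },
    // Stand-in for the remaining command variants.
    Other,
}
```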
I mentioned it, yeah. If possible we should use a flattened field here
The reason I made these batched commands is that the loop in fn reconcile(...) doesn't remove commands; instead it mutates each batch and drains the relevant IDs out of it, so I decided to stick with this existing pattern. I chatted with @petrosagg about this today though, and I'll first try to refactor the loop to actually remove commands instead of just draining batched ones.
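A rough sketch of what that refactor might look like, assuming the command becomes unbatched (hypothetical names; not the actual reconcile code):

```rust
use std::collections::BTreeSet;
use uuid::Uuid;

enum StorageCommand {
    CancelOneshotIngestion { ingestion_id: Uuid },
    // Stand-in for the remaining command variants.
    Other,
}

/// Drop whole commands from the history during reconciliation, rather
/// than draining IDs out of each batch (which leaves empty batches
/// behind that need a second cleanup pass).
fn reconcile_cancels(history: &mut Vec<StorageCommand>, handled: &BTreeSet<Uuid>) {
    history.retain(|command| match command {
        StorageCommand::CancelOneshotIngestion { ingestion_id } => {
            !handled.contains(ingestion_id)
        }
        _ => true,
    });
}
```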
```rust
let instance = self.instances.get_mut(&pending.cluster_id).ok_or_else(|| {
    // TODO(cf2): Refine this error.
    StorageError::Generic(anyhow::anyhow!("missing cluster {}", pending.cluster_id))
```
If we get here, that would be because of a bug in the storage controller, not because of a usage error, right? I wouldn't return an error here, but do a (soft) panic instead.
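For illustration, one way the suggestion could look, assuming mz_ore's soft_panic_or_log! macro (panics in debug builds, logs an error in release); the exact recovery path here is a sketch, not the actual code:

```rust
let Some(instance) = self.instances.get_mut(&pending.cluster_id) else {
    // A missing cluster at this point would be a storage-controller bug,
    // not a usage error, so soft-panic rather than surfacing an error.
    mz_ore::soft_panic_or_log!("missing cluster {}", pending.cluster_id);
    return Ok(());
};
```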
```rust
for (ingestion_id, batches) in batches {
    match self.pending_oneshot_ingestions.remove(&ingestion_id) {
        Some(pending) => {
            // Send a cancel command so our command history is correct.
```
(Also to avoid duplicate work once we have active replication.)
```rust
match self.pending_oneshot_ingestions.remove(&ingestion_id) {
    Some(pending) => {
        // Send a cancel command so our command history is correct.
        if let Some(instance) = self.instances.get_mut(&pending.cluster_id) {
```
Is it possible to get here when the instance is already dropped? If not, we should add a soft panic in the else branch.
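If instances can't be dropped while they still have pending oneshot ingestions, the else branch could soft-panic, e.g. (a sketch with the same soft_panic_or_log! assumption as above; instance.send is likewise assumed):

```rust
if let Some(instance) = self.instances.get_mut(&pending.cluster_id) {
    // Send a cancel command so our command history is correct.
    instance.send(StorageCommand::CancelOneshotIngestion {
        ingestions: vec![ingestion_id],
    });
} else {
    // Getting here would mean the controller state is inconsistent.
    mz_ore::soft_panic_or_log!(
        "oneshot ingestion {} pending on dropped cluster {}",
        ingestion_id,
        pending.cluster_id
    );
}
```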
```rust
.filter(|ingestion_id| {
    let created = create_oneshot_ingestions.contains(ingestion_id);
    let dropped = cancel_oneshot_ingestions.contains(ingestion_id);
    !created && !dropped
```
The check seems unnecessary. We shouldn't have any drop commands in the command stream for ingestions we didn't previously create, right? So !created && dropped shouldn't be possible.
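One way to encode that invariant while simplifying the check (a sketch reusing the names from the diff above):

```rust
.filter(|ingestion_id| {
    let created = create_oneshot_ingestions.contains(ingestion_id);
    // A cancel can only follow its create in the command stream, so
    // `!created` already implies "not dropped".
    debug_assert!(created || !cancel_oneshot_ingestions.contains(ingestion_id));
    !created
})
```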
This PR fixes the TODO(cf1) related to canceling oneshot ingestions. It adds a StorageCommand::CancelOneshotIngestion that reduces/compacts away a corresponding StorageCommand::RunOneshotIngestion, much like ComputeCommand::Peek and ComputeCommand::CancelPeek.

We send a StorageCommand::CancelOneshotIngestion whenever a user has canceled a COPY FROM statement, but the storage controller will also send one whenever a RunOneshotIngestion command completes.
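To illustrate the reduction, a self-contained sketch of how a cancel can compact away its matching run command in the history (hypothetical and heavily simplified; not the real StorageCommand or history types):

```rust
use std::collections::BTreeSet;
use uuid::Uuid;

enum StorageCommand {
    RunOneshotIngestion { ingestion_id: Uuid },
    CancelOneshotIngestion { ingestion_id: Uuid },
    // Stand-in for the remaining command variants.
    Other,
}

/// Reduce a command history: a RunOneshotIngestion and its matching
/// CancelOneshotIngestion annihilate each other, so neither is replayed
/// when a cluster reconnects.
fn reduce(history: Vec<StorageCommand>) -> Vec<StorageCommand> {
    let canceled: BTreeSet<Uuid> = history
        .iter()
        .filter_map(|command| match command {
            StorageCommand::CancelOneshotIngestion { ingestion_id } => Some(*ingestion_id),
            _ => None,
        })
        .collect();

    history
        .into_iter()
        .filter(|command| match command {
            StorageCommand::RunOneshotIngestion { ingestion_id } => {
                !canceled.contains(ingestion_id)
            }
            StorageCommand::CancelOneshotIngestion { .. } => false,
            _ => true,
        })
        .collect()
}
```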
Motivation

Fix TODO(cf1) related to cancelation

Checklist
If this PR evolves an existing $T ⇔ Proto$T mapping (possibly in a backwards-incompatible way), then it is tagged with a T-proto label.