Skip to content

Commit

Permalink
cancel in-progress snapshot imports (#33998)
Browse files Browse the repository at this point in the history
1. bring back the snapshot import ui in the `/settings/snapshots` page, which is not linked from anywhere but still works if needed
2. add a cancel button for in-progress imports
3. manually tested that the cancellation works by doing a large import against local big-brain and canceling it from the dashboard before it finishes.
4. the cli showed the error immediately because it's listening to the import status. meanwhile the import continues in the background on the server, but when it gets to the end it throws an error because SnapshotImportModel makes sure you can't transition from Failed => Completed.
5. made sure the state is checked transactionally with finalizing the import, because otherwise the worker ends up committing the import and then throwing an error.
6. added test

it's somewhat inefficient to continue the import in the background even after it has been cancelled. we could potentially make it more efficient with a subscription in the worker, but this solution works for now.

GitOrigin-RevId: 75fa683872bce02cda6eeccfed66ff44cb38fc42
  • Loading branch information
ldanilek authored and Convex, Inc. committed Feb 4, 2025
1 parent 6d4326b commit 1478c1d
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 10 deletions.
26 changes: 26 additions & 0 deletions crates/application/src/snapshot_import/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ impl<RT: Runtime> SnapshotImportExecutor<RT> {
table_mapping_for_import,
usage,
audit_log_event,
Some(snapshot_import.id()),
snapshot_import.requestor.clone(),
)
.await?;
Expand Down Expand Up @@ -687,6 +688,7 @@ pub async fn clear_tables<RT: Runtime>(
table_mapping_for_import,
usage,
DeploymentAuditLogEvent::ClearTables,
None,
ImportRequestor::SnapshotImport,
)
.await?;
Expand Down Expand Up @@ -841,6 +843,7 @@ async fn finalize_import<RT: Runtime>(
table_mapping_for_import: TableMappingForImport,
usage: FunctionUsageTracker,
audit_log_event: DeploymentAuditLogEvent,
import_id: Option<ResolvedDocumentId>,
requestor: ImportRequestor,
) -> anyhow::Result<(Timestamp, u64)> {
let tables_affected = table_mapping_for_import.tables_affected();
Expand All @@ -861,6 +864,29 @@ async fn finalize_import<RT: Runtime>(
"snapshot_import_finalize",
|tx| {
async {
if let Some(import_id) = import_id {
// Only finalize the import if it's in progress.
let mut snapshot_import_model = SnapshotImportModel::new(tx);
let snapshot_import_state =
snapshot_import_model.must_get_state(import_id).await?;
match snapshot_import_state {
ImportState::InProgress { .. } => {},
// This can happen if the import was canceled or somehow retried after
// completion. These errors won't show up to
// the user because they are already terminal states,
// so we won't transition to a new state due to this error.
ImportState::Failed(e) => anyhow::bail!("Import failed: {e}"),
ImportState::Completed { .. } => {
anyhow::bail!("Import already completed")
},
// Indicates a bug -- we shouldn't be finalizing an import that hasn't
// started yet.
ImportState::Uploaded | ImportState::WaitingForConfirmation { .. } => {
anyhow::bail!("Import is not in progress")
},
}
}

let mut documents_deleted = 0;
for tablet_id in table_mapping_for_import.to_delete.keys() {
let namespace = tx.table_mapping().tablet_namespace(*tablet_id)?;
Expand Down
70 changes: 70 additions & 0 deletions crates/application/src/snapshot_import/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1293,3 +1293,73 @@ async fn run_csv_import(
.await
.map(|_| ())
}

#[convex_macro::test_runtime]
async fn test_cancel_in_progress_import(
rt: TestRuntime,
pause_controller: PauseController,
) -> anyhow::Result<()> {
let app = Application::new_for_tests(&rt).await?;
let table_name = "table1";
let test_csv = r#"
a,b
"foo","bar"
"#;

let hold_guard = pause_controller.hold("before_finalize_import");

let mut import_fut = run_csv_import(&app, table_name, test_csv).boxed();

select! {
r = import_fut.as_mut().fuse() => {
anyhow::bail!("import finished before pausing: {r:?}");
},
pause_guard = hold_guard.wait_for_blocked().fuse() => {
let pause_guard = pause_guard.unwrap();

// Cancel the import while it's in progress
let mut tx = app.begin(new_admin_id()).await?;
let mut import_model = model::snapshot_imports::SnapshotImportModel::new(&mut tx);

// Find the in-progress import
let snapshot_import = import_model.import_in_state(ImportState::InProgress {
progress_message: String::new(),
checkpoint_messages: vec![],
}).await?.context("No in-progress import found")?;

import_model.cancel_import(snapshot_import.id()).await?;
app.commit_test(tx).await?;

pause_guard.unpause();
},
}

let err = import_fut.await.unwrap_err();
assert!(err.is_bad_request());
assert!(
err.msg().contains("Import canceled"),
"Unexpected error message: {}",
err.msg()
);

// Verify the import was actually canceled
let mut tx = app.begin(new_admin_id()).await?;
let mut import_model = model::snapshot_imports::SnapshotImportModel::new(&mut tx);
let snapshot_import = import_model
.import_in_state(ImportState::Failed("Import was canceled".into()))
.await?
.context("No failed import found")?;
assert!(matches!(
snapshot_import.state.clone(),
ImportState::Failed(msg) if msg == "Import canceled"
));
// Verify no data written
let table_name = TableName::from_str(table_name)?;
let table_size = tx
.must_count(TableNamespace::test_user(), &table_name)
.await?;
assert_eq!(table_size, 0);
assert!(!TableModel::new(&mut tx).table_exists(TableNamespace::test_user(), &table_name));

Ok(())
}
6 changes: 3 additions & 3 deletions crates/model/src/snapshot_imports/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,11 @@ impl<'a, RT: Runtime> SnapshotImportModel<'a, RT> {
pub async fn cancel_import(&mut self, id: ResolvedDocumentId) -> anyhow::Result<()> {
let current_state = self.must_get_state(id).await?;
match current_state {
ImportState::Uploaded | ImportState::WaitingForConfirmation { .. } => {
ImportState::Uploaded
| ImportState::WaitingForConfirmation { .. }
| ImportState::InProgress { .. } => {
self.fail_import(id, "Import canceled".to_string()).await?
},
// TODO: support cancelling imports in progress
ImportState::InProgress { .. } => anyhow::bail!("Cannot cancel an import in progress"),
ImportState::Completed { .. } => anyhow::bail!(ErrorMetadata::bad_request(
"CannotCancelImport",
"Cannot cancel an import that has completed"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,18 @@ function ImportStateBody({
);
case "in_progress":
return (
<div className="flex flex-col">
{snapshotImport.state.checkpoint_messages.map((message: string) => (
<div>
<CancelImportButton importId={snapshotImport._id} />
<div className="flex flex-col">
{snapshotImport.state.checkpoint_messages.map((message: string) => (
<div className="flex items-center gap-2">
<CheckIcon /> {message}
</div>
))}
<div className="flex items-center gap-2">
<CheckIcon /> {message}
<Spinner className="ml-0" />{" "}
{snapshotImport.state.progress_message}
</div>
))}
<div className="flex items-center gap-2">
<Spinner className="ml-0" /> {snapshotImport.state.progress_message}
</div>
</div>
);
Expand Down Expand Up @@ -371,7 +375,7 @@ export function SnapshotImport() {
<div className="flex flex-col gap-4">
<div className="flex flex-col gap-4">
<div>
<h3 className="mb-2">Snapshot Import</h3>
<h3 className="mb-2">Snapshot Import and Cloud Restore</h3>
<p className="text-content-primary">
Import tables into your database from a snapshot.{" "}
<Link
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import { withAuthenticatedPage } from "lib/withAuthenticatedPage";
import { DeploymentSettingsLayout } from "dashboard-common";
import { SnapshotExport } from "components/deploymentSettings/SnapshotExport";
import { SnapshotImport } from "components/deploymentSettings/SnapshotImport";

export { getServerSideProps } from "lib/ssr";

function SnapshotExportPage() {
return (
<DeploymentSettingsLayout page="snapshots">
<SnapshotExport />
<SnapshotImport />
</DeploymentSettingsLayout>
);
}
Expand Down

0 comments on commit 1478c1d

Please sign in to comment.