From 1478c1d4c0fa05716e45c3c636c084960df84b72 Mon Sep 17 00:00:00 2001 From: Lee Danilek Date: Tue, 4 Feb 2025 15:48:18 -0500 Subject: [PATCH] cancel in-progress snapshot imports (#33998) 1. bring back the snapshot import ui in the `/settings/snapshots` page, which is not linked from anywhere but still works if needed 2. add a cancel button for in-progress imports 3. manually tested that the cancellation works by doing a large import against local big-brain and canceling it from the dashboard before it finishes. 4. the cli showed the error immediately because it's listening to the import status. meanwhile the import continues in the background on the server, but when it gets to the end it throws an error because SnapshotImportModel makes sure you can't transition from Failed => Completed. 5. made sure the state is checked transactionally with finalizing the import, because otherwise the worker ends up committing the import and then throwing an error. 6. added test it's somewhat inefficient to continue the import in the background even after it has been cancelled. we could potentially make it more efficient with a subscription in the worker, but this solution works for now. GitOrigin-RevId: 75fa683872bce02cda6eeccfed66ff44cb38fc42 --- crates/application/src/snapshot_import/mod.rs | 26 +++++++ .../application/src/snapshot_import/tests.rs | 70 +++++++++++++++++++ crates/model/src/snapshot_imports/mod.rs | 6 +- .../deploymentSettings/SnapshotImport.tsx | 18 +++-- .../[deploymentName]/settings/snapshots.tsx | 2 + 5 files changed, 112 insertions(+), 10 deletions(-) diff --git a/crates/application/src/snapshot_import/mod.rs b/crates/application/src/snapshot_import/mod.rs index b3e7f063..265fe5e7 100644 --- a/crates/application/src/snapshot_import/mod.rs +++ b/crates/application/src/snapshot_import/mod.rs @@ -377,6 +377,7 @@ impl SnapshotImportExecutor { table_mapping_for_import, usage, audit_log_event, + Some(snapshot_import.id()), snapshot_import.requestor.clone(), ) .await?; @@ -687,6 +688,7 @@ pub async fn clear_tables( table_mapping_for_import, usage, DeploymentAuditLogEvent::ClearTables, + None, ImportRequestor::SnapshotImport, ) .await?; @@ -841,6 +843,7 @@ async fn finalize_import( table_mapping_for_import: TableMappingForImport, usage: FunctionUsageTracker, audit_log_event: DeploymentAuditLogEvent, + import_id: Option, requestor: ImportRequestor, ) -> anyhow::Result<(Timestamp, u64)> { let tables_affected = table_mapping_for_import.tables_affected(); @@ -861,6 +864,29 @@ async fn finalize_import( "snapshot_import_finalize", |tx| { async { + if let Some(import_id) = import_id { + // Only finalize the import if it's in progress. + let mut snapshot_import_model = SnapshotImportModel::new(tx); + let snapshot_import_state = + snapshot_import_model.must_get_state(import_id).await?; + match snapshot_import_state { + ImportState::InProgress { .. } => {}, + // This can happen if the import was canceled or somehow retried after + // completion. These errors won't show up to + // the user because they are already terminal states, + // so we won't transition to a new state due to this error. + ImportState::Failed(e) => anyhow::bail!("Import failed: {e}"), + ImportState::Completed { .. } => { + anyhow::bail!("Import already completed") + }, + // Indicates a bug -- we shouldn't be finalizing an import that hasn't + // started yet. + ImportState::Uploaded | ImportState::WaitingForConfirmation { .. } => { + anyhow::bail!("Import is not in progress") + }, + } + } + let mut documents_deleted = 0; for tablet_id in table_mapping_for_import.to_delete.keys() { let namespace = tx.table_mapping().tablet_namespace(*tablet_id)?; diff --git a/crates/application/src/snapshot_import/tests.rs b/crates/application/src/snapshot_import/tests.rs index b9a126dd..70e61e5e 100644 --- a/crates/application/src/snapshot_import/tests.rs +++ b/crates/application/src/snapshot_import/tests.rs @@ -1293,3 +1293,73 @@ async fn run_csv_import( .await .map(|_| ()) } + +#[convex_macro::test_runtime] +async fn test_cancel_in_progress_import( + rt: TestRuntime, + pause_controller: PauseController, +) -> anyhow::Result<()> { + let app = Application::new_for_tests(&rt).await?; + let table_name = "table1"; + let test_csv = r#" +a,b +"foo","bar" +"#; + + let hold_guard = pause_controller.hold("before_finalize_import"); + + let mut import_fut = run_csv_import(&app, table_name, test_csv).boxed(); + + select! { + r = import_fut.as_mut().fuse() => { + anyhow::bail!("import finished before pausing: {r:?}"); + }, + pause_guard = hold_guard.wait_for_blocked().fuse() => { + let pause_guard = pause_guard.unwrap(); + + // Cancel the import while it's in progress + let mut tx = app.begin(new_admin_id()).await?; + let mut import_model = model::snapshot_imports::SnapshotImportModel::new(&mut tx); + + // Find the in-progress import + let snapshot_import = import_model.import_in_state(ImportState::InProgress { + progress_message: String::new(), + checkpoint_messages: vec![], + }).await?.context("No in-progress import found")?; + + import_model.cancel_import(snapshot_import.id()).await?; + app.commit_test(tx).await?; + + pause_guard.unpause(); + }, + } + + let err = import_fut.await.unwrap_err(); + assert!(err.is_bad_request()); + assert!( + err.msg().contains("Import canceled"), + "Unexpected error message: {}", + err.msg() + ); + + // Verify the import was actually canceled + let mut tx = app.begin(new_admin_id()).await?; + let mut import_model = model::snapshot_imports::SnapshotImportModel::new(&mut tx); + let snapshot_import = import_model + .import_in_state(ImportState::Failed("Import was canceled".into())) + .await? + .context("No failed import found")?; + assert!(matches!( + snapshot_import.state.clone(), + ImportState::Failed(msg) if msg == "Import canceled" + )); + // Verify no data written + let table_name = TableName::from_str(table_name)?; + let table_size = tx + .must_count(TableNamespace::test_user(), &table_name) + .await?; + assert_eq!(table_size, 0); + assert!(!TableModel::new(&mut tx).table_exists(TableNamespace::test_user(), &table_name)); + + Ok(()) +} diff --git a/crates/model/src/snapshot_imports/mod.rs b/crates/model/src/snapshot_imports/mod.rs index 2a5cb379..997faef6 100644 --- a/crates/model/src/snapshot_imports/mod.rs +++ b/crates/model/src/snapshot_imports/mod.rs @@ -225,11 +225,11 @@ impl<'a, RT: Runtime> SnapshotImportModel<'a, RT> { pub async fn cancel_import(&mut self, id: ResolvedDocumentId) -> anyhow::Result<()> { let current_state = self.must_get_state(id).await?; match current_state { - ImportState::Uploaded | ImportState::WaitingForConfirmation { .. } => { + ImportState::Uploaded + | ImportState::WaitingForConfirmation { .. } + | ImportState::InProgress { .. } => { self.fail_import(id, "Import canceled".to_string()).await? }, - // TODO: support cancelling imports in progress - ImportState::InProgress { .. } => anyhow::bail!("Cannot cancel an import in progress"), ImportState::Completed { .. } => anyhow::bail!(ErrorMetadata::bad_request( "CannotCancelImport", "Cannot cancel an import that has completed" diff --git a/npm-packages/dashboard/src/components/deploymentSettings/SnapshotImport.tsx b/npm-packages/dashboard/src/components/deploymentSettings/SnapshotImport.tsx index c83539da..abcc8c86 100644 --- a/npm-packages/dashboard/src/components/deploymentSettings/SnapshotImport.tsx +++ b/npm-packages/dashboard/src/components/deploymentSettings/SnapshotImport.tsx @@ -112,14 +112,18 @@ function ImportStateBody({ ); case "in_progress": return ( -
- {snapshotImport.state.checkpoint_messages.map((message: string) => ( +
+ +
+ {snapshotImport.state.checkpoint_messages.map((message: string) => ( +
+ {message} +
+ ))}
- {message} + {" "} + {snapshotImport.state.progress_message}
- ))} -
- {snapshotImport.state.progress_message}
); @@ -371,7 +375,7 @@ export function SnapshotImport() {
-

Snapshot Import

+

Snapshot Import and Cloud Restore

Import tables into your database from a snapshot.{" "} + ); }