diff --git a/crates/application/src/snapshot_import/mod.rs b/crates/application/src/snapshot_import/mod.rs index b3e7f063..265fe5e7 100644 --- a/crates/application/src/snapshot_import/mod.rs +++ b/crates/application/src/snapshot_import/mod.rs @@ -377,6 +377,7 @@ impl SnapshotImportExecutor { table_mapping_for_import, usage, audit_log_event, + Some(snapshot_import.id()), snapshot_import.requestor.clone(), ) .await?; @@ -687,6 +688,7 @@ pub async fn clear_tables( table_mapping_for_import, usage, DeploymentAuditLogEvent::ClearTables, + None, ImportRequestor::SnapshotImport, ) .await?; @@ -841,6 +843,7 @@ async fn finalize_import( table_mapping_for_import: TableMappingForImport, usage: FunctionUsageTracker, audit_log_event: DeploymentAuditLogEvent, + import_id: Option, requestor: ImportRequestor, ) -> anyhow::Result<(Timestamp, u64)> { let tables_affected = table_mapping_for_import.tables_affected(); @@ -861,6 +864,29 @@ async fn finalize_import( "snapshot_import_finalize", |tx| { async { + if let Some(import_id) = import_id { + // Only finalize the import if it's in progress. + let mut snapshot_import_model = SnapshotImportModel::new(tx); + let snapshot_import_state = + snapshot_import_model.must_get_state(import_id).await?; + match snapshot_import_state { + ImportState::InProgress { .. } => {}, + // This can happen if the import was canceled or somehow retried after + // completion. These errors won't show up to + // the user because they are already terminal states, + // so we won't transition to a new state due to this error. + ImportState::Failed(e) => anyhow::bail!("Import failed: {e}"), + ImportState::Completed { .. } => { + anyhow::bail!("Import already completed") + }, + // Indicates a bug -- we shouldn't be finalizing an import that hasn't + // started yet. + ImportState::Uploaded | ImportState::WaitingForConfirmation { .. } => { + anyhow::bail!("Import is not in progress") + }, + } + } + let mut documents_deleted = 0; for tablet_id in table_mapping_for_import.to_delete.keys() { let namespace = tx.table_mapping().tablet_namespace(*tablet_id)?; diff --git a/crates/application/src/snapshot_import/tests.rs b/crates/application/src/snapshot_import/tests.rs index b9a126dd..70e61e5e 100644 --- a/crates/application/src/snapshot_import/tests.rs +++ b/crates/application/src/snapshot_import/tests.rs @@ -1293,3 +1293,73 @@ async fn run_csv_import( .await .map(|_| ()) } + +#[convex_macro::test_runtime] +async fn test_cancel_in_progress_import( + rt: TestRuntime, + pause_controller: PauseController, +) -> anyhow::Result<()> { + let app = Application::new_for_tests(&rt).await?; + let table_name = "table1"; + let test_csv = r#" +a,b +"foo","bar" +"#; + + let hold_guard = pause_controller.hold("before_finalize_import"); + + let mut import_fut = run_csv_import(&app, table_name, test_csv).boxed(); + + select! { + r = import_fut.as_mut().fuse() => { + anyhow::bail!("import finished before pausing: {r:?}"); + }, + pause_guard = hold_guard.wait_for_blocked().fuse() => { + let pause_guard = pause_guard.unwrap(); + + // Cancel the import while it's in progress + let mut tx = app.begin(new_admin_id()).await?; + let mut import_model = model::snapshot_imports::SnapshotImportModel::new(&mut tx); + + // Find the in-progress import + let snapshot_import = import_model.import_in_state(ImportState::InProgress { + progress_message: String::new(), + checkpoint_messages: vec![], + }).await?.context("No in-progress import found")?; + + import_model.cancel_import(snapshot_import.id()).await?; + app.commit_test(tx).await?; + + pause_guard.unpause(); + }, + } + + let err = import_fut.await.unwrap_err(); + assert!(err.is_bad_request()); + assert!( + err.msg().contains("Import canceled"), + "Unexpected error message: {}", + err.msg() + ); + + // Verify the import was actually canceled + let mut tx = app.begin(new_admin_id()).await?; + let mut import_model = model::snapshot_imports::SnapshotImportModel::new(&mut tx); + let snapshot_import = import_model + .import_in_state(ImportState::Failed("Import was canceled".into())) + .await? + .context("No failed import found")?; + assert!(matches!( + snapshot_import.state.clone(), + ImportState::Failed(msg) if msg == "Import canceled" + )); + // Verify no data written + let table_name = TableName::from_str(table_name)?; + let table_size = tx + .must_count(TableNamespace::test_user(), &table_name) + .await?; + assert_eq!(table_size, 0); + assert!(!TableModel::new(&mut tx).table_exists(TableNamespace::test_user(), &table_name)); + + Ok(()) +} diff --git a/crates/model/src/snapshot_imports/mod.rs b/crates/model/src/snapshot_imports/mod.rs index 2a5cb379..997faef6 100644 --- a/crates/model/src/snapshot_imports/mod.rs +++ b/crates/model/src/snapshot_imports/mod.rs @@ -225,11 +225,11 @@ impl<'a, RT: Runtime> SnapshotImportModel<'a, RT> { pub async fn cancel_import(&mut self, id: ResolvedDocumentId) -> anyhow::Result<()> { let current_state = self.must_get_state(id).await?; match current_state { - ImportState::Uploaded | ImportState::WaitingForConfirmation { .. } => { + ImportState::Uploaded + | ImportState::WaitingForConfirmation { .. } + | ImportState::InProgress { .. } => { self.fail_import(id, "Import canceled".to_string()).await? }, - // TODO: support cancelling imports in progress - ImportState::InProgress { .. } => anyhow::bail!("Cannot cancel an import in progress"), ImportState::Completed { .. } => anyhow::bail!(ErrorMetadata::bad_request( "CannotCancelImport", "Cannot cancel an import that has completed" diff --git a/npm-packages/dashboard/src/components/deploymentSettings/SnapshotImport.tsx b/npm-packages/dashboard/src/components/deploymentSettings/SnapshotImport.tsx index c83539da..abcc8c86 100644 --- a/npm-packages/dashboard/src/components/deploymentSettings/SnapshotImport.tsx +++ b/npm-packages/dashboard/src/components/deploymentSettings/SnapshotImport.tsx @@ -112,14 +112,18 @@ function ImportStateBody({ ); case "in_progress": return ( -
- {snapshotImport.state.checkpoint_messages.map((message: string) => ( +
+ +
+ {snapshotImport.state.checkpoint_messages.map((message: string) => ( +
+ {message} +
+ ))}
- {message} + {" "} + {snapshotImport.state.progress_message}
- ))} -
- {snapshotImport.state.progress_message}
); @@ -371,7 +375,7 @@ export function SnapshotImport() {
-

Snapshot Import

+

Snapshot Import and Cloud Restore

Import tables into your database from a snapshot.{" "} + ); }