From e008647befc514a25cae5f45dc81df1b9f936913 Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Thu, 6 Feb 2025 13:18:01 -0500 Subject: [PATCH] Allow with_snapshot to change when reconciling commands --- src/storage-types/src/sinks.rs | 21 ++++++++++++++++++--- src/storage/src/storage_state.rs | 3 ++- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/src/storage-types/src/sinks.rs b/src/storage-types/src/sinks.rs index 0f2fe70487639..ba06d68cbf504 100644 --- a/src/storage-types/src/sinks.rs +++ b/src/storage-types/src/sinks.rs @@ -15,6 +15,7 @@ use std::time::Duration; use mz_dyncfg::ConfigSet; use mz_expr::MirScalarExpr; +use mz_ore::soft_panic_or_log; use mz_pgcopy::CopyFormatParams; use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError}; use mz_repr::bytes::ByteSize; @@ -76,8 +77,7 @@ impl AlterCompatible connection, envelope, version: _, - // The as-of of the descriptions may differ. - as_of: _, + as_of, from_storage_metadata, partition_strategy, with_snapshot, @@ -92,7 +92,22 @@ impl AlterCompatible "connection", ), (envelope == &other.envelope, "envelope"), - (with_snapshot == &other.with_snapshot, "with_snapshot"), + // Sinks can have their as-of advance, at which point we've sunk out the initial + // snapshot and may no longer need to fetch it. + ( + // We didn't historically enforce this, though it should always be the case... + // for now, assert that it's true but keep the old behaviour in prod. + PartialOrder::less_equal(as_of, &other.as_of) || { + soft_panic_or_log!( + "as-of for sink {id} regressed: {:?} -> {:?}", + &as_of[..], + &other.as_of[..] + ); + true + }, + "as_of", + ), + (!with_snapshot || other.with_snapshot, "with_snapshot"), ( partition_strategy == &other.partition_strategy, "partition_strategy", diff --git a/src/storage/src/storage_state.rs b/src/storage/src/storage_state.rs index 13473967f3b3e..adda858bcdf93 100644 --- a/src/storage/src/storage_state.rs +++ b/src/storage/src/storage_state.rs @@ -81,6 +81,7 @@ use std::sync::Arc; use std::thread; use crossbeam_channel::{RecvError, TryRecvError}; +use differential_dataflow::lattice::Lattice; use fail::fail_point; use mz_ore::now::NowFn; use mz_ore::tracing::TracingHandle; @@ -1261,7 +1262,7 @@ impl StorageState { Some(export_description) => { // Update our knowledge of the `as_of`, in case we need to internally // restart a sink in the future. - export_description.as_of.clone_from(&frontier); + export_description.as_of.join_assign(&frontier); } // reported_frontiers contains both ingestions and their // exports