Skip to content

Commit

Permalink
Allow with_snapshot to change when reconciling commands
Browse files Browse the repository at this point in the history
  • Loading branch information
bkirwi committed Feb 6, 2025
1 parent d045a42 commit e008647
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 4 deletions.
21 changes: 18 additions & 3 deletions src/storage-types/src/sinks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use std::time::Duration;

use mz_dyncfg::ConfigSet;
use mz_expr::MirScalarExpr;
use mz_ore::soft_panic_or_log;
use mz_pgcopy::CopyFormatParams;
use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
use mz_repr::bytes::ByteSize;
Expand Down Expand Up @@ -76,8 +77,7 @@ impl<S: Debug + PartialEq, T: Debug + PartialEq + PartialOrder> AlterCompatible
connection,
envelope,
version: _,
// The as-of of the descriptions may differ.
as_of: _,
as_of,
from_storage_metadata,
partition_strategy,
with_snapshot,
Expand All @@ -92,7 +92,22 @@ impl<S: Debug + PartialEq, T: Debug + PartialEq + PartialOrder> AlterCompatible
"connection",
),
(envelope == &other.envelope, "envelope"),
(with_snapshot == &other.with_snapshot, "with_snapshot"),
// Sinks can have their as-of advance, at which point we've sunk out the initial
// snapshot and may no longer need to fetch it.
(
// We didn't historically enforce this, though it should always be the case...
// for now, assert that it's true but keep the old behaviour in prod.
PartialOrder::less_equal(as_of, &other.as_of) || {
soft_panic_or_log!(
"as-of for sink {id} regressed: {:?} -> {:?}",
&as_of[..],
&other.as_of[..]
);
true
},
"as_of",
),
(!with_snapshot || other.with_snapshot, "with_snapshot"),
(
partition_strategy == &other.partition_strategy,
"partition_strategy",
Expand Down
3 changes: 2 additions & 1 deletion src/storage/src/storage_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ use std::sync::Arc;
use std::thread;

use crossbeam_channel::{RecvError, TryRecvError};
use differential_dataflow::lattice::Lattice;
use fail::fail_point;
use mz_ore::now::NowFn;
use mz_ore::tracing::TracingHandle;
Expand Down Expand Up @@ -1261,7 +1262,7 @@ impl StorageState {
Some(export_description) => {
// Update our knowledge of the `as_of`, in case we need to internally
// restart a sink in the future.
export_description.as_of.clone_from(&frontier);
export_description.as_of.join_assign(&frontier);
}
// reported_frontiers contains both ingestions and their
// exports
Expand Down

0 comments on commit e008647

Please sign in to comment.