From 0e8a461bf383f0bd1bdf09c60521426760d277b4 Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Wed, 22 Jan 2025 15:46:44 -0500 Subject: [PATCH 1/9] Add collection metadata for the sink's output collection --- src/adapter/src/coord/ddl.rs | 1 + src/adapter/src/coord/sequencer/inner.rs | 1 + src/storage-controller/src/history.rs | 16 ++++++++++++++++ src/storage-controller/src/lib.rs | 6 ++++++ src/storage-types/src/sinks.proto | 1 + src/storage-types/src/sinks.rs | 16 ++++++++++++++++ 6 files changed, 41 insertions(+) diff --git a/src/adapter/src/coord/ddl.rs b/src/adapter/src/coord/ddl.rs index 5a285902678eb..d57ff791d5a13 100644 --- a/src/adapter/src/coord/ddl.rs +++ b/src/adapter/src/coord/ddl.rs @@ -1330,6 +1330,7 @@ impl Coordinator { with_snapshot: sink.with_snapshot, version: sink.version, from_storage_metadata: (), + to_storage_metadata: (), }; let res = self diff --git a/src/adapter/src/coord/sequencer/inner.rs b/src/adapter/src/coord/sequencer/inner.rs index 113cf32f1ef99..1b5a2fec61a50 100644 --- a/src/adapter/src/coord/sequencer/inner.rs +++ b/src/adapter/src/coord/sequencer/inner.rs @@ -3576,6 +3576,7 @@ impl Coordinator { version: sink.version, partition_strategy: sink.partition_strategy, from_storage_metadata: (), + to_storage_metadata: (), }; self.controller diff --git a/src/storage-controller/src/history.rs b/src/storage-controller/src/history.rs index 2e5b3a143fef7..e068e55e842b2 100644 --- a/src/storage-controller/src/history.rs +++ b/src/storage-controller/src/history.rs @@ -406,6 +406,22 @@ mod tests { ), txns_shard: Default::default(), }, + to_storage_metadata: CollectionMetadata { + persist_location: PersistLocation { + blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"), + consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"), + }, + remap_shard: Default::default(), + data_shard: Default::default(), + relation_desc: RelationDesc::new( + RelationType { + column_types: Default::default(), + keys: Default::default(), + }, + Vec::::new(), + ), + txns_shard: Default::default(), + }, } } diff --git a/src/storage-controller/src/lib.rs b/src/storage-controller/src/lib.rs index 0a6c99744582d..fa4f41e248a78 100644 --- a/src/storage-controller/src/lib.rs +++ b/src/storage-controller/src/lib.rs @@ -1440,6 +1440,7 @@ where .expect("missing dependency") .into_element(); let from_storage_metadata = self.storage_collections.collection_metadata(from_id)?; + let to_storage_metadata = self.storage_collections.collection_metadata(id)?; // Check whether the sink's write frontier is beyond the read hold we got let cur_export = self @@ -1480,6 +1481,7 @@ where partition_strategy: new_description.sink.partition_strategy, from_storage_metadata, with_snapshot: new_description.sink.with_snapshot, + to_storage_metadata, }, }; @@ -1529,6 +1531,7 @@ where let from_storage_metadata = self .storage_collections .collection_metadata(new_export_description.sink.from)?; + let to_storage_metadata = self.storage_collections.collection_metadata(id)?; let cmd = RunSinkCommand { id, @@ -1552,6 +1555,7 @@ where // TODO(petrosagg): change the controller to explicitly track dataflow executions as_of: as_of.to_owned(), from_storage_metadata, + to_storage_metadata, }, }; @@ -3155,6 +3159,7 @@ where let from_storage_metadata = self .storage_collections .collection_metadata(description.sink.from)?; + let to_storage_metadata = self.storage_collections.collection_metadata(id)?; let cmd = RunSinkCommand { id, @@ -3168,6 +3173,7 @@ where partition_strategy: 
description.sink.partition_strategy.clone(), from_storage_metadata, with_snapshot: description.sink.with_snapshot, + to_storage_metadata, }, }; diff --git a/src/storage-types/src/sinks.proto b/src/storage-types/src/sinks.proto index 236c67d8a8256..6ade6172513a4 100644 --- a/src/storage-types/src/sinks.proto +++ b/src/storage-types/src/sinks.proto @@ -29,6 +29,7 @@ message ProtoStorageSinkDesc { ProtoStorageSinkConnection connection = 3; optional ProtoSinkEnvelope envelope = 4; optional mz_storage_types.controller.ProtoCollectionMetadata from_storage_metadata = 6; + optional mz_storage_types.controller.ProtoCollectionMetadata to_storage_metadata = 15; mz_repr.antichain.ProtoU64Antichain as_of = 11; bool with_snapshot = 12; uint64 version = 13; diff --git a/src/storage-types/src/sinks.rs b/src/storage-types/src/sinks.rs index b66209f4dc1cc..e7489302031ba 100644 --- a/src/storage-types/src/sinks.rs +++ b/src/storage-types/src/sinks.rs @@ -49,6 +49,7 @@ pub struct StorageSinkDesc { pub envelope: SinkEnvelope, pub as_of: Antichain, pub from_storage_metadata: S, + pub to_storage_metadata: S, } impl AlterCompatible @@ -80,6 +81,7 @@ impl AlterCompatible from_storage_metadata, partition_strategy, with_snapshot, + to_storage_metadata, } = self; let compatibility_checks = [ @@ -99,6 +101,10 @@ impl AlterCompatible from_storage_metadata == &other.from_storage_metadata, "from_storage_metadata", ), + ( + to_storage_metadata == &other.to_storage_metadata, + "to_storage_metadata", + ), ]; for (compatible, field) in compatibility_checks { @@ -132,6 +138,7 @@ impl Arbitrary for StorageSinkDesc { any::(), any::(), any::(), + any::(), ) .prop_map( |( @@ -144,6 +151,7 @@ impl Arbitrary for StorageSinkDesc { partition_strategy, with_snapshot, version, + to_storage_metadata, )| { StorageSinkDesc { from, @@ -155,9 +163,13 @@ impl Arbitrary for StorageSinkDesc { from_storage_metadata, partition_strategy, with_snapshot, + to_storage_metadata, } }, ) + .prop_filter("identical source and sink", |desc| { + desc.from_storage_metadata != desc.to_storage_metadata + }) .boxed() } } @@ -171,6 +183,7 @@ impl RustType for StorageSinkDesc for StorageSinkDesc Date: Wed, 22 Jan 2025 16:57:34 -0500 Subject: [PATCH 2/9] Create the sink collection --- src/adapter/src/coord.rs | 10 ++++++++++ src/adapter/src/coord/sequencer/inner.rs | 19 ++++++++++++++++++- 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index fbda0a3246618..5b620d15dd1ea 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -2731,6 +2731,16 @@ impl Coordinator { collections.push((ct.global_id(), collection_desc)); } } + CatalogItem::Sink(sink) => { + let collection_desc = CollectionDescription { + desc: RelationDesc::empty(), + data_source: DataSource::Other, + since: None, + status_collection_id: None, + timeline: None, + }; + collections.push((sink.global_id, collection_desc)); + } _ => (), } } diff --git a/src/adapter/src/coord/sequencer/inner.rs b/src/adapter/src/coord/sequencer/inner.rs index 1b5a2fec61a50..14ebc1ee8a10a 100644 --- a/src/adapter/src/coord/sequencer/inner.rs +++ b/src/adapter/src/coord/sequencer/inner.rs @@ -36,7 +36,7 @@ use mz_repr::explain::json::json_string; use mz_repr::explain::ExprHumanizer; use mz_repr::role_id::RoleId; use mz_repr::{ - CatalogItemId, Datum, Diff, GlobalId, IntoRowIterator, RelationVersion, + CatalogItemId, Datum, Diff, GlobalId, IntoRowIterator, RelationDesc, RelationVersion, RelationVersionSelector, Row, RowArena, RowIterator, 
Timestamp, }; use mz_sql::ast::{CreateSubsourceStatement, MySqlConfigOptionName, UnresolvedItemName}; @@ -1345,6 +1345,23 @@ impl Coordinator { } }; + let collection_desc = CollectionDescription { + desc: RelationDesc::empty(), + data_source: DataSource::Other, + since: None, + status_collection_id: None, + timeline: None, + }; + let collections = vec![(global_id, collection_desc)]; + + // Create the collections. + let storage_metadata = self.catalog.state().storage_metadata(); + self.controller + .storage + .create_collections(storage_metadata, None, collections) + .await + .unwrap_or_terminate("cannot fail to create collections"); + self.create_storage_export(global_id, &catalog_sink) .await .unwrap_or_terminate("cannot fail to create exports"); From 894e64971ba1d7685fb99c0dee013b0a152b5ec6 Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Thu, 23 Jan 2025 14:40:57 -0500 Subject: [PATCH 3/9] Actually create the shard for the new collection --- src/adapter/src/catalog/transact.rs | 4 +- src/adapter/src/coord.rs | 4 +- src/adapter/src/coord/sequencer/inner.rs | 33 +++--- src/catalog/src/memory/objects.rs | 2 +- src/storage-controller/src/lib.rs | 122 +++++++++++------------ 5 files changed, 88 insertions(+), 77 deletions(-) diff --git a/src/adapter/src/catalog/transact.rs b/src/adapter/src/catalog/transact.rs index 4b4d6f0cf0bac..751e6054dc912 100644 --- a/src/adapter/src/catalog/transact.rs +++ b/src/adapter/src/catalog/transact.rs @@ -1057,8 +1057,10 @@ impl Catalog { CatalogItem::ContinualTask(ct) => { storage_collections_to_create.insert(ct.global_id()); } + CatalogItem::Sink(sink) => { + storage_collections_to_create.insert(sink.global_id()); + } CatalogItem::Log(_) - | CatalogItem::Sink(_) | CatalogItem::View(_) | CatalogItem::Index(_) | CatalogItem::Type(_) diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index 5b620d15dd1ea..02940cd1a4b79 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -152,6 +152,7 @@ use mz_storage_types::connections::Connection as StorageConnection; use mz_storage_types::connections::ConnectionContext; use mz_storage_types::read_holds::ReadHold; use mz_storage_types::sinks::S3SinkFormat; +use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC; use mz_storage_types::sources::Timeline; use mz_timestamp_oracle::postgres_oracle::{ PostgresTimestampOracle, PostgresTimestampOracleConfig, @@ -2733,7 +2734,8 @@ impl Coordinator { } CatalogItem::Sink(sink) => { let collection_desc = CollectionDescription { - desc: RelationDesc::empty(), + // TODO(sinks): make generic once we have more than one sink type. 
+ desc: KAFKA_PROGRESS_DESC.clone(), data_source: DataSource::Other, since: None, status_collection_id: None, diff --git a/src/adapter/src/coord/sequencer/inner.rs b/src/adapter/src/coord/sequencer/inner.rs index 14ebc1ee8a10a..ba8b1e4cd5dc1 100644 --- a/src/adapter/src/coord/sequencer/inner.rs +++ b/src/adapter/src/coord/sequencer/inner.rs @@ -21,24 +21,32 @@ use futures::{future, Future}; use itertools::Itertools; use maplit::btreeset; use mz_adapter_types::compaction::CompactionWindow; +use mz_adapter_types::connection::ConnectionId; +use mz_catalog::memory::objects::{ + CatalogItem, Cluster, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type, +}; use mz_cloud_resources::VpcEndpointConfig; use mz_controller_types::ReplicaId; use mz_expr::{ CollectionPlan, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing, }; +use mz_ore::cast::CastFrom; use mz_ore::collections::{CollectionExt, HashSet}; use mz_ore::task::{self, spawn, JoinHandle}; use mz_ore::tracing::OpenTelemetryContext; use mz_ore::vec::VecExt; +use mz_ore::{assert_none, instrument}; +use mz_persist_client::stats::SnapshotPartStats; use mz_repr::adt::jsonb::Jsonb; use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap}; use mz_repr::explain::json::json_string; use mz_repr::explain::ExprHumanizer; use mz_repr::role_id::RoleId; use mz_repr::{ - CatalogItemId, Datum, Diff, GlobalId, IntoRowIterator, RelationDesc, RelationVersion, + CatalogItemId, Datum, Diff, GlobalId, IntoRowIterator, RelationVersion, RelationVersionSelector, Row, RowArena, RowIterator, Timestamp, }; +use mz_sql::ast::AlterSourceAddSubsourceOption; use mz_sql::ast::{CreateSubsourceStatement, MySqlConfigOptionName, UnresolvedItemName}; use mz_sql::catalog::{ CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError, @@ -52,17 +60,7 @@ use mz_sql::names::{ use mz_sql::plan::{ConnectionDetails, NetworkPolicyRule, StatementContext}; use mz_sql::pure::{generate_subsource_statements, PurifiedSourceExport}; use mz_storage_types::sinks::StorageSinkDesc; -use smallvec::SmallVec; -use timely::progress::Timestamp as TimelyTimestamp; // Import `plan` module, but only import select elements to avoid merge conflicts on use statements. 
-use mz_adapter_types::connection::ConnectionId; -use mz_catalog::memory::objects::{ - CatalogItem, Cluster, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type, -}; -use mz_ore::cast::CastFrom; -use mz_ore::{assert_none, instrument}; -use mz_persist_client::stats::SnapshotPartStats; -use mz_sql::ast::AlterSourceAddSubsourceOption; use mz_sql::plan::{ AlterConnectionAction, AlterConnectionPlan, CreateSourcePlanBundle, ExplainSinkSchemaPlan, Explainee, ExplaineeStatement, MutationKind, Params, Plan, PlannedAlterRoleOption, @@ -85,12 +83,15 @@ use mz_ssh_util::keys::SshKeyPairSet; use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription}; use mz_storage_types::connections::inline::IntoInlineConnection; use mz_storage_types::controller::StorageError; +use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC; use mz_storage_types::stats::RelationPartStats; use mz_storage_types::AlterCompatible; use mz_transform::dataflow::DataflowMetainfo; use mz_transform::notice::{OptimizerNoticeApi, OptimizerNoticeKind, RawOptimizerNotice}; use mz_transform::EmptyStatisticsOracle; +use smallvec::SmallVec; use timely::progress::Antichain; +use timely::progress::Timestamp as TimelyTimestamp; use tokio::sync::{oneshot, watch}; use tracing::{warn, Instrument, Span}; @@ -1346,7 +1347,8 @@ impl Coordinator { }; let collection_desc = CollectionDescription { - desc: RelationDesc::empty(), + // TODO(sinks): make generic once we have more than one sink type. + desc: KAFKA_PROGRESS_DESC.clone(), data_source: DataSource::Other, since: None, status_collection_id: None, @@ -3544,7 +3546,12 @@ impl Coordinator { .expect("sink known to exist") .write_frontier; let as_of = ctx.read_hold.least_valid_read(); - assert!(write_frontier.iter().all(|t| as_of.less_than(t))); + assert!( + write_frontier.iter().all(|t| as_of.less_than(t)), + "{:?} should be strictly less than {:?}", + &*as_of, + &**write_frontier + ); let catalog_sink = Sink { create_sql: sink.create_sql, diff --git a/src/catalog/src/memory/objects.rs b/src/catalog/src/memory/objects.rs index d7992f6f4889b..5b257f3fbada4 100644 --- a/src/catalog/src/memory/objects.rs +++ b/src/catalog/src/memory/objects.rs @@ -1614,9 +1614,9 @@ impl CatalogItem { CatalogItem::Table(_) | CatalogItem::Source(_) | CatalogItem::MaterializedView(_) + | CatalogItem::Sink(_) | CatalogItem::ContinualTask(_) => true, CatalogItem::Log(_) - | CatalogItem::Sink(_) | CatalogItem::View(_) | CatalogItem::Index(_) | CatalogItem::Type(_) diff --git a/src/storage-controller/src/lib.rs b/src/storage-controller/src/lib.rs index fa4f41e248a78..db6e59d30d6c5 100644 --- a/src/storage-controller/src/lib.rs +++ b/src/storage-controller/src/lib.rs @@ -1905,10 +1905,7 @@ where Some(StorageResponse::DroppedId(id)) => { tracing::debug!("DroppedId for collection {id}"); - if let Some(_collection) = self.collections.remove(&id) { - // Nothing to do, we already dropped read holds in - // `drop_sources_unvalidated`. - } else if let Some(export) = self.exports.get_mut(&id) { + if let Some(export) = self.exports.get_mut(&id) { // TODO: Current main never drops export state, so we // also don't do that, because it would be yet more // refactoring. Instead, we downgrade to the empty @@ -1919,6 +1916,9 @@ where .read_hold .try_downgrade(Antichain::new()) .expect("must be possible"); + } else if let Some(_collection) = self.collections.remove(&id) { + // Nothing to do, we already dropped read holds in + // `drop_sources_unvalidated`. 
} else { soft_panic_or_log!( "DroppedId for ID {id} but we have neither ingestion nor export \ @@ -2491,7 +2491,9 @@ where let mut read_capability_changes = BTreeMap::default(); for (id, policy) in policies.into_iter() { - if let Some(collection) = self.collections.get_mut(&id) { + if let Some(_export) = self.exports.get_mut(&id) { + unreachable!("set_hold_policies is only called for ingestions"); + } else if let Some(collection) = self.collections.get_mut(&id) { let ingestion = match &mut collection.extra_state { CollectionStateExtra::Ingestion(ingestion) => ingestion, CollectionStateExtra::None => { @@ -2512,8 +2514,6 @@ where } ingestion.hold_policy = policy; - } else if let Some(_export) = self.exports.get_mut(&id) { - unreachable!("set_hold_policies is only called for ingestions"); } } @@ -2527,7 +2527,37 @@ where let mut read_capability_changes = BTreeMap::default(); for (id, new_upper) in updates.iter() { - if let Some(collection) = self.collections.get_mut(id) { + if let Ok(export) = self.export_mut(*id) { + if PartialOrder::less_than(&export.write_frontier, new_upper) { + export.write_frontier.clone_from(new_upper); + } + + // Ignore read policy for sinks whose write frontiers are closed, which identifies + // the sink is being dropped; we need to advance the read frontier to the empty + // chain to signal to the dataflow machinery that they should deprovision this + // object. + let new_read_capability = if export.write_frontier.is_empty() { + export.write_frontier.clone() + } else { + export.read_policy.frontier(export.write_frontier.borrow()) + }; + + if PartialOrder::less_equal(export.read_hold.since(), &new_read_capability) { + let mut update = ChangeBatch::new(); + update.extend(new_read_capability.iter().map(|time| (time.clone(), 1))); + update.extend( + export + .read_hold + .since() + .iter() + .map(|time| (time.clone(), -1)), + ); + + if !update.is_empty() { + read_capability_changes.insert(*id, update); + } + } + } else if let Some(collection) = self.collections.get_mut(id) { let ingestion = match &mut collection.extra_state { CollectionStateExtra::Ingestion(ingestion) => ingestion, CollectionStateExtra::None => { @@ -2560,36 +2590,6 @@ where std::mem::swap(&mut ingestion.derived_since, &mut new_derived_since); update.extend(new_derived_since.iter().map(|time| (time.clone(), -1))); - if !update.is_empty() { - read_capability_changes.insert(*id, update); - } - } - } else if let Ok(export) = self.export_mut(*id) { - if PartialOrder::less_than(&export.write_frontier, new_upper) { - export.write_frontier.clone_from(new_upper); - } - - // Ignore read policy for sinks whose write frontiers are closed, which identifies - // the sink is being dropped; we need to advance the read frontier to the empty - // chain to signal to the dataflow machinery that they should deprovision this - // object. 
- let new_read_capability = if export.write_frontier.is_empty() { - export.write_frontier.clone() - } else { - export.read_policy.frontier(export.write_frontier.borrow()) - }; - - if PartialOrder::less_equal(export.read_hold.since(), &new_read_capability) { - let mut update = ChangeBatch::new(); - update.extend(new_read_capability.iter().map(|time| (time.clone(), 1))); - update.extend( - export - .read_hold - .since() - .iter() - .map(|time| (time.clone(), -1)), - ); - if !update.is_empty() { read_capability_changes.insert(*id, update); } @@ -2634,7 +2634,29 @@ where debug!(id = %key, ?update, "update_hold_capability"); } - if let Some(collection) = self.collections.get_mut(&key) { + if let Ok(export) = self.export_mut(key) { + // Seed with our current read hold, then apply changes, to + // derive how we need to change our read hold. + let mut staged_read_hold = MutableAntichain::new(); + staged_read_hold + .update_iter(export.read_hold.since().iter().map(|t| (t.clone(), 1))); + let changes = staged_read_hold.update_iter(update.drain()); + update.extend(changes); + + // Make sure we also send `AllowCompaction` commands for sinks, + // which drives updating the sink's `as_of`, among other things. + let (changes, frontier, _cluster_id) = + exports_net.entry(key).or_insert_with(|| { + ( + >::new(), + Antichain::new(), + export.cluster_id(), + ) + }); + + changes.extend(update.drain()); + *frontier = staged_read_hold.frontier().to_owned(); + } else if let Some(collection) = self.collections.get_mut(&key) { let ingestion = match &mut collection.extra_state { CollectionStateExtra::Ingestion(ingestion) => ingestion, CollectionStateExtra::None => { @@ -2661,28 +2683,6 @@ where changes.extend(update.drain()); *frontier = ingestion.read_capabilities.frontier().to_owned(); - } else if let Ok(export) = self.export_mut(key) { - // Seed with our current read hold, then apply changes, to - // derive how we need to change our read hold. - let mut staged_read_hold = MutableAntichain::new(); - staged_read_hold - .update_iter(export.read_hold.since().iter().map(|t| (t.clone(), 1))); - let changes = staged_read_hold.update_iter(update.drain()); - update.extend(changes); - - // Make sure we also send `AllowCompaction` commands for sinks, - // which drives updating the sink's `as_of`, among other things. - let (changes, frontier, _cluster_id) = - exports_net.entry(key).or_insert_with(|| { - ( - >::new(), - Antichain::new(), - export.cluster_id(), - ) - }); - - changes.extend(update.drain()); - *frontier = staged_read_hold.frontier().to_owned(); } else { // This is confusing and we should probably error. tracing::warn!(id = ?key, ?update, "update_hold_capabilities for unknown object"); From ace85d99e7e59beb69ef8896e193f1584345bbb8 Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Thu, 30 Jan 2025 11:28:59 -0500 Subject: [PATCH 4/9] Relax some frontier assertions Tweaking tests slightly to assert properties that are correct in both the old and new implementations. 
--- test/cluster/mzcompose.py | 8 ++++---- .../controller-frontiers.td | 2 +- test/testdrive/controller-frontiers.td | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/test/cluster/mzcompose.py b/test/cluster/mzcompose.py index cb07802182962..c7f36ae40d080 100644 --- a/test/cluster/mzcompose.py +++ b/test/cluster/mzcompose.py @@ -5143,12 +5143,12 @@ def workflow_test_constant_sink(c: Composition) -> None: > SELECT write_frontier FROM mz_internal.mz_frontiers JOIN mz_sinks ON id = object_id - WHERE name = 'snk' + WHERE name = 'snk' AND write_frontier IS NOT NULL > SELECT status FROM mz_internal.mz_sink_statuses WHERE name = 'snk' - dropped + running """ ) ) @@ -5162,12 +5162,12 @@ def workflow_test_constant_sink(c: Composition) -> None: > SELECT write_frontier FROM mz_internal.mz_frontiers JOIN mz_sinks ON id = object_id - WHERE name = 'snk' + WHERE name = 'snk' AND write_frontier IS NOT NULL > SELECT status FROM mz_internal.mz_sink_statuses WHERE name = 'snk' - dropped + running """ ) ) diff --git a/test/testdrive-old-kafka-src-syntax/controller-frontiers.td b/test/testdrive-old-kafka-src-syntax/controller-frontiers.td index 523e6bec04de1..d4b4981bded78 100644 --- a/test/testdrive-old-kafka-src-syntax/controller-frontiers.td +++ b/test/testdrive-old-kafka-src-syntax/controller-frontiers.td @@ -156,7 +156,7 @@ sink1 s1 ON frontiers.object_id = sinks.id WHERE frontiers.object_id LIKE 'u%' AND - frontiers.read_frontier IS NULL AND + frontiers.read_frontier != 0 AND frontiers.write_frontier > 0 sink1 diff --git a/test/testdrive/controller-frontiers.td b/test/testdrive/controller-frontiers.td index 7b977167fae35..2fb5f50a491d7 100644 --- a/test/testdrive/controller-frontiers.td +++ b/test/testdrive/controller-frontiers.td @@ -186,7 +186,7 @@ sink1 s1 ON frontiers.object_id = sinks.id WHERE frontiers.object_id LIKE 'u%' AND - frontiers.read_frontier IS NULL AND + frontiers.read_frontier != 0 AND frontiers.write_frontier > 0 sink1 From 45fc87573b421f9453bdcfee58b9f2092207f333 Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Thu, 23 Jan 2025 15:55:32 -0500 Subject: [PATCH 5/9] Make sinks a variant of collections --- src/adapter/src/coord.rs | 33 +- src/adapter/src/coord/ddl.rs | 31 +- src/adapter/src/coord/sequencer/inner.rs | 22 +- src/adapter/src/util.rs | 2 +- src/storage-client/src/controller.rs | 82 ++- src/storage-client/src/storage_collections.rs | 15 +- src/storage-controller/src/lib.rs | 594 ++++++++---------- src/storage-types/src/controller.rs | 6 +- 8 files changed, 373 insertions(+), 412 deletions(-) diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index 02940cd1a4b79..6f6c2bb4948ad 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -146,12 +146,12 @@ use mz_sql::session::vars::SystemVars; use mz_sql_parser::ast::display::AstDisplay; use mz_sql_parser::ast::ExplainStage; use mz_storage_client::client::TableData; -use mz_storage_client::controller::{CollectionDescription, DataSource}; +use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription}; use mz_storage_types::connections::inline::{IntoInlineConnection, ReferencedConnection}; use mz_storage_types::connections::Connection as StorageConnection; use mz_storage_types::connections::ConnectionContext; use mz_storage_types::read_holds::ReadHold; -use mz_storage_types::sinks::S3SinkFormat; +use mz_storage_types::sinks::{S3SinkFormat, StorageSinkDesc}; use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC; use mz_storage_types::sources::Timeline; use 
mz_timestamp_oracle::postgres_oracle::{ @@ -2076,9 +2076,11 @@ impl Coordinator { self.ship_dataflow(df_desc, mview.cluster_id, None).await; } CatalogItem::Sink(sink) => { - self.create_storage_export(sink.global_id(), sink) - .await - .unwrap_or_terminate("cannot fail to create exports"); + policies_to_set + .entry(CompactionWindow::Default) + .or_insert_with(Default::default) + .storage_ids + .insert(sink.global_id()); } CatalogItem::Connection(catalog_connection) => { if let ConnectionDetails::AwsPrivatelink(conn) = &catalog_connection.details { @@ -2736,7 +2738,26 @@ impl Coordinator { let collection_desc = CollectionDescription { // TODO(sinks): make generic once we have more than one sink type. desc: KAFKA_PROGRESS_DESC.clone(), - data_source: DataSource::Other, + data_source: DataSource::Sink { + desc: ExportDescription { + sink: StorageSinkDesc { + from: sink.from, + from_desc: KAFKA_PROGRESS_DESC.clone(), + connection: sink + .connection + .clone() + .into_inline_connection(self.catalog().state()), + partition_strategy: sink.partition_strategy.clone(), + envelope: sink.envelope, + as_of: Antichain::from_elem(Timestamp::minimum()), + with_snapshot: sink.with_snapshot, + version: sink.version, + from_storage_metadata: (), + to_storage_metadata: (), + }, + instance_id: sink.cluster_id, + }, + }, since: None, status_collection_id: None, timeline: None, diff --git a/src/adapter/src/coord/ddl.rs b/src/adapter/src/coord/ddl.rs index d57ff791d5a13..1a42f5c71b8c2 100644 --- a/src/adapter/src/coord/ddl.rs +++ b/src/adapter/src/coord/ddl.rs @@ -46,10 +46,11 @@ use mz_sql::session::vars::{ MAX_REPLICAS_PER_CLUSTER, MAX_ROLES, MAX_SCHEMAS_PER_DATABASE, MAX_SECRETS, MAX_SINKS, MAX_SOURCES, MAX_TABLES, }; -use mz_storage_client::controller::ExportDescription; +use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription}; use mz_storage_types::connections::inline::IntoInlineConnection; use mz_storage_types::connections::PostgresConnection; use mz_storage_types::read_policy::ReadPolicy; +use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC; use mz_storage_types::sources::GenericSourceConnection; use serde_json::json; use tracing::{event, info_span, warn, Instrument, Level}; @@ -1056,9 +1057,10 @@ impl Coordinator { } pub(crate) fn drop_storage_sinks(&mut self, sink_gids: Vec) { + let storage_metadata = self.catalog.state().storage_metadata(); self.controller .storage - .drop_sinks(sink_gids) + .drop_sinks(storage_metadata, sink_gids) .unwrap_or_terminate("cannot fail to drop sinks"); } @@ -1333,16 +1335,27 @@ impl Coordinator { to_storage_metadata: (), }; - let res = self - .controller - .storage - .create_exports(vec![( - id, - ExportDescription { + let collection_desc = CollectionDescription { + // TODO(sinks): make generic once we have more than one sink type. + desc: KAFKA_PROGRESS_DESC.clone(), + data_source: DataSource::Sink { + desc: ExportDescription { sink: storage_sink_desc, instance_id: sink.cluster_id, }, - )]) + }, + since: None, + status_collection_id: None, + timeline: None, + }; + let collections = vec![(id, collection_desc)]; + + // Create the collections. 
+ let storage_metadata = self.catalog.state().storage_metadata(); + let res = self + .controller + .storage + .create_collections(storage_metadata, None, collections) .await; // Drop read holds after the export has been created, at which point diff --git a/src/adapter/src/coord/sequencer/inner.rs b/src/adapter/src/coord/sequencer/inner.rs index ba8b1e4cd5dc1..23555fbd1d36d 100644 --- a/src/adapter/src/coord/sequencer/inner.rs +++ b/src/adapter/src/coord/sequencer/inner.rs @@ -83,7 +83,6 @@ use mz_ssh_util::keys::SshKeyPairSet; use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription}; use mz_storage_types::connections::inline::IntoInlineConnection; use mz_storage_types::controller::StorageError; -use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC; use mz_storage_types::stats::RelationPartStats; use mz_storage_types::AlterCompatible; use mz_transform::dataflow::DataflowMetainfo; @@ -1346,28 +1345,13 @@ impl Coordinator { } }; - let collection_desc = CollectionDescription { - // TODO(sinks): make generic once we have more than one sink type. - desc: KAFKA_PROGRESS_DESC.clone(), - data_source: DataSource::Other, - since: None, - status_collection_id: None, - timeline: None, - }; - let collections = vec![(global_id, collection_desc)]; - - // Create the collections. - let storage_metadata = self.catalog.state().storage_metadata(); - self.controller - .storage - .create_collections(storage_metadata, None, collections) - .await - .unwrap_or_terminate("cannot fail to create collections"); - self.create_storage_export(global_id, &catalog_sink) .await .unwrap_or_terminate("cannot fail to create exports"); + self.initialize_storage_read_policies([item_id].into(), CompactionWindow::Default) + .await; + ctx.retire(Ok(ExecuteResponse::CreatedSink)) } diff --git a/src/adapter/src/util.rs b/src/adapter/src/util.rs index e741e66477fb6..57405b32f9db1 100644 --- a/src/adapter/src/util.rs +++ b/src/adapter/src/util.rs @@ -373,7 +373,7 @@ impl ShouldTerminateGracefully for StorageError { | StorageError::ReadBeforeSince(_) | StorageError::InvalidUppers(_) | StorageError::InvalidUsage(_) - | StorageError::SourceIdReused(_) + | StorageError::CollectionIdReused(_) | StorageError::SinkIdReused(_) | StorageError::IdentifierMissing(_) | StorageError::IdentifierInvalid(_) diff --git a/src/storage-client/src/controller.rs b/src/storage-client/src/controller.rs index d8970294310cf..64f165fbdb9b3 100644 --- a/src/storage-client/src/controller.rs +++ b/src/storage-client/src/controller.rs @@ -27,8 +27,8 @@ use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; +use differential_dataflow::lattice::Lattice; use mz_cluster_client::client::ClusterReplicaLocation; -use mz_cluster_client::metrics::WallclockLagMetrics; use mz_cluster_client::ReplicaId; use mz_persist_client::batch::ProtoBatch; use mz_persist_types::{Codec64, Opaque, ShardId}; @@ -47,6 +47,7 @@ use mz_storage_types::sources::{ SourceExportDetails, Timeline, }; use serde::{Deserialize, Serialize}; +use timely::progress::frontier::MutableAntichain; use timely::progress::Timestamp as TimelyTimestamp; use timely::progress::{Antichain, Timestamp}; use tokio::sync::{mpsc, oneshot}; @@ -96,7 +97,7 @@ pub enum IntrospectionType { /// Describes how data is written to the collection. #[derive(Clone, Debug, Eq, PartialEq)] -pub enum DataSource { +pub enum DataSource { /// Ingest data from some external source. 
Ingestion(IngestionDescription), /// This source receives its data from the identified ingestion, @@ -122,9 +123,11 @@ pub enum DataSource { /// `storage-controller` we the primary as a dependency. primary: Option, }, - /// This source's data is does not need to be managed by the storage + /// This source's data does not need to be managed by the storage /// controller, e.g. it's a materialized view or the catalog collection. Other, + /// This collection is the output collection of a sink. + Sink { desc: ExportDescription }, } /// Describes a request to create a source. @@ -133,7 +136,7 @@ pub struct CollectionDescription { /// The schema of this collection pub desc: RelationDesc, /// The source of this collection's data. - pub data_source: DataSource, + pub data_source: DataSource, /// An optional frontier to which the collection's `since` should be advanced. pub since: Option>, /// A GlobalId to use for this collection to use for the status collection. @@ -168,7 +171,7 @@ pub enum Response { } /// Metadata that the storage controller must know to properly handle the life -/// cycle of creating and dropping collections.j +/// cycle of creating and dropping collections. /// /// This data should be kept consistent with the state modified using /// [`StorageTxn`]. @@ -485,12 +488,6 @@ pub trait StorageController: Debug { id: GlobalId, ) -> Result<&mut ExportState, StorageError>; - /// Create the sinks described by the `ExportDescription`. - async fn create_exports( - &mut self, - exports: Vec<(GlobalId, ExportDescription)>, - ) -> Result<(), StorageError>; - /// Create a oneshot ingestion. async fn create_oneshot_ingestion( &mut self, @@ -532,6 +529,7 @@ pub trait StorageController: Debug { /// Drops the read capability for the sinks and allows their resources to be reclaimed. fn drop_sinks( &mut self, + storage_metadata: &StorageMetadata, identifiers: Vec, ) -> Result<(), StorageError>; @@ -545,7 +543,11 @@ pub trait StorageController: Debug { /// created, but have been forgotten by the controller due to a restart. /// Once command history becomes durable we can remove this method and use the normal /// `drop_sinks`. - fn drop_sinks_unvalidated(&mut self, identifiers: Vec); + fn drop_sinks_unvalidated( + &mut self, + storage_metadata: &StorageMetadata, + identifiers: Vec, + ); /// Drops the read capability for the sources and allows their resources to be reclaimed. /// @@ -659,7 +661,7 @@ pub trait StorageController: Debug { >; } -impl DataSource { +impl DataSource { /// Returns true if the storage controller manages the data shard for this /// source using txn-wal. pub fn in_txns(&self) -> bool { @@ -671,6 +673,7 @@ impl DataSource { | DataSource::Introspection(_) | DataSource::Progress | DataSource::Webhook => false, + DataSource::Sink { .. } => false, } } } @@ -710,51 +713,66 @@ impl From for PersistEpoch { /// State maintained about individual exports. #[derive(Debug)] pub struct ExportState { - /// Description with which the export was created - pub description: ExportDescription, + /// Really only for keeping track of changes to the `derived_since`. + pub read_capabilities: MutableAntichain, - /// The read hold that this export has on its dependencies (inputs). When + /// The cluster this export is associated with. + pub cluster_id: StorageInstanceId, + + /// The current since frontier, derived from `write_frontier` using + /// `hold_policy`. + pub derived_since: Antichain, + + /// The read holds that this export has on its dependencies (its input and itself). 
When /// the upper of the export changes, we downgrade this, which in turn /// downgrades holds we have on our dependencies' sinces. - pub read_hold: ReadHold, + pub read_holds: [ReadHold; 2], /// The policy to use to downgrade `self.read_capability`. pub read_policy: ReadPolicy, /// Reported write frontier. pub write_frontier: Antichain, - - /// Maximum frontier wallclock lag since the last introspection update. - pub wallclock_lag_max: Duration, - /// Frontier wallclock lag metrics tracked for this collection. - pub wallclock_lag_metrics: WallclockLagMetrics, } impl ExportState { pub fn new( - description: ExportDescription, + cluster_id: StorageInstanceId, read_hold: ReadHold, + self_hold: ReadHold, + write_frontier: Antichain, read_policy: ReadPolicy, - wallclock_lag_metrics: WallclockLagMetrics, - ) -> Self { + ) -> Self + where + T: Lattice, + { + let mut dependency_since = Antichain::from_elem(T::minimum()); + for read_hold in [&read_hold, &self_hold] { + dependency_since.join_assign(read_hold.since()); + } Self { - description, - read_hold, + read_capabilities: MutableAntichain::from(dependency_since.borrow()), + cluster_id, + derived_since: dependency_since, + read_holds: [read_hold, self_hold], read_policy, - write_frontier: Antichain::from_elem(Timestamp::minimum()), - wallclock_lag_max: Default::default(), - wallclock_lag_metrics, + write_frontier, } } /// Returns the cluster to which the export is bound. pub fn cluster_id(&self) -> StorageInstanceId { - self.description.instance_id + self.cluster_id + } + + /// Returns the cluster to which the export is bound. + pub fn input_hold(&self) -> &ReadHold { + &self.read_holds[0] } /// Returns whether the export was dropped. pub fn is_dropped(&self) -> bool { - self.read_hold.since().is_empty() + self.read_holds.iter().all(|h| h.since().is_empty()) } } /// A channel that allows you to append a set of updates to a pre-defined [`GlobalId`]. diff --git a/src/storage-client/src/storage_collections.rs b/src/storage-client/src/storage_collections.rs index a9115fd7668ad..36a51fbfcfbec 100644 --- a/src/storage-client/src/storage_collections.rs +++ b/src/storage-client/src/storage_collections.rs @@ -339,6 +339,7 @@ impl SnapshotCursor { } /// Frontiers of the collection identified by `id`. +#[derive(Debug)] pub struct CollectionFrontiers { /// The [GlobalId] of the collection that these frontiers belong to. pub id: GlobalId, @@ -846,7 +847,7 @@ where fn determine_collection_dependencies( &self, self_collections: &BTreeMap>, - data_source: &DataSource, + data_source: &DataSource, ) -> Result, StorageError> { let dependencies = match &data_source { DataSource::Introspection(_) @@ -878,6 +879,7 @@ where } // Ingestions depend on their remap collection. 
DataSource::Ingestion(ingestion) => vec![ingestion.remap_collection_id], + DataSource::Sink { desc } => vec![desc.sink.from], }; Ok(dependencies) @@ -1663,7 +1665,7 @@ where collections.dedup(); for pos in 1..collections.len() { if collections[pos - 1].0 == collections[pos].0 { - return Err(StorageError::SourceIdReused(collections[pos].0)); + return Err(StorageError::CollectionIdReused(collections[pos].0)); } } @@ -1677,7 +1679,7 @@ where for (id, description) in collections.iter() { if let Some(existing_collection) = self_collections.get(id) { if &existing_collection.description != description { - return Err(StorageError::SourceIdReused(*id)); + return Err(StorageError::CollectionIdReused(*id)); } } } @@ -1779,6 +1781,7 @@ where | DataSource::Ingestion(_) | DataSource::Progress | DataSource::Other => {} + DataSource::Sink { .. } => {} DataSource::Table { .. } => { let register_ts = register_ts.expect( "caller should have provided a register_ts when creating a table", @@ -1868,7 +1871,7 @@ where let initial_since = match storage_dependencies .iter() .at_most_one() - .expect("should have at most one depdendency") + .expect("should have at most one dependency") { Some(dep) => { let dependency_collection = self_collections @@ -1995,6 +1998,9 @@ where DataSource::Ingestion(_) => { self_collections.insert(id, collection_state); } + DataSource::Sink { .. } => { + self_collections.insert(id, collection_state); + } } self.register_handles(id, is_in_txns(id, &metadata), since_handle, write_handle); @@ -2507,6 +2513,7 @@ where } // Materialized views, continual tasks, etc, aren't managed by storage. Other => {} + Sink { .. } => {} }; } Ok(result) diff --git a/src/storage-controller/src/lib.rs b/src/storage-controller/src/lib.rs index db6e59d30d6c5..40a067bce0d8c 100644 --- a/src/storage-controller/src/lib.rs +++ b/src/storage-controller/src/lib.rs @@ -142,8 +142,6 @@ pub struct Controller + Tim /// This is to prevent the re-binding of identifiers to other descriptions. pub(crate) collections: BTreeMap>, - pub(crate) exports: BTreeMap>, - /// Write handle for table shards. pub(crate) persist_table_worker: persist_handles::PersistTableWriteWorker, /// A shared TxnsCache running in a task and communicated with over a channel. @@ -358,6 +356,11 @@ where match &collection.extra_state { CollectionStateExtra::Ingestion(ingestion_state) => Ok(ingestion_state.hydrated), + CollectionStateExtra::Export(_) => { + // For now, sinks are always considered hydrated. + // TODO(sinks): base this off of the sink shard's frontier? + Ok(true) + } CollectionStateExtra::None => { // For now, objects that are not ingestions are always // considered hydrated. @@ -373,37 +376,30 @@ where (Antichain, Antichain), StorageError, > { - Ok(match self.export(id) { - Ok(export) => ( - export.read_hold.since().clone(), - export.write_frontier.clone(), - ), - Err(_) => { - let frontiers = self.storage_collections.collection_frontiers(id)?; - (frontiers.implied_capability, frontiers.write_frontier) - } - }) + let frontiers = self.storage_collections.collection_frontiers(id)?; + Ok((frontiers.implied_capability, frontiers.write_frontier)) } fn collections_frontiers( &self, mut ids: Vec, ) -> Result, Antichain)>, StorageError> { - // The ids might be either normal collections or exports. Both have frontiers that might be - // interesting to external observers. let mut result = vec![]; + // In theory, we could pull all our frontiers from storage collections... + // but in practice those frontiers may not be identical. 
For historical reasons, we use the + // locally-tracked frontier for sinks but the storage-collections-maintained frontier for + // sources. ids.retain(|&id| match self.export(id) { Ok(export) => { result.push(( id, - export.read_hold.since().clone(), + export.input_hold().since().clone(), export.write_frontier.clone(), )); false } Err(_) => true, }); - result.extend( self.storage_collections .collections_frontiers(ids)? @@ -601,7 +597,7 @@ where collections.dedup(); for pos in 1..collections.len() { if collections[pos - 1].0 == collections[pos].0 { - return Err(StorageError::SourceIdReused(collections[pos].0)); + return Err(StorageError::CollectionIdReused(collections[pos].0)); } } @@ -732,6 +728,7 @@ where // statistics. let mut new_source_statistic_entries = BTreeSet::new(); let mut new_webhook_statistic_entries = BTreeSet::new(); + let mut new_sink_statistic_entries = BTreeSet::new(); for (id, description, write, metadata) in to_register { let is_in_txns = |id, metadata: &CollectionMetadata| { @@ -776,7 +773,10 @@ where // upper of the shard's `write_handle` generally isn't the logical upper of // the shard. Instead we need to thread through the upper of the `txn` shard // here so we can check this invariant. - if !dependency_read_holds.is_empty() && !is_in_txns(id, &metadata) { + if !dependency_read_holds.is_empty() + && !is_in_txns(id, &metadata) + && !matches!(&data_source, DataSource::Sink { .. }) + { // The dependency since cannot be beyond the dependent (our) // upper unless the collection is new. In practice, the // depdenency is the remap shard of a source (export), and if @@ -941,6 +941,27 @@ where new_source_statistic_entries.insert(id); } + DataSource::Sink { desc } => { + let mut dependency_since = Antichain::from_elem(T::minimum()); + for read_hold in dependency_read_holds.iter() { + dependency_since.join_assign(read_hold.since()); + } + + let [self_hold, read_hold] = + dependency_read_holds.try_into().expect("two holds"); + + let state = ExportState::new( + desc.instance_id, + read_hold, + self_hold, + write_frontier.clone(), + ReadPolicy::step_back(), + ); + maybe_instance_id = Some(state.cluster_id); + extra_state = CollectionStateExtra::Export(state); + + new_sink_statistic_entries.insert(id); + } } let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(id, maybe_instance_id); @@ -962,6 +983,7 @@ where // statistics scrapers, but in the interest of safety, avoid overriding existing // statistics values. let mut source_statistics = self.source_statistics.lock().expect("poisoned"); + let mut sink_statistics = self.sink_statistics.lock().expect("poisoned"); for id in new_source_statistic_entries { source_statistics @@ -972,6 +994,11 @@ where for id in new_webhook_statistic_entries { source_statistics.webhook_statistics.entry(id).or_default(); } + for id in new_sink_statistic_entries { + sink_statistics + .entry(id) + .or_insert(StatsState::new(SinkStatisticsUpdate::new(id))); + } } // Register the tables all in one batch. @@ -1021,6 +1048,11 @@ where "ingestion exports do not execute directly, but instead schedule their source to be re-executed" ), DataSource::Introspection(_) | DataSource::Webhook | DataSource::Table { .. } | DataSource::Progress | DataSource::Other => {} + DataSource::Sink { .. 
} => { + if !self.read_only { + self.run_export(id)?; + } + } }; } @@ -1300,8 +1332,12 @@ where &self, id: GlobalId, ) -> Result<&ExportState, StorageError> { - self.exports + self.collections .get(&id) + .and_then(|c| match &c.extra_state { + CollectionStateExtra::Export(state) => Some(state), + _ => None, + }) .ok_or(StorageError::IdentifierMissing(id)) } @@ -1309,83 +1345,15 @@ where &mut self, id: GlobalId, ) -> Result<&mut ExportState, StorageError> { - self.exports + self.collections .get_mut(&id) + .and_then(|c| match &mut c.extra_state { + CollectionStateExtra::Export(state) => Some(state), + _ => None, + }) .ok_or(StorageError::IdentifierMissing(id)) } - async fn create_exports( - &mut self, - exports: Vec<(GlobalId, ExportDescription)>, - ) -> Result<(), StorageError> { - // Validate first, to avoid corrupting state. - let mut dedup = BTreeMap::new(); - for (id, desc) in exports.iter() { - if dedup.insert(id, desc).is_some() { - return Err(StorageError::SinkIdReused(*id)); - } - if let Ok(export) = self.export(*id) { - if &export.description != desc { - return Err(StorageError::SinkIdReused(*id)); - } - } - } - - for (id, description) in exports { - let from_id = description.sink.from; - - // Acquire read holds at StorageCollections to ensure that the - // sinked collection is not dropped while we're sinking it. - let desired_read_holds = vec![from_id.clone()]; - let read_hold = self - .storage_collections - .acquire_read_holds(desired_read_holds) - .expect("missing dependency") - .into_element(); - - info!( - sink_id = id.to_string(), - from_id = from_id.to_string(), - acquired_read_hold = ?read_hold, - "sink acquired read holds" - ); - let read_policy = ReadPolicy::step_back(); - - info!( - sink_id = id.to_string(), - from_id = from_id.to_string(), - as_of = ?description.sink.as_of, - "create_exports: creating sink" - ); - - let wallclock_lag_metrics = self - .metrics - .wallclock_lag_metrics(id, Some(description.instance_id)); - - let export_state = ExportState::new( - description.clone(), - read_hold, - read_policy, - wallclock_lag_metrics, - ); - self.exports.insert(id, export_state); - - // Just like with `new_source_statistic_entries`, we can probably - // `insert` here, but in the interest of safety, never override - // existing values. - self.sink_statistics - .lock() - .expect("poisoned") - .entry(id) - .or_insert(StatsState::new(SinkStatisticsUpdate::new(id))); - - if !self.read_only { - self.run_export(id)?; - } - } - Ok(()) - } - /// Create a oneshot ingestion. async fn create_oneshot_ingestion( &mut self, @@ -1433,39 +1401,33 @@ where // Acquire read holds at StorageCollections to ensure that the // sinked collection is not dropped while we're sinking it. 
- let desired_read_holds = vec![from_id.clone()]; - let read_hold = self + let desired_read_holds = vec![from_id.clone(), id.clone()]; + let [input_hold, self_hold] = self .storage_collections .acquire_read_holds(desired_read_holds) .expect("missing dependency") - .into_element(); + .try_into() + .expect("expected number of holds"); let from_storage_metadata = self.storage_collections.collection_metadata(from_id)?; let to_storage_metadata = self.storage_collections.collection_metadata(id)?; // Check whether the sink's write frontier is beyond the read hold we got - let cur_export = self - .exports - .get_mut(&id) - .ok_or_else(|| StorageError::IdentifierMissing(id))?; + let cur_export = self.export_mut(id)?; let input_readable = cur_export .write_frontier .iter() - .all(|t| read_hold.since().less_than(t)); + .all(|t| input_hold.since().less_than(t)); if !input_readable { return Err(StorageError::ReadBeforeSince(from_id)); } - let wallclock_lag_metrics = self - .metrics - .wallclock_lag_metrics(id, Some(new_description.instance_id)); - let new_export = ExportState { - description: new_description.clone(), - read_hold, + read_capabilities: cur_export.read_capabilities.clone(), + cluster_id: new_description.instance_id, + derived_since: cur_export.derived_since.clone(), + read_holds: [input_hold, self_hold], read_policy: cur_export.read_policy.clone(), write_frontier: cur_export.write_frontier.clone(), - wallclock_lag_max: Default::default(), - wallclock_lag_metrics, }; *cur_export = new_export; @@ -1514,10 +1476,16 @@ where // update that because `ExportState` is not clone, because it holds // a `ReadHandle` and cloning that would cause additional work for // whoever guarantees those read holds. - let (mut new_export_description, as_of) = { - let export = self.export(id).expect("export exists"); - let export_description = export.description.clone(); - let as_of = export.read_hold.since().clone(); + let (mut new_export_description, as_of): (ExportDescription, _) = { + let export = &self.collections[&id]; + let DataSource::Sink { desc } = &export.data_source else { + panic!("export exists") + }; + let CollectionStateExtra::Export(state) = &export.extra_state else { + panic!("export exists") + }; + let export_description = desc.clone(); + let as_of = state.input_hold().since().clone(); (export_description, as_of) }; @@ -1589,8 +1557,13 @@ where // Update state only after all possible errors have occurred. for (id, new_export_description) in export_updates { - let export = self.export_mut(id).expect("export known to exist"); - export.description = new_export_description; + let Some(state) = self.collections.get_mut(&id) else { + panic!("export known to exist") + }; + let DataSource::Sink { desc } = &mut state.data_source else { + panic!("export known to exist") + }; + *desc = new_export_description; } } @@ -1735,6 +1708,7 @@ where ingestions_to_drop.insert(id); } DataSource::Other | DataSource::Introspection(_) | DataSource::Progress => (), + DataSource::Sink { .. } => {} } } } @@ -1772,27 +1746,42 @@ where /// Drops the read capability for the sinks and allows their resources to be reclaimed. fn drop_sinks( &mut self, + storage_metadata: &StorageMetadata, identifiers: Vec, ) -> Result<(), StorageError> { self.validate_export_ids(identifiers.iter().cloned())?; - self.drop_sinks_unvalidated(identifiers); + self.drop_sinks_unvalidated(storage_metadata, identifiers); Ok(()) } - fn drop_sinks_unvalidated(&mut self, identifiers: Vec) { - for id in identifiers { - // Already removed. 
- if self.export(id).is_err() { - continue; - } + fn drop_sinks_unvalidated( + &mut self, + storage_metadata: &StorageMetadata, + mut identifiers: Vec, + ) { + // Ignore exports that have already been removed. + identifiers.retain(|id| self.export(*id).is_ok()); - // We don't explicitly remove read capabilities! Downgrading the - // frontier of the sink to `[]` (the empty Antichain), will - // propagate to the storage dependencies. + // TODO: ideally we'd advance the write frontier ourselves here, but this function's + // not yet marked async. - // Remove sink by removing its write frontier and arranging for deprovisioning. - self.update_write_frontiers(&[(id, Antichain::new())]); - } + // We don't explicitly remove read capabilities! Downgrading the + // frontier of the source to `[]` (the empty Antichain), will propagate + // to the storage dependencies. + let drop_policy = identifiers + .iter() + .map(|id| (*id, ReadPolicy::ValidFrom(Antichain::new()))) + .collect(); + + tracing::debug!( + ?drop_policy, + "dropping sources by setting read hold policies" + ); + self.set_hold_policies(drop_policy); + + // Also let StorageCollections know! + self.storage_collections + .drop_collections_unvalidated(storage_metadata, identifiers); } #[instrument(level = "debug")] @@ -1905,18 +1894,7 @@ where Some(StorageResponse::DroppedId(id)) => { tracing::debug!("DroppedId for collection {id}"); - if let Some(export) = self.exports.get_mut(&id) { - // TODO: Current main never drops export state, so we - // also don't do that, because it would be yet more - // refactoring. Instead, we downgrade to the empty - // frontier, which satisfies StorageCollections just as - // much. - tracing::info!("downgrading read hold of export {id} to empty frontier!"); - export - .read_hold - .try_downgrade(Antichain::new()) - .expect("must be possible"); - } else if let Some(_collection) = self.collections.remove(&id) { + if let Some(_collection) = self.collections.remove(&id) { // Nothing to do, we already dropped read holds in // `drop_sources_unvalidated`. } else { @@ -1987,6 +1965,9 @@ where ingestion_state.hydrated = true; } } + CollectionStateExtra::Export(_) => { + // TODO(sinks): track sink hydration? + } CollectionStateExtra::None => { // Nothing to do } @@ -2051,36 +2032,42 @@ where let instance = cluster_id.and_then(|cluster_id| self.instances.get_mut(&cluster_id)); if read_frontier.is_empty() { - if instance.is_some() && self.collections.contains_key(&id) { - let collection = self.collections.get(&id).expect("known to exist"); - match collection.extra_state { - CollectionStateExtra::Ingestion(_) => { - pending_source_drops.push(id); - } - CollectionStateExtra::None => { - // Nothing to do - } - } - } else if let Some(collection) = self.collections.get(&id) { - match collection.data_source { - DataSource::Table { .. } => { - pending_collection_drops.push(id); + if let Some(collection) = self.collections.get(&id) { + if instance.is_some() { + match collection.extra_state { + CollectionStateExtra::Ingestion(_) => { + pending_source_drops.push(id); + } + CollectionStateExtra::None => { + // Nothing to do + } + CollectionStateExtra::Export(_) => { + pending_sink_drops.push(id); + } } - DataSource::Webhook => { - pending_collection_drops.push(id); - // TODO(parkmycar): The Collection Manager and PersistMonotonicWriter - // could probably use some love and maybe get merged together? 
- let fut = self.collection_manager.unregister_collection(id); - mz_ore::task::spawn(|| format!("storage-webhook-cleanup-{id}"), fut); + } else { + match collection.data_source { + DataSource::Table { .. } => { + pending_collection_drops.push(id); + } + DataSource::Webhook => { + pending_collection_drops.push(id); + // TODO(parkmycar): The Collection Manager and PersistMonotonicWriter + // could probably use some love and maybe get merged together? + let fut = self.collection_manager.unregister_collection(id); + mz_ore::task::spawn( + || format!("storage-webhook-cleanup-{id}"), + fut, + ); + } + DataSource::Ingestion(_) => (), + DataSource::IngestionExport { .. } => (), + DataSource::Introspection(_) => (), + DataSource::Progress => (), + DataSource::Other => (), + DataSource::Sink { .. } => (), } - DataSource::Ingestion(_) => (), - DataSource::IngestionExport { .. } => (), - DataSource::Introspection(_) => (), - DataSource::Progress => (), - DataSource::Other => (), } - } else if instance.is_some() && self.exports.contains_key(&id) { - pending_sink_drops.push(id); } else if instance.is_none() { tracing::info!("Compaction command for id {id}, but we don't have a client."); } else { @@ -2438,7 +2425,6 @@ where Self { build_info, collections: BTreeMap::default(), - exports: BTreeMap::default(), persist_table_worker, txns_read, txns_metrics, @@ -2491,29 +2477,31 @@ where let mut read_capability_changes = BTreeMap::default(); for (id, policy) in policies.into_iter() { - if let Some(_export) = self.exports.get_mut(&id) { - unreachable!("set_hold_policies is only called for ingestions"); - } else if let Some(collection) = self.collections.get_mut(&id) { - let ingestion = match &mut collection.extra_state { - CollectionStateExtra::Ingestion(ingestion) => ingestion, + if let Some(collection) = self.collections.get_mut(&id) { + let (write_frontier, derived_since, hold_policy) = match &mut collection.extra_state + { + CollectionStateExtra::Ingestion(ingestion) => ( + ingestion.write_frontier.borrow(), + &mut ingestion.derived_since, + &mut ingestion.hold_policy, + ), CollectionStateExtra::None => { unreachable!("set_hold_policies is only called for ingestions"); } + CollectionStateExtra::Export(export) => ( + export.write_frontier.borrow(), + &mut export.derived_since, + &mut export.read_policy, + ), }; - let mut new_derived_since = policy.frontier(ingestion.write_frontier.borrow()); - if PartialOrder::less_equal(&ingestion.derived_since, &new_derived_since) { - let mut update = ChangeBatch::new(); - update.extend(new_derived_since.iter().map(|time| (time.clone(), 1))); - std::mem::swap(&mut ingestion.derived_since, &mut new_derived_since); - update.extend(new_derived_since.iter().map(|time| (time.clone(), -1))); - - if !update.is_empty() { - read_capability_changes.insert(id, update); - } + let new_derived_since = policy.frontier(write_frontier); + let mut update = swap_updates(derived_since, new_derived_since); + if !update.is_empty() { + read_capability_changes.insert(id, update); } - ingestion.hold_policy = policy; + *hold_policy = policy; } } @@ -2527,39 +2515,14 @@ where let mut read_capability_changes = BTreeMap::default(); for (id, new_upper) in updates.iter() { - if let Ok(export) = self.export_mut(*id) { - if PartialOrder::less_than(&export.write_frontier, new_upper) { - export.write_frontier.clone_from(new_upper); - } - - // Ignore read policy for sinks whose write frontiers are closed, which identifies - // the sink is being dropped; we need to advance the read frontier to the empty - // 
chain to signal to the dataflow machinery that they should deprovision this - // object. - let new_read_capability = if export.write_frontier.is_empty() { - export.write_frontier.clone() - } else { - export.read_policy.frontier(export.write_frontier.borrow()) - }; - - if PartialOrder::less_equal(export.read_hold.since(), &new_read_capability) { - let mut update = ChangeBatch::new(); - update.extend(new_read_capability.iter().map(|time| (time.clone(), 1))); - update.extend( - export - .read_hold - .since() - .iter() - .map(|time| (time.clone(), -1)), - ); - - if !update.is_empty() { - read_capability_changes.insert(*id, update); - } - } - } else if let Some(collection) = self.collections.get_mut(id) { - let ingestion = match &mut collection.extra_state { - CollectionStateExtra::Ingestion(ingestion) => ingestion, + if let Some(collection) = self.collections.get_mut(id) { + let (write_frontier, derived_since, hold_policy) = match &mut collection.extra_state + { + CollectionStateExtra::Ingestion(ingestion) => ( + &mut ingestion.write_frontier, + &mut ingestion.derived_since, + &ingestion.hold_policy, + ), CollectionStateExtra::None => { if matches!(collection.data_source, DataSource::Progress) { // We do get these, but can't do anything with it! @@ -2572,27 +2535,21 @@ where } continue; } + CollectionStateExtra::Export(export) => ( + &mut export.write_frontier, + &mut export.derived_since, + &export.read_policy, + ), }; - if PartialOrder::less_than(&ingestion.write_frontier, new_upper) { - ingestion.write_frontier.clone_from(new_upper); + if PartialOrder::less_than(write_frontier, new_upper) { + write_frontier.clone_from(new_upper); } - debug!(%id, ?ingestion, ?new_upper, "upper update for ingestion!"); - - let mut new_derived_since = ingestion - .hold_policy - .frontier(ingestion.write_frontier.borrow()); - - if PartialOrder::less_equal(&ingestion.derived_since, &new_derived_since) { - let mut update = ChangeBatch::new(); - update.extend(new_derived_since.iter().map(|time| (time.clone(), 1))); - std::mem::swap(&mut ingestion.derived_since, &mut new_derived_since); - update.extend(new_derived_since.iter().map(|time| (time.clone(), -1))); - - if !update.is_empty() { - read_capability_changes.insert(*id, update); - } + let new_derived_since = hold_policy.frontier(write_frontier.borrow()); + let mut update = swap_updates(derived_since, new_derived_since); + if !update.is_empty() { + read_capability_changes.insert(*id, update); } } else if self.storage_collections.check_exists(*id).is_ok() { // StorageCollections is handling it! @@ -2621,7 +2578,6 @@ where fn update_hold_capabilities(&mut self, updates: &mut BTreeMap>) { // Location to record consequences that we need to act on. let mut collections_net = BTreeMap::new(); - let mut exports_net = BTreeMap::new(); // We must not rely on any specific relative ordering of `GlobalId`s. // That said, it is reasonable to assume that collections generally have @@ -2634,31 +2590,24 @@ where debug!(id = %key, ?update, "update_hold_capability"); } - if let Ok(export) = self.export_mut(key) { - // Seed with our current read hold, then apply changes, to - // derive how we need to change our read hold. 
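
Both `set_hold_policies` and `update_write_frontiers` above now derive the since identically for ingestions and exports: apply the hold policy to the collection's write frontier and record only the net change. A toy model of that "lag the writes" derivation, using plain u64 timestamps and an invented `lagged_since` function in place of the real ReadPolicy type:

    use timely::progress::Antichain;

    /// Stand-in for `hold_policy.frontier(write_frontier.borrow())`: lag a
    /// totally ordered write frontier by `lag`, saturating at zero.
    fn lagged_since(lag: u64, write_frontier: &Antichain<u64>) -> Antichain<u64> {
        match write_frontier.iter().map(|t| t.saturating_sub(lag)).min() {
            Some(t) => Antichain::from_elem(t),
            // An empty (closed) write frontier yields an empty since, which
            // releases the hold entirely.
            None => Antichain::new(),
        }
    }

    fn main() {
        assert_eq!(
            lagged_since(10, &Antichain::from_elem(25u64)),
            Antichain::from_elem(15)
        );
        assert_eq!(lagged_since(10, &Antichain::new()), Antichain::new());
    }
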
- let mut staged_read_hold = MutableAntichain::new(); - staged_read_hold - .update_iter(export.read_hold.since().iter().map(|t| (t.clone(), 1))); - let changes = staged_read_hold.update_iter(update.drain()); - update.extend(changes); - - // Make sure we also send `AllowCompaction` commands for sinks, - // which drives updating the sink's `as_of`, among other things. - let (changes, frontier, _cluster_id) = - exports_net.entry(key).or_insert_with(|| { - ( - >::new(), - Antichain::new(), - export.cluster_id(), - ) - }); - - changes.extend(update.drain()); - *frontier = staged_read_hold.frontier().to_owned(); - } else if let Some(collection) = self.collections.get_mut(&key) { - let ingestion = match &mut collection.extra_state { - CollectionStateExtra::Ingestion(ingestion) => ingestion, + if let Some(collection) = self.collections.get_mut(&key) { + match &mut collection.extra_state { + CollectionStateExtra::Ingestion(ingestion) => { + let changes = ingestion.read_capabilities.update_iter(update.drain()); + update.extend(changes); + + let (changes, frontier, _cluster_id) = + collections_net.entry(key).or_insert_with(|| { + ( + >::new(), + Antichain::new(), + ingestion.instance_id, + ) + }); + + changes.extend(update.drain()); + *frontier = ingestion.read_capabilities.frontier().to_owned(); + } CollectionStateExtra::None => { // WIP: See if this ever panics in ci. soft_panic_or_log!( @@ -2667,22 +2616,19 @@ where ); continue; } - }; + CollectionStateExtra::Export(export) => { + let changes = export.read_capabilities.update_iter(update.drain()); + update.extend(changes); - let changes = ingestion.read_capabilities.update_iter(update.drain()); - update.extend(changes); + let (changes, frontier, _cluster_id) = + collections_net.entry(key).or_insert_with(|| { + (>::new(), Antichain::new(), export.cluster_id) + }); - let (changes, frontier, _cluster_id) = - collections_net.entry(key).or_insert_with(|| { - ( - >::new(), - Antichain::new(), - ingestion.instance_id, - ) - }); - - changes.extend(update.drain()); - *frontier = ingestion.read_capabilities.frontier().to_owned(); + changes.extend(update.drain()); + *frontier = export.read_capabilities.frontier().to_owned(); + } + } } else { // This is confusing and we should probably error. tracing::warn!(id = ?key, ?update, "update_hold_capabilities for unknown object"); @@ -2691,11 +2637,7 @@ where // Translate our net compute actions into `AllowCompaction` commands and // downgrade persist sinces. The actual downgrades are performed by a Tokio - // task asynchorously. - // - // N.B. We only downgrade persist sinces for collections because - // exports/sinks don't have an associated collection. We still _do_ want - // to sent `AllowCompaction` commands to workers for them, though. + // task asynchronously. 
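
Both the removed per-export branch and the unified `read_capabilities` path stage capability changes the same way: seed the current holds, apply a batch of changes, and forward only the resulting net frontier movement. A self-contained illustration using timely's `MutableAntichain` and `ChangeBatch`, with made-up u64 times and hold counts:

    use timely::progress::frontier::MutableAntichain;
    use timely::progress::ChangeBatch;

    fn main() {
        // Current read capabilities: two holds at time 3, one at time 5.
        let mut caps = MutableAntichain::new();
        caps.update_iter([(3u64, 2), (5u64, 1)]).for_each(drop);

        // Dropping one hold at 3 leaves another in place: no net frontier change.
        let mut batch = ChangeBatch::new();
        batch.update(3u64, -1);
        let net: Vec<_> = caps.update_iter(batch.drain()).collect();
        assert!(net.is_empty());

        // Dropping the last hold at 3 moves the frontier from {3} to {5};
        // exactly this net change is what gets forwarded downstream.
        let mut batch = ChangeBatch::new();
        batch.update(3u64, -1);
        let mut net: Vec<_> = caps.update_iter(batch.drain()).collect();
        net.sort();
        assert_eq!(net, vec![(3, -1), (5, 1)]);
    }
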
let mut worker_compaction_commands = BTreeMap::default(); for (key, (mut changes, frontier, cluster_id)) in collections_net { @@ -2709,8 +2651,11 @@ where .get_mut(&key) .expect("missing collection state"); - let ingestion = match &mut collection.extra_state { - CollectionStateExtra::Ingestion(ingestion) => ingestion, + let read_holds = match &mut collection.extra_state { + CollectionStateExtra::Ingestion(ingestion) => { + ingestion.dependency_read_holds.as_mut_slice() + } + CollectionStateExtra::Export(export) => export.read_holds.as_mut_slice(), CollectionStateExtra::None => { soft_panic_or_log!( "trying to downgrade read holds for collection which is not an \ @@ -2720,7 +2665,7 @@ where } }; - for read_hold in ingestion.dependency_read_holds.iter_mut() { + for read_hold in read_holds.iter_mut() { read_hold .try_downgrade(frontier.clone()) .expect("we only advance the frontier"); @@ -2729,18 +2674,6 @@ where worker_compaction_commands.insert(key, (frontier.clone(), cluster_id)); } } - for (key, (mut changes, frontier, cluster_id)) in exports_net { - if !changes.is_empty() { - let export_state = self.exports.get_mut(&key).expect("missing export state"); - - export_state - .read_hold - .try_downgrade(frontier.clone()) - .expect("we only advance the frontier"); - - worker_compaction_commands.insert(key, (frontier, cluster_id)); - } - } for (id, (read_frontier, cluster_id)) in worker_compaction_commands { // Acquiring a client for a storage instance requires await, so we @@ -2776,14 +2709,6 @@ where Ok(()) } - /// Iterate over exports that have not been dropped. - fn active_exports(&self) -> impl Iterator)> { - self.exports - .iter() - .filter(|(_id, e)| !e.is_dropped()) - .map(|(id, e)| (*id, e)) - } - /// Opens a write and critical since handles for the given `shard`. /// /// `since` is an optional `since` that the read handle will be forwarded to if it is less than @@ -2947,7 +2872,7 @@ where self.sink_statistics .lock() .expect("poisoned") - .retain(|k, _| self.exports.contains_key(k)); + .retain(|k, _| self.export(*k).is_ok()); } /// Appends a new global ID, shard ID pair to the appropriate collection. @@ -2995,7 +2920,7 @@ where fn determine_collection_dependencies( &self, self_id: GlobalId, - data_source: &DataSource, + data_source: &DataSource, ) -> Result, StorageError> { let dependency = match &data_source { DataSource::Introspection(_) @@ -3032,6 +2957,10 @@ where // track themselves and the remap shard as dependencies. vec![self_id, ingestion.remap_collection_id] } + DataSource::Sink { desc } => { + // Sinks hold back their own frontier and the frontier of their input. + vec![self_id, desc.sink.from] + } }; Ok(dependency) @@ -3057,7 +2986,7 @@ where for update in updates { let id = update.id; - if self.exports.contains_key(&id) { + if self.export(id).is_ok() { sink_status_updates.push(update); } else if self.storage_collections.check_exists(id).is_ok() { source_status_updates.push(update); @@ -3146,8 +3075,9 @@ where /// Runs the identified export using the current definition of the export /// that we have in memory. 
fn run_export(&mut self, id: GlobalId) -> Result<(), StorageError> { - let export = self.export(id)?; - let description = &export.description; + let DataSource::Sink { desc: description } = &self.collections[&id].data_source else { + return Err(StorageError::IdentifierMissing(id)); + }; info!( sink_id = %id, @@ -3208,26 +3138,13 @@ where let instance = self .collections .get(&id) - .and_then(|c| match &c.extra_state { - CollectionStateExtra::Ingestion(ingestion) => Some(ingestion), + .and_then(|collection_state| match &collection_state.extra_state { + CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id), + CollectionStateExtra::Export(export) => Some(export.cluster_id()), CollectionStateExtra::None => None, }) - .and_then(|i| self.instances.get(&i.instance_id)); - if let Some(instance) = instance { - for replica_id in instance.replica_ids() { - replica_frontiers.insert((id, replica_id), upper.clone()); - } - } - - global_frontiers.insert(id, (since, upper)); - } - - for (id, export) in self.active_exports() { - // Exports cannot be read from, so their `since` is always the empty frontier. - let since = Antichain::new(); - let upper = export.write_frontier.clone(); + .and_then(|i| self.instances.get(&i)); - let instance = self.instances.get(&export.cluster_id()); if let Some(instance) = instance { for replica_id in instance.replica_ids() { replica_frontiers.insert((id, replica_id), upper.clone()); @@ -3353,20 +3270,6 @@ where collection.wallclock_lag_metrics.observe(lag); } - let active_exports = self.exports.iter_mut().filter(|(_id, e)| !e.is_dropped()); - for (id, export) in active_exports { - let lag = frontier_lag(&export.write_frontier); - export.wallclock_lag_max = std::cmp::max(export.wallclock_lag_max, lag); - - if let Some(updates) = &mut introspection_updates { - let lag = std::mem::take(&mut export.wallclock_lag_max); - let row = pack_row(*id, lag); - updates.push((row, 1)); - } - - export.wallclock_lag_metrics.observe(lag); - } - if let Some(updates) = introspection_updates { self.append_introspection_updates(IntrospectionType::WallclockLagHistory, updates); self.wallclock_lag_last_refresh = Instant::now(); @@ -3481,7 +3384,7 @@ where #[derive(Debug)] struct CollectionState { /// The source of this collection's data. - pub data_source: DataSource, + pub data_source: DataSource, pub collection_metadata: CollectionMetadata, @@ -3497,6 +3400,7 @@ struct CollectionState { #[derive(Debug)] enum CollectionStateExtra { Ingestion(IngestionState), + Export(ExportState), None, } @@ -3614,3 +3518,17 @@ fn replica_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc< extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()), } } + +/// Replace one antichain with another, tracking the overall changes in the returned `ChangeBatch`. 
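
For example, a test-style sketch of that contract, using plain u64 timestamps and assuming it sits next to the helper defined just below (whose generic timestamp parameter is not visible in the rendered diff):

    #[cfg(test)]
    mod swap_updates_tests {
        use super::*;
        use timely::progress::Antichain;

        #[test]
        fn advancing_swap_reports_net_changes() {
            // Advancing {3} to {5}: `from` is replaced and the batch nets out
            // to "retire 3, install 5".
            let mut from = Antichain::from_elem(3u64);
            let mut update = swap_updates(&mut from, Antichain::from_elem(5));
            assert_eq!(from, Antichain::from_elem(5));
            let mut changes: Vec<_> = update.drain().collect();
            changes.sort();
            assert_eq!(changes, vec![(3, -1), (5, 1)]);

            // A non-advancing replacement is a no-op: `from` is kept and the
            // returned batch is empty.
            let mut update = swap_updates(&mut from, Antichain::from_elem(4));
            assert_eq!(from, Antichain::from_elem(5));
            assert!(update.is_empty());
        }
    }
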
+fn swap_updates( + from: &mut Antichain, + mut replace_with: Antichain, +) -> ChangeBatch { + let mut update = ChangeBatch::new(); + if PartialOrder::less_equal(from, &replace_with) { + update.extend(replace_with.iter().map(|time| (time.clone(), 1))); + std::mem::swap(from, &mut replace_with); + update.extend(replace_with.iter().map(|time| (time.clone(), -1))); + } + update +} diff --git a/src/storage-types/src/controller.rs b/src/storage-types/src/controller.rs index 4bde9bf9b2695..02d2b98c43731 100644 --- a/src/storage-types/src/controller.rs +++ b/src/storage-types/src/controller.rs @@ -154,7 +154,7 @@ impl RustType for DurableCollectionMetadata { pub enum StorageError { /// The source identifier was re-created after having been dropped, /// or installed with a different description. - SourceIdReused(GlobalId), + CollectionIdReused(GlobalId), /// The sink identifier was re-created after having been dropped, or /// installed with a different description. SinkIdReused(GlobalId), @@ -228,7 +228,7 @@ pub enum StorageError { impl Error for StorageError { fn source(&self) -> Option<&(dyn Error + 'static)> { match self { - Self::SourceIdReused(_) => None, + Self::CollectionIdReused(_) => None, Self::SinkIdReused(_) => None, Self::IdentifierMissing(_) => None, Self::IdentifierInvalid(_) => None, @@ -260,7 +260,7 @@ impl fmt::Display for StorageError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.write_str("storage error: ")?; match self { - Self::SourceIdReused(id) => write!( + Self::CollectionIdReused(id) => write!( f, "source identifier was re-created after having been dropped: {id}" ), From 742eb50e2678ae41749aae5efab0a5c31829cf57 Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Mon, 3 Feb 2025 13:43:13 -0500 Subject: [PATCH 6/9] Push the frontier along in the sink --- src/adapter/src/coord.rs | 10 ++++++- src/adapter/src/coord/ddl.rs | 2 +- src/storage-types/src/sinks.rs | 2 +- src/storage/src/sink/kafka.rs | 49 ++++++++++++++++++++++++++++++++ src/storage/src/storage_state.rs | 2 +- 5 files changed, 61 insertions(+), 4 deletions(-) diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index 6f6c2bb4948ad..dc874699a65ee 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -2735,6 +2735,14 @@ impl Coordinator { } } CatalogItem::Sink(sink) => { + let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from); + let from_desc = storage_sink_from_entry + .desc(&self.catalog().resolve_full_name( + storage_sink_from_entry.name(), + storage_sink_from_entry.conn_id(), + )) + .expect("sinks can only be built on items with descs") + .into_owned(); let collection_desc = CollectionDescription { // TODO(sinks): make generic once we have more than one sink type. 
desc: KAFKA_PROGRESS_DESC.clone(), @@ -2742,7 +2750,7 @@ impl Coordinator { desc: ExportDescription { sink: StorageSinkDesc { from: sink.from, - from_desc: KAFKA_PROGRESS_DESC.clone(), + from_desc, connection: sink .connection .clone() diff --git a/src/adapter/src/coord/ddl.rs b/src/adapter/src/coord/ddl.rs index 1a42f5c71b8c2..cbec42f985001 100644 --- a/src/adapter/src/coord/ddl.rs +++ b/src/adapter/src/coord/ddl.rs @@ -1320,7 +1320,7 @@ impl Coordinator { storage_sink_from_entry.name(), storage_sink_from_entry.conn_id(), )) - .expect("indexes can only be built on items with descs") + .expect("sinks can only be built on items with descs") .into_owned(), connection: sink .connection diff --git a/src/storage-types/src/sinks.rs b/src/storage-types/src/sinks.rs index e7489302031ba..0f2fe70487639 100644 --- a/src/storage-types/src/sinks.rs +++ b/src/storage-types/src/sinks.rs @@ -76,7 +76,7 @@ impl AlterCompatible connection, envelope, version: _, - // The as of of the descriptions may differ. + // The as-of of the descriptions may differ. as_of: _, from_storage_metadata, partition_strategy, diff --git a/src/storage/src/sink/kafka.rs b/src/storage/src/sink/kafka.rs index 3402ed56782ae..1caebcafe97db 100644 --- a/src/storage/src/sink/kafka.rs +++ b/src/storage/src/sink/kafka.rs @@ -73,6 +73,7 @@ use std::cell::RefCell; use std::cmp::Ordering; use std::collections::BTreeMap; +use std::future::Future; use std::rc::Rc; use std::sync::atomic::AtomicU64; use std::sync::Arc; @@ -98,6 +99,9 @@ use mz_ore::error::ErrorExt; use mz_ore::future::InTask; use mz_ore::task::{self, AbortOnDropHandle}; use mz_ore::vec::VecExt; +use mz_persist_client::write::WriteHandle; +use mz_persist_client::Diagnostics; +use mz_persist_types::codec_impls::UnitSchema; use mz_repr::{Datum, DatumVec, Diff, GlobalId, Row, RowArena, Timestamp}; use mz_storage_client::sink::progress_key::ProgressKey; use mz_storage_types::configuration::StorageConfiguration; @@ -107,6 +111,7 @@ use mz_storage_types::errors::{ContextCreationError, ContextCreationErrorExt, Da use mz_storage_types::sinks::{ KafkaSinkConnection, KafkaSinkFormatType, SinkEnvelope, SinkPartitionStrategy, StorageSinkDesc, }; +use mz_storage_types::sources::SourceData; use mz_timely_util::antichain::AntichainExt; use mz_timely_util::builder_async::{ Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton, @@ -156,6 +161,23 @@ impl> SinkRender for KafkaSinkConnection { ) -> (Stream, Vec) { let mut scope = input.scope(); + let write_handle = { + let persist = Arc::clone(&storage_state.persist_clients); + let shard_meta = sink.to_storage_metadata.clone(); + async move { + let client = persist.open(shard_meta.persist_location).await?; + let handle = client + .open_writer( + shard_meta.data_shard, + Arc::new(shard_meta.relation_desc), + Arc::new(UnitSchema), + Diagnostics::from_purpose("sink handle"), + ) + .await?; + Ok(handle) + } + }; + let write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum()))); storage_state .sink_write_frontiers @@ -186,6 +208,7 @@ impl> SinkRender for KafkaSinkConnection { sink, metrics, statistics, + write_handle, write_frontier, ); @@ -618,6 +641,8 @@ fn sink_collection>( sink: &StorageSinkDesc, metrics: KafkaSinkMetrics, statistics: SinkStatistics, + write_handle: impl Future>> + + 'static, write_frontier: Rc>>, ) -> (Stream, PressOnDropButton) { let scope = input.scope(); @@ -644,6 +669,8 @@ fn sink_collection>( ContextCreationError::Other(anyhow::anyhow!("synthetic error")) )); + let mut write_handle = 
write_handle.await?; + let metrics = Arc::new(metrics); let (mut producer, resume_upper) = TransactionalProducer::new( @@ -774,6 +801,28 @@ fn sink_collection>( debug!("{name}: committing transaction for {}", progress.pretty()); producer.commit_transaction(progress.clone()).await?; transaction_begun = false; + let mut expect_upper = write_handle.shared_upper(); + loop { + if PartialOrder::less_equal(&progress, &expect_upper) { + // The frontier has already been advanced as far as necessary. + break; + } + // TODO(sinks): include the high water mark in the output topic for + // the messages we've published, if and when we allow reads to the sink + // directly, to allow monitoring the progress of the sink in terms of + // the output system. + const EMPTY: &[((SourceData, ()), Timestamp, Diff)] = &[]; + match write_handle + .compare_and_append(EMPTY, expect_upper, progress.clone()) + .await + .expect("valid usage") + { + Ok(()) => break, + Err(mismatch) => { + expect_upper = mismatch.current; + } + } + } write_frontier.borrow_mut().clone_from(&progress); match progress.into_option() { Some(new_upper) => upper = new_upper, diff --git a/src/storage/src/storage_state.rs b/src/storage/src/storage_state.rs index d976b17cc3cd0..13473967f3b3e 100644 --- a/src/storage/src/storage_state.rs +++ b/src/storage/src/storage_state.rs @@ -967,7 +967,7 @@ impl<'w, A: Allocate> Worker<'w, A> { if let Some(prev_export) = prev { prev_export .alter_compatible(export.id, &export.description) - .expect("only alter compatible ingestions permitted"); + .expect("only alter compatible exports permitted"); } } } From c2261d89ddc0c030b1f57fec0104b6100e8babce Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Mon, 3 Feb 2025 16:52:17 -0500 Subject: [PATCH 7/9] Use the shard frontiers to choose snapshot status --- src/adapter/src/coord/sequencer/inner.rs | 2 +- src/storage-controller/src/lib.rs | 30 +++++++++++++++++------- 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/src/adapter/src/coord/sequencer/inner.rs b/src/adapter/src/coord/sequencer/inner.rs index 23555fbd1d36d..8cb2111184e79 100644 --- a/src/adapter/src/coord/sequencer/inner.rs +++ b/src/adapter/src/coord/sequencer/inner.rs @@ -3444,7 +3444,7 @@ impl Coordinator { ctx: ExecuteContext, plan: plan::AlterSinkPlan, ) { - // 1. Put a read hold on the new relation + // Put a read hold on the new relation let id_bundle = crate::CollectionIdBundle { storage_ids: BTreeSet::from_iter([plan.sink.from]), compute_ids: BTreeMap::new(), diff --git a/src/storage-controller/src/lib.rs b/src/storage-controller/src/lib.rs index 40a067bce0d8c..e057b6095d837 100644 --- a/src/storage-controller/src/lib.rs +++ b/src/storage-controller/src/lib.rs @@ -3079,18 +3079,32 @@ where return Err(StorageError::IdentifierMissing(id)); }; + let from_storage_metadata = self + .storage_collections + .collection_metadata(description.sink.from)?; + let to_storage_metadata = self.storage_collections.collection_metadata(id)?; + + // Choose an as-of frontier for this execution of the sink. If the write frontier of the sink + // is strictly larger than its read hold, it must have at least written out its snapshot, and we can skip + // reading it; otherwise assume we may have to replay from the beginning. 
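
That decision can be modeled in isolation; a sketch with an invented `effective_with_snapshot` helper, plain u64 frontiers, and the same `PartialOrder::less_than` comparison the controller code below performs:

    use timely::progress::Antichain;
    use timely::PartialOrder;

    /// Decide whether this incarnation of the sink still needs to emit the snapshot.
    fn effective_with_snapshot(
        requested: bool,
        as_of: &Antichain<u64>,
        write_frontier: &Antichain<u64>,
    ) -> bool {
        if PartialOrder::less_than(as_of, write_frontier) {
            // The sink has already committed output beyond its as-of, so a
            // previous incarnation must have written the snapshot.
            false
        } else {
            requested
        }
    }

    fn main() {
        // Fresh sink: as-of and write frontier coincide, so honor the request.
        assert!(effective_with_snapshot(
            true,
            &Antichain::from_elem(0),
            &Antichain::from_elem(0)
        ));
        // Restarted sink that already wrote through time 7: skip the snapshot.
        assert!(!effective_with_snapshot(
            true,
            &Antichain::from_elem(3),
            &Antichain::from_elem(7)
        ));
    }
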
+ let export_state = self.storage_collections.collection_frontiers(id)?; + let mut as_of = description.sink.as_of.clone(); + as_of.join_assign(&export_state.implied_capability); + let with_snapshot = if PartialOrder::less_than(&as_of, &export_state.write_frontier) { + false + } else { + description.sink.with_snapshot + }; + info!( sink_id = %id, from_id = %description.sink.from, - as_of = ?description.sink.as_of, + write_frontier = ?export_state.write_frontier, + ?as_of, + ?with_snapshot, "run_export" ); - let from_storage_metadata = self - .storage_collections - .collection_metadata(description.sink.from)?; - let to_storage_metadata = self.storage_collections.collection_metadata(id)?; - let cmd = RunSinkCommand { id, description: StorageSinkDesc { @@ -3098,11 +3112,11 @@ where from_desc: description.sink.from_desc.clone(), connection: description.sink.connection.clone(), envelope: description.sink.envelope, - as_of: description.sink.as_of.clone(), + as_of, version: description.sink.version, partition_strategy: description.sink.partition_strategy.clone(), from_storage_metadata, - with_snapshot: description.sink.with_snapshot, + with_snapshot, to_storage_metadata, }, }; From b759df4c065e56db5aeabe273a349c2a7c5b71c2 Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Tue, 4 Feb 2025 14:57:40 -0500 Subject: [PATCH 8/9] REVERTME: Add some additional logging during ALTER SINK --- src/adapter/src/coord/sequencer/inner.rs | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/src/adapter/src/coord/sequencer/inner.rs b/src/adapter/src/coord/sequencer/inner.rs index 8cb2111184e79..63d35606b2e43 100644 --- a/src/adapter/src/coord/sequencer/inner.rs +++ b/src/adapter/src/coord/sequencer/inner.rs @@ -92,7 +92,7 @@ use smallvec::SmallVec; use timely::progress::Antichain; use timely::progress::Timestamp as TimelyTimestamp; use tokio::sync::{oneshot, watch}; -use tracing::{warn, Instrument, Span}; +use tracing::{info, warn, Instrument, Span}; use crate::catalog::{self, Catalog, ConnCatalog, DropObjectInfo, UpdatePrivilegeVariant}; use crate::command::{ExecuteResponse, Response}; @@ -3486,6 +3486,15 @@ impl Coordinator { } }; + info!( + "preparing alter sink for {}: frontiers={:?} export={:?}", + plan.global_id, + self.controller + .storage_collections + .collections_frontiers(vec![plan.global_id, plan.sink.from]), + self.controller.storage.export(plan.global_id) + ); + // Now we must wait for the sink to make enough progress such that there is overlap between // the new `from` collection's read hold and the sink's write frontier. 
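
A minimal model of that overlap condition, with an invented `frontiers_overlap` name and u64 frontiers; the real check is driven by the watch set installed below, but the predicate it waits on amounts to this:

    use timely::progress::Antichain;
    use timely::PartialOrder;

    /// True once the sink's write frontier has caught up to the new input's
    /// read frontier, so every time the sink still needs to process can be
    /// served by the new input collection.
    fn frontiers_overlap(new_input_since: &Antichain<u64>, sink_upper: &Antichain<u64>) -> bool {
        PartialOrder::less_equal(new_input_since, sink_upper)
    }

    fn main() {
        // Input compacted to 10 while the sink has only written through 5: keep waiting.
        assert!(!frontiers_overlap(&Antichain::from_elem(10), &Antichain::from_elem(5)));
        // Once the sink reaches 12, the ALTER can proceed.
        assert!(frontiers_overlap(&Antichain::from_elem(10), &Antichain::from_elem(12)));
    }
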
self.install_storage_watch_set( @@ -3513,6 +3522,17 @@ impl Coordinator { return; } } + { + let plan = &ctx.plan; + info!( + "finishing alter sink for {}: frontiers={:?} export={:?}", + plan.global_id, + self.controller + .storage_collections + .collections_frontiers(vec![plan.global_id, plan.sink.from]), + self.controller.storage.export(plan.global_id) + ); + } let plan::AlterSinkPlan { item_id, From 237410d90655f1f2b2ae76a9a56670499f73fdad Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Thu, 6 Feb 2025 13:18:01 -0500 Subject: [PATCH 9/9] Allow with_snapshot to change when reconciling commands --- src/storage-types/src/sinks.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/storage-types/src/sinks.rs b/src/storage-types/src/sinks.rs index 0f2fe70487639..a1089c9cfd67a 100644 --- a/src/storage-types/src/sinks.rs +++ b/src/storage-types/src/sinks.rs @@ -92,7 +92,9 @@ impl AlterCompatible "connection", ), (envelope == &other.envelope, "envelope"), - (with_snapshot == &other.with_snapshot, "with_snapshot"), + // This can legally change from true to false once the snapshot has been + // written out. + (*with_snapshot || !other.with_snapshot, "with_snapshot"), ( partition_strategy == &other.partition_strategy, "partition_strategy",
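
The relaxed check admits exactly the transitions where a snapshot requirement is dropped, never one where it is introduced. A minimal restatement of the boolean used above (`with_snapshot_compatible` is just an illustrative name) together with its truth table:

    /// Mirror of `*with_snapshot || !other.with_snapshot`: the existing
    /// command's `with_snapshot` may be true while the new one is false,
    /// but not the other way around.
    fn with_snapshot_compatible(old: bool, new: bool) -> bool {
        old || !new
    }

    fn main() {
        assert!(with_snapshot_compatible(true, true));
        assert!(with_snapshot_compatible(true, false)); // snapshot already written out
        assert!(with_snapshot_compatible(false, false));
        assert!(!with_snapshot_compatible(false, true)); // would demand a snapshot never produced
    }
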