diff --git a/src/adapter/src/catalog/transact.rs b/src/adapter/src/catalog/transact.rs index 4b4d6f0cf0bac..751e6054dc912 100644 --- a/src/adapter/src/catalog/transact.rs +++ b/src/adapter/src/catalog/transact.rs @@ -1057,8 +1057,10 @@ impl Catalog { CatalogItem::ContinualTask(ct) => { storage_collections_to_create.insert(ct.global_id()); } + CatalogItem::Sink(sink) => { + storage_collections_to_create.insert(sink.global_id()); + } CatalogItem::Log(_) - | CatalogItem::Sink(_) | CatalogItem::View(_) | CatalogItem::Index(_) | CatalogItem::Type(_) diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index 2f26cff656e02..973c90b0396b5 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -146,12 +146,13 @@ use mz_sql::session::vars::SystemVars; use mz_sql_parser::ast::display::AstDisplay; use mz_sql_parser::ast::ExplainStage; use mz_storage_client::client::TableData; -use mz_storage_client::controller::{CollectionDescription, DataSource}; +use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription}; use mz_storage_types::connections::inline::{IntoInlineConnection, ReferencedConnection}; use mz_storage_types::connections::Connection as StorageConnection; use mz_storage_types::connections::ConnectionContext; use mz_storage_types::read_holds::ReadHold; -use mz_storage_types::sinks::S3SinkFormat; +use mz_storage_types::sinks::{S3SinkFormat, StorageSinkDesc}; +use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC; use mz_storage_types::sources::Timeline; use mz_timestamp_oracle::postgres_oracle::{ PostgresTimestampOracle, PostgresTimestampOracleConfig, @@ -2101,6 +2102,11 @@ impl Coordinator { self.create_storage_export(sink.global_id(), sink) .await .unwrap_or_terminate("cannot fail to create exports"); + policies_to_set + .entry(CompactionWindow::Default) + .or_insert_with(Default::default) + .storage_ids + .insert(sink.global_id()); } CatalogItem::Connection(catalog_connection) => { if let ConnectionDetails::AwsPrivatelink(conn) = &catalog_connection.details { @@ -2754,6 +2760,36 @@ impl Coordinator { collections.push((ct.global_id(), collection_desc)); } } + CatalogItem::Sink(sink) => { + let collection_desc = CollectionDescription { + // TODO(sinks): make generic once we have more than one sink type. 
+ desc: KAFKA_PROGRESS_DESC.clone(), + data_source: DataSource::Sink { + desc: ExportDescription { + sink: StorageSinkDesc { + from: sink.from, + from_desc: KAFKA_PROGRESS_DESC.clone(), + connection: sink + .connection + .clone() + .into_inline_connection(self.catalog().state()), + partition_strategy: sink.partition_strategy.clone(), + envelope: sink.envelope, + as_of: Antichain::from_elem(Timestamp::minimum()), + with_snapshot: sink.with_snapshot, + version: sink.version, + from_storage_metadata: (), + to_storage_metadata: (), + }, + instance_id: sink.cluster_id, + }, + }, + since: None, + status_collection_id: None, + timeline: None, + }; + collections.push((sink.global_id, collection_desc)); + } _ => (), } } diff --git a/src/adapter/src/coord/ddl.rs b/src/adapter/src/coord/ddl.rs index 5a285902678eb..c2e92fb52ed18 100644 --- a/src/adapter/src/coord/ddl.rs +++ b/src/adapter/src/coord/ddl.rs @@ -14,6 +14,16 @@ use std::collections::{BTreeMap, BTreeSet}; use std::sync::Arc; use std::time::Duration; +use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason}; +use crate::catalog::{DropObjectInfo, Op, ReplicaCreateDropReason, TransactionResult}; +use crate::coord::appends::BuiltinTableAppendNotify; +use crate::coord::timeline::{TimelineContext, TimelineState}; +use crate::coord::{Coordinator, ReplicaMetadata}; +use crate::session::{Session, Transaction, TransactionOps}; +use crate::statement_logging::StatementEndedExecutionReason; +use crate::telemetry::{EventDetails, SegmentClientExt}; +use crate::util::ResultExt; +use crate::{catalog, flags, AdapterError, TimestampProvider}; use fail::fail_point; use futures::Future; use maplit::{btreemap, btreeset}; @@ -46,25 +56,15 @@ use mz_sql::session::vars::{ MAX_REPLICAS_PER_CLUSTER, MAX_ROLES, MAX_SCHEMAS_PER_DATABASE, MAX_SECRETS, MAX_SINKS, MAX_SOURCES, MAX_TABLES, }; -use mz_storage_client::controller::ExportDescription; +use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription}; use mz_storage_types::connections::inline::IntoInlineConnection; use mz_storage_types::connections::PostgresConnection; use mz_storage_types::read_policy::ReadPolicy; +use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC; use mz_storage_types::sources::GenericSourceConnection; use serde_json::json; use tracing::{event, info_span, warn, Instrument, Level}; -use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason}; -use crate::catalog::{DropObjectInfo, Op, ReplicaCreateDropReason, TransactionResult}; -use crate::coord::appends::BuiltinTableAppendNotify; -use crate::coord::timeline::{TimelineContext, TimelineState}; -use crate::coord::{Coordinator, ReplicaMetadata}; -use crate::session::{Session, Transaction, TransactionOps}; -use crate::statement_logging::StatementEndedExecutionReason; -use crate::telemetry::{EventDetails, SegmentClientExt}; -use crate::util::ResultExt; -use crate::{catalog, flags, AdapterError, TimestampProvider}; - impl Coordinator { /// Same as [`Self::catalog_transact_conn`] but takes a [`Session`]. 
#[instrument(name = "coord::catalog_transact")] @@ -1056,9 +1056,10 @@ impl Coordinator { } pub(crate) fn drop_storage_sinks(&mut self, sink_gids: Vec) { + let storage_metadata = self.catalog.state().storage_metadata(); self.controller .storage - .drop_sinks(sink_gids) + .drop_sinks(storage_metadata, sink_gids) .unwrap_or_terminate("cannot fail to drop sinks"); } @@ -1286,9 +1287,6 @@ impl Coordinator { id: GlobalId, sink: &Sink, ) -> Result<(), AdapterError> { - // Validate `sink.from` is in fact a storage collection - self.controller.storage.check_exists(sink.from)?; - // The AsOf is used to determine at what time to snapshot reading from // the persist collection. This is primarily relevant when we do _not_ // want to include the snapshot in the sink. @@ -1308,6 +1306,7 @@ impl Coordinator { // TODO: Maybe in the future, pass those holds on to storage, to hold on // to them and downgrade when possible? let read_holds = self.acquire_read_holds(&id_bundle); + let as_of = self.least_valid_read(&read_holds); let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from); @@ -1330,25 +1329,40 @@ impl Coordinator { with_snapshot: sink.with_snapshot, version: sink.version, from_storage_metadata: (), + to_storage_metadata: (), }; - let res = self - .controller - .storage - .create_exports(vec![( - id, - ExportDescription { + let collection_desc = CollectionDescription { + // TODO(sinks): make generic once we have more than one sink type. + desc: KAFKA_PROGRESS_DESC.clone(), + data_source: DataSource::Sink { + desc: ExportDescription { sink: storage_sink_desc, instance_id: sink.cluster_id, }, - )]) - .await; + }, + since: None, + status_collection_id: None, + timeline: None, + }; + let collections = vec![(id, collection_desc)]; + + // Create the collections. + let storage_metadata = self.catalog.state().storage_metadata(); + self.controller + .storage + .create_collections(storage_metadata, None, collections) + .await + .unwrap_or_terminate("cannot fail to create collections"); + + // Validate `sink.from` is in fact a storage collection + self.controller.storage.check_exists(sink.from)?; // Drop read holds after the export has been created, at which point // storage will have put in its own read holds. drop(read_holds); - Ok(res?) 
+ Ok(()) } /// Validate all resource limits in a catalog transaction and return an error if that limit is diff --git a/src/adapter/src/coord/sequencer/inner.rs b/src/adapter/src/coord/sequencer/inner.rs index 113cf32f1ef99..23555fbd1d36d 100644 --- a/src/adapter/src/coord/sequencer/inner.rs +++ b/src/adapter/src/coord/sequencer/inner.rs @@ -21,15 +21,22 @@ use futures::{future, Future}; use itertools::Itertools; use maplit::btreeset; use mz_adapter_types::compaction::CompactionWindow; +use mz_adapter_types::connection::ConnectionId; +use mz_catalog::memory::objects::{ + CatalogItem, Cluster, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type, +}; use mz_cloud_resources::VpcEndpointConfig; use mz_controller_types::ReplicaId; use mz_expr::{ CollectionPlan, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing, }; +use mz_ore::cast::CastFrom; use mz_ore::collections::{CollectionExt, HashSet}; use mz_ore::task::{self, spawn, JoinHandle}; use mz_ore::tracing::OpenTelemetryContext; use mz_ore::vec::VecExt; +use mz_ore::{assert_none, instrument}; +use mz_persist_client::stats::SnapshotPartStats; use mz_repr::adt::jsonb::Jsonb; use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap}; use mz_repr::explain::json::json_string; @@ -39,6 +46,7 @@ use mz_repr::{ CatalogItemId, Datum, Diff, GlobalId, IntoRowIterator, RelationVersion, RelationVersionSelector, Row, RowArena, RowIterator, Timestamp, }; +use mz_sql::ast::AlterSourceAddSubsourceOption; use mz_sql::ast::{CreateSubsourceStatement, MySqlConfigOptionName, UnresolvedItemName}; use mz_sql::catalog::{ CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError, @@ -52,17 +60,7 @@ use mz_sql::names::{ use mz_sql::plan::{ConnectionDetails, NetworkPolicyRule, StatementContext}; use mz_sql::pure::{generate_subsource_statements, PurifiedSourceExport}; use mz_storage_types::sinks::StorageSinkDesc; -use smallvec::SmallVec; -use timely::progress::Timestamp as TimelyTimestamp; // Import `plan` module, but only import select elements to avoid merge conflicts on use statements. 
-use mz_adapter_types::connection::ConnectionId; -use mz_catalog::memory::objects::{ - CatalogItem, Cluster, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type, -}; -use mz_ore::cast::CastFrom; -use mz_ore::{assert_none, instrument}; -use mz_persist_client::stats::SnapshotPartStats; -use mz_sql::ast::AlterSourceAddSubsourceOption; use mz_sql::plan::{ AlterConnectionAction, AlterConnectionPlan, CreateSourcePlanBundle, ExplainSinkSchemaPlan, Explainee, ExplaineeStatement, MutationKind, Params, Plan, PlannedAlterRoleOption, @@ -90,7 +88,9 @@ use mz_storage_types::AlterCompatible; use mz_transform::dataflow::DataflowMetainfo; use mz_transform::notice::{OptimizerNoticeApi, OptimizerNoticeKind, RawOptimizerNotice}; use mz_transform::EmptyStatisticsOracle; +use smallvec::SmallVec; use timely::progress::Antichain; +use timely::progress::Timestamp as TimelyTimestamp; use tokio::sync::{oneshot, watch}; use tracing::{warn, Instrument, Span}; @@ -1349,6 +1349,9 @@ impl Coordinator { .await .unwrap_or_terminate("cannot fail to create exports"); + self.initialize_storage_read_policies([item_id].into(), CompactionWindow::Default) + .await; + ctx.retire(Ok(ExecuteResponse::CreatedSink)) } @@ -3527,7 +3530,12 @@ impl Coordinator { .expect("sink known to exist") .write_frontier; let as_of = ctx.read_hold.least_valid_read(); - assert!(write_frontier.iter().all(|t| as_of.less_than(t))); + assert!( + write_frontier.iter().all(|t| as_of.less_than(t)), + "{:?} should be strictly less than {:?}", + &*as_of, + &**write_frontier + ); let catalog_sink = Sink { create_sql: sink.create_sql, @@ -3576,6 +3584,7 @@ impl Coordinator { version: sink.version, partition_strategy: sink.partition_strategy, from_storage_metadata: (), + to_storage_metadata: (), }; self.controller diff --git a/src/catalog/src/memory/objects.rs b/src/catalog/src/memory/objects.rs index d7992f6f4889b..5b257f3fbada4 100644 --- a/src/catalog/src/memory/objects.rs +++ b/src/catalog/src/memory/objects.rs @@ -1614,9 +1614,9 @@ impl CatalogItem { CatalogItem::Table(_) | CatalogItem::Source(_) | CatalogItem::MaterializedView(_) + | CatalogItem::Sink(_) | CatalogItem::ContinualTask(_) => true, CatalogItem::Log(_) - | CatalogItem::Sink(_) | CatalogItem::View(_) | CatalogItem::Index(_) | CatalogItem::Type(_) diff --git a/src/storage-client/src/controller.rs b/src/storage-client/src/controller.rs index d8970294310cf..f6e8486f0cab2 100644 --- a/src/storage-client/src/controller.rs +++ b/src/storage-client/src/controller.rs @@ -27,8 +27,8 @@ use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; +use differential_dataflow::lattice::Lattice; use mz_cluster_client::client::ClusterReplicaLocation; -use mz_cluster_client::metrics::WallclockLagMetrics; use mz_cluster_client::ReplicaId; use mz_persist_client::batch::ProtoBatch; use mz_persist_types::{Codec64, Opaque, ShardId}; @@ -47,6 +47,7 @@ use mz_storage_types::sources::{ SourceExportDetails, Timeline, }; use serde::{Deserialize, Serialize}; +use timely::progress::frontier::MutableAntichain; use timely::progress::Timestamp as TimelyTimestamp; use timely::progress::{Antichain, Timestamp}; use tokio::sync::{mpsc, oneshot}; @@ -96,7 +97,7 @@ pub enum IntrospectionType { /// Describes how data is written to the collection. #[derive(Clone, Debug, Eq, PartialEq)] -pub enum DataSource { +pub enum DataSource { /// Ingest data from some external source. 
Ingestion(IngestionDescription), /// This source receives its data from the identified ingestion, @@ -122,9 +123,11 @@ pub enum DataSource { /// `storage-controller` we the primary as a dependency. primary: Option, }, - /// This source's data is does not need to be managed by the storage + /// This source's data does not need to be managed by the storage /// controller, e.g. it's a materialized view or the catalog collection. Other, + /// This collection is the output collection of a sink. + Sink { desc: ExportDescription }, } /// Describes a request to create a source. @@ -133,7 +136,7 @@ pub struct CollectionDescription { /// The schema of this collection pub desc: RelationDesc, /// The source of this collection's data. - pub data_source: DataSource, + pub data_source: DataSource, /// An optional frontier to which the collection's `since` should be advanced. pub since: Option>, /// A GlobalId to use for this collection to use for the status collection. @@ -168,7 +171,7 @@ pub enum Response { } /// Metadata that the storage controller must know to properly handle the life -/// cycle of creating and dropping collections.j +/// cycle of creating and dropping collections. /// /// This data should be kept consistent with the state modified using /// [`StorageTxn`]. @@ -485,12 +488,6 @@ pub trait StorageController: Debug { id: GlobalId, ) -> Result<&mut ExportState, StorageError>; - /// Create the sinks described by the `ExportDescription`. - async fn create_exports( - &mut self, - exports: Vec<(GlobalId, ExportDescription)>, - ) -> Result<(), StorageError>; - /// Create a oneshot ingestion. async fn create_oneshot_ingestion( &mut self, @@ -532,6 +529,7 @@ pub trait StorageController: Debug { /// Drops the read capability for the sinks and allows their resources to be reclaimed. fn drop_sinks( &mut self, + storage_metadata: &StorageMetadata, identifiers: Vec, ) -> Result<(), StorageError>; @@ -545,7 +543,11 @@ pub trait StorageController: Debug { /// created, but have been forgotten by the controller due to a restart. /// Once command history becomes durable we can remove this method and use the normal /// `drop_sinks`. - fn drop_sinks_unvalidated(&mut self, identifiers: Vec); + fn drop_sinks_unvalidated( + &mut self, + storage_metadata: &StorageMetadata, + identifiers: Vec, + ); /// Drops the read capability for the sources and allows their resources to be reclaimed. /// @@ -659,7 +661,7 @@ pub trait StorageController: Debug { >; } -impl DataSource { +impl DataSource { /// Returns true if the storage controller manages the data shard for this /// source using txn-wal. pub fn in_txns(&self) -> bool { @@ -671,6 +673,7 @@ impl DataSource { | DataSource::Introspection(_) | DataSource::Progress | DataSource::Webhook => false, + DataSource::Sink { .. } => false, } } } @@ -710,51 +713,61 @@ impl From for PersistEpoch { /// State maintained about individual exports. #[derive(Debug)] pub struct ExportState { - /// Description with which the export was created - pub description: ExportDescription, + /// Really only for keeping track of changes to the `derived_since`. + pub read_capabilities: MutableAntichain, + + /// The cluster this export is associated with. + pub cluster_id: StorageInstanceId, + + /// The current since frontier, derived from `write_frontier` using + /// `hold_policy`. + pub derived_since: Antichain, - /// The read hold that this export has on its dependencies (inputs). When + /// The read holds that this export has on its dependencies (its input and itself). 
When /// the upper of the export changes, we downgrade this, which in turn /// downgrades holds we have on our dependencies' sinces. - pub read_hold: ReadHold, + pub read_holds: [ReadHold; 2], /// The policy to use to downgrade `self.read_capability`. pub read_policy: ReadPolicy, /// Reported write frontier. pub write_frontier: Antichain, - - /// Maximum frontier wallclock lag since the last introspection update. - pub wallclock_lag_max: Duration, - /// Frontier wallclock lag metrics tracked for this collection. - pub wallclock_lag_metrics: WallclockLagMetrics, } impl ExportState { pub fn new( - description: ExportDescription, + cluster_id: StorageInstanceId, read_hold: ReadHold, + self_hold: ReadHold, read_policy: ReadPolicy, - wallclock_lag_metrics: WallclockLagMetrics, - ) -> Self { + ) -> Self + where + T: Lattice, + { + let write_frontier = Antichain::from_elem(Timestamp::minimum()); + let mut dependency_since = Antichain::from_elem(T::minimum()); + for read_hold in [&read_hold, &self_hold] { + dependency_since.join_assign(read_hold.since()); + } Self { - description, - read_hold, + read_capabilities: MutableAntichain::from(dependency_since.borrow()), + cluster_id, + derived_since: dependency_since, + read_holds: [read_hold, self_hold], read_policy, - write_frontier: Antichain::from_elem(Timestamp::minimum()), - wallclock_lag_max: Default::default(), - wallclock_lag_metrics, + write_frontier, } } /// Returns the cluster to which the export is bound. pub fn cluster_id(&self) -> StorageInstanceId { - self.description.instance_id + self.cluster_id } /// Returns whether the export was dropped. pub fn is_dropped(&self) -> bool { - self.read_hold.since().is_empty() + self.read_holds.iter().all(|h| h.since().is_empty()) } } /// A channel that allows you to append a set of updates to a pre-defined [`GlobalId`]. diff --git a/src/storage-client/src/storage_collections.rs b/src/storage-client/src/storage_collections.rs index 9f0460965fc14..06929da26446f 100644 --- a/src/storage-client/src/storage_collections.rs +++ b/src/storage-client/src/storage_collections.rs @@ -343,6 +343,7 @@ impl SnapshotCursor { } /// Frontiers of the collection identified by `id`. +#[derive(Debug)] pub struct CollectionFrontiers { /// The [GlobalId] of the collection that these frontiers belong to. pub id: GlobalId, @@ -850,7 +851,7 @@ where fn determine_collection_dependencies( &self, self_collections: &BTreeMap>, - data_source: &DataSource, + data_source: &DataSource, ) -> Result, StorageError> { let dependencies = match &data_source { DataSource::Introspection(_) @@ -882,6 +883,7 @@ where } // Ingestions depend on their remap collection. 
DataSource::Ingestion(ingestion) => vec![ingestion.remap_collection_id], + DataSource::Sink { desc } => vec![desc.sink.from], }; Ok(dependencies) @@ -1134,7 +1136,7 @@ where collections: &mut BTreeMap>, policies: Vec<(GlobalId, ReadPolicy)>, ) { - trace!("set_read_policies: {:?}", policies); + info!("set_read_policies: {:?}", policies); let mut read_capability_changes = BTreeMap::default(); @@ -1163,7 +1165,7 @@ where for (id, changes) in read_capability_changes.iter() { if id.is_user() { - trace!(%id, ?changes, "in set_read_policies, capability changes"); + info!(%id, ?changes, "in set_read_policies, capability changes"); } } @@ -1195,7 +1197,7 @@ where let mut update = updates.remove(&id).unwrap(); if id.is_user() { - trace!(id = ?id, update = ?update, "update_read_capabilities"); + info!(id = ?id, update = ?update, "update_read_capabilities"); } let collection = if let Some(c) = collections.get_mut(&id) { @@ -1240,8 +1242,10 @@ where update.extend(changes); if id.is_user() { - trace!( + info!( %id, + ?current_read_capabilities, + ?collection.read_capabilities, ?collection.storage_dependencies, ?update, "forwarding update to storage dependencies"); @@ -1780,6 +1784,7 @@ where | DataSource::Ingestion(_) | DataSource::Progress | DataSource::Other => {} + DataSource::Sink { .. } => {} DataSource::Table { .. } => { let register_ts = register_ts.expect( "caller should have provided a register_ts when creating a table", @@ -1869,7 +1874,7 @@ where let initial_since = match storage_dependencies .iter() .at_most_one() - .expect("should have at most one depdendency") + .expect("should have at most one dependency") { Some(dep) => { let dependency_collection = self_collections @@ -1996,6 +2001,9 @@ where DataSource::Ingestion(_) => { self_collections.insert(id, collection_state); } + DataSource::Sink { .. } => { + self_collections.insert(id, collection_state); + } } self.register_handles(id, is_in_txns(id, &metadata), since_handle, write_handle); @@ -2315,8 +2323,6 @@ where storage_metadata: &StorageMetadata, identifiers: Vec, ) { - debug!(?identifiers, "drop_collections_unvalidated"); - let mut self_collections = self.collections.lock().expect("lock poisoned"); for id in identifiers.iter() { @@ -2332,6 +2338,10 @@ where None => continue, }; + if matches!(dropped_data_source, DataSource::Sink { .. }) { + dbg!(id, &metadata); + } + // If we are dropping source exports, we need to modify the // ingestion that it runs on. if let DataSource::IngestionExport { ingestion_id, .. } = dropped_data_source { @@ -2508,6 +2518,7 @@ where } // Materialized views, continual tasks, etc, aren't managed by storage. Other => {} + Sink { .. } => {} // FIXME: is this right? 
}; } Ok(result) diff --git a/src/storage-controller/src/history.rs b/src/storage-controller/src/history.rs index 2e5b3a143fef7..e068e55e842b2 100644 --- a/src/storage-controller/src/history.rs +++ b/src/storage-controller/src/history.rs @@ -406,6 +406,22 @@ mod tests { ), txns_shard: Default::default(), }, + to_storage_metadata: CollectionMetadata { + persist_location: PersistLocation { + blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"), + consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"), + }, + remap_shard: Default::default(), + data_shard: Default::default(), + relation_desc: RelationDesc::new( + RelationType { + column_types: Default::default(), + keys: Default::default(), + }, + Vec::::new(), + ), + txns_shard: Default::default(), + }, } } diff --git a/src/storage-controller/src/lib.rs b/src/storage-controller/src/lib.rs index 7263d4c3ffcab..6449687961abb 100644 --- a/src/storage-controller/src/lib.rs +++ b/src/storage-controller/src/lib.rs @@ -142,8 +142,6 @@ pub struct Controller + Tim /// This is to prevent the re-binding of identifiers to other descriptions. pub(crate) collections: BTreeMap>, - pub(crate) exports: BTreeMap>, - /// Write handle for table shards. pub(crate) persist_table_worker: persist_handles::PersistTableWriteWorker, /// A shared TxnsCache running in a task and communicated with over a channel. @@ -358,6 +356,11 @@ where match &collection.extra_state { CollectionStateExtra::Ingestion(ingestion_state) => Ok(ingestion_state.hydrated), + CollectionStateExtra::Export(_) => { + // For now, sinks are always considered hydrated. + // TODO(sinks): base this off of the sink shard's frontier? + Ok(true) + } CollectionStateExtra::None => { // For now, objects that are not ingestions are always // considered hydrated. @@ -373,37 +376,15 @@ where (Antichain, Antichain), StorageError, > { - Ok(match self.export(id) { - Ok(export) => ( - export.read_hold.since().clone(), - export.write_frontier.clone(), - ), - Err(_) => { - let frontiers = self.storage_collections.collection_frontiers(id)?; - (frontiers.implied_capability, frontiers.write_frontier) - } - }) + let frontiers = self.storage_collections.collection_frontiers(id)?; + Ok((frontiers.implied_capability, frontiers.write_frontier)) } fn collections_frontiers( &self, - mut ids: Vec, + ids: Vec, ) -> Result, Antichain)>, StorageError> { - // The ids might be either normal collections or exports. Both have frontiers that might be - // interesting to external observers. let mut result = vec![]; - ids.retain(|&id| match self.export(id) { - Ok(export) => { - result.push(( - id, - export.read_hold.since().clone(), - export.write_frontier.clone(), - )); - false - } - Err(_) => true, - }); - result.extend( self.storage_collections .collections_frontiers(ids)? @@ -732,6 +713,7 @@ where // statistics. 
let mut new_source_statistic_entries = BTreeSet::new(); let mut new_webhook_statistic_entries = BTreeSet::new(); + let mut new_sink_statistic_entries = BTreeSet::new(); for (id, description, write, metadata) in to_register { let is_in_txns = |id, metadata: &CollectionMetadata| { @@ -941,6 +923,27 @@ where new_source_statistic_entries.insert(id); } + DataSource::Sink { desc } => { + let mut dependency_since = Antichain::from_elem(T::minimum()); + for read_hold in dependency_read_holds.iter() { + dependency_since.join_assign(read_hold.since()); + } + + let [self_hold, read_hold] = + dependency_read_holds.try_into().expect("two holds"); + + let state = ExportState::new( + desc.instance_id, + read_hold, + self_hold, + // TODO: really? + ReadPolicy::step_back(), + ); + maybe_instance_id = Some(state.cluster_id); + extra_state = CollectionStateExtra::Export(state); + + new_sink_statistic_entries.insert(id); + } } let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(id, maybe_instance_id); @@ -962,6 +965,7 @@ where // statistics scrapers, but in the interest of safety, avoid overriding existing // statistics values. let mut source_statistics = self.source_statistics.lock().expect("poisoned"); + let mut sink_statistics = self.sink_statistics.lock().expect("poisoned"); for id in new_source_statistic_entries { source_statistics @@ -972,6 +976,11 @@ where for id in new_webhook_statistic_entries { source_statistics.webhook_statistics.entry(id).or_default(); } + for id in new_sink_statistic_entries { + sink_statistics + .entry(id) + .or_insert(StatsState::new(SinkStatisticsUpdate::new(id))); + } } // Register the tables all in one batch. @@ -1021,6 +1030,11 @@ where "ingestion exports do not execute directly, but instead schedule their source to be re-executed" ), DataSource::Introspection(_) | DataSource::Webhook | DataSource::Table { .. } | DataSource::Progress | DataSource::Other => {} + DataSource::Sink { .. } => { + if !self.read_only { + self.run_export(id)?; + } + } }; } @@ -1300,8 +1314,12 @@ where &self, id: GlobalId, ) -> Result<&ExportState, StorageError> { - self.exports + self.collections .get(&id) + .and_then(|c| match &c.extra_state { + CollectionStateExtra::Export(state) => Some(state), + _ => None, + }) .ok_or(StorageError::IdentifierMissing(id)) } @@ -1309,83 +1327,15 @@ where &mut self, id: GlobalId, ) -> Result<&mut ExportState, StorageError> { - self.exports + self.collections .get_mut(&id) + .and_then(|c| match &mut c.extra_state { + CollectionStateExtra::Export(state) => Some(state), + _ => None, + }) .ok_or(StorageError::IdentifierMissing(id)) } - async fn create_exports( - &mut self, - exports: Vec<(GlobalId, ExportDescription)>, - ) -> Result<(), StorageError> { - // Validate first, to avoid corrupting state. - let mut dedup = BTreeMap::new(); - for (id, desc) in exports.iter() { - if dedup.insert(id, desc).is_some() { - return Err(StorageError::SinkIdReused(*id)); - } - if let Ok(export) = self.export(*id) { - if &export.description != desc { - return Err(StorageError::SinkIdReused(*id)); - } - } - } - - for (id, description) in exports { - let from_id = description.sink.from; - - // Acquire read holds at StorageCollections to ensure that the - // sinked collection is not dropped while we're sinking it. 
- let desired_read_holds = vec![from_id.clone()]; - let read_hold = self - .storage_collections - .acquire_read_holds(desired_read_holds) - .expect("missing dependency") - .into_element(); - - info!( - sink_id = id.to_string(), - from_id = from_id.to_string(), - acquired_read_hold = ?read_hold, - "sink acquired read holds" - ); - let read_policy = ReadPolicy::step_back(); - - info!( - sink_id = id.to_string(), - from_id = from_id.to_string(), - as_of = ?description.sink.as_of, - "create_exports: creating sink" - ); - - let wallclock_lag_metrics = self - .metrics - .wallclock_lag_metrics(id, Some(description.instance_id)); - - let export_state = ExportState::new( - description.clone(), - read_hold, - read_policy, - wallclock_lag_metrics, - ); - self.exports.insert(id, export_state); - - // Just like with `new_source_statistic_entries`, we can probably - // `insert` here, but in the interest of safety, never override - // existing values. - self.sink_statistics - .lock() - .expect("poisoned") - .entry(id) - .or_insert(StatsState::new(SinkStatisticsUpdate::new(id))); - - if !self.read_only { - self.run_export(id)?; - } - } - Ok(()) - } - /// Create a oneshot ingestion. async fn create_oneshot_ingestion( &mut self, @@ -1433,38 +1383,33 @@ where // Acquire read holds at StorageCollections to ensure that the // sinked collection is not dropped while we're sinking it. - let desired_read_holds = vec![from_id.clone()]; - let read_hold = self + let desired_read_holds = vec![from_id.clone(), id.clone()]; + let read_holds: [ReadHold; 2] = self .storage_collections .acquire_read_holds(desired_read_holds) .expect("missing dependency") - .into_element(); + .try_into() + .expect("expected number of holds"); let from_storage_metadata = self.storage_collections.collection_metadata(from_id)?; + let to_storage_metadata = self.storage_collections.collection_metadata(id)?; // Check whether the sink's write frontier is beyond the read hold we got - let cur_export = self - .exports - .get_mut(&id) - .ok_or_else(|| StorageError::IdentifierMissing(id))?; + let cur_export = self.export_mut(id)?; let input_readable = cur_export .write_frontier .iter() - .all(|t| read_hold.since().less_than(t)); + .all(|t| read_holds[0].since().less_than(t)); if !input_readable { return Err(StorageError::ReadBeforeSince(from_id)); } - let wallclock_lag_metrics = self - .metrics - .wallclock_lag_metrics(id, Some(new_description.instance_id)); - let new_export = ExportState { - description: new_description.clone(), - read_hold, + read_capabilities: cur_export.read_capabilities.clone(), + cluster_id: new_description.instance_id, + derived_since: cur_export.derived_since.clone(), + read_holds, read_policy: cur_export.read_policy.clone(), write_frontier: cur_export.write_frontier.clone(), - wallclock_lag_max: Default::default(), - wallclock_lag_metrics, }; *cur_export = new_export; @@ -1480,6 +1425,7 @@ where partition_strategy: new_description.sink.partition_strategy, from_storage_metadata, with_snapshot: new_description.sink.with_snapshot, + to_storage_metadata, }, }; @@ -1512,10 +1458,16 @@ where // update that because `ExportState` is not clone, because it holds // a `ReadHandle` and cloning that would cause additional work for // whoever guarantees those read holds. 
- let (mut new_export_description, as_of) = { - let export = self.export(id).expect("export exists"); - let export_description = export.description.clone(); - let as_of = export.read_hold.since().clone(); + let (mut new_export_description, as_of): (ExportDescription, _) = { + let export = &self.collections[&id]; + let DataSource::Sink { desc } = &export.data_source else { + panic!("yikes...") + }; + let CollectionStateExtra::Export(state) = &export.extra_state else { + panic!("yikes...") + }; + let export_description = desc.clone(); + let as_of = state.read_holds[0].since().clone(); (export_description, as_of) }; @@ -1529,6 +1481,7 @@ where let from_storage_metadata = self .storage_collections .collection_metadata(new_export_description.sink.from)?; + let to_storage_metadata = self.storage_collections.collection_metadata(id)?; let cmd = RunSinkCommand { id, @@ -1552,6 +1505,7 @@ where // TODO(petrosagg): change the controller to explicitly track dataflow executions as_of: as_of.to_owned(), from_storage_metadata, + to_storage_metadata, }, }; @@ -1585,8 +1539,14 @@ where // Update state only after all possible errors have occurred. for (id, new_export_description) in export_updates { - let export = self.export_mut(id).expect("export known to exist"); - export.description = new_export_description; + let got = self + .collections + .get_mut(&id) + .expect("export known to exist"); + let DataSource::Sink { desc } = &mut got.data_source else { + panic!("export known to exist") + }; + *desc = new_export_description; } } @@ -1731,6 +1691,7 @@ where ingestions_to_drop.insert(id); } DataSource::Other | DataSource::Introspection(_) | DataSource::Progress => (), + DataSource::Sink { .. } => {} } } } @@ -1768,27 +1729,50 @@ where /// Drops the read capability for the sinks and allows their resources to be reclaimed. fn drop_sinks( &mut self, + storage_metadata: &StorageMetadata, identifiers: Vec, ) -> Result<(), StorageError> { self.validate_export_ids(identifiers.iter().cloned())?; - self.drop_sinks_unvalidated(identifiers); + self.drop_sinks_unvalidated(storage_metadata, identifiers); Ok(()) } - fn drop_sinks_unvalidated(&mut self, identifiers: Vec) { - for id in identifiers { - // Already removed. - if self.export(id).is_err() { - continue; - } + fn drop_sinks_unvalidated( + &mut self, + storage_metadata: &StorageMetadata, + mut identifiers: Vec, + ) { + dbg!(&identifiers); - // We don't explicitly remove read capabilities! Downgrading the - // frontier of the sink to `[]` (the empty Antichain), will - // propagate to the storage dependencies. + // Ignore exports that have already been removed. + identifiers.retain(|id| self.export(*id).is_ok()); - // Remove sink by removing its write frontier and arranging for deprovisioning. - self.update_write_frontiers(&[(id, Antichain::new())]); - } + // TODO: ideally we'd advance the write frontier ourselves here, but this function's + // not yet marked async. + + // let write_frontiers: Vec<_> = identifiers + // .iter() + // .map(|id| (*id, Antichain::new())) + // .collect(); + // self.update_write_frontiers(write_frontiers.as_slice()); + + // We don't explicitly remove read capabilities! Downgrading the + // frontier of the source to `[]` (the empty Antichain), will propagate + // to the storage dependencies. 
+ let drop_policy = identifiers + .iter() + .map(|id| (*id, ReadPolicy::ValidFrom(Antichain::new()))) + .collect(); + + tracing::debug!( + ?drop_policy, + "dropping sources by setting read hold policies" + ); + self.set_hold_policies(drop_policy); + + // Also let StorageCollections know! + self.storage_collections + .drop_collections_unvalidated(storage_metadata, identifiers); } #[instrument(level = "debug")] @@ -1905,17 +1889,6 @@ where if let Some(_collection) = self.collections.remove(id) { // Nothing to do, we already dropped read holds in // `drop_sources_unvalidated`. - } else if let Some(export) = self.exports.get_mut(id) { - // TODO: Current main never drops export state, so we - // also don't do that, because it would be yet more - // refactoring. Instead, we downgrade to the empty - // frontier, which satisfies StorageCollections just as - // much. - tracing::info!("downgrading read hold of export {id} to empty frontier!"); - export - .read_hold - .try_downgrade(Antichain::new()) - .expect("must be possible"); } else { soft_panic_or_log!( "DroppedIds for ID {id} but we have neither ingestion nor export \ @@ -1985,6 +1958,9 @@ where ingestion_state.hydrated = true; } } + CollectionStateExtra::Export(_) => { + // TODO(sinks): track sink hydration? + } CollectionStateExtra::None => { // Nothing to do } @@ -2049,36 +2025,42 @@ where let instance = cluster_id.and_then(|cluster_id| self.instances.get_mut(&cluster_id)); if read_frontier.is_empty() { - if instance.is_some() && self.collections.contains_key(&id) { - let collection = self.collections.get(&id).expect("known to exist"); - match collection.extra_state { - CollectionStateExtra::Ingestion(_) => { - pending_source_drops.push(id); - } - CollectionStateExtra::None => { - // Nothing to do - } - } - } else if let Some(collection) = self.collections.get(&id) { - match collection.data_source { - DataSource::Table { .. } => { - pending_collection_drops.push(id); + if let Some(collection) = self.collections.get(&id) { + if instance.is_some() { + match collection.extra_state { + CollectionStateExtra::Ingestion(_) => { + pending_source_drops.push(id); + } + CollectionStateExtra::None => { + // Nothing to do + } + CollectionStateExtra::Export(_) => { + pending_sink_drops.push(id); + } } - DataSource::Webhook => { - pending_collection_drops.push(id); - // TODO(parkmycar): The Collection Manager and PersistMonotonicWriter - // could probably use some love and maybe get merged together? - let fut = self.collection_manager.unregister_collection(id); - mz_ore::task::spawn(|| format!("storage-webhook-cleanup-{id}"), fut); + } else { + match collection.data_source { + DataSource::Table { .. } => { + pending_collection_drops.push(id); + } + DataSource::Webhook => { + pending_collection_drops.push(id); + // TODO(parkmycar): The Collection Manager and PersistMonotonicWriter + // could probably use some love and maybe get merged together? + let fut = self.collection_manager.unregister_collection(id); + mz_ore::task::spawn( + || format!("storage-webhook-cleanup-{id}"), + fut, + ); + } + DataSource::Ingestion(_) => (), + DataSource::IngestionExport { .. } => (), + DataSource::Introspection(_) => (), + DataSource::Progress => (), + DataSource::Other => (), + DataSource::Sink { .. } => (), } - DataSource::Ingestion(_) => (), - DataSource::IngestionExport { .. 
} => (), - DataSource::Introspection(_) => (), - DataSource::Progress => (), - DataSource::Other => (), } - } else if instance.is_some() && self.exports.contains_key(&id) { - pending_sink_drops.push(id); } else if instance.is_none() { tracing::info!("Compaction command for id {id}, but we don't have a client."); } else { @@ -2436,7 +2418,6 @@ where Self { build_info, collections: BTreeMap::default(), - exports: BTreeMap::default(), persist_table_worker, txns_read, txns_metrics, @@ -2490,28 +2471,44 @@ where for (id, policy) in policies.into_iter() { if let Some(collection) = self.collections.get_mut(&id) { - let ingestion = match &mut collection.extra_state { - CollectionStateExtra::Ingestion(ingestion) => ingestion, + match &mut collection.extra_state { + CollectionStateExtra::Ingestion(ingestion) => { + let mut new_derived_since = + policy.frontier(ingestion.write_frontier.borrow()); + + if PartialOrder::less_equal(&ingestion.derived_since, &new_derived_since) { + let mut update = ChangeBatch::new(); + update.extend(new_derived_since.iter().map(|time| (time.clone(), 1))); + std::mem::swap(&mut ingestion.derived_since, &mut new_derived_since); + update.extend(new_derived_since.iter().map(|time| (time.clone(), -1))); + + if !update.is_empty() { + read_capability_changes.insert(id, update); + } + } + + ingestion.hold_policy = policy; + } CollectionStateExtra::None => { unreachable!("set_hold_policies is only called for ingestions"); } - }; - let mut new_derived_since = policy.frontier(ingestion.write_frontier.borrow()); + CollectionStateExtra::Export(export) => { + let mut new_derived_since = policy.frontier(export.write_frontier.borrow()); - if PartialOrder::less_equal(&ingestion.derived_since, &new_derived_since) { - let mut update = ChangeBatch::new(); - update.extend(new_derived_since.iter().map(|time| (time.clone(), 1))); - std::mem::swap(&mut ingestion.derived_since, &mut new_derived_since); - update.extend(new_derived_since.iter().map(|time| (time.clone(), -1))); + if PartialOrder::less_equal(&export.derived_since, &new_derived_since) { + let mut update = ChangeBatch::new(); + update.extend(new_derived_since.iter().map(|time| (time.clone(), 1))); + std::mem::swap(&mut export.derived_since, &mut new_derived_since); + update.extend(new_derived_since.iter().map(|time| (time.clone(), -1))); - if !update.is_empty() { - read_capability_changes.insert(id, update); - } - } + if !update.is_empty() { + read_capability_changes.insert(id, update); + } + } - ingestion.hold_policy = policy; - } else if let Some(_export) = self.exports.get_mut(&id) { - unreachable!("set_hold_policies is only called for ingestions"); + export.read_policy = policy; + } + }; } } @@ -2526,8 +2523,27 @@ where for (id, new_upper) in updates.iter() { if let Some(collection) = self.collections.get_mut(id) { - let ingestion = match &mut collection.extra_state { - CollectionStateExtra::Ingestion(ingestion) => ingestion, + match &mut collection.extra_state { + CollectionStateExtra::Ingestion(ingestion) => { + if PartialOrder::less_than(&ingestion.write_frontier, new_upper) { + ingestion.write_frontier.clone_from(new_upper); + } + + debug!(%id, ?ingestion, ?new_upper, "upper update for ingestion!"); + + let new_derived_since = ingestion + .hold_policy + .frontier(ingestion.write_frontier.borrow()); + + if PartialOrder::less_equal(&ingestion.derived_since, &new_derived_since) { + let mut update = + swap_updates(&mut ingestion.derived_since, new_derived_since); + + if !update.is_empty() { + read_capability_changes.insert(*id, 
update); + } + } + } CollectionStateExtra::None => { if matches!(collection.data_source, DataSource::Progress) { // We do get these, but can't do anything with it! @@ -2540,56 +2556,26 @@ where } continue; } - }; - - if PartialOrder::less_than(&ingestion.write_frontier, new_upper) { - ingestion.write_frontier.clone_from(new_upper); - } - - debug!(%id, ?ingestion, ?new_upper, "upper update for ingestion!"); - - let mut new_derived_since = ingestion - .hold_policy - .frontier(ingestion.write_frontier.borrow()); - - if PartialOrder::less_equal(&ingestion.derived_since, &new_derived_since) { - let mut update = ChangeBatch::new(); - update.extend(new_derived_since.iter().map(|time| (time.clone(), 1))); - std::mem::swap(&mut ingestion.derived_since, &mut new_derived_since); - update.extend(new_derived_since.iter().map(|time| (time.clone(), -1))); - - if !update.is_empty() { - read_capability_changes.insert(*id, update); - } - } - } else if let Ok(export) = self.export_mut(*id) { - if PartialOrder::less_than(&export.write_frontier, new_upper) { - export.write_frontier.clone_from(new_upper); - } + CollectionStateExtra::Export(export) => { + if PartialOrder::less_than(&export.write_frontier, new_upper) { + export.write_frontier.clone_from(new_upper); + } - // Ignore read policy for sinks whose write frontiers are closed, which identifies - // the sink is being dropped; we need to advance the read frontier to the empty - // chain to signal to the dataflow machinery that they should deprovision this - // object. - let new_read_capability = if export.write_frontier.is_empty() { - export.write_frontier.clone() - } else { - export.read_policy.frontier(export.write_frontier.borrow()) - }; + // Ignore read policy for sinks whose write frontiers are closed, which identifies + // the sink is being dropped; we need to advance the read frontier to the empty + // chain to signal to the dataflow machinery that they should deprovision this + // object. + let new_derived_since = + export.read_policy.frontier(export.write_frontier.borrow()); - if PartialOrder::less_equal(export.read_hold.since(), &new_read_capability) { - let mut update = ChangeBatch::new(); - update.extend(new_read_capability.iter().map(|time| (time.clone(), 1))); - update.extend( - export - .read_hold - .since() - .iter() - .map(|time| (time.clone(), -1)), - ); + if PartialOrder::less_equal(&export.derived_since, &new_derived_since) { + let mut update = + swap_updates(&mut export.derived_since, new_derived_since); - if !update.is_empty() { - read_capability_changes.insert(*id, update); + if !update.is_empty() { + read_capability_changes.insert(*id, update); + } + } } } } else if self.storage_collections.check_exists(*id).is_ok() { @@ -2619,7 +2605,6 @@ where fn update_hold_capabilities(&mut self, updates: &mut BTreeMap>) { // Location to record consequences that we need to act on. let mut collections_net = BTreeMap::new(); - let mut exports_net = BTreeMap::new(); // We must not rely on any specific relative ordering of `GlobalId`s. 
// That said, it is reasonable to assume that collections generally have @@ -2633,8 +2618,23 @@ where } if let Some(collection) = self.collections.get_mut(&key) { - let ingestion = match &mut collection.extra_state { - CollectionStateExtra::Ingestion(ingestion) => ingestion, + match &mut collection.extra_state { + CollectionStateExtra::Ingestion(ingestion) => { + let changes = ingestion.read_capabilities.update_iter(update.drain()); + update.extend(changes); + + let (changes, frontier, _cluster_id) = + collections_net.entry(key).or_insert_with(|| { + ( + >::new(), + Antichain::new(), + ingestion.instance_id, + ) + }); + + changes.extend(update.drain()); + *frontier = ingestion.read_capabilities.frontier().to_owned(); + } CollectionStateExtra::None => { // WIP: See if this ever panics in ci. soft_panic_or_log!( @@ -2643,44 +2643,19 @@ where ); continue; } - }; - - let changes = ingestion.read_capabilities.update_iter(update.drain()); - update.extend(changes); + CollectionStateExtra::Export(export) => { + let changes = export.read_capabilities.update_iter(update.drain()); + update.extend(changes); - let (changes, frontier, _cluster_id) = - collections_net.entry(key).or_insert_with(|| { - ( - >::new(), - Antichain::new(), - ingestion.instance_id, - ) - }); + let (changes, frontier, _cluster_id) = + collections_net.entry(key).or_insert_with(|| { + (>::new(), Antichain::new(), export.cluster_id) + }); - changes.extend(update.drain()); - *frontier = ingestion.read_capabilities.frontier().to_owned(); - } else if let Ok(export) = self.export_mut(key) { - // Seed with our current read hold, then apply changes, to - // derive how we need to change our read hold. - let mut staged_read_hold = MutableAntichain::new(); - staged_read_hold - .update_iter(export.read_hold.since().iter().map(|t| (t.clone(), 1))); - let changes = staged_read_hold.update_iter(update.drain()); - update.extend(changes); - - // Make sure we also send `AllowCompaction` commands for sinks, - // which drives updating the sink's `as_of`, among other things. - let (changes, frontier, _cluster_id) = - exports_net.entry(key).or_insert_with(|| { - ( - >::new(), - Antichain::new(), - export.cluster_id(), - ) - }); - - changes.extend(update.drain()); - *frontier = staged_read_hold.frontier().to_owned(); + changes.extend(update.drain()); + *frontier = export.read_capabilities.frontier().to_owned(); + } + } } else { // This is confusing and we should probably error. tracing::warn!(id = ?key, ?update, "update_hold_capabilities for unknown object"); @@ -2689,11 +2664,7 @@ where // Translate our net compute actions into `AllowCompaction` commands and // downgrade persist sinces. The actual downgrades are performed by a Tokio - // task asynchorously. - // - // N.B. We only downgrade persist sinces for collections because - // exports/sinks don't have an associated collection. We still _do_ want - // to sent `AllowCompaction` commands to workers for them, though. + // task asynchronously. 
let mut worker_compaction_commands = BTreeMap::default(); for (key, (mut changes, frontier, cluster_id)) in collections_net { @@ -2707,8 +2678,11 @@ where .get_mut(&key) .expect("missing collection state"); - let ingestion = match &mut collection.extra_state { - CollectionStateExtra::Ingestion(ingestion) => ingestion, + let read_holds = match &mut collection.extra_state { + CollectionStateExtra::Ingestion(ingestion) => { + ingestion.dependency_read_holds.as_mut_slice() + } + CollectionStateExtra::Export(export) => export.read_holds.as_mut_slice(), CollectionStateExtra::None => { soft_panic_or_log!( "trying to downgrade read holds for collection which is not an \ @@ -2718,7 +2692,7 @@ where } }; - for read_hold in ingestion.dependency_read_holds.iter_mut() { + for read_hold in read_holds.iter_mut() { read_hold .try_downgrade(frontier.clone()) .expect("we only advance the frontier"); @@ -2727,18 +2701,6 @@ where worker_compaction_commands.insert(key, (frontier.clone(), cluster_id)); } } - for (key, (mut changes, frontier, cluster_id)) in exports_net { - if !changes.is_empty() { - let export_state = self.exports.get_mut(&key).expect("missing export state"); - - export_state - .read_hold - .try_downgrade(frontier.clone()) - .expect("we only advance the frontier"); - - worker_compaction_commands.insert(key, (frontier, cluster_id)); - } - } for (id, (read_frontier, cluster_id)) in worker_compaction_commands { // Acquiring a client for a storage instance requires await, so we @@ -2774,14 +2736,6 @@ where Ok(()) } - /// Iterate over exports that have not been dropped. - fn active_exports(&self) -> impl Iterator)> { - self.exports - .iter() - .filter(|(_id, e)| !e.is_dropped()) - .map(|(id, e)| (*id, e)) - } - /// Opens a write and critical since handles for the given `shard`. /// /// `since` is an optional `since` that the read handle will be forwarded to if it is less than @@ -2945,7 +2899,7 @@ where self.sink_statistics .lock() .expect("poisoned") - .retain(|k, _| self.exports.contains_key(k)); + .retain(|k, _| self.export(*k).is_ok()); } /// Appends a new global ID, shard ID pair to the appropriate collection. @@ -2993,7 +2947,7 @@ where fn determine_collection_dependencies( &self, self_id: GlobalId, - data_source: &DataSource, + data_source: &DataSource, ) -> Result, StorageError> { let dependency = match &data_source { DataSource::Introspection(_) @@ -3030,6 +2984,10 @@ where // track themselves and the remap shard as dependencies. vec![self_id, ingestion.remap_collection_id] } + DataSource::Sink { desc } => { + // Sinks hold back their own frontier and the frontier of their input. + vec![self_id, desc.sink.from] + } }; Ok(dependency) @@ -3055,7 +3013,7 @@ where for update in updates { let id = update.id; - if self.exports.contains_key(&id) { + if self.export(id).is_ok() { sink_status_updates.push(update); } else if self.storage_collections.check_exists(id).is_ok() { source_status_updates.push(update); @@ -3144,8 +3102,9 @@ where /// Runs the identified export using the current definition of the export /// that we have in memory. 
fn run_export(&mut self, id: GlobalId) -> Result<(), StorageError> { - let export = self.export(id)?; - let description = &export.description; + let DataSource::Sink { desc: description } = &self.collections[&id].data_source else { + return Err(StorageError::IdentifierMissing(id)); + }; info!( sink_id = %id, @@ -3157,6 +3116,7 @@ where let from_storage_metadata = self .storage_collections .collection_metadata(description.sink.from)?; + let to_storage_metadata = self.storage_collections.collection_metadata(id)?; let cmd = RunSinkCommand { id, @@ -3170,6 +3130,7 @@ where partition_strategy: description.sink.partition_strategy.clone(), from_storage_metadata, with_snapshot: description.sink.with_snapshot, + to_storage_metadata, }, }; @@ -3198,32 +3159,23 @@ where for collection_frontiers in self.storage_collections.active_collection_frontiers() { let id = collection_frontiers.id; + + if id.is_user() { + dbg!(format!("id: {:?}", collection_frontiers)); + } let since = collection_frontiers.read_capabilities; let upper = collection_frontiers.write_frontier; let instance = self .collections .get(&id) - .and_then(|c| match &c.extra_state { - CollectionStateExtra::Ingestion(ingestion) => Some(ingestion), + .and_then(|collection_state| match &collection_state.extra_state { + CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id), + CollectionStateExtra::Export(export) => Some(export.cluster_id()), CollectionStateExtra::None => None, }) - .and_then(|i| self.instances.get(&i.instance_id)); - if let Some(instance) = instance { - for replica_id in instance.replica_ids() { - replica_frontiers.insert((id, replica_id), upper.clone()); - } - } - - global_frontiers.insert(id, (since, upper)); - } + .and_then(|i| self.instances.get(&i)); - for (id, export) in self.active_exports() { - // Exports cannot be read from, so their `since` is always the empty frontier. - let since = Antichain::new(); - let upper = export.write_frontier.clone(); - - let instance = self.instances.get(&export.cluster_id()); if let Some(instance) = instance { for replica_id in instance.replica_ids() { replica_frontiers.insert((id, replica_id), upper.clone()); @@ -3349,20 +3301,6 @@ where collection.wallclock_lag_metrics.observe(lag); } - let active_exports = self.exports.iter_mut().filter(|(_id, e)| !e.is_dropped()); - for (id, export) in active_exports { - let lag = frontier_lag(&export.write_frontier); - export.wallclock_lag_max = std::cmp::max(export.wallclock_lag_max, lag); - - if let Some(updates) = &mut introspection_updates { - let lag = std::mem::take(&mut export.wallclock_lag_max); - let row = pack_row(*id, lag); - updates.push((row, 1)); - } - - export.wallclock_lag_metrics.observe(lag); - } - if let Some(updates) = introspection_updates { self.append_introspection_updates(IntrospectionType::WallclockLagHistory, updates); self.wallclock_lag_last_refresh = Instant::now(); @@ -3477,7 +3415,7 @@ where #[derive(Debug)] struct CollectionState { /// The source of this collection's data. 
- pub data_source: DataSource, + pub data_source: DataSource, pub collection_metadata: CollectionMetadata, @@ -3493,6 +3431,7 @@ struct CollectionState { #[derive(Debug)] enum CollectionStateExtra { Ingestion(IngestionState), + Export(ExportState), None, } @@ -3610,3 +3549,14 @@ fn replica_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc< extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()), } } + +fn swap_updates( + from: &mut Antichain, + mut replace_with: Antichain, +) -> ChangeBatch { + let mut update = ChangeBatch::new(); + update.extend(replace_with.iter().map(|time| (time.clone(), 1))); + std::mem::swap(from, &mut replace_with); + update.extend(replace_with.iter().map(|time| (time.clone(), -1))); + update +} diff --git a/src/storage-types/src/sinks.proto b/src/storage-types/src/sinks.proto index 236c67d8a8256..6ade6172513a4 100644 --- a/src/storage-types/src/sinks.proto +++ b/src/storage-types/src/sinks.proto @@ -29,6 +29,7 @@ message ProtoStorageSinkDesc { ProtoStorageSinkConnection connection = 3; optional ProtoSinkEnvelope envelope = 4; optional mz_storage_types.controller.ProtoCollectionMetadata from_storage_metadata = 6; + optional mz_storage_types.controller.ProtoCollectionMetadata to_storage_metadata = 15; mz_repr.antichain.ProtoU64Antichain as_of = 11; bool with_snapshot = 12; uint64 version = 13; diff --git a/src/storage-types/src/sinks.rs b/src/storage-types/src/sinks.rs index b66209f4dc1cc..a3c2da2df168a 100644 --- a/src/storage-types/src/sinks.rs +++ b/src/storage-types/src/sinks.rs @@ -49,6 +49,7 @@ pub struct StorageSinkDesc { pub envelope: SinkEnvelope, pub as_of: Antichain, pub from_storage_metadata: S, + pub to_storage_metadata: S, } impl AlterCompatible @@ -80,6 +81,7 @@ impl AlterCompatible from_storage_metadata, partition_strategy, with_snapshot, + to_storage_metadata, } = self; let compatibility_checks = [ @@ -99,6 +101,10 @@ impl AlterCompatible from_storage_metadata == &other.from_storage_metadata, "from_storage_metadata", ), + ( + to_storage_metadata == &other.to_storage_metadata, + "from_storage_metadata", + ), ]; for (compatible, field) in compatibility_checks { @@ -132,6 +138,7 @@ impl Arbitrary for StorageSinkDesc { any::(), any::(), any::(), + any::(), ) .prop_map( |( @@ -144,6 +151,7 @@ impl Arbitrary for StorageSinkDesc { partition_strategy, with_snapshot, version, + to_storage_metadata, )| { StorageSinkDesc { from, @@ -155,9 +163,13 @@ impl Arbitrary for StorageSinkDesc { from_storage_metadata, partition_strategy, with_snapshot, + to_storage_metadata, } }, ) + .prop_filter("identical source and sink", |desc| { + desc.from_storage_metadata != desc.to_storage_metadata + }) .boxed() } } @@ -171,6 +183,7 @@ impl RustType for StorageSinkDesc for StorageSinkDesc> SinkRender for KafkaSinkConnection { ) -> (Stream, Vec) { let mut scope = input.scope(); + let write_handle = { + let persist = Arc::clone(&storage_state.persist_clients); + let shard_meta = sink.to_storage_metadata.clone(); + async move { + let client = persist.open(shard_meta.persist_location).await?; + let handle = client + .open_writer( + shard_meta.data_shard, + Arc::new(shard_meta.relation_desc), + Arc::new(UnitSchema), + Diagnostics::from_purpose("sink handle"), + ) + .await?; + Ok(handle) + } + }; + let write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum()))); storage_state .sink_write_frontiers @@ -186,6 +208,7 @@ impl> SinkRender for KafkaSinkConnection { sink, metrics, statistics, + write_handle, write_frontier, ); @@ 
-618,6 +641,8 @@ fn sink_collection>( sink: &StorageSinkDesc, metrics: KafkaSinkMetrics, statistics: SinkStatistics, + write_handle: impl Future>> + + 'static, write_frontier: Rc>>, ) -> (Stream, PressOnDropButton) { let scope = input.scope(); @@ -644,6 +669,8 @@ fn sink_collection>( ContextCreationError::Other(anyhow::anyhow!("synthetic error")) )); + let mut write_handle = write_handle.await?; + let metrics = Arc::new(metrics); let (mut producer, resume_upper) = TransactionalProducer::new( @@ -774,6 +801,23 @@ fn sink_collection>( info!("{name}: committing transaction for {}", progress.pretty()); producer.commit_transaction(progress.clone()).await?; transaction_begun = false; + let mut expect_upper = write_handle.shared_upper(); + loop { + if PartialOrder::less_equal(&progress, &expect_upper) { + break; + } + const EMPTY: &[((SourceData, ()), Timestamp, Diff)] = &[]; + match write_handle + .compare_and_append(EMPTY, expect_upper, progress.clone()) + .await + .expect("valid usage") + { + Ok(()) => break, + Err(mismatch) => { + expect_upper = mismatch.current; + } + } + } write_frontier.borrow_mut().clone_from(&progress); match progress.into_option() { Some(new_upper) => upper = new_upper, diff --git a/test/testdrive/controller-frontiers.td b/test/testdrive/controller-frontiers.td index 7b977167fae35..2fb5f50a491d7 100644 --- a/test/testdrive/controller-frontiers.td +++ b/test/testdrive/controller-frontiers.td @@ -186,7 +186,7 @@ sink1 s1 ON frontiers.object_id = sinks.id WHERE frontiers.object_id LIKE 'u%' AND - frontiers.read_frontier IS NULL AND + frontiers.read_frontier != 0 AND frontiers.write_frontier > 0 sink1
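Notes on the mechanics above, with hedged sketches.

The ingestion and export branches in the storage controller now share their frontier bookkeeping through the `swap_updates` helper added at the bottom of src/storage-controller/src/lib.rs. A minimal, self-contained sketch of its behavior, using an illustrative `u64` timestamp and a `main` that is not part of the patch:

    use timely::progress::{Antichain, ChangeBatch, Timestamp};

    // Mirrors the helper in the patch: replace `from` with `replace_with` and
    // return the +1/-1 frontier changes between the two antichains.
    fn swap_updates<T: Timestamp>(
        from: &mut Antichain<T>,
        mut replace_with: Antichain<T>,
    ) -> ChangeBatch<T> {
        let mut update = ChangeBatch::new();
        update.extend(replace_with.iter().map(|time| (time.clone(), 1)));
        std::mem::swap(from, &mut replace_with);
        update.extend(replace_with.iter().map(|time| (time.clone(), -1)));
        update
    }

    fn main() {
        // An export's derived_since advances from {3} to {5}.
        let mut derived_since = Antichain::from_elem(3u64);
        let mut changes = swap_updates(&mut derived_since, Antichain::from_elem(5));
        assert_eq!(derived_since, Antichain::from_elem(5));

        // The returned batch holds +1 for the new frontier element and -1 for
        // the old one, ready to feed into update_read_capabilities.
        let mut collected: Vec<_> = changes.drain().collect();
        collected.sort();
        assert_eq!(collected, vec![(3, -1), (5, 1)]);
    }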
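Sinks are now dropped like other storage-managed collections: `drop_sinks_unvalidated` installs a read policy with an empty `ValidFrom` frontier and forwards the drop to `StorageCollections`, rather than force-closing the write frontier. A condensed sketch of the policy construction, assuming a `set_hold_policies`-style consumer as in the patch; `drop_policies` is a hypothetical helper name, not part of the patch:

    use mz_repr::{GlobalId, Timestamp};
    use mz_storage_types::read_policy::ReadPolicy;
    use timely::progress::Antichain;

    /// An empty ValidFrom frontier releases the sink's read holds (on its input
    /// and on its own output shard), which in turn deprovisions the export.
    fn drop_policies(identifiers: &[GlobalId]) -> Vec<(GlobalId, ReadPolicy<Timestamp>)> {
        identifiers
            .iter()
            .map(|id| (*id, ReadPolicy::ValidFrom(Antichain::new())))
            .collect()
    }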
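On the dataflow side, the Kafka sink now opens a persist write handle against its `to_storage_metadata` shard and, after each committed transaction, bumps that shard's upper to the transaction's progress frontier. A sketch of that step using the same types the patch uses; `forward_progress` is a hypothetical wrapper around the loop shown in the diff, not a function in the patch:

    use mz_persist_client::write::WriteHandle;
    use mz_repr::{Diff, Timestamp};
    use mz_storage_types::sources::SourceData;
    use timely::progress::Antichain;
    use timely::PartialOrder;

    /// Advance the sink's output shard to `progress` without writing any data,
    /// retrying when another writer has already moved the upper.
    async fn forward_progress(
        write_handle: &mut WriteHandle<SourceData, (), Timestamp, Diff>,
        progress: &Antichain<Timestamp>,
    ) {
        let mut expect_upper = write_handle.shared_upper();
        loop {
            // Done once the shard's upper is already at or beyond `progress`.
            if PartialOrder::less_equal(progress, &expect_upper) {
                return;
            }
            const EMPTY: &[((SourceData, ()), Timestamp, Diff)] = &[];
            match write_handle
                .compare_and_append(EMPTY, expect_upper, progress.clone())
                .await
                .expect("valid usage")
            {
                Ok(()) => return,
                // Lost the race: retry from the upper the shard actually has.
                Err(mismatch) => expect_upper = mismatch.current,
            }
        }
    }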