Actually create the shard for the new collection
bkirwi committed Jan 23, 2025 · 1 parent 464afa0 · commit 79b21a1
Showing 5 changed files with 87 additions and 77 deletions.
4 changes: 3 additions & 1 deletion src/adapter/src/catalog/transact.rs
@@ -1057,8 +1057,10 @@ impl Catalog {
CatalogItem::ContinualTask(ct) => {
storage_collections_to_create.insert(ct.global_id());
}
CatalogItem::Sink(sink) => {
storage_collections_to_create.insert(sink.global_id());
}
CatalogItem::Log(_)
| CatalogItem::Sink(_)
| CatalogItem::View(_)
| CatalogItem::Index(_)
| CatalogItem::Type(_)
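As a hedged illustration of what this hunk changes (toy types, not the actual `mz_catalog` API): sinks now contribute their global IDs to the set of storage collections the transaction must create, alongside tables and continual tasks, instead of falling through with the variants that get no backing shard.

```rust
use std::collections::BTreeSet;

// Toy stand-ins for the catalog types; the real CatalogItem and GlobalId live in
// mz_catalog and mz_repr.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct GlobalId(u64);

enum CatalogItem {
    Table(GlobalId),
    ContinualTask(GlobalId),
    Sink(GlobalId),
    Log,
    View,
    Index,
}

/// Collect the IDs of items that need a backing storage collection (shard).
/// After this change, sinks are included; logs, views, and indexes still are not.
fn storage_collections_to_create(items: &[CatalogItem]) -> BTreeSet<GlobalId> {
    let mut to_create = BTreeSet::new();
    for item in items {
        match item {
            CatalogItem::Table(id)
            | CatalogItem::ContinualTask(id)
            | CatalogItem::Sink(id) => {
                to_create.insert(*id);
            }
            CatalogItem::Log | CatalogItem::View | CatalogItem::Index => {}
        }
    }
    to_create
}

fn main() {
    let items = [
        CatalogItem::Table(GlobalId(1)),
        CatalogItem::Sink(GlobalId(2)),
        CatalogItem::View,
    ];
    // The sink's ID now lands in the set, so its shard gets created with the rest.
    assert_eq!(storage_collections_to_create(&items).len(), 2);
}
```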
4 changes: 3 additions & 1 deletion src/adapter/src/coord.rs
@@ -152,6 +152,7 @@ use mz_storage_types::connections::Connection as StorageConnection;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::sinks::S3SinkFormat;
use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
use mz_storage_types::sources::Timeline;
use mz_timestamp_oracle::postgres_oracle::{
PostgresTimestampOracle, PostgresTimestampOracleConfig,
@@ -2756,7 +2757,8 @@ impl Coordinator {
}
CatalogItem::Sink(sink) => {
let collection_desc = CollectionDescription {
desc: RelationDesc::empty(),
// TODO(sinks): make generic once we have more than one sink type.
desc: KAFKA_PROGRESS_DESC.clone(),
data_source: DataSource::Other,
since: None,
status_collection_id: None,
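For context on the `KAFKA_PROGRESS_DESC` swap: the sink's storage collection now gets a real progress schema instead of `RelationDesc::empty()`, so the shard created for it can actually carry the sink's progress data. A rough sketch of what such a progress description looks like — the column names and types here are an assumption for illustration, not copied from `mz_storage_types::sources::kafka`:

```rust
use std::sync::LazyLock;

use mz_repr::{RelationDesc, ScalarType};

// Assumed shape of a Kafka progress relation: for each partition (a range, so one
// row can cover many partitions), the greatest offset known to be durably written.
// Treat this as a sketch; consult the real KAFKA_PROGRESS_DESC for the exact types.
pub static PROGRESS_DESC_SKETCH: LazyLock<RelationDesc> = LazyLock::new(|| {
    RelationDesc::empty()
        .with_column(
            "partition",
            ScalarType::Range {
                element_type: Box::new(ScalarType::Numeric { max_scale: None }),
            }
            .nullable(false),
        )
        .with_column("offset", ScalarType::UInt64.nullable(true))
});
```

The `TODO(sinks)` comment in the hunk flags that this hard-codes the Kafka shape; once more than one sink type exists, the description would presumably be chosen per connection.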
32 changes: 19 additions & 13 deletions src/adapter/src/coord/sequencer/inner.rs
@@ -21,24 +21,32 @@ use futures::{future, Future};
use itertools::Itertools;
use maplit::btreeset;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_catalog::memory::objects::{
CatalogItem, Cluster, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
};
use mz_cloud_resources::VpcEndpointConfig;
use mz_controller_types::ReplicaId;
use mz_expr::{
CollectionPlan, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing,
};
use mz_ore::cast::CastFrom;
use mz_ore::collections::{CollectionExt, HashSet};
use mz_ore::task::{self, spawn, JoinHandle};
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::vec::VecExt;
use mz_ore::{assert_none, instrument};
use mz_persist_client::stats::SnapshotPartStats;
use mz_repr::adt::jsonb::Jsonb;
use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
use mz_repr::explain::json::json_string;
use mz_repr::explain::ExprHumanizer;
use mz_repr::role_id::RoleId;
use mz_repr::{
CatalogItemId, Datum, Diff, GlobalId, IntoRowIterator, RelationDesc, RelationVersion,
CatalogItemId, Datum, Diff, GlobalId, IntoRowIterator, RelationVersion,
RelationVersionSelector, Row, RowArena, RowIterator, Timestamp,
};
use mz_sql::ast::AlterSourceAddSubsourceOption;
use mz_sql::ast::{CreateSubsourceStatement, MySqlConfigOptionName, UnresolvedItemName};
use mz_sql::catalog::{
CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError,
@@ -52,17 +60,7 @@ use mz_sql::names::{
use mz_sql::plan::{ConnectionDetails, NetworkPolicyRule, StatementContext};
use mz_sql::pure::{generate_subsource_statements, PurifiedSourceExport};
use mz_storage_types::sinks::StorageSinkDesc;
use smallvec::SmallVec;
use timely::progress::Timestamp as TimelyTimestamp;
// Import `plan` module, but only import select elements to avoid merge conflicts on use statements.
use mz_adapter_types::connection::ConnectionId;
use mz_catalog::memory::objects::{
CatalogItem, Cluster, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
};
use mz_ore::cast::CastFrom;
use mz_ore::{assert_none, instrument};
use mz_persist_client::stats::SnapshotPartStats;
use mz_sql::ast::AlterSourceAddSubsourceOption;
use mz_sql::plan::{
AlterConnectionAction, AlterConnectionPlan, CreateSourcePlanBundle, ExplainSinkSchemaPlan,
Explainee, ExplaineeStatement, MutationKind, Params, Plan, PlannedAlterRoleOption,
@@ -85,6 +83,7 @@ use mz_ssh_util::keys::SshKeyPairSet;
use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::controller::StorageError;
use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
use mz_storage_types::stats::RelationPartStats;
use mz_storage_types::AlterCompatible;
use mz_transform::dataflow::DataflowMetainfo;
@@ -93,6 +92,8 @@ use mz_transform::EmptyStatisticsOracle;
use timely::progress::Antichain;
use tokio::sync::{oneshot, watch};
use tracing::{warn, Instrument, Span};
use smallvec::SmallVec;
use timely::progress::Timestamp as TimelyTimestamp;

use crate::catalog::{self, Catalog, ConnCatalog, DropObjectInfo, UpdatePrivilegeVariant};
use crate::command::{ExecuteResponse, Response};
@@ -1346,7 +1347,7 @@ impl Coordinator {
};

let collection_desc = CollectionDescription {
desc: RelationDesc::empty(),
desc: KAFKA_PROGRESS_DESC.clone(),
data_source: DataSource::Other,
since: None,
status_collection_id: None,
@@ -3544,7 +3545,12 @@ impl Coordinator {
.expect("sink known to exist")
.write_frontier;
let as_of = ctx.read_hold.least_valid_read();
assert!(write_frontier.iter().all(|t| as_of.less_than(t)));
assert!(
write_frontier.iter().all(|t| as_of.less_than(t)),
"{:?} should be strictly less than {:?}",
&*as_of,
&**write_frontier
);

let catalog_sink = Sink {
create_sql: sink.create_sql,
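The strengthened assertion checks the same invariant as before — the sink's `as_of` must be strictly less than every time in its write frontier — but now prints both frontiers when it fires, which makes failures debuggable. A minimal standalone illustration of that check with timely's `Antichain` (toy `u64` timestamps rather than the coordinator's types):

```rust
use timely::progress::Antichain;

/// True if `as_of` is strictly less than every element of `write_frontier`,
/// i.e. the sink still has output to produce at or beyond its starting time.
/// (An empty write frontier passes vacuously, matching the original assert.)
fn as_of_is_valid(as_of: &Antichain<u64>, write_frontier: &Antichain<u64>) -> bool {
    write_frontier.iter().all(|t| as_of.less_than(t))
}

fn main() {
    let as_of = Antichain::from_elem(5u64);

    // A write frontier strictly ahead of the as_of satisfies the invariant...
    assert!(as_of_is_valid(&as_of, &Antichain::from_elem(6)));

    // ...while one equal to (or behind) it does not, which is the case the new
    // assert message surfaces by printing both frontiers.
    assert!(!as_of_is_valid(&as_of, &Antichain::from_elem(5)));
}
```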
2 changes: 1 addition & 1 deletion src/catalog/src/memory/objects.rs
@@ -1614,9 +1614,9 @@ impl CatalogItem {
CatalogItem::Table(_)
| CatalogItem::Source(_)
| CatalogItem::MaterializedView(_)
| CatalogItem::Sink(_)
| CatalogItem::ContinualTask(_) => true,
CatalogItem::Log(_)
| CatalogItem::Sink(_)
| CatalogItem::View(_)
| CatalogItem::Index(_)
| CatalogItem::Type(_)
122 changes: 61 additions & 61 deletions src/storage-controller/src/lib.rs
@@ -1906,10 +1906,7 @@ where
for id in ids.iter() {
tracing::debug!("DroppedIds for collections {id}");

if let Some(_collection) = self.collections.remove(id) {
// Nothing to do, we already dropped read holds in
// `drop_sources_unvalidated`.
} else if let Some(export) = self.exports.get_mut(id) {
if let Some(export) = self.exports.get_mut(id) {
// TODO: Current main never drops export state, so we
// also don't do that, because it would be yet more
// refactoring. Instead, we downgrade to the empty
@@ -1920,6 +1917,9 @@
.read_hold
.try_downgrade(Antichain::new())
.expect("must be possible");
} else if let Some(_collection) = self.collections.remove(id) {
// Nothing to do, we already dropped read holds in
// `drop_sources_unvalidated`.
} else {
soft_panic_or_log!(
"DroppedIds for ID {id} but we have neither ingestion nor export \
@@ -2493,7 +2493,9 @@ where
let mut read_capability_changes = BTreeMap::default();

for (id, policy) in policies.into_iter() {
if let Some(collection) = self.collections.get_mut(&id) {
if let Some(_export) = self.exports.get_mut(&id) {
unreachable!("set_hold_policies is only called for ingestions");
} else if let Some(collection) = self.collections.get_mut(&id) {
let ingestion = match &mut collection.extra_state {
CollectionStateExtra::Ingestion(ingestion) => ingestion,
CollectionStateExtra::None => {
@@ -2514,8 +2516,6 @@
}

ingestion.hold_policy = policy;
} else if let Some(_export) = self.exports.get_mut(&id) {
unreachable!("set_hold_policies is only called for ingestions");
}
}

@@ -2529,7 +2529,37 @@
let mut read_capability_changes = BTreeMap::default();

for (id, new_upper) in updates.iter() {
if let Some(collection) = self.collections.get_mut(id) {
if let Ok(export) = self.export_mut(*id) {
if PartialOrder::less_than(&export.write_frontier, new_upper) {
export.write_frontier.clone_from(new_upper);
}

// Ignore read policy for sinks whose write frontiers are closed, which identifies
// the sink is being dropped; we need to advance the read frontier to the empty
// chain to signal to the dataflow machinery that they should deprovision this
// object.
let new_read_capability = if export.write_frontier.is_empty() {
export.write_frontier.clone()
} else {
export.read_policy.frontier(export.write_frontier.borrow())
};

if PartialOrder::less_equal(export.read_hold.since(), &new_read_capability) {
let mut update = ChangeBatch::new();
update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
update.extend(
export
.read_hold
.since()
.iter()
.map(|time| (time.clone(), -1)),
);

if !update.is_empty() {
read_capability_changes.insert(*id, update);
}
}
} else if let Some(collection) = self.collections.get_mut(id) {
let ingestion = match &mut collection.extra_state {
CollectionStateExtra::Ingestion(ingestion) => ingestion,
CollectionStateExtra::None => {
@@ -2562,36 +2592,6 @@
std::mem::swap(&mut ingestion.derived_since, &mut new_derived_since);
update.extend(new_derived_since.iter().map(|time| (time.clone(), -1)));

if !update.is_empty() {
read_capability_changes.insert(*id, update);
}
}
} else if let Ok(export) = self.export_mut(*id) {
if PartialOrder::less_than(&export.write_frontier, new_upper) {
export.write_frontier.clone_from(new_upper);
}

// Ignore read policy for sinks whose write frontiers are closed, which identifies
// the sink is being dropped; we need to advance the read frontier to the empty
// chain to signal to the dataflow machinery that they should deprovision this
// object.
let new_read_capability = if export.write_frontier.is_empty() {
export.write_frontier.clone()
} else {
export.read_policy.frontier(export.write_frontier.borrow())
};

if PartialOrder::less_equal(export.read_hold.since(), &new_read_capability) {
let mut update = ChangeBatch::new();
update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
update.extend(
export
.read_hold
.since()
.iter()
.map(|time| (time.clone(), -1)),
);

if !update.is_empty() {
read_capability_changes.insert(*id, update);
}
@@ -2636,7 +2636,29 @@ where
debug!(id = %key, ?update, "update_hold_capability");
}

if let Some(collection) = self.collections.get_mut(&key) {
if let Ok(export) = self.export_mut(key) {
// Seed with our current read hold, then apply changes, to
// derive how we need to change our read hold.
let mut staged_read_hold = MutableAntichain::new();
staged_read_hold
.update_iter(export.read_hold.since().iter().map(|t| (t.clone(), 1)));
let changes = staged_read_hold.update_iter(update.drain());
update.extend(changes);

// Make sure we also send `AllowCompaction` commands for sinks,
// which drives updating the sink's `as_of`, among other things.
let (changes, frontier, _cluster_id) =
exports_net.entry(key).or_insert_with(|| {
(
<ChangeBatch<_>>::new(),
Antichain::new(),
export.cluster_id(),
)
});

changes.extend(update.drain());
*frontier = staged_read_hold.frontier().to_owned();
} else if let Some(collection) = self.collections.get_mut(&key) {
let ingestion = match &mut collection.extra_state {
CollectionStateExtra::Ingestion(ingestion) => ingestion,
CollectionStateExtra::None => {
@@ -2663,28 +2685,6 @@

changes.extend(update.drain());
*frontier = ingestion.read_capabilities.frontier().to_owned();
} else if let Ok(export) = self.export_mut(key) {
// Seed with our current read hold, then apply changes, to
// derive how we need to change our read hold.
let mut staged_read_hold = MutableAntichain::new();
staged_read_hold
.update_iter(export.read_hold.since().iter().map(|t| (t.clone(), 1)));
let changes = staged_read_hold.update_iter(update.drain());
update.extend(changes);

// Make sure we also send `AllowCompaction` commands for sinks,
// which drives updating the sink's `as_of`, among other things.
let (changes, frontier, _cluster_id) =
exports_net.entry(key).or_insert_with(|| {
(
<ChangeBatch<_>>::new(),
Antichain::new(),
export.cluster_id(),
)
});

changes.extend(update.drain());
*frontier = staged_read_hold.frontier().to_owned();
} else {
// This is confusing and we should probably error.
tracing::warn!(id = ?key, ?update, "update_hold_capabilities for unknown object");
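A recurring pattern in these hunks is that exports are now checked before `self.collections` (sinks now have an entry in both), and that any movement of an export's read capability is recorded as a batch of `+1`/`-1` diffs, with an empty frontier acting as the "drop this dataflow" signal. A simplified, self-contained sketch of that capability bookkeeping using timely's `ChangeBatch` (toy `u64` times; the real code routes this through `ReadHold` and the export's `ReadPolicy`):

```rust
use timely::progress::{Antichain, ChangeBatch};

/// Express the move from an old read capability to a new one as a ChangeBatch:
/// +1 for each time in the new frontier, -1 for each time in the old one.
/// An empty batch means the capability did not move.
fn capability_change(old: &Antichain<u64>, new: &Antichain<u64>) -> ChangeBatch<u64> {
    let mut update = ChangeBatch::new();
    update.extend(new.iter().map(|t| (*t, 1)));
    update.extend(old.iter().map(|t| (*t, -1)));
    update
}

fn main() {
    // An export whose read policy now allows compaction up to time 7 moves its
    // capability from 4 to 7: retire 4, acquire 7.
    let mut diff = capability_change(&Antichain::from_elem(4u64), &Antichain::from_elem(7u64));
    let mut changes: Vec<_> = diff.drain().collect();
    changes.sort();
    assert_eq!(changes, vec![(4, -1), (7, 1)]);

    // A closed (empty) write frontier maps to an empty read capability: every
    // held time is retired, which tells the dataflow machinery to deprovision
    // the sink.
    let mut drop_diff = capability_change(&Antichain::from_elem(7u64), &Antichain::new());
    assert_eq!(drop_diff.drain().collect::<Vec<_>>(), vec![(7, -1)]);
}
```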