Skip to content

Commit

Permalink
Remove a now-unused trait
Browse files Browse the repository at this point in the history
  • Loading branch information
bkirwi committed Jan 22, 2025
1 parent b32b839 commit 8bc965e
Show file tree
Hide file tree
Showing 9 changed files with 37 additions and 58 deletions.
6 changes: 3 additions & 3 deletions src/storage-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use mz_service::grpc::{GrpcClient, GrpcServer, ProtoServiceTypes, ResponseStream
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::oneshot_sources::OneshotIngestionRequest;
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::sinks::{MetadataFilled, StorageSinkDesc};
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::IngestionDescription;
use mz_timely_util::progress::any_antichain;
use proptest::prelude::{any, Arbitrary};
Expand Down Expand Up @@ -260,7 +260,7 @@ impl RustType<ProtoRunSinkCommand> for RunSinkCommand<mz_repr::Timestamp> {
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunSinkCommand<T> {
pub id: GlobalId,
pub description: StorageSinkDesc<MetadataFilled, T>,
pub description: StorageSinkDesc<CollectionMetadata, T>,
}

impl Arbitrary for RunSinkCommand<mz_repr::Timestamp> {
Expand All @@ -270,7 +270,7 @@ impl Arbitrary for RunSinkCommand<mz_repr::Timestamp> {
fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
(
any::<GlobalId>(),
any::<StorageSinkDesc<MetadataFilled, mz_repr::Timestamp>>(),
any::<StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>>(),
)
.prop_map(|(id, description)| Self { id, description })
.boxed()
Expand Down
4 changes: 2 additions & 2 deletions src/storage-client/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use mz_storage_types::oneshot_sources::{OneshotIngestionRequest, OneshotResultCa
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::sinks::{MetadataUnfilled, StorageSinkConnection, StorageSinkDesc};
use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
use mz_storage_types::sources::{
GenericSourceConnection, IngestionDescription, SourceDesc, SourceExportDataConfig,
SourceExportDetails, Timeline,
Expand Down Expand Up @@ -157,7 +157,7 @@ impl<T> CollectionDescription<T> {

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ExportDescription<T = mz_repr::Timestamp> {
pub sink: StorageSinkDesc<MetadataUnfilled, T>,
pub sink: StorageSinkDesc<(), T>,
/// The ID of the instance in which to install the export.
pub instance_id: StorageInstanceId,
}
Expand Down
6 changes: 3 additions & 3 deletions src/storage-controller/src/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,8 @@ mod tests {
use mz_storage_types::instances::StorageInstanceId;
use mz_storage_types::sinks::{
KafkaIdStyle, KafkaSinkCompressionType, KafkaSinkConnection, KafkaSinkFormat,
KafkaSinkFormatType, MetadataFilled, SinkEnvelope, SinkPartitionStrategy,
StorageSinkConnection, StorageSinkDesc,
KafkaSinkFormatType, SinkEnvelope, SinkPartitionStrategy, StorageSinkConnection,
StorageSinkDesc,
};
use mz_storage_types::sources::load_generator::{
LoadGenerator, LoadGeneratorOutput, LoadGeneratorSourceExportDetails,
Expand Down Expand Up @@ -337,7 +337,7 @@ mod tests {
}
}

fn sink_description() -> StorageSinkDesc<MetadataFilled, u64> {
fn sink_description() -> StorageSinkDesc<CollectionMetadata, u64> {
StorageSinkDesc {
from: GlobalId::System(1),
from_desc: RelationDesc::new(
Expand Down
32 changes: 6 additions & 26 deletions src/storage-types/src/sinks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use std::time::Duration;

use mz_dyncfg::ConfigSet;
use mz_expr::MirScalarExpr;
use mz_persist_types::ShardId;
use mz_pgcopy::CopyFormatParams;
use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
use mz_repr::bytes::ByteSize;
Expand All @@ -40,7 +39,7 @@ pub mod s3_oneshot_sink;

/// A sink for updates to a relational collection.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct StorageSinkDesc<S: StorageSinkDescFillState, T = mz_repr::Timestamp> {
pub struct StorageSinkDesc<S, T = mz_repr::Timestamp> {
pub from: GlobalId,
pub from_desc: RelationDesc,
pub connection: StorageSinkConnection,
Expand All @@ -49,11 +48,11 @@ pub struct StorageSinkDesc<S: StorageSinkDescFillState, T = mz_repr::Timestamp>
pub version: u64,
pub envelope: SinkEnvelope,
pub as_of: Antichain<T>,
pub from_storage_metadata: <S as StorageSinkDescFillState>::StorageMetadata,
pub from_storage_metadata: S,
}

impl<S: Debug + StorageSinkDescFillState + PartialEq, T: Debug + PartialEq + PartialOrder>
AlterCompatible for StorageSinkDesc<S, T>
impl<S: Debug + PartialEq, T: Debug + PartialEq + PartialOrder> AlterCompatible
for StorageSinkDesc<S, T>
{
/// Determines if `self` is compatible with another `StorageSinkDesc`, in
/// such a way that it is possible to turn `self` into `other` through a
Expand Down Expand Up @@ -118,26 +117,7 @@ impl<S: Debug + StorageSinkDescFillState + PartialEq, T: Debug + PartialEq + Par
}
}

pub trait StorageSinkDescFillState {
type StatusId: Debug + Clone + Serialize + for<'a> Deserialize<'a> + Eq + PartialEq;
type StorageMetadata: Debug + Clone + Serialize + for<'a> Deserialize<'a> + Eq + PartialEq;
}

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct MetadataUnfilled;
impl StorageSinkDescFillState for MetadataUnfilled {
type StatusId = GlobalId;
type StorageMetadata = ();
}

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct MetadataFilled;
impl StorageSinkDescFillState for MetadataFilled {
type StatusId = ShardId;
type StorageMetadata = CollectionMetadata;
}

impl Arbitrary for StorageSinkDesc<MetadataFilled, mz_repr::Timestamp> {
impl Arbitrary for StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp> {
type Strategy = BoxedStrategy<Self>;
type Parameters = ();

Expand Down Expand Up @@ -182,7 +162,7 @@ impl Arbitrary for StorageSinkDesc<MetadataFilled, mz_repr::Timestamp> {
}
}

impl RustType<ProtoStorageSinkDesc> for StorageSinkDesc<MetadataFilled, mz_repr::Timestamp> {
impl RustType<ProtoStorageSinkDesc> for StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp> {
fn into_proto(&self) -> ProtoStorageSinkDesc {
ProtoStorageSinkDesc {
connection: Some(self.connection.into_proto()),
Expand Down
4 changes: 2 additions & 2 deletions src/storage/src/internal_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use mz_rocksdb::config::SharedWriteBufferManager;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::oneshot_sources::OneshotIngestionRequest;
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::sinks::{MetadataFilled, StorageSinkDesc};
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::IngestionDescription;
use serde::{Deserialize, Serialize};
use timely::communication::Allocate;
Expand Down Expand Up @@ -98,7 +98,7 @@ pub enum InternalStorageCommand {
/// Render a sink dataflow.
RunSinkDataflow(
GlobalId,
StorageSinkDesc<MetadataFilled, mz_repr::Timestamp>,
StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>,
),
/// Drop all state and operators for a dataflow. This is a vec because some
/// dataflows have their state spread over multiple IDs (i.e. sources that
Expand Down
4 changes: 2 additions & 2 deletions src/storage/src/render.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ use mz_repr::{GlobalId, Row};
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::dyncfgs;
use mz_storage_types::oneshot_sources::{OneshotIngestionDescription, OneshotIngestionRequest};
use mz_storage_types::sinks::{MetadataFilled, StorageSinkDesc};
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::{GenericSourceConnection, IngestionDescription, SourceConnection};
use mz_timely_util::antichain::AntichainExt;
use timely::communication::Allocate;
Expand Down Expand Up @@ -425,7 +425,7 @@ pub fn build_export_dataflow<A: Allocate>(
timely_worker: &mut TimelyWorker<A>,
storage_state: &mut StorageState,
id: GlobalId,
description: StorageSinkDesc<MetadataFilled, mz_repr::Timestamp>,
description: StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>,
) {
let worker_logging = timely_worker.log_register().get("timely").map(Into::into);
let debug_name = id.to_string();
Expand Down
16 changes: 8 additions & 8 deletions src/storage/src/render/sinks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ use std::rc::Rc;
use std::sync::Arc;
use std::time::{Duration, Instant};

use crate::healthcheck::HealthStatusMessage;
use crate::internal_control::InternalStorageCommand;
use crate::storage_state::StorageState;
use differential_dataflow::operators::arrange::Arrange;
use differential_dataflow::trace::implementations::ord_neu::{
ColValBatcher, ColValBuilder, ColValSpine,
Expand All @@ -23,26 +26,23 @@ use mz_interchange::envelopes::combine_at_timestamp;
use mz_persist_client::operators::shard_source::SnapshotMode;
use mz_repr::{Datum, Diff, GlobalId, Row, Timestamp};
use mz_storage_operators::persist_source;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sinks::{MetadataFilled, StorageSinkConnection, StorageSinkDesc};
use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
use mz_timely_util::builder_async::PressOnDropButton;
use timely::dataflow::operators::Leave;
use timely::dataflow::scopes::Child;
use timely::dataflow::{Scope, Stream};
use tracing::warn;

use crate::healthcheck::HealthStatusMessage;
use crate::internal_control::InternalStorageCommand;
use crate::storage_state::StorageState;

/// _Renders_ complete _differential_ [`Collection`]s
/// that represent the sink and its errors as requested
/// by the original `CREATE SINK` statement.
pub(crate) fn render_sink<'g, G: Scope<Timestamp = ()>>(
scope: &mut Child<'g, G, mz_repr::Timestamp>,
storage_state: &mut StorageState,
sink_id: GlobalId,
sink: &StorageSinkDesc<MetadataFilled, mz_repr::Timestamp>,
sink: &StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>,
) -> (Stream<G, HealthStatusMessage>, Vec<PressOnDropButton>) {
let sink_render = get_sink_render_for(&sink.connection);

Expand Down Expand Up @@ -104,7 +104,7 @@ pub(crate) fn render_sink<'g, G: Scope<Timestamp = ()>>(
/// `DiffPair`s.
fn zip_into_diff_pairs<G>(
sink_id: GlobalId,
sink: &StorageSinkDesc<MetadataFilled, mz_repr::Timestamp>,
sink: &StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>,
sink_render: &dyn SinkRender<G>,
collection: Collection<G, Row, Diff>,
) -> Collection<G, (Option<Row>, DiffPair<Row>), Diff>
Expand Down Expand Up @@ -211,7 +211,7 @@ where
fn render_sink(
&self,
storage_state: &mut StorageState,
sink: &StorageSinkDesc<MetadataFilled, Timestamp>,
sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
sink_id: GlobalId,
sinked_collection: Collection<G, (Option<Row>, DiffPair<Row>), Diff>,
err_collection: Collection<G, DataflowError, Diff>,
Expand Down
19 changes: 9 additions & 10 deletions src/storage/src/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Duration;

use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
use crate::metrics::sink::kafka::KafkaSinkMetrics;
use crate::render::sinks::SinkRender;
use crate::statistics::SinkStatistics;
use crate::storage_state::StorageState;
use anyhow::{anyhow, bail, Context};
use differential_dataflow::{AsCollection, Collection, Hashable};
use futures::StreamExt;
Expand All @@ -101,11 +106,11 @@ use mz_ore::vec::VecExt;
use mz_repr::{Datum, DatumVec, Diff, GlobalId, Row, RowArena, Timestamp};
use mz_storage_client::sink::progress_key::ProgressKey;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::dyncfgs::KAFKA_BUFFERED_EVENT_RESIZE_THRESHOLD_ELEMENTS;
use mz_storage_types::errors::{ContextCreationError, ContextCreationErrorExt, DataflowError};
use mz_storage_types::sinks::{
KafkaSinkConnection, KafkaSinkFormatType, MetadataFilled, SinkEnvelope, SinkPartitionStrategy,
StorageSinkDesc,
KafkaSinkConnection, KafkaSinkFormatType, SinkEnvelope, SinkPartitionStrategy, StorageSinkDesc,
};
use mz_timely_util::antichain::AntichainExt;
use mz_timely_util::builder_async::{
Expand All @@ -127,12 +132,6 @@ use tokio::sync::watch;
use tokio::time::{self, MissedTickBehavior};
use tracing::{error, info, warn};

use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
use crate::metrics::sink::kafka::KafkaSinkMetrics;
use crate::render::sinks::SinkRender;
use crate::statistics::SinkStatistics;
use crate::storage_state::StorageState;

impl<G: Scope<Timestamp = Timestamp>> SinkRender<G> for KafkaSinkConnection {
fn get_key_indices(&self) -> Option<&[usize]> {
self.key_desc_and_indices
Expand All @@ -147,7 +146,7 @@ impl<G: Scope<Timestamp = Timestamp>> SinkRender<G> for KafkaSinkConnection {
fn render_sink(
&self,
storage_state: &mut StorageState,
sink: &StorageSinkDesc<MetadataFilled, Timestamp>,
sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
sink_id: GlobalId,
input: Collection<G, (Option<Row>, DiffPair<Row>), Diff>,
// TODO(benesch): errors should stream out through the sink,
Expand Down Expand Up @@ -615,7 +614,7 @@ fn sink_collection<G: Scope<Timestamp = Timestamp>>(
connection: KafkaSinkConnection,
partition_strategy: SinkPartitionStrategy,
storage_configuration: StorageConfiguration,
sink: &StorageSinkDesc<MetadataFilled, Timestamp>,
sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
metrics: KafkaSinkMetrics,
statistics: SinkStatistics,
write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
Expand Down
4 changes: 2 additions & 2 deletions src/storage/src/storage_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::oneshot_sources::OneshotIngestionDescription;
use mz_storage_types::sinks::{MetadataFilled, StorageSinkDesc};
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::IngestionDescription;
use mz_storage_types::AlterCompatible;
use mz_timely_util::builder_async::PressOnDropButton;
Expand Down Expand Up @@ -276,7 +276,7 @@ pub struct StorageState {
/// Descriptions of each installed ingestion.
pub ingestions: BTreeMap<GlobalId, IngestionDescription<CollectionMetadata>>,
/// Descriptions of each installed export.
pub exports: BTreeMap<GlobalId, StorageSinkDesc<MetadataFilled, mz_repr::Timestamp>>,
pub exports: BTreeMap<GlobalId, StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>>,
/// Descriptions of oneshot ingestions that are currently running.
pub oneshot_ingestions: BTreeMap<uuid::Uuid, OneshotIngestionDescription<ProtoBatch>>,
/// Undocumented
Expand Down

0 comments on commit 8bc965e

Please sign in to comment.