Commit

MVP

ParkMyCar committed Jan 25, 2025
1 parent 3d8c1bf commit b4dc4ff

Showing 32 changed files with 831 additions and 87 deletions.
1 change: 1 addition & 0 deletions src/adapter/src/client.rs
@@ -384,6 +384,7 @@ Issue a SQL query to get started. Need help?
{
(ExecuteResponse::SendingRows { future, .. }, _) => match future.await {
PeekResponseUnary::Rows(rows) => Ok(rows),
PeekResponseUnary::Batches(_) => bail!("unexpected staged result"),
PeekResponseUnary::Canceled => bail!("query canceled"),
PeekResponseUnary::Error(e) => bail!(e),
},
31 changes: 24 additions & 7 deletions src/adapter/src/coord/peek.rs
@@ -36,6 +36,7 @@ use mz_expr::{
use mz_ore::cast::CastFrom;
use mz_ore::str::{separated, StrExt};
use mz_ore::tracing::OpenTelemetryContext;
use mz_persist_client::batch::ProtoBatch;
use mz_repr::explain::text::DisplayText;
use mz_repr::explain::{CompactScalars, IndexUsageType, PlanRenderingContext, UsedIndexes};
use mz_repr::{Diff, GlobalId, IntoRowIterator, RelationType, Row, RowIterator};
@@ -44,6 +45,7 @@ use timely::progress::Timestamp;
use uuid::Uuid;

use crate::coord::timestamp_selection::TimestampDetermination;
use crate::optimize::peek::PeekOutput;
use crate::optimize::OptimizerError;
use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
use crate::util::ResultExt;
@@ -72,14 +74,15 @@ pub(crate) struct PendingPeek {
#[derive(Debug)]
pub enum PeekResponseUnary {
Rows(Box<dyn RowIterator + Send + Sync>),
Batches(Vec<ProtoBatch>),
Error(String),
Canceled,
}

#[derive(Clone, Debug)]
pub struct PeekDataflowPlan<T = mz_repr::Timestamp> {
pub(crate) desc: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
pub(crate) id: GlobalId,
pub(crate) output: PeekOutput,
key: Vec<MirScalarExpr>,
permutation: Vec<usize>,
thinned_arity: usize,
@@ -88,7 +91,7 @@ pub struct PeekDataflowPlan<T = mz_repr::Timestamp> {
impl<T> PeekDataflowPlan<T> {
pub fn new(
desc: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
id: GlobalId,
output: PeekOutput,
typ: &RelationType,
) -> Self {
let arity = typ.arity();
@@ -100,7 +103,7 @@ impl<T> PeekDataflowPlan<T> {
let (permutation, thinning) = permutation_for_arrangement(&key, arity);
Self {
desc,
id,
output,
key,
permutation,
thinned_arity: thinning.len(),
@@ -559,12 +562,14 @@ impl crate::coord::Coordinator {
// n.b. this index_id identifies a transient index the
// caller created, so it is guaranteed to be on
// `compute_instance`.
id: index_id,
//
// TODO(parkmycar): Update this comment.
output,
key: index_key,
permutation: index_permutation,
thinned_arity: index_thinned_arity,
}) => {
let output_ids = dataflow.export_ids().collect();
let output_ids = dataflow.exported_index_ids().collect();

// Very important: actually create the dataflow (here, so we can destructure).
self.controller
@@ -579,6 +584,15 @@
)
.await;

let (target, index_id) = match output {
PeekOutput::Index {
transient_id: index_id,
} => (PeekTarget::Index { id: index_id }, Some(index_id)),
PeekOutput::ReadThenWrite { sink_id, .. } => {
(PeekTarget::Sinked { select_id: sink_id }, None)
}
};

// Create an identity MFP operator.
let mut map_filter_project = mz_expr::MapFilterProject::new(source_arity);
map_filter_project.permute_fn(
@@ -588,9 +602,9 @@
let map_filter_project = mfp_to_safe_plan(map_filter_project)?;
(
(None, timestamp, map_filter_project),
Some(index_id),
index_id,
false,
PeekTarget::Index { id: index_id },
target,
StatementExecutionStrategy::Standard,
)
}
@@ -658,6 +672,9 @@ impl crate::coord::Coordinator {
Err(e) => PeekResponseUnary::Error(e),
}
}
PeekResponse::Staged(response) => {
PeekResponseUnary::Batches(response.staged_batches)
}
PeekResponse::Canceled => PeekResponseUnary::Canceled,
PeekResponse::Error(e) => PeekResponseUnary::Error(e),
},
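Note: `PeekOutput`, imported above from `crate::optimize::peek`, is defined in one of the other files changed by this commit and does not appear in this excerpt. The sketch below is only a guess at its shape, reconstructed from the construction and match sites visible in this diff; the field types in particular are assumptions, not the actual definition.

// Editor's sketch, not the real definition: variant and field names are taken
// from the `match output { ... }` sites in this diff, types are inferred.
use mz_expr::MutationKind;
use mz_repr::{GlobalId, RelationDesc};
use mz_storage_types::controller::CollectionMetadata;

#[derive(Clone, Debug)]
pub enum PeekOutput {
    /// Arrange the peek result in a transient index and read it back as rows
    /// (the pre-existing behavior).
    Index { transient_id: GlobalId },
    /// Stage the peek result as persist batches destined for an existing
    /// collection, used to drive read-then-write plans (UPDATE/DELETE).
    ReadThenWrite {
        sink_id: GlobalId,
        collection_id: GlobalId,
        collection_meta: CollectionMetadata,
        from_desc: RelationDesc,
        mutation_kind: MutationKind,
    },
}

The `Index` variant stands in for the bare `id: GlobalId` that `PeekDataflowPlan` used to carry, while `ReadThenWrite` bundles what a compute-side sink would need to stage batches against the target collection.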
124 changes: 88 additions & 36 deletions src/adapter/src/coord/sequencer/inner.rs
@@ -49,8 +49,11 @@ use mz_sql::names::{
Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
SchemaSpecifier, SystemObjectId,
};
use mz_sql::plan::{ConnectionDetails, NetworkPolicyRule, StatementContext};
use mz_sql::plan::{
ConnectionDetails, NetworkPolicyRule, ReadThenWriteFormat, SelectOutput, StatementContext,
};
use mz_sql::pure::{generate_subsource_statements, PurifiedSourceExport};
use mz_storage_client::client::TableData;
use mz_storage_types::sinks::StorageSinkDesc;
use smallvec::SmallVec;
use timely::progress::Timestamp as TimelyTimestamp;
@@ -2832,16 +2835,18 @@ impl Coordinator {
} = plan;

// Read then writes can be queued, so re-verify the id exists.
let desc = match self.catalog().try_get_entry(&id) {
let (desc, collection_id) = match self.catalog().try_get_entry(&id) {
Some(table) => {
let full_name = self
.catalog()
.resolve_full_name(table.name(), Some(ctx.session().conn_id()));
// Inserts always occur at the latest version of the table.
table
let gid = table.latest_global_id();
let desc = table
.desc_latest(&full_name)
.expect("desc called on table")
.into_owned()
.into_owned();
(desc, gid)
}
None => {
ctx.retire(Err(AdapterError::Catalog(
@@ -2954,14 +2959,37 @@
session,
Default::default(),
);

let style = ExprPrepStyle::OneShot {
logical_time: EvalTime::NotAvailable,
session: peek_ctx.session(),
catalog_state: self.catalog().state(),
};
for expr in assignments.values_mut() {
return_if_err!(prep_scalar_expr(expr, style.clone()), peek_ctx);
}
let mutation_kind = match kind {
MutationKind::Insert => mz_expr::MutationKind::Insert,
MutationKind::Update => mz_expr::MutationKind::Update {
assignments: assignments.clone(),
},
MutationKind::Delete => mz_expr::MutationKind::Delete,
};
let output = SelectOutput::ReadThenWrite(ReadThenWriteFormat {
collection_id,
mutation_kind,
// TODO(parkmycar): This is wrong.
from_desc: desc.clone(),
});

self.sequence_peek(
peek_ctx,
plan::SelectPlan {
select: None,
source: selection,
when: QueryWhen::FreshestTableWrite,
finishing,
copy_to: None,
output,
},
TargetCluster::Active,
None,
@@ -3059,40 +3087,64 @@
}
Ok(diffs)
};
let diffs = match peek_response {
ExecuteResponse::SendingRows { future: batch, .. } => {
// TODO(jkosh44): This timeout should be removed;
// we should instead periodically ensure clusters are
// healthy and actively cancel any work waiting on unhealthy
// clusters.
match tokio::time::timeout(timeout_dur, batch).await {
Ok(res) => match res {
PeekResponseUnary::Rows(rows) => make_diffs(rows),
PeekResponseUnary::Canceled => Err(AdapterError::Canceled),
PeekResponseUnary::Error(e) => {
Err(AdapterError::Unstructured(anyhow!(e)))
}
},
Err(_) => {
// We timed out, so remove the pending peek. This is
// best-effort and doesn't guarantee we won't
// receive a response.
// It is not an error for this timeout to occur after `internal_cmd_rx` has been dropped.
let result = internal_cmd_tx.send(Message::CancelPendingPeeks {
conn_id: ctx.session().conn_id().clone(),
});
if let Err(e) = result {
warn!("internal_cmd_rx dropped before we could send: {:?}", e);
let diffs =
match peek_response {
ExecuteResponse::SendingRows { future: batch, .. } => {
// TODO(jkosh44): This timeout should be removed;
// we should instead periodically ensure clusters are
// healthy and actively cancel any work waiting on unhealthy
// clusters.
match tokio::time::timeout(timeout_dur, batch).await {
Ok(res) => match res {
PeekResponseUnary::Rows(rows) => make_diffs(rows),
PeekResponseUnary::Batches(resp) => {
let _ = ctx.session_mut().take_transaction_timestamp_context();
// TODO(parkmycar): This needs to be majorly refactored.
let row_count: u64 = resp
.iter()
.filter_map(|b| b.batch.as_ref())
.map(|b| b.len)
.sum();
let stage_write = ctx.session_mut().add_transaction_ops(
TransactionOps::Writes(vec![WriteOp {
id,
rows: TableData::Batches(resp.into()),
}]),
);
if let Err(err) = stage_write {
ctx.retire(Err(err));
} else {
ctx.retire(Ok(ExecuteResponse::Inserted(
usize::cast_from(row_count),
)));
}
return;
}
PeekResponseUnary::Canceled => Err(AdapterError::Canceled),
PeekResponseUnary::Error(e) => {
Err(AdapterError::Unstructured(anyhow!(e)))
}
},
Err(_) => {
// We timed out, so remove the pending peek. This is
// best-effort and doesn't guarantee we won't
// receive a response.
// It is not an error for this timeout to occur after `internal_cmd_rx` has been dropped.
let result = internal_cmd_tx.send(Message::CancelPendingPeeks {
conn_id: ctx.session().conn_id().clone(),
});
if let Err(e) = result {
warn!("internal_cmd_rx dropped before we could send: {:?}", e);
}
Err(AdapterError::StatementTimeout)
}
Err(AdapterError::StatementTimeout)
}
}
}
ExecuteResponse::SendingRowsImmediate { rows } => make_diffs(rows),
resp => Err(AdapterError::Unstructured(anyhow!(
"unexpected peek response: {resp:?}"
))),
};
ExecuteResponse::SendingRowsImmediate { rows } => make_diffs(rows),
resp => Err(AdapterError::Unstructured(anyhow!(
"unexpected peek response: {resp:?}"
))),
};
let mut returning_rows = Vec::new();
let mut diff_err: Option<AdapterError> = None;
if !returning.is_empty() && diffs.is_ok() {
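`SelectOutput` and `ReadThenWriteFormat` are new `mz_sql::plan` types introduced elsewhere in this commit. Judging only from how they are constructed here and matched in peek.rs below, they plausibly look like the following sketch; the exact payload types (notably for `CopyTo`) are assumptions.

// Editor's sketch of the assumed plan-side types; not the actual definitions.
use mz_expr::MutationKind;
use mz_repr::{GlobalId, RelationDesc};
use mz_sql::plan::CopyFormat; // existing format enum used by ExecuteResponse::CopyTo

#[derive(Clone, Debug)]
pub enum SelectOutput {
    /// Return rows to the client (the default SELECT behavior).
    Rows,
    /// Stage the result as persist batches for a read-then-write statement.
    ReadThenWrite(ReadThenWriteFormat),
    /// Stream the result out, e.g. COPY ... TO; payload type assumed here.
    CopyTo(CopyFormat),
}

#[derive(Clone, Debug)]
pub struct ReadThenWriteFormat {
    /// The collection the mutation will ultimately be applied to.
    pub collection_id: GlobalId,
    /// Insert/Update/Delete, mirrored into `mz_expr` for the compute sink.
    pub mutation_kind: MutationKind,
    /// Relation description of the data being written; the value passed above
    /// carries an author TODO noting it is not yet correct.
    pub from_desc: RelationDesc,
}

In this arrangement `SelectPlan` gains a single `output` field that subsumes the old `copy_to: Option<CopyFormat>`, which is why the UPDATE/DELETE path above now passes `output` where it used to pass `copy_to: None`.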
49 changes: 40 additions & 9 deletions src/adapter/src/coord/sequencer/inner/peek.rs
@@ -26,8 +26,8 @@ use mz_sql::ast::{ExplainStage, Statement};
use mz_sql::catalog::CatalogCluster;
// Import `plan` module, but only import select elements to avoid merge conflicts on use statements.
use mz_catalog::memory::objects::CatalogItem;
use mz_sql::plan::QueryWhen;
use mz_sql::plan::{self, HirScalarExpr};
use mz_sql::plan::{self, HirScalarExpr, ReadThenWriteFormat};
use mz_sql::plan::{QueryWhen, SelectOutput};
use mz_sql::session::metadata::SessionMetadata;
use mz_transform::EmptyStatisticsOracle;
use tokio::sync::oneshot;
@@ -54,6 +54,7 @@ use crate::explain::insights::PlanInsightsContext;
use crate::explain::optimizer_trace::OptimizerTrace;
use crate::notice::AdapterNotice;
use crate::optimize::dataflows::{prep_scalar_expr, EvalTime, ExprPrepStyle};
use crate::optimize::peek::PeekOutput;
use crate::optimize::{self, Optimize};
use crate::session::{RequireLinearization, Session, TransactionOps, TransactionStatus};
use crate::statement_logging::StatementLifecycleEvent;
@@ -300,15 +301,42 @@ impl Coordinator {
.instance_snapshot(cluster.id())
.expect("compute instance does not exist");
let (_, view_id) = self.allocate_transient_id();
let (_, index_id) = self.allocate_transient_id();

let output = match &plan.output {
SelectOutput::Rows => {
let (_, index_id) = self.allocate_transient_id();
PeekOutput::Index {
transient_id: index_id,
}
}
SelectOutput::ReadThenWrite(ReadThenWriteFormat {
collection_id,
from_desc,
mutation_kind,
}) => {
let (_, sink_id) = self.allocate_transient_id();
let collection_meta = self
.controller
.storage
.collection_metadata(*collection_id)?;
PeekOutput::ReadThenWrite {
sink_id,
collection_id: *collection_id,
collection_meta,
from_desc: from_desc.clone(),
mutation_kind: mutation_kind.clone(),
}
}
SelectOutput::CopyTo(_) => unreachable!("handled elsewhere"),
};

// Build an optimizer for this SELECT.
Either::Left(optimize::peek::Optimizer::new(
Arc::clone(&catalog),
compute_instance,
plan.finishing.clone(),
output,
view_id,
index_id,
optimizer_config,
self.optimizer_metrics(),
))
@@ -621,7 +649,7 @@
session,
timestamp_context,
view_id: optimizer.select_id(),
index_id: optimizer.index_id(),
output: optimizer.output().clone(),
enable_re_optimize,
}).map(Box::new);
match explain_ctx {
@@ -867,7 +895,10 @@

if let Some(transient_index_id) = match &planned_peek.plan {
peek::PeekPlan::FastPath(_) => None,
peek::PeekPlan::SlowPath(PeekDataflowPlan { id, .. }) => Some(id),
peek::PeekPlan::SlowPath(PeekDataflowPlan { output, .. }) => match output {
PeekOutput::Index { transient_id } => Some(transient_id),
PeekOutput::ReadThenWrite { .. } => None,
},
} {
if let Some(statement_logging_id) = ctx.extra.contents() {
self.set_transient_index_id(statement_logging_id, *transient_index_id);
@@ -940,9 +971,9 @@
.add_notice(AdapterNotice::QueryTimestamp { explanation });
}

let resp = match plan.copy_to {
None => resp,
Some(format) => ExecuteResponse::CopyTo {
let resp = match plan.output {
SelectOutput::Rows | SelectOutput::ReadThenWrite { .. } => resp,
SelectOutput::CopyTo(format) => ExecuteResponse::CopyTo {
format,
resp: Box::new(resp),
},
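Two more shapes referenced above but defined outside this excerpt are the new `PeekTarget` variant used in coord/peek.rs (`PeekTarget::Sinked { select_id: sink_id }`) and the payload behind `PeekResponse::Staged`. A minimal sketch of both, again inferred from the usage sites and therefore an assumption rather than the real protocol definitions:

// Editor's sketch of the assumed compute-protocol additions; field names come
// from the match/construction sites in coord/peek.rs, types are guesses.
use mz_persist_client::batch::ProtoBatch;
use mz_repr::GlobalId;

pub enum PeekTarget {
    /// Peek an (existing or transient) index.
    Index { id: GlobalId },
    /// New in this commit: peek a dataflow whose result is written out by a
    /// sink rather than arranged, identified by the transient sink id.
    Sinked { select_id: GlobalId },
    // ...pre-existing variants (e.g. persist-backed peeks) elided...
}

/// Assumed payload of `PeekResponse::Staged(response)`; the only field the
/// coordinator reads above is `staged_batches`, which it forwards to the
/// session as `PeekResponseUnary::Batches` and the read-then-write path
/// eventually appends to the target table as `TableData::Batches`.
pub struct StagedPeekResponse {
    pub staged_batches: Vec<ProtoBatch>,
}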