storage: use exchange-based sequencing to broadcast internal commands #31147

Merged (2 commits) on Jan 28, 2025
5 changes: 2 additions & 3 deletions src/storage/src/healthcheck.rs
@@ -259,7 +259,7 @@ pub trait HealthOperator {

/// A default `HealthOperator` for use in normal cases.
pub struct DefaultWriter {
pub command_tx: Rc<RefCell<dyn InternalCommandSender>>,
pub command_tx: InternalCommandSender,
pub updates: Rc<RefCell<Vec<StatusUpdate>>>,
}

@@ -293,8 +293,7 @@ impl HealthOperator for DefaultWriter {

fn send_halt(&self, id: GlobalId, error: Option<(StatusNamespace, HealthStatusUpdate)>) {
self.command_tx
.borrow_mut()
.broadcast(InternalStorageCommand::SuspendAndRestart {
.send(InternalStorageCommand::SuspendAndRestart {
// Suspend and restart is expected to operate on the primary object and
// not any of the sub-objects
id,
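The healthcheck change follows directly from the new sender type: `InternalCommandSender` derives `Clone` and its `send` method takes `&self`, so the `Rc<RefCell<dyn InternalCommandSender>>` wrapper and the `borrow_mut()` at the call site are no longer needed. A minimal sketch of the resulting call pattern (the free function below is illustrative, not part of the diff):

```rust
use mz_repr::GlobalId;

use crate::internal_control::{InternalCommandSender, InternalStorageCommand};

/// Sketch only: halting an object now needs just a shared reference to the
/// sender, no `RefCell` borrow.
fn suspend_and_restart(command_tx: &InternalCommandSender, id: GlobalId, reason: String) {
    command_tx.send(InternalStorageCommand::SuspendAndRestart { id, reason });
}
```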
216 changes: 196 additions & 20 deletions src/storage/src/internal_control.rs
@@ -6,8 +6,10 @@
//! Types for cluster-internal control messages that can be broadcast to all
//! workers from individual operators/workers.

use std::collections::BTreeMap;
use std::time::Instant;
use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::rc::Rc;
use std::sync::mpsc;

use mz_repr::{GlobalId, Row};
use mz_rocksdb::config::SharedWriteBufferManager;
@@ -18,8 +20,11 @@ use mz_storage_types::sinks::{MetadataFilled, StorageSinkDesc};
use mz_storage_types::sources::IngestionDescription;
use serde::{Deserialize, Serialize};
use timely::communication::Allocate;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::generic::{source, OutputHandle};
use timely::dataflow::operators::{Broadcast, Operator};
use timely::progress::Antichain;
use timely::synchronization::Sequencer;
use timely::scheduling::{Activator, Scheduler};
use timely::worker::Worker as TimelyWorker;

use crate::statistics::{SinkStatisticsRecord, SourceStatisticsRecord};
@@ -120,31 +125,202 @@ pub enum InternalStorageCommand {
},
}

/// Allows broadcasting [`internal commands`](InternalStorageCommand) to all
/// workers.
pub trait InternalCommandSender {
/// A sender broadcasting [`InternalStorageCommand`]s to all workers.
#[derive(Clone)]
pub struct InternalCommandSender {
tx: mpsc::Sender<InternalStorageCommand>,
activator: Rc<RefCell<Option<Activator>>>,
}

impl InternalCommandSender {
/// Broadcasts the given command to all workers.
fn broadcast(&mut self, internal_cmd: InternalStorageCommand);
pub fn send(&self, cmd: InternalStorageCommand) {
if self.tx.send(cmd).is_err() {
panic!("internal command channel disconnected");
}

/// Returns the next available command, if any. This returns `None` when
/// there are currently no commands but there might be commands again in the
/// future.
fn next(&mut self) -> Option<InternalStorageCommand>;
self.activator.borrow().as_ref().map(|a| a.activate());
}
}

impl InternalCommandSender for Sequencer<InternalStorageCommand> {
fn broadcast(&mut self, internal_cmd: InternalStorageCommand) {
self.push(internal_cmd);
}
/// A receiver for [`InternalStorageCommand`]s broadcasted by workers.
pub struct InternalCommandReceiver {
rx: mpsc::Receiver<InternalStorageCommand>,
}

fn next(&mut self) -> Option<InternalStorageCommand> {
Iterator::next(self)
impl InternalCommandReceiver {
/// Returns the next available command, if any.
///
/// This returns `None` when there are currently no commands but there might be commands again
/// in the future.
pub fn try_recv(&self) -> Option<InternalStorageCommand> {
match self.rx.try_recv() {
Ok(cmd) => Some(cmd),
Err(mpsc::TryRecvError::Empty) => None,
Err(mpsc::TryRecvError::Disconnected) => {
panic!("internal command channel disconnected")
}
}
}
}

pub(crate) fn setup_command_sequencer<'w, A: Allocate>(
timely_worker: &'w mut TimelyWorker<A>,
) -> Sequencer<InternalStorageCommand> {
// TODO(aljoscha): Use something based on `mz_ore::NowFn`?
Sequencer::new(timely_worker, Instant::now())
) -> (InternalCommandSender, InternalCommandReceiver) {
let (input_tx, input_rx) = mpsc::channel();
let (output_tx, output_rx) = mpsc::channel();
let activator = Rc::new(RefCell::new(None));

timely_worker.dataflow_named::<(), _, _>("command_sequencer", {
Member:

I think this is fine, but it essentially uses the index as a capability. I think there's an alternative design that uses Timely's capabilities, but I'm leaving it to you which one you'd like to implement here. I think the order from the source to the sequencer doesn't matter, as long as it's not reordered -- you could also think of this as a batch of data and a capability (worker, index). After the sequencer, it's simpler as there is only a single operator instance that retains a capability, so it could just use a counter as its capability. The data then would be a vector of updates per time, i.e., Stream<_, Vec<Command>>.

Contributor Author:

I tried using timestamps first, instead of putting the index into the data, and it turned out more complicated. Maybe I missed something, though.

  • I want to use a Timestamp type of (worker, index), so (usize, u64). Tuples already implement Timestamp, but they don't implement Refines<()>, so I can't use them directly. Instead I have to implement a new timestamp type, which requires a bunch of boilerplate.
  • If I do the above, I think I'd still have to do the put-commands-into-a-map-and-wait-for-the-next-index thing in both the sequencer and the sink, because there is no guarantee that an operator sees updates in its inputs ordered by time (right?). So in the end the code would look pretty much the same, just with a bunch more timestamp impl and capability-management code.

> I think the order from the source to the sequencer doesn't matter, as long as it's not reordered

The "as long as it's not reordered" is important though! Correct me if I'm wrong, but I think this could happen:

  • Source sends command C1 at (0, 0).
  • Source sends command C2 at (0, 1).
  • Sequencer receives command C2.
  • Sequencer receives command C1.

Or is there any guarantee that updates with different times are received in time order?

Contributor:

> I want to use a Timestamp type of (worker, index), ...

Ah, I'm not sure you do! At least, (worker, index) is a totally ordered type, where worker trumps index no matter what, and roughly everyone should wait for a worker to finish out all of its indexes before moving on to the next worker. I think you might want Product<worker, index>, which I think should implement Refines<()>. Does that sound right? (I'm trying to back out the intent from my partial understanding.)

Contributor Author:

True, though the thing I was considering would not involve waiting for frontiers or even looking at frontiers at all, but just using the timestamp to transport the command index (instead of putting it into the data as this PR does). If you don't look at frontiers, the partial order of times doesn't matter. It's entirely possible that working with frontiers would lead to a simpler implementation, but I don't know what that could look like!

> I think you might want Product<worker, index>

I'm not so sure! The only thing we need is that the commands coming from one worker retain their order in the output; between workers there is no defined order. So we'd need a (worker, index) where instances are only comparable if worker is equal. But that becomes awkward because Timestamp assumes the existence of a minimum that you can downgrade to all other timestamps. So I guess an (Option<worker>, index) where (None, 0) is that minimum?
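To make that last comment concrete, here is a hypothetical sketch of such a partially ordered timestamp: values are comparable only when they carry the same worker, and `(None, 0)` acts as the minimum that precedes everything. This is not part of the PR and omits the `Timestamp`/`Refines<()>` boilerplate the author mentions; it only illustrates the intended ordering.

```rust
use std::cmp::Ordering;

/// Hypothetical `(Option<worker>, index)` timestamp: commands from the same
/// worker are ordered by index, commands from different workers are
/// incomparable, and `(None, 0)` is the minimum element.
#[derive(Clone, Debug, PartialEq, Eq)]
struct CmdTime {
    worker: Option<usize>,
    index: u64,
}

impl PartialOrd for CmdTime {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        match (self.worker, other.worker) {
            // Only the minimum `(None, 0)` is expected in practice for the
            // `None` case; it compares below every worker-tagged time.
            (None, None) => Some(self.index.cmp(&other.index)),
            (None, Some(_)) => Some(Ordering::Less),
            (Some(_), None) => Some(Ordering::Greater),
            // Same worker: order by command index.
            (Some(a), Some(b)) if a == b => Some(self.index.cmp(&other.index)),
            // Different workers: no defined order.
            _ => None,
        }
    }
}
```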

let activator = Rc::clone(&activator);
move |scope| {
// Create a stream of commands received from `input_rx`.
//
// The output commands are tagged by worker ID and command index, allowing downstream
// operators to ensure their correct relative order.
let stream = source(scope, "command_sequencer::source", |cap, info| {
*activator.borrow_mut() = Some(scope.activator_for(info.address));

let worker_id = scope.index();
let mut cmd_index = 0;
let mut capability = Some(cap);

move |output: &mut OutputHandle<_, _, _>| {
let Some(cap) = &capability else {
return;
};

let mut session = output.session(cap);
loop {
match input_rx.try_recv() {
Ok(command) => {
let cmd = IndexedCommand {
index: cmd_index,
command,
};
session.give((worker_id, cmd));
cmd_index += 1;
}
Err(mpsc::TryRecvError::Empty) => break,
Err(mpsc::TryRecvError::Disconnected) => {
// Drop our capability to shut down.
capability = None;
break;
}
}
}
}
});

// Sequence all commands through a single worker to establish a unique order.
//
// The output commands are tagged by a command index, allowing downstream operators to
// ensure their correct relative order.
let stream = stream.unary_frontier(
Exchange::new(|_| 0),
"command_sequencer::sequencer",
|cap, _info| {
let mut cmd_index = 0;
let mut capability = Some(cap);

// For each worker, keep an ordered list of pending commands, as well as the
// current index of the next command.
let mut pending_commands = vec![(BTreeSet::new(), 0); scope.peers()];

move |input, output: &mut OutputHandle<_, _, _>| {
let Some(cap) = &capability else {
return;
};

while let Some((_cap, data)) = input.next() {
for (worker_id, cmd) in data.drain(..) {
pending_commands[worker_id].0.insert(cmd);
}
}

let mut session = output.session(cap);
for (commands, next_idx) in &mut pending_commands {
while commands.first().is_some_and(|c| c.index == *next_idx) {
let mut cmd = commands.pop_first().unwrap();
cmd.index = cmd_index;
session.give(cmd);

*next_idx += 1;
cmd_index += 1;
}
}

if input.frontier().is_empty() {
// Drop our capability to shut down.
capability = None;
}
}
},
);

// Broadcast the ordered commands to all workers.
let stream = stream.broadcast();

// Sink the stream back into `output_tx`.
stream.sink(Pipeline, "command_sequencer::sink", {
// Keep an ordered list of pending commands, as well as the current index of the
// next command.
let mut pending_commands = BTreeSet::new();
let mut next_idx = 0;

move |input| {
while let Some((_cap, data)) = input.next() {
pending_commands.extend(data.drain(..));
}

while pending_commands
.first()
.is_some_and(|c| c.index == next_idx)
{
let cmd = pending_commands.pop_first().unwrap();
let _ = output_tx.send(cmd.command);
next_idx += 1;
}
}
});
}
});

let tx = InternalCommandSender {
tx: input_tx,
activator,
};
let rx = InternalCommandReceiver { rx: output_rx };

(tx, rx)
}

// An [`InternalStorageCommand`] tagged with an index.
//
// This is a `(u64, InternalStorageCommand)` in spirit, but implements `Ord` (which
// `InternalStorageCommand` doesn't) by looking only at the index.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct IndexedCommand {
index: u64,
command: InternalStorageCommand,
}

impl PartialEq for IndexedCommand {
fn eq(&self, other: &Self) -> bool {
self.cmp(other).is_eq()
}
}

impl Eq for IndexedCommand {}

impl PartialOrd for IndexedCommand {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

impl Ord for IndexedCommand {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.index.cmp(&other.index)
}
}
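Taken together, the new pieces give each worker a cloneable sender that feeds local commands into the `command_sequencer` dataflow and a receiver that yields the globally ordered broadcast. A rough usage sketch, assuming code inside this crate; the worker loop, `run_worker`, and `handle_command` are illustrative, not part of the diff:

```rust
use mz_repr::GlobalId;
use timely::communication::Allocate;
use timely::worker::Worker as TimelyWorker;

use crate::internal_control::{setup_command_sequencer, InternalStorageCommand};

/// Sketch only: wiring the sequencer into a worker's main loop.
fn run_worker<A: Allocate>(timely_worker: &mut TimelyWorker<A>, object_id: GlobalId) {
    let (command_tx, command_rx) = setup_command_sequencer(timely_worker);

    // Any operator on this worker can broadcast a command; `send` is
    // non-blocking and wakes the sequencing dataflow through the stashed
    // activator.
    command_tx.send(InternalStorageCommand::SuspendAndRestart {
        id: object_id,
        reason: "illustrative restart".into(),
    });

    loop {
        // Step the worker so the `command_sequencer` dataflow makes progress.
        timely_worker.step();

        // Drain the commands that came back out of the sequencer, already in
        // the globally agreed order and identical on every worker.
        while let Some(cmd) = command_rx.try_recv() {
            handle_command(cmd);
        }
    }
}

/// Placeholder for whatever the storage worker does with each command.
fn handle_command(_cmd: InternalStorageCommand) {}
```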
4 changes: 2 additions & 2 deletions src/storage/src/render.rs
@@ -401,7 +401,7 @@ pub fn build_ingestion_dataflow<A: Allocate>(
"source",
&health_stream,
crate::healthcheck::DefaultWriter {
command_tx: Rc::clone(&storage_state.internal_cmd_tx),
command_tx: storage_state.internal_cmd_tx.clone(),
updates: Rc::clone(&storage_state.object_status_updates),
},
storage_state
@@ -456,7 +456,7 @@ pub fn build_export_dataflow<A: Allocate>(
"sink",
&health_stream,
crate::healthcheck::DefaultWriter {
command_tx: Rc::clone(&storage_state.internal_cmd_tx),
command_tx: storage_state.internal_cmd_tx.clone(),
updates: Rc::clone(&storage_state.object_status_updates),
},
storage_state
6 changes: 2 additions & 4 deletions src/storage/src/render/sinks.rs
@@ -9,7 +9,6 @@

//! Logic related to the creation of dataflow sinks.

use std::rc::Rc;
use std::sync::Arc;
use std::time::{Duration, Instant};

@@ -54,7 +53,7 @@ pub(crate) fn render_sink<'g, G: Scope<Timestamp = ()>>(
SnapshotMode::Exclude
};

let command_tx = Rc::clone(&storage_state.internal_cmd_tx);
let command_tx = storage_state.internal_cmd_tx.clone();

let (ok_collection, err_collection, persist_tokens) = persist_source::persist_source(
scope,
@@ -74,8 +73,7 @@
Box::pin(async move {
let error = format!("storage_sink: {error}");
tracing::info!("{error}");
let mut command_tx = command_tx.borrow_mut();
command_tx.broadcast(InternalStorageCommand::SuspendAndRestart {
command_tx.send(InternalStorageCommand::SuspendAndRestart {
id: sink_id,
reason: error,
});