Skip to content

Commit

Permalink
Improve RemoteSequencer tracing and logging
Browse files Browse the repository at this point in the history
Also avoid sending over the open connection if the
inflight commits channel is closed
  • Loading branch information
muhamadazmy committed Nov 14, 2024
1 parent 484a0df commit 1993419
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 61 deletions.
35 changes: 27 additions & 8 deletions crates/bifrost/src/loglet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,21 +156,40 @@ pub type SendableLogletReadStream = Pin<Box<dyn LogletReadStream + Send>>;

#[allow(dead_code)]
pub(crate) struct LogletCommitResolver {
tx: oneshot::Sender<Result<LogletOffset, AppendError>>,
tx: Option<oneshot::Sender<Result<LogletOffset, AppendError>>>,
}

#[allow(dead_code)]
impl LogletCommitResolver {
pub fn sealed(self) {
let _ = self.tx.send(Err(AppendError::Sealed));
pub fn sealed(mut self) {
let _ = self
.tx
.take()
.expect("must be set")
.send(Err(AppendError::Sealed));
}

pub fn offset(self, offset: LogletOffset) {
let _ = self.tx.send(Ok(offset));
pub fn offset(mut self, offset: LogletOffset) {
let _ = self.tx.take().expect("must be set").send(Ok(offset));
}

pub fn error(self, err: AppendError) {
let _ = self.tx.send(Err(err));
pub fn error(mut self, err: AppendError) {
let _ = self.tx.take().expect("must be set").send(Err(err));
}
}

#[derive(Debug, Clone, Copy, thiserror::Error)]
#[error("Commit resolver was dropped")]
struct CommitCancelled;

/// If a LogletCommitResolver is dropped without being
/// 'resolved', we resolve it automatically as being cancelled
/// To make it distinguished from a Shutdown.
impl Drop for LogletCommitResolver {
fn drop(&mut self) {
if let Some(tx) = self.tx.take() {
let _ = tx.send(Err(AppendError::retryable(CommitCancelled)));
}
}
}

Expand All @@ -194,7 +213,7 @@ impl LogletCommit {
#[allow(dead_code)]
pub(crate) fn deferred() -> (Self, LogletCommitResolver) {
let (tx, rx) = oneshot::channel();
(Self { rx }, LogletCommitResolver { tx })
(Self { rx }, LogletCommitResolver { tx: Some(tx) })
}
}

Expand Down
131 changes: 78 additions & 53 deletions crates/bifrost/src/providers/replicated_loglet/remote_sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ use std::{
};

use tokio::sync::{mpsc, Mutex, OwnedSemaphorePermit, Semaphore};
use tracing::{debug, instrument, Span};

use restate_core::{
cancellation_watcher,
network::{
rpc_router::{RpcRouter, RpcToken},
NetworkError, NetworkSendError, Networking, Outgoing, TransportConnect, WeakConnection,
Expand All @@ -33,7 +35,6 @@ use restate_types::{
replicated_loglet::ReplicatedLogletParams,
GenerationalNodeId,
};
use tracing::instrument;

use super::rpc_routers::SequencersRpc;
use crate::loglet::{
Expand Down Expand Up @@ -107,7 +108,7 @@ where
}

#[instrument(
level="trace",
level="debug",
skip_all,
fields(
otel.name = "replicated_loglet::remote_sequencer: append",
Expand Down Expand Up @@ -148,16 +149,14 @@ where
Ok(token) => break token,
Err(err) => {
match err.source {
NetworkError::ConnectError(_)
| NetworkError::ConnectionClosed(_)
| NetworkError::Timeout(_) => {
// we retry to re-connect one time
err @ NetworkError::Full => return Err(err.into()),
_ => {
// we retry on any other network error
connection = self.renew_connection(connection).await?;

msg = err.original;
continue;
}
err => return Err(err.into()),
}
}
};
Expand All @@ -170,6 +169,7 @@ where
}

/// Gets or starts a new remote sequencer connection
#[instrument(level = "debug", skip_all)]
async fn get_connection(&self) -> Result<RemoteSequencerConnection, NetworkError> {
let mut guard = self.connection.lock().await;
if let Some(connection) = guard.deref() {
Expand All @@ -190,6 +190,7 @@ where

/// Renew a connection to a remote sequencer. This guarantees that only a single connection
/// to the sequencer is available.
#[instrument(level = "debug", skip_all, fields(renewed = false))]
async fn renew_connection(
&self,
old: RemoteSequencerConnection,
Expand All @@ -207,6 +208,8 @@ where
.node_connection(self.params.sequencer)
.await?;

Span::current().record("renewed", true);

let connection =
RemoteSequencerConnection::start(self.known_global_tail.clone(), connection)?;

Expand All @@ -232,14 +235,15 @@ struct RemoteSequencerConnection {
}

impl RemoteSequencerConnection {
#[instrument(level = "debug", name = "connection_start", skip_all)]
fn start(
known_global_tail: TailOffsetWatch,
connection: WeakConnection,
) -> Result<Self, ShutdownError> {
let (tx, rx) = mpsc::unbounded_channel();

task_center().spawn(
TaskKind::NetworkMessageHandler,
TaskKind::SequencerAppender,
"remote-sequencer-connection",
None,
Self::handle_appended_responses(known_global_tail, connection.clone(), rx),
Expand All @@ -260,6 +264,16 @@ impl RemoteSequencerConnection {
sequencer: GenerationalNodeId,
msg: Append,
) -> Result<RpcToken<Appended>, NetworkSendError<Append>> {
// there are other reasons that can render this connection unusable
// even if the underlying connection is still valid.
// if the channel is closed, this connection cannot be used anymore
if self.tx.is_closed() {
return Err(NetworkSendError::new(
msg,
NetworkError::Unavailable("Inflight commits channel is closed".into()),
));
}

let outgoing = Outgoing::new(sequencer, msg).assign_connection(self.inner.clone());

rpc_router
Expand All @@ -283,6 +297,7 @@ impl RemoteSequencerConnection {
if let Err(err) = self.tx.send(inflight_append) {
// if we failed to push this to be processed by the connection reactor task
// then we need to notify the caller
debug!("Inflight channel closed. Resolve commit as connection closed");
err.0
.commit_resolver
.error(AppendError::retryable(NetworkError::ConnectionClosed(
Expand All @@ -296,55 +311,84 @@ impl RemoteSequencerConnection {
/// This task will run until the [`AppendStream`] is dropped. Once dropped
/// all pending commits will be resolved with an error. it's up to the enqueuer
/// to retry if needed.
#[instrument(level = "debug", skip_all)]
async fn handle_appended_responses(
known_global_tail: TailOffsetWatch,
connection: WeakConnection,
mut rx: mpsc::UnboundedReceiver<RemoteInflightAppend>,
) -> anyhow::Result<()> {
let mut closed = std::pin::pin!(connection.closed());
let cancelled = cancellation_watcher();

let termination_err = tokio::select! {
_ = &mut closed => {
AppendError::retryable(NetworkError::ConnectionClosed(connection.peer()))
}
_ = cancelled => {
AppendError::Shutdown(ShutdownError)
}
err = Self::handler_loop(known_global_tail, connection.peer(), &mut rx) => {
err
}
};

// close channel to stop any further appends calls on the same connection
rx.close();

// Drain and resolve ALL pending appends on this connection.
//
// todo(azmy): The order of the RemoteInflightAppend's on the channel
// does not necessary matches the actual append calls. This is
// since sending on the connection and pushing on the rx channel is not an atomic
// operation. Which means that, it's possible when we are draining
// the pending requests here that we also end up cancelling some inflight appends
// that has already received a positive response from the sequencer.
//
// For now this should not be a problem since they can (possibly) retry
// to do the write again later.
debug!(cause=%termination_err, "Draining inflight channel");
let mut count = 0;
while let Some(inflight) = rx.recv().await {
inflight.commit_resolver.error(termination_err.clone());
count += 1;
}

debug!("Drained/Cancelled {count} inflight commits");

Ok(())
}

async fn handler_loop(
known_global_tail: TailOffsetWatch,
peer: GenerationalNodeId,
rx: &mut mpsc::UnboundedReceiver<RemoteInflightAppend>,
) -> AppendError {
// handle all rpc tokens in an infinite loop
// this loop only breaks when it encounters a terminal
// AppendError.
// When this happens, the receiver channel is closed
// and drained. The same error is then used to resolve
// all pending tokens
let err = loop {
let inflight = tokio::select! {
inflight = rx.recv() => {
inflight
}
_ = &mut closed => {
break AppendError::retryable(NetworkError::ConnectionClosed(connection.peer()));
loop {
let inflight = match rx.recv().await {
Some(inflight) => inflight,
None => {
return AppendError::retryable(NetworkError::ConnectionClosed(peer));
}
};

let Some(inflight) = inflight else {
// connection was dropped.
break AppendError::retryable(NetworkError::ConnectionClosed(connection.peer()));
};

let RemoteInflightAppend {
rpc_token,
commit_resolver,
permit: _permit,
} = inflight;

let appended = tokio::select! {
incoming = rpc_token.recv() => {
incoming.map_err(AppendError::Shutdown)
},
_ = &mut closed => {
Err(AppendError::retryable(NetworkError::ConnectionClosed(connection.peer())))
}
};

let appended = match appended {
let appended = match rpc_token.recv().await {
Ok(appended) => appended.into_body(),
Err(err) => {
// this can only be a terminal error (either shutdown or connection is closing)
commit_resolver.error(err.clone());
break err;
commit_resolver.error(AppendError::Shutdown(err));
return AppendError::Shutdown(err);
}
};

Expand All @@ -363,7 +407,7 @@ impl RemoteSequencerConnection {
// A sealed status returns a terminal error since we can immediately cancel
// all inflight append jobs.
commit_resolver.sealed();
break AppendError::Sealed;
return AppendError::Sealed;
}
SequencerStatus::UnknownLogId
| SequencerStatus::UnknownSegmentIndex
Expand All @@ -375,30 +419,11 @@ impl RemoteSequencerConnection {
// While the UnknownLoglet status is non-terminal for the connection
// (since only one request is bad),
// the AppendError for the caller is terminal
debug!(error=%err, "Resolve commit with error");
commit_resolver.error(AppendError::other(err));
}
}
};

// close channel to stop any further appends calls on the same connection
rx.close();

// Drain and resolve ALL pending appends on this connection.
//
// todo(azmy): The order of the RemoteInflightAppend's on the channel
// does not necessary matches the actual append calls. This is
// since sending on the connection and pushing on the rx channel is not an atomic
// operation. Which means that, it's possible when we are draining
// the pending requests here that we also end up cancelling some inflight appends
// that has already received a positive response from the sequencer.
//
// For now this should not be a problem since they can (possibly) retry
// to do the write again later.
while let Some(inflight) = rx.recv().await {
inflight.commit_resolver.error(err.clone());
}

Ok(())
}
}

Expand Down

0 comments on commit 1993419

Please sign in to comment.