diff --git a/crates/bifrost/src/providers/replicated_loglet/remote_sequencer.rs b/crates/bifrost/src/providers/replicated_loglet/remote_sequencer.rs index 2c486da84..9af3f35df 100644 --- a/crates/bifrost/src/providers/replicated_loglet/remote_sequencer.rs +++ b/crates/bifrost/src/providers/replicated_loglet/remote_sequencer.rs @@ -17,6 +17,7 @@ use std::{ }; use tokio::sync::{mpsc, Mutex, OwnedSemaphorePermit, Semaphore}; +use tracing::{debug, instrument, Span}; use restate_core::{ network::{ @@ -33,7 +34,6 @@ use restate_types::{ replicated_loglet::ReplicatedLogletParams, GenerationalNodeId, }; -use tracing::instrument; use super::rpc_routers::SequencersRpc; use crate::loglet::{ @@ -107,7 +107,7 @@ where } #[instrument( - level="trace", + level="debug", skip_all, fields( otel.name = "replicated_loglet::remote_sequencer: append", @@ -148,16 +148,14 @@ where Ok(token) => break token, Err(err) => { match err.source { - NetworkError::ConnectError(_) - | NetworkError::ConnectionClosed(_) - | NetworkError::Timeout(_) => { - // we retry to re-connect one time + err @ NetworkError::Full => return Err(err.into()), + _ => { + // we retry on any other network error connection = self.renew_connection(connection).await?; msg = err.original; continue; } - err => return Err(err.into()), } } }; @@ -170,6 +168,7 @@ where } /// Gets or starts a new remote sequencer connection + #[instrument(level = "debug", skip_all)] async fn get_connection(&self) -> Result { let mut guard = self.connection.lock().await; if let Some(connection) = guard.deref() { @@ -190,6 +189,7 @@ where /// Renew a connection to a remote sequencer. This guarantees that only a single connection /// to the sequencer is available. + #[instrument(level = "debug", skip_all, fields(renewed = false))] async fn renew_connection( &self, old: RemoteSequencerConnection, @@ -207,6 +207,8 @@ where .node_connection(self.params.sequencer) .await?; + Span::current().record("renewed", true); + let connection = RemoteSequencerConnection::start(self.known_global_tail.clone(), connection)?; @@ -232,6 +234,7 @@ struct RemoteSequencerConnection { } impl RemoteSequencerConnection { + #[instrument(level = "debug", name = "connection_start", skip_all)] fn start( known_global_tail: TailOffsetWatch, connection: WeakConnection, @@ -260,6 +263,16 @@ impl RemoteSequencerConnection { sequencer: GenerationalNodeId, msg: Append, ) -> Result, NetworkSendError> { + // there are other reasons that can render this connection unusable + // even if the underlying connection is still valid. + // if the channel is closed, this connection cannot be used anymore + if self.tx.is_closed() { + return Err(NetworkSendError::new( + msg, + NetworkError::Unavailable("Inflight commits channel is closed".into()), + )); + } + let outgoing = Outgoing::new(sequencer, msg).assign_connection(self.inner.clone()); rpc_router @@ -283,6 +296,7 @@ impl RemoteSequencerConnection { if let Err(err) = self.tx.send(inflight_append) { // if we failed to push this to be processed by the connection reactor task // then we need to notify the caller + debug!("Inflight channel closed. Resolve commit as connection closed"); err.0 .commit_resolver .error(AppendError::retryable(NetworkError::ConnectionClosed( @@ -296,6 +310,7 @@ impl RemoteSequencerConnection { /// This task will run until the [`AppendStream`] is dropped. Once dropped /// all pending commits will be resolved with an error. it's up to the enqueuer /// to retry if needed. + #[instrument(level = "debug", skip_all)] async fn handle_appended_responses( known_global_tail: TailOffsetWatch, connection: WeakConnection, @@ -315,6 +330,7 @@ impl RemoteSequencerConnection { inflight } _ = &mut closed => { + debug!("Sequencer Connection closed while waiting for next inflight append"); break AppendError::retryable(NetworkError::ConnectionClosed(connection.peer())); } }; @@ -335,6 +351,7 @@ impl RemoteSequencerConnection { incoming.map_err(AppendError::Shutdown) }, _ = &mut closed => { + debug!("Sequencer Connection closed while waiting for response"); Err(AppendError::retryable(NetworkError::ConnectionClosed(connection.peer()))) } }; @@ -375,6 +392,7 @@ impl RemoteSequencerConnection { // While the UnknownLoglet status is non-terminal for the connection // (since only one request is bad), // the AppendError for the caller is terminal + debug!(error=%err, "Resolve commit with error"); commit_resolver.error(AppendError::other(err)); } } @@ -394,10 +412,15 @@ impl RemoteSequencerConnection { // // For now this should not be a problem since they can (possibly) retry // to do the write again later. + debug!(cause=%err, "Draining inflight channel"); + let mut count = 0; while let Some(inflight) = rx.recv().await { inflight.commit_resolver.error(err.clone()); + count += 1; } + debug!("Drained/Cancelled {count} inflight commits"); + Ok(()) } }