Skip to content

Commit

Permalink
[chaotic-good] Pushback on message writes until they reach serializat…
Browse files Browse the repository at this point in the history
…ion (grpc#37894)

Don't complete writes of messages until they make it to the transports outbound loop. Since payloads could be large this introduces just enough pushback that, once grpc#37868 goes in also we should be able to sense when a transport is busy writing and stop sending at higher layers.

Closes grpc#37894

COPYBARA_INTEGRATE_REVIEW=grpc#37894 from ctiller:send-acked 0cb3d7f
PiperOrigin-RevId: 686689473
  • Loading branch information
ctiller authored and copybara-github committed Oct 17, 2024
1 parent 9945066 commit 7e06934
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 29 deletions.
26 changes: 17 additions & 9 deletions src/core/ext/transport/chaotic_good/client_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -266,20 +266,28 @@ uint32_t ChaoticGoodClientTransport::MakeStream(CallHandler call_handler) {
return stream_id;
}

namespace {
absl::Status BooleanSuccessToTransportError(bool success) {
return success ? absl::OkStatus()
: absl::UnavailableError("Transport closed.");
}
} // namespace

auto ChaoticGoodClientTransport::CallOutboundLoop(uint32_t stream_id,
CallHandler call_handler) {
auto send_fragment = [stream_id,
outgoing_frames = outgoing_frames_.MakeSender()](
ClientFragmentFrame frame) mutable {
frame.stream_id = stream_id;
return Map(outgoing_frames.Send(std::move(frame)),
[](bool success) -> absl::Status {
if (!success) {
// Failed to send outgoing frame.
return absl::UnavailableError("Transport closed.");
}
return absl::OkStatus();
});
BooleanSuccessToTransportError);
};
auto send_fragment_acked = [stream_id,
outgoing_frames = outgoing_frames_.MakeSender()](
ClientFragmentFrame frame) mutable {
frame.stream_id = stream_id;
return Map(outgoing_frames.SendAcked(std::move(frame)),
BooleanSuccessToTransportError);
};
return GRPC_LATENT_SEE_PROMISE(
"CallOutboundLoop",
Expand All @@ -296,7 +304,7 @@ auto ChaoticGoodClientTransport::CallOutboundLoop(uint32_t stream_id,
},
// Continuously send client frame with client to server messages.
ForEach(OutgoingMessages(call_handler),
[send_fragment, aligned_bytes = aligned_bytes_](
[send_fragment_acked, aligned_bytes = aligned_bytes_](
MessageHandle message) mutable {
ClientFragmentFrame frame;
// Construct frame header (flags, header_length and
Expand All @@ -310,7 +318,7 @@ auto ChaoticGoodClientTransport::CallOutboundLoop(uint32_t stream_id,
CHECK_EQ((message_length + padding) % aligned_bytes, 0u);
frame.message = FragmentMessage(std::move(message), padding,
message_length);
return send_fragment(std::move(frame));
return send_fragment_acked(std::move(frame));
}),
[send_fragment]() mutable {
ClientFragmentFrame frame;
Expand Down
33 changes: 25 additions & 8 deletions src/core/ext/transport/chaotic_good/server_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,15 @@ auto ChaoticGoodServerTransport::MaybePushFragmentIntoCall(
});
}

namespace {
auto BooleanSuccessToTransportErrorCapturingInitiator(CallInitiator initiator) {
return [initiator = std::move(initiator)](bool success) {
return success ? absl::OkStatus()
: absl::UnavailableError("Transport closed.");
};
}
} // namespace

auto ChaoticGoodServerTransport::SendFragment(
ServerFragmentFrame frame, MpscSender<ServerFrame> outgoing_frames,
CallInitiator call_initiator) {
Expand All @@ -132,13 +141,20 @@ auto ChaoticGoodServerTransport::SendFragment(
// Capture the call_initiator to ensure the underlying call spine is alive
// until the outgoing_frames.Send promise completes.
return Map(outgoing_frames.Send(std::move(frame)),
[call_initiator](bool success) -> absl::Status {
if (!success) {
// Failed to send outgoing frame.
return absl::UnavailableError("Transport closed.");
}
return absl::OkStatus();
});
BooleanSuccessToTransportErrorCapturingInitiator(
std::move(call_initiator)));
}

auto ChaoticGoodServerTransport::SendFragmentAcked(
ServerFragmentFrame frame, MpscSender<ServerFrame> outgoing_frames,
CallInitiator call_initiator) {
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD: SendFragmentAcked: frame=" << frame.ToString();
// Capture the call_initiator to ensure the underlying call spine is alive
// until the outgoing_frames.Send promise completes.
return Map(outgoing_frames.SendAcked(std::move(frame)),
BooleanSuccessToTransportErrorCapturingInitiator(
std::move(call_initiator)));
}

auto ChaoticGoodServerTransport::SendCallBody(
Expand All @@ -165,7 +181,8 @@ auto ChaoticGoodServerTransport::SendCallBody(
frame.message =
FragmentMessage(std::move(message), padding, message_length);
frame.stream_id = stream_id;
return SendFragment(std::move(frame), outgoing_frames, call_initiator);
return SendFragmentAcked(std::move(frame), outgoing_frames,
call_initiator);
});
}

Expand Down
3 changes: 3 additions & 0 deletions src/core/ext/transport/chaotic_good/server_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ class ChaoticGoodServerTransport final : public ServerTransport {
static auto SendFragment(ServerFragmentFrame frame,
MpscSender<ServerFrame> outgoing_frames,
CallInitiator call_initiator);
static auto SendFragmentAcked(ServerFragmentFrame frame,
MpscSender<ServerFrame> outgoing_frames,
CallInitiator call_initiator);
auto CallOutboundLoop(uint32_t stream_id, CallInitiator call_initiator);
auto OnTransportActivityDone(absl::string_view activity);
auto TransportReadLoop(RefCountedPtr<ChaoticGoodTransport> transport);
Expand Down
36 changes: 24 additions & 12 deletions src/core/lib/promise/mpsc.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,19 @@ class Center : public RefCounted<Center<T>> {
return true;
}

// Returns the batch number that the item was sent in, or kClosedBatch if the
// pipe is closed.
uint64_t Send(T t) {
// Return value:
// - if the pipe is closed, returns kClosedBatch
// - if await_receipt is false, returns the batch number the item was sent
// in.
// - if await_receipt is true, returns the first sending batch number that
// guarantees the item has been received.
uint64_t Send(T t, bool await_receipt) {
ReleasableMutexLock lock(&mu_);
if (batch_ == kClosedBatch) return kClosedBatch;
queue_.push_back(std::move(t));
auto receive_waker = std::move(receive_waker_);
const uint64_t batch = queue_.size() <= max_queued_ ? batch_ : batch_ + 1;
const uint64_t batch =
(!await_receipt && queue_.size() <= max_queued_) ? batch_ : batch_ + 1;
lock.Release();
receive_waker.Wakeup();
return batch;
Expand Down Expand Up @@ -135,12 +140,25 @@ class MpscSender {
// Return a promise that will send one item.
// Resolves to true if sent, false if the receiver was closed (and the value
// will never be successfully sent).
auto Send(T t) {
auto Send(T t) { return SendGeneric<false>(std::move(t)); }

// Per send, but do not resolve until the item has been received by the
// receiver.
auto SendAcked(T t) { return SendGeneric<true>(std::move(t)); }

bool UnbufferedImmediateSend(T t) {
return center_->Send(std::move(t), false) !=
mpscpipe_detail::Center<T>::kClosedBatch;
}

private:
template <bool kAwaitReceipt>
auto SendGeneric(T t) {
return [center = center_, t = std::move(t),
batch = uint64_t(0)]() mutable -> Poll<bool> {
if (center == nullptr) return false;
if (batch == 0) {
batch = center->Send(std::move(t));
batch = center->Send(std::move(t), kAwaitReceipt);
CHECK_NE(batch, 0u);
if (batch == mpscpipe_detail::Center<T>::kClosedBatch) return false;
}
Expand All @@ -150,12 +168,6 @@ class MpscSender {
};
}

bool UnbufferedImmediateSend(T t) {
return center_->Send(std::move(t)) !=
mpscpipe_detail::Center<T>::kClosedBatch;
}

private:
friend class MpscReceiver<T>;
explicit MpscSender(RefCountedPtr<mpscpipe_detail::Center<T>> center)
: center_(std::move(center)) {}
Expand Down
13 changes: 13 additions & 0 deletions test/core/promise/mpsc_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,19 @@ TEST(MpscTest, SendOneThingInstantly) {
EXPECT_THAT(sender.Send(MakePayload(1))(), IsReady(true));
}

TEST(MpscTest, SendAckedOneThingWaitsForRead) {
StrictMock<MockActivity> activity;
activity.Activate();
MpscReceiver<Payload> receiver(1);
MpscSender<Payload> sender = receiver.MakeSender();
auto send = sender.SendAcked(MakePayload(1));
EXPECT_THAT(send(), IsPending());
EXPECT_CALL(activity, WakeupRequested());
EXPECT_THAT(receiver.Next()(), IsReady());
EXPECT_THAT(send(), IsReady(true));
activity.Deactivate();
}

TEST(MpscTest, SendOneThingInstantlyAndReceiveInstantly) {
MpscReceiver<Payload> receiver(1);
MpscSender<Payload> sender = receiver.MakeSender();
Expand Down

0 comments on commit 7e06934

Please sign in to comment.