Skip to content

Commit

Permalink
[promises] Deal with promise endpoint deletion whilst read/write is i…
Browse files Browse the repository at this point in the history
…n progress (grpc#35409)

Whilst here, eliminate unnecessary mutexes and streamline some complexity in the read variants.

Closes grpc#35409

COPYBARA_INTEGRATE_REVIEW=grpc#35409 from ctiller:pbe 4f95881
PiperOrigin-RevId: 595006455
  • Loading branch information
ctiller authored and copybara-github committed Jan 2, 2024
1 parent a2c7a70 commit 2c18d16
Show file tree
Hide file tree
Showing 5 changed files with 248 additions and 276 deletions.
2 changes: 2 additions & 0 deletions src/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -6371,6 +6371,8 @@ grpc_cc_library(
deps = [
"activity",
"event_engine_common",
"if",
"map",
"poll",
"slice",
"slice_buffer",
Expand Down
101 changes: 44 additions & 57 deletions src/core/lib/transport/promise_endpoint.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "src/core/lib/transport/promise_endpoint.h"

#include <atomic>
#include <functional>
#include <memory>
#include <utility>
Expand All @@ -39,19 +40,13 @@ PromiseEndpoint::PromiseEndpoint(
SliceBuffer already_received)
: endpoint_(std::move(endpoint)) {
GPR_ASSERT(endpoint_ != nullptr);
read_state_->endpoint = endpoint_;
// TODO(ladynana): Replace this with `SliceBufferCast<>` when it is
// available.
grpc_slice_buffer_swap(read_buffer_.c_slice_buffer(),
grpc_slice_buffer_swap(read_state_->buffer.c_slice_buffer(),
already_received.c_slice_buffer());
}

PromiseEndpoint::~PromiseEndpoint() {
// Promise endpoint close when last write result has not been polled.
write_result_.reset();
// Promise endpoint close when last read result has not been polled.
read_result_.reset();
}

const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
PromiseEndpoint::GetPeerAddress() const {
return endpoint_->GetPeerAddress();
Expand All @@ -62,60 +57,52 @@ PromiseEndpoint::GetLocalAddress() const {
return endpoint_->GetLocalAddress();
}

void PromiseEndpoint::WriteCallback(absl::Status status) {
MutexLock lock(&write_mutex_);
write_result_ = status;
write_waker_.Wakeup();
}
void PromiseEndpoint::ReadState::Complete(absl::Status status,
size_t num_bytes_requested) {
gpr_log(GPR_ERROR, "PromiseEndpoint::ReadState::Complete: status:%s",
status.ToString().c_str());

void PromiseEndpoint::ReadCallback(absl::Status status,
size_t num_bytes_requested) {
if (!status.ok()) {
// Invalidates all previous reads.
pending_read_buffer_.Clear();
read_buffer_.Clear();
MutexLock lock(&read_mutex_);
read_result_ = status;
read_waker_.Wakeup();
} else {
// Appends `pending_read_buffer_` to `read_buffer_`.
pending_read_buffer_.MoveFirstNBytesIntoSliceBuffer(
pending_read_buffer_.Length(), read_buffer_);
GPR_DEBUG_ASSERT(pending_read_buffer_.Count() == 0u);
if (read_buffer_.Length() < num_bytes_requested) {
// A further read is needed.
// Set read args with number of bytes needed as hint.
grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs
read_args = {static_cast<int64_t>(num_bytes_requested -
read_buffer_.Length())};
// If `Read()` returns true immediately, the callback will not be
// called. We still need to call our callback to pick up the result and
// maybe do further reads.
if (endpoint_->Read(std::bind(&PromiseEndpoint::ReadCallback, this,
std::placeholders::_1, num_bytes_requested),
&pending_read_buffer_, &read_args)) {
ReadCallback(absl::OkStatus(), num_bytes_requested);
}
} else {
MutexLock lock(&read_mutex_);
read_result_ = status;
read_waker_.Wakeup();
}
pending_buffer.Clear();
buffer.Clear();
result = status;
auto w = std::move(waker);
complete.store(true, std::memory_order_release);
w.Wakeup();
return;
}
}

void PromiseEndpoint::ReadByteCallback(absl::Status status) {
if (!status.ok()) {
// invalidates all previous reads
pending_read_buffer_.Clear();
read_buffer_.Clear();
} else {
pending_read_buffer_.MoveFirstNBytesIntoSliceBuffer(
pending_read_buffer_.Length(), read_buffer_);
// Appends `pending_buffer` to `buffer`.
pending_buffer.MoveFirstNBytesIntoSliceBuffer(pending_buffer.Length(),
buffer);
GPR_DEBUG_ASSERT(pending_buffer.Count() == 0u);
if (buffer.Length() < num_bytes_requested) {
// A further read is needed.
// Set read args with number of bytes needed as hint.
grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs read_args =
{static_cast<int64_t>(num_bytes_requested - buffer.Length())};
// If `Read()` returns true immediately, the callback will not be
// called. We still need to call our callback to pick up the result and
// maybe do further reads.
auto ep = endpoint.lock();
if (ep == nullptr) {
Complete(absl::UnavailableError("Endpoint closed during read."),
num_bytes_requested);
return;
}
if (ep->Read(
[self = Ref(), num_bytes_requested](absl::Status status) {
self->Complete(std::move(status), num_bytes_requested);
},
&pending_buffer, &read_args)) {
Complete(std::move(status), num_bytes_requested);
}
return;
}
MutexLock lock(&read_mutex_);
read_result_ = status;
read_waker_.Wakeup();
result = status;
auto w = std::move(waker);
complete.store(true, std::memory_order_release);
w.Wakeup();
}

} // namespace grpc_core
Loading

0 comments on commit 2c18d16

Please sign in to comment.