Skip to content

Commit

Permalink
Use a single timer per bunch of connections for draining
Browse files Browse the repository at this point in the history
  • Loading branch information
yashykt committed Oct 19, 2024
1 parent af5b045 commit 57e9f67
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 183 deletions.
2 changes: 1 addition & 1 deletion bazel/experiments.bzl

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

90 changes: 57 additions & 33 deletions src/core/ext/transport/chttp2/server/chttp2_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1047,20 +1047,23 @@ class NewChttp2ServerListener : public Server::ListenerInterface {
OrphanablePtr<grpc_endpoint> endpoint);
void Start(const ChannelArgs& args);

void SendGoAway() override;
void DisconnectImmediately() override;

void Orphan() override;

// Needed to be able to grab an external weak ref in
// NewChttp2ServerListener::OnAccept()
using InternallyRefCounted<LogicalConnection>::RefAsSubclass;

private:
bool SendGoAwayImplLocked() override;
void DisconnectImmediatelyImplLocked() override;
static void OnClose(void* arg, grpc_error_handle error);
void OnDrainGraceTimeExpiry() ABSL_LOCKS_EXCLUDED(&mu_);
void SendGoAwayImplLocked();
void DisconnectImmediatelyImplLocked();

RefCountedPtr<NewChttp2ServerListener> const listener_;
// FOllowing fields are protected by WorkSerializer.
WorkSerializer work_serializer_;
// Following fields are protected by WorkSerializer.
// Set by HandshakingState before the handshaking begins and reset when
// handshaking is done.
OrphanablePtr<HandshakingState> handshaking_state_;
Expand All @@ -1087,6 +1090,12 @@ class NewChttp2ServerListener : public Server::ListenerInterface {
static void DestroyListener(Server* /*server*/, void* arg,
grpc_closure* destroy_done);

grpc_event_engine::experimental::EventEngine* event_engine() const {
return listener_state_->server()
->channel_args()
.GetObject<grpc_event_engine::experimental::EventEngine>();
}

grpc_tcp_server* tcp_server_ = nullptr;
grpc_resolved_address resolved_address_;
Server::ListenerState* listener_state_ = nullptr;
Expand Down Expand Up @@ -1138,7 +1147,7 @@ NewChttp2ServerListener::ActiveConnection::HandshakingState::
}

void NewChttp2ServerListener::ActiveConnection::HandshakingState::Orphan() {
connection_->work_serializer()->Run(
connection_->work_serializer_.Run(
[this] {
ShutdownLocked(absl::UnavailableError("Listener stopped serving."));
Unref();
Expand All @@ -1156,10 +1165,9 @@ void NewChttp2ServerListener::ActiveConnection::HandshakingState::StartLocked(
std::move(endpoint_), channel_args, deadline_, acceptor_.get(),
[self = Ref()](absl::StatusOr<HandshakerArgs*> result) mutable {
auto* self_ptr = self.get();
self_ptr->connection_->work_serializer()->Run(
self_ptr->connection_->work_serializer_.Run(
[self = std::move(self), result = std::move(result)]() mutable {
self->OnHandshakeDoneLocked(std::move(result));
self->Unref();
},
DEBUG_LOCATION);
});
Expand Down Expand Up @@ -1187,10 +1195,11 @@ void NewChttp2ServerListener::ActiveConnection::HandshakingState::
void NewChttp2ServerListener::ActiveConnection::HandshakingState::
OnReceiveSettings(void* arg, grpc_error_handle /* error */) {
HandshakingState* self = static_cast<HandshakingState*>(arg);
self->connection_->work_serializer()->Run(
self->connection_->work_serializer_.Run(
[self] {
if (self->timer_handle_.has_value()) {
self->connection_->event_engine()->Cancel(*self->timer_handle_);
self->connection_->listener_->event_engine()->Cancel(
*self->timer_handle_);
self->timer_handle_.reset();
}
self->Unref();
Expand Down Expand Up @@ -1257,10 +1266,10 @@ void NewChttp2ServerListener::ActiveConnection::HandshakingState::
grpc_chttp2_transport_start_reading(
transport.get(), (*result)->read_buffer.c_slice_buffer(),
&on_receive_settings_, nullptr, on_close);
timer_handle_ = connection_->event_engine()->RunAfter(
timer_handle_ = connection_->listener_->event_engine()->RunAfter(
deadline_ - Timestamp::Now(), [self = Ref()]() mutable {
auto* self_ptr = self.get();
self_ptr->connection_->work_serializer()->Run(
self_ptr->connection_->work_serializer_.Run(
[self = std::move(self)]() mutable {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
Expand Down Expand Up @@ -1306,8 +1315,9 @@ NewChttp2ServerListener::ActiveConnection::ActiveConnection(
grpc_pollset* accepting_pollset, AcceptorPtr acceptor,
const ChannelArgs& args, MemoryOwner memory_owner,
OrphanablePtr<grpc_endpoint> endpoint)
: LogicalConnection(listener->listener_state_->server()),
listener_(std::move(listener)),
: listener_(std::move(listener)),
work_serializer_(
args.GetObjectRef<grpc_event_engine::experimental::EventEngine>()),
handshaking_state_(memory_owner.MakeOrphanable<HandshakingState>(
RefAsSubclass<ActiveConnection>(), accepting_pollset,
std::move(acceptor), args, std::move(endpoint))) {
Expand All @@ -1316,7 +1326,7 @@ NewChttp2ServerListener::ActiveConnection::ActiveConnection(
}

void NewChttp2ServerListener::ActiveConnection::Orphan() {
work_serializer()->Run(
work_serializer_.Run(
[this]() {
shutdown_ = true;
// Reset handshaking_state_ since we have been orphaned by the
Expand All @@ -1327,28 +1337,24 @@ void NewChttp2ServerListener::ActiveConnection::Orphan() {
DEBUG_LOCATION);
}

bool NewChttp2ServerListener::ActiveConnection::SendGoAwayImplLocked() {
if (!shutdown_) {
// Shutdown the handshaker if it's still in progress.
if (handshaking_state_ != nullptr) {
handshaking_state_->ShutdownLocked(
absl::UnavailableError("Connection going away"));
}
shutdown_ = true;
// Send a GOAWAY if the transport exists
if (transport_ != nullptr) {
grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->goaway_error =
GRPC_ERROR_CREATE("Server is stopping to serve requests.");
transport_->PerformOp(op);
return true;
}
}
return false;
void NewChttp2ServerListener::ActiveConnection::SendGoAway() {
work_serializer_.Run(
[self = RefAsSubclass<ActiveConnection>()]() mutable {
self->SendGoAwayImplLocked();
},
DEBUG_LOCATION);
}

void NewChttp2ServerListener::ActiveConnection::DisconnectImmediately() {
work_serializer_.Run(
[self = RefAsSubclass<ActiveConnection>()]() mutable {
self->DisconnectImmediatelyImplLocked();
},
DEBUG_LOCATION);
}

void NewChttp2ServerListener::ActiveConnection::Start(const ChannelArgs& args) {
work_serializer()->Run(
work_serializer_.Run(
[self = RefAsSubclass<ActiveConnection>(), args]() mutable {
// If the Connection is already shutdown at this point, it implies the
// owning NewChttp2ServerListener and all associated
Expand All @@ -1367,6 +1373,24 @@ void NewChttp2ServerListener::ActiveConnection::OnClose(
self->Unref();
}

void NewChttp2ServerListener::ActiveConnection::SendGoAwayImplLocked() {
if (!shutdown_) {
// Shutdown the handshaker if it's still in progress.
if (handshaking_state_ != nullptr) {
handshaking_state_->ShutdownLocked(
absl::UnavailableError("Connection going away"));
}
shutdown_ = true;
// Send a GOAWAY if the transport exists
if (transport_ != nullptr) {
grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->goaway_error =
GRPC_ERROR_CREATE("Server is stopping to serve requests.");
transport_->PerformOp(op);
}
}
}

void NewChttp2ServerListener::ActiveConnection::
DisconnectImmediatelyImplLocked() {
if (transport_ != nullptr) {
Expand Down
40 changes: 25 additions & 15 deletions src/core/lib/experiments/experiments.cc

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 57e9f67

Please sign in to comment.