diff --git a/bazel/experiments.bzl b/bazel/experiments.bzl index dfecd523a99d0..4619d56c1627c 100644 --- a/bazel/experiments.bzl +++ b/bazel/experiments.bzl @@ -33,7 +33,6 @@ EXPERIMENT_ENABLES = { "promise_based_inproc_transport": "promise_based_inproc_transport", "rq_fast_reject": "rq_fast_reject", "schedule_cancellation_over_write": "schedule_cancellation_over_write", - "server_listener": "server_listener", "server_privacy": "server_privacy", "tcp_frame_size_tuning": "tcp_frame_size_tuning", "tcp_rcv_lowat": "tcp_rcv_lowat", @@ -41,6 +40,7 @@ EXPERIMENT_ENABLES = { "trace_record_callops": "trace_record_callops", "unconstrained_max_quota_buffer_size": "unconstrained_max_quota_buffer_size", "work_serializer_dispatch": "work_serializer_dispatch", + "server_listener": "server_listener,work_serializer_dispatch", } EXPERIMENT_POLLERS = [ diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.cc b/src/core/ext/transport/chttp2/server/chttp2_server.cc index 945e30802ad4f..e9f38f8105ce4 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.cc +++ b/src/core/ext/transport/chttp2/server/chttp2_server.cc @@ -1047,6 +1047,9 @@ class NewChttp2ServerListener : public Server::ListenerInterface { OrphanablePtr endpoint); void Start(const ChannelArgs& args); + void SendGoAway() override; + void DisconnectImmediately() override; + void Orphan() override; // Needed to be able to grab an external weak ref in @@ -1054,13 +1057,13 @@ class NewChttp2ServerListener : public Server::ListenerInterface { using InternallyRefCounted::RefAsSubclass; private: - bool SendGoAwayImplLocked() override; - void DisconnectImmediatelyImplLocked() override; static void OnClose(void* arg, grpc_error_handle error); - void OnDrainGraceTimeExpiry() ABSL_LOCKS_EXCLUDED(&mu_); + void SendGoAwayImplLocked(); + void DisconnectImmediatelyImplLocked(); RefCountedPtr const listener_; - // FOllowing fields are protected by WorkSerializer. + WorkSerializer work_serializer_; + // Following fields are protected by WorkSerializer. // Set by HandshakingState before the handshaking begins and reset when // handshaking is done. OrphanablePtr handshaking_state_; @@ -1087,6 +1090,12 @@ class NewChttp2ServerListener : public Server::ListenerInterface { static void DestroyListener(Server* /*server*/, void* arg, grpc_closure* destroy_done); + grpc_event_engine::experimental::EventEngine* event_engine() const { + return listener_state_->server() + ->channel_args() + .GetObject(); + } + grpc_tcp_server* tcp_server_ = nullptr; grpc_resolved_address resolved_address_; Server::ListenerState* listener_state_ = nullptr; @@ -1138,7 +1147,7 @@ NewChttp2ServerListener::ActiveConnection::HandshakingState:: } void NewChttp2ServerListener::ActiveConnection::HandshakingState::Orphan() { - connection_->work_serializer()->Run( + connection_->work_serializer_.Run( [this] { ShutdownLocked(absl::UnavailableError("Listener stopped serving.")); Unref(); @@ -1156,10 +1165,9 @@ void NewChttp2ServerListener::ActiveConnection::HandshakingState::StartLocked( std::move(endpoint_), channel_args, deadline_, acceptor_.get(), [self = Ref()](absl::StatusOr result) mutable { auto* self_ptr = self.get(); - self_ptr->connection_->work_serializer()->Run( + self_ptr->connection_->work_serializer_.Run( [self = std::move(self), result = std::move(result)]() mutable { self->OnHandshakeDoneLocked(std::move(result)); - self->Unref(); }, DEBUG_LOCATION); }); @@ -1187,10 +1195,11 @@ void NewChttp2ServerListener::ActiveConnection::HandshakingState:: void NewChttp2ServerListener::ActiveConnection::HandshakingState:: OnReceiveSettings(void* arg, grpc_error_handle /* error */) { HandshakingState* self = static_cast(arg); - self->connection_->work_serializer()->Run( + self->connection_->work_serializer_.Run( [self] { if (self->timer_handle_.has_value()) { - self->connection_->event_engine()->Cancel(*self->timer_handle_); + self->connection_->listener_->event_engine()->Cancel( + *self->timer_handle_); self->timer_handle_.reset(); } self->Unref(); @@ -1257,10 +1266,10 @@ void NewChttp2ServerListener::ActiveConnection::HandshakingState:: grpc_chttp2_transport_start_reading( transport.get(), (*result)->read_buffer.c_slice_buffer(), &on_receive_settings_, nullptr, on_close); - timer_handle_ = connection_->event_engine()->RunAfter( + timer_handle_ = connection_->listener_->event_engine()->RunAfter( deadline_ - Timestamp::Now(), [self = Ref()]() mutable { auto* self_ptr = self.get(); - self_ptr->connection_->work_serializer()->Run( + self_ptr->connection_->work_serializer_.Run( [self = std::move(self)]() mutable { ApplicationCallbackExecCtx callback_exec_ctx; ExecCtx exec_ctx; @@ -1306,8 +1315,9 @@ NewChttp2ServerListener::ActiveConnection::ActiveConnection( grpc_pollset* accepting_pollset, AcceptorPtr acceptor, const ChannelArgs& args, MemoryOwner memory_owner, OrphanablePtr endpoint) - : LogicalConnection(listener->listener_state_->server()), - listener_(std::move(listener)), + : listener_(std::move(listener)), + work_serializer_( + args.GetObjectRef()), handshaking_state_(memory_owner.MakeOrphanable( RefAsSubclass(), accepting_pollset, std::move(acceptor), args, std::move(endpoint))) { @@ -1316,7 +1326,7 @@ NewChttp2ServerListener::ActiveConnection::ActiveConnection( } void NewChttp2ServerListener::ActiveConnection::Orphan() { - work_serializer()->Run( + work_serializer_.Run( [this]() { shutdown_ = true; // Reset handshaking_state_ since we have been orphaned by the @@ -1327,28 +1337,24 @@ void NewChttp2ServerListener::ActiveConnection::Orphan() { DEBUG_LOCATION); } -bool NewChttp2ServerListener::ActiveConnection::SendGoAwayImplLocked() { - if (!shutdown_) { - // Shutdown the handshaker if it's still in progress. - if (handshaking_state_ != nullptr) { - handshaking_state_->ShutdownLocked( - absl::UnavailableError("Connection going away")); - } - shutdown_ = true; - // Send a GOAWAY if the transport exists - if (transport_ != nullptr) { - grpc_transport_op* op = grpc_make_transport_op(nullptr); - op->goaway_error = - GRPC_ERROR_CREATE("Server is stopping to serve requests."); - transport_->PerformOp(op); - return true; - } - } - return false; +void NewChttp2ServerListener::ActiveConnection::SendGoAway() { + work_serializer_.Run( + [self = RefAsSubclass()]() mutable { + self->SendGoAwayImplLocked(); + }, + DEBUG_LOCATION); +} + +void NewChttp2ServerListener::ActiveConnection::DisconnectImmediately() { + work_serializer_.Run( + [self = RefAsSubclass()]() mutable { + self->DisconnectImmediatelyImplLocked(); + }, + DEBUG_LOCATION); } void NewChttp2ServerListener::ActiveConnection::Start(const ChannelArgs& args) { - work_serializer()->Run( + work_serializer_.Run( [self = RefAsSubclass(), args]() mutable { // If the Connection is already shutdown at this point, it implies the // owning NewChttp2ServerListener and all associated @@ -1367,6 +1373,24 @@ void NewChttp2ServerListener::ActiveConnection::OnClose( self->Unref(); } +void NewChttp2ServerListener::ActiveConnection::SendGoAwayImplLocked() { + if (!shutdown_) { + // Shutdown the handshaker if it's still in progress. + if (handshaking_state_ != nullptr) { + handshaking_state_->ShutdownLocked( + absl::UnavailableError("Connection going away")); + } + shutdown_ = true; + // Send a GOAWAY if the transport exists + if (transport_ != nullptr) { + grpc_transport_op* op = grpc_make_transport_op(nullptr); + op->goaway_error = + GRPC_ERROR_CREATE("Server is stopping to serve requests."); + transport_->PerformOp(op); + } + } +} + void NewChttp2ServerListener::ActiveConnection:: DisconnectImmediatelyImplLocked() { if (transport_ != nullptr) { diff --git a/src/core/lib/experiments/experiments.cc b/src/core/lib/experiments/experiments.cc index b4fbcb17fd07e..ec973d1dfc84a 100644 --- a/src/core/lib/experiments/experiments.cc +++ b/src/core/lib/experiments/experiments.cc @@ -17,6 +17,7 @@ #include "src/core/lib/experiments/experiments.h" #include +#include #ifndef GRPC_EXPERIMENTS_ARE_FINAL @@ -76,9 +77,6 @@ const char* const description_schedule_cancellation_over_write = "Allow cancellation op to be scheduled over a write"; const char* const additional_constraints_schedule_cancellation_over_write = "{}"; -const char* const description_server_listener = - "If set, the new server listener classes are used."; -const char* const additional_constraints_server_listener = "{}"; const char* const description_server_privacy = "If set, server privacy"; const char* const additional_constraints_server_privacy = "{}"; const char* const description_tcp_frame_size_tuning = @@ -106,6 +104,11 @@ const char* const description_work_serializer_dispatch = "callback, instead of running things inline in the first thread that " "successfully enqueues work."; const char* const additional_constraints_work_serializer_dispatch = "{}"; +const char* const description_server_listener = + "If set, the new server listener classes are used."; +const char* const additional_constraints_server_listener = "{}"; +const uint8_t required_experiments_server_listener[] = { + static_cast(grpc_core::kExperimentIdWorkSerializerDispatch)}; } // namespace namespace grpc_core { @@ -150,8 +153,6 @@ const ExperimentMetadata g_experiment_metadata[] = { description_schedule_cancellation_over_write, additional_constraints_schedule_cancellation_over_write, nullptr, 0, false, true}, - {"server_listener", description_server_listener, - additional_constraints_server_listener, nullptr, 0, false, true}, {"server_privacy", description_server_privacy, additional_constraints_server_privacy, nullptr, 0, false, false}, {"tcp_frame_size_tuning", description_tcp_frame_size_tuning, @@ -168,6 +169,9 @@ const ExperimentMetadata g_experiment_metadata[] = { false, true}, {"work_serializer_dispatch", description_work_serializer_dispatch, additional_constraints_work_serializer_dispatch, nullptr, 0, false, true}, + {"server_listener", description_server_listener, + additional_constraints_server_listener, + required_experiments_server_listener, 1, false, true}, }; } // namespace grpc_core @@ -228,9 +232,6 @@ const char* const description_schedule_cancellation_over_write = "Allow cancellation op to be scheduled over a write"; const char* const additional_constraints_schedule_cancellation_over_write = "{}"; -const char* const description_server_listener = - "If set, the new server listener classes are used."; -const char* const additional_constraints_server_listener = "{}"; const char* const description_server_privacy = "If set, server privacy"; const char* const additional_constraints_server_privacy = "{}"; const char* const description_tcp_frame_size_tuning = @@ -258,6 +259,11 @@ const char* const description_work_serializer_dispatch = "callback, instead of running things inline in the first thread that " "successfully enqueues work."; const char* const additional_constraints_work_serializer_dispatch = "{}"; +const char* const description_server_listener = + "If set, the new server listener classes are used."; +const char* const additional_constraints_server_listener = "{}"; +const uint8_t required_experiments_server_listener[] = { + static_cast(grpc_core::kExperimentIdWorkSerializerDispatch)}; } // namespace namespace grpc_core { @@ -302,8 +308,6 @@ const ExperimentMetadata g_experiment_metadata[] = { description_schedule_cancellation_over_write, additional_constraints_schedule_cancellation_over_write, nullptr, 0, false, true}, - {"server_listener", description_server_listener, - additional_constraints_server_listener, nullptr, 0, false, true}, {"server_privacy", description_server_privacy, additional_constraints_server_privacy, nullptr, 0, false, false}, {"tcp_frame_size_tuning", description_tcp_frame_size_tuning, @@ -320,6 +324,9 @@ const ExperimentMetadata g_experiment_metadata[] = { false, true}, {"work_serializer_dispatch", description_work_serializer_dispatch, additional_constraints_work_serializer_dispatch, nullptr, 0, false, true}, + {"server_listener", description_server_listener, + additional_constraints_server_listener, + required_experiments_server_listener, 1, false, true}, }; } // namespace grpc_core @@ -380,9 +387,6 @@ const char* const description_schedule_cancellation_over_write = "Allow cancellation op to be scheduled over a write"; const char* const additional_constraints_schedule_cancellation_over_write = "{}"; -const char* const description_server_listener = - "If set, the new server listener classes are used."; -const char* const additional_constraints_server_listener = "{}"; const char* const description_server_privacy = "If set, server privacy"; const char* const additional_constraints_server_privacy = "{}"; const char* const description_tcp_frame_size_tuning = @@ -410,6 +414,11 @@ const char* const description_work_serializer_dispatch = "callback, instead of running things inline in the first thread that " "successfully enqueues work."; const char* const additional_constraints_work_serializer_dispatch = "{}"; +const char* const description_server_listener = + "If set, the new server listener classes are used."; +const char* const additional_constraints_server_listener = "{}"; +const uint8_t required_experiments_server_listener[] = { + static_cast(grpc_core::kExperimentIdWorkSerializerDispatch)}; } // namespace namespace grpc_core { @@ -454,8 +463,6 @@ const ExperimentMetadata g_experiment_metadata[] = { description_schedule_cancellation_over_write, additional_constraints_schedule_cancellation_over_write, nullptr, 0, false, true}, - {"server_listener", description_server_listener, - additional_constraints_server_listener, nullptr, 0, false, true}, {"server_privacy", description_server_privacy, additional_constraints_server_privacy, nullptr, 0, false, false}, {"tcp_frame_size_tuning", description_tcp_frame_size_tuning, @@ -472,6 +479,9 @@ const ExperimentMetadata g_experiment_metadata[] = { false, true}, {"work_serializer_dispatch", description_work_serializer_dispatch, additional_constraints_work_serializer_dispatch, nullptr, 0, true, true}, + {"server_listener", description_server_listener, + additional_constraints_server_listener, + required_experiments_server_listener, 1, false, true}, }; } // namespace grpc_core diff --git a/src/core/lib/experiments/experiments.h b/src/core/lib/experiments/experiments.h index 3524688438b28..e222fd0b8dba6 100644 --- a/src/core/lib/experiments/experiments.h +++ b/src/core/lib/experiments/experiments.h @@ -77,7 +77,6 @@ inline bool IsPickFirstNewEnabled() { return true; } inline bool IsPromiseBasedInprocTransportEnabled() { return false; } inline bool IsRqFastRejectEnabled() { return false; } inline bool IsScheduleCancellationOverWriteEnabled() { return false; } -inline bool IsServerListenerEnabled() { return false; } inline bool IsServerPrivacyEnabled() { return false; } inline bool IsTcpFrameSizeTuningEnabled() { return false; } inline bool IsTcpRcvLowatEnabled() { return false; } @@ -87,6 +86,7 @@ inline bool IsTimeCachingInPartyEnabled() { return true; } inline bool IsTraceRecordCallopsEnabled() { return true; } inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; } inline bool IsWorkSerializerDispatchEnabled() { return false; } +inline bool IsServerListenerEnabled() { return false; } #elif defined(GPR_WINDOWS) #define GRPC_EXPERIMENT_IS_INCLUDED_CALL_TRACER_IN_TRANSPORT @@ -112,7 +112,6 @@ inline bool IsPickFirstNewEnabled() { return true; } inline bool IsPromiseBasedInprocTransportEnabled() { return false; } inline bool IsRqFastRejectEnabled() { return false; } inline bool IsScheduleCancellationOverWriteEnabled() { return false; } -inline bool IsServerListenerEnabled() { return false; } inline bool IsServerPrivacyEnabled() { return false; } inline bool IsTcpFrameSizeTuningEnabled() { return false; } inline bool IsTcpRcvLowatEnabled() { return false; } @@ -122,6 +121,7 @@ inline bool IsTimeCachingInPartyEnabled() { return true; } inline bool IsTraceRecordCallopsEnabled() { return true; } inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; } inline bool IsWorkSerializerDispatchEnabled() { return false; } +inline bool IsServerListenerEnabled() { return false; } #else #define GRPC_EXPERIMENT_IS_INCLUDED_CALL_TRACER_IN_TRANSPORT @@ -147,7 +147,6 @@ inline bool IsPickFirstNewEnabled() { return true; } inline bool IsPromiseBasedInprocTransportEnabled() { return false; } inline bool IsRqFastRejectEnabled() { return false; } inline bool IsScheduleCancellationOverWriteEnabled() { return false; } -inline bool IsServerListenerEnabled() { return false; } inline bool IsServerPrivacyEnabled() { return false; } inline bool IsTcpFrameSizeTuningEnabled() { return false; } inline bool IsTcpRcvLowatEnabled() { return false; } @@ -158,6 +157,7 @@ inline bool IsTraceRecordCallopsEnabled() { return true; } inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_DISPATCH inline bool IsWorkSerializerDispatchEnabled() { return true; } +inline bool IsServerListenerEnabled() { return false; } #endif #else @@ -178,7 +178,6 @@ enum ExperimentIds { kExperimentIdPromiseBasedInprocTransport, kExperimentIdRqFastReject, kExperimentIdScheduleCancellationOverWrite, - kExperimentIdServerListener, kExperimentIdServerPrivacy, kExperimentIdTcpFrameSizeTuning, kExperimentIdTcpRcvLowat, @@ -186,6 +185,7 @@ enum ExperimentIds { kExperimentIdTraceRecordCallops, kExperimentIdUnconstrainedMaxQuotaBufferSize, kExperimentIdWorkSerializerDispatch, + kExperimentIdServerListener, kNumExperiments }; #define GRPC_EXPERIMENT_IS_INCLUDED_CALL_TRACER_IN_TRANSPORT @@ -252,10 +252,6 @@ inline bool IsRqFastRejectEnabled() { inline bool IsScheduleCancellationOverWriteEnabled() { return IsExperimentEnabled(); } -#define GRPC_EXPERIMENT_IS_INCLUDED_SERVER_LISTENER -inline bool IsServerListenerEnabled() { - return IsExperimentEnabled(); -} #define GRPC_EXPERIMENT_IS_INCLUDED_SERVER_PRIVACY inline bool IsServerPrivacyEnabled() { return IsExperimentEnabled(); @@ -284,6 +280,10 @@ inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { inline bool IsWorkSerializerDispatchEnabled() { return IsExperimentEnabled(); } +#define GRPC_EXPERIMENT_IS_INCLUDED_SERVER_LISTENER +inline bool IsServerListenerEnabled() { + return IsExperimentEnabled(); +} extern const ExperimentMetadata g_experiment_metadata[kNumExperiments]; diff --git a/src/core/lib/experiments/experiments.yaml b/src/core/lib/experiments/experiments.yaml index adbf8cb12396e..aeeb18f026e89 100644 --- a/src/core/lib/experiments/experiments.yaml +++ b/src/core/lib/experiments/experiments.yaml @@ -139,6 +139,7 @@ - name: server_listener description: If set, the new server listener classes are used. + requires: ["work_serializer_dispatch"] expiry: 2025/03/31 owner: yashkt@google.com test_tags: ["xds_end2end_test", "core_end2end_test"] diff --git a/src/core/server/server.cc b/src/core/server/server.cc index 0a7dabdd05fa4..e262f15fbd53d 100644 --- a/src/core/server/server.cc +++ b/src/core/server/server.cc @@ -86,71 +86,6 @@ namespace grpc_core { -using grpc_event_engine::experimental::EventEngine; - -// -// Server::ListenerInterface::LogicalConnection -// - -void Server::ListenerInterface::LogicalConnection::SendGoAway() { - work_serializer_.Run( - [self = Ref(DEBUG_LOCATION, "SendGoAway")]() { - if (!self->SendGoAwayImplLocked()) { - return; - } - if (self->drain_grace_timer_handle_cancelled_) { - return; - } - CHECK(self->drain_grace_timer_handle_ == - EventEngine::TaskHandle::kInvalid); - self->drain_grace_timer_handle_ = self->event_engine()->RunAfter( - std::max(Duration::Zero(), - self->server_->channel_args() - .GetDurationFromIntMillis( - GRPC_ARG_SERVER_CONFIG_CHANGE_DRAIN_GRACE_TIME_MS) - .value_or(Duration::Minutes(10))), - [self = self->Ref()]() mutable { - ApplicationCallbackExecCtx callback_exec_ctx; - ExecCtx exec_ctx; - self->OnDrainGraceTimer(); - self.reset(); - }); - }, - DEBUG_LOCATION); -} - -void Server::ListenerInterface::LogicalConnection::CancelDrainGraceTimer() { - work_serializer_.Run( - [self = Ref(DEBUG_LOCATION, "CancelDrainGraceTimer")]() { - self->drain_grace_timer_handle_cancelled_ = true; - if (self->drain_grace_timer_handle_ != - EventEngine::TaskHandle::kInvalid) { - self->event_engine()->Cancel(self->drain_grace_timer_handle_); - self->drain_grace_timer_handle_ = EventEngine::TaskHandle::kInvalid; - } - }, - DEBUG_LOCATION); -} - -void Server::ListenerInterface::LogicalConnection::OnDrainGraceTimer() { - work_serializer_.Run( - [self = Ref(DEBUG_LOCATION, "OnDrainGraceTimer")]() mutable { - // If the drain_grace_timer_ was not cancelled, disconnect - // immediately. - bool disconnect_immediately = false; - { - self->drain_grace_timer_handle_ = EventEngine::TaskHandle::kInvalid; - if (!self->drain_grace_timer_handle_cancelled_) { - disconnect_immediately = true; - } - } - if (disconnect_immediately) { - self->DisconnectImmediatelyImplLocked(); - } - }, - DEBUG_LOCATION); -} - // // Server::ListenerState::ConfigFetcherWatcher // @@ -160,20 +95,11 @@ void Server::ListenerState::ConfigFetcherWatcher::UpdateConnectionManager( connection_manager) { RefCountedPtr connection_manager_to_destroy; - absl::flat_hash_set> - connections_to_shutdown; - auto cleanup = absl::MakeCleanup([&connections_to_shutdown]() { - // Send GOAWAYs on the transports so that they get disconnected when - // existing RPCs finish, and so that no new RPC is started on them. - for (auto& connection : connections_to_shutdown) { - connection->SendGoAway(); - } - }); { MutexLock lock(&listener_state_->mu_); connection_manager_to_destroy = listener_state_->connection_manager_; listener_state_->connection_manager_ = std::move(connection_manager); - connections_to_shutdown = std::move(listener_state_->connections_); + listener_state_->DrainConnectionsLocked(); if (listener_state_->server_->ShutdownCalled()) { return; } @@ -190,12 +116,7 @@ void Server::ListenerState::ConfigFetcherWatcher::StopServing() { { MutexLock lock(&listener_state_->mu_); listener_state_->is_serving_ = false; - connections = std::move(listener_state_->connections_); - } - // Send GOAWAYs on the transports so that they disconnect when existing - // RPCs finish. - for (auto& connection : connections) { - connection->SendGoAway(); + listener_state_->DrainConnectionsLocked(); } } @@ -302,9 +223,64 @@ void Server::ListenerState::RemoveLogicalConnection( auto connection_handle = connections_.extract(connection); if (!connection_handle.empty()) { connection_to_remove = std::move(connection_handle.value()); + return; + } + for (auto it = connections_to_be_drained_list_.begin(); + it != connections_to_be_drained_list_.end(); ++it) { + auto connection_handle = it->connections.extract(connection); + if (!connection_handle.empty()) { + connection_to_remove = std::move(connection_handle.value()); + if (it->connections.empty()) { + // Cancel the timer if the set of connections is now empty. + if (event_engine()->Cancel(it->drain_grace_timer_handle)) { + // Only remove the entry from the list if the cancellation was + // actually successful. OnDrainGraceTimer() will remove if + // cancellation is not successful. + connections_to_be_drained_list_.erase(it); + } + } + return; + } + } + } +} + +void Server::ListenerState::DrainConnectionsLocked() { + // Send GOAWAYs on the transports so that they disconnect when existing + // RPCs finish. + for (auto& connection : connections_) { + connection->SendGoAway(); + } + auto& connections_to_be_drained = connections_to_be_drained_list_.back(); + connections_to_be_drained.connections = std::move(connections_); + connections_to_be_drained.drain_grace_timer_handle = event_engine()->RunAfter( + std::max(Duration::Zero(), + server_->channel_args() + .GetDurationFromIntMillis( + GRPC_ARG_SERVER_CONFIG_CHANGE_DRAIN_GRACE_TIME_MS) + .value_or(Duration::Minutes(10))), + [this]() mutable { + ApplicationCallbackExecCtx callback_exec_ctx; + ExecCtx exec_ctx; + OnDrainGraceTimer(); + }); +} + +void Server::ListenerState::OnDrainGraceTimer() { + absl::flat_hash_set> + connections_to_be_drained; + { + MutexLock lock(&mu_); + if (connections_to_be_drained_list_.empty()) { + return; } + connections_to_be_drained = + std::move(connections_to_be_drained_list_.front().connections); + connections_to_be_drained_list_.pop_front(); + } + for (auto& connection : connections_to_be_drained) { + connection->DisconnectImmediately(); } - connection->CancelDrainGraceTimer(); } // diff --git a/src/core/server/server.h b/src/core/server/server.h index 8ce652dc8ac94..bf1b4bde073a0 100644 --- a/src/core/server/server.h +++ b/src/core/server/server.h @@ -150,54 +150,12 @@ class Server : public ServerInterface, // State for a connection that is being managed by this listener. class LogicalConnection : public InternallyRefCounted { public: - explicit LogicalConnection(Server* server) - : server_(server), - work_serializer_( - server_->channel_args() - .GetObjectRef< - grpc_event_engine::experimental::EventEngine>()) {} - LogicalConnection(const LogicalConnection&) = delete; - LogicalConnection& operator=(const LogicalConnection&) = delete; - LogicalConnection(LogicalConnection&&) = delete; - LogicalConnection& operator=(LogicalConnection&&) = delete; ~LogicalConnection() override = default; - void SendGoAway(); - - // Cancel drain grace timer. It won't be started in the future either. - void CancelDrainGraceTimer(); - - WorkSerializer* work_serializer() { return &work_serializer_; } - - protected: - grpc_event_engine::experimental::EventEngine* event_engine() const { - return server_->channel_args() - .GetObject(); - } - - private: // The following two methods are called in the context of a server config // event. - - // Returns true if the operation was successful, false, if it was - // not performed for example if the connection was already shutdown. - virtual bool SendGoAwayImplLocked() = 0; - virtual void DisconnectImmediatelyImplLocked() = 0; - - void OnDrainGraceTimer(); - - Server* server_; - // A mutex can be used here but using a WorkSerializer to make code easier - // to understand. - WorkSerializer work_serializer_; - // TODO(yashykt): Optimize this by having a single timer for all the - // connections that need to be drained. - grpc_event_engine::experimental::EventEngine::TaskHandle - drain_grace_timer_handle_ = grpc_event_engine::experimental:: - EventEngine::TaskHandle::kInvalid; - // The drain grace timer should only be started if it wasn't previously - // cancelled. - bool drain_grace_timer_handle_cancelled_ = false; + virtual void SendGoAway() = 0; + virtual void DisconnectImmediately() = 0; }; ~ListenerInterface() override = default; @@ -269,6 +227,23 @@ class Server : public ServerInterface, ListenerState* const listener_state_; }; + struct ConnectionsToBeDrained { + absl::flat_hash_set> + connections; + grpc_event_engine::experimental::EventEngine::TaskHandle + drain_grace_timer_handle = grpc_event_engine::experimental:: + EventEngine::TaskHandle::kInvalid; + }; + + void DrainConnectionsLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); + + void OnDrainGraceTimer(); + + grpc_event_engine::experimental::EventEngine* event_engine() { + return server_->channel_args() + .GetObject(); + } + Server* const server_; OrphanablePtr listener_; grpc_closure destroy_done_; @@ -282,6 +257,7 @@ class Server : public ServerInterface, bool started_ ABSL_GUARDED_BY(mu_) = false; absl::flat_hash_set> connections_ ABSL_GUARDED_BY(mu_); + std::deque connections_to_be_drained_list_; }; explicit Server(const ChannelArgs& args);