From d4fa2e6d253853ee6864524a169faf05d7c092b2 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Tue, 7 Jan 2025 16:30:02 -0800 Subject: [PATCH] [ConfigFetcherWatcher] Redesign interfaces between Server and Listener (#37601) Closes #37601 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37601 from yashykt:LogicalConnection 50aacc3dd145d2d8ca19abf6fd2ed9d029200992 PiperOrigin-RevId: 713079761 --- BUILD | 27 +- CMakeLists.txt | 53 ++ bazel/experiments.bzl | 13 + build_autogenerated.yaml | 31 + src/core/BUILD | 1 + .../chaotic_good/server/chaotic_good_server.h | 12 +- .../server/chaotic_good_server.h | 12 +- .../transport/chttp2/server/chttp2_server.cc | 690 +++++++++++++++--- .../transport/chttp2/server/chttp2_server.h | 202 ++++- src/core/lib/experiments/experiments.cc | 24 + src/core/lib/experiments/experiments.h | 8 + src/core/lib/experiments/experiments.yaml | 7 + src/core/lib/experiments/rollouts.yaml | 2 + .../lib/resource_quota/connection_quota.h | 4 + src/core/server/server.cc | 314 +++++++- src/core/server/server.h | 260 +++++-- src/core/server/xds_server_config_fetcher.cc | 32 +- test/core/transport/chttp2/BUILD | 19 + .../chttp2/chttp2_server_listener_test.cc | 461 ++++++++++++ .../xds/xds_enabled_server_end2end_test.cc | 4 - tools/run_tests/generated/tests.json | 24 + 21 files changed, 1979 insertions(+), 221 deletions(-) create mode 100644 test/core/transport/chttp2/chttp2_server_listener_test.cc diff --git a/BUILD b/BUILD index 235130a67cb1c..6f8f43b7bef3d 100644 --- a/BUILD +++ b/BUILD @@ -556,6 +556,9 @@ grpc_cc_library( external_deps = [ "absl/base:core_headers", "absl/log:log", + "absl/status", + "absl/status:statusor", + "absl/strings", "absl/time:time", "address_sorting", ], @@ -574,6 +577,7 @@ grpc_cc_library( "grpc_base", "grpc_client_channel", "grpc_common", + "grpc_core_credentials_header", "grpc_http_filters", "grpc_security_base", "grpc_trace", @@ -630,6 +634,9 @@ grpc_cc_library( external_deps = [ "absl/base:core_headers", "absl/log:log", + "absl/status", + "absl/status:statusor", + "absl/strings", "absl/time:time", "address_sorting", ], @@ -657,6 +664,7 @@ grpc_cc_library( "grpc_base", "grpc_client_channel", "grpc_common", + "grpc_core_credentials_header", "grpc_credentials_util", "grpc_http_filters", "grpc_jwt_credentials", @@ -772,6 +780,7 @@ grpc_cc_library( "absl/synchronization", "absl/time:time", "absl/types:optional", + "absl/types:variant", ], language = "c++", public_hdrs = GPR_PUBLIC_HDRS, @@ -1132,12 +1141,15 @@ grpc_cc_library( "src/cpp/server/insecure_server_credentials.cc", ], external_deps = [ + "absl/functional:any_invocable", "absl/log:check", "absl/log:log", "absl/log:absl_check", "absl/log:absl_log", + "absl/status", "absl/status:statusor", "absl/strings", + "absl/strings:cord", "absl/synchronization", ], language = "c++", @@ -1154,6 +1166,7 @@ grpc_cc_library( "gpr", "grpc++_base_unsecure", "grpc++_codegen_proto", + "grpc++_config_proto", "grpc_core_credentials_header", "grpc_public_hdrs", "grpc_security_base", @@ -1509,8 +1522,8 @@ grpc_cc_library( "absl/container:flat_hash_map", "absl/container:flat_hash_set", "absl/functional:any_invocable", + "absl/log", "absl/log:check", - "absl/log:log", "absl/status", "absl/status:statusor", "absl/strings", @@ -1533,6 +1546,7 @@ grpc_cc_library( "debug_location", "exec_ctx", "gpr", + "grpc_core_credentials_header", "grpc_public_hdrs", "grpc_trace", "iomgr_buffer_list", @@ -1558,7 +1572,6 @@ grpc_cc_library( "//src/core:event_engine_shim", "//src/core:event_engine_tcp_socket_utils", "//src/core:event_log", - "//src/core:examine_stack", "//src/core:experiments", "//src/core:gpr_atm", "//src/core:gpr_manual_constructor", @@ -1764,8 +1777,8 @@ grpc_cc_library( "absl/container:flat_hash_map", "absl/container:flat_hash_set", "absl/hash", + "absl/log", "absl/log:check", - "absl/log:log", "absl/random", "absl/status", "absl/status:statusor", @@ -1787,12 +1800,14 @@ grpc_cc_library( "gpr", "grpc_base", "grpc_public_hdrs", + "grpc_security_base", "grpc_trace", "iomgr", "legacy_channel", "orphanable", "promise", "ref_counted_ptr", + "sockaddr_utils", "stats", "//src/core:activity", "//src/core:arena_promise", @@ -1802,6 +1817,7 @@ grpc_cc_library( "//src/core:channel_fwd", "//src/core:channel_stack_type", "//src/core:closure", + "//src/core:connection_quota", "//src/core:connectivity_state", "//src/core:context", "//src/core:dual_ref_counted", @@ -1816,6 +1832,7 @@ grpc_cc_library( "//src/core:poll", "//src/core:pollset_set", "//src/core:random_early_detection", + "//src/core:resolved_address", "//src/core:seq", "//src/core:server_interface", "//src/core:slice", @@ -1892,6 +1909,7 @@ grpc_cc_library( "absl/status:statusor", "absl/strings", "absl/strings:str_format", + "absl/time", "absl/types:optional", "absl/utility", "madler_zlib", @@ -2330,6 +2348,7 @@ grpc_cc_library( "absl/status", "absl/status:statusor", "absl/strings", + "absl/strings:cord", "absl/strings:str_format", "absl/synchronization", "absl/memory", @@ -2353,6 +2372,7 @@ grpc_cc_library( "gpr", "grpc", "grpc++_codegen_proto", + "grpc++_config_proto", "grpc_base", "grpc_core_credentials_header", "grpc_credentials_util", @@ -2418,6 +2438,7 @@ grpc_cc_library( "absl/status", "absl/status:statusor", "absl/strings", + "absl/strings:cord", "absl/synchronization", "absl/log:absl_check", "absl/log:absl_log", diff --git a/CMakeLists.txt b/CMakeLists.txt index 6beddffa81d4e..a9a9a5b5d9dab 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1364,6 +1364,7 @@ if(gRPC_BUILD_TESTS) endif() add_dependencies(buildtests_cxx check_gcp_environment_linux_test) add_dependencies(buildtests_cxx check_gcp_environment_windows_test) + add_dependencies(buildtests_cxx chttp2_server_listener_test) add_dependencies(buildtests_cxx chunked_vector_test) add_dependencies(buildtests_cxx cli_call_test) add_dependencies(buildtests_cxx client_auth_filter_test) @@ -11801,6 +11802,58 @@ target_link_libraries(check_gcp_environment_windows_test ) +endif() +if(gRPC_BUILD_TESTS) + +add_executable(chttp2_server_listener_test + test/core/end2end/cq_verifier.cc + test/core/test_util/cmdline.cc + test/core/test_util/fuzzer_util.cc + test/core/test_util/grpc_profiler.cc + test/core/test_util/histogram.cc + test/core/test_util/mock_endpoint.cc + test/core/test_util/parse_hexstring.cc + test/core/test_util/resolve_localhost_ip46.cc + test/core/test_util/slice_splitter.cc + test/core/test_util/tracer_util.cc + test/core/transport/chttp2/chttp2_server_listener_test.cc +) +if(WIN32 AND MSVC) + if(BUILD_SHARED_LIBS) + target_compile_definitions(chttp2_server_listener_test + PRIVATE + "GPR_DLL_IMPORTS" + "GRPC_DLL_IMPORTS" + ) + endif() +endif() +target_compile_features(chttp2_server_listener_test PUBLIC cxx_std_17) +target_include_directories(chttp2_server_listener_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(chttp2_server_listener_test + ${_gRPC_ALLTARGETS_LIBRARIES} + gtest + grpc_test_util +) + + endif() if(gRPC_BUILD_TESTS) diff --git a/bazel/experiments.bzl b/bazel/experiments.bzl index 4ee9b92f3698c..c30a7def161a7 100644 --- a/bazel/experiments.bzl +++ b/bazel/experiments.bzl @@ -47,6 +47,7 @@ EXPERIMENT_ENABLES = { "trace_record_callops": "trace_record_callops", "unconstrained_max_quota_buffer_size": "unconstrained_max_quota_buffer_size", "work_serializer_dispatch": "work_serializer_dispatch", + "server_listener": "server_listener,work_serializer_dispatch", } EXPERIMENT_POLLERS = [ @@ -67,6 +68,7 @@ EXPERIMENTS = { "local_connector_secure", "posix_ee_skip_grpc_init", "retry_in_callv3", + "server_listener", ], "cpp_end2end_test": [ "posix_ee_skip_grpc_init", @@ -84,6 +86,9 @@ EXPERIMENTS = { "free_large_allocator", "unconstrained_max_quota_buffer_size", ], + "xds_end2end_test": [ + "server_listener", + ], }, "on": { "cancel_ares_query_test": [ @@ -129,6 +134,7 @@ EXPERIMENTS = { "local_connector_secure", "posix_ee_skip_grpc_init", "retry_in_callv3", + "server_listener", ], "cpp_end2end_test": [ "posix_ee_skip_grpc_init", @@ -146,6 +152,9 @@ EXPERIMENTS = { "free_large_allocator", "unconstrained_max_quota_buffer_size", ], + "xds_end2end_test": [ + "server_listener", + ], }, "on": { "core_end2end_test": [ @@ -177,6 +186,7 @@ EXPERIMENTS = { "local_connector_secure", "posix_ee_skip_grpc_init", "retry_in_callv3", + "server_listener", ], "cpp_end2end_test": [ "posix_ee_skip_grpc_init", @@ -194,6 +204,9 @@ EXPERIMENTS = { "free_large_allocator", "unconstrained_max_quota_buffer_size", ], + "xds_end2end_test": [ + "server_listener", + ], }, "on": { "cancel_ares_query_test": [ diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 64062d5fd6a22..7ac76cd1d52b9 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -8885,6 +8885,37 @@ targets: deps: - gtest - grpc_test_util +- name: chttp2_server_listener_test + gtest: true + build: test + language: c++ + headers: + - test/core/end2end/cq_verifier.h + - test/core/test_util/cmdline.h + - test/core/test_util/evaluate_args_test_util.h + - test/core/test_util/fuzzer_util.h + - test/core/test_util/grpc_profiler.h + - test/core/test_util/histogram.h + - test/core/test_util/mock_endpoint.h + - test/core/test_util/parse_hexstring.h + - test/core/test_util/resolve_localhost_ip46.h + - test/core/test_util/slice_splitter.h + - test/core/test_util/tracer_util.h + src: + - test/core/end2end/cq_verifier.cc + - test/core/test_util/cmdline.cc + - test/core/test_util/fuzzer_util.cc + - test/core/test_util/grpc_profiler.cc + - test/core/test_util/histogram.cc + - test/core/test_util/mock_endpoint.cc + - test/core/test_util/parse_hexstring.cc + - test/core/test_util/resolve_localhost_ip46.cc + - test/core/test_util/slice_splitter.cc + - test/core/test_util/tracer_util.cc + - test/core/transport/chttp2/chttp2_server_listener_test.cc + deps: + - gtest + - grpc_test_util - name: chunked_vector_test gtest: true build: test diff --git a/src/core/BUILD b/src/core/BUILD index 638271cf85ccc..eda740ee1f6a7 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -7807,6 +7807,7 @@ grpc_cc_library( "grpc_insecure_credentials", "handshaker_registry", "iomgr_fwd", + "match", "memory_quota", "pollset_set", "resolved_address", diff --git a/src/core/ext/transport/chaotic_good/server/chaotic_good_server.h b/src/core/ext/transport/chaotic_good/server/chaotic_good_server.h index d7862afb3ae6c..978aac618468f 100644 --- a/src/core/ext/transport/chaotic_good/server/chaotic_good_server.h +++ b/src/core/ext/transport/chaotic_good/server/chaotic_good_server.h @@ -172,14 +172,20 @@ class ChaoticGoodServerListener final : public Server::ListenerInterface { bool shutdown_ ABSL_GUARDED_BY(mu_) = false; }; - void Start(Server*, const std::vector*) override { - StartListening().IgnoreError(); - }; + void Start() override { StartListening().IgnoreError(); }; channelz::ListenSocketNode* channelz_listen_socket_node() const override { return nullptr; } + void SetServerListenerState(RefCountedPtr) override {} + + const grpc_resolved_address* resolved_address() const override { + // chaotic good doesn't use the new ListenerState interface yet. + Crash("Unimplemented"); + return nullptr; + } + void SetOnDestroyDone(grpc_closure* closure) override { MutexLock lock(&mu_); on_destroy_done_ = closure; diff --git a/src/core/ext/transport/chaotic_good_legacy/server/chaotic_good_server.h b/src/core/ext/transport/chaotic_good_legacy/server/chaotic_good_server.h index c804b68cba2ea..270a176019326 100644 --- a/src/core/ext/transport/chaotic_good_legacy/server/chaotic_good_server.h +++ b/src/core/ext/transport/chaotic_good_legacy/server/chaotic_good_server.h @@ -126,14 +126,20 @@ class ChaoticGoodServerListener final : public Server::ListenerInterface { int32_t data_alignment_; }; - void Start(Server*, const std::vector*) override { - StartListening().IgnoreError(); - }; + void Start() override { StartListening().IgnoreError(); }; channelz::ListenSocketNode* channelz_listen_socket_node() const override { return nullptr; } + void SetServerListenerState(RefCountedPtr) override {} + + const grpc_resolved_address* resolved_address() const override { + // chaotic good doesn't use the new ListenerState interface yet. + Crash("Unimplemented"); + return nullptr; + } + void SetOnDestroyDone(grpc_closure* closure) override { MutexLock lock(&mu_); on_destroy_done_ = closure; diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.cc b/src/core/ext/transport/chttp2/server/chttp2_server.cc index 3358d185038c6..051cf35f6d975 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.cc +++ b/src/core/ext/transport/chttp2/server/chttp2_server.cc @@ -82,6 +82,7 @@ #include "src/core/lib/transport/transport.h" #include "src/core/server/server.h" #include "src/core/util/debug_location.h" +#include "src/core/util/match.h" #include "src/core/util/orphanable.h" #include "src/core/util/ref_counted_ptr.h" #include "src/core/util/status_helper.h" @@ -107,24 +108,26 @@ const char kUnixUriPrefix[] = "unix:"; const char kUnixAbstractUriPrefix[] = "unix-abstract:"; const char kVSockUriPrefix[] = "vsock:"; -struct AcceptorDeleter { - void operator()(grpc_tcp_server_acceptor* acceptor) const { - gpr_free(acceptor); - } -}; +namespace { +Timestamp GetConnectionDeadline(const ChannelArgs& args) { + return Timestamp::Now() + + std::max( + Duration::Milliseconds(1), + args.GetDurationFromIntMillis(GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS) + .value_or(Duration::Minutes(2))); +} +} // namespace + using AcceptorPtr = std::unique_ptr; class Chttp2ServerListener : public Server::ListenerInterface { public: static grpc_error_handle Create(Server* server, const EventEngine::ResolvedAddress& addr, - const ChannelArgs& args, - Chttp2ServerArgsModifier args_modifier, - int* port_num); + const ChannelArgs& args, int* port_num); - static grpc_error_handle CreateWithAcceptor( - Server* server, const char* name, const ChannelArgs& args, - Chttp2ServerArgsModifier args_modifier); + static grpc_error_handle CreateWithAcceptor(Server* server, const char* name, + const ChannelArgs& args); static Chttp2ServerListener* CreateForPassiveListener( Server* server, const ChannelArgs& args, @@ -132,14 +135,12 @@ class Chttp2ServerListener : public Server::ListenerInterface { // Do not instantiate directly. Use one of the factory methods above. Chttp2ServerListener(Server* server, const ChannelArgs& args, - Chttp2ServerArgsModifier args_modifier, - grpc_server_config_fetcher* config_fetcher, + ServerConfigFetcher* config_fetcher, std::shared_ptr passive_listener = nullptr); ~Chttp2ServerListener() override; - void Start(Server* server, - const std::vector* pollsets) override; + void Start() override; void AcceptConnectedEndpoint(std::unique_ptr endpoint); @@ -147,6 +148,15 @@ class Chttp2ServerListener : public Server::ListenerInterface { return channelz_listen_socket_.get(); } + void SetServerListenerState( + RefCountedPtr /*listener_state*/) override {} + + const grpc_resolved_address* resolved_address() const override { + // Should only be invoked with experiment server_listener + Crash("Illegal"); + return nullptr; + } + void SetOnDestroyDone(grpc_closure* on_destroy_done) override; void Orphan() override; @@ -154,14 +164,13 @@ class Chttp2ServerListener : public Server::ListenerInterface { private: friend class experimental::PassiveListenerImpl; - class ConfigFetcherWatcher - : public grpc_server_config_fetcher::WatcherInterface { + class ConfigFetcherWatcher : public ServerConfigFetcher::WatcherInterface { public: explicit ConfigFetcherWatcher(RefCountedPtr listener) : listener_(std::move(listener)) {} void UpdateConnectionManager( - RefCountedPtr + RefCountedPtr connection_manager) override; void StopServing() override; @@ -267,12 +276,11 @@ class Chttp2ServerListener : public Server::ListenerInterface { Server* const server_ = nullptr; grpc_tcp_server* tcp_server_ = nullptr; grpc_resolved_address resolved_address_; - Chttp2ServerArgsModifier const args_modifier_; ConfigFetcherWatcher* config_fetcher_watcher_ = nullptr; ChannelArgs args_; Mutex mu_; - RefCountedPtr - connection_manager_ ABSL_GUARDED_BY(mu_); + RefCountedPtr connection_manager_ + ABSL_GUARDED_BY(mu_); // Signals whether grpc_tcp_server_start() has been called. bool started_ ABSL_GUARDED_BY(mu_) = false; // Signals whether grpc_tcp_server_start() has completed. @@ -288,7 +296,7 @@ class Chttp2ServerListener : public Server::ListenerInterface { RefCountedPtr channelz_listen_socket_; MemoryQuotaRefPtr memory_quota_; ConnectionQuotaRefPtr connection_quota_; - grpc_server_config_fetcher* config_fetcher_ = nullptr; + ServerConfigFetcher* config_fetcher_ = nullptr; // TODO(yashykt): consider using absl::variant<> to minimize memory usage for // disjoint cases where different fields are used. std::shared_ptr passive_listener_; @@ -299,9 +307,8 @@ class Chttp2ServerListener : public Server::ListenerInterface { // void Chttp2ServerListener::ConfigFetcherWatcher::UpdateConnectionManager( - RefCountedPtr - connection_manager) { - RefCountedPtr + RefCountedPtr connection_manager) { + RefCountedPtr connection_manager_to_destroy; class GracefulShutdownExistingConnections { public: @@ -369,14 +376,6 @@ void Chttp2ServerListener::ConfigFetcherWatcher::StopServing() { // Chttp2ServerListener::ActiveConnection::HandshakingState // -Timestamp GetConnectionDeadline(const ChannelArgs& args) { - return Timestamp::Now() + - std::max( - Duration::Milliseconds(1), - args.GetDurationFromIntMillis(GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS) - .value_or(Duration::Seconds(120))); -} - Chttp2ServerListener::ActiveConnection::HandshakingState::HandshakingState( RefCountedPtr connection_ref, grpc_pollset* accepting_pollset, AcceptorPtr acceptor, @@ -708,11 +707,10 @@ void Chttp2ServerListener::ActiveConnection::OnDrainGraceTimeExpiry() { grpc_error_handle Chttp2ServerListener::Create( Server* server, const EventEngine::ResolvedAddress& addr, - const ChannelArgs& args, Chttp2ServerArgsModifier args_modifier, - int* port_num) { + const ChannelArgs& args, int* port_num) { // Create Chttp2ServerListener. OrphanablePtr listener = - MakeOrphanable(server, args, args_modifier, + MakeOrphanable(server, args, server->config_fetcher()); // The tcp_server will be unreffed when the listener is orphaned, which could // be at the end of this function if the listener was not added to the @@ -752,10 +750,9 @@ grpc_error_handle Chttp2ServerListener::Create( } grpc_error_handle Chttp2ServerListener::CreateWithAcceptor( - Server* server, const char* name, const ChannelArgs& args, - Chttp2ServerArgsModifier args_modifier) { + Server* server, const char* name, const ChannelArgs& args) { auto listener = MakeOrphanable( - server, args, args_modifier, server->config_fetcher()); + server, args, server->config_fetcher()); grpc_error_handle error = grpc_tcp_server_create( &listener->tcp_server_shutdown_complete_, ChannelArgsEndpointConfig(args), OnAccept, listener.get(), &listener->tcp_server_); @@ -772,9 +769,7 @@ Chttp2ServerListener* Chttp2ServerListener::CreateForPassiveListener( std::shared_ptr passive_listener) { // TODO(hork): figure out how to handle channelz in this case auto listener = MakeOrphanable( - server, args, /*args_modifier=*/ - [](const ChannelArgs& args, grpc_error_handle*) { return args; }, nullptr, - std::move(passive_listener)); + server, args, nullptr, std::move(passive_listener)); auto listener_ptr = listener.get(); server->AddListener(std::move(listener)); return listener_ptr; @@ -782,11 +777,9 @@ Chttp2ServerListener* Chttp2ServerListener::CreateForPassiveListener( Chttp2ServerListener::Chttp2ServerListener( Server* server, const ChannelArgs& args, - Chttp2ServerArgsModifier args_modifier, - grpc_server_config_fetcher* config_fetcher, + ServerConfigFetcher* config_fetcher, std::shared_ptr passive_listener) : server_(server), - args_modifier_(args_modifier), args_(args), memory_quota_(args.GetObject()->memory_quota()), connection_quota_(MakeRefCounted()), @@ -812,8 +805,7 @@ Chttp2ServerListener::~Chttp2ServerListener() { } // Server callback: start listening on our ports -void Chttp2ServerListener::Start( - Server* /*server*/, const std::vector* /* pollsets */) { +void Chttp2ServerListener::Start() { if (config_fetcher_ != nullptr) { auto watcher = std::make_unique( RefAsSubclass()); @@ -848,6 +840,27 @@ void Chttp2ServerListener::AcceptConnectedEndpoint( /*accepting_pollset=*/nullptr, /*acceptor=*/nullptr); } +namespace { + +ChannelArgs ModifyArgsForConnection(const ChannelArgs& args, + grpc_error_handle* error) { + auto* server_credentials = args.GetObject(); + if (server_credentials == nullptr) { + *error = GRPC_ERROR_CREATE("Could not find server credentials"); + return args; + } + auto security_connector = server_credentials->create_security_connector(args); + if (security_connector == nullptr) { + *error = GRPC_ERROR_CREATE( + absl::StrCat("Unable to create secure server with credentials of type ", + server_credentials->type().name())); + return args; + } + return args.SetObject(security_connector); +} + +} // namespace + void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp, grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* server_acceptor) { @@ -855,8 +868,7 @@ void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp, ChannelArgs args = self->args_; OrphanablePtr endpoint(tcp); AcceptorPtr acceptor(server_acceptor); - RefCountedPtr - connection_manager; + RefCountedPtr connection_manager; { MutexLock lock(&self->mu_); connection_manager = self->connection_manager_; @@ -875,7 +887,7 @@ void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp, return; } grpc_error_handle error; - args = self->args_modifier_(*args_result, &error); + args = ModifyArgsForConnection(*args_result, &error); if (!error.ok()) { return; } @@ -953,19 +965,498 @@ void Chttp2ServerListener::Orphan() { } // -// Chttp2ServerAddPort() +// NewChttp2ServerListener::ActiveConnection::HandshakingState +// + +NewChttp2ServerListener::ActiveConnection::HandshakingState::HandshakingState( + RefCountedPtr connection_ref, grpc_tcp_server* tcp_server, + grpc_pollset* accepting_pollset, AcceptorPtr acceptor, + const ChannelArgs& args, OrphanablePtr endpoint) + : connection_(std::move(connection_ref)), + tcp_server_(tcp_server), + accepting_pollset_(accepting_pollset), + acceptor_(std::move(acceptor)), + interested_parties_(grpc_pollset_set_create()), + deadline_(GetConnectionDeadline(args)), + endpoint_(std::move(endpoint)), + handshake_mgr_(MakeRefCounted()) { + if (accepting_pollset != nullptr) { + grpc_pollset_set_add_pollset(interested_parties_, accepting_pollset_); + } +} + +NewChttp2ServerListener::ActiveConnection::HandshakingState:: + ~HandshakingState() { + if (accepting_pollset_ != nullptr) { + grpc_pollset_set_del_pollset(interested_parties_, accepting_pollset_); + } + grpc_pollset_set_destroy(interested_parties_); + if (tcp_server_ != nullptr) { + grpc_tcp_server_unref(tcp_server_); + } +} + +void NewChttp2ServerListener::ActiveConnection::HandshakingState::Orphan() { + connection_->work_serializer_.Run( + [this] { + ShutdownLocked(absl::UnavailableError("Listener stopped serving.")); + Unref(); + }, + DEBUG_LOCATION); +} + +void NewChttp2ServerListener::ActiveConnection::HandshakingState::StartLocked( + const ChannelArgs& channel_args) { + if (handshake_mgr_ == nullptr) { + // The connection is already shutting down. + return; + } + CoreConfiguration::Get().handshaker_registry().AddHandshakers( + HANDSHAKER_SERVER, channel_args, interested_parties_, + handshake_mgr_.get()); + handshake_mgr_->DoHandshake( + std::move(endpoint_), channel_args, deadline_, acceptor_.get(), + [self = Ref()](absl::StatusOr result) mutable { + auto* self_ptr = self.get(); + self_ptr->connection_->work_serializer_.Run( + [self = std::move(self), result = std::move(result)]() mutable { + self->OnHandshakeDoneLocked(std::move(result)); + }, + DEBUG_LOCATION); + }); +} + +void NewChttp2ServerListener::ActiveConnection::HandshakingState:: + ShutdownLocked(absl::Status status) { + if (handshake_mgr_ != nullptr) { + handshake_mgr_->Shutdown(std::move(status)); + } +} + +void NewChttp2ServerListener::ActiveConnection::HandshakingState:: + OnTimeoutLocked() { + if (!timer_handle_.has_value()) { + return; + } + timer_handle_.reset(); + auto t = absl::get>(connection_->state_); + t->DisconnectWithError(GRPC_ERROR_CREATE( + "Did not receive HTTP/2 settings before handshake timeout")); +} + +void NewChttp2ServerListener::ActiveConnection::HandshakingState:: + OnReceiveSettings(void* arg, grpc_error_handle /* error */) { + HandshakingState* self = static_cast(arg); + self->connection_->work_serializer_.Run( + [self] { + if (self->timer_handle_.has_value()) { + self->connection_->listener_state_->event_engine()->Cancel( + *self->timer_handle_); + self->timer_handle_.reset(); + } + self->Unref(); + }, + DEBUG_LOCATION); +} + +void NewChttp2ServerListener::ActiveConnection::HandshakingState:: + OnHandshakeDoneLocked(absl::StatusOr result) { + OrphanablePtr handshaking_state_ref; + RefCountedPtr handshake_mgr; + // If the handshaking succeeded but there is no endpoint, then the + // handshaker may have handed off the connection to some external + // code, so we can just clean up here without creating a transport. + if (!connection_->shutdown_ && result.ok() && + (*result)->endpoint != nullptr) { + RefCountedPtr transport = + grpc_create_chttp2_transport((*result)->args, + std::move((*result)->endpoint), false) + ->Ref(); + grpc_error_handle channel_init_err = + connection_->listener_state_->server()->SetupTransport( + transport.get(), accepting_pollset_, (*result)->args, + grpc_chttp2_transport_get_socket_node(transport.get())); + if (channel_init_err.ok()) { + // Use notify_on_receive_settings callback to enforce the + // handshake deadline. + connection_->state_ = + DownCast(transport.get())->Ref(); + Ref().release(); // Held by OnReceiveSettings(). + GRPC_CLOSURE_INIT(&on_receive_settings_, OnReceiveSettings, this, + grpc_schedule_on_exec_ctx); + grpc_closure* on_close = &connection_->on_close_; + // Refs helds by OnClose() + connection_->Ref().release(); + grpc_chttp2_transport_start_reading( + transport.get(), (*result)->read_buffer.c_slice_buffer(), + &on_receive_settings_, nullptr, on_close); + timer_handle_ = connection_->listener_state_->event_engine()->RunAfter( + deadline_ - Timestamp::Now(), [self = Ref()]() mutable { + // HandshakingState deletion might require an active ExecCtx. + ApplicationCallbackExecCtx callback_exec_ctx; + ExecCtx exec_ctx; + auto* self_ptr = self.get(); + self_ptr->connection_->work_serializer_.Run( + [self = std::move(self)]() { self->OnTimeoutLocked(); }, + DEBUG_LOCATION); + }); + } else { + // Failed to create channel from transport. Clean up. + LOG(ERROR) << "Failed to create channel: " + << StatusToString(channel_init_err); + transport->Orphan(); + } + } + // Since the handshake manager is done, the connection no longer needs to + // shutdown the handshake when the listener needs to stop serving. + handshake_mgr_.reset(); + connection_->listener_state_->OnHandshakeDone(connection_.get()); + // Clean up if we don't have a transport + if (!absl::holds_alternative>( + connection_->state_)) { + connection_->listener_state_->connection_quota()->ReleaseConnections(1); + connection_->listener_state_->RemoveLogicalConnection(connection_.get()); + } +} + +// +// NewChttp2ServerListener::ActiveConnection +// + +NewChttp2ServerListener::ActiveConnection::ActiveConnection( + RefCountedPtr listener_state, + grpc_tcp_server* tcp_server, grpc_pollset* accepting_pollset, + AcceptorPtr acceptor, const ChannelArgs& args, MemoryOwner memory_owner, + OrphanablePtr endpoint) + : listener_state_(std::move(listener_state)), + work_serializer_( + args.GetObjectRef()), + state_(memory_owner.MakeOrphanable( + RefAsSubclass(), tcp_server, accepting_pollset, + std::move(acceptor), args, std::move(endpoint))) { + GRPC_CLOSURE_INIT(&on_close_, ActiveConnection::OnClose, this, + grpc_schedule_on_exec_ctx); +} + +void NewChttp2ServerListener::ActiveConnection::Orphan() { + work_serializer_.Run( + [this]() { + // If ActiveConnection is orphaned before handshake is established, + // shutdown the handshaker. If the server is stopping to serve or + // shutting down and a transport has already been established, GOAWAYs + // should be sent separately. + shutdown_ = true; + if (absl::holds_alternative>(state_)) { + state_ = OrphanablePtr(nullptr); + } + Unref(); + }, + DEBUG_LOCATION); +} + +void NewChttp2ServerListener::ActiveConnection::SendGoAway() { + work_serializer_.Run( + [self = RefAsSubclass()]() mutable { + self->SendGoAwayImplLocked(); + }, + DEBUG_LOCATION); +} + +void NewChttp2ServerListener::ActiveConnection::DisconnectImmediately() { + work_serializer_.Run( + [self = RefAsSubclass()]() mutable { + self->DisconnectImmediatelyImplLocked(); + }, + DEBUG_LOCATION); +} + +void NewChttp2ServerListener::ActiveConnection::Start(const ChannelArgs& args) { + work_serializer_.Run( + [self = RefAsSubclass(), args]() mutable { + // If the Connection is already shutdown at this point, it implies the + // owning NewChttp2ServerListener and all associated + // ActiveConnections have been orphaned. + if (self->shutdown_) return; + absl::get>(self->state_) + ->StartLocked(args); + }, + DEBUG_LOCATION); +} + +void NewChttp2ServerListener::ActiveConnection::OnClose( + void* arg, grpc_error_handle /* error */) { + ActiveConnection* self = static_cast(arg); + self->listener_state_->RemoveLogicalConnection(self); + self->listener_state_->connection_quota()->ReleaseConnections(1); + self->Unref(); +} + +void NewChttp2ServerListener::ActiveConnection::SendGoAwayImplLocked() { + if (!shutdown_) { + shutdown_ = true; + Match( + state_, + [](const OrphanablePtr& handshaking_state) { + // Shutdown the handshaker if it's still in progress. + if (handshaking_state != nullptr) { + handshaking_state->ShutdownLocked( + absl::UnavailableError("Connection going away")); + } + }, + [](const RefCountedPtr& transport) { + // Send a GOAWAY if the transport exists + if (transport != nullptr) { + grpc_transport_op* op = grpc_make_transport_op(nullptr); + op->goaway_error = + GRPC_ERROR_CREATE("Server is stopping to serve requests."); + transport->PerformOp(op); + } + }); + } +} + +void NewChttp2ServerListener::ActiveConnection:: + DisconnectImmediatelyImplLocked() { + shutdown_ = true; + Match( + state_, + [](const OrphanablePtr& handshaking_state) { + // Shutdown the handshaker if it's still in progress. + if (handshaking_state != nullptr) { + handshaking_state->ShutdownLocked( + absl::UnavailableError("Connection to be disconnected")); + } + }, + [](const RefCountedPtr& transport) { + // Disconnect immediately if the transport exists + if (transport != nullptr) { + grpc_transport_op* op = grpc_make_transport_op(nullptr); + op->disconnect_with_error = GRPC_ERROR_CREATE( + "Drain grace time expired. Closing connection immediately."); + transport->PerformOp(op); + } + }); +} + +// +// NewChttp2ServerListener // +grpc_error_handle NewChttp2ServerListener::Create( + Server* server, const EventEngine::ResolvedAddress& addr, + const ChannelArgs& args, int* port_num) { + // Create NewChttp2ServerListener. + OrphanablePtr listener = + MakeOrphanable(args); + // The tcp_server will be unreffed when the listener is orphaned, which + // could be at the end of this function if the listener was not added to the + // server's set of listeners. + grpc_error_handle error = grpc_tcp_server_create( + &listener->tcp_server_shutdown_complete_, ChannelArgsEndpointConfig(args), + OnAccept, listener.get(), &listener->tcp_server_); + if (!error.ok()) return error; + // TODO(yijiem): remove this conversion when we remove all + // grpc_resolved_address usages. + grpc_resolved_address iomgr_addr = + grpc_event_engine::experimental::CreateGRPCResolvedAddress(addr); + if (server->config_fetcher() != nullptr) { + // TODO(yashykt): Consider binding so as to be able to return the port + // number. + listener->resolved_address_ = iomgr_addr; + { + MutexLock lock(&listener->mu_); + listener->add_port_on_start_ = true; + } + } else { + error = + grpc_tcp_server_add_port(listener->tcp_server_, &iomgr_addr, port_num); + if (!error.ok()) return error; + } + // Create channelz node. + if (args.GetBool(GRPC_ARG_ENABLE_CHANNELZ) + .value_or(GRPC_ENABLE_CHANNELZ_DEFAULT)) { + auto string_address = + grpc_event_engine::experimental::ResolvedAddressToString(addr); + if (!string_address.ok()) { + return GRPC_ERROR_CREATE(string_address.status().ToString()); + } + listener->channelz_listen_socket_ = + MakeRefCounted( + *string_address, absl::StrCat("chttp2 listener ", *string_address)); + } + // Register with the server only upon success + server->AddListener(std::move(listener)); + return absl::OkStatus(); +} + +grpc_error_handle NewChttp2ServerListener::CreateWithAcceptor( + Server* server, const char* name, const ChannelArgs& args) { + auto listener = MakeOrphanable(args); + grpc_error_handle error = grpc_tcp_server_create( + &listener->tcp_server_shutdown_complete_, ChannelArgsEndpointConfig(args), + OnAccept, listener.get(), &listener->tcp_server_); + if (!error.ok()) return error; + // TODO(yangg) channelz + TcpServerFdHandler** arg_val = args.GetPointer(name); + *arg_val = grpc_tcp_server_create_fd_handler(listener->tcp_server_); + server->AddListener(std::move(listener)); + return absl::OkStatus(); +} + +NewChttp2ServerListener* NewChttp2ServerListener::CreateForPassiveListener( + Server* server, const ChannelArgs& args, + std::shared_ptr passive_listener) { + // TODO(hork): figure out how to handle channelz in this case + auto listener = MakeOrphanable( + args, std::move(passive_listener)); + auto listener_ptr = listener.get(); + server->AddListener(std::move(listener)); + return listener_ptr; +} + +NewChttp2ServerListener::NewChttp2ServerListener( + const ChannelArgs& args, + std::shared_ptr passive_listener) + : args_(args), passive_listener_(std::move(passive_listener)) { + GRPC_CLOSURE_INIT(&tcp_server_shutdown_complete_, TcpServerShutdownComplete, + this, grpc_schedule_on_exec_ctx); +} + +NewChttp2ServerListener::~NewChttp2ServerListener() { + if (passive_listener_ != nullptr) { + passive_listener_->ListenerDestroyed(); + } + if (on_destroy_done_ != nullptr) { + ExecCtx::Run(DEBUG_LOCATION, on_destroy_done_, absl::OkStatus()); + } +} + +void NewChttp2ServerListener::Start() { + bool should_add_port = false; + grpc_tcp_server* tcp_server = nullptr; + { + MutexLock lock(&mu_); + if (!shutdown_) { + should_add_port = std::exchange(add_port_on_start_, false); + // Hold a ref while we start the server + if (tcp_server_ != nullptr) { + grpc_tcp_server_ref(tcp_server_); + tcp_server = tcp_server_; + } + } + } + if (should_add_port) { + int port_temp; + grpc_error_handle error = + grpc_tcp_server_add_port(tcp_server_, resolved_address(), &port_temp); + if (!error.ok()) { + LOG(ERROR) << "Error adding port to server: " << StatusToString(error); + // TODO(yashykt): We wouldn't need to assert here if we bound to the + // port earlier during AddPort. + CHECK(0); + } + } + if (tcp_server != nullptr) { + grpc_tcp_server_start(tcp_server, &listener_state_->server()->pollsets()); + // Give up the ref we took earlier + grpc_tcp_server_unref(tcp_server); + } +} + +void NewChttp2ServerListener::SetOnDestroyDone(grpc_closure* on_destroy_done) { + MutexLock lock(&mu_); + on_destroy_done_ = on_destroy_done; +} + +void NewChttp2ServerListener::AcceptConnectedEndpoint( + std::unique_ptr endpoint) { + OnAccept(this, grpc_event_engine_endpoint_create(std::move(endpoint)), + /*accepting_pollset=*/nullptr, /*acceptor=*/nullptr); +} + +void NewChttp2ServerListener::OnAccept( + void* arg, grpc_endpoint* tcp, grpc_pollset* accepting_pollset, + grpc_tcp_server_acceptor* server_acceptor) { + NewChttp2ServerListener* self = static_cast(arg); + OrphanablePtr endpoint(tcp); + AcceptorPtr acceptor(server_acceptor); + if (!self->listener_state_->connection_quota()->AllowIncomingConnection( + self->listener_state_->memory_quota(), + grpc_endpoint_get_peer(endpoint.get()))) { + return; + } + { + // The ref for the tcp_server need to be taken in the critical region + // after having made sure that the listener has not been Orphaned, so as + // to avoid heap-use-after-free issues where `Ref()` is invoked when the + // listener is already shutdown. Note that the listener holds a ref to the + // tcp_server but this ref is given away when the listener is orphaned + // (shutdown). A connection needs the tcp_server to outlast the handshake + // since the acceptor needs it. + MutexLock lock(&self->mu_); + if (self->shutdown_) { + self->listener_state_->connection_quota()->ReleaseConnections(1); + return; + } + if (self->tcp_server_ != nullptr) { + grpc_tcp_server_ref(self->tcp_server_); + } + } + auto memory_owner = + self->listener_state_->memory_quota()->CreateMemoryOwner(); + auto connection = memory_owner.MakeOrphanable( + self->listener_state_, self->tcp_server_, accepting_pollset, + std::move(acceptor), self->args_, std::move(memory_owner), + std::move(endpoint)); + RefCountedPtr connection_ref = + connection->RefAsSubclass(); + absl::optional new_args = + self->listener_state_->AddLogicalConnection(std::move(connection), + self->args_, tcp); + if (new_args.has_value()) { + connection_ref->Start(*new_args); + } else { + self->listener_state_->connection_quota()->ReleaseConnections(1); + } +} + +void NewChttp2ServerListener::TcpServerShutdownComplete( + void* arg, grpc_error_handle /*error*/) { + NewChttp2ServerListener* self = static_cast(arg); + self->channelz_listen_socket_.reset(); + self->Unref(); +} + +// Server callback: destroy the tcp listener (so we don't generate further +// callbacks) +void NewChttp2ServerListener::Orphan() { + grpc_tcp_server* tcp_server; + { + MutexLock lock(&mu_); + shutdown_ = true; + tcp_server = tcp_server_; + } + if (tcp_server != nullptr) { + grpc_tcp_server_shutdown_listeners(tcp_server); + grpc_tcp_server_unref(tcp_server); + } else { + Unref(); + } +} + +namespace { + grpc_error_handle Chttp2ServerAddPort(Server* server, const char* addr, - const ChannelArgs& args, - Chttp2ServerArgsModifier args_modifier, - int* port_num) { + const ChannelArgs& args, int* port_num) { if (addr == nullptr) { return GRPC_ERROR_CREATE("Invalid address: addr cannot be a nullptr."); } if (strncmp(addr, "external:", 9) == 0) { - return Chttp2ServerListener::CreateWithAcceptor(server, addr, args, - args_modifier); + if (IsServerListenerEnabled()) { + return NewChttp2ServerListener::CreateWithAcceptor(server, addr, args); + } else { + return Chttp2ServerListener::CreateWithAcceptor(server, addr, args); + } } *port_num = -1; absl::StatusOr> resolved; @@ -1029,8 +1520,11 @@ grpc_error_handle Chttp2ServerAddPort(Server* server, const char* addr, *port_num); } int port_temp = -1; - error = Chttp2ServerListener::Create(server, addr, args, args_modifier, - &port_temp); + if (IsServerListenerEnabled()) { + error = NewChttp2ServerListener::Create(server, addr, args, &port_temp); + } else { + error = Chttp2ServerListener::Create(server, addr, args, &port_temp); + } if (!error.ok()) { error_list.push_back(error); } else { @@ -1063,25 +1557,6 @@ grpc_error_handle Chttp2ServerAddPort(Server* server, const char* addr, return error; } -namespace { - -ChannelArgs ModifyArgsForConnection(const ChannelArgs& args, - grpc_error_handle* error) { - auto* server_credentials = args.GetObject(); - if (server_credentials == nullptr) { - *error = GRPC_ERROR_CREATE("Could not find server credentials"); - return args; - } - auto security_connector = server_credentials->create_security_connector(args); - if (security_connector == nullptr) { - *error = GRPC_ERROR_CREATE( - absl::StrCat("Unable to create secure server with credentials of type ", - server_credentials->type().name())); - return args; - } - return args.SetObject(security_connector); -} - } // namespace namespace experimental { @@ -1089,19 +1564,40 @@ namespace experimental { absl::Status PassiveListenerImpl::AcceptConnectedEndpoint( std::unique_ptr endpoint) { CHECK_NE(server_.get(), nullptr); - RefCountedPtr listener; - { - MutexLock lock(&mu_); - if (listener_ != nullptr) { - listener = - listener_->RefIfNonZero().TakeAsSubclass(); + if (IsServerListenerEnabled()) { + RefCountedPtr new_listener; + { + MutexLock lock(&mu_); + auto* new_listener_ptr = + absl::get_if(&listener_); + if (new_listener_ptr != nullptr && *new_listener_ptr != nullptr) { + new_listener = (*new_listener_ptr) + ->RefIfNonZero() + .TakeAsSubclass(); + } } + if (new_listener == nullptr) { + return absl::UnavailableError("passive listener already shut down"); + } + ExecCtx exec_ctx; + new_listener->AcceptConnectedEndpoint(std::move(endpoint)); + } else { + RefCountedPtr listener; + { + MutexLock lock(&mu_); + auto* listener_ptr = absl::get_if(&listener_); + if (listener_ptr != nullptr && *listener_ptr != nullptr) { + listener = (*listener_ptr) + ->RefIfNonZero() + .TakeAsSubclass(); + } + } + if (listener == nullptr) { + return absl::UnavailableError("passive listener already shut down"); + } + ExecCtx exec_ctx; + listener->AcceptConnectedEndpoint(std::move(endpoint)); } - if (listener == nullptr) { - return absl::UnavailableError("passive listener already shut down"); - } - ExecCtx exec_ctx; - listener->AcceptConnectedEndpoint(std::move(endpoint)); return absl::OkStatus(); } @@ -1123,7 +1619,7 @@ absl::Status PassiveListenerImpl::AcceptConnectedFd(int fd) { void PassiveListenerImpl::ListenerDestroyed() { MutexLock lock(&mu_); - listener_ = nullptr; + listener_ = static_cast(nullptr); } } // namespace experimental @@ -1169,8 +1665,7 @@ int grpc_server_add_http2_port(grpc_server* server, const char* addr, args = args.SetObject(creds->Ref()).SetObject(sc); } // Add server port. - err = grpc_core::Chttp2ServerAddPort( - core_server, addr, args, grpc_core::ModifyArgsForConnection, &port_num); + err = grpc_core::Chttp2ServerAddPort(core_server, addr, args, &port_num); done: sc.reset(DEBUG_LOCATION, "server"); if (!err.ok()) { @@ -1250,9 +1745,16 @@ absl::Status grpc_server_add_passive_listener( auto args = server->channel_args() .SetObject(credentials->Ref()) .SetObject(std::move(sc)); - passive_listener->listener_ = - grpc_core::Chttp2ServerListener::CreateForPassiveListener( - server, args, passive_listener); + if (grpc_core::IsServerListenerEnabled()) { + passive_listener->listener_ = + grpc_core::NewChttp2ServerListener::CreateForPassiveListener( + server, args, passive_listener); + } else { + passive_listener->listener_ = + grpc_core::Chttp2ServerListener::CreateForPassiveListener( + server, args, passive_listener); + } + passive_listener->server_ = server->Ref(); return absl::OkStatus(); } diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.h b/src/core/ext/transport/chttp2/server/chttp2_server.h index 0ecbf39e7455a..5db755df69db5 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.h +++ b/src/core/ext/transport/chttp2/server/chttp2_server.h @@ -19,36 +19,212 @@ #ifndef GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_SERVER_CHTTP2_SERVER_H #define GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_SERVER_CHTTP2_SERVER_H +#include #include #include #include +#include "src/core/ext/transport/chttp2/transport/internal.h" +#include "src/core/handshaker/handshaker.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/error.h" #include "src/core/server/server.h" namespace grpc_core { -// A function to modify channel args for a listening addr:port. Note that this -// is used to create a security connector for listeners when the servers are -// configured with a config fetcher. Not invoked if there is no config fetcher -// added to the server. On failure, the error parameter will be set. -using Chttp2ServerArgsModifier = - std::function; - -/// Adds a port to \a server. Sets \a port_num to the port number. -/// Takes ownership of \a args. -grpc_error_handle Chttp2ServerAddPort( - Server* server, const char* addr, const ChannelArgs& args, - Chttp2ServerArgsModifier connection_args_modifier, int* port_num); +struct AcceptorDeleter { + void operator()(grpc_tcp_server_acceptor* acceptor) const { + gpr_free(acceptor); + } +}; class Chttp2ServerListener; + +namespace testing { +class Chttp2ServerListenerTestPeer; +class ActiveConnectionTestPeer; +class HandshakingStateTestPeer; +} // namespace testing + +// New ChttpServerListener used if experiment "server_listener" is enabled +class NewChttp2ServerListener : public Server::ListenerInterface { + public: + using AcceptorPtr = + std::unique_ptr; + + // Keeps state for an individual connection. Lifetime: Internally refcounted + // and owned by Server::ListenerState. + class ActiveConnection : public LogicalConnection { + public: + // State for handshake. Lifetime: Owned by ActiveConnection and lasts while + // the handshake is ongoing. + class HandshakingState : public InternallyRefCounted { + public: + HandshakingState(RefCountedPtr connection_ref, + grpc_tcp_server* tcp_server, + grpc_pollset* accepting_pollset, AcceptorPtr acceptor, + const ChannelArgs& args, + OrphanablePtr endpoint); + + ~HandshakingState() override; + + void Orphan() override; + + void StartLocked(const ChannelArgs& args); + + void ShutdownLocked(absl::Status status); + + private: + friend class grpc_core::testing::HandshakingStateTestPeer; + + void OnTimeoutLocked(); + static void OnReceiveSettings(void* arg, grpc_error_handle /* error */); + void OnHandshakeDoneLocked(absl::StatusOr result); + + RefCountedPtr const connection_; + grpc_tcp_server* const tcp_server_; + grpc_pollset* const accepting_pollset_; + const AcceptorPtr acceptor_; + grpc_pollset_set* const interested_parties_; + Timestamp const deadline_; + OrphanablePtr endpoint_; + // Following fields are protected by WorkSerializer. + RefCountedPtr handshake_mgr_; + // State for enforcing handshake timeout on receiving HTTP/2 settings. + absl::optional + timer_handle_; + grpc_closure on_receive_settings_; + }; + + ActiveConnection(RefCountedPtr listener_state, + grpc_tcp_server* tcp_server, + grpc_pollset* accepting_pollset, AcceptorPtr acceptor, + const ChannelArgs& args, MemoryOwner memory_owner, + OrphanablePtr endpoint); + void Start(const ChannelArgs& args); + + void SendGoAway() override; + void DisconnectImmediately() override; + + void Orphan() override; + + // Needed to be able to grab an external weak ref in + // NewChttp2ServerListener::OnAccept() + using InternallyRefCounted::RefAsSubclass; + + private: + friend class grpc_core::testing::ActiveConnectionTestPeer; + friend class grpc_core::testing::HandshakingStateTestPeer; + + static void OnClose(void* arg, grpc_error_handle error); + + void SendGoAwayImplLocked(); + void DisconnectImmediatelyImplLocked(); + + RefCountedPtr const listener_state_; + WorkSerializer work_serializer_; + // Following fields are protected by WorkSerializer. + // Set by HandshakingState before the handshaking begins and set to a valid + // transport when handshaking is done successfully. + absl::variant, + RefCountedPtr> + state_; + grpc_closure on_close_; + bool shutdown_ = false; + }; + + static grpc_error_handle Create( + Server* server, + const grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr, + const ChannelArgs& args, int* port_num); + + static grpc_error_handle CreateWithAcceptor(Server* server, const char* name, + const ChannelArgs& args); + + static NewChttp2ServerListener* CreateForPassiveListener( + Server* server, const ChannelArgs& args, + std::shared_ptr passive_listener); + + // Do not instantiate directly. Use one of the factory methods above. + explicit NewChttp2ServerListener( + const ChannelArgs& args, + std::shared_ptr passive_listener = + nullptr); + ~NewChttp2ServerListener() override; + + void AcceptConnectedEndpoint( + std::unique_ptr + endpoint); + + channelz::ListenSocketNode* channelz_listen_socket_node() const override { + return channelz_listen_socket_.get(); + } + + void SetServerListenerState( + RefCountedPtr listener_state) override { + listener_state_ = std::move(listener_state); + } + + const grpc_resolved_address* resolved_address() const override { + return &resolved_address_; + } + + void SetOnDestroyDone(grpc_closure* on_destroy_done) override; + + void Orphan() override; + + private: + friend class experimental::PassiveListenerImpl; + + // To allow access to RefCounted<> like interface. + friend class RefCountedPtr; + + friend class grpc_core::testing::Chttp2ServerListenerTestPeer; + + // Should only be called once so as to start the TCP server. This should + // only be called by the config fetcher. + void Start() override; + + static void OnAccept(void* arg, grpc_endpoint* tcp, + grpc_pollset* accepting_pollset, + grpc_tcp_server_acceptor* acceptor); + + static void TcpServerShutdownComplete(void* arg, grpc_error_handle error); + + static void DestroyListener(Server* /*server*/, void* arg, + grpc_closure* destroy_done); + + grpc_event_engine::experimental::EventEngine* event_engine() const { + return listener_state_->server() + ->channel_args() + .GetObject(); + } + + grpc_tcp_server* tcp_server_ = nullptr; + grpc_resolved_address resolved_address_; + RefCountedPtr listener_state_; + ChannelArgs args_; + Mutex mu_; + bool add_port_on_start_ ABSL_GUARDED_BY(mu_) = false; + // Signals whether the application has triggered shutdown. + bool shutdown_ ABSL_GUARDED_BY(mu_) = false; + grpc_closure tcp_server_shutdown_complete_ ABSL_GUARDED_BY(mu_); + grpc_closure* on_destroy_done_ ABSL_GUARDED_BY(mu_) = nullptr; + RefCountedPtr channelz_listen_socket_; + // TODO(yashykt): consider using absl::variant<> to minimize memory usage for + // disjoint cases where different fields are used. + std::shared_ptr passive_listener_; +}; + namespace experimental { // An implementation of the public C++ passive listener interface. // The server builder holds a weak_ptr to one of these objects, and the // application owns the instance. +// TODO(yashykt): Move this to C-Core since this should be transport agnostic. +// Refer to https://github.com/grpc/grpc/pull/37601/files#r1803547924 for +// details. class PassiveListenerImpl final : public PassiveListener { public: absl::Status AcceptConnectedEndpoint( @@ -71,7 +247,7 @@ class PassiveListenerImpl final : public PassiveListener { Mutex mu_; // Data members will be populated when initialized. RefCountedPtr server_; - Chttp2ServerListener* listener_; + absl::variant listener_; }; } // namespace experimental diff --git a/src/core/lib/experiments/experiments.cc b/src/core/lib/experiments/experiments.cc index 23f6f54671bf2..e3a32fe31d49b 100644 --- a/src/core/lib/experiments/experiments.cc +++ b/src/core/lib/experiments/experiments.cc @@ -142,6 +142,11 @@ const char* const description_work_serializer_dispatch = "callback, instead of running things inline in the first thread that " "successfully enqueues work."; const char* const additional_constraints_work_serializer_dispatch = "{}"; +const char* const description_server_listener = + "If set, the new server listener classes are used."; +const char* const additional_constraints_server_listener = "{}"; +const uint8_t required_experiments_server_listener[] = { + static_cast(grpc_core::kExperimentIdWorkSerializerDispatch)}; } // namespace namespace grpc_core { @@ -227,6 +232,9 @@ const ExperimentMetadata g_experiment_metadata[] = { false, true}, {"work_serializer_dispatch", description_work_serializer_dispatch, additional_constraints_work_serializer_dispatch, nullptr, 0, true, true}, + {"server_listener", description_server_listener, + additional_constraints_server_listener, + required_experiments_server_listener, 1, false, true}, }; } // namespace grpc_core @@ -352,6 +360,11 @@ const char* const description_work_serializer_dispatch = "callback, instead of running things inline in the first thread that " "successfully enqueues work."; const char* const additional_constraints_work_serializer_dispatch = "{}"; +const char* const description_server_listener = + "If set, the new server listener classes are used."; +const char* const additional_constraints_server_listener = "{}"; +const uint8_t required_experiments_server_listener[] = { + static_cast(grpc_core::kExperimentIdWorkSerializerDispatch)}; } // namespace namespace grpc_core { @@ -437,6 +450,9 @@ const ExperimentMetadata g_experiment_metadata[] = { false, true}, {"work_serializer_dispatch", description_work_serializer_dispatch, additional_constraints_work_serializer_dispatch, nullptr, 0, true, true}, + {"server_listener", description_server_listener, + additional_constraints_server_listener, + required_experiments_server_listener, 1, false, true}, }; } // namespace grpc_core @@ -562,6 +578,11 @@ const char* const description_work_serializer_dispatch = "callback, instead of running things inline in the first thread that " "successfully enqueues work."; const char* const additional_constraints_work_serializer_dispatch = "{}"; +const char* const description_server_listener = + "If set, the new server listener classes are used."; +const char* const additional_constraints_server_listener = "{}"; +const uint8_t required_experiments_server_listener[] = { + static_cast(grpc_core::kExperimentIdWorkSerializerDispatch)}; } // namespace namespace grpc_core { @@ -647,6 +668,9 @@ const ExperimentMetadata g_experiment_metadata[] = { false, true}, {"work_serializer_dispatch", description_work_serializer_dispatch, additional_constraints_work_serializer_dispatch, nullptr, 0, true, true}, + {"server_listener", description_server_listener, + additional_constraints_server_listener, + required_experiments_server_listener, 1, false, true}, }; } // namespace grpc_core diff --git a/src/core/lib/experiments/experiments.h b/src/core/lib/experiments/experiments.h index bc82e053c2a28..0a1213651b0da 100644 --- a/src/core/lib/experiments/experiments.h +++ b/src/core/lib/experiments/experiments.h @@ -96,6 +96,7 @@ inline bool IsTraceRecordCallopsEnabled() { return true; } inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_DISPATCH inline bool IsWorkSerializerDispatchEnabled() { return true; } +inline bool IsServerListenerEnabled() { return false; } #elif defined(GPR_WINDOWS) #define GRPC_EXPERIMENT_IS_INCLUDED_BACKOFF_CAP_INITIAL_AT_MAX @@ -140,6 +141,7 @@ inline bool IsTraceRecordCallopsEnabled() { return true; } inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_DISPATCH inline bool IsWorkSerializerDispatchEnabled() { return true; } +inline bool IsServerListenerEnabled() { return false; } #else #define GRPC_EXPERIMENT_IS_INCLUDED_BACKOFF_CAP_INITIAL_AT_MAX @@ -184,6 +186,7 @@ inline bool IsTraceRecordCallopsEnabled() { return true; } inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_DISPATCH inline bool IsWorkSerializerDispatchEnabled() { return true; } +inline bool IsServerListenerEnabled() { return false; } #endif #else @@ -218,6 +221,7 @@ enum ExperimentIds { kExperimentIdTraceRecordCallops, kExperimentIdUnconstrainedMaxQuotaBufferSize, kExperimentIdWorkSerializerDispatch, + kExperimentIdServerListener, kNumExperiments }; #define GRPC_EXPERIMENT_IS_INCLUDED_BACKOFF_CAP_INITIAL_AT_MAX @@ -341,6 +345,10 @@ inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { inline bool IsWorkSerializerDispatchEnabled() { return IsExperimentEnabled(); } +#define GRPC_EXPERIMENT_IS_INCLUDED_SERVER_LISTENER +inline bool IsServerListenerEnabled() { + return IsExperimentEnabled(); +} extern const ExperimentMetadata g_experiment_metadata[kNumExperiments]; diff --git a/src/core/lib/experiments/experiments.yaml b/src/core/lib/experiments/experiments.yaml index 920981388968d..7170a8d2845be 100644 --- a/src/core/lib/experiments/experiments.yaml +++ b/src/core/lib/experiments/experiments.yaml @@ -187,6 +187,13 @@ expiry: 2025/03/01 owner: vigneshbabu@google.com test_tags: [] +- name: server_listener + description: + If set, the new server listener classes are used. + requires: ["work_serializer_dispatch"] + expiry: 2025/03/31 + owner: yashkt@google.com + test_tags: ["xds_end2end_test", "core_end2end_test"] - name: tcp_frame_size_tuning description: If set, enables TCP to use RPC size estimation made by higher layers. diff --git a/src/core/lib/experiments/rollouts.yaml b/src/core/lib/experiments/rollouts.yaml index 247f7d2c7963e..c712dd78e9463 100644 --- a/src/core/lib/experiments/rollouts.yaml +++ b/src/core/lib/experiments/rollouts.yaml @@ -108,6 +108,8 @@ default: false - name: schedule_cancellation_over_write default: false +- name: server_listener + default: false - name: server_privacy default: false - name: tcp_frame_size_tuning diff --git a/src/core/lib/resource_quota/connection_quota.h b/src/core/lib/resource_quota/connection_quota.h index 4181a1878a1c4..f30ea21806329 100644 --- a/src/core/lib/resource_quota/connection_quota.h +++ b/src/core/lib/resource_quota/connection_quota.h @@ -48,6 +48,10 @@ class ConnectionQuota : public RefCounted { // Mark connections as closed. void ReleaseConnections(int num_connections); + int TestOnlyActiveIncomingConnections() const { + return active_incoming_connections_; + } + private: std::atomic active_incoming_connections_{0}; std::atomic max_incoming_connections_{std::numeric_limits::max()}; diff --git a/src/core/server/server.cc b/src/core/server/server.cc index f4ef152e64ee9..94cd5e022f224 100644 --- a/src/core/server/server.cc +++ b/src/core/server/server.cc @@ -47,6 +47,7 @@ #include "src/core/channelz/channel_trace.h" #include "src/core/channelz/channelz.h" #include "src/core/config/core_configuration.h" +#include "src/core/lib/address_utils/sockaddr_utils.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args_preconditioning.h" #include "src/core/lib/experiments/experiments.h" @@ -62,6 +63,7 @@ #include "src/core/lib/promise/seq.h" #include "src/core/lib/promise/try_join.h" #include "src/core/lib/promise/try_seq.h" +#include "src/core/lib/security/credentials/credentials.h" #include "src/core/lib/slice/slice_buffer.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/surface/call.h" @@ -84,6 +86,270 @@ namespace grpc_core { +// +// Server::ListenerState::ConfigFetcherWatcher +// + +void Server::ListenerState::ConfigFetcherWatcher::UpdateConnectionManager( + RefCountedPtr connection_manager) { + RefCountedPtr + connection_manager_to_destroy; + { + MutexLock lock(&listener_state_->mu_); + connection_manager_to_destroy = listener_state_->connection_manager_; + listener_state_->connection_manager_ = std::move(connection_manager); + listener_state_->DrainConnectionsLocked(); + if (listener_state_->server_->ShutdownCalled()) { + return; + } + listener_state_->is_serving_ = true; + if (listener_state_->started_) return; + listener_state_->started_ = true; + } + listener_state_->listener_->Start(); +} + +void Server::ListenerState::ConfigFetcherWatcher::StopServing() { + MutexLock lock(&listener_state_->mu_); + listener_state_->is_serving_ = false; + listener_state_->DrainConnectionsLocked(); +} + +// +// Server::ListenerState +// + +Server::ListenerState::ListenerState(RefCountedPtr server, + OrphanablePtr l) + : server_(std::move(server)), + memory_quota_( + server_->channel_args().GetObject()->memory_quota()), + connection_quota_(MakeRefCounted()), + event_engine_( + server_->channel_args() + .GetObject()), + listener_(std::move(l)) { + auto max_allowed_incoming_connections = + server_->channel_args().GetInt(GRPC_ARG_MAX_ALLOWED_INCOMING_CONNECTIONS); + if (max_allowed_incoming_connections.has_value()) { + connection_quota_->SetMaxIncomingConnections( + max_allowed_incoming_connections.value()); + } +} + +void Server::ListenerState::Start() { + if (IsServerListenerEnabled()) { + if (server_->config_fetcher() != nullptr) { + auto watcher = std::make_unique(this); + config_fetcher_watcher_ = watcher.get(); + server_->config_fetcher()->StartWatch( + grpc_sockaddr_to_string(listener_->resolved_address(), false).value(), + std::move(watcher)); + } else { + { + MutexLock lock(&mu_); + started_ = true; + is_serving_ = true; + } + listener_->Start(); + } + } else { + listener_->Start(); + } +} + +void Server::ListenerState::Stop() { + if (IsServerListenerEnabled()) { + absl::flat_hash_set> + connections; + { + MutexLock lock(&mu_); + // Orphan the connections so that they can start cleaning up. + connections = std::move(connections_); + connections_.clear(); + is_serving_ = false; + } + if (config_fetcher_watcher_ != nullptr) { + CHECK_NE(server_->config_fetcher(), nullptr); + server_->config_fetcher()->CancelWatch(config_fetcher_watcher_); + } + } + GRPC_CLOSURE_INIT(&destroy_done_, ListenerDestroyDone, server_.get(), + grpc_schedule_on_exec_ctx); + listener_->SetOnDestroyDone(&destroy_done_); + listener_.reset(); +} + +absl::optional Server::ListenerState::AddLogicalConnection( + OrphanablePtr connection, + const ChannelArgs& args, grpc_endpoint* endpoint) { + RefCountedPtr connection_manager; + { + MutexLock lock(&mu_); + if (!is_serving_) { + // Not serving + return absl::nullopt; + } + connection_manager = connection_manager_; + } + // The following section is intentionally outside the critical section. The + // operation to update channel args for a connection is heavy and complicated. + // For example, if using the xDS config fetcher, an involved matching process + // is performed to determine the filter chain to apply for this connection, + // prepare the filters, config selector and credentials. Subsequently, the + // credentials are used to create a security connector as well. Doing this + // outside the critical region allows us to get a larger degree of parallelism + // for the handling of incoming connections. + ChannelArgs new_args = args; + if (server_->config_fetcher() != nullptr) { + if (connection_manager == nullptr) { + // Connection manager not available + return absl::nullopt; + } + absl::StatusOr args_result = + connection_manager->UpdateChannelArgsForConnection(new_args, endpoint); + if (!args_result.ok()) { + return absl::nullopt; + } + auto* server_credentials = + (*args_result).GetObject(); + if (server_credentials == nullptr) { + // Could not find server credentials + return absl::nullopt; + } + auto security_connector = + server_credentials->create_security_connector(*args_result); + if (security_connector == nullptr) { + // Unable to create secure server with credentials + return absl::nullopt; + } + new_args = (*args_result).SetObject(security_connector); + } + MutexLock lock(&mu_); + // Since we let go of the lock earlier, we need to protect ourselves against + // time-of-check-to-time-of-use cases. The server may have stopped serving + // or the connection manager may have changed before we add the connection + // to the list of tracked connections. + if (!is_serving_ || connection_manager != connection_manager_) { + // Not serving + return absl::nullopt; + } + connections_.emplace(std::move(connection)); + return new_args; +} + +void Server::ListenerState::OnHandshakeDone( + ListenerInterface::LogicalConnection* connection) { + if (server_->config_fetcher() != nullptr) { + return; + } + // There is no config fetcher, so we can remove this connection from being + // tracked immediately. + OrphanablePtr connection_to_remove; + { + // Remove the connection if it wasn't already removed. + MutexLock lock(&mu_); + auto connection_handle = connections_.extract(connection); + if (!connection_handle.empty()) { + connection_to_remove = std::move(connection_handle.value()); + } + // We do not need to check connections_to_be_drained_list_ since that only + // gets set if there is a config fetcher. + } +} + +void Server::ListenerState::RemoveLogicalConnection( + ListenerInterface::LogicalConnection* connection) { + OrphanablePtr connection_to_remove; + // Remove the connection if it wasn't already removed. + MutexLock lock(&mu_); + auto connection_handle = connections_.extract(connection); + if (!connection_handle.empty()) { + connection_to_remove = std::move(connection_handle.value()); + return; + } + // The connection might be getting drained. + for (auto it = connections_to_be_drained_list_.begin(); + it != connections_to_be_drained_list_.end(); ++it) { + auto connection_handle = it->connections.extract(connection); + if (!connection_handle.empty()) { + connection_to_remove = std::move(connection_handle.value()); + RemoveConnectionsToBeDrainedOnEmptyLocked(it); + return; + } + } +} + +void Server::ListenerState::DrainConnectionsLocked() { + if (connections_.empty()) { + return; + } + // Send GOAWAYs on the transports so that they disconnect when existing + // RPCs finish. + for (auto& connection : connections_) { + connection->SendGoAway(); + } + connections_to_be_drained_list_.emplace_back(); + auto& connections_to_be_drained = connections_to_be_drained_list_.back(); + connections_to_be_drained.connections = std::move(connections_); + connections_.clear(); + connections_to_be_drained.timestamp = + Timestamp::Now() + + std::max(Duration::Zero(), + server_->channel_args() + .GetDurationFromIntMillis( + GRPC_ARG_SERVER_CONFIG_CHANGE_DRAIN_GRACE_TIME_MS) + .value_or(Duration::Minutes(10))); + MaybeStartNewGraceTimerLocked(); +} + +void Server::ListenerState::OnDrainGraceTimer() { + absl::flat_hash_set> + connections_to_be_drained; + { + MutexLock lock(&mu_); + if (connections_to_be_drained_list_.empty()) { + return; + } + connections_to_be_drained = + std::move(connections_to_be_drained_list_.front().connections); + connections_to_be_drained_list_.pop_front(); + MaybeStartNewGraceTimerLocked(); + } + for (auto& connection : connections_to_be_drained) { + connection->DisconnectImmediately(); + } +} + +void Server::ListenerState::MaybeStartNewGraceTimerLocked() { + if (connections_to_be_drained_list_.empty()) { + return; + } + drain_grace_timer_handle_ = event_engine()->RunAfter( + connections_to_be_drained_list_.front().timestamp - Timestamp::Now(), + [self = Ref()]() mutable { + ApplicationCallbackExecCtx callback_exec_ctx; + ExecCtx exec_ctx; + self->OnDrainGraceTimer(); + // resetting within an active ExecCtx + self.reset(); + }); +} + +void Server::ListenerState::RemoveConnectionsToBeDrainedOnEmptyLocked( + std::deque::iterator it) { + if (it->connections.empty()) { + // Cancel the timer if the set of connections is now empty. + if (event_engine()->Cancel(drain_grace_timer_handle_)) { + // Only remove the entry from the list if the cancellation was + // actually successful. OnDrainGraceTimer() will remove if + // cancellation is not successful. + connections_to_be_drained_list_.erase(it); + MaybeStartNewGraceTimerLocked(); + } + } +} + // // Server::RequestMatcherInterface // @@ -932,7 +1198,10 @@ void Server::AddListener(OrphanablePtr listener) { channelz_node_->AddChildListenSocket( listen_socket_node->RefAsSubclass()); } - listeners_.emplace_back(std::move(listener)); + ListenerInterface* ptr = listener.get(); + listener_states_.emplace_back( + MakeRefCounted(Ref(), std::move(listener))); + ptr->SetServerListenerState(listener_states_.back()); } void Server::Start() { @@ -964,8 +1233,8 @@ void Server::Start() { pollset); } } - for (auto& listener : listeners_) { - listener.listener->Start(this, &pollsets_); + for (auto& listener_state : listener_states_) { + listener_state->Start(); } MutexLock lock(&mu_global_); starting_ = false; @@ -1115,15 +1384,15 @@ void Server::MaybeFinishShutdown() { KillPendingWorkLocked(GRPC_ERROR_CREATE("Server Shutdown")); } if (!channels_.empty() || connections_open_ > 0 || - listeners_destroyed_ < listeners_.size()) { + listeners_destroyed_ < listener_states_.size()) { if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), last_shutdown_message_time_), gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) { last_shutdown_message_time_ = gpr_now(GPR_CLOCK_REALTIME); VLOG(2) << "Waiting for " << channels_.size() << " channels " << connections_open_ << " connections and " - << listeners_.size() - listeners_destroyed_ << "/" - << listeners_.size() + << listener_states_.size() - listeners_destroyed_ << "/" + << listener_states_.size() << " listeners to be destroyed before shutting down server"; } return; @@ -1220,18 +1489,15 @@ void Server::ShutdownAndNotify(grpc_completion_queue* cq, void* tag) { } void Server::StopListening() { - for (auto& listener : listeners_) { - if (listener.listener == nullptr) continue; + for (auto& listener_state : listener_states_) { + if (listener_state->listener() == nullptr) continue; channelz::ListenSocketNode* channelz_listen_socket_node = - listener.listener->channelz_listen_socket_node(); + listener_state->listener()->channelz_listen_socket_node(); if (channelz_node_ != nullptr && channelz_listen_socket_node != nullptr) { channelz_node_->RemoveChildListenSocket( channelz_listen_socket_node->uuid()); } - GRPC_CLOSURE_INIT(&listener.destroy_done, ListenerDestroyDone, this, - grpc_schedule_on_exec_ctx); - listener.listener->SetOnDestroyDone(&listener.destroy_done); - listener.listener.reset(); + listener_state->Stop(); } } @@ -1257,9 +1523,10 @@ void Server::SendGoaways() { void Server::Orphan() { { MutexLock lock(&mu_global_); - CHECK(ShutdownCalled() || listeners_.empty()); - CHECK(listeners_destroyed_ == listeners_.size()); + CHECK(ShutdownCalled() || listener_states_.empty()); + CHECK(listeners_destroyed_ == listener_states_.size()); } + listener_states_.clear(); Unref(); } @@ -1875,9 +2142,8 @@ grpc_call_error grpc_server_request_call( grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; GRPC_TRACE_LOG(api, INFO) - << "grpc_server_request_call(" - << "server=" << server << ", call=" << call << ", details=" << details - << ", initial_metadata=" << request_metadata + << "grpc_server_request_call(" << "server=" << server << ", call=" << call + << ", details=" << details << ", initial_metadata=" << request_metadata << ", cq_bound_to_call=" << cq_bound_to_call << ", cq_for_notification=" << cq_for_notification << ", tag=" << tag; return grpc_core::Server::FromC(server)->RequestCall( @@ -1896,10 +2162,9 @@ grpc_call_error grpc_server_request_registered_call( auto* rm = static_cast(registered_method); GRPC_TRACE_LOG(api, INFO) - << "grpc_server_request_registered_call(" - << "server=" << server << ", registered_method=" << registered_method - << ", call=" << call << ", deadline=" << deadline - << ", request_metadata=" << request_metadata + << "grpc_server_request_registered_call(" << "server=" << server + << ", registered_method=" << registered_method << ", call=" << call + << ", deadline=" << deadline << ", request_metadata=" << request_metadata << ", optional_payload=" << optional_payload << ", cq_bound_to_call=" << cq_bound_to_call << ", cq_for_notification=" << cq_for_notification << ", tag=" << tag_new @@ -1917,7 +2182,8 @@ void grpc_server_set_config_fetcher( << "grpc_server_set_config_fetcher(server=" << server << ", config_fetcher=" << server_config_fetcher << ")"; grpc_core::Server::FromC(server)->set_config_fetcher( - std::unique_ptr(server_config_fetcher)); + std::unique_ptr( + grpc_core::ServerConfigFetcher::FromC(server_config_fetcher))); } void grpc_server_config_fetcher_destroy( @@ -1927,5 +2193,5 @@ void grpc_server_config_fetcher_destroy( GRPC_TRACE_LOG(api, INFO) << "grpc_server_config_fetcher_destroy(config_fetcher=" << server_config_fetcher << ")"; - delete server_config_fetcher; + delete grpc_core::ServerConfigFetcher::FromC(server_config_fetcher); } diff --git a/src/core/server/server.h b/src/core/server/server.h index 92993a29e9e0d..a5403bda13c17 100644 --- a/src/core/server/server.h +++ b/src/core/server/server.h @@ -53,7 +53,9 @@ #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/iomgr_fwd.h" +#include "src/core/lib/iomgr/resolved_address.h" #include "src/core/lib/promise/arena_promise.h" +#include "src/core/lib/resource_quota/connection_quota.h" #include "src/core/lib/slice/slice.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/completion_queue.h" @@ -74,10 +76,48 @@ "grpc.server.max_pending_requests_hard_limit" namespace grpc_core { + +class ServerConfigFetcher + : public CppImplOf { + public: + class ConnectionManager + : public grpc_core::DualRefCounted { + public: + virtual absl::StatusOr + UpdateChannelArgsForConnection(const grpc_core::ChannelArgs& args, + grpc_endpoint* tcp) = 0; + }; + + class WatcherInterface { + public: + virtual ~WatcherInterface() = default; + // UpdateConnectionManager() is invoked by the config fetcher when a new + // config is available. Implementations should update the connection manager + // and start serving if not already serving. + virtual void UpdateConnectionManager( + grpc_core::RefCountedPtr manager) = 0; + // Implementations should stop serving when this is called. Serving should + // only resume when UpdateConfig() is invoked. + virtual void StopServing() = 0; + }; + + virtual ~ServerConfigFetcher() = default; + + virtual void StartWatch(std::string listening_address, + std::unique_ptr watcher) = 0; + virtual void CancelWatch(WatcherInterface* watcher) = 0; + virtual grpc_pollset_set* interested_parties() = 0; +}; + namespace experimental { class PassiveListenerImpl; } // namespace experimental +namespace testing { +class ServerTestPeer; +class ListenerStateTestPeer; +} // namespace testing + class Server : public ServerInterface, public InternallyRefCounted, public CppImplOf { @@ -110,27 +150,167 @@ class Server : public ServerInterface, grpc_completion_queue* cq; }; + class ListenerState; + /// Interface for listeners. - /// Implementations must override the Orphan() method, which should stop - /// listening and initiate destruction of the listener. class ListenerInterface : public InternallyRefCounted { public: + // State for a connection that is being managed by this listener. + // The LogicalConnection interface helps the server keep track of + // connections during handshake. If the server uses a config fetcher, the + // connection continues to be tracked by the server to drain connections on + // a config update. If not, the server stops the tracking after handshake is + // done. As such, implementations of `LogicalConnection` should cancel the + // handshake on `Orphan` if still in progress, but not close down the + // transport. + // Implementations are responsible for informing ListenerState about the + // following stages of a connection - + // 1) Invoke AddLogicalConnection() on accepting a new connection. Do not + // invoke if the connection is going to be closed immediately. + // 2) Invoke OnHandshakeDone() (irrespective of error) once handshake is + // done. No need to invoke if `RemoveLogicalConnection()` has already been + // invoked. + // 3) Invoke RemoveLogicalConnection() when the connection is closed. Do not + // invoke if the connection was never added. + // TODO(yashykt): In the case where there is no config fetcher, we remove + // the connection from our map and instead use `ChannelData` to keep track + // of the connections. This is much cheaper (8 bytes per connection) as + // compared to implementations of LogicalConnection which can be more than + // 24 bytes based on the chttp2 implementation. This complexity causes + // weirdness for our interfaces. Figure out a way to combine these two + // tracking systems, without increasing memory utilization. + class LogicalConnection : public InternallyRefCounted { + public: + ~LogicalConnection() override = default; + + // The following two methods are called in the context of a server config + // event. + virtual void SendGoAway() = 0; + virtual void DisconnectImmediately() = 0; + }; + ~ListenerInterface() override = default; - /// Starts listening. This listener may refer to the pollset object beyond - /// this call, so it is a pointer rather than a reference. - virtual void Start(Server* server, - const std::vector* pollsets) = 0; + /// Starts listening. + virtual void Start() = 0; /// Returns the channelz node for the listen socket, or null if not /// supported. virtual channelz::ListenSocketNode* channelz_listen_socket_node() const = 0; + virtual void SetServerListenerState( + RefCountedPtr listener_state) = 0; + + virtual const grpc_resolved_address* resolved_address() const = 0; + /// Sets a closure to be invoked by the listener when its destruction /// is complete. virtual void SetOnDestroyDone(grpc_closure* on_destroy_done) = 0; }; + // Implements the connection management and config fetching mechanism for + // listeners. + // Note that an alternative implementation would have been to combine the + // ListenerInterface and ListenerState into a single parent class, but + // they are being separated to make code simpler to understand. + class ListenerState : public RefCounted { + public: + explicit ListenerState(RefCountedPtr server, + OrphanablePtr l); + + void Start(); + + void Stop(); + + ListenerInterface* listener() { return listener_.get(); } + + Server* server() const { return server_.get(); } + + // Adds a LogicalConnection to the listener and updates the channel args if + // needed, and returns ChannelArgs if successful. + absl::optional AddLogicalConnection( + OrphanablePtr connection, + const ChannelArgs& args, grpc_endpoint* endpoint) + ABSL_LOCKS_EXCLUDED(mu_); + + void OnHandshakeDone(ListenerInterface::LogicalConnection* connection); + + // Removes the logical connection from being tracked. This could happen for + // reasons such as the connection being closed, or the connection has been + // established (including handshake) and doesn't have a server config + // fetcher. + void RemoveLogicalConnection( + ListenerInterface::LogicalConnection* connection); + + const MemoryQuotaRefPtr& memory_quota() const { return memory_quota_; } + + const ConnectionQuotaRefPtr& connection_quota() const { + return connection_quota_; + } + + grpc_event_engine::experimental::EventEngine* event_engine() const { + return event_engine_; + } + + private: + friend class grpc_core::testing::ListenerStateTestPeer; + + class ConfigFetcherWatcher : public ServerConfigFetcher::WatcherInterface { + public: + explicit ConfigFetcherWatcher(ListenerState* listener_state) + : listener_state_(listener_state) {} + + void UpdateConnectionManager( + RefCountedPtr + connection_manager) override; + + void StopServing() override; + + private: + // This doesn't need to be ref-counted since we start and stop config + // fetching as part of starting and stopping the listener. + ListenerState* const listener_state_; + }; + + struct ConnectionsToBeDrained { + absl::flat_hash_set> + connections; + grpc_core::Timestamp timestamp; + }; + + void DrainConnectionsLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); + + void OnDrainGraceTimer(); + + void MaybeStartNewGraceTimerLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); + + void RemoveConnectionsToBeDrainedOnEmptyLocked( + std::deque::iterator it) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); + + RefCountedPtr const server_; + MemoryQuotaRefPtr const memory_quota_; + ConnectionQuotaRefPtr connection_quota_; + grpc_event_engine::experimental::EventEngine* const event_engine_; + OrphanablePtr listener_; + grpc_closure destroy_done_; + ConfigFetcherWatcher* config_fetcher_watcher_ = nullptr; + Mutex mu_; // We could share this mutex with Listener implementations. It's + // a tradeoff between increased memory requirement and more + // granular critical regions. + RefCountedPtr connection_manager_ + ABSL_GUARDED_BY(mu_); + bool is_serving_ ABSL_GUARDED_BY(mu_) = false; + bool started_ ABSL_GUARDED_BY(mu_) = false; + absl::flat_hash_set> + connections_ ABSL_GUARDED_BY(mu_); + std::deque connections_to_be_drained_list_ + ABSL_GUARDED_BY(mu_); + grpc_event_engine::experimental::EventEngine::TaskHandle + drain_grace_timer_handle_ ABSL_GUARDED_BY(mu_) = + grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid; + }; + explicit Server(const ChannelArgs& args); ~Server() override; @@ -146,16 +326,15 @@ class Server : public ServerInterface, // result is valid for the lifetime of the server. const std::vector& pollsets() const { return pollsets_; } - grpc_server_config_fetcher* config_fetcher() const { - return config_fetcher_.get(); - } + ServerConfigFetcher* config_fetcher() const { return config_fetcher_.get(); } ServerCallTracerFactory* server_call_tracer_factory() const override { return server_call_tracer_factory_; } - void set_config_fetcher( - std::unique_ptr config_fetcher); + void set_config_fetcher(std::unique_ptr config_fetcher) { + config_fetcher_ = std::move(config_fetcher); + } bool HasOpenConnections() ABSL_LOCKS_EXCLUDED(mu_global_); @@ -173,7 +352,8 @@ class Server : public ServerInterface, grpc_error_handle SetupTransport( Transport* transport, grpc_pollset* accepting_pollset, const ChannelArgs& args, - const RefCountedPtr& socket_node); + const RefCountedPtr& socket_node) + ABSL_LOCKS_EXCLUDED(mu_global_); void RegisterCompletionQueue(grpc_completion_queue* cq); @@ -225,6 +405,9 @@ class Server : public ServerInterface, grpc_core::Server* server, grpc_server_credentials* credentials, std::shared_ptr passive_listener); + + friend class grpc_core::testing::ServerTestPeer; + struct RequestedCall; class RequestMatcherInterface; @@ -360,13 +543,6 @@ class Server : public ServerInterface, CallCombiner* call_combiner_; }; - struct Listener { - explicit Listener(OrphanablePtr l) - : listener(std::move(l)) {} - OrphanablePtr listener; - grpc_closure destroy_done; - }; - struct ShutdownTag { ShutdownTag(void* tag_arg, grpc_completion_queue* cq_arg) : tag(tag_arg), cq(cq_arg) {} @@ -459,7 +635,7 @@ class Server : public ServerInterface, ChannelArgs const channel_args_; RefCountedPtr channelz_node_; - std::unique_ptr config_fetcher_; + std::unique_ptr config_fetcher_; ServerCallTracerFactory* const server_call_tracer_factory_; std::vector cqs_; @@ -515,9 +691,11 @@ class Server : public ServerInterface, std::list channels_; absl::flat_hash_set> connections_ ABSL_GUARDED_BY(mu_global_); + RefCountedPtr connection_manager_ + ABSL_GUARDED_BY(mu_global_); size_t connections_open_ ABSL_GUARDED_BY(mu_global_) = 0; - std::list listeners_; + std::list> listener_states_; size_t listeners_destroyed_ = 0; // The last time we printed a shutdown progress message. @@ -526,44 +704,4 @@ class Server : public ServerInterface, } // namespace grpc_core -struct grpc_server_config_fetcher { - public: - class ConnectionManager - : public grpc_core::DualRefCounted { - public: - virtual absl::StatusOr - UpdateChannelArgsForConnection(const grpc_core::ChannelArgs& args, - grpc_endpoint* tcp) = 0; - }; - - class WatcherInterface { - public: - virtual ~WatcherInterface() = default; - // UpdateConnectionManager() is invoked by the config fetcher when a new - // config is available. Implementations should update the connection manager - // and start serving if not already serving. - virtual void UpdateConnectionManager( - grpc_core::RefCountedPtr manager) = 0; - // Implementations should stop serving when this is called. Serving should - // only resume when UpdateConfig() is invoked. - virtual void StopServing() = 0; - }; - - virtual ~grpc_server_config_fetcher() = default; - - virtual void StartWatch(std::string listening_address, - std::unique_ptr watcher) = 0; - virtual void CancelWatch(WatcherInterface* watcher) = 0; - virtual grpc_pollset_set* interested_parties() = 0; -}; - -namespace grpc_core { - -inline void Server::set_config_fetcher( - std::unique_ptr config_fetcher) { - config_fetcher_ = std::move(config_fetcher); -} - -} // namespace grpc_core - #endif // GRPC_SRC_CORE_SERVER_SERVER_H diff --git a/src/core/server/xds_server_config_fetcher.cc b/src/core/server/xds_server_config_fetcher.cc index ee300de6f17b0..2bc1d0ed82f3b 100644 --- a/src/core/server/xds_server_config_fetcher.cc +++ b/src/core/server/xds_server_config_fetcher.cc @@ -97,7 +97,7 @@ using ReadDelayHandle = XdsClient::ReadDelayHandle; // A server config fetcher that fetches the information for configuring server // listeners from the xDS control plane. -class XdsServerConfigFetcher final : public grpc_server_config_fetcher { +class XdsServerConfigFetcher final : public ServerConfigFetcher { public: XdsServerConfigFetcher(RefCountedPtr xds_client, grpc_server_xds_status_notifier notifier); @@ -106,12 +106,11 @@ class XdsServerConfigFetcher final : public grpc_server_config_fetcher { xds_client_.reset(DEBUG_LOCATION, "XdsServerConfigFetcher"); } - void StartWatch(std::string listening_address, - std::unique_ptr - watcher) override; + void StartWatch( + std::string listening_address, + std::unique_ptr watcher) override; - void CancelWatch( - grpc_server_config_fetcher::WatcherInterface* watcher) override; + void CancelWatch(ServerConfigFetcher::WatcherInterface* watcher) override; // Return the interested parties from the xds client so that it can be polled. grpc_pollset_set* interested_parties() override { @@ -124,7 +123,7 @@ class XdsServerConfigFetcher final : public grpc_server_config_fetcher { RefCountedPtr xds_client_; const grpc_server_xds_status_notifier serving_status_notifier_; Mutex mu_; - std::map + std::map listener_watchers_ ABSL_GUARDED_BY(mu_); }; @@ -142,7 +141,7 @@ class XdsServerConfigFetcher::ListenerWatcher final : public XdsListenerResourceType::WatcherInterface { public: ListenerWatcher(RefCountedPtr xds_client, - std::unique_ptr + std::unique_ptr server_config_watcher, grpc_server_xds_status_notifier serving_status_notifier, std::string listening_address); @@ -180,7 +179,7 @@ class XdsServerConfigFetcher::ListenerWatcher final ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_); RefCountedPtr xds_client_; - const std::unique_ptr + const std::unique_ptr server_config_watcher_; const grpc_server_xds_status_notifier serving_status_notifier_; const std::string listening_address_; @@ -197,7 +196,7 @@ class XdsServerConfigFetcher::ListenerWatcher final // args that configure the right mTLS certs and cause the right set of HTTP // filters to be injected. class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager final - : public grpc_server_config_fetcher::ConnectionManager { + : public ServerConfigFetcher::ConnectionManager { public: FilterChainMatchManager(RefCountedPtr xds_client, XdsListenerResource::FilterChainMap filter_chain_map, @@ -522,8 +521,8 @@ std::string ListenerResourceName(absl::string_view resource_name_template, void XdsServerConfigFetcher::StartWatch( std::string listening_address, - std::unique_ptr watcher) { - grpc_server_config_fetcher::WatcherInterface* watcher_ptr = watcher.get(); + std::unique_ptr watcher) { + ServerConfigFetcher::WatcherInterface* watcher_ptr = watcher.get(); auto listener_watcher = MakeRefCounted( xds_client_.Ref(DEBUG_LOCATION, "ListenerWatcher"), std::move(watcher), serving_status_notifier_, listening_address); @@ -540,7 +539,7 @@ void XdsServerConfigFetcher::StartWatch( } void XdsServerConfigFetcher::CancelWatch( - grpc_server_config_fetcher::WatcherInterface* watcher) { + ServerConfigFetcher::WatcherInterface* watcher) { MutexLock lock(&mu_); auto it = listener_watchers_.find(watcher); if (it != listener_watchers_.end()) { @@ -562,7 +561,7 @@ void XdsServerConfigFetcher::CancelWatch( XdsServerConfigFetcher::ListenerWatcher::ListenerWatcher( RefCountedPtr xds_client, - std::unique_ptr + std::unique_ptr server_config_watcher, grpc_server_xds_status_notifier serving_status_notifier, std::string listening_address) @@ -1315,6 +1314,7 @@ grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create( "bootstrap file."; return nullptr; } - return new grpc_core::XdsServerConfigFetcher(std::move(*xds_client), - notifier); + return (new grpc_core::XdsServerConfigFetcher(std::move(*xds_client), + notifier)) + ->c_ptr(); } diff --git a/test/core/transport/chttp2/BUILD b/test/core/transport/chttp2/BUILD index e575b76ea9f0e..6bc9c47a15b35 100644 --- a/test/core/transport/chttp2/BUILD +++ b/test/core/transport/chttp2/BUILD @@ -520,6 +520,25 @@ grpc_cc_test( ], ) +grpc_cc_test( + name = "chttp2_server_listener_test", + srcs = ["chttp2_server_listener_test.cc"], + data = [ + "//src/core/tsi/test_creds:client.pem", + "//src/core/tsi/test_creds:server1.key", + "//src/core/tsi/test_creds:server1.pem", + ], + external_deps = ["gtest"], + language = "C++", + deps = [ + "//:gpr", + "//:grpc", + "//test/core/end2end:cq_verifier", + "//test/core/test_util:grpc_test_util", + "//test/core/test_util:grpc_test_util_base", + ], +) + grpc_fuzz_test( name = "write_size_policy_fuzztest", srcs = ["write_size_policy_fuzztest.cc"], diff --git a/test/core/transport/chttp2/chttp2_server_listener_test.cc b/test/core/transport/chttp2/chttp2_server_listener_test.cc new file mode 100644 index 0000000000000..0c771b71d7a11 --- /dev/null +++ b/test/core/transport/chttp2/chttp2_server_listener_test.cc @@ -0,0 +1,461 @@ +// +// +// Copyright 2024 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +#include +#include + +#include + +#include "absl/synchronization/notification.h" +#include "src/core/config/core_configuration.h" +#include "src/core/ext/transport/chttp2/server/chttp2_server.h" +#include "src/core/lib/security/credentials/credentials.h" +#include "src/core/lib/security/credentials/insecure/insecure_credentials.h" +#include "src/core/lib/security/credentials/tls/grpc_tls_credentials_options.h" +#include "src/core/lib/security/credentials/tls/tls_credentials.h" +#include "src/core/server/server.h" +#include "src/core/util/host_port.h" +#include "test/core/end2end/cq_verifier.h" +#include "test/core/test_util/mock_endpoint.h" +#include "test/core/test_util/port.h" +#include "test/core/test_util/test_config.h" +#include "test/core/test_util/tls_utils.h" + +using grpc_event_engine::experimental::EventEngine; + +namespace grpc_core { +namespace testing { + +class Chttp2ServerListenerTestPeer { + public: + explicit Chttp2ServerListenerTestPeer(NewChttp2ServerListener* listener) + : listener_(listener) {} + + static OrphanablePtr MakeListener( + const ChannelArgs& args) { + return MakeOrphanable(args); + } + + void OnAccept(grpc_endpoint* tcp, grpc_pollset* accepting_pollset, + grpc_tcp_server_acceptor* server_acceptor) { + NewChttp2ServerListener::OnAccept(listener_, tcp, accepting_pollset, + server_acceptor); + } + + RefCountedPtr Ref() { + return listener_->RefAsSubclass(); + } + + private: + NewChttp2ServerListener* listener_; +}; + +class ActiveConnectionTestPeer { + public: + explicit ActiveConnectionTestPeer( + NewChttp2ServerListener::ActiveConnection* connection) + : connection_(connection) {} + + void OnClose() { + NewChttp2ServerListener::ActiveConnection::OnClose(connection_, + absl::OkStatus()); + } + + NewChttp2ServerListener::ActiveConnection::HandshakingState* + handshaking_state() { + absl::Notification notification; + NewChttp2ServerListener::ActiveConnection::HandshakingState* + handshaking_state = nullptr; + connection_->work_serializer_.Run( + [&]() { + handshaking_state = absl::get>( + connection_->state_) + .get(); + notification.Notify(); + }, + DEBUG_LOCATION); + notification.WaitForNotificationWithTimeout(absl::Seconds(5) * + grpc_test_slowdown_factor()); + return handshaking_state; + } + + private: + NewChttp2ServerListener::ActiveConnection* connection_; +}; + +class HandshakingStateTestPeer { + public: + explicit HandshakingStateTestPeer( + NewChttp2ServerListener::ActiveConnection::HandshakingState* + handshaking_state) + : handshaking_state_(handshaking_state) {} + + RefCountedPtr + Ref() { + return handshaking_state_->Ref(); + } + + bool WaitForSettingsFrame() { + absl::Notification settings_received_notification; + absl::Time deadline = + absl::Now() + absl::Seconds(5) * grpc_test_slowdown_factor(); + // When settings frame is received, the handshaking state will no longer + // have a valid timer. + do { + absl::Notification callback_done; + handshaking_state_->connection_->work_serializer_.Run( + [&]() { + if (!handshaking_state_->timer_handle_.has_value()) { + settings_received_notification.Notify(); + } + callback_done.Notify(); + }, + DEBUG_LOCATION); + if (!callback_done.WaitForNotificationWithDeadline(deadline)) { + break; + } + } while (!settings_received_notification.HasBeenNotified() && + absl::Now() < deadline); + return settings_received_notification.HasBeenNotified(); + } + + private: + NewChttp2ServerListener::ActiveConnection::HandshakingState* + handshaking_state_; +}; + +class ServerTestPeer { + public: + explicit ServerTestPeer(Server* server) : server_(server) {} + + const std::list>& listener_states() + const { + return server_->listener_states_; + } + + private: + Server* server_; +}; + +class ListenerStateTestPeer { + public: + explicit ListenerStateTestPeer(Server::ListenerState* listener_state) + : listener_state_(listener_state) {} + + // Returns the number of connections currently being actively tracked + size_t ConnectionsSize() { + MutexLock lock(&listener_state_->mu_); + return listener_state_->connections_.size(); + } + + private: + Server::ListenerState* listener_state_; +}; + +namespace { + +class Chttp2ServerListenerTest : public ::testing::Test { + protected: + void SetUpServer(const RefCountedPtr& creds = + MakeRefCounted()) { + args_ = CoreConfiguration::Get() + .channel_args_preconditioning() + .PreconditionChannelArgs(nullptr); + server_ = MakeOrphanable(args_); + grpc_server_add_http2_port( + server_->c_ptr(), + JoinHostPort("localhost", grpc_pick_unused_port_or_die()).c_str(), + creds.get()); + cq_ = grpc_completion_queue_create_for_next(/*reserved=*/nullptr); + server_->RegisterCompletionQueue(cq_); + grpc_server_start(server_->c_ptr()); + listener_state_ = + ServerTestPeer(server_.get()).listener_states().front().get(); + listener_ = DownCast(listener_state_->listener()); + } + + void TearDown() override { + CqVerifier cqv(cq_); + grpc_server_shutdown_and_notify(server_->c_ptr(), cq_, CqVerifier::tag(-1)); + cqv.Expect(CqVerifier::tag(-1), true); + cqv.Verify(); + server_.reset(); + grpc_completion_queue_destroy(cq_); + } + + ChannelArgs args_; + OrphanablePtr server_; + Server::ListenerState* listener_state_; + NewChttp2ServerListener* listener_; + grpc_completion_queue* cq_ = nullptr; +}; + +TEST_F(Chttp2ServerListenerTest, Basic) { + SetUpServer(); + listener_state_->connection_quota()->SetMaxIncomingConnections(10); + auto mock_endpoint_controller = + grpc_event_engine::experimental::MockEndpointController::Create( + args_.GetObjectRef()); + Chttp2ServerListenerTestPeer(listener_).OnAccept( + /*tcp=*/mock_endpoint_controller->TakeCEndpoint(), + /*accepting_pollset=*/nullptr, + /*server_acceptor=*/nullptr); + EXPECT_EQ( + listener_state_->connection_quota()->TestOnlyActiveIncomingConnections(), + 1); +} + +TEST_F(Chttp2ServerListenerTest, NoConnectionQuota) { + SetUpServer(); + listener_state_->connection_quota()->SetMaxIncomingConnections(0); + auto mock_endpoint_controller = + grpc_event_engine::experimental::MockEndpointController::Create( + args_.GetObjectRef()); + Chttp2ServerListenerTestPeer(listener_).OnAccept( + /*tcp=*/mock_endpoint_controller->TakeCEndpoint(), + /*accepting_pollset=*/nullptr, + /*server_acceptor=*/nullptr); + EXPECT_EQ( + listener_state_->connection_quota()->TestOnlyActiveIncomingConnections(), + 0); +} + +TEST_F(Chttp2ServerListenerTest, ConnectionRefusedAfterShutdown) { + SetUpServer(); + listener_state_->connection_quota()->SetMaxIncomingConnections(10); + // Take ref on listener to prevent destruction of listener + RefCountedPtr listener_ref = + Chttp2ServerListenerTestPeer(listener_).Ref(); + grpc_server_shutdown_and_notify(server_->c_ptr(), cq_, CqVerifier::tag(1)); + auto mock_endpoint_controller = + grpc_event_engine::experimental::MockEndpointController::Create( + args_.GetObjectRef()); + Chttp2ServerListenerTestPeer(listener_).OnAccept( + /*tcp=*/mock_endpoint_controller->TakeCEndpoint(), + /*accepting_pollset=*/nullptr, + /*server_acceptor=*/nullptr); + EXPECT_EQ( + listener_state_->connection_quota()->TestOnlyActiveIncomingConnections(), + 0); + // Let go of the ref to allow server shutdown to complete. + { + // TODO(yashykt): Remove ExecCtx when we are no longer using it for shutdown + // notification. + ExecCtx exec_ctx; + listener_ref.reset(); + } + CqVerifier cqv(cq_); + cqv.Expect(CqVerifier::tag(1), true); + cqv.Verify(); +} + +using Chttp2ActiveConnectionTest = Chttp2ServerListenerTest; + +TEST_F(Chttp2ActiveConnectionTest, CloseWithoutHandshakeStarting) { + SetUpServer(); + listener_state_->connection_quota()->SetMaxIncomingConnections(10); + // Add a connection + ASSERT_TRUE(listener_state_->connection_quota()->AllowIncomingConnection( + listener_state_->memory_quota(), "peer")); + auto connection = MakeOrphanable( + listener_state_->Ref(), /*tcp_server=*/nullptr, + /*accepting_pollset=*/nullptr, + /*acceptor=*/nullptr, args_, + listener_state_->memory_quota()->CreateMemoryOwner(), nullptr); + EXPECT_EQ( + listener_state_->connection_quota()->TestOnlyActiveIncomingConnections(), + 1); + connection->RefAsSubclass() + .release(); // Ref for OnClose + // On close, the connection count should go back to 0. + ActiveConnectionTestPeer(connection.get()).OnClose(); + EXPECT_EQ( + listener_state_->connection_quota()->TestOnlyActiveIncomingConnections(), + 0); +} + +RefCountedPtr CreateSecureServerCredentials() { + std::string ca_cert = + testing::GetFileContents("src/core/tsi/test_creds/client.pem"); + std::string server_cert = + testing::GetFileContents("src/core/tsi/test_creds/server1.pem"); + std::string server_key = + testing::GetFileContents("src/core/tsi/test_creds/server1.key"); + grpc_tls_credentials_options* options = grpc_tls_credentials_options_create(); + // Set credential provider. + grpc_tls_identity_pairs* server_pairs = grpc_tls_identity_pairs_create(); + grpc_tls_identity_pairs_add_pair(server_pairs, server_key.c_str(), + server_cert.c_str()); + grpc_tls_certificate_provider* server_provider = + grpc_tls_certificate_provider_static_data_create(ca_cert.c_str(), + server_pairs); + grpc_tls_credentials_options_set_certificate_provider(options, + server_provider); + grpc_tls_certificate_provider_release(server_provider); + grpc_tls_credentials_options_watch_root_certs(options); + grpc_tls_credentials_options_watch_identity_key_cert_pairs(options); + // Set client certificate request type. + grpc_tls_credentials_options_set_cert_request_type( + options, GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY); + grpc_server_credentials* creds = grpc_tls_server_credentials_create(options); + return RefCountedPtr(creds); +} + +TEST_F(Chttp2ActiveConnectionTest, CloseDuringHandshake) { + // Use TlsCreds to make sure handshake doesn't complete. + SetUpServer(CreateSecureServerCredentials()); + listener_state_->connection_quota()->SetMaxIncomingConnections(10); + // Add a connection + ASSERT_TRUE(listener_state_->connection_quota()->AllowIncomingConnection( + listener_state_->memory_quota(), "peer")); + auto connection = MakeOrphanable( + listener_state_->Ref(), /*tcp_server=*/nullptr, + /*accepting_pollset=*/nullptr, + /*acceptor=*/nullptr, args_, + listener_state_->memory_quota()->CreateMemoryOwner(), nullptr); + EXPECT_EQ( + listener_state_->connection_quota()->TestOnlyActiveIncomingConnections(), + 1); + connection->RefAsSubclass() + .release(); // Ref for OnClose + // On close, the connection count should go back to 0. + ActiveConnectionTestPeer(connection.get()).OnClose(); + EXPECT_EQ( + listener_state_->connection_quota()->TestOnlyActiveIncomingConnections(), + 0); +} + +TEST_F(Chttp2ActiveConnectionTest, CloseAfterHandshakeButBeforeSettingsFrame) { + SetUpServer(); + listener_state_->connection_quota()->SetMaxIncomingConnections(10); + // Add a connection + auto mock_endpoint_controller = + grpc_event_engine::experimental::MockEndpointController::Create( + args_.GetObjectRef()); + mock_endpoint_controller->NoMoreReads(); + ASSERT_TRUE(listener_state_->connection_quota()->AllowIncomingConnection( + listener_state_->memory_quota(), "peer")); + auto connection = MakeOrphanable( + listener_state_->Ref(), /*tcp_server=*/nullptr, + /*accepting_pollset=*/nullptr, + /*acceptor=*/nullptr, args_, + listener_state_->memory_quota()->CreateMemoryOwner(), + OrphanablePtr(mock_endpoint_controller->TakeCEndpoint())); + auto* connection_ptr = connection.get(); + listener_state_->AddLogicalConnection(std::move(connection), args_, + /*endpoint=*/nullptr); + EXPECT_EQ( + listener_state_->connection_quota()->TestOnlyActiveIncomingConnections(), + 1); + connection_ptr->Start(args_); + // Wait for handshake to be done. When handshake is done, the connection will + // be removed from the ListenerState's connection map since there is no config + // fetcher. + absl::Time test_deadline = + absl::Now() + absl::Seconds(10) * grpc_test_slowdown_factor(); + while (ListenerStateTestPeer(listener_state_).ConnectionsSize() != 0) { + ASSERT_LE(absl::Now(), test_deadline); + // Yield for other threads to make progress. + // If the test turns out to be flaky, convert this to a sleep. + std::this_thread::yield(); + } + // Trigger close from the server + grpc_server_cancel_all_calls(server_->c_ptr()); + while (listener_state_->connection_quota() + ->TestOnlyActiveIncomingConnections() != 0) { + ASSERT_LE(absl::Now(), test_deadline); + // Yield for other threads to make progress. + // If the test turns out to be flaky, convert this to a sleep. + std::this_thread::yield(); + } +} + +TEST_F(Chttp2ActiveConnectionTest, CloseAfterSettingsFrame) { + SetUpServer(); + listener_state_->connection_quota()->SetMaxIncomingConnections(10); + // Add a connection + auto mock_endpoint_controller = + grpc_event_engine::experimental::MockEndpointController::Create( + args_.GetObjectRef()); + // Provide settings frame to the mock endpoint + mock_endpoint_controller->TriggerReadEvent( + grpc_event_engine::experimental::Slice::FromCopiedString( + "PRI * " + "HTTP/2.0\r\n\r\nSM\r\n\r\n\x00\x00\x00\x04\x00\x00\x00\x00\x00")); + mock_endpoint_controller->NoMoreReads(); + ASSERT_TRUE(listener_state_->connection_quota()->AllowIncomingConnection( + listener_state_->memory_quota(), "peer")); + auto connection = MakeOrphanable( + listener_state_->Ref(), /*tcp_server=*/nullptr, + /*accepting_pollset=*/nullptr, + /*acceptor=*/nullptr, args_, + listener_state_->memory_quota()->CreateMemoryOwner(), + OrphanablePtr(mock_endpoint_controller->TakeCEndpoint())); + auto* connection_ptr = connection.get(); + // Keep the handshaking state alive for verification + RefCountedPtr + handshaking_state = + HandshakingStateTestPeer( + ActiveConnectionTestPeer(connection_ptr).handshaking_state()) + .Ref(); + listener_state_->AddLogicalConnection(std::move(connection), args_, + /*endpoint=*/nullptr); + EXPECT_EQ( + listener_state_->connection_quota()->TestOnlyActiveIncomingConnections(), + 1); + connection_ptr->Start(args_); + // Wait for handshake to be done. When handshake is done, the connection will + // be removed from the ListenerState's connection map since there is no config + // fetcher. + absl::Time test_deadline = + absl::Now() + absl::Seconds(10) * grpc_test_slowdown_factor(); + while (ListenerStateTestPeer(listener_state_).ConnectionsSize() != 0) { + ASSERT_LE(absl::Now(), test_deadline); + // Yield for other threads to make progress. + // If the test turns out to be flaky, convert this to a sleep. + std::this_thread::yield(); + } + // Wait for settings frame to be received. + ASSERT_TRUE( + HandshakingStateTestPeer(handshaking_state.get()).WaitForSettingsFrame()); + // Trigger close from the server + grpc_server_cancel_all_calls(server_->c_ptr()); + while (listener_state_->connection_quota() + ->TestOnlyActiveIncomingConnections() != 0) { + ASSERT_LE(absl::Now(), test_deadline); + // Yield for other threads to make progress. + // If the test turns out to be flaky, convert this to a sleep. + std::this_thread::yield(); + } +} + +} // namespace +} // namespace testing +} // namespace grpc_core + +int main(int argc, char** argv) { + grpc_core::ForceEnableExperiment("work_serializer_dispatch", true); + grpc_core::ForceEnableExperiment("server_listener", true); + grpc::testing::TestEnvironment env(&argc, argv); + ::testing::InitGoogleTest(&argc, argv); + grpc_init(); + int ret = RUN_ALL_TESTS(); + grpc_shutdown(); + return ret; +} diff --git a/test/cpp/end2end/xds/xds_enabled_server_end2end_test.cc b/test/cpp/end2end/xds/xds_enabled_server_end2end_test.cc index 03c15fe7db664..0f50dd22bdb11 100644 --- a/test/cpp/end2end/xds/xds_enabled_server_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_enabled_server_end2end_test.cc @@ -298,8 +298,6 @@ TEST_P(XdsEnabledServerStatusNotificationTest, RepeatedServingStatusChanges) { } TEST_P(XdsEnabledServerStatusNotificationTest, ExistingRpcsOnResourceDeletion) { - // Send a valid LDS update to get the server to start listening - SetValidLdsUpdate(); StartBackend(0); ASSERT_TRUE(backends_[0]->WaitOnServingStatusChange(grpc::StatusCode::OK)); constexpr int kNumChannels = 10; @@ -356,8 +354,6 @@ TEST_P(XdsEnabledServerStatusNotificationTest, ExistingRpcsFailOnResourceUpdateAfterDrainGraceTimeExpires) { constexpr int kDrainGraceTimeMs = 100; xds_drain_grace_time_ms_ = kDrainGraceTimeMs; - // Send a valid LDS update to get the server to start listening - SetValidLdsUpdate(); StartBackend(0); ASSERT_TRUE(backends_[0]->WaitOnServingStatusChange(grpc::StatusCode::OK)); constexpr int kNumChannels = 10; diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 9af9a5b3b1311..0250cfb95cce2 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -1983,6 +1983,30 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "chttp2_server_listener_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": true + }, { "args": [], "benchmark": false,