Skip to content

Commit

Permalink
[EventEngine] Migrate chttp2_server to use EE DNSResolver (grpc#37853)
Browse files Browse the repository at this point in the history
<!--

If you know who should review your pull request, please assign it to that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the appropriate
lang label.

-->

Closes grpc#37853

COPYBARA_INTEGRATE_REVIEW=grpc#37853 from yijiem:dns-migration-chttp2-server b830720
PiperOrigin-RevId: 684631881
  • Loading branch information
yijiem authored and copybara-github committed Oct 11, 2024
1 parent efde053 commit 8afaf83
Show file tree
Hide file tree
Showing 9 changed files with 136 additions and 30 deletions.
5 changes: 5 additions & 0 deletions bazel/experiments.bzl

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions src/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -2371,9 +2371,11 @@ grpc_cc_library(
hdrs = ["lib/event_engine/utils.h"],
external_deps = [
"absl/log:check",
"absl/status:statusor",
"absl/strings",
],
deps = [
"notification",
"time",
"//:event_engine_base_hdrs",
"//:gpr_platform",
Expand Down Expand Up @@ -7647,8 +7649,11 @@ grpc_cc_library(
"connection_quota",
"error",
"error_utils",
"event_engine_common",
"event_engine_extensions",
"event_engine_query_extensions",
"event_engine_tcp_socket_utils",
"event_engine_utils",
"grpc_insecure_credentials",
"handshaker_registry",
"iomgr_fwd",
Expand Down
87 changes: 57 additions & 30 deletions src/core/ext/transport/chttp2/server/chttp2_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/event_engine/extensions/supports_fd.h"
#include "src/core/lib/event_engine/query_extensions.h"
#include "src/core/lib/event_engine/resolved_address_internal.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/event_engine/utils.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/event_engine_shims/endpoint.h"
Expand Down Expand Up @@ -113,7 +116,8 @@ using AcceptorPtr = std::unique_ptr<grpc_tcp_server_acceptor, AcceptorDeleter>;

class Chttp2ServerListener : public Server::ListenerInterface {
public:
static grpc_error_handle Create(Server* server, grpc_resolved_address* addr,
static grpc_error_handle Create(Server* server,
const EventEngine::ResolvedAddress& addr,
const ChannelArgs& args,
Chttp2ServerArgsModifier args_modifier,
int* port_num);
Expand Down Expand Up @@ -701,8 +705,9 @@ void Chttp2ServerListener::ActiveConnection::OnDrainGraceTimeExpiry() {
//

grpc_error_handle Chttp2ServerListener::Create(
Server* server, grpc_resolved_address* addr, const ChannelArgs& args,
Chttp2ServerArgsModifier args_modifier, int* port_num) {
Server* server, const EventEngine::ResolvedAddress& addr,
const ChannelArgs& args, Chttp2ServerArgsModifier args_modifier,
int* port_num) {
// Create Chttp2ServerListener.
OrphanablePtr<Chttp2ServerListener> listener =
MakeOrphanable<Chttp2ServerListener>(server, args, args_modifier,
Expand All @@ -714,18 +719,24 @@ grpc_error_handle Chttp2ServerListener::Create(
&listener->tcp_server_shutdown_complete_, ChannelArgsEndpointConfig(args),
OnAccept, listener.get(), &listener->tcp_server_);
if (!error.ok()) return error;
// TODO(yijiem): remove this conversion when we remove all
// grpc_resolved_address usages.
grpc_resolved_address iomgr_addr =
grpc_event_engine::experimental::CreateGRPCResolvedAddress(addr);
if (listener->config_fetcher_ != nullptr) {
listener->resolved_address_ = *addr;
listener->resolved_address_ = iomgr_addr;
// TODO(yashykt): Consider binding so as to be able to return the port
// number.
} else {
error = grpc_tcp_server_add_port(listener->tcp_server_, addr, port_num);
error =
grpc_tcp_server_add_port(listener->tcp_server_, &iomgr_addr, port_num);
if (!error.ok()) return error;
}
// Create channelz node.
if (args.GetBool(GRPC_ARG_ENABLE_CHANNELZ)
.value_or(GRPC_ENABLE_CHANNELZ_DEFAULT)) {
auto string_address = grpc_sockaddr_to_uri(addr);
auto string_address =
grpc_event_engine::experimental::ResolvedAddressToString(addr);
if (!string_address.ok()) {
return GRPC_ERROR_CREATE(string_address.status().ToString());
}
Expand Down Expand Up @@ -955,37 +966,53 @@ grpc_error_handle Chttp2ServerAddPort(Server* server, const char* addr,
args_modifier);
}
*port_num = -1;
absl::StatusOr<std::vector<grpc_resolved_address>> resolved_or;
absl::StatusOr<std::vector<EventEngine::ResolvedAddress>> results =
std::vector<EventEngine::ResolvedAddress>();
std::vector<grpc_error_handle> error_list;
std::string parsed_addr = URI::PercentDecode(addr);
absl::string_view parsed_addr_unprefixed{parsed_addr};
// Using lambda to avoid use of goto.
grpc_error_handle error = [&]() {
grpc_error_handle error;
if (absl::ConsumePrefix(&parsed_addr_unprefixed, kUnixUriPrefix)) {
resolved_or = grpc_resolve_unix_domain_address(parsed_addr_unprefixed);
} else if (absl::ConsumePrefix(&parsed_addr_unprefixed,
kUnixAbstractUriPrefix)) {
resolved_or =
grpc_resolve_unix_abstract_domain_address(parsed_addr_unprefixed);
} else if (absl::ConsumePrefix(&parsed_addr_unprefixed, kVSockUriPrefix)) {
resolved_or = grpc_resolve_vsock_address(parsed_addr_unprefixed);
if (absl::ConsumePrefix(&parsed_addr_unprefixed, kUnixUriPrefix) ||
absl::ConsumePrefix(&parsed_addr_unprefixed, kUnixAbstractUriPrefix) ||
absl::ConsumePrefix(&parsed_addr_unprefixed, kVSockUriPrefix)) {
absl::StatusOr<EventEngine::ResolvedAddress> result =
grpc_event_engine::experimental::URIToResolvedAddress(parsed_addr);
GRPC_RETURN_IF_ERROR(result.status());
results->push_back(*result);
} else {
resolved_or =
GetDNSResolver()->LookupHostnameBlocking(parsed_addr, "https");
}
if (!resolved_or.ok()) {
return absl_status_to_grpc_error(resolved_or.status());
if (IsEventEngineDnsNonClientChannelEnabled()) {
absl::StatusOr<std::unique_ptr<EventEngine::DNSResolver>> ee_resolver =
args.GetObjectRef<EventEngine>()->GetDNSResolver(
EventEngine::DNSResolver::ResolverOptions());
GRPC_RETURN_IF_ERROR(ee_resolver.status());
results = grpc_event_engine::experimental::LookupHostnameBlocking(
ee_resolver->get(), parsed_addr, "https");
} else {
// TODO(yijiem): Remove this after event_engine_dns_non_client_channel
// is fully enabled.
absl::StatusOr<std::vector<grpc_resolved_address>> iomgr_results =
GetDNSResolver()->LookupHostnameBlocking(parsed_addr, "https");
GRPC_RETURN_IF_ERROR(iomgr_results.status());
for (const auto& addr : *iomgr_results) {
results->push_back(
grpc_event_engine::experimental::CreateResolvedAddress(addr));
}
}
}
GRPC_RETURN_IF_ERROR(results.status());
// Create a listener for each resolved address.
for (auto& addr : *resolved_or) {
for (EventEngine::ResolvedAddress& addr : *results) {
// If address has a wildcard port (0), use the same port as a previous
// listener.
if (*port_num != -1 && grpc_sockaddr_get_port(&addr) == 0) {
grpc_sockaddr_set_port(&addr, *port_num);
if (*port_num != -1 &&
grpc_event_engine::experimental::ResolvedAddressGetPort(addr) == 0) {
grpc_event_engine::experimental::ResolvedAddressSetPort(addr,
*port_num);
}
int port_temp = -1;
error = Chttp2ServerListener::Create(server, &addr, args, args_modifier,
error = Chttp2ServerListener::Create(server, addr, args, args_modifier,
&port_temp);
if (!error.ok()) {
error_list.push_back(error);
Expand All @@ -997,17 +1024,17 @@ grpc_error_handle Chttp2ServerAddPort(Server* server, const char* addr,
}
}
}
if (error_list.size() == resolved_or->size()) {
if (error_list.size() == results->size()) {
std::string msg = absl::StrFormat(
"No address added out of total %" PRIuPTR " resolved for '%s'",
resolved_or->size(), addr);
results->size(), addr);
return GRPC_ERROR_CREATE_REFERENCING(msg.c_str(), error_list.data(),
error_list.size());
} else if (!error_list.empty()) {
std::string msg = absl::StrFormat(
"Only %" PRIuPTR " addresses added out of total %" PRIuPTR
" resolved",
resolved_or->size() - error_list.size(), resolved_or->size());
std::string msg =
absl::StrFormat("Only %" PRIuPTR
" addresses added out of total %" PRIuPTR " resolved",
results->size() - error_list.size(), results->size());
error = GRPC_ERROR_CREATE_REFERENCING(msg.c_str(), error_list.data(),
error_list.size());
LOG(INFO) << "WARNING: " << StatusToString(error);
Expand Down
3 changes: 3 additions & 0 deletions src/core/lib/event_engine/resolved_address.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ EventEngine::ResolvedAddress CreateResolvedAddress(

grpc_resolved_address CreateGRPCResolvedAddress(
const EventEngine::ResolvedAddress& ra) {
static_assert(
GRPC_MAX_SOCKADDR_SIZE == EventEngine::ResolvedAddress::MAX_SIZE_BYTES,
"size should match");
grpc_resolved_address grpc_addr;
memset(&grpc_addr, 0, sizeof(grpc_resolved_address));
memcpy(grpc_addr.addr, ra.address(), ra.size());
Expand Down
16 changes: 16 additions & 0 deletions src/core/lib/event_engine/utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <algorithm>

#include "absl/strings/str_cat.h"
#include "src/core/util/notification.h"
#include "src/core/util/time.h"

namespace grpc_event_engine {
Expand All @@ -38,5 +39,20 @@ grpc_core::Timestamp ToTimestamp(grpc_core::Timestamp now,
grpc_core::Duration::Milliseconds(1);
}

absl::StatusOr<std::vector<EventEngine::ResolvedAddress>>
LookupHostnameBlocking(EventEngine::DNSResolver* dns_resolver,
absl::string_view name, absl::string_view default_port) {
absl::StatusOr<std::vector<EventEngine::ResolvedAddress>> results;
grpc_core::Notification done;
dns_resolver->LookupHostname(
[&](absl::StatusOr<std::vector<EventEngine::ResolvedAddress>> addresses) {
results = std::move(addresses);
done.Notify();
},
name, default_port);
done.WaitForNotification();
return results;
}

} // namespace experimental
} // namespace grpc_event_engine
7 changes: 7 additions & 0 deletions src/core/lib/event_engine/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
#include <stdint.h>

#include <string>
#include <vector>

#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "src/core/util/time.h"

namespace grpc_event_engine {
Expand All @@ -36,6 +39,10 @@ std::string HandleToString(const Handle& handle) {
grpc_core::Timestamp ToTimestamp(grpc_core::Timestamp now,
EventEngine::Duration delta);

absl::StatusOr<std::vector<EventEngine::ResolvedAddress>>
LookupHostnameBlocking(EventEngine::DNSResolver* dns_resolver,
absl::string_view name, absl::string_view default_port);

} // namespace experimental
} // namespace grpc_event_engine

Expand Down
27 changes: 27 additions & 0 deletions src/core/lib/experiments/experiments.cc

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions src/core/lib/experiments/experiments.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 8afaf83

Please sign in to comment.