Skip to content

Commit

Permalink
Merge branch 'master' into LogicalConnection
Browse files Browse the repository at this point in the history
  • Loading branch information
yashykt committed Oct 19, 2024
2 parents 57e9f67 + 1b6601b commit bd4d59a
Show file tree
Hide file tree
Showing 11 changed files with 162 additions and 38 deletions.
5 changes: 5 additions & 0 deletions bazel/experiments.bzl

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions src/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -2378,9 +2378,11 @@ grpc_cc_library(
hdrs = ["lib/event_engine/utils.h"],
external_deps = [
"absl/log:check",
"absl/status:statusor",
"absl/strings",
],
deps = [
"notification",
"time",
"//:event_engine_base_hdrs",
"//:gpr_platform",
Expand Down Expand Up @@ -7658,8 +7660,11 @@ grpc_cc_library(
"connection_quota",
"error",
"error_utils",
"event_engine_common",
"event_engine_extensions",
"event_engine_query_extensions",
"event_engine_tcp_socket_utils",
"event_engine_utils",
"grpc_insecure_credentials",
"handshaker_registry",
"iomgr_fwd",
Expand Down
117 changes: 81 additions & 36 deletions src/core/ext/transport/chttp2/server/chttp2_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/event_engine/extensions/supports_fd.h"
#include "src/core/lib/event_engine/query_extensions.h"
#include "src/core/lib/event_engine/resolved_address_internal.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/event_engine/utils.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/event_engine_shims/endpoint.h"
Expand Down Expand Up @@ -123,7 +126,8 @@ Timestamp GetConnectionDeadline(const ChannelArgs& args) {

class Chttp2ServerListener : public Server::ListenerInterface {
public:
static grpc_error_handle Create(Server* server, grpc_resolved_address* addr,
static grpc_error_handle Create(Server* server,
const EventEngine::ResolvedAddress& addr,
const ChannelArgs& args, int* port_num);

static grpc_error_handle CreateWithAcceptor(Server* server, const char* name,
Expand Down Expand Up @@ -706,10 +710,9 @@ void Chttp2ServerListener::ActiveConnection::OnDrainGraceTimeExpiry() {
// Chttp2ServerListener
//

grpc_error_handle Chttp2ServerListener::Create(Server* server,
grpc_resolved_address* addr,
const ChannelArgs& args,
int* port_num) {
grpc_error_handle Chttp2ServerListener::Create(
Server* server, const EventEngine::ResolvedAddress& addr,
const ChannelArgs& args, int* port_num) {
// Create Chttp2ServerListener.
OrphanablePtr<Chttp2ServerListener> listener =
MakeOrphanable<Chttp2ServerListener>(server, args,
Expand All @@ -721,18 +724,24 @@ grpc_error_handle Chttp2ServerListener::Create(Server* server,
&listener->tcp_server_shutdown_complete_, ChannelArgsEndpointConfig(args),
OnAccept, listener.get(), &listener->tcp_server_);
if (!error.ok()) return error;
// TODO(yijiem): remove this conversion when we remove all
// grpc_resolved_address usages.
grpc_resolved_address iomgr_addr =
grpc_event_engine::experimental::CreateGRPCResolvedAddress(addr);
if (listener->config_fetcher_ != nullptr) {
listener->resolved_address_ = *addr;
listener->resolved_address_ = iomgr_addr;
// TODO(yashykt): Consider binding so as to be able to return the port
// number.
} else {
error = grpc_tcp_server_add_port(listener->tcp_server_, addr, port_num);
error =
grpc_tcp_server_add_port(listener->tcp_server_, &iomgr_addr, port_num);
if (!error.ok()) return error;
}
// Create channelz node.
if (args.GetBool(GRPC_ARG_ENABLE_CHANNELZ)
.value_or(GRPC_ENABLE_CHANNELZ_DEFAULT)) {
auto string_address = grpc_sockaddr_to_uri(addr);
auto string_address =
grpc_event_engine::experimental::ResolvedAddressToString(addr);
if (!string_address.ok()) {
return GRPC_ERROR_CREATE(string_address.status().ToString());
}
Expand Down Expand Up @@ -964,7 +973,8 @@ void Chttp2ServerListener::Orphan() {
// New ChttpServerListener used if experiment "server_listener" is enabled
class NewChttp2ServerListener : public Server::ListenerInterface {
public:
static grpc_error_handle Create(Server* server, grpc_resolved_address* addr,
static grpc_error_handle Create(Server* server,
const EventEngine::ResolvedAddress& addr,
const ChannelArgs& args, int* port_num);

static grpc_error_handle CreateWithAcceptor(Server* server, const char* name,
Expand Down Expand Up @@ -1405,10 +1415,9 @@ void NewChttp2ServerListener::ActiveConnection::
// NewChttp2ServerListener
//

grpc_error_handle NewChttp2ServerListener::Create(Server* server,
grpc_resolved_address* addr,
const ChannelArgs& args,
int* port_num) {
grpc_error_handle NewChttp2ServerListener::Create(
Server* server, const EventEngine::ResolvedAddress& addr,
const ChannelArgs& args, int* port_num) {
// Create NewChttp2ServerListener.
OrphanablePtr<NewChttp2ServerListener> listener =
MakeOrphanable<NewChttp2ServerListener>(args);
Expand All @@ -1419,22 +1428,28 @@ grpc_error_handle NewChttp2ServerListener::Create(Server* server,
&listener->tcp_server_shutdown_complete_, ChannelArgsEndpointConfig(args),
OnAccept, listener.get(), &listener->tcp_server_);
if (!error.ok()) return error;
// TODO(yijiem): remove this conversion when we remove all
// grpc_resolved_address usages.
grpc_resolved_address iomgr_addr =
grpc_event_engine::experimental::CreateGRPCResolvedAddress(addr);
if (server->config_fetcher() != nullptr) {
// TODO(yashykt): Consider binding so as to be able to return the port
// number.
listener->resolved_address_ = *addr;
listener->resolved_address_ = iomgr_addr;
{
MutexLock lock(&listener->mu_);
listener->add_port_on_start_ = true;
}
} else {
error = grpc_tcp_server_add_port(listener->tcp_server_, addr, port_num);
error =
grpc_tcp_server_add_port(listener->tcp_server_, &iomgr_addr, port_num);
if (!error.ok()) return error;
}
// Create channelz node.
if (args.GetBool(GRPC_ARG_ENABLE_CHANNELZ)
.value_or(GRPC_ENABLE_CHANNELZ_DEFAULT)) {
auto string_address = grpc_sockaddr_to_uri(addr);
auto string_address =
grpc_event_engine::experimental::ResolvedAddressToString(addr);
if (!string_address.ok()) {
return GRPC_ERROR_CREATE(string_address.status().ToString());
}
Expand Down Expand Up @@ -1621,41 +1636,71 @@ grpc_error_handle Chttp2ServerAddPort(Server* server, const char* addr,
}
}
*port_num = -1;
absl::StatusOr<std::vector<grpc_resolved_address>> resolved_or;
absl::StatusOr<std::vector<grpc_resolved_address>> resolved;
absl::StatusOr<std::vector<EventEngine::ResolvedAddress>> results =
std::vector<EventEngine::ResolvedAddress>();
std::vector<grpc_error_handle> error_list;
std::string parsed_addr = URI::PercentDecode(addr);
absl::string_view parsed_addr_unprefixed{parsed_addr};
// Using lambda to avoid use of goto.
grpc_error_handle error = [&]() {
grpc_error_handle error;
// TODO(ladynana, yijiem): this code does not handle address URIs correctly:
// it's parsing `unix://foo/bar` as path `/foo/bar` when it should be
// parsing it as authority `foo` and path `/bar`. Also add API documentation
// on the valid URIs that grpc_server_add_http2_port accepts.
if (absl::ConsumePrefix(&parsed_addr_unprefixed, kUnixUriPrefix)) {
resolved_or = grpc_resolve_unix_domain_address(parsed_addr_unprefixed);
resolved = grpc_resolve_unix_domain_address(parsed_addr_unprefixed);
GRPC_RETURN_IF_ERROR(resolved.status());
} else if (absl::ConsumePrefix(&parsed_addr_unprefixed,
kUnixAbstractUriPrefix)) {
resolved_or =
resolved =
grpc_resolve_unix_abstract_domain_address(parsed_addr_unprefixed);
GRPC_RETURN_IF_ERROR(resolved.status());
} else if (absl::ConsumePrefix(&parsed_addr_unprefixed, kVSockUriPrefix)) {
resolved_or = grpc_resolve_vsock_address(parsed_addr_unprefixed);
resolved = grpc_resolve_vsock_address(parsed_addr_unprefixed);
GRPC_RETURN_IF_ERROR(resolved.status());
} else {
resolved_or =
GetDNSResolver()->LookupHostnameBlocking(parsed_addr, "https");
if (IsEventEngineDnsNonClientChannelEnabled()) {
absl::StatusOr<std::unique_ptr<EventEngine::DNSResolver>> ee_resolver =
args.GetObjectRef<EventEngine>()->GetDNSResolver(
EventEngine::DNSResolver::ResolverOptions());
GRPC_RETURN_IF_ERROR(ee_resolver.status());
results = grpc_event_engine::experimental::LookupHostnameBlocking(
ee_resolver->get(), parsed_addr, "https");
} else {
// TODO(yijiem): Remove this after event_engine_dns_non_client_channel
// is fully enabled.
absl::StatusOr<std::vector<grpc_resolved_address>> iomgr_results =
GetDNSResolver()->LookupHostnameBlocking(parsed_addr, "https");
GRPC_RETURN_IF_ERROR(iomgr_results.status());
for (const auto& addr : *iomgr_results) {
results->push_back(
grpc_event_engine::experimental::CreateResolvedAddress(addr));
}
}
}
if (!resolved_or.ok()) {
return absl_status_to_grpc_error(resolved_or.status());
if (resolved.ok()) {
for (const auto& addr : *resolved) {
results->push_back(
grpc_event_engine::experimental::CreateResolvedAddress(addr));
}
}
GRPC_RETURN_IF_ERROR(results.status());
// Create a listener for each resolved address.
for (auto& addr : *resolved_or) {
for (EventEngine::ResolvedAddress& addr : *results) {
// If address has a wildcard port (0), use the same port as a previous
// listener.
if (*port_num != -1 && grpc_sockaddr_get_port(&addr) == 0) {
grpc_sockaddr_set_port(&addr, *port_num);
if (*port_num != -1 &&
grpc_event_engine::experimental::ResolvedAddressGetPort(addr) == 0) {
grpc_event_engine::experimental::ResolvedAddressSetPort(addr,
*port_num);
}
int port_temp = -1;
if (IsServerListenerEnabled()) {
error =
NewChttp2ServerListener::Create(server, &addr, args, &port_temp);
error = NewChttp2ServerListener::Create(server, addr, args, &port_temp);
} else {
error = Chttp2ServerListener::Create(server, &addr, args, &port_temp);
error = Chttp2ServerListener::Create(server, addr, args, &port_temp);
}
if (!error.ok()) {
error_list.push_back(error);
Expand All @@ -1667,17 +1712,17 @@ grpc_error_handle Chttp2ServerAddPort(Server* server, const char* addr,
}
}
}
if (error_list.size() == resolved_or->size()) {
if (error_list.size() == results->size()) {
std::string msg = absl::StrFormat(
"No address added out of total %" PRIuPTR " resolved for '%s'",
resolved_or->size(), addr);
results->size(), addr);
return GRPC_ERROR_CREATE_REFERENCING(msg.c_str(), error_list.data(),
error_list.size());
} else if (!error_list.empty()) {
std::string msg = absl::StrFormat(
"Only %" PRIuPTR " addresses added out of total %" PRIuPTR
" resolved",
resolved_or->size() - error_list.size(), resolved_or->size());
std::string msg =
absl::StrFormat("Only %" PRIuPTR
" addresses added out of total %" PRIuPTR " resolved",
results->size() - error_list.size(), results->size());
error = GRPC_ERROR_CREATE_REFERENCING(msg.c_str(), error_list.data(),
error_list.size());
LOG(INFO) << "WARNING: " << StatusToString(error);
Expand Down
3 changes: 3 additions & 0 deletions src/core/lib/event_engine/resolved_address.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ EventEngine::ResolvedAddress CreateResolvedAddress(

grpc_resolved_address CreateGRPCResolvedAddress(
const EventEngine::ResolvedAddress& ra) {
static_assert(
GRPC_MAX_SOCKADDR_SIZE == EventEngine::ResolvedAddress::MAX_SIZE_BYTES,
"size should match");
grpc_resolved_address grpc_addr;
memset(&grpc_addr, 0, sizeof(grpc_resolved_address));
memcpy(grpc_addr.addr, ra.address(), ra.size());
Expand Down
16 changes: 16 additions & 0 deletions src/core/lib/event_engine/utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <algorithm>

#include "absl/strings/str_cat.h"
#include "src/core/util/notification.h"
#include "src/core/util/time.h"

namespace grpc_event_engine {
Expand All @@ -38,5 +39,20 @@ grpc_core::Timestamp ToTimestamp(grpc_core::Timestamp now,
grpc_core::Duration::Milliseconds(1);
}

absl::StatusOr<std::vector<EventEngine::ResolvedAddress>>
LookupHostnameBlocking(EventEngine::DNSResolver* dns_resolver,
absl::string_view name, absl::string_view default_port) {
absl::StatusOr<std::vector<EventEngine::ResolvedAddress>> results;
grpc_core::Notification done;
dns_resolver->LookupHostname(
[&](absl::StatusOr<std::vector<EventEngine::ResolvedAddress>> addresses) {
results = std::move(addresses);
done.Notify();
},
name, default_port);
done.WaitForNotification();
return results;
}

} // namespace experimental
} // namespace grpc_event_engine
7 changes: 7 additions & 0 deletions src/core/lib/event_engine/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
#include <stdint.h>

#include <string>
#include <vector>

#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "src/core/util/time.h"

namespace grpc_event_engine {
Expand All @@ -36,6 +39,10 @@ std::string HandleToString(const Handle& handle) {
grpc_core::Timestamp ToTimestamp(grpc_core::Timestamp now,
EventEngine::Duration delta);

absl::StatusOr<std::vector<EventEngine::ResolvedAddress>>
LookupHostnameBlocking(EventEngine::DNSResolver* dns_resolver,
absl::string_view name, absl::string_view default_port);

} // namespace experimental
} // namespace grpc_event_engine

Expand Down
Loading

0 comments on commit bd4d59a

Please sign in to comment.