Skip to content

Commit

Permalink
[EventEngine] Add public methods to allow EventEngine Endpoints to su…
Browse files Browse the repository at this point in the history
…pport optional Extensions.

PiperOrigin-RevId: 587071965
  • Loading branch information
Vignesh2208 authored and copybara-github committed Dec 1, 2023
1 parent 49f7ee9 commit 8467882
Show file tree
Hide file tree
Showing 10 changed files with 319 additions and 0 deletions.
35 changes: 35 additions & 0 deletions CMakeLists.txt

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions build_autogenerated.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 39 additions & 0 deletions include/grpc/event_engine/event_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,45 @@ class EventEngine : public std::enable_shared_from_this<EventEngine> {
/// values are expected to remain valid for the life of the Endpoint.
virtual const ResolvedAddress& GetPeerAddress() const = 0;
virtual const ResolvedAddress& GetLocalAddress() const = 0;

/// A method which allows users to query whether an Endpoint implementation
/// supports a specified extension. The name of the extension is provided
/// as an input.
///
/// An extension could be any type with a unique string id. Each extension
/// may support additional capabilities and if the Endpoint implementation
/// supports the queried extension, it should return a valid pointer to the
/// extension type.
///
/// E.g., use case of an EventEngine::Endpoint supporting a custom
/// extension.
///
/// class CustomEndpointExtension {
/// public:
/// static constexpr std::string name = "my.namespace.extension_name";
/// void Process() { ... }
/// }
///
///
/// class CustomEndpoint :
/// public EventEngine::Endpoint, CustomEndpointExtension {
/// public:
/// void* QueryExtension(absl::string_view id) override {
/// if (id == CustomEndpointExtension::name) {
/// return static_cast<CustomEndpointExtension*>(this);
/// }
/// return nullptr;
/// }
/// ...
/// }
///
/// auto ext_ =
/// static_cast<CustomEndpointExtension*>(
/// endpoint->QueryExtension(CustomrEndpointExtension::name));
/// if (ext_ != nullptr) { ext_->Process(); }
///
///
virtual void* QueryExtension(absl::string_view /*id*/) { return nullptr; }
};

/// Called when a new connection is established.
Expand Down
12 changes: 12 additions & 0 deletions src/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -1540,6 +1540,18 @@ grpc_cc_library(
],
)

grpc_cc_library(
name = "event_engine_query_extensions",
hdrs = [
"lib/event_engine/query_extensions.h",
],
external_deps = ["absl/strings"],
deps = [
"//:event_engine_base_hdrs",
"//:gpr_platform",
],
)

grpc_cc_library(
name = "event_engine_work_queue",
hdrs = [
Expand Down
70 changes: 70 additions & 0 deletions src/core/lib/event_engine/query_extensions.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2023 gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_QUERY_EXTENSIONS_H
#define GRPC_SRC_CORE_LIB_EVENT_ENGINE_QUERY_EXTENSIONS_H

#include <grpc/support/port_platform.h>

#include "absl/strings/string_view.h"

#include <grpc/event_engine/event_engine.h>

namespace grpc_event_engine {
namespace experimental {

namespace endpoint_detail {

template <typename Querying, typename... Es>
struct QueryExtensionRecursion;

template <typename Querying, typename E, typename... Es>
struct QueryExtensionRecursion<Querying, E, Es...> {
static void* Query(absl::string_view id, Querying* p) {
if (id == E::EndpointExtensionName()) return static_cast<E*>(p);
return QueryExtensionRecursion<Querying, Es...>::Query(id, p);
}
};

template <typename Querying>
struct QueryExtensionRecursion<Querying> {
static void* Query(absl::string_view, Querying*) { return nullptr; }
};

} // namespace endpoint_detail

// A helper class to derive from some set of base classes and export
// QueryExtension for them all.
// Endpoint implementations which need to support different extensions just need
// to derive from ExtendedEndpoint class.
template <typename... Exports>
class ExtendedEndpoint : public EventEngine::Endpoint, public Exports... {
public:
void* QueryExtension(absl::string_view id) override {
return endpoint_detail::QueryExtensionRecursion<ExtendedEndpoint,
Exports...>::Query(id,
this);
}
};

/// A helper method which returns a valid pointer if the extension is supported
/// by the endpoint.
template <typename T>
T* QueryExtension(EventEngine::Endpoint* endpoint) {
return static_cast<T*>(endpoint->QueryExtension(T::EndpointExtensionName()));
}

} // namespace experimental
} // namespace grpc_event_engine

#endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_QUERY_EXTENSIONS_H
13 changes: 13 additions & 0 deletions src/core/lib/iomgr/event_engine_shims/endpoint.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ class EventEngineEndpointWrapper {
explicit EventEngineEndpointWrapper(
std::unique_ptr<EventEngine::Endpoint> endpoint);

EventEngine::Endpoint* endpoint() { return endpoint_.get(); }

int Fd() {
grpc_core::MutexLock lock(&mu_);
return fd_;
Expand Down Expand Up @@ -428,6 +430,17 @@ bool grpc_is_event_engine_endpoint(grpc_endpoint* ep) {
return ep->vtable == &grpc_event_engine_endpoint_vtable;
}

EventEngine::Endpoint* grpc_get_wrapped_event_engine_endpoint(
grpc_endpoint* ep) {
if (!grpc_is_event_engine_endpoint(ep)) {
return nullptr;
}
auto* eeep =
reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
ep);
return eeep->wrapper->endpoint();
}

void grpc_event_engine_endpoint_destroy_and_release_fd(
grpc_endpoint* ep, int* fd, grpc_closure* on_release_fd) {
auto* eeep =
Expand Down
5 changes: 5 additions & 0 deletions src/core/lib/iomgr/event_engine_shims/endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ grpc_endpoint* grpc_event_engine_endpoint_create(
/// Returns true if the passed endpoint is an event engine shim endpoint.
bool grpc_is_event_engine_endpoint(grpc_endpoint* ep);

/// Returns the wrapped event engine endpoint if the given grpc_endpoint is an
/// event engine shim endpoint. Otherwise it returns nullptr.
EventEngine::Endpoint* grpc_get_wrapped_event_engine_endpoint(
grpc_endpoint* ep);

/// Destroys the passed in event engine shim endpoint and schedules the
/// asynchronous execution of the on_release_fd callback. The int pointer fd is
/// set to the underlying endpoint's file descriptor.
Expand Down
13 changes: 13 additions & 0 deletions test/core/event_engine/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -232,3 +232,16 @@ grpc_cc_library(
"//src/core:time",
],
)

grpc_cc_test(
name = "query_extensions_test",
srcs = ["query_extensions_test.cc"],
external_deps = ["gtest"],
language = "C++",
uses_event_engine = False,
uses_polling = False,
deps = [
"//:gpr_platform",
"//src/core:event_engine_query_extensions",
],
)
95 changes: 95 additions & 0 deletions test/core/event_engine/query_extensions_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>

#include "src/core/lib/event_engine/query_extensions.h"

#include <string>

#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include "gtest/gtest.h"

#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/slice_buffer.h>

#include "src/core/lib/gprpp/crash.h"

namespace grpc_event_engine {
namespace experimental {
namespace {

template <int i>
class TestExtension {
public:
TestExtension() = default;
~TestExtension() = default;

static std::string EndpointExtensionName() {
return "grpc.test.test_extension" + std::to_string(i);
}

int GetValue() const { return val_; }

private:
int val_ = i;
};

class ExtendedTestEndpoint
: public ExtendedEndpoint<TestExtension<0>, TestExtension<1>,
TestExtension<2>> {
public:
ExtendedTestEndpoint() = default;
~ExtendedTestEndpoint() override = default;
bool Read(absl::AnyInvocable<void(absl::Status)> /*on_read*/,
SliceBuffer* /*buffer*/, const ReadArgs* /*args*/) override {
grpc_core::Crash("Not implemented");
};
bool Write(absl::AnyInvocable<void(absl::Status)> /*on_writable*/,
SliceBuffer* /*data*/, const WriteArgs* /*args*/) override {
grpc_core::Crash("Not implemented");
}
/// Returns an address in the format described in DNSResolver. The returned
/// values are expected to remain valid for the life of the Endpoint.
const EventEngine::ResolvedAddress& GetPeerAddress() const override {
grpc_core::Crash("Not implemented");
}
const EventEngine::ResolvedAddress& GetLocalAddress() const override {
grpc_core::Crash("Not implemented");
};
};

TEST(QueryExtensionsTest, EndpointSupportsMultipleExtensions) {
ExtendedTestEndpoint endpoint;
TestExtension<0>* extension_0 = QueryExtension<TestExtension<0>>(&endpoint);
TestExtension<1>* extension_1 = QueryExtension<TestExtension<1>>(&endpoint);
TestExtension<2>* extension_2 = QueryExtension<TestExtension<2>>(&endpoint);

EXPECT_NE(extension_0, nullptr);
EXPECT_NE(extension_1, nullptr);
EXPECT_NE(extension_2, nullptr);

EXPECT_EQ(extension_0->GetValue(), 0);
EXPECT_EQ(extension_1->GetValue(), 1);
EXPECT_EQ(extension_2->GetValue(), 2);
}
} // namespace

} // namespace experimental
} // namespace grpc_event_engine

int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
24 changes: 24 additions & 0 deletions tools/run_tests/generated/tests.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 8467882

Please sign in to comment.