Skip to content

Commit

Permalink
[EventEngine] Replace ApplicationCallbackExecCtx with EventEngine::Run (
Browse files Browse the repository at this point in the history
grpc#37579)

WIP / experiment.

Closes grpc#37579

COPYBARA_INTEGRATE_REVIEW=grpc#37579 from drfloob:simple-rm-acec-2 44d5340
PiperOrigin-RevId: 668972280
  • Loading branch information
drfloob authored and copybara-github committed Aug 29, 2024
1 parent be26969 commit b69291a
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 2 deletions.
1 change: 1 addition & 0 deletions bazel/experiments.bzl

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions src/core/lib/experiments/experiments.cc

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions src/core/lib/experiments/experiments.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions src/core/lib/experiments/experiments.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@
owner: [email protected]
test_tags: []
allow_in_fuzzing_config: false
- name: event_engine_application_callbacks
description: Run application callbacks in EventEngine threads, instead of on the thread-local ApplicationCallbackExecCtx
expiry: 2024/10/31
owner: [email protected]
- name: event_engine_client
description: Use EventEngine clients instead of iomgr's grpc_tcp_client
expiry: 2024/10/01
Expand Down
2 changes: 2 additions & 0 deletions src/core/lib/experiments/rollouts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
windows: broken
- name: client_privacy
default: false
- name: event_engine_application_callbacks
default: true
- name: event_engine_client
default:
# not tested on iOS at all
Expand Down
26 changes: 24 additions & 2 deletions src/core/lib/surface/completion_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include <grpc/support/sync.h>
#include <grpc/support/time.h>

#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gprpp/atomic_utils.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/ref_counted.h"
Expand Down Expand Up @@ -317,7 +318,9 @@ struct cq_pluck_data {

struct cq_callback_data {
explicit cq_callback_data(grpc_completion_queue_functor* shutdown_callback)
: shutdown_callback(shutdown_callback) {}
: shutdown_callback(shutdown_callback),
event_engine(grpc_event_engine::experimental::GetDefaultEventEngine()) {
}

~cq_callback_data() {
#ifndef NDEBUG
Expand All @@ -338,6 +341,8 @@ struct cq_callback_data {

/// A callback that gets invoked when the CQ completes shutdown
grpc_completion_queue_functor* shutdown_callback;

std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine;
};

} // namespace
Expand Down Expand Up @@ -847,13 +852,22 @@ static void cq_end_op_for_callback(
cq_finish_shutdown_callback(cq);
}

auto* functor = static_cast<grpc_completion_queue_functor*>(tag);
if (grpc_core::IsEventEngineApplicationCallbacksEnabled()) {
// Run the callback on EventEngine threads.
cqd->event_engine->Run(
[engine = cqd->event_engine, functor, ok = error.ok()]() {
grpc_core::ExecCtx exec_ctx;
(*functor->functor_run)(functor, ok);
});
return;
}
// If possible, schedule the callback onto an existing thread-local
// ApplicationCallbackExecCtx, which is a work queue. This is possible for:
// 1. The callback is internally-generated and there is an ACEC available
// 2. The callback is marked inlineable and there is an ACEC available
// 3. We are already running in a background poller thread (which always has
// an ACEC available at the base of the stack).
auto* functor = static_cast<grpc_completion_queue_functor*>(tag);
if (((internal || functor->inlineable) &&
grpc_core::ApplicationCallbackExecCtx::Available()) ||
grpc_iomgr_is_any_background_poller_thread()) {
Expand Down Expand Up @@ -1334,6 +1348,14 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) {
CHECK(cqd->shutdown_called);

cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);

if (grpc_core::IsEventEngineApplicationCallbacksEnabled()) {
cqd->event_engine->Run([engine = cqd->event_engine, callback]() {
grpc_core::ExecCtx exec_ctx;
callback->functor_run(callback, /*true=*/1);
});
return;
}
if (grpc_iomgr_is_any_background_poller_thread()) {
grpc_core::ApplicationCallbackExecCtx::Enqueue(callback, true);
return;
Expand Down

0 comments on commit b69291a

Please sign in to comment.