Skip to content

Commit

Permalink
[xds_client_test] change test to use FuzzingEventEngine (grpc#37668)
Browse files Browse the repository at this point in the history
Closes grpc#37668

COPYBARA_INTEGRATE_REVIEW=grpc#37668 from markdroth:xds_client_test_fuzzing_ee e1fe7f5
PiperOrigin-RevId: 681150007
  • Loading branch information
markdroth authored and copybara-github committed Oct 1, 2024
1 parent 74e6403 commit 9a12ec9
Show file tree
Hide file tree
Showing 10 changed files with 269 additions and 205 deletions.
6 changes: 6 additions & 0 deletions CMakeLists.txt

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions build_autogenerated.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -194,16 +194,23 @@ void FuzzingEventEngine::TickUntilIdle() {
<< "TickUntilIdle: "
<< GRPC_DUMP_ARGS(tasks_by_id_.size(), outstanding_reads_.load(),
outstanding_writes_.load());
if (tasks_by_id_.empty() &&
outstanding_writes_.load(std::memory_order_relaxed) == 0 &&
outstanding_reads_.load(std::memory_order_relaxed) == 0) {
return;
}
if (IsIdleLocked()) return;
}
Tick();
}
}

bool FuzzingEventEngine::IsIdle() {
grpc_core::MutexLock lock(&*mu_);
return IsIdleLocked();
}

bool FuzzingEventEngine::IsIdleLocked() {
return tasks_by_id_.empty() &&
outstanding_writes_.load(std::memory_order_relaxed) == 0 &&
outstanding_reads_.load(std::memory_order_relaxed) == 0;
}

void FuzzingEventEngine::TickUntil(Time t) {
while (true) {
auto now = Now();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ class FuzzingEventEngine : public EventEngine {
ABSL_LOCKS_EXCLUDED(mu_);
// Repeatedly call Tick() until there is no more work to do.
void TickUntilIdle() ABSL_LOCKS_EXCLUDED(mu_);
// Returns true if idle.
bool IsIdle() ABSL_LOCKS_EXCLUDED(mu_);
// Tick until some time
void TickUntil(Time t) ABSL_LOCKS_EXCLUDED(mu_);
// Tick for some duration
Expand Down Expand Up @@ -296,6 +298,9 @@ class FuzzingEventEngine : public EventEngine {
int AllocatePort() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
// Is the given port in use by any listener?
bool IsPortUsed(int port) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);

bool IsIdleLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);

// For the next connection being built, query the list of fuzzer selected
// write size limits.
std::queue<size_t> WriteSizesForConnection()
Expand Down
9 changes: 7 additions & 2 deletions test/core/xds/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ grpc_cc_library(
"//:orphanable",
"//:ref_counted_ptr",
"//:xds_client",
"//test/core/event_engine/fuzzing_event_engine",
"//test/core/test_util:grpc_test_util",
],
)
Expand All @@ -166,14 +167,15 @@ grpc_cc_test(
srcs = ["xds_client_test.cc"],
external_deps = ["gtest"],
language = "C++",
shard_count = 10,
uses_event_engine = True,
uses_polling = False,
deps = [
":xds_client_test_peer",
":xds_transport_fake",
"//:xds_client",
"//src/proto/grpc/testing/xds/v3:discovery_proto",
"//test/core/event_engine:event_engine_test_utils",
"//test/core/event_engine/fuzzing_event_engine",
"//test/core/test_util:grpc_test_util",
"//test/core/test_util:scoped_env_var",
],
Expand All @@ -188,9 +190,10 @@ grpc_proto_fuzzer(
proto = "xds_client_fuzzer.proto",
proto_deps = [
"//src/proto/grpc/testing/xds/v3:discovery_proto",
"//test/core/event_engine/fuzzing_event_engine:fuzzing_event_engine_proto",
],
tags = ["no_windows"],
uses_event_engine = False,
uses_event_engine = True,
uses_polling = False,
deps = [
":xds_client_test_peer",
Expand All @@ -206,6 +209,8 @@ grpc_proto_fuzzer(
"//src/proto/grpc/testing/xds/v3:endpoint_proto",
"//src/proto/grpc/testing/xds/v3:http_connection_manager_proto",
"//src/proto/grpc/testing/xds/v3:router_proto",
"//test/core/event_engine/fuzzing_event_engine",
"//test/core/event_engine:event_engine_test_utils",
],
)

Expand Down
40 changes: 29 additions & 11 deletions test/core/xds/xds_client_fuzzer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

#include <grpc/grpc.h>

#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/iomgr/timer_manager.h"
#include "src/core/util/orphanable.h"
#include "src/core/util/ref_counted_ptr.h"
#include "src/core/xds/grpc/xds_bootstrap_grpc.h"
Expand All @@ -46,31 +46,50 @@
#include "src/core/xds/xds_client/xds_client.h"
#include "src/libfuzzer/libfuzzer_macro.h"
#include "src/proto/grpc/testing/xds/v3/discovery.pb.h"
#include "test/core/event_engine/event_engine_test_utils.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
#include "test/core/xds/xds_client_fuzzer.pb.h"
#include "test/core/xds/xds_client_test_peer.h"
#include "test/core/xds/xds_transport_fake.h"

using grpc_event_engine::experimental::FuzzingEventEngine;

namespace grpc_core {

class Fuzzer {
public:
explicit Fuzzer(absl::string_view bootstrap_json) {
Fuzzer(absl::string_view bootstrap_json,
const fuzzing_event_engine::Actions& fuzzing_ee_actions) {
event_engine_ = std::make_shared<FuzzingEventEngine>(
FuzzingEventEngine::Options(), fuzzing_ee_actions);
grpc_timer_manager_set_start_threaded(false);
grpc_init();
auto bootstrap = GrpcXdsBootstrap::Create(bootstrap_json);
if (!bootstrap.ok()) {
LOG(ERROR) << "error creating bootstrap: " << bootstrap.status();
// Leave xds_client_ unset, so Act() will be a no-op.
return;
}
transport_factory_ = MakeRefCounted<FakeXdsTransportFactory>(
[]() { Crash("Multiple concurrent reads"); });
[]() { Crash("Multiple concurrent reads"); }, event_engine_);
transport_factory_->SetAutoCompleteMessagesFromClient(false);
transport_factory_->SetAbortOnUndrainedMessages(false);
xds_client_ = MakeRefCounted<XdsClient>(
std::move(*bootstrap), transport_factory_,
grpc_event_engine::experimental::GetDefaultEventEngine(),
std::move(*bootstrap), transport_factory_, event_engine_,
/*metrics_reporter=*/nullptr, "foo agent", "foo version");
}

~Fuzzer() {
transport_factory_.reset();
xds_client_.reset();
event_engine_->FuzzingDone();
event_engine_->TickUntilIdle();
event_engine_->UnsetGlobalHooks();
grpc_event_engine::experimental::WaitForSingleOwner(
std::move(event_engine_));
grpc_shutdown_blocking();
}

void Act(const xds_client_fuzzer::Action& action) {
if (xds_client_ == nullptr) return;
switch (action.action_type_case()) {
Expand Down Expand Up @@ -276,8 +295,7 @@ class Fuzzer {
if (xds_server == nullptr) return nullptr;
const char* method = StreamIdMethod(stream_id);
if (method == nullptr) return nullptr;
return transport_factory_->WaitForStream(*xds_server, method,
absl::ZeroDuration());
return transport_factory_->WaitForStream(*xds_server, method);
}

static std::string StreamIdString(
Expand All @@ -293,7 +311,7 @@ class Fuzzer {
auto stream = GetStream(stream_id);
if (stream == nullptr) return;
LOG(INFO) << " stream=" << stream.get();
auto message = stream->WaitForMessageFromClient(absl::ZeroDuration());
auto message = stream->WaitForMessageFromClient();
if (message.has_value()) {
LOG(INFO) << " completing send_message";
stream->CompleteSendMessageFromClient(ok);
Expand All @@ -320,6 +338,7 @@ class Fuzzer {
stream->MaybeSendStatusToClient(std::move(status));
}

std::shared_ptr<FuzzingEventEngine> event_engine_;
RefCountedPtr<XdsClient> xds_client_;
RefCountedPtr<FakeXdsTransportFactory> transport_factory_;

Expand All @@ -336,10 +355,9 @@ class Fuzzer {
bool squelch = true;

DEFINE_PROTO_FUZZER(const xds_client_fuzzer::Msg& message) {
grpc_init();
grpc_core::Fuzzer fuzzer(message.bootstrap());
grpc_core::Fuzzer fuzzer(message.bootstrap(),
message.fuzzing_event_engine_actions());
for (int i = 0; i < message.actions_size(); i++) {
fuzzer.Act(message.actions(i));
}
grpc_shutdown();
}
2 changes: 2 additions & 0 deletions test/core/xds/xds_client_fuzzer.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ syntax = "proto3";
package xds_client_fuzzer;

import "src/proto/grpc/testing/xds/v3/discovery.proto";
import "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto";

// We'd ideally like to use google.rpc.Status instead of creating our
// own proto for this, but that winds up causing all sorts of dependency
Expand Down Expand Up @@ -120,4 +121,5 @@ message Action {
message Msg {
string bootstrap = 1;
repeated Action actions = 2;
fuzzing_event_engine.Actions fuzzing_event_engine_actions = 3;
}
Loading

0 comments on commit 9a12ec9

Please sign in to comment.