Skip to content

Commit

Permalink
x
Browse files Browse the repository at this point in the history
  • Loading branch information
ctiller committed Jan 27, 2025
2 parents d7b129b + 682ae95 commit 57982ce
Show file tree
Hide file tree
Showing 103 changed files with 227 additions and 286 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions build_autogenerated.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion src/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -826,7 +826,10 @@ grpc_cc_library(
grpc_cc_library(
name = "prioritized_race",
public_hdrs = ["lib/promise/prioritized_race.h"],
deps = ["//:gpr_platform"],
deps = [
"promise_like",
"//:gpr_platform",
],
)

grpc_cc_library(
Expand Down
23 changes: 12 additions & 11 deletions src/core/lib/promise/prioritized_race.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

#include <utility>

#include "src/core/lib/promise/detail/promise_like.h"

namespace grpc_core {

template <typename A, typename B>
Expand All @@ -34,21 +36,20 @@ class TwoPartyPrioritizedRace {
auto p = a_();
if (p.ready()) return p;
// Check the other promise.
p = b_();
if (p.ready()) {
// re-poll a to see if it's also completed.
auto q = a_();
if (q.ready()) {
// both are ready, but a is prioritized
return q;
}
auto q = b_();
if (!q.ready()) return q;
// re-poll a to see if it's also completed.
auto r = a_();
if (r.ready()) {
// both are ready, but a is prioritized
return r;
}
return p;
return q;
}

private:
A a_;
B b_;
promise_detail::PromiseLike<A> a_;
promise_detail::PromiseLike<B> b_;
};

/// Run all the promises until one is non-pending.
Expand Down
10 changes: 10 additions & 0 deletions src/core/lib/transport/call_spine.h
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,11 @@ class CallInitiator {
return spine_->OnDone(std::move(fn));
}

template <typename Promise>
auto UntilCallCompletes(Promise promise) {
return spine_->UntilCallCompletes(std::move(promise));
}

template <typename PromiseFactory>
void SpawnGuarded(absl::string_view name, PromiseFactory promise_factory) {
spine_->SpawnGuarded(name, std::move(promise_factory));
Expand Down Expand Up @@ -458,6 +463,11 @@ class CallHandler {
return spine_->call_filters().WasCancelledPushed();
}

template <typename Promise>
auto UntilCallCompletes(Promise promise) {
return spine_->UntilCallCompletes(std::move(promise));
}

template <typename PromiseFactory>
void SpawnGuarded(absl::string_view name, PromiseFactory promise_factory,
DebugLocation whence = {}) {
Expand Down
120 changes: 60 additions & 60 deletions src/core/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1085,71 +1085,71 @@ absl::StatusOr<ClientMetadataHandle> CheckClientMetadata(
}
} // namespace

auto Server::MatchRequestAndMaybeReadFirstMessage(CallHandler call_handler,
ClientMetadataHandle md) {
auto* registered_method = static_cast<RegisteredMethod*>(
md->get(GrpcRegisteredMethod()).value_or(nullptr));
RequestMatcherInterface* rm;
grpc_server_register_method_payload_handling payload_handling =
GRPC_SRM_PAYLOAD_NONE;
if (registered_method == nullptr) {
rm = unregistered_request_matcher_.get();
} else {
payload_handling = registered_method->payload_handling;
rm = registered_method->matcher.get();
}
using FirstMessageResult = ValueOrFailure<std::optional<MessageHandle>>;
auto maybe_read_first_message = If(
payload_handling == GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER,
[call_handler]() mutable {
return Map(
call_handler.PullMessage(),
[](ClientToServerNextMessage next_msg) -> FirstMessageResult {
if (!next_msg.ok()) return Failure{};
if (!next_msg.has_value()) {
return FirstMessageResult(std::nullopt);
}
return FirstMessageResult(next_msg.TakeValue());
});
},
[]() -> FirstMessageResult { return FirstMessageResult(std::nullopt); });
return TryJoin<absl::StatusOr>(
std::move(maybe_read_first_message), rm->MatchRequest(0),
[md = std::move(md)]() mutable {
return ValueOrFailure<ClientMetadataHandle>(std::move(md));
});
}

auto Server::MatchAndPublishCall(CallHandler call_handler) {
call_handler.SpawnGuardedUntilCallCompletes(
"request_matcher", [this, call_handler]() mutable {
return TrySeq(
call_handler.SpawnGuarded("request_matcher", [this, call_handler]() mutable {
return TrySeq(
call_handler.UntilCallCompletes(TrySeq(
// Wait for initial metadata to pass through all filters
Map(call_handler.PullClientInitialMetadata(), CheckClientMetadata),
// Match request with requested call
[this, call_handler](ClientMetadataHandle md) mutable {
auto* registered_method = static_cast<RegisteredMethod*>(
md->get(GrpcRegisteredMethod()).value_or(nullptr));
RequestMatcherInterface* rm;
grpc_server_register_method_payload_handling payload_handling =
GRPC_SRM_PAYLOAD_NONE;
if (registered_method == nullptr) {
rm = unregistered_request_matcher_.get();
} else {
payload_handling = registered_method->payload_handling;
rm = registered_method->matcher.get();
}
using FirstMessageResult =
ValueOrFailure<std::optional<MessageHandle>>;
auto maybe_read_first_message = If(
payload_handling == GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER,
[call_handler]() mutable {
return Map(
call_handler.PullMessage(),
[](ClientToServerNextMessage next_msg)
-> FirstMessageResult {
if (!next_msg.ok()) return Failure{};
if (!next_msg.has_value()) {
return FirstMessageResult(std::nullopt);
}
return FirstMessageResult(next_msg.TakeValue());
});
},
[]() -> FirstMessageResult {
return FirstMessageResult(std::nullopt);
});
return TryJoin<absl::StatusOr>(
std::move(maybe_read_first_message), rm->MatchRequest(0),
[md = std::move(md)]() mutable {
return ValueOrFailure<ClientMetadataHandle>(std::move(md));
});
},
// Publish call to cq
[call_handler,
this](std::tuple<std::optional<MessageHandle>,
RequestMatcherInterface::MatchResult,
ClientMetadataHandle>
r) {
RequestMatcherInterface::MatchResult& mr = std::get<1>(r);
auto md = std::move(std::get<2>(r));
auto* rc = mr.TakeCall();
rc->Complete(std::move(std::get<0>(r)), *md);
grpc_call* call =
MakeServerCall(call_handler, std::move(md), this,
rc->cq_bound_to_call, rc->initial_metadata);
*rc->call = call;
return Map(
WaitForCqEndOp(false, rc->tag, absl::OkStatus(), mr.cq()),
[rc = std::unique_ptr<RequestedCall>(rc)](Empty) {
return absl::OkStatus();
});
});
});
return MatchRequestAndMaybeReadFirstMessage(
std::move(call_handler), std::move(md));
})),
// Publish call to cq
[call_handler, this](std::tuple<std::optional<MessageHandle>,
RequestMatcherInterface::MatchResult,
ClientMetadataHandle>
r) {
RequestMatcherInterface::MatchResult& mr = std::get<1>(r);
auto md = std::move(std::get<2>(r));
auto* rc = mr.TakeCall();
rc->Complete(std::move(std::get<0>(r)), *md);
grpc_call* call =
MakeServerCall(call_handler, std::move(md), this,
rc->cq_bound_to_call, rc->initial_metadata);
*rc->call = call;
return Map(WaitForCqEndOp(false, rc->tag, absl::OkStatus(), mr.cq()),
[rc = std::unique_ptr<RequestedCall>(rc)](Empty) {
return absl::OkStatus();
});
});
});
}

absl::StatusOr<RefCountedPtr<UnstartedCallDestination>>
Expand Down
7 changes: 7 additions & 0 deletions src/core/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,13 @@ class Server : public ServerInterface,
return shutdown_refs_.load(std::memory_order_acquire) == 0;
}

// Returns a promise that resolves to
// tuple<
// optional<MessageHandle>,
// RequestMatcherInterface::MatchResult,
// ClientMetadataHandle>
auto MatchRequestAndMaybeReadFirstMessage(CallHandler call_handler,
ClientMetadataHandle md);
auto MatchAndPublishCall(CallHandler call_handler);
absl::StatusOr<RefCountedPtr<UnstartedCallDestination>> MakeCallDestination(
const ChannelArgs& args);
Expand Down
42 changes: 36 additions & 6 deletions test/core/end2end/fuzzers/connector_fuzzer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <google/protobuf/text_format.h>

#include <memory>

#include "fuzztest/fuzztest.h"
#include "gtest/gtest.h"
#include "src/core/ext/transport/chttp2/client/chttp2_connector.h"
#include "src/core/lib/address_utils/parse_address.h"
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
Expand All @@ -30,9 +33,6 @@
#include "test/core/test_util/fuzz_config_vars.h"
#include "test/core/test_util/test_config.h"

bool squelch = true;
bool leak_check = true;

using ::grpc_event_engine::experimental::ChannelArgsEndpointConfig;
using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::FuzzingEventEngine;
Expand Down Expand Up @@ -171,9 +171,6 @@ void RunConnectorFuzzer(
absl::FunctionRef<RefCountedPtr<grpc_channel_security_connector>()>
make_security_connector,
absl::FunctionRef<OrphanablePtr<SubchannelConnector>()> make_connector) {
if (squelch && !GetEnv("GRPC_TRACE_FUZZER").has_value()) {
grpc_disable_all_absl_logs();
}
static const int once = []() {
ForceEnableExperiment("event_engine_client", true);
ForceEnableExperiment("event_engine_listener", true);
Expand All @@ -185,6 +182,12 @@ void RunConnectorFuzzer(
ConnectorFuzzer(msg, make_security_connector, make_connector).Run();
}

auto ParseTestProto(const std::string& proto) {
fuzzer_input::Msg msg;
CHECK(google::protobuf::TextFormat::ParseFromString(proto, &msg));
return msg;
}

void Chttp2(fuzzer_input::Msg msg) {
RunConnectorFuzzer(
msg, []() { return RefCountedPtr<grpc_channel_security_connector>(); },
Expand All @@ -205,5 +208,32 @@ void Chttp2Fakesec(fuzzer_input::Msg msg) {
}
FUZZ_TEST(ConnectorFuzzers, Chttp2Fakesec);

TEST(ConnectorFuzzers, Chttp2FakesecTimeout1) {
Chttp2Fakesec(ParseTestProto(R"pb(network_input {
input_segments {
segments { delay_ms: 1 }
segments {
delay_ms: 1
chaotic_good {
known_type: SETTINGS
payload_empty_of_length: 2147483647
}
}
}
connect_delay_ms: -1603816748
connect_timeout_ms: 3
}
event_engine_actions {
run_delay: 1
assign_ports: 1
assign_ports: 2147483647
connections {}
}
config_vars {
verbosity: ""
experiments: 9223372036854775807
})pb"));
}

} // namespace
} // namespace grpc_core
7 changes: 4 additions & 3 deletions test/core/end2end/fuzzers/network_input.cc
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,10 @@ SliceBuffer ChaoticGoodFrame(const fuzzer_input::ChaoticGoodFrame& frame) {
suffix.Append(Slice::FromCopiedString(frame.payload_raw_bytes()));
break;
case fuzzer_input::ChaoticGoodFrame::kPayloadEmptyOfLength:
h.payload_length = frame.payload_empty_of_length();
suffix.Append(Slice::FromCopiedString(
std::string(frame.payload_empty_of_length(), 'a')));
h.payload_length =
std::min<uint32_t>(65536, frame.payload_empty_of_length());
suffix.Append(
Slice::FromCopiedString(std::string(h.payload_length, 'a')));
break;
case fuzzer_input::ChaoticGoodFrame::kPayloadOtherConnectionId:
h.payload_connection_id =
Expand Down
6 changes: 0 additions & 6 deletions test/core/end2end/fuzzers/server_fuzzer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@
#include "test/core/test_util/fuzz_config_vars.h"
#include "test/core/test_util/test_config.h"

bool squelch = true;
bool leak_check = true;

namespace grpc_core {
namespace testing {

Expand Down Expand Up @@ -97,9 +94,6 @@ void RunServerFuzzer(
const fuzzer_input::Msg& msg,
absl::FunctionRef<void(grpc_server*, int, const ChannelArgs&)>
server_setup) {
if (squelch && !GetEnv("GRPC_TRACE_FUZZER").has_value()) {
grpc_disable_all_absl_logs();
}
static const int once = []() {
ForceEnableExperiment("event_engine_client", true);
ForceEnableExperiment("event_engine_listener", true);
Expand Down
3 changes: 0 additions & 3 deletions test/core/end2end/tests/timeout_before_request_call.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ namespace grpc_core {
namespace {

CORE_END2END_TEST(CoreDeadlineTests, TimeoutBeforeRequestCall) {
SKIP_IF_V3();
auto c = NewClientCall("/foo").Timeout(Duration::Seconds(1)).Create();
IncomingStatusOnClient server_status;
IncomingMetadata server_initial_metadata;
Expand Down Expand Up @@ -73,7 +72,6 @@ CORE_END2END_TEST(CoreDeadlineTests, TimeoutBeforeRequestCall) {

CORE_END2END_TEST(CoreDeadlineTests,
TimeoutBeforeRequestCallWithRegisteredMethod) {
SKIP_IF_V3();
auto method = RegisterServerMethod("/foo", GRPC_SRM_PAYLOAD_NONE);

auto c = NewClientCall("/foo").Timeout(Duration::Seconds(1)).Create();
Expand Down Expand Up @@ -118,7 +116,6 @@ CORE_END2END_TEST(CoreDeadlineTests,

CORE_END2END_TEST(CoreDeadlineSingleHopTests,
TimeoutBeforeRequestCallWithRegisteredMethodWithPayload) {
SKIP_IF_V3();
auto method =
RegisterServerMethod("/foo", GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER);

Expand Down
Loading

0 comments on commit 57982ce

Please sign in to comment.