Skip to content

Commit

Permalink
Add test for streaming and retries
Browse files Browse the repository at this point in the history
  • Loading branch information
yashykt committed Jan 11, 2025
1 parent e6365dc commit 6ca649b
Showing 1 changed file with 115 additions and 3 deletions.
118 changes: 115 additions & 3 deletions test/cpp/ext/otel/otel_tracing_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ class OTelTracingTest : public ::testing::Test {
&port);
builder.RegisterService(&service_);
server_ = builder.BuildAndStart();
std::string server_address = absl::StrCat("localhost:", port);
auto channel =
grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials());
server_address_ = absl::StrCat("localhost:", port);
auto channel = grpc::CreateChannel(server_address_,
grpc::InsecureChannelCredentials());
stub_ = EchoTestService::NewStub(channel);
}

Expand All @@ -94,6 +94,7 @@ class OTelTracingTest : public ::testing::Test {
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Tracer> tracer_;
std::shared_ptr<opentelemetry::exporter::memory::InMemorySpanData> data_;
CallbackTestServiceImpl service_;
std::string server_address_;
std::unique_ptr<grpc::Server> server_;
std::unique_ptr<EchoTestService::Stub> stub_;
};
Expand Down Expand Up @@ -350,6 +351,117 @@ TEST_F(OTelTracingTest, FailedStatus) {
MatchesRegex("UNAVAILABLE:.*test message.*"));
}

TEST_F(OTelTracingTest, Streaming) {
{
EchoRequest request;
request.set_message("foo");
EchoResponse response;
grpc::ClientContext context;
auto stream = stub_->BidiStream(&context);
for (int i = 0; i < 10; ++i) {
EXPECT_TRUE(stream->Write(request));
EXPECT_TRUE(stream->Read(&response));
}
stream->WritesDone();
EXPECT_TRUE(stream->Finish().ok());
}
absl::SleepFor(absl::Milliseconds(500 * grpc_test_slowdown_factor()));
auto spans = data_->GetSpans();
EXPECT_EQ(spans.size(), 3);
const auto attempt_span = std::find_if(
spans.begin(), spans.end(), [](const std::unique_ptr<SpanData>& span) {
return span->GetName() ==
"Attempt.grpc.testing.EchoTestService/BidiStream";
});
EXPECT_NE(attempt_span, spans.end());
// Verify messages on the attempt span
std::vector<uint64_t> outbound_seq_nums;
std::vector<uint64_t> inbound_seq_nums;
for (const auto& event : (*attempt_span)->GetEvents()) {
if (event.GetName() == "Outbound message") {
outbound_seq_nums.push_back(
std::get<uint64_t>(event.GetAttributes().at("sequence-number")));
}
if (event.GetName() == "Inbound message") {
inbound_seq_nums.push_back(
std::get<uint64_t>(event.GetAttributes().at("sequence-number")));
}
}
EXPECT_THAT(outbound_seq_nums,
UnorderedElementsAre(0, 1, 2, 3, 4, 5, 6, 7, 8, 9));
EXPECT_THAT(inbound_seq_nums,
UnorderedElementsAre(0, 1, 2, 3, 4, 5, 6, 7, 8, 9));
const auto server_span = std::find_if(
spans.begin(), spans.end(), [](const std::unique_ptr<SpanData>& span) {
return span->GetName() ==
"Recv.grpc.testing.EchoTestService/BidiStream";
});
outbound_seq_nums.clear();
inbound_seq_nums.clear();
// Verify messages on the server span
for (const auto& event : (*server_span)->GetEvents()) {
if (event.GetName() == "Outbound message") {
outbound_seq_nums.push_back(
std::get<uint64_t>(event.GetAttributes().at("sequence-number")));
}
if (event.GetName() == "Inbound message") {
inbound_seq_nums.push_back(
std::get<uint64_t>(event.GetAttributes().at("sequence-number")));
}
}
EXPECT_THAT(outbound_seq_nums,
UnorderedElementsAre(0, 1, 2, 3, 4, 5, 6, 7, 8, 9));
EXPECT_THAT(inbound_seq_nums,
UnorderedElementsAre(0, 1, 2, 3, 4, 5, 6, 7, 8, 9));
EXPECT_NE(server_span, spans.end());
}

TEST_F(OTelTracingTest, Retries) {
{
ChannelArguments args;
args.SetString(GRPC_ARG_SERVICE_CONFIG,
"{\n"
" \"methodConfig\": [ {\n"
" \"name\": [\n"
" { \"service\": \"grpc.testing.EchoTestService\" }\n"
" ],\n"
" \"retryPolicy\": {\n"
" \"maxAttempts\": 3,\n"
" \"initialBackoff\": \"0.1s\",\n"
" \"maxBackoff\": \"120s\",\n"
" \"backoffMultiplier\": 1,\n"
" \"retryableStatusCodes\": [ \"ABORTED\" ]\n"
" }\n"
" } ]\n"
"}");
auto channel = CreateCustomChannel(server_address_,
InsecureChannelCredentials(), args);
auto stub = EchoTestService::NewStub(channel);
EchoRequest request;
request.set_message("foo");
request.mutable_param()->mutable_expected_error()->set_code(
StatusCode::ABORTED);
EchoResponse response;
grpc::ClientContext context;
grpc::Status status = stub->Echo(&context, request, &response);
}
absl::SleepFor(absl::Milliseconds(500 * grpc_test_slowdown_factor()));
auto spans = data_->GetSpans();
EXPECT_EQ(spans.size(), 7); // 1 client span, 3 attempt spans, 3 server spans
std::vector<uint64_t> attempt_seq_nums;
uint64_t server_span_count = 0;
for (const auto& span : spans) {
if (span->GetName() == "Attempt.grpc.testing.EchoTestService/Echo") {
attempt_seq_nums.push_back(std::get<uint64_t>(
span->GetAttributes().at("previous-rpc-attempts")));
} else if (span->GetName() == "Recv.grpc.testing.EchoTestService/Echo") {
++server_span_count;
}
}
EXPECT_THAT(attempt_seq_nums, UnorderedElementsAre(0, 1, 2));
EXPECT_EQ(server_span_count, 3);
}

} // namespace
} // namespace testing
} // namespace grpc
Expand Down

0 comments on commit 6ca649b

Please sign in to comment.