From 6ca649b0a04ffb9589596e946cbc57c67a8b2bf2 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Sat, 11 Jan 2025 02:27:07 +0000 Subject: [PATCH] Add test for streaming and retries --- test/cpp/ext/otel/otel_tracing_test.cc | 118 ++++++++++++++++++++++++- 1 file changed, 115 insertions(+), 3 deletions(-) diff --git a/test/cpp/ext/otel/otel_tracing_test.cc b/test/cpp/ext/otel/otel_tracing_test.cc index 782ee8e790d96..f94f751774ecd 100644 --- a/test/cpp/ext/otel/otel_tracing_test.cc +++ b/test/cpp/ext/otel/otel_tracing_test.cc @@ -69,9 +69,9 @@ class OTelTracingTest : public ::testing::Test { &port); builder.RegisterService(&service_); server_ = builder.BuildAndStart(); - std::string server_address = absl::StrCat("localhost:", port); - auto channel = - grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials()); + server_address_ = absl::StrCat("localhost:", port); + auto channel = grpc::CreateChannel(server_address_, + grpc::InsecureChannelCredentials()); stub_ = EchoTestService::NewStub(channel); } @@ -94,6 +94,7 @@ class OTelTracingTest : public ::testing::Test { opentelemetry::nostd::shared_ptr tracer_; std::shared_ptr data_; CallbackTestServiceImpl service_; + std::string server_address_; std::unique_ptr server_; std::unique_ptr stub_; }; @@ -350,6 +351,117 @@ TEST_F(OTelTracingTest, FailedStatus) { MatchesRegex("UNAVAILABLE:.*test message.*")); } +TEST_F(OTelTracingTest, Streaming) { + { + EchoRequest request; + request.set_message("foo"); + EchoResponse response; + grpc::ClientContext context; + auto stream = stub_->BidiStream(&context); + for (int i = 0; i < 10; ++i) { + EXPECT_TRUE(stream->Write(request)); + EXPECT_TRUE(stream->Read(&response)); + } + stream->WritesDone(); + EXPECT_TRUE(stream->Finish().ok()); + } + absl::SleepFor(absl::Milliseconds(500 * grpc_test_slowdown_factor())); + auto spans = data_->GetSpans(); + EXPECT_EQ(spans.size(), 3); + const auto attempt_span = std::find_if( + spans.begin(), spans.end(), [](const std::unique_ptr& span) { + return span->GetName() == + "Attempt.grpc.testing.EchoTestService/BidiStream"; + }); + EXPECT_NE(attempt_span, spans.end()); + // Verify messages on the attempt span + std::vector outbound_seq_nums; + std::vector inbound_seq_nums; + for (const auto& event : (*attempt_span)->GetEvents()) { + if (event.GetName() == "Outbound message") { + outbound_seq_nums.push_back( + std::get(event.GetAttributes().at("sequence-number"))); + } + if (event.GetName() == "Inbound message") { + inbound_seq_nums.push_back( + std::get(event.GetAttributes().at("sequence-number"))); + } + } + EXPECT_THAT(outbound_seq_nums, + UnorderedElementsAre(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)); + EXPECT_THAT(inbound_seq_nums, + UnorderedElementsAre(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)); + const auto server_span = std::find_if( + spans.begin(), spans.end(), [](const std::unique_ptr& span) { + return span->GetName() == + "Recv.grpc.testing.EchoTestService/BidiStream"; + }); + outbound_seq_nums.clear(); + inbound_seq_nums.clear(); + // Verify messages on the server span + for (const auto& event : (*server_span)->GetEvents()) { + if (event.GetName() == "Outbound message") { + outbound_seq_nums.push_back( + std::get(event.GetAttributes().at("sequence-number"))); + } + if (event.GetName() == "Inbound message") { + inbound_seq_nums.push_back( + std::get(event.GetAttributes().at("sequence-number"))); + } + } + EXPECT_THAT(outbound_seq_nums, + UnorderedElementsAre(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)); + EXPECT_THAT(inbound_seq_nums, + UnorderedElementsAre(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)); + EXPECT_NE(server_span, spans.end()); +} + +TEST_F(OTelTracingTest, Retries) { + { + ChannelArguments args; + args.SetString(GRPC_ARG_SERVICE_CONFIG, + "{\n" + " \"methodConfig\": [ {\n" + " \"name\": [\n" + " { \"service\": \"grpc.testing.EchoTestService\" }\n" + " ],\n" + " \"retryPolicy\": {\n" + " \"maxAttempts\": 3,\n" + " \"initialBackoff\": \"0.1s\",\n" + " \"maxBackoff\": \"120s\",\n" + " \"backoffMultiplier\": 1,\n" + " \"retryableStatusCodes\": [ \"ABORTED\" ]\n" + " }\n" + " } ]\n" + "}"); + auto channel = CreateCustomChannel(server_address_, + InsecureChannelCredentials(), args); + auto stub = EchoTestService::NewStub(channel); + EchoRequest request; + request.set_message("foo"); + request.mutable_param()->mutable_expected_error()->set_code( + StatusCode::ABORTED); + EchoResponse response; + grpc::ClientContext context; + grpc::Status status = stub->Echo(&context, request, &response); + } + absl::SleepFor(absl::Milliseconds(500 * grpc_test_slowdown_factor())); + auto spans = data_->GetSpans(); + EXPECT_EQ(spans.size(), 7); // 1 client span, 3 attempt spans, 3 server spans + std::vector attempt_seq_nums; + uint64_t server_span_count = 0; + for (const auto& span : spans) { + if (span->GetName() == "Attempt.grpc.testing.EchoTestService/Echo") { + attempt_seq_nums.push_back(std::get( + span->GetAttributes().at("previous-rpc-attempts"))); + } else if (span->GetName() == "Recv.grpc.testing.EchoTestService/Echo") { + ++server_span_count; + } + } + EXPECT_THAT(attempt_seq_nums, UnorderedElementsAre(0, 1, 2)); + EXPECT_EQ(server_span_count, 3); +} + } // namespace } // namespace testing } // namespace grpc