From e6365dc52982cb0945ef72b834ea37173fe8ede8 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Sat, 11 Jan 2025 01:39:14 +0000 Subject: [PATCH] Add compression and failed status tests --- src/proto/grpc/testing/echo_messages.proto | 7 + test/cpp/end2end/test_service_impl.cc | 9 ++ test/cpp/ext/otel/otel_tracing_test.cc | 151 +++++++++++++++++++++ 3 files changed, 167 insertions(+) diff --git a/src/proto/grpc/testing/echo_messages.proto b/src/proto/grpc/testing/echo_messages.proto index 0b4a3bd6fee90..75acff7cd99f6 100644 --- a/src/proto/grpc/testing/echo_messages.proto +++ b/src/proto/grpc/testing/echo_messages.proto @@ -55,6 +55,13 @@ message RequestParams { bool server_notify_client_when_started = 18; xds.data.orca.v3.OrcaLoadReport backend_metrics = 19; bool echo_host_from_authority_header = 20; + + enum CompressionAlgorithm { + NONE = 0; + DEFLATE = 1; + GZIP = 2; + } + CompressionAlgorithm compression_algorithm = 21; } message EchoRequest { diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc index b2b347f7aabf4..e87f7b2a8822c 100644 --- a/test/cpp/end2end/test_service_impl.cc +++ b/test/cpp/end2end/test_service_impl.cc @@ -230,6 +230,15 @@ ServerUnaryReactor* CallbackTestServiceImpl::Echo( FinishWhenCancelledAsync(); return; } + if (req_->has_param() && + req_->param().compression_algorithm() != RequestParams::NONE) { + if (req_->param().compression_algorithm() == RequestParams::DEFLATE) { + ctx_->set_compression_algorithm(GRPC_COMPRESS_DEFLATE); + } else if (req_->param().compression_algorithm() == + RequestParams::GZIP) { + ctx_->set_compression_algorithm(GRPC_COMPRESS_GZIP); + } + } resp_->set_message(req_->message()); internal::MaybeEchoDeadline(ctx_, req_, resp_); if (service_->host_) { diff --git a/test/cpp/ext/otel/otel_tracing_test.cc b/test/cpp/ext/otel/otel_tracing_test.cc index 549f9d95f9531..782ee8e790d96 100644 --- a/test/cpp/ext/otel/otel_tracing_test.cc +++ b/test/cpp/ext/otel/otel_tracing_test.cc @@ -36,6 +36,8 @@ namespace { using opentelemetry::sdk::trace::SpanData; using opentelemetry::sdk::trace::SpanDataEvent; +using ::testing::Lt; +using ::testing::MatchesRegex; using ::testing::Pair; using ::testing::UnorderedElementsAre; using ::testing::VariantWith; @@ -199,6 +201,155 @@ TEST_F(OTelTracingTest, TestApplicationContextFlows) { EXPECT_EQ((*client_span)->GetParentSpanId(), (*test_span)->GetSpanId()); } +TEST_F(OTelTracingTest, CompressionMessageEvents) { + { + EchoRequest request; + request.set_message("AAAAAAAAAAAAAAAAAAAAAAAAAAAAA"); + request.mutable_param()->set_compression_algorithm(RequestParams::GZIP); + EchoResponse response; + grpc::ClientContext context; + context.set_compression_algorithm(GRPC_COMPRESS_GZIP); + grpc::Status status = stub_->Echo(&context, request, &response); + } + absl::SleepFor(absl::Milliseconds(500 * grpc_test_slowdown_factor())); + auto spans = data_->GetSpans(); + EXPECT_EQ(spans.size(), 3); + const auto attempt_span = std::find_if( + spans.begin(), spans.end(), [](const std::unique_ptr& span) { + return span->GetName() == "Attempt.grpc.testing.EchoTestService/Echo"; + }); + EXPECT_NE(attempt_span, spans.end()); + // Verify outbound messages on the attempt + auto outbound_message_event = std::find_if( + (*attempt_span)->GetEvents().begin(), (*attempt_span)->GetEvents().end(), + [](const SpanDataEvent& event) { + return event.GetName() == "Outbound message"; + }); + EXPECT_NE(outbound_message_event, (*attempt_span)->GetEvents().end()); + EXPECT_THAT( + outbound_message_event->GetAttributes(), + UnorderedElementsAre(Pair("sequence-number", VariantWith(0)), + Pair("message-size", VariantWith(36)))); + auto outbound_message_compressed_event = std::find_if( + (*attempt_span)->GetEvents().begin(), (*attempt_span)->GetEvents().end(), + [](const SpanDataEvent& event) { + return event.GetName() == "Outbound message compressed"; + }); + EXPECT_NE(outbound_message_compressed_event, + (*attempt_span)->GetEvents().end()); + EXPECT_THAT(outbound_message_compressed_event->GetAttributes(), + UnorderedElementsAre( + Pair("sequence-number", VariantWith(0)), + Pair("message-size", VariantWith(Lt(36))))); + // Verify inbound messages on the attempt + auto inbound_message_event = std::find_if( + (*attempt_span)->GetEvents().begin(), (*attempt_span)->GetEvents().end(), + [](const SpanDataEvent& event) { + return event.GetName() == "Inbound message"; + }); + EXPECT_NE(inbound_message_event, (*attempt_span)->GetEvents().end()); + EXPECT_THAT(inbound_message_event->GetAttributes(), + UnorderedElementsAre( + Pair("sequence-number", VariantWith(0)), + Pair("message-size", VariantWith(Lt(31))))); + auto inbound_message_decompressed_event = std::find_if( + (*attempt_span)->GetEvents().begin(), (*attempt_span)->GetEvents().end(), + [](const SpanDataEvent& event) { + return event.GetName() == "Inbound message decompressed"; + }); + EXPECT_NE(inbound_message_decompressed_event, + (*attempt_span)->GetEvents().end()); + EXPECT_THAT( + inbound_message_decompressed_event->GetAttributes(), + UnorderedElementsAre(Pair("sequence-number", VariantWith(0)), + Pair("message-size", VariantWith(31)))); + const auto server_span = std::find_if( + spans.begin(), spans.end(), [](const std::unique_ptr& span) { + return span->GetName() == "Recv.grpc.testing.EchoTestService/Echo"; + }); + EXPECT_NE(server_span, spans.end()); + // Verify inbound messages on the server + inbound_message_event = std::find_if( + (*server_span)->GetEvents().begin(), (*server_span)->GetEvents().end(), + [](const SpanDataEvent& event) { + return event.GetName() == "Inbound message"; + }); + EXPECT_NE(inbound_message_event, (*server_span)->GetEvents().end()); + EXPECT_THAT(inbound_message_event->GetAttributes(), + UnorderedElementsAre( + Pair("sequence-number", VariantWith(0)), + Pair("message-size", VariantWith(Lt(36))))); + inbound_message_decompressed_event = std::find_if( + (*server_span)->GetEvents().begin(), (*server_span)->GetEvents().end(), + [](const SpanDataEvent& event) { + return event.GetName() == "Inbound message decompressed"; + }); + EXPECT_NE(inbound_message_decompressed_event, + (*server_span)->GetEvents().end()); + EXPECT_THAT( + inbound_message_decompressed_event->GetAttributes(), + UnorderedElementsAre(Pair("sequence-number", VariantWith(0)), + Pair("message-size", VariantWith(36)))); + // Verify outbound messages on the server + outbound_message_event = std::find_if( + (*server_span)->GetEvents().begin(), (*server_span)->GetEvents().end(), + [](const SpanDataEvent& event) { + return event.GetName() == "Outbound message"; + }); + EXPECT_NE(outbound_message_event, (*server_span)->GetEvents().end()); + EXPECT_THAT( + outbound_message_event->GetAttributes(), + UnorderedElementsAre(Pair("sequence-number", VariantWith(0)), + Pair("message-size", VariantWith(31)))); + outbound_message_compressed_event = std::find_if( + (*server_span)->GetEvents().begin(), (*server_span)->GetEvents().end(), + [](const SpanDataEvent& event) { + return event.GetName() == "Outbound message compressed"; + }); + EXPECT_NE(outbound_message_compressed_event, + (*server_span)->GetEvents().end()); + EXPECT_THAT(outbound_message_compressed_event->GetAttributes(), + UnorderedElementsAre( + Pair("sequence-number", VariantWith(0)), + Pair("message-size", VariantWith(Lt(31))))); +} + +TEST_F(OTelTracingTest, FailedStatus) { + { + EchoRequest request; + request.set_message("foo"); + request.mutable_param()->mutable_expected_error()->set_code( + grpc::StatusCode::UNAVAILABLE); + request.mutable_param()->mutable_expected_error()->set_error_message( + "test message"); + EchoResponse response; + grpc::ClientContext context; + context.set_compression_algorithm(GRPC_COMPRESS_GZIP); + grpc::Status status = stub_->Echo(&context, request, &response); + } + absl::SleepFor(absl::Milliseconds(500 * grpc_test_slowdown_factor())); + auto spans = data_->GetSpans(); + EXPECT_EQ(spans.size(), 3); + const auto attempt_span = std::find_if( + spans.begin(), spans.end(), [](const std::unique_ptr& span) { + return span->GetName() == "Attempt.grpc.testing.EchoTestService/Echo"; + }); + EXPECT_NE(attempt_span, spans.end()); + EXPECT_EQ((*attempt_span)->GetStatus(), + opentelemetry::trace::StatusCode::kError); + EXPECT_THAT((*attempt_span)->GetDescription(), + MatchesRegex("UNAVAILABLE:.*test message.*")); + const auto server_span = std::find_if( + spans.begin(), spans.end(), [](const std::unique_ptr& span) { + return span->GetName() == "Recv.grpc.testing.EchoTestService/Echo"; + }); + EXPECT_NE(server_span, spans.end()); + EXPECT_EQ((*server_span)->GetStatus(), + opentelemetry::trace::StatusCode::kError); + EXPECT_THAT((*server_span)->GetDescription(), + MatchesRegex("UNAVAILABLE:.*test message.*")); +} + } // namespace } // namespace testing } // namespace grpc