Skip to content

Commit

Permalink
Reviewer comments
Browse files Browse the repository at this point in the history
  • Loading branch information
yashykt committed Jan 17, 2025
1 parent 36dd711 commit a468e0a
Show file tree
Hide file tree
Showing 19 changed files with 137 additions and 144 deletions.
1 change: 1 addition & 0 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -1624,6 +1624,7 @@ grpc_cc_library(
"//src/core:channel_args",
"//src/core:context",
"//src/core:error",
"//src/core:message",
"//src/core:metadata_batch",
"//src/core:ref_counted_string",
"//src/core:slice_buffer",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ MessageHandle ChannelCompression::CompressMessage(
<< " alg=" << algorithm << " flags=" << message->flags();
auto* call_tracer = MaybeGetContext<CallTracerInterface>();
if (call_tracer != nullptr) {
call_tracer->RecordSendMessage(*message->payload());
call_tracer->RecordSendMessage(*message);
}
// Check if we're allowed to compress this message
// (apps might want to disable compression for certain messages to avoid
Expand Down Expand Up @@ -147,7 +147,7 @@ MessageHandle ChannelCompression::CompressMessage(
tmp.Swap(payload);
flags |= GRPC_WRITE_INTERNAL_COMPRESS;
if (call_tracer != nullptr) {
call_tracer->RecordSendCompressedMessage(*message->payload());
call_tracer->RecordSendCompressedMessage(*message);
}
} else {
if (GRPC_TRACE_FLAG_ENABLED(compression)) {
Expand All @@ -169,8 +169,7 @@ absl::StatusOr<MessageHandle> ChannelCompression::DecompressMessage(
<< " alg=" << args.algorithm;
auto* call_tracer = MaybeGetContext<CallTracerInterface>();
if (call_tracer != nullptr) {
call_tracer->RecordReceivedMessage(
*message->payload(), message->flags() & GRPC_WRITE_INTERNAL_COMPRESS);
call_tracer->RecordReceivedMessage(*message);
}
// Check max message length.
if (args.max_recv_message_length.has_value() &&
Expand Down Expand Up @@ -200,7 +199,7 @@ absl::StatusOr<MessageHandle> ChannelCompression::DecompressMessage(
message->mutable_flags() &= ~GRPC_WRITE_INTERNAL_COMPRESS;
message->mutable_flags() |= GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED;
if (call_tracer != nullptr) {
call_tracer->RecordReceivedDecompressedMessage(*message->payload());
call_tracer->RecordReceivedDecompressedMessage(*message);
}
return std::move(message);
}
Expand Down
9 changes: 4 additions & 5 deletions src/core/ext/transport/chttp2/transport/call_tracer_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,14 @@ class Chttp2CallTracerWrapper final : public CallTracerInterface {
grpc_metadata_batch* /*send_initial_metadata*/) override {}
void RecordSendTrailingMetadata(
grpc_metadata_batch* /*send_trailing_metadata*/) override {}
void RecordSendMessage(const SliceBuffer& /*send_message*/) override {}
void RecordSendMessage(const Message& /*send_message*/) override {}
void RecordSendCompressedMessage(
const SliceBuffer& /*send_compressed_message*/) override {}
const Message& /*send_compressed_message*/) override {}
void RecordReceivedInitialMetadata(
grpc_metadata_batch* /*recv_initial_metadata*/) override {}
void RecordReceivedMessage(const SliceBuffer& /*recv_message*/,
bool /*compressed*/) override {}
void RecordReceivedMessage(const Message& /*recv_message*/) override {}
void RecordReceivedDecompressedMessage(
const SliceBuffer& /*recv_decompressed_message*/) override {}
const Message& /*recv_decompressed_message*/) override {}
void RecordCancel(grpc_error_handle /*cancel_error*/) override {}
std::shared_ptr<TcpTracerInterface> StartNewTcpTrace() override {
return nullptr;
Expand Down
22 changes: 10 additions & 12 deletions src/core/telemetry/call_tracer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,13 @@ class DelegatingClientCallTracer : public ClientCallTracer {
tracer->RecordSendTrailingMetadata(send_trailing_metadata);
}
}
void RecordSendMessage(const SliceBuffer& send_message) override {
void RecordSendMessage(const Message& send_message) override {
for (auto* tracer : tracers_) {
tracer->RecordSendMessage(send_message);
}
}
void RecordSendCompressedMessage(
const SliceBuffer& send_compressed_message) override {
const Message& send_compressed_message) override {
for (auto* tracer : tracers_) {
tracer->RecordSendCompressedMessage(send_compressed_message);
}
Expand All @@ -118,14 +118,13 @@ class DelegatingClientCallTracer : public ClientCallTracer {
tracer->RecordReceivedInitialMetadata(recv_initial_metadata);
}
}
void RecordReceivedMessage(const SliceBuffer& recv_message,
bool compressed) override {
void RecordReceivedMessage(const Message& recv_message) override {
for (auto* tracer : tracers_) {
tracer->RecordReceivedMessage(recv_message, compressed);
tracer->RecordReceivedMessage(recv_message);
}
}
void RecordReceivedDecompressedMessage(
const SliceBuffer& recv_decompressed_message) override {
const Message& recv_decompressed_message) override {
for (auto* tracer : tracers_) {
tracer->RecordReceivedDecompressedMessage(recv_decompressed_message);
}
Expand Down Expand Up @@ -248,13 +247,13 @@ class DelegatingServerCallTracer : public ServerCallTracer {
tracer->RecordSendTrailingMetadata(send_trailing_metadata);
}
}
void RecordSendMessage(const SliceBuffer& send_message) override {
void RecordSendMessage(const Message& send_message) override {
for (auto* tracer : tracers_) {
tracer->RecordSendMessage(send_message);
}
}
void RecordSendCompressedMessage(
const SliceBuffer& send_compressed_message) override {
const Message& send_compressed_message) override {
for (auto* tracer : tracers_) {
tracer->RecordSendCompressedMessage(send_compressed_message);
}
Expand All @@ -265,14 +264,13 @@ class DelegatingServerCallTracer : public ServerCallTracer {
tracer->RecordReceivedInitialMetadata(recv_initial_metadata);
}
}
void RecordReceivedMessage(const SliceBuffer& recv_message,
bool compressed) override {
void RecordReceivedMessage(const Message& recv_message) override {
for (auto* tracer : tracers_) {
tracer->RecordReceivedMessage(recv_message, compressed);
tracer->RecordReceivedMessage(recv_message);
}
}
void RecordReceivedDecompressedMessage(
const SliceBuffer& recv_decompressed_message) override {
const Message& recv_decompressed_message) override {
for (auto* tracer : tracers_) {
tracer->RecordReceivedDecompressedMessage(recv_decompressed_message);
}
Expand Down
10 changes: 5 additions & 5 deletions src/core/telemetry/call_tracer.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/transport/call_final_info.h"
#include "src/core/lib/transport/message.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/telemetry/tcp_tracer.h"
#include "src/core/util/ref_counted_string.h"
Expand Down Expand Up @@ -95,20 +96,19 @@ class CallTracerInterface : public CallTracerAnnotationInterface {
grpc_metadata_batch* send_initial_metadata) = 0;
virtual void RecordSendTrailingMetadata(
grpc_metadata_batch* send_trailing_metadata) = 0;
virtual void RecordSendMessage(const SliceBuffer& send_message) = 0;
virtual void RecordSendMessage(const Message& send_message) = 0;
// Only invoked if message was actually compressed.
virtual void RecordSendCompressedMessage(
const SliceBuffer& send_compressed_message) = 0;
const Message& send_compressed_message) = 0;
// The `RecordReceivedInitialMetadata()` and `RecordReceivedMessage()`
// methods should only be invoked when the metadata/message was
// successfully received, i.e., without any error.
virtual void RecordReceivedInitialMetadata(
grpc_metadata_batch* recv_initial_metadata) = 0;
virtual void RecordReceivedMessage(const SliceBuffer& recv_message,
bool compressed) = 0;
virtual void RecordReceivedMessage(const Message& recv_message) = 0;
// Only invoked if message was actually decompressed.
virtual void RecordReceivedDecompressedMessage(
const SliceBuffer& recv_decompressed_message) = 0;
const Message& recv_decompressed_message) = 0;
virtual void RecordCancel(grpc_error_handle cancel_error) = 0;

struct TransportByteSize {
Expand Down
26 changes: 14 additions & 12 deletions src/cpp/ext/filters/census/client_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -154,31 +154,33 @@ void OpenCensusCallTracer::OpenCensusCallAttemptTracer::
}

void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordSendMessage(
const grpc_core::SliceBuffer& send_message) {
RecordAnnotation(
absl::StrFormat("Send message: %ld bytes", send_message.Length()));
const grpc_core::Message& send_message) {
RecordAnnotation(absl::StrFormat("Send message: %ld bytes",
send_message.payload()->Length()));
++sent_message_count_;
}

void OpenCensusCallTracer::OpenCensusCallAttemptTracer::
RecordSendCompressedMessage(
const grpc_core::SliceBuffer& send_compressed_message) {
RecordAnnotation(absl::StrFormat("Send compressed message: %ld bytes",
send_compressed_message.Length()));
const grpc_core::Message& send_compressed_message) {
RecordAnnotation(
absl::StrFormat("Send compressed message: %ld bytes",
send_compressed_message.payload()->Length()));
}

void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordReceivedMessage(
const grpc_core::SliceBuffer& recv_message, bool /*compressed*/) {
RecordAnnotation(
absl::StrFormat("Received message: %ld bytes", recv_message.Length()));
const grpc_core::Message& recv_message) {
RecordAnnotation(absl::StrFormat("Received message: %ld bytes",
recv_message.payload()->Length()));
++recv_message_count_;
}

void OpenCensusCallTracer::OpenCensusCallAttemptTracer::
RecordReceivedDecompressedMessage(
const grpc_core::SliceBuffer& recv_decompressed_message) {
RecordAnnotation(absl::StrFormat("Received decompressed message: %ld bytes",
recv_decompressed_message.Length()));
const grpc_core::Message& recv_decompressed_message) {
RecordAnnotation(
absl::StrFormat("Received decompressed message: %ld bytes",
recv_decompressed_message.payload()->Length()));
}

namespace {
Expand Down
9 changes: 4 additions & 5 deletions src/cpp/ext/filters/census/open_census_call_tracer.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,14 @@ class OpenCensusCallTracer : public grpc_core::ClientCallTracer {
grpc_metadata_batch* send_initial_metadata) override;
void RecordSendTrailingMetadata(
grpc_metadata_batch* /*send_trailing_metadata*/) override {}
void RecordSendMessage(const grpc_core::SliceBuffer& send_message) override;
void RecordSendMessage(const grpc_core::Message& send_message) override;
void RecordSendCompressedMessage(
const grpc_core::SliceBuffer& send_compressed_message) override;
const grpc_core::Message& send_compressed_message) override;
void RecordReceivedInitialMetadata(
grpc_metadata_batch* /*recv_initial_metadata*/) override {}
void RecordReceivedMessage(const grpc_core::SliceBuffer& recv_message,
bool /*compressed*/) override;
void RecordReceivedMessage(const grpc_core::Message& recv_message) override;
void RecordReceivedDecompressedMessage(
const grpc_core::SliceBuffer& recv_decompressed_message) override;
const grpc_core::Message& recv_decompressed_message) override;
void RecordReceivedTrailingMetadata(
absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
const grpc_transport_stream_stats* transport_stream_stats) override;
Expand Down
27 changes: 14 additions & 13 deletions src/cpp/ext/filters/census/server_call_tracer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,30 +118,31 @@ class OpenCensusServerCallTracer : public grpc_core::ServerCallTracer {
void RecordSendTrailingMetadata(
grpc_metadata_batch* send_trailing_metadata) override;

void RecordSendMessage(const grpc_core::SliceBuffer& send_message) override {
RecordAnnotation(
absl::StrFormat("Send message: %ld bytes", send_message.Length()));
void RecordSendMessage(const grpc_core::Message& send_message) override {
RecordAnnotation(absl::StrFormat("Send message: %ld bytes",
send_message.payload()->Length()));
++sent_message_count_;
}
void RecordSendCompressedMessage(
const grpc_core::SliceBuffer& send_compressed_message) override {
RecordAnnotation(absl::StrFormat("Send compressed message: %ld bytes",
send_compressed_message.Length()));
const grpc_core::Message& send_compressed_message) override {
RecordAnnotation(
absl::StrFormat("Send compressed message: %ld bytes",
send_compressed_message.payload()->Length()));
}

void RecordReceivedInitialMetadata(
grpc_metadata_batch* recv_initial_metadata) override;

void RecordReceivedMessage(const grpc_core::SliceBuffer& recv_message,
bool /*compressed*/) override {
RecordAnnotation(
absl::StrFormat("Received message: %ld bytes", recv_message.Length()));
void RecordReceivedMessage(const grpc_core::Message& recv_message) override {
RecordAnnotation(absl::StrFormat("Received message: %ld bytes",
recv_message.payload()->Length()));
++recv_message_count_;
}
void RecordReceivedDecompressedMessage(
const grpc_core::SliceBuffer& recv_decompressed_message) override {
RecordAnnotation(absl::StrFormat("Received decompressed message: %ld bytes",
recv_decompressed_message.Length()));
const grpc_core::Message& recv_decompressed_message) override {
RecordAnnotation(
absl::StrFormat("Received decompressed message: %ld bytes",
recv_decompressed_message.payload()->Length()));
}

void RecordReceivedTrailingMetadata(
Expand Down
27 changes: 14 additions & 13 deletions src/cpp/ext/otel/otel_client_call_tracer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,30 +111,31 @@ void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
}

void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
RecordSendMessage(const grpc_core::SliceBuffer& send_message) {
RecordAnnotation(
absl::StrFormat("Send message: %ld bytes", send_message.Length()));
RecordSendMessage(const grpc_core::Message& send_message) {
RecordAnnotation(absl::StrFormat("Send message: %ld bytes",
send_message.payload()->Length()));
}

void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
RecordSendCompressedMessage(
const grpc_core::SliceBuffer& send_compressed_message) {
RecordAnnotation(absl::StrFormat("Send compressed message: %ld bytes",
send_compressed_message.Length()));
const grpc_core::Message& send_compressed_message) {
RecordAnnotation(
absl::StrFormat("Send compressed message: %ld bytes",
send_compressed_message.payload()->Length()));
}

void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
RecordReceivedMessage(const grpc_core::SliceBuffer& recv_message,
bool /*compressed*/) {
RecordAnnotation(
absl::StrFormat("Received message: %ld bytes", recv_message.Length()));
RecordReceivedMessage(const grpc_core::Message& recv_message) {
RecordAnnotation(absl::StrFormat("Received message: %ld bytes",
recv_message.payload()->Length()));
}

void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
RecordReceivedDecompressedMessage(
const grpc_core::SliceBuffer& recv_decompressed_message) {
RecordAnnotation(absl::StrFormat("Received decompressed message: %ld bytes",
recv_decompressed_message.Length()));
const grpc_core::Message& recv_decompressed_message) {
RecordAnnotation(
absl::StrFormat("Received decompressed message: %ld bytes",
recv_decompressed_message.payload()->Length()));
}

void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
Expand Down
9 changes: 4 additions & 5 deletions src/cpp/ext/otel/otel_client_call_tracer.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,14 @@ class OpenTelemetryPluginImpl::ClientCallTracer
grpc_metadata_batch* send_initial_metadata) override;
void RecordSendTrailingMetadata(
grpc_metadata_batch* /*send_trailing_metadata*/) override {}
void RecordSendMessage(const grpc_core::SliceBuffer& send_message) override;
void RecordSendMessage(const grpc_core::Message& send_message) override;
void RecordSendCompressedMessage(
const grpc_core::SliceBuffer& send_compressed_message) override;
const grpc_core::Message& send_compressed_message) override;
void RecordReceivedInitialMetadata(
grpc_metadata_batch* recv_initial_metadata) override;
void RecordReceivedMessage(const grpc_core::SliceBuffer& recv_message,
bool /*compressed*/) override;
void RecordReceivedMessage(const grpc_core::Message& recv_message) override;
void RecordReceivedDecompressedMessage(
const grpc_core::SliceBuffer& recv_decompressed_message) override;
const grpc_core::Message& recv_decompressed_message) override;
void RecordReceivedTrailingMetadata(
absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
const grpc_transport_stream_stats* transport_stream_stats) override;
Expand Down
27 changes: 14 additions & 13 deletions src/cpp/ext/otel/otel_server_call_tracer.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,28 +66,29 @@ class OpenTelemetryPluginImpl::ServerCallTracer
void RecordSendTrailingMetadata(
grpc_metadata_batch* /*send_trailing_metadata*/) override;

void RecordSendMessage(const grpc_core::SliceBuffer& send_message) override {
RecordAnnotation(
absl::StrFormat("Send message: %ld bytes", send_message.Length()));
void RecordSendMessage(const grpc_core::Message& send_message) override {
RecordAnnotation(absl::StrFormat("Send message: %ld bytes",
send_message.payload()->Length()));
}
void RecordSendCompressedMessage(
const grpc_core::SliceBuffer& send_compressed_message) override {
RecordAnnotation(absl::StrFormat("Send compressed message: %ld bytes",
send_compressed_message.Length()));
const grpc_core::Message& send_compressed_message) override {
RecordAnnotation(
absl::StrFormat("Send compressed message: %ld bytes",
send_compressed_message.payload()->Length()));
}

void RecordReceivedInitialMetadata(
grpc_metadata_batch* recv_initial_metadata) override;

void RecordReceivedMessage(const grpc_core::SliceBuffer& recv_message,
bool /*compressed*/) override {
RecordAnnotation(
absl::StrFormat("Received message: %ld bytes", recv_message.Length()));
void RecordReceivedMessage(const grpc_core::Message& recv_message) override {
RecordAnnotation(absl::StrFormat("Received message: %ld bytes",
recv_message.payload()->Length()));
}
void RecordReceivedDecompressedMessage(
const grpc_core::SliceBuffer& recv_decompressed_message) override {
RecordAnnotation(absl::StrFormat("Received decompressed message: %ld bytes",
recv_decompressed_message.Length()));
const grpc_core::Message& recv_decompressed_message) override {
RecordAnnotation(
absl::StrFormat("Received decompressed message: %ld bytes",
recv_decompressed_message.payload()->Length()));
}

void RecordReceivedTrailingMetadata(
Expand Down
Loading

0 comments on commit a468e0a

Please sign in to comment.