diff --git a/BUILD b/BUILD index 6f8f43b7bef3d..6d2ed540e8c3c 100644 --- a/BUILD +++ b/BUILD @@ -1624,6 +1624,7 @@ grpc_cc_library( "//src/core:channel_args", "//src/core:context", "//src/core:error", + "//src/core:message", "//src/core:metadata_batch", "//src/core:ref_counted_string", "//src/core:slice_buffer", diff --git a/src/core/ext/filters/http/message_compress/compression_filter.cc b/src/core/ext/filters/http/message_compress/compression_filter.cc index 42269ee82096e..9451f6448dc8f 100644 --- a/src/core/ext/filters/http/message_compress/compression_filter.cc +++ b/src/core/ext/filters/http/message_compress/compression_filter.cc @@ -114,7 +114,7 @@ MessageHandle ChannelCompression::CompressMessage( << " alg=" << algorithm << " flags=" << message->flags(); auto* call_tracer = MaybeGetContext(); if (call_tracer != nullptr) { - call_tracer->RecordSendMessage(*message->payload()); + call_tracer->RecordSendMessage(*message); } // Check if we're allowed to compress this message // (apps might want to disable compression for certain messages to avoid @@ -147,7 +147,7 @@ MessageHandle ChannelCompression::CompressMessage( tmp.Swap(payload); flags |= GRPC_WRITE_INTERNAL_COMPRESS; if (call_tracer != nullptr) { - call_tracer->RecordSendCompressedMessage(*message->payload()); + call_tracer->RecordSendCompressedMessage(*message); } } else { if (GRPC_TRACE_FLAG_ENABLED(compression)) { @@ -169,8 +169,7 @@ absl::StatusOr ChannelCompression::DecompressMessage( << " alg=" << args.algorithm; auto* call_tracer = MaybeGetContext(); if (call_tracer != nullptr) { - call_tracer->RecordReceivedMessage( - *message->payload(), message->flags() & GRPC_WRITE_INTERNAL_COMPRESS); + call_tracer->RecordReceivedMessage(*message); } // Check max message length. if (args.max_recv_message_length.has_value() && @@ -200,7 +199,7 @@ absl::StatusOr ChannelCompression::DecompressMessage( message->mutable_flags() &= ~GRPC_WRITE_INTERNAL_COMPRESS; message->mutable_flags() |= GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED; if (call_tracer != nullptr) { - call_tracer->RecordReceivedDecompressedMessage(*message->payload()); + call_tracer->RecordReceivedDecompressedMessage(*message); } return std::move(message); } diff --git a/src/core/ext/transport/chttp2/transport/call_tracer_wrapper.h b/src/core/ext/transport/chttp2/transport/call_tracer_wrapper.h index 0068faa80e740..d1c1a99df5d22 100644 --- a/src/core/ext/transport/chttp2/transport/call_tracer_wrapper.h +++ b/src/core/ext/transport/chttp2/transport/call_tracer_wrapper.h @@ -45,15 +45,14 @@ class Chttp2CallTracerWrapper final : public CallTracerInterface { grpc_metadata_batch* /*send_initial_metadata*/) override {} void RecordSendTrailingMetadata( grpc_metadata_batch* /*send_trailing_metadata*/) override {} - void RecordSendMessage(const SliceBuffer& /*send_message*/) override {} + void RecordSendMessage(const Message& /*send_message*/) override {} void RecordSendCompressedMessage( - const SliceBuffer& /*send_compressed_message*/) override {} + const Message& /*send_compressed_message*/) override {} void RecordReceivedInitialMetadata( grpc_metadata_batch* /*recv_initial_metadata*/) override {} - void RecordReceivedMessage(const SliceBuffer& /*recv_message*/, - bool /*compressed*/) override {} + void RecordReceivedMessage(const Message& /*recv_message*/) override {} void RecordReceivedDecompressedMessage( - const SliceBuffer& /*recv_decompressed_message*/) override {} + const Message& /*recv_decompressed_message*/) override {} void RecordCancel(grpc_error_handle /*cancel_error*/) override {} std::shared_ptr StartNewTcpTrace() override { return nullptr; diff --git a/src/core/telemetry/call_tracer.cc b/src/core/telemetry/call_tracer.cc index 89efcc2e33618..49467582cb115 100644 --- a/src/core/telemetry/call_tracer.cc +++ b/src/core/telemetry/call_tracer.cc @@ -101,13 +101,13 @@ class DelegatingClientCallTracer : public ClientCallTracer { tracer->RecordSendTrailingMetadata(send_trailing_metadata); } } - void RecordSendMessage(const SliceBuffer& send_message) override { + void RecordSendMessage(const Message& send_message) override { for (auto* tracer : tracers_) { tracer->RecordSendMessage(send_message); } } void RecordSendCompressedMessage( - const SliceBuffer& send_compressed_message) override { + const Message& send_compressed_message) override { for (auto* tracer : tracers_) { tracer->RecordSendCompressedMessage(send_compressed_message); } @@ -118,14 +118,13 @@ class DelegatingClientCallTracer : public ClientCallTracer { tracer->RecordReceivedInitialMetadata(recv_initial_metadata); } } - void RecordReceivedMessage(const SliceBuffer& recv_message, - bool compressed) override { + void RecordReceivedMessage(const Message& recv_message) override { for (auto* tracer : tracers_) { - tracer->RecordReceivedMessage(recv_message, compressed); + tracer->RecordReceivedMessage(recv_message); } } void RecordReceivedDecompressedMessage( - const SliceBuffer& recv_decompressed_message) override { + const Message& recv_decompressed_message) override { for (auto* tracer : tracers_) { tracer->RecordReceivedDecompressedMessage(recv_decompressed_message); } @@ -248,13 +247,13 @@ class DelegatingServerCallTracer : public ServerCallTracer { tracer->RecordSendTrailingMetadata(send_trailing_metadata); } } - void RecordSendMessage(const SliceBuffer& send_message) override { + void RecordSendMessage(const Message& send_message) override { for (auto* tracer : tracers_) { tracer->RecordSendMessage(send_message); } } void RecordSendCompressedMessage( - const SliceBuffer& send_compressed_message) override { + const Message& send_compressed_message) override { for (auto* tracer : tracers_) { tracer->RecordSendCompressedMessage(send_compressed_message); } @@ -265,14 +264,13 @@ class DelegatingServerCallTracer : public ServerCallTracer { tracer->RecordReceivedInitialMetadata(recv_initial_metadata); } } - void RecordReceivedMessage(const SliceBuffer& recv_message, - bool compressed) override { + void RecordReceivedMessage(const Message& recv_message) override { for (auto* tracer : tracers_) { - tracer->RecordReceivedMessage(recv_message, compressed); + tracer->RecordReceivedMessage(recv_message); } } void RecordReceivedDecompressedMessage( - const SliceBuffer& recv_decompressed_message) override { + const Message& recv_decompressed_message) override { for (auto* tracer : tracers_) { tracer->RecordReceivedDecompressedMessage(recv_decompressed_message); } diff --git a/src/core/telemetry/call_tracer.h b/src/core/telemetry/call_tracer.h index a2cbd19fa7b6f..348a327cd21f5 100644 --- a/src/core/telemetry/call_tracer.h +++ b/src/core/telemetry/call_tracer.h @@ -34,6 +34,7 @@ #include "src/core/lib/resource_quota/arena.h" #include "src/core/lib/slice/slice_buffer.h" #include "src/core/lib/transport/call_final_info.h" +#include "src/core/lib/transport/message.h" #include "src/core/lib/transport/metadata_batch.h" #include "src/core/telemetry/tcp_tracer.h" #include "src/core/util/ref_counted_string.h" @@ -95,20 +96,19 @@ class CallTracerInterface : public CallTracerAnnotationInterface { grpc_metadata_batch* send_initial_metadata) = 0; virtual void RecordSendTrailingMetadata( grpc_metadata_batch* send_trailing_metadata) = 0; - virtual void RecordSendMessage(const SliceBuffer& send_message) = 0; + virtual void RecordSendMessage(const Message& send_message) = 0; // Only invoked if message was actually compressed. virtual void RecordSendCompressedMessage( - const SliceBuffer& send_compressed_message) = 0; + const Message& send_compressed_message) = 0; // The `RecordReceivedInitialMetadata()` and `RecordReceivedMessage()` // methods should only be invoked when the metadata/message was // successfully received, i.e., without any error. virtual void RecordReceivedInitialMetadata( grpc_metadata_batch* recv_initial_metadata) = 0; - virtual void RecordReceivedMessage(const SliceBuffer& recv_message, - bool compressed) = 0; + virtual void RecordReceivedMessage(const Message& recv_message) = 0; // Only invoked if message was actually decompressed. virtual void RecordReceivedDecompressedMessage( - const SliceBuffer& recv_decompressed_message) = 0; + const Message& recv_decompressed_message) = 0; virtual void RecordCancel(grpc_error_handle cancel_error) = 0; struct TransportByteSize { diff --git a/src/cpp/ext/filters/census/client_filter.cc b/src/cpp/ext/filters/census/client_filter.cc index b88600355edfb..e067f106683d2 100644 --- a/src/cpp/ext/filters/census/client_filter.cc +++ b/src/cpp/ext/filters/census/client_filter.cc @@ -154,31 +154,33 @@ void OpenCensusCallTracer::OpenCensusCallAttemptTracer:: } void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordSendMessage( - const grpc_core::SliceBuffer& send_message) { - RecordAnnotation( - absl::StrFormat("Send message: %ld bytes", send_message.Length())); + const grpc_core::Message& send_message) { + RecordAnnotation(absl::StrFormat("Send message: %ld bytes", + send_message.payload()->Length())); ++sent_message_count_; } void OpenCensusCallTracer::OpenCensusCallAttemptTracer:: RecordSendCompressedMessage( - const grpc_core::SliceBuffer& send_compressed_message) { - RecordAnnotation(absl::StrFormat("Send compressed message: %ld bytes", - send_compressed_message.Length())); + const grpc_core::Message& send_compressed_message) { + RecordAnnotation( + absl::StrFormat("Send compressed message: %ld bytes", + send_compressed_message.payload()->Length())); } void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordReceivedMessage( - const grpc_core::SliceBuffer& recv_message, bool /*compressed*/) { - RecordAnnotation( - absl::StrFormat("Received message: %ld bytes", recv_message.Length())); + const grpc_core::Message& recv_message) { + RecordAnnotation(absl::StrFormat("Received message: %ld bytes", + recv_message.payload()->Length())); ++recv_message_count_; } void OpenCensusCallTracer::OpenCensusCallAttemptTracer:: RecordReceivedDecompressedMessage( - const grpc_core::SliceBuffer& recv_decompressed_message) { - RecordAnnotation(absl::StrFormat("Received decompressed message: %ld bytes", - recv_decompressed_message.Length())); + const grpc_core::Message& recv_decompressed_message) { + RecordAnnotation( + absl::StrFormat("Received decompressed message: %ld bytes", + recv_decompressed_message.payload()->Length())); } namespace { diff --git a/src/cpp/ext/filters/census/open_census_call_tracer.h b/src/cpp/ext/filters/census/open_census_call_tracer.h index 41974373bc32b..daa9fc47fd191 100644 --- a/src/cpp/ext/filters/census/open_census_call_tracer.h +++ b/src/cpp/ext/filters/census/open_census_call_tracer.h @@ -83,15 +83,14 @@ class OpenCensusCallTracer : public grpc_core::ClientCallTracer { grpc_metadata_batch* send_initial_metadata) override; void RecordSendTrailingMetadata( grpc_metadata_batch* /*send_trailing_metadata*/) override {} - void RecordSendMessage(const grpc_core::SliceBuffer& send_message) override; + void RecordSendMessage(const grpc_core::Message& send_message) override; void RecordSendCompressedMessage( - const grpc_core::SliceBuffer& send_compressed_message) override; + const grpc_core::Message& send_compressed_message) override; void RecordReceivedInitialMetadata( grpc_metadata_batch* /*recv_initial_metadata*/) override {} - void RecordReceivedMessage(const grpc_core::SliceBuffer& recv_message, - bool /*compressed*/) override; + void RecordReceivedMessage(const grpc_core::Message& recv_message) override; void RecordReceivedDecompressedMessage( - const grpc_core::SliceBuffer& recv_decompressed_message) override; + const grpc_core::Message& recv_decompressed_message) override; void RecordReceivedTrailingMetadata( absl::Status status, grpc_metadata_batch* recv_trailing_metadata, const grpc_transport_stream_stats* transport_stream_stats) override; diff --git a/src/cpp/ext/filters/census/server_call_tracer.cc b/src/cpp/ext/filters/census/server_call_tracer.cc index 88e807736dc99..176407f510fee 100644 --- a/src/cpp/ext/filters/census/server_call_tracer.cc +++ b/src/cpp/ext/filters/census/server_call_tracer.cc @@ -118,30 +118,31 @@ class OpenCensusServerCallTracer : public grpc_core::ServerCallTracer { void RecordSendTrailingMetadata( grpc_metadata_batch* send_trailing_metadata) override; - void RecordSendMessage(const grpc_core::SliceBuffer& send_message) override { - RecordAnnotation( - absl::StrFormat("Send message: %ld bytes", send_message.Length())); + void RecordSendMessage(const grpc_core::Message& send_message) override { + RecordAnnotation(absl::StrFormat("Send message: %ld bytes", + send_message.payload()->Length())); ++sent_message_count_; } void RecordSendCompressedMessage( - const grpc_core::SliceBuffer& send_compressed_message) override { - RecordAnnotation(absl::StrFormat("Send compressed message: %ld bytes", - send_compressed_message.Length())); + const grpc_core::Message& send_compressed_message) override { + RecordAnnotation( + absl::StrFormat("Send compressed message: %ld bytes", + send_compressed_message.payload()->Length())); } void RecordReceivedInitialMetadata( grpc_metadata_batch* recv_initial_metadata) override; - void RecordReceivedMessage(const grpc_core::SliceBuffer& recv_message, - bool /*compressed*/) override { - RecordAnnotation( - absl::StrFormat("Received message: %ld bytes", recv_message.Length())); + void RecordReceivedMessage(const grpc_core::Message& recv_message) override { + RecordAnnotation(absl::StrFormat("Received message: %ld bytes", + recv_message.payload()->Length())); ++recv_message_count_; } void RecordReceivedDecompressedMessage( - const grpc_core::SliceBuffer& recv_decompressed_message) override { - RecordAnnotation(absl::StrFormat("Received decompressed message: %ld bytes", - recv_decompressed_message.Length())); + const grpc_core::Message& recv_decompressed_message) override { + RecordAnnotation( + absl::StrFormat("Received decompressed message: %ld bytes", + recv_decompressed_message.payload()->Length())); } void RecordReceivedTrailingMetadata( diff --git a/src/cpp/ext/otel/otel_client_call_tracer.cc b/src/cpp/ext/otel/otel_client_call_tracer.cc index 4e30b9bab1541..27f414abcd1b7 100644 --- a/src/cpp/ext/otel/otel_client_call_tracer.cc +++ b/src/cpp/ext/otel/otel_client_call_tracer.cc @@ -111,30 +111,31 @@ void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer:: } void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer:: - RecordSendMessage(const grpc_core::SliceBuffer& send_message) { - RecordAnnotation( - absl::StrFormat("Send message: %ld bytes", send_message.Length())); + RecordSendMessage(const grpc_core::Message& send_message) { + RecordAnnotation(absl::StrFormat("Send message: %ld bytes", + send_message.payload()->Length())); } void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer:: RecordSendCompressedMessage( - const grpc_core::SliceBuffer& send_compressed_message) { - RecordAnnotation(absl::StrFormat("Send compressed message: %ld bytes", - send_compressed_message.Length())); + const grpc_core::Message& send_compressed_message) { + RecordAnnotation( + absl::StrFormat("Send compressed message: %ld bytes", + send_compressed_message.payload()->Length())); } void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer:: - RecordReceivedMessage(const grpc_core::SliceBuffer& recv_message, - bool /*compressed*/) { - RecordAnnotation( - absl::StrFormat("Received message: %ld bytes", recv_message.Length())); + RecordReceivedMessage(const grpc_core::Message& recv_message) { + RecordAnnotation(absl::StrFormat("Received message: %ld bytes", + recv_message.payload()->Length())); } void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer:: RecordReceivedDecompressedMessage( - const grpc_core::SliceBuffer& recv_decompressed_message) { - RecordAnnotation(absl::StrFormat("Received decompressed message: %ld bytes", - recv_decompressed_message.Length())); + const grpc_core::Message& recv_decompressed_message) { + RecordAnnotation( + absl::StrFormat("Received decompressed message: %ld bytes", + recv_decompressed_message.payload()->Length())); } void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer:: diff --git a/src/cpp/ext/otel/otel_client_call_tracer.h b/src/cpp/ext/otel/otel_client_call_tracer.h index ff20c0f3c5671..145a471877642 100644 --- a/src/cpp/ext/otel/otel_client_call_tracer.h +++ b/src/cpp/ext/otel/otel_client_call_tracer.h @@ -72,15 +72,14 @@ class OpenTelemetryPluginImpl::ClientCallTracer grpc_metadata_batch* send_initial_metadata) override; void RecordSendTrailingMetadata( grpc_metadata_batch* /*send_trailing_metadata*/) override {} - void RecordSendMessage(const grpc_core::SliceBuffer& send_message) override; + void RecordSendMessage(const grpc_core::Message& send_message) override; void RecordSendCompressedMessage( - const grpc_core::SliceBuffer& send_compressed_message) override; + const grpc_core::Message& send_compressed_message) override; void RecordReceivedInitialMetadata( grpc_metadata_batch* recv_initial_metadata) override; - void RecordReceivedMessage(const grpc_core::SliceBuffer& recv_message, - bool /*compressed*/) override; + void RecordReceivedMessage(const grpc_core::Message& recv_message) override; void RecordReceivedDecompressedMessage( - const grpc_core::SliceBuffer& recv_decompressed_message) override; + const grpc_core::Message& recv_decompressed_message) override; void RecordReceivedTrailingMetadata( absl::Status status, grpc_metadata_batch* recv_trailing_metadata, const grpc_transport_stream_stats* transport_stream_stats) override; diff --git a/src/cpp/ext/otel/otel_server_call_tracer.h b/src/cpp/ext/otel/otel_server_call_tracer.h index 57720548f39a2..cd3cb1325ce1e 100644 --- a/src/cpp/ext/otel/otel_server_call_tracer.h +++ b/src/cpp/ext/otel/otel_server_call_tracer.h @@ -66,28 +66,29 @@ class OpenTelemetryPluginImpl::ServerCallTracer void RecordSendTrailingMetadata( grpc_metadata_batch* /*send_trailing_metadata*/) override; - void RecordSendMessage(const grpc_core::SliceBuffer& send_message) override { - RecordAnnotation( - absl::StrFormat("Send message: %ld bytes", send_message.Length())); + void RecordSendMessage(const grpc_core::Message& send_message) override { + RecordAnnotation(absl::StrFormat("Send message: %ld bytes", + send_message.payload()->Length())); } void RecordSendCompressedMessage( - const grpc_core::SliceBuffer& send_compressed_message) override { - RecordAnnotation(absl::StrFormat("Send compressed message: %ld bytes", - send_compressed_message.Length())); + const grpc_core::Message& send_compressed_message) override { + RecordAnnotation( + absl::StrFormat("Send compressed message: %ld bytes", + send_compressed_message.payload()->Length())); } void RecordReceivedInitialMetadata( grpc_metadata_batch* recv_initial_metadata) override; - void RecordReceivedMessage(const grpc_core::SliceBuffer& recv_message, - bool /*compressed*/) override { - RecordAnnotation( - absl::StrFormat("Received message: %ld bytes", recv_message.Length())); + void RecordReceivedMessage(const grpc_core::Message& recv_message) override { + RecordAnnotation(absl::StrFormat("Received message: %ld bytes", + recv_message.payload()->Length())); } void RecordReceivedDecompressedMessage( - const grpc_core::SliceBuffer& recv_decompressed_message) override { - RecordAnnotation(absl::StrFormat("Received decompressed message: %ld bytes", - recv_decompressed_message.Length())); + const grpc_core::Message& recv_decompressed_message) override { + RecordAnnotation( + absl::StrFormat("Received decompressed message: %ld bytes", + recv_decompressed_message.payload()->Length())); } void RecordReceivedTrailingMetadata( diff --git a/src/python/grpcio_observability/grpc_observability/client_call_tracer.cc b/src/python/grpcio_observability/grpc_observability/client_call_tracer.cc index 601c362d3e166..35644d59d630e 100644 --- a/src/python/grpcio_observability/grpc_observability/client_call_tracer.cc +++ b/src/python/grpcio_observability/grpc_observability/client_call_tracer.cc @@ -201,13 +201,12 @@ void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer:: } void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer:: - RecordSendMessage(const grpc_core::SliceBuffer& /*send_message*/) { + RecordSendMessage(const grpc_core::Message& /*send_message*/) { ++sent_message_count_; } void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer:: - RecordReceivedMessage(const grpc_core::SliceBuffer& /*recv_message*/, - bool /*compressed*/) { + RecordReceivedMessage(const grpc_core::Message& /*recv_message*/) { ++recv_message_count_; } diff --git a/src/python/grpcio_observability/grpc_observability/client_call_tracer.h b/src/python/grpcio_observability/grpc_observability/client_call_tracer.h index 0872884f1770a..b059bcb6310b8 100644 --- a/src/python/grpcio_observability/grpc_observability/client_call_tracer.h +++ b/src/python/grpcio_observability/grpc_observability/client_call_tracer.h @@ -55,16 +55,15 @@ class PythonOpenCensusCallTracer : public grpc_core::ClientCallTracer { grpc_metadata_batch* send_initial_metadata) override; void RecordSendTrailingMetadata( grpc_metadata_batch* /*send_trailing_metadata*/) override {} - void RecordSendMessage( - const grpc_core::SliceBuffer& /*send_message*/) override; + void RecordSendMessage(const grpc_core::Message& /*send_message*/) override; void RecordSendCompressedMessage( - const grpc_core::SliceBuffer& /*send_compressed_message*/) override {} + const grpc_core::Message& /*send_compressed_message*/) override {} void RecordReceivedInitialMetadata( grpc_metadata_batch* /*recv_initial_metadata*/) override; - void RecordReceivedMessage(const grpc_core::SliceBuffer& /*recv_message*/, - bool /*compressed*/) override; + void RecordReceivedMessage( + const grpc_core::Message& /*recv_message*/) override; void RecordReceivedDecompressedMessage( - const grpc_core::SliceBuffer& /*recv_decompressed_message*/) override {} + const grpc_core::Message& /*recv_decompressed_message*/) override {} void RecordReceivedTrailingMetadata( absl::Status status, grpc_metadata_batch* recv_trailing_metadata, const grpc_transport_stream_stats* transport_stream_stats) override; diff --git a/src/python/grpcio_observability/grpc_observability/server_call_tracer.cc b/src/python/grpcio_observability/grpc_observability/server_call_tracer.cc index c995690105b28..93a602bcf58b5 100644 --- a/src/python/grpcio_observability/grpc_observability/server_call_tracer.cc +++ b/src/python/grpcio_observability/grpc_observability/server_call_tracer.cc @@ -141,29 +141,31 @@ void PythonOpenCensusServerCallTracer::RecordSendTrailingMetadata( } void PythonOpenCensusServerCallTracer::RecordSendMessage( - const grpc_core::SliceBuffer& send_message) { - RecordAnnotation( - absl::StrFormat("Send message: %ld bytes", send_message.Length())); + const grpc_core::Message& send_message) { + RecordAnnotation(absl::StrFormat("Send message: %ld bytes", + send_message.payload()->Length())); ++sent_message_count_; } void PythonOpenCensusServerCallTracer::RecordSendCompressedMessage( - const grpc_core::SliceBuffer& send_compressed_message) { - RecordAnnotation(absl::StrFormat("Send compressed message: %ld bytes", - send_compressed_message.Length())); + const grpc_core::Message& send_compressed_message) { + RecordAnnotation( + absl::StrFormat("Send compressed message: %ld bytes", + send_compressed_message.payload()->Length())); } void PythonOpenCensusServerCallTracer::RecordReceivedMessage( - const grpc_core::SliceBuffer& recv_message, bool /*compressed*/) { - RecordAnnotation( - absl::StrFormat("Received message: %ld bytes", recv_message.Length())); + const grpc_core::Message& recv_message) { + RecordAnnotation(absl::StrFormat("Received message: %ld bytes", + recv_message.payload()->Length())); ++recv_message_count_; } void PythonOpenCensusServerCallTracer::RecordReceivedDecompressedMessage( - const grpc_core::SliceBuffer& recv_decompressed_message) { - RecordAnnotation(absl::StrFormat("Received decompressed message: %ld bytes", - recv_decompressed_message.Length())); + const grpc_core::Message& recv_decompressed_message) { + RecordAnnotation( + absl::StrFormat("Received decompressed message: %ld bytes", + recv_decompressed_message.payload()->Length())); } void PythonOpenCensusServerCallTracer::RecordCancel( diff --git a/src/python/grpcio_observability/grpc_observability/server_call_tracer.h b/src/python/grpcio_observability/grpc_observability/server_call_tracer.h index d20f4910ee5d8..15b2128d457cc 100644 --- a/src/python/grpcio_observability/grpc_observability/server_call_tracer.h +++ b/src/python/grpcio_observability/grpc_observability/server_call_tracer.h @@ -82,19 +82,18 @@ class PythonOpenCensusServerCallTracer : public grpc_core::ServerCallTracer { void RecordSendTrailingMetadata( grpc_metadata_batch* send_trailing_metadata) override; - void RecordSendMessage(const grpc_core::SliceBuffer& send_message) override; + void RecordSendMessage(const grpc_core::Message& send_message) override; void RecordSendCompressedMessage( - const grpc_core::SliceBuffer& send_compressed_message) override; + const grpc_core::Message& send_compressed_message) override; void RecordReceivedInitialMetadata( grpc_metadata_batch* recv_initial_metadata) override; - void RecordReceivedMessage(const grpc_core::SliceBuffer& recv_message, - bool /*compressed*/) override; + void RecordReceivedMessage(const grpc_core::Message& recv_message) override; void RecordReceivedDecompressedMessage( - const grpc_core::SliceBuffer& recv_decompressed_message) override; + const grpc_core::Message& recv_decompressed_message) override; void RecordReceivedTrailingMetadata( grpc_metadata_batch* /*recv_trailing_metadata*/) override {} diff --git a/test/core/end2end/tests/http2_stats.cc b/test/core/end2end/tests/http2_stats.cc index 4883dd9fad7b3..2be4102eed938 100644 --- a/test/core/end2end/tests/http2_stats.cc +++ b/test/core/end2end/tests/http2_stats.cc @@ -73,15 +73,14 @@ class FakeCallTracer : public ClientCallTracer { grpc_metadata_batch* /*send_initial_metadata*/) override {} void RecordSendTrailingMetadata( grpc_metadata_batch* /*send_trailing_metadata*/) override {} - void RecordSendMessage(const SliceBuffer& /*send_message*/) override {} + void RecordSendMessage(const Message& /*send_message*/) override {} void RecordSendCompressedMessage( - const SliceBuffer& /*send_compressed_message*/) override {} + const Message& /*send_compressed_message*/) override {} void RecordReceivedInitialMetadata( grpc_metadata_batch* /*recv_initial_metadata*/) override {} - void RecordReceivedMessage(const SliceBuffer& /*recv_message*/, - bool /*compressed*/) override {} + void RecordReceivedMessage(const Message& /*recv_message*/) override {} void RecordReceivedDecompressedMessage( - const SliceBuffer& /*recv_decompressed_message*/) override {} + const Message& /*recv_decompressed_message*/) override {} void RecordReceivedTrailingMetadata( absl::Status /*status*/, @@ -174,15 +173,14 @@ class FakeServerCallTracer : public ServerCallTracer { grpc_metadata_batch* /*send_initial_metadata*/) override {} void RecordSendTrailingMetadata( grpc_metadata_batch* /*send_trailing_metadata*/) override {} - void RecordSendMessage(const SliceBuffer& /*send_message*/) override {} + void RecordSendMessage(const Message& /*send_message*/) override {} void RecordSendCompressedMessage( - const SliceBuffer& /*send_compressed_message*/) override {} + const Message& /*send_compressed_message*/) override {} void RecordReceivedInitialMetadata( grpc_metadata_batch* /*recv_initial_metadata*/) override {} - void RecordReceivedMessage(const SliceBuffer& /*recv_message*/, - bool /*compressed*/) override {} + void RecordReceivedMessage(const Message& /*recv_message*/) override {} void RecordReceivedDecompressedMessage( - const SliceBuffer& /*recv_decompressed_message*/) override {} + const Message& /*recv_decompressed_message*/) override {} void RecordCancel(grpc_error_handle /*cancel_error*/) override {} std::shared_ptr StartNewTcpTrace() override { return nullptr; diff --git a/test/core/test_util/fake_stats_plugin.h b/test/core/test_util/fake_stats_plugin.h index 3c899d0e0e0e4..afa69cffb9aa5 100644 --- a/test/core/test_util/fake_stats_plugin.h +++ b/test/core/test_util/fake_stats_plugin.h @@ -71,15 +71,14 @@ class FakeClientCallTracer : public ClientCallTracer { grpc_metadata_batch* /*send_initial_metadata*/) override {} void RecordSendTrailingMetadata( grpc_metadata_batch* /*send_trailing_metadata*/) override {} - void RecordSendMessage(const SliceBuffer& /*send_message*/) override {} + void RecordSendMessage(const Message& /*send_message*/) override {} void RecordSendCompressedMessage( - const SliceBuffer& /*send_compressed_message*/) override {} + const Message& /*send_compressed_message*/) override {} void RecordReceivedInitialMetadata( grpc_metadata_batch* /*recv_initial_metadata*/) override {} - void RecordReceivedMessage(const SliceBuffer& /*recv_message*/, - bool /*compressed*/) override {} + void RecordReceivedMessage(const Message& /*recv_message*/) override {} void RecordReceivedDecompressedMessage( - const SliceBuffer& /*recv_decompressed_message*/) override {} + const Message& /*recv_decompressed_message*/) override {} void RecordCancel(grpc_error_handle /*cancel_error*/) override {} void RecordReceivedTrailingMetadata( absl::Status /*status*/, @@ -172,15 +171,14 @@ class FakeServerCallTracer : public ServerCallTracer { grpc_metadata_batch* /*send_initial_metadata*/) override {} void RecordSendTrailingMetadata( grpc_metadata_batch* /*send_trailing_metadata*/) override {} - void RecordSendMessage(const SliceBuffer& /*send_message*/) override {} + void RecordSendMessage(const Message& /*send_message*/) override {} void RecordSendCompressedMessage( - const SliceBuffer& /*send_compressed_message*/) override {} + const Message& /*send_compressed_message*/) override {} void RecordReceivedInitialMetadata( grpc_metadata_batch* /*recv_initial_metadata*/) override {} - void RecordReceivedMessage(const SliceBuffer& /*recv_message*/, - bool /*compressed*/) override {} + void RecordReceivedMessage(const Message& /*recv_message*/) override {} void RecordReceivedDecompressedMessage( - const SliceBuffer& /*recv_decompressed_message*/) override {} + const Message& /*recv_decompressed_message*/) override {} void RecordCancel(grpc_error_handle /*cancel_error*/) override {} void RecordReceivedTrailingMetadata( grpc_metadata_batch* /*recv_trailing_metadata*/) override {} diff --git a/test/core/transport/chttp2/hpack_encoder_test.cc b/test/core/transport/chttp2/hpack_encoder_test.cc index 86b5bcbbe0381..89f916e6c5d4a 100644 --- a/test/core/transport/chttp2/hpack_encoder_test.cc +++ b/test/core/transport/chttp2/hpack_encoder_test.cc @@ -158,15 +158,14 @@ class FakeCallTracer final : public CallTracerInterface { grpc_metadata_batch* send_initial_metadata) override {} void RecordSendTrailingMetadata( grpc_metadata_batch* send_trailing_metadata) override {} - void RecordSendMessage(const SliceBuffer& send_message) override {} + void RecordSendMessage(const Message& send_message) override {} void RecordSendCompressedMessage( - const SliceBuffer& send_compressed_message) override {} + const Message& send_compressed_message) override {} void RecordReceivedInitialMetadata( grpc_metadata_batch* recv_initial_metadata) override {} - void RecordReceivedMessage(const SliceBuffer& recv_message, - bool /*compressed*/) override {} + void RecordReceivedMessage(const Message& recv_message) override {} void RecordReceivedDecompressedMessage( - const SliceBuffer& recv_decompressed_message) override {} + const Message& recv_decompressed_message) override {} void RecordCancel(grpc_error_handle cancel_error) override {} std::shared_ptr StartNewTcpTrace() override { return nullptr; diff --git a/test/cpp/microbenchmarks/bm_chttp2_hpack.cc b/test/cpp/microbenchmarks/bm_chttp2_hpack.cc index 74a8d2cc52491..dfe50ed4c8d07 100644 --- a/test/cpp/microbenchmarks/bm_chttp2_hpack.cc +++ b/test/cpp/microbenchmarks/bm_chttp2_hpack.cc @@ -63,15 +63,14 @@ class FakeCallTracer final : public CallTracerInterface { grpc_metadata_batch* send_initial_metadata) override {} void RecordSendTrailingMetadata( grpc_metadata_batch* send_trailing_metadata) override {} - void RecordSendMessage(const SliceBuffer& send_message) override {} + void RecordSendMessage(const Message& send_message) override {} void RecordSendCompressedMessage( - const SliceBuffer& send_compressed_message) override {} + const Message& send_compressed_message) override {} void RecordReceivedInitialMetadata( grpc_metadata_batch* recv_initial_metadata) override {} - void RecordReceivedMessage(const SliceBuffer& recv_message, - bool /*compressed*/) override {} + void RecordReceivedMessage(const Message& recv_message) override {} void RecordReceivedDecompressedMessage( - const SliceBuffer& recv_decompressed_message) override {} + const Message& recv_decompressed_message) override {} void RecordCancel(grpc_error_handle cancel_error) override {} std::shared_ptr StartNewTcpTrace() override { return nullptr;