Skip to content

Commit

Permalink
[OTel] Experimental API for metrics (grpc#35348)
Browse files Browse the repository at this point in the history
Provide a public experimental API and bazel compatible build target for OpenTelemetry metrics.

Details -
* New `OpenTelemetryPluginBuilder` class that provides the API specified in https://github.com/grpc/proposal/blob/master/A66-otel-stats.md
* The existing `grpc::internal::OpenTelemetryPluginBuilder` class is moved to `grpc::internal::OpenTelemetryPluginBuilderImpl` for disambiguation.
* Renamed `OTel` in some instances to `OpenTelemetry` for consistency.

Closes grpc#35348

COPYBARA_INTEGRATE_REVIEW=grpc#35348 from yashykt:OTelPublicApi e323288
PiperOrigin-RevId: 594271246
  • Loading branch information
yashykt authored and copybara-github committed Dec 28, 2023
1 parent f703530 commit c12a564
Show file tree
Hide file tree
Showing 18 changed files with 473 additions and 264 deletions.
12 changes: 12 additions & 0 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -2404,6 +2404,18 @@ grpc_cc_library(
],
)

# This is an EXPERIMENTAL target subject to change.
grpc_cc_library(
name = "grpcpp_otel_plugin",
hdrs = [
"include/grpcpp/ext/otel_plugin.h",
],
language = "c++",
deps = [
"//src/cpp/ext/otel:otel_plugin",
],
)

grpc_cc_library(
name = "work_serializer",
srcs = [
Expand Down
11 changes: 8 additions & 3 deletions include/grpcpp/ext/csm_observability.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@
#include "absl/strings/string_view.h"
#include "opentelemetry/sdk/metrics/meter_provider.h"

#include "src/cpp/ext/otel/otel_plugin.h"

namespace grpc {

namespace internal {
class OpenTelemetryPluginBuilderImpl;
} // namespace internal

namespace experimental {

// This is a no-op at present, but in the future, this object would be useful
Expand All @@ -41,6 +44,8 @@ class CsmObservability {};
// for a binary running on CSM.
class CsmObservabilityBuilder {
public:
CsmObservabilityBuilder();
~CsmObservabilityBuilder();
CsmObservabilityBuilder& SetMeterProvider(
std::shared_ptr<opentelemetry::sdk::metrics::MeterProvider>
meter_provider);
Expand Down Expand Up @@ -80,7 +85,7 @@ class CsmObservabilityBuilder {
absl::StatusOr<CsmObservability> BuildAndRegister();

private:
internal::OpenTelemetryPluginBuilder builder_;
std::unique_ptr<grpc::internal::OpenTelemetryPluginBuilderImpl> builder_;
};

} // namespace experimental
Expand Down
108 changes: 108 additions & 0 deletions include/grpcpp/ext/otel_plugin.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
//
//
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

#ifndef GRPCPP_EXT_OTEL_PLUGIN_H
#define GRPCPP_EXT_OTEL_PLUGIN_H

#include <grpc/support/port_platform.h>

#include <stddef.h>
#include <stdint.h>

#include <memory>

#include "absl/functional/any_invocable.h"
#include "absl/strings/string_view.h"
#include "opentelemetry/metrics/meter_provider.h"

namespace grpc {

namespace internal {
class OpenTelemetryPluginBuilderImpl;
} // namespace internal

namespace experimental {

/// The most common way to use this API is -
///
/// OpenTelemetryPluginBuilder().SetMeterProvider(provider).BuildAndRegister();
///
/// The set of instruments available are -
/// grpc.client.attempt.started
/// grpc.client.attempt.duration
/// grpc.client.attempt.sent_total_compressed_message_size
/// grpc.client.attempt.rcvd_total_compressed_message_size
/// grpc.server.call.started
/// grpc.server.call.duration
/// grpc.server.call.sent_total_compressed_message_size
/// grpc.server.call.rcvd_total_compressed_message_size
class OpenTelemetryPluginBuilder {
public:
/// Metrics
static constexpr absl::string_view kClientAttemptStartedInstrumentName =
"grpc.client.attempt.started";
static constexpr absl::string_view kClientAttemptDurationInstrumentName =
"grpc.client.attempt.duration";
static constexpr absl::string_view
kClientAttemptSentTotalCompressedMessageSizeInstrumentName =
"grpc.client.attempt.sent_total_compressed_message_size";
static constexpr absl::string_view
kClientAttemptRcvdTotalCompressedMessageSizeInstrumentName =
"grpc.client.attempt.rcvd_total_compressed_message_size";
static constexpr absl::string_view kServerCallStartedInstrumentName =
"grpc.server.call.started";
static constexpr absl::string_view kServerCallDurationInstrumentName =
"grpc.server.call.duration";
static constexpr absl::string_view
kServerCallSentTotalCompressedMessageSizeInstrumentName =
"grpc.server.call.sent_total_compressed_message_size";
static constexpr absl::string_view
kServerCallRcvdTotalCompressedMessageSizeInstrumentName =
"grpc.server.call.rcvd_total_compressed_message_size";

OpenTelemetryPluginBuilder();
/// If `SetMeterProvider()` is not called, no metrics are collected.
OpenTelemetryPluginBuilder& SetMeterProvider(
std::shared_ptr<opentelemetry::metrics::MeterProvider> meter_provider);
/// If set, \a target_attribute_filter is called per channel to decide whether
/// to record the target attribute on client or to replace it with "other".
/// This helps reduce the cardinality on metrics in cases where many channels
/// are created with different targets in the same binary (which might happen
/// for example, if the channel target string uses IP addresses directly).
OpenTelemetryPluginBuilder& SetTargetAttributeFilter(
absl::AnyInvocable<bool(absl::string_view /*target*/) const>
target_attribute_filter);
/// If set, \a generic_method_attribute_filter is called per call with a
/// generic method type to decide whether to record the method name or to
/// replace it with "other". Non-generic or pre-registered methods remain
/// unaffected. If not set, by default, generic method names are replaced with
/// "other" when recording metrics.
OpenTelemetryPluginBuilder& SetGenericMethodAttributeFilter(
absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>
generic_method_attribute_filter);
/// Registers a global plugin that acts on all channels and servers running on
/// the process.
void BuildAndRegisterGlobal();

private:
std::unique_ptr<internal::OpenTelemetryPluginBuilderImpl> impl_;
};
} // namespace experimental
} // namespace grpc

#endif // GRPCPP_EXT_OTEL_PLUGIN_H
20 changes: 13 additions & 7 deletions src/cpp/ext/csm/csm_observability.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,40 +48,46 @@ namespace experimental {
// CsmObservabilityBuilder
//

CsmObservabilityBuilder::CsmObservabilityBuilder()
: builder_(
std::make_unique<grpc::internal::OpenTelemetryPluginBuilderImpl>()) {}

CsmObservabilityBuilder::~CsmObservabilityBuilder() = default;

CsmObservabilityBuilder& CsmObservabilityBuilder::SetMeterProvider(
std::shared_ptr<opentelemetry::sdk::metrics::MeterProvider>
meter_provider) {
builder_.SetMeterProvider(meter_provider);
builder_->SetMeterProvider(meter_provider);
return *this;
}

CsmObservabilityBuilder& CsmObservabilityBuilder::SetTargetAttributeFilter(
absl::AnyInvocable<bool(absl::string_view /*target*/) const>
target_attribute_filter) {
builder_.SetTargetAttributeFilter(std::move(target_attribute_filter));
builder_->SetTargetAttributeFilter(std::move(target_attribute_filter));
return *this;
}

CsmObservabilityBuilder&
CsmObservabilityBuilder::SetGenericMethodAttributeFilter(
absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>
generic_method_attribute_filter) {
builder_.SetGenericMethodAttributeFilter(
builder_->SetGenericMethodAttributeFilter(
std::move(generic_method_attribute_filter));
return *this;
}

absl::StatusOr<CsmObservability> CsmObservabilityBuilder::BuildAndRegister() {
builder_.SetServerSelector([](const grpc_core::ChannelArgs& args) {
builder_->SetServerSelector([](const grpc_core::ChannelArgs& args) {
return args.GetBool(GRPC_ARG_XDS_ENABLED_SERVER).value_or(false);
});
builder_.SetTargetSelector(internal::CsmChannelTargetSelector);
builder_.SetLabelsInjector(
builder_->SetTargetSelector(internal::CsmChannelTargetSelector);
builder_->SetLabelsInjector(
std::make_unique<internal::ServiceMeshLabelsInjector>(
google::cloud::otel::MakeResourceDetector()
->Detect()
.GetAttributes()));
builder_.BuildAndRegisterGlobal();
builder_->BuildAndRegisterGlobal();
return CsmObservability();
}

Expand Down
1 change: 1 addition & 0 deletions src/cpp/ext/otel/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ grpc_cc_library(
"otel_client_filter.h",
"otel_plugin.h",
"otel_server_call_tracer.h",
"//:include/grpcpp/ext/otel_plugin.h",
],
external_deps = [
"absl/base:core_headers",
Expand Down
10 changes: 5 additions & 5 deletions src/cpp/ext/otel/key_value_iterable.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
namespace grpc {
namespace internal {

inline opentelemetry::nostd::string_view AbslStrViewToOTelStrView(
inline opentelemetry::nostd::string_view AbslStrViewToOpenTelemetryStrView(
absl::string_view str) {
return opentelemetry::nostd::string_view(str.data(), str.size());
}
Expand All @@ -62,15 +62,15 @@ class KeyValueIterable : public opentelemetry::common::KeyValueIterable {
if (injected_labels_iterable_ != nullptr) {
injected_labels_iterable_->ResetIteratorPosition();
while (const auto& pair = injected_labels_iterable_->Next()) {
if (!callback(AbslStrViewToOTelStrView(pair->first),
AbslStrViewToOTelStrView(pair->second))) {
if (!callback(AbslStrViewToOpenTelemetryStrView(pair->first),
AbslStrViewToOpenTelemetryStrView(pair->second))) {
return false;
}
}
}
for (const auto& pair : additional_labels_) {
if (!callback(AbslStrViewToOTelStrView(pair.first),
AbslStrViewToOTelStrView(pair.second))) {
if (!callback(AbslStrViewToOpenTelemetryStrView(pair.first),
AbslStrViewToOpenTelemetryStrView(pair.second))) {
return false;
}
}
Expand Down
74 changes: 39 additions & 35 deletions src/cpp/ext/otel/otel_client_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ absl::StatusOr<OpenTelemetryClientFilter> OpenTelemetryClientFilter::Create(
std::string target = args.GetOwnedString(GRPC_ARG_SERVER_URI).value_or("");
// Use the original target string only if a filter on the attribute is not
// registered or if the filter returns true, otherwise use "other".
if (OTelPluginState().target_attribute_filter == nullptr ||
OTelPluginState().target_attribute_filter(target)) {
if (OpenTelemetryPluginState().target_attribute_filter == nullptr ||
OpenTelemetryPluginState().target_attribute_filter(target)) {
return OpenTelemetryClientFilter(std::move(target));
}
return OpenTelemetryClientFilter("other");
Expand Down Expand Up @@ -116,31 +116,32 @@ OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::
: parent_(parent),
arena_allocated_(arena_allocated),
start_time_(absl::Now()) {
if (OTelPluginState().client.attempt.started != nullptr) {
if (OpenTelemetryPluginState().client.attempt.started != nullptr) {
std::array<std::pair<absl::string_view, absl::string_view>, 2>
additional_labels = {{{OTelMethodKey(), parent_->MethodForStats()},
{OTelTargetKey(), parent_->parent_->target()}}};
additional_labels = {
{{OpenTelemetryMethodKey(), parent_->MethodForStats()},
{OpenTelemetryTargetKey(), parent_->parent_->target()}}};
// We might not have all the injected labels that we want at this point, so
// avoid recording a subset of injected labels here.
OTelPluginState().client.attempt.started->Add(
OpenTelemetryPluginState().client.attempt.started->Add(
1, KeyValueIterable(/*injected_labels_iterable=*/nullptr,
additional_labels));
}
}

void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::
RecordReceivedInitialMetadata(grpc_metadata_batch* recv_initial_metadata) {
if (OTelPluginState().labels_injector != nullptr) {
injected_labels_ =
OTelPluginState().labels_injector->GetLabels(recv_initial_metadata);
if (OpenTelemetryPluginState().labels_injector != nullptr) {
injected_labels_ = OpenTelemetryPluginState().labels_injector->GetLabels(
recv_initial_metadata);
}
}

void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::
RecordSendInitialMetadata(grpc_metadata_batch* send_initial_metadata) {
if (OTelPluginState().labels_injector != nullptr) {
OTelPluginState().labels_injector->AddLabels(send_initial_metadata,
nullptr);
if (OpenTelemetryPluginState().labels_injector != nullptr) {
OpenTelemetryPluginState().labels_injector->AddLabels(send_initial_metadata,
nullptr);
}
}

Expand Down Expand Up @@ -175,32 +176,35 @@ void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::
absl::Status status, grpc_metadata_batch* /*recv_trailing_metadata*/,
const grpc_transport_stream_stats* transport_stream_stats) {
std::array<std::pair<absl::string_view, absl::string_view>, 3>
additional_labels = {{{OTelMethodKey(), parent_->MethodForStats()},
{OTelTargetKey(), parent_->parent_->target()},
{OTelStatusKey(), grpc_status_code_to_string(
static_cast<grpc_status_code>(
status.code()))}}};
additional_labels = {
{{OpenTelemetryMethodKey(), parent_->MethodForStats()},
{OpenTelemetryTargetKey(), parent_->parent_->target()},
{OpenTelemetryStatusKey(),
grpc_status_code_to_string(
static_cast<grpc_status_code>(status.code()))}}};
KeyValueIterable labels(injected_labels_.get(), additional_labels);
if (OTelPluginState().client.attempt.duration != nullptr) {
OTelPluginState().client.attempt.duration->Record(
if (OpenTelemetryPluginState().client.attempt.duration != nullptr) {
OpenTelemetryPluginState().client.attempt.duration->Record(
absl::ToDoubleSeconds(absl::Now() - start_time_), labels,
opentelemetry::context::Context{});
}
if (OTelPluginState().client.attempt.sent_total_compressed_message_size !=
nullptr) {
OTelPluginState().client.attempt.sent_total_compressed_message_size->Record(
transport_stream_stats != nullptr
? transport_stream_stats->outgoing.data_bytes
: 0,
labels, opentelemetry::context::Context{});
if (OpenTelemetryPluginState()
.client.attempt.sent_total_compressed_message_size != nullptr) {
OpenTelemetryPluginState()
.client.attempt.sent_total_compressed_message_size->Record(
transport_stream_stats != nullptr
? transport_stream_stats->outgoing.data_bytes
: 0,
labels, opentelemetry::context::Context{});
}
if (OTelPluginState().client.attempt.rcvd_total_compressed_message_size !=
nullptr) {
OTelPluginState().client.attempt.rcvd_total_compressed_message_size->Record(
transport_stream_stats != nullptr
? transport_stream_stats->incoming.data_bytes
: 0,
labels, opentelemetry::context::Context{});
if (OpenTelemetryPluginState()
.client.attempt.rcvd_total_compressed_message_size != nullptr) {
OpenTelemetryPluginState()
.client.attempt.rcvd_total_compressed_message_size->Record(
transport_stream_stats != nullptr
? transport_stream_stats->incoming.data_bytes
: 0,
labels, opentelemetry::context::Context{});
}
}

Expand Down Expand Up @@ -273,8 +277,8 @@ OpenTelemetryCallTracer::StartNewAttempt(bool is_transparent_retry) {
absl::string_view OpenTelemetryCallTracer::MethodForStats() const {
absl::string_view method = absl::StripPrefix(path_.as_string_view(), "/");
if (registered_method_ ||
(OTelPluginState().generic_method_attribute_filter != nullptr &&
OTelPluginState().generic_method_attribute_filter(method))) {
(OpenTelemetryPluginState().generic_method_attribute_filter != nullptr &&
OpenTelemetryPluginState().generic_method_attribute_filter(method))) {
return method;
}
return "other";
Expand Down
Loading

0 comments on commit c12a564

Please sign in to comment.