Skip to content

Commit

Permalink
Use native Rust support for async traits in LogExporter::export() met…
Browse files Browse the repository at this point in the history
…hod (11% improvement) (#2374)

Co-authored-by: Cijo Thomas <[email protected]>
Co-authored-by: Zhongyang Wu <[email protected]>
  • Loading branch information
3 people authored Dec 20, 2024
1 parent 80629c8 commit 23f6ae2
Show file tree
Hide file tree
Showing 15 changed files with 242 additions and 184 deletions.
1 change: 0 additions & 1 deletion opentelemetry-appender-tracing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ opentelemetry-stdout = { path = "../opentelemetry-stdout", features = ["logs"] }
opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["logs", "testing"] }
tracing-subscriber = { workspace = true, features = ["registry", "std", "env-filter"] }
tracing-log = "0.2"
async-trait = { workspace = true }
criterion = { workspace = true }
tokio = { workspace = true, features = ["full"]}

Expand Down
22 changes: 12 additions & 10 deletions opentelemetry-appender-tracing/benches/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
| ot_layer_enabled | 196 ns |
*/

use async_trait::async_trait;
use criterion::{criterion_group, criterion_main, Criterion};
use opentelemetry::InstrumentationScope;
use opentelemetry_appender_tracing::layer as tracing_layer;
Expand All @@ -32,10 +31,13 @@ struct NoopExporter {
enabled: bool,
}

#[async_trait]
impl LogExporter for NoopExporter {
async fn export(&self, _: LogBatch<'_>) -> LogResult<()> {
LogResult::Ok(())
#[allow(clippy::manual_async_fn)]
fn export(
&self,
_batch: LogBatch<'_>,
) -> impl std::future::Future<Output = LogResult<()>> + Send {
async { LogResult::Ok(()) }
}

fn event_enabled(&self, _: opentelemetry::logs::Severity, _: &str, _: &str) -> bool {
Expand All @@ -44,17 +46,17 @@ impl LogExporter for NoopExporter {
}

#[derive(Debug)]
struct NoopProcessor {
exporter: Box<dyn LogExporter>,
struct NoopProcessor<E: LogExporter> {
exporter: E,
}

impl NoopProcessor {
fn new(exporter: Box<dyn LogExporter>) -> Self {
impl<E: LogExporter> NoopProcessor<E> {
fn new(exporter: E) -> Self {
Self { exporter }
}
}

impl LogProcessor for NoopProcessor {
impl<E: LogExporter> LogProcessor for NoopProcessor<E> {
fn emit(&self, _: &mut LogRecord, _: &InstrumentationScope) {
// no-op
}
Expand Down Expand Up @@ -124,7 +126,7 @@ fn benchmark_no_subscriber(c: &mut Criterion) {

fn benchmark_with_ot_layer(c: &mut Criterion, enabled: bool, bench_name: &str) {
let exporter = NoopExporter { enabled };
let processor = NoopProcessor::new(Box::new(exporter));
let processor = NoopProcessor::new(exporter);
let provider = LoggerProvider::builder()
.with_resource(
Resource::builder_empty()
Expand Down
18 changes: 11 additions & 7 deletions opentelemetry-appender-tracing/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ const fn severity_of_level(level: &Level) -> Severity {
#[cfg(test)]
mod tests {
use crate::layer;
use async_trait::async_trait;
use opentelemetry::logs::Severity;
use opentelemetry::trace::TracerProvider as _;
use opentelemetry::trace::{TraceContextExt, TraceFlags, Tracer};
Expand Down Expand Up @@ -245,13 +244,18 @@ mod tests {
#[derive(Clone, Debug, Default)]
struct ReentrantLogExporter;

#[async_trait]
impl LogExporter for ReentrantLogExporter {
async fn export(&self, _batch: LogBatch<'_>) -> LogResult<()> {
// This will cause a deadlock as the export itself creates a log
// while still within the lock of the SimpleLogProcessor.
warn!(name: "my-event-name", target: "reentrant", event_id = 20, user_name = "otel", user_email = "[email protected]");
Ok(())
#[allow(clippy::manual_async_fn)]
fn export(
&self,
_batch: LogBatch<'_>,
) -> impl std::future::Future<Output = LogResult<()>> + Send {
async {
// This will cause a deadlock as the export itself creates a log
// while still within the lock of the SimpleLogProcessor.
warn!(name: "my-event-name", target: "reentrant", event_id = 20, user_name = "otel", user_email = "[email protected]");
Ok(())
}
}
}

Expand Down
80 changes: 42 additions & 38 deletions opentelemetry-otlp/src/exporter/http/logs.rs
Original file line number Diff line number Diff line change
@@ -1,52 +1,56 @@
use std::sync::Arc;

use async_trait::async_trait;
use http::{header::CONTENT_TYPE, Method};
use opentelemetry::otel_debug;
use opentelemetry_sdk::export::logs::{LogBatch, LogExporter};
use opentelemetry_sdk::logs::{LogError, LogResult};

use super::OtlpHttpClient;

#[async_trait]
impl LogExporter for OtlpHttpClient {
async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> {
let client = self
.client
.lock()
.map_err(|e| LogError::Other(e.to_string().into()))
.and_then(|g| match &*g {
Some(client) => Ok(Arc::clone(client)),
_ => Err(LogError::Other("exporter is already shut down".into())),
})?;

let (body, content_type) = { self.build_logs_export_body(batch)? };
let mut request = http::Request::builder()
.method(Method::POST)
.uri(&self.collector_endpoint)
.header(CONTENT_TYPE, content_type)
.body(body)
.map_err(|e| crate::Error::RequestFailed(Box::new(e)))?;

for (k, v) in &self.headers {
request.headers_mut().insert(k.clone(), v.clone());
#[allow(clippy::manual_async_fn)]
fn export(
&self,
batch: LogBatch<'_>,
) -> impl std::future::Future<Output = LogResult<()>> + Send {
async move {
let client = self
.client
.lock()
.map_err(|e| LogError::Other(e.to_string().into()))
.and_then(|g| match &*g {
Some(client) => Ok(Arc::clone(client)),
_ => Err(LogError::Other("exporter is already shut down".into())),
})?;

let (body, content_type) = { self.build_logs_export_body(batch)? };
let mut request = http::Request::builder()
.method(Method::POST)
.uri(&self.collector_endpoint)
.header(CONTENT_TYPE, content_type)
.body(body)
.map_err(|e| crate::Error::RequestFailed(Box::new(e)))?;

for (k, v) in &self.headers {
request.headers_mut().insert(k.clone(), v.clone());
}

let request_uri = request.uri().to_string();
otel_debug!(name: "HttpLogsClient.CallingExport");
let response = client.send(request).await?;

if !response.status().is_success() {
let error = format!(
"OpenTelemetry logs export failed. Url: {}, Status Code: {}, Response: {:?}",
response.status().as_u16(),
request_uri,
response.body()
);
return Err(LogError::Other(error.into()));
}

Ok(())
}

let request_uri = request.uri().to_string();
otel_debug!(name: "HttpLogsClient.CallingExport");
let response = client.send(request).await?;

if !response.status().is_success() {
let error = format!(
"OpenTelemetry logs export failed. Url: {}, Status Code: {}, Response: {:?}",
response.status().as_u16(),
request_uri,
response.body()
);
return Err(LogError::Other(error.into()));
}

Ok(())
}

fn shutdown(&mut self) {
Expand Down
6 changes: 3 additions & 3 deletions opentelemetry-otlp/src/exporter/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use std::time::Duration;
mod metrics;

#[cfg(feature = "logs")]
mod logs;
pub(crate) mod logs;

#[cfg(feature = "trace")]
mod trace;
Expand Down Expand Up @@ -239,7 +239,7 @@ impl HttpExporterBuilder {
OTEL_EXPORTER_OTLP_LOGS_HEADERS,
)?;

Ok(crate::LogExporter::new(client))
Ok(crate::LogExporter::from_http(client))
}

/// Create a metrics exporter with the current configuration
Expand All @@ -265,7 +265,7 @@ impl HttpExporterBuilder {
}

#[derive(Debug)]
struct OtlpHttpClient {
pub(crate) struct OtlpHttpClient {
client: Mutex<Option<Arc<dyn HttpClient>>>,
collector_endpoint: Uri,
headers: HashMap<HeaderName, HeaderValue>,
Expand Down
59 changes: 31 additions & 28 deletions opentelemetry-otlp/src/exporter/tonic/logs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use async_trait::async_trait;
use core::fmt;
use opentelemetry::otel_debug;
use opentelemetry_proto::tonic::collector::logs::v1::{
Expand Down Expand Up @@ -56,37 +55,41 @@ impl TonicLogsClient {
}
}

#[async_trait]
impl LogExporter for TonicLogsClient {
async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> {
let (mut client, metadata, extensions) = match &self.inner {
Some(inner) => {
let (m, e, _) = inner
.interceptor
.lock()
.await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here
.call(Request::new(()))
.map_err(|e| LogError::Other(Box::new(e)))?
.into_parts();
(inner.client.clone(), m, e)
}
None => return Err(LogError::Other("exporter is already shut down".into())),
};
#[allow(clippy::manual_async_fn)]
fn export(
&self,
batch: LogBatch<'_>,
) -> impl std::future::Future<Output = LogResult<()>> + Send {
async move {
let (mut client, metadata, extensions) = match &self.inner {
Some(inner) => {
let (m, e, _) = inner
.interceptor
.lock()
.await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here
.call(Request::new(()))
.map_err(|e| LogError::Other(Box::new(e)))?
.into_parts();
(inner.client.clone(), m, e)
}
None => return Err(LogError::Other("exporter is already shut down".into())),
};

let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource);
let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource);

otel_debug!(name: "TonicsLogsClient.CallingExport");
otel_debug!(name: "TonicsLogsClient.CallingExport");

client
.export(Request::from_parts(
metadata,
extensions,
ExportLogsServiceRequest { resource_logs },
))
.await
.map_err(crate::Error::from)?;

Ok(())
client
.export(Request::from_parts(
metadata,
extensions,
ExportLogsServiceRequest { resource_logs },
))
.await
.map_err(crate::Error::from)?;
Ok(())
}
}

fn shutdown(&mut self) {
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-otlp/src/exporter/tonic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::{
};

#[cfg(feature = "logs")]
mod logs;
pub(crate) mod logs;

#[cfg(feature = "metrics")]
mod metrics;
Expand Down Expand Up @@ -273,7 +273,7 @@ impl TonicExporterBuilder {

let client = TonicLogsClient::new(channel, interceptor, compression);

Ok(crate::logs::LogExporter::new(client))
Ok(crate::logs::LogExporter::from_tonic(client))
}

/// Build a new tonic metrics exporter
Expand Down
47 changes: 38 additions & 9 deletions opentelemetry-otlp/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
//!
//! Defines a [LogExporter] to send logs via the OpenTelemetry Protocol (OTLP)
use async_trait::async_trait;
use opentelemetry::otel_debug;
use std::fmt::Debug;

Expand Down Expand Up @@ -108,7 +107,15 @@ impl HasHttpConfig for LogExporterBuilder<HttpExporterBuilderSet> {
/// OTLP exporter that sends log data
#[derive(Debug)]
pub struct LogExporter {
client: Box<dyn opentelemetry_sdk::export::logs::LogExporter>,
client: SupportedTransportClient,
}

#[derive(Debug)]
enum SupportedTransportClient {
#[cfg(feature = "grpc-tonic")]
Tonic(crate::exporter::tonic::logs::TonicLogsClient),
#[cfg(any(feature = "http-proto", feature = "http-json"))]
Http(crate::exporter::http::OtlpHttpClient),
}

impl LogExporter {
Expand All @@ -117,21 +124,43 @@ impl LogExporter {
LogExporterBuilder::default()
}

/// Create a new log exporter
pub fn new(client: impl opentelemetry_sdk::export::logs::LogExporter + 'static) -> Self {
#[cfg(any(feature = "http-proto", feature = "http-json"))]
pub(crate) fn from_http(client: crate::exporter::http::OtlpHttpClient) -> Self {
LogExporter {
client: Box::new(client),
client: SupportedTransportClient::Http(client),
}
}

#[cfg(feature = "grpc-tonic")]
pub(crate) fn from_tonic(client: crate::exporter::tonic::logs::TonicLogsClient) -> Self {
LogExporter {
client: SupportedTransportClient::Tonic(client),
}
}
}

#[async_trait]
impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> {
self.client.export(batch).await
#[allow(clippy::manual_async_fn)]
fn export(
&self,
batch: LogBatch<'_>,
) -> impl std::future::Future<Output = LogResult<()>> + Send {
async move {
match &self.client {
#[cfg(feature = "grpc-tonic")]
SupportedTransportClient::Tonic(client) => client.export(batch).await,
#[cfg(any(feature = "http-proto", feature = "http-json"))]
SupportedTransportClient::Http(client) => client.export(batch).await,
}
}
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
self.client.set_resource(resource);
match &mut self.client {
#[cfg(feature = "grpc-tonic")]
SupportedTransportClient::Tonic(client) => client.set_resource(resource),
#[cfg(any(feature = "http-proto", feature = "http-json"))]
SupportedTransportClient::Http(client) => client.set_resource(resource),
}
}
}
2 changes: 1 addition & 1 deletion opentelemetry-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pprof = { version = "0.13", features = ["flamegraph", "criterion"] }
default = ["trace", "metrics", "logs", "internal-logs"]
trace = ["opentelemetry/trace", "rand", "percent-encoding"]
jaeger_remote_sampler = ["trace", "opentelemetry-http", "http", "serde", "serde_json", "url"]
logs = ["opentelemetry/logs", "async-trait", "serde_json"]
logs = ["opentelemetry/logs", "serde_json"]
spec_unstable_logs_enabled = ["logs", "opentelemetry/spec_unstable_logs_enabled"]
metrics = ["opentelemetry/metrics", "glob", "async-trait"]
testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-async-std", "rt-tokio", "rt-tokio-current-thread", "tokio/macros", "tokio/rt-multi-thread"]
Expand Down
Loading

0 comments on commit 23f6ae2

Please sign in to comment.