Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use native Rust support for async traits in LogExporter::export() method (11% improvement) #2374

Merged
merged 34 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
d454536
initial commit
lalitb Dec 1, 2024
3ed482d
futher changes..
lalitb Dec 1, 2024
a2aa648
changes..
lalitb Dec 2, 2024
8ba1173
initial change
lalitb Dec 2, 2024
37823e7
stdout exporter
lalitb Dec 2, 2024
3b4cd57
update stress
lalitb Dec 2, 2024
858cd87
Merge branch 'main' into log-async-trait-impl
lalitb Dec 3, 2024
e6472bf
Merge branch 'main' into log-async-trait-impl
lalitb Dec 6, 2024
8cacf52
fix otlp
lalitb Dec 6, 2024
dfac978
Merge branch 'main' into log-async-trait-impl
lalitb Dec 13, 2024
f752d50
add comment
lalitb Dec 13, 2024
6ac8aa0
Merge branch 'main' into log-async-trait-impl
cijothomas Dec 13, 2024
4924e3b
Merge branch 'main' into log-async-trait-impl
TommyCpp Dec 13, 2024
8db475c
review comment
lalitb Dec 15, 2024
4a6e36f
Merge branch 'main' into log-async-trait-impl
lalitb Dec 15, 2024
3af54c3
Merge branch 'main' into log-async-trait-impl
lalitb Dec 15, 2024
8c34380
Merge branch 'main' into log-async-trait-impl
lalitb Dec 16, 2024
87c6c9d
Merge branch 'main' into log-async-trait-impl
cijothomas Dec 16, 2024
be7ecec
Merge branch 'main' into log-async-trait-impl
lalitb Dec 17, 2024
5c8c644
Merge branch 'main' into log-async-trait-impl
lalitb Dec 19, 2024
d1f3cbb
resolve conflicts
lalitb Dec 19, 2024
fa11cd7
initial commit
lalitb Dec 19, 2024
67f006a
further conflicts
lalitb Dec 19, 2024
85f653c
remove unwantd comments
lalitb Dec 19, 2024
33b7572
Merge branch 'main' into log-async-trait-impl
cijothomas Dec 19, 2024
0d5a17c
merge conflict
lalitb Dec 19, 2024
e8333f5
Merge branch 'main' into log-async-trait-impl
lalitb Dec 20, 2024
4e4a189
Remove async_trait import and attribute
lalitb Dec 20, 2024
7627224
remove unused crate
lalitb Dec 20, 2024
09f7cb8
Merge branch 'main' into log-async-trait-impl
lalitb Dec 20, 2024
ddb15bf
update export to take batch by value
lalitb Dec 20, 2024
5e025d2
Merge branch 'log-async-trait-impl' of github.com:lalitb/opentelemetr…
lalitb Dec 20, 2024
158e922
remove temporary batch variable
lalitb Dec 20, 2024
36ea18a
keep diff minimal
lalitb Dec 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions opentelemetry-appender-tracing/benches/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,12 @@ struct NoopExporter {

#[async_trait]
impl LogExporter for NoopExporter {
async fn export(&self, _: LogBatch<'_>) -> LogResult<()> {
LogResult::Ok(())
#[allow(clippy::manual_async_fn)]
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
fn export<'a>(
&'a self,
_batch: &'a LogBatch<'a>,
) -> impl std::future::Future<Output = LogResult<()>> + Send + 'a {
async { LogResult::Ok(()) }
}

fn event_enabled(&self, _: opentelemetry::logs::Severity, _: &str, _: &str) -> bool {
Expand All @@ -44,17 +48,17 @@ impl LogExporter for NoopExporter {
}

#[derive(Debug)]
struct NoopProcessor {
exporter: Box<dyn LogExporter>,
struct NoopProcessor<E: LogExporter> {
exporter: E,
}

impl NoopProcessor {
fn new(exporter: Box<dyn LogExporter>) -> Self {
impl<E: LogExporter> NoopProcessor<E> {
fn new(exporter: E) -> Self {
Self { exporter }
}
}

impl LogProcessor for NoopProcessor {
impl<E: LogExporter> LogProcessor for NoopProcessor<E> {
fn emit(&self, _: &mut LogRecord, _: &InstrumentationScope) {
// no-op
}
Expand Down Expand Up @@ -124,7 +128,7 @@ fn benchmark_no_subscriber(c: &mut Criterion) {

fn benchmark_with_ot_layer(c: &mut Criterion, enabled: bool, bench_name: &str) {
let exporter = NoopExporter { enabled };
let processor = NoopProcessor::new(Box::new(exporter));
let processor = NoopProcessor::new(exporter);
let provider = LoggerProvider::builder()
.with_resource(
Resource::builder_empty()
Expand Down
16 changes: 11 additions & 5 deletions opentelemetry-appender-tracing/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,17 @@

#[async_trait]
lalitb marked this conversation as resolved.
Show resolved Hide resolved
impl LogExporter for ReentrantLogExporter {
async fn export(&self, _batch: LogBatch<'_>) -> LogResult<()> {
// This will cause a deadlock as the export itself creates a log
// while still within the lock of the SimpleLogProcessor.
warn!(name: "my-event-name", target: "reentrant", event_id = 20, user_name = "otel", user_email = "[email protected]");
Ok(())
#[allow(clippy::manual_async_fn)]
fn export<'a>(
&'a self,
_batch: &'a LogBatch<'a>,
) -> impl std::future::Future<Output = LogResult<()>> + Send + 'a {
async {
// This will cause a deadlock as the export itself creates a log
// while still within the lock of the SimpleLogProcessor.
warn!(name: "my-event-name", target: "reentrant", event_id = 20, user_name = "otel", user_email = "[email protected]");
Ok(())
}

Check warning on line 260 in opentelemetry-appender-tracing/src/layer.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-appender-tracing/src/layer.rs#L251-L260

Added lines #L251 - L260 were not covered by tests
}
}

Expand Down
76 changes: 41 additions & 35 deletions opentelemetry-otlp/src/exporter/http/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,42 +9,48 @@

#[async_trait]
impl LogExporter for OtlpHttpClient {
async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> {
let client = self
.client
.lock()
.map_err(|e| LogError::Other(e.to_string().into()))
.and_then(|g| match &*g {
Some(client) => Ok(Arc::clone(client)),
_ => Err(LogError::Other("exporter is already shut down".into())),
})?;

let (body, content_type) = { self.build_logs_export_body(batch)? };
let mut request = http::Request::builder()
.method(Method::POST)
.uri(&self.collector_endpoint)
.header(CONTENT_TYPE, content_type)
.body(body)
.map_err(|e| crate::Error::RequestFailed(Box::new(e)))?;

for (k, v) in &self.headers {
request.headers_mut().insert(k.clone(), v.clone());
#[allow(clippy::manual_async_fn)]
fn export<'a>(
&'a self,
batch: &'a LogBatch<'a>,
) -> impl std::future::Future<Output = LogResult<()>> + Send + 'a {
async move {
let client = self
.client
.lock()
.map_err(|e| LogError::Other(e.to_string().into()))
.and_then(|g| match &*g {
Some(client) => Ok(Arc::clone(client)),
_ => Err(LogError::Other("exporter is already shut down".into())),
})?;

Check warning on line 25 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L13-L25

Added lines #L13 - L25 were not covered by tests

let (body, content_type) = { self.build_logs_export_body(batch)? };
let mut request = http::Request::builder()
.method(Method::POST)
.uri(&self.collector_endpoint)
.header(CONTENT_TYPE, content_type)
.body(body)
.map_err(|e| crate::Error::RequestFailed(Box::new(e)))?;

Check warning on line 33 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L27-L33

Added lines #L27 - L33 were not covered by tests

for (k, v) in &self.headers {
request.headers_mut().insert(k.clone(), v.clone());
}

Check warning on line 37 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L35-L37

Added lines #L35 - L37 were not covered by tests

let request_uri = request.uri().to_string();
let response = client.send(request).await?;

Check warning on line 40 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L39-L40

Added lines #L39 - L40 were not covered by tests

if !response.status().is_success() {
let error = format!(
"OpenTelemetry logs export failed. Url: {}, Status Code: {}, Response: {:?}",
response.status().as_u16(),
request_uri,
response.body()
);
return Err(LogError::Other(error.into()));
}

Ok(())

Check warning on line 52 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L42-L52

Added lines #L42 - L52 were not covered by tests
}

let request_uri = request.uri().to_string();
let response = client.send(request).await?;

if !response.status().is_success() {
let error = format!(
"OpenTelemetry logs export failed. Url: {}, Status Code: {}, Response: {:?}",
response.status().as_u16(),
request_uri,
response.body()
);
return Err(LogError::Other(error.into()));
}

Ok(())
}

fn shutdown(&mut self) {
Expand Down
8 changes: 4 additions & 4 deletions opentelemetry-otlp/src/exporter/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
mod metrics;

#[cfg(feature = "logs")]
mod logs;
pub(crate) mod logs;

#[cfg(feature = "trace")]
mod trace;
Expand Down Expand Up @@ -236,7 +236,7 @@
OTEL_EXPORTER_OTLP_LOGS_HEADERS,
)?;

Ok(crate::LogExporter::new(client))
Ok(crate::LogExporter::from_http(client))

Check warning on line 239 in opentelemetry-otlp/src/exporter/http/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/mod.rs#L239

Added line #L239 was not covered by tests
}

/// Create a metrics exporter with the current configuration
Expand All @@ -262,7 +262,7 @@
}

#[derive(Debug)]
struct OtlpHttpClient {
pub(crate) struct OtlpHttpClient {
client: Mutex<Option<Arc<dyn HttpClient>>>,
collector_endpoint: Uri,
headers: HashMap<HeaderName, HeaderValue>,
Expand Down Expand Up @@ -314,7 +314,7 @@
#[cfg(feature = "logs")]
fn build_logs_export_body(
&self,
logs: LogBatch<'_>,
logs: &LogBatch<'_>,

Check warning on line 317 in opentelemetry-otlp/src/exporter/http/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/mod.rs#L317

Added line #L317 was not covered by tests
) -> opentelemetry_sdk::logs::LogResult<(Vec<u8>, &'static str)> {
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
let resource_logs = group_logs_by_resource_and_scope(logs, &self.resource);
Expand Down
57 changes: 31 additions & 26 deletions opentelemetry-otlp/src/exporter/tonic/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,35 +58,40 @@

#[async_trait]
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
impl LogExporter for TonicLogsClient {
async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> {
let (mut client, metadata, extensions) = match &self.inner {
Some(inner) => {
let (m, e, _) = inner
.interceptor
.lock()
.await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here
.call(Request::new(()))
.map_err(|e| LogError::Other(Box::new(e)))?
.into_parts();
(inner.client.clone(), m, e)
}
None => return Err(LogError::Other("exporter is already shut down".into())),
};
#[allow(clippy::manual_async_fn)]
fn export<'a>(
&'a self,
batch: &'a LogBatch<'a>,
) -> impl std::future::Future<Output = LogResult<()>> + Send + 'a {
async move {
let (mut client, metadata, extensions) = match &self.inner {
Some(inner) => {
let (m, e, _) = inner
.interceptor
.lock()
.await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here
.call(Request::new(()))
.map_err(|e| LogError::Other(Box::new(e)))?
.into_parts();
(inner.client.clone(), m, e)

Check warning on line 76 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L62-L76

Added lines #L62 - L76 were not covered by tests
}
None => return Err(LogError::Other("exporter is already shut down".into())),

Check warning on line 78 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L78

Added line #L78 was not covered by tests
};

let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource);
let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource);

Check warning on line 81 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L81

Added line #L81 was not covered by tests

otel_debug!(name: "TonicsLogsClient.CallingExport");
otel_debug!(name: "TonicsLogsClient.CallingExport");

Check warning on line 83 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L83

Added line #L83 was not covered by tests

client
.export(Request::from_parts(
metadata,
extensions,
ExportLogsServiceRequest { resource_logs },
))
.await
.map_err(crate::Error::from)?;

Ok(())
client
.export(Request::from_parts(
metadata,
extensions,
ExportLogsServiceRequest { resource_logs },
))
.await
.map_err(crate::Error::from)?;
Ok(())
}

Check warning on line 94 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L85-L94

Added lines #L85 - L94 were not covered by tests
}

fn shutdown(&mut self) {
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-otlp/src/exporter/tonic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
};

#[cfg(feature = "logs")]
mod logs;
pub(crate) mod logs;

#[cfg(feature = "metrics")]
mod metrics;
Expand Down Expand Up @@ -273,7 +273,7 @@

let client = TonicLogsClient::new(channel, interceptor, compression);

Ok(crate::logs::LogExporter::new(client))
Ok(crate::logs::LogExporter::from_tonic(client))

Check warning on line 276 in opentelemetry-otlp/src/exporter/tonic/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/mod.rs#L276

Added line #L276 was not covered by tests
}

/// Build a new tonic metrics exporter
Expand Down
46 changes: 39 additions & 7 deletions opentelemetry-otlp/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,16 @@
/// OTLP exporter that sends log data
#[derive(Debug)]
pub struct LogExporter {
client: Box<dyn opentelemetry_sdk::export::logs::LogExporter>,
//client: Box<dyn opentelemetry_sdk::export::logs::LogExporter>,
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
client: LogExporterInner,
}

#[derive(Debug)]
enum LogExporterInner {
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
#[cfg(feature = "grpc-tonic")]
Tonic(crate::exporter::tonic::logs::TonicLogsClient),
#[cfg(any(feature = "http-proto", feature = "http-json"))]
Http(crate::exporter::http::OtlpHttpClient),
}

impl LogExporter {
Expand All @@ -117,21 +126,44 @@
LogExporterBuilder::default()
}

/// Create a new log exporter
pub fn new(client: impl opentelemetry_sdk::export::logs::LogExporter + 'static) -> Self {
#[cfg(any(feature = "http-proto", feature = "http-json"))]
pub(crate) fn from_http(client: crate::exporter::http::OtlpHttpClient) -> Self {

Check warning on line 130 in opentelemetry-otlp/src/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/logs.rs#L130

Added line #L130 was not covered by tests
LogExporter {
client: Box::new(client),
client: LogExporterInner::Http(client),
}
}

Check warning on line 134 in opentelemetry-otlp/src/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/logs.rs#L132-L134

Added lines #L132 - L134 were not covered by tests

#[cfg(feature = "grpc-tonic")]
pub(crate) fn from_tonic(client: crate::exporter::tonic::logs::TonicLogsClient) -> Self {
LogExporter {
client: LogExporterInner::Tonic(client),

Check warning on line 139 in opentelemetry-otlp/src/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/logs.rs#L137-L139

Added lines #L137 - L139 were not covered by tests
}
}
}

#[async_trait]
impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> {
self.client.export(batch).await
#[allow(clippy::manual_async_fn)]
fn export<'a>(
&'a self,
batch: &'a LogBatch<'a>,
) -> impl std::future::Future<Output = LogResult<()>> + Send + 'a {
async move {
match &self.client {

Check warning on line 152 in opentelemetry-otlp/src/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/logs.rs#L147-L152

Added lines #L147 - L152 were not covered by tests
#[cfg(feature = "grpc-tonic")]
LogExporterInner::Tonic(client) => client.export(batch).await,

Check warning on line 154 in opentelemetry-otlp/src/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/logs.rs#L154

Added line #L154 was not covered by tests
#[cfg(any(feature = "http-proto", feature = "http-json"))]
LogExporterInner::Http(client) => client.export(batch).await,

Check warning on line 156 in opentelemetry-otlp/src/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/logs.rs#L156

Added line #L156 was not covered by tests
}
}

Check warning on line 158 in opentelemetry-otlp/src/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/logs.rs#L158

Added line #L158 was not covered by tests
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
self.client.set_resource(resource);
match &mut self.client {

Check warning on line 162 in opentelemetry-otlp/src/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/logs.rs#L162

Added line #L162 was not covered by tests
#[cfg(feature = "grpc-tonic")]
LogExporterInner::Tonic(client) => client.set_resource(resource),

Check warning on line 164 in opentelemetry-otlp/src/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/logs.rs#L164

Added line #L164 was not covered by tests
#[cfg(any(feature = "http-proto", feature = "http-json"))]
LogExporterInner::Http(client) => client.set_resource(resource),

Check warning on line 166 in opentelemetry-otlp/src/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/logs.rs#L166

Added line #L166 was not covered by tests
}
}
}
6 changes: 3 additions & 3 deletions opentelemetry-proto/src/transform/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ pub mod tonic {
}

pub fn group_logs_by_resource_and_scope(
logs: LogBatch<'_>,
logs: &LogBatch<'_>,
resource: &ResourceAttributesWithSchema,
) -> Vec<ResourceLogs> {
// Group logs by target or instrumentation name
Expand Down Expand Up @@ -261,7 +261,7 @@ mod tests {
let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema

let grouped_logs =
crate::transform::logs::tonic::group_logs_by_resource_and_scope(log_batch, &resource);
crate::transform::logs::tonic::group_logs_by_resource_and_scope(&log_batch, &resource);

assert_eq!(grouped_logs.len(), 1);
let resource_logs = &grouped_logs[0];
Expand All @@ -281,7 +281,7 @@ mod tests {
let log_batch = LogBatch::new(&logs);
let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema
let grouped_logs =
crate::transform::logs::tonic::group_logs_by_resource_and_scope(log_batch, &resource);
crate::transform::logs::tonic::group_logs_by_resource_and_scope(&log_batch, &resource);

assert_eq!(grouped_logs.len(), 1);
let resource_logs = &grouped_logs[0];
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pprof = { version = "0.13", features = ["flamegraph", "criterion"] }
default = ["trace", "metrics", "logs", "internal-logs"]
trace = ["opentelemetry/trace", "rand", "percent-encoding"]
jaeger_remote_sampler = ["trace", "opentelemetry-http", "http", "serde", "serde_json", "url"]
logs = ["opentelemetry/logs", "async-trait", "serde_json"]
logs = ["opentelemetry/logs", "serde_json"]
spec_unstable_logs_enabled = ["logs", "opentelemetry/spec_unstable_logs_enabled"]
metrics = ["opentelemetry/metrics", "glob", "async-trait"]
testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-async-std", "rt-tokio", "rt-tokio-current-thread", "tokio/macros", "tokio/rt-multi-thread"]
Expand Down
Loading
Loading