Skip to content

Commit

Permalink
Merge branch 'main' into cijothomas/otlp-builder-issue
Browse files Browse the repository at this point in the history
  • Loading branch information
TommyCpp authored May 24, 2024
2 parents 340458f + bded598 commit 191684d
Show file tree
Hide file tree
Showing 11 changed files with 239 additions and 57 deletions.
2 changes: 1 addition & 1 deletion opentelemetry-appender-tracing/benches/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
| noop_layer_disabled | 12 ns |
| noop_layer_enabled | 25 ns |
| ot_layer_disabled | 19 ns |
| ot_layer_enabled | 446 ns |
| ot_layer_enabled | 371 ns |
*/

use async_trait::async_trait;
Expand Down
79 changes: 39 additions & 40 deletions opentelemetry-appender-tracing/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,8 @@ use tracing_subscriber::Layer;
const INSTRUMENTATION_LIBRARY_NAME: &str = "opentelemetry-appender-tracing";

/// Visitor to record the fields from the event record.
#[derive(Default)]
struct EventVisitor {
log_record_attributes: Vec<(Key, AnyValue)>,
log_record_body: Option<AnyValue>,
struct EventVisitor<'a, LR: LogRecord> {
log_record: &'a mut LR,
}

/// Logs from the log crate have duplicated attributes that we removed here.
Expand All @@ -37,59 +35,61 @@ fn get_filename(filepath: &str) -> &str {
filepath
}

impl EventVisitor {
impl<'a, LR: LogRecord> EventVisitor<'a, LR> {
fn new(log_record: &'a mut LR) -> Self {
EventVisitor { log_record }
}
fn visit_metadata(&mut self, meta: &Metadata) {
self.log_record_attributes
.push(("name".into(), meta.name().into()));
self.log_record
.add_attribute(Key::new("name"), AnyValue::from(meta.name()));

#[cfg(feature = "experimental_metadata_attributes")]
self.visit_experimental_metadata(meta);
}

#[cfg(feature = "experimental_metadata_attributes")]
fn visit_experimental_metadata(&mut self, meta: &Metadata) {
self.log_record_attributes
.push(("log.target".into(), meta.target().to_owned().into()));
self.log_record.add_attribute(
Key::new("log.target"),
AnyValue::from(meta.target().to_owned()),
);

if let Some(module_path) = meta.module_path() {
self.log_record_attributes
.push(("code.namespace".into(), module_path.to_owned().into()));
self.log_record.add_attribute(
Key::new("code.namespace"),
AnyValue::from(module_path.to_owned()),
);
}

if let Some(filepath) = meta.file() {
self.log_record_attributes
.push(("code.filepath".into(), filepath.to_owned().into()));
self.log_record_attributes.push((
"code.filename".into(),
get_filename(filepath).to_owned().into(),
));
self.log_record.add_attribute(
Key::new("code.filepath"),
AnyValue::from(filepath.to_owned()),
);
self.log_record.add_attribute(
Key::new("code.filename"),
AnyValue::from(get_filename(filepath).to_owned()),
);
}

if let Some(line) = meta.line() {
self.log_record_attributes
.push(("code.lineno".into(), line.into()));
}
}

fn push_to_otel_log_record<LR: LogRecord>(self, log_record: &mut LR) {
if let Some(body) = self.log_record_body {
log_record.set_body(body);
self.log_record
.add_attribute(Key::new("code.lineno"), AnyValue::from(line));
}
log_record.add_attributes(self.log_record_attributes);
}
}

impl tracing::field::Visit for EventVisitor {
impl<'a, LR: LogRecord> tracing::field::Visit for EventVisitor<'a, LR> {
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
#[cfg(feature = "experimental_metadata_attributes")]
if is_duplicated_metadata(field.name()) {
return;
}
if field.name() == "message" {
self.log_record_body = Some(format!("{value:?}").into());
self.log_record.set_body(format!("{:?}", value).into());
} else {
self.log_record_attributes
.push((field.name().into(), format!("{value:?}").into()));
self.log_record
.add_attribute(Key::new(field.name()), AnyValue::from(format!("{value:?}")));
}
}

Expand All @@ -98,27 +98,27 @@ impl tracing::field::Visit for EventVisitor {
if is_duplicated_metadata(field.name()) {
return;
}
self.log_record_attributes
.push((field.name().into(), value.to_owned().into()));
self.log_record
.add_attribute(Key::new(field.name()), AnyValue::from(value.to_owned()));
}

fn record_bool(&mut self, field: &tracing_core::Field, value: bool) {
self.log_record_attributes
.push((field.name().into(), value.into()));
self.log_record
.add_attribute(Key::new(field.name()), AnyValue::from(value));
}

fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
self.log_record_attributes
.push((field.name().into(), value.into()));
self.log_record
.add_attribute(Key::new(field.name()), AnyValue::from(value));
}

fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
#[cfg(feature = "experimental_metadata_attributes")]
if is_duplicated_metadata(field.name()) {
return;
}
self.log_record_attributes
.push((field.name().into(), value.into()));
self.log_record
.add_attribute(Key::new(field.name()), AnyValue::from(value));
}

// TODO: Remaining field types from AnyValue : Bytes, ListAny, Boolean
Expand Down Expand Up @@ -173,11 +173,10 @@ where
log_record.set_severity_number(severity_of_level(meta.level()));
log_record.set_severity_text(meta.level().to_string().into());

let mut visitor = EventVisitor::default();
let mut visitor = EventVisitor::new(&mut log_record);
visitor.visit_metadata(meta);
// Visit fields.
event.record(&mut visitor);
visitor.push_to_otel_log_record(&mut log_record);

self.logger.emit(log_record);
}
Expand Down
5 changes: 4 additions & 1 deletion opentelemetry-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ harness = false
[[bench]]
name = "metric_counter"
harness = false
required-features = ["metrics"]

[[bench]]
name = "metric_gauge"
harness = false

[[bench]]
name = "attribute_set"
Expand Down
37 changes: 29 additions & 8 deletions opentelemetry-sdk/benches/metric_counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use opentelemetry::{
};
use opentelemetry_sdk::metrics::{ManualReader, SdkMeterProvider};
use rand::{
rngs::{self, SmallRng},
rngs::{self},
Rng, SeedableRng,
};
use std::cell::RefCell;
Expand Down Expand Up @@ -107,14 +107,35 @@ fn counter_add(c: &mut Criterion) {
});
});

c.bench_function("Random_Generator_5", |b| {
// Cause overflow.
for v in 0..2001 {
counter.add(100, &[KeyValue::new("A", v.to_string())]);
}
c.bench_function("Counter_Overflow", |b| {
b.iter(|| {
let mut rng = SmallRng::from_entropy();
let _i1 = rng.gen_range(0..4);
let _i2 = rng.gen_range(0..4);
let _i3 = rng.gen_range(0..10);
let _i4 = rng.gen_range(0..10);
let _i5 = rng.gen_range(0..10);
// 4*4*10*10 = 1600 time series.
let rands = CURRENT_RNG.with(|rng| {
let mut rng = rng.borrow_mut();
[
rng.gen_range(0..4),
rng.gen_range(0..4),
rng.gen_range(0..10),
rng.gen_range(0..10),
]
});
let index_first_attribute = rands[0];
let index_second_attribute = rands[1];
let index_third_attribute = rands[2];
let index_forth_attribute = rands[3];
counter.add(
1,
&[
KeyValue::new("attribute1", attribute_values[index_first_attribute]),
KeyValue::new("attribute2", attribute_values[index_second_attribute]),
KeyValue::new("attribute3", attribute_values[index_third_attribute]),
KeyValue::new("attribute4", attribute_values[index_forth_attribute]),
],
);
});
});

Expand Down
82 changes: 82 additions & 0 deletions opentelemetry-sdk/benches/metric_gauge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
The benchmark results:
criterion = "0.5.1"
OS: Ubuntu 22.04.3 LTS (5.15.146.1-microsoft-standard-WSL2)
Hardware: AMD EPYC 7763 64-Core Processor - 2.44 GHz, 16vCPUs,
RAM: 64.0 GB
| Test | Average time|
|--------------------------------|-------------|
| Gauge_Add_4 | 586 ns |
*/

use criterion::{criterion_group, criterion_main, Criterion};
use opentelemetry::{
metrics::{Gauge, MeterProvider as _},
KeyValue,
};
use opentelemetry_sdk::metrics::{ManualReader, SdkMeterProvider};
use rand::{
rngs::{self},
Rng, SeedableRng,
};
use std::cell::RefCell;

thread_local! {
/// Store random number generator for each thread
static CURRENT_RNG: RefCell<rngs::SmallRng> = RefCell::new(rngs::SmallRng::from_entropy());
}

// Run this benchmark with:
// cargo bench --bench metric_gauge
fn create_gauge() -> Gauge<u64> {
let meter_provider: SdkMeterProvider = SdkMeterProvider::builder()
.with_reader(ManualReader::builder().build())
.build();
let meter = meter_provider.meter("benchmarks");

meter.u64_gauge("gauge_bench").init()
}

fn criterion_benchmark(c: &mut Criterion) {
gauge_record(c);
}

fn gauge_record(c: &mut Criterion) {
let attribute_values = [
"value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9",
"value10",
];

let gauge = create_gauge();
c.bench_function("Gauge_Add_4", |b| {
b.iter(|| {
// 4*4*10*10 = 1600 time series.
let rands = CURRENT_RNG.with(|rng| {
let mut rng = rng.borrow_mut();
[
rng.gen_range(0..4),
rng.gen_range(0..4),
rng.gen_range(0..10),
rng.gen_range(0..10),
]
});
let index_first_attribute = rands[0];
let index_second_attribute = rands[1];
let index_third_attribute = rands[2];
let index_forth_attribute = rands[3];
gauge.record(
1,
&[
KeyValue::new("attribute1", attribute_values[index_first_attribute]),
KeyValue::new("attribute2", attribute_values[index_second_attribute]),
KeyValue::new("attribute3", attribute_values[index_third_attribute]),
KeyValue::new("attribute4", attribute_values[index_forth_attribute]),
],
);
});
});
}

criterion_group!(benches, criterion_benchmark);

criterion_main!(benches);
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/metrics/internal/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub(crate) static STREAM_OVERFLOW_ATTRIBUTE_SET: Lazy<AttributeSet> = Lazy::new(

/// Checks whether aggregator has hit cardinality limit for metric streams
pub(crate) fn is_under_cardinality_limit(size: usize) -> bool {
size < STREAM_CARDINALITY_LIMIT as usize - 1
size < STREAM_CARDINALITY_LIMIT as usize
}

/// Receives measurements to be aggregated.
Expand Down
10 changes: 5 additions & 5 deletions opentelemetry-sdk/src/metrics/internal/sum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ impl<T: Number<T>> ValueMap<T> {
Entry::Vacant(vacant_entry) => {
if is_under_cardinality_limit(size) {
vacant_entry.insert(measurement);
} else if let Some(val) = values.get_mut(&STREAM_OVERFLOW_ATTRIBUTE_SET) {
*val += measurement;
return;
} else {
values
.entry(STREAM_OVERFLOW_ATTRIBUTE_SET.clone())
.and_modify(|val| *val += measurement)
.or_insert(measurement);
global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow.".into()));
values.insert(STREAM_OVERFLOW_ATTRIBUTE_SET.clone(), measurement);
global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into()));
}
}
}
Expand Down
29 changes: 29 additions & 0 deletions opentelemetry-sdk/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,35 @@ mod tests {
// "multi_thread" tokio flavor must be used else flush won't
// be able to make progress!

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn counter_overflow_delta() {
// Arrange
let mut test_context = TestContext::new(Temporality::Delta);
let counter = test_context.u64_counter("test", "my_counter", None);

// Act
// Record measurements with A:0, A:1,.......A:1999, which just fits in the 2000 limit
for v in 0..2000 {
counter.add(100, &[KeyValue::new("A", v.to_string())]);
}

// All of the below will now go into overflow.
counter.add(100, &[KeyValue::new("A", "foo")]);
counter.add(100, &[KeyValue::new("A", "another")]);
counter.add(100, &[KeyValue::new("A", "yet_another")]);
test_context.flush_metrics();

let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);

// Expecting 2001 metric points. (2000 + 1 overflow)
assert_eq!(sum.data_points.len(), 2001);

let data_point =
find_datapoint_with_key_value(&sum.data_points, "otel.metric.overflow", "true")
.expect("overflow point expected");
assert_eq!(data_point.value, 300);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn counter_aggregation_cumulative() {
// Run this test with stdout enabled to see output.
Expand Down
5 changes: 5 additions & 0 deletions stress/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ name = "metrics"
path = "src/metrics.rs"
doc = false

[[bin]] # Bin to run the metrics overflow stress tests
name = "metrics_overflow"
path = "src/metrics_overflow.rs"
doc = false

[[bin]] # Bin to run the logs stress tests
name = "logs"
path = "src/logs.rs"
Expand Down
2 changes: 1 addition & 1 deletion stress/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
OS: Ubuntu 22.04.3 LTS (5.15.146.1-microsoft-standard-WSL2)
Hardware: AMD EPYC 7763 64-Core Processor - 2.44 GHz, 16vCPUs,
RAM: 64.0 GB
39 M/sec
53 M/sec
*/

use opentelemetry_appender_tracing::layer;
Expand Down
Loading

0 comments on commit 191684d

Please sign in to comment.