Skip to content

Commit

Permalink
Improve shutdown handling in PeriodicReader (#2422)
Browse files Browse the repository at this point in the history
  • Loading branch information
cijothomas authored Dec 13, 2024
1 parent 9b0ccce commit 15d69b1
Showing 1 changed file with 42 additions and 16 deletions.
58 changes: 42 additions & 16 deletions opentelemetry-sdk/src/metrics/periodic_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl PeriodicReader {
let reader = PeriodicReader {
inner: Arc::new(PeriodicReaderInner {
message_sender: Arc::new(message_sender),
is_shutdown: AtomicBool::new(false),
shutdown_invoked: AtomicBool::new(false),
producer: Mutex::new(None),
exporter: Arc::new(exporter),
}),
Expand Down Expand Up @@ -300,7 +300,7 @@ struct PeriodicReaderInner {
exporter: Arc<dyn PushMetricExporter>,
message_sender: Arc<mpsc::Sender<Message>>,
producer: Mutex<Option<Weak<dyn SdkProducer>>>,
is_shutdown: AtomicBool,
shutdown_invoked: AtomicBool,
}

impl PeriodicReaderInner {
Expand All @@ -314,10 +314,6 @@ impl PeriodicReaderInner {
}

fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()> {
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
return Err(MetricError::Other("reader is shut down".into()));
}

let producer = self.producer.lock().expect("lock poisoned");
if let Some(p) = producer.as_ref() {
p.upgrade()
Expand Down Expand Up @@ -378,9 +374,28 @@ impl PeriodicReaderInner {
}

fn force_flush(&self) -> MetricResult<()> {
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
return Err(MetricError::Other("reader is shut down".into()));
if self
.shutdown_invoked
.load(std::sync::atomic::Ordering::Relaxed)
{
return Err(MetricError::Other(
"Cannot perform flush as PeriodicReader shutdown already invoked.".into(),
));
}

// TODO: Better message for this scenario.
// Flush and Shutdown called from 2 threads Flush check shutdown
// flag before shutdown thread sets it. Both threads attempt to send
// message to the same channel. Case1: Flush thread sends message first,
// shutdown thread sends message next. Flush would succeed, as
// background thread won't process shutdown message until flush
// triggered export is done. Case2: Shutdown thread sends message first,
// flush thread sends message next. Shutdown would succeed, as
// background thread would process shutdown message first. The
// background exits so it won't receive the flush message. ForceFlush
// returns Failure, but we could indicate specifically that shutdown has
// completed. TODO is to see if this message can be improved.

let (response_tx, response_rx) = mpsc::channel();
self.message_sender
.send(Message::Flush(response_tx))
Expand All @@ -399,8 +414,13 @@ impl PeriodicReaderInner {
}

fn shutdown(&self) -> MetricResult<()> {
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
return Err(MetricError::Other("Reader is already shut down".into()));
if self
.shutdown_invoked
.swap(true, std::sync::atomic::Ordering::Relaxed)
{
return Err(MetricError::Other(
"PeriodicReader shutdown already invoked.".into(),
));
}

// TODO: See if this is better to be created upfront.
Expand All @@ -410,16 +430,12 @@ impl PeriodicReaderInner {
.map_err(|e| MetricError::Other(e.to_string()))?;

if let Ok(response) = response_rx.recv() {
self.is_shutdown
.store(true, std::sync::atomic::Ordering::Relaxed);
if response {
Ok(())
} else {
Err(MetricError::Other("Failed to shutdown".into()))
}
} else {
self.is_shutdown
.store(true, std::sync::atomic::Ordering::Relaxed);
Err(MetricError::Other("Failed to shutdown".into()))
}
}
Expand Down Expand Up @@ -697,27 +713,31 @@ mod tests {
collection_triggered_by_interval_helper();
collection_triggered_by_flush_helper();
collection_triggered_by_shutdown_helper();
collection_triggered_by_drop_helper();
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn collection_from_tokio_multi_with_one_worker() {
collection_triggered_by_interval_helper();
collection_triggered_by_flush_helper();
collection_triggered_by_shutdown_helper();
collection_triggered_by_drop_helper();
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn collection_from_tokio_with_two_worker() {
collection_triggered_by_interval_helper();
collection_triggered_by_flush_helper();
collection_triggered_by_shutdown_helper();
collection_triggered_by_drop_helper();
}

#[tokio::test(flavor = "current_thread")]
async fn collection_from_tokio_current() {
collection_triggered_by_interval_helper();
collection_triggered_by_flush_helper();
collection_triggered_by_shutdown_helper();
collection_triggered_by_drop_helper();
}

fn collection_triggered_by_interval_helper() {
Expand All @@ -742,7 +762,13 @@ mod tests {
});
}

fn collection_helper(trigger: fn(&SdkMeterProvider)) {
fn collection_triggered_by_drop_helper() {
collection_helper(|meter_provider| {
drop(meter_provider);
});
}

fn collection_helper(trigger: fn(SdkMeterProvider)) {
// Arrange
let interval = std::time::Duration::from_millis(10);
let exporter = InMemoryMetricExporter::default();
Expand All @@ -762,7 +788,7 @@ mod tests {
.build();

// Act
trigger(&meter_provider);
trigger(meter_provider);

// Assert
receiver
Expand Down

0 comments on commit 15d69b1

Please sign in to comment.