diff --git a/opentelemetry-sdk/src/metrics/periodic_reader.rs b/opentelemetry-sdk/src/metrics/periodic_reader.rs index 304b6736dd..ff96e037b9 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader.rs @@ -158,7 +158,7 @@ impl PeriodicReader { let reader = PeriodicReader { inner: Arc::new(PeriodicReaderInner { message_sender: Arc::new(message_sender), - is_shutdown: AtomicBool::new(false), + shutdown_invoked: AtomicBool::new(false), producer: Mutex::new(None), exporter: Arc::new(exporter), }), @@ -300,7 +300,7 @@ struct PeriodicReaderInner { exporter: Arc, message_sender: Arc>, producer: Mutex>>, - is_shutdown: AtomicBool, + shutdown_invoked: AtomicBool, } impl PeriodicReaderInner { @@ -314,10 +314,6 @@ impl PeriodicReaderInner { } fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()> { - if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { - return Err(MetricError::Other("reader is shut down".into())); - } - let producer = self.producer.lock().expect("lock poisoned"); if let Some(p) = producer.as_ref() { p.upgrade() @@ -378,9 +374,28 @@ impl PeriodicReaderInner { } fn force_flush(&self) -> MetricResult<()> { - if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { - return Err(MetricError::Other("reader is shut down".into())); + if self + .shutdown_invoked + .load(std::sync::atomic::Ordering::Relaxed) + { + return Err(MetricError::Other( + "Cannot perform flush as PeriodicReader shutdown already invoked.".into(), + )); } + + // TODO: Better message for this scenario. + // Flush and Shutdown called from 2 threads Flush check shutdown + // flag before shutdown thread sets it. Both threads attempt to send + // message to the same channel. Case1: Flush thread sends message first, + // shutdown thread sends message next. Flush would succeed, as + // background thread won't process shutdown message until flush + // triggered export is done. Case2: Shutdown thread sends message first, + // flush thread sends message next. Shutdown would succeed, as + // background thread would process shutdown message first. The + // background exits so it won't receive the flush message. ForceFlush + // returns Failure, but we could indicate specifically that shutdown has + // completed. TODO is to see if this message can be improved. + let (response_tx, response_rx) = mpsc::channel(); self.message_sender .send(Message::Flush(response_tx)) @@ -399,8 +414,13 @@ impl PeriodicReaderInner { } fn shutdown(&self) -> MetricResult<()> { - if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { - return Err(MetricError::Other("Reader is already shut down".into())); + if self + .shutdown_invoked + .swap(true, std::sync::atomic::Ordering::Relaxed) + { + return Err(MetricError::Other( + "PeriodicReader shutdown already invoked.".into(), + )); } // TODO: See if this is better to be created upfront. @@ -410,16 +430,12 @@ impl PeriodicReaderInner { .map_err(|e| MetricError::Other(e.to_string()))?; if let Ok(response) = response_rx.recv() { - self.is_shutdown - .store(true, std::sync::atomic::Ordering::Relaxed); if response { Ok(()) } else { Err(MetricError::Other("Failed to shutdown".into())) } } else { - self.is_shutdown - .store(true, std::sync::atomic::Ordering::Relaxed); Err(MetricError::Other("Failed to shutdown".into())) } } @@ -697,6 +713,7 @@ mod tests { collection_triggered_by_interval_helper(); collection_triggered_by_flush_helper(); collection_triggered_by_shutdown_helper(); + collection_triggered_by_drop_helper(); } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] @@ -704,6 +721,7 @@ mod tests { collection_triggered_by_interval_helper(); collection_triggered_by_flush_helper(); collection_triggered_by_shutdown_helper(); + collection_triggered_by_drop_helper(); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -711,6 +729,7 @@ mod tests { collection_triggered_by_interval_helper(); collection_triggered_by_flush_helper(); collection_triggered_by_shutdown_helper(); + collection_triggered_by_drop_helper(); } #[tokio::test(flavor = "current_thread")] @@ -718,6 +737,7 @@ mod tests { collection_triggered_by_interval_helper(); collection_triggered_by_flush_helper(); collection_triggered_by_shutdown_helper(); + collection_triggered_by_drop_helper(); } fn collection_triggered_by_interval_helper() { @@ -742,7 +762,13 @@ mod tests { }); } - fn collection_helper(trigger: fn(&SdkMeterProvider)) { + fn collection_triggered_by_drop_helper() { + collection_helper(|meter_provider| { + drop(meter_provider); + }); + } + + fn collection_helper(trigger: fn(SdkMeterProvider)) { // Arrange let interval = std::time::Duration::from_millis(10); let exporter = InMemoryMetricExporter::default(); @@ -762,7 +788,7 @@ mod tests { .build(); // Act - trigger(&meter_provider); + trigger(meter_provider); // Assert receiver