diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 90995c1d116a..6588f9332e2c 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1268,6 +1268,7 @@ dependencies = [ "parking_lot", "rand", "tempfile", + "tokio", "url", ] diff --git a/datafusion/execution/Cargo.toml b/datafusion/execution/Cargo.toml index fb2e7e914fe5..3aea88ed88af 100644 --- a/datafusion/execution/Cargo.toml +++ b/datafusion/execution/Cargo.toml @@ -48,4 +48,5 @@ object_store = { workspace = true } parking_lot = { workspace = true } rand = { workspace = true } tempfile = { workspace = true } +tokio = { workspace = true } url = { workspace = true } diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index cf5f72656859..4bb6616319e5 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -23,7 +23,10 @@ use log::debug; use parking_lot::Mutex; use std::{ num::NonZeroUsize, - sync::atomic::{AtomicU64, AtomicUsize, Ordering}, + sync::{ + atomic::{AtomicU64, AtomicUsize, Ordering}, + Arc, + }, }; /// A [`MemoryPool`] that enforces no limit @@ -262,7 +265,7 @@ fn insufficient_capacity_err( pub struct TrackConsumersPool { inner: I, top: NonZeroUsize, - tracked_consumers: Mutex>, + tracked_consumers: Arc>>, } impl TrackConsumersPool { @@ -271,10 +274,31 @@ impl TrackConsumersPool { /// The `top` determines how many Top K [`MemoryConsumer`]s to include /// in the reported [`DataFusionError::ResourcesExhausted`]. pub fn new(inner: I, top: NonZeroUsize) -> Self { + let tracked_consumers = Default::default(); + + let _captured: Arc< + parking_lot::lock_api::Mutex< + parking_lot::RawMutex, + HashMap, + >, + > = Arc::clone(&tracked_consumers); + #[allow(clippy::disallowed_methods)] + tokio::spawn(async move { + loop { + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + println!( + "REPORT: {}, {}", + Utc::now(), + Self::_report_top(5, Arc::clone(&_captured)) + ); + } + }); + Self { inner, top, - tracked_consumers: Default::default(), + tracked_consumers, } } @@ -282,17 +306,21 @@ impl TrackConsumersPool { /// which have the same name. /// /// This is very tied to the implementation of the memory consumer. - fn has_multiple_consumers(&self, name: &String) -> bool { + fn has_multiple_consumers( + name: &String, + tracked_consumers: &Arc>>, + ) -> bool { let consumer = MemoryConsumer::new(name); let consumer_with_spill = consumer.clone().with_can_spill(true); - let guard = self.tracked_consumers.lock(); + let guard = tracked_consumers.lock(); guard.contains_key(&consumer) && guard.contains_key(&consumer_with_spill) } - /// The top consumers in a report string. - pub fn report_top(&self, top: usize) -> String { - let mut consumers = self - .tracked_consumers + fn _report_top( + top: usize, + tracked_consumers: Arc>>, + ) -> String { + let mut consumers = tracked_consumers .lock() .iter() .map(|(consumer, reserved)| { @@ -308,7 +336,7 @@ impl TrackConsumersPool { consumers[0..std::cmp::min(top, consumers.len())] .iter() .map(|((name, can_spill), size)| { - if self.has_multiple_consumers(name) { + if Self::has_multiple_consumers(name, &tracked_consumers) { format!("{name}(can_spill={}) consumed {:?} bytes", can_spill, size) } else { format!("{name} consumed {:?} bytes", size) @@ -317,6 +345,11 @@ impl TrackConsumersPool { .collect::>() .join(", ") } + + /// The top consumers in a report string. + pub fn report_top(&self, top: usize) -> String { + Self::_report_top(top, Arc::clone(&self.tracked_consumers)) + } } impl MemoryPool for TrackConsumersPool { @@ -348,8 +381,6 @@ impl MemoryPool for TrackConsumersPool { .and_modify(|bytes| { bytes.fetch_add(additional as u64, Ordering::AcqRel); }); - - println!("REPORT: {}, {}", Utc::now(), self.report_top(5)); } fn shrink(&self, reservation: &MemoryReservation, shrink: usize) { @@ -384,7 +415,6 @@ impl MemoryPool for TrackConsumersPool { .and_modify(|bytes| { bytes.fetch_add(additional as u64, Ordering::AcqRel); }); - println!("REPORT: {}, {}", Utc::now(), self.report_top(5)); Ok(()) }