From eca6039d002072441c050f6efbe07119887715c8 Mon Sep 17 00:00:00 2001 From: Guillaume Binet Date: Mon, 20 Jan 2025 16:45:08 -0600 Subject: [PATCH] Monitoring global for Pools + API enhancements. (#230) * Monitoring global for Pools + API enhancements. * Another layer of API improvements. * small snafu * Typo :fp: --- Cargo.toml | 3 + components/sources/cu_v4l/src/lib.rs | 12 +- components/sources/cu_v4l/src/v4lstream.rs | 34 ++- core/cu29_log_runtime/Cargo.toml | 2 +- core/cu29_runtime/Cargo.toml | 1 + core/cu29_runtime/src/pool.rs | 298 +++++++++++++-------- 6 files changed, 237 insertions(+), 113 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9dba6436a..c9fa5b02d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -124,5 +124,8 @@ tempfile = "3.14.0" # rerun rerun = "0.21.0" +# smallvec to avoid heap allocations +smallvec = { version = "1.13.2", features = ["serde"] } + # [profile.release] # lto = true diff --git a/components/sources/cu_v4l/src/lib.rs b/components/sources/cu_v4l/src/lib.rs index 84bc1ea0c..eae0e1d9b 100644 --- a/components/sources/cu_v4l/src/lib.rs +++ b/components/sources/cu_v4l/src/lib.rs @@ -186,8 +186,18 @@ mod linux_impl { let mut stream = CuV4LStream::with_buffers( &dev, Type::VideoCapture, - actual_fmt.size as usize, req_buffers, + CuHostMemoryPool::new( + format!("V4L Host Pool {}", v4l_device).as_str(), + req_buffers as usize + 1, + || vec![0; actual_fmt.size as usize], + ) + .map_err(|e| { + CuError::new_with_cause( + "Could not create host memory pool backing the V4lStream", + e, + ) + })?, ) .map_err(|e| CuError::new_with_cause("Could not create the V4lStream", e))?; debug!("V4L: Set timeout to {} ms", req_timeout.as_millis() as u64); diff --git a/components/sources/cu_v4l/src/v4lstream.rs b/components/sources/cu_v4l/src/v4lstream.rs index 7213a9d40..123f4801d 100644 --- a/components/sources/cu_v4l/src/v4lstream.rs +++ b/components/sources/cu_v4l/src/v4lstream.rs @@ -14,7 +14,7 @@ use v4l::{v4l2, Device}; pub struct CuV4LStream { v4l_handle: Arc, v4l_buf_type: Type, - memory_pool: CuHostMemoryPool>, + pool: Arc>>, // Arena matching the vl42 metadata and the Copper Buffers arena: Vec<(Metadata, Option>>)>, arena_last_freed_up_index: usize, @@ -22,25 +22,47 @@ pub struct CuV4LStream { active: bool, } +use std::fs; +use std::os::fd::RawFd; +use std::path::PathBuf; + +fn get_original_dev_path(fd: RawFd) -> Option { + let link_path = format!("/proc/self/fd/{}", fd); + + if let Ok(path) = fs::read_link(link_path) { + if path.to_string_lossy().starts_with("/dev/video") { + return Some(path); + } + } + None +} + impl CuV4LStream { #[allow(dead_code)] pub fn new(dev: &Device, buf_size: usize, buf_type: Type) -> io::Result { - CuV4LStream::with_buffers(dev, buf_type, buf_size, 4) + let original_path = get_original_dev_path(dev.handle().fd()).unwrap(); + let pool = CuHostMemoryPool::new( + format!("V4L Host Pool {}", original_path.display()).as_str(), + 4, + || vec![0; buf_size], + ) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + + CuV4LStream::with_buffers(dev, buf_type, 4, pool) } pub fn with_buffers( dev: &Device, buf_type: Type, - buf_size: usize, buf_count: u32, + pool: Arc>>, ) -> io::Result { - let memory_pool = CuHostMemoryPool::new(buf_count as usize + 1, || vec![0; buf_size]); // +1 to be able to queue one last buffer before zapping the first let mut arena = Vec::new(); arena.resize(buf_count as usize, (Metadata::default(), None)); let mut result = CuV4LStream { v4l_handle: dev.handle(), - memory_pool, + pool, arena, arena_last_freed_up_index: 
0, v4l_buf_type: buf_type, @@ -191,7 +213,7 @@ impl Stream for CuV4LStream { impl CaptureStream<'_> for CuV4LStream { fn queue(&mut self, index: usize) -> io::Result<()> { - let buffer_handle = self.memory_pool.acquire().unwrap(); + let buffer_handle = self.pool.acquire().unwrap(); self.arena[index] = (Metadata::default(), Some(buffer_handle.clone())); let mut v4l2_buf = buffer_handle.with_inner_mut(|inner| { let destination: &mut [u8] = inner; diff --git a/core/cu29_log_runtime/Cargo.toml b/core/cu29_log_runtime/Cargo.toml index c140699e3..950f18607 100644 --- a/core/cu29_log_runtime/Cargo.toml +++ b/core/cu29_log_runtime/Cargo.toml @@ -19,7 +19,7 @@ cu29-log = { workspace = true } cu29-traits = { workspace = true } cu29-clock = { workspace = true } bincode = { workspace = true } -smallvec = "1.13.2" +smallvec = { workspace = true } log = "0.4.22" [dev-dependencies] diff --git a/core/cu29_runtime/Cargo.toml b/core/cu29_runtime/Cargo.toml index 53a9f25b4..aeb9f4dc7 100644 --- a/core/cu29_runtime/Cargo.toml +++ b/core/cu29_runtime/Cargo.toml @@ -34,6 +34,7 @@ cu29-clock = { workspace = true } clap = { workspace = true } tempfile = { workspace = true } arrayvec = "0.7.6" +smallvec = { workspace = true } ron = "0.8.1" hdrhistogram = "7.5.4" petgraph = { version = "0.7.1", features = ["serde", "serde-1", "serde_derive"] } diff --git a/core/cu29_runtime/src/pool.rs b/core/cu29_runtime/src/pool.rs index b611f5e95..d345b98d2 100644 --- a/core/cu29_runtime/src/pool.rs +++ b/core/cu29_runtime/src/pool.rs @@ -1,12 +1,63 @@ +use arrayvec::ArrayString; use bincode::de::Decoder; use bincode::enc::Encoder; use bincode::error::{DecodeError, EncodeError}; use bincode::{Decode, Encode}; +use cu29_traits::CuResult; use object_pool::{Pool, ReusableOwned}; +use smallvec::SmallVec; use std::alloc::{alloc, dealloc, Layout}; +use std::collections::HashMap; use std::fmt::Debug; use std::ops::{Deref, DerefMut}; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, OnceLock}; + +type PoolID = ArrayString<64>; + +/// Trait for a Pool to exposed to be monitored by the monitoring API. +pub trait PoolMonitor: Send + Sync { + /// A unique and descriptive identifier for the pool. + fn id(&self) -> PoolID; + + /// Number of buffer slots left in the pool. + fn space_left(&self) -> usize; + + /// Total size of the pool in number of buffers. + fn total_size(&self) -> usize; + + /// Size of one buffer + fn buffer_size(&self) -> usize; +} + +static POOL_REGISTRY: OnceLock>>> = OnceLock::new(); +const MAX_POOLS: usize = 16; + +// Register a pool to the global registry. +fn register_pool(pool: Arc) { + POOL_REGISTRY + .get_or_init(|| Mutex::new(HashMap::new())) + .lock() + .unwrap() + .insert(pool.id().to_string(), pool); +} + +type PoolStats = (PoolID, usize, usize, usize); + +/// Get the list of pools and their statistics. +/// We use SmallVec here to avoid heap allocations while the stack is running. +pub fn pools_statistics() -> SmallVec<[PoolStats; MAX_POOLS]> { + let registry = POOL_REGISTRY.get().unwrap().lock().unwrap(); + let mut result = SmallVec::with_capacity(MAX_POOLS); + for pool in registry.values() { + result.push(( + pool.id(), + pool.space_left(), + pool.total_size(), + pool.buffer_size(), + )); + } + result +} /// Basic Type that can be used in a buffer in a CuPool. pub trait ElementType: @@ -20,10 +71,13 @@ impl ElementType for T where { } -pub trait ArrayLike: Deref + DerefMut + Debug { +pub trait ArrayLike: Deref + DerefMut + Debug + Sync + Send { type Element: ElementType; } +/// A Handle to a Buffer. 
+/// For onboard usages, the buffer should be Pooled (ie, coming from a preallocated pool). +/// The Detached version is for offline usages where we don't really need a pool to deserialize them. pub enum CuHandleInner { Pooled(ReusableOwned), Detached(T), // Should only be used in offline cases (e.g. deserialization) @@ -120,38 +174,76 @@ impl Decode for CuHandle> { } } -pub trait CuPool { +/// A CuPool is a pool of buffers that can be shared between different parts of the code. +/// Handles can be stored locally in the tasks and shared between them. +pub trait CuPool: PoolMonitor { + /// Acquire a buffer from the pool. fn acquire(&self) -> Option>; - fn space_left(&self) -> usize; + + /// Copy data from a handle to a new handle from the pool. fn copy_from(&self, from: &mut CuHandle) -> CuHandle where O: ArrayLike; } -pub trait DeviceCuPool { - type O: ArrayLike; - +/// A device memory pool can copy data from a device to a host memory pool on top. +pub trait DeviceCuPool: CuPool { /// Takes a handle to a device buffer and copies it into a host buffer pool. /// It returns a new handle from the host pool with the data from the device handle given. - fn copy_into( + fn copy_to_host_pool( &self, - from_device_handle: &CuHandle, - into_cu_host_memory_pool: &mut CuHostMemoryPool, - ) -> CuHandle; + from_device_handle: &CuHandle, + to_host_handle: &mut CuHandle, + ) -> CuResult<()> + where + O: ArrayLike; } +/// A pool of host memory buffers. pub struct CuHostMemoryPool { + /// Underlying pool of host buffers. + // Being an Arc is a requirement of try_pull_owned() so buffers can refer back to the pool. + id: PoolID, pool: Arc>, + size: usize, + buffer_size: usize, } -impl CuHostMemoryPool { - pub fn new(size: usize, f: F) -> Self +impl CuHostMemoryPool { + pub fn new(id: &str, size: usize, buffer_initializer: F) -> CuResult> where F: Fn() -> T, { - Self { - pool: Arc::new(Pool::new(size, f)), - } + let pool = Arc::new(Pool::new(size, buffer_initializer)); + let buffer_size = pool.try_pull().unwrap().len() * size_of::(); + + let og = Self { + id: PoolID::from(id).map_err(|_| "Failed to create PoolID")?, + pool, + size, + buffer_size, + }; + let og = Arc::new(og); + register_pool(og.clone()); + Ok(og) + } +} + +impl PoolMonitor for CuHostMemoryPool { + fn id(&self) -> PoolID { + self.id + } + + fn space_left(&self) -> usize { + self.pool.len() + } + + fn total_size(&self) -> usize { + self.size + } + + fn buffer_size(&self) -> usize { + self.buffer_size } } @@ -162,10 +254,6 @@ impl CuPool for CuHostMemoryPool { owned_object.map(|reusable| CuHandle(Arc::new(Mutex::new(CuHandleInner::Pooled(reusable))))) } - fn space_left(&self) -> usize { - self.pool.len() - } - fn copy_from>(&self, from: &mut CuHandle) -> CuHandle { let to_handle = self.acquire().expect("No available buffers in the pool"); @@ -191,20 +279,6 @@ impl CuPool for CuHostMemoryPool { } } -impl CuHostMemoryPool -where - T: ArrayLike, -{ - pub fn new_buffers(size: usize, f: F) -> Self - where - F: Fn() -> T, - { - Self { - pool: Arc::new(Pool::new(size, f)), - } - } -} - impl ArrayLike for Vec { type Element = E; } @@ -212,6 +286,7 @@ impl ArrayLike for Vec { #[cfg(all(feature = "cuda", not(target_os = "macos")))] mod cuda { use super::*; + use cu29_traits::CuError; use cudarc::driver::{CudaDevice, CudaSlice, DeviceRepr, ValidAsZeroBits}; use std::sync::Arc; @@ -253,57 +328,76 @@ mod cuda { } } + /// A pool of CUDA memory buffers. 
pub struct CuCudaPool where E: ElementType + ValidAsZeroBits + DeviceRepr + Unpin, { + id: PoolID, device: Arc, pool: Arc>>, + nb_buffers: usize, + nb_element_per_buffer: usize, } impl CuCudaPool { #[allow(dead_code)] pub fn new( + id: &'static str, device: Arc, nb_buffers: usize, nb_element_per_buffer: usize, - ) -> Self { - Self { + ) -> CuResult { + let pool = (0..nb_buffers) + .map(|_| { + device + .alloc_zeros(nb_element_per_buffer) + .map(CudaSliceWrapper) + .map_err(|_| "Failed to allocate device memory") + }) + .collect::, _>>()?; + + Ok(Self { + id: PoolID::from(id).map_err(|_| "Failed to create PoolID")?, device: device.clone(), - pool: Arc::new(Pool::new(nb_buffers, || { - CudaSliceWrapper( - device - .alloc_zeros(nb_element_per_buffer) - .expect("Failed to allocate device memory"), - ) - })), - } + pool: Arc::new(Pool::from_vec(pool)), + nb_buffers, + nb_element_per_buffer, + }) } + } - #[allow(dead_code)] - pub fn new_with(device: Arc, nb_buffers: usize, f: F) -> Self - where - F: Fn() -> CudaSliceWrapper, - { - Self { - device: device.clone(), - pool: Arc::new(Pool::new(nb_buffers, f)), - } + impl PoolMonitor for CuCudaPool + where + E: DeviceRepr + ElementType + ValidAsZeroBits, + { + fn id(&self) -> PoolID { + self.id + } + + fn space_left(&self) -> usize { + self.pool.len() + } + + fn total_size(&self) -> usize { + self.nb_buffers + } + + fn buffer_size(&self) -> usize { + self.nb_element_per_buffer * size_of::() } } - impl CuPool> for CuCudaPool { + impl CuPool> for CuCudaPool + where + E: DeviceRepr + ElementType + ValidAsZeroBits, + { fn acquire(&self) -> Option>> { self.pool .try_pull_owned() .map(|x| CuHandle(Arc::new(Mutex::new(CuHandleInner::Pooled(x))))) } - fn space_left(&self) -> usize { - self.pool.len() - } - - /// Copy from host to device fn copy_from(&self, from_handle: &mut CuHandle) -> CuHandle> where O: ArrayLike, @@ -340,55 +434,46 @@ mod cuda { } } - impl DeviceCuPool for CuCudaPool + impl DeviceCuPool> for CuCudaPool where E: ElementType + ValidAsZeroBits + DeviceRepr, - T: ArrayLike, { - type O = CudaSliceWrapper; - /// Copy from device to host - fn copy_into( + fn copy_to_host_pool( &self, - device_handle: &CuHandle, - cu_host_memory_pool: &mut CuHostMemoryPool, - ) -> CuHandle { - let destination_handle = cu_host_memory_pool - .acquire() - .expect("No available buffers in the pool"); - + device_handle: &CuHandle>, + host_handle: &mut CuHandle, + ) -> Result<(), CuError> + where + O: ArrayLike, + { match device_handle.lock().unwrap().deref() { - CuHandleInner::Pooled(source) => { - match destination_handle.lock().unwrap().deref_mut() { - CuHandleInner::Pooled(ref mut destination) => { - self.device - .dtoh_sync_copy_into(source.as_cuda_slice(), destination) - .expect("Failed to copy data to device"); - } - CuHandleInner::Detached(ref mut destination) => { - self.device - .dtoh_sync_copy_into(source.as_cuda_slice(), destination) - .expect("Failed to copy data to device"); - } + CuHandleInner::Pooled(source) => match host_handle.lock().unwrap().deref_mut() { + CuHandleInner::Pooled(ref mut destination) => { + self.device + .dtoh_sync_copy_into(source.as_cuda_slice(), destination) + .expect("Failed to copy data to device"); } - } - CuHandleInner::Detached(source) => { - match destination_handle.lock().unwrap().deref_mut() { - CuHandleInner::Pooled(ref mut destination) => { - self.device - .dtoh_sync_copy_into(source.as_cuda_slice(), destination) - .expect("Failed to copy data to device"); - } - CuHandleInner::Detached(ref mut destination) => { - 
self.device - .dtoh_sync_copy_into(source.as_cuda_slice(), destination) - .expect("Failed to copy data to device"); - } + CuHandleInner::Detached(ref mut destination) => { + self.device + .dtoh_sync_copy_into(source.as_cuda_slice(), destination) + .expect("Failed to copy data to device"); } - } + }, + CuHandleInner::Detached(source) => match host_handle.lock().unwrap().deref_mut() { + CuHandleInner::Pooled(ref mut destination) => { + self.device + .dtoh_sync_copy_into(source.as_cuda_slice(), destination) + .expect("Failed to copy data to device"); + } + CuHandleInner::Detached(ref mut destination) => { + self.device + .dtoh_sync_copy_into(source.as_cuda_slice(), destination) + .expect("Failed to copy data to device"); + } + }, } - - destination_handle + Ok(()) } } } @@ -452,7 +537,8 @@ mod tests { let objs = RefCell::new(vec![vec![1], vec![2], vec![3]]); let holding = objs.borrow().clone(); let objs_as_slices = holding.iter().map(|x| x.as_slice()).collect::>(); - let pool = CuHostMemoryPool::new(3, || objs.borrow_mut().pop().unwrap()); + let pool = CuHostMemoryPool::new("mytestcudapool", 3, || objs.borrow_mut().pop().unwrap()) + .unwrap(); let obj1 = pool.acquire().unwrap(); { @@ -481,7 +567,7 @@ mod tests { fn test_cuda_pool() { use cudarc::driver::CudaDevice; let device = CudaDevice::new(0).unwrap(); - let pool = CuCudaPool::::new(device, 3, 1); + let pool = CuCudaPool::::new("mytestcudapool", device, 3, 1).unwrap(); let _obj1 = pool.acquire().unwrap(); @@ -508,9 +594,8 @@ mod tests { fn test_copy_roundtrip() { use cudarc::driver::CudaDevice; let device = CudaDevice::new(0).unwrap(); - let mut host_pool = CuHostMemoryPool::new(3, || vec![0.0; 1]); - - let cuda_pool = CuCudaPool::::new(device, 3, 1); + let host_pool = CuHostMemoryPool::new("mytesthostpool", 3, || vec![0.0; 1]).unwrap(); + let cuda_pool = CuCudaPool::::new("mytestcudapool", device, 3, 1).unwrap(); let cuda_handle = { let mut initial_handle = host_pool.acquire().unwrap(); @@ -528,7 +613,10 @@ mod tests { }; // get it back to the host - let final_handle = cuda_pool.copy_into(&cuda_handle, &mut host_pool); + let mut final_handle = host_pool.acquire().unwrap(); + cuda_pool + .copy_to_host_pool(&cuda_handle, &mut final_handle) + .unwrap(); let value = final_handle.lock().unwrap().deref().deref()[0]; assert_eq!(value, 42.0);
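
The patch above adds a global pool registry plus a `pools_statistics()` accessor and changes pool constructors to take an id and return an `Arc`. Below is a minimal usage sketch, not part of the patch: it assumes the `cu29_runtime::pool` module path implied by the diff, and the pool name and buffer sizes are illustrative only.

```rust
// Sketch only: exercising the pool-monitoring API added by this patch.
use cu29_runtime::pool::{pools_statistics, CuHostMemoryPool, CuPool};

fn main() {
    // Pools now take a descriptive id and come back as an Arc, so they can
    // register themselves with the global registry on creation.
    let pool = CuHostMemoryPool::new("example host pool", 4, || vec![0u8; 1024])
        .expect("failed to create the host pool");

    // Holding on to a buffer lowers the `space_left` reported for this pool.
    let _buf = pool.acquire().expect("pool should have a free buffer");

    // Each entry is a PoolStats tuple: (id, space_left, total_size, buffer_size).
    for (id, space_left, total_size, buffer_size) in pools_statistics() {
        println!("{id}: {space_left}/{total_size} buffers free, {buffer_size} bytes each");
    }
}
```

Note that `pools_statistics()` unwraps the `POOL_REGISTRY` `OnceLock`, so at least one pool must have been created (and therefore registered) before calling it, as in the sketch above.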