Another layer of API improvements.
gbin committed Jan 20, 2025
1 parent 0a21f06 commit 4f701ea
Showing 6 changed files with 61 additions and 34 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
@@ -124,5 +124,8 @@ tempfile = "3.14.0"
# rerun
rerun = "0.21.0"

# smallvec to avoid heap allocations
smallvec = { version = "1.13.2", features = ["serde"] }

# [profile.release]
# lto = true
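
The new workspace dependency is the rationale for the pool.rs change further down: a SmallVec keeps its elements inline on the stack up to a fixed capacity and only touches the heap once it spills past that capacity. A minimal standalone sketch of that behavior (assuming smallvec 1.x; the values here are illustrative):

use smallvec::SmallVec;

fn main() {
    // Inline capacity of 4: the first four pushes stay on the stack.
    let mut v: SmallVec<[u32; 4]> = SmallVec::new();
    for i in 0..4 {
        v.push(i);
    }
    assert!(!v.spilled()); // no heap allocation so far

    // A fifth element exceeds the inline capacity and spills to the heap.
    v.push(4);
    assert!(v.spilled());
}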
8 changes: 7 additions & 1 deletion components/sources/cu_v4l/src/lib.rs
@@ -191,7 +191,13 @@ mod linux_impl {
format!("V4L Host Pool {}", v4l_device).as_str(),
req_buffers as usize + 1,
|| vec![0; actual_fmt.size as usize],
),
)
.map_err(|e| {
CuError::new_with_cause(
"Could not create host memory pool backing the V4lStream",
e,
)
})?,
)
.map_err(|e| CuError::new_with_cause("Could not create the V4lStream", e))?;
debug!("V4L: Set timeout to {} ms", req_timeout.as_millis() as u64);
3 changes: 2 additions & 1 deletion components/sources/cu_v4l/src/v4lstream.rs
@@ -45,7 +45,8 @@ impl CuV4LStream {
format!("V4L Host Pool {}", original_path.display()).as_str(),
4,
|| vec![0; buf_size],
);
)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;

CuV4LStream::with_buffers(dev, buf_type, 4, pool)
}
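
For context, the mapping into io::Error in this hunk works because std::io::Error::new accepts any error type that can be boxed as dyn Error + Send + Sync. A self-contained sketch of the same pattern (PoolError is a hypothetical stand-in for the error produced by the pool constructor):

use std::io;

// Hypothetical error type standing in for the pool's error.
#[derive(Debug)]
struct PoolError(String);

impl std::fmt::Display for PoolError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "pool error: {}", self.0)
    }
}

impl std::error::Error for PoolError {}

fn make_pool() -> Result<(), PoolError> {
    Err(PoolError("out of buffers".into()))
}

fn open_stream() -> io::Result<()> {
    // Same pattern as the diff: any Error + Send + Sync type can be
    // boxed into an io::Error under ErrorKind::Other.
    make_pool().map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
    Ok(())
}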
2 changes: 1 addition & 1 deletion core/cu29_log_runtime/Cargo.toml
@@ -19,7 +19,7 @@ cu29-log = { workspace = true }
cu29-traits = { workspace = true }
cu29-clock = { workspace = true }
bincode = { workspace = true }
smallvec = "1.13.2"
smallvec = { workspace = true }
log = "0.4.22"

[dev-dependencies]
1 change: 1 addition & 0 deletions core/cu29_runtime/Cargo.toml
@@ -34,6 +34,7 @@ cu29-clock = { workspace = true }
clap = { workspace = true }
tempfile = { workspace = true }
arrayvec = "0.7.6"
smallvec = { workspace = true }
ron = "0.8.1"
hdrhistogram = "7.5.4"
petgraph = { version = "0.7.1", features = ["serde", "serde-1", "serde_derive"] }
78 changes: 47 additions & 31 deletions core/cu29_runtime/src/pool.rs
@@ -1,19 +1,23 @@
use arrayvec::ArrayString;
use bincode::de::Decoder;
use bincode::enc::Encoder;
use bincode::error::{DecodeError, EncodeError};
use bincode::{Decode, Encode};
use cu29_traits::CuResult;
use object_pool::{Pool, ReusableOwned};
use smallvec::SmallVec;
use std::alloc::{alloc, dealloc, Layout};
use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Mutex, OnceLock};

type PoolID = ArrayString<64>;

/// Trait exposing a Pool to the monitoring API.
pub trait PoolMonitor: Send + Sync {
/// A unique and descriptive identifier for the pool.
fn id(&self) -> &str;
fn id(&self) -> PoolID;

/// Number of buffer slots left in the pool.
fn space_left(&self) -> usize;
@@ -26,25 +30,33 @@ pub trait PoolMonitor: Send + Sync {
}

static POOL_REGISTRY: OnceLock<Mutex<HashMap<String, Arc<dyn PoolMonitor>>>> = OnceLock::new();
const MAX_POOLS: usize = 16;

pub fn register_pool(pool: Arc<dyn PoolMonitor>) {
// Register a pool with the global registry.
fn register_pool(pool: Arc<dyn PoolMonitor>) {
POOL_REGISTRY
.get_or_init(|| Mutex::new(HashMap::new()))
.lock()
.unwrap()
.insert(pool.id().to_string(), pool);
}

pub fn list_pools() {
type PoolStats = (PoolID, usize, usize, usize);

/// Get the list of pools and their statistics.
/// We use SmallVec here to avoid heap allocations while the stack is running.
pub fn pools_statistics() -> SmallVec<[PoolStats; MAX_POOLS]> {
let registry = POOL_REGISTRY.get().unwrap().lock().unwrap();
let mut result = SmallVec::with_capacity(MAX_POOLS);
for pool in registry.values() {
println!(
"Pool ID: {}, Space Left: {}, Total Size: {}",
result.push((
pool.id(),
pool.space_left(),
pool.total_size()
);
pool.total_size(),
pool.buffer_size(),
));
}
result
}

/// Basic Type that can be used in a buffer in a CuPool.
@@ -191,35 +203,35 @@ pub trait DeviceCuPool<T: ArrayLike>: CuPool<T> {
pub struct CuHostMemoryPool<T> {
/// Underlying pool of host buffers.
// Being an Arc is a requirement of try_pull_owned() so buffers can refer back to the pool.
id: String,
id: PoolID,
pool: Arc<Pool<T>>,
size: usize,
buffer_size: usize,
}

impl<T: ArrayLike + 'static> CuHostMemoryPool<T> {
pub fn new<F>(id: &str, size: usize, buffer_initializer: F) -> Arc<Self>
pub fn new<F>(id: &str, size: usize, buffer_initializer: F) -> CuResult<Arc<Self>>
where
F: Fn() -> T,
{
let pool = Arc::new(Pool::new(size, buffer_initializer));
let buffer_size = pool.try_pull().unwrap().len() * size_of::<T::Element>();

let og = Self {
id: id.to_string(),
id: PoolID::from(id).map_err(|_| "Failed to create PoolID")?,
pool,
size,
buffer_size,
};
let og = Arc::new(og);
register_pool(og.clone());
og
Ok(og)
}
}

impl<T: ArrayLike> PoolMonitor for CuHostMemoryPool<T> {
fn id(&self) -> &str {
self.id.as_str()
fn id(&self) -> PoolID {
self.id
}

fn space_left(&self) -> usize {
@@ -321,7 +333,7 @@ mod cuda {
where
E: ElementType + ValidAsZeroBits + DeviceRepr + Unpin,
{
id: &'static str,
id: PoolID,
device: Arc<CudaDevice>,
pool: Arc<Pool<CudaSliceWrapper<E>>>,
nb_buffers: usize,
@@ -335,29 +347,32 @@ mod cuda {
device: Arc<CudaDevice>,
nb_buffers: usize,
nb_element_per_buffer: usize,
) -> Self {
Self {
id,
) -> CuResult<Self> {
let pool = (0..nb_buffers)
.map(|_| {
device
.alloc_zeros(nb_element_per_buffer)
.map(CudaSliceWrapper)
.map_err(|_| "Failed to allocate device memory")
})
.collect::<Result<Vec<_>, _>>()?;

Ok(Self {
id: PoolID::from(id).map_err(|_| "Failed to create PoolID")?,
device: device.clone(),
pool: Arc::new(Pool::new(nb_buffers, || {
CudaSliceWrapper(
device
.alloc_zeros(nb_element_per_buffer)
.expect("Failed to allocate device memory"),
)
})),
pool: Arc::new(Pool::from_vec(pool)),
nb_buffers,
nb_element_per_buffer,
}
})
}
}

impl<E> PoolMonitor for CuCudaPool<E>
where
E: DeviceRepr + ElementType + ValidAsZeroBits,
{
fn id(&self) -> &'static str {
self.id
fn id(&self) -> PoolID {
self.id.clone()
}

fn space_left(&self) -> usize {
@@ -522,7 +537,8 @@ mod tests {
let objs = RefCell::new(vec![vec![1], vec![2], vec![3]]);
let holding = objs.borrow().clone();
let objs_as_slices = holding.iter().map(|x| x.as_slice()).collect::<Vec<_>>();
let pool = CuHostMemoryPool::new("mytestcudapool", 3, || objs.borrow_mut().pop().unwrap());
let pool = CuHostMemoryPool::new("mytestcudapool", 3, || objs.borrow_mut().pop().unwrap())
.unwrap();

let obj1 = pool.acquire().unwrap();
{
@@ -551,7 +567,7 @@
fn test_cuda_pool() {
use cudarc::driver::CudaDevice;
let device = CudaDevice::new(0).unwrap();
let pool = CuCudaPool::<f32>::new("mytestcudapool", device, 3, 1);
let pool = CuCudaPool::<f32>::new("mytestcudapool", device, 3, 1).unwrap();

let _obj1 = pool.acquire().unwrap();

@@ -578,8 +594,8 @@
fn test_copy_roundtrip() {
use cudarc::driver::CudaDevice;
let device = CudaDevice::new(0).unwrap();
let host_pool = CuHostMemoryPool::new("mytesthostpool", 3, || vec![0.0; 1]);
let cuda_pool = CuCudaPool::<f32>::new("mytestcudapool", device, 3, 1);
let host_pool = CuHostMemoryPool::new("mytesthostpool", 3, || vec![0.0; 1]).unwrap();
let cuda_pool = CuCudaPool::<f32>::new("mytestcudapool", device, 3, 1).unwrap();

let cuda_handle = {
let mut initial_handle = host_pool.acquire().unwrap();
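
Taken together, pool construction is now fallible (an oversized id or a failed device allocation surfaces as a CuResult error instead of a panic), pools self-register on creation now that register_pool is private, and monitors read allocation-free statistics via pools_statistics(). A usage sketch based only on the signatures visible in this diff (the module paths and the acquire() call are assumptions mirrored from the tests above):

use cu29_runtime::pool::{pools_statistics, CuHostMemoryPool};
use cu29_traits::CuResult;

fn run() -> CuResult<()> {
    // Construction can fail now, e.g. if the id does not fit the 64-byte PoolID.
    let pool = CuHostMemoryPool::new("image pool", 4, || vec![0u8; 1024])?;

    // Buffers are acquired as in the tests above.
    let _buf = pool.acquire().unwrap();

    // Statistics come back in a SmallVec sized for MAX_POOLS, so polling
    // them does not heap-allocate while the stack is running.
    for (id, space_left, total_size, buffer_size) in pools_statistics() {
        println!("{id}: {space_left}/{total_size} slots free, {buffer_size} bytes per buffer");
    }
    Ok(())
}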
