diff --git a/.github/workflows/general.yml b/.github/workflows/general.yml
index e7b192b21..0f5ecafb0 100644
--- a/.github/workflows/general.yml
+++ b/.github/workflows/general.yml
@@ -2,9 +2,9 @@ name: "CI/CD"
 on:
   push:
-    branches: ["master"]
+    branches: [ "master" ]
   pull_request:
-    branches: ["master"]
+    branches: [ "master" ]
 
 env:
   CARGO_TERM_COLOR: always
@@ -17,8 +17,8 @@ jobs:
     strategy:
       matrix:
-        os: [ubuntu-latest, macos-latest, windows-latest]
-        mode: [debug, release]
+        os: [ ubuntu-latest, macos-latest, windows-latest ]
+        mode: [ debug, release ]
 
     steps:
       - uses: actions/checkout@v4
@@ -36,10 +36,32 @@ jobs:
       - name: Check formatting
         run: cargo +stable fmt --all -- --check
 
+      - name: Free Disk Space (Ubuntu)
+        if: runner.os == 'Linux'
+        uses: jlumbroso/free-disk-space@main
+        with:
+          # If set to "true", this might remove tools that are actually needed,
+          # but it frees about 6 GB.
+          tool-cache: false
+          # All of these default to true; feel free to set any to "false"
+          # if necessary for your workflow.
+          android: true
+          dotnet: true
+          haskell: true
+          large-packages: false
+          docker-images: true
+          swap-storage: true
+
       - name: Install dependencies (Linux)
         if: runner.os == 'Linux'
         run: sudo apt-get update && sudo apt-get install -y libudev-dev libpcap-dev
 
+      - name: Install CUDA
+        uses: Jimver/cuda-toolkit@master
+        if: runner.os != 'macOS'
+        with:
+          log-file-suffix: '${{ matrix.os }}-${{ matrix.mode }}.txt'
+
       - name: Install dependencies (Windows)
         if: runner.os == 'Windows'
         run: |
diff --git a/components/payloads/cu_sensor_payloads/src/image.rs b/components/payloads/cu_sensor_payloads/src/image.rs
index 8a87098d2..39a21954c 100644
--- a/components/payloads/cu_sensor_payloads/src/image.rs
+++ b/components/payloads/cu_sensor_payloads/src/image.rs
@@ -1,5 +1,8 @@
+use bincode::de::Decoder;
+use bincode::error::DecodeError;
 use bincode::{Decode, Encode};
-use cu29::prelude::CuBufferHandle;
+use cu29::prelude::{ArrayLike, CuHandle};
+use std::fmt::Debug;
 
 #[allow(unused_imports)]
 use cu29::{CuError, CuResult};
@@ -17,43 +20,72 @@ pub struct CuImageBufferFormat {
     pub pixel_format: [u8; 4],
 }
 
-#[derive(Debug, Default, Clone, Decode, Encode)]
-pub struct CuImage {
+impl CuImageBufferFormat {
+    pub fn byte_size(&self) -> usize {
+        self.stride as usize * self.height as usize
+    }
+}
+
+#[derive(Debug, Default, Clone, Encode)]
+pub struct CuImage<A>
+where
+    A: ArrayLike<Element = u8>,
+{
     pub seq: u64,
     pub format: CuImageBufferFormat,
-    pub buffer_handle: CuBufferHandle,
+    pub buffer_handle: CuHandle<A>,
 }
 
-impl CuImage {
-    pub fn new(format: CuImageBufferFormat, buffer_handle: CuBufferHandle) -> Self {
+impl Decode for CuImage<Vec<u8>> {
+    fn decode<D: Decoder>(decoder: &mut D) -> Result<Self, DecodeError> {
+        let seq = u64::decode(decoder)?;
+        let format = CuImageBufferFormat::decode(decoder)?;
+        let buffer = Vec::decode(decoder)?;
+        let buffer_handle = CuHandle::new_detached(buffer);
+
+        Ok(Self {
+            seq,
+            format,
+            buffer_handle,
+        })
+    }
+}
+
+impl<A> CuImage<A>
+where
+    A: ArrayLike<Element = u8>,
+{
+    pub fn new(format: CuImageBufferFormat, buffer_handle: CuHandle<A>) -> Self {
+        assert!(
+            format.byte_size() <= buffer_handle.with_inner(|i| i.len()),
+            "Buffer size must at least match the format."
+        );
         CuImage {
             seq: 0,
             format,
             buffer_handle,
         }
     }
-
-    pub fn as_slice(&self) -> &[u8] {
-        self.buffer_handle.as_slice()
-    }
 }
 
-impl CuImage {
+impl<A> CuImage<A>
+where
+    A: ArrayLike<Element = u8>,
+{
     /// Builds an ImageBuffer from the image crate backed by the CuImage's pixel data.
     #[cfg(feature = "image")]
     pub fn as_image_buffer<P: Pixel>(&self) -> CuResult<ImageBuffer<P, &[P::Subpixel]>> {
         let width = self.format.width;
         let height = self.format.height;
-        let data = self.buffer_handle.as_slice();
-
         assert_eq!(
             width, self.format.stride,
             "STRIDE must equal WIDTH for ImageBuffer compatibility."
         );
-        let raw_pixels: &[P::Subpixel] =
-            unsafe { core::slice::from_raw_parts(data.as_ptr() as *const P::Subpixel, data.len()) };
-
+        let raw_pixels: &[P::Subpixel] = self.buffer_handle.with_inner(|inner| unsafe {
+            let data: &[u8] = inner;
+            core::slice::from_raw_parts(data.as_ptr() as *const P::Subpixel, data.len())
+        });
         ImageBuffer::from_raw(width, height, raw_pixels)
             .ok_or("Could not create the image:: buffer".into())
     }
@@ -62,7 +94,6 @@ impl CuImage {
     pub fn as_kornia_image<T, const C: usize>(&self) -> CuResult<Image<T, C>> {
         let width = self.format.width as usize;
         let height = self.format.height as usize;
-        let data = self.buffer_handle.as_slice();
 
         assert_eq!(
             width,
             self.format.stride as usize,
@@ -70,10 +101,13 @@ impl CuImage {
         );
 
         let size = width * height * C;
-
-        let raw_pixels: &[T] = unsafe {
-            core::slice::from_raw_parts(data.as_ptr() as *const T, data.len() / size_of::<T>())
-        };
+        let raw_pixels: &[T] = self.buffer_handle.with_inner(|inner| unsafe {
+            let data: &[u8] = inner;
+            core::slice::from_raw_parts(
+                data.as_ptr() as *const T,
+                data.len() / std::mem::size_of::<T>(),
+            )
+        });
 
         unsafe { Image::from_raw_parts([height, width].into(), raw_pixels.as_ptr(), size) }
             .map_err(|e| CuError::new_with_cause("Could not create a Kornia Image", e))
diff --git a/components/sources/cu_v4l/src/lib.rs b/components/sources/cu_v4l/src/lib.rs
index ac747854e..84bc1ea0c 100644
--- a/components/sources/cu_v4l/src/lib.rs
+++ b/components/sources/cu_v4l/src/lib.rs
@@ -12,7 +12,7 @@ mod empty_impl {
     impl Freezable for V4l {}
 
     impl<'cl> CuSrcTask<'cl> for V4l {
-        type Output = output_msg!('cl, CuImage);
+        type Output = output_msg!('cl, CuImage<Vec<u8>>);
 
         fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
         where
@@ -52,9 +52,8 @@ mod linux_impl {
     pub use v4l::{Format, FourCC, Timestamp};
 
     // A Copper source task that reads frames from a V4L device.
-    // BS is the image buffer size to be used. ie the maximum size of the image buffer.
    pub struct V4l {
-        stream: CuV4LStream, // move that as a generic parameter
+        stream: CuV4LStream,
        settled_format: CuImageBufferFormat,
        v4l_clock_time_offset_ns: i64,
    }
@@ -67,7 +66,7 @@ mod linux_impl {
    }
 
    impl<'cl> CuSrcTask<'cl> for V4l {
-        type Output = output_msg!('cl, CuImage);
+        type Output = output_msg!('cl, CuImage<Vec<u8>>);
 
        fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
        where
@@ -154,7 +153,6 @@ mod linux_impl {
                };
                (size.width, size.height)
            };
-            debug!("V4L: Use resolution: {}x{}", width, height);
 
            // Set the format with the chosen resolution
            let req_fmt = Format::new(width, height, fourcc);
@@ -168,6 +166,10 @@ mod linux_impl {
                dev.set_params(&new_params)
                    .map_err(|e| CuError::new_with_cause("Failed to set params", e))?;
            }
+            debug!(
+                "V4L: Negotiated resolution: {}x{}",
+                actual_fmt.width, actual_fmt.height
+            );
            actual_fmt
        } else {
            return Err(format!(
@@ -310,8 +312,7 @@ mod linux_impl {
        for _ in 0..1000 {
            let _output = v4l.process(&clock, &mut msg);
            if let Some(frame) = msg.payload() {
-                debug!("Buffer index: {}", frame.buffer_handle.index());
-                let slice = frame.as_slice();
+                let slice: &[u8] = &frame.buffer_handle.lock().unwrap();
                let arrow_buffer = ArrowBuffer::from(slice);
                let blob = Blob::from(arrow_buffer);
                let rerun_img = ImageBuffer::from(blob);
diff --git a/components/sources/cu_v4l/src/v4lstream.rs b/components/sources/cu_v4l/src/v4lstream.rs
index ab4ff6ec7..7213a9d40 100644
--- a/components/sources/cu_v4l/src/v4lstream.rs
+++ b/components/sources/cu_v4l/src/v4lstream.rs
@@ -1,22 +1,22 @@
-use cu29::prelude::{CuBufferHandle, CuHostMemoryPool};
+use cu29::prelude::CuPool;
+use cu29::prelude::{CuHandle, CuHostMemoryPool};
 use std::convert::TryInto;
-use std::rc::Rc;
 use std::time::Duration;
 use std::{io, mem, sync::Arc};
 use v4l::buffer::{Metadata, Type};
 use v4l::device::Handle;
 use v4l::io::traits::{CaptureStream, Stream};
 use v4l::memory::Memory;
-use v4l::v4l_sys::*;
+use v4l::v4l_sys::{v4l2_buffer, v4l2_buffer__bindgen_ty_1, v4l2_format, v4l2_requestbuffers};
 use v4l::{v4l2, Device};
 
 // A specialized V4L stream that uses Copper Buffers for memory management.
pub struct CuV4LStream {
    v4l_handle: Arc<Handle>,
    v4l_buf_type: Type,
-    memory_pool: Rc<CuHostMemoryPool>,
+    memory_pool: CuHostMemoryPool<Vec<u8>>,
    // Arena matching the v4l2 metadata and the Copper Buffers
-    arena: Vec<(Metadata, Option<CuBufferHandle>)>,
+    arena: Vec<(Metadata, Option<CuHandle<Vec<u8>>>)>,
    arena_last_freed_up_index: usize,
    timeout: Option<Duration>,
    active: bool,
@@ -34,13 +34,13 @@ impl CuV4LStream {
        buf_size: usize,
        buf_count: u32,
    ) -> io::Result<Self> {
-        let memory_pool = CuHostMemoryPool::new(buf_size, buf_count + 1, page_size::get()); // +1 to be able to queue one last buffer before zapping the first
+        let memory_pool = CuHostMemoryPool::new(buf_count as usize + 1, || vec![0; buf_size]); // +1 to be able to queue one last buffer before zapping the first
        let mut arena = Vec::new();
        arena.resize(buf_count as usize, (Metadata::default(), None));
 
        let mut result = CuV4LStream {
            v4l_handle: dev.handle(),
-            memory_pool: Rc::new(memory_pool),
+            memory_pool,
            arena,
            arena_last_freed_up_index: 0,
            v4l_buf_type: buf_type,
@@ -151,7 +151,7 @@ impl Drop for CuV4LStream {
 }
 
 impl Stream for CuV4LStream {
-    type Item = CuBufferHandle;
+    type Item = CuHandle<Vec<u8>>;
 
    fn start(&mut self) -> io::Result<()> {
        // Enqueue all buffers once on stream start
@@ -191,20 +191,20 @@ impl Stream for CuV4LStream {
 
 impl CaptureStream<'_> for CuV4LStream {
    fn queue(&mut self, index: usize) -> io::Result<()> {
-        let buffer_handle = CuHostMemoryPool::allocate(&self.memory_pool).ok_or(io::Error::new(
-            io::ErrorKind::Other,
-            "Failed to allocate buffer",
-        ))?;
+        let buffer_handle = self.memory_pool.acquire().ok_or(io::Error::new(
+            io::ErrorKind::Other,
+            "Failed to allocate buffer",
+        ))?;
+        self.arena[index] = (Metadata::default(), Some(buffer_handle.clone()));
+        let mut v4l2_buf = buffer_handle.with_inner_mut(|inner| {
+            let destination: &mut [u8] = inner;
 
-        let buf: &[u8] = buffer_handle.as_slice();
-        let mut v4l2_buf = v4l2_buffer {
-            index: index as u32,
-            m: v4l2_buffer__bindgen_ty_1 {
-                userptr: buf.as_ptr() as std::os::raw::c_ulong,
-            },
-            length: buf.len() as u32,
-            ..self.buffer_desc()
-        };
+            v4l2_buffer {
+                index: index as u32,
+                m: v4l2_buffer__bindgen_ty_1 {
+                    userptr: destination.as_ptr() as std::os::raw::c_ulong,
+                },
+                length: destination.len() as u32,
+                ..self.buffer_desc()
+            }
+        });
        unsafe {
            v4l2::ioctl(
                self.v4l_handle.fd(),
@@ -212,7 +212,6 @@ impl CaptureStream<'_> for CuV4LStream {
                &mut v4l2_buf as *mut _ as *mut std::os::raw::c_void,
            )?;
        }
-        self.arena[index] = (Metadata::default(), Some(buffer_handle));
 
        Ok(())
    }
@@ -238,12 +237,6 @@ impl CaptureStream<'_> for CuV4LStream {
            )?;
        }
        let index = v4l2_buf.index as usize;
-        // println!("dequeue: dequeueing buffer {}", index);
-        // println!("   length {}", v4l2_buf.length);
-        // println!("   bytesused {}", v4l2_buf.bytesused);
-        // println!("   timestamp {:?}", v4l2_buf.timestamp);
-        // println!("   sequence {}", v4l2_buf.sequence);
-        // println!("   field {}", v4l2_buf.field);
 
        self.arena[index].0 = Metadata {
            bytesused: v4l2_buf.bytesused,
diff --git a/core/cu29_runtime/Cargo.toml b/core/cu29_runtime/Cargo.toml
index 83dcb415a..6a45e7c48 100644
--- a/core/cu29_runtime/Cargo.toml
+++ b/core/cu29_runtime/Cargo.toml
@@ -34,7 +34,14 @@ cu29-clock = { workspace = true }
 clap = { workspace = true }
 tempfile = { workspace = true }
 arrayvec = "0.7.6"
-ron = "0.8.1"
 hdrhistogram = "7.5.4"
 petgraph = { version = "0.7.1", features = ["serde", "serde-1", "serde_derive"] }
+object-pool = "0.6.0"
+
+[target.'cfg(target_os = "linux")'.dependencies]
+cudarc = { version = "0.13", optional = true, features = ["cuda-version-from-build-system"] }
+
+[features]
+default = []
+cuda = ["cudarc"]
\ No newline at end of file
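Review note (not part of the patch): the `cuda` feature added above is off by default, and its `cudarc` dependency only resolves on Linux targets. Assuming the package keeps the `cu29-runtime` name, a dependent crate would opt in with a line along these lines in its Cargo.toml (illustrative fragment):

    cu29-runtime = { workspace = true, features = ["cuda"] }

or locally with `cargo build --features cuda` from core/cu29_runtime.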
diff --git a/core/cu29_runtime/src/pool.rs b/core/cu29_runtime/src/pool.rs
index 801611ba1..b6b0629fd 100644
--- a/core/cu29_runtime/src/pool.rs
+++ b/core/cu29_runtime/src/pool.rs
@@ -1,235 +1,536 @@
-use std::alloc::{alloc, dealloc, Layout};
-use std::cell::RefCell;
-use std::fmt::Debug;
-use std::sync::atomic::AtomicUsize;
-use std::sync::atomic::Ordering;
-
 use bincode::de::Decoder;
 use bincode::enc::Encoder;
 use bincode::error::{DecodeError, EncodeError};
-use bincode::{BorrowDecode, Decode, Encode};
-use std::rc::{Rc, Weak};
+use bincode::{Decode, Encode};
+use object_pool::{Pool, ReusableOwned};
+use std::alloc::{alloc, dealloc, Layout};
+use std::fmt::Debug;
+use std::ops::{Deref, DerefMut};
+use std::sync::{Arc, Mutex};
 
-use crate::log::*;
+/// Basic type that can be used as an element in a buffer of a CuPool.
+pub trait ElementType:
+    Default + Sized + Copy + Encode + Decode + Debug + Unpin + Send + Sync
+{
+}
 
-pub struct CuHostMemoryPool {
-    buffers: RefCell<Vec<AlignedBuffer>>,
-    inflight_counters: Box<[AtomicUsize]>,
+/// Blanket implementation for all types that satisfy the ElementType bounds.
+impl<T> ElementType for T where
+    T: Default + Sized + Copy + Encode + Decode + Debug + Unpin + Send + Sync
+{
 }
 
-#[derive(Debug)]
-pub struct AlignedBuffer {
-    ptr: *mut u8,
-    size: usize,
-    layout: Layout,
+pub trait ArrayLike: Deref<Target = [Self::Element]> + DerefMut + Debug {
+    type Element: ElementType;
 }
 
-impl AlignedBuffer {
-    pub fn new(size: usize, alignment: usize) -> Self {
-        let layout = Layout::from_size_align(size, alignment).unwrap();
-        let ptr = unsafe { alloc(layout) };
-        debug!("Allocated buffer at {} with size {}", ptr as usize, size);
-        if ptr.is_null() {
-            panic!("Failed to allocate memory");
+pub enum CuHandleInner<T: Debug> {
+    Pooled(ReusableOwned<T>),
+    Detached(T), // Should only be used in offline cases (e.g. deserialization)
+}
+
+impl<T> Debug for CuHandleInner<T>
+where
+    T: Debug,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            CuHandleInner::Pooled(r) => {
+                write!(f, "Pooled: {:?}", r.deref())
+            }
+            CuHandleInner::Detached(r) => write!(f, "Detached: {:?}", r),
         }
-        Self { ptr, size, layout }
     }
+}
 
-    pub fn as_slice(&self) -> &[u8] {
-        unsafe { std::slice::from_raw_parts(self.ptr, self.size) }
-    }
+impl<T: ArrayLike> Deref for CuHandleInner<T> {
+    type Target = [T::Element];
 
-    pub fn as_mut_slice(&mut self) -> &mut [u8] {
-        unsafe { std::slice::from_raw_parts_mut(self.ptr, self.size) }
+    fn deref(&self) -> &Self::Target {
+        match self {
+            CuHandleInner::Pooled(pooled) => pooled,
+            CuHandleInner::Detached(detached) => detached,
+        }
     }
 }
 
-impl Drop for AlignedBuffer {
-    fn drop(&mut self) {
-        if !self.ptr.is_null() {
-            unsafe {
-                dealloc(self.ptr, self.layout);
-            }
+impl<T: ArrayLike> DerefMut for CuHandleInner<T> {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        match self {
+            CuHandleInner::Pooled(pooled) => pooled.deref_mut(),
+            CuHandleInner::Detached(detached) => detached,
         }
     }
 }
 
-pub struct CuBufferHandle {
-    index: usize,
-    pool: Weak<CuHostMemoryPool>,
+/// A shareable handle to an Array coming from a pool (either host or device).
+#[derive(Clone, Debug)]
+pub struct CuHandle<T: ArrayLike>(Arc<Mutex<CuHandleInner<T>>>);
+
+impl<T: ArrayLike> Deref for CuHandle<T> {
+    type Target = Arc<Mutex<CuHandleInner<T>>>;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
 }
 
-impl Encode for CuBufferHandle {
+impl<T: ArrayLike> CuHandle<T> {
+    /// Create a new CuHandle that is not part of a pool (not for onboard use; prefer pools).
+    pub fn new_detached(inner: T) -> Self {
+        CuHandle(Arc::new(Mutex::new(CuHandleInner::Detached(inner))))
+    }
+
+    /// Safely access the inner value, applying a closure to it.
+    pub fn with_inner<R>(&self, f: impl FnOnce(&CuHandleInner<T>) -> R) -> R {
+        let lock = self.lock().unwrap();
+        f(&*lock)
+    }
+
+    /// Mutably access the inner value, applying a closure to it.
+    pub fn with_inner_mut<R>(&self, f: impl FnOnce(&mut CuHandleInner<T>) -> R) -> R {
+        let mut lock = self.lock().unwrap();
+        f(&mut *lock)
+    }
+}
+
+impl<T: ArrayLike> Encode for CuHandle<T>
+where
+    <T as ArrayLike>::Element: 'static,
+{
     fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
-        self.as_slice().encode(encoder)
+        let inner = self.lock().unwrap();
+        match inner.deref() {
+            CuHandleInner::Pooled(pooled) => pooled.encode(encoder),
+            CuHandleInner::Detached(detached) => detached.encode(encoder),
+        }
     }
 }
 
-impl Decode for CuBufferHandle {
-    fn decode<D: Decoder>(_decoder: &mut D) -> Result<Self, DecodeError> {
-        todo!()
+impl<T: ArrayLike> Default for CuHandle<T> {
+    fn default() -> Self {
+        panic!("Cannot create a default CuHandle")
     }
 }
 
-impl BorrowDecode<'_> for CuBufferHandle {
-    fn borrow_decode<D: BorrowDecoder<'_>>(_decoder: &mut D) -> Result<Self, DecodeError> {
-        todo!()
+impl<T: ElementType> Decode for CuHandle<Vec<T>> {
+    fn decode<D: Decoder>(decoder: &mut D) -> Result<Self, DecodeError> {
+        let vec: Vec<T> = Vec::decode(decoder)?;
+        Ok(CuHandle(Arc::new(Mutex::new(CuHandleInner::Detached(vec)))))
     }
 }
 
-impl Default for CuBufferHandle {
-    fn default() -> Self {
+pub trait CuPool<T: ArrayLike> {
+    fn acquire(&self) -> Option<CuHandle<T>>;
+    fn space_left(&self) -> usize;
+    fn copy_from<O>(&self, from: &mut CuHandle<O>) -> CuHandle<T>
+    where
+        O: ArrayLike<Element = T::Element>;
+}
+
+pub trait DeviceCuPool<T: ArrayLike> {
+    type O: ArrayLike<Element = T::Element>;
+
+    /// Takes a handle to a device buffer and copies it into a host buffer pool.
+    /// It returns a new handle from the host pool with the data from the device handle given.
+    fn copy_into(
+        &self,
+        from_device_handle: &CuHandle<Self::O>,
+        into_cu_host_memory_pool: &mut CuHostMemoryPool<T>,
+    ) -> CuHandle<T>;
+}
+
+pub struct CuHostMemoryPool<T> {
+    pool: Arc<Pool<T>>,
+}
+
+impl<T: ArrayLike> CuHostMemoryPool<T> {
+    pub fn new<F>(size: usize, f: F) -> Self
+    where
+        F: Fn() -> T,
+    {
         Self {
-            index: 0,
-            pool: Weak::new(), // An already dead ref
+            pool: Arc::new(Pool::new(size, f)),
         }
     }
 }
 
-impl Clone for CuBufferHandle {
-    fn clone(&self) -> Self {
-        if let Some(pool) = self.pool.upgrade() {
-            pool.inflight_counters[self.index].fetch_add(1, Ordering::SeqCst);
+impl<T: ArrayLike> CuPool<T> for CuHostMemoryPool<T> {
+    fn acquire(&self) -> Option<CuHandle<T>> {
+        let owned_object = self.pool.try_pull_owned(); // Use the owned version
+
+        owned_object.map(|reusable| CuHandle(Arc::new(Mutex::new(CuHandleInner::Pooled(reusable)))))
+    }
+
+    fn space_left(&self) -> usize {
+        self.pool.len()
+    }
+
+    fn copy_from<O: ArrayLike<Element = T::Element>>(&self, from: &mut CuHandle<O>) -> CuHandle<T> {
+        let to_handle = self.acquire().expect("No available buffers in the pool");
+
+        match from.lock().unwrap().deref() {
+            CuHandleInner::Detached(source) => match to_handle.lock().unwrap().deref_mut() {
+                CuHandleInner::Detached(destination) => {
+                    destination.copy_from_slice(source);
+                }
+                CuHandleInner::Pooled(destination) => {
+                    destination.copy_from_slice(source);
+                }
+            },
+            CuHandleInner::Pooled(source) => match to_handle.lock().unwrap().deref_mut() {
+                CuHandleInner::Detached(destination) => {
+                    destination.copy_from_slice(source);
+                }
+                CuHandleInner::Pooled(destination) => {
+                    destination.copy_from_slice(source);
+                }
+            },
         }
+        to_handle
+    }
+}
 
+impl<T> CuHostMemoryPool<T>
+where
+    T: ArrayLike,
+{
+    pub fn new_buffers<F>(size: usize, f: F) -> Self
+    where
+        F: Fn() -> T,
+    {
         Self {
-            index: self.index,
-            pool: self.pool.clone(),
+            pool: Arc::new(Pool::new(size, f)),
         }
     }
 }
 
-impl CuBufferHandle {
-    fn new(index: usize, pool: &Rc<CuHostMemoryPool>) -> Self {
-        Self {
-            index,
-            pool: Rc::<CuHostMemoryPool>::downgrade(pool),
+impl<E: ElementType> ArrayLike for Vec<E> {
+    type Element = E;
+}
+
+#[cfg(all(feature = "cuda", target_os = "linux"))]
+mod cuda {
+    use super::*;
+    use cudarc::driver::{CudaDevice, CudaSlice, DeviceRepr, ValidAsZeroBits};
+    use std::sync::Arc;
+
+    #[derive(Debug)]
+    pub struct CudaSliceWrapper<E>(CudaSlice<E>);
+
+    impl<E> Deref for CudaSliceWrapper<E>
+    where
+        E: ElementType,
+    {
+        type Target = [E];
+
+        fn deref(&self) -> &Self::Target {
+            // Device memory is not directly addressable from the host;
+            // the data must be copied into a host memory pool first.
+            panic!("You need to copy data to host memory pool before accessing it.");
         }
     }
 
-    pub fn as_slice(&self) -> &[u8] {
-        // as long as the pool is alive, the buffer is alive
-        if let Some(pool) = self.pool.upgrade() {
-            let buffers = pool.buffers.borrow();
-            let buffer = buffers[self.index].as_slice();
-            unsafe { std::slice::from_raw_parts(buffer.as_ptr(), buffer.len()) }
-        } else {
-            panic!("Pool is dead");
+    impl<E> DerefMut for CudaSliceWrapper<E>
+    where
+        E: ElementType,
+    {
+        fn deref_mut(&mut self) -> &mut Self::Target {
+            panic!("You need to copy data to host memory pool before accessing it.");
         }
     }
 
-    pub fn as_slice_mut(&mut self) -> &mut [u8] {
-        if let Some(pool) = self.pool.upgrade() {
-            let mut buffers = pool.buffers.borrow_mut();
-            let buffer = buffers[self.index].as_mut_slice();
-            unsafe { std::slice::from_raw_parts_mut(buffer.as_mut_ptr(), buffer.len()) }
-        } else {
-            panic!("Pool is dead");
+    impl<E: ElementType> ArrayLike for CudaSliceWrapper<E> {
+        type Element = E;
+    }
+
+    impl<E> CudaSliceWrapper<E> {
+        pub fn as_cuda_slice(&self) -> &CudaSlice<E> {
+            &self.0
+        }
+
+        pub fn as_cuda_slice_mut(&mut self) -> &mut CudaSlice<E> {
+            &mut self.0
         }
     }
 
-    pub fn index(&self) -> usize {
-        self.index
+    pub struct CuCudaPool<E>
+    where
+        E: ElementType + ValidAsZeroBits + DeviceRepr + Unpin,
+    {
+        device: Arc<CudaDevice>,
+        pool: Arc<Pool<CudaSliceWrapper<E>>>,
     }
-}
 
-impl Drop for CuBufferHandle {
-    fn drop(&mut self) {
-        if let Some(pool) = self.pool.upgrade() {
-            let remaining = pool.inflight_counters[self.index].fetch_sub(1, Ordering::SeqCst);
-            debug!("Dropping buffer handle, remaining: {}", remaining);
+    impl<E: ElementType + ValidAsZeroBits + DeviceRepr> CuCudaPool<E> {
+        #[allow(dead_code)]
+        pub fn new(
+            device: Arc<CudaDevice>,
+            nb_buffers: usize,
+            nb_element_per_buffer: usize,
+        ) -> Self {
+            Self {
+                device: device.clone(),
+                pool: Arc::new(Pool::new(nb_buffers, || {
+                    CudaSliceWrapper(
+                        device
+                            .alloc_zeros(nb_element_per_buffer)
+                            .expect("Failed to allocate device memory"),
+                    )
+                })),
+            }
+        }
+
+        #[allow(dead_code)]
+        pub fn new_with<F>(device: Arc<CudaDevice>, nb_buffers: usize, f: F) -> Self
+        where
+            F: Fn() -> CudaSliceWrapper<E>,
+        {
+            Self {
+                device: device.clone(),
+                pool: Arc::new(Pool::new(nb_buffers, f)),
+            }
         }
     }
-}
 
-impl Debug for CuBufferHandle {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        let pool = self.pool.upgrade().unwrap();
-        let buffers = pool.buffers.borrow();
-        f.debug_struct("CuBufferHandle")
-            .field("index", &self.index)
-            .field("buffer", &buffers[self.index])
-            .finish()
+    impl<E: ElementType + ValidAsZeroBits + DeviceRepr> CuPool<CudaSliceWrapper<E>> for CuCudaPool<E> {
+        fn acquire(&self) -> Option<CuHandle<CudaSliceWrapper<E>>> {
+            self.pool
+                .try_pull_owned()
+                .map(|x| CuHandle(Arc::new(Mutex::new(CuHandleInner::Pooled(x)))))
+        }
+
+        fn space_left(&self) -> usize {
+            self.pool.len()
+        }
+
+        /// Copy from host to device
+        fn copy_from<O>(&self, from_handle: &mut CuHandle<O>) -> CuHandle<CudaSliceWrapper<E>>
+        where
+            O: ArrayLike<Element = E>,
+        {
+            let to_handle = self.acquire().expect("No available buffers in the pool");
+
+            match from_handle.lock().unwrap().deref() {
+                CuHandleInner::Detached(from) => match to_handle.lock().unwrap().deref_mut() {
+                    CuHandleInner::Detached(CudaSliceWrapper(to)) => {
+                        self.device
+                            .htod_sync_copy_into(from, to)
+                            .expect("Failed to copy data to device");
+                    }
+                    CuHandleInner::Pooled(to) => {
+                        self.device
+                            .htod_sync_copy_into(from, to.as_cuda_slice_mut())
+                            .expect("Failed to copy data to device");
+                    }
+                },
+                CuHandleInner::Pooled(from) => match to_handle.lock().unwrap().deref_mut() {
+                    CuHandleInner::Detached(CudaSliceWrapper(to)) => {
+                        self.device
+                            .htod_sync_copy_into(from, to)
+                            .expect("Failed to copy data to device");
+                    }
+                    CuHandleInner::Pooled(to) => {
+                        self.device
+                            .htod_sync_copy_into(from, to.as_cuda_slice_mut())
+                            .expect("Failed to copy data to device");
+                    }
+                },
+            }
+            to_handle
+        }
     }
-}
 
-impl CuHostMemoryPool {
-    pub fn new(buffer_size: usize, buffer_count: u32, alignment: usize) -> Self {
-        let mut buffers: Vec<AlignedBuffer> = Vec::with_capacity(buffer_count as usize);
+    impl<E, T> DeviceCuPool<T> for CuCudaPool<E>
+    where
+        E: ElementType + ValidAsZeroBits + DeviceRepr,
+        T: ArrayLike<Element = E>,
+    {
+        type O = CudaSliceWrapper<E>;
 
-        for _ in 0..buffer_count {
-            buffers.push(AlignedBuffer::new(buffer_size, alignment));
+        /// Copy from device to host
+        fn copy_into(
+            &self,
+            device_handle: &CuHandle<Self::O>,
+            cu_host_memory_pool: &mut CuHostMemoryPool<T>,
+        ) -> CuHandle<T> {
+            let destination_handle = cu_host_memory_pool
+                .acquire()
+                .expect("No available buffers in the pool");
+
+            match device_handle.lock().unwrap().deref() {
+                CuHandleInner::Pooled(source) => {
+                    match destination_handle.lock().unwrap().deref_mut() {
+                        CuHandleInner::Pooled(ref mut destination) => {
+                            self.device
+                                .dtoh_sync_copy_into(source.as_cuda_slice(), destination)
+                                .expect("Failed to copy data to host");
+                        }
+                        CuHandleInner::Detached(ref mut destination) => {
+                            self.device
+                                .dtoh_sync_copy_into(source.as_cuda_slice(), destination)
+                                .expect("Failed to copy data to host");
+                        }
+                    }
+                }
+                CuHandleInner::Detached(source) => {
+                    match destination_handle.lock().unwrap().deref_mut() {
+                        CuHandleInner::Pooled(ref mut destination) => {
+                            self.device
+                                .dtoh_sync_copy_into(source.as_cuda_slice(), destination)
+                                .expect("Failed to copy data to host");
+                        }
+                        CuHandleInner::Detached(ref mut destination) => {
+                            self.device
+                                .dtoh_sync_copy_into(source.as_cuda_slice(), destination)
+                                .expect("Failed to copy data to host");
+                        }
+                    }
+                }
+            }
+
+            destination_handle
         }
+    }
+}
 
-        let counters = (0..buffer_count)
-            .map(|_| AtomicUsize::new(0))
-            .collect::<Vec<AtomicUsize>>()
-            .into_boxed_slice();
+/// A buffer of elements of type E, aligned to a caller-specified alignment.
+#[derive(Debug)]
+pub struct AlignedBuffer<E: ElementType> {
+    ptr: *mut E,
+    size: usize,
+    layout: Layout,
+}
+
+impl<E: ElementType> AlignedBuffer<E> {
+    pub fn new(num_elements: usize, alignment: usize) -> Self {
+        let layout =
+            Layout::from_size_align(num_elements * std::mem::size_of::<E>(), alignment).unwrap();
+        let ptr = unsafe { alloc(layout) as *mut E };
+        if ptr.is_null() {
+            panic!("Failed to allocate memory");
+        }
         Self {
-            buffers: RefCell::new(buffers),
-            inflight_counters: counters,
+            ptr,
+            size: num_elements,
+            layout,
         }
     }
+}
+
+impl<E: ElementType> Deref for AlignedBuffer<E> {
+    type Target = [E];
+
+    fn deref(&self) -> &Self::Target {
+        unsafe { std::slice::from_raw_parts(self.ptr, self.size) }
+    }
+}
+
+impl<E: ElementType> DerefMut for AlignedBuffer<E> {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        unsafe { std::slice::from_raw_parts_mut(self.ptr, self.size) }
+    }
+}
 
-    pub fn allocate(self_rc: &Rc<Self>) -> Option<CuBufferHandle> {
-        for (index, counter) in self_rc.inflight_counters.iter().enumerate() {
-            let prev = counter.fetch_add(1, Ordering::SeqCst);
-            if prev == 0 {
-                return Some(CuBufferHandle::new(index, self_rc));
-            } else {
-                counter.fetch_sub(1, Ordering::SeqCst);
+impl<E: ElementType> Drop for AlignedBuffer<E> {
+    fn drop(&mut self) {
+        if !self.ptr.is_null() {
+            unsafe {
+                dealloc(self.ptr as *mut u8, self.layout);
             }
         }
-        None
-    }
-
-    pub fn size(&self) -> usize {
-        self.buffers.borrow().len()
     }
 }
 
 #[cfg(test)]
 mod tests {
     use super::*;
+    #[cfg(all(feature = "cuda", target_os = "linux"))]
+    use crate::pool::cuda::CuCudaPool;
+    use std::cell::RefCell;
 
     #[test]
-    fn test_full_size_pool() {
-        let pool = Rc::new(CuHostMemoryPool::new(10, 10, 4096));
-        let mut handles = Vec::new();
-        for i in 0..10 {
-            let mut handle = CuHostMemoryPool::allocate(&pool).unwrap();
-            handle.as_slice_mut()[0] = 10 - i;
-            handles.push(handle);
+    fn test_pool() {
+        let objs = RefCell::new(vec![vec![1], vec![2], vec![3]]);
+        let holding = objs.borrow().clone();
+        let objs_as_slices = holding.iter().map(|x| x.as_slice()).collect::<Vec<_>>();
+        let pool = CuHostMemoryPool::new(3, || objs.borrow_mut().pop().unwrap());
+
+        let obj1 = pool.acquire().unwrap();
+        {
+            let obj2 = pool.acquire().unwrap();
+            assert!(objs_as_slices.contains(&obj1.lock().unwrap().deref().deref()));
+            assert!(objs_as_slices.contains(&obj2.lock().unwrap().deref().deref()));
+            assert_eq!(pool.space_left(), 1);
         }
-        assert!(CuHostMemoryPool::allocate(&pool).is_none());
-        drop(handles);
+        assert_eq!(pool.space_left(), 2);
+
+        let obj3 = pool.acquire().unwrap();
+        assert!(objs_as_slices.contains(&obj3.lock().unwrap().deref().deref()));
+
+        assert_eq!(pool.space_left(), 1);
+
+        let _obj4 = pool.acquire().unwrap();
+        assert_eq!(pool.space_left(), 0);
+
+        let obj5 = pool.acquire();
+        assert!(obj5.is_none());
     }
 
+    #[cfg(all(feature = "cuda", target_os = "linux"))]
     #[test]
-    fn test_pool_with_holes() {
-        let pool = Rc::new(CuHostMemoryPool::new(10, 10, 4096));
-        let mut handles = Vec::new();
-        for i in 0..10 {
-            let mut handle = CuHostMemoryPool::allocate(&pool).unwrap();
-            handle.as_slice_mut()[0] = 10 - i;
-            if i % 2 == 0 {
-                drop(handle);
-            } else {
-                handles.push(handle);
-            }
-        }
-        for i in 0..5 {
-            let mut handle = CuHostMemoryPool::allocate(&pool).unwrap();
-            handle.as_slice_mut()[0] = 10 - i;
-            handles.push(handle);
+    #[ignore] // Can only be executed if a real CUDA device is present
+    fn test_cuda_pool() {
+        use cudarc::driver::CudaDevice;
+        let device = CudaDevice::new(0).unwrap();
+        let pool = CuCudaPool::<f32>::new(device, 3, 1);
+
+        let _obj1 = pool.acquire().unwrap();
+
+        {
+            let _obj2 = pool.acquire().unwrap();
+            assert_eq!(pool.space_left(), 1);
         }
-        assert!(CuHostMemoryPool::allocate(&pool).is_none());
-        drop(handles);
+        assert_eq!(pool.space_left(), 2);
+
+        let _obj3 = pool.acquire().unwrap();
+
+        assert_eq!(pool.space_left(), 1);
+
+        let _obj4 = pool.acquire().unwrap();
+        assert_eq!(pool.space_left(), 0);
+
+        let obj5 = pool.acquire();
+        assert!(obj5.is_none());
     }
 
+    #[cfg(all(feature = "cuda", target_os = "linux"))]
     #[test]
-    fn test_alignment() {
-        let pool = Rc::new(CuHostMemoryPool::new(10, 10, 4096));
-        let handle = CuHostMemoryPool::allocate(&pool).unwrap();
-        assert_eq!(handle.as_slice().as_ptr() as usize % 4096, 0);
+    #[ignore] // Can only be executed if a real CUDA device is present
+    fn test_copy_roundtrip() {
+        use cudarc::driver::CudaDevice;
+        let device = CudaDevice::new(0).unwrap();
+        let mut host_pool = CuHostMemoryPool::new(3, || vec![0.0; 1]);
+
+        let cuda_pool = CuCudaPool::<f32>::new(device, 3, 1);
+
+        let cuda_handle = {
+            let mut initial_handle = host_pool.acquire().unwrap();
+            {
+                let mut inner_initial_handle = initial_handle.lock().unwrap();
+                if let CuHandleInner::Pooled(ref mut pooled) = *inner_initial_handle {
+                    pooled[0] = 42.0;
+                } else {
+                    panic!();
+                }
+            }
+
+            // send that to the GPU
+            cuda_pool.copy_from(&mut initial_handle)
+        };
+
+        // get it back to the host
+        let final_handle = cuda_pool.copy_into(&cuda_handle, &mut host_pool);
+
+        let value = final_handle.lock().unwrap().deref().deref()[0];
+        assert_eq!(value, 42.0);
     }
 }
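Review note (not part of the patch): a minimal host-side sketch of how the new pool API fits together, using the cu29::prelude re-exports already relied on in v4lstream.rs above. The buffer size and function name are illustrative only:

    use cu29::prelude::{CuHandle, CuHostMemoryPool, CuPool};

    fn demo() {
        // A pool of three 1024-byte host buffers; the closure constructs each buffer.
        let pool = CuHostMemoryPool::new(3, || vec![0u8; 1024]);

        // Acquire a handle; the buffer returns to the pool when the last clone drops.
        let handle: CuHandle<Vec<u8>> = pool.acquire().expect("pool exhausted");

        // Mutate the buffer through the handle (CuHandleInner derefs to [u8]).
        handle.with_inner_mut(|inner| {
            let buf: &mut [u8] = inner;
            buf[0] = 42;
        });

        // Read it back and check pool accounting (one of three buffers is out).
        assert_eq!(handle.with_inner(|inner| inner[0]), 42);
        assert_eq!(pool.space_left(), 2);
    }

The same CuPool::acquire/copy_from calls drive CuCudaPool when the cuda feature is enabled, with CuHostMemoryPool and DeviceCuPool::copy_into moving data back to the host, as exercised in test_copy_roundtrip above.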