Skip to content

Commit

Permalink
Host and Device Memory Pool with initial CUDA support + port of exist…
Browse files Browse the repository at this point in the history
…ing code. (#201)
  • Loading branch information
gbin authored Jan 17, 2025
1 parent d5aa7e7 commit 0c34cce
Show file tree
Hide file tree
Showing 6 changed files with 567 additions and 209 deletions.
30 changes: 26 additions & 4 deletions .github/workflows/general.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ name: "CI/CD"

on:
push:
branches: ["master"]
branches: [ "master" ]
pull_request:
branches: ["master"]
branches: [ "master" ]

env:
CARGO_TERM_COLOR: always
Expand All @@ -17,8 +17,8 @@ jobs:

strategy:
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
mode: [debug, release]
os: [ ubuntu-latest, macos-latest, windows-latest ]
mode: [ debug, release ]

steps:
- uses: actions/checkout@v4
Expand All @@ -36,10 +36,32 @@ jobs:
- name: Check formatting
run: cargo +stable fmt --all -- --check

- name: Free Disk Space (Ubuntu)
if: runner.os == 'Linux'
uses: jlumbroso/free-disk-space@main
with:
# this might remove tools that are actually needed,
# if set to "true" but frees about 6 GB
tool-cache: false
# all of these default to true, but feel free to set to
# "false" if necessary for your workflow
android: true
dotnet: true
haskell: true
large-packages: false
docker-images: true
swap-storage: true

- name: Install dependencies (Linux)
if: runner.os == 'Linux'
run: sudo apt-get update && sudo apt-get install -y libudev-dev libpcap-dev

- name: Install CUDA
uses: Jimver/cuda-toolkit@master
if: runner.os != 'macOS'
with:
log-file-suffix: '${{ matrix.os }}-${{ matrix.mode }}.txt'

- name: Install dependencies (Windows)
if: runner.os == 'Windows'
run: |
Expand Down
76 changes: 55 additions & 21 deletions components/payloads/cu_sensor_payloads/src/image.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use bincode::de::Decoder;
use bincode::error::DecodeError;
use bincode::{Decode, Encode};
use cu29::prelude::CuBufferHandle;
use cu29::prelude::{ArrayLike, CuHandle};
use std::fmt::Debug;

#[allow(unused_imports)]
use cu29::{CuError, CuResult};
Expand All @@ -17,43 +20,72 @@ pub struct CuImageBufferFormat {
pub pixel_format: [u8; 4],
}

#[derive(Debug, Default, Clone, Decode, Encode)]
pub struct CuImage {
impl CuImageBufferFormat {
pub fn byte_size(&self) -> usize {
self.stride as usize * self.height as usize
}
}

#[derive(Debug, Default, Clone, Encode)]
pub struct CuImage<A>
where
A: ArrayLike<Element = u8>,
{
pub seq: u64,
pub format: CuImageBufferFormat,
pub buffer_handle: CuBufferHandle,
pub buffer_handle: CuHandle<A>,
}

impl CuImage {
pub fn new(format: CuImageBufferFormat, buffer_handle: CuBufferHandle) -> Self {
impl Decode for CuImage<Vec<u8>> {
fn decode<D: Decoder>(decoder: &mut D) -> Result<Self, DecodeError> {
let seq = u64::decode(decoder)?;
let format = CuImageBufferFormat::decode(decoder)?;
let buffer = Vec::decode(decoder)?;
let buffer_handle = CuHandle::new_detached(buffer);

Ok(Self {
seq,
format,
buffer_handle,
})
}
}

impl<A> CuImage<A>
where
A: ArrayLike<Element = u8>,
{
pub fn new(format: CuImageBufferFormat, buffer_handle: CuHandle<A>) -> Self {
assert!(
format.byte_size() < buffer_handle.with_inner(|i| i.len()),
"Buffer size must at least match the format."
);
CuImage {
seq: 0,
format,
buffer_handle,
}
}

pub fn as_slice(&self) -> &[u8] {
self.buffer_handle.as_slice()
}
}

impl CuImage {
impl<A> CuImage<A>
where
A: ArrayLike<Element = u8>,
{
/// Builds an ImageBuffer from the image crate backed by the CuImage's pixel data.
#[cfg(feature = "image")]
pub fn as_image_buffer<P: Pixel>(&self) -> CuResult<ImageBuffer<P, &[P::Subpixel]>> {
let width = self.format.width;
let height = self.format.height;
let data = self.buffer_handle.as_slice();

assert_eq!(
width, self.format.stride,
"STRIDE must equal WIDTH for ImageBuffer compatibility."
);

let raw_pixels: &[P::Subpixel] =
unsafe { core::slice::from_raw_parts(data.as_ptr() as *const P::Subpixel, data.len()) };

let raw_pixels: &[P::Subpixel] = self.buffer_handle.with_inner(|inner| unsafe {
let data: &[u8] = inner;
core::slice::from_raw_parts(data.as_ptr() as *const P::Subpixel, data.len())
});
ImageBuffer::from_raw(width, height, raw_pixels)
.ok_or("Could not create the image:: buffer".into())
}
Expand All @@ -62,18 +94,20 @@ impl CuImage {
pub fn as_kornia_image<T: Clone, const C: usize>(&self) -> CuResult<Image<T, C>> {
let width = self.format.width as usize;
let height = self.format.height as usize;
let data = self.buffer_handle.as_slice();

assert_eq!(
width, self.format.stride as usize,
"stride must equal width for Kornia compatibility."
);

let size = width * height * C;

let raw_pixels: &[T] = unsafe {
core::slice::from_raw_parts(data.as_ptr() as *const T, data.len() / size_of::<T>())
};
let raw_pixels: &[T] = self.buffer_handle.with_inner(|inner| unsafe {
let data: &[u8] = inner;
core::slice::from_raw_parts(
data.as_ptr() as *const T,
data.len() / std::mem::size_of::<T>(),
)
});

unsafe { Image::from_raw_parts([height, width].into(), raw_pixels.as_ptr(), size) }
.map_err(|e| CuError::new_with_cause("Could not create a Kornia Image", e))
Expand Down
15 changes: 8 additions & 7 deletions components/sources/cu_v4l/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ mod empty_impl {
impl Freezable for V4l {}

impl<'cl> CuSrcTask<'cl> for V4l {
type Output = output_msg!('cl, CuImage);
type Output = output_msg!('cl, CuImage<Vec<u8>>);

fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
where
Expand Down Expand Up @@ -52,9 +52,8 @@ mod linux_impl {
pub use v4l::{Format, FourCC, Timestamp};

// A Copper source task that reads frames from a V4L device.
// BS is the image buffer size to be used. ie the maximum size of the image buffer.
pub struct V4l {
stream: CuV4LStream, // move that as a generic parameter
stream: CuV4LStream,
settled_format: CuImageBufferFormat,
v4l_clock_time_offset_ns: i64,
}
Expand All @@ -67,7 +66,7 @@ mod linux_impl {
}

impl<'cl> CuSrcTask<'cl> for V4l {
type Output = output_msg!('cl, CuImage);
type Output = output_msg!('cl, CuImage<Vec<u8>>);

fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
where
Expand Down Expand Up @@ -154,7 +153,6 @@ mod linux_impl {
};
(size.width, size.height)
};
debug!("V4L: Use resolution: {}x{}", width, height);

// Set the format with the chosen resolution
let req_fmt = Format::new(width, height, fourcc);
Expand All @@ -168,6 +166,10 @@ mod linux_impl {
dev.set_params(&new_params)
.map_err(|e| CuError::new_with_cause("Failed to set params", e))?;
}
debug!(
"V4L: Negotiated resolution: {}x{}",
actual_fmt.width, actual_fmt.height
);
actual_fmt
} else {
return Err(format!(
Expand Down Expand Up @@ -310,8 +312,7 @@ mod linux_impl {
for _ in 0..1000 {
let _output = v4l.process(&clock, &mut msg);
if let Some(frame) = msg.payload() {
debug!("Buffer index: {}", frame.buffer_handle.index());
let slice = frame.as_slice();
let slice: &[u8] = &frame.buffer_handle.lock().unwrap();
let arrow_buffer = ArrowBuffer::from(slice);
let blob = Blob::from(arrow_buffer);
let rerun_img = ImageBuffer::from(blob);
Expand Down
49 changes: 21 additions & 28 deletions components/sources/cu_v4l/src/v4lstream.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
use cu29::prelude::{CuBufferHandle, CuHostMemoryPool};
use cu29::prelude::CuPool;
use cu29::prelude::{CuHandle, CuHostMemoryPool};
use std::convert::TryInto;
use std::rc::Rc;
use std::time::Duration;
use std::{io, mem, sync::Arc};
use v4l::buffer::{Metadata, Type};
use v4l::device::Handle;
use v4l::io::traits::{CaptureStream, Stream};
use v4l::memory::Memory;
use v4l::v4l_sys::*;
use v4l::v4l_sys::{v4l2_buffer, v4l2_buffer__bindgen_ty_1, v4l2_format, v4l2_requestbuffers};
use v4l::{v4l2, Device};

// A specialized V4L stream that uses Copper Buffers for memory management.
pub struct CuV4LStream {
v4l_handle: Arc<Handle>,
v4l_buf_type: Type,
memory_pool: Rc<CuHostMemoryPool>,
memory_pool: CuHostMemoryPool<Vec<u8>>,
// Arena matching the vl42 metadata and the Copper Buffers
arena: Vec<(Metadata, Option<CuBufferHandle>)>,
arena: Vec<(Metadata, Option<CuHandle<Vec<u8>>>)>,
arena_last_freed_up_index: usize,
timeout: Option<i32>,
active: bool,
Expand All @@ -34,13 +34,13 @@ impl CuV4LStream {
buf_size: usize,
buf_count: u32,
) -> io::Result<Self> {
let memory_pool = CuHostMemoryPool::new(buf_size, buf_count + 1, page_size::get()); // +1 to be able to queue one last buffer before zapping the first
let memory_pool = CuHostMemoryPool::new(buf_count as usize + 1, || vec![0; buf_size]); // +1 to be able to queue one last buffer before zapping the first
let mut arena = Vec::new();
arena.resize(buf_count as usize, (Metadata::default(), None));

let mut result = CuV4LStream {
v4l_handle: dev.handle(),
memory_pool: Rc::new(memory_pool),
memory_pool,
arena,
arena_last_freed_up_index: 0,
v4l_buf_type: buf_type,
Expand Down Expand Up @@ -151,7 +151,7 @@ impl Drop for CuV4LStream {
}

impl Stream for CuV4LStream {
type Item = CuBufferHandle;
type Item = CuHandle<Vec<u8>>;

fn start(&mut self) -> io::Result<()> {
// Enqueue all buffers once on stream start
Expand Down Expand Up @@ -191,28 +191,27 @@ impl Stream for CuV4LStream {

impl CaptureStream<'_> for CuV4LStream {
fn queue(&mut self, index: usize) -> io::Result<()> {
let buffer_handle = CuHostMemoryPool::allocate(&self.memory_pool).ok_or(io::Error::new(
io::ErrorKind::Other,
"Failed to allocate buffer",
))?;
let buffer_handle = self.memory_pool.acquire().unwrap();
self.arena[index] = (Metadata::default(), Some(buffer_handle.clone()));
let mut v4l2_buf = buffer_handle.with_inner_mut(|inner| {
let destination: &mut [u8] = inner;

let buf: &[u8] = buffer_handle.as_slice();
let mut v4l2_buf = v4l2_buffer {
index: index as u32,
m: v4l2_buffer__bindgen_ty_1 {
userptr: buf.as_ptr() as std::os::raw::c_ulong,
},
length: buf.len() as u32,
..self.buffer_desc()
};
v4l2_buffer {
index: index as u32,
m: v4l2_buffer__bindgen_ty_1 {
userptr: destination.as_ptr() as std::os::raw::c_ulong,
},
length: destination.len() as u32,
..self.buffer_desc()
}
});
unsafe {
v4l2::ioctl(
self.v4l_handle.fd(),
v4l2::vidioc::VIDIOC_QBUF,
&mut v4l2_buf as *mut _ as *mut std::os::raw::c_void,
)?;
}
self.arena[index] = (Metadata::default(), Some(buffer_handle));
Ok(())
}

Expand All @@ -238,12 +237,6 @@ impl CaptureStream<'_> for CuV4LStream {
)?;
}
let index = v4l2_buf.index as usize;
// println!("dequeue: dequeueing buffer {}", index);
// println!(" length {}", v4l2_buf.length);
// println!(" bytesused {}", v4l2_buf.bytesused);
// println!(" timestamp {:?}", v4l2_buf.timestamp);
// println!(" sequence {}", v4l2_buf.sequence);
// println!(" field {}", v4l2_buf.field);

self.arena[index].0 = Metadata {
bytesused: v4l2_buf.bytesused,
Expand Down
9 changes: 8 additions & 1 deletion core/cu29_runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,14 @@ cu29-clock = { workspace = true }
clap = { workspace = true }
tempfile = { workspace = true }
arrayvec = "0.7.6"

ron = "0.8.1"
hdrhistogram = "7.5.4"
petgraph = { version = "0.7.1", features = ["serde", "serde-1", "serde_derive"] }
object-pool = "0.6.0"

[target.'cfg(target_os = "linux")'.dependencies]
cudarc = { version = "0.13", optional = true, features = ["cuda-version-from-build-system"] }

[features]
default = []
cuda = ["cudarc"]
Loading

0 comments on commit 0c34cce

Please sign in to comment.