Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
gbin committed Feb 3, 2025
1 parent 3c12873 commit 4599872
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 4 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ members = [
"examples/cu_multisources",
"examples/cu_pointclouds",
"examples/cu_rp_balancebot",
"examples/cu_standalone_structlog",
"examples/cu_standalone_structlog", "components/sources/cu_gstreamer",
]

# put only the core crates here that are not platform specific
Expand Down
3 changes: 3 additions & 0 deletions components/payloads/cu_sensor_payloads/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ uom = { workspace = true }
derive_more = { workspace = true }
image = { version = "0.25.5", optional = true }
kornia = { version = "0.1.8", optional = true }
gstreamer = { version = "0.23.4", optional = true }
gstreamer-app = { version = "0.23.4", optional = true }

[features]
image = ["dep:image"]
kornia = ["dep:kornia"]
gst = ["dep:gstreamer", "dep:gstreamer-app"]
14 changes: 11 additions & 3 deletions components/payloads/cu_sensor_payloads/src/image.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl CuImageBufferFormat {
#[derive(Debug, Default, Clone, Encode)]
pub struct CuImage<A>
where
A: ArrayLike<Element = u8>,
A: ArrayLike<Element=u8>,
{
pub seq: u64,
pub format: CuImageBufferFormat,
Expand All @@ -53,7 +53,7 @@ impl Decode for CuImage<Vec<u8>> {

impl<A> CuImage<A>
where
A: ArrayLike<Element = u8>,
A: ArrayLike<Element=u8>,
{
pub fn new(format: CuImageBufferFormat, buffer_handle: CuHandle<A>) -> Self {
assert!(
Expand All @@ -70,7 +70,7 @@ where

impl<A> CuImage<A>
where
A: ArrayLike<Element = u8>,
A: ArrayLike<Element=u8>,
{
/// Builds an ImageBuffer from the image crate backed by the CuImage's pixel data.
#[cfg(feature = "image")]
Expand Down Expand Up @@ -113,3 +113,11 @@ where
.map_err(|e| CuError::new_with_cause("Could not create a Kornia Image", e))
}
}


// Gstreamer support
#[cfg(feature = "gst")]
mod gst {}

#[cfg!(feature = "gst")]
pub use gst::*;
19 changes: 19 additions & 0 deletions components/sources/cu_gstreamer/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "cu-gstreamer"
description = "This is a Copper GStreamer sink."

version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
keywords.workspace = true
categories.workspace = true
homepage.workspace = true
repository.workspace = true

[dependencies]
gstreamer = "0.23.4"
gstreamer-app = "0.23.4"
gstreamer-pbutils = "0.23.4"
gstreamer-video = "0.23.4"
rerun = "*"
78 changes: 78 additions & 0 deletions components/sources/cu_gstreamer/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use gstreamer::prelude::*;
use gstreamer::{parse, BufferRef, Caps, FlowSuccess, Pipeline};
use gstreamer_app::{AppSink, AppSinkCallbacks};
use rerun::{ChannelDatatype, ColorModel, Image, RecordingStreamBuilder};
use std::error::Error;
use std::thread::sleep;
use std::time::Duration;

fn main() -> Result<(), Box<dyn Error>> {
let rec = RecordingStreamBuilder::new("Camera B&W Viz")
.spawn()
.unwrap();

gstreamer::init()?;

let pipeline = parse::launch(
"v4l2src device=/dev/video2 ! video/x-raw, format=NV12, width=1920, height=1080 ! appsink name=sink",
)?;
println!("launched");
let pipeline = pipeline.dynamic_cast::<Pipeline>().unwrap();

let appsink = pipeline.by_name("sink").unwrap();
let appsink = appsink.dynamic_cast::<AppSink>().unwrap();

appsink.set_caps(Some(
&Caps::builder("video/x-raw")
.field("format", &"NV12")
.build(),
));

// Configure `appsink` to handle incoming buffers
appsink.set_callbacks(
AppSinkCallbacks::builder()
.new_sample(move |appsink| {
println!("Callback!");
let sample = appsink
.pull_sample()
.map_err(|_| gstreamer::FlowError::Eos)?;
let buffer: &BufferRef = sample.buffer().ok_or(gstreamer::FlowError::Error)?;

// Get the buffer's memory (zero-copy access)
let data = buffer
.map_readable()
.map_err(|_| gstreamer::FlowError::Error)?;
println!("Received buffer: {} bytes", data.len());
let width = 1920;
let height = 1080;
let y_plane_size = width * height;
let grey_image = &data[0..y_plane_size];

// Rerun stuff
let image = Image::from_color_model_and_bytes(
grey_image.to_vec(),
[width as u32, height as u32],
ColorModel::L,
ChannelDatatype::U8,
);
{
rec.log("camera/image", &image).map_err(|err| {
eprintln!("Error logging image to rerun: {:?}", err);
gstreamer::FlowError::Error
})?;
}
// end rerun

Ok(FlowSuccess::Ok)
})
.build(),
);

// Start streaming
pipeline.set_state(gstreamer::State::Playing)?;

println!("Streaming... Press Ctrl+C to stop.");
loop {
sleep(Duration::from_millis(100));
}
}
3 changes: 3 additions & 0 deletions core/cu29_runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,13 @@ ron = "0.8.1"
hdrhistogram = "7.5.4"
petgraph = { version = "0.7.1", features = ["serde", "serde-1", "serde_derive"] }
object-pool = "0.6.0"
gstreamer = { version = "0.23.4", optional = true }
gstreamer-app = { version = "0.23.4", optional = true }

[target.'cfg(not(target_os = "macos"))'.dependencies]
cudarc = { version = "0.13", optional = true, features = ["cuda-version-from-build-system"] }

[features]
default = []
cuda = ["cudarc"]
gst = ["gstreamer", "gstreamer-app"]
64 changes: 64 additions & 0 deletions core/cu29_runtime/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,11 +525,51 @@ impl<E: ElementType> Drop for AlignedBuffer<E> {
}
}

// Gstreamer buffer pools: support
#[cfg(feature = "gst")]
mod gst {
use bincode::de::Decoder;
use bincode::enc::Encoder;
use bincode::error::{DecodeError, EncodeError};
use bincode::{Decode, Encode};
use gstreamer::Buffer;
use std::fmt::Debug;

#[derive(Debug, Clone, Default)]
pub struct GstBufferWrapper(Buffer);
impl Decode for GstBufferWrapper {
fn decode<D: Decoder>(decoder: &mut D) -> Result<Self, DecodeError> {
let vec: Vec<u8> = Vec::decode(decoder)?;
let buffer = Buffer::from_slice(vec);
Ok(GstBufferWrapper(buffer))
}
}

impl Encode for GstBufferWrapper {
fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
self.0
.as_ref()
.map_readable()
.map_err(|_| EncodeError::Other {
0: "Could not map readable",
})?
.encode(encoder)
}
}
}

#[cfg(feature = "gst")]
pub use gst::*;

#[cfg(test)]
mod tests {
use super::*;
use crate::config::ComponentConfig;
use crate::cutask::{CuSrcTask, Freezable};
use crate::output_msg;
#[cfg(all(feature = "cuda", not(target_os = "macos")))]
use crate::pool::cuda::CuCudaPool;
use cu29_clock::RobotClock;
use std::cell::RefCell;

#[test]
Expand Down Expand Up @@ -621,4 +661,28 @@ mod tests {
let value = final_handle.lock().unwrap().deref().deref()[0];
assert_eq!(value, 42.0);
}

mod gst_compat {
use super::*;
use crate::cutask::CuMsg;
// just test if the GstBuffers are message compatible with Copper
struct MySrc;

impl Freezable for MySrc {}

impl<'cl> CuSrcTask<'cl> for MySrc {
type Output = output_msg!('cl, GstBufferWrapper);

fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
where
Self: Sized,
{
todo!()
}

fn process(&mut self, clock: &RobotClock, new_msg: Self::Output) -> CuResult<()> {
todo!()
}
}
}
}

0 comments on commit 4599872

Please sign in to comment.