diff --git a/components/payloads/cu_sensor_payloads/src/image.rs b/components/payloads/cu_sensor_payloads/src/image.rs index 825092627..39a21954c 100644 --- a/components/payloads/cu_sensor_payloads/src/image.rs +++ b/components/payloads/cu_sensor_payloads/src/image.rs @@ -29,7 +29,7 @@ impl CuImageBufferFormat { #[derive(Debug, Default, Clone, Encode)] pub struct CuImage where - A: ArrayLike, + A: ArrayLike, { pub seq: u64, pub format: CuImageBufferFormat, @@ -53,7 +53,7 @@ impl Decode for CuImage> { impl CuImage where - A: ArrayLike, + A: ArrayLike, { pub fn new(format: CuImageBufferFormat, buffer_handle: CuHandle) -> Self { assert!( @@ -70,7 +70,7 @@ where impl CuImage where - A: ArrayLike, + A: ArrayLike, { /// Builds an ImageBuffer from the image crate backed by the CuImage's pixel data. #[cfg(feature = "image")] @@ -113,11 +113,3 @@ where .map_err(|e| CuError::new_with_cause("Could not create a Kornia Image", e)) } } - - -// Gstreamer support -#[cfg(feature = "gst")] -mod gst {} - -#[cfg!(feature = "gst")] -pub use gst::*; diff --git a/components/sources/cu_gstreamer/Cargo.toml b/components/sources/cu_gstreamer/Cargo.toml index cae2bca1c..0d23049c8 100644 --- a/components/sources/cu_gstreamer/Cargo.toml +++ b/components/sources/cu_gstreamer/Cargo.toml @@ -14,6 +14,8 @@ repository.workspace = true [dependencies] gstreamer = "0.23.4" gstreamer-app = "0.23.4" +cu29 = { workspace = true } gstreamer-pbutils = "0.23.4" gstreamer-video = "0.23.4" -rerun = "*" \ No newline at end of file +rerun = "*" +bincode = "2.0.0-rc.3" \ No newline at end of file diff --git a/components/sources/cu_gstreamer/src/lib.rs b/components/sources/cu_gstreamer/src/lib.rs new file mode 100644 index 000000000..9b2a1938d --- /dev/null +++ b/components/sources/cu_gstreamer/src/lib.rs @@ -0,0 +1,207 @@ +use cu29::prelude::*; +use gstreamer::prelude::*; +use std::error::Error; + +use bincode::de::Decoder; +use bincode::enc::Encoder; +use bincode::error::{DecodeError, EncodeError}; +use bincode::{Decode, Encode}; +use gstreamer::{parse, Buffer, BufferRef, Caps, FlowSuccess, Pipeline}; +use gstreamer_app::{AppSink, AppSinkCallbacks}; +use std::fmt::Debug; +use std::str::FromStr; + +#[derive(Debug, Clone, Default)] +pub struct GstBufferWrapper(Buffer); +impl Decode for GstBufferWrapper { + fn decode(decoder: &mut D) -> Result { + let vec: Vec = Vec::decode(decoder)?; + let buffer = Buffer::from_slice(vec); + Ok(GstBufferWrapper(buffer)) + } +} + +impl Encode for GstBufferWrapper { + fn encode(&self, encoder: &mut E) -> Result<(), EncodeError> { + self.0 + .as_ref() + .map_readable() + .map_err(|_| EncodeError::Other { + 0: "Could not map readable", + })? + .encode(encoder) + } +} + +pub struct CuGStreamer { + pipeline: Pipeline, +} + +impl Freezable for CuGStreamer {} + +impl<'cl> CuSrcTask<'cl> for CuGStreamer { + type Output = output_msg!('cl, GstBufferWrapper); + + fn new(config: Option<&ComponentConfig>) -> CuResult + where + Self: Sized, + { + if !gstreamer::INITIALIZED.load(std::sync::atomic::Ordering::SeqCst) { + gstreamer::init().unwrap(); + } + + let config = config.ok_or_else(|| CuError::from("No config provided."))?; + + let pipeline = if let Some(pipeline_str) = config.get::("pipeline") { + let pipeline = parse::launch(pipeline_str.as_str()) + .map_err(|e| CuError::new_with_cause("Failed to parse pipeline.", e))?; + Ok(pipeline) + } else { + Err(CuError::from("No pipeline provided.")) + }?; + let caps_str = if let Some(caps_str) = config.get::("caps") { + Ok(caps_str) + } else { + Err(CuError::from( + "No Caps (ie format for example \"video/x-raw, format=NV12, width=1920, height=1080\") provided for the appsink element.", + )) + }?; + + let pipeline = pipeline.dynamic_cast::().unwrap(); + + let appsink = pipeline.by_name("copper").unwrap(); + let appsink = appsink.dynamic_cast::().unwrap(); + let caps = Caps::from_str(caps_str.as_str()) + .map_err(|e| CuError::new_with_cause("Failed to create caps for appsink.", e))?; + + appsink.set_caps(Some(&caps)); + + // Configure `appsink` to handle incoming buffers + appsink.set_callbacks( + AppSinkCallbacks::builder() + .new_sample(move |appsink| { + println!("Callback!"); + let sample = appsink + .pull_sample() + .map_err(|_| gstreamer::FlowError::Eos)?; + let buffer: &BufferRef = sample.buffer().ok_or(gstreamer::FlowError::Error)?; + // TODO + + Ok(FlowSuccess::Ok) + }) + .build(), + ); + + let s = CuGStreamer { pipeline }; + Ok(s) + } + + fn start(&mut self, _clock: &RobotClock) -> CuResult<()> { + self.pipeline + .set_state(gstreamer::State::Playing) + .map_err(|e| CuError::new_with_cause("Failed to start the gstreamer pipeline.", e))?; + Ok(()) + } + fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> { + self.pipeline + .set_state(gstreamer::State::Null) + .map_err(|e| CuError::new_with_cause("Failed to stop the gstreamer pipeline.", e))?; + Ok(()) + } + + fn process(&mut self, clock: &RobotClock, new_msg: Self::Output) -> CuResult<()> { + todo!() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::CuGStreamer; + use cu29::prelude::*; + use gstreamer::{parse, Buffer, BufferRef, Caps, FlowSuccess, Pipeline}; + use gstreamer_app::{AppSink, AppSinkCallbacks}; + use rerun::{ChannelDatatype, ColorModel, Image, RecordingStreamBuilder}; + use std::thread::sleep; + use std::time::Duration; + + #[test] + fn test_end_to_end() { + let mut task = CuGStreamer::new(None).unwrap(); + let clock = RobotClock::new(); + let mut msg = CuMsg::new(Some(GstBufferWrapper(Buffer::new()))); + task.process(&clock, &mut msg).unwrap(); + } + + #[test] + fn old() { + let rec = RecordingStreamBuilder::new("Camera B&W Viz") + .spawn() + .unwrap(); + + gstreamer::init().unwrap(); + + let pipeline = parse::launch( + "v4l2src device=/dev/video2 ! video/x-raw, format=NV12, width=1920, height=1080 ! appsink name=sink", + ).unwrap(); + println!("launched"); + let pipeline = pipeline.dynamic_cast::().unwrap(); + + let appsink = pipeline.by_name("sink").unwrap(); + let appsink = appsink.dynamic_cast::().unwrap(); + + appsink.set_caps(Some( + &Caps::builder("video/x-raw") + .field("format", &"NV12") + .build(), + )); + + // Configure `appsink` to handle incoming buffers + appsink.set_callbacks( + AppSinkCallbacks::builder() + .new_sample(move |appsink| { + println!("Callback!"); + let sample = appsink + .pull_sample() + .map_err(|_| gstreamer::FlowError::Eos)?; + let buffer: &BufferRef = sample.buffer().ok_or(gstreamer::FlowError::Error)?; + + // Get the buffer's memory (zero-copy access) + let data = buffer + .map_readable() + .map_err(|_| gstreamer::FlowError::Error)?; + println!("Received buffer: {} bytes", data.len()); + let width = 1920; + let height = 1080; + let y_plane_size = width * height; + let grey_image = &data[0..y_plane_size]; + + // Rerun stuff + let image = Image::from_color_model_and_bytes( + grey_image.to_vec(), + [width as u32, height as u32], + ColorModel::L, + ChannelDatatype::U8, + ); + { + rec.log("camera/image", &image).map_err(|err| { + eprintln!("Error logging image to rerun: {:?}", err); + gstreamer::FlowError::Error + })?; + } + // end rerun + + Ok(FlowSuccess::Ok) + }) + .build(), + ); + + // Start streaming + pipeline.set_state(gstreamer::State::Playing).unwrap(); + + println!("Streaming... Press Ctrl+C to stop."); + loop { + sleep(Duration::from_millis(100)); + } + } +} diff --git a/components/sources/cu_gstreamer/src/main.rs b/components/sources/cu_gstreamer/src/main.rs deleted file mode 100644 index b8677a79c..000000000 --- a/components/sources/cu_gstreamer/src/main.rs +++ /dev/null @@ -1,78 +0,0 @@ -use gstreamer::prelude::*; -use gstreamer::{parse, BufferRef, Caps, FlowSuccess, Pipeline}; -use gstreamer_app::{AppSink, AppSinkCallbacks}; -use rerun::{ChannelDatatype, ColorModel, Image, RecordingStreamBuilder}; -use std::error::Error; -use std::thread::sleep; -use std::time::Duration; - -fn main() -> Result<(), Box> { - let rec = RecordingStreamBuilder::new("Camera B&W Viz") - .spawn() - .unwrap(); - - gstreamer::init()?; - - let pipeline = parse::launch( - "v4l2src device=/dev/video2 ! video/x-raw, format=NV12, width=1920, height=1080 ! appsink name=sink", - )?; - println!("launched"); - let pipeline = pipeline.dynamic_cast::().unwrap(); - - let appsink = pipeline.by_name("sink").unwrap(); - let appsink = appsink.dynamic_cast::().unwrap(); - - appsink.set_caps(Some( - &Caps::builder("video/x-raw") - .field("format", &"NV12") - .build(), - )); - - // Configure `appsink` to handle incoming buffers - appsink.set_callbacks( - AppSinkCallbacks::builder() - .new_sample(move |appsink| { - println!("Callback!"); - let sample = appsink - .pull_sample() - .map_err(|_| gstreamer::FlowError::Eos)?; - let buffer: &BufferRef = sample.buffer().ok_or(gstreamer::FlowError::Error)?; - - // Get the buffer's memory (zero-copy access) - let data = buffer - .map_readable() - .map_err(|_| gstreamer::FlowError::Error)?; - println!("Received buffer: {} bytes", data.len()); - let width = 1920; - let height = 1080; - let y_plane_size = width * height; - let grey_image = &data[0..y_plane_size]; - - // Rerun stuff - let image = Image::from_color_model_and_bytes( - grey_image.to_vec(), - [width as u32, height as u32], - ColorModel::L, - ChannelDatatype::U8, - ); - { - rec.log("camera/image", &image).map_err(|err| { - eprintln!("Error logging image to rerun: {:?}", err); - gstreamer::FlowError::Error - })?; - } - // end rerun - - Ok(FlowSuccess::Ok) - }) - .build(), - ); - - // Start streaming - pipeline.set_state(gstreamer::State::Playing)?; - - println!("Streaming... Press Ctrl+C to stop."); - loop { - sleep(Duration::from_millis(100)); - } -} diff --git a/core/cu29_runtime/src/pool.rs b/core/cu29_runtime/src/pool.rs index 45dabd64e..7c10777da 100644 --- a/core/cu29_runtime/src/pool.rs +++ b/core/cu29_runtime/src/pool.rs @@ -525,51 +525,12 @@ impl Drop for AlignedBuffer { } } -// Gstreamer buffer pools: support -#[cfg(feature = "gst")] -mod gst { - use bincode::de::Decoder; - use bincode::enc::Encoder; - use bincode::error::{DecodeError, EncodeError}; - use bincode::{Decode, Encode}; - use gstreamer::Buffer; - use std::fmt::Debug; - - #[derive(Debug, Clone, Default)] - pub struct GstBufferWrapper(Buffer); - impl Decode for GstBufferWrapper { - fn decode(decoder: &mut D) -> Result { - let vec: Vec = Vec::decode(decoder)?; - let buffer = Buffer::from_slice(vec); - Ok(GstBufferWrapper(buffer)) - } - } - - impl Encode for GstBufferWrapper { - fn encode(&self, encoder: &mut E) -> Result<(), EncodeError> { - self.0 - .as_ref() - .map_readable() - .map_err(|_| EncodeError::Other { - 0: "Could not map readable", - })? - .encode(encoder) - } - } -} - -#[cfg(feature = "gst")] -pub use gst::*; - #[cfg(test)] mod tests { use super::*; - use crate::config::ComponentConfig; - use crate::cutask::{CuSrcTask, Freezable}; - use crate::output_msg; + use crate::cutask::CuSrcTask; #[cfg(all(feature = "cuda", not(target_os = "macos")))] use crate::pool::cuda::CuCudaPool; - use cu29_clock::RobotClock; use std::cell::RefCell; #[test] @@ -661,28 +622,4 @@ mod tests { let value = final_handle.lock().unwrap().deref().deref()[0]; assert_eq!(value, 42.0); } - - mod gst_compat { - use super::*; - use crate::cutask::CuMsg; - // just test if the GstBuffers are message compatible with Copper - struct MySrc; - - impl Freezable for MySrc {} - - impl<'cl> CuSrcTask<'cl> for MySrc { - type Output = output_msg!('cl, GstBufferWrapper); - - fn new(_config: Option<&ComponentConfig>) -> CuResult - where - Self: Sized, - { - todo!() - } - - fn process(&mut self, clock: &RobotClock, new_msg: Self::Output) -> CuResult<()> { - todo!() - } - } - } }