From fdec476b5140731c5cff0cbdcb0e6efb67f137b0 Mon Sep 17 00:00:00 2001 From: Guillaume Binet Date: Tue, 4 Feb 2025 08:49:53 -0600 Subject: [PATCH] Works on my machine (c) (r) --- components/sources/cu_gstreamer/Cargo.toml | 16 +- components/sources/cu_gstreamer/build.rs | 11 ++ components/sources/cu_gstreamer/src/lib.rs | 144 ++++++------------ .../cu_gstreamer/tests/copperconfig.ron | 21 +++ .../cu_gstreamer/tests/gstreamer_tester.rs | 83 ++++++++++ 5 files changed, 172 insertions(+), 103 deletions(-) create mode 100644 components/sources/cu_gstreamer/build.rs create mode 100644 components/sources/cu_gstreamer/tests/copperconfig.ron create mode 100644 components/sources/cu_gstreamer/tests/gstreamer_tester.rs diff --git a/components/sources/cu_gstreamer/Cargo.toml b/components/sources/cu_gstreamer/Cargo.toml index 1618a1ac0..9480ec68c 100644 --- a/components/sources/cu_gstreamer/Cargo.toml +++ b/components/sources/cu_gstreamer/Cargo.toml @@ -12,11 +12,15 @@ homepage.workspace = true repository.workspace = true [dependencies] -gstreamer = "0.23.4" -gstreamer-app = "0.23.4" cu29 = { workspace = true } -gstreamer-pbutils = "0.23.4" -gstreamer-video = "0.23.4" +bincode = { workspace = true } circular-buffer = "1.0.0" -rerun = "*" -bincode = "2.0.0-rc.3" \ No newline at end of file +gstreamer = "0.23.4" +gstreamer-app = "0.23.4" + +[dev-dependencies] +rerun = { workspace = true } +cu29-helpers = { workspace = true } + +[build-dependencies] +cfg_aliases = "0.2.1" diff --git a/components/sources/cu_gstreamer/build.rs b/components/sources/cu_gstreamer/build.rs new file mode 100644 index 000000000..51824d1b3 --- /dev/null +++ b/components/sources/cu_gstreamer/build.rs @@ -0,0 +1,11 @@ +use cfg_aliases::cfg_aliases; +fn main() { + println!( + "cargo:rustc-env=LOG_INDEX_DIR={}", + std::env::var("OUT_DIR").unwrap() + ); + cfg_aliases! { + hardware: { all(target_os = "linux", not(feature = "mock")) }, + mock: { any(not(target_os = "linux"), feature = "mock") }, + } +} diff --git a/components/sources/cu_gstreamer/src/lib.rs b/components/sources/cu_gstreamer/src/lib.rs index 9d89ceacb..3db5838b9 100644 --- a/components/sources/cu_gstreamer/src/lib.rs +++ b/components/sources/cu_gstreamer/src/lib.rs @@ -1,6 +1,5 @@ use cu29::prelude::*; use gstreamer::prelude::*; -use std::error::Error; use bincode::de::Decoder; use bincode::enc::Encoder; @@ -10,20 +9,36 @@ use circular_buffer::CircularBuffer; use gstreamer::{parse, Buffer, BufferRef, Caps, FlowSuccess, Pipeline}; use gstreamer_app::{AppSink, AppSinkCallbacks}; use std::fmt::Debug; +use std::ops::{Deref, DerefMut}; use std::str::FromStr; use std::sync::{Arc, Mutex}; #[derive(Debug, Clone, Default)] -pub struct GstBufferWrapper(Buffer); -impl Decode for GstBufferWrapper { +pub struct CuGstBuffer(Buffer); + +impl Deref for CuGstBuffer { + type Target = Buffer; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for CuGstBuffer { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl Decode for CuGstBuffer { fn decode(decoder: &mut D) -> Result { let vec: Vec = Vec::decode(decoder)?; let buffer = Buffer::from_slice(vec); - Ok(GstBufferWrapper(buffer)) + Ok(CuGstBuffer(buffer)) } } -impl Encode for GstBufferWrapper { +impl Encode for CuGstBuffer { fn encode(&self, encoder: &mut E) -> Result<(), EncodeError> { self.0 .as_ref() @@ -39,25 +54,30 @@ pub type CuDefaultGStreamer = CuGStreamer<8>; pub struct CuGStreamer { pipeline: Pipeline, - circular_buffer: Arc>>, + circular_buffer: Arc>>, + _appsink: AppSink, } impl Freezable for CuGStreamer {} impl<'cl, const N: usize> CuSrcTask<'cl> for CuGStreamer { - type Output = output_msg!('cl, GstBufferWrapper); + type Output = output_msg!('cl, CuGstBuffer); fn new(config: Option<&ComponentConfig>) -> CuResult where Self: Sized, { if !gstreamer::INITIALIZED.load(std::sync::atomic::Ordering::SeqCst) { + debug!("Initializing gstreamer..."); gstreamer::init().unwrap(); + } else { + debug!("Gstreamer already initialized."); } let config = config.ok_or_else(|| CuError::from("No config provided."))?; let pipeline = if let Some(pipeline_str) = config.get::("pipeline") { + debug!("Creating with pipeline: {}", &pipeline_str); let pipeline = parse::launch(pipeline_str.as_str()) .map_err(|e| CuError::new_with_cause("Failed to parse pipeline.", e))?; Ok(pipeline) @@ -65,6 +85,7 @@ impl<'cl, const N: usize> CuSrcTask<'cl> for CuGStreamer { Err(CuError::from("No pipeline provided.")) }?; let caps_str = if let Some(caps_str) = config.get::("caps") { + debug!("Creating with caps: {}", &caps_str); Ok(caps_str) } else { Err(CuError::from( @@ -87,9 +108,9 @@ impl<'cl, const N: usize> CuSrcTask<'cl> for CuGStreamer { appsink.set_callbacks( AppSinkCallbacks::builder() .new_sample({ - let mut circular_buffer = circular_buffer.clone(); + let circular_buffer = circular_buffer.clone(); move |appsink| { - println!("Callback!"); + debug!("New sample received."); let sample = appsink .pull_sample() .map_err(|_| gstreamer::FlowError::Eos)?; @@ -98,7 +119,7 @@ impl<'cl, const N: usize> CuSrcTask<'cl> for CuGStreamer { circular_buffer .lock() .unwrap() - .push_back(GstBufferWrapper(buffer.to_owned())); + .push_back(CuGstBuffer(buffer.to_owned())); Ok(FlowSuccess::Ok) } @@ -109,122 +130,51 @@ impl<'cl, const N: usize> CuSrcTask<'cl> for CuGStreamer { let s = CuGStreamer { pipeline, circular_buffer, + _appsink: appsink, }; Ok(s) } fn start(&mut self, _clock: &RobotClock) -> CuResult<()> { self.circular_buffer.lock().unwrap().clear(); + debug!("Starting the pipeline."); self.pipeline .set_state(gstreamer::State::Playing) .map_err(|e| CuError::new_with_cause("Failed to start the gstreamer pipeline.", e))?; Ok(()) } - fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> { - self.pipeline - .set_state(gstreamer::State::Null) - .map_err(|e| CuError::new_with_cause("Failed to stop the gstreamer pipeline.", e))?; - self.circular_buffer.lock().unwrap().clear(); - Ok(()) - } - - fn process(&mut self, clock: &RobotClock, new_msg: Self::Output) -> CuResult<()> { + fn process(&mut self, _clock: &RobotClock, new_msg: Self::Output) -> CuResult<()> { let mut circular_buffer = self.circular_buffer.lock().unwrap(); if let Some(buffer) = circular_buffer.pop_front() { // TODO: timing metadata new_msg.set_payload(buffer); + } else { + debug!("Empty circular buffer."); } Ok(()) } + + fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> { + debug!("Stopping the pipeline."); + self.pipeline + .set_state(gstreamer::State::Null) + .map_err(|e| CuError::new_with_cause("Failed to stop the gstreamer pipeline.", e))?; + self.circular_buffer.lock().unwrap().clear(); + Ok(()) + } } #[cfg(test)] mod tests { use super::*; use cu29::prelude::*; - use gstreamer::{parse, Buffer, BufferRef, Caps, FlowSuccess, Pipeline}; - use gstreamer_app::{AppSink, AppSinkCallbacks}; - use rerun::{ChannelDatatype, ColorModel, Image, RecordingStreamBuilder}; - use std::thread::sleep; - use std::time::Duration; + use gstreamer::Buffer; #[test] fn test_end_to_end() { let mut task = CuDefaultGStreamer::new(None).unwrap(); let clock = RobotClock::new(); - let mut msg = CuMsg::new(Some(GstBufferWrapper(Buffer::new()))); + let mut msg = CuMsg::new(Some(CuGstBuffer(Buffer::new()))); task.process(&clock, &mut msg).unwrap(); } - - #[test] - fn old() { - let rec = RecordingStreamBuilder::new("Camera B&W Viz") - .spawn() - .unwrap(); - - gstreamer::init().unwrap(); - - let pipeline = parse::launch( - "v4l2src device=/dev/video2 ! video/x-raw, format=NV12, width=1920, height=1080 ! appsink name=sink", - ).unwrap(); - println!("launched"); - let pipeline = pipeline.dynamic_cast::().unwrap(); - - let appsink = pipeline.by_name("sink").unwrap(); - let appsink = appsink.dynamic_cast::().unwrap(); - - appsink.set_caps(Some( - &Caps::builder("video/x-raw") - .field("format", &"NV12") - .build(), - )); - - // Configure `appsink` to handle incoming buffers - appsink.set_callbacks( - AppSinkCallbacks::builder() - .new_sample(move |appsink| { - println!("Callback!"); - let sample = appsink - .pull_sample() - .map_err(|_| gstreamer::FlowError::Eos)?; - let buffer: &BufferRef = sample.buffer().ok_or(gstreamer::FlowError::Error)?; - - // Get the buffer's memory (zero-copy access) - let data = buffer - .map_readable() - .map_err(|_| gstreamer::FlowError::Error)?; - println!("Received buffer: {} bytes", data.len()); - let width = 1920; - let height = 1080; - let y_plane_size = width * height; - let grey_image = &data[0..y_plane_size]; - - // Rerun stuff - let image = Image::from_color_model_and_bytes( - grey_image.to_vec(), - [width as u32, height as u32], - ColorModel::L, - ChannelDatatype::U8, - ); - { - rec.log("camera/image", &image).map_err(|err| { - eprintln!("Error logging image to rerun: {:?}", err); - gstreamer::FlowError::Error - })?; - } - // end rerun - - Ok(FlowSuccess::Ok) - }) - .build(), - ); - - // Start streaming - pipeline.set_state(gstreamer::State::Playing).unwrap(); - - println!("Streaming... Press Ctrl+C to stop."); - loop { - sleep(Duration::from_millis(100)); - } - } } diff --git a/components/sources/cu_gstreamer/tests/copperconfig.ron b/components/sources/cu_gstreamer/tests/copperconfig.ron new file mode 100644 index 000000000..64aa9250f --- /dev/null +++ b/components/sources/cu_gstreamer/tests/copperconfig.ron @@ -0,0 +1,21 @@ +( + tasks: [ + ( + id: "src", + type: "cu_gstreamer::CuDefaultGStreamer", + config: { // my webcam produces mjpeg, this is just to emulate a more embedded format like NV12 + "pipeline": "v4l2src device=/dev/video0 ! image/jpeg, width=1920, height=1080 ! jpegdec ! videoconvert ! video/x-raw, format=NV12 ! appsink name=copper", + "caps": "video/x-raw, format=NV12, width=1920, height=1080", + }, + ), + ( id: "tester", + type: "GStreamerTester" + ), + ], + cnx: [ + (src: "src", dst: "tester", msg: "cu_gstreamer::CuGstBuffer"), + ], + logging: ( + enable_task_logging: false + ) +) diff --git a/components/sources/cu_gstreamer/tests/gstreamer_tester.rs b/components/sources/cu_gstreamer/tests/gstreamer_tester.rs new file mode 100644 index 000000000..42edb7563 --- /dev/null +++ b/components/sources/cu_gstreamer/tests/gstreamer_tester.rs @@ -0,0 +1,83 @@ +use cu29::prelude::*; +use cu29_helpers::basic_copper_setup; +use cu_gstreamer::CuGstBuffer; +use rerun::{ChannelDatatype, ColorModel, Image, RecordingStream, RecordingStreamBuilder}; +use std::path::PathBuf; +use std::thread::sleep; +use std::time::Duration; + +struct GStreamerTester { + rec: RecordingStream, +} + +impl Freezable for GStreamerTester {} + +impl<'cl> CuSinkTask<'cl> for GStreamerTester { + type Input = input_msg!('cl, CuGstBuffer); + + fn new(_config: Option<&ComponentConfig>) -> CuResult + where + Self: Sized, + { + let rec = RecordingStreamBuilder::new("Camera B&W Viz") + .spawn() + .unwrap(); + Ok(Self { rec }) + } + + fn process(&mut self, _clock: &RobotClock, msg: Self::Input) -> CuResult<()> { + if msg.payload().is_none() { + debug!("Skipped"); + return Ok(()); + } + // Get the buffer's memory (zero-copy access) + let data = msg.payload().unwrap().map_readable().unwrap(); + println!("Received buffer: {} bytes", data.len()); + let width = 1920; + let height = 1080; + let y_plane_size = width * height; + let grey_image = &data[0..y_plane_size]; + + // Rerun stuff + let image = Image::from_color_model_and_bytes( + grey_image.to_vec(), + [width as u32, height as u32], + ColorModel::L, + ChannelDatatype::U8, + ); + self.rec.log("camera/image", &image).unwrap(); + Ok(()) + } +} + +#[copper_runtime(config = "tests/copperconfig.ron")] +struct GStreamerTestApp {} + +#[test] +fn end_2_end() { + let logger_path = "/tmp/caterpillar.copper"; + let copper_ctx = basic_copper_setup(&PathBuf::from(logger_path), None, true, None) + .expect("Failed to setup logger."); + debug!("Logger created at {}.", logger_path); + debug!("Creating application... "); + let mut application = GStreamerTestAppBuilder::new() + .with_context(&copper_ctx) + .build() + .expect("Failed to create runtime."); + + debug!("Running..."); + application + .start_all_tasks() + .expect("Failed to start tasks."); + for _ in 0..1000 { + application + .run_one_iteration() + .expect("Failed to run application."); + sleep(Duration::from_millis(100)); // avoid zapping through 1000 buffers before the first image arrived + } + application + .stop_all_tasks() + .expect("Failed to start tasks."); + + debug!("End of program."); +}