diff --git a/components/sources/cu_gstreamer/src/lib.rs b/components/sources/cu_gstreamer/src/lib.rs index 3db5838b9..36081382c 100644 --- a/components/sources/cu_gstreamer/src/lib.rs +++ b/components/sources/cu_gstreamer/src/lib.rs @@ -68,8 +68,8 @@ impl<'cl, const N: usize> CuSrcTask<'cl> for CuGStreamer { Self: Sized, { if !gstreamer::INITIALIZED.load(std::sync::atomic::Ordering::SeqCst) { - debug!("Initializing gstreamer..."); - gstreamer::init().unwrap(); + gstreamer::init() + .map_err(|e| CuError::new_with_cause("Failed to initialize gstreamer.", e))?; } else { debug!("Gstreamer already initialized."); } @@ -93,10 +93,14 @@ impl<'cl, const N: usize> CuSrcTask<'cl> for CuGStreamer { )) }?; - let pipeline = pipeline.dynamic_cast::().unwrap(); + let pipeline = pipeline + .dynamic_cast::() + .map_err(|_| CuError::from("Failed to cast pipeline to gstreamer::Pipeline."))?; - let appsink = pipeline.by_name("copper").unwrap(); - let appsink = appsink.dynamic_cast::().unwrap(); + let appsink = pipeline.by_name("copper").ok_or::("Failed to get find the \"appsink\" element in the pipeline string, be sure you have an appsink name=copper to feed this task.".into())?; + let appsink = appsink + .dynamic_cast::() + .map_err(|_| CuError::from("Failed to cast appsink to gstreamer::AppSink."))?; let caps = Caps::from_str(caps_str.as_str()) .map_err(|e| CuError::new_with_cause("Failed to create caps for appsink.", e))?; @@ -110,7 +114,6 @@ impl<'cl, const N: usize> CuSrcTask<'cl> for CuGStreamer { .new_sample({ let circular_buffer = circular_buffer.clone(); move |appsink| { - debug!("New sample received."); let sample = appsink .pull_sample() .map_err(|_| gstreamer::FlowError::Eos)?; @@ -137,12 +140,12 @@ impl<'cl, const N: usize> CuSrcTask<'cl> for CuGStreamer { fn start(&mut self, _clock: &RobotClock) -> CuResult<()> { self.circular_buffer.lock().unwrap().clear(); - debug!("Starting the pipeline."); self.pipeline .set_state(gstreamer::State::Playing) .map_err(|e| CuError::new_with_cause("Failed to start the gstreamer pipeline.", e))?; Ok(()) } + fn process(&mut self, _clock: &RobotClock, new_msg: Self::Output) -> CuResult<()> { let mut circular_buffer = self.circular_buffer.lock().unwrap(); if let Some(buffer) = circular_buffer.pop_front() { @@ -155,7 +158,6 @@ impl<'cl, const N: usize> CuSrcTask<'cl> for CuGStreamer { } fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> { - debug!("Stopping the pipeline."); self.pipeline .set_state(gstreamer::State::Null) .map_err(|e| CuError::new_with_cause("Failed to stop the gstreamer pipeline.", e))?; @@ -164,17 +166,4 @@ impl<'cl, const N: usize> CuSrcTask<'cl> for CuGStreamer { } } -#[cfg(test)] -mod tests { - use super::*; - use cu29::prelude::*; - use gstreamer::Buffer; - - #[test] - fn test_end_to_end() { - let mut task = CuDefaultGStreamer::new(None).unwrap(); - let clock = RobotClock::new(); - let mut msg = CuMsg::new(Some(CuGstBuffer(Buffer::new()))); - task.process(&clock, &mut msg).unwrap(); - } -} +// No test here, see the integration tests.