Skip to content

Commit

Permalink
some cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
gbin committed Feb 4, 2025
1 parent 93a735d commit 004f7df
Showing 1 changed file with 11 additions and 22 deletions.
33 changes: 11 additions & 22 deletions components/sources/cu_gstreamer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ impl<'cl, const N: usize> CuSrcTask<'cl> for CuGStreamer<N> {
Self: Sized,
{
if !gstreamer::INITIALIZED.load(std::sync::atomic::Ordering::SeqCst) {
debug!("Initializing gstreamer...");
gstreamer::init().unwrap();
gstreamer::init()
.map_err(|e| CuError::new_with_cause("Failed to initialize gstreamer.", e))?;
} else {
debug!("Gstreamer already initialized.");
}
Expand All @@ -93,10 +93,14 @@ impl<'cl, const N: usize> CuSrcTask<'cl> for CuGStreamer<N> {
))
}?;

let pipeline = pipeline.dynamic_cast::<Pipeline>().unwrap();
let pipeline = pipeline
.dynamic_cast::<Pipeline>()
.map_err(|_| CuError::from("Failed to cast pipeline to gstreamer::Pipeline."))?;

let appsink = pipeline.by_name("copper").unwrap();
let appsink = appsink.dynamic_cast::<AppSink>().unwrap();
let appsink = pipeline.by_name("copper").ok_or::<CuError>("Failed to get find the \"appsink\" element in the pipeline string, be sure you have an appsink name=copper to feed this task.".into())?;
let appsink = appsink
.dynamic_cast::<AppSink>()
.map_err(|_| CuError::from("Failed to cast appsink to gstreamer::AppSink."))?;
let caps = Caps::from_str(caps_str.as_str())
.map_err(|e| CuError::new_with_cause("Failed to create caps for appsink.", e))?;

Expand All @@ -110,7 +114,6 @@ impl<'cl, const N: usize> CuSrcTask<'cl> for CuGStreamer<N> {
.new_sample({
let circular_buffer = circular_buffer.clone();
move |appsink| {
debug!("New sample received.");
let sample = appsink
.pull_sample()
.map_err(|_| gstreamer::FlowError::Eos)?;
Expand All @@ -137,12 +140,12 @@ impl<'cl, const N: usize> CuSrcTask<'cl> for CuGStreamer<N> {

fn start(&mut self, _clock: &RobotClock) -> CuResult<()> {
self.circular_buffer.lock().unwrap().clear();
debug!("Starting the pipeline.");
self.pipeline
.set_state(gstreamer::State::Playing)
.map_err(|e| CuError::new_with_cause("Failed to start the gstreamer pipeline.", e))?;
Ok(())
}

fn process(&mut self, _clock: &RobotClock, new_msg: Self::Output) -> CuResult<()> {
let mut circular_buffer = self.circular_buffer.lock().unwrap();
if let Some(buffer) = circular_buffer.pop_front() {
Expand All @@ -155,7 +158,6 @@ impl<'cl, const N: usize> CuSrcTask<'cl> for CuGStreamer<N> {
}

fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> {
debug!("Stopping the pipeline.");
self.pipeline
.set_state(gstreamer::State::Null)
.map_err(|e| CuError::new_with_cause("Failed to stop the gstreamer pipeline.", e))?;
Expand All @@ -164,17 +166,4 @@ impl<'cl, const N: usize> CuSrcTask<'cl> for CuGStreamer<N> {
}
}

#[cfg(test)]
mod tests {
use super::*;
use cu29::prelude::*;
use gstreamer::Buffer;

#[test]
fn test_end_to_end() {
let mut task = CuDefaultGStreamer::new(None).unwrap();
let clock = RobotClock::new();
let mut msg = CuMsg::new(Some(CuGstBuffer(Buffer::new())));
task.process(&clock, &mut msg).unwrap();
}
}
// No test here, see the integration tests.

0 comments on commit 004f7df

Please sign in to comment.