diff --git a/components/sources/cu_gstreamer/Cargo.toml b/components/sources/cu_gstreamer/Cargo.toml index 0d23049c8..1618a1ac0 100644 --- a/components/sources/cu_gstreamer/Cargo.toml +++ b/components/sources/cu_gstreamer/Cargo.toml @@ -17,5 +17,6 @@ gstreamer-app = "0.23.4" cu29 = { workspace = true } gstreamer-pbutils = "0.23.4" gstreamer-video = "0.23.4" +circular-buffer = "1.0.0" rerun = "*" bincode = "2.0.0-rc.3" \ No newline at end of file diff --git a/components/sources/cu_gstreamer/src/lib.rs b/components/sources/cu_gstreamer/src/lib.rs index 9b2a1938d..9d89ceacb 100644 --- a/components/sources/cu_gstreamer/src/lib.rs +++ b/components/sources/cu_gstreamer/src/lib.rs @@ -6,10 +6,12 @@ use bincode::de::Decoder; use bincode::enc::Encoder; use bincode::error::{DecodeError, EncodeError}; use bincode::{Decode, Encode}; +use circular_buffer::CircularBuffer; use gstreamer::{parse, Buffer, BufferRef, Caps, FlowSuccess, Pipeline}; use gstreamer_app::{AppSink, AppSinkCallbacks}; use std::fmt::Debug; use std::str::FromStr; +use std::sync::{Arc, Mutex}; #[derive(Debug, Clone, Default)] pub struct GstBufferWrapper(Buffer); @@ -33,13 +35,16 @@ impl Encode for GstBufferWrapper { } } -pub struct CuGStreamer { +pub type CuDefaultGStreamer = CuGStreamer<8>; + +pub struct CuGStreamer { pipeline: Pipeline, + circular_buffer: Arc>>, } -impl Freezable for CuGStreamer {} +impl Freezable for CuGStreamer {} -impl<'cl> CuSrcTask<'cl> for CuGStreamer { +impl<'cl, const N: usize> CuSrcTask<'cl> for CuGStreamer { type Output = output_msg!('cl, GstBufferWrapper); fn new(config: Option<&ComponentConfig>) -> CuResult @@ -76,27 +81,40 @@ impl<'cl> CuSrcTask<'cl> for CuGStreamer { appsink.set_caps(Some(&caps)); + let circular_buffer = Arc::new(Mutex::new(CircularBuffer::new())); + // Configure `appsink` to handle incoming buffers appsink.set_callbacks( AppSinkCallbacks::builder() - .new_sample(move |appsink| { - println!("Callback!"); - let sample = appsink - .pull_sample() - .map_err(|_| gstreamer::FlowError::Eos)?; - let buffer: &BufferRef = sample.buffer().ok_or(gstreamer::FlowError::Error)?; - // TODO - - Ok(FlowSuccess::Ok) + .new_sample({ + let mut circular_buffer = circular_buffer.clone(); + move |appsink| { + println!("Callback!"); + let sample = appsink + .pull_sample() + .map_err(|_| gstreamer::FlowError::Eos)?; + let buffer: &BufferRef = + sample.buffer().ok_or(gstreamer::FlowError::Error)?; + circular_buffer + .lock() + .unwrap() + .push_back(GstBufferWrapper(buffer.to_owned())); + + Ok(FlowSuccess::Ok) + } }) .build(), ); - let s = CuGStreamer { pipeline }; + let s = CuGStreamer { + pipeline, + circular_buffer, + }; Ok(s) } fn start(&mut self, _clock: &RobotClock) -> CuResult<()> { + self.circular_buffer.lock().unwrap().clear(); self.pipeline .set_state(gstreamer::State::Playing) .map_err(|e| CuError::new_with_cause("Failed to start the gstreamer pipeline.", e))?; @@ -106,18 +124,23 @@ impl<'cl> CuSrcTask<'cl> for CuGStreamer { self.pipeline .set_state(gstreamer::State::Null) .map_err(|e| CuError::new_with_cause("Failed to stop the gstreamer pipeline.", e))?; + self.circular_buffer.lock().unwrap().clear(); Ok(()) } fn process(&mut self, clock: &RobotClock, new_msg: Self::Output) -> CuResult<()> { - todo!() + let mut circular_buffer = self.circular_buffer.lock().unwrap(); + if let Some(buffer) = circular_buffer.pop_front() { + // TODO: timing metadata + new_msg.set_payload(buffer); + } + Ok(()) } } #[cfg(test)] mod tests { use super::*; - use crate::CuGStreamer; use cu29::prelude::*; use gstreamer::{parse, Buffer, BufferRef, Caps, FlowSuccess, Pipeline}; use gstreamer_app::{AppSink, AppSinkCallbacks}; @@ -127,7 +150,7 @@ mod tests { #[test] fn test_end_to_end() { - let mut task = CuGStreamer::new(None).unwrap(); + let mut task = CuDefaultGStreamer::new(None).unwrap(); let clock = RobotClock::new(); let mut msg = CuMsg::new(Some(GstBufferWrapper(Buffer::new()))); task.process(&clock, &mut msg).unwrap();