Skip to content

Commit

Permalink
ported to task
Browse files Browse the repository at this point in the history
  • Loading branch information
gbin committed Feb 4, 2025
1 parent 24ca24b commit e0ee69a
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 16 deletions.
1 change: 1 addition & 0 deletions components/sources/cu_gstreamer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ gstreamer-app = "0.23.4"
cu29 = { workspace = true }
gstreamer-pbutils = "0.23.4"
gstreamer-video = "0.23.4"
circular-buffer = "1.0.0"
rerun = "*"
bincode = "2.0.0-rc.3"
55 changes: 39 additions & 16 deletions components/sources/cu_gstreamer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ use bincode::de::Decoder;
use bincode::enc::Encoder;
use bincode::error::{DecodeError, EncodeError};
use bincode::{Decode, Encode};
use circular_buffer::CircularBuffer;
use gstreamer::{parse, Buffer, BufferRef, Caps, FlowSuccess, Pipeline};
use gstreamer_app::{AppSink, AppSinkCallbacks};
use std::fmt::Debug;
use std::str::FromStr;
use std::sync::{Arc, Mutex};

#[derive(Debug, Clone, Default)]
pub struct GstBufferWrapper(Buffer);
Expand All @@ -33,13 +35,16 @@ impl Encode for GstBufferWrapper {
}
}

pub struct CuGStreamer {
pub type CuDefaultGStreamer = CuGStreamer<8>;

pub struct CuGStreamer<const N: usize> {
pipeline: Pipeline,
circular_buffer: Arc<Mutex<CircularBuffer<N, GstBufferWrapper>>>,
}

impl Freezable for CuGStreamer {}
impl<const N: usize> Freezable for CuGStreamer<N> {}

impl<'cl> CuSrcTask<'cl> for CuGStreamer {
impl<'cl, const N: usize> CuSrcTask<'cl> for CuGStreamer<N> {
type Output = output_msg!('cl, GstBufferWrapper);

fn new(config: Option<&ComponentConfig>) -> CuResult<Self>
Expand Down Expand Up @@ -76,27 +81,40 @@ impl<'cl> CuSrcTask<'cl> for CuGStreamer {

appsink.set_caps(Some(&caps));

let circular_buffer = Arc::new(Mutex::new(CircularBuffer::new()));

// Configure `appsink` to handle incoming buffers
appsink.set_callbacks(
AppSinkCallbacks::builder()
.new_sample(move |appsink| {
println!("Callback!");
let sample = appsink
.pull_sample()
.map_err(|_| gstreamer::FlowError::Eos)?;
let buffer: &BufferRef = sample.buffer().ok_or(gstreamer::FlowError::Error)?;
// TODO

Ok(FlowSuccess::Ok)
.new_sample({
let mut circular_buffer = circular_buffer.clone();
move |appsink| {
println!("Callback!");
let sample = appsink
.pull_sample()
.map_err(|_| gstreamer::FlowError::Eos)?;
let buffer: &BufferRef =
sample.buffer().ok_or(gstreamer::FlowError::Error)?;
circular_buffer
.lock()
.unwrap()
.push_back(GstBufferWrapper(buffer.to_owned()));

Ok(FlowSuccess::Ok)
}
})
.build(),
);

let s = CuGStreamer { pipeline };
let s = CuGStreamer {
pipeline,
circular_buffer,
};
Ok(s)
}

fn start(&mut self, _clock: &RobotClock) -> CuResult<()> {
self.circular_buffer.lock().unwrap().clear();
self.pipeline
.set_state(gstreamer::State::Playing)
.map_err(|e| CuError::new_with_cause("Failed to start the gstreamer pipeline.", e))?;
Expand All @@ -106,18 +124,23 @@ impl<'cl> CuSrcTask<'cl> for CuGStreamer {
self.pipeline
.set_state(gstreamer::State::Null)
.map_err(|e| CuError::new_with_cause("Failed to stop the gstreamer pipeline.", e))?;
self.circular_buffer.lock().unwrap().clear();
Ok(())
}

fn process(&mut self, clock: &RobotClock, new_msg: Self::Output) -> CuResult<()> {
todo!()
let mut circular_buffer = self.circular_buffer.lock().unwrap();
if let Some(buffer) = circular_buffer.pop_front() {
// TODO: timing metadata
new_msg.set_payload(buffer);
}
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::CuGStreamer;
use cu29::prelude::*;
use gstreamer::{parse, Buffer, BufferRef, Caps, FlowSuccess, Pipeline};
use gstreamer_app::{AppSink, AppSinkCallbacks};
Expand All @@ -127,7 +150,7 @@ mod tests {

#[test]
fn test_end_to_end() {
let mut task = CuGStreamer::new(None).unwrap();
let mut task = CuDefaultGStreamer::new(None).unwrap();
let clock = RobotClock::new();
let mut msg = CuMsg::new(Some(GstBufferWrapper(Buffer::new())));
task.process(&clock, &mut msg).unwrap();
Expand Down

0 comments on commit e0ee69a

Please sign in to comment.