Skip to content

Commit

Permalink
Works on my machine (c) (r)
Browse files Browse the repository at this point in the history
  • Loading branch information
gbin committed Feb 4, 2025
1 parent e0ee69a commit fdec476
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 103 deletions.
16 changes: 10 additions & 6 deletions components/sources/cu_gstreamer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@ homepage.workspace = true
repository.workspace = true

[dependencies]
gstreamer = "0.23.4"
gstreamer-app = "0.23.4"
cu29 = { workspace = true }
gstreamer-pbutils = "0.23.4"
gstreamer-video = "0.23.4"
bincode = { workspace = true }
circular-buffer = "1.0.0"
rerun = "*"
bincode = "2.0.0-rc.3"
gstreamer = "0.23.4"
gstreamer-app = "0.23.4"

[dev-dependencies]
rerun = { workspace = true }
cu29-helpers = { workspace = true }

[build-dependencies]
cfg_aliases = "0.2.1"
11 changes: 11 additions & 0 deletions components/sources/cu_gstreamer/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use cfg_aliases::cfg_aliases;
fn main() {
println!(
"cargo:rustc-env=LOG_INDEX_DIR={}",
std::env::var("OUT_DIR").unwrap()
);
cfg_aliases! {
hardware: { all(target_os = "linux", not(feature = "mock")) },
mock: { any(not(target_os = "linux"), feature = "mock") },
}
}
144 changes: 47 additions & 97 deletions components/sources/cu_gstreamer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use cu29::prelude::*;
use gstreamer::prelude::*;
use std::error::Error;

use bincode::de::Decoder;
use bincode::enc::Encoder;
Expand All @@ -10,20 +9,36 @@ use circular_buffer::CircularBuffer;
use gstreamer::{parse, Buffer, BufferRef, Caps, FlowSuccess, Pipeline};
use gstreamer_app::{AppSink, AppSinkCallbacks};
use std::fmt::Debug;
use std::ops::{Deref, DerefMut};
use std::str::FromStr;
use std::sync::{Arc, Mutex};

#[derive(Debug, Clone, Default)]
pub struct GstBufferWrapper(Buffer);
impl Decode for GstBufferWrapper {
pub struct CuGstBuffer(Buffer);

impl Deref for CuGstBuffer {
type Target = Buffer;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl DerefMut for CuGstBuffer {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}

impl Decode for CuGstBuffer {
fn decode<D: Decoder>(decoder: &mut D) -> Result<Self, DecodeError> {
let vec: Vec<u8> = Vec::decode(decoder)?;
let buffer = Buffer::from_slice(vec);
Ok(GstBufferWrapper(buffer))
Ok(CuGstBuffer(buffer))
}
}

impl Encode for GstBufferWrapper {
impl Encode for CuGstBuffer {
fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
self.0
.as_ref()
Expand All @@ -39,32 +54,38 @@ pub type CuDefaultGStreamer = CuGStreamer<8>;

pub struct CuGStreamer<const N: usize> {
pipeline: Pipeline,
circular_buffer: Arc<Mutex<CircularBuffer<N, GstBufferWrapper>>>,
circular_buffer: Arc<Mutex<CircularBuffer<N, CuGstBuffer>>>,
_appsink: AppSink,
}

impl<const N: usize> Freezable for CuGStreamer<N> {}

impl<'cl, const N: usize> CuSrcTask<'cl> for CuGStreamer<N> {
type Output = output_msg!('cl, GstBufferWrapper);
type Output = output_msg!('cl, CuGstBuffer);

fn new(config: Option<&ComponentConfig>) -> CuResult<Self>
where
Self: Sized,
{
if !gstreamer::INITIALIZED.load(std::sync::atomic::Ordering::SeqCst) {
debug!("Initializing gstreamer...");
gstreamer::init().unwrap();
} else {
debug!("Gstreamer already initialized.");
}

let config = config.ok_or_else(|| CuError::from("No config provided."))?;

let pipeline = if let Some(pipeline_str) = config.get::<String>("pipeline") {
debug!("Creating with pipeline: {}", &pipeline_str);
let pipeline = parse::launch(pipeline_str.as_str())
.map_err(|e| CuError::new_with_cause("Failed to parse pipeline.", e))?;
Ok(pipeline)
} else {
Err(CuError::from("No pipeline provided."))
}?;
let caps_str = if let Some(caps_str) = config.get::<String>("caps") {
debug!("Creating with caps: {}", &caps_str);
Ok(caps_str)
} else {
Err(CuError::from(
Expand All @@ -87,9 +108,9 @@ impl<'cl, const N: usize> CuSrcTask<'cl> for CuGStreamer<N> {
appsink.set_callbacks(
AppSinkCallbacks::builder()
.new_sample({
let mut circular_buffer = circular_buffer.clone();
let circular_buffer = circular_buffer.clone();
move |appsink| {
println!("Callback!");
debug!("New sample received.");
let sample = appsink
.pull_sample()
.map_err(|_| gstreamer::FlowError::Eos)?;
Expand All @@ -98,7 +119,7 @@ impl<'cl, const N: usize> CuSrcTask<'cl> for CuGStreamer<N> {
circular_buffer
.lock()
.unwrap()
.push_back(GstBufferWrapper(buffer.to_owned()));
.push_back(CuGstBuffer(buffer.to_owned()));

Ok(FlowSuccess::Ok)
}
Expand All @@ -109,122 +130,51 @@ impl<'cl, const N: usize> CuSrcTask<'cl> for CuGStreamer<N> {
let s = CuGStreamer {
pipeline,
circular_buffer,
_appsink: appsink,
};
Ok(s)
}

fn start(&mut self, _clock: &RobotClock) -> CuResult<()> {
self.circular_buffer.lock().unwrap().clear();
debug!("Starting the pipeline.");
self.pipeline
.set_state(gstreamer::State::Playing)
.map_err(|e| CuError::new_with_cause("Failed to start the gstreamer pipeline.", e))?;
Ok(())
}
fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> {
self.pipeline
.set_state(gstreamer::State::Null)
.map_err(|e| CuError::new_with_cause("Failed to stop the gstreamer pipeline.", e))?;
self.circular_buffer.lock().unwrap().clear();
Ok(())
}

fn process(&mut self, clock: &RobotClock, new_msg: Self::Output) -> CuResult<()> {
fn process(&mut self, _clock: &RobotClock, new_msg: Self::Output) -> CuResult<()> {
let mut circular_buffer = self.circular_buffer.lock().unwrap();
if let Some(buffer) = circular_buffer.pop_front() {
// TODO: timing metadata
new_msg.set_payload(buffer);
} else {
debug!("Empty circular buffer.");
}
Ok(())
}

fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> {
debug!("Stopping the pipeline.");
self.pipeline
.set_state(gstreamer::State::Null)
.map_err(|e| CuError::new_with_cause("Failed to stop the gstreamer pipeline.", e))?;
self.circular_buffer.lock().unwrap().clear();
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;
use cu29::prelude::*;
use gstreamer::{parse, Buffer, BufferRef, Caps, FlowSuccess, Pipeline};
use gstreamer_app::{AppSink, AppSinkCallbacks};
use rerun::{ChannelDatatype, ColorModel, Image, RecordingStreamBuilder};
use std::thread::sleep;
use std::time::Duration;
use gstreamer::Buffer;

#[test]
fn test_end_to_end() {
let mut task = CuDefaultGStreamer::new(None).unwrap();
let clock = RobotClock::new();
let mut msg = CuMsg::new(Some(GstBufferWrapper(Buffer::new())));
let mut msg = CuMsg::new(Some(CuGstBuffer(Buffer::new())));
task.process(&clock, &mut msg).unwrap();
}

#[test]
fn old() {
let rec = RecordingStreamBuilder::new("Camera B&W Viz")
.spawn()
.unwrap();

gstreamer::init().unwrap();

let pipeline = parse::launch(
"v4l2src device=/dev/video2 ! video/x-raw, format=NV12, width=1920, height=1080 ! appsink name=sink",
).unwrap();
println!("launched");
let pipeline = pipeline.dynamic_cast::<Pipeline>().unwrap();

let appsink = pipeline.by_name("sink").unwrap();
let appsink = appsink.dynamic_cast::<AppSink>().unwrap();

appsink.set_caps(Some(
&Caps::builder("video/x-raw")
.field("format", &"NV12")
.build(),
));

// Configure `appsink` to handle incoming buffers
appsink.set_callbacks(
AppSinkCallbacks::builder()
.new_sample(move |appsink| {
println!("Callback!");
let sample = appsink
.pull_sample()
.map_err(|_| gstreamer::FlowError::Eos)?;
let buffer: &BufferRef = sample.buffer().ok_or(gstreamer::FlowError::Error)?;

// Get the buffer's memory (zero-copy access)
let data = buffer
.map_readable()
.map_err(|_| gstreamer::FlowError::Error)?;
println!("Received buffer: {} bytes", data.len());
let width = 1920;
let height = 1080;
let y_plane_size = width * height;
let grey_image = &data[0..y_plane_size];

// Rerun stuff
let image = Image::from_color_model_and_bytes(
grey_image.to_vec(),
[width as u32, height as u32],
ColorModel::L,
ChannelDatatype::U8,
);
{
rec.log("camera/image", &image).map_err(|err| {
eprintln!("Error logging image to rerun: {:?}", err);
gstreamer::FlowError::Error
})?;
}
// end rerun

Ok(FlowSuccess::Ok)
})
.build(),
);

// Start streaming
pipeline.set_state(gstreamer::State::Playing).unwrap();

println!("Streaming... Press Ctrl+C to stop.");
loop {
sleep(Duration::from_millis(100));
}
}
}
21 changes: 21 additions & 0 deletions components/sources/cu_gstreamer/tests/copperconfig.ron
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
(
tasks: [
(
id: "src",
type: "cu_gstreamer::CuDefaultGStreamer",
config: { // my webcam produces mjpeg, this is just to emulate a more embedded format like NV12
"pipeline": "v4l2src device=/dev/video0 ! image/jpeg, width=1920, height=1080 ! jpegdec ! videoconvert ! video/x-raw, format=NV12 ! appsink name=copper",
"caps": "video/x-raw, format=NV12, width=1920, height=1080",
},
),
( id: "tester",
type: "GStreamerTester"
),
],
cnx: [
(src: "src", dst: "tester", msg: "cu_gstreamer::CuGstBuffer"),
],
logging: (
enable_task_logging: false
)
)
83 changes: 83 additions & 0 deletions components/sources/cu_gstreamer/tests/gstreamer_tester.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
use cu29::prelude::*;
use cu29_helpers::basic_copper_setup;
use cu_gstreamer::CuGstBuffer;
use rerun::{ChannelDatatype, ColorModel, Image, RecordingStream, RecordingStreamBuilder};
use std::path::PathBuf;
use std::thread::sleep;
use std::time::Duration;

struct GStreamerTester {
rec: RecordingStream,
}

impl Freezable for GStreamerTester {}

impl<'cl> CuSinkTask<'cl> for GStreamerTester {
type Input = input_msg!('cl, CuGstBuffer);

fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
where
Self: Sized,
{
let rec = RecordingStreamBuilder::new("Camera B&W Viz")
.spawn()
.unwrap();
Ok(Self { rec })
}

fn process(&mut self, _clock: &RobotClock, msg: Self::Input) -> CuResult<()> {
if msg.payload().is_none() {
debug!("Skipped");
return Ok(());
}
// Get the buffer's memory (zero-copy access)
let data = msg.payload().unwrap().map_readable().unwrap();
println!("Received buffer: {} bytes", data.len());
let width = 1920;
let height = 1080;
let y_plane_size = width * height;
let grey_image = &data[0..y_plane_size];

// Rerun stuff
let image = Image::from_color_model_and_bytes(
grey_image.to_vec(),
[width as u32, height as u32],
ColorModel::L,
ChannelDatatype::U8,
);
self.rec.log("camera/image", &image).unwrap();
Ok(())
}
}

#[copper_runtime(config = "tests/copperconfig.ron")]
struct GStreamerTestApp {}

#[test]
fn end_2_end() {
let logger_path = "/tmp/caterpillar.copper";
let copper_ctx = basic_copper_setup(&PathBuf::from(logger_path), None, true, None)
.expect("Failed to setup logger.");
debug!("Logger created at {}.", logger_path);
debug!("Creating application... ");
let mut application = GStreamerTestAppBuilder::new()
.with_context(&copper_ctx)
.build()
.expect("Failed to create runtime.");

debug!("Running...");
application
.start_all_tasks()
.expect("Failed to start tasks.");
for _ in 0..1000 {
application
.run_one_iteration()
.expect("Failed to run application.");
sleep(Duration::from_millis(100)); // avoid zapping through 1000 buffers before the first image arrived
}
application
.stop_all_tasks()
.expect("Failed to start tasks.");

debug!("End of program.");
}

0 comments on commit fdec476

Please sign in to comment.