diff --git a/Cargo.toml b/Cargo.toml index 4b6287509..4182eaa74 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,3 +1,3 @@ -[workspace] -members = [ "copper", "copper_derive", "copper_derive_test", "examples/pluginload", "examples/camreader"] +[workspace] +members = ["copper", "copper_derive", "copper_derive_test", "examples/pluginload", "examples/v4lsrc"] resolver = "2" diff --git a/copper/src/cutask.rs b/copper/src/cutask.rs index bb6dba02c..a6414a477 100644 --- a/copper/src/cutask.rs +++ b/copper/src/cutask.rs @@ -8,23 +8,54 @@ pub trait CuMsg: Default + Serialize + for<'a> Deserialize<'a> + Sized {} // Also anything that follows this contract can be a message impl CuMsg for T where T: Default + Serialize + for<'a> Deserialize<'a> + Sized {} -pub trait CuMsgLifecycle: Sync + Send + Clone + 'static { - type Msg: CuMsg; - - fn create(&self) -> &mut Self::Msg; - fn send(&self, msg: &Self::Msg); +pub type CuError = String; + +// Define your custom Result type alias +pub type CuResult = std::result::Result; + +// Because of the Rust orphan rule, we need to define the common methods in a macro. +// This can be cleaned up with a proc macro or with a negative impl +// https://doc.rust-lang.org/beta/unstable-book/language-features/negative-impls.html when +// they are stabilized. +macro_rules! cu_task_common { + () => { + fn new(config: NodeConfig) -> CuResult + where + Self: Sized; + + fn start(&mut self) -> CuResult<()> { + Ok(()) + } + + fn preprocess(&mut self) -> CuResult<()> { + Ok(()) + } + + fn postprocess(&mut self) -> CuResult<()> { + Ok(()) + } + + fn stop(&mut self) -> CuResult<()> { + Ok(()) + } + }; } -pub trait CuSrcTask> { - fn new(config: NodeConfig, msgif: L) -> Self; +pub trait CuSrcTask { + type Msg: CuMsg; + cu_task_common!(); + fn process(&mut self, empty_msg: &mut Self::Msg) -> CuResult<()>; } -pub trait CuTask { - fn new(config: NodeConfig) -> Self; - fn process(&self, input: &I, output: &O) -> Result<(), String>; +pub trait CuTask { + type Input: CuMsg; + type Output: CuMsg; + cu_task_common!(); + fn process(&mut self, input: &Self::Input, output: &Self::Output) -> CuResult<()>; } -pub trait CuSinkTask { - fn new(&self, config: NodeConfig) -> Self; - fn process(&self, input: &I) -> Result<(), String>; +pub trait CuSinkTask { + type Input: CuMsg; + cu_task_common!(); + fn process(&mut self, input: &Self::Input) -> CuResult<()>; } diff --git a/examples/camreader/src/lib.rs b/examples/camreader/src/lib.rs deleted file mode 100644 index b99f50620..000000000 --- a/examples/camreader/src/lib.rs +++ /dev/null @@ -1,89 +0,0 @@ -use std::thread; - -use serde::{Deserialize, Serialize}; - -use copper::config::NodeConfig; -use copper::cutask::{CuMsg, CuMsgLifecycle, CuSrcTask}; -use copper::serde::arrays; - -#[derive(Serialize, Deserialize)] -struct ImageBuffer { - #[serde(with = "arrays")] - buffer: [[u8; 1920]; 1200], -} - -impl Default for ImageBuffer { - fn default() -> Self { - ImageBuffer { - buffer: [[0; 1920]; 1200], - } - } -} - -// Concrete struct for CamReader. -struct CamReader> { - buff_management: L, -} - -// Implement CuSrcTask for CamReader with a specific type (ImageBuffer). -impl> CuSrcTask for CamReader { - fn new(_config: NodeConfig, msgif: L) -> CamReader { - CamReader { - buff_management: msgif, - } - } -} -impl> CamReader { - pub fn start(&self) { - let handle = thread::spawn({ - let buff_management = self.buff_management.clone(); - move || { - let mut i = 0; - loop { - let image = buff_management.create(); - image.buffer[0][0] = i; - i += 1; - buff_management.send(image); - } - } - }); - } -} - -// impl<'a> CuSrcTask<'a, ImageBuffer> for CamReader<'a> { -// fn new(_config: NodeConfig, msgif: dyn CuMsgLifecycle<'a, ImageBuffer>) -> CamReader<'a> { -// CamReader { -// buff_management: Box::new(msgif), -// } -// } -// } - -// impl CamReader<'_, FB, FP> { -// pub fn run(&self) { -// let get_buffer = self.get_buffer.unwrap(); -// let push_buffer = self.push_buffer.unwrap(); -// -// // let get_buffer = Arc::new(Mutex::new(get_buffer)); -// // let push_buffer = Arc::new(Mutex::new(push_buffer)); -// -// // thread::spawn(move || { -// // let mut i = 0; -// // loop { -// // { -// // let get_buffer = get_buffer.lock().unwrap(); -// // let push_buffer = push_buffer.lock().unwrap(); -// // let buffer = get_buffer(); -// // buffer.buffer[0][0] = i; -// // i += 1; -// // push_buffer(buffer); -// // } -// // thread::sleep(Duration::from_secs(1)); -// // } -// // }); -// } -// } -#[cfg(test)] -mod tests { - #[test] - fn it_works() {} -} diff --git a/examples/pluginload/Cargo.toml b/examples/pluginload/Cargo.toml index 0df2e3aa1..3414d52ae 100644 --- a/examples/pluginload/Cargo.toml +++ b/examples/pluginload/Cargo.toml @@ -4,9 +4,9 @@ version = "0.1.0" edition = "2021" [dependencies] -uom = {version="0.36.0", features=["rational"]} +uom = { version = "0.36.0", features = ["rational"] } copper = { path = "../../copper" } -camreader = { path = "../camreader" } +camreader = { path = "../v4lsrc" } [build-dependencies] cargo_metadata = "0.18.1" diff --git a/examples/camreader/Cargo.toml b/examples/v4lsrc/Cargo.toml similarity index 64% rename from examples/camreader/Cargo.toml rename to examples/v4lsrc/Cargo.toml index 02cefdc7d..1c7252192 100644 --- a/examples/camreader/Cargo.toml +++ b/examples/v4lsrc/Cargo.toml @@ -8,4 +8,5 @@ copper_plugin_type = "driver" [dependencies] copper = { path = "../../copper" } -serde = { version = "1.0.201", features = ["derive"] } \ No newline at end of file +serde = { version = "1.0.202", features = ["derive"] } +linux-video = { version = "0.1.1" } \ No newline at end of file diff --git a/examples/v4lsrc/src/lib.rs b/examples/v4lsrc/src/lib.rs new file mode 100644 index 000000000..5f6cd7a94 --- /dev/null +++ b/examples/v4lsrc/src/lib.rs @@ -0,0 +1,112 @@ +use linux_video::{Device, Stream}; +use linux_video::types::*; +use serde::{Deserialize, Serialize}; + +use copper::config::NodeConfig; +use copper::cutask::{CuResult, CuSrcTask}; +use copper::serde::arrays; + +#[derive(Serialize, Deserialize)] +struct ImageMsg { + #[serde(with = "arrays")] + buffer: [[u8; 1920]; 1200], +} + +impl Default for ImageMsg { + fn default() -> Self { + ImageMsg { + buffer: [[0; 1920]; 1200], + } + } +} + +impl ImageMsg { + fn copy_from(&mut self, buff_src: &[u8]) { + let mut x = 0usize; + let mut y = 0usize; + + for el in buff_src { + self.buffer[y][x] = *el; + x += 1; + if x == 1920 { + x = 0; + y += 1; + if y == 1200 { + break; + } + } + } + } +} + +struct Video4LinuxSource { + device: Device, + stream: Option>, +} + +impl CuSrcTask for Video4LinuxSource { + type Msg = ImageMsg; + + fn new(config: NodeConfig) -> CuResult + where + Self: Sized, + { + let device = Device::open("/dev/video0").unwrap(); + Ok(Video4LinuxSource { + device, + stream: None, + }) + } + + fn start(&mut self) -> CuResult<()> { + self.stream = Some( + self.device + .stream::(ContentType::Video, 4) + .unwrap(), + ); + Ok(()) + } + + fn process(&mut self, empty_msg: &mut Self::Msg) -> CuResult<()> { + let stream = self.stream.as_ref().unwrap(); + if let Ok(buffer) = stream.next() { + let buffer = buffer.lock(); + empty_msg.copy_from(buffer.as_ref()); + } + Ok(()) + } + fn stop(&mut self) -> CuResult<()> { + self.stream = None; // This will trigger the Drop implementation on Stream + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::mem; + + use super::*; + + #[test] + fn emulate_runtime() -> CuResult<()> { + println!("Build config"); + let config = NodeConfig::default(); + println!("Build task"); + let mut task = Video4LinuxSource::new(config)?; + println!("Build img"); + // emulates the inplace behavior of copper's runtime. + let size_of_image_msg = mem::size_of::(); + let mut memory: Vec = vec![0; size_of_image_msg]; + let ptr = memory.as_mut_ptr() as *mut ImageMsg; + let img = unsafe { &mut *ptr }; + + println!("Start"); + task.start()?; + println!("Process"); + task.process(img)?; + println!("First byte: {}", img.buffer[0][0]); + println!("Stop"); + task.stop()?; + Ok(()) + } +}