Skip to content

Commit

Permalink
First interface for task that does work.
Browse files Browse the repository at this point in the history
It needs to get simpler for the user though.
  • Loading branch information
gbin committed May 15, 2024
1 parent 22c47cc commit 42e9c1c
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 23 deletions.
28 changes: 19 additions & 9 deletions copper/src/cutask.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,30 @@
use crate::config::NodeConfig;
use serde::{Deserialize, Serialize};

pub trait CuStateful<'a>: Default + Serialize + Deserialize<'a> {}
use crate::config::NodeConfig;

// Everything that is stateful in copper for zero copy constraints need to be restricted to this trait.
pub trait CuMsg: Default + Serialize + for<'a> Deserialize<'a> + Sized {}

pub trait CuMsg<'a>: CuStateful<'a> {}
// Also anything that follows this contract can be a message
impl<T> CuMsg for T where T: Default + Serialize + for<'a> Deserialize<'a> + Sized {}

pub trait CuMsgLifecycle: Sync + Send + Clone + 'static {
type Msg: CuMsg;

fn create(&self) -> &mut Self::Msg;
fn send(&self, msg: &Self::Msg);
}

pub trait CuSrcTask<'a, O>: CuStateful<'a> {
fn initialize(&self, config: NodeConfig, get_buffer: dyn Fn() -> &'a mut O, push_buffer: dyn Fn(&O));
pub trait CuSrcTask<O: CuMsg, L: CuMsgLifecycle<Msg = O>> {
fn new(config: NodeConfig, msgif: L) -> Self;
}

pub trait CuTask<'a, I,O>: CuStateful<'a> {
fn initialize(&self, config: NodeConfig);
pub trait CuTask<I: CuMsg, O: CuMsg> {
fn new(config: NodeConfig) -> Self;
fn process(&self, input: &I, output: &O) -> Result<(), String>;
}

pub trait CuSinkTask<'a, I>: CuStateful<'a> {
fn initialize(&self, config: NodeConfig);
pub trait CuSinkTask<I: CuMsg> {
fn new(&self, config: NodeConfig) -> Self;
fn process(&self, input: &I) -> Result<(), String>;
}
1 change: 1 addition & 0 deletions copper/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod config;
pub mod cutask;
pub mod serde;
66 changes: 66 additions & 0 deletions copper/src/serde/arrays.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use std::fmt;
use std::marker::PhantomData;

use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde::de::{self, SeqAccess, Visitor};
use serde::ser::SerializeSeq;

pub fn serialize<S, T, const WIDTH: usize, const HEIGHT: usize>(
array: &[[T; WIDTH]; HEIGHT],
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: Serializer,
T: Serialize,
{
let mut seq = serializer.serialize_seq(Some(WIDTH * HEIGHT))?;
for row in array.iter() {
for element in row.iter() {
seq.serialize_element(element)?;
}
}
seq.end()
}

pub fn deserialize<'de, D, T, const WIDTH: usize, const HEIGHT: usize>(
deserializer: D,
) -> Result<[[T; WIDTH]; HEIGHT], D::Error>
where
D: Deserializer<'de>,
T: Deserialize<'de> + Default + Copy,
{
struct ArrayVisitor<T, const WIDTH: usize, const HEIGHT: usize> {
marker: PhantomData<T>,
}

impl<'de, T, const WIDTH: usize, const HEIGHT: usize> Visitor<'de>
for ArrayVisitor<T, WIDTH, HEIGHT>
where
T: Deserialize<'de> + Default + Copy,
{
type Value = [[T; WIDTH]; HEIGHT];

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a 2D array")
}

fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
where
A: SeqAccess<'de>,
{
let mut array = [[T::default(); WIDTH]; HEIGHT];
for row in array.iter_mut() {
for element in row.iter_mut() {
*element = seq
.next_element()?
.ok_or_else(|| de::Error::invalid_length(0, &self))?;
}
}
Ok(array)
}
}

deserializer.deserialize_seq(ArrayVisitor::<T, WIDTH, HEIGHT> {
marker: PhantomData,
})
}
1 change: 1 addition & 0 deletions copper/src/serde/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod arrays;
2 changes: 2 additions & 0 deletions examples/camreader/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ edition = "2021"
copper_plugin_type = "driver"

[dependencies]
copper = { path = "../../copper" }
serde = { version = "1.0.201", features = ["derive"] }
89 changes: 75 additions & 14 deletions examples/camreader/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,89 @@
use std::thread;

struct CamReaderConfig {
use serde::{Deserialize, Serialize};

}

struct CamReader {
use copper::config::NodeConfig;
use copper::cutask::{CuMsg, CuMsgLifecycle, CuSrcTask};
use copper::serde::arrays;

#[derive(Serialize, Deserialize)]
struct ImageBuffer {
#[serde(with = "arrays")]
buffer: [[u8; 1920]; 1200],
}

pub fn build(config: CamReaderConfig) -> CamReader {
CamReader {}
impl Default for ImageBuffer {
fn default() -> Self {
ImageBuffer {
buffer: [[0; 1920]; 1200],
}
}
}

// Concrete struct for CamReader.
struct CamReader<L: CuMsgLifecycle<Msg = ImageBuffer>> {
buff_management: L,
}

pub fn add(left: usize, right: usize) -> usize {
left + right
// Implement CuSrcTask for CamReader with a specific type (ImageBuffer).
impl<L: CuMsgLifecycle<Msg = ImageBuffer>> CuSrcTask<ImageBuffer, L> for CamReader<L> {
fn new(_config: NodeConfig, msgif: L) -> CamReader<L> {
CamReader {
buff_management: msgif,
}
}
}
impl<L: CuMsgLifecycle<Msg = ImageBuffer>> CamReader<L> {
pub fn start(&self) {
let handle = thread::spawn({
let buff_management = self.buff_management.clone();
move || {
let mut i = 0;
loop {
let image = buff_management.create();
image.buffer[0][0] = i;
i += 1;
buff_management.send(image);
}
}
});
}
}

// impl<'a> CuSrcTask<'a, ImageBuffer> for CamReader<'a> {
// fn new(_config: NodeConfig, msgif: dyn CuMsgLifecycle<'a, ImageBuffer>) -> CamReader<'a> {
// CamReader {
// buff_management: Box::new(msgif),
// }
// }
// }

// impl<FB, FP> CamReader<'_, FB, FP> {
// pub fn run(&self) {
// let get_buffer = self.get_buffer.unwrap();
// let push_buffer = self.push_buffer.unwrap();
//
// // let get_buffer = Arc::new(Mutex::new(get_buffer));
// // let push_buffer = Arc::new(Mutex::new(push_buffer));
//
// // thread::spawn(move || {
// // let mut i = 0;
// // loop {
// // {
// // let get_buffer = get_buffer.lock().unwrap();
// // let push_buffer = push_buffer.lock().unwrap();
// // let buffer = get_buffer();
// // buffer.buffer[0][0] = i;
// // i += 1;
// // push_buffer(buffer);
// // }
// // thread::sleep(Duration::from_secs(1));
// // }
// // });
// }
// }
#[cfg(test)]
mod tests {
use super::*;

#[test]
fn it_works() {
let result = add(2, 2);
assert_eq!(result, 4);
}
fn it_works() {}
}

0 comments on commit 42e9c1c

Please sign in to comment.