Skip to content

Commit

Permalink
Refactor cumsg.metadata and added easier diags.
Browse files Browse the repository at this point in the history
  • Loading branch information
gbin committed Jun 16, 2024
1 parent 05ca1b2 commit 020b680
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 22 deletions.
3 changes: 2 additions & 1 deletion copper/src/copperlist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ enum CopperListState {
BeingSerialized,
}

/// This structure maintains the entire memory needed by Copper for one process for the inter task communication.
/// This structure maintains the entire memory needed by Copper for one loop for the inter tasks communication within a process.
/// T is typically a Tuple of various types of messages that are exchanged between tasks.
#[derive(Debug)]
pub struct CuListsManager<T: Sized + PartialEq, const N: usize> {
#[allow(dead_code)]
Expand Down
2 changes: 2 additions & 0 deletions copper/src/curuntime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ pub fn compute_runtime_plan(config: &CuConfig) -> CuResult<Vec<CuExecutionStep>>
next_culist_output_index += 1;
CuTaskType::Source
} else if config.graph.neighbors_directed(id.into(), Outgoing).count() == 0 {
// this is a Sink.
input_msg_type = Some(
config
.graph
Expand All @@ -116,6 +117,7 @@ pub fn compute_runtime_plan(config: &CuConfig) -> CuResult<Vec<CuExecutionStep>>
.neighbors_directed(id.into(), Incoming)
.next()
.unwrap();
// Find the source of the incoming message
culist_input_index = find_output_index_from_nodeid(parent.index() as NodeId, &result);
CuTaskType::Sink
} else {
Expand Down
33 changes: 25 additions & 8 deletions copper/src/cutask.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,41 @@
use serde::{Deserialize, Serialize};
use copper_clock::RobotClock;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::fmt::{Display, Formatter};

use crate::clock::OptionCuTime;
use crate::config::NodeInstanceConfig;
use crate::CuResult;
use crate::clock::OptionCuTime;

// Everything that is stateful in copper for zero copy constraints need to be restricted to this trait.
pub trait CuMsgPayload: Default + Serialize + for<'a> Deserialize<'a> + Sized {}

// Also anything that follows this contract can be a payload (blanket implementation)
impl<T> CuMsgPayload for T where T: Default + Serialize + for<'a> Deserialize<'a> + Sized {}

#[derive(Debug, PartialEq, Default)]
pub struct CuMsgMetadata {
pub before_process: OptionCuTime,
pub after_process: OptionCuTime,
}

impl Display for CuMsgMetadata {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(
f,
"before_process: {}, after_process: {}",
self.before_process, self.after_process
)
}
}

#[derive(Debug, PartialEq)]
pub struct CuMsg<T>
where
T: CuMsgPayload,
{
pub payload: T,

// Runtime statistics
pub before_process: OptionCuTime,
pub after_process: OptionCuTime,
pub metadata: CuMsgMetadata,
}

impl<T> CuMsg<T>
Expand All @@ -30,8 +45,10 @@ where
pub fn new(payload: T) -> Self {
CuMsg {
payload,
before_process: OptionCuTime::none(),
after_process: OptionCuTime::none(),
metadata: CuMsgMetadata {
before_process: OptionCuTime::none(),
after_process: OptionCuTime::none(),
},
}
}
}
Expand Down
10 changes: 10 additions & 0 deletions copper_clock/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,16 @@ impl OptionCuTime {
}
}

impl Display for OptionCuTime {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
if self.is_none() {
write!(f, "None")
} else {
write!(f, "{}", self.0)
}
}
}

impl Default for OptionCuTime {
fn default() -> Self {
Self::none()
Expand Down
50 changes: 38 additions & 12 deletions copper_derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,9 @@ pub fn copper_runtime(args: TokenStream, input: TokenStream) -> TokenStream {
{
#comment_tokens
let cumsg_output = &mut culist.#output_culist_index;
cumsg_output.before_process = self.copper_runtime.clock.now().into();
cumsg_output.metadata.before_process = self.copper_runtime.clock.now().into();
#task_instance.process(&self.copper_runtime.clock, cumsg_output)?;
cumsg_output.after_process = self.copper_runtime.clock.now().into();
cumsg_output.metadata.after_process = self.copper_runtime.clock.now().into();
}
}
},
Expand All @@ -149,9 +149,9 @@ pub fn copper_runtime(args: TokenStream, input: TokenStream) -> TokenStream {
#comment_tokens
let cumsg_input = &mut culist.#input_culist_index;
let cumsg_output = &mut culist.#output_culist_index;
cumsg_output.before_process = self.copper_runtime.clock.now().into();
cumsg_output.metadata.before_process = self.copper_runtime.clock.now().into();
#task_instance.process(&self.copper_runtime.clock, cumsg_input, cumsg_output)?;
cumsg_output.after_process = self.copper_runtime.clock.now().into();
cumsg_output.metadata.after_process = self.copper_runtime.clock.now().into();
}
}
}
Expand Down Expand Up @@ -182,16 +182,36 @@ pub fn copper_runtime(args: TokenStream, input: TokenStream) -> TokenStream {
parse_quote! { (#(_CuMsg<#all_msgs_types_in_culist_order>),*,)}
};

let run_method = quote! {
pub fn run(&mut self, iterations: u32) -> _CuResult<()> {
let culist = self.copper_runtime.copper_lists.create().expect("Ran out of space for copper lists"); // FIXME: error handling.
for _ in 0..iterations {
#(#runtime_plan_code)*
}
Ok(())
println!("[build the collect metadata function]");
let culist_size = all_msgs_types_in_culist_order.len();
let culist_indices = 0..culist_size;
let collect_metadata_function = quote! {
pub fn collect_metadata<'a>(culist: &'a CuList) -> [&'a _CuMsgMetadata; #culist_size] {
[#( &culist.#culist_indices.metadata, )*]
}
};

println!("[build the run method]");
let run_method = quote! {
pub fn run(&mut self, iterations: u32) -> _CuResult<()> {
let culist = self.copper_runtime.copper_lists.create().expect("Ran out of space for copper lists"); // FIXME: error handling.
for _ in 0..iterations {
#(#runtime_plan_code)*
}

let md = collect_metadata(&culist);
for m in md.iter() {
println!("Metadata: {}", m);
}

let e2e = md.last().unwrap().after_process.unwrap() - md.first().unwrap().before_process.unwrap();
let e2en: u64 = e2e.into();
println!("End to end latency {}, mean latency per hop: {}", e2e, e2en / (md.len() as u64));

Ok(())
}
};

println!("[build result]");
// Convert the modified struct back into a TokenStream
let result = quote! {
Expand All @@ -200,19 +220,25 @@ pub fn copper_runtime(args: TokenStream, input: TokenStream) -> TokenStream {
use copper::config::NodeInstanceConfig as _NodeInstanceConfig;
use copper::config::read_configuration as _read_configuration;
use copper::curuntime::CuRuntime as _CuRuntime;
use copper::cutask::CuTaskLifecycle as _CuTaskLifecycle; // Needed for the instantiation of tasks
use copper::CuResult as _CuResult;
use copper::CuError as _CuError;
use copper::cutask::CuTaskLifecycle as _CuTaskLifecycle; // Needed for the instantiation of tasks
use copper::cutask::CuSrcTask as _CuSrcTask;
use copper::cutask::CuSinkTask as _CuSinkTask;
use copper::cutask::CuTask as _CuTask;
use copper::cutask::CuMsg as _CuMsg;
use copper::cutask::CuMsgMetadata as _CuMsgMetadata;
use copper::clock::RobotClock as _RobotClock;
use copper::clock::OptionCuTime as _OptionCuTime;

// This is the heart of everything.
// CuTasks is the list of all the tasks types.
// CuList is the list of all the messages types.
pub type CuTasks = #task_types_tuple;
pub type CuList = #msgs_types_tuple;

// This generates a way to get the metadata of every single message of a culist at low cost
#collect_metadata_function

fn tasks_instanciator(all_instances_configs: Vec<Option<&_NodeInstanceConfig>>) -> _CuResult<CuTasks> {
Ok(( #(#task_instances_init_code),*, ))
Expand Down
3 changes: 2 additions & 1 deletion examples/cu_caterpillar/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ fn main() {

let extra: ExtraTextLogger = ExtraTextLogger::new(log_index_path, slow_text_logger);
let clock = RobotClock::default();
let _needed = LoggerRuntime::init(clock.clone(), stream, Some(extra));
//let _needed = LoggerRuntime::init(clock.clone(), stream, Some(extra)); // with the slow textual logger on top.
let _needed = LoggerRuntime::init(clock.clone(), stream, None); // with only the fast copper logger.
debug!("Application created.");
let mut application =
TheVeryHungryCaterpillar::new(clock.clone()).expect("Failed to create runtime.");
Expand Down

0 comments on commit 020b680

Please sign in to comment.