Skip to content

Commit

Permalink
WIP struct log reading
Browse files Browse the repository at this point in the history
  • Loading branch information
gbin committed May 30, 2024
1 parent 03188fd commit 0e32e4c
Show file tree
Hide file tree
Showing 13 changed files with 145 additions and 49 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ members = ["copper",
"copper_log_runtime",
"copper_datalogger",
"copper_clock",
"copper_traits"]
"copper_traits", "copper_log_reader"]
resolver = "2"
70 changes: 41 additions & 29 deletions copper_datalogger/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use bincode::{decode_from_reader, decode_from_slice};
use bincode_derive::Decode as dDecode;
use bincode_derive::Encode as dEncode;

use copper::{CuError, CuResult, Stream};
use copper::{CuError, CuResult, DataLogType, Stream};

const MAIN_MAGIC: [u8; 4] = [0xB4, 0xA5, 0x50, 0xFF];

Expand All @@ -31,21 +31,15 @@ struct MainHeader {
first_section_offset: u16, // This is to align with a page at write time.
}

#[derive(dEncode, dDecode, Copy, Clone)]
pub enum EntryType {
StructuredLogLine,
CopperList,
}

#[derive(dEncode, dDecode)]
struct SectionHeader {
magic: [u8; 2],
entry_type: EntryType,
entry_type: DataLogType,
section_size: u32, // offset of section_magic + section_size -> should be the index of the next section_magic
}

struct MmapStream {
entry_type: EntryType,
entry_type: DataLogType,
parent_logger: Arc<Mutex<DataLogger>>,
current_slice: &'static mut [u8],
current_position: usize,
Expand All @@ -54,7 +48,7 @@ struct MmapStream {

impl MmapStream {
fn new(
entry_type: EntryType,
entry_type: DataLogType,
parent_logger: Arc<Mutex<DataLogger>>,
current_slice: &'static mut [u8],
minimum_allocation_amount: usize,
Expand Down Expand Up @@ -114,7 +108,7 @@ impl Drop for MmapStream {

pub fn stream(
logger: Arc<Mutex<DataLogger>>,
entry_type: EntryType,
entry_type: DataLogType,
minimum_allocation_amount: usize,
) -> impl Stream {
let clone = logger.clone();
Expand Down Expand Up @@ -208,7 +202,7 @@ impl DataLogger {
}

/// The returned slice is section_size or greater.
fn add_section(&mut self, entry_type: EntryType, section_size: usize) -> &mut [u8] {
fn add_section(&mut self, entry_type: DataLogType, section_size: usize) -> &mut [u8] {
// align current_position to the next page
self.current_global_position =
(self.current_global_position + self.page_size - 1) & !(self.page_size - 1);
Expand Down Expand Up @@ -264,24 +258,41 @@ impl Drop for DataLogger {
// Section iterator returning the [u8] of the current section
pub struct SectionIterator {
reader: BufReader<File>,
datalogtype: Option<DataLogType>,
}

impl Iterator for SectionIterator {
    type Item = Vec<u8>;

    /// Yields the raw payload of the next matching section.
    ///
    /// Sections are read back-to-back: a `SectionHeader` followed by
    /// `section_size` payload bytes. When `datalogtype` is `Some(t)`,
    /// sections of other types are skipped; when it is `None`, every
    /// section is returned. Returns `None` on EOF or a malformed header.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // A decode failure is treated as end-of-log (EOF or corruption).
            let section_header: SectionHeader =
                decode_from_reader(&mut self.reader, standard()).ok()?;
            let mut section = vec![0; section_header.section_size as usize];
            // A short read means the file is truncated; stop iterating.
            self.reader.read_exact(section.as_mut_slice()).ok()?;
            match self.datalogtype {
                // No filter requested: every section is a hit.
                // (Without this arm a `None` filter would skip ALL sections
                // and the iterator would always be empty.)
                None => return Some(section),
                Some(wanted) if section_header.entry_type == wanted => return Some(section),
                // Filtered out: keep scanning forward.
                Some(_) => continue,
            }
        }
    }
}

// make an iterator to read back a serialized datalogger from a file
pub fn read_datalogger(file_path: &Path) -> io::Result<impl Iterator<Item = Vec<u8>>> {
// optionally filter by type
pub fn read_datalogger(
file_path: &Path,
datalogtype: Option<DataLogType>,
) -> io::Result<impl Iterator<Item = Vec<u8>>> {
let mut file = OpenOptions::new().read(true).open(file_path)?;
let mut header = [0; 4096];
let s = file.read(&mut header).unwrap();
Expand All @@ -306,6 +317,7 @@ pub fn read_datalogger(file_path: &Path) -> io::Result<impl Iterator<Item = Vec<

Ok(SectionIterator {
reader: BufReader::new(file),
datalogtype,
})
}

Expand All @@ -332,8 +344,8 @@ mod tests {
let used = {
let mut logger =
DataLogger::new(&file_path, Some(100000)).expect("Failed to create logger");
logger.add_section(EntryType::StructuredLogLine, 1024);
logger.add_section(EntryType::CopperList, 2048);
logger.add_section(DataLogType::StructuredLogLine, 1024);
logger.add_section(DataLogType::CopperList, 2048);
let used = logger.used();
assert!(used < 3 * 4096); // ie. 3 headers, 1 page max per
// logger drops
Expand All @@ -353,7 +365,7 @@ mod tests {
let tmp_dir = TempDir::new().expect("could not create a tmp dir");
let (logger, _) = make_a_logger(&tmp_dir);
{
let stream = stream(logger.clone(), EntryType::StructuredLogLine, 1024);
let stream = stream(logger.clone(), DataLogType::StructuredLogLine, 1024);
assert_eq!(logger.lock().unwrap().sections_in_flight.len(), 1);
}
let lg = logger.lock().unwrap();
Expand All @@ -365,9 +377,9 @@ mod tests {
fn test_two_sections_self_cleaning_in_order() {
let tmp_dir = TempDir::new().expect("could not create a tmp dir");
let (logger, _) = make_a_logger(&tmp_dir);
let s1 = stream(logger.clone(), EntryType::StructuredLogLine, 1024);
let s1 = stream(logger.clone(), DataLogType::StructuredLogLine, 1024);
assert_eq!(logger.lock().unwrap().sections_in_flight.len(), 1);
let s2 = stream(logger.clone(), EntryType::StructuredLogLine, 1024);
let s2 = stream(logger.clone(), DataLogType::StructuredLogLine, 1024);
assert_eq!(logger.lock().unwrap().sections_in_flight.len(), 2);
drop(s2);
assert_eq!(logger.lock().unwrap().sections_in_flight.len(), 1);
Expand All @@ -381,9 +393,9 @@ mod tests {
fn test_two_sections_self_cleaning_out_of_order() {
let tmp_dir = TempDir::new().expect("could not create a tmp dir");
let (logger, _) = make_a_logger(&tmp_dir);
let s1 = stream(logger.clone(), EntryType::StructuredLogLine, 1024);
let s1 = stream(logger.clone(), DataLogType::StructuredLogLine, 1024);
assert_eq!(logger.lock().unwrap().sections_in_flight.len(), 1);
let s2 = stream(logger.clone(), EntryType::StructuredLogLine, 1024);
let s2 = stream(logger.clone(), DataLogType::StructuredLogLine, 1024);
assert_eq!(logger.lock().unwrap().sections_in_flight.len(), 2);
drop(s1);
assert_eq!(logger.lock().unwrap().sections_in_flight.len(), 1);
Expand All @@ -400,12 +412,12 @@ mod tests {
let p = f.as_path();
println!("Path : {:?}", p);
{
let mut stream = stream(logger.clone(), EntryType::StructuredLogLine, 1024);
let mut stream = stream(logger.clone(), DataLogType::StructuredLogLine, 1024);
stream.log(&1u32);
stream.log(&2u32);
stream.log(&3u32);
}
let mut sections = read_datalogger(p).unwrap();
let mut sections = read_datalogger(p, Some(DataLogType::StructuredLogLine)).unwrap();
let section = sections.next().unwrap();
let mut reader = BufReader::new(&section[..]);
let v1: u32 = decode_from_reader(&mut reader, standard()).unwrap();
Expand Down
7 changes: 3 additions & 4 deletions copper_log/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@ name = "copper_log"
path = "src/lib.rs"
proc-macro = true

[[bin]]
name = "copper_log_extract"
path = "src/log_extract.rs"

[dependencies]
proc-macro2 = { version = "1.0.83" }
quote = "1.0.36"
syn = { version = "2.0.65", features = ["full"] }
rkv = { version = "0.19.0", features = ["lmdb"] }
lazy_static = { version = "1.4.0" }
bincode_derive = "2.0.0-rc.3"
bincode = "2.0.0-rc.3"
copper-value = { path = "../copper_value" }
1 change: 1 addition & 0 deletions copper_log/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ extern crate proc_macro;
use crate::index::{check_and_insert, intern_string};
use proc_macro::TokenStream;
use quote::quote;
use std::fmt::Display;
use syn::parse::Parser;
use syn::Token;
use syn::{Expr, ExprAssign, ExprLit, Lit};
Expand Down
3 changes: 0 additions & 3 deletions copper_log/src/log_extract.rs

This file was deleted.

10 changes: 10 additions & 0 deletions copper_log_reader/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[package]
name = "copper-log-reader"
version = "0.1.0"
edition = "2021"

[dependencies]
copper-log-runtime = { path = "../copper_log_runtime" }
rkv = { version = "0.19.0", features = ["lmdb"] }
byteorder = "1.5.0"
bincode = "2.0.0-rc.3"
70 changes: 70 additions & 0 deletions copper_log_reader/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use bincode::config::standard;
use bincode::decode_from_std_read;
use byteorder::{ByteOrder, LittleEndian};
use copper_log_runtime::CuLogEntry;
use rkv::backend::Lmdb;
use rkv::{Rkv, StoreOptions, Writer};
use std::io::Read;
use std::path::Path;

/// Reads the interned-string table from the LMDB index at `index`, then
/// decodes one `CuLogEntry` from `src` and prints it with its format
/// string resolved.
///
/// The index maps little-endian `u32` keys to the format strings captured
/// at compile time by the `debug!` macro.
pub fn extract_copper_log_index(mut src: impl Read, index: &Path) {
    let mut all_strings = Vec::<String>::new();
    let env = Rkv::new::<Lmdb>(index).unwrap();
    let index_to_string = env
        .open_single("index_to_string", StoreOptions::default())
        .expect("Failed to open index_to_string store");
    let db_reader = env.read().unwrap();
    let ri = index_to_string.iter_start(&db_reader);
    let mut i = ri.expect("Failed to start iterator");
    // Build a dense index -> string table; gaps are filled with empty strings.
    // NOTE(review): iteration stops silently on the first store error — WIP behavior.
    while let Some(Ok((k, v))) = i.next() {
        let string_index = LittleEndian::read_u32(k) as usize;

        if let rkv::Value::Str(s) = v {
            if all_strings.len() <= string_index {
                all_strings.resize(string_index + 1, String::new());
            }

            all_strings[string_index] = s.to_string();
            println!("{} -> {}", string_index, s);
        }
    }
    // TODO(WIP): decode the whole stream, not just the first entry.
    let entry = decode_from_std_read::<CuLogEntry, _, _>(&mut src, standard())
        .expect("Failed to decode CuLogEntry");
    println!(
        "Entry: {} -> {} with params {:?}",
        entry.msg_index, all_strings[entry.msg_index as usize], entry.params
    );
    // FIXME: Entry: 1 -> Just a string {} with params [I16(61)]
}

#[cfg(test)]
mod tests {

    use super::*;
    use std::io::Cursor;
    /*
    Just a string {} CuLogEntry { msg_index: 1, paramname_indexes: [0], params: [String("zarma")] }
    anonymous param constants {} {} CuLogEntry { msg_index: 2, paramname_indexes: [0, 0], params: [U16(42), U8(43)] }
    named param constants {} {} CuLogEntry { msg_index: 3, paramname_indexes: [4, 5], params: [I32(3), I32(2)] }
    mixed named param constants, {} {} {} CuLogEntry { msg_index: 6, paramname_indexes: [0, 4, 5], params: [I32(54), I32(3), I32(2)] }
    complex tuple CuLogEntry { msg_index: 7, paramname_indexes: [0], params: [Seq([I32(1), String("toto"), F64(3.34), Bool(true), Char('a')])] }
    Struct CuLogEntry { msg_index: 8, paramname_indexes: [0], params: [Map({String("a"): I32(3), String("b"): I32(4)})] }
    Allocations: +{}B -{}B CuLogEntry { msg_index: 1, paramname_indexes: [2, 3], params: [U64(1003932), U64(1002876)] }
    AFTER CLOSE {} CuLogEntry { msg_index: 10, paramname_indexes: [0], params: [String("AFTER CLOSE")] }
    */

    // const stored_log: &[u8] = include_bytes!("../test/teststructlog.copper");

    /// Smoke test: feeds a captured byte dump (the entries listed above)
    /// through `extract_copper_log_index` against the on-disk test index.
    /// Requires the `test/copper_log_index` fixture to be present.
    #[test]
    fn test_extract_copper_log() {
        let hex_string = "01 01 00 01 05 7A 61 72 6D 61 02 02 00 00 02 2A 2B 03 02 04 05 02 06 04 06 03 00 04 05 03 6C 06 04 07 01 00 01 05 02 04 74 6F 74 6F B8 1E 85 EB 51 B8 0A 40 01 61 08 01 00 01 02 01 61 06 01 62 08 01 02 02 03 02 FC 9C 51 0F 00 FC 7C 4D 0F 00 0A 01 00 01 0B 41 46 54 45 52 20 43 4C 4F 53 45 43 4C 4F 53 45 00 00 00 00";
        let bytes: Vec<u8> = hex_string
            .split_whitespace()
            .map(|s| u8::from_str_radix(s, 16).expect("Parse error"))
            .collect();
        // Cursor is moved into the function; no `mut` binding needed here.
        let reader = Cursor::new(bytes.as_slice());
        extract_copper_log_index(reader, Path::new("test/copper_log_index"));
    }
}
Binary file not shown.
Binary file not shown.
Binary file added copper_log_reader/test/teststructlog.copper
Binary file not shown.
13 changes: 7 additions & 6 deletions copper_log_runtime/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use bincode::enc::write::Writer;
use bincode_derive::{Decode, Encode};
use copper_traits::{CuResult, Stream};
pub use copper_value as value; // Part of the API, do not remove.
pub use copper_value as value;
use copper_value::Value;
use kanal::{bounded, Sender};
use once_cell::sync::OnceCell;
Expand All @@ -16,6 +16,9 @@ static QUEUE: OnceCell<Sender<CuLogEntry>> = OnceCell::new();
#[allow(dead_code)]
pub const ANONYMOUS: u32 = 0;

/// The lifetime of this struct is the lifetime of the logger.
pub struct LoggerRuntime {}

#[derive(Debug, Encode, Decode)]
pub struct CuLogEntry {
pub msg_index: u32,
Expand Down Expand Up @@ -47,10 +50,6 @@ impl CuLogEntry {
self.params.push(param);
}
}

/// The lifetime of this struct is the lifetime of the logger.
pub struct LoggerRuntime {}

impl LoggerRuntime {
pub fn init(destination: impl Stream + 'static) -> Self {
QUEUE
Expand Down Expand Up @@ -79,7 +78,9 @@ fn initialize_queue(mut destination: impl Stream + 'static) -> Sender<CuLogEntry

let handle = thread::spawn(move || loop {
if let Ok(data) = receiver.recv() {
destination.log(&data);
if let Err(err) = destination.log(&data) {
eprintln!("Failed to log data: {}", err);
}
} else {
break;
}
Expand Down
10 changes: 5 additions & 5 deletions copper_log_test/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use copper_datalogger::{stream, DataLogger, EntryType};
use copper::DataLogType;
use copper_datalogger::{stream, DataLogger};
use copper_log::debug;
use copper_log_runtime::LoggerRuntime;
use copper_value::to_value;
Expand All @@ -11,7 +12,7 @@ fn main() {
let data_logger = Arc::new(Mutex::new(
DataLogger::new(path.as_path(), Some(100000)).expect("Failed to create logger"),
));
let mut stream = stream(data_logger.clone(), EntryType::StructuredLogLine, 1024);
let mut stream = stream(data_logger.clone(), DataLogType::StructuredLogLine, 1024);
let rt = LoggerRuntime::init(stream);
#[derive(Serialize)]
struct Test {
Expand All @@ -22,13 +23,12 @@ fn main() {
{
let hop = copper::monitoring::ScopedAllocCounter::new();
let gigantic_vec = vec![0u8; 1_000_000];
debug!("Just a string");
debug!("anonymous param constants {} {}", 3u16, 2u8);
debug!("Just a string {}", "zarma");
debug!("anonymous param constants {} {}", 42u16, 43u8);
debug!("named param constants {} {}", a = 3, b = 2);
debug!("mixed named param constants, {} {} {}", a = 3, 54, b = 2);
debug!("complex tuple", mytuple);
debug!("Struct", Test { a: 3, b: 4 });
debug!("u8", gigantic_vec[999999]);
}
debug!(" AFTER CLOSE {} ", "AFTER CLOSE");
rt.close();
Expand Down
8 changes: 7 additions & 1 deletion copper_traits/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use bincode::Encode;
use bincode::{Decode as dDecode, Encode, Encode as dEncode};
use std::error::Error;
use std::fmt::{Display, Formatter};

Expand Down Expand Up @@ -54,3 +54,9 @@ pub type CuResult<T> = Result<T, CuError>;
pub trait Stream: Sync + Send {
fn log(&mut self, obj: &impl Encode) -> CuResult<()>;
}

/// Kind of payload stored in a datalogger section.
///
/// Written into every `SectionHeader` so readers can filter sections by
/// content type without decoding the payload.
#[derive(Copy, Clone, Debug, PartialEq, dEncode, dDecode)]
pub enum DataLogType {
    /// A run of structured log lines (`CuLogEntry` records).
    StructuredLogLine,
    /// A serialized copper list.
    CopperList,
}

0 comments on commit 0e32e4c

Please sign in to comment.