datalogger: basic write working
gbin committed May 29, 2024
1 parent 5bc34b5 commit 918c065
Showing 1 changed file with 100 additions and 19 deletions.
copper_datalogger/src/lib.rs (119 changes: 100 additions & 19 deletions)
@@ -3,13 +3,17 @@ use libc;
 use bincode::encode_into_slice;
 use bincode::error::EncodeError;
 use bincode::Encode;
+use bincode::{decode_from_reader, decode_from_slice};
 use bincode_derive::Decode as dDecode;
 use bincode_derive::Encode as dEncode;
 
-use bincode::config::standard;
+use bincode::config::{standard, Configuration};
+use bincode::de::read::Reader;
 use memmap2::{MmapMut, RemapOptions};
 use std::fs::{File, OpenOptions};
+use std::io;
+use std::io::{BufReader, Seek};
 use std::io::{Read, SeekFrom};
 use std::path::Path;
 use std::slice::from_raw_parts_mut;
 use std::sync::{Arc, Mutex};
@@ -37,23 +41,23 @@ struct SectionHeader {
     section_size: u32, // offset of section_magic + section_size -> should be the index of the next section_magic
 }
 
-pub trait Stream<'a> {
-    fn log(&'a mut self, obj: &impl Encode);
+pub trait Stream {
+    fn log(&mut self, obj: &impl Encode);
 }
 
-struct MmapStream<'a> {
+struct MmapStream {
     entry_type: EntryType,
     parent_logger: Arc<Mutex<DataLogger>>,
-    current_slice: &'a mut [u8],
+    current_slice: &'static mut [u8],
     current_position: usize,
     minimum_allocation_amount: usize,
 }
 
-impl<'a> MmapStream<'a> {
+impl MmapStream {
     fn new(
         entry_type: EntryType,
         parent_logger: Arc<Mutex<DataLogger>>,
-        current_slice: &'a mut [u8],
+        current_slice: &'static mut [u8],
         minimum_allocation_amount: usize,
     ) -> Self {
         Self {
@@ -66,8 +70,8 @@ impl<'a> MmapStream<'a> {
     }
 }
 
-impl<'a> Stream<'a> for MmapStream<'a> {
-    fn log(&'a mut self, obj: &impl Encode) {
+impl Stream for MmapStream {
+    fn log(&mut self, obj: &impl Encode) {
         let result = encode_into_slice(
             obj,
             &mut self.current_slice[self.current_position..],
@@ -95,7 +99,7 @@ impl<'a> Stream<'a> for MmapStream<'a> {
     }
 }
 
-impl Drop for MmapStream<'_> {
+impl Drop for MmapStream {
     fn drop(&mut self) {
         let mut logger_guard = self.parent_logger.lock().unwrap();
         logger_guard.unlock_section(self.current_slice);
@@ -106,7 +110,7 @@ pub fn stream(
     logger: Arc<Mutex<DataLogger>>,
     entry_type: EntryType,
     minimum_allocation_amount: usize,
-) -> impl Stream<'static> {
+) -> impl Stream {
     let clone = logger.clone();
     let mut logger = logger.lock().unwrap();
     let underlying_slice = logger.add_section(entry_type, minimum_allocation_amount);
@@ -251,16 +255,67 @@ impl Drop for DataLogger {
     }
 }
 
+// Section iterator returning the [u8] of the current section
+pub struct SectionIterator {
+    reader: BufReader<File>,
+}
+
+impl Iterator for SectionIterator {
+    type Item = Vec<u8>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let section_header: SectionHeader = decode_from_reader(&mut self.reader, standard())
+            .expect("Failed to decode section header");
+        let mut section = vec![0; section_header.section_size as usize];
+        self.reader
+            .read_exact(section.as_mut_slice())
+            .expect("Failed to read section");
+        Some(section)
+    }
+}
+
+// make an iterator to read back a serialized datalogger from a file
+pub fn read_datalogger(file_path: &Path) -> io::Result<impl Iterator<Item = Vec<u8>>> {
+    let mut file = OpenOptions::new().read(true).open(file_path)?;
+    let mut header = [0; 4096];
+    let s = file.read(&mut header).unwrap();
+    if s < 4096 {
+        return Err(io::Error::new(
+            io::ErrorKind::InvalidData,
+            "Failed to read main header",
+        ));
+    }
+
+    let main_header: MainHeader;
+    let read: usize;
+    (main_header, read) =
+        decode_from_slice(&header[..], standard()).expect("Failed to decode main header");
+
+    if main_header.magic != MAIN_MAGIC {
+        return Err(io::Error::new(
+            io::ErrorKind::InvalidData,
+            "Invalid magic number in main header",
+        ));
+    }
+
+    Ok(SectionIterator {
+        reader: BufReader::new(file),
+    })
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
+    use std::path::PathBuf;
     use tempfile::TempDir;
-    fn make_a_logger() -> Arc<Mutex<DataLogger>> {
-        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
+    fn make_a_logger(tmp_dir: &TempDir) -> (Arc<Mutex<DataLogger>>, PathBuf) {
         let file_path = tmp_dir.path().join("test.bin");
-        Arc::new(Mutex::new(
-            DataLogger::new(&file_path, Some(100000)).expect("Failed to create logger"),
-        ))
+        (
+            Arc::new(Mutex::new(
+                DataLogger::new(&file_path, Some(100000)).expect("Failed to create logger"),
+            )),
+            file_path,
+        )
     }
 
     #[test]
@@ -289,7 +344,8 @@ mod tests {
 
     #[test]
     fn test_one_section_self_cleaning() {
-        let logger = make_a_logger();
+        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
+        let (logger, _) = make_a_logger(&tmp_dir);
         {
             let stream = stream(logger.clone(), EntryType::StructuredLogLine, 1024);
             assert_eq!(logger.lock().unwrap().sections_in_flight.len(), 1);
@@ -301,7 +357,8 @@ mod tests {
 
     #[test]
     fn test_two_sections_self_cleaning_in_order() {
-        let logger = make_a_logger();
+        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
+        let (logger, _) = make_a_logger(&tmp_dir);
         let s1 = stream(logger.clone(), EntryType::StructuredLogLine, 1024);
         assert_eq!(logger.lock().unwrap().sections_in_flight.len(), 1);
         let s2 = stream(logger.clone(), EntryType::StructuredLogLine, 1024);
@@ -316,7 +373,8 @@ mod tests {
 
     #[test]
     fn test_two_sections_self_cleaning_out_of_order() {
-        let logger = make_a_logger();
+        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
+        let (logger, _) = make_a_logger(&tmp_dir);
         let s1 = stream(logger.clone(), EntryType::StructuredLogLine, 1024);
         assert_eq!(logger.lock().unwrap().sections_in_flight.len(), 1);
         let s2 = stream(logger.clone(), EntryType::StructuredLogLine, 1024);
@@ -328,4 +386,27 @@
         assert_eq!(lg.sections_in_flight.len(), 0);
         assert_eq!(lg.flushed_until, lg.current_global_position);
     }
+
+    #[test]
+    fn test_write_then_read_one_section() {
+        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
+        let (logger, f) = make_a_logger(&tmp_dir);
+        let p = f.as_path();
+        println!("Path : {:?}", p);
+        {
+            let mut stream = stream(logger.clone(), EntryType::StructuredLogLine, 1024);
+            stream.log(&1u32);
+            stream.log(&2u32);
+            stream.log(&3u32);
+        }
+        let mut sections = read_datalogger(p).unwrap();
+        let section = sections.next().unwrap();
+        let mut reader = BufReader::new(&section[..]);
+        let v1: u32 = decode_from_reader(&mut reader, standard()).unwrap();
+        let v2: u32 = decode_from_reader(&mut reader, standard()).unwrap();
+        let v3: u32 = decode_from_reader(&mut reader, standard()).unwrap();
+        assert_eq!(v1, 1);
+        assert_eq!(v2, 2);
+        assert_eq!(v3, 3);
+    }
 }

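For context, here is a minimal end-to-end sketch of the API this commit completes: write values through a Stream, then read the sections back with read_datalogger. It mirrors test_write_then_read_one_section above and is not part of the commit; the crate-root re-exports, the crate name copper_datalogger, and the file path are assumptions.

// Usage sketch only (not in this commit). Assumes copper_datalogger re-exports
// DataLogger, EntryType, Stream, stream and read_datalogger at the crate root.
use std::io::BufReader;
use std::path::Path;
use std::sync::{Arc, Mutex};

use bincode::config::standard;
use bincode::decode_from_reader;
use copper_datalogger::{read_datalogger, stream, DataLogger, EntryType, Stream};

fn main() -> std::io::Result<()> {
    let path = Path::new("/tmp/example.bin");
    let logger = Arc::new(Mutex::new(
        DataLogger::new(path, Some(100_000)).expect("Failed to create logger"),
    ));
    {
        // Write side: a stream takes a section from the logger and appends encoded values to it.
        let mut s = stream(logger.clone(), EntryType::StructuredLogLine, 1024);
        s.log(&1u32);
        s.log(&2u32);
    } // Dropping the stream hands its section back to the logger.

    // Read side: iterate over the raw section buffers and decode values out of the first one.
    let mut sections = read_datalogger(path)?;
    let section = sections.next().expect("expected at least one section");
    let mut reader = BufReader::new(&section[..]);
    let a: u32 = decode_from_reader(&mut reader, standard()).unwrap();
    let b: u32 = decode_from_reader(&mut reader, standard()).unwrap();
    println!("read back {} and {}", a, b);
    Ok(())
}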