Skip to content

Commit

Permalink
packet metrics work (#18)
Browse files Browse the repository at this point in the history
* allow hexdump in info mode

* collect more packet stream stats

add initial tr101290 and iat, bitrate storage histograms.

WIP

* hook in tr101290 and other metrics

* keep track of bitrate by start time and bits

* update packet information per packet

* revert previous change

* try packet stats again

* remove duplicate code

* comment out redundant print

* comment out and fix later

---------

Co-authored-by: Chris Kennedy <[email protected]>
  • Loading branch information
groovybits and Chris Kennedy authored Dec 12, 2023
1 parent f5c84f1 commit 34bab3e
Showing 1 changed file with 97 additions and 19 deletions.
116 changes: 97 additions & 19 deletions src/bin/probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ struct StreamData {
iat: u64,
error_count: u32,
last_arrival_time: u64,
start_time: u64, // field for start time
total_bits: u64, // field for total bits
data: Vec<u8>, // The actual MPEG-TS packet data
}

Expand All @@ -85,44 +87,110 @@ impl StreamData {
iat,
error_count,
last_arrival_time,
start_time: timestamp, // Initialize start time
total_bits: 0, // Initialize total bits
data: packet.to_vec(),
}
}

fn update_stats(&mut self, packet_size: usize, arrival_time: u64) {
// Update bitrate, IAT, etc.
// Example calculation for bitrate (simplified)
self.bitrate += packet_size as u32 * 8; // Convert bytes to bits
let bits = packet_size as u64 * 8; // Convert bytes to bits

// Elapsed time in milliseconds
let elapsed_time_ms = arrival_time - self.start_time;

if elapsed_time_ms > 0 {
let elapsed_time_sec = elapsed_time_ms as f64 / 1000.0;
self.bitrate = (self.total_bits as f64 / elapsed_time_sec) as u32;
}

// Example calculation for IAT
self.total_bits += bits; // Accumulate total bits

// IAT calculation remains the same
let iat = arrival_time - self.last_arrival_time;
self.iat = iat;

// Update last arrival time
self.last_arrival_time = arrival_time;
}
}

struct Tr101290Errors {
sync_byte_errors: u32,
transport_error_indicator_errors: u32,
continuity_counter_errors: u32,
// ... other error types ...
}

impl Tr101290Errors {
fn new() -> Self {
Tr101290Errors {
sync_byte_errors: 0,
transport_error_indicator_errors: 0,
continuity_counter_errors: 0,
// ... initialize other errors ...
}
}

fn log_errors(&self) {
// Log the error counts for monitoring
if self.sync_byte_errors > 0 {
error!("Sync byte errors: {}", self.sync_byte_errors);
}
if self.transport_error_indicator_errors > 0 {
error!("Transport Error Indicator errors: {}", self.transport_error_indicator_errors);
}
if self.continuity_counter_errors > 0 {
error!("Continuity counter errors: {}", self.continuity_counter_errors);
}
// ... log other errors ...
}
}

// TR 101 290 Priority 1 Check Example
fn tr101290_p1_check(packet: &[u8]) -> Result<(), String> {
// Implement checks like sync byte verification, transport error indicator check, etc.
fn tr101290_p1_check(packet: &[u8], errors: &mut Tr101290Errors) {
if packet[0] != 0x47 {
return Err("Sync byte error".to_string());
errors.sync_byte_errors += 1;
}

// Additional checks...
if (packet[1] & 0x80) != 0 {
errors.transport_error_indicator_errors += 1;
}

Ok(())
// ... other checks, updating the respective counters ...
}

// Invoke this function for each MPEG-TS packet
fn process_packet(packet: &[u8]) {
if let Err(e) = tr101290_p1_check(packet) {
// Handle error, log it, etc.
println!("TR 101 290 Error: {}", e);
}
fn process_packet(packet: &[u8], errors: &mut Tr101290Errors) {
tr101290_p1_check(packet, errors);

let pid = extract_pid(packet);
let arrival_time = current_unix_timestamp_ms().unwrap_or(0);

// Further processing...
// Use a scope to limit the duration of the lock
/*{
let mut pid_map = PID_MAP.lock().unwrap();
// Check if the PID map already has an entry for this PID
match pid_map.get_mut(&pid) {
Some(stream_data) => {
// Existing StreamData instance found, update it
stream_data.update_stats(packet.len(), arrival_time);
},
None => {
// New StreamData instance needs to be created
let stream_type = determine_stream_type(pid); // Determine stream type
let new_stream_data = StreamData::new(packet, pid, stream_type, arrival_time, 0);
pid_map.insert(pid, new_stream_data);
}
}
}*/

// Log outside of the lock scope
/*let pid_map = PID_MAP.lock().unwrap();
if let Some(stream_data) = pid_map.get(&pid) {
info!("PID: {}, Type: {}, Bitrate: {} bps, IAT: {} ms, Errors: {}, CC: {}, Timestamp: {} ms",
stream_data.pid, stream_data.stream_type, stream_data.bitrate, stream_data.iat,
stream_data.error_count, stream_data.continuity_counter, stream_data.timestamp);
}*/
}

// Function to get the current Unix timestamp in milliseconds
Expand Down Expand Up @@ -538,6 +606,9 @@ async fn main() {
}
});

// Perform TR 101 290 checks
let mut tr101290_errors = Tr101290Errors::new();

// Start packet capture
let mut batch = Vec::new();
loop {
Expand Down Expand Up @@ -576,11 +647,18 @@ async fn main() {
if let Some(stream_data) = pid_map.get_mut(&pid) {
if let Ok(arrival_time) = current_unix_timestamp_ms() {
stream_data.update_stats(stream_data.data.len(), arrival_time);
// Log the updated metrics
info!("PID: {}, Type: {}, Bitrate: {} bps, IAT: {} ms, Errors: {}, CC: {}, Timestamp: {} ms",
stream_data.pid, stream_data.stream_type, stream_data.bitrate, stream_data.iat,
stream_data.error_count, stream_data.continuity_counter, stream_data.timestamp);
}
}

// Perform TR 101 290 checks
process_packet(&stream_data.data);
// In your packet processing loop:
process_packet(&stream_data.data, &mut tr101290_errors);

// Periodically, or at the end of the processing:
tr101290_errors.log_errors();

// print out the stream data parts outside of the .data
debug!("PID: {}, Stream Type: {}, Continuity Counter: {}, Timestamp: {} ms",
Expand Down

0 comments on commit 34bab3e

Please sign in to comment.