diff --git a/src/bin/probe.rs b/src/bin/probe.rs index e40a0d1a..379be0ed 100644 --- a/src/bin/probe.rs +++ b/src/bin/probe.rs @@ -66,6 +66,8 @@ struct StreamData { iat: u64, error_count: u32, last_arrival_time: u64, + start_time: u64, // field for start time + total_bits: u64, // field for total bits data: Vec, // The actual MPEG-TS packet data } @@ -85,44 +87,110 @@ impl StreamData { iat, error_count, last_arrival_time, + start_time: timestamp, // Initialize start time + total_bits: 0, // Initialize total bits data: packet.to_vec(), } } - fn update_stats(&mut self, packet_size: usize, arrival_time: u64) { - // Update bitrate, IAT, etc. - // Example calculation for bitrate (simplified) - self.bitrate += packet_size as u32 * 8; // Convert bytes to bits + let bits = packet_size as u64 * 8; // Convert bytes to bits + + // Elapsed time in milliseconds + let elapsed_time_ms = arrival_time - self.start_time; + + if elapsed_time_ms > 0 { + let elapsed_time_sec = elapsed_time_ms as f64 / 1000.0; + self.bitrate = (self.total_bits as f64 / elapsed_time_sec) as u32; + } - // Example calculation for IAT + self.total_bits += bits; // Accumulate total bits + + // IAT calculation remains the same let iat = arrival_time - self.last_arrival_time; self.iat = iat; - // Update last arrival time self.last_arrival_time = arrival_time; } } +struct Tr101290Errors { + sync_byte_errors: u32, + transport_error_indicator_errors: u32, + continuity_counter_errors: u32, + // ... other error types ... +} + +impl Tr101290Errors { + fn new() -> Self { + Tr101290Errors { + sync_byte_errors: 0, + transport_error_indicator_errors: 0, + continuity_counter_errors: 0, + // ... initialize other errors ... + } + } + + fn log_errors(&self) { + // Log the error counts for monitoring + if self.sync_byte_errors > 0 { + error!("Sync byte errors: {}", self.sync_byte_errors); + } + if self.transport_error_indicator_errors > 0 { + error!("Transport Error Indicator errors: {}", self.transport_error_indicator_errors); + } + if self.continuity_counter_errors > 0 { + error!("Continuity counter errors: {}", self.continuity_counter_errors); + } + // ... log other errors ... + } +} + // TR 101 290 Priority 1 Check Example -fn tr101290_p1_check(packet: &[u8]) -> Result<(), String> { - // Implement checks like sync byte verification, transport error indicator check, etc. +fn tr101290_p1_check(packet: &[u8], errors: &mut Tr101290Errors) { if packet[0] != 0x47 { - return Err("Sync byte error".to_string()); + errors.sync_byte_errors += 1; } - // Additional checks... + if (packet[1] & 0x80) != 0 { + errors.transport_error_indicator_errors += 1; + } - Ok(()) + // ... other checks, updating the respective counters ... } // Invoke this function for each MPEG-TS packet -fn process_packet(packet: &[u8]) { - if let Err(e) = tr101290_p1_check(packet) { - // Handle error, log it, etc. - println!("TR 101 290 Error: {}", e); - } +fn process_packet(packet: &[u8], errors: &mut Tr101290Errors) { + tr101290_p1_check(packet, errors); + + let pid = extract_pid(packet); + let arrival_time = current_unix_timestamp_ms().unwrap_or(0); - // Further processing... + // Use a scope to limit the duration of the lock + /*{ + let mut pid_map = PID_MAP.lock().unwrap(); + + // Check if the PID map already has an entry for this PID + match pid_map.get_mut(&pid) { + Some(stream_data) => { + // Existing StreamData instance found, update it + stream_data.update_stats(packet.len(), arrival_time); + }, + None => { + // New StreamData instance needs to be created + let stream_type = determine_stream_type(pid); // Determine stream type + let new_stream_data = StreamData::new(packet, pid, stream_type, arrival_time, 0); + pid_map.insert(pid, new_stream_data); + } + } + }*/ + + // Log outside of the lock scope + /*let pid_map = PID_MAP.lock().unwrap(); + if let Some(stream_data) = pid_map.get(&pid) { + info!("PID: {}, Type: {}, Bitrate: {} bps, IAT: {} ms, Errors: {}, CC: {}, Timestamp: {} ms", + stream_data.pid, stream_data.stream_type, stream_data.bitrate, stream_data.iat, + stream_data.error_count, stream_data.continuity_counter, stream_data.timestamp); + }*/ } // Function to get the current Unix timestamp in milliseconds @@ -538,6 +606,9 @@ async fn main() { } }); + // Perform TR 101 290 checks + let mut tr101290_errors = Tr101290Errors::new(); + // Start packet capture let mut batch = Vec::new(); loop { @@ -576,11 +647,18 @@ async fn main() { if let Some(stream_data) = pid_map.get_mut(&pid) { if let Ok(arrival_time) = current_unix_timestamp_ms() { stream_data.update_stats(stream_data.data.len(), arrival_time); + // Log the updated metrics + info!("PID: {}, Type: {}, Bitrate: {} bps, IAT: {} ms, Errors: {}, CC: {}, Timestamp: {} ms", + stream_data.pid, stream_data.stream_type, stream_data.bitrate, stream_data.iat, + stream_data.error_count, stream_data.continuity_counter, stream_data.timestamp); } } - // Perform TR 101 290 checks - process_packet(&stream_data.data); + // In your packet processing loop: + process_packet(&stream_data.data, &mut tr101290_errors); + + // Periodically, or at the end of the processing: + tr101290_errors.log_errors(); // print out the stream data parts outside of the .data debug!("PID: {}, Stream Type: {}, Continuity Counter: {}, Timestamp: {} ms",