diff --git a/Cargo.toml b/Cargo.toml index 716774fd..8fad47b2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,3 +51,4 @@ image = "0.25.1" base64 = "0.22.0" imageproc = "0.24.0" datetime = "0.5.2" +crc = "3.2.1" diff --git a/src/stream_data.rs b/src/stream_data.rs index c0f5e723..5bb1808c 100644 --- a/src/stream_data.rs +++ b/src/stream_data.rs @@ -4,8 +4,8 @@ * Data structure for the stream data */ +use crc::{Crc, CRC_32_ISO_HDLC}; use std::fs::OpenOptions; -//use std::fs::File; use crate::current_unix_timestamp_ms; use ahash::AHashMap; #[cfg(feature = "gst")] @@ -1069,23 +1069,131 @@ impl Tr101290Errors { } // TR 101 290 Priority 1 Check -pub fn tr101290_p1_check(packet: &[u8], errors: &mut Tr101290Errors) { - // p1 +pub fn tr101290_p1_check(packet: &[u8], errors: &mut Tr101290Errors, pid: u16, continuity_counter: u8) { + // TS sync byte error + if packet[0] != 0x47 { + errors.ts_sync_byte_errors += 1; + } + + // Sync byte error if packet[0] != 0x47 { errors.sync_byte_errors += 1; } - // TODO: ... other checks, updating the respective counters ... + // Continuity counter error + if pid != 0x1FFF { + let prev_continuity_counter = PID_MAP.read().unwrap().get(&pid).map(|stream_data| stream_data.continuity_counter).unwrap_or(0); + if continuity_counter != (prev_continuity_counter + 1) % 16 { + errors.continuity_counter_errors += 1; + } + } + + // PAT error + if pid == 0 { + let section_syntax_indicator = (packet[1] & 0x80) != 0; + let section_length = (((packet[1] & 0x0F) as u16) << 8) | packet[2] as u16; + if !section_syntax_indicator || section_length < 13 { + errors.pat_errors += 1; + } + } + + // PMT error + if pid == 0x1000 { + let section_syntax_indicator = (packet[1] & 0x80) != 0; + let section_length = (((packet[1] & 0x0F) as u16) << 8) | packet[2] as u16; + if !section_syntax_indicator || section_length < 17 { + errors.pmt_errors += 1; + } + } + + // PID map error + if !PID_MAP.read().unwrap().contains_key(&pid) { + errors.pid_map_errors += 1; + } } // TR 101 290 Priority 2 Check pub fn tr101290_p2_check(packet: &[u8], errors: &mut Tr101290Errors) { - // p2 - + // Transport error indicator if (packet[1] & 0x80) != 0 { errors.transport_error_indicator_errors += 1; } - // TODO: ... other checks, updating the respective counters ... + + // CRC error + let has_adaptation_field = (packet[3] & 0x20) != 0; + let has_payload = (packet[3] & 0x10) != 0; + if has_adaptation_field && has_payload { + let adaptation_field_length = packet[4] as usize; + let payload_start = 5 + adaptation_field_length; + let crc32 = u32::from_be_bytes([ + packet[payload_start - 4], + packet[payload_start - 3], + packet[payload_start - 2], + packet[payload_start - 1], + ]); + let crc32_calculator = Crc::::new(&CRC_32_ISO_HDLC); + let calculated_crc32 = crc32_calculator.checksum(&packet[payload_start..]); + if crc32 != calculated_crc32 { + errors.crc_errors += 1; + } + } + + // PCR repetition error + let has_pcr = (packet[5] & 0x10) != 0; + if has_pcr { + let pcr_base = u64::from_be_bytes([0, 0, packet[6], packet[7], packet[8], packet[9], packet[10] & 0xFE, 0]); + let pcr_ext = (((packet[10] & 0x01) as u64) << 8) | (packet[11] as u64); + let pcr = pcr_base * 300 + pcr_ext; + let max_pcr_repetition_interval = 40_000; // 40 ms in PCR units (27 MHz) + let prev_pcr = PID_MAP.read().unwrap().get(&0).map(|stream_data| stream_data.timestamp).unwrap_or(0); + if pcr - prev_pcr > max_pcr_repetition_interval { + errors.pcr_repetition_errors += 1; + } + } + + // PCR discontinuity indicator error + let has_discontinuity_indicator = (packet[5] & 0x80) != 0; + if has_discontinuity_indicator { + let prev_discontinuity_state = PID_MAP.read().unwrap().get(&0).map(|stream_data| stream_data.rtp_timestamp).unwrap_or(0) & 0x01; + if prev_discontinuity_state == 1 { + errors.pcr_discontinuity_indicator_errors += 1; + } + } + + // PCR accuracy error + if has_pcr { + let pcr_base = u64::from_be_bytes([0, 0, packet[6], packet[7], packet[8], packet[9], packet[10] & 0xFE, 0]); + let pcr_ext = (((packet[10] & 0x01) as u64) << 8) | (packet[11] as u64); + let pcr = pcr_base * 300 + pcr_ext; + let max_pcr_accuracy_error = 500; // 500 nanoseconds + let prev_pcr = PID_MAP.read().unwrap().get(&0).map(|stream_data| stream_data.timestamp).unwrap_or(0); + if (pcr as i64 - prev_pcr as i64).abs() > max_pcr_accuracy_error { + errors.pcr_accuracy_errors += 1; + } + } + + // PTS error + let has_pts = (packet[7] & 0x80) != 0; + if has_pts { + let pts_high = u64::from_be_bytes([0, 0, 0, packet[9] & 0x0E, packet[10], packet[11], packet[12], packet[13] & 0xFE]); + let pts_low = (((packet[13] & 0x01) as u64) << 8) | (packet[14] as u64); + let pts = pts_high | pts_low; + let max_pts_interval = 700_000; // 700 ms in PTS units (90 kHz) + let prev_pts = PID_MAP.read().unwrap().get(&0).map(|stream_data| stream_data.rtp_timestamp).unwrap_or(0) as u64; + if pts < prev_pts || pts - prev_pts > max_pts_interval { + errors.pts_errors += 1; + } + } + + // CAT error + let has_cat = packet[3] == 0x01; + if has_cat { + let section_syntax_indicator = (packet[1] & 0x80) != 0; + let section_length = (((packet[1] & 0x0F) as u16) << 8) | (packet[2] as u16); + if !section_syntax_indicator || section_length < 9 { + errors.cat_errors += 1; + } + } } // Implement a function to extract PID from a packet @@ -1224,7 +1332,7 @@ pub fn process_packet( ) { let packet: &[u8] = &stream_data_packet.packet[stream_data_packet.packet_start ..stream_data_packet.packet_start + stream_data_packet.packet_len]; - tr101290_p1_check(packet, errors); + tr101290_p1_check(packet, errors, stream_data_packet.pid, stream_data_packet.continuity_counter); tr101290_p2_check(packet, errors); let pid = stream_data_packet.pid;