diff --git a/Cargo.toml b/Cargo.toml index 8ddf255..e061ffd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ license-file = "LICENSE" homepage = "https://github.com/groovybits/rscap" repository = "https://github.com/groovybits/rscap" authors = ["Chris Kennedy"] -version = "0.7.6" +version = "0.7.7" edition = "2021" [lib] diff --git a/specs/rsprobe.spec b/specs/rsprobe.spec index 5a538b7..3249477 100755 --- a/specs/rsprobe.spec +++ b/specs/rsprobe.spec @@ -1,5 +1,5 @@ Name: rsprobe -Version: 0.7.6 +Version: 0.7.7 Release: 1%{?dist} Summary: MpegTS Stream Analysis Probe with Kafka and GStreamer License: MIT diff --git a/src/bin/probe.rs b/src/bin/probe.rs index 11e5e98..12a553a 100644 --- a/src/bin/probe.rs +++ b/src/bin/probe.rs @@ -494,7 +494,7 @@ async fn send_to_kafka( #[derive(Parser, Debug)] #[clap( author = "Chris Kennedy", - version = "0.7.6", + version = "0.7.7", about = "MpegTS Stream Analysis Probe with Kafka and GStreamer" )] struct Args { @@ -1677,6 +1677,7 @@ async fn rsprobe(running: Arc) { let mut pmt_info: PmtInfo = PmtInfo { pid: 0xFFFF, packet: Vec::new(), + program_number: 0, }; let mut pmt_pid: Option = Some(0xFFFF); let mut program_number: Option = Some(0xFFFF); diff --git a/src/stream_data.rs b/src/stream_data.rs index 3e4f702..2ca00dc 100644 --- a/src/stream_data.rs +++ b/src/stream_data.rs @@ -403,6 +403,7 @@ pub fn pull_images( pub const PAT_PID: u16 = 0; pub const TS_PACKET_SIZE: usize = 188; +#[derive(Debug)] pub struct PatEntry { pub program_number: u16, pub pmt_pid: u16, @@ -412,10 +413,18 @@ pub struct PmtEntry { pub stream_pid: u16, pub stream_type: u8, // Stream type (e.g., 0x02 for MPEG video) pub program_number: u16, + pub descriptors: Vec, } pub struct Pmt { pub entries: Vec, + pub descriptors: Vec, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Descriptor { + pub tag: u8, + pub data: Vec, } #[derive(Clone, PartialEq)] @@ -1167,6 +1176,7 @@ pub fn extract_pid(packet: &[u8]) -> u16 { pub struct PmtInfo { pub pid: u16, pub packet: Vec, + pub program_number: u16, } // Helper function to parse PAT and update global PAT packet storage @@ -1175,104 +1185,177 @@ pub fn parse_and_store_pat(packet: &[u8]) -> PmtInfo { let mut pmt_info = PmtInfo { pid: 0xFFFF, packet: Vec::new(), + program_number: 0, }; pmt_info.packet = packet.to_vec(); - // loook for the program that is non zero and below 0x1FFF + debug!("ParseAndStorePAT: Found {} PAT entries {:?}", pat_entries.len(), pat_entries); + + let mut found_pmt = false; + + // look for the PMT Pid and Program Number that are greater zero and less than 0x1FFF for PMT PID for entry in pat_entries { - if entry.pmt_pid != 0 && entry.pmt_pid < 0x1FFF { - pmt_info.pid = entry.pmt_pid; - break; + if entry.pmt_pid > 0 && entry.pmt_pid <= 0x1FFF { + if entry.pmt_pid > 0 && entry.pmt_pid <= 0x1FFF && entry.program_number > 0 { + // TODO: return an array of all valid PMT PIDs and Program Numbers + pmt_info.pid = entry.pmt_pid; + pmt_info.program_number = entry.program_number; + + debug!( + "ParseAndStorePAT: Found Program Number: {} PMT PID: {}", + entry.program_number, entry.pmt_pid + ); + found_pmt = true; + } else { + log::warn!( + "ParseAndStorePAT: PMT Pid OUT OF RANGE, Skipping Program Number: {} PMT PID: {}", + entry.program_number, entry.pmt_pid + ); + } } } + // print info on if we found a valid PMT PID or not + if found_pmt { + debug!("ParseAndStorePAT: Found PMT PID: {} Program Number: {}", pmt_info.pid, pmt_info.program_number); + } pmt_info } +// Helper function to extract descriptors with bounds checking +fn parse_descriptors(packet: &[u8], offset: usize, length: usize) -> Vec { + let mut descriptors = Vec::new(); + let mut i = offset; + + // Ensure that the descriptor parsing does not exceed packet bounds + while i < offset + length && i + 2 <= packet.len() { + let tag = packet[i]; + let len = packet[i + 1] as usize; + + // Check if the remaining data is sufficient to read the descriptor + if i + 2 + len <= packet.len() { + let data = packet[i + 2..i + 2 + len].to_vec(); + descriptors.push(Descriptor { tag, data }); + } else { + // If there's not enough data for the descriptor, break the loop + break; + } + + i += 2 + len; + } + + descriptors +} + +// Parse PAT packets pub fn parse_pat(packet: &[u8]) -> Vec { let mut entries = Vec::new(); - // Check for minimum packet size if packet.len() < TS_PACKET_SIZE { return entries; } - // Check if Payload Unit Start Indicator (PUSI) is set let pusi = (packet[1] & 0x40) != 0; if !pusi { - // If Payload Unit Start Indicator is not set, this packet does not start a new PAT return entries; } let adaptation_field_control = (packet[3] & 0x30) >> 4; - let mut offset = 4; // start after TS header + let mut offset = 4; - // Check for adaptation field and skip it if adaptation_field_control == 0x02 || adaptation_field_control == 0x03 { let adaptation_field_length = packet[4] as usize; - offset += 1 + adaptation_field_length; // +1 for the length byte itself + offset += 1 + adaptation_field_length; } - // Pointer field indicates the start of the PAT section let pointer_field = packet[offset] as usize; - offset += 1 + pointer_field; // Skip pointer field + offset += 1 + pointer_field; - // Now, 'offset' points to the start of the PAT section while offset + 4 <= packet.len() { let program_number = ((packet[offset] as u16) << 8) | (packet[offset + 1] as u16); let pmt_pid = (((packet[offset + 2] as u16) & 0x1F) << 8) | (packet[offset + 3] as u16); - // Only add valid entries (non-zero program_number and pmt_pid) - if program_number != 0 && pmt_pid != 0 && pmt_pid < 0x1FFF && program_number < 100 { + if program_number > 0 && pmt_pid > 0 && pmt_pid <= 0x1FFF && program_number < 5000 { entries.push(PatEntry { program_number, pmt_pid, }); + debug!( + "ParsePAT: Found Program Number: {} PMT PID: {}", + program_number, pmt_pid + ); + } else { + if program_number <= 0 { + debug!( + "ParsePAT: Skipping Program Number <= 0: {} PMT PID: {}", + program_number, pmt_pid + ); + } else if pmt_pid <= 0 { + debug!( + "ParsePAT: Skipping PMT PID <= 0: {} Program Number: {}", + pmt_pid, program_number + ); + } else if pmt_pid > 0x1FFF { + debug!( + "ParsePAT: Skipping PMT PID >= 0x1FFF: {} Program Number: {}", + pmt_pid, program_number + ); + } } - debug!( - "ParsePAT: Program Number: {} PMT PID: {}", - program_number, pmt_pid - ); - - offset += 4; // Move to the next PAT entry + offset += 4; } entries } +// Parse PMT packets pub fn parse_pmt(packet: &[u8]) -> Pmt { let mut entries = Vec::new(); - let program_number = ((packet[8] as u16) << 8) | (packet[9] as u16); - - // Calculate the starting position for stream entries - let section_length = (((packet[6] as usize) & 0x0F) << 8) | packet[7] as usize; - let program_info_length = (((packet[15] as usize) & 0x0F) << 8) | packet[16] as usize; - let mut i = 17 + program_info_length; // Starting index of the first stream in the PMT - - debug!( - "ParsePMT: Program Number: {} PMT PID: {} starting at position {}", - program_number, - extract_pid(packet), - i - ); - while i + 5 <= packet.len() && i < 17 + section_length - 4 { - let stream_type = packet[i]; - let stream_pid = (((packet[i + 1] as u16) & 0x1F) << 8) | (packet[i + 2] as u16); - let es_info_length = (((packet[i + 3] as usize) & 0x0F) << 8) | packet[i + 4] as usize; - i += 5 + es_info_length; // Update index to point to next stream's info + + let adaptation_field_control = (packet[3] & 0x30) >> 4; + let mut offset = 0; + + if adaptation_field_control == 0x02 || adaptation_field_control == 0x03 { + let adaptation_field_length = packet[4] as usize; + offset += 1 + adaptation_field_length; + } + + if offset + 4 > packet.len() { + error!("ParsePMT: Packet size is incorrect: {}", packet.len()); + return Pmt { + entries, + descriptors: Vec::new(), + }; + } + + let program_number = ((packet[8 + offset] as u16) << 8) | (packet[9 + offset] as u16); + + let section_length = (((packet[6+ offset] as usize) & 0x0F) << 8) | packet[7 + offset] as usize; + let program_info_length = (((packet[15 + offset] as usize) & 0x0F) << 8) | packet[16 + offset] as usize; + let mut i = 17 + program_info_length; + + // Parse program descriptors + let descriptors = parse_descriptors(packet, 17 + offset, program_info_length); + + while i + 5 + offset <= packet.len() && i < 17 + section_length - 4 { + let stream_type = packet[i + offset]; + let stream_pid = (((packet[i + 1 + offset] as u16) & 0x1F) << 8) | (packet[i + 2 + offset] as u16); + let es_info_length = (((packet[i + 3 + offset] as usize) & 0x0F) << 8) | packet[i + 4 + offset] as usize; + + // Parse ES descriptors + let es_descriptors = parse_descriptors(packet, i + 5 + offset, es_info_length); entries.push(PmtEntry { stream_pid, stream_type, program_number, + descriptors: es_descriptors, }); - debug!( - "ParsePMT: ProgramNumber: {}, Stream PID: {}, Stream Type: {}", - program_number, stream_pid, stream_type - ); + + i += 5 + es_info_length; } - Pmt { entries } + Pmt { entries, descriptors } } // Invoke this function for each MPEG-TS packet @@ -1481,7 +1564,7 @@ pub fn update_pid_map( program_number_result = program_number; // Ensure the current PMT packet matches the PMT PID from the PAT - if extract_pid(pmt_packet) == pmt_pid { + //if extract_pid(pmt_packet) == pmt_pid { let pmt = parse_pmt(pmt_packet); for pmt_entry in pmt.entries.iter() { @@ -1584,9 +1667,9 @@ pub fn update_pid_map( pid_map.insert(stream_pid, stream_data); } } - } else { - error!("UpdatePIDmap: Skipping PMT PID: {} as it does not match with current PMT packet PID", pmt_pid); - } + //} else { + // error!("UpdatePIDmap: Skipping PMT PID: {} as it does not match with current PMT packet PID", pmt_pid); + //} } program_number_result }