From 54f97db389fd9efdd6401012ba3942ea2aa3f828 Mon Sep 17 00:00:00 2001 From: Chris Kennedy Date: Sat, 9 Dec 2023 06:47:00 -0800 Subject: [PATCH 01/11] replace packet size hardcode with variable --- src/bin/probe.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/bin/probe.rs b/src/bin/probe.rs index c43d68fe..c42f7d6a 100644 --- a/src/bin/probe.rs +++ b/src/bin/probe.rs @@ -255,12 +255,12 @@ fn process_packet(packet: &[u8]) -> Vec> { let mut mpeg_ts_packets = Vec::new(); let mut start = PAYLOAD_OFFSET; - while start + 188 <= packet.len() { + while start + PACKET_SIZE <= packet.len() { let chunk = &packet[start..start + PACKET_SIZE]; if chunk[0] == 0x47 { // Check for MPEG-TS sync byte mpeg_ts_packets.push(chunk.to_vec()); } - start += 188; + start += PACKET_SIZE; } mpeg_ts_packets @@ -269,7 +269,7 @@ fn process_packet(packet: &[u8]) -> Vec> { fn hexdump(packet: &[u8]) { // print in rows of 16 bytes println!("Packet length: {}", packet.len()); - for (i, chunk) in packet.iter().take(188).enumerate() { + for (i, chunk) in packet.iter().take(PACKET_SIZE).enumerate() { if i % 16 == 0 { print!("\n{:04x}: ", i); } From 2413c7a66b724632fe8e3bd9d3226d119a60d632 Mon Sep 17 00:00:00 2001 From: Chris Kennedy Date: Sat, 9 Dec 2023 06:50:41 -0800 Subject: [PATCH 02/11] reduce verbosity of client output --- src/bin/client.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/bin/client.rs b/src/bin/client.rs index 5ba6ccef..babc3668 100644 --- a/src/bin/client.rs +++ b/src/bin/client.rs @@ -65,7 +65,9 @@ async fn main() { if send_json_header && count % 2 == 0 { count += 1; let json_header = String::from_utf8(msg.clone()).unwrap(); - info!("#{} Received JSON header: {}", mpeg_packets + 1, json_header); + if debug_on { + info!("#{} Received JSON header: {}", mpeg_packets + 1, json_header); + } continue; } total_bytes += msg.len(); From f9582f95a6788b9bd389643236b906e62ad64133 Mon Sep 17 00:00:00 2001 From: Chris Kennedy Date: Sat, 9 Dec 2023 06:55:43 -0800 Subject: [PATCH 03/11] update comment on performance max tested so far --- src/bin/probe.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bin/probe.rs b/src/bin/probe.rs index c42f7d6a..2ebfb1de 100644 --- a/src/bin/probe.rs +++ b/src/bin/probe.rs @@ -18,7 +18,7 @@ use std::io::Write; use std::sync::mpsc; use std::thread; -// Able to keep up with 1080i50 422 10-bit 30 Mbps MPEG-TS stream (not long-term tested) +// Able to keep up with 1080p60 420/422 8/10-bit 20+ Mbps MPEG-TS stream (not long-term tested) const BATCH_SIZE: usize = 1000; // N MPEG-TS packets per batch const PAYLOAD_OFFSET: usize = 14 + 20 + 8; // Ethernet (14 bytes) + IP (20 bytes) + UDP (8 bytes) const PACKET_SIZE: usize = 188; // MPEG-TS packet size From c4c9f431d6c8db5a029ae5e39163c3274a29d640 Mon Sep 17 00:00:00 2001 From: Chris Kennedy Date: Sat, 9 Dec 2023 06:59:32 -0800 Subject: [PATCH 04/11] check for small chunks less than ip headers --- src/bin/probe.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/bin/probe.rs b/src/bin/probe.rs index 2ebfb1de..001b8a1c 100644 --- a/src/bin/probe.rs +++ b/src/bin/probe.rs @@ -204,6 +204,7 @@ async fn main() { } }); + // Start packet capture let mut batch = Vec::new(); while let Ok(packet) = cap.next_packet() { if debug_on{ @@ -220,6 +221,7 @@ async fn main() { if is_mpegts_or_smpte2110(&chunk) { batch.push(chunk); + // Check if batch is full if batch.len() >= BATCH_SIZE { // Send the batch to the channel tx.send(batch.clone()).unwrap(); @@ -246,6 +248,11 @@ async fn main() { fn is_mpegts_or_smpte2110(packet: &[u8]) -> bool { // identifying MPEG-TS, TODO: check for SMPTE 2110 + if packet.len() < PAYLOAD_OFFSET { + return false; + } + // SMPTE 2110 detection TODO... + // MPEG-TS typically starts with a 0x47 sync byte. return packet.starts_with(&[0x47]); } From 7029663a63290bc16a05a23a0696b70c520747d4 Mon Sep 17 00:00:00 2001 From: Chris Kennedy Date: Sat, 9 Dec 2023 07:00:45 -0800 Subject: [PATCH 05/11] remove extra blank line --- src/bin/probe.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/bin/probe.rs b/src/bin/probe.rs index 001b8a1c..82fe9e17 100644 --- a/src/bin/probe.rs +++ b/src/bin/probe.rs @@ -232,7 +232,6 @@ async fn main() { error!("Not MPEG-TS or SMPTE 2110"); } } - } info!("Exiting rscap probe"); From 360684c5d35a2cbf8e2884304697383a4bfd621a Mon Sep 17 00:00:00 2001 From: Chris Kennedy Date: Sat, 9 Dec 2023 07:05:48 -0800 Subject: [PATCH 06/11] add comments to functions and routines explain what each section is doing in a summary above them. --- src/bin/probe.rs | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/src/bin/probe.rs b/src/bin/probe.rs index 82fe9e17..b5777bac 100644 --- a/src/bin/probe.rs +++ b/src/bin/probe.rs @@ -24,7 +24,7 @@ const PAYLOAD_OFFSET: usize = 14 + 20 + 8; // Ethernet (14 bytes) + IP (20 bytes const PACKET_SIZE: usize = 188; // MPEG-TS packet size const READ_SIZE: i32 = (PACKET_SIZE as i32 * BATCH_SIZE as i32) + PAYLOAD_OFFSET as i32; // pcap read size - +// MAIN #[tokio::main] async fn main() { info!("Starting rscap probe"); @@ -70,6 +70,7 @@ async fn main() { if source_device == "auto" || source_device == "" { info!("Auto-selecting device..."); + // Find the first valid device for device in pcap::Device::list().unwrap() { debug!("Device {:?}", device); @@ -92,6 +93,7 @@ async fn main() { continue; } + // check if device has an IPv4 address for addr in device.addresses.iter() { if let std::net::IpAddr::V4(ipv4_addr) = addr.addr { // check if loopback @@ -100,22 +102,27 @@ async fn main() { } target_device_found = true; + // Found through auto-detection, set interface_addr info!("Found IPv4 target device {} with ip {}", source_device, ipv4_addr); interface_addr = ipv4_addr; target_device = device; break; } } + // break out of loop if target device is found if target_device_found { break; } } } else { + // Use the specified device instead of auto-detection info!("Using specified device {}", source_device); + // Find the specified device let target_device_discovered = devices.into_iter().find(|d| d.name == source_device && d.flags.is_up() && !d.flags.is_loopback() && d.flags.is_running() && (!d.flags.is_wireless() || use_wireless)) .expect(&format!("Target device not found {}", source_device)); + // Check if device has an IPv4 address info!("Target Device: {:?}", target_device_discovered); for addr in target_device_discovered.addresses.iter() { if let std::net::IpAddr::V4(ipv4_addr) = addr.addr { @@ -134,6 +141,7 @@ async fn main() { return; } + // Join multicast group let multicast_addr = source_ip.parse::() .expect(&format!("Invalid IP address format in source_ip {}", source_ip)); @@ -172,6 +180,7 @@ async fn main() { // ... ZeroMQ sending logic ... let batched_data = batch.concat(); + // Check if JSON header is enabled if send_json_header { // Construct JSON header for batched data let json_header = json!({ @@ -188,12 +197,13 @@ async fn main() { publisher.send(json_header.to_string().as_bytes(), zmq::SNDMORE).unwrap(); } - // Send chunk + // Send chunk of data as multipart message let chunk_size = batched_data.len(); total_bytes += chunk_size; count += 1; publisher.send(batched_data, 0).unwrap(); + // Print progress if !debug_on { print!("."); // flush stdout @@ -212,6 +222,7 @@ async fn main() { } let chunks = process_packet(&packet); + // Process each chunk for chunk in chunks { if debug_on { hexdump(&chunk); @@ -244,6 +255,7 @@ async fn main() { zmq_thread.join().unwrap(); } +// Check if the packet is MPEG-TS or SMPTE 2110 fn is_mpegts_or_smpte2110(packet: &[u8]) -> bool { // identifying MPEG-TS, TODO: check for SMPTE 2110 @@ -256,6 +268,7 @@ fn is_mpegts_or_smpte2110(packet: &[u8]) -> bool { return packet.starts_with(&[0x47]); } +// Process the packet and return a vector of MPEG-TS packets fn process_packet(packet: &[u8]) -> Vec> { // Strip off network headers to get to the MPEG-TS payload let mut mpeg_ts_packets = Vec::new(); @@ -272,6 +285,7 @@ fn process_packet(packet: &[u8]) -> Vec> { mpeg_ts_packets } +// Print a hexdump of the packet fn hexdump(packet: &[u8]) { // print in rows of 16 bytes println!("Packet length: {}", packet.len()); From 79dbca76bd965fc21ee01deca4c6df3bcbfa44a6 Mon Sep 17 00:00:00 2001 From: Chris Kennedy Date: Sat, 9 Dec 2023 07:08:34 -0800 Subject: [PATCH 07/11] if interface is specified, allow loopback --- src/bin/probe.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bin/probe.rs b/src/bin/probe.rs index b5777bac..ca2196f2 100644 --- a/src/bin/probe.rs +++ b/src/bin/probe.rs @@ -119,7 +119,7 @@ async fn main() { info!("Using specified device {}", source_device); // Find the specified device - let target_device_discovered = devices.into_iter().find(|d| d.name == source_device && d.flags.is_up() && !d.flags.is_loopback() && d.flags.is_running() && (!d.flags.is_wireless() || use_wireless)) + let target_device_discovered = devices.into_iter().find(|d| d.name == source_device && d.flags.is_up() && d.flags.is_running() && (!d.flags.is_wireless() || use_wireless)) .expect(&format!("Target device not found {}", source_device)); // Check if device has an IPv4 address From 6a22bb8c80479a50d91cfa1b23cfb803622ba2c7 Mon Sep 17 00:00:00 2001 From: Chris Kennedy Date: Sat, 9 Dec 2023 07:17:08 -0800 Subject: [PATCH 08/11] added logic for smpte2110 packets WIP: needs to have the rest of the logic dealt with for 2110. --- src/bin/probe.rs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/bin/probe.rs b/src/bin/probe.rs index ca2196f2..89c5fcd0 100644 --- a/src/bin/probe.rs +++ b/src/bin/probe.rs @@ -257,15 +257,20 @@ async fn main() { // Check if the packet is MPEG-TS or SMPTE 2110 fn is_mpegts_or_smpte2110(packet: &[u8]) -> bool { - // identifying MPEG-TS, TODO: check for SMPTE 2110 + // Check for MPEG-TS (starts with 0x47 sync byte) + if packet.starts_with(&[0x47]) { + return true; + } - if packet.len() < PAYLOAD_OFFSET { - return false; + // Basic check for RTP (which SMPTE ST 2110 uses) + // This checks if the first byte is 0x80 or 0x81 + // This might need more robust checks based on requirements + if packet.len() > 12 && (packet[0] == 0x80 || packet[0] == 0x81) { + // TODO: Check payload type or other RTP header fields here if necessary + return true; // Assuming it's SMPTE ST 2110 for now } - // SMPTE 2110 detection TODO... - // MPEG-TS typically starts with a 0x47 sync byte. - return packet.starts_with(&[0x47]); + false } // Process the packet and return a vector of MPEG-TS packets From 8dabad02b6898c49725cda4f2476369a15a9dee3 Mon Sep 17 00:00:00 2001 From: Chris Kennedy Date: Sat, 9 Dec 2023 07:37:56 -0800 Subject: [PATCH 09/11] auto-detect smpte2110 basic crude form, untested and doubtful it works yet. --- src/bin/probe.rs | 54 +++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 46 insertions(+), 8 deletions(-) diff --git a/src/bin/probe.rs b/src/bin/probe.rs index 89c5fcd0..a36e4844 100644 --- a/src/bin/probe.rs +++ b/src/bin/probe.rs @@ -46,6 +46,8 @@ async fn main() { let send_json_header: bool = env::var("SEND_JSON_HEADER").unwrap_or("false".to_string()).parse().expect(&format!("Invalid format for SEND_JSON_HEADER")); + let mut is_mpegts = true; // Default to true, update based on actual packet type + // Initialize logging // env_logger::init(); // FIXME - this doesn't work with log::LevelFilter let mut log_level: log::LevelFilter = log::LevelFilter::Info; @@ -180,6 +182,13 @@ async fn main() { // ... ZeroMQ sending logic ... let batched_data = batch.concat(); + let mut format_str = "unknown"; + let format_index = is_mpegts_or_smpte2110(&batched_data); + if format_index == 1 { + format_str = "mpegts"; + } else if format_index == 2 { + format_str = "smpte2110"; + } // Check if JSON header is enabled if send_json_header { // Construct JSON header for batched data @@ -190,7 +199,10 @@ async fn main() { "count": count, "source_ip": source_ip, "source_port": source_port, - "source_device": source_device + "source_device": source_device, + "target_ip": target_ip, + "target_port": target_port, + "format": format_str, }); // Send JSON header as multipart message @@ -220,7 +232,11 @@ async fn main() { if debug_on{ debug!("Received packet! {:?}", packet.header); } - let chunks = process_packet(&packet); + let chunks = if is_mpegts { + process_mpegts_packet(&packet) + } else { + process_smpte2110_packet(&packet) + }; // Process each chunk for chunk in chunks { @@ -229,8 +245,13 @@ async fn main() { } // Check if chunk is MPEG-TS or SMPTE 2110 - if is_mpegts_or_smpte2110(&chunk) { + let chunk_type = is_mpegts_or_smpte2110(&chunk); + if chunk_type == 1 || chunk_type == 2 { batch.push(chunk); + if chunk_type == 2 { + debug!("SMPTE 2110 packet detected"); + is_mpegts = false; + } // Check if batch is full if batch.len() >= BATCH_SIZE { @@ -256,10 +277,10 @@ async fn main() { } // Check if the packet is MPEG-TS or SMPTE 2110 -fn is_mpegts_or_smpte2110(packet: &[u8]) -> bool { +fn is_mpegts_or_smpte2110(packet: &[u8]) -> i32 { // Check for MPEG-TS (starts with 0x47 sync byte) if packet.starts_with(&[0x47]) { - return true; + return 1; } // Basic check for RTP (which SMPTE ST 2110 uses) @@ -267,14 +288,31 @@ fn is_mpegts_or_smpte2110(packet: &[u8]) -> bool { // This might need more robust checks based on requirements if packet.len() > 12 && (packet[0] == 0x80 || packet[0] == 0x81) { // TODO: Check payload type or other RTP header fields here if necessary - return true; // Assuming it's SMPTE ST 2110 for now + return 2; // Assuming it's SMPTE ST 2110 for now + } + + 0 // Not MPEG-TS or SMPTE 2110 +} + +// Process the packet and return a vector of SMPTE ST 2110 packets +fn process_smpte2110_packet(packet: &[u8]) -> Vec> { + // Strip off network headers to get to the SMPTE ST 2110 payload + let mut smpte2110_packets = Vec::new(); + let mut start = PAYLOAD_OFFSET; + + while start + PACKET_SIZE <= packet.len() { + let chunk = &packet[start..start + PACKET_SIZE]; + if chunk[0] == 0x80 || chunk[0] == 0x81 { // Check for RTP version 2 + smpte2110_packets.push(chunk.to_vec()); + } + start += PACKET_SIZE; } - false + smpte2110_packets } // Process the packet and return a vector of MPEG-TS packets -fn process_packet(packet: &[u8]) -> Vec> { +fn process_mpegts_packet(packet: &[u8]) -> Vec> { // Strip off network headers to get to the MPEG-TS payload let mut mpeg_ts_packets = Vec::new(); let mut start = PAYLOAD_OFFSET; From b3292de1ce36465bcb367826ba68a4ad1a244073 Mon Sep 17 00:00:00 2001 From: Chris Kennedy Date: Sat, 9 Dec 2023 08:03:03 -0800 Subject: [PATCH 10/11] add 2110 packet parsing check mpegts and 2110 packets structure. print out details of packets. fix info/debug/error rust log env variable handling. --- src/bin/probe.rs | 55 +++++++++++++++++++++++++++++++----------------- 1 file changed, 36 insertions(+), 19 deletions(-) diff --git a/src/bin/probe.rs b/src/bin/probe.rs index a36e4844..42b3de13 100644 --- a/src/bin/probe.rs +++ b/src/bin/probe.rs @@ -49,14 +49,7 @@ async fn main() { let mut is_mpegts = true; // Default to true, update based on actual packet type // Initialize logging - // env_logger::init(); // FIXME - this doesn't work with log::LevelFilter - let mut log_level: log::LevelFilter = log::LevelFilter::Info; - if !silent { - log_level = log::LevelFilter::Debug; - } - env_logger::Builder::new() - .filter_level(log_level) - .init(); + env_logger::init(); // set RUST_LOG environment variable to debug for more verbose logging // device ip address let mut interface_addr = source_device_ip.parse::() @@ -216,11 +209,11 @@ async fn main() { publisher.send(batched_data, 0).unwrap(); // Print progress - if !debug_on { + if !debug_on && !silent{ print!("."); // flush stdout std::io::stdout().flush().unwrap(); - } else { + } else if !silent { debug!("#{} Sent chunk of {}/{} bytes", count, chunk_size, total_bytes); } } @@ -296,24 +289,37 @@ fn is_mpegts_or_smpte2110(packet: &[u8]) -> i32 { // Process the packet and return a vector of SMPTE ST 2110 packets fn process_smpte2110_packet(packet: &[u8]) -> Vec> { - // Strip off network headers to get to the SMPTE ST 2110 payload let mut smpte2110_packets = Vec::new(); - let mut start = PAYLOAD_OFFSET; - - while start + PACKET_SIZE <= packet.len() { - let chunk = &packet[start..start + PACKET_SIZE]; - if chunk[0] == 0x80 || chunk[0] == 0x81 { // Check for RTP version 2 - smpte2110_packets.push(chunk.to_vec()); + let start = PAYLOAD_OFFSET; + + if packet.len() > start + 12 { + if packet[start] == 0x80 || packet[start] == 0x81 { + let rtp_packet = &packet[start..]; + smpte2110_packets.push(rtp_packet.to_vec()); + + // Extract RTP header information + let sequence_number = u16::from_be_bytes([packet[start + 2], packet[start + 3]]); + let timestamp = u32::from_be_bytes([packet[start + 4], packet[start + 5], packet[start + 6], packet[start + 7]]); + let ssrc = u32::from_be_bytes([packet[start + 8], packet[start + 9], packet[start + 10], packet[start + 11]]); + + // Construct JSON object with RTP header information + let rtp_header_info = json!({ + "sequence_number": sequence_number, + "timestamp": timestamp, + "ssrc": ssrc + }); + + // Print out the JSON structure + debug!("RTP Header Info: {}", rtp_header_info.to_string()); } - start += PACKET_SIZE; } smpte2110_packets } + // Process the packet and return a vector of MPEG-TS packets fn process_mpegts_packet(packet: &[u8]) -> Vec> { - // Strip off network headers to get to the MPEG-TS payload let mut mpeg_ts_packets = Vec::new(); let mut start = PAYLOAD_OFFSET; @@ -321,6 +327,17 @@ fn process_mpegts_packet(packet: &[u8]) -> Vec> { let chunk = &packet[start..start + PACKET_SIZE]; if chunk[0] == 0x47 { // Check for MPEG-TS sync byte mpeg_ts_packets.push(chunk.to_vec()); + + // Extract the PID from the MPEG-TS header + let pid = ((chunk[1] as u16 & 0x1F) << 8) | chunk[2] as u16; + + // Construct JSON object with MPEG-TS header information + let mpegts_header_info = json!({ + "pid": pid + }); + + // Print out the JSON structure + debug!("MPEG-TS Header Info: {}", mpegts_header_info.to_string()); } start += PACKET_SIZE; } From 448bc35f7b41fe9fbac891be64bd6ff4e75842ce Mon Sep 17 00:00:00 2001 From: Chris Kennedy Date: Sat, 9 Dec 2023 08:07:09 -0800 Subject: [PATCH 11/11] update roadmap, show what is in progress --- README.md | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index beb04c6e..6984b10c 100644 --- a/README.md +++ b/README.md @@ -77,24 +77,23 @@ ffmpeg -i capture.ts ## TODO - roadmap plans -- Thread the pcap capture and queue the packets, also thread zeromq writes to read from the shared queue. -- Add more information header to the json metadata like system stats, network stats, mediainfo, captions, ancillary data. +- (WIP) Add more information header to the json metadata like system stats, network stats, mediainfo, captions, ancillary data. +- (WIP) SMPTE 2110 handling analogous to the MpegTS support. +- (WIP) PAT/PMT parsing, PES parsing and analysis of streams. +- (WIP) FFmpeg libzmq protocol compatibility to allow branching off into libav easily. +- (WIP) Queue and Broker distribution robustness to allow large video streams capture without loss. +- (WIP) General network analyzer view of network around the streams we know/care about. - Have multiple client modes to distribute processing of the stream on the zmq endpoints. +- Wrap [ltntstools](https://github.com/LTNGlobal-opensource/libltntstools) lib functionality into Rust through C bindings (If possible). +- SEI metadata decoding various aspects of MpegTS. +- Logging to file/sqliteDB with stats for simple basic graphing using gnuplot. - Use [OpenCV img_hash fingerprinting](https://docs.opencv.org/3.4/d4/d93/group__img__hash.html#ga5eeee1e27bc45caffe3b529ab42568e3) to perceptually align and compare video streams frames. - OpenAI Whisper speech to text for caption verfication and insertion. -- SEI metadata decoding various aspects of MpegTS. -- SMPTE 2110 handling analogous to the MpegTS support. -- PAT/PMT parsing, PES parsing and analysis of streams. - Problem discovery and reporting via LLM/VectorDB analysis detection of anomalies in data. - Fine tune LLM model for finding stream issues beyond basic commonly used ones. - Multiple streams? -- Logging to file/sqliteDB with stats for simple basic graphing using gnuplot. - Segmentation of captured MpegTS, VOD file writer by various specs. - Compression for proxy capture. -- FFmpeg libzmq protocol compatibility to allow branching off into libav easily. -- Wrap [ltntstools](https://github.com/LTNGlobal-opensource/libltntstools) lib functionality into Rust through C bindings (If possible). -- Queue and Broker distribution robustness to allow large video streams capture without loss. -- General network analyzer view of network around the streams we know/care about. ### Chris Kennedy (C) 2023 LGPL