diff --git a/README.md b/README.md index beb04c6e..6984b10c 100644 --- a/README.md +++ b/README.md @@ -77,24 +77,23 @@ ffmpeg -i capture.ts ## TODO - roadmap plans -- Thread the pcap capture and queue the packets, also thread zeromq writes to read from the shared queue. -- Add more information header to the json metadata like system stats, network stats, mediainfo, captions, ancillary data. +- (WIP) Add more information header to the json metadata like system stats, network stats, mediainfo, captions, ancillary data. +- (WIP) SMPTE 2110 handling analogous to the MpegTS support. +- (WIP) PAT/PMT parsing, PES parsing and analysis of streams. +- (WIP) FFmpeg libzmq protocol compatibility to allow branching off into libav easily. +- (WIP) Queue and Broker distribution robustness to allow large video streams capture without loss. +- (WIP) General network analyzer view of network around the streams we know/care about. - Have multiple client modes to distribute processing of the stream on the zmq endpoints. +- Wrap [ltntstools](https://github.com/LTNGlobal-opensource/libltntstools) lib functionality into Rust through C bindings (If possible). +- SEI metadata decoding various aspects of MpegTS. +- Logging to file/sqliteDB with stats for simple basic graphing using gnuplot. - Use [OpenCV img_hash fingerprinting](https://docs.opencv.org/3.4/d4/d93/group__img__hash.html#ga5eeee1e27bc45caffe3b529ab42568e3) to perceptually align and compare video streams frames. - OpenAI Whisper speech to text for caption verfication and insertion. -- SEI metadata decoding various aspects of MpegTS. -- SMPTE 2110 handling analogous to the MpegTS support. -- PAT/PMT parsing, PES parsing and analysis of streams. - Problem discovery and reporting via LLM/VectorDB analysis detection of anomalies in data. - Fine tune LLM model for finding stream issues beyond basic commonly used ones. - Multiple streams? -- Logging to file/sqliteDB with stats for simple basic graphing using gnuplot. 
- Segmentation of captured MpegTS, VOD file writer by various specs. - Compression for proxy capture. -- FFmpeg libzmq protocol compatibility to allow branching off into libav easily. -- Wrap [ltntstools](https://github.com/LTNGlobal-opensource/libltntstools) lib functionality into Rust through C bindings (If possible). -- Queue and Broker distribution robustness to allow large video streams capture without loss. -- General network analyzer view of network around the streams we know/care about. ### Chris Kennedy (C) 2023 LGPL diff --git a/src/bin/client.rs b/src/bin/client.rs index 5ba6ccef..babc3668 100644 --- a/src/bin/client.rs +++ b/src/bin/client.rs @@ -65,7 +65,9 @@ async fn main() { if send_json_header && count % 2 == 0 { count += 1; let json_header = String::from_utf8(msg.clone()).unwrap(); - info!("#{} Received JSON header: {}", mpeg_packets + 1, json_header); + if debug_on { + info!("#{} Received JSON header: {}", mpeg_packets + 1, json_header); + } continue; } total_bytes += msg.len(); diff --git a/src/bin/probe.rs b/src/bin/probe.rs index c43d68fe..42b3de13 100644 --- a/src/bin/probe.rs +++ b/src/bin/probe.rs @@ -18,13 +18,13 @@ use std::io::Write; use std::sync::mpsc; use std::thread; -// Able to keep up with 1080i50 422 10-bit 30 Mbps MPEG-TS stream (not long-term tested) +// Able to keep up with 1080p60 420/422 8/10-bit 20+ Mbps MPEG-TS stream (not long-term tested) const BATCH_SIZE: usize = 1000; // N MPEG-TS packets per batch const PAYLOAD_OFFSET: usize = 14 + 20 + 8; // Ethernet (14 bytes) + IP (20 bytes) + UDP (8 bytes) const PACKET_SIZE: usize = 188; // MPEG-TS packet size const READ_SIZE: i32 = (PACKET_SIZE as i32 * BATCH_SIZE as i32) + PAYLOAD_OFFSET as i32; // pcap read size - +// MAIN #[tokio::main] async fn main() { info!("Starting rscap probe"); @@ -46,15 +46,10 @@ async fn main() { let send_json_header: bool = env::var("SEND_JSON_HEADER").unwrap_or("false".to_string()).parse().expect(&format!("Invalid format for SEND_JSON_HEADER")); + let 
mut is_mpegts = true; // Default to true, update based on actual packet type + // Initialize logging - // env_logger::init(); // FIXME - this doesn't work with log::LevelFilter - let mut log_level: log::LevelFilter = log::LevelFilter::Info; - if !silent { - log_level = log::LevelFilter::Debug; - } - env_logger::Builder::new() - .filter_level(log_level) - .init(); + env_logger::init(); // set RUST_LOG environment variable to debug for more verbose logging // device ip address let mut interface_addr = source_device_ip.parse::<Ipv4Addr>() @@ -70,6 +65,7 @@ async fn main() { if source_device == "auto" || source_device == "" { info!("Auto-selecting device..."); + // Find the first valid device for device in pcap::Device::list().unwrap() { debug!("Device {:?}", device); @@ -92,6 +88,7 @@ async fn main() { continue; } + // check if device has an IPv4 address for addr in device.addresses.iter() { if let std::net::IpAddr::V4(ipv4_addr) = addr.addr { // check if loopback @@ -100,22 +97,27 @@ async fn main() { } target_device_found = true; + // Found through auto-detection, set interface_addr info!("Found IPv4 target device {} with ip {}", source_device, ipv4_addr); interface_addr = ipv4_addr; target_device = device; break; } } + // break out of loop if target device is found if target_device_found { break; } } } else { + // Use the specified device instead of auto-detection info!("Using specified device {}", source_device); - let target_device_discovered = devices.into_iter().find(|d| d.name == source_device && d.flags.is_up() && !d.flags.is_loopback() && d.flags.is_running() && (!d.flags.is_wireless() || use_wireless)) + // Find the specified device + let target_device_discovered = devices.into_iter().find(|d| d.name == source_device && d.flags.is_up() && d.flags.is_running() && (!d.flags.is_wireless() || use_wireless)) .expect(&format!("Target device not found {}", source_device)); + // Check if device has an IPv4 address info!("Target Device: {:?}", target_device_discovered); for 
addr in target_device_discovered.addresses.iter() { if let std::net::IpAddr::V4(ipv4_addr) = addr.addr { @@ -134,6 +136,7 @@ async fn main() { return; } + // Join multicast group let multicast_addr = source_ip.parse::<Ipv4Addr>() .expect(&format!("Invalid IP address format in source_ip {}", source_ip)); @@ -172,6 +175,14 @@ async fn main() { // ... ZeroMQ sending logic ... let batched_data = batch.concat(); + let mut format_str = "unknown"; + let format_index = is_mpegts_or_smpte2110(&batched_data); + if format_index == 1 { + format_str = "mpegts"; + } else if format_index == 2 { + format_str = "smpte2110"; + } + // Check if JSON header is enabled if send_json_header { // Construct JSON header for batched data let json_header = json!({ @@ -181,45 +192,61 @@ async fn main() { "count": count, "source_ip": source_ip, "source_port": source_port, - "source_device": source_device + "source_device": source_device, + "target_ip": target_ip, + "target_port": target_port, + "format": format_str, }); // Send JSON header as multipart message publisher.send(json_header.to_string().as_bytes(), zmq::SNDMORE).unwrap(); } - // Send chunk + // Send chunk of data as multipart message let chunk_size = batched_data.len(); total_bytes += chunk_size; count += 1; publisher.send(batched_data, 0).unwrap(); - if !debug_on { + // Print progress + if !debug_on && !silent{ print!("."); // flush stdout std::io::stdout().flush().unwrap(); - } else { + } else if !silent { debug!("#{} Sent chunk of {}/{} bytes", count, chunk_size, total_bytes); } } }); + // Start packet capture let mut batch = Vec::new(); while let Ok(packet) = cap.next_packet() { if debug_on{ debug!("Received packet! 
{:?}", packet.header); } - let chunks = process_packet(&packet); + let chunks = if is_mpegts { + process_mpegts_packet(&packet) + } else { + process_smpte2110_packet(&packet) + }; + // Process each chunk for chunk in chunks { if debug_on { hexdump(&chunk); } // Check if chunk is MPEG-TS or SMPTE 2110 - if is_mpegts_or_smpte2110(&chunk) { + let chunk_type = is_mpegts_or_smpte2110(&chunk); + if chunk_type == 1 || chunk_type == 2 { batch.push(chunk); + if chunk_type == 2 { + debug!("SMPTE 2110 packet detected"); + is_mpegts = false; + } + // Check if batch is full if batch.len() >= BATCH_SIZE { // Send the batch to the channel tx.send(batch.clone()).unwrap(); @@ -230,7 +257,6 @@ async fn main() { error!("Not MPEG-TS or SMPTE 2110"); } } - } info!("Exiting rscap probe"); @@ -243,33 +269,87 @@ async fn main() { zmq_thread.join().unwrap(); } -fn is_mpegts_or_smpte2110(packet: &[u8]) -> bool { - // identifying MPEG-TS, TODO: check for SMPTE 2110 +// Check if the packet is MPEG-TS or SMPTE 2110 +fn is_mpegts_or_smpte2110(packet: &[u8]) -> i32 { + // Check for MPEG-TS (starts with 0x47 sync byte) + if packet.starts_with(&[0x47]) { + return 1; + } + + // Basic check for RTP (which SMPTE ST 2110 uses) + // This checks if the first byte is 0x80 or 0x81 + // This might need more robust checks based on requirements + if packet.len() > 12 && (packet[0] == 0x80 || packet[0] == 0x81) { + // TODO: Check payload type or other RTP header fields here if necessary + return 2; // Assuming it's SMPTE ST 2110 for now + } + + 0 // Not MPEG-TS or SMPTE 2110 +} + +// Process the packet and return a vector of SMPTE ST 2110 packets +fn process_smpte2110_packet(packet: &[u8]) -> Vec<Vec<u8>> { + let mut smpte2110_packets = Vec::new(); + let start = PAYLOAD_OFFSET; + + if packet.len() > start + 12 { + if packet[start] == 0x80 || packet[start] == 0x81 { + let rtp_packet = &packet[start..]; + smpte2110_packets.push(rtp_packet.to_vec()); + + // Extract RTP header information + let sequence_number = 
u16::from_be_bytes([packet[start + 2], packet[start + 3]]); + let timestamp = u32::from_be_bytes([packet[start + 4], packet[start + 5], packet[start + 6], packet[start + 7]]); + let ssrc = u32::from_be_bytes([packet[start + 8], packet[start + 9], packet[start + 10], packet[start + 11]]); + + // Construct JSON object with RTP header information + let rtp_header_info = json!({ + "sequence_number": sequence_number, + "timestamp": timestamp, + "ssrc": ssrc + }); + + // Print out the JSON structure + debug!("RTP Header Info: {}", rtp_header_info.to_string()); + } + } - // MPEG-TS typically starts with a 0x47 sync byte. - return packet.starts_with(&[0x47]); + smpte2110_packets } -fn process_packet(packet: &[u8]) -> Vec<Vec<u8>> { - // Strip off network headers to get to the MPEG-TS payload + +// Process the packet and return a vector of MPEG-TS packets +fn process_mpegts_packet(packet: &[u8]) -> Vec<Vec<u8>> { let mut mpeg_ts_packets = Vec::new(); let mut start = PAYLOAD_OFFSET; - while start + 188 <= packet.len() { + while start + PACKET_SIZE <= packet.len() { let chunk = &packet[start..start + PACKET_SIZE]; if chunk[0] == 0x47 { // Check for MPEG-TS sync byte mpeg_ts_packets.push(chunk.to_vec()); + + // Extract the PID from the MPEG-TS header + let pid = ((chunk[1] as u16 & 0x1F) << 8) | chunk[2] as u16; + + // Construct JSON object with MPEG-TS header information + let mpegts_header_info = json!({ + "pid": pid + }); + + // Print out the JSON structure + debug!("MPEG-TS Header Info: {}", mpegts_header_info.to_string()); } - start += 188; + start += PACKET_SIZE; } mpeg_ts_packets } +// Print a hexdump of the packet fn hexdump(packet: &[u8]) { // print in rows of 16 bytes println!("Packet length: {}", packet.len()); - for (i, chunk) in packet.iter().take(188).enumerate() { + for (i, chunk) in packet.iter().take(PACKET_SIZE).enumerate() { if i % 16 == 0 { print!("\n{:04x}: ", i); }