Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2110 packet parsing, mpegts packet parsing, logging cleanup #10

Merged
merged 11 commits into from
Dec 9, 2023
19 changes: 9 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,24 +77,23 @@ ffmpeg -i capture.ts

## TODO - roadmap plans

- Thread the pcap capture and queue the packets, also thread zeromq writes to read from the shared queue.
- Add more information header to the json metadata like system stats, network stats, mediainfo, captions, ancillary data.
- (WIP) Add more information header to the json metadata like system stats, network stats, mediainfo, captions, ancillary data.
- (WIP) SMPTE 2110 handling analogous to the MpegTS support.
- (WIP) PAT/PMT parsing, PES parsing and analysis of streams.
- (WIP) FFmpeg libzmq protocol compatibility to allow branching off into libav easily.
- (WIP) Queue and Broker distribution robustness to allow large video streams capture without loss.
- (WIP) General network analyzer view of network around the streams we know/care about.
- Have multiple client modes to distribute processing of the stream on the zmq endpoints.
- Wrap [ltntstools](https://github.com/LTNGlobal-opensource/libltntstools) lib functionality into Rust through C bindings (If possible).
- SEI metadata decoding various aspects of MpegTS.
- Logging to file/sqliteDB with stats for simple basic graphing using gnuplot.
- Use [OpenCV img_hash fingerprinting](https://docs.opencv.org/3.4/d4/d93/group__img__hash.html#ga5eeee1e27bc45caffe3b529ab42568e3) to perceptually align and compare video streams frames.
- OpenAI Whisper speech to text for caption verfication and insertion. <https://github.com/openai/whisper>
- SEI metadata decoding various aspects of MpegTS.
- SMPTE 2110 handling analogous to the MpegTS support.
- PAT/PMT parsing, PES parsing and analysis of streams.
- Problem discovery and reporting via LLM/VectorDB analysis detection of anomalies in data.
- Fine tune LLM model for finding stream issues beyond basic commonly used ones.
- Multiple streams?
- Logging to file/sqliteDB with stats for simple basic graphing using gnuplot.
- Segmentation of captured MpegTS, VOD file writer by various specs.
- Compression for proxy capture.
- FFmpeg libzmq protocol compatibility to allow branching off into libav easily.
- Wrap [ltntstools](https://github.com/LTNGlobal-opensource/libltntstools) lib functionality into Rust through C bindings (If possible).
- Queue and Broker distribution robustness to allow large video streams capture without loss.
- General network analyzer view of network around the streams we know/care about.

### Chris Kennedy (C) 2023 LGPL

4 changes: 3 additions & 1 deletion src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ async fn main() {
if send_json_header && count % 2 == 0 {
count += 1;
let json_header = String::from_utf8(msg.clone()).unwrap();
info!("#{} Received JSON header: {}", mpeg_packets + 1, json_header);
if debug_on {
info!("#{} Received JSON header: {}", mpeg_packets + 1, json_header);
}
continue;
}
total_bytes += msg.len();
Expand Down
134 changes: 107 additions & 27 deletions src/bin/probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ use std::io::Write;
use std::sync::mpsc;
use std::thread;

// Able to keep up with 1080i50 422 10-bit 30 Mbps MPEG-TS stream (not long-term tested)
// Able to keep up with 1080p60 420/422 8/10-bit 20+ Mbps MPEG-TS stream (not long-term tested)
const BATCH_SIZE: usize = 1000; // N MPEG-TS packets per batch
const PAYLOAD_OFFSET: usize = 14 + 20 + 8; // Ethernet (14 bytes) + IP (20 bytes) + UDP (8 bytes)
const PACKET_SIZE: usize = 188; // MPEG-TS packet size
const READ_SIZE: i32 = (PACKET_SIZE as i32 * BATCH_SIZE as i32) + PAYLOAD_OFFSET as i32; // pcap read size


// MAIN
#[tokio::main]
async fn main() {
info!("Starting rscap probe");
Expand All @@ -46,15 +46,10 @@ async fn main() {

let send_json_header: bool = env::var("SEND_JSON_HEADER").unwrap_or("false".to_string()).parse().expect(&format!("Invalid format for SEND_JSON_HEADER"));

let mut is_mpegts = true; // Default to true, update based on actual packet type

// Initialize logging
// env_logger::init(); // FIXME - this doesn't work with log::LevelFilter
let mut log_level: log::LevelFilter = log::LevelFilter::Info;
if !silent {
log_level = log::LevelFilter::Debug;
}
env_logger::Builder::new()
.filter_level(log_level)
.init();
env_logger::init(); // set RUST_LOG environment variable to debug for more verbose logging

// device ip address
let mut interface_addr = source_device_ip.parse::<Ipv4Addr>()
Expand All @@ -70,6 +65,7 @@ async fn main() {
if source_device == "auto" || source_device == "" {
info!("Auto-selecting device...");

// Find the first valid device
for device in pcap::Device::list().unwrap() {
debug!("Device {:?}", device);

Expand All @@ -92,6 +88,7 @@ async fn main() {
continue;
}

// check if device has an IPv4 address
for addr in device.addresses.iter() {
if let std::net::IpAddr::V4(ipv4_addr) = addr.addr {
// check if loopback
Expand All @@ -100,22 +97,27 @@ async fn main() {
}
target_device_found = true;

// Found through auto-detection, set interface_addr
info!("Found IPv4 target device {} with ip {}", source_device, ipv4_addr);
interface_addr = ipv4_addr;
target_device = device;
break;
}
}
// break out of loop if target device is found
if target_device_found {
break;
}
}
} else {
// Use the specified device instead of auto-detection
info!("Using specified device {}", source_device);

let target_device_discovered = devices.into_iter().find(|d| d.name == source_device && d.flags.is_up() && !d.flags.is_loopback() && d.flags.is_running() && (!d.flags.is_wireless() || use_wireless))
// Find the specified device
let target_device_discovered = devices.into_iter().find(|d| d.name == source_device && d.flags.is_up() && d.flags.is_running() && (!d.flags.is_wireless() || use_wireless))
.expect(&format!("Target device not found {}", source_device));

// Check if device has an IPv4 address
info!("Target Device: {:?}", target_device_discovered);
for addr in target_device_discovered.addresses.iter() {
if let std::net::IpAddr::V4(ipv4_addr) = addr.addr {
Expand All @@ -134,6 +136,7 @@ async fn main() {
return;
}

// Join multicast group
let multicast_addr = source_ip.parse::<Ipv4Addr>()
.expect(&format!("Invalid IP address format in source_ip {}", source_ip));

Expand Down Expand Up @@ -172,6 +175,14 @@ async fn main() {
// ... ZeroMQ sending logic ...
let batched_data = batch.concat();

let mut format_str = "unknown";
let format_index = is_mpegts_or_smpte2110(&batched_data);
if format_index == 1 {
format_str = "mpegts";
} else if format_index == 2 {
format_str = "smpte2110";
}
// Check if JSON header is enabled
if send_json_header {
// Construct JSON header for batched data
let json_header = json!({
Expand All @@ -181,45 +192,61 @@ async fn main() {
"count": count,
"source_ip": source_ip,
"source_port": source_port,
"source_device": source_device
"source_device": source_device,
"target_ip": target_ip,
"target_port": target_port,
"format": format_str,
});

// Send JSON header as multipart message
publisher.send(json_header.to_string().as_bytes(), zmq::SNDMORE).unwrap();
}

// Send chunk
// Send chunk of data as multipart message
let chunk_size = batched_data.len();
total_bytes += chunk_size;
count += 1;
publisher.send(batched_data, 0).unwrap();

if !debug_on {
// Print progress
if !debug_on && !silent{
print!(".");
// flush stdout
std::io::stdout().flush().unwrap();
} else {
} else if !silent {
debug!("#{} Sent chunk of {}/{} bytes", count, chunk_size, total_bytes);
}
}
});

// Start packet capture
let mut batch = Vec::new();
while let Ok(packet) = cap.next_packet() {
if debug_on{
debug!("Received packet! {:?}", packet.header);
}
let chunks = process_packet(&packet);
let chunks = if is_mpegts {
process_mpegts_packet(&packet)
} else {
process_smpte2110_packet(&packet)
};

// Process each chunk
for chunk in chunks {
if debug_on {
hexdump(&chunk);
}

// Check if chunk is MPEG-TS or SMPTE 2110
if is_mpegts_or_smpte2110(&chunk) {
let chunk_type = is_mpegts_or_smpte2110(&chunk);
if chunk_type == 1 || chunk_type == 2 {
batch.push(chunk);
if chunk_type == 2 {
debug!("SMPTE 2110 packet detected");
is_mpegts = false;
}

// Check if batch is full
if batch.len() >= BATCH_SIZE {
// Send the batch to the channel
tx.send(batch.clone()).unwrap();
Expand All @@ -230,7 +257,6 @@ async fn main() {
error!("Not MPEG-TS or SMPTE 2110");
}
}

}

info!("Exiting rscap probe");
Expand All @@ -243,33 +269,87 @@ async fn main() {
zmq_thread.join().unwrap();
}

fn is_mpegts_or_smpte2110(packet: &[u8]) -> bool {
// identifying MPEG-TS, TODO: check for SMPTE 2110
// Check if the packet is MPEG-TS or SMPTE 2110
fn is_mpegts_or_smpte2110(packet: &[u8]) -> i32 {
// Check for MPEG-TS (starts with 0x47 sync byte)
if packet.starts_with(&[0x47]) {
return 1;
}

// Basic check for RTP (which SMPTE ST 2110 uses)
// This checks if the first byte is 0x80 or 0x81
// This might need more robust checks based on requirements
if packet.len() > 12 && (packet[0] == 0x80 || packet[0] == 0x81) {
// TODO: Check payload type or other RTP header fields here if necessary
return 2; // Assuming it's SMPTE ST 2110 for now
}

0 // Not MPEG-TS or SMPTE 2110
}

// Process the packet and return a vector of SMPTE ST 2110 packets
fn process_smpte2110_packet(packet: &[u8]) -> Vec<Vec<u8>> {
let mut smpte2110_packets = Vec::new();
let start = PAYLOAD_OFFSET;

if packet.len() > start + 12 {
if packet[start] == 0x80 || packet[start] == 0x81 {
let rtp_packet = &packet[start..];
smpte2110_packets.push(rtp_packet.to_vec());

// Extract RTP header information
let sequence_number = u16::from_be_bytes([packet[start + 2], packet[start + 3]]);
let timestamp = u32::from_be_bytes([packet[start + 4], packet[start + 5], packet[start + 6], packet[start + 7]]);
let ssrc = u32::from_be_bytes([packet[start + 8], packet[start + 9], packet[start + 10], packet[start + 11]]);

// Construct JSON object with RTP header information
let rtp_header_info = json!({
"sequence_number": sequence_number,
"timestamp": timestamp,
"ssrc": ssrc
});

// Print out the JSON structure
debug!("RTP Header Info: {}", rtp_header_info.to_string());
}
}

// MPEG-TS typically starts with a 0x47 sync byte.
return packet.starts_with(&[0x47]);
smpte2110_packets
}

fn process_packet(packet: &[u8]) -> Vec<Vec<u8>> {
// Strip off network headers to get to the MPEG-TS payload

// Process the packet and return a vector of MPEG-TS packets
fn process_mpegts_packet(packet: &[u8]) -> Vec<Vec<u8>> {
let mut mpeg_ts_packets = Vec::new();
let mut start = PAYLOAD_OFFSET;

while start + 188 <= packet.len() {
while start + PACKET_SIZE <= packet.len() {
let chunk = &packet[start..start + PACKET_SIZE];
if chunk[0] == 0x47 { // Check for MPEG-TS sync byte
mpeg_ts_packets.push(chunk.to_vec());

// Extract the PID from the MPEG-TS header
let pid = ((chunk[1] as u16 & 0x1F) << 8) | chunk[2] as u16;

// Construct JSON object with MPEG-TS header information
let mpegts_header_info = json!({
"pid": pid
});

// Print out the JSON structure
debug!("MPEG-TS Header Info: {}", mpegts_header_info.to_string());
}
start += 188;
start += PACKET_SIZE;
}

mpeg_ts_packets
}

// Print a hexdump of the packet
fn hexdump(packet: &[u8]) {
// print in rows of 16 bytes
println!("Packet length: {}", packet.len());
for (i, chunk) in packet.iter().take(188).enumerate() {
for (i, chunk) in packet.iter().take(PACKET_SIZE).enumerate() {
if i % 16 == 0 {
print!("\n{:04x}: ", i);
}
Expand Down