diff --git a/.env.example b/.env.example index f86cb5ee..593149fe 100644 --- a/.env.example +++ b/.env.example @@ -3,6 +3,7 @@ RUST_LOG="info" # debug, info, error #DEBUG=true #SILENT=true +#SEND_JSON_HEADER=true USE_WIRELESS=true # Allow wireless interface usage diff --git a/src/bin/client.rs b/src/bin/client.rs index e544f382..5ba6ccef 100644 --- a/src/bin/client.rs +++ b/src/bin/client.rs @@ -30,6 +30,7 @@ async fn main() { let debug_on: bool = env::var("DEBUG").unwrap_or("false".to_string()).parse().expect(&format!("Invalid format for DEBUG")); let silent: bool = env::var("SILENT").unwrap_or("false".to_string()).parse().expect(&format!("Invalid format for SILENT")); let output_file: &str = &env::var("OUTPUT_FILE").unwrap_or("output.ts".to_string()); + let send_json_header: bool = env::var("SEND_JSON_HEADER").unwrap_or("false".to_string()).parse().expect(&format!("Invalid format for SEND_JSON_HEADER")); info!("Starting rscap client"); @@ -58,11 +59,20 @@ async fn main() { let mut total_bytes = 0; let mut count = 0; + let mut mpeg_packets = 0; while let Ok(msg) = zmq_sub.recv_bytes(0) { + // Check for JSON header if enabled, it will alternate as the first message before each MPEG-TS chunk + if send_json_header && count % 2 == 0 { + count += 1; + let json_header = String::from_utf8(msg.clone()).unwrap(); + info!("#{} Received JSON header: {}", mpeg_packets + 1, json_header); + continue; + } total_bytes += msg.len(); count += 1; + mpeg_packets += 1; if debug_on { - debug!("#{} Received {}/{} bytes", count, msg.len(), total_bytes); + debug!("#{} Received {}/{} bytes", mpeg_packets, msg.len(), total_bytes); } else if !silent { print!("."); std::io::stdout().flush().unwrap(); diff --git a/src/bin/probe.rs b/src/bin/probe.rs index de86ff49..9183bb42 100644 --- a/src/bin/probe.rs +++ b/src/bin/probe.rs @@ -9,7 +9,7 @@ extern crate zmq; use pcap::{Capture}; -//use serde_json::json; +use serde_json::json; use log::{error, debug, info}; use tokio; use std::net::{Ipv4Addr, UdpSocket}; @@ -43,6 +43,8 @@ async fn main() { let use_wireless: bool = env::var("USE_WIRELESS").unwrap_or("false".to_string()).parse().expect(&format!("Invalid format for USE_WIRELESS")); + let send_json_header: bool = env::var("SEND_JSON_HEADER").unwrap_or("false".to_string()).parse().expect(&format!("Invalid format for SEND_JSON_HEADER")); + // Initialize logging // env_logger::init(); // FIXME - this doesn't work with log::LevelFilter let mut log_level: log::LevelFilter = log::LevelFilter::Info; @@ -174,15 +176,21 @@ async fn main() { if batch.len() >= BATCH_SIZE { let batched_data = batch.concat(); - // Construct JSON header for batched data - /*let json_header = json!({ - "type": "mpegts_chunk", - "content_length": batched_data.len(), - }); - - // Send JSON header as multipart message - publisher.send(json_header.to_string().as_bytes(), zmq::SNDMORE).unwrap(); - */ + if send_json_header { + // Construct JSON header for batched data + let json_header = json!({ + "type": "mpegts_chunk", + "content_length": batched_data.len(), + "total_bytes": total_bytes, + "count": count, + "source_ip": source_ip, + "source_port": source_port, + "source_device": source_device + }); + + // Send JSON header as multipart message + publisher.send(json_header.to_string().as_bytes(), zmq::SNDMORE).unwrap(); + } // Send chunk let chunk_size = batched_data.len();