Skip to content

Commit

Permalink
send json headers if enabled
Browse files Browse the repository at this point in the history
add config item to enable sending json metadata in a header packet.
client modified to handle headers and binary chunks properly.
  • Loading branch information
Chris Kennedy committed Dec 9, 2023
1 parent f6569ae commit 086ca35
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 11 deletions.
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ RUST_LOG="info" # debug, info, error

#DEBUG=true
#SILENT=true
#SEND_JSON_HEADER=true

USE_WIRELESS=true # Allow wireless interface usage

Expand Down
12 changes: 11 additions & 1 deletion src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ async fn main() {
let debug_on: bool = env::var("DEBUG").unwrap_or("false".to_string()).parse().expect(&format!("Invalid format for DEBUG"));
let silent: bool = env::var("SILENT").unwrap_or("false".to_string()).parse().expect(&format!("Invalid format for SILENT"));
let output_file: &str = &env::var("OUTPUT_FILE").unwrap_or("output.ts".to_string());
let send_json_header: bool = env::var("SEND_JSON_HEADER").unwrap_or("false".to_string()).parse().expect(&format!("Invalid format for SEND_JSON_HEADER"));

info!("Starting rscap client");

Expand Down Expand Up @@ -58,11 +59,20 @@ async fn main() {

let mut total_bytes = 0;
let mut count = 0;
let mut mpeg_packets = 0;
while let Ok(msg) = zmq_sub.recv_bytes(0) {
// Check for JSON header if enabled, it will alternate as the first message before each MPEG-TS chunk
if send_json_header && count % 2 == 0 {
count += 1;
let json_header = String::from_utf8(msg.clone()).unwrap();
info!("#{} Received JSON header: {}", mpeg_packets + 1, json_header);
continue;
}
total_bytes += msg.len();
count += 1;
mpeg_packets += 1;
if debug_on {
debug!("#{} Received {}/{} bytes", count, msg.len(), total_bytes);
debug!("#{} Received {}/{} bytes", mpeg_packets, msg.len(), total_bytes);
} else if !silent {
print!(".");
std::io::stdout().flush().unwrap();
Expand Down
28 changes: 18 additions & 10 deletions src/bin/probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

extern crate zmq;
use pcap::{Capture};
//use serde_json::json;
use serde_json::json;
use log::{error, debug, info};
use tokio;
use std::net::{Ipv4Addr, UdpSocket};
Expand Down Expand Up @@ -43,6 +43,8 @@ async fn main() {

let use_wireless: bool = env::var("USE_WIRELESS").unwrap_or("false".to_string()).parse().expect(&format!("Invalid format for USE_WIRELESS"));

let send_json_header: bool = env::var("SEND_JSON_HEADER").unwrap_or("false".to_string()).parse().expect(&format!("Invalid format for SEND_JSON_HEADER"));

// Initialize logging
// env_logger::init(); // FIXME - this doesn't work with log::LevelFilter
let mut log_level: log::LevelFilter = log::LevelFilter::Info;
Expand Down Expand Up @@ -174,15 +176,21 @@ async fn main() {
if batch.len() >= BATCH_SIZE {
let batched_data = batch.concat();

// Construct JSON header for batched data
/*let json_header = json!({
"type": "mpegts_chunk",
"content_length": batched_data.len(),
});
// Send JSON header as multipart message
publisher.send(json_header.to_string().as_bytes(), zmq::SNDMORE).unwrap();
*/
if send_json_header {
// Construct JSON header for batched data
let json_header = json!({
"type": "mpegts_chunk",
"content_length": batched_data.len(),
"total_bytes": total_bytes,
"count": count,
"source_ip": source_ip,
"source_port": source_port,
"source_device": source_device
});

// Send JSON header as multipart message
publisher.send(json_header.to_string().as_bytes(), zmq::SNDMORE).unwrap();
}

// Send chunk
let chunk_size = batched_data.len();
Expand Down

0 comments on commit 086ca35

Please sign in to comment.