Skip to content

Commit

Permalink
thread zmq writes (#9)
Browse files Browse the repository at this point in the history
get better performance by threading the zeromq output.

Co-authored-by: Chris Kennedy <[email protected]>
  • Loading branch information
groovybits and Chris Kennedy authored Dec 9, 2023
1 parent d1e5a63 commit 00126f1
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 44 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rscap"
version = "0.1.0"
version = "0.1.1"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand Down
110 changes: 67 additions & 43 deletions src/bin/probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use tokio;
use std::net::{Ipv4Addr, UdpSocket};
use std::env;
use std::io::Write;
use std::sync::mpsc;
use std::thread;

// Able to keep up with 1080i50 422 10-bit 30 Mbps MPEG-TS stream (not long-term tested)
const BATCH_SIZE: usize = 1000; // N MPEG-TS packets per batch
Expand All @@ -30,12 +32,11 @@ async fn main() {

// Get environment variables or use default values, set in .env file
let target_port: i32 = env::var("TARGET_PORT").unwrap_or("5556".to_string()).parse().expect(&format!("Invalid format for TARGET_PORT"));
let target_ip: &str = &env::var("TARGET_IP").unwrap_or("127.0.0.1".to_string());

let source_device: &str = &env::var("SOURCE_DEVICE").unwrap_or("".to_string());
let target_ip: String = env::var("TARGET_IP").unwrap_or("127.0.0.1".to_string());
let source_device: String = env::var("SOURCE_DEVICE").unwrap_or("".to_string());
let source_ip: String = env::var("SOURCE_IP").unwrap_or("224.0.0.200".to_string());

let source_port: i32 = env::var("SOURCE_PORT").unwrap_or("10000".to_string()).parse().expect(&format!("Invalid format for SOURCE_PORT"));
let source_ip: &str = &env::var("SOURCE_IP").unwrap_or("224.0.0.200".to_string());
let source_device_ip: &str = "0.0.0.0";

let debug_on: bool = env::var("DEBUG").unwrap_or("false".to_string()).parse().expect(&format!("Invalid format for DEBUG"));
Expand All @@ -55,11 +56,6 @@ async fn main() {
.filter_level(log_level)
.init();

let context = zmq::Context::new();
let publisher = context.socket(zmq::PUB).unwrap();
let source_port_ip = format!("tcp://{}:{}", target_ip, target_port);
publisher.bind(&source_port_ip).unwrap();

// device ip address
let mut interface_addr = source_device_ip.parse::<Ipv4Addr>()
.expect(&format!("Invalid IP address format in source_device_ip {}", source_device_ip));
Expand Down Expand Up @@ -155,8 +151,59 @@ async fn main() {
let source_host_and_port = format!("udp dst port {} and ip dst host {}", source_port, source_ip);
cap.filter(&source_host_and_port, true).unwrap();

let mut total_bytes = 0;
let mut count = 0;
// Setup channel for passing data between threads
let (tx, rx) = mpsc::channel::<Vec<Vec<u8>>>();

// Spawn a new thread for ZeroMQ communication
let zmq_thread = thread::spawn(move || {
// Setup ZeroMQ publisher
let context = zmq::Context::new();
let publisher = context.socket(zmq::PUB).unwrap();
let source_port_ip = format!("tcp://{}:{}", target_ip, target_port);
publisher.bind(&source_port_ip).unwrap();

let mut total_bytes = 0;
let mut count = 0;
for batch in rx {
// Check for a stop signal
if batch.is_empty() {
break; // Exit the loop if a stop signal is received
}
// ... ZeroMQ sending logic ...
let batched_data = batch.concat();

if send_json_header {
// Construct JSON header for batched data
let json_header = json!({
"type": "mpegts_chunk",
"content_length": batched_data.len(),
"total_bytes": total_bytes,
"count": count,
"source_ip": source_ip,
"source_port": source_port,
"source_device": source_device
});

// Send JSON header as multipart message
publisher.send(json_header.to_string().as_bytes(), zmq::SNDMORE).unwrap();
}

// Send chunk
let chunk_size = batched_data.len();
total_bytes += chunk_size;
count += 1;
publisher.send(batched_data, 0).unwrap();

if !debug_on {
print!(".");
// flush stdout
std::io::stdout().flush().unwrap();
} else {
debug!("#{} Sent chunk of {}/{} bytes", count, chunk_size, total_bytes);
}
}
});

let mut batch = Vec::new();
while let Ok(packet) = cap.next_packet() {
if debug_on{
Expand All @@ -174,38 +221,8 @@ async fn main() {
batch.push(chunk);

if batch.len() >= BATCH_SIZE {
let batched_data = batch.concat();

if send_json_header {
// Construct JSON header for batched data
let json_header = json!({
"type": "mpegts_chunk",
"content_length": batched_data.len(),
"total_bytes": total_bytes,
"count": count,
"source_ip": source_ip,
"source_port": source_port,
"source_device": source_device
});

// Send JSON header as multipart message
publisher.send(json_header.to_string().as_bytes(), zmq::SNDMORE).unwrap();
}

// Send chunk
let chunk_size = batched_data.len();
total_bytes += chunk_size;
count += 1;
publisher.send(batched_data, 0).unwrap();

if !debug_on {
print!(".");
// flush stdout
std::io::stdout().flush().unwrap();
} else {
debug!("#{} Sent chunk of {}/{} bytes", count, chunk_size, total_bytes);
}

// Send the batch to the channel
tx.send(batch.clone()).unwrap();
batch.clear();
}
} else {
Expand All @@ -217,6 +234,13 @@ async fn main() {
}

info!("Exiting rscap probe");

// Send stop signal
tx.send(Vec::new()).unwrap();
drop(tx);

// Wait for the zmq_thread to finish
zmq_thread.join().unwrap();
}

fn is_mpegts_or_smpte2110(packet: &[u8]) -> bool {
Expand Down

0 comments on commit 00126f1

Please sign in to comment.