Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pass zeromq zero copy and stream data as json #36

Merged
merged 3 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,6 @@ async fn main() {
.parse()
.expect(&format!("Invalid format for SILENT"));
let output_file: &str = &env::var("OUTPUT_FILE").unwrap_or("output.ts".to_string());
let send_json_header: bool = env::var("SEND_JSON_HEADER")
.unwrap_or("false".to_string())
.parse()
.expect(&format!("Invalid format for SEND_JSON_HEADER"));

info!("Starting rscap client");

Expand Down Expand Up @@ -71,7 +67,7 @@ async fn main() {
let mut mpeg_packets = 0;
while let Ok(msg) = zmq_sub.recv_bytes(0) {
// Check for JSON header if enabled, it will alternate as the first message before each MPEG-TS chunk
if send_json_header && count % 2 == 0 {
if count % 2 == 0 {
count += 1;
let json_header = String::from_utf8(msg.clone()).unwrap();
if debug_on {
Expand Down
81 changes: 19 additions & 62 deletions src/bin/probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,14 @@ use log::{debug, error, info};
use pcap::Capture;
use rtp::RtpReader;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashMap;
use std::fmt;
use std::io::Write;
use std::net::{Ipv4Addr, UdpSocket};
use std::sync::mpsc;
use std::sync::Mutex;
use std::thread;
use std::time::{SystemTime, UNIX_EPOCH};
use firestorm::{profile_fn, profile_method, profile_section};
use firestorm::{profile_fn};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, Sender};
Expand Down Expand Up @@ -58,6 +56,7 @@ struct Pmt {
}

// StreamData struct
#[derive(Serialize, Deserialize)]
struct StreamData {
pid: u16,
pmt_pid: u16,
Expand All @@ -78,6 +77,7 @@ struct StreamData {
start_time: u64, // field for start time
total_bits: u64, // field for total bits
count: u32, // field for count
#[serde(skip)]
packet: Arc<Vec<u8>>, // The actual MPEG-TS packet data
packet_start: usize, // Offset into the data
packet_len: usize, // Offset into the data
Expand Down Expand Up @@ -1087,74 +1087,31 @@ fn rscap() {

// Spawn a new thread for ZeroMQ communication
let zmq_thread = thread::spawn(move || {
// Setup ZeroMQ publisher
let context = zmq::Context::new();
let publisher = context.socket(zmq::PUB).unwrap();
let source_port_ip = format!("tcp://{}:{}", target_ip, target_port);
publisher.bind(&source_port_ip).unwrap();

let mut total_bytes = 0;
let mut count = 0;
for mut batch in rx {
// Check for a stop signal
if batch.is_empty() {
break; // Exit the loop if a stop signal is received
}
// ... ZeroMQ sending logic ...
for stream_data in batch.iter_mut() {
// Send chunk of data as multipart message
let chunk_size = stream_data.packet_len;
total_bytes += chunk_size;
count += 1;

/*let mut format_str = "unknown";
let format_index = is_mpegts_or_smpte2110(&stream_data.packet[stream_data.packet_start..stream_data.packet_start + stream_data.packet_len]);
if format_index == 1 {
format_str = "mpegts";
} else if format_index == 2 {
format_str = "smpte2110";
}*/

/*
// Construct JSON header for batched data
let json_header = json!({
"type": "mpegts_chunk",
"content_length": chunk_size,
"total_bytes": total_bytes,
"count": count,
"source_ip": source_ip,
"source_port": source_port,
"source_device": source_device,
"target_ip": target_ip,
"target_port": target_port,
"format": format_str,
"count": count,
"timestamp": current_unix_timestamp_ms().unwrap_or(0),
"chunk_size": chunk_size,
"batch_size": batch_size,
});

// Check if JSON header is enabled
if send_json_header {
// Send JSON header as multipart message
publisher
.send(json_header.to_string().as_bytes(), zmq::SNDMORE)
.unwrap();
}*/

publisher.send(&stream_data.packet[stream_data.packet_start..stream_data.packet_start + stream_data.packet_len], 0).unwrap();

// Print progress
//log::info!("STATUS::ZEROMQ:TX {}", json_header);

// release the packet Arc so it can be reused
stream_data.packet = Arc::new(Vec::new()); // Create a new Arc<Vec<u8>> for the next packet
for stream_data in batch.iter() {
// Clone StreamData to get a version without the packet
let cloned_stream_data = stream_data.clone(); // This clone avoids copying the Arc<Vec<u8>>
let metadata = serde_json::to_string(&cloned_stream_data).unwrap();
let metadata_msg = zmq::Message::from(metadata.as_bytes());

// Send metadata as the first message
publisher.send(metadata_msg, zmq::SNDMORE).unwrap();

// Send packet data as the second message
let packet_slice = &stream_data.packet[stream_data.packet_start..stream_data.packet_start + stream_data.packet_len];
let packet_msg = zmq::Message::from(packet_slice);
publisher.send(packet_msg, 0).unwrap();
}
// clear batch and any other data
batch.clear();
}
});


// Perform TR 101 290 checks
let mut tr101290_errors = Tr101290Errors::new();

Expand All @@ -1176,8 +1133,8 @@ fn rscap() {

if silent && !no_progress {
print!(".");
// flush stdout
std::io::stdout().flush().unwrap();
// flush stdout TODO: output in ncurses or something w/less output
//std::io::stdout().flush().unwrap();
}

// Check if chunk is MPEG-TS or SMPTE 2110
Expand Down