Skip to content

Commit

Permalink
send json header of streamdata serialized to json
Browse files Browse the repository at this point in the history
  • Loading branch information
Chris Kennedy committed Dec 21, 2023
1 parent cf06fcb commit 066c28a
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 13 deletions.
6 changes: 1 addition & 5 deletions src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,6 @@ async fn main() {
.parse()
.expect(&format!("Invalid format for SILENT"));
let output_file: &str = &env::var("OUTPUT_FILE").unwrap_or("output.ts".to_string());
let send_json_header: bool = env::var("SEND_JSON_HEADER")
.unwrap_or("false".to_string())
.parse()
.expect(&format!("Invalid format for SEND_JSON_HEADER"));

info!("Starting rscap client");

Expand Down Expand Up @@ -71,7 +67,7 @@ async fn main() {
let mut mpeg_packets = 0;
while let Ok(msg) = zmq_sub.recv_bytes(0) {
// Check for JSON header if enabled, it will alternate as the first message before each MPEG-TS chunk
if send_json_header && count % 2 == 0 {
if count % 2 == 0 {
count += 1;
let json_header = String::from_utf8(msg.clone()).unwrap();
if debug_on {
Expand Down
25 changes: 17 additions & 8 deletions src/bin/probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,14 @@ use log::{debug, error, info};
use pcap::Capture;
use rtp::RtpReader;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashMap;
use std::fmt;
use std::io::Write;
use std::net::{Ipv4Addr, UdpSocket};
use std::sync::mpsc;
use std::sync::Mutex;
use std::thread;
use std::time::{SystemTime, UNIX_EPOCH};
use firestorm::{profile_fn, profile_method, profile_section};
use firestorm::{profile_fn};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, Sender};
Expand Down Expand Up @@ -58,6 +56,7 @@ struct Pmt {
}

// StreamData struct
#[derive(Serialize, Deserialize)]
struct StreamData {
pid: u16,
pmt_pid: u16,
Expand All @@ -78,6 +77,7 @@ struct StreamData {
start_time: u64, // field for start time
total_bits: u64, // field for total bits
count: u32, // field for count
#[serde(skip)]
packet: Arc<Vec<u8>>, // The actual MPEG-TS packet data
packet_start: usize, // Offset into the data
packet_len: usize, // Offset into the data
Expand Down Expand Up @@ -1093,16 +1093,25 @@ fn rscap() {
publisher.bind(&source_port_ip).unwrap();

for mut batch in rx {
for stream_data in batch.iter_mut() {
let slice = &stream_data.packet[stream_data.packet_start..stream_data.packet_start + stream_data.packet_len];
let message = zmq::Message::from(slice); // Efficiently converts the slice to a Message

publisher.send(message, 0).unwrap();
for stream_data in batch.iter() {
// Clone StreamData to get a version without the packet
let cloned_stream_data = stream_data.clone(); // This clone avoids copying the Arc<Vec<u8>>
let metadata = serde_json::to_string(&cloned_stream_data).unwrap();
let metadata_msg = zmq::Message::from(metadata.as_bytes());

// Send metadata as the first message
publisher.send(metadata_msg, zmq::SNDMORE).unwrap();

// Send packet data as the second message
let packet_slice = &stream_data.packet[stream_data.packet_start..stream_data.packet_start + stream_data.packet_len];
let packet_msg = zmq::Message::from(packet_slice);
publisher.send(packet_msg, 0).unwrap();
}
batch.clear();
}
});


// Perform TR 101 290 checks
let mut tr101290_errors = Tr101290Errors::new();

Expand Down

0 comments on commit 066c28a

Please sign in to comment.