Skip to content

Commit

Permalink
send json headers if enabled (#8)
Browse files Browse the repository at this point in the history
* send json headers if enabled

add config item to enable sending json metadata in a header packet.
client modified to handle headers and binary chunks properly.

* update readme

* readme headline cleanup

* better description

* add image

---------

Co-authored-by: Chris Kennedy <[email protected]>
  • Loading branch information
groovybits and Chris Kennedy authored Dec 9, 2023
1 parent f6569ae commit d1e5a63
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 14 deletions.
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ RUST_LOG="info" # debug, info, error

#DEBUG=true
#SILENT=true
#SEND_JSON_HEADER=true

USE_WIRELESS=true # Allow wireless interface usage

Expand Down
87 changes: 84 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
# rsCap: Rust based capture of mpegts using pcap
## Packet Capture MpegTS/SMPTE2110 for ZeroMQ Distributed Processing

Proxy an MpegTS network stream over ZeroMQ. Capture
Distribute an MpegTS network stream over ZeroMQ. Capture
the TS using pcap with filter rules for specifying
which stream. Validate the stream for conformance
keeping the ZeroMQ output clean without any non-legal
TS packets.
TS packets. Store metadata extracted in zeromq json headers.
Share out multicast to many clients for distributed stream processing.

![rscap](https://storage.googleapis.com/gaib/2/rscap/rscap.png)

## This consists of two programs, a probe and a client.

Expand All @@ -15,5 +18,83 @@ batches of the MpegTS 188 byte packets to a ZeroMQ output.
a file containing the MpegTS stream matching the one
captured by the probe.

## Configuration with environment variables using [.env](.env.example)

```text
## rsCap Configuration
RUST_LOG="info" # debug, info, error
DEBUG=true
#SILENT=true
SEND_JSON_HEADER=true # Send metadata in a json header
USE_WIRELESS=true # Allow wireless interface usage
# ZeroMQ output host and port to TCP Publish
TARGET_IP="127.0.0.1"
TARGET_PORT=5556
# Pcap device to listen to, empty for autodetection
SOURCE_DEVICE=""
# Pcap filter for MpegTS multicast host and port
SOURCE_IP="224.0.0.200"
SOURCE_PORT=10000
# Output file name for client capture from ZeroMQ output
OUTPUT_FILE=capture.ts
```

## Building and executing

Install Rust via Homebrew on MacOS or from Rust main website (preferred)...

<https://www.rust-lang.org/tools/install>

```text
brew install rust
# better...
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
```

Build and run the pcap stream probe...

```text
cargo build
sudo target/debug/probe
```

Build and run the zmq capture client...

```text
target/debug/client
```

Check the output file capture.ts (or what you set in .env or environment variables)

```text
ffmpeg -i capture.ts
```

## TODO - roadmap plans

- Thread the pcap capture and queue the packets, also thread zeromq writes to read from the shared queue.
- Add more information header to the json metadata like system stats, network stats, mediainfo, captions, ancillary data.
- Have multiple client modes to distribute processing of the stream on the zmq endpoints.
- Use [OpenCV img_hash fingerprinting](https://docs.opencv.org/3.4/d4/d93/group__img__hash.html#ga5eeee1e27bc45caffe3b529ab42568e3) to perceptually align and compare video streams frames.
- OpenAI Whisper speech to text for caption verfication and insertion. <https://github.com/openai/whisper>
- SEI metadata decoding various aspects of MpegTS.
- SMPTE 2110 handling analogous to the MpegTS support.
- PAT/PMT parsing, PES parsing and analysis of streams.
- Problem discovery and reporting via LLM/VectorDB analysis detection of anomalies in data.
- Fine tune LLM model for finding stream issues beyond basic commonly used ones.
- Multiple streams?
- Logging to file/sqliteDB with stats for simple basic graphing using gnuplot.
- Segmentation of captured MpegTS, VOD file writer by various specs.
- Compression for proxy capture.
- FFmpeg libzmq protocol compatibility to allow branching off into libav easily.
- Wrap [ltntstools](https://github.com/LTNGlobal-opensource/libltntstools) lib functionality into Rust through C bindings (If possible).
- Queue and Broker distribution robustness to allow large video streams capture without loss.
- General network analyzer view of network around the streams we know/care about.

### Chris Kennedy (C) 2023 LGPL

12 changes: 11 additions & 1 deletion src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ async fn main() {
let debug_on: bool = env::var("DEBUG").unwrap_or("false".to_string()).parse().expect(&format!("Invalid format for DEBUG"));
let silent: bool = env::var("SILENT").unwrap_or("false".to_string()).parse().expect(&format!("Invalid format for SILENT"));
let output_file: &str = &env::var("OUTPUT_FILE").unwrap_or("output.ts".to_string());
let send_json_header: bool = env::var("SEND_JSON_HEADER").unwrap_or("false".to_string()).parse().expect(&format!("Invalid format for SEND_JSON_HEADER"));

info!("Starting rscap client");

Expand Down Expand Up @@ -58,11 +59,20 @@ async fn main() {

let mut total_bytes = 0;
let mut count = 0;
let mut mpeg_packets = 0;
while let Ok(msg) = zmq_sub.recv_bytes(0) {
// Check for JSON header if enabled, it will alternate as the first message before each MPEG-TS chunk
if send_json_header && count % 2 == 0 {
count += 1;
let json_header = String::from_utf8(msg.clone()).unwrap();
info!("#{} Received JSON header: {}", mpeg_packets + 1, json_header);
continue;
}
total_bytes += msg.len();
count += 1;
mpeg_packets += 1;
if debug_on {
debug!("#{} Received {}/{} bytes", count, msg.len(), total_bytes);
debug!("#{} Received {}/{} bytes", mpeg_packets, msg.len(), total_bytes);
} else if !silent {
print!(".");
std::io::stdout().flush().unwrap();
Expand Down
28 changes: 18 additions & 10 deletions src/bin/probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

extern crate zmq;
use pcap::{Capture};
//use serde_json::json;
use serde_json::json;
use log::{error, debug, info};
use tokio;
use std::net::{Ipv4Addr, UdpSocket};
Expand Down Expand Up @@ -43,6 +43,8 @@ async fn main() {

let use_wireless: bool = env::var("USE_WIRELESS").unwrap_or("false".to_string()).parse().expect(&format!("Invalid format for USE_WIRELESS"));

let send_json_header: bool = env::var("SEND_JSON_HEADER").unwrap_or("false".to_string()).parse().expect(&format!("Invalid format for SEND_JSON_HEADER"));

// Initialize logging
// env_logger::init(); // FIXME - this doesn't work with log::LevelFilter
let mut log_level: log::LevelFilter = log::LevelFilter::Info;
Expand Down Expand Up @@ -174,15 +176,21 @@ async fn main() {
if batch.len() >= BATCH_SIZE {
let batched_data = batch.concat();

// Construct JSON header for batched data
/*let json_header = json!({
"type": "mpegts_chunk",
"content_length": batched_data.len(),
});
// Send JSON header as multipart message
publisher.send(json_header.to_string().as_bytes(), zmq::SNDMORE).unwrap();
*/
if send_json_header {
// Construct JSON header for batched data
let json_header = json!({
"type": "mpegts_chunk",
"content_length": batched_data.len(),
"total_bytes": total_bytes,
"count": count,
"source_ip": source_ip,
"source_port": source_port,
"source_device": source_device
});

// Send JSON header as multipart message
publisher.send(json_header.to_string().as_bytes(), zmq::SNDMORE).unwrap();
}

// Send chunk
let chunk_size = batched_data.len();
Expand Down

0 comments on commit d1e5a63

Please sign in to comment.