Skip to content

Commit

Permalink
logging variable setup (#4)
Browse files Browse the repository at this point in the history
* logging variable setup
* add dotenv

---------

Co-authored-by: Chris Kennedy <[email protected]>
  • Loading branch information
groovybits and Chris Kennedy authored Dec 9, 2023
1 parent 1b3d1dd commit b768170
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 30 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ zmq = "0.10.0"
log = "0.4"
env_logger = "0.9"
tokio = { version = "1", features = ["full"] }

dotenv = "0.15"
79 changes: 50 additions & 29 deletions src/bin/probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@
extern crate zmq;
use pcap::{Capture};
//use serde_json::json;
//use log::{error, debug, info};
use log::{error, debug, info};
use tokio;
use std::net::{Ipv4Addr, UdpSocket};
use std::env;
use std::io::Write;

// Able to keep up with 1080i50 422 10-bit 30 Mbps MPEG-TS stream (not long-term tested)
const BATCH_SIZE: usize = 1000; // N MPEG-TS packets per batch
Expand All @@ -24,19 +25,30 @@ const READ_SIZE: i32 = (PACKET_SIZE as i32 * BATCH_SIZE as i32) + PAYLOAD_OFFSET

#[tokio::main]
async fn main() {
// Get environment variables or use default values
let target_port: i32 = env::var("TARGET_PORT").unwrap_or("5556".to_string()).parse().expect("Invalid format for TARGET_PORT");
info!("Starting rscap probe");
dotenv::dotenv().ok(); // read .env file

// Get environment variables or use default values, set in .env file
let target_port: i32 = env::var("TARGET_PORT").unwrap_or("5556".to_string()).parse().expect(&format!("Invalid format for TARGET_PORT"));
let target_ip: &str = &env::var("TARGET_IP").unwrap_or("127.0.0.1".to_string());

let source_device: &str = &env::var("SOURCE_DEVICE").unwrap_or("en0".to_string());
let source_device_ip: &str = &env::var("SOURCE_DEVICE_IP").unwrap_or("0.0.0.0".to_string());

let source_port: i32 = env::var("SOURCE_PORT").unwrap_or("10000".to_string()).parse().expect("Invalid format for SOURCE_PORT");
let source_port: i32 = env::var("SOURCE_PORT").unwrap_or("10000".to_string()).parse().expect(&format!("Invalid format for SOURCE_PORT"));
let source_ip: &str = &env::var("SOURCE_IP").unwrap_or("224.0.0.200".to_string());

let debug: bool = env::var("DEBUG").unwrap_or("false".to_string()).parse().expect(&format!("Invalid format for DEBUG"));
let silent: bool = env::var("SILENT").unwrap_or("false".to_string()).parse().expect(&format!("Invalid format for SILENT"));

// Initialize logging
// env_logger::init(); // FIXME - this doesn't work with log::LevelFilter
let mut log_level: log::LevelFilter = log::LevelFilter::Info;
if !silent {
log_level = log::LevelFilter::Debug;
}
env_logger::Builder::new()
.filter_level(log::LevelFilter::Debug)
.filter_level(log_level)
.init();

let context = zmq::Context::new();
Expand All @@ -46,18 +58,18 @@ async fn main() {

// device ip address
let mut interface_addr = source_device_ip.parse::<Ipv4Addr>()
.expect("Invalid IP address format in source_device_ip");
.expect(&format!("Invalid IP address format in source_device_ip {}", source_device_ip));

// Get the selected device's details
let devices = pcap::Device::list().unwrap();
let target_device = devices.into_iter().find(|d| d.name == source_device)
.expect("Target device not found");
.expect(&format!("Target device not found {}", source_device));

println!("{:?}", target_device);
info!("Target Device: {:?}", target_device);
let mut target_device_found = false;
for addr in target_device.addresses.iter() {
if let std::net::IpAddr::V4(ipv4_addr) = addr.addr {
println!("ipv4_addr: {:?}", ipv4_addr);
info!("Found ipv4_addr: {:?}", ipv4_addr);
interface_addr = ipv4_addr;
target_device_found = true;
break;
Expand All @@ -66,41 +78,39 @@ async fn main() {

// If the device is not found, search for it
if !target_device_found {
println!("Target device {} not found, searching...", source_device);
info!("Target device {} not found, searching...", source_device);
for device in pcap::Device::list().unwrap() {
println!("{:?}", device);
debug!("Device {:?}", device);

for addr in device.addresses.iter() {
if let std::net::IpAddr::V4(ipv4_addr) = addr.addr {
// check if localhost
if ipv4_addr.is_loopback() {
continue;
}
println!("ipv4_addr: {:?}", ipv4_addr);
target_device_found = true;

println!("Found IPv4 target device {} with ip {}", source_device, ipv4_addr);
info!("Found IPv4 target device {} with ip {}", source_device, ipv4_addr);
interface_addr = ipv4_addr;
break;
}
}
if target_device_found {
break;
}

}
}
}
if !target_device_found {
println!("Target device {} not found", source_device);
error!("Target device {} not found", source_device);
return;
}

let multicast_addr = source_ip.parse::<Ipv4Addr>()
.expect("Invalid IP address format in source_ip");
.expect(&format!("Invalid IP address format in source_ip {}", source_ip));

let socket = UdpSocket::bind("0.0.0.0:0").expect("Failed to bind socket");
socket.join_multicast_v4(&multicast_addr, &interface_addr)
.expect("Failed to join multicast group");
.expect(&format!("Failed to join multicast group on interface {}", source_device));

// Setup packet capture
let mut cap = Capture::from_device(target_device).unwrap()
Expand All @@ -115,14 +125,19 @@ async fn main() {
let mut total_bytes = 0;
let mut count = 0;
let mut batch = Vec::new();
while let Ok(packet) = cap.next_packet() {
println!("received packet! {:?}", packet.header);
while let Ok(packet) = cap.next_packet() {
if debug{
debug!("Received packet! {:?}", packet.header);
}
let chunks = process_packet(&packet);

for chunk in chunks {
if debug {
hexdump(&chunk);
}

// Check if chunk is MPEG-TS or SMPTE 2110
if is_mpegts_or_smpte2110(&chunk) {
//hexdump(&chunk);
//println!("--------------------------------------------------");
batch.push(chunk);

if batch.len() >= BATCH_SIZE {
Expand All @@ -144,18 +159,25 @@ async fn main() {
count += 1;
publisher.send(batched_data, 0).unwrap();

println!("#{} Sent chunk of {}/{} bytes", count, chunk_size, total_bytes);
if !debug {
print!(".");
// flush stdout
std::io::stdout().flush().unwrap();
} else {
debug!("#{} Sent chunk of {}/{} bytes", count, chunk_size, total_bytes);
}

batch.clear();
}
} else {
hexdump(&chunk);
println!("Not MPEG-TS or SMPTE 2110");
println!("--------------------------------------------------");
error!("Not MPEG-TS or SMPTE 2110");
}
}

}

info!("Exiting rscap probe");
}

fn is_mpegts_or_smpte2110(packet: &[u8]) -> bool {
Expand All @@ -167,9 +189,6 @@ fn is_mpegts_or_smpte2110(packet: &[u8]) -> bool {

fn process_packet(packet: &[u8]) -> Vec<Vec<u8>> {
// Strip off network headers to get to the MPEG-TS payload
// The exact amount to strip depends on network configuration
// Default is Ethernet (14 bytes) + IP (20 bytes) + UDP (8 bytes)

let mut mpeg_ts_packets = Vec::new();
let mut start = PAYLOAD_OFFSET;

Expand All @@ -186,12 +205,14 @@ fn process_packet(packet: &[u8]) -> Vec<Vec<u8>> {

fn hexdump(packet: &[u8]) {
// print in rows of 16 bytes
println!("Packet length: {}", packet.len());
for (i, chunk) in packet.iter().take(188).enumerate() {
if i % 16 == 0 {
print!("\n{:04x}: ", i);
}
print!("{:02x} ", chunk);
}
println!();
println!("");
println!("--------------------------------------------------");
}

0 comments on commit b768170

Please sign in to comment.