From b7681702baa6676a009852b87611a52dc2ccee04 Mon Sep 17 00:00:00 2001 From: Chris Kennedy Date: Fri, 8 Dec 2023 23:54:48 -0800 Subject: [PATCH] logging variable setup (#4) * logging variable setup * add dotenv --------- Co-authored-by: Chris Kennedy --- Cargo.toml | 2 +- src/bin/probe.rs | 79 ++++++++++++++++++++++++++++++------------------ 2 files changed, 51 insertions(+), 30 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1d977e89..bdda0cb0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,4 +13,4 @@ zmq = "0.10.0" log = "0.4" env_logger = "0.9" tokio = { version = "1", features = ["full"] } - +dotenv = "0.15" diff --git a/src/bin/probe.rs b/src/bin/probe.rs index cf200719..6dd713de 100644 --- a/src/bin/probe.rs +++ b/src/bin/probe.rs @@ -10,10 +10,11 @@ extern crate zmq; use pcap::{Capture}; //use serde_json::json; -//use log::{error, debug, info}; +use log::{error, debug, info}; use tokio; use std::net::{Ipv4Addr, UdpSocket}; use std::env; +use std::io::Write; // Able to keep up with 1080i50 422 10-bit 30 Mbps MPEG-TS stream (not long-term tested) const BATCH_SIZE: usize = 1000; // N MPEG-TS packets per batch @@ -24,19 +25,30 @@ const READ_SIZE: i32 = (PACKET_SIZE as i32 * BATCH_SIZE as i32) + PAYLOAD_OFFSET #[tokio::main] async fn main() { - // Get environment variables or use default values - let target_port: i32 = env::var("TARGET_PORT").unwrap_or("5556".to_string()).parse().expect("Invalid format for TARGET_PORT"); + info!("Starting rscap probe"); + dotenv::dotenv().ok(); // read .env file + + // Get environment variables or use default values, set in .env file + let target_port: i32 = env::var("TARGET_PORT").unwrap_or("5556".to_string()).parse().expect(&format!("Invalid format for TARGET_PORT")); let target_ip: &str = &env::var("TARGET_IP").unwrap_or("127.0.0.1".to_string()); let source_device: &str = &env::var("SOURCE_DEVICE").unwrap_or("en0".to_string()); let source_device_ip: &str = &env::var("SOURCE_DEVICE_IP").unwrap_or("0.0.0.0".to_string()); - let source_port: i32 = env::var("SOURCE_PORT").unwrap_or("10000".to_string()).parse().expect("Invalid format for SOURCE_PORT"); + let source_port: i32 = env::var("SOURCE_PORT").unwrap_or("10000".to_string()).parse().expect(&format!("Invalid format for SOURCE_PORT")); let source_ip: &str = &env::var("SOURCE_IP").unwrap_or("224.0.0.200".to_string()); + let debug: bool = env::var("DEBUG").unwrap_or("false".to_string()).parse().expect(&format!("Invalid format for DEBUG")); + let silent: bool = env::var("SILENT").unwrap_or("false".to_string()).parse().expect(&format!("Invalid format for SILENT")); + // Initialize logging + // env_logger::init(); // FIXME - this doesn't work with log::LevelFilter + let mut log_level: log::LevelFilter = log::LevelFilter::Info; + if !silent { + log_level = log::LevelFilter::Debug; + } env_logger::Builder::new() - .filter_level(log::LevelFilter::Debug) + .filter_level(log_level) .init(); let context = zmq::Context::new(); @@ -46,18 +58,18 @@ async fn main() { // device ip address let mut interface_addr = source_device_ip.parse::() - .expect("Invalid IP address format in source_device_ip"); + .expect(&format!("Invalid IP address format in source_device_ip {}", source_device_ip)); // Get the selected device's details let devices = pcap::Device::list().unwrap(); let target_device = devices.into_iter().find(|d| d.name == source_device) - .expect("Target device not found"); + .expect(&format!("Target device not found {}", source_device)); - println!("{:?}", target_device); + info!("Target Device: {:?}", target_device); let mut target_device_found = false; for addr in target_device.addresses.iter() { if let std::net::IpAddr::V4(ipv4_addr) = addr.addr { - println!("ipv4_addr: {:?}", ipv4_addr); + info!("Found ipv4_addr: {:?}", ipv4_addr); interface_addr = ipv4_addr; target_device_found = true; break; @@ -66,9 +78,9 @@ async fn main() { // If the device is not found, search for it if !target_device_found { - println!("Target device {} not found, searching...", source_device); + info!("Target device {} not found, searching...", source_device); for device in pcap::Device::list().unwrap() { - println!("{:?}", device); + debug!("Device {:?}", device); for addr in device.addresses.iter() { if let std::net::IpAddr::V4(ipv4_addr) = addr.addr { @@ -76,31 +88,29 @@ async fn main() { if ipv4_addr.is_loopback() { continue; } - println!("ipv4_addr: {:?}", ipv4_addr); target_device_found = true; - println!("Found IPv4 target device {} with ip {}", source_device, ipv4_addr); + info!("Found IPv4 target device {} with ip {}", source_device, ipv4_addr); interface_addr = ipv4_addr; break; } } if target_device_found { break; - } - + } } } if !target_device_found { - println!("Target device {} not found", source_device); + error!("Target device {} not found", source_device); return; } let multicast_addr = source_ip.parse::() - .expect("Invalid IP address format in source_ip"); + .expect(&format!("Invalid IP address format in source_ip {}", source_ip)); let socket = UdpSocket::bind("0.0.0.0:0").expect("Failed to bind socket"); socket.join_multicast_v4(&multicast_addr, &interface_addr) - .expect("Failed to join multicast group"); + .expect(&format!("Failed to join multicast group on interface {}", source_device)); // Setup packet capture let mut cap = Capture::from_device(target_device).unwrap() @@ -115,14 +125,19 @@ async fn main() { let mut total_bytes = 0; let mut count = 0; let mut batch = Vec::new(); - while let Ok(packet) = cap.next_packet() { - println!("received packet! {:?}", packet.header); + while let Ok(packet) = cap.next_packet() { + if debug{ + debug!("Received packet! {:?}", packet.header); + } let chunks = process_packet(&packet); for chunk in chunks { + if debug { + hexdump(&chunk); + } + + // Check if chunk is MPEG-TS or SMPTE 2110 if is_mpegts_or_smpte2110(&chunk) { - //hexdump(&chunk); - //println!("--------------------------------------------------"); batch.push(chunk); if batch.len() >= BATCH_SIZE { @@ -144,18 +159,25 @@ async fn main() { count += 1; publisher.send(batched_data, 0).unwrap(); - println!("#{} Sent chunk of {}/{} bytes", count, chunk_size, total_bytes); + if !debug { + print!("."); + // flush stdout + std::io::stdout().flush().unwrap(); + } else { + debug!("#{} Sent chunk of {}/{} bytes", count, chunk_size, total_bytes); + } batch.clear(); } } else { hexdump(&chunk); - println!("Not MPEG-TS or SMPTE 2110"); - println!("--------------------------------------------------"); + error!("Not MPEG-TS or SMPTE 2110"); } } } + + info!("Exiting rscap probe"); } fn is_mpegts_or_smpte2110(packet: &[u8]) -> bool { @@ -167,9 +189,6 @@ fn is_mpegts_or_smpte2110(packet: &[u8]) -> bool { fn process_packet(packet: &[u8]) -> Vec> { // Strip off network headers to get to the MPEG-TS payload - // The exact amount to strip depends on network configuration - // Default is Ethernet (14 bytes) + IP (20 bytes) + UDP (8 bytes) - let mut mpeg_ts_packets = Vec::new(); let mut start = PAYLOAD_OFFSET; @@ -186,12 +205,14 @@ fn process_packet(packet: &[u8]) -> Vec> { fn hexdump(packet: &[u8]) { // print in rows of 16 bytes + println!("Packet length: {}", packet.len()); for (i, chunk) in packet.iter().take(188).enumerate() { if i % 16 == 0 { print!("\n{:04x}: ", i); } print!("{:02x} ", chunk); } - println!(); + println!(""); + println!("--------------------------------------------------"); }