Skip to content

Commit

Permalink
join igmp, detect interface and ip to bind udp
Browse files Browse the repository at this point in the history
working auto-detect, may need some adjustments if not set.
env variables set defaults for interface/ip values.
avoid ipv6 addresses for binding udp.
  • Loading branch information
Chris Kennedy committed Dec 9, 2023
1 parent 9462a42 commit a32663f
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 80 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
pcap = "0.9.0"
pcap = "1.1.0"
serde = "1.0"
serde_json = "1.0"
zmq = "0.10.0"
Expand Down
145 changes: 66 additions & 79 deletions src/bin/probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,99 +8,115 @@
*/

extern crate zmq;
use pcap::{Capture, Device};
use pcap::{Capture};
//use serde_json::json;
//use log::{error, debug, info};
use tokio;
use std::net::{Ipv4Addr};
use std::io;
use std::net::{Ipv4Addr, UdpSocket};
use std::env;

// Able to keep up with 1080i50 422 10-bit 30 Mbps MPEG-TS stream (not long-term tested)
const BATCH_SIZE: usize = 1000; // N MPEG-TS packets per batch
const PAYLOAD_OFFSET: usize = 14 + 20 + 8; // Ethernet (14 bytes) + IP (20 bytes) + UDP (8 bytes)
const PACKET_SIZE: usize = 188; // MPEG-TS packet size
const READ_SIZE: i32 = (PACKET_SIZE as i32 * BATCH_SIZE as i32) + PAYLOAD_OFFSET as i32; // pcap read size

// TODO: change to your target address and port
const TARGET_PORT: i32 = 5556; // TODO: change to your target port
const TARGET_IP: &str = "127.0.0.1"; // TODO: change to your target IP

// TODO: change to your source device
const SOURCE_DEVICE: &str = "en0"; // TODO: change to your network interface
#[tokio::main]
async fn main() {
// Get environment variables or use default values
let target_port: i32 = env::var("TARGET_PORT").unwrap_or("5556".to_string()).parse().expect("Invalid format for TARGET_PORT");
let target_ip: &str = &env::var("TARGET_IP").unwrap_or("127.0.0.1".to_string());

// TODO: change to your source address and port
const SOURCE_PORT: i32 = 10000; // TODO: change to your source port
const SOURCE_IP: &str = "224.0.0.200"; // TODO: change to your source IP
let source_device: &str = &env::var("SOURCE_DEVICE").unwrap_or("en0".to_string());
let source_device_ip: &str = &env::var("SOURCE_DEVICE_IP").unwrap_or("0.0.0.0".to_string());

let source_port: i32 = env::var("SOURCE_PORT").unwrap_or("10000".to_string()).parse().expect("Invalid format for SOURCE_PORT");
let source_ip: &str = &env::var("SOURCE_IP").unwrap_or("224.0.0.200".to_string());

#[tokio::main]
async fn main() {
// Initialize logging
env_logger::Builder::new()
.filter_level(log::LevelFilter::Debug)
.init();

let context = zmq::Context::new();
let publisher = context.socket(zmq::PUB).unwrap();
let source_port_ip = format!("tcp://{}:{}", TARGET_IP, TARGET_PORT);
let source_port_ip = format!("tcp://{}:{}", target_ip, target_port);
publisher.bind(&source_port_ip).unwrap();

// List available devices
let default_device = Device::lookup().unwrap();
println!("Default device: {:?}", default_device);
let target_device = SOURCE_DEVICE;
// device ip address
let mut interface_addr = source_device_ip.parse::<Ipv4Addr>()
.expect("Invalid IP address format in source_device_ip");

// Get the selected device's details
let devices = pcap::Device::list().unwrap();
let target_device = devices.into_iter().find(|d| d.name == source_device)
.expect("Target device not found");

println!("{:?}", target_device);
let mut target_device_found = false;
for device in pcap::Device::list().unwrap() {
println!("{:?}", device);
if device.name == target_device {
println!("Found target device: {}", target_device);
for addr in target_device.addresses.iter() {
if let std::net::IpAddr::V4(ipv4_addr) = addr.addr {
println!("ipv4_addr: {:?}", ipv4_addr);
interface_addr = ipv4_addr;
target_device_found = true;
break;
}
}

// If the device is not found, search for it
if !target_device_found {
println!("Target device {} not found, searching...", source_device);
for device in pcap::Device::list().unwrap() {
println!("{:?}", device);

for addr in device.addresses.iter() {
if let std::net::IpAddr::V4(ipv4_addr) = addr.addr {
// check if localhost
if ipv4_addr.is_loopback() {
continue;
}
println!("ipv4_addr: {:?}", ipv4_addr);
target_device_found = true;

println!("Found IPv4 target device {} with ip {}", source_device, ipv4_addr);
interface_addr = ipv4_addr;
break;
}
}
if target_device_found {
break;
}

}
}
// Exit if target device not found
if !target_device_found {
println!("Target device not found: {}", target_device);
println!("Target device {} not found", source_device);
return;
}

let multicast_addr = source_ip.parse::<Ipv4Addr>()
.expect("Invalid IP address format in source_ip");

let socket = UdpSocket::bind("0.0.0.0:0").expect("Failed to bind socket");
socket.join_multicast_v4(&multicast_addr, &interface_addr)
.expect("Failed to join multicast group");

// Setup packet capture
let mut cap = Capture::from_device(target_device).unwrap()
.promisc(false)
.snaplen(READ_SIZE) // Adjust this based on network configuration
.open().unwrap();

// Filter pcap
let source_host_and_port = format!("udp dst port {} and ip dst host {}", SOURCE_PORT, SOURCE_IP);
let source_host_and_port = format!("udp dst port {} and ip dst host {}", source_port, source_ip);
cap.filter(&source_host_and_port, true).unwrap();

// Parse the SOURCE_IP string into an Ipv4Addr
let group_address = match SOURCE_IP.parse::<Ipv4Addr>() {
Ok(addr) => addr,
Err(_) => {
eprintln!("Invalid IP address format in SOURCE_IP");
return;
}
};

let igmp_packet = create_igmp_join_request(group_address);
match igmp_packet {
Ok(packet) => {
// Send IGMP join request
/*let mut socket = zmq::Socket::new(&context, zmq::SocketType::REQ).unwrap();
socket.connect(&source_port_ip).unwrap();
socket.send(&packet, 0).unwrap();*/
println!("(Not Implemented, Use external program) Sent IGMP join request {}", packet.len());
}
Err(e) => {
eprintln!("Failed to create IGMP join request: {:?}", e);
return;
}
}

let mut total_bytes = 0;
let mut count = 0;
let mut batch = Vec::new();
while let Ok(packet) = cap.next() {
while let Ok(packet) = cap.next_packet() {
println!("received packet! {:?}", packet.header);
let chunks = process_packet(&packet);

for chunk in chunks {
Expand Down Expand Up @@ -179,32 +195,3 @@ fn hexdump(packet: &[u8]) {
println!();
}

fn create_igmp_join_request(group_address: Ipv4Addr) -> Result<Vec<u8>, io::Error> {
let mut packet = vec![0u8; 8]; // IGMP packet is 8 bytes

packet[0] = 0x16; // Type: Membership Report for IGMPv2
packet[1] = 0x00; // Max Response Time: 0 for Membership Report

// Insert group address (network order)
packet[4..8].copy_from_slice(&group_address.octets());

// Calculate checksum
let checksum = compute_igmp_checksum(&packet);
packet[2] = (checksum >> 8) as u8; // Checksum high byte
packet[3] = (checksum & 0xFF) as u8; // Checksum low byte

Ok(packet)
}

fn compute_igmp_checksum(packet: &[u8]) -> u16 {
let mut sum = 0u32;

for i in (0..packet.len()).step_by(2) {
sum += u32::from(u16::from(packet[i]) << 8 | u16::from(packet[i + 1]));
}
while (sum >> 16) > 0 {
sum = (sum & 0xFFFF) + (sum >> 16);
}
!sum as u16
}

0 comments on commit a32663f

Please sign in to comment.