Skip to content

Commit

Permalink
[catpowder] Feature: Add port-based filtering to catpowder backend
Browse files Browse the repository at this point in the history
  • Loading branch information
kyleholohan committed Nov 21, 2024
1 parent 6c8a75a commit ba62956
Show file tree
Hide file tree
Showing 13 changed files with 454 additions and 90 deletions.
8 changes: 7 additions & 1 deletion src/rust/catpowder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,16 @@
mod win;

#[cfg(target_os = "windows")]
pub use win::runtime::SharedCatpowderRuntime;
pub use win::{
runtime::SharedCatpowderRuntime,
transport::SharedCatpowderTransport,
};

#[cfg(target_os = "linux")]
mod linux;

#[cfg(target_os = "linux")]
pub use linux::LinuxRuntime as SharedCatpowderRuntime;

#[cfg(target_os = "linux")]
pub use crate::inetstack::SharedInetStack as SharedCatpowderTransport;
1 change: 1 addition & 0 deletions src/rust/catpowder/win/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ mod socket;
//======================================================================================================================

pub mod runtime;
pub mod transport;
9 changes: 9 additions & 0 deletions src/rust/catpowder/win/ring/rule/program.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ impl XdpProgram {
let rule_count: u32 = rules.len() as u32;
let mut handle: HANDLE = HANDLE::default();

trace!(
"new(): rules={:?}, ifindex={}, hookid={:?}, queueid={}, flags={}",
rules,
ifindex,
hookid,
queueid,
flags
);

// Attempt to create the XDP program.
if let Some(create_program) = api.get().XdpCreateProgram {
let result: HRESULT = unsafe {
Expand Down
66 changes: 64 additions & 2 deletions src/rust/catpowder/win/ring/rule/rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,13 @@

use crate::{
catpowder::win::{ring::rule::params::XdpRedirectParams, socket::XdpSocket},
inetstack::protocols::Protocol,
runtime::libxdp,
};
use ::std::mem;
use ::std::{
fmt::{self, Formatter},
mem,
};

//======================================================================================================================
// Structures
Expand All @@ -24,7 +28,8 @@ pub struct XdpRule(libxdp::XDP_RULE);
//======================================================================================================================

impl XdpRule {
/// Creates a new XDP rule for the target socket.
/// Creates a new XDP rule for the target socket which filters all traffic.
#[allow(dead_code)]
pub fn new(socket: &XdpSocket) -> Self {
let redirect: XdpRedirectParams = XdpRedirectParams::new(socket);
let rule: libxdp::XDP_RULE = unsafe {
Expand All @@ -39,4 +44,61 @@ impl XdpRule {
};
Self(rule)
}

/// Creates a new XDP rule for the target socket which filters for a specific (protocol, port) combination.
pub fn new_for_dest(socket: &XdpSocket, protocol: Protocol, port: u16) -> Self {
let redirect: XdpRedirectParams = XdpRedirectParams::new(socket);
let rule: libxdp::XDP_RULE = unsafe {
let mut rule: libxdp::XDP_RULE = std::mem::zeroed();
rule.Match = match protocol {
Protocol::Udp => libxdp::_XDP_MATCH_TYPE_XDP_MATCH_UDP_DST,
Protocol::Tcp => libxdp::_XDP_MATCH_TYPE_XDP_MATCH_TCP_DST,
};
rule.Action = libxdp::_XDP_RULE_ACTION_XDP_PROGRAM_ACTION_REDIRECT;
*rule.Pattern.Port.as_mut() = port;
// Perform bitwise copy from redirect to rule, as this is a union field.
*rule.__bindgen_anon_1.Redirect.as_mut() = mem::transmute_copy(redirect.as_ref());

rule
};
Self(rule)
}
}

//======================================================================================================================
// Trait Implementations
//======================================================================================================================

impl fmt::Debug for XdpRule {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
let match_str: &str = match self.0.Match {
libxdp::_XDP_MATCH_TYPE_XDP_MATCH_UDP_DST => "XDP_MATCH_UDP_DST",
libxdp::_XDP_MATCH_TYPE_XDP_MATCH_TCP_DST => "XDP_MATCH_TCP_DST",
libxdp::_XDP_MATCH_TYPE_XDP_MATCH_ALL => "XDP_MATCH_ALL",
_ => "UNKNOWN",
};

let pattern: String = match self.0.Match {
libxdp::_XDP_MATCH_TYPE_XDP_MATCH_UDP_DST | libxdp::_XDP_MATCH_TYPE_XDP_MATCH_TCP_DST => {
format!("{{Port={}}}", unsafe { self.0.Pattern.Port.as_ref() })
},
libxdp::_XDP_MATCH_TYPE_XDP_MATCH_ALL => format!("{{ALL}}"),
_ => format!("{{UNKNOWN}}"),
};

let action_str: &str = match self.0.Action {
libxdp::_XDP_RULE_ACTION_XDP_PROGRAM_ACTION_REDIRECT => "XDP_PROGRAM_ACTION_REDIRECT",
libxdp::_XDP_RULE_ACTION_XDP_PROGRAM_ACTION_EBPF => "XDP_PROGRAM_ACTION_EBPF",
libxdp::_XDP_RULE_ACTION_XDP_PROGRAM_ACTION_L2FWD => "XDP_PROGRAM_ACTION_L2FWD",
libxdp::_XDP_RULE_ACTION_XDP_PROGRAM_ACTION_PASS => "XDP_PROGRAM_ACTION_PASS",
libxdp::_XDP_RULE_ACTION_XDP_PROGRAM_ACTION_DROP => "XDP_PROGRAM_ACTION_DROP",
_ => "UNKNOWN",
};

f.debug_struct("XDP_RULE")
.field("Match", &match_str)
.field("Pattern", &pattern)
.field("Action", &action_str)
.finish()
}
}
82 changes: 54 additions & 28 deletions src/rust/catpowder/win/ring/rx_ring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::{
},
socket::XdpSocket,
},
inetstack::protocols::Protocol,
runtime::{fail::Fail, libxdp, limits},
};
use ::std::{cell::RefCell, rc::Rc};
Expand All @@ -26,16 +27,20 @@ use ::std::{cell::RefCell, rc::Rc};

/// A ring for receiving packets.
pub struct RxRing {
/// Index of the interface for the ring.
ifindex: u32,
/// Index of the queue for the ring.
queueid: u32,
/// A user memory region where receive buffers are stored.
mem: Rc<RefCell<UmemReg>>,
/// A ring for receiving packets.
rx_ring: XdpRing,
/// A ring for returning receive buffers to the kernel.
rx_fill_ring: XdpRing,
/// Underlying XDP socket.
_socket: XdpSocket, // NOTE: we keep this here to prevent the socket from being dropped.
socket: XdpSocket, // NOTE: we keep this here to prevent the socket from being dropped.
/// Underlying XDP program.
_program: XdpProgram, // NOTE: we keep this here to prevent the program from being dropped.
_program: Option<XdpProgram>, // NOTE: we keep this here to prevent the program from being dropped.
}

//======================================================================================================================
Expand All @@ -50,38 +55,42 @@ impl RxRing {
let mut socket: XdpSocket = XdpSocket::create(api)?;

// Create a UMEM region.
trace!("creating umem region");
trace!(
"creating umem region; count={}, chunk_size={}",
length,
limits::RECVBUF_SIZE_MAX as u32
);
let mem: Rc<RefCell<UmemReg>> = Rc::new(RefCell::new(UmemReg::new(length, limits::RECVBUF_SIZE_MAX as u32)));

// Register the UMEM region.
trace!("registering umem region");
socket.setsockopt(
api,
libxdp::XSK_SOCKOPT_UMEM_REG,
mem.borrow().as_ref() as *const libxdp::XSK_UMEM_REG as *const core::ffi::c_void,
mem.borrow().as_ref(),
std::mem::size_of::<libxdp::XSK_UMEM_REG>() as u32,
)?;

// Set rx ring size.
trace!("setting rx ring size");
trace!("setting rx ring size: {}", length);
socket.setsockopt(
api,
libxdp::XSK_SOCKOPT_RX_RING_SIZE,
&length as *const u32 as *const core::ffi::c_void,
&length,
std::mem::size_of::<u32>() as u32,
)?;

// Set rx fill ring size.
trace!("setting rx fill ring size");
trace!("setting rx fill ring size: {}", length);
socket.setsockopt(
api,
libxdp::XSK_SOCKOPT_RX_FILL_RING_SIZE,
&length as *const u32 as *const core::ffi::c_void,
&length,
std::mem::size_of::<u32>() as u32,
)?;

// Bind the rx queue.
trace!("binding rx queue");
trace!("binding rx queue for interface {}, queue {}", ifindex, queueid);
socket.bind(api, ifindex, queueid, libxdp::_XSK_BIND_FLAGS_XSK_BIND_FLAG_RX)?;

// Activate socket to enable packet reception.
Expand All @@ -92,12 +101,9 @@ impl RxRing {
trace!("retrieving rx ring info");
let mut ring_info: libxdp::XSK_RING_INFO_SET = unsafe { std::mem::zeroed() };
let mut option_length: u32 = std::mem::size_of::<libxdp::XSK_RING_INFO_SET>() as u32;
socket.getsockopt(
api,
libxdp::XSK_SOCKOPT_RING_INFO,
&mut ring_info as *mut libxdp::XSK_RING_INFO_SET as *mut core::ffi::c_void,
&mut option_length as *mut u32,
)?;
socket.getsockopt(api, libxdp::XSK_SOCKOPT_RING_INFO, &mut ring_info, &mut option_length)?;

trace!("rx ring info: {:?}", ring_info);

// Initialize rx and rx fill rings.
let mut rx_fill_ring: XdpRing = XdpRing::new(&ring_info.Fill);
Expand All @@ -111,25 +117,45 @@ impl RxRing {
unsafe { *b = 0 };
rx_fill_ring.producer_submit(length);

Ok(Self {
ifindex,
queueid,
mem,
rx_ring,
rx_fill_ring,
socket,
_program: None,
})
}

/// Update the RxRing to use the specified rules for filtering.
pub fn reprogram(&mut self, api: &mut XdpApi, rules: &[(Protocol, u16)]) -> Result<(), Fail> {
// Create XDP program.
trace!("creating xdp program");
trace!(
"creating xdp program for interface {}, queue {} with the rules: {:?}",
self.ifindex,
self.queueid,
rules,
);
const XDP_INSPECT_RX: libxdp::XDP_HOOK_ID = libxdp::XDP_HOOK_ID {
Layer: libxdp::_XDP_HOOK_LAYER_XDP_HOOK_L2,
Direction: libxdp::_XDP_HOOK_DATAPATH_DIRECTION_XDP_HOOK_RX,
SubLayer: libxdp::_XDP_HOOK_SUBLAYER_XDP_HOOK_INSPECT,
};
let rules: Vec<XdpRule> = vec![XdpRule::new(&socket)];
let program: XdpProgram = XdpProgram::new(api, &rules, ifindex, &XDP_INSPECT_RX, queueid, 0)?;

trace!("xdp program created");

Ok(Self {
mem,
rx_ring,
rx_fill_ring,
_socket: socket,
_program: program,
})
let mut xdp_rules: Vec<XdpRule> = Vec::with_capacity(rules.len());
for (protocol, port) in rules.iter() {
xdp_rules.push(XdpRule::new_for_dest(&self.socket, *protocol, *port));
}

let program: XdpProgram = XdpProgram::new(api, &xdp_rules, self.ifindex, &XDP_INSPECT_RX, self.queueid, 0)?;
trace!(
"xdp program created for interface {}, queue {}",
self.ifindex,
self.queueid
);

self._program = Some(program);
Ok(())
}

/// Reserves a consumer slot in the rx ring.
Expand Down
13 changes: 4 additions & 9 deletions src/rust/catpowder/win/ring/tx_ring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl TxRing {
socket.setsockopt(
api,
libxdp::XSK_SOCKOPT_UMEM_REG,
mem.borrow().as_ref() as *const libxdp::XSK_UMEM_REG as *const core::ffi::c_void,
mem.borrow().as_ref(),
std::mem::size_of::<libxdp::XSK_UMEM_REG>() as u32,
)?;

Expand All @@ -56,7 +56,7 @@ impl TxRing {
socket.setsockopt(
api,
libxdp::XSK_SOCKOPT_TX_RING_SIZE,
&length as *const u32 as *const core::ffi::c_void,
&length,
std::mem::size_of::<u32>() as u32,
)?;

Expand All @@ -65,7 +65,7 @@ impl TxRing {
socket.setsockopt(
api,
libxdp::XSK_SOCKOPT_TX_COMPLETION_RING_SIZE,
&length as *const u32 as *const core::ffi::c_void,
&length,
std::mem::size_of::<u32>() as u32,
)?;

Expand All @@ -81,12 +81,7 @@ impl TxRing {
trace!("retrieving tx ring info");
let mut ring_info: libxdp::XSK_RING_INFO_SET = unsafe { std::mem::zeroed() };
let mut option_length: u32 = std::mem::size_of::<libxdp::XSK_RING_INFO_SET>() as u32;
socket.getsockopt(
api,
libxdp::XSK_SOCKOPT_RING_INFO,
&mut ring_info as *mut libxdp::XSK_RING_INFO_SET as *mut core::ffi::c_void,
&mut option_length as *mut u32,
)?;
socket.getsockopt(api, libxdp::XSK_SOCKOPT_RING_INFO, &mut ring_info, &mut option_length)?;

// Initialize tx and tx completion rings.
let tx_ring: XdpRing = XdpRing::new(&ring_info.Tx);
Expand Down
10 changes: 10 additions & 0 deletions src/rust/catpowder/win/ring/umemreg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,13 @@ impl UmemReg {
self.umem.Address
}
}

//======================================================================================================================
// Trait Implementations
//======================================================================================================================

impl std::fmt::Debug for UmemReg {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.umem.fmt(f)
}
}
Loading

0 comments on commit ba62956

Please sign in to comment.