Skip to content

Commit

Permalink
[catpowder] Feature: Add cohosting mode to XDP backend
Browse files Browse the repository at this point in the history
  • Loading branch information
kyleholohan committed Nov 26, 2024
1 parent 38b7fdf commit 627b928
Show file tree
Hide file tree
Showing 6 changed files with 232 additions and 47 deletions.
6 changes: 6 additions & 0 deletions scripts/config/azure.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ demikernel:
raw_socket:
linux_interface_name: "abcde"
xdp_interface_index: 0
xdp_cohost_mode: false
# Enable the following for XDP cohosting mode, or override in environment:
# xdp_tcp_ports: [80, 443]
# xdp_udp_ports: [53]
# Enable the following line if you have a VF interface
# xdp_vf_interface_index: 0
dpdk:
eal_init: ["-c", "0xff", "-n", "4", "-a", "WW:WW.W", "--proc-type=auto", "--vdev=net_vdev_netvsc0,iface=abcde"]
tcp_socket_options:
Expand Down
4 changes: 4 additions & 0 deletions scripts/config/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ demikernel:
raw_socket:
linux_interface_name: "abcde"
xdp_interface_index: 0
xdp_cohost_mode: false
# Enable the following for XDP cohosting mode, or override in environment:
# xdp_tcp_ports: [80, 443]
# xdp_udp_ports: [53]
# Enable the following line if you have a VF interface
# xdp_vf_interface_index: 0
dpdk:
Expand Down
23 changes: 22 additions & 1 deletion src/rust/catpowder/win/ring/rule/rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

use crate::{
catpowder::win::{ring::rule::params::XdpRedirectParams, socket::XdpSocket},
inetstack::protocols::Protocol,
runtime::libxdp,
};
use ::std::mem;
Expand All @@ -24,7 +25,8 @@ pub struct XdpRule(libxdp::XDP_RULE);
//======================================================================================================================

impl XdpRule {
/// Creates a new XDP rule for the target socket.
/// Creates a new XDP rule for the target socket which filters all traffic.
#[allow(dead_code)]
pub fn new(socket: &XdpSocket) -> Self {
let redirect: XdpRedirectParams = XdpRedirectParams::new(socket);
let rule: libxdp::XDP_RULE = unsafe {
Expand All @@ -39,4 +41,23 @@ impl XdpRule {
};
Self(rule)
}

/// Creates a new XDP rule for the target socket which filters for a specific (protocol, port) combination.
pub fn new_for_dest(socket: &XdpSocket, protocol: Protocol, port: u16) -> Self {
let redirect: XdpRedirectParams = XdpRedirectParams::new(socket);
let rule: libxdp::XDP_RULE = unsafe {
let mut rule: libxdp::XDP_RULE = std::mem::zeroed();
rule.Match = match protocol {
Protocol::Udp => libxdp::_XDP_MATCH_TYPE_XDP_MATCH_UDP_DST,
Protocol::Tcp => libxdp::_XDP_MATCH_TYPE_XDP_MATCH_TCP_DST,
};
rule.Action = libxdp::_XDP_RULE_ACTION_XDP_PROGRAM_ACTION_REDIRECT;
*rule.Pattern.Port.as_mut() = port;
// Perform bitwise copy from redirect to rule, as this is a union field.
*rule.__bindgen_anon_1.Redirect.as_mut() = mem::transmute_copy(redirect.as_ref());

rule
};
Self(rule)
}
}
83 changes: 65 additions & 18 deletions src/rust/catpowder/win/ring/rx_ring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::{
},
socket::XdpSocket,
},
inetstack::protocols::Protocol,
runtime::{fail::Fail, libxdp, limits},
};
use ::std::{cell::RefCell, rc::Rc};
Expand All @@ -26,16 +27,20 @@ use ::std::{cell::RefCell, rc::Rc};

/// A ring for receiving packets.
pub struct RxRing {
/// Index of the interface for the ring.
ifindex: u32,
/// Index of the queue for the ring.
queueid: u32,
/// A user memory region where receive buffers are stored.
mem: Rc<RefCell<UmemReg>>,
/// A ring for receiving packets.
rx_ring: XdpRing,
/// A ring for returning receive buffers to the kernel.
rx_fill_ring: XdpRing,
/// Underlying XDP socket.
_socket: XdpSocket, // NOTE: we keep this here to prevent the socket from being dropped.
socket: XdpSocket, // NOTE: we keep this here to prevent the socket from being dropped.
/// Underlying XDP program.
_program: XdpProgram, // NOTE: we keep this here to prevent the program from being dropped.
_program: Option<XdpProgram>, // NOTE: we keep this here to prevent the program from being dropped.
}

//======================================================================================================================
Expand All @@ -44,7 +49,7 @@ pub struct RxRing {

impl RxRing {
/// Creates a new ring for receiving packets.
pub fn new(api: &mut XdpApi, length: u32, ifindex: u32, queueid: u32) -> Result<Self, Fail> {
fn new(api: &mut XdpApi, length: u32, ifindex: u32, queueid: u32) -> Result<Self, Fail> {
// Create an XDP socket.
trace!("creating xdp socket");
let mut socket: XdpSocket = XdpSocket::create(api)?;
Expand All @@ -63,7 +68,7 @@ impl RxRing {
)?;

// Set rx ring size.
trace!("setting rx ring size");
trace!("setting rx ring size: {}", length);
socket.setsockopt(
api,
libxdp::XSK_SOCKOPT_RX_RING_SIZE,
Expand All @@ -72,7 +77,7 @@ impl RxRing {
)?;

// Set rx fill ring size.
trace!("setting rx fill ring size");
trace!("setting rx fill ring size: {}", length);
socket.setsockopt(
api,
libxdp::XSK_SOCKOPT_RX_FILL_RING_SIZE,
Expand All @@ -81,7 +86,7 @@ impl RxRing {
)?;

// Bind the rx queue.
trace!("binding rx queue");
trace!("binding rx queue for interface {}, queue {}", ifindex, queueid);
socket.bind(api, ifindex, queueid, libxdp::_XSK_BIND_FLAGS_XSK_BIND_FLAG_RX)?;

// Activate socket to enable packet reception.
Expand Down Expand Up @@ -111,25 +116,67 @@ impl RxRing {
unsafe { *b = 0 };
rx_fill_ring.producer_submit(length);

// Create XDP program.
trace!("creating xdp program");
Ok(Self {
ifindex,
queueid,
mem,
rx_ring,
rx_fill_ring,
socket: socket,
_program: None,
})
}

/// Create a new RxRing which redirects all traffic on the (if, queue) pair.
pub fn new_redirect_all(api: &mut XdpApi, length: u32, ifindex: u32, queueid: u32) -> Result<Self, Fail> {
let mut ring: Self = Self::new(api, length, ifindex, queueid)?;
let rules: [XdpRule; 1] = [XdpRule::new(&ring.socket)];
ring.reprogram(api, &rules)?;
Ok(ring)
}

/// Create a new RxRing which redirects only specific TCP/UDP ports on the (if, queue) pair.
pub fn new_cohost(
api: &mut XdpApi,
length: u32,
ifindex: u32,
queueid: u32,
tcp_ports: &[u16],
udp_ports: &[u16],
) -> Result<Self, Fail> {
let mut ring: Self = Self::new(api, length, ifindex, queueid)?;

let rules: Vec<XdpRule> = tcp_ports
.iter()
.map(|port: &u16| XdpRule::new_for_dest(&ring.socket, Protocol::Tcp, *port))
.chain(
udp_ports
.iter()
.map(|port: &u16| XdpRule::new_for_dest(&ring.socket, Protocol::Udp, *port)),
)
.collect::<Vec<XdpRule>>();

ring.reprogram(api, rules.as_slice())?;
Ok(ring)
}

/// Update the RxRing to use the specified rules for filtering.
fn reprogram(&mut self, api: &mut XdpApi, rules: &[XdpRule]) -> Result<(), Fail> {
const XDP_INSPECT_RX: libxdp::XDP_HOOK_ID = libxdp::XDP_HOOK_ID {
Layer: libxdp::_XDP_HOOK_LAYER_XDP_HOOK_L2,
Direction: libxdp::_XDP_HOOK_DATAPATH_DIRECTION_XDP_HOOK_RX,
SubLayer: libxdp::_XDP_HOOK_SUBLAYER_XDP_HOOK_INSPECT,
};
let rules: Vec<XdpRule> = vec![XdpRule::new(&socket)];
let program: XdpProgram = XdpProgram::new(api, &rules, ifindex, &XDP_INSPECT_RX, queueid, 0)?;

trace!("xdp program created");
let program: XdpProgram = XdpProgram::new(api, &rules, self.ifindex, &XDP_INSPECT_RX, self.queueid, 0)?;
trace!(
"xdp program created for interface {}, queue {}",
self.ifindex,
self.queueid
);

Ok(Self {
mem,
rx_ring,
rx_fill_ring,
_socket: socket,
_program: program,
})
self._program = Some(program);
Ok(())
}

/// Reserves a consumer slot in the rx ring.
Expand Down
58 changes: 39 additions & 19 deletions src/rust/catpowder/win/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,48 +61,68 @@ impl SharedCatpowderRuntime {
/// Instantiates a new XDP runtime.
pub fn new(config: &Config) -> Result<Self, Fail> {
let ifindex: u32 = config.local_interface_index()?;
let vf_if_index = config.local_vf_interface_index(); // N.B. this one is optional

trace!("Creating XDP runtime.");
let mut api: XdpApi = XdpApi::new()?;

// Open TX and RX rings
let tx: TxRing = TxRing::new(&mut api, Self::RING_LENGTH, ifindex, 0)?;

let cohost_mode = config.xdp_cohost_mode()?;
let (tcp_ports, udp_ports) = if cohost_mode {
trace!("XDP cohost mode enabled.");
config.xdp_cohost_ports()?
} else {
trace!("XDP not cohosted; will redirect all traffic");
(vec![], vec![])
};

let make_ring = |api: &mut XdpApi, length: u32, ifindex: u32, queueid: u32| -> Result<RxRing, Fail> {
if cohost_mode {
RxRing::new_cohost(
api,
length,
ifindex,
queueid,
tcp_ports.as_slice(),
udp_ports.as_slice(),
)
} else {
RxRing::new_redirect_all(api, length, ifindex, queueid)
}
};

let queue_count: u32 = deduce_rss_settings(&mut api, ifindex)?;
let mut rx_rings: Vec<RxRing> = Vec::with_capacity(queue_count as usize);
for queueid in 0..queue_count {
rx_rings.push(RxRing::new(&mut api, Self::RING_LENGTH, ifindex, queueid as u32)?);
rx_rings.push(make_ring(&mut api, Self::RING_LENGTH, ifindex, queueid as u32)?);
}
trace!("Created {} RX rings on interface {}", rx_rings.len(), ifindex);

if let Ok(vf_if_index) = vf_if_index {
let vf_rx_rings: Vec<RxRing> = if let Ok(vf_if_index) = config.local_vf_interface_index() {
// Optionally create VF RX rings
let vf_queue_count = deduce_rss_settings(&mut api, vf_if_index)?;
let mut vf_rx_rings = Vec::with_capacity(vf_queue_count as usize);
let vf_queue_count: u32 = deduce_rss_settings(&mut api, vf_if_index)?;
let mut vf_rx_rings: Vec<RxRing> = Vec::with_capacity(vf_queue_count as usize);
for queueid in 0..vf_queue_count {
vf_rx_rings.push(RxRing::new(&mut api, Self::RING_LENGTH, vf_if_index, queueid as u32)?);
vf_rx_rings.push(make_ring(&mut api, Self::RING_LENGTH, vf_if_index, queueid as u32)?);
}
trace!(
"Created {} RX rings on VF interface {}.",
vf_rx_rings.len(),
vf_if_index
);

Ok(Self(SharedObject::new(CatpowderRuntimeInner {
api,
tx,
rx_rings,
vf_rx_rings,
})))
vf_rx_rings
} else {
Ok(Self(SharedObject::new(CatpowderRuntimeInner {
api,
tx,
rx_rings,
vf_rx_rings: Vec::new(),
})))
}
vec![]
};

Ok(Self(SharedObject::new(CatpowderRuntimeInner {
api,
tx,
rx_rings,
vf_rx_rings,
})))
}
}

Expand Down
Loading

0 comments on commit 627b928

Please sign in to comment.