From ac2b8af0a48f39aae72df57cf40330db795b667b Mon Sep 17 00:00:00 2001 From: Sterling Deng Date: Fri, 27 Sep 2024 13:56:56 -0700 Subject: [PATCH] manual testing --- quinn/examples/ack_timestamp_test.rs | 83 +++++++++++++++++ quinn/examples/common/custom_congestion.rs | 102 +++++++++++++++++++++ quinn/examples/common/mod.rs | 13 ++- trafficshape.sh | 41 +++++++++ 4 files changed, 238 insertions(+), 1 deletion(-) create mode 100644 quinn/examples/ack_timestamp_test.rs create mode 100644 quinn/examples/common/custom_congestion.rs create mode 100755 trafficshape.sh diff --git a/quinn/examples/ack_timestamp_test.rs b/quinn/examples/ack_timestamp_test.rs new file mode 100644 index 0000000000..f2baf97206 --- /dev/null +++ b/quinn/examples/ack_timestamp_test.rs @@ -0,0 +1,83 @@ +//! This example intends to use the smallest amount of code to make a simple QUIC connection. +//! +//! Checkout the `README.md` for guidance. + +use std::error::Error; +use std::time::{Duration, Instant}; + +mod common; +use common::{make_client_endpoint, make_server_endpoint}; + +use bytes::BufMut; + +use tracing::{self, info, trace, trace_span}; +use tracing_subscriber::{fmt, prelude::*, EnvFilter}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let test_length = Duration::from_secs(15); + // This should match approximately what the interpacket delay is. + let send_interval = Duration::from_millis(250); + + tracing_subscriber::registry() + .with(fmt::layer()) + .with(EnvFilter::from_default_env()) + .init(); + + let server_addr = "127.0.0.1:20001".parse().unwrap(); + let (endpoint, server_cert) = make_server_endpoint(server_addr)?; + let endpoint2 = endpoint.clone(); + let handle = tokio::spawn(async move { + let span = trace_span!("SERVER"); + let _guard = span.enter(); + let incoming_conn = endpoint2.accept().await.unwrap(); + let conn = incoming_conn.await.unwrap(); + trace!("connection accepted: addr={}", conn.remote_address()); + loop { + match conn.read_datagram().await { + Ok(v) => { + let _ = String::from_utf8(v.to_vec()).unwrap(); + } + Err(e) => match e { + proto::ConnectionError::ConnectionClosed(_) + | proto::ConnectionError::ApplicationClosed(_) => { + return; + } + _ => { + println!("connection error:{}", e); + return; + } + }, + } + } + }); + + let span = trace_span!("CLIENT"); + let _guard = span.enter(); + let client_endpoint = make_client_endpoint("0.0.0.0:20002".parse().unwrap(), &[&server_cert])?; + // connect to server + let connection = client_endpoint + .connect(server_addr, "localhost") + .unwrap() + .await + .unwrap(); + trace!("connected: addr={}", connection.remote_address()); + + let end = Instant::now().checked_add(test_length).unwrap(); + + let mut buf = bytes::BytesMut::new(); + buf.put(&b"foobarbaz"[..]); + let buf = buf.freeze(); + while Instant::now() < end { + connection.send_datagram(buf.clone()).unwrap(); + tokio::time::sleep(send_interval).await; + } + + drop(connection); + drop(_guard); + + handle.await.unwrap(); + + info!("test exiting.."); + Ok(()) +} diff --git a/quinn/examples/common/custom_congestion.rs b/quinn/examples/common/custom_congestion.rs new file mode 100644 index 0000000000..45781dfbba --- /dev/null +++ b/quinn/examples/common/custom_congestion.rs @@ -0,0 +1,102 @@ +use proto::congestion::{Controller, ControllerFactory, Cubic, CubicConfig}; +use std::sync::Arc; +use std::time::Instant; + +use tracing::{info, info_span}; + +pub struct TestCubicWrapperFactory {} + +impl ControllerFactory for TestCubicWrapperFactory { + fn build(self: Arc, now: Instant, current_mtu: u16) -> Box { + let cc = Arc::new(CubicConfig::default()); + let controller = TestCubicWrapper { + last_packet: None, + controller: cc.build(now, current_mtu), + }; + Box::new(controller) + } +} + +pub struct TestCubicWrapper { + last_packet: Option, + controller: Box, +} + +#[derive(Debug, Clone)] +struct LastPacket { + number: u64, + sent: Instant, + received: Option, +} + +impl Clone for TestCubicWrapper { + fn clone(&self) -> Self { + let cloned_controller = self.controller.clone_box(); + Self { + last_packet: self.last_packet.clone(), + controller: cloned_controller, + } + } +} + +impl Controller for TestCubicWrapper { + fn on_congestion_event( + &mut self, + now: Instant, + sent: Instant, + is_persistent_congestion: bool, + lost_bytes: u64, + ) { + self.controller + .on_congestion_event(now, sent, is_persistent_congestion, lost_bytes) + } + + fn on_mtu_update(&mut self, new_mtu: u16) { + self.controller.on_mtu_update(new_mtu); + } + fn window(&self) -> u64 { + self.controller.window() + } + fn clone_box(&self) -> Box { + Box::new(self.clone()) + } + fn initial_window(&self) -> u64 { + self.controller.initial_window() + } + fn into_any(self: Box) -> Box { + Box::new(self) + } + + // Provided methods + + fn on_ack_packet( + &mut self, + pn: u64, + _now: Instant, + sent: Instant, + received: Option, + _bytes: u64, + _app_limited: bool, + _rtt: &proto::RttEstimator, + ) { + let span = info_span!("[cc] on_ack_packet", "pn" = pn); + let _guard = span.enter(); + if let Some(recv) = received { + info!("~0.5RTT={}", recv.duration_since(sent).as_millis()); + + if let Some(lp) = self.last_packet.as_ref() { + if let Some(last_recv) = lp.received { + info!( + "receiver interpacket delay = {}", + recv.duration_since(last_recv).as_millis() + ) + } + } + } + self.last_packet = Some(LastPacket { + number: pn, + sent, + received, + }) + } +} diff --git a/quinn/examples/common/mod.rs b/quinn/examples/common/mod.rs index 05e9f37b40..c63232405e 100644 --- a/quinn/examples/common/mod.rs +++ b/quinn/examples/common/mod.rs @@ -6,6 +6,8 @@ use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer}; use std::{error::Error, net::SocketAddr, sync::Arc}; +mod custom_congestion; + /// Constructs a QUIC endpoint configured for use a client only. /// /// ## Args @@ -51,7 +53,16 @@ fn configure_client( certs.add(CertificateDer::from(*cert))?; } - Ok(ClientConfig::with_root_certificates(Arc::new(certs))?) + let mut cfg = ClientConfig::with_root_certificates(Arc::new(certs))?; + let mut transport_config = proto::TransportConfig::default(); + transport_config.ack_timestamp_config(Some(proto::AckTimestampsConfig::default())); + + transport_config + .congestion_controller_factory(Arc::new(custom_congestion::TestCubicWrapperFactory {})); + + cfg.transport_config(Arc::new(transport_config)); + + Ok(cfg) } /// Returns default server configuration along with its certificate. diff --git a/trafficshape.sh b/trafficshape.sh new file mode 100755 index 0000000000..65999ee41a --- /dev/null +++ b/trafficshape.sh @@ -0,0 +1,41 @@ +#!/bin/bash + +reset() { + dnctl -q flush + pfctl -f /etc/pf.conf + pfctl -d +} + +status() { + echo + dnctl list +} + +# this causes a 500ms delay for udp packets from port 20000:20100 +delay() { + pfctl -e + dnctl pipe 1 config delay 500 + + (cat /etc/pf.conf && cat) <<__PF__ | pfctl -f - +dummynet-anchor "mop" +anchor "mop" +__PF__ + cat <<__MOP__ | pfctl -a mop -f - +dummynet in proto udp from port 20000:20100 to any pipe 1 +dummynet in proto udp from any to port 20000:20100 pipe 1 +__MOP__ + +} + +case $1 in +delay) + delay + ;; +reset) + reset + status + ;; +*) + echo $0 "delay | reset" + ;; +esac