Skip to content

Commit

Permalink
[inetstack] Enhancement: Remove some TODOs
Browse files Browse the repository at this point in the history
  • Loading branch information
iyzhang committed Nov 26, 2024
1 parent b5ad653 commit ba668d6
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 71 deletions.
20 changes: 5 additions & 15 deletions src/rust/inetstack/protocols/layer4/tcp/established/ctrlblk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@ use ::std::net::SocketAddrV4;
// Structures
//======================================================================================================================

// TCP Connection State.
// Note: This ControlBlock structure is only used after we've reached the ESTABLISHED state, so states LISTEN,
// SYN_RCVD, and SYN_SENT aren't included here.
// This struct has only public members because includes state for both the send and receive path and is accessed by
// both.
/// TCP Connection State.
/// Note: This ControlBlock structure is only used after we've reached the ESTABLISHED state, so states LISTEN,
/// SYN_RCVD, and SYN_SENT aren't included here.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum State {
Established,
Expand All @@ -32,24 +30,16 @@ pub enum State {
Closed,
}

//======================================================================================================================
// Data Structures
//======================================================================================================================

/// Transmission control block for representing our TCP connection.
/// This struct has only public members because includes state for both the send and receive path and is accessed by
/// both.
pub struct ControlBlock {
pub local: SocketAddrV4,
pub remote: SocketAddrV4,

pub tcp_config: TcpConfig,
pub socket_options: TcpSocketOptions,

// TCP Connection State.
pub state: State,

// TCP send path state.
pub sender: Sender,
// TCP receive path state.
pub receiver: Receiver,

// Congestion control trait implementation we're currently using.
Expand Down
67 changes: 24 additions & 43 deletions src/rust/inetstack/protocols/layer4/tcp/established/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use ::futures::never::Never;
const MAX_OUT_OF_ORDER_SIZE_FRAMES: usize = 16;

//======================================================================================================================
// Data Structures
// Structures
//======================================================================================================================

pub struct Receiver {
Expand Down Expand Up @@ -66,24 +66,21 @@ pub struct Receiver {
// Pop queue. Contains in-order received (and acknowledged) data ready for the application to read.
pop_queue: AsyncQueue<DemiBuffer>,

// The amount of time before we will send a bare ACK.
ack_delay_timeout_secs: Duration,

// The deadline when we will send a bare ACK if there are no outgoing packets by then.
pub ack_deadline_time_secs: SharedAsyncValue<Option<Instant>>,

// This is our receive buffer size, which is also the maximum size of our receive window.
// Note: The maximum possible advertised window is 1 GiB with window scaling and 64 KiB without.
buffer_size_frames: u32,

// TODO: Review how this is used. We could have separate window scale factors, so there should be one for the
// receiver and one for the sender.
// This is the receive-side window scale factor.
// This is the number of bits to shift to convert to/from the scaled value, and has a maximum value of 14.
window_scale_shift_bits: u8,

// Queue of out-of-order segments. This is where we hold onto data that we've received (because it was within our
// receive window) but can't yet present to the user because we're missing some other data that comes between this
// and what we've already presented to the user.
//
out_of_order_frames: VecDeque<(SeqNumber, DemiBuffer)>,
}

Expand Down Expand Up @@ -112,8 +109,9 @@ impl Receiver {
}
}

// This function causes a EOF to be returned to the user. We also know that there will be no more incoming
// data after this sequence number.
fn push_fin(&mut self) {
debug!("notifying FIN");
self.pop_queue.push(DemiBuffer::new(0));
debug_assert_eq!(self.receive_next_seq_no, self.fin_seq_no.get().unwrap());
// Reset it to wake up any close coroutines waiting for FIN to arrive.
Expand Down Expand Up @@ -147,17 +145,13 @@ impl Receiver {
// be received, it receives that too.
//
// This routine also updates receive_next to reflect any data now considered "received".
//
// Returns true if a previously out-of-order segment containing a FIN has now been received.
//
fn receive_data(&mut self, seg_start: SeqNumber, buf: DemiBuffer) {
// This routine should only be called with in-order segment data.
debug_assert_eq!(seg_start, self.receive_next_seq_no);

// Push the new segment data onto the end of the receive queue.
self.receive_next_seq_no = self.receive_next_seq_no + SeqNumber::from(buf.len() as u32);
// This inserts the segment and wakes a waiting pop coroutine.
debug!("pushing buffer");
self.pop_queue.push(buf);

// Okay, we've successfully received some new data. Check if any of the formerly out-of-order data waiting in
Expand All @@ -181,7 +175,7 @@ impl Receiver {
}
}

// Return Ok after FIN arrives (plus all previous data).
// Block until the remote sends a FIN (plus all previous data has arrived).
pub async fn wait_for_fin(&mut self) -> Result<(), Fail> {
let mut fin_seq_no: Option<SeqNumber> = self.fin_seq_no.get();
loop {
Expand All @@ -194,6 +188,7 @@ impl Receiver {
}
}

// Block until some data is received, up to an optional size.
pub async fn pop(&mut self, size: Option<usize>) -> Result<DemiBuffer, Fail> {
debug!("waiting on pop {:?}", size);
let buf: DemiBuffer = if let Some(size) = size {
Expand Down Expand Up @@ -221,6 +216,7 @@ impl Receiver {
Ok(buf)
}

// Receive a single incoming packet from layer3.
pub fn receive(
cb: &mut ControlBlock,
layer3_endpoint: &mut SharedLayer3Endpoint,
Expand Down Expand Up @@ -271,7 +267,7 @@ impl Receiver {
Self::process_data(cb, layer3_endpoint, data, seg_start, seg_end, seg_len)?;
}

// Process FIN flag.
// Deal with FIN flag, saving the FIN for later if it is out of order.
if header.fin {
match cb.receiver.fin_seq_no.get() {
// We've already received this FIN, so ignore.
Expand All @@ -282,26 +278,30 @@ impl Receiver {
Some(_) => trace!("Received duplicate FIN"),
None => {
trace!("Received FIN");
cb.receiver.fin_seq_no.set(seg_end.into())
cb.receiver.fin_seq_no.set(seg_end.into());
},
}
}
// Check whether we've received the last packet.
};

// Check whether we've received the last packet in this TCP stream.
if cb
.receiver
.fin_seq_no
.get()
.is_some_and(|seq_no| seq_no == cb.receiver.receive_next_seq_no)
{
// Once we know there is no more data coming, begin closing down the connection.
Self::process_fin(cb);
}

// Send an ack on every FIN. We do this separately here because if the FIN is in order, we ack it after the
// previous line, otherwise we do not ack the FIN.
if header.fin {
// Send ack for out of order FIN.
trace!("Acking FIN");
Self::send_ack(cb, layer3_endpoint)
Sender::send_ack(cb, layer3_endpoint)
}

// We should ACK this segment, preferably via piggybacking on a response.
// TODO: Consider replacing the delayed ACK timer with a simple flag.
if cb.receiver.ack_deadline_time_secs.get().is_none() {
// Start the delayed ACK timer to ensure an ACK gets sent soon even if no piggyback opportunity occurs.
let timeout: Duration = cb.receiver.ack_delay_timeout_secs;
Expand All @@ -311,7 +311,7 @@ impl Receiver {
// We already owe our peer an ACK (the timer was already running), so cancel the timer and ACK now.
cb.receiver.ack_deadline_time_secs.set(None);
trace!("process_packet(): sending ack on deadline expiration");
Self::send_ack(cb, layer3_endpoint);
Sender::send_ack(cb, layer3_endpoint);
}

Ok(())
Expand Down Expand Up @@ -372,18 +372,16 @@ impl Receiver {
// See if it is a complete duplicate, or if some of the data is new.
if *seg_end < receive_next {
// This is an entirely duplicate (i.e. old) segment. ACK (if not RST) and drop.
//
if !header.rst {
trace!("check_segment_in_window(): send ack on duplicate segment");
Self::send_ack(cb, layer3_endpoint);
Sender::send_ack(cb, layer3_endpoint);
}
let cause: String = format!("duplicate packet");
error!("check_segment_in_window(): {}", cause);
return Err(Fail::new(libc::EBADMSG, &cause));
} else {
// Some of this segment's data is new. Cut the duplicate data off of the front.
// If there is a SYN at the start of this segment, remove it too.
//
let mut duplicate: u32 = u32::from(receive_next - *seg_start);
*seg_start = *seg_start + SeqNumber::from(duplicate);
*seg_len -= duplicate;
Expand All @@ -399,13 +397,11 @@ impl Receiver {
} else {
// This segment contains entirely new data, but is later in the sequence than what we're expecting.
// See if any part of the data fits within our receive window.
//
if *seg_start >= after_receive_window {
// This segment is completely outside of our window. ACK (if not RST) and drop.
//
if !header.rst {
trace!("check_segment_in_window(): send ack on out-of-window segment");
Self::send_ack(cb, layer3_endpoint);
Sender::send_ack(cb, layer3_endpoint);
}
let cause: String = format!("packet outside of receive window");
error!("check_segment_in_window(): {}", cause);
Expand Down Expand Up @@ -489,10 +485,6 @@ impl Receiver {
// TODO: RFC 5961 "Blind Data Injection Attack" prevention would have us perform additional ACK validation
// checks here.

// Process the ACK.
// Start by checking that the ACK acknowledges something new.
// TODO: Look into removing Watched types.
//
Sender::process_ack(cb, header, now);

Ok(())
Expand All @@ -518,7 +510,7 @@ impl Receiver {
cb.receiver.store_out_of_order_segment(seg_start, seg_end, data);
// Sending an ACK here is only a "MAY" according to the RFCs, but helpful for fast retransmit.
trace!("process_data(): send ack on out-of-order segment");
Self::send_ack(cb, layer3_endpoint);
Sender::send_ack(cb, layer3_endpoint);
},
state => warn!("Ignoring data received after FIN (in state {:?}).", state),
}
Expand Down Expand Up @@ -664,7 +656,7 @@ impl Receiver {
continue;
},
Err(Fail { errno, cause: _ }) if errno == libc::ETIMEDOUT => {
Self::send_ack(cb, layer3_endpoint);
Sender::send_ack(cb, layer3_endpoint);
deadline = ack_deadline.get();
},
Err(_) => {
Expand All @@ -675,15 +667,4 @@ impl Receiver {
}
}
}

/// Send an ACK to our peer, reflecting our current state.
pub fn send_ack(cb: &mut ControlBlock, layer3_endpoint: &mut SharedLayer3Endpoint) {
trace!("sending ack");
let mut header: TcpHeader = Sender::tcp_header(cb);

// TODO: Think about moving this to tcp_header() as well.
let seq_num: SeqNumber = cb.sender.send_next_seq_no.get();
header.seq_num = seq_num;
Sender::emit(cb, layer3_endpoint, header, None);
}
}
28 changes: 15 additions & 13 deletions src/rust/inetstack/protocols/layer4/tcp/established/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ const MIN_UNACKED_QUEUE_SIZE_FRAMES: usize = 64;
// of the unacked queue, below which memory allocation is not required.
const MIN_UNSENT_QUEUE_SIZE_FRAMES: usize = 64;

// TODO: Consider moving retransmit timer and congestion control fields out of this structure.
// TODO: Make all public fields in this structure private.
pub struct Sender {
//
// Send Sequence Space:
Expand Down Expand Up @@ -294,8 +292,7 @@ impl Sender {
}

fn send_fin(cb: &mut ControlBlock, layer3_endpoint: &mut SharedLayer3Endpoint, now: Instant) -> Result<(), Fail> {
let mut header: TcpHeader = Self::tcp_header(cb);
header.seq_num = cb.sender.send_next_seq_no.get();
let mut header: TcpHeader = Self::tcp_header(cb, None);
debug_assert!(cb.sender.fin_seq_no.is_some_and(|s| { s == header.seq_num }));
header.fin = true;
Self::emit(cb, layer3_endpoint, header, None);
Expand Down Expand Up @@ -382,8 +379,7 @@ impl Sender {
let mut win_sz_watched: SharedAsyncValue<u32> = cb.sender.send_window.clone();
loop {
// Create packet.
let mut header: TcpHeader = Self::tcp_header(cb);
header.seq_num = cb.sender.send_next_seq_no.get();
let header: TcpHeader = Self::tcp_header(cb, None);
Self::emit(cb, layer3_endpoint, header, Some(probe.clone()));

match win_sz_watched.wait_for_change(Some(timeout)).await {
Expand Down Expand Up @@ -438,8 +434,7 @@ impl Sender {
);

// Prepare the segment and send it.
let mut header: TcpHeader = Self::tcp_header(cb);
header.seq_num = cb.sender.send_next_seq_no.get();
let mut header: TcpHeader = Self::tcp_header(cb, None);
if do_push {
header.psh = true;
}
Expand All @@ -466,14 +461,16 @@ impl Sender {
}

/// Fetch a TCP header filling out various values based on our current state.
/// TODO: Fix the "filling out various values based on our current state" part to actually do that correctly.
pub fn tcp_header(cb: &mut ControlBlock) -> TcpHeader {
/// If a sequence number is provided, use it otherwise, use the current unsent sequence number.
/// The only time that the unsent sequence number is not used is when we are retransmitting.
pub fn tcp_header(cb: &mut ControlBlock, seq_num: Option<SeqNumber>) -> TcpHeader {
let mut header: TcpHeader = TcpHeader::new(cb.local.port(), cb.remote.port());
header.window_size = cb.receiver.hdr_window_size();

// Note that once we reach a synchronized state we always include a valid acknowledgement number.
header.ack = true;
header.ack_num = cb.receiver.receive_next_seq_no;
header.seq_num = seq_num.unwrap_or(cb.sender.send_next_seq_no.get());

// Return this header.
header
Expand Down Expand Up @@ -592,8 +589,7 @@ impl Sender {
// TODO: Issue #198 Repacketization - we should send a full MSS (and set the FIN flag if applicable).

// Prepare and send the segment.
let mut header: TcpHeader = Self::tcp_header(cb);
header.seq_num = cb.sender.send_unacked.get();
let mut header: TcpHeader = Self::tcp_header(cb, Some(cb.sender.send_unacked.get()));
// If data exists, then this is a regular packet, otherwise, its a FIN.
if data.is_some() {
header.psh = true;
Expand All @@ -609,7 +605,6 @@ impl Sender {
// Process an ack.
pub fn process_ack(cb: &mut ControlBlock, header: &TcpHeader, now: Instant) {
// Start by checking that the ACK acknowledges something new.
// TODO: Look into removing Watched types.
let send_unacknowledged: SeqNumber = cb.sender.send_unacked.get();

if send_unacknowledged < header.ack_num {
Expand Down Expand Up @@ -656,6 +651,13 @@ impl Sender {
}
}

/// Send an ACK to our peer, reflecting our current state.
pub fn send_ack(cb: &mut ControlBlock, layer3_endpoint: &mut SharedLayer3Endpoint) {
trace!("sending ack");
let header: TcpHeader = Self::tcp_header(cb, None);
Self::emit(cb, layer3_endpoint, header, None);
}

/// Transmit this message to our connected peer.
pub fn emit(
cb: &mut ControlBlock,
Expand Down

0 comments on commit ba668d6

Please sign in to comment.