Skip to content

Commit

Permalink
[inetstack] Enhancement: Move to shared state machine
Browse files Browse the repository at this point in the history
  • Loading branch information
iyzhang committed Nov 16, 2023
1 parent 322df14 commit 30a1cd8
Show file tree
Hide file tree
Showing 10 changed files with 468 additions and 329 deletions.
4 changes: 1 addition & 3 deletions src/rust/catnip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,7 @@ impl CatnipLibOS {
return Err(Fail::new(libc::EINVAL, "zero-length buffer"));
}

let handle: TaskHandle = self.do_push(qd, buf)?;
let qt: QToken = handle.get_task_id().into();
Ok(qt)
self.do_push(qd, buf)
},
Err(e) => Err(e),
}
Expand Down
4 changes: 1 addition & 3 deletions src/rust/catpowder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,7 @@ impl CatpowderLibOS {
if buf.len() == 0 {
return Err(Fail::new(libc::EINVAL, "zero-length buffer"));
}
let handle: TaskHandle = self.do_push(qd, buf)?;
let qt: QToken = handle.get_task_id().into();
Ok(qt)
self.do_push(qd, buf)
},
Err(e) => Err(e),
}
Expand Down
69 changes: 20 additions & 49 deletions src/rust/inetstack/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,7 @@ impl<const N: usize> InetStack<N> {

// Search for target queue descriptor.
match self.runtime.get_queue_type(&qd)? {
QType::TcpSocket => {
let coroutine: Pin<Box<Operation>> = self.ipv4.tcp.accept(qd)?;
let task_id: String = format!("Inetstack::TCP::accept for qd={:?}", qd);
let handle: TaskHandle = self.runtime.insert_coroutine(task_id.as_str(), coroutine)?;
Ok(handle.get_task_id().into())
},
QType::TcpSocket => self.ipv4.tcp.accept(qd),
// This queue descriptor does not concern a TCP socket.
_ => Err(Fail::new(libc::EINVAL, "invalid queue type")),
}
Expand All @@ -281,12 +276,7 @@ impl<const N: usize> InetStack<N> {
let remote: SocketAddrV4 = unwrap_socketaddr(remote)?;

match self.runtime.get_queue_type(&qd)? {
QType::TcpSocket => {
let coroutine: Pin<Box<Operation>> = self.ipv4.tcp.connect(qd, remote)?;
let task_id: String = format!("Inetstack::TCP::connect for qd={:?}", qd);
let handle: TaskHandle = self.runtime.insert_coroutine(task_id.as_str(), coroutine)?;
Ok(handle.get_task_id().into())
},
QType::TcpSocket => self.ipv4.tcp.connect(qd, remote),
_ => Err(Fail::new(libc::EINVAL, "invalid queue type")),
}
}
Expand Down Expand Up @@ -328,12 +318,8 @@ impl<const N: usize> InetStack<N> {
timer!("inetstack::async_close");
trace!("async_close(): qd={:?}", qd);

let (task_id, coroutine): (String, Pin<Box<Operation>>) = match self.runtime.get_queue_type(&qd)? {
QType::TcpSocket => {
let task_id: String = format!("Inetstack::TCP::close for qd={:?}", qd);
let coroutine: Pin<Box<Operation>> = self.ipv4.tcp.async_close(qd)?;
(task_id, coroutine)
},
match self.runtime.get_queue_type(&qd)? {
QType::TcpSocket => self.ipv4.tcp.async_close(qd),
QType::UdpSocket => {
self.ipv4.udp.close(qd)?;
let task_id: String = format!("Inetstack::UDP::close for qd={:?}", qd);
Expand All @@ -346,26 +332,20 @@ impl<const N: usize> InetStack<N> {
.expect("queue should exist");
(qd, OperationResult::Close)
});
(task_id, coroutine)
let handle: TaskHandle = self.runtime.insert_coroutine(task_id.as_str(), coroutine)?;
let qt: QToken = handle.get_task_id().into();
trace!("async_close() qt={:?}", qt);
Ok(qt)
},
_ => return Err(Fail::new(libc::EINVAL, "invalid queue type")),
};

let handle: TaskHandle = self.runtime.insert_coroutine(task_id.as_str(), coroutine)?;
let qt: QToken = handle.get_task_id().into();
trace!("async_close() qt={:?}", qt);
Ok(qt)
_ => Err(Fail::new(libc::EINVAL, "invalid queue type")),
}
}

/// Pushes a buffer to a TCP socket.
/// TODO: Rename this function to push() once we have a common representation across all libOSes.
pub fn do_push(&mut self, qd: QDesc, buf: DemiBuffer) -> Result<TaskHandle, Fail> {
pub fn do_push(&mut self, qd: QDesc, buf: DemiBuffer) -> Result<QToken, Fail> {
match self.runtime.get_queue_type(&qd)? {
QType::TcpSocket => {
let coroutine: Pin<Box<Operation>> = self.ipv4.tcp.push(qd, buf)?;
let task_id: String = format!("Inetstack::TCP::push for qd={:?}", qd);
self.runtime.insert_coroutine(task_id.as_str(), coroutine)
},
QType::TcpSocket => self.ipv4.tcp.push(qd, buf),
_ => Err(Fail::new(libc::EINVAL, "invalid queue type")),
}
}
Expand All @@ -384,10 +364,7 @@ impl<const N: usize> InetStack<N> {
}

// Issue operation.
let handle: TaskHandle = self.do_push(qd, buf)?;
let qt: QToken = handle.get_task_id().into();
trace!("push2() qt={:?}", qt);
Ok(qt)
self.do_push(qd, buf)
}

/// Pushes a buffer to a UDP socket.
Expand Down Expand Up @@ -436,24 +413,18 @@ impl<const N: usize> InetStack<N> {
// We just assert 'size' here, because it was previously checked at PDPIX layer.
debug_assert!(size.is_none() || ((size.unwrap() > 0) && (size.unwrap() <= limits::POP_SIZE_MAX)));

let (task_id, coroutine): (String, Pin<Box<Operation>>) = match self.runtime.get_queue_type(&qd)? {
QType::TcpSocket => {
let task_id: String = format!("Inetstack::TCP::pop for qd={:?}", qd);
let coroutine: Pin<Box<Operation>> = self.ipv4.tcp.pop(qd, size)?;
(task_id, coroutine)
},
match self.runtime.get_queue_type(&qd)? {
QType::TcpSocket => self.ipv4.tcp.pop(qd, size),
QType::UdpSocket => {
let task_id: String = format!("Inetstack::UDP::pop for qd={:?}", qd);
let coroutine: Pin<Box<Operation>> = self.ipv4.udp.pop(qd, size)?;
(task_id, coroutine)
let handle: TaskHandle = self.runtime.insert_coroutine(task_id.as_str(), coroutine)?;
let qt: QToken = handle.get_task_id().into();
trace!("async_close() qt={:?}", qt);
Ok(qt)
},
_ => return Err(Fail::new(libc::EINVAL, "invalid queue type")),
};

let handle: TaskHandle = self.runtime.insert_coroutine(task_id.as_str(), coroutine)?;
let qt: QToken = handle.get_task_id().into();
trace!("pop() qt={:?}", qt);
Ok(qt)
}
}

/// Waits for an operation to complete.
Expand Down
2 changes: 1 addition & 1 deletion src/rust/inetstack/protocols/tcp/passive_open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl<const N: usize> SharedPassiveSocket<N> {
self.local
}

pub async fn accept(&mut self, yielder: Yielder) -> Result<EstablishedSocket<N>, Fail> {
pub async fn do_accept(&mut self, yielder: Yielder) -> Result<EstablishedSocket<N>, Fail> {
self.ready.pop(yielder).await
}

Expand Down
Loading

0 comments on commit 30a1cd8

Please sign in to comment.