diff --git a/glommio/Cargo.toml b/glommio/Cargo.toml index 815a101d5..876d99da3 100755 --- a/glommio/Cargo.toml +++ b/glommio/Cargo.toml @@ -33,7 +33,7 @@ lazy_static = "1.4" libc = "0.2" lockfree = "0.5" log = "0.4" -nix = "0.23" +nix = { version = "0.27", features = ["event", "fs", "ioctl", "mman", "net", "poll", "sched", "time"] } pin-project-lite = "0.2" rlimit = "0.6" scoped-tls = "1.0" diff --git a/glommio/src/iou/sqe.rs b/glommio/src/iou/sqe.rs index 81cf0481e..98ff14306 100644 --- a/glommio/src/iou/sqe.rs +++ b/glommio/src/iou/sqe.rs @@ -8,19 +8,20 @@ use std::{ use super::registrar::{UringFd, UringReadBuf, UringWriteBuf}; +use nix::sys::socket::{SockaddrLike, SockaddrStorage}; pub use nix::{ fcntl::{FallocateFlags, OFlag, PosixFadviseAdvice}, poll::PollFlags, sys::{ epoll::{EpollEvent, EpollOp}, mman::MmapAdvise, - socket::{MsgFlags, SockAddr, SockFlag}, + socket::{MsgFlags, SockFlag}, stat::Mode, }, }; use super::Personality; -use crate::{sys::Statx, to_io_error, uring_sys}; +use crate::{sys::Statx, uring_sys}; /// A pending IO event. /// @@ -352,9 +353,10 @@ impl<'a> SQE<'a> { } #[inline] - pub unsafe fn prep_connect(&mut self, fd: impl UringFd, socket_addr: &SockAddr) { - let (addr, len) = socket_addr.as_ffi_pair(); - uring_sys::io_uring_prep_connect(self.sqe, fd.as_raw_fd(), addr as *const _ as *mut _, len); + pub unsafe fn prep_connect(&mut self, fd: impl UringFd, socket_addr: &SockaddrStorage) { + let addr = socket_addr.as_ptr(); + let len = socket_addr.len(); + uring_sys::io_uring_prep_connect(self.sqe, fd.as_raw_fd(), addr as *mut _, len); fd.update_sqe(self); } @@ -521,11 +523,6 @@ impl SockAddrStorage { let len = mem::size_of::(); SockAddrStorage { storage, len } } - - pub unsafe fn as_socket_addr(&self) -> io::Result { - let storage = &*self.storage.as_ptr(); - nix::sys::socket::sockaddr_storage_to_addr(storage, self.len).map_err(|e| to_io_error!(e)) - } } #[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] diff --git a/glommio/src/net/datagram.rs b/glommio/src/net/datagram.rs index 0126d3627..4d707c81e 100644 --- a/glommio/src/net/datagram.rs +++ b/glommio/src/net/datagram.rs @@ -4,10 +4,10 @@ // This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2020 Datadog, Inc. // use crate::{ - sys::{self, DmaBuffer, Source, SourceType}, + sys::{DmaBuffer, Source, SourceType}, ByteSliceMutExt, Reactor, }; -use nix::sys::socket::MsgFlags; +use nix::sys::socket::{MsgFlags, SockaddrLike}; use std::{ cell::Cell, io, @@ -95,10 +95,10 @@ impl> GlommioDatagram { self.consume_receive_buffer(source, buf).await } - pub(crate) async fn peek_from( + pub(crate) async fn peek_from( &self, buf: &mut [u8], - ) -> io::Result<(usize, nix::sys::socket::SockAddr)> { + ) -> io::Result<(usize, T)> { match self.yolo_recvmsg(buf, MsgFlags::MSG_PEEK) { Some(res) => res, None => self.recv_from_blocking(buf, MsgFlags::MSG_PEEK).await, @@ -119,11 +119,11 @@ impl> GlommioDatagram { } } - pub(crate) async fn recv_from_blocking( + pub(crate) async fn recv_from_blocking( &self, buf: &mut [u8], flags: MsgFlags, - ) -> io::Result<(usize, nix::sys::socket::SockAddr)> { + ) -> io::Result<(usize, T)> { let source = self.reactor.upgrade().unwrap().rushed_recvmsg( self.socket.as_raw_fd(), buf.len(), @@ -136,7 +136,9 @@ impl> GlommioDatagram { let mut src = src.take().unwrap(); src.trim_to_size(sz); buf[0..sz].copy_from_slice(&src.as_bytes()[0..sz]); - let addr = unsafe { sys::ssptr_to_sockaddr(addr, hdr.msg_namelen as _)? }; + let addr = unsafe { + T::from_raw(addr.as_ptr() as *const _, Some(hdr.msg_namelen)).unwrap() + }; self.rx_yolo.set(true); Ok((sz, addr)) } @@ -172,10 +174,10 @@ impl> GlommioDatagram { self.read_timeout.get() } - pub(crate) async fn recv_from( + pub(crate) async fn recv_from( &self, buf: &mut [u8], - ) -> io::Result<(usize, nix::sys::socket::SockAddr)> { + ) -> io::Result<(usize, T)> { match self.yolo_recvmsg(buf, MsgFlags::empty()) { Some(res) => res, None => self.recv_from_blocking(buf, MsgFlags::empty()).await, @@ -185,7 +187,7 @@ impl> GlommioDatagram { pub(crate) async fn send_to_blocking( &self, buf: &[u8], - sockaddr: nix::sys::socket::SockAddr, + sockaddr: impl nix::sys::socket::SockaddrLike, ) -> io::Result { let mut dma = self.allocate_buffer(buf.len()); assert_eq!(dma.write_at(0, buf), buf.len()); @@ -203,7 +205,7 @@ impl> GlommioDatagram { pub(crate) async fn send_to( &self, buf: &[u8], - addr: nix::sys::socket::SockAddr, + addr: impl nix::sys::socket::SockaddrLike, ) -> io::Result { match self.yolo_sendmsg(buf, &addr) { Some(res) => res, @@ -245,11 +247,11 @@ impl> GlommioDatagram { }) } - fn yolo_recvmsg( + fn yolo_recvmsg( &self, buf: &mut [u8], flags: MsgFlags, - ) -> Option> { + ) -> Option> { if self.rx_yolo.get() { super::yolo_recvmsg(self.socket.as_raw_fd(), buf, flags) } else { @@ -276,7 +278,7 @@ impl> GlommioDatagram { fn yolo_sendmsg( &self, buf: &[u8], - addr: &nix::sys::socket::SockAddr, + addr: &impl nix::sys::socket::SockaddrLike, ) -> Option> { if self.tx_yolo.get() { super::yolo_sendmsg(self.socket.as_raw_fd(), buf, addr) diff --git a/glommio/src/net/mod.rs b/glommio/src/net/mod.rs index 6e57c6d60..47b1f881a 100644 --- a/glommio/src/net/mod.rs +++ b/glommio/src/net/mod.rs @@ -5,7 +5,7 @@ // //! This module provides glommio's networking support. use crate::sys; -use nix::sys::socket::MsgFlags; +use nix::sys::socket::{MsgFlags, SockaddrLike}; use std::{io, os::unix::io::RawFd}; fn yolo_accept(fd: RawFd) -> Option> { @@ -67,11 +67,11 @@ fn yolo_recv(fd: RawFd, buf: &mut [u8]) -> Option> { } } -fn yolo_recvmsg( +fn yolo_recvmsg( fd: RawFd, buf: &mut [u8], flags: MsgFlags, -) -> Option> { +) -> Option> { match sys::recvmsg_syscall( fd, buf.as_mut_ptr(), @@ -89,7 +89,7 @@ fn yolo_recvmsg( fn yolo_sendmsg( fd: RawFd, buf: &[u8], - addr: &nix::sys::socket::SockAddr, + addr: &impl nix::sys::socket::SockaddrLike, ) -> Option> { match sys::sendmsg_syscall( fd, diff --git a/glommio/src/net/tcp_socket.rs b/glommio/src/net/tcp_socket.rs index 5cb994ee9..d03cf78f2 100644 --- a/glommio/src/net/tcp_socket.rs +++ b/glommio/src/net/tcp_socket.rs @@ -17,7 +17,7 @@ use futures_lite::{ io::{AsyncBufRead, AsyncRead, AsyncWrite}, stream::{self, Stream}, }; -use nix::sys::socket::{InetAddr, SockAddr}; +use nix::sys::socket::SockaddrStorage; use pin_project_lite::pin_project; use socket2::{Domain, Protocol, Socket, Type}; use std::{ @@ -393,16 +393,14 @@ impl FromRawFd for TcpStream { } } -fn make_tcp_socket(addr: &SocketAddr) -> io::Result<(SockAddr, Socket)> { +fn make_tcp_socket(addr: &SocketAddr) -> io::Result { let domain = if addr.is_ipv6() { Domain::IPV6 } else { Domain::IPV4 }; let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?; - let inet = InetAddr::from_std(addr); - let addr = SockAddr::new_inet(inet); - Ok((addr, socket)) + Ok(socket) } impl TcpStream { @@ -420,9 +418,9 @@ impl TcpStream { /// ``` pub async fn connect(addr: A) -> Result { let addr = addr.to_socket_addrs()?.next().unwrap(); - let (addr, socket) = make_tcp_socket(&addr)?; + let socket = make_tcp_socket(&addr)?; let reactor = crate::executor().reactor(); - let source = reactor.connect(socket.as_raw_fd(), addr); + let source = reactor.connect(socket.as_raw_fd(), SockaddrStorage::from(addr)); source.collect_rw().await?; Ok(TcpStream { @@ -463,9 +461,10 @@ impl TcpStream { } let addr = addr.to_socket_addrs()?.next().unwrap(); - let (addr, socket) = make_tcp_socket(&addr)?; + let socket = make_tcp_socket(&addr)?; let reactor = crate::executor().reactor(); - let source = reactor.connect_timeout(socket.as_raw_fd(), addr, duration); + let source = + reactor.connect_timeout(socket.as_raw_fd(), SockaddrStorage::from(addr), duration); // connect_timeout submits two sqes to io_uring: a connect sqe soft-linked // with a LINK_TIMEOUT sqe. If the timeout fires, the connect sqe fails with diff --git a/glommio/src/net/udp_socket.rs b/glommio/src/net/udp_socket.rs index 54aba9f05..175839f2c 100644 --- a/glommio/src/net/udp_socket.rs +++ b/glommio/src/net/udp_socket.rs @@ -4,11 +4,11 @@ // This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2020 Datadog, Inc. // use super::datagram::GlommioDatagram; -use nix::sys::socket::{InetAddr, SockAddr}; +use nix::sys::socket::{SockaddrLike, SockaddrStorage}; use socket2::{Domain, Protocol, Socket, Type}; use std::{ io, - net::{self, Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs}, + net::{self, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs}, os::unix::io::{AsRawFd, FromRawFd, RawFd}, time::Duration, }; @@ -42,6 +42,23 @@ impl FromRawFd for UdpSocket { } } +fn sockaddr_storage_to_std(addr: SockaddrStorage) -> Option { + match addr.family() { + Some(nix::sys::socket::AddressFamily::Inet) => addr + .as_sockaddr_in() + .map(|x| SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::from(x.ip()), x.port()))), + Some(nix::sys::socket::AddressFamily::Inet6) => addr.as_sockaddr_in6().map(|x| { + SocketAddr::V6(SocketAddrV6::new( + x.ip(), + x.port(), + x.flowinfo(), + x.scope_id(), + )) + }), + _ => None, + } +} + impl UdpSocket { /// Creates a UDP socket bound to the specified address. /// @@ -121,10 +138,8 @@ impl UdpSocket { let iter = addr.to_socket_addrs()?; let mut err = io::Error::new(io::ErrorKind::Other, "No Valid addresses"); for addr in iter { - let inet = InetAddr::from_std(&addr); - let addr = SockAddr::new_inet(inet); let reactor = self.socket.reactor.upgrade().unwrap(); - let source = reactor.connect(self.socket.as_raw_fd(), addr); + let source = reactor.connect(self.socket.as_raw_fd(), SockaddrStorage::from(addr)); match source.collect_rw().await { Ok(_) => return Ok(()), Err(x) => { @@ -494,12 +509,8 @@ impl UdpSocket { /// supplied buffer, excess bytes may be discarded. pub async fn peek_from(&self, buf: &mut [u8]) -> Result<(usize, SocketAddr)> { let (sz, addr) = self.socket.peek_from(buf).await?; - - let addr = match addr { - nix::sys::socket::SockAddr::Inet(addr) => addr, - x => panic!("invalid socket addr for this family!: {:?}", x), - }; - Ok((sz, addr.to_std())) + let addr = sockaddr_storage_to_std(addr).expect("invalid socket addr for this family!"); + Ok((sz, addr)) } /// Returns the socket address of the remote peer this socket was connected @@ -581,11 +592,8 @@ impl UdpSocket { /// ``` pub async fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, SocketAddr)> { let (sz, addr) = self.socket.recv_from(buf).await?; - let addr = match addr { - nix::sys::socket::SockAddr::Inet(addr) => addr, - x => panic!("invalid socket addr for this family!: {:?}", x), - }; - Ok((sz, addr.to_std())) + let addr = sockaddr_storage_to_std(addr).expect("invalid socket addr for this family!"); + Ok((sz, addr)) } /// Sends data on the socket to the given address. On success, returns the @@ -622,8 +630,7 @@ impl UdpSocket { .next() .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "empty address"))?; - let inet = nix::sys::socket::InetAddr::from_std(&addr); - let sockaddr = nix::sys::socket::SockAddr::new_inet(inet); + let sockaddr = SockaddrStorage::from(addr); self.socket.send_to(buf, sockaddr).await.map_err(Into::into) } @@ -659,7 +666,7 @@ impl UdpSocket { mod tests { use super::*; use crate::{timer::Timer, LocalExecutorBuilder}; - use nix::sys::socket::MsgFlags; + use nix::sys::socket::{MsgFlags, SockaddrIn}; use std::time::Duration; macro_rules! connected_pair { @@ -879,7 +886,7 @@ mod tests { for _ in 0..10 { let (sz, _) = receiver .socket - .recv_from_blocking(&mut buf, MsgFlags::MSG_PEEK) + .recv_from_blocking::(&mut buf, MsgFlags::MSG_PEEK) .await .unwrap(); assert_eq!(sz, 1); @@ -889,11 +896,7 @@ mod tests { .recv_from_blocking(&mut buf, MsgFlags::MSG_PEEK) .await .unwrap(); - let addr = match from { - nix::sys::socket::SockAddr::Inet(addr) => addr, - x => panic!("invalid socket addr for this family!: {:?}", x), - }; - addr.to_std() + sockaddr_storage_to_std(from).expect("invalid socket addr for this family!") }) .detach(); @@ -940,11 +943,7 @@ mod tests { .await .unwrap(); assert_eq!(sz, 1); - let addr = match from { - nix::sys::socket::SockAddr::Inet(addr) => addr, - x => panic!("invalid socket addr for this family!: {:?}", x), - }; - addr.to_std() + sockaddr_storage_to_std(from).expect("invalid socket addr for this family!") }) .detach(); @@ -980,8 +979,7 @@ mod tests { let receiver = UdpSocket::bind("127.0.0.1:0").unwrap(); let addr = receiver.local_addr().unwrap(); - let inet = nix::sys::socket::InetAddr::from_std(&addr); - let sockaddr = nix::sys::socket::SockAddr::new_inet(inet); + let sockaddr = SockaddrStorage::from(addr); let me = UdpSocket::bind("127.0.0.1:0").unwrap(); me.socket .send_to_blocking(&[65u8; 1], sockaddr) diff --git a/glommio/src/net/unix.rs b/glommio/src/net/unix.rs index b78e5d98b..4c72dc9b2 100644 --- a/glommio/src/net/unix.rs +++ b/glommio/src/net/unix.rs @@ -13,7 +13,7 @@ use futures_lite::{ io::{AsyncBufRead, AsyncRead, AsyncWrite}, stream::{self, Stream}, }; -use nix::sys::socket::{SockAddr, UnixAddr}; +use nix::sys::socket::UnixAddr; use pin_project_lite::pin_project; use socket2::{Domain, Socket, Type}; use std::{ @@ -329,8 +329,8 @@ impl UnixStream { let reactor = crate::executor().reactor(); let socket = Socket::new(Domain::UNIX, Type::STREAM, None)?; - let addr = SockAddr::new_unix(addr.as_ref()) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + let addr = + UnixAddr::new(addr.as_ref()).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; let source = reactor.connect(socket.as_raw_fd(), addr); source.collect_rw().await?; @@ -536,8 +536,8 @@ impl UnixDatagram { /// [`send`]: UnixDatagram::send /// [`recv`]: UnixDatagram::recv pub async fn connect>(&self, addr: A) -> Result<()> { - let addr = SockAddr::new_unix(addr.as_ref()) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + let addr = + UnixAddr::new(addr.as_ref()).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; let reactor = self.socket.reactor.upgrade().unwrap(); let source = reactor.connect(self.socket.as_raw_fd(), addr); @@ -595,13 +595,7 @@ impl UnixDatagram { /// to hold the message bytes. If a message is too long to fit in the /// supplied buffer, excess bytes may be discarded. pub async fn peek_from(&self, buf: &mut [u8]) -> Result<(usize, UnixAddr)> { - let (sz, addr) = self.socket.peek_from(buf).await?; - - let addr = match addr { - nix::sys::socket::SockAddr::Unix(addr) => addr, - x => panic!("invalid socket addr for this family!: {:?}", x), - }; - Ok((sz, addr)) + self.socket.peek_from(buf).await.map_err(Into::into) } /// Returns the socket address of the remote peer this socket was connected @@ -672,12 +666,7 @@ impl UnixDatagram { /// }) /// ``` pub async fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, UnixAddr)> { - let (sz, addr) = self.socket.recv_from(buf).await?; - let addr = match addr { - nix::sys::socket::SockAddr::Unix(addr) => addr, - x => panic!("invalid socket addr for this family!: {:?}", x), - }; - Ok((sz, addr)) + self.socket.recv_from(buf).await.map_err(Into::into) } /// Sends data on the socket to the given address. On success, returns the @@ -695,7 +684,7 @@ impl UnixDatagram { /// }) /// ``` pub async fn send_to>(&self, buf: &[u8], addr: A) -> Result { - let addr = nix::sys::socket::SockAddr::new_unix(addr.as_ref()) + let addr = nix::sys::socket::UnixAddr::new(addr.as_ref()) .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; self.socket.send_to(buf, addr).await.map_err(Into::into) } diff --git a/glommio/src/reactor.rs b/glommio/src/reactor.rs index 7f5f9895e..f80099275 100644 --- a/glommio/src/reactor.rs +++ b/glommio/src/reactor.rs @@ -25,7 +25,7 @@ use std::{ use ahash::AHashMap; use log::error; -use nix::sys::socket::{MsgFlags, SockAddr}; +use nix::sys::socket::{MsgFlags, SockaddrLike, SockaddrStorage}; use smallvec::SmallVec; use crate::{ @@ -406,13 +406,14 @@ impl Reactor { source } - pub(crate) fn connect(&self, raw: RawFd, addr: SockAddr) -> Source { + pub(crate) fn connect(&self, raw: RawFd, addr: impl SockaddrLike) -> Source { + let addr = unsafe { SockaddrStorage::from_raw(addr.as_ptr(), Some(addr.len())) }.unwrap(); let source = self.new_source(raw, SourceType::Connect(addr), None); self.sys.connect(&source); source } - pub(crate) fn connect_timeout(&self, raw: RawFd, addr: SockAddr, d: Duration) -> Source { + pub(crate) fn connect_timeout(&self, raw: RawFd, addr: SockaddrStorage, d: Duration) -> Source { let source = self.new_source(raw, SourceType::Connect(addr), None); source.set_timeout(d); self.sys.connect(&source); @@ -458,7 +459,7 @@ impl Reactor { &self, fd: RawFd, buf: DmaBuffer, - addr: nix::sys::socket::SockAddr, + addr: impl nix::sys::socket::SockaddrLike, timeout: Option, ) -> io::Result { let iov = libc::iovec { @@ -469,6 +470,7 @@ impl Reactor { // leave it blank and the `io_uring` callee will fill that up let hdr = unsafe { std::mem::zeroed::() }; + let addr = unsafe { SockaddrStorage::from_raw(addr.as_ptr(), Some(addr.len())) }.unwrap(); let source = self.new_source(fd, SourceType::SockSendMsg(buf, iov, hdr, addr), None); if let Some(timeout) = timeout { source.set_timeout(timeout); diff --git a/glommio/src/sys/mod.rs b/glommio/src/sys/mod.rs index a327023b7..daee8d8ab 100644 --- a/glommio/src/sys/mod.rs +++ b/glommio/src/sys/mod.rs @@ -3,9 +3,10 @@ // // This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2020 Datadog, Inc. // -use crate::{to_io_error, uring_sys}; +use crate::uring_sys; use ahash::AHashMap; use log::debug; +use nix::sys::socket::SockaddrLike; use std::{ fmt, io, io::Error, @@ -105,29 +106,28 @@ pub(crate) fn direct_io_ify(fd: RawFd, flags: libc::c_int) -> io::Result<()> { // This essentially converts the nix errors into something we can integrate with // the rest of the crate. -pub(crate) unsafe fn ssptr_to_sockaddr( +pub(crate) unsafe fn ssptr_to_sockaddr( ss: MaybeUninit, len: usize, -) -> io::Result { - let storage = ss.assume_init(); +) -> io::Result { // Unnamed unix sockets have a len of 0. Technically we should make sure this // has family = AF_UNIX, but if len == 0 the OS may not have written // anything here. If this is not supposed to be unix, the upper layers will // complain. if len == 0 { - nix::sys::socket::SockAddr::new_unix("") + let addr = nix::sys::socket::UnixAddr::new("").unwrap(); + Ok(T::from_raw(addr.as_ptr() as *const _, Some(addr.len())).unwrap()) } else { - nix::sys::socket::sockaddr_storage_to_addr(&storage, len) + Ok(T::from_raw(ss.as_ptr() as *const _, Some(len as _)).unwrap()) } - .map_err(|e| to_io_error!(e)) } -pub(crate) fn recvmsg_syscall( +pub(crate) fn recvmsg_syscall( fd: RawFd, buf: *mut u8, len: usize, flags: i32, -) -> io::Result<(usize, nix::sys::socket::SockAddr)> { +) -> io::Result<(usize, T)> { let mut iov = libc::iovec { iov_base: buf as *mut libc::c_void, iov_len: len, @@ -151,7 +151,7 @@ pub(crate) fn sendmsg_syscall( fd: RawFd, buf: *const u8, len: usize, - addr: &nix::sys::socket::SockAddr, + addr: &impl nix::sys::socket::SockaddrLike, flags: i32, ) -> io::Result { let mut iov = libc::iovec { @@ -159,8 +159,8 @@ pub(crate) fn sendmsg_syscall( iov_len: len, }; - let (msg_name, msg_namelen) = addr.as_ffi_pair(); - let msg_name = msg_name as *const nix::sys::socket::sockaddr as *mut libc::c_void; + let msg_name = addr.as_ptr() as *mut libc::c_void; + let msg_namelen = addr.len(); let mut hdr = unsafe { std::mem::zeroed::() }; hdr.msg_name = msg_name; diff --git a/glommio/src/sys/source.rs b/glommio/src/sys/source.rs index c91b3e1a8..1d0c963c3 100644 --- a/glommio/src/sys/source.rs +++ b/glommio/src/sys/source.rs @@ -4,7 +4,7 @@ // This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2021 Datadog, Inc. // use crate::{ - iou::sqe::{SockAddr, SockAddrStorage}, + iou::sqe::SockAddrStorage, sys::{ DmaBuffer, IoBuffer, OsResult, PollableStatus, ReactorQueue, SourceId, Statx, TimeSpec64, Wakers, @@ -43,7 +43,7 @@ pub(crate) enum SourceType { DmaBuffer, libc::iovec, libc::msghdr, - nix::sys::socket::SockAddr, + nix::sys::socket::SockaddrStorage, ), Open(CString), FdataSync, @@ -54,7 +54,7 @@ pub(crate) enum SourceType { ForeignNotifier(u64, bool), Statx(CString, Box>), Timeout(TimeSpec64, u32), - Connect(SockAddr), + Connect(nix::sys::socket::SockaddrStorage), Accept(SockAddrStorage), Rename(PathBuf, PathBuf), CreateDir(PathBuf), diff --git a/glommio/src/sys/uring.rs b/glommio/src/sys/uring.rs index f7acb28d4..b82cb5776 100644 --- a/glommio/src/sys/uring.rs +++ b/glommio/src/sys/uring.rs @@ -8,6 +8,7 @@ use log::warn; use nix::{ fcntl::{FallocateFlags, OFlag}, poll::PollFlags, + sys::socket::{SockaddrLike, SockaddrStorage}, }; use rlimit::Resource; use std::{ @@ -45,7 +46,7 @@ use crate::{ use ahash::AHashMap; use buddy_alloc::buddy_alloc::{BuddyAlloc, BuddyAllocParam}; use nix::sys::{ - socket::{MsgFlags, SockAddr, SockFlag}, + socket::{MsgFlags, SockFlag}, stat::Mode as OpenMode, }; use smallvec::SmallVec; @@ -65,7 +66,7 @@ enum UringOpDescriptor { Open(*const u8, libc::c_int, u32), Close, FDataSync, - Connect(*const SockAddr), + Connect(*const SockaddrStorage), LinkTimeout(*const uring_sys::__kernel_timespec), Accept(*mut SockAddrStorage), Fallocate(u64, u64, libc::c_int), @@ -380,28 +381,16 @@ fn fill_sqe( UringOpDescriptor::SockSend(ptr, len, flags) => { let buf = std::slice::from_raw_parts(ptr, len); - sqe.prep_send( - op.fd, - buf, - MsgFlags::from_bits_unchecked(flags | MSG_ZEROCOPY), - ); + sqe.prep_send(op.fd, buf, MsgFlags::from_bits_retain(flags | MSG_ZEROCOPY)); } UringOpDescriptor::SockSendMsg(hdr, flags) => { - sqe.prep_sendmsg( - op.fd, - hdr, - MsgFlags::from_bits_unchecked(flags | MSG_ZEROCOPY), - ); + sqe.prep_sendmsg(op.fd, hdr, MsgFlags::from_bits_retain(flags | MSG_ZEROCOPY)); } UringOpDescriptor::SockRecv(len, flags) => { let mut buf = DmaBuffer::new(len).expect("failed to allocate buffer"); - sqe.prep_recv( - op.fd, - buf.as_bytes_mut(), - MsgFlags::from_bits_unchecked(flags), - ); + sqe.prep_recv(op.fd, buf.as_bytes_mut(), MsgFlags::from_bits_retain(flags)); source_map.peek_source_mut(from_user_data(op.user_data), |mut src| { match &mut src.source_type { @@ -432,7 +421,7 @@ fn fill_sqe( sqe.prep_recvmsg( op.fd, hdr as *mut libc::msghdr, - MsgFlags::from_bits_unchecked(flags), + MsgFlags::from_bits_retain(flags), ); *slot = Some(buf); } @@ -1483,8 +1472,8 @@ impl Reactor { pub(crate) fn sendmsg(&self, source: &Source, flags: MsgFlags) { let op = match &mut *source.source_type_mut() { SourceType::SockSendMsg(_, iov, hdr, addr) => { - let (msg_name, msg_namelen) = addr.as_ffi_pair(); - let msg_name = msg_name as *const nix::sys::socket::sockaddr as *mut libc::c_void; + let msg_name = addr.as_ptr() as *mut libc::c_void; + let msg_namelen = addr.len(); hdr.msg_iov = iov as *mut libc::iovec; hdr.msg_iovlen = 1; @@ -1525,7 +1514,7 @@ impl Reactor { pub(crate) fn connect(&self, source: &Source) { let op = match &*source.source_type() { - SourceType::Connect(addr) => UringOpDescriptor::Connect(addr as *const SockAddr), + SourceType::Connect(addr) => UringOpDescriptor::Connect(addr as *const _), x => panic!("Unexpected source type for connect: {:?}", x), }; queue_request_into_ring(