From 2eaf724d6484f9b8642a54446fbc2e5a7595c467 Mon Sep 17 00:00:00 2001 From: Vitali Lovich Date: Wed, 24 Apr 2024 11:35:13 -0700 Subject: [PATCH] Implement a weak reference to a DmaFile --- glommio/src/io/dma_file.rs | 192 ++++++++++++++++++++++++++++++++- glommio/src/io/glommio_file.rs | 49 ++++++++- 2 files changed, 238 insertions(+), 3 deletions(-) diff --git a/glommio/src/io/dma_file.rs b/glommio/src/io/dma_file.rs index ea6b7b575..047f0e80c 100644 --- a/glommio/src/io/dma_file.rs +++ b/glommio/src/io/dma_file.rs @@ -26,11 +26,14 @@ use std::{ rc::Rc, }; -use super::{glommio_file::OwnedGlommioFile, Stat}; +use super::{ + glommio_file::{OwnedGlommioFile, WeakGlommioFile}, + Stat, +}; pub(super) type Result = crate::Result; -/// Close result of [`DmaFile::close_rc()`]. Indicates which operation is +/// Close result of [`DmaFile::close_rc()`] and [`DmaFile::close`]. Indicates which operation is /// performed on close. #[derive(Debug)] pub enum CloseResult { @@ -650,6 +653,19 @@ impl DmaFile { self.file.dev_minor } + /// Creates a non-owning reference to this file that can be shared to a background thread. + /// The weak reference may keep some memory alive but does not keep the underlying file descriptor + /// open if all strong references have been dropped. + pub fn downgrade(&self) -> WeakDmaFile { + WeakDmaFile { + file: self.file.downgrade(), + o_direct_alignment: self.o_direct_alignment, + max_sectors_size: self.max_sectors_size, + max_segment_size: self.max_segment_size, + pollable: self.pollable, + } + } + /// Convenience method that closes a DmaFile wrapped inside an Rc. /// /// Returns [CloseResult] to indicate which operation was performed. @@ -734,6 +750,17 @@ impl DmaFile { /// assert!(read.iter().all(|&b| b == 2)); /// }); /// ``` +/// +/// SAFETY: Make sure to use pread/pwrite APIs if operating on the FD directly to ensure you +/// don't have multiple threads messing with the kernel's internal offset for the descriptor. 
+/// Additionally, there's no synchronization guarantee about race conditions reading and writing +/// the same file region from different threads. This is also true for a DmaFile that you convert +/// this into. +/// +/// For a guarantee of safety of the logical data on disk, you need to provide external +/// synchronization guarantees. For example, immutably writing a given file region, handing out +/// an owned reference after the fact, and then somehow guaranteeing that you never modify that +/// region again as long as there are readers alive. pub struct OwnedDmaFile { file: OwnedGlommioFile, o_direct_alignment: u64, @@ -762,6 +789,49 @@ impl OwnedDmaFile { pollable: self.pollable, }) } + + /// Returns an `Option` containing the path associated with this open + /// file, or `None` if there isn't one. + pub fn path(&self) -> Option<&Path> { + self.file.path.as_deref() + } + + /// The inode backing the file. A file with the same inode may appear under multiple + /// paths due to renaming and linking. + pub fn inode(&self) -> u64 { + self.file.inode + } + + /// The major ID of the device containing the filesystem where the file resides. + /// The device may be found by issuing a `readlink` on `/sys/dev/block/MAJOR:MINOR` + pub fn dev_major(&self) -> u32 { + self.file.dev_major + } + + /// The minor ID of the device containing the filesystem where the file resides. + pub fn dev_minor(&self) -> u32 { + self.file.dev_minor + } + + /// Returns the alignment required for I/O operations. Typical values will + /// be 512 (NVME drive is configured in slower compat mode) or 4096 + /// (typical TLC native alignment). + pub fn alignment(&self) -> u64 { + self.o_direct_alignment + } + + /// Downgrade to a weak reference that doesn't keep the FD open but can be upgraded at a later + /// date on any thread to an OwnedDmaFile. There are safety implications if multiple threads + /// get involved (see the safety note for [OwnedDmaFile]).
+ pub fn downgrade(&self) -> WeakDmaFile { + WeakDmaFile { + file: self.file.downgrade(), + o_direct_alignment: self.o_direct_alignment, + max_sectors_size: self.max_sectors_size, + max_segment_size: self.max_segment_size, + pollable: self.pollable, + } + } } impl From for OwnedDmaFile { @@ -794,6 +864,64 @@ impl AsRawFd for OwnedDmaFile { } } +/// This holds a weak non-owning reference to the file that can attempt to be upgraded to an owned +/// reference on any thread. Underneath it uses Arc so there is an atomic cost involved in constructing +/// it / cloning. However, it can be useful if you want to grant conditional access at a future date +/// to a different thread or even to provide access to the FD for use outside glommio. For example, +/// upgrade to an OwnedDmaFile which gives you access to the underlying fd via [OwnedDmaFile::as_raw_fd]. +/// +/// SAFETY: See [OwnedDmaFile]. +#[derive(Debug, Clone)] +pub struct WeakDmaFile { + file: WeakGlommioFile, + o_direct_alignment: u64, + max_sectors_size: usize, + max_segment_size: usize, + pollable: PollableStatus, +} + +impl WeakDmaFile { + pub fn upgrade(&self) -> Option { + self.file.upgrade().map(|file| OwnedDmaFile { + file, + o_direct_alignment: self.o_direct_alignment, + max_sectors_size: self.max_sectors_size, + max_segment_size: self.max_segment_size, + pollable: self.pollable, + }) + } + + /// Returns an `Option` containing the path associated with this + /// file, or `None` if there isn't one. + pub fn path(&self) -> Option<&Path> { + self.file.path.as_deref() + } + + /// The inode backing the file. A file with the same inode may appear under multiple + /// paths due to renaming and linking. + pub fn inode(&self) -> u64 { + self.file.inode + } + + /// The major ID of the device containing the filesystem where the file resides.
+ /// The device may be found by issuing a `readlink` on `/sys/dev/block/MAJOR:MINOR` + pub fn dev_major(&self) -> u32 { + self.file.dev_major + } + + /// The minor ID of the device containing the filesystem where the file resides. + pub fn dev_minor(&self) -> u32 { + self.file.dev_minor + } + + /// Returns the alignment required for I/O operations. Typical values will + /// be 512 (NVME drive is configured in slower compat mode) or 4096 + /// (typical TLC native alignment). + pub fn alignment(&self) -> u64 { + self.o_direct_alignment + } +} + #[cfg(test)] pub(crate) mod test { use super::*; @@ -1938,4 +2066,64 @@ pub(crate) mod test { .expect("File should be read from foreground thread successfully"); assert_eq!(&*found_buffer, expected_buffer.as_slice()); }); + + dma_file_test!(basic_weak_file, path, _k, { + let file = OpenOptions::new() + .create_new(true) + .read(true) + .write(true) + .tmpfile(true) + .dma_open(&path) + .await + .unwrap(); + + let weak: WeakDmaFile = file.downgrade(); + assert!(weak.upgrade().is_some(), "File still alive"); + assert_eq!(weak.inode(), file.inode()); + + let cloned_weak = weak.clone(); + assert_eq!(cloned_weak.inode(), weak.inode()); + + let cloned = file.clone(); + + std::mem::drop(file); + + assert!( + weak.upgrade().is_some(), + "Clone still keeping the file alive" + ); + + std::mem::drop(cloned); + + assert!(weak.upgrade().is_none(), "Weak reference no longer valid"); + assert!(cloned_weak.upgrade().is_none(), "Weak reference no longer valid"); + }); + + dma_file_test!(weak_file_upgrade_off_thread, path, _k, { + let file = OpenOptions::new() + .create_new(true) + .read(true) + .write(true) + .tmpfile(true) + .dma_open(&path) + .await + .unwrap(); + + let inode = file.inode(); + + let weak: WeakDmaFile = file.downgrade(); + assert_eq!(weak.inode(), inode); + + crate::executor().spawn_blocking(move || { + let local_ex = + crate::executor::LocalExecutorBuilder::new(crate::executor::Placement::Unbound) + .record_io_latencies(true)
.make() + .unwrap(); + local_ex.run(async move { + let upgraded: DmaFile = weak.upgrade().unwrap().into(); + assert_eq!(upgraded.inode(), inode); + }); + }).await; + }); } diff --git a/glommio/src/io/glommio_file.rs b/glommio/src/io/glommio_file.rs index a73e2df19..9d20f37b2 100644 --- a/glommio/src/io/glommio_file.rs +++ b/glommio/src/io/glommio_file.rs @@ -18,7 +18,7 @@ use std::{ os::unix::io::{AsRawFd, FromRawFd, RawFd}, path::{Path, PathBuf}, rc::{Rc, Weak}, - sync::Arc, + sync::{Arc, Weak as AWeak}, }; type Result = crate::Result; @@ -346,6 +346,19 @@ impl GlommioFile { let st = self.statx().await?; Ok(st.stx_size) } + + pub(crate) fn downgrade(&self) -> WeakGlommioFile { + WeakGlommioFile { + fd: self + .file + .as_ref() + .map_or(AWeak::new(), Arc::downgrade), + path: self.path.borrow().clone(), + inode: self.inode, + dev_major: self.dev_major, + dev_minor: self.dev_minor, + } + } } /// This lets you open a DmaFile on one thread and then send it safely to another thread for processing. @@ -373,6 +386,19 @@ impl OwnedGlommioFile { dev_minor: self.dev_minor, }) } + + pub fn downgrade(&self) -> WeakGlommioFile { + WeakGlommioFile { + fd: self + .fd + .as_ref() + .map_or(AWeak::new(), Arc::downgrade), + path: self.path.clone(), + inode: self.inode, + dev_major: self.dev_major, + dev_minor: self.dev_minor, + } + } } impl AsRawFd for OwnedGlommioFile { @@ -417,6 +443,27 @@ impl From for OwnedGlommioFile { } } +#[derive(Debug, Clone)] +pub(crate) struct WeakGlommioFile { + pub(crate) fd: AWeak, + pub(crate) path: Option, + pub(crate) inode: u64, + pub(crate) dev_major: u32, + pub(crate) dev_minor: u32, +} + +impl WeakGlommioFile { + pub(crate) fn upgrade(&self) -> Option { + self.fd.upgrade().map(|fd| OwnedGlommioFile { + fd: Some(fd), + path: self.path.clone(), + inode: self.inode, + dev_major: self.dev_major, + dev_minor: self.dev_minor, + }) + } +} + #[cfg(test)] pub(crate) mod test { use super::*;