Skip to content

Commit

Permalink
Implement a weak reference to a DmaFile
Browse files Browse the repository at this point in the history
  • Loading branch information
vlovich committed Apr 24, 2024
1 parent 85b115d commit 2eaf724
Show file tree
Hide file tree
Showing 2 changed files with 238 additions and 3 deletions.
192 changes: 190 additions & 2 deletions glommio/src/io/dma_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@ use std::{
rc::Rc,
};

use super::{glommio_file::OwnedGlommioFile, Stat};
use super::{
glommio_file::{OwnedGlommioFile, WeakGlommioFile},
Stat,
};

pub(super) type Result<T> = crate::Result<T, ()>;

/// Close result of [`DmaFile::close_rc()`]. Indicates which operation is
/// Close result of [`DmaFile::close_rc()`] and [`DmaFile::close`]. Indicates which operation is
/// performed on close.
#[derive(Debug)]
pub enum CloseResult {
Expand Down Expand Up @@ -650,6 +653,19 @@ impl DmaFile {
self.file.dev_minor
}

/// Creates a non-owning reference to this file that can be shared to a background thread.
/// The weak reference may keep some memory alive but does not keep the underlying file descriptor
/// open if all strong references have been dropped.
pub fn downgrade(&self) -> WeakDmaFile {
WeakDmaFile {
file: self.file.downgrade(),
o_direct_alignment: self.o_direct_alignment,
max_sectors_size: self.max_sectors_size,
max_segment_size: self.max_segment_size,
pollable: self.pollable,
}
}

/// Convenience method that closes a DmaFile wrapped inside an Rc.
///
/// Returns [CloseResult] to indicate which operation was performed.
Expand Down Expand Up @@ -734,6 +750,17 @@ impl DmaFile {
/// assert!(read.iter().all(|&b| b == 2));
/// });
/// ```
///
/// SAFETY: Make sure to use pread/pwrite APIs if operating on the FD directly to ensure you
/// don't have multiple threads messing with the kernel's internal offset for the descriptor.
/// Additionally, there's no synchronization guarantee about race conditions reading and writing
/// the same file region from different threads. This is also true for a DmaFile that you convert
/// this into.
///
/// For a guarantee of safety of the logical data on disk, you need to provide external
/// synchronization guarantees. For example, immutably writing a given file region, handing out
/// an owned reference after the fact, and then somehow guaranteeing that you never modify that
/// region again as long as there are readers alive.
pub struct OwnedDmaFile {
file: OwnedGlommioFile,
o_direct_alignment: u64,
Expand Down Expand Up @@ -762,6 +789,49 @@ impl OwnedDmaFile {
pollable: self.pollable,
})
}

/// Returns an `Option` containing the path associated with this open
/// directory, or `None` if there isn't one.
pub fn path(&self) -> Option<&Path> {
self.file.path.as_deref()
}

/// The inode backing the file. A file with the same inode may appear under multiple
/// paths due to renaming and linking.
pub fn inode(&self) -> u64 {
self.file.inode
}

/// The major ID of the device containing the filesystem where the file resides.
/// The device may be found by issuing a `readlink`` on `/sys/dev/block/<major>:<minor>`
pub fn dev_major(&self) -> u32 {
self.file.dev_major
}

/// The minor ID of the device containing the filesystem where the file resides.
pub fn dev_minor(&self) -> u32 {
self.file.dev_minor
}

/// Returns the alignment required for I/O operations. Typical values will
/// be 512 (NVME drive is configured in slower compat mode) or 4096
/// (typical TLC native alignment).
pub fn alignment(&self) -> u64 {
self.o_direct_alignment
}

/// Downgrade to a weak reference that doesn't keep the FD open but can be upgraded at a later
/// date on any thread to an OwnedDmaFile. There are safety implications if multiple threads
/// get involved (see the safety note for [OwnedDmaFile]).
pub fn downgrade(&self) -> WeakDmaFile {
WeakDmaFile {
file: self.file.downgrade(),
o_direct_alignment: self.o_direct_alignment,
max_sectors_size: self.max_sectors_size,
max_segment_size: self.max_segment_size,
pollable: self.pollable,
}
}
}

impl From<DmaFile> for OwnedDmaFile {
Expand Down Expand Up @@ -794,6 +864,64 @@ impl AsRawFd for OwnedDmaFile {
}
}

/// This holds a weak non-owning reference to the file that can attempt to be upgraded to an owned
/// reference on any thread. Underneath it uses Arc so there is an atomic cost involved in constructing
/// it / cloning. However, it can be useful if you want to grant conditional access at a future date
/// to a different thread or even to provide access to the FD for use outside glommio. For example,
/// upgrade to an OwnedDmaFile which gives you access to the underlying fd via [OwnedDmaFile::as_raw_fd].
///
/// SAFETY: See [OwnedDmaFile].
#[derive(Debug, Clone)]
pub struct WeakDmaFile {
file: WeakGlommioFile,
o_direct_alignment: u64,
max_sectors_size: usize,
max_segment_size: usize,
pollable: PollableStatus,
}

impl WeakDmaFile {
pub fn upgrade(&self) -> Option<OwnedDmaFile> {
self.file.upgrade().map(|file| OwnedDmaFile {
file,
o_direct_alignment: self.o_direct_alignment,
max_sectors_size: self.max_sectors_size,
max_segment_size: self.max_segment_size,
pollable: self.pollable,
})
}

/// Returns an `Option` containing the path associated with this open
/// directory, or `None` if there isn't one.
pub fn path(&self) -> Option<&Path> {
self.file.path.as_deref()
}

/// The inode backing the file. A file with the same inode may appear under multiple
/// paths due to renaming and linking.
pub fn inode(&self) -> u64 {
self.file.inode
}

/// The major ID of the device containing the filesystem where the file resides.
/// The device may be found by issuing a `readlink`` on `/sys/dev/block/<major>:<minor>`
pub fn dev_major(&self) -> u32 {
self.file.dev_major
}

/// The minor ID of the device containing the filesystem where the file resides.
pub fn dev_minor(&self) -> u32 {
self.file.dev_minor
}

/// Returns the alignment required for I/O operations. Typical values will
/// be 512 (NVME drive is configured in slower compat mode) or 4096
/// (typical TLC native alignment).
pub fn alignment(&self) -> u64 {
self.o_direct_alignment
}
}

#[cfg(test)]
pub(crate) mod test {
use super::*;
Expand Down Expand Up @@ -1938,4 +2066,64 @@ pub(crate) mod test {
.expect("File should be read from foreground thread successfully");
assert_eq!(&*found_buffer, expected_buffer.as_slice());
});

dma_file_test!(basic_weak_file, path, _k, {
let file = OpenOptions::new()
.create_new(true)
.read(true)
.write(true)
.tmpfile(true)
.dma_open(&path)
.await
.unwrap();

let weak: WeakDmaFile = file.downgrade();
assert!(weak.upgrade().is_some(), "File still alive");
assert_eq!(weak.inode(), file.inode());

let cloned_weak = weak.clone();
assert_eq!(cloned_weak.inode(), weak.inode());

let cloned = file.clone();

std::mem::drop(file);

assert!(
weak.upgrade().is_some(),
"Clone still keeping the file alive"
);

std::mem::drop(cloned);

assert!(weak.upgrade().is_none(), "Weak reference no longer valid");
assert!(cloned_weak.upgrade().is_none(), "Weak reference no longer valid");
});

dma_file_test!(weak_file_upgrade_off_thread, path, _k, {
let file = OpenOptions::new()
.create_new(true)
.read(true)
.write(true)
.tmpfile(true)
.dma_open(&path)
.await
.unwrap();

let inode = file.inode();

let weak: WeakDmaFile = file.downgrade();
assert_eq!(weak.inode(), inode);

crate::executor().spawn_blocking(move || {
let local_ex =
crate::executor::LocalExecutorBuilder::new(crate::executor::Placement::Unbound)
.record_io_latencies(true)
.make()
.unwrap();
local_ex.run(async move {
let upgraded: DmaFile = weak.upgrade().unwrap().into();
assert_eq!(upgraded.inode(), inode);
});
}).await;
});
}
49 changes: 48 additions & 1 deletion glommio/src/io/glommio_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::{
os::unix::io::{AsRawFd, FromRawFd, RawFd},
path::{Path, PathBuf},
rc::{Rc, Weak},
sync::Arc,
sync::{Arc, Weak as AWeak},
};

type Result<T> = crate::Result<T, ()>;
Expand Down Expand Up @@ -346,6 +346,19 @@ impl GlommioFile {
let st = self.statx().await?;
Ok(st.stx_size)
}

pub(crate) fn downgrade(&self) -> WeakGlommioFile {
WeakGlommioFile {
fd: self
.file
.as_ref()
.map_or(AWeak::new(), Arc::downgrade),
path: self.path.borrow().clone(),
inode: self.inode,
dev_major: self.dev_major,
dev_minor: self.dev_minor,
}
}
}

/// This lets you open a DmaFile on one thread and then send it safely to another thread for processing.
Expand Down Expand Up @@ -373,6 +386,19 @@ impl OwnedGlommioFile {
dev_minor: self.dev_minor,
})
}

pub fn downgrade(&self) -> WeakGlommioFile {
WeakGlommioFile {
fd: self
.fd
.as_ref()
.map_or(AWeak::new(), Arc::downgrade),
path: self.path.clone(),
inode: self.inode,
dev_major: self.dev_major,
dev_minor: self.dev_minor,
}
}
}

impl AsRawFd for OwnedGlommioFile {
Expand Down Expand Up @@ -417,6 +443,27 @@ impl From<GlommioFile> for OwnedGlommioFile {
}
}

#[derive(Debug, Clone)]
pub(crate) struct WeakGlommioFile {
pub(crate) fd: AWeak<RawFd>,
pub(crate) path: Option<PathBuf>,
pub(crate) inode: u64,
pub(crate) dev_major: u32,
pub(crate) dev_minor: u32,
}

impl WeakGlommioFile {
pub(crate) fn upgrade(&self) -> Option<OwnedGlommioFile> {
self.fd.upgrade().map(|fd| OwnedGlommioFile {
fd: Some(fd),
path: self.path.clone(),
inode: self.inode,
dev_major: self.dev_major,
dev_minor: self.dev_minor,
})
}
}

#[cfg(test)]
pub(crate) mod test {
use super::*;
Expand Down

0 comments on commit 2eaf724

Please sign in to comment.