refactor(virtual_file): take owned buffer in VirtualFile::write_all (#6664)

Building atop #6660, this PR converts VirtualFile::write_all to owned buffers.

Part of #6663
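
For orientation, the new calling convention looks roughly like the sketch below. This is illustrative only: the helper function and variable names are invented for this example, and it assumes the surrounding code lives in the pageserver crate (so crate::virtual_file::VirtualFile and camino::Utf8Path are in scope) and that the buffer type, here Vec<u8>, implements tokio_epoll_uring::BoundedBuf.

    use camino::Utf8Path;
    use crate::virtual_file::VirtualFile;

    // Hypothetical helper, not part of this commit.
    async fn write_payload(
        file: &mut VirtualFile,
        final_path: &Utf8Path,
        tmp_path: &Utf8Path,
        payload: Vec<u8>,
    ) -> std::io::Result<()> {
        // Previously: file.write_all(&payload).await?;
        // Now the buffer is moved in and handed back next to the result;
        // Ok(n) carries the number of bytes written.
        let (payload, res) = file.write_all(payload).await;
        let _nbytes = res?;
        // crashsafe_overwrite follows the same convention and consumes its content.
        VirtualFile::crashsafe_overwrite(final_path, tmp_path, payload).await
    }

Note that the buffer handed back is the underlying IoBuf, whose initialized length may differ from what was passed in (see the new doc comment on write_all below), so callers should rely on the Ok(nbytes) value rather than on the returned buffer's length. Returning owned buffers appears to be groundwork for handing writes to tokio-epoll-uring, which needs to own the memory for the duration of the I/O (#6663).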
problame authored Feb 13, 2024
1 parent 331935d commit 7fa732c
Showing 8 changed files with 81 additions and 83 deletions.
4 changes: 2 additions & 2 deletions pageserver/src/deletion_queue.rs
@@ -234,7 +234,7 @@ impl DeletionHeader {
let header_bytes = serde_json::to_vec(self).context("serialize deletion header")?;
let header_path = conf.deletion_header_path();
let temp_path = path_with_suffix_extension(&header_path, TEMP_SUFFIX);
- VirtualFile::crashsafe_overwrite(&header_path, &temp_path, &header_bytes)
+ VirtualFile::crashsafe_overwrite(&header_path, &temp_path, header_bytes)
.await
.maybe_fatal_err("save deletion header")?;

@@ -325,7 +325,7 @@ impl DeletionList {
let temp_path = path_with_suffix_extension(&path, TEMP_SUFFIX);

let bytes = serde_json::to_vec(self).expect("Failed to serialize deletion list");
- VirtualFile::crashsafe_overwrite(&path, &temp_path, &bytes)
+ VirtualFile::crashsafe_overwrite(&path, &temp_path, bytes)
.await
.maybe_fatal_err("save deletion list")
.map_err(Into::into)
4 changes: 2 additions & 2 deletions pageserver/src/tenant.rs
@@ -2880,7 +2880,7 @@ impl Tenant {
let config_path = config_path.to_owned();
tokio::task::spawn_blocking(move || {
Handle::current().block_on(async move {
- let conf_content = conf_content.as_bytes();
+ let conf_content = conf_content.into_bytes();
VirtualFile::crashsafe_overwrite(&config_path, &temp_path, conf_content)
.await
.with_context(|| {
@@ -2917,7 +2917,7 @@ impl Tenant {
let target_config_path = target_config_path.to_owned();
tokio::task::spawn_blocking(move || {
Handle::current().block_on(async move {
- let conf_content = conf_content.as_bytes();
+ let conf_content = conf_content.into_bytes();
VirtualFile::crashsafe_overwrite(&target_config_path, &temp_path, conf_content)
.await
.with_context(|| {
26 changes: 11 additions & 15 deletions pageserver/src/tenant/blob_io.rs
@@ -131,27 +131,23 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
&mut self,
src_buf: B,
) -> (B::Buf, Result<(), Error>) {
- let src_buf_len = src_buf.bytes_init();
- let (src_buf, res) = if src_buf_len > 0 {
- let src_buf = src_buf.slice(0..src_buf_len);
- let res = self.inner.write_all(&src_buf).await;
- let src_buf = Slice::into_inner(src_buf);
- (src_buf, res)
- } else {
- let res = self.inner.write_all(&[]).await;
- (Slice::into_inner(src_buf.slice_full()), res)
+ let (src_buf, res) = self.inner.write_all(src_buf).await;
+ let nbytes = match res {
+ Ok(nbytes) => nbytes,
+ Err(e) => return (src_buf, Err(e)),
};
- if let Ok(()) = &res {
- self.offset += src_buf_len as u64;
- }
- (src_buf, res)
+ self.offset += nbytes as u64;
+ (src_buf, Ok(()))
}

#[inline(always)]
/// Flushes the internal buffer to the underlying `VirtualFile`.
pub async fn flush_buffer(&mut self) -> Result<(), Error> {
- self.inner.write_all(&self.buf).await?;
- self.buf.clear();
+ let buf = std::mem::take(&mut self.buf);
+ let (mut buf, res) = self.inner.write_all(buf).await;
+ res?;
+ buf.clear();
+ self.buf = buf;
Ok(())
}

2 changes: 1 addition & 1 deletion pageserver/src/tenant/metadata.rs
@@ -279,7 +279,7 @@ pub async fn save_metadata(
let path = conf.metadata_path(tenant_shard_id, timeline_id);
let temp_path = path_with_suffix_extension(&path, TEMP_FILE_SUFFIX);
let metadata_bytes = data.to_bytes().context("serialize metadata")?;
- VirtualFile::crashsafe_overwrite(&path, &temp_path, &metadata_bytes)
+ VirtualFile::crashsafe_overwrite(&path, &temp_path, metadata_bytes)
.await
.context("write metadata")?;
Ok(())
2 changes: 1 addition & 1 deletion pageserver/src/tenant/secondary/downloader.rs
@@ -486,7 +486,7 @@ impl<'a> TenantDownloader<'a> {
let heatmap_path_bg = heatmap_path.clone();
tokio::task::spawn_blocking(move || {
tokio::runtime::Handle::current().block_on(async move {
- VirtualFile::crashsafe_overwrite(&heatmap_path_bg, &temp_path, &heatmap_bytes).await
+ VirtualFile::crashsafe_overwrite(&heatmap_path_bg, &temp_path, heatmap_bytes).await
})
})
.await
30 changes: 10 additions & 20 deletions pageserver/src/tenant/storage_layer/delta_layer.rs
@@ -461,7 +461,8 @@ impl DeltaLayerWriterInner {
file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
.await?;
for buf in block_buf.blocks {
- file.write_all(buf.as_ref()).await?;
+ let (_buf, res) = file.write_all(buf).await;
+ res?;
}
assert!(self.lsn_range.start < self.lsn_range.end);
// Fill in the summary on blk 0
@@ -476,17 +477,12 @@
index_root_blk,
};

- let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
+ let mut buf = Vec::with_capacity(PAGE_SZ);
+ // TODO: could use smallvec here but it's a pain with Slice<T>
Summary::ser_into(&summary, &mut buf)?;
- if buf.spilled() {
- // This is bad as we only have one free block for the summary
- warn!(
- "Used more than one page size for summary buffer: {}",
- buf.len()
- );
- }
file.seek(SeekFrom::Start(0)).await?;
- file.write_all(&buf).await?;
+ let (_buf, res) = file.write_all(buf).await;
+ res?;

let metadata = file
.metadata()
@@ -679,18 +675,12 @@ impl DeltaLayer {

let new_summary = rewrite(actual_summary);

- let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
+ let mut buf = Vec::with_capacity(PAGE_SZ);
+ // TODO: could use smallvec here, but it's a pain with Slice<T>
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
- if buf.spilled() {
- // The code in DeltaLayerWriterInner just warn!()s for this.
- // It should probably error out as well.
- return Err(RewriteSummaryError::Other(anyhow::anyhow!(
- "Used more than one page size for summary buffer: {}",
- buf.len()
- )));
- }
file.seek(SeekFrom::Start(0)).await?;
- file.write_all(&buf).await?;
+ let (_buf, res) = file.write_all(buf).await;
+ res?;
Ok(())
}
}
30 changes: 10 additions & 20 deletions pageserver/src/tenant/storage_layer/image_layer.rs
@@ -341,18 +341,12 @@ impl ImageLayer {

let new_summary = rewrite(actual_summary);

- let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
+ let mut buf = Vec::with_capacity(PAGE_SZ);
+ // TODO: could use smallvec here but it's a pain with Slice<T>
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
- if buf.spilled() {
- // The code in ImageLayerWriterInner just warn!()s for this.
- // It should probably error out as well.
- return Err(RewriteSummaryError::Other(anyhow::anyhow!(
- "Used more than one page size for summary buffer: {}",
- buf.len()
- )));
- }
file.seek(SeekFrom::Start(0)).await?;
- file.write_all(&buf).await?;
+ let (_buf, res) = file.write_all(buf).await;
+ res?;
Ok(())
}
}
@@ -555,7 +549,8 @@ impl ImageLayerWriterInner {
.await?;
let (index_root_blk, block_buf) = self.tree.finish()?;
for buf in block_buf.blocks {
- file.write_all(buf.as_ref()).await?;
+ let (_buf, res) = file.write_all(buf).await;
+ res?;
}

// Fill in the summary on blk 0
@@ -570,17 +565,12 @@
index_root_blk,
};

- let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
+ let mut buf = Vec::with_capacity(PAGE_SZ);
+ // TODO: could use smallvec here but it's a pain with Slice<T>
Summary::ser_into(&summary, &mut buf)?;
- if buf.spilled() {
- // This is bad as we only have one free block for the summary
- warn!(
- "Used more than one page size for summary buffer: {}",
- buf.len()
- );
- }
file.seek(SeekFrom::Start(0)).await?;
- file.write_all(&buf).await?;
+ let (_buf, res) = file.write_all(buf).await;
+ res?;

let metadata = file
.metadata()
66 changes: 44 additions & 22 deletions pageserver/src/virtual_file.rs
@@ -19,7 +19,7 @@ use once_cell::sync::OnceCell;
use pageserver_api::shard::TenantShardId;
use std::fs::{self, File};
use std::io::{Error, ErrorKind, Seek, SeekFrom};
- use tokio_epoll_uring::IoBufMut;
+ use tokio_epoll_uring::{BoundedBuf, IoBufMut, Slice};

use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
use std::os::unix::fs::FileExt;
@@ -410,10 +410,10 @@ impl VirtualFile {
/// step, the tmp path is renamed to the final path. As renames are
/// atomic, a crash during the write operation will never leave behind a
/// partially written file.
- pub async fn crashsafe_overwrite(
+ pub async fn crashsafe_overwrite<B: BoundedBuf>(
final_path: &Utf8Path,
tmp_path: &Utf8Path,
- content: &[u8],
+ content: B,
) -> std::io::Result<()> {
let Some(final_path_parent) = final_path.parent() else {
return Err(std::io::Error::from_raw_os_error(
@@ -430,7 +430,8 @@
.create_new(true),
)
.await?;
- file.write_all(content).await?;
+ let (_content, res) = file.write_all(content).await;
+ res?;
file.sync_all().await?;
drop(file); // before the rename, that's important!
// renames are atomic
@@ -601,23 +602,36 @@ impl VirtualFile {
Ok(())
}

- pub async fn write_all(&mut self, mut buf: &[u8]) -> Result<(), Error> {
+ /// Writes `buf.slice(0..buf.bytes_init())`.
+ /// Returns the IoBuf that is underlying the BoundedBuf `buf`.
+ /// I.e., the returned value's `bytes_init()` method returns something different than the `bytes_init()` that was passed in.
+ /// It's quite brittle and easy to mis-use, so, we return the size in the Ok() variant.
+ pub async fn write_all<B: BoundedBuf>(&mut self, buf: B) -> (B::Buf, Result<usize, Error>) {
+ let nbytes = buf.bytes_init();
+ if nbytes == 0 {
+ return (Slice::into_inner(buf.slice_full()), Ok(0));
+ }
+ let mut buf = buf.slice(0..nbytes);
while !buf.is_empty() {
- match self.write(buf).await {
+ // TODO: push `Slice` further down
+ match self.write(&buf).await {
Ok(0) => {
- return Err(Error::new(
- std::io::ErrorKind::WriteZero,
- "failed to write whole buffer",
- ));
+ return (
+ Slice::into_inner(buf),
+ Err(Error::new(
+ std::io::ErrorKind::WriteZero,
+ "failed to write whole buffer",
+ )),
+ );
}
Ok(n) => {
- buf = &buf[n..];
+ buf = buf.slice(n..);
}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
- Err(e) => return Err(e),
+ Err(e) => return (Slice::into_inner(buf), Err(e)),
}
}
- Ok(())
+ (Slice::into_inner(buf), Ok(nbytes))
}

async fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
@@ -676,7 +690,6 @@
F: FnMut(tokio_epoll_uring::Slice<B>, u64) -> Fut,
Fut: std::future::Future<Output = (tokio_epoll_uring::Slice<B>, std::io::Result<usize>)>,
{
- use tokio_epoll_uring::BoundedBuf;
let mut buf: tokio_epoll_uring::Slice<B> = buf.slice_full(); // includes all the uninitialized memory
while buf.bytes_total() != 0 {
let res;
@@ -1063,10 +1076,19 @@ mod tests {
MaybeVirtualFile::File(file) => file.seek(pos),
}
}
- async fn write_all(&mut self, buf: &[u8]) -> Result<(), Error> {
+ async fn write_all<B: BoundedBuf>(&mut self, buf: B) -> Result<(), Error> {
match self {
- MaybeVirtualFile::VirtualFile(file) => file.write_all(buf).await,
- MaybeVirtualFile::File(file) => file.write_all(buf),
+ MaybeVirtualFile::VirtualFile(file) => {
+ let (_buf, res) = file.write_all(buf).await;
+ res.map(|_| ())
+ }
+ MaybeVirtualFile::File(file) => {
+ let buf_len = buf.bytes_init();
+ if buf_len == 0 {
+ return Ok(());
+ }
+ file.write_all(&buf.slice(0..buf_len))
+ }
}
}

@@ -1141,7 +1163,7 @@ mod tests {
.to_owned(),
)
.await?;
file_a.write_all(b"foobar").await?;
file_a.write_all(b"foobar".to_vec()).await?;

// cannot read from a file opened in write-only mode
let _ = file_a.read_string().await.unwrap_err();
@@ -1150,7 +1172,7 @@
let mut file_a = openfunc(path_a, OpenOptions::new().read(true).to_owned()).await?;

// cannot write to a file opened in read-only mode
let _ = file_a.write_all(b"bar").await.unwrap_err();
let _ = file_a.write_all(b"bar".to_vec()).await.unwrap_err();

// Try simple read
assert_eq!("foobar", file_a.read_string().await?);
@@ -1293,7 +1315,7 @@
let path = testdir.join("myfile");
let tmp_path = testdir.join("myfile.tmp");

VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo")
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo".to_vec())
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
@@ -1302,7 +1324,7 @@
assert!(!tmp_path.exists());
drop(file);

VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"bar")
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"bar".to_vec())
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
@@ -1324,7 +1346,7 @@
std::fs::write(&tmp_path, "some preexisting junk that should be removed").unwrap();
assert!(tmp_path.exists());

VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo")
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo".to_vec())
.await
.unwrap();


1 comment on commit 7fa732c

@github-actions

2511 tests run: 2382 passed, 4 failed, 125 skipped (full report)


Failures on Postgres 14

  • test_pageserver_max_throughput_getpage_at_latest_lsn[10-6-30]: release
  • test_pageserver_max_throughput_getpage_at_latest_lsn[10-13-30]: release
  • test_pageserver_max_throughput_getpage_at_latest_lsn[1-6-30]: release
  • test_pageserver_max_throughput_getpage_at_latest_lsn[1-13-30]: release
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_pageserver_max_throughput_getpage_at_latest_lsn[10-6-30] or test_pageserver_max_throughput_getpage_at_latest_lsn[10-13-30] or test_pageserver_max_throughput_getpage_at_latest_lsn[1-6-30] or test_pageserver_max_throughput_getpage_at_latest_lsn[1-13-30]"

Code coverage (full report)

  • functions: 55.9% (12835 of 22954 functions)
  • lines: 82.5% (69238 of 83900 lines)

The comment gets automatically updated with the latest test results
7fa732c at 2024-02-13T18:33:14.609Z :recycle:
