From afa47cbe63d86d66e7c281a26e02f33143c1f0c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Mon, 28 Aug 2023 10:27:12 +0200 Subject: [PATCH 01/11] Move VirtualFile::seek to inherent function --- .../src/tenant/storage_layer/delta_layer.rs | 2 +- .../src/tenant/storage_layer/image_layer.rs | 2 +- pageserver/src/virtual_file.rs | 52 +++++++++---------- 3 files changed, 27 insertions(+), 29 deletions(-) diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index f9a8c52c2f0f..60427a22e465 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -45,8 +45,8 @@ use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind}; use rand::{distributions::Alphanumeric, Rng}; use serde::{Deserialize, Serialize}; use std::fs::{self, File}; +use std::io::SeekFrom; use std::io::{BufWriter, Write}; -use std::io::{Seek, SeekFrom}; use std::ops::Range; use std::os::unix::fs::FileExt; use std::path::{Path, PathBuf}; diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 32f20e6227f6..f329041fb1c4 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -42,8 +42,8 @@ use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind}; use rand::{distributions::Alphanumeric, Rng}; use serde::{Deserialize, Serialize}; use std::fs::{self, File}; +use std::io::SeekFrom; use std::io::Write; -use std::io::{Seek, SeekFrom}; use std::ops::Range; use std::os::unix::prelude::FileExt; use std::path::{Path, PathBuf}; diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 48d43a4e638f..1bde33c12a63 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -400,6 +400,31 @@ impl VirtualFile { drop(self); std::fs::remove_file(path).expect("failed to remove the virtual file"); } + + pub fn seek(&mut self, pos: SeekFrom) -> Result { + match pos { + SeekFrom::Start(offset) => { + self.pos = offset; + } + SeekFrom::End(offset) => { + self.pos = self.with_file("seek", |mut file| file.seek(SeekFrom::End(offset)))?? + } + SeekFrom::Current(offset) => { + let pos = self.pos as i128 + offset as i128; + if pos < 0 { + return Err(Error::new( + ErrorKind::InvalidInput, + "offset would be negative", + )); + } + if pos > u64::MAX as i128 { + return Err(Error::new(ErrorKind::InvalidInput, "offset overflow")); + } + self.pos = pos as u64; + } + } + Ok(self.pos) + } } impl Drop for VirtualFile { @@ -446,33 +471,6 @@ impl Write for VirtualFile { } } -impl Seek for VirtualFile { - fn seek(&mut self, pos: SeekFrom) -> Result { - match pos { - SeekFrom::Start(offset) => { - self.pos = offset; - } - SeekFrom::End(offset) => { - self.pos = self.with_file("seek", |mut file| file.seek(SeekFrom::End(offset)))?? - } - SeekFrom::Current(offset) => { - let pos = self.pos as i128 + offset as i128; - if pos < 0 { - return Err(Error::new( - ErrorKind::InvalidInput, - "offset would be negative", - )); - } - if pos > u64::MAX as i128 { - return Err(Error::new(ErrorKind::InvalidInput, "offset overflow")); - } - self.pos = pos as u64; - } - } - Ok(self.pos) - } -} - impl FileExt for VirtualFile { fn read_at(&self, buf: &mut [u8], offset: u64) -> Result { let result = self.with_file("read", |file| file.read_at(buf, offset))?; From 9308967ff8191496c62000a09ea12510eb22595b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Mon, 28 Aug 2023 11:31:43 +0200 Subject: [PATCH 02/11] Move used FileExt functions to inherent impls --- pageserver/src/tenant/block_io.rs | 1 - pageserver/src/tenant/ephemeral_file.rs | 1 - pageserver/src/virtual_file.rs | 84 ++++++++++++++++++------- 3 files changed, 62 insertions(+), 24 deletions(-) diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs index 93b79d211dfc..8da0bee5b271 100644 --- a/pageserver/src/tenant/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -8,7 +8,6 @@ use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ}; use crate::virtual_file::VirtualFile; use bytes::Bytes; use std::ops::{Deref, DerefMut}; -use std::os::unix::fs::FileExt; /// This is implemented by anything that can read 8 kB (PAGE_SZ) /// blocks, using the page cache diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 31db3869d978..02ef7166e578 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -9,7 +9,6 @@ use std::cmp::min; use std::fs::OpenOptions; use std::io::{self, ErrorKind}; use std::ops::DerefMut; -use std::os::unix::prelude::FileExt; use std::path::PathBuf; use std::sync::atomic::AtomicU64; use tracing::*; diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 1bde33c12a63..9d1ea7b5705a 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -425,6 +425,68 @@ impl VirtualFile { } Ok(self.pos) } + + // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135 + pub fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<(), Error> { + while !buf.is_empty() { + match self.read_at(buf, offset) { + Ok(0) => { + return Err(Error::new( + std::io::ErrorKind::UnexpectedEof, + "failed to fill whole buffer", + )) + } + Ok(n) => { + buf = &mut buf[n..]; + offset += n as u64; + } + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} + Err(e) => return Err(e), + } + } + Ok(()) + } + + // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235 + pub fn write_all_at(&self, mut buf: &[u8], mut offset: u64) -> Result<(), Error> { + while !buf.is_empty() { + match self.write_at(buf, offset) { + Ok(0) => { + return Err(Error::new( + std::io::ErrorKind::WriteZero, + "failed to write whole buffer", + )); + } + Ok(n) => { + buf = &buf[n..]; + offset += n as u64; + } + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} + Err(e) => return Err(e), + } + } + Ok(()) + } + + pub fn read_at(&self, buf: &mut [u8], offset: u64) -> Result { + let result = self.with_file("read", |file| file.read_at(buf, offset))?; + if let Ok(size) = result { + STORAGE_IO_SIZE + .with_label_values(&["read", &self.tenant_id, &self.timeline_id]) + .add(size as i64); + } + result + } + + pub fn write_at(&self, buf: &[u8], offset: u64) -> Result { + let result = self.with_file("write", |file| file.write_at(buf, offset))?; + if let Ok(size) = result { + STORAGE_IO_SIZE + .with_label_values(&["write", &self.tenant_id, &self.timeline_id]) + .add(size as i64); + } + result + } } impl Drop for VirtualFile { @@ -471,28 +533,6 @@ impl Write for VirtualFile { } } -impl FileExt for VirtualFile { - fn read_at(&self, buf: &mut [u8], offset: u64) -> Result { - let result = self.with_file("read", |file| file.read_at(buf, offset))?; - if let Ok(size) = result { - STORAGE_IO_SIZE - .with_label_values(&["read", &self.tenant_id, &self.timeline_id]) - .add(size as i64); - } - result - } - - fn write_at(&self, buf: &[u8], offset: u64) -> Result { - let result = self.with_file("write", |file| file.write_at(buf, offset))?; - if let Ok(size) = result { - STORAGE_IO_SIZE - .with_label_values(&["write", &self.tenant_id, &self.timeline_id]) - .add(size as i64); - } - result - } -} - impl OpenFiles { fn new(num_slots: usize) -> OpenFiles { let mut slots = Box::new(Vec::with_capacity(num_slots)); From aa63910d2644ba6d6e625f555ba0758d34417dc6 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 31 Aug 2023 11:49:08 +0200 Subject: [PATCH 03/11] Make FileBlockReader::fill_buffer async fn --- pageserver/src/tenant/block_io.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs index 8da0bee5b271..f50439593710 100644 --- a/pageserver/src/tenant/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -154,7 +154,7 @@ impl FileBlockReader { } /// Read a page from the underlying file into given buffer. - fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> { + async fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> { assert!(buf.len() == PAGE_SZ); self.file.read_exact_at(buf, blkno as u64 * PAGE_SZ as u64) } @@ -178,7 +178,7 @@ impl FileBlockReader { ReadBufResult::Found(guard) => break Ok(guard.into()), ReadBufResult::NotFound(mut write_guard) => { // Read the page from disk into the buffer - self.fill_buffer(write_guard.deref_mut(), blknum)?; + self.fill_buffer(write_guard.deref_mut(), blknum).await?; write_guard.mark_valid(); // Swap for read lock From de7cd038995e8014602af710397543591cbea1cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Sat, 2 Sep 2023 10:54:28 +0200 Subject: [PATCH 04/11] Remove unused Read impl for VirtualFile --- pageserver/src/virtual_file.rs | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 9d1ea7b5705a..846da7d7030d 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -13,7 +13,7 @@ use crate::metrics::{STORAGE_IO_SIZE, STORAGE_IO_TIME}; use once_cell::sync::OnceCell; use std::fs::{self, File, OpenOptions}; -use std::io::{Error, ErrorKind, Read, Seek, SeekFrom, Write}; +use std::io::{Error, ErrorKind, Seek, SeekFrom, Write}; use std::os::unix::fs::FileExt; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; @@ -509,15 +509,6 @@ impl Drop for VirtualFile { } } -impl Read for VirtualFile { - fn read(&mut self, buf: &mut [u8]) -> Result { - let pos = self.pos; - let n = self.read_at(buf, pos)?; - self.pos += n as u64; - Ok(n) - } -} - impl Write for VirtualFile { fn write(&mut self, buf: &[u8]) -> Result { let pos = self.pos; From 668718a1cbe6c9cf83c8b82fd62a29a8bd9d9709 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Thu, 31 Aug 2023 12:05:10 +0200 Subject: [PATCH 05/11] Remove bounds --- pageserver/src/tenant/blob_io.rs | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index f5ff15b50c22..e4dede2c3099 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -96,18 +96,12 @@ pub trait BlobWriter { /// An implementation of BlobWriter to write blobs to anything that /// implements std::io::Write. /// -pub struct WriteBlobWriter -where - W: std::io::Write, -{ +pub struct WriteBlobWriter { inner: W, offset: u64, } -impl WriteBlobWriter -where - W: std::io::Write, -{ +impl WriteBlobWriter { pub fn new(inner: W, start_offset: u64) -> Self { WriteBlobWriter { inner, From d4a61c67cbd5e9d13310c62bebdd9313e457ebc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Sat, 2 Sep 2023 11:08:25 +0200 Subject: [PATCH 06/11] Make write_all_at async fn --- pageserver/src/tenant/ephemeral_file.rs | 13 +++++++++---- pageserver/src/virtual_file.rs | 2 +- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 02ef7166e578..1edb86f8570a 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -127,10 +127,15 @@ impl EphemeralFile { self.off += n; src_remaining = &src_remaining[n..]; if self.off == PAGE_SZ { - match self.ephemeral_file.file.write_all_at( - &self.ephemeral_file.mutable_tail, - self.blknum as u64 * PAGE_SZ as u64, - ) { + match self + .ephemeral_file + .file + .write_all_at( + &self.ephemeral_file.mutable_tail, + self.blknum as u64 * PAGE_SZ as u64, + ) + .await + { Ok(_) => { // Pre-warm the page cache with what we just wrote. // This isn't necessary for coherency/correctness, but it's how we've always done it. diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 846da7d7030d..b74caf5dfc24 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -448,7 +448,7 @@ impl VirtualFile { } // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235 - pub fn write_all_at(&self, mut buf: &[u8], mut offset: u64) -> Result<(), Error> { + pub async fn write_all_at(&self, mut buf: &[u8], mut offset: u64) -> Result<(), Error> { while !buf.is_empty() { match self.write_at(buf, offset) { Ok(0) => { From d238793a4895f84c9c1260e42841b42b9f35dfed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Fri, 1 Sep 2023 00:19:40 +0200 Subject: [PATCH 07/11] Add MaybeVirtualFile and use it in tests --- pageserver/src/virtual_file.rs | 125 +++++++++++++++++++++------------ 1 file changed, 82 insertions(+), 43 deletions(-) diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index b74caf5dfc24..513cdd7d69ab 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -580,30 +580,67 @@ mod tests { use std::sync::Arc; use std::thread; - // Helper function to slurp contents of a file, starting at the current position, - // into a string - fn read_string(vfile: &mut FD) -> Result - where - FD: Read, - { - let mut buf = String::new(); - vfile.read_to_string(&mut buf)?; - Ok(buf) + enum MaybeVirtualFile { + VirtualFile(VirtualFile), + File(File), } - // Helper function to slurp a portion of a file into a string - fn read_string_at(vfile: &mut FD, pos: u64, len: usize) -> Result - where - FD: FileExt, - { - let mut buf = Vec::new(); - buf.resize(len, 0); - vfile.read_exact_at(&mut buf, pos)?; - Ok(String::from_utf8(buf).unwrap()) + impl MaybeVirtualFile { + fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> { + match self { + MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset), + MaybeVirtualFile::File(file) => file.read_exact_at(buf, offset), + } + } + async fn write_all_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> { + match self { + MaybeVirtualFile::VirtualFile(file) => file.write_all_at(buf, offset).await, + MaybeVirtualFile::File(file) => file.write_all_at(buf, offset), + } + } + fn seek(&mut self, pos: SeekFrom) -> Result { + match self { + MaybeVirtualFile::VirtualFile(file) => file.seek(pos), + MaybeVirtualFile::File(file) => file.seek(pos), + } + } + async fn write_all(&mut self, buf: &[u8]) -> Result<(), Error> { + match self { + MaybeVirtualFile::VirtualFile(file) => file.write_all(buf), + MaybeVirtualFile::File(file) => file.write_all(buf), + } + } + + // Helper function to slurp contents of a file, starting at the current position, + // into a string + async fn read_string(&mut self) -> Result { + use std::io::Read; + let mut buf = String::new(); + match self { + MaybeVirtualFile::VirtualFile(file) => { + let pos = file.seek(SeekFrom::Current(0))?; + let len = file.metadata()?.len().saturating_sub(pos); + let len_usize = len.try_into().unwrap(); + return self.read_string_at(pos, len_usize).await; + } + MaybeVirtualFile::File(file) => { + file.read_to_string(&mut buf)?; + } + } + Ok(buf) + } + + // Helper function to slurp a portion of a file into a string + async fn read_string_at(&mut self, pos: u64, len: usize) -> Result { + let mut buf = Vec::new(); + buf.resize(len, 0); + self.read_exact_at(&mut buf, pos)?; + Ok(String::from_utf8(buf).unwrap()) + } } - #[test] - fn test_virtual_files() -> Result<(), Error> { + #[tokio::test] + async fn test_virtual_files() -> Result<(), Error> { // The real work is done in the test_files() helper function. This // allows us to run the same set of tests against a native File, and // VirtualFile. We trust the native Files and wouldn't need to test them, @@ -612,21 +649,23 @@ mod tests { // native files, you will run out of file descriptors if the ulimit // is low enough.) test_files("virtual_files", |path, open_options| { - VirtualFile::open_with_options(path, open_options) + let vf = VirtualFile::open_with_options(path, open_options)?; + Ok(MaybeVirtualFile::VirtualFile(vf)) }) + .await } - #[test] - fn test_physical_files() -> Result<(), Error> { + #[tokio::test] + async fn test_physical_files() -> Result<(), Error> { test_files("physical_files", |path, open_options| { - open_options.open(path) + Ok(MaybeVirtualFile::File(open_options.open(path)?)) }) + .await } - fn test_files(testname: &str, openfunc: OF) -> Result<(), Error> + async fn test_files(testname: &str, openfunc: OF) -> Result<(), Error> where - FD: Read + Write + Seek + FileExt, - OF: Fn(&Path, &OpenOptions) -> Result, + OF: Fn(&Path, &OpenOptions) -> Result, { let testdir = crate::config::PageServerConf::test_repo_dir(testname); std::fs::create_dir_all(&testdir)?; @@ -636,36 +675,36 @@ mod tests { &path_a, OpenOptions::new().write(true).create(true).truncate(true), )?; - file_a.write_all(b"foobar")?; + file_a.write_all(b"foobar").await?; // cannot read from a file opened in write-only mode - assert!(read_string(&mut file_a).is_err()); + assert!(file_a.read_string().await.is_err()); // Close the file and re-open for reading let mut file_a = openfunc(&path_a, OpenOptions::new().read(true))?; // cannot write to a file opened in read-only mode - assert!(file_a.write(b"bar").is_err()); + assert!(file_a.write_all(b"bar").await.is_err()); // Try simple read - assert_eq!("foobar", read_string(&mut file_a)?); + assert_eq!("foobar", file_a.read_string().await?); // It's positioned at the EOF now. - assert_eq!("", read_string(&mut file_a)?); + assert_eq!("", file_a.read_string().await?); // Test seeks. assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1); - assert_eq!("oobar", read_string(&mut file_a)?); + assert_eq!("oobar", file_a.read_string().await?); assert_eq!(file_a.seek(SeekFrom::End(-2))?, 4); - assert_eq!("ar", read_string(&mut file_a)?); + assert_eq!("ar", file_a.read_string().await?); assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1); assert_eq!(file_a.seek(SeekFrom::Current(2))?, 3); - assert_eq!("bar", read_string(&mut file_a)?); + assert_eq!("bar", file_a.read_string().await?); assert_eq!(file_a.seek(SeekFrom::Current(-5))?, 1); - assert_eq!("oobar", read_string(&mut file_a)?); + assert_eq!("oobar", file_a.read_string().await?); // Test erroneous seeks to before byte 0 assert!(file_a.seek(SeekFrom::End(-7)).is_err()); @@ -673,7 +712,7 @@ mod tests { assert!(file_a.seek(SeekFrom::Current(-2)).is_err()); // the erroneous seek should have left the position unchanged - assert_eq!("oobar", read_string(&mut file_a)?); + assert_eq!("oobar", file_a.read_string().await?); // Create another test file, and try FileExt functions on it. let path_b = testdir.join("file_b"); @@ -685,10 +724,10 @@ mod tests { .create(true) .truncate(true), )?; - file_b.write_all_at(b"BAR", 3)?; - file_b.write_all_at(b"FOO", 0)?; + file_b.write_all_at(b"BAR", 3).await?; + file_b.write_all_at(b"FOO", 0).await?; - assert_eq!(read_string_at(&mut file_b, 2, 3)?, "OBA"); + assert_eq!(file_b.read_string_at(2, 3).await?, "OBA"); // Open a lot of files, enough to cause some evictions. (Or to be precise, // open the same file many times. The effect is the same.) @@ -699,7 +738,7 @@ mod tests { let mut vfiles = Vec::new(); for _ in 0..100 { let mut vfile = openfunc(&path_b, OpenOptions::new().read(true))?; - assert_eq!("FOOBAR", read_string(&mut vfile)?); + assert_eq!("FOOBAR", vfile.read_string().await?); vfiles.push(vfile); } @@ -708,13 +747,13 @@ mod tests { // The underlying file descriptor for 'file_a' should be closed now. Try to read // from it again. We left the file positioned at offset 1 above. - assert_eq!("oobar", read_string(&mut file_a)?); + assert_eq!("oobar", file_a.read_string().await?); // Check that all the other FDs still work too. Use them in random order for // good measure. vfiles.as_mut_slice().shuffle(&mut thread_rng()); for vfile in vfiles.iter_mut() { - assert_eq!("OOBAR", read_string_at(vfile, 1, 5)?); + assert_eq!("OOBAR", vfile.read_string_at(1, 5).await?); } Ok(()) From d04ee8c526a12bd9af71c0bcb483c75925dd9d81 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Fri, 1 Sep 2023 07:39:25 +0200 Subject: [PATCH 08/11] Make read_exact_at async fn --- pageserver/src/tenant/block_io.rs | 4 ++- pageserver/src/tenant/ephemeral_file.rs | 3 +- pageserver/src/virtual_file.rs | 45 +++++++++++-------------- 3 files changed, 24 insertions(+), 28 deletions(-) diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs index f50439593710..645ec81036b2 100644 --- a/pageserver/src/tenant/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -156,7 +156,9 @@ impl FileBlockReader { /// Read a page from the underlying file into given buffer. async fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> { assert!(buf.len() == PAGE_SZ); - self.file.read_exact_at(buf, blkno as u64 * PAGE_SZ as u64) + self.file + .read_exact_at(buf, blkno as u64 * PAGE_SZ as u64) + .await } /// Read a block. /// diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 1edb86f8570a..4c5fe424f3ca 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -87,7 +87,8 @@ impl EphemeralFile { let buf: &mut [u8] = write_guard.deref_mut(); debug_assert_eq!(buf.len(), PAGE_SZ); self.file - .read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)?; + .read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64) + .await?; write_guard.mark_valid(); // Swap for read lock diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 513cdd7d69ab..a048967f3208 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -427,7 +427,7 @@ impl VirtualFile { } // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135 - pub fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<(), Error> { + pub async fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<(), Error> { while !buf.is_empty() { match self.read_at(buf, offset) { Ok(0) => { @@ -578,7 +578,6 @@ mod tests { use rand::thread_rng; use rand::Rng; use std::sync::Arc; - use std::thread; enum MaybeVirtualFile { VirtualFile(VirtualFile), @@ -586,9 +585,9 @@ mod tests { } impl MaybeVirtualFile { - fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> { + async fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> { match self { - MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset), + MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset).await, MaybeVirtualFile::File(file) => file.read_exact_at(buf, offset), } } @@ -634,7 +633,7 @@ mod tests { async fn read_string_at(&mut self, pos: u64, len: usize) -> Result { let mut buf = Vec::new(); buf.resize(len, 0); - self.read_exact_at(&mut buf, pos)?; + self.read_exact_at(&mut buf, pos).await?; Ok(String::from_utf8(buf).unwrap()) } } @@ -788,28 +787,22 @@ mod tests { let files = Arc::new(files); // Launch many threads, and use the virtual files concurrently in random order. - let mut threads = Vec::new(); - for threadno in 0..THREADS { - let builder = - thread::Builder::new().name(format!("test_vfile_concurrency thread {}", threadno)); - + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(THREADS) + .thread_name("test_vfile_concurrency thread") + .build() + .unwrap(); + for _threadno in 0..THREADS { let files = files.clone(); - let thread = builder - .spawn(move || { - let mut buf = [0u8; SIZE]; - let mut rng = rand::thread_rng(); - for _ in 1..1000 { - let f = &files[rng.gen_range(0..files.len())]; - f.read_exact_at(&mut buf, 0).unwrap(); - assert!(buf == SAMPLE); - } - }) - .unwrap(); - threads.push(thread); - } - - for thread in threads { - thread.join().unwrap(); + rt.spawn(async move { + let mut buf = [0u8; SIZE]; + let mut rng = rand::rngs::OsRng; + for _ in 1..1000 { + let f = &files[rng.gen_range(0..files.len())]; + f.read_exact_at(&mut buf, 0).await.unwrap(); + assert!(buf == SAMPLE); + } + }); } Ok(()) From 3ea1e532946cb5947a2e4d2b85c039135e295ac2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Fri, 1 Sep 2023 08:13:47 +0200 Subject: [PATCH 09/11] Make read_at async fn --- pageserver/src/virtual_file.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index a048967f3208..21bdfbb24d5b 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -429,7 +429,7 @@ impl VirtualFile { // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135 pub async fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<(), Error> { while !buf.is_empty() { - match self.read_at(buf, offset) { + match self.read_at(buf, offset).await { Ok(0) => { return Err(Error::new( std::io::ErrorKind::UnexpectedEof, @@ -468,7 +468,7 @@ impl VirtualFile { Ok(()) } - pub fn read_at(&self, buf: &mut [u8], offset: u64) -> Result { + pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result { let result = self.with_file("read", |file| file.read_at(buf, offset))?; if let Ok(size) = result { STORAGE_IO_SIZE From e202451847d9dcf536e7dcd369598c65b350c379 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Mon, 4 Sep 2023 14:07:09 +0200 Subject: [PATCH 10/11] Use unwrap_err --- pageserver/src/virtual_file.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 21bdfbb24d5b..00bccc48d33b 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -677,13 +677,13 @@ mod tests { file_a.write_all(b"foobar").await?; // cannot read from a file opened in write-only mode - assert!(file_a.read_string().await.is_err()); + let _ = file_a.read_string().await.unwrap_err(); // Close the file and re-open for reading let mut file_a = openfunc(&path_a, OpenOptions::new().read(true))?; // cannot write to a file opened in read-only mode - assert!(file_a.write_all(b"bar").await.is_err()); + let _ = file_a.write_all(b"bar").await.unwrap_err(); // Try simple read assert_eq!("foobar", file_a.read_string().await?); From 021c0bc1e320dcbd04885f435aa02b4df48c53f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Mon, 4 Sep 2023 16:23:58 +0200 Subject: [PATCH 11/11] Introduce and use read_to_end instead This fixes the test failures --- pageserver/src/virtual_file.rs | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 00bccc48d33b..bfa71c2a0947 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -426,6 +426,22 @@ impl VirtualFile { Ok(self.pos) } + #[cfg(test)] + async fn read_to_end(&mut self, buf: &mut Vec) -> Result<(), Error> { + loop { + let mut tmp = [0; 128]; + match self.read_at(&mut tmp, self.pos).await { + Ok(0) => return Ok(()), + Ok(n) => { + self.pos += n as u64; + buf.extend_from_slice(&tmp[..n]); + } + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} + Err(e) => return Err(e), + } + } + } + // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135 pub async fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<(), Error> { while !buf.is_empty() { @@ -617,10 +633,9 @@ mod tests { let mut buf = String::new(); match self { MaybeVirtualFile::VirtualFile(file) => { - let pos = file.seek(SeekFrom::Current(0))?; - let len = file.metadata()?.len().saturating_sub(pos); - let len_usize = len.try_into().unwrap(); - return self.read_string_at(pos, len_usize).await; + let mut buf = Vec::new(); + file.read_to_end(&mut buf).await?; + return Ok(String::from_utf8(buf).unwrap()); } MaybeVirtualFile::File(file) => { file.read_to_string(&mut buf)?; @@ -631,8 +646,7 @@ mod tests { // Helper function to slurp a portion of a file into a string async fn read_string_at(&mut self, pos: u64, len: usize) -> Result { - let mut buf = Vec::new(); - buf.resize(len, 0); + let mut buf = vec![0; len]; self.read_exact_at(&mut buf, pos).await?; Ok(String::from_utf8(buf).unwrap()) }