diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 0b736a7f7ec7..b58b883ab666 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -18,8 +18,7 @@ use std::fs::{self, File, OpenOptions}; use std::io::{Error, ErrorKind, Seek, SeekFrom}; use std::os::unix::fs::FileExt; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; -use tokio::time::Instant; +use std::sync::{RwLock, RwLockWriteGuard}; use utils::fs_ext; /// @@ -112,7 +111,7 @@ impl OpenFiles { /// /// On return, we hold a lock on the slot, and its 'tag' has been updated /// recently_used has been set. It's all ready for reuse. - async fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard) { + fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard) { // // Run the clock algorithm to find a slot to replace. // @@ -144,7 +143,7 @@ impl OpenFiles { } retries += 1; } else { - slot_guard = slot.inner.write().await; + slot_guard = slot.inner.write().unwrap(); index = next; break; } @@ -155,7 +154,7 @@ impl OpenFiles { // old file. // if let Some(old_file) = slot_guard.file.take() { - // the normal path of dropping VirtualFile uses `Close`, use `CloseByReplace` here to + // the normal path of dropping VirtualFile uses "close", use "close-by-replace" here to // distinguish the two. STORAGE_IO_TIME_METRIC .get(StorageIoOperation::CloseByReplace) @@ -251,29 +250,6 @@ impl MaybeFatalIo for std::io::Result { } } -/// Observe duration for the given storage I/O operation -/// -/// Unlike `observe_closure_duration`, this supports async, -/// where "support" means that we measure wall clock time. -macro_rules! observe_duration { - ($op:expr, $($body:tt)*) => {{ - let instant = Instant::now(); - let result = $($body)*; - let elapsed = instant.elapsed().as_secs_f64(); - STORAGE_IO_TIME_METRIC - .get($op) - .observe(elapsed); - result - }} -} - -macro_rules! with_file { - ($this:expr, $op:expr, | $ident:ident | $($body:tt)*) => {{ - let $ident = $this.lock_file().await?; - observe_duration!($op, $($body)*) - }}; -} - impl VirtualFile { /// Open a file in read-only mode. Like File::open. pub async fn open(path: &Utf8Path) -> Result { @@ -310,9 +286,11 @@ impl VirtualFile { tenant_id = "*".to_string(); timeline_id = "*".to_string(); } - let (handle, mut slot_guard) = get_open_files().find_victim_slot().await; + let (handle, mut slot_guard) = get_open_files().find_victim_slot(); - let file = observe_duration!(StorageIoOperation::Open, open_options.open(path))?; + let file = STORAGE_IO_TIME_METRIC + .get(StorageIoOperation::Open) + .observe_closure_duration(|| open_options.open(path))?; // Strip all options other than read and write. // @@ -382,24 +360,22 @@ impl VirtualFile { /// Call File::sync_all() on the underlying File. pub async fn sync_all(&self) -> Result<(), Error> { - with_file!(self, StorageIoOperation::Fsync, |file| file - .as_ref() - .sync_all()) + self.with_file(StorageIoOperation::Fsync, |file| file.sync_all()) + .await? } pub async fn metadata(&self) -> Result { - with_file!(self, StorageIoOperation::Metadata, |file| file - .as_ref() - .metadata()) + self.with_file(StorageIoOperation::Metadata, |file| file.metadata()) + .await? } - /// Helper function internal to `VirtualFile` that looks up the underlying File, - /// opens it and evicts some other File if necessary. The passed parameter is - /// assumed to be a function available for the physical `File`. - /// - /// We are doing it via a macro as Rust doesn't support async closures that - /// take on parameters with lifetimes. - async fn lock_file(&self) -> Result, Error> { + /// Helper function that looks up the underlying File for this VirtualFile, + /// opening it and evicting some other File if necessary. It calls 'func' + /// with the physical File. + async fn with_file(&self, op: StorageIoOperation, mut func: F) -> Result + where + F: FnMut(&File) -> R, + { let open_files = get_open_files(); let mut handle_guard = { @@ -409,23 +385,27 @@ impl VirtualFile { // We only need to hold the handle lock while we read the current handle. If // another thread closes the file and recycles the slot for a different file, // we will notice that the handle we read is no longer valid and retry. - let mut handle = *self.handle.read().await; + let mut handle = *self.handle.read().unwrap(); loop { // Check if the slot contains our File { let slot = &open_files.slots[handle.index]; - let slot_guard = slot.inner.read().await; - if slot_guard.tag == handle.tag && slot_guard.file.is_some() { - // Found a cached file descriptor. - slot.recently_used.store(true, Ordering::Relaxed); - return Ok(FileGuard { slot_guard }); + let slot_guard = slot.inner.read().unwrap(); + if slot_guard.tag == handle.tag { + if let Some(file) = &slot_guard.file { + // Found a cached file descriptor. + slot.recently_used.store(true, Ordering::Relaxed); + return Ok(STORAGE_IO_TIME_METRIC + .get(op) + .observe_closure_duration(|| func(file))); + } } } // The slot didn't contain our File. We will have to open it ourselves, // but before that, grab a write lock on handle in the VirtualFile, so // that no other thread will try to concurrently open the same file. - let handle_guard = self.handle.write().await; + let handle_guard = self.handle.write().unwrap(); // If another thread changed the handle while we were not holding the lock, // then the handle might now be valid again. Loop back to retry. @@ -439,10 +419,17 @@ impl VirtualFile { // We need to open the file ourselves. The handle in the VirtualFile is // now locked in write-mode. Find a free slot to put it in. - let (handle, mut slot_guard) = open_files.find_victim_slot().await; + let (handle, mut slot_guard) = open_files.find_victim_slot(); // Open the physical file - let file = observe_duration!(StorageIoOperation::Open, self.open_options.open(&self.path))?; + let file = STORAGE_IO_TIME_METRIC + .get(StorageIoOperation::Open) + .observe_closure_duration(|| self.open_options.open(&self.path))?; + + // Perform the requested operation on it + let result = STORAGE_IO_TIME_METRIC + .get(op) + .observe_closure_duration(|| func(&file)); // Store the File in the slot and update the handle in the VirtualFile // to point to it. @@ -450,9 +437,7 @@ impl VirtualFile { *handle_guard = handle; - return Ok(FileGuard { - slot_guard: slot_guard.downgrade(), - }); + Ok(result) } pub fn remove(self) { @@ -467,9 +452,11 @@ impl VirtualFile { self.pos = offset; } SeekFrom::End(offset) => { - self.pos = with_file!(self, StorageIoOperation::Seek, |file| file - .as_ref() - .seek(SeekFrom::End(offset)))? + self.pos = self + .with_file(StorageIoOperation::Seek, |mut file| { + file.seek(SeekFrom::End(offset)) + }) + .await?? } SeekFrom::Current(offset) => { let pos = self.pos as i128 + offset as i128; @@ -557,9 +544,9 @@ impl VirtualFile { } pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result { - let result = with_file!(self, StorageIoOperation::Read, |file| file - .as_ref() - .read_at(buf, offset)); + let result = self + .with_file(StorageIoOperation::Read, |file| file.read_at(buf, offset)) + .await?; if let Ok(size) = result { STORAGE_IO_SIZE .with_label_values(&["read", &self.tenant_id, &self.timeline_id]) @@ -569,9 +556,9 @@ impl VirtualFile { } async fn write_at(&self, buf: &[u8], offset: u64) -> Result { - let result = with_file!(self, StorageIoOperation::Write, |file| file - .as_ref() - .write_at(buf, offset)); + let result = self + .with_file(StorageIoOperation::Write, |file| file.write_at(buf, offset)) + .await?; if let Ok(size) = result { STORAGE_IO_SIZE .with_label_values(&["write", &self.tenant_id, &self.timeline_id]) @@ -581,18 +568,6 @@ impl VirtualFile { } } -struct FileGuard<'a> { - slot_guard: RwLockReadGuard<'a, SlotInner>, -} - -impl<'a> AsRef for FileGuard<'a> { - fn as_ref(&self) -> &File { - // This unwrap is safe because we only create `FileGuard`s - // if we know that the file is Some. - self.slot_guard.file.as_ref().unwrap() - } -} - #[cfg(test)] impl VirtualFile { pub(crate) async fn read_blk( @@ -625,39 +600,20 @@ impl VirtualFile { impl Drop for VirtualFile { /// If a VirtualFile is dropped, close the underlying file if it was open. fn drop(&mut self) { - let handle = self.handle.get_mut(); - - fn clean_slot(slot: &Slot, mut slot_guard: RwLockWriteGuard<'_, SlotInner>, tag: u64) { - if slot_guard.tag == tag { - slot.recently_used.store(false, Ordering::Relaxed); - // there is also the `CloseByReplace` operation for closes done on eviction for - // comparison. - STORAGE_IO_TIME_METRIC - .get(StorageIoOperation::Close) - .observe_closure_duration(|| drop(slot_guard.file.take())); - } - } + let handle = self.handle.get_mut().unwrap(); - // We don't have async drop so we cannot directly await the lock here. - // Instead, first do a best-effort attempt at closing the underlying - // file descriptor by using `try_write`, and if that fails, spawn - // a tokio task to do it asynchronously: we just want it to be - // cleaned up eventually. - // Most of the time, the `try_lock` should succeed though, - // as we have `&mut self` access. In other words, if the slot - // is still occupied by our file, there should be no access from - // other I/O operations; the only other possible place to lock - // the slot is the lock algorithm looking for free slots. + // We could check with a read-lock first, to avoid waiting on an + // unrelated I/O. let slot = &get_open_files().slots[handle.index]; - if let Ok(slot_guard) = slot.inner.try_write() { - clean_slot(slot, slot_guard, handle.tag); - } else { - let tag = handle.tag; - tokio::spawn(async move { - let slot_guard = slot.inner.write().await; - clean_slot(slot, slot_guard, tag); - }); - }; + let mut slot_guard = slot.inner.write().unwrap(); + if slot_guard.tag == handle.tag { + slot.recently_used.store(false, Ordering::Relaxed); + // there is also operation "close-by-replace" for closes done on eviction for + // comparison. + STORAGE_IO_TIME_METRIC + .get(StorageIoOperation::Close) + .observe_closure_duration(|| drop(slot_guard.file.take())); + } } }