From d6b1f028c9216d5ce6cc19130d725115ab117729 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 2 Oct 2023 16:01:11 +0000 Subject: [PATCH] Revert "revert recent VirtualFile asyncification changes (#5291)" This reverts commit ab1f37e90849f9d0e5cd1f4eb9d5d30a89eb4adc. --- pageserver/src/virtual_file.rs | 170 +++++++++++++++++++++------------ 1 file changed, 107 insertions(+), 63 deletions(-) diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index dfb8d397b4d5..6c9873c35cde 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -18,7 +18,8 @@ use std::io::{Error, ErrorKind, Seek, SeekFrom}; use std::os::unix::fs::FileExt; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use std::sync::{RwLock, RwLockWriteGuard}; +use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; +use tokio::time::Instant; /// /// A virtual file descriptor. You can use this just like std::fs::File, but internally @@ -110,7 +111,7 @@ impl OpenFiles { /// /// On return, we hold a lock on the slot, and its 'tag' has been updated /// recently_used has been set. It's all ready for reuse. - fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard) { + async fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard) { // // Run the clock algorithm to find a slot to replace. // @@ -142,7 +143,7 @@ impl OpenFiles { } retries += 1; } else { - slot_guard = slot.inner.write().unwrap(); + slot_guard = slot.inner.write().await; index = next; break; } @@ -153,7 +154,7 @@ impl OpenFiles { // old file. // if let Some(old_file) = slot_guard.file.take() { - // the normal path of dropping VirtualFile uses "close", use "close-by-replace" here to + // the normal path of dropping VirtualFile uses `Close`, use `CloseByReplace` here to // distinguish the two. STORAGE_IO_TIME_METRIC .get(StorageIoOperation::CloseByReplace) @@ -208,6 +209,29 @@ impl CrashsafeOverwriteError { } } +/// Observe duration for the given storage I/O operation +/// +/// Unlike `observe_closure_duration`, this supports async, +/// where "support" means that we measure wall clock time. +macro_rules! observe_duration { + ($op:expr, $($body:tt)*) => {{ + let instant = Instant::now(); + let result = $($body)*; + let elapsed = instant.elapsed().as_secs_f64(); + STORAGE_IO_TIME_METRIC + .get($op) + .observe(elapsed); + result + }} +} + +macro_rules! with_file { + ($this:expr, $op:expr, | $ident:ident | $($body:tt)*) => {{ + let $ident = $this.lock_file().await?; + observe_duration!($op, $($body)*) + }}; +} + impl VirtualFile { /// Open a file in read-only mode. Like File::open. pub async fn open(path: &Path) -> Result { @@ -244,11 +268,9 @@ impl VirtualFile { tenant_id = "*".to_string(); timeline_id = "*".to_string(); } - let (handle, mut slot_guard) = get_open_files().find_victim_slot(); + let (handle, mut slot_guard) = get_open_files().find_victim_slot().await; - let file = STORAGE_IO_TIME_METRIC - .get(StorageIoOperation::Open) - .observe_closure_duration(|| open_options.open(path))?; + let file = observe_duration!(StorageIoOperation::Open, open_options.open(path))?; // Strip all options other than read and write. // @@ -331,22 +353,24 @@ impl VirtualFile { /// Call File::sync_all() on the underlying File. pub async fn sync_all(&self) -> Result<(), Error> { - self.with_file(StorageIoOperation::Fsync, |file| file.sync_all()) - .await? + with_file!(self, StorageIoOperation::Fsync, |file| file + .as_ref() + .sync_all()) } pub async fn metadata(&self) -> Result { - self.with_file(StorageIoOperation::Metadata, |file| file.metadata()) - .await? + with_file!(self, StorageIoOperation::Metadata, |file| file + .as_ref() + .metadata()) } - /// Helper function that looks up the underlying File for this VirtualFile, - /// opening it and evicting some other File if necessary. It calls 'func' - /// with the physical File. - async fn with_file(&self, op: StorageIoOperation, mut func: F) -> Result - where - F: FnMut(&File) -> R, - { + /// Helper function internal to `VirtualFile` that looks up the underlying File, + /// opens it and evicts some other File if necessary. The passed parameter is + /// assumed to be a function available for the physical `File`. + /// + /// We are doing it via a macro as Rust doesn't support async closures that + /// take on parameters with lifetimes. + async fn lock_file(&self) -> Result, Error> { let open_files = get_open_files(); let mut handle_guard = { @@ -356,27 +380,23 @@ impl VirtualFile { // We only need to hold the handle lock while we read the current handle. If // another thread closes the file and recycles the slot for a different file, // we will notice that the handle we read is no longer valid and retry. - let mut handle = *self.handle.read().unwrap(); + let mut handle = *self.handle.read().await; loop { // Check if the slot contains our File { let slot = &open_files.slots[handle.index]; - let slot_guard = slot.inner.read().unwrap(); - if slot_guard.tag == handle.tag { - if let Some(file) = &slot_guard.file { - // Found a cached file descriptor. - slot.recently_used.store(true, Ordering::Relaxed); - return Ok(STORAGE_IO_TIME_METRIC - .get(op) - .observe_closure_duration(|| func(file))); - } + let slot_guard = slot.inner.read().await; + if slot_guard.tag == handle.tag && slot_guard.file.is_some() { + // Found a cached file descriptor. + slot.recently_used.store(true, Ordering::Relaxed); + return Ok(FileGuard { slot_guard }); } } // The slot didn't contain our File. We will have to open it ourselves, // but before that, grab a write lock on handle in the VirtualFile, so // that no other thread will try to concurrently open the same file. - let handle_guard = self.handle.write().unwrap(); + let handle_guard = self.handle.write().await; // If another thread changed the handle while we were not holding the lock, // then the handle might now be valid again. Loop back to retry. @@ -390,17 +410,10 @@ impl VirtualFile { // We need to open the file ourselves. The handle in the VirtualFile is // now locked in write-mode. Find a free slot to put it in. - let (handle, mut slot_guard) = open_files.find_victim_slot(); + let (handle, mut slot_guard) = open_files.find_victim_slot().await; // Open the physical file - let file = STORAGE_IO_TIME_METRIC - .get(StorageIoOperation::Open) - .observe_closure_duration(|| self.open_options.open(&self.path))?; - - // Perform the requested operation on it - let result = STORAGE_IO_TIME_METRIC - .get(op) - .observe_closure_duration(|| func(&file)); + let file = observe_duration!(StorageIoOperation::Open, self.open_options.open(&self.path))?; // Store the File in the slot and update the handle in the VirtualFile // to point to it. @@ -408,7 +421,9 @@ impl VirtualFile { *handle_guard = handle; - Ok(result) + return Ok(FileGuard { + slot_guard: slot_guard.downgrade(), + }); } pub fn remove(self) { @@ -423,11 +438,9 @@ impl VirtualFile { self.pos = offset; } SeekFrom::End(offset) => { - self.pos = self - .with_file(StorageIoOperation::Seek, |mut file| { - file.seek(SeekFrom::End(offset)) - }) - .await?? + self.pos = with_file!(self, StorageIoOperation::Seek, |file| file + .as_ref() + .seek(SeekFrom::End(offset)))? } SeekFrom::Current(offset) => { let pos = self.pos as i128 + offset as i128; @@ -515,9 +528,9 @@ impl VirtualFile { } pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result { - let result = self - .with_file(StorageIoOperation::Read, |file| file.read_at(buf, offset)) - .await?; + let result = with_file!(self, StorageIoOperation::Read, |file| file + .as_ref() + .read_at(buf, offset)); if let Ok(size) = result { STORAGE_IO_SIZE .with_label_values(&["read", &self.tenant_id, &self.timeline_id]) @@ -527,9 +540,9 @@ impl VirtualFile { } async fn write_at(&self, buf: &[u8], offset: u64) -> Result { - let result = self - .with_file(StorageIoOperation::Write, |file| file.write_at(buf, offset)) - .await?; + let result = with_file!(self, StorageIoOperation::Write, |file| file + .as_ref() + .write_at(buf, offset)); if let Ok(size) = result { STORAGE_IO_SIZE .with_label_values(&["write", &self.tenant_id, &self.timeline_id]) @@ -539,6 +552,18 @@ impl VirtualFile { } } +struct FileGuard<'a> { + slot_guard: RwLockReadGuard<'a, SlotInner>, +} + +impl<'a> AsRef for FileGuard<'a> { + fn as_ref(&self) -> &File { + // This unwrap is safe because we only create `FileGuard`s + // if we know that the file is Some. + self.slot_guard.file.as_ref().unwrap() + } +} + #[cfg(test)] impl VirtualFile { pub(crate) async fn read_blk( @@ -571,20 +596,39 @@ impl VirtualFile { impl Drop for VirtualFile { /// If a VirtualFile is dropped, close the underlying file if it was open. fn drop(&mut self) { - let handle = self.handle.get_mut().unwrap(); + let handle = self.handle.get_mut(); + + fn clean_slot(slot: &Slot, mut slot_guard: RwLockWriteGuard<'_, SlotInner>, tag: u64) { + if slot_guard.tag == tag { + slot.recently_used.store(false, Ordering::Relaxed); + // there is also the `CloseByReplace` operation for closes done on eviction for + // comparison. + STORAGE_IO_TIME_METRIC + .get(StorageIoOperation::Close) + .observe_closure_duration(|| drop(slot_guard.file.take())); + } + } - // We could check with a read-lock first, to avoid waiting on an - // unrelated I/O. + // We don't have async drop so we cannot directly await the lock here. + // Instead, first do a best-effort attempt at closing the underlying + // file descriptor by using `try_write`, and if that fails, spawn + // a tokio task to do it asynchronously: we just want it to be + // cleaned up eventually. + // Most of the time, the `try_lock` should succeed though, + // as we have `&mut self` access. In other words, if the slot + // is still occupied by our file, there should be no access from + // other I/O operations; the only other possible place to lock + // the slot is the lock algorithm looking for free slots. let slot = &get_open_files().slots[handle.index]; - let mut slot_guard = slot.inner.write().unwrap(); - if slot_guard.tag == handle.tag { - slot.recently_used.store(false, Ordering::Relaxed); - // there is also operation "close-by-replace" for closes done on eviction for - // comparison. - STORAGE_IO_TIME_METRIC - .get(StorageIoOperation::Close) - .observe_closure_duration(|| drop(slot_guard.file.take())); - } + if let Ok(slot_guard) = slot.inner.try_write() { + clean_slot(slot, slot_guard, handle.tag); + } else { + let tag = handle.tag; + tokio::spawn(async move { + let slot_guard = slot.inner.write().await; + clean_slot(slot, slot_guard, tag); + }); + }; } }