Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

revert recent VirtualFile asyncification changes #5291

Merged
merged 2 commits into from
Sep 12, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 63 additions & 107 deletions pageserver/src/virtual_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ use std::io::{Error, ErrorKind, Seek, SeekFrom};
use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::time::Instant;
use std::sync::{RwLock, RwLockWriteGuard};

///
/// A virtual file descriptor. You can use this just like std::fs::File, but internally
Expand Down Expand Up @@ -111,7 +110,7 @@ impl OpenFiles {
///
/// On return, we hold a lock on the slot, and its 'tag' has been updated
/// recently_used has been set. It's all ready for reuse.
async fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
//
// Run the clock algorithm to find a slot to replace.
//
Expand Down Expand Up @@ -143,7 +142,7 @@ impl OpenFiles {
}
retries += 1;
} else {
slot_guard = slot.inner.write().await;
slot_guard = slot.inner.write().unwrap();
index = next;
break;
}
Expand All @@ -154,7 +153,7 @@ impl OpenFiles {
// old file.
//
if let Some(old_file) = slot_guard.file.take() {
// the normal path of dropping VirtualFile uses `Close`, use `CloseByReplace` here to
// the normal path of dropping VirtualFile uses "close", use "close-by-replace" here to
// distinguish the two.
STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::CloseByReplace)
Expand Down Expand Up @@ -209,29 +208,6 @@ impl CrashsafeOverwriteError {
}
}

/// Observe duration for the given storage I/O operation
///
/// Unlike `observe_closure_duration`, this supports async,
/// where "support" means that we measure wall clock time.
macro_rules! observe_duration {
($op:expr, $($body:tt)*) => {{
let instant = Instant::now();
let result = $($body)*;
let elapsed = instant.elapsed().as_secs_f64();
STORAGE_IO_TIME_METRIC
.get($op)
.observe(elapsed);
result
}}
}

macro_rules! with_file {
($this:expr, $op:expr, | $ident:ident | $($body:tt)*) => {{
let $ident = $this.lock_file().await?;
observe_duration!($op, $($body)*)
}};
}

impl VirtualFile {
/// Open a file in read-only mode. Like File::open.
pub async fn open(path: &Path) -> Result<VirtualFile, std::io::Error> {
Expand Down Expand Up @@ -268,9 +244,11 @@ impl VirtualFile {
tenant_id = "*".to_string();
timeline_id = "*".to_string();
}
let (handle, mut slot_guard) = get_open_files().find_victim_slot().await;
let (handle, mut slot_guard) = get_open_files().find_victim_slot();

let file = observe_duration!(StorageIoOperation::Open, open_options.open(path))?;
let file = STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::Open)
.observe_closure_duration(|| open_options.open(path))?;

// Strip all options other than read and write.
//
Expand Down Expand Up @@ -353,24 +331,22 @@ impl VirtualFile {

/// Call File::sync_all() on the underlying File.
pub async fn sync_all(&self) -> Result<(), Error> {
with_file!(self, StorageIoOperation::Fsync, |file| file
.as_ref()
.sync_all())
self.with_file(StorageIoOperation::Fsync, |file| file.sync_all())
.await?
}

pub async fn metadata(&self) -> Result<fs::Metadata, Error> {
with_file!(self, StorageIoOperation::Metadata, |file| file
.as_ref()
.metadata())
self.with_file(StorageIoOperation::Metadata, |file| file.metadata())
.await?
}

/// Helper function internal to `VirtualFile` that looks up the underlying File,
/// opens it and evicts some other File if necessary. The passed parameter is
/// assumed to be a function available for the physical `File`.
///
/// We are doing it via a macro as Rust doesn't support async closures that
/// take on parameters with lifetimes.
async fn lock_file(&self) -> Result<FileGuard<'_>, Error> {
/// Helper function that looks up the underlying File for this VirtualFile,
/// opening it and evicting some other File if necessary. It calls 'func'
/// with the physical File.
async fn with_file<F, R>(&self, op: StorageIoOperation, mut func: F) -> Result<R, Error>
where
F: FnMut(&File) -> R,
{
let open_files = get_open_files();

let mut handle_guard = {
Expand All @@ -380,23 +356,27 @@ impl VirtualFile {
// We only need to hold the handle lock while we read the current handle. If
// another thread closes the file and recycles the slot for a different file,
// we will notice that the handle we read is no longer valid and retry.
let mut handle = *self.handle.read().await;
let mut handle = *self.handle.read().unwrap();
loop {
// Check if the slot contains our File
{
let slot = &open_files.slots[handle.index];
let slot_guard = slot.inner.read().await;
if slot_guard.tag == handle.tag && slot_guard.file.is_some() {
// Found a cached file descriptor.
slot.recently_used.store(true, Ordering::Relaxed);
return Ok(FileGuard { slot_guard });
let slot_guard = slot.inner.read().unwrap();
if slot_guard.tag == handle.tag {
if let Some(file) = &slot_guard.file {
// Found a cached file descriptor.
slot.recently_used.store(true, Ordering::Relaxed);
return Ok(STORAGE_IO_TIME_METRIC
.get(op)
.observe_closure_duration(|| func(file)));
}
}
}

// The slot didn't contain our File. We will have to open it ourselves,
// but before that, grab a write lock on handle in the VirtualFile, so
// that no other thread will try to concurrently open the same file.
let handle_guard = self.handle.write().await;
let handle_guard = self.handle.write().unwrap();

// If another thread changed the handle while we were not holding the lock,
// then the handle might now be valid again. Loop back to retry.
Expand All @@ -410,20 +390,25 @@ impl VirtualFile {

// We need to open the file ourselves. The handle in the VirtualFile is
// now locked in write-mode. Find a free slot to put it in.
let (handle, mut slot_guard) = open_files.find_victim_slot().await;
let (handle, mut slot_guard) = open_files.find_victim_slot();

// Open the physical file
let file = observe_duration!(StorageIoOperation::Open, self.open_options.open(&self.path))?;
let file = STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::Open)
.observe_closure_duration(|| self.open_options.open(&self.path))?;

// Perform the requested operation on it
let result = STORAGE_IO_TIME_METRIC
.get(op)
.observe_closure_duration(|| func(&file));

// Store the File in the slot and update the handle in the VirtualFile
// to point to it.
slot_guard.file.replace(file);

*handle_guard = handle;

return Ok(FileGuard {
slot_guard: slot_guard.downgrade(),
});
Ok(result)
}

pub fn remove(self) {
Expand All @@ -438,9 +423,11 @@ impl VirtualFile {
self.pos = offset;
}
SeekFrom::End(offset) => {
self.pos = with_file!(self, StorageIoOperation::Seek, |file| file
.as_ref()
.seek(SeekFrom::End(offset)))?
self.pos = self
.with_file(StorageIoOperation::Seek, |mut file| {
file.seek(SeekFrom::End(offset))
})
.await??
}
SeekFrom::Current(offset) => {
let pos = self.pos as i128 + offset as i128;
Expand Down Expand Up @@ -528,9 +515,9 @@ impl VirtualFile {
}

pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
let result = with_file!(self, StorageIoOperation::Read, |file| file
.as_ref()
.read_at(buf, offset));
let result = self
.with_file(StorageIoOperation::Read, |file| file.read_at(buf, offset))
.await?;
if let Ok(size) = result {
STORAGE_IO_SIZE
.with_label_values(&["read", &self.tenant_id, &self.timeline_id])
Expand All @@ -540,9 +527,9 @@ impl VirtualFile {
}

async fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
let result = with_file!(self, StorageIoOperation::Write, |file| file
.as_ref()
.write_at(buf, offset));
let result = self
.with_file(StorageIoOperation::Write, |file| file.write_at(buf, offset))
.await?;
if let Ok(size) = result {
STORAGE_IO_SIZE
.with_label_values(&["write", &self.tenant_id, &self.timeline_id])
Expand All @@ -552,18 +539,6 @@ impl VirtualFile {
}
}

struct FileGuard<'a> {
slot_guard: RwLockReadGuard<'a, SlotInner>,
}

impl<'a> AsRef<File> for FileGuard<'a> {
fn as_ref(&self) -> &File {
// This unwrap is safe because we only create `FileGuard`s
// if we know that the file is Some.
self.slot_guard.file.as_ref().unwrap()
}
}

#[cfg(test)]
impl VirtualFile {
pub(crate) async fn read_blk(
Expand Down Expand Up @@ -596,39 +571,20 @@ impl VirtualFile {
impl Drop for VirtualFile {
/// If a VirtualFile is dropped, close the underlying file if it was open.
fn drop(&mut self) {
let handle = self.handle.get_mut();

fn clean_slot(slot: &Slot, mut slot_guard: RwLockWriteGuard<'_, SlotInner>, tag: u64) {
if slot_guard.tag == tag {
slot.recently_used.store(false, Ordering::Relaxed);
// there is also the `CloseByReplace` operation for closes done on eviction for
// comparison.
STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::Close)
.observe_closure_duration(|| drop(slot_guard.file.take()));
}
}
let handle = self.handle.get_mut().unwrap();

// We don't have async drop so we cannot directly await the lock here.
// Instead, first do a best-effort attempt at closing the underlying
// file descriptor by using `try_write`, and if that fails, spawn
// a tokio task to do it asynchronously: we just want it to be
// cleaned up eventually.
// Most of the time, the `try_lock` should succeed though,
// as we have `&mut self` access. In other words, if the slot
// is still occupied by our file, there should be no access from
// other I/O operations; the only other possible place to lock
// the slot is the lock algorithm looking for free slots.
// We could check with a read-lock first, to avoid waiting on an
// unrelated I/O.
let slot = &get_open_files().slots[handle.index];
if let Ok(slot_guard) = slot.inner.try_write() {
clean_slot(slot, slot_guard, handle.tag);
} else {
let tag = handle.tag;
tokio::spawn(async move {
let slot_guard = slot.inner.write().await;
clean_slot(slot, slot_guard, tag);
});
};
let mut slot_guard = slot.inner.write().unwrap();
if slot_guard.tag == handle.tag {
slot.recently_used.store(false, Ordering::Relaxed);
// there is also operation "close-by-replace" for closes done on eviction for
// comparison.
STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::Close)
.observe_closure_duration(|| drop(slot_guard.file.take()));
}
}
}

Expand Down