Skip to content

Commit

Permalink
use tokio_epoll_uring for read path
Browse files Browse the repository at this point in the history
  • Loading branch information
problame committed Nov 20, 2023
1 parent a4e6f0c commit 51b26b1
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 49 deletions.
78 changes: 68 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pageserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ enum-map.workspace = true
enumset.workspace = true
strum.workspace = true
strum_macros.workspace = true
#tokio-epoll-uring = { path = "../../tokio-epoll-uring/tokio-epoll-uring" }
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" }

[dev-dependencies]
criterion.workspace = true
Expand Down
14 changes: 9 additions & 5 deletions pageserver/src/tenant/block_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
use super::ephemeral_file::EphemeralFile;
use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
use crate::context::RequestContext;
use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ};
use crate::page_cache::{self, PageReadGuard, PageWriteGuard, ReadBufResult, PAGE_SZ};
use crate::virtual_file::VirtualFile;
use bytes::Bytes;
use std::ops::{Deref, DerefMut};
use std::ops::Deref;

/// This is implemented by anything that can read 8 kB (PAGE_SZ)
/// blocks, using the page cache
Expand Down Expand Up @@ -169,7 +169,11 @@ impl FileBlockReader {
}

/// Read a page from the underlying file into given buffer.
async fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> {
async fn fill_buffer(
&self,
buf: PageWriteGuard<'static>,
blkno: u32,
) -> Result<PageWriteGuard<'static>, std::io::Error> {
assert!(buf.len() == PAGE_SZ);
self.file
.read_exact_at(buf, blkno as u64 * PAGE_SZ as u64)
Expand All @@ -196,9 +200,9 @@ impl FileBlockReader {
)
})? {
ReadBufResult::Found(guard) => Ok(guard.into()),
ReadBufResult::NotFound(mut write_guard) => {
ReadBufResult::NotFound(write_guard) => {
// Read the page from disk into the buffer
self.fill_buffer(write_guard.deref_mut(), blknum).await?;
let write_guard = self.fill_buffer(write_guard, blknum).await?;
Ok(write_guard.mark_valid().into())
}
}
Expand Down
9 changes: 4 additions & 5 deletions pageserver/src/tenant/ephemeral_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,10 @@ impl EphemeralFile {
page_cache::ReadBufResult::Found(guard) => {
return Ok(BlockLease::PageReadGuard(guard))
}
page_cache::ReadBufResult::NotFound(mut write_guard) => {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
self.file
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
page_cache::ReadBufResult::NotFound(write_guard) => {
let write_guard = self
.file
.read_exact_at(write_guard, blknum as u64 * PAGE_SZ as u64)
.await?;
let read_guard = write_guard.mark_valid();
return Ok(BlockLease::PageReadGuard(read_guard));
Expand Down
83 changes: 54 additions & 29 deletions pageserver/src/virtual_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@
//! src/backend/storage/file/fd.c
//!
use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC};
use crate::page_cache::PageWriteGuard;
use crate::tenant::TENANTS_SEGMENT_NAME;
use camino::{Utf8Path, Utf8PathBuf};
use once_cell::sync::OnceCell;
use std::fs::{self, File, OpenOptions};
use std::io::{Error, ErrorKind, Seek, SeekFrom};
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd};
use std::os::unix::fs::FileExt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
Expand Down Expand Up @@ -399,7 +401,7 @@ impl VirtualFile {
///
/// We are doing it via a macro as Rust doesn't support async closures that
/// take on parameters with lifetimes.
async fn lock_file(&self) -> Result<FileGuard<'_>, Error> {
async fn lock_file(&self) -> Result<FileGuard<'static>, Error> {
let open_files = get_open_files();

let mut handle_guard = {
Expand Down Expand Up @@ -489,24 +491,59 @@ impl VirtualFile {
}

// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
pub async fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<(), Error> {
while !buf.is_empty() {
match self.read_at(buf, offset).await {
Ok(0) => {
return Err(Error::new(
std::io::ErrorKind::UnexpectedEof,
"failed to fill whole buffer",
))
}
Ok(n) => {
buf = &mut buf[n..];
offset += n as u64;
}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
pub async fn read_exact_at(
&self,
write_guard: PageWriteGuard<'static>,
offset: u64,
) -> Result<PageWriteGuard<'static>, Error> {
let file_guard: FileGuard<'static> = self.lock_file().await?;

let system = tokio_epoll_uring::thread_local_system().await;
struct PageWriteGuardBuf {
buf: PageWriteGuard<'static>,
init_up_to: usize,
}
unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf {
fn stable_ptr(&self) -> *const u8 {
self.buf.as_ptr()
}
fn bytes_init(&self) -> usize {
self.init_up_to
}
fn bytes_total(&self) -> usize {
self.buf.len()
}
}
Ok(())
unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf {
fn stable_mut_ptr(&mut self) -> *mut u8 {
self.buf.as_mut_ptr()
}

unsafe fn set_init(&mut self, pos: usize) {
assert!(pos <= self.buf.len());
self.init_up_to = pos;
}
}
let buf = PageWriteGuardBuf {
buf: write_guard,
init_up_to: 0,
};
let owned_fd = unsafe { OwnedFd::from_raw_fd(file_guard.as_ref().as_raw_fd()) };
let guard = scopeguard::guard(file_guard, |_| {
panic!("must not drop future while operation is ongoing (todo: pass file_guard to tokio_epoll_uring to aovid this)")
});
let ((owned_fd, buf), res) = system.read(owned_fd, offset, buf).await;
let _ = OwnedFd::into_raw_fd(owned_fd);
let _ = scopeguard::ScopeGuard::into_inner(guard);
let PageWriteGuardBuf {
buf: write_guard,
init_up_to,
} = buf;
if let Ok(num_read) = res {
assert!(init_up_to == num_read); // TODO need to deal with short reads here
}
res.map(|_| write_guard)
.map_err(|e| Error::new(ErrorKind::Other, e))
}

// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
Expand Down Expand Up @@ -556,18 +593,6 @@ impl VirtualFile {
Ok(n)
}

pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
let result = with_file!(self, StorageIoOperation::Read, |file| file
.as_ref()
.read_at(buf, offset));
if let Ok(size) = result {
STORAGE_IO_SIZE
.with_label_values(&["read", &self.tenant_id, &self.timeline_id])
.add(size as i64);
}
result
}

async fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
let result = with_file!(self, StorageIoOperation::Write, |file| file
.as_ref()
Expand Down

0 comments on commit 51b26b1

Please sign in to comment.