refactor(owned_buffer_io::util::size_tracking_writer): make generic over underlying writer #7483

Merged
9 changes: 5 additions & 4 deletions pageserver/src/tenant/remote_timeline_client/download.rs
@@ -182,6 +182,7 @@ async fn download_object<'a>(
#[cfg(target_os = "linux")]
crate::virtual_file::io_engine::IoEngine::TokioEpollUring => {
use crate::virtual_file::owned_buffers_io::{self, util::size_tracking_writer};
use bytes::BytesMut;
async {
let destination_file = VirtualFile::create(dst_path)
.await
@@ -194,10 +195,10 @@
// There's chunks_vectored() on the stream.
let (bytes_amount, destination_file) = async {
let size_tracking = size_tracking_writer::Writer::new(destination_file);
let mut buffered = owned_buffers_io::write::BufferedWriter::<
{ super::BUFFER_SIZE },
_,
>::new(size_tracking);
let mut buffered = owned_buffers_io::write::BufferedWriter::<BytesMut, _>::new(
size_tracking,
BytesMut::with_capacity(super::BUFFER_SIZE),
);
while let Some(res) =
futures::StreamExt::next(&mut download.download_stream).await
{
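For orientation (the hunk above is truncated by the diff viewer), here is a minimal sketch of the new call-site pattern: the caller now supplies the IO buffer as a `BytesMut` of the desired capacity instead of baking the size into the type as a const generic. The helper name `copy_chunks_sketch`, the local `BUFFER_SIZE` constant, and the `Vec<Vec<u8>>` stand-in for the download stream are hypothetical; module paths are the crate-internal ones shown in this diff.

```rust
// Sketch only: assumes the crate-internal modules from this PR and replaces
// the remote-storage download stream with plain Vec<u8> chunks.
use bytes::BytesMut;
use tokio_epoll_uring::BoundedBuf; // for .slice_full()

use crate::virtual_file::owned_buffers_io::{
    util::size_tracking_writer,
    write::{BufferedWriter, OwnedAsyncWriter},
};

async fn copy_chunks_sketch<W: OwnedAsyncWriter>(
    dst: W,
    chunks: Vec<Vec<u8>>,
) -> std::io::Result<()> {
    const BUFFER_SIZE: usize = 64 * 1024; // stand-in for super::BUFFER_SIZE

    // count bytes as they reach the destination writer
    let size_tracking = size_tracking_writer::Writer::new(dst);
    // batch small writes through a caller-provided BytesMut buffer
    let mut buffered = BufferedWriter::<BytesMut, _>::new(
        size_tracking,
        BytesMut::with_capacity(BUFFER_SIZE),
    );
    for chunk in chunks {
        // write_buffered takes ownership of the slice and hands it back
        let (_nwritten, _chunk) = buffered.write_buffered(chunk.slice_full()).await?;
    }
    // In the real code, the buffered writer is flushed and unwrapped here so that
    // size_tracking.into_inner() can yield (bytes_amount, destination_file);
    // that step is elided in this sketch.
    Ok(())
}
```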
12 changes: 12 additions & 0 deletions pageserver/src/virtual_file.rs
@@ -32,6 +32,7 @@ pub use io_engine::feature_test as io_engine_feature_test;
pub use io_engine::FeatureTestResult as IoEngineFeatureTestResult;
mod metadata;
mod open_options;
use self::owned_buffers_io::write::OwnedAsyncWriter;
pub(crate) use io_engine::IoEngineKind;
pub(crate) use metadata::Metadata;
pub(crate) use open_options::*;
@@ -1083,6 +1084,17 @@ impl Drop for VirtualFile {
}
}

impl OwnedAsyncWriter for VirtualFile {
#[inline(always)]
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
buf: B,
) -> std::io::Result<(usize, B::Buf)> {
let (buf, res) = VirtualFile::write_all(self, buf).await;
res.map(move |v| (v, buf))
}
}

impl OpenFiles {
fn new(num_slots: usize) -> OpenFiles {
let mut slots = Box::new(Vec::with_capacity(num_slots));
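For reference, the `OwnedAsyncWriter` trait that `VirtualFile` now implements is only shown truncated in the `write.rs` hunk further down. Reconstructed from the impl signatures in this diff, it is approximately:

```rust
// Approximate reconstruction from the impl blocks in this diff; the actual
// definition lives in pageserver/src/virtual_file/owned_buffers_io/write.rs.
use tokio_epoll_uring::{BoundedBuf, IoBuf};

pub trait OwnedAsyncWriter {
    async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
        &mut self,
        buf: B,
    ) -> std::io::Result<(usize, B::Buf)>;
}
```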
pageserver/src/virtual_file/owned_buffers_io/util/size_tracking_writer.rs
@@ -1,33 +1,36 @@
use crate::virtual_file::{owned_buffers_io::write::OwnedAsyncWriter, VirtualFile};
use crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter;
use tokio_epoll_uring::{BoundedBuf, IoBuf};

pub struct Writer {
dst: VirtualFile,
pub struct Writer<W> {
dst: W,
bytes_amount: u64,
}

impl Writer {
pub fn new(dst: VirtualFile) -> Self {
impl<W> Writer<W> {
pub fn new(dst: W) -> Self {
Self {
dst,
bytes_amount: 0,
}
}

/// Returns the wrapped writer as well as the number of bytes
/// that were written to it through this object.
pub fn into_inner(self) -> (u64, VirtualFile) {
pub fn into_inner(self) -> (u64, W) {
(self.bytes_amount, self.dst)
}
}

impl OwnedAsyncWriter for Writer {
impl<W> OwnedAsyncWriter for Writer<W>
where
W: OwnedAsyncWriter,
{
#[inline(always)]
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
buf: B,
) -> std::io::Result<(usize, B::Buf)> {
let (buf, res) = self.dst.write_all(buf).await;
let nwritten = res?;
let (nwritten, buf) = self.dst.write_all(buf).await?;
self.bytes_amount += u64::try_from(nwritten).unwrap();
Ok((nwritten, buf))
}
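Since `Writer` is now generic over the underlying writer, it composes with any `OwnedAsyncWriter`, not just `VirtualFile`. A minimal sketch (the helper name is hypothetical; it assumes the crate-internal modules from this diff and the `Vec<u8>` impl added in `write.rs` below):

```rust
use tokio_epoll_uring::BoundedBuf; // for .slice_full()

use crate::virtual_file::owned_buffers_io::{
    util::size_tracking_writer, write::OwnedAsyncWriter,
};

async fn size_tracking_over_vec_sketch() -> std::io::Result<()> {
    // an in-memory Vec<u8> sink stands in for a VirtualFile
    let mut writer = size_tracking_writer::Writer::new(Vec::<u8>::new());
    let (_nwritten, _buf) = writer.write_all(b"hello".to_vec().slice_full()).await?;
    let (bytes_amount, sink) = writer.into_inner();
    // the wrapper counted exactly the bytes that reached the sink
    assert_eq!(bytes_amount, sink.len() as u64);
    assert_eq!(sink, b"hello".to_vec());
    Ok(())
}
```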
147 changes: 105 additions & 42 deletions pageserver/src/virtual_file/owned_buffers_io/write.rs
@@ -10,39 +10,40 @@ pub trait OwnedAsyncWriter {
) -> std::io::Result<(usize, B::Buf)>;
}

/// A wrapper around an [`OwnedAsyncWriter`] that batches smaller writes
/// into `BUFFER_SIZE`-sized writes.
/// A wrapper around an [`OwnedAsyncWriter`] that uses a [`Buffer`] to batch
/// small writes into larger writes of size [`Buffer::cap`].
///
/// # Passthrough Of Large Writes
///
/// Buffered writes larger than the `BUFFER_SIZE` cause the internal
/// buffer to be flushed, even if it is not full yet. Then, the large
/// buffered write is passed through to the underlying [`OwnedAsyncWriter`].
/// Calls to [`BufferedWriter::write_buffered`] that are larger than [`Buffer::cap`]
/// cause the internal buffer to be flushed prematurely so that the large
/// buffered write is passed through to the underlying [`OwnedAsyncWriter`].
///
/// This pass-through is generally beneficial for throughput, but if
/// the storage backend of the [`OwnedAsyncWriter`] is a shared resource,
/// unlimited large writes may cause latency or fairness issues.
///
/// In such cases, a different implementation that always buffers in memory
/// may be preferable.
pub struct BufferedWriter<const BUFFER_SIZE: usize, W> {
pub struct BufferedWriter<B, W> {
writer: W,
// invariant: always remains Some(buf)
// with buf.capacity() == BUFFER_SIZE except
// - while IO is ongoing => goes back to Some() once the IO completed successfully
// - after an IO error => stays `None` forever
// In these exceptional cases, it's `None`.
buf: Option<BytesMut>,
/// invariant: always remains Some(buf) except
/// - while IO is ongoing => goes back to Some() once the IO completed successfully
/// - after an IO error => stays `None` forever
/// In these exceptional cases, it's `None`.
buf: Option<B>,
}

impl<const BUFFER_SIZE: usize, W> BufferedWriter<BUFFER_SIZE, W>
impl<B, Buf, W> BufferedWriter<B, W>
where
B: Buffer<IoBuf = Buf> + Send,
Buf: IoBuf + Send,
W: OwnedAsyncWriter,
{
pub fn new(writer: W) -> Self {
pub fn new(writer: W, buf: B) -> Self {
Self {
writer,
buf: Some(BytesMut::with_capacity(BUFFER_SIZE)),
buf: Some(buf),
}
}

@@ -53,61 +54,121 @@ where
Ok(writer)
}

pub async fn write_buffered<B: IoBuf>(&mut self, chunk: Slice<B>) -> std::io::Result<()>
#[inline(always)]
fn buf(&self) -> &B {
self.buf
.as_ref()
.expect("must not use after we returned an error")
}

pub async fn write_buffered<S: IoBuf>(&mut self, chunk: Slice<S>) -> std::io::Result<(usize, S)>
where
B: IoBuf + Send,
S: IoBuf + Send,
{
let chunk_len = chunk.len();
// avoid memcpy for the middle of the chunk
if chunk.len() >= BUFFER_SIZE {
if chunk.len() >= self.buf().cap() {
self.flush().await?;
// do a big write, bypassing `buf`
assert_eq!(
self.buf
.as_ref()
.expect("must not use after an error")
.len(),
.pending(),
0
);
let chunk_len = chunk.len();
let (nwritten, chunk) = self.writer.write_all(chunk).await?;
assert_eq!(nwritten, chunk_len);
drop(chunk);
return Ok(());
return Ok((nwritten, chunk));
}
// in-memory copy the < BUFFER_SIZED tail of the chunk
assert!(chunk.len() < BUFFER_SIZE);
let mut chunk = &chunk[..];
while !chunk.is_empty() {
assert!(chunk.len() < self.buf().cap());
let mut slice = &chunk[..];
while !slice.is_empty() {
let buf = self.buf.as_mut().expect("must not use after an error");
let need = BUFFER_SIZE - buf.len();
let have = chunk.len();
let need = buf.cap() - buf.pending();
let have = slice.len();
let n = std::cmp::min(need, have);
buf.extend_from_slice(&chunk[..n]);
chunk = &chunk[n..];
if buf.len() >= BUFFER_SIZE {
assert_eq!(buf.len(), BUFFER_SIZE);
buf.extend_from_slice(&slice[..n]);
slice = &slice[n..];
if buf.pending() >= buf.cap() {
assert_eq!(buf.pending(), buf.cap());
self.flush().await?;
}
}
assert!(chunk.is_empty(), "by now we should have drained the chunk");
Ok(())
assert!(slice.is_empty(), "by now we should have drained the chunk");
Ok((chunk_len, chunk.into_inner()))
}

async fn flush(&mut self) -> std::io::Result<()> {
let buf = self.buf.take().expect("must not use after an error");
if buf.is_empty() {
let buf_len = buf.pending();
if buf_len == 0 {
self.buf = Some(buf);
return std::io::Result::Ok(());
return Ok(());
}
let buf_len = buf.len();
let (nwritten, mut buf) = self.writer.write_all(buf).await?;
let (nwritten, io_buf) = self.writer.write_all(buf.flush()).await?;
assert_eq!(nwritten, buf_len);
buf.clear();
self.buf = Some(buf);
self.buf = Some(Buffer::reuse_after_flush(io_buf));
Ok(())
}
}

/// A [`Buffer`] is used by [`BufferedWriter`] to batch smaller writes into larger ones.
pub trait Buffer {
type IoBuf: IoBuf;

/// Capacity of the buffer. Must not change over the lifetime of `self`.
fn cap(&self) -> usize;

/// Add data to the buffer.
/// Panics if there is not enough room to accommodate `other`'s content, i.e.,
/// panics if `other.len() > self.cap() - self.pending()`.
fn extend_from_slice(&mut self, other: &[u8]);

/// Number of bytes in the buffer.
fn pending(&self) -> usize;

/// Turns `self` into a [`tokio_epoll_uring::Slice`] of the pending data
/// so we can use [`tokio_epoll_uring`] to write it to disk.
fn flush(self) -> Slice<Self::IoBuf>;

/// After the write to disk is done and we have gotten back the slice,
/// [`BufferedWriter`] uses this method to re-use the io buffer.
fn reuse_after_flush(iobuf: Self::IoBuf) -> Self;
}

impl Buffer for BytesMut {
type IoBuf = BytesMut;

#[inline(always)]
fn cap(&self) -> usize {
self.capacity()
}

fn extend_from_slice(&mut self, other: &[u8]) {
BytesMut::extend_from_slice(self, other)
}

#[inline(always)]
fn pending(&self) -> usize {
self.len()
}

fn flush(self) -> Slice<BytesMut> {
if self.is_empty() {
return self.slice_full();
}
let len = self.len();
self.slice(0..len)
}

fn reuse_after_flush(mut iobuf: BytesMut) -> Self {
iobuf.clear();
iobuf
}
}

impl OwnedAsyncWriter for Vec<u8> {
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
@@ -125,6 +186,8 @@ impl OwnedAsyncWriter for Vec<u8> {

#[cfg(test)]
mod tests {
use bytes::BytesMut;

use super::*;

#[derive(Default)]
@@ -158,7 +221,7 @@
#[tokio::test]
async fn test_buffered_writes_only() -> std::io::Result<()> {
let recorder = RecorderWriter::default();
let mut writer = BufferedWriter::<2, _>::new(recorder);
let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2));
write!(writer, b"a");
write!(writer, b"b");
write!(writer, b"c");
@@ -175,7 +238,7 @@
#[tokio::test]
async fn test_passthrough_writes_only() -> std::io::Result<()> {
let recorder = RecorderWriter::default();
let mut writer = BufferedWriter::<2, _>::new(recorder);
let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2));
write!(writer, b"abc");
write!(writer, b"de");
write!(writer, b"");
@@ -191,7 +254,7 @@
#[tokio::test]
async fn test_passthrough_write_with_nonempty_buffer() -> std::io::Result<()> {
let recorder = RecorderWriter::default();
let mut writer = BufferedWriter::<2, _>::new(recorder);
let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2));
write!(writer, b"a");
write!(writer, b"bc");
write!(writer, b"d");
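To close, a sketch of the buffering/passthrough behavior that the `BufferedWriter` doc comment describes and the tests above exercise. The helper name is hypothetical and the crate-internal paths are assumed from this PR; the real tests use a `RecorderWriter` to assert on the individual writes, whereas this sketch just uses the `Vec<u8>` writer impl added in `write.rs`.

```rust
use bytes::BytesMut;
use tokio_epoll_uring::BoundedBuf; // for .slice_full()

use crate::virtual_file::owned_buffers_io::write::BufferedWriter;

async fn passthrough_sketch() -> std::io::Result<()> {
    // Vec<u8> implements OwnedAsyncWriter (see the impl in write.rs above),
    // so it can serve as an in-memory sink.
    let mut writer =
        BufferedWriter::<BytesMut, _>::new(Vec::<u8>::new(), BytesMut::with_capacity(2));
    // small writes are coalesced in the BytesMut buffer until it reaches cap()
    writer.write_buffered(b"a".to_vec().slice_full()).await?;
    writer.write_buffered(b"b".to_vec().slice_full()).await?; // buffer full => flushed as "ab"
    // a chunk >= cap() flushes any pending bytes, then bypasses the buffer entirely
    writer.write_buffered(b"cde".to_vec().slice_full()).await?;
    // final flush / unwrap of the Vec<u8> sink elided in this sketch
    Ok(())
}
```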