Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement MultipartStore for ThrottledStore #5533

Merged
merged 3 commits into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions object_store/src/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ impl AsyncBufRead for BufReader {
/// streamed using [`ObjectStore::put_multipart`]
pub struct BufWriter {
capacity: usize,
max_concurrency: usize,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prior to #5500 this was an implementation detail of the stores, now it is user configurable 🎉

state: BufWriterState,
store: Arc<dyn ObjectStore>,
}
Expand Down Expand Up @@ -250,10 +251,21 @@ impl BufWriter {
Self {
capacity,
store,
max_concurrency: 8,
state: BufWriterState::Buffer(path, Vec::new()),
}
}

/// Override the maximum number of in-flight requests for this writer
///
/// Defaults to 8
pub fn with_max_concurrency(self, max_concurrency: usize) -> Self {
Self {
max_concurrency,
..self
}
}
Comment on lines +259 to +267
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if 0 is passed in?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will apply backpressure if there is any in-flight request


/// Abort this writer, cleaning up any partially uploaded state
///
/// # Panic
Expand All @@ -275,9 +287,11 @@ impl AsyncWrite for BufWriter {
buf: &[u8],
) -> Poll<Result<usize, Error>> {
let cap = self.capacity;
let max_concurrency = self.max_concurrency;
loop {
return match &mut self.state {
BufWriterState::Write(Some(write)) => {
ready!(write.poll_for_capacity(cx, max_concurrency))?;
write.write(buf);
Poll::Ready(Ok(buf.len()))
}
Expand Down
78 changes: 71 additions & 7 deletions object_store/src/throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ use parking_lot::Mutex;
use std::ops::Range;
use std::{convert::TryInto, sync::Arc};

use crate::GetOptions;
use crate::multipart::{MultipartStore, PartId};
use crate::{
path::Path, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
PutOptions, PutResult, Result,
path::Path, GetResult, GetResultPayload, ListResult, MultipartId, MultipartUpload, ObjectMeta,
ObjectStore, PutOptions, PutResult, Result,
};
use crate::{GetOptions, UploadPart};
use async_trait::async_trait;
use bytes::Bytes;
use futures::{stream::BoxStream, FutureExt, StreamExt};
Expand Down Expand Up @@ -110,12 +111,12 @@ async fn sleep(duration: Duration) {
/// **Note that the behavior of the wrapper is deterministic and might not reflect real-world
/// conditions!**
#[derive(Debug)]
pub struct ThrottledStore<T: ObjectStore> {
pub struct ThrottledStore<T> {
inner: T,
config: Arc<Mutex<ThrottleConfig>>,
}

impl<T: ObjectStore> ThrottledStore<T> {
impl<T> ThrottledStore<T> {
/// Create new wrapper with zero waiting times.
pub fn new(inner: T, config: ThrottleConfig) -> Self {
Self {
Expand Down Expand Up @@ -157,8 +158,12 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
self.inner.put_opts(location, bytes, opts).await
}

async fn put_multipart(&self, _location: &Path) -> Result<Box<dyn MultipartUpload>> {
Err(super::Error::NotImplemented)
async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
let upload = self.inner.put_multipart(location).await?;
Ok(Box::new(ThrottledUpload {
upload,
sleep: self.config().wait_put_per_call,
}))
}

async fn get(&self, location: &Path) -> Result<GetResult> {
Expand Down Expand Up @@ -316,6 +321,63 @@ where
.boxed()
}

#[async_trait]
impl<T: MultipartStore> MultipartStore for ThrottledStore<T> {
async fn create_multipart(&self, path: &Path) -> Result<MultipartId> {
self.inner.create_multipart(path).await
}

async fn put_part(
&self,
path: &Path,
id: &MultipartId,
part_idx: usize,
data: Bytes,
) -> Result<PartId> {
sleep(self.config().wait_put_per_call).await;
self.inner.put_part(path, id, part_idx, data).await
}

async fn complete_multipart(
&self,
path: &Path,
id: &MultipartId,
parts: Vec<PartId>,
) -> Result<PutResult> {
self.inner.complete_multipart(path, id, parts).await
}

async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()> {
self.inner.abort_multipart(path, id).await
}
}

#[derive(Debug)]
struct ThrottledUpload {
upload: Box<dyn MultipartUpload>,
sleep: Duration,
}

#[async_trait]
impl MultipartUpload for ThrottledUpload {
fn put_part(&mut self, data: Bytes) -> UploadPart {
let duration = self.sleep;
let put = self.upload.put_part(data);
Box::pin(async move {
sleep(duration).await;
put.await
})
}

async fn complete(&mut self) -> Result<PutResult> {
self.upload.complete().await
}

async fn abort(&mut self) -> Result<()> {
self.upload.abort().await
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -351,6 +413,8 @@ mod tests {
list_with_delimiter(&store).await;
rename_and_copy(&store).await;
copy_if_not_exists(&store).await;
stream_get(&store).await;
multipart(&store, &store).await;
}

#[tokio::test]
Expand Down
75 changes: 62 additions & 13 deletions object_store/src/upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
// specific language governing permissions and limitations
// under the License.

use crate::{PutResult, Result};
use std::task::{ready, Context, Poll};

use async_trait::async_trait;
use bytes::Bytes;
use futures::future::BoxFuture;
use tokio::task::JoinSet;

use crate::{PutResult, Result};

/// An upload part request
pub type UploadPart = BoxFuture<'static, Result<()>>;

Expand Down Expand Up @@ -110,31 +113,44 @@ pub struct WriteMultipart {
impl WriteMultipart {
/// Create a new [`WriteMultipart`] that will upload using 5MB chunks
pub fn new(upload: Box<dyn MultipartUpload>) -> Self {
Self::new_with_capacity(upload, 5 * 1024 * 1024)
Self::new_with_chunk_size(upload, 5 * 1024 * 1024)
}

/// Create a new [`WriteMultipart`] that will upload in fixed `capacity` sized chunks
pub fn new_with_capacity(upload: Box<dyn MultipartUpload>, capacity: usize) -> Self {
/// Create a new [`WriteMultipart`] that will upload in fixed `chunk_size` sized chunks
pub fn new_with_chunk_size(upload: Box<dyn MultipartUpload>, chunk_size: usize) -> Self {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this naming is a bit more obvious

Self {
upload,
buffer: Vec::with_capacity(capacity),
buffer: Vec::with_capacity(chunk_size),
tasks: Default::default(),
}
}

/// Wait until there are `max_concurrency` or fewer requests in-flight
pub async fn wait_for_capacity(&mut self, max_concurrency: usize) -> Result<()> {
while self.tasks.len() > max_concurrency {
self.tasks.join_next().await.unwrap()??;
/// Polls for there to be less than `max_concurrency` [`UploadPart`] in progress
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've changed this to be less than instead of equal as I think it means if you want to limit to x requests you poll for x requests, as opposed to x - 1

///
/// See [`Self::wait_for_capacity`] for an async version of this function
pub fn poll_for_capacity(
&mut self,
cx: &mut Context<'_>,
max_concurrency: usize,
) -> Poll<Result<()>> {
while !self.tasks.is_empty() && self.tasks.len() >= max_concurrency {
ready!(self.tasks.poll_join_next(cx)).unwrap()??
}
Ok(())
Poll::Ready(Ok(()))
}

/// Wait until there are less than `max_concurrency` [`UploadPart`] in progress
///
/// See [`Self::poll_for_capacity`] for a [`Poll`] version of this function
pub async fn wait_for_capacity(&mut self, max_concurrency: usize) -> Result<()> {
futures::future::poll_fn(|cx| self.poll_for_capacity(cx, max_concurrency)).await
}

/// Write data to this [`WriteMultipart`]
///
/// Note this method is synchronous (not `async`) and will immediately start new uploads
/// as soon as the internal `capacity` is hit, regardless of
/// how many outstanding uploads are already in progress.
/// Note this method is synchronous (not `async`) and will immediately
/// start new uploads as soon as the internal `chunk_size` is hit,
/// regardless of how many outstanding uploads are already in progress.
///
/// Back pressure can optionally be applied to producers by calling
/// [`Self::wait_for_capacity`] prior to calling this method
Expand Down Expand Up @@ -173,3 +189,36 @@ impl WriteMultipart {
self.upload.complete().await
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;

use futures::FutureExt;

use crate::memory::InMemory;
use crate::path::Path;
use crate::throttle::{ThrottleConfig, ThrottledStore};
use crate::ObjectStore;

use super::*;

#[tokio::test]
async fn test_concurrency() {
let config = ThrottleConfig {
wait_put_per_call: Duration::from_millis(1),
..Default::default()
};

let path = Path::from("foo");
let store = ThrottledStore::new(InMemory::new(), config);
let upload = store.put_multipart(&path).await.unwrap();
let mut write = WriteMultipart::new_with_chunk_size(upload, 10);

for _ in 0..20 {
write.write(&[0; 5]);
}
assert!(write.wait_for_capacity(10).now_or_never().is_none());
write.wait_for_capacity(10).await.unwrap()
}
}
Loading