Implement MultipartStore for ThrottledStore #5533
Changes from 1 commit
```diff
@@ -216,6 +216,7 @@ impl AsyncBufRead for BufReader {
 /// streamed using [`ObjectStore::put_multipart`]
 pub struct BufWriter {
     capacity: usize,
+    max_concurrency: usize,
     state: BufWriterState,
     store: Arc<dyn ObjectStore>,
 }
```
```diff
@@ -250,10 +251,21 @@ impl BufWriter {
         Self {
             capacity,
             store,
+            max_concurrency: 8,
             state: BufWriterState::Buffer(path, Vec::new()),
         }
     }
 
+    /// Override the maximum number of in-flight requests for this writer
+    ///
+    /// Defaults to 8
+    pub fn with_max_concurrency(self, max_concurrency: usize) -> Self {
+        Self {
+            max_concurrency,
+            ..self
+        }
+    }
```
Comment on lines +259 to +267:

What happens if 0 is passed in?

It will apply backpressure if there is any in-flight request.
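To illustrate that exchange, here is a standalone sketch (not library code; `must_wait` is a made-up helper that mirrors the loop condition added in this PR): with `max_concurrency = 0` the writer still makes progress once nothing is in flight, but otherwise waits for every outstanding part to finish.

```rust
// Mirrors the check `!tasks.is_empty() && tasks.len() >= max_concurrency`
// from `poll_for_capacity` in this diff.
fn must_wait(in_flight: usize, max_concurrency: usize) -> bool {
    in_flight != 0 && in_flight >= max_concurrency
}

fn main() {
    assert!(must_wait(1, 0));  // limit of 0: any in-flight part applies back pressure
    assert!(!must_wait(0, 0)); // nothing in flight: never blocks, even with a limit of 0
    assert!(!must_wait(7, 8)); // below the limit: proceeds immediately
    assert!(must_wait(8, 8));  // at the limit: waits for one part to complete
}
```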
```diff
+
     /// Abort this writer, cleaning up any partially uploaded state
     ///
     /// # Panic
```
```diff
@@ -275,9 +287,11 @@ impl AsyncWrite for BufWriter {
         buf: &[u8],
     ) -> Poll<Result<usize, Error>> {
         let cap = self.capacity;
+        let max_concurrency = self.max_concurrency;
         loop {
             return match &mut self.state {
                 BufWriterState::Write(Some(write)) => {
+                    ready!(write.poll_for_capacity(cx, max_concurrency))?;
                     write.write(buf);
                     Poll::Ready(Ok(buf.len()))
                 }
```
```diff
@@ -15,12 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::{PutResult, Result};
+use std::task::{ready, Context, Poll};
+
 use async_trait::async_trait;
 use bytes::Bytes;
 use futures::future::BoxFuture;
 use tokio::task::JoinSet;
 
+use crate::{PutResult, Result};
+
 /// An upload part request
 pub type UploadPart = BoxFuture<'static, Result<()>>;
 
```
```diff
@@ -110,31 +113,44 @@ pub struct WriteMultipart {
 impl WriteMultipart {
     /// Create a new [`WriteMultipart`] that will upload using 5MB chunks
     pub fn new(upload: Box<dyn MultipartUpload>) -> Self {
-        Self::new_with_capacity(upload, 5 * 1024 * 1024)
+        Self::new_with_chunk_size(upload, 5 * 1024 * 1024)
     }
 
-    /// Create a new [`WriteMultipart`] that will upload in fixed `capacity` sized chunks
-    pub fn new_with_capacity(upload: Box<dyn MultipartUpload>, capacity: usize) -> Self {
+    /// Create a new [`WriteMultipart`] that will upload in fixed `chunk_size` sized chunks
+    pub fn new_with_chunk_size(upload: Box<dyn MultipartUpload>, chunk_size: usize) -> Self {
```
Comment:

I think this naming is a bit more obvious
```diff
         Self {
             upload,
-            buffer: Vec::with_capacity(capacity),
+            buffer: Vec::with_capacity(chunk_size),
             tasks: Default::default(),
         }
     }
 
-    /// Wait until there are `max_concurrency` or fewer requests in-flight
-    pub async fn wait_for_capacity(&mut self, max_concurrency: usize) -> Result<()> {
-        while self.tasks.len() > max_concurrency {
-            self.tasks.join_next().await.unwrap()??;
+    /// Polls for there to be less than `max_concurrency` [`UploadPart`] in progress
```
Comment:

I've changed this to be
```diff
+    ///
+    /// See [`Self::wait_for_capacity`] for an async version of this function
+    pub fn poll_for_capacity(
+        &mut self,
+        cx: &mut Context<'_>,
+        max_concurrency: usize,
+    ) -> Poll<Result<()>> {
+        while !self.tasks.is_empty() && self.tasks.len() >= max_concurrency {
+            ready!(self.tasks.poll_join_next(cx)).unwrap()??
         }
-        Ok(())
+        Poll::Ready(Ok(()))
     }
 
+    /// Wait until there are less than `max_concurrency` [`UploadPart`] in progress
+    ///
+    /// See [`Self::poll_for_capacity`] for a [`Poll`] version of this function
+    pub async fn wait_for_capacity(&mut self, max_concurrency: usize) -> Result<()> {
+        futures::future::poll_fn(|cx| self.poll_for_capacity(cx, max_concurrency)).await
+    }
+
     /// Write data to this [`WriteMultipart`]
     ///
-    /// Note this method is synchronous (not `async`) and will immediately start new uploads
-    /// as soon as the internal `capacity` is hit, regardless of
-    /// how many outstanding uploads are already in progress.
+    /// Note this method is synchronous (not `async`) and will immediately
+    /// start new uploads as soon as the internal `chunk_size` is hit,
+    /// regardless of how many outstanding uploads are already in progress.
     ///
     /// Back pressure can optionally be applied to producers by calling
     /// [`Self::wait_for_capacity`] prior to calling this method
```
```diff
@@ -173,3 +189,36 @@ impl WriteMultipart {
         self.upload.complete().await
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use std::time::Duration;
+
+    use futures::FutureExt;
+
+    use crate::memory::InMemory;
+    use crate::path::Path;
+    use crate::throttle::{ThrottleConfig, ThrottledStore};
+    use crate::ObjectStore;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_concurrency() {
+        let config = ThrottleConfig {
+            wait_put_per_call: Duration::from_millis(1),
+            ..Default::default()
+        };
+
+        let path = Path::from("foo");
+        let store = ThrottledStore::new(InMemory::new(), config);
+        let upload = store.put_multipart(&path).await.unwrap();
+        let mut write = WriteMultipart::new_with_chunk_size(upload, 10);
+
+        for _ in 0..20 {
+            write.write(&[0; 5]);
+        }
+        assert!(write.wait_for_capacity(10).now_or_never().is_none());
+        write.wait_for_capacity(10).await.unwrap()
+    }
+}
```
Comment:

Prior to #5500 this was an implementation detail of the stores, now it is user configurable 🎉
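To show what that configurability looks like from the caller's side, a hedged sketch (the store choice, path, limit, and data size are illustrative; module paths follow the object_store crate layout, and completion via `shutdown` assumes `BufWriter`'s `AsyncWrite` implementation):

```rust
use std::sync::Arc;

use object_store::buffered::BufWriter;
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::ObjectStore;
use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());

    // The in-flight request limit is now chosen by the caller rather than
    // being hard-coded inside the store implementations.
    let mut writer = BufWriter::new(store, Path::from("example/data"))
        .with_max_concurrency(4);

    // Enough data to spill the internal buffer and start multipart uploads,
    // assuming the default buffer capacity.
    writer.write_all(&vec![0u8; 20 * 1024 * 1024]).await?;
    writer.shutdown().await?; // flushes remaining data and completes the upload
    Ok(())
}
```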