-
Notifications
You must be signed in to change notification settings - Fork 847
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement MultipartStore for ThrottledStore #5533
Implement MultipartStore for ThrottledStore #5533
Conversation
Limit concurrency in BufWriter Tweak WriteMultipart
/// Create a new [`WriteMultipart`] that will upload in fixed `capacity` sized chunks | ||
pub fn new_with_capacity(upload: Box<dyn MultipartUpload>, capacity: usize) -> Self { | ||
/// Create a new [`WriteMultipart`] that will upload in fixed `chunk_size` sized chunks | ||
pub fn new_with_chunk_size(upload: Box<dyn MultipartUpload>, chunk_size: usize) -> Self { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this naming is a bit more obvious
pub async fn wait_for_capacity(&mut self, max_concurrency: usize) -> Result<()> { | ||
while self.tasks.len() > max_concurrency { | ||
self.tasks.join_next().await.unwrap()??; | ||
/// Polls for there to be less than `max_concurrency` [`UploadPart`] in progress |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've changed this to be less than
instead of equal as I think it means if you want to limit to x
requests you poll for x
requests, as opposed to x - 1
@@ -216,6 +216,7 @@ impl AsyncBufRead for BufReader { | |||
/// streamed using [`ObjectStore::put_multipart`] | |||
pub struct BufWriter { | |||
capacity: usize, | |||
max_concurrency: usize, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Prior to #5500 this was an implementation detail of the stores, now it is user configurable 🎉
/// Override the maximum number of in-flight requests for this writer | ||
/// | ||
/// Defaults to 8 | ||
pub fn with_max_concurrency(self, max_concurrency: usize) -> Self { | ||
Self { | ||
max_concurrency, | ||
..self | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens if 0 is passed in?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will apply backpressure if there is any in-flight request
Which issue does this PR close?
Closes #.
Rationale for this change
Follow up to #5500
What changes are included in this PR?
Are there any user-facing changes?