Skip to content

Commit

Permalink
feat: Add --multipart-upload-threshold cli flag & Separate part size …
Browse files Browse the repository at this point in the history
…for PUT and GET

Signed-off-by: Ryan Tan <hahadaxigua@gmail.com>
  • Loading branch information
crrow committed Jul 21, 2024
1 parent ac6c177 commit 349e742
Show file tree
Hide file tree
Showing 10 changed files with 72 additions and 4 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@
# Don't commit releases
out/
# Python stuff
examples/**/venv
examples/**/venv
.idea/
4 changes: 4 additions & 0 deletions mountpoint-s3-client/src/failure_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ where
self.client.part_size()
}

/// Report the multipart upload threshold of the wrapped client, unchanged.
fn multipart_upload_threshold(&self) -> Option<usize> {
self.client.multipart_upload_threshold()
}

async fn delete_object(
&self,
bucket: &str,
Expand Down
4 changes: 4 additions & 0 deletions mountpoint-s3-client/src/mock_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,10 @@ impl ObjectClient for MockClient {
Some(self.config.part_size)
}

/// Report the multipart upload threshold of the mock client.
///
/// The mock reports its configured `part_size` as the threshold.
fn multipart_upload_threshold(&self) -> Option<usize> {
Some(self.config.part_size)
}

async fn delete_object(
&self,
bucket: &str,
Expand Down
4 changes: 4 additions & 0 deletions mountpoint-s3-client/src/mock_client/throughput_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ impl ObjectClient for ThroughputMockClient {
self.inner.part_size()
}

/// Report the multipart upload threshold of the inner client, unchanged.
fn multipart_upload_threshold(&self) -> Option<usize> {
self.inner.multipart_upload_threshold()
}

async fn delete_object(
&self,
bucket: &str,
Expand Down
6 changes: 5 additions & 1 deletion mountpoint-s3-client/src/object_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,14 @@ pub trait ObjectClient {
type PutObjectRequest: PutObjectRequest<ClientError = Self::ClientError>;
type ClientError: std::error::Error + ProvideErrorMetadata + Send + Sync + 'static;

/// Query the part size this client uses for PUT and GET operations to the object store. This
/// Query the part size this client uses for GET operations to the object store. This
/// can be `None` if the client does not do multi-part operations.
fn part_size(&self) -> Option<usize>;

/// Query the size threshold, in bytes, above which this client uses multipart uploads for
/// PUT operations to the object store. This can be `None` if the client does not do
/// multi-part operations.
fn multipart_upload_threshold(&self) -> Option<usize>;

/// Delete a single object from the object store.
///
/// DeleteObject will succeed even if the object within the bucket does not exist.
Expand Down
22 changes: 22 additions & 0 deletions mountpoint-s3-client/src/s3_crt_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ pub struct S3ClientConfig {
auth_config: S3ClientAuthConfig,
throughput_target_gbps: f64,
part_size: usize,
multipart_upload_threshold: usize,
endpoint_config: EndpointConfig,
user_agent: Option<UserAgent>,
request_payer: Option<String>,
Expand All @@ -102,6 +103,7 @@ impl Default for S3ClientConfig {
auth_config: Default::default(),
throughput_target_gbps: 10.0,
part_size: DEFAULT_PART_SIZE,
multipart_upload_threshold: DEFAULT_PART_SIZE,
endpoint_config: EndpointConfig::new("us-east-1"),
user_agent: None,
request_payer: None,
Expand Down Expand Up @@ -132,6 +134,12 @@ impl S3ClientConfig {
self
}

/// Set the multipart upload threshold in bytes for the S3 client
///
/// Client construction rejects a threshold below the configured part size or above the
/// maximum part size with an invalid-configuration error.
#[must_use = "S3ClientConfig follows a builder pattern"]
pub fn multipart_upload_threshold(mut self, multipart_upload_threshold: usize) -> Self {
self.multipart_upload_threshold = multipart_upload_threshold;
self
}

/// Set the target throughput in Gbps for the S3 client
#[must_use = "S3ClientConfig follows a builder pattern"]
pub fn throughput_target_gbps(mut self, throughput_target_gbps: f64) -> Self {
Expand Down Expand Up @@ -249,6 +257,7 @@ struct S3CrtClientInner {
user_agent_header: String,
request_payer: Option<String>,
part_size: usize,
multipart_upload_threshold: usize,
bucket_owner: Option<String>,
credentials_provider: Option<CredentialsProvider>,
host_resolver: HostResolver,
Expand Down Expand Up @@ -360,6 +369,14 @@ impl S3CrtClientInner {
}
client_config.part_size(config.part_size);

if !(config.part_size..=max_part_size).contains(&config.multipart_upload_threshold) {
return Err(NewClientError::InvalidConfiguration(format!(
"multipart upload threshold must be at least the part size and at most {}GiB",
max_part_size / 1024 / 1024 / 1024
)));
}
client_config.multipart_upload_threshold(config.multipart_upload_threshold);

let user_agent = config.user_agent.unwrap_or_else(|| UserAgent::new(None));
let user_agent_header = user_agent.build();

Expand All @@ -374,6 +391,7 @@ impl S3CrtClientInner {
user_agent_header,
request_payer: config.request_payer,
part_size: config.part_size,
multipart_upload_threshold: config.multipart_upload_threshold,
bucket_owner: config.bucket_owner,
credentials_provider: Some(credentials_provider),
host_resolver,
Expand Down Expand Up @@ -1152,6 +1170,10 @@ impl ObjectClient for S3CrtClient {
Some(self.inner.part_size)
}

/// Report the multipart upload threshold this client was configured with.
fn multipart_upload_threshold(&self) -> Option<usize> {
Some(self.inner.multipart_upload_threshold)
}

async fn delete_object(
&self,
bucket: &str,
Expand Down
2 changes: 1 addition & 1 deletion mountpoint-s3-client/tests/put_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ async fn test_put_object_write_cancelled() {
.expect("put_object should succeed");

// Write a multiple of the multipart upload threshold to ensure it will not complete immediately.
let full_size = client.part_size().unwrap() * 10;
let full_size = client.multipart_upload_threshold().unwrap() * 10;
let buffer = vec![0u8; full_size];

// Complete one write to ensure the MPU was created and the buffer for the upload request is available.
Expand Down
10 changes: 10 additions & 0 deletions mountpoint-s3-crt/src/s3/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,16 @@ impl ClientConfig {
self
}

/// The size threshold in bytes for when to use multipart uploads.
///
/// Uploads larger than this will use the multipart upload strategy.
/// Uploads smaller than or equal to this will use a single HTTP request.
/// This only affects AWS_S3_META_REQUEST_TYPE_PUT_OBJECT.
pub fn multipart_upload_threshold(&mut self, multipart_upload_threshold: usize) -> &mut Self {
// The underlying config field is assigned a u64, hence the cast from usize.
self.inner.multipart_upload_threshold = multipart_upload_threshold as u64;
self
}

/// If the part size needs to be adjusted for service limits, this is the maximum size it will be adjusted to.
pub fn max_part_size(&mut self, max_part_size: usize) -> &mut Self {
self.inner.max_part_size = max_part_size as u64;
Expand Down
19 changes: 19 additions & 0 deletions mountpoint-s3/src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::cmp::max;
use std::env;
use std::fs::File;
use std::io::{Read, Write};
Expand Down Expand Up @@ -165,6 +166,23 @@ pub struct CliArgs {
)]
pub part_size: u64,

// Kept as an `Option` so that an unset flag can fall back to a default derived from `part_size`.
// A `///` doc comment is deliberately not used here: clap derive would pick it up as help text.
#[clap(
long,
long_help = "The size threshold in bytes for when to use multipart uploads.
Uploads larger than this will use the multipart upload strategy.
Uploads smaller than or equal to this will use a single HTTP request.
This only affects AWS_S3_META_REQUEST_TYPE_PUT_OBJECT.
If set, this should be at least `part_size`.
If not set, the maximum of `part_size` and 5 MiB will be used.
",
value_parser = value_parser!(u64).range(1..usize::MAX as u64),
help_heading = CLIENT_OPTIONS_HEADER,
)]
pub multipart_upload_threshold: Option<u64>,

#[clap(
long,
help = "Owner UID [default: current user's UID]",
Expand Down Expand Up @@ -619,6 +637,7 @@ pub fn create_s3_client(args: &CliArgs) -> anyhow::Result<(S3CrtClient, EventLoo
.auth_config(auth_config)
.throughput_target_gbps(throughput_target_gbps)
.part_size(args.part_size as usize)
.multipart_upload_threshold(args.multipart_upload_threshold.unwrap_or(max(5 << 20, args.part_size)) as usize)
.user_agent(user_agent);
if args.requester_pays {
client_config = client_config.request_payer("requester");
Expand Down
2 changes: 1 addition & 1 deletion mountpoint-s3/src/upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ impl<Client: ObjectClient> UploadRequest<Client> {
let request = inner.client.put_object(bucket, key, &params).await?;
let maximum_upload_size = inner
.client
.part_size()
.multipart_upload_threshold()
.map(|ps| ps.saturating_mul(MAX_S3_MULTIPART_UPLOAD_PARTS));

Ok(Self {
Expand Down

0 comments on commit 349e742

Please sign in to comment.