Skip to content

Commit

Permalink
Cleanup multipart upload trait (#4572)
Browse files Browse the repository at this point in the history
* Cleanup multipart upload trait

* Update object_store/src/multipart.rs

Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>

---------

Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
  • Loading branch information
tustvold and viirya authored Jul 27, 2023
1 parent b27dc7e commit fba19b0
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 109 deletions.
4 changes: 2 additions & 2 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::client::list::ListClient;
use crate::client::list_response::ListResponse;
use crate::client::retry::RetryExt;
use crate::client::GetOptionsExt;
use crate::multipart::UploadPart;
use crate::multipart::PartId;
use crate::path::DELIMITER;
use crate::{
ClientOptions, GetOptions, ListResult, MultipartId, Path, Result, RetryConfig,
Expand Down Expand Up @@ -479,7 +479,7 @@ impl S3Client {
&self,
location: &Path,
upload_id: &str,
parts: Vec<UploadPart>,
parts: Vec<PartId>,
) -> Result<()> {
let parts = parts
.into_iter()
Expand Down
30 changes: 8 additions & 22 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ use crate::client::{
TokenCredentialProvider,
};
use crate::config::ConfigValue;
use crate::multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart};
use crate::multipart::{PartId, PutPart, WriteMultiPart};
use crate::{
ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
ObjectStore, Path, Result, RetryConfig,
Expand Down Expand Up @@ -227,7 +227,7 @@ impl ObjectStore for AmazonS3 {
client: Arc::clone(&self.client),
};

Ok((id, Box::new(CloudMultiPartUpload::new(upload, 8))))
Ok((id, Box::new(WriteMultiPart::new(upload, 8))))
}

async fn abort_multipart(
Expand Down Expand Up @@ -308,12 +308,8 @@ struct S3MultiPartUpload {
}

#[async_trait]
impl CloudMultiPartUploadImpl for S3MultiPartUpload {
async fn put_multipart_part(
&self,
buf: Vec<u8>,
part_idx: usize,
) -> Result<UploadPart, std::io::Error> {
impl PutPart for S3MultiPartUpload {
async fn put_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<PartId> {
use reqwest::header::ETAG;
let part = (part_idx + 1).to_string();

Expand All @@ -326,26 +322,16 @@ impl CloudMultiPartUploadImpl for S3MultiPartUpload {
)
.await?;

let etag = response
.headers()
.get(ETAG)
.context(MissingEtagSnafu)
.map_err(crate::Error::from)?;
let etag = response.headers().get(ETAG).context(MissingEtagSnafu)?;

let etag = etag
.to_str()
.context(BadHeaderSnafu)
.map_err(crate::Error::from)?;
let etag = etag.to_str().context(BadHeaderSnafu)?;

Ok(UploadPart {
Ok(PartId {
content_id: etag.to_string(),
})
}

async fn complete(
&self,
completed_parts: Vec<UploadPart>,
) -> Result<(), std::io::Error> {
async fn complete(&self, completed_parts: Vec<PartId>) -> Result<()> {
self.client
.complete_multipart(&self.location, &self.upload_id, completed_parts)
.await?;
Expand Down
17 changes: 6 additions & 11 deletions object_store/src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
//! after 7 days.
use self::client::{BlockId, BlockList};
use crate::{
multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart},
multipart::{PartId, PutPart, WriteMultiPart},
path::Path,
ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
ObjectStore, Result, RetryConfig,
Expand All @@ -42,7 +42,6 @@ use percent_encoding::percent_decode_str;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, Snafu};
use std::fmt::{Debug, Formatter};
use std::io;
use std::str::FromStr;
use std::sync::Arc;
use tokio::io::AsyncWrite;
Expand Down Expand Up @@ -186,7 +185,7 @@ impl ObjectStore for MicrosoftAzure {
client: Arc::clone(&self.client),
location: location.to_owned(),
};
Ok((String::new(), Box::new(CloudMultiPartUpload::new(inner, 8))))
Ok((String::new(), Box::new(WriteMultiPart::new(inner, 8))))
}

async fn abort_multipart(
Expand Down Expand Up @@ -243,12 +242,8 @@ struct AzureMultiPartUpload {
}

#[async_trait]
impl CloudMultiPartUploadImpl for AzureMultiPartUpload {
async fn put_multipart_part(
&self,
buf: Vec<u8>,
part_idx: usize,
) -> Result<UploadPart, io::Error> {
impl PutPart for AzureMultiPartUpload {
async fn put_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<PartId> {
let content_id = format!("{part_idx:20}");
let block_id: BlockId = content_id.clone().into();

Expand All @@ -264,10 +259,10 @@ impl CloudMultiPartUploadImpl for AzureMultiPartUpload {
)
.await?;

Ok(UploadPart { content_id })
Ok(PartId { content_id })
}

async fn complete(&self, completed_parts: Vec<UploadPart>) -> Result<(), io::Error> {
async fn complete(&self, completed_parts: Vec<PartId>) -> Result<()> {
let blocks = completed_parts
.into_iter()
.map(|part| BlockId::from(part.content_id))
Expand Down
77 changes: 37 additions & 40 deletions object_store/src/gcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
//! to abort the upload and drop those unneeded parts. In addition, you may wish to
//! consider implementing automatic clean up of unused parts that are older than one
//! week.
use std::io;
use std::str::FromStr;
use std::sync::Arc;

Expand All @@ -52,7 +51,7 @@ use crate::client::{
TokenCredentialProvider,
};
use crate::{
multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart},
multipart::{PartId, PutPart, WriteMultiPart},
path::{Path, DELIMITER},
ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
ObjectStore, Result, RetryConfig,
Expand Down Expand Up @@ -117,6 +116,15 @@ enum Error {
#[snafu(display("Error getting put response body: {}", source))]
PutResponseBody { source: reqwest::Error },

#[snafu(display("Got invalid put response: {}", source))]
InvalidPutResponse { source: quick_xml::de::DeError },

#[snafu(display("Error performing post request {}: {}", path, source))]
PostRequest {
source: crate::client::retry::Error,
path: String,
},

#[snafu(display("Error decoding object size: {}", source))]
InvalidSize { source: std::num::ParseIntError },

Expand Down Expand Up @@ -148,6 +156,12 @@ enum Error {

#[snafu(display("Configuration key: '{}' is not known.", key))]
UnknownConfigurationKey { key: String },

#[snafu(display("ETag Header missing from response"))]
MissingEtag,

#[snafu(display("Received header containing non-ASCII data"))]
BadHeader { source: header::ToStrError },
}

impl From<Error> for super::Error {
Expand Down Expand Up @@ -283,14 +297,9 @@ impl GoogleCloudStorageClient {
})?;

let data = response.bytes().await.context(PutResponseBodySnafu)?;
let result: InitiateMultipartUploadResult = quick_xml::de::from_reader(
data.as_ref().reader(),
)
.context(InvalidXMLResponseSnafu {
method: "POST".to_string(),
url,
data,
})?;
let result: InitiateMultipartUploadResult =
quick_xml::de::from_reader(data.as_ref().reader())
.context(InvalidPutResponseSnafu)?;

Ok(result.upload_id)
}
Expand Down Expand Up @@ -472,24 +481,16 @@ struct GCSMultipartUpload {
}

#[async_trait]
impl CloudMultiPartUploadImpl for GCSMultipartUpload {
impl PutPart for GCSMultipartUpload {
/// Upload an object part <https://cloud.google.com/storage/docs/xml-api/put-object-multipart>
async fn put_multipart_part(
&self,
buf: Vec<u8>,
part_idx: usize,
) -> Result<UploadPart, io::Error> {
async fn put_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<PartId> {
let upload_id = self.multipart_id.clone();
let url = format!(
"{}/{}/{}",
self.client.base_url, self.client.bucket_name_encoded, self.encoded_path
);

let credential = self
.client
.get_credential()
.await
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
let credential = self.client.get_credential().await?;

let response = self
.client
Expand All @@ -504,26 +505,24 @@ impl CloudMultiPartUploadImpl for GCSMultipartUpload {
.header(header::CONTENT_LENGTH, format!("{}", buf.len()))
.body(buf)
.send_retry(&self.client.retry_config)
.await?;
.await
.context(PutRequestSnafu {
path: &self.encoded_path,
})?;

let content_id = response
.headers()
.get("ETag")
.ok_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidData,
"response headers missing ETag",
)
})?
.context(MissingEtagSnafu)?
.to_str()
.map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))?
.context(BadHeaderSnafu)?
.to_string();

Ok(UploadPart { content_id })
Ok(PartId { content_id })
}

/// Complete a multipart upload <https://cloud.google.com/storage/docs/xml-api/post-object-complete>
async fn complete(&self, completed_parts: Vec<UploadPart>) -> Result<(), io::Error> {
async fn complete(&self, completed_parts: Vec<PartId>) -> Result<()> {
let upload_id = self.multipart_id.clone();
let url = format!(
"{}/{}/{}",
Expand All @@ -539,16 +538,11 @@ impl CloudMultiPartUploadImpl for GCSMultipartUpload {
})
.collect();

let credential = self
.client
.get_credential()
.await
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;

let credential = self.client.get_credential().await?;
let upload_info = CompleteMultipartUpload { parts };

let data = quick_xml::se::to_string(&upload_info)
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?
.context(InvalidPutResponseSnafu)?
// We cannot disable the escaping that transforms "/" to "&quote;" :(
// https://github.com/tafia/quick-xml/issues/362
// https://github.com/tafia/quick-xml/issues/350
Expand All @@ -561,7 +555,10 @@ impl CloudMultiPartUploadImpl for GCSMultipartUpload {
.query(&[("uploadId", upload_id)])
.body(data)
.send_retry(&self.client.retry_config)
.await?;
.await
.context(PostRequestSnafu {
path: &self.encoded_path,
})?;

Ok(())
}
Expand All @@ -588,7 +585,7 @@ impl ObjectStore for GoogleCloudStorage {
multipart_id: upload_id.clone(),
};

Ok((upload_id, Box::new(CloudMultiPartUpload::new(inner, 8))))
Ok((upload_id, Box::new(WriteMultiPart::new(inner, 8))))
}

async fn abort_multipart(
Expand Down
Loading

0 comments on commit fba19b0

Please sign in to comment.