Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup multipart upload trait #4572

Merged
merged 2 commits into from
Jul 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::client::list::ListClient;
use crate::client::list_response::ListResponse;
use crate::client::retry::RetryExt;
use crate::client::GetOptionsExt;
use crate::multipart::UploadPart;
use crate::multipart::PartId;
use crate::path::DELIMITER;
use crate::{
ClientOptions, GetOptions, ListResult, MultipartId, Path, Result, RetryConfig,
Expand Down Expand Up @@ -479,7 +479,7 @@ impl S3Client {
&self,
location: &Path,
upload_id: &str,
parts: Vec<UploadPart>,
parts: Vec<PartId>,
) -> Result<()> {
let parts = parts
.into_iter()
Expand Down
30 changes: 8 additions & 22 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ use crate::client::{
TokenCredentialProvider,
};
use crate::config::ConfigValue;
use crate::multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart};
use crate::multipart::{PartId, PutPart, WriteMultiPart};
use crate::{
ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
ObjectStore, Path, Result, RetryConfig,
Expand Down Expand Up @@ -227,7 +227,7 @@ impl ObjectStore for AmazonS3 {
client: Arc::clone(&self.client),
};

Ok((id, Box::new(CloudMultiPartUpload::new(upload, 8))))
Ok((id, Box::new(WriteMultiPart::new(upload, 8))))
}

async fn abort_multipart(
Expand Down Expand Up @@ -308,12 +308,8 @@ struct S3MultiPartUpload {
}

#[async_trait]
impl CloudMultiPartUploadImpl for S3MultiPartUpload {
async fn put_multipart_part(
&self,
buf: Vec<u8>,
part_idx: usize,
) -> Result<UploadPart, std::io::Error> {
impl PutPart for S3MultiPartUpload {
async fn put_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<PartId> {
use reqwest::header::ETAG;
let part = (part_idx + 1).to_string();

Expand All @@ -326,26 +322,16 @@ impl CloudMultiPartUploadImpl for S3MultiPartUpload {
)
.await?;

let etag = response
.headers()
.get(ETAG)
.context(MissingEtagSnafu)
.map_err(crate::Error::from)?;
let etag = response.headers().get(ETAG).context(MissingEtagSnafu)?;

let etag = etag
.to_str()
.context(BadHeaderSnafu)
.map_err(crate::Error::from)?;
let etag = etag.to_str().context(BadHeaderSnafu)?;

Ok(UploadPart {
Ok(PartId {
content_id: etag.to_string(),
})
}

async fn complete(
&self,
completed_parts: Vec<UploadPart>,
) -> Result<(), std::io::Error> {
async fn complete(&self, completed_parts: Vec<PartId>) -> Result<()> {
self.client
.complete_multipart(&self.location, &self.upload_id, completed_parts)
.await?;
Expand Down
17 changes: 6 additions & 11 deletions object_store/src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
//! after 7 days.
use self::client::{BlockId, BlockList};
use crate::{
multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart},
multipart::{PartId, PutPart, WriteMultiPart},
path::Path,
ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
ObjectStore, Result, RetryConfig,
Expand All @@ -42,7 +42,6 @@ use percent_encoding::percent_decode_str;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, Snafu};
use std::fmt::{Debug, Formatter};
use std::io;
use std::str::FromStr;
use std::sync::Arc;
use tokio::io::AsyncWrite;
Expand Down Expand Up @@ -186,7 +185,7 @@ impl ObjectStore for MicrosoftAzure {
client: Arc::clone(&self.client),
location: location.to_owned(),
};
Ok((String::new(), Box::new(CloudMultiPartUpload::new(inner, 8))))
Ok((String::new(), Box::new(WriteMultiPart::new(inner, 8))))
}

async fn abort_multipart(
Expand Down Expand Up @@ -243,12 +242,8 @@ struct AzureMultiPartUpload {
}

#[async_trait]
impl CloudMultiPartUploadImpl for AzureMultiPartUpload {
async fn put_multipart_part(
&self,
buf: Vec<u8>,
part_idx: usize,
) -> Result<UploadPart, io::Error> {
impl PutPart for AzureMultiPartUpload {
async fn put_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<PartId> {
let content_id = format!("{part_idx:20}");
let block_id: BlockId = content_id.clone().into();

Expand All @@ -264,10 +259,10 @@ impl CloudMultiPartUploadImpl for AzureMultiPartUpload {
)
.await?;

Ok(UploadPart { content_id })
Ok(PartId { content_id })
}

async fn complete(&self, completed_parts: Vec<UploadPart>) -> Result<(), io::Error> {
async fn complete(&self, completed_parts: Vec<PartId>) -> Result<()> {
let blocks = completed_parts
.into_iter()
.map(|part| BlockId::from(part.content_id))
Expand Down
77 changes: 37 additions & 40 deletions object_store/src/gcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
//! to abort the upload and drop those unneeded parts. In addition, you may wish to
//! consider implementing automatic clean up of unused parts that are older than one
//! week.
use std::io;
use std::str::FromStr;
use std::sync::Arc;

Expand All @@ -52,7 +51,7 @@ use crate::client::{
TokenCredentialProvider,
};
use crate::{
multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart},
multipart::{PartId, PutPart, WriteMultiPart},
path::{Path, DELIMITER},
ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
ObjectStore, Result, RetryConfig,
Expand Down Expand Up @@ -117,6 +116,15 @@ enum Error {
#[snafu(display("Error getting put response body: {}", source))]
PutResponseBody { source: reqwest::Error },

#[snafu(display("Got invalid put response: {}", source))]
InvalidPutResponse { source: quick_xml::de::DeError },

#[snafu(display("Error performing post request {}: {}", path, source))]
PostRequest {
source: crate::client::retry::Error,
path: String,
},

#[snafu(display("Error decoding object size: {}", source))]
InvalidSize { source: std::num::ParseIntError },

Expand Down Expand Up @@ -148,6 +156,12 @@ enum Error {

#[snafu(display("Configuration key: '{}' is not known.", key))]
UnknownConfigurationKey { key: String },

#[snafu(display("ETag Header missing from response"))]
MissingEtag,

#[snafu(display("Received header containing non-ASCII data"))]
BadHeader { source: header::ToStrError },
}

impl From<Error> for super::Error {
Expand Down Expand Up @@ -283,14 +297,9 @@ impl GoogleCloudStorageClient {
})?;

let data = response.bytes().await.context(PutResponseBodySnafu)?;
let result: InitiateMultipartUploadResult = quick_xml::de::from_reader(
data.as_ref().reader(),
)
.context(InvalidXMLResponseSnafu {
method: "POST".to_string(),
url,
data,
})?;
let result: InitiateMultipartUploadResult =
quick_xml::de::from_reader(data.as_ref().reader())
.context(InvalidPutResponseSnafu)?;

Ok(result.upload_id)
}
Expand Down Expand Up @@ -472,24 +481,16 @@ struct GCSMultipartUpload {
}

#[async_trait]
impl CloudMultiPartUploadImpl for GCSMultipartUpload {
impl PutPart for GCSMultipartUpload {
/// Upload an object part <https://cloud.google.com/storage/docs/xml-api/put-object-multipart>
async fn put_multipart_part(
&self,
buf: Vec<u8>,
part_idx: usize,
) -> Result<UploadPart, io::Error> {
async fn put_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<PartId> {
let upload_id = self.multipart_id.clone();
let url = format!(
"{}/{}/{}",
self.client.base_url, self.client.bucket_name_encoded, self.encoded_path
);

let credential = self
.client
.get_credential()
.await
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
let credential = self.client.get_credential().await?;

let response = self
.client
Expand All @@ -504,26 +505,24 @@ impl CloudMultiPartUploadImpl for GCSMultipartUpload {
.header(header::CONTENT_LENGTH, format!("{}", buf.len()))
.body(buf)
.send_retry(&self.client.retry_config)
.await?;
.await
.context(PutRequestSnafu {
path: &self.encoded_path,
})?;

let content_id = response
.headers()
.get("ETag")
.ok_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidData,
"response headers missing ETag",
)
})?
.context(MissingEtagSnafu)?
.to_str()
.map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))?
.context(BadHeaderSnafu)?
.to_string();

Ok(UploadPart { content_id })
Ok(PartId { content_id })
}

/// Complete a multipart upload <https://cloud.google.com/storage/docs/xml-api/post-object-complete>
async fn complete(&self, completed_parts: Vec<UploadPart>) -> Result<(), io::Error> {
async fn complete(&self, completed_parts: Vec<PartId>) -> Result<()> {
let upload_id = self.multipart_id.clone();
let url = format!(
"{}/{}/{}",
Expand All @@ -539,16 +538,11 @@ impl CloudMultiPartUploadImpl for GCSMultipartUpload {
})
.collect();

let credential = self
.client
.get_credential()
.await
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;

let credential = self.client.get_credential().await?;
let upload_info = CompleteMultipartUpload { parts };

let data = quick_xml::se::to_string(&upload_info)
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?
.context(InvalidPutResponseSnafu)?
// We cannot disable the escaping that transforms "/" to "&quote;" :(
// https://github.com/tafia/quick-xml/issues/362
// https://github.com/tafia/quick-xml/issues/350
Expand All @@ -561,7 +555,10 @@ impl CloudMultiPartUploadImpl for GCSMultipartUpload {
.query(&[("uploadId", upload_id)])
.body(data)
.send_retry(&self.client.retry_config)
.await?;
.await
.context(PostRequestSnafu {
path: &self.encoded_path,
})?;

Ok(())
}
Expand All @@ -588,7 +585,7 @@ impl ObjectStore for GoogleCloudStorage {
multipart_id: upload_id.clone(),
};

Ok((upload_id, Box::new(CloudMultiPartUpload::new(inner, 8))))
Ok((upload_id, Box::new(WriteMultiPart::new(inner, 8))))
}

async fn abort_multipart(
Expand Down
Loading
Loading