Skip to content

Commit

Permalink
Make object_store::multipart public (#4570)
Browse files Browse the repository at this point in the history
* Make object_store::multipart public

* one more public

* docs

* doc

* more docs

* derive debug

* debug
  • Loading branch information
yjshen authored Jul 26, 2023
1 parent 0b75e8f commit bff6155
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 5 deletions.
4 changes: 2 additions & 2 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,8 @@ pub use client::{backoff::BackoffConfig, retry::RetryConfig, CredentialProvider}
#[cfg(any(feature = "gcp", feature = "aws", feature = "azure", feature = "http"))]
mod config;

#[cfg(any(feature = "azure", feature = "aws", feature = "gcp"))]
mod multipart;
#[cfg(feature = "cloud")]
pub mod multipart;
mod parse;
mod util;

Expand Down
30 changes: 27 additions & 3 deletions object_store/src/multipart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
// specific language governing permissions and limitations
// under the License.

//! Cloud Multipart Upload
//!
//! This crate provides an asynchronous interface for multipart file uploads to cloud storage services.
//! It's designed to offer efficient, non-blocking operations,
//! especially useful when dealing with large files or high-throughput systems.

use async_trait::async_trait;
use futures::{stream::FuturesUnordered, Future, StreamExt};
use std::{io, pin::Pin, sync::Arc, task::Poll};
Expand All @@ -28,7 +34,7 @@ type BoxedTryFuture<T> = Pin<Box<dyn Future<Output = Result<T, io::Error>> + Sen
/// and used in combination with [`CloudMultiPartUpload`] to provide
/// multipart upload support
#[async_trait]
pub(crate) trait CloudMultiPartUploadImpl: 'static {
pub trait CloudMultiPartUploadImpl: 'static {
/// Upload a single part
async fn put_multipart_part(
&self,
Expand All @@ -42,12 +48,15 @@ pub(crate) trait CloudMultiPartUploadImpl: 'static {
async fn complete(&self, completed_parts: Vec<UploadPart>) -> Result<(), io::Error>;
}

/// Represents a part of a file that has been successfully uploaded in a multipart upload process.
#[derive(Debug, Clone)]
pub(crate) struct UploadPart {
pub struct UploadPart {
/// Id of this part
pub content_id: String,
}

pub(crate) struct CloudMultiPartUpload<T>
/// Struct that manages and controls multipart uploads to a cloud storage service.
pub struct CloudMultiPartUpload<T>
where
T: CloudMultiPartUploadImpl,
{
Expand Down Expand Up @@ -75,6 +84,7 @@ impl<T> CloudMultiPartUpload<T>
where
T: CloudMultiPartUploadImpl,
{
/// Create a new multipart upload with the implementation and the given maximum concurrency
pub fn new(inner: T, max_concurrency: usize) -> Self {
Self {
inner: Arc::new(inner),
Expand Down Expand Up @@ -103,6 +113,7 @@ where
to_copy
}

/// Poll current tasks
pub fn poll_tasks(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
Expand Down Expand Up @@ -257,3 +268,16 @@ where
Pin::new(completion_task).poll(cx)
}
}

impl<T: CloudMultiPartUploadImpl> std::fmt::Debug for CloudMultiPartUpload<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CloudMultiPartUpload")
.field("completed_parts", &self.completed_parts)
.field("tasks", &self.tasks)
.field("max_concurrency", &self.max_concurrency)
.field("current_buffer", &self.current_buffer)
.field("part_size", &self.part_size)
.field("current_part_idx", &self.current_part_idx)
.finish()
}
}

0 comments on commit bff6155

Please sign in to comment.