From 521e918063fb7892fe7332bdfd6395a5ca0eedc8 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Sun, 10 Mar 2024 09:28:55 +1300 Subject: [PATCH 01/15] Replace AsyncWrite with Upload trait (#5458) --- object_store/src/azure/mod.rs | 10 +- object_store/src/buffered.rs | 70 +++--- object_store/src/chunked.rs | 16 +- object_store/src/client/mod.rs | 3 + object_store/src/client/parts.rs | 48 +++++ object_store/src/gcp/mod.rs | 98 +++++---- object_store/src/http/mod.rs | 13 +- object_store/src/lib.rs | 106 +++------ object_store/src/limit.rs | 59 +++-- object_store/src/local.rs | 358 +++++++++++-------------------- object_store/src/memory.rs | 90 +++----- object_store/src/multipart.rs | 240 +-------------------- object_store/src/prefix.rs | 18 +- object_store/src/throttle.rs | 14 +- object_store/src/upload.rs | 151 +++++++++++++ 15 files changed, 539 insertions(+), 755 deletions(-) create mode 100644 object_store/src/client/parts.rs create mode 100644 object_store/src/upload.rs diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index 712b7a36c56a..423052bfee68 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -19,15 +19,11 @@ //! //! ## Streaming uploads //! -//! [ObjectStore::put_multipart] will upload data in blocks and write a blob from those -//! blocks. Data is buffered internally to make blocks of at least 5MB and blocks -//! are uploaded concurrently. +//! [ObjectStore::upload] will upload data in blocks and write a blob from those blocks. //! -//! [ObjectStore::abort_multipart] is a no-op, since Azure Blob Store doesn't provide -//! a way to drop old blocks. Instead unused blocks are automatically cleaned up -//! after 7 days. +//! Unused blocks will automatically be dropped after 7 days. use crate::{ - multipart::{MultiPartStore, PartId, PutPart, WriteMultiPart}, + multipart::{MultiPartStore, PartId}, path::Path, signer::Signer, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions, PutResult, diff --git a/object_store/src/buffered.rs b/object_store/src/buffered.rs index fdefe599f79e..f86a7cb7ff43 100644 --- a/object_store/src/buffered.rs +++ b/object_store/src/buffered.rs @@ -18,7 +18,7 @@ //! Utilities for performing tokio-style buffered IO use crate::path::Path; -use crate::{MultipartId, ObjectMeta, ObjectStore}; +use crate::{ChunkedUpload, ObjectMeta, ObjectStore}; use bytes::Bytes; use futures::future::{BoxFuture, FutureExt}; use futures::ready; @@ -213,7 +213,6 @@ impl AsyncBufRead for BufReader { pub struct BufWriter { capacity: usize, state: BufWriterState, - multipart_id: Option, store: Arc, } @@ -221,22 +220,19 @@ impl std::fmt::Debug for BufWriter { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("BufWriter") .field("capacity", &self.capacity) - .field("multipart_id", &self.multipart_id) .finish() } } -type MultipartResult = (MultipartId, Box); - enum BufWriterState { /// Buffer up to capacity bytes Buffer(Path, Vec), /// [`ObjectStore::put_multipart`] - Prepare(BoxFuture<'static, std::io::Result>), + Prepare(BoxFuture<'static, std::io::Result>), /// Write to a multipart upload - Write(Box), + Write(Option), /// [`ObjectStore::put`] - Put(BoxFuture<'static, std::io::Result<()>>), + Flush(BoxFuture<'static, std::io::Result<()>>), } impl BufWriter { @@ -251,14 +247,8 @@ impl BufWriter { capacity, store, state: BufWriterState::Buffer(path, Vec::new()), - multipart_id: None, } } - - /// Returns the [`MultipartId`] if multipart upload - pub fn multipart_id(&self) -> Option<&MultipartId> { - self.multipart_id.as_ref() - } } impl AsyncWrite for BufWriter { @@ -270,12 +260,15 @@ impl AsyncWrite for BufWriter { let cap = self.capacity; loop { return match &mut self.state { - BufWriterState::Write(write) => Pin::new(write).poll_write(cx, buf), - BufWriterState::Put(_) => panic!("Already shut down"), + BufWriterState::Write(Some(write)) => { + write.write(buf); + Poll::Ready(Ok(buf.len())) + } + BufWriterState::Write(None) | BufWriterState::Flush(_) => { + panic!("Already shut down") + } BufWriterState::Prepare(f) => { - let (id, w) = ready!(f.poll_unpin(cx)?); - self.state = BufWriterState::Write(w); - self.multipart_id = Some(id); + self.state = BufWriterState::Write(ready!(f.poll_unpin(cx)?).into()); continue; } BufWriterState::Buffer(path, b) => { @@ -284,9 +277,10 @@ impl AsyncWrite for BufWriter { let path = std::mem::take(path); let store = Arc::clone(&self.store); self.state = BufWriterState::Prepare(Box::pin(async move { - let (id, mut writer) = store.put_multipart(&path).await?; - writer.write_all(&buffer).await?; - Ok((id, writer)) + let upload = store.upload(&path).await?; + let mut chunked = ChunkedUpload::new(upload); + chunked.write(&buffer); + Ok(chunked) })); continue; } @@ -300,13 +294,10 @@ impl AsyncWrite for BufWriter { fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { return match &mut self.state { - BufWriterState::Buffer(_, _) => Poll::Ready(Ok(())), - BufWriterState::Write(write) => Pin::new(write).poll_flush(cx), - BufWriterState::Put(_) => panic!("Already shut down"), + BufWriterState::Write(_) | BufWriterState::Buffer(_, _) => Poll::Ready(Ok(())), + BufWriterState::Flush(_) => panic!("Already shut down"), BufWriterState::Prepare(f) => { - let (id, w) = ready!(f.poll_unpin(cx)?); - self.state = BufWriterState::Write(w); - self.multipart_id = Some(id); + self.state = BufWriterState::Write(ready!(f.poll_unpin(cx)?).into()); continue; } }; @@ -317,21 +308,28 @@ impl AsyncWrite for BufWriter { loop { match &mut self.state { BufWriterState::Prepare(f) => { - let (id, w) = ready!(f.poll_unpin(cx)?); - self.state = BufWriterState::Write(w); - self.multipart_id = Some(id); + self.state = BufWriterState::Write(ready!(f.poll_unpin(cx)?).into()); } BufWriterState::Buffer(p, b) => { let buf = std::mem::take(b); let path = std::mem::take(p); let store = Arc::clone(&self.store); - self.state = BufWriterState::Put(Box::pin(async move { + self.state = BufWriterState::Flush(Box::pin(async move { store.put(&path, buf.into()).await?; Ok(()) })); } - BufWriterState::Put(f) => return f.poll_unpin(cx), - BufWriterState::Write(w) => return Pin::new(w).poll_shutdown(cx), + BufWriterState::Flush(f) => return f.poll_unpin(cx), + BufWriterState::Write(x) => { + let upload = x.take().unwrap(); + self.state = BufWriterState::Flush( + async move { + upload.finish().await?; + Ok(()) + } + .boxed(), + ) + } } } } @@ -443,9 +441,7 @@ mod tests { writer.write_all(&[0; 20]).await.unwrap(); writer.flush().await.unwrap(); writer.write_all(&[0; 5]).await.unwrap(); - assert!(writer.multipart_id().is_none()); writer.shutdown().await.unwrap(); - assert!(writer.multipart_id().is_none()); assert_eq!(store.head(&path).await.unwrap().size, 25); // Test multipart @@ -453,9 +449,7 @@ mod tests { writer.write_all(&[0; 20]).await.unwrap(); writer.flush().await.unwrap(); writer.write_all(&[0; 20]).await.unwrap(); - assert!(writer.multipart_id().is_some()); writer.shutdown().await.unwrap(); - assert!(writer.multipart_id().is_some()); assert_eq!(store.head(&path).await.unwrap().size, 40); } diff --git a/object_store/src/chunked.rs b/object_store/src/chunked.rs index d33556f4b12e..8ae3026cb0f2 100644 --- a/object_store/src/chunked.rs +++ b/object_store/src/chunked.rs @@ -25,14 +25,13 @@ use async_trait::async_trait; use bytes::{BufMut, Bytes, BytesMut}; use futures::stream::BoxStream; use futures::StreamExt; -use tokio::io::AsyncWrite; use crate::path::Path; +use crate::Result; use crate::{ GetOptions, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, PutOptions, - PutResult, + PutResult, Upload, }; -use crate::{MultipartId, Result}; /// Wraps a [`ObjectStore`] and makes its get response return chunks /// in a controllable manner. @@ -67,15 +66,8 @@ impl ObjectStore for ChunkedStore { self.inner.put_opts(location, bytes, opts).await } - async fn put_multipart( - &self, - location: &Path, - ) -> Result<(MultipartId, Box)> { - self.inner.put_multipart(location).await - } - - async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> { - self.inner.abort_multipart(location, multipart_id).await + async fn upload(&self, location: &Path) -> Result> { + self.inner.upload(location).await } async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs index 252e9fdcadf5..7728f38954f9 100644 --- a/object_store/src/client/mod.rs +++ b/object_store/src/client/mod.rs @@ -40,6 +40,9 @@ pub mod header; #[cfg(any(feature = "aws", feature = "gcp"))] pub mod s3; +#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))] +pub mod parts; + use async_trait::async_trait; use std::collections::HashMap; use std::str::FromStr; diff --git a/object_store/src/client/parts.rs b/object_store/src/client/parts.rs new file mode 100644 index 000000000000..bea63a1a87a9 --- /dev/null +++ b/object_store/src/client/parts.rs @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use parking_lot::Mutex; +use crate::multipart::PartId; + +/// An interior mutable collection of upload parts and their corresponding part index +#[derive(Debug, Default)] +pub(crate) struct Parts(Mutex>); + +impl Parts { + /// Record the [`PartId`] for a given index + /// + /// Note: calling this method multiple times with the same `part_idx` + /// will result in multiple [`PartId`] in the final output + pub(crate) fn put(&self, part_idx: usize, id: PartId) { + self.0.lock().push((part_idx, id)) + } + + /// Produce the final list of [`PartId`] ordered by `part_idx` + /// + /// `expected` is the number of parts expected in the final result + pub(crate) fn finish(&self, expected: usize) -> crate::Result> { + let mut parts = self.0.lock(); + if parts.len() != expected { + return Err(crate::Error::Generic { + store: "Parts", + source: "Missing part".to_string().into(), + }); + } + parts.sort_unstable_by_key(|(idx, _)| *idx); + Ok(parts.drain(..).map(|(_, v)| v).collect()) + } +} \ No newline at end of file diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs index 8633abbfb4dc..0feb057d1fec 100644 --- a/object_store/src/gcp/mod.rs +++ b/object_store/src/gcp/mod.rs @@ -20,15 +20,11 @@ //! ## Multi-part uploads //! //! [Multi-part uploads](https://cloud.google.com/storage/docs/multipart-uploads) -//! can be initiated with the [ObjectStore::put_multipart] method. -//! Data passed to the writer is automatically buffered to meet the minimum size -//! requirements for a part. Multiple parts are uploaded concurrently. -//! -//! If the writer fails for any reason, you may have parts uploaded to GCS but not -//! used that you may be charged for. Use the [ObjectStore::abort_multipart] method -//! to abort the upload and drop those unneeded parts. In addition, you may wish to -//! consider implementing automatic clean up of unused parts that are older than one -//! week. +//! can be initiated with the [ObjectStore::upload] method. If neither [`Upload::complete`] +//! nor [`Upload::abort`] is invoked, you may have parts uploaded to GCS but not used, +//! that you will be charged for. It is recommended you configure a [lifecycle rule] to +//! abort incomplete multipart uploads after a certain period of time to avoid being +//! charged for storing partial uploads //! //! ## Using HTTP/2 //! @@ -36,23 +32,23 @@ //! because it allows much higher throughput in our benchmarks (see //! [#5194](https://github.com/apache/arrow-rs/issues/5194)). HTTP/2 can be //! enabled by setting [crate::ClientConfigKey::Http1Only] to false. +//! +//! [lifecycle rule]: https://cloud.google.com/storage/docs/lifecycle#abort-mpu use std::sync::Arc; use crate::client::CredentialProvider; use crate::{ - multipart::{PartId, PutPart, WriteMultiPart}, - path::Path, - GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions, PutResult, - Result, + multipart::PartId, path::Path, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, + ObjectStore, PutOptions, PutResult, Result, Upload, UploadPart, }; use async_trait::async_trait; use bytes::Bytes; use client::GoogleCloudStorageClient; use futures::stream::BoxStream; -use tokio::io::AsyncWrite; use crate::client::get::GetClientExt; use crate::client::list::ListClientExt; +use crate::client::parts::Parts; use crate::multipart::MultiPartStore; pub use builder::{GoogleCloudStorageBuilder, GoogleConfigKey}; pub use credential::GcpCredential; @@ -89,27 +85,50 @@ impl GoogleCloudStorage { } } +#[derive(Debug)] struct GCSMultipartUpload { + state: Arc, + part_idx: usize, +} + +#[derive(Debug)] +struct UploadState { client: Arc, path: Path, multipart_id: MultipartId, + parts: Parts, } #[async_trait] -impl PutPart for GCSMultipartUpload { - /// Upload an object part - async fn put_part(&self, buf: Vec, part_idx: usize) -> Result { - self.client - .put_part(&self.path, &self.multipart_id, part_idx, buf.into()) +impl Upload for GCSMultipartUpload { + fn put_part(&mut self, data: Bytes) -> UploadPart { + let idx = self.part_idx; + self.part_idx += 1; + let state = Arc::clone(&self.state); + Box::pin(async move { + let part = state + .client + .put_part(&state.path, &state.multipart_id, idx, data) + .await?; + state.parts.put(idx, part); + Ok(()) + }) + } + + async fn complete(&mut self) -> Result { + let parts = self.state.parts.finish(self.part_idx)?; + + self.state + .client + .multipart_complete(&self.state.path, &self.state.multipart_id, parts) .await } - /// Complete a multipart upload - async fn complete(&self, completed_parts: Vec) -> Result<()> { - self.client - .multipart_complete(&self.path, &self.multipart_id, completed_parts) - .await?; - Ok(()) + async fn abort(&mut self) -> Result<()> { + self.state + .client + .multipart_cleanup(&self.state.path, &self.state.multipart_id) + .await } } @@ -119,27 +138,18 @@ impl ObjectStore for GoogleCloudStorage { self.client.put(location, bytes, opts).await } - async fn put_multipart( - &self, - location: &Path, - ) -> Result<(MultipartId, Box)> { + async fn upload(&self, location: &Path) -> Result> { let upload_id = self.client.multipart_initiate(location).await?; - let inner = GCSMultipartUpload { - client: Arc::clone(&self.client), - path: location.clone(), - multipart_id: upload_id.clone(), - }; - - Ok((upload_id, Box::new(WriteMultiPart::new(inner, 8)))) - } - - async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> { - self.client - .multipart_cleanup(location, multipart_id) - .await?; - - Ok(()) + Ok(Box::new(GCSMultipartUpload { + part_idx: 0, + state: Arc::new(UploadState { + client: Arc::clone(&self.client), + path: location.clone(), + multipart_id: upload_id.clone(), + parts: Default::default(), + }), + })) } async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs index f1d11db4762c..ba549abed783 100644 --- a/object_store/src/http/mod.rs +++ b/object_store/src/http/mod.rs @@ -46,7 +46,7 @@ use crate::http::client::Client; use crate::path::Path; use crate::{ ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, - ObjectStore, PutMode, PutOptions, PutResult, Result, RetryConfig, + ObjectStore, PutMode, PutOptions, PutResult, Result, RetryConfig, Upload, }; mod client; @@ -115,15 +115,8 @@ impl ObjectStore for HttpStore { }) } - async fn put_multipart( - &self, - _location: &Path, - ) -> Result<(MultipartId, Box)> { - Err(super::Error::NotImplemented) - } - - async fn abort_multipart(&self, _location: &Path, _multipart_id: &MultipartId) -> Result<()> { - Err(super::Error::NotImplemented) + async fn upload(&self, _location: &Path) -> Result> { + Err(crate::Error::NotImplemented) } async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 8132002b6e01..d302ce54e77d 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -496,9 +496,11 @@ pub use tags::TagSet; pub mod multipart; mod parse; +mod upload; mod util; pub use parse::{parse_url, parse_url_opts}; +pub use upload::*; pub use util::GetRange; use crate::path::Path; @@ -515,7 +517,6 @@ use std::fmt::{Debug, Formatter}; use std::io::{Read, Seek, SeekFrom}; use std::ops::Range; use std::sync::Arc; -use tokio::io::AsyncWrite; /// An alias for a dynamically dispatched object store implementation. pub type DynObjectStore = dyn ObjectStore; @@ -538,48 +539,11 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static { /// Save the provided bytes to the specified location with the given options async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result; - /// Get a multi-part upload that allows writing data in chunks. + /// Perform a multipart upload /// - /// Most cloud-based uploads will buffer and upload parts in parallel. - /// - /// To complete the upload, [AsyncWrite::poll_shutdown] must be called - /// to completion. This operation is guaranteed to be atomic, it will either - /// make all the written data available at `location`, or fail. No clients - /// should be able to observe a partially written object. - /// - /// For some object stores (S3, GCS, and local in particular), if the - /// writer fails or panics, you must call [ObjectStore::abort_multipart] - /// to clean up partially written data. - /// - ///
- /// It is recommended applications wait for any in-flight requests to complete by calling `flush`, if - /// there may be a significant gap in time (> ~30s) before the next write. - /// These gaps can include times where the function returns control to the - /// caller while keeping the writer open. If `flush` is not called, futures - /// for in-flight requests may be left unpolled long enough for the requests - /// to time out, causing the write to fail. - ///
- /// - /// For applications requiring fine-grained control of multipart uploads - /// see [`MultiPartStore`], although note that this interface cannot be - /// supported by all [`ObjectStore`] backends. - /// - /// For applications looking to implement this interface for a custom - /// multipart API, see [`WriteMultiPart`] which handles the complexities - /// of performing parallel uploads of fixed size parts. - /// - /// [`WriteMultiPart`]: multipart::WriteMultiPart - /// [`MultiPartStore`]: multipart::MultiPartStore - async fn put_multipart( - &self, - location: &Path, - ) -> Result<(MultipartId, Box)>; - - /// Cleanup an aborted upload. - /// - /// See documentation for individual stores for exact behavior, as capabilities - /// vary by object store. - async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()>; + /// Client should prefer [`ObjectStore::put`] for small payloads, as streaming uploads + /// typically require multiple separate requests. See [`Upload`] for more information + async fn upload(&self, location: &Path) -> Result>; /// Return the bytes that are stored at the specified location. async fn get(&self, location: &Path) -> Result { @@ -764,19 +728,8 @@ macro_rules! as_ref_impl { self.as_ref().put_opts(location, bytes, opts).await } - async fn put_multipart( - &self, - location: &Path, - ) -> Result<(MultipartId, Box)> { - self.as_ref().put_multipart(location).await - } - - async fn abort_multipart( - &self, - location: &Path, - multipart_id: &MultipartId, - ) -> Result<()> { - self.as_ref().abort_multipart(location, multipart_id).await + async fn upload(&self, location: &Path) -> Result> { + self.as_ref().upload(location).await } async fn get(&self, location: &Path) -> Result { @@ -1247,8 +1200,6 @@ mod tests { use futures::stream::FuturesUnordered; use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; - use std::future::Future; - use tokio::io::AsyncWriteExt; pub(crate) async fn put_get_delete_list(storage: &DynObjectStore) { put_get_delete_list_opts(storage).await @@ -1923,12 +1874,11 @@ mod tests { let location = Path::from("test_dir/test_upload_file.txt"); // Can write to storage - let data = get_chunks(5_000, 10); + let data = get_chunks(5 * 1024 * 1024, 3); let bytes_expected = data.concat(); - let (_, mut writer) = storage.put_multipart(&location).await.unwrap(); - for chunk in &data { - writer.write_all(chunk).await.unwrap(); - } + let mut upload = storage.upload(&location).await.unwrap(); + let uploads = data.into_iter().map(|x| upload.put_part(x)); + futures::future::try_join_all(uploads).await.unwrap(); // Object should not yet exist in store let meta_res = storage.head(&location).await; @@ -1944,7 +1894,8 @@ mod tests { let result = storage.list_with_delimiter(None).await.unwrap(); assert_eq!(&result.objects, &[]); - writer.shutdown().await.unwrap(); + upload.complete().await.unwrap(); + let bytes_written = storage.get(&location).await.unwrap().bytes().await.unwrap(); assert_eq!(bytes_expected, bytes_written); @@ -1952,22 +1903,19 @@ mod tests { // Sizes chosen to ensure we write three parts let data = get_chunks(3_200_000, 7); let bytes_expected = data.concat(); - let (_, mut writer) = storage.put_multipart(&location).await.unwrap(); + let upload = storage.upload(&location).await.unwrap(); + let mut writer = ChunkedUpload::new(upload); for chunk in &data { - writer.write_all(chunk).await.unwrap(); + writer.write(chunk) } - writer.shutdown().await.unwrap(); + writer.finish().await.unwrap(); let bytes_written = storage.get(&location).await.unwrap().bytes().await.unwrap(); assert_eq!(bytes_expected, bytes_written); // We can abort an empty write let location = Path::from("test_dir/test_abort_upload.txt"); - let (upload_id, writer) = storage.put_multipart(&location).await.unwrap(); - drop(writer); - storage - .abort_multipart(&location, &upload_id) - .await - .unwrap(); + let mut upload = storage.upload(&location).await.unwrap(); + upload.abort().await.unwrap(); let get_res = storage.get(&location).await; assert!(get_res.is_err()); assert!(matches!( @@ -1976,17 +1924,13 @@ mod tests { )); // We can abort an in-progress write - let (upload_id, mut writer) = storage.put_multipart(&location).await.unwrap(); - if let Some(chunk) = data.first() { - writer.write_all(chunk).await.unwrap(); - let _ = writer.write(chunk).await.unwrap(); - } - drop(writer); - - storage - .abort_multipart(&location, &upload_id) + let mut upload = storage.upload(&location).await.unwrap(); + upload + .put_part(data.first().unwrap().clone()) .await .unwrap(); + + upload.abort().await.unwrap(); let get_res = storage.get(&location).await; assert!(get_res.is_err()); assert!(matches!( diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs index d1363d9a4d46..b20a95cadd6f 100644 --- a/object_store/src/limit.rs +++ b/object_store/src/limit.rs @@ -18,8 +18,8 @@ //! An object store that limits the maximum concurrency of the wrapped implementation use crate::{ - BoxStream, GetOptions, GetResult, GetResultPayload, ListResult, MultipartId, ObjectMeta, - ObjectStore, Path, PutOptions, PutResult, Result, StreamExt, + BoxStream, GetOptions, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, Path, + PutOptions, PutResult, Result, StreamExt, Upload, UploadPart, }; use async_trait::async_trait; use bytes::Bytes; @@ -81,18 +81,12 @@ impl ObjectStore for LimitStore { let _permit = self.semaphore.acquire().await.unwrap(); self.inner.put_opts(location, bytes, opts).await } - async fn put_multipart( - &self, - location: &Path, - ) -> Result<(MultipartId, Box)> { - let permit = Arc::clone(&self.semaphore).acquire_owned().await.unwrap(); - let (id, write) = self.inner.put_multipart(location).await?; - Ok((id, Box::new(PermitWrapper::new(write, permit)))) - } - - async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> { - let _permit = self.semaphore.acquire().await.unwrap(); - self.inner.abort_multipart(location, multipart_id).await + async fn upload(&self, location: &Path) -> Result> { + let upload = self.inner.upload(location).await?; + Ok(Box::new(LimitUpload { + semaphore: Arc::clone(&self.semaphore), + upload, + })) } async fn get(&self, location: &Path) -> Result { let permit = Arc::clone(&self.semaphore).acquire_owned().await.unwrap(); @@ -257,6 +251,43 @@ impl AsyncWrite for PermitWrapper { } } +#[derive(Debug)] +pub struct LimitUpload { + upload: Box, + semaphore: Arc, +} + +impl LimitUpload { + pub fn new(upload: Box, max_concurrency: usize) -> Self { + Self { + upload, + semaphore: Arc::new(Semaphore::new(max_concurrency)), + } + } +} + +#[async_trait] +impl Upload for LimitUpload { + fn put_part(&mut self, data: Bytes) -> UploadPart { + let upload = self.upload.put_part(data); + let s = Arc::clone(&self.semaphore); + Box::pin(async move { + let _permit = s.acquire().await.unwrap(); + upload.await + }) + } + + async fn complete(&mut self) -> Result { + let _permit = self.semaphore.acquire().await.unwrap(); + self.upload.complete().await + } + + async fn abort(&mut self) -> Result<()> { + let _permit = self.semaphore.acquire().await.unwrap(); + self.upload.abort().await + } +} + #[cfg(test)] mod tests { use crate::limit::LimitStore; diff --git a/object_store/src/local.rs b/object_store/src/local.rs index d631771778db..76394d72ecae 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -16,34 +16,32 @@ // under the License. //! An object store implementation for a local filesystem -use crate::{ - maybe_spawn_blocking, - path::{absolute_path_to_url, Path}, - util::InvalidGetRange, - GetOptions, GetResult, GetResultPayload, ListResult, MultipartId, ObjectMeta, ObjectStore, - PutMode, PutOptions, PutResult, Result, -}; -use async_trait::async_trait; -use bytes::Bytes; -use chrono::{DateTime, Utc}; -use futures::future::BoxFuture; -use futures::ready; -use futures::{stream::BoxStream, StreamExt}; -use futures::{FutureExt, TryStreamExt}; -use snafu::{ensure, ResultExt, Snafu}; use std::fs::{metadata, symlink_metadata, File, Metadata, OpenOptions}; use std::io::{ErrorKind, Read, Seek, SeekFrom, Write}; use std::ops::Range; -use std::pin::Pin; use std::sync::Arc; -use std::task::Poll; use std::time::SystemTime; use std::{collections::BTreeSet, convert::TryFrom, io}; use std::{collections::VecDeque, path::PathBuf}; -use tokio::io::AsyncWrite; + +use async_trait::async_trait; +use bytes::Bytes; +use chrono::{DateTime, Utc}; +use futures::{stream::BoxStream, StreamExt}; +use futures::{FutureExt, TryStreamExt}; +use parking_lot::Mutex; +use snafu::{ensure, OptionExt, ResultExt, Snafu}; use url::Url; use walkdir::{DirEntry, WalkDir}; +use crate::{ + maybe_spawn_blocking, + path::{absolute_path_to_url, Path}, + util::InvalidGetRange, + GetOptions, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, PutMode, + PutOptions, PutResult, Result, Upload, UploadPart, +}; + /// A specialized `Error` for filesystem object store-related errors #[derive(Debug, Snafu)] #[allow(missing_docs)] @@ -155,6 +153,9 @@ pub(crate) enum Error { InvalidPath { path: String, }, + + #[snafu(display("Upload aborted"))] + Aborted, } impl From for super::Error { @@ -342,8 +343,7 @@ impl ObjectStore for LocalFileSystem { let path = self.path_to_filesystem(location)?; maybe_spawn_blocking(move || { - let (mut file, suffix) = new_staged_upload(&path)?; - let staging_path = staged_upload_path(&path, &suffix); + let (mut file, staging_path) = new_staged_upload(&path)?; let mut e_tag = None; let err = match file.write_all(&bytes) { @@ -395,31 +395,10 @@ impl ObjectStore for LocalFileSystem { .await } - async fn put_multipart( - &self, - location: &Path, - ) -> Result<(MultipartId, Box)> { - let dest = self.path_to_filesystem(location)?; - - let (file, suffix) = new_staged_upload(&dest)?; - Ok(( - suffix.clone(), - Box::new(LocalUpload::new(dest, suffix, Arc::new(file))), - )) - } - - async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> { + async fn upload(&self, location: &Path) -> Result> { let dest = self.path_to_filesystem(location)?; - let path: PathBuf = staged_upload_path(&dest, multipart_id); - - maybe_spawn_blocking(move || match std::fs::remove_file(&path) { - Ok(_) => Ok(()), - Err(source) => match source.kind() { - ErrorKind::NotFound => Ok(()), // Already deleted - _ => Err(Error::UnableToDeleteFile { path, source }.into()), - }, - }) - .await + let (file, src) = new_staged_upload(&dest)?; + Ok(Box::new(LocalUpload::new(src, dest, file))) } async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { @@ -677,17 +656,17 @@ fn create_parent_dirs(path: &std::path::Path, source: io::Error) -> Result<()> { Ok(()) } -/// Generates a unique file path `{base}#{suffix}`, returning the opened `File` and `suffix` +/// Generates a unique file path `{base}#{suffix}`, returning the opened `File` and `path` /// /// Creates any directories if necessary -fn new_staged_upload(base: &std::path::Path) -> Result<(File, String)> { +fn new_staged_upload(base: &std::path::Path) -> Result<(File, PathBuf)> { let mut multipart_id = 1; loop { let suffix = multipart_id.to_string(); let path = staged_upload_path(base, &suffix); let mut options = OpenOptions::new(); match options.read(true).write(true).create_new(true).open(&path) { - Ok(f) => return Ok((f, suffix)), + Ok(f) => return Ok((f, path)), Err(source) => match source.kind() { ErrorKind::AlreadyExists => multipart_id += 1, ErrorKind::NotFound => create_parent_dirs(&path, source)?, @@ -705,194 +684,91 @@ fn staged_upload_path(dest: &std::path::Path, suffix: &str) -> PathBuf { staging_path.into() } -enum LocalUploadState { - /// Upload is ready to send new data - Idle(Arc), - /// In the middle of a write - Writing(Arc, BoxFuture<'static, Result>), - /// In the middle of syncing data and closing file. - /// - /// Future will contain last reference to file, so it will call drop on completion. - ShuttingDown(BoxFuture<'static, Result<(), io::Error>>), - /// File is being moved from it's temporary location to the final location - Committing(BoxFuture<'static, Result<(), io::Error>>), - /// Upload is complete - Complete, +#[derive(Debug)] +struct LocalUpload { + /// The upload state + state: Arc, + /// The location of the temporary file + src: Option, + /// The next offset to write into the file + offset: u64, } -struct LocalUpload { - inner_state: LocalUploadState, +#[derive(Debug)] +struct UploadState { dest: PathBuf, - multipart_id: MultipartId, + file: Mutex>, } impl LocalUpload { - pub fn new(dest: PathBuf, multipart_id: MultipartId, file: Arc) -> Self { + pub fn new(src: PathBuf, dest: PathBuf, file: File) -> Self { Self { - inner_state: LocalUploadState::Idle(file), - dest, - multipart_id, + state: Arc::new(UploadState { + dest, + file: Mutex::new(Some(file)), + }), + src: Some(src), + offset: 0, } } } -impl AsyncWrite for LocalUpload { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &[u8], - ) -> Poll> { - let invalid_state = |condition: &str| -> Poll> { - Poll::Ready(Err(io::Error::new( - ErrorKind::InvalidInput, - format!("Tried to write to file {condition}."), - ))) - }; +#[async_trait] +impl Upload for LocalUpload { + fn put_part(&mut self, data: Bytes) -> UploadPart { + let offset = self.offset; + self.offset += data.len() as u64; - if let Ok(runtime) = tokio::runtime::Handle::try_current() { - let mut data: Vec = buf.to_vec(); - let data_len = data.len(); - - loop { - match &mut self.inner_state { - LocalUploadState::Idle(file) => { - let file = Arc::clone(file); - let file2 = Arc::clone(&file); - let data: Vec = std::mem::take(&mut data); - self.inner_state = LocalUploadState::Writing( - file, - Box::pin( - runtime - .spawn_blocking(move || (&*file2).write_all(&data)) - .map(move |res| match res { - Err(err) => Err(io::Error::new(ErrorKind::Other, err)), - Ok(res) => res.map(move |_| data_len), - }), - ), - ); - } - LocalUploadState::Writing(file, inner_write) => { - let res = ready!(inner_write.poll_unpin(cx)); - self.inner_state = LocalUploadState::Idle(Arc::clone(file)); - return Poll::Ready(res); - } - LocalUploadState::ShuttingDown(_) => { - return invalid_state("when writer is shutting down"); - } - LocalUploadState::Committing(_) => { - return invalid_state("when writer is committing data"); - } - LocalUploadState::Complete => { - return invalid_state("when writer is complete"); - } - } - } - } else if let LocalUploadState::Idle(file) = &self.inner_state { - let file = Arc::clone(file); - (&*file).write_all(buf)?; - Poll::Ready(Ok(buf.len())) - } else { - // If we are running on this thread, then only possible states are Idle and Complete. - invalid_state("when writer is already complete.") - } + let s = Arc::clone(&self.state); + maybe_spawn_blocking(move || { + let mut f = s.file.lock(); + let file = f.as_mut().context(AbortedSnafu)?; + file.seek(SeekFrom::Start(offset)) + .context(SeekSnafu { path: &s.dest })?; + file.write_all(&data).context(UnableToCopyDataToFileSnafu)?; + Ok(()) + }) + .boxed() } - fn poll_flush( - self: Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - ) -> Poll> { - Poll::Ready(Ok(())) + async fn complete(&mut self) -> Result { + let src = self.src.take().context(AbortedSnafu)?; + let s = Arc::clone(&self.state); + maybe_spawn_blocking(move || { + // Ensure no inflight writes + let f = s.file.lock().take().context(AbortedSnafu)?; + std::fs::rename(&src, &s.dest).context(UnableToRenameFileSnafu)?; + let metadata = f.metadata().map_err(|e| Error::Metadata { + source: e.into(), + path: src.to_string_lossy().to_string(), + })?; + + Ok(PutResult { + e_tag: Some(get_etag(&metadata)), + version: None, + }) + }) + .await } - fn poll_shutdown( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - if let Ok(runtime) = tokio::runtime::Handle::try_current() { - loop { - match &mut self.inner_state { - LocalUploadState::Idle(file) => { - // We are moving file into the future, and it will be dropped on it's completion, closing the file. - let file = Arc::clone(file); - self.inner_state = LocalUploadState::ShuttingDown(Box::pin( - runtime - .spawn_blocking(move || (*file).sync_all()) - .map(move |res| match res { - Err(err) => Err(io::Error::new(io::ErrorKind::Other, err)), - Ok(res) => res, - }), - )); - } - LocalUploadState::ShuttingDown(fut) => match fut.poll_unpin(cx) { - Poll::Ready(res) => { - res?; - let staging_path = staged_upload_path(&self.dest, &self.multipart_id); - let dest = self.dest.clone(); - self.inner_state = LocalUploadState::Committing(Box::pin( - runtime - .spawn_blocking(move || std::fs::rename(&staging_path, &dest)) - .map(move |res| match res { - Err(err) => Err(io::Error::new(io::ErrorKind::Other, err)), - Ok(res) => res, - }), - )); - } - Poll::Pending => { - return Poll::Pending; - } - }, - LocalUploadState::Writing(_, _) => { - return Poll::Ready(Err(io::Error::new( - io::ErrorKind::InvalidInput, - "Tried to commit a file where a write is in progress.", - ))); - } - LocalUploadState::Committing(fut) => { - let res = ready!(fut.poll_unpin(cx)); - self.inner_state = LocalUploadState::Complete; - return Poll::Ready(res); - } - LocalUploadState::Complete => { - return Poll::Ready(Err(io::Error::new( - io::ErrorKind::Other, - "Already complete", - ))) - } - } - } - } else { - let staging_path = staged_upload_path(&self.dest, &self.multipart_id); - match &mut self.inner_state { - LocalUploadState::Idle(file) => { - let file = Arc::clone(file); - self.inner_state = LocalUploadState::Complete; - file.sync_all()?; - drop(file); - std::fs::rename(staging_path, &self.dest)?; - Poll::Ready(Ok(())) - } - _ => { - // If we are running on this thread, then only possible states are Idle and Complete. - Poll::Ready(Err(io::Error::new(ErrorKind::Other, "Already complete"))) - } - } - } + async fn abort(&mut self) -> Result<()> { + let src = self.src.take().context(AbortedSnafu)?; + maybe_spawn_blocking(move || { + std::fs::remove_file(&src).context(UnableToDeleteFileSnafu { path: &src })?; + Ok(()) + }) + .await } } impl Drop for LocalUpload { fn drop(&mut self) { - match self.inner_state { - LocalUploadState::Complete => (), - _ => { - self.inner_state = LocalUploadState::Complete; - let path = staged_upload_path(&self.dest, &self.multipart_id); - // Try to cleanup intermediate file ignoring any error - match tokio::runtime::Handle::try_current() { - Ok(r) => drop(r.spawn_blocking(move || std::fs::remove_file(path))), - Err(_) => drop(std::fs::remove_file(path)), - }; - } + if let Some(src) = self.src.take() { + // Try to clean up intermediate file ignoring any error + match tokio::runtime::Handle::try_current() { + Ok(r) => drop(r.spawn_blocking(move || std::fs::remove_file(src))), + Err(_) => drop(std::fs::remove_file(src)), + }; } } } @@ -1095,12 +971,13 @@ fn convert_walkdir_result( #[cfg(test)] mod tests { - use super::*; - use crate::test_util::flatten_list_stream; - use crate::tests::*; use futures::TryStreamExt; use tempfile::{NamedTempFile, TempDir}; - use tokio::io::AsyncWriteExt; + + use crate::test_util::flatten_list_stream; + use crate::tests::*; + + use super::*; #[tokio::test] async fn file_test() { @@ -1125,7 +1002,18 @@ mod tests { put_get_delete_list(&integration).await; list_uses_directories_correctly(&integration).await; list_with_delimiter(&integration).await; - stream_get(&integration).await; + + // Can't use stream_get test as ChunkedUpload uses a tokio JoinSet + let p = Path::from("manual_upload"); + let mut upload = integration.upload(&p).await.unwrap(); + upload.put_part(Bytes::from_static(b"123")).await.unwrap(); + upload.put_part(Bytes::from_static(b"45678")).await.unwrap(); + let r = upload.complete().await.unwrap(); + + let get = integration.get(&p).await.unwrap(); + assert_eq!(get.meta.e_tag.as_ref().unwrap(), r.e_tag.as_ref().unwrap()); + let actual = get.bytes().await.unwrap(); + assert_eq!(actual.as_ref(), b"12345678"); }); } @@ -1422,12 +1310,11 @@ mod tests { let location = Path::from("some_file"); let data = Bytes::from("arbitrary data"); - let (multipart_id, mut writer) = integration.put_multipart(&location).await.unwrap(); - writer.write_all(&data).await.unwrap(); + let mut u1 = integration.upload(&location).await.unwrap(); + u1.put_part(data.clone()).await.unwrap(); - let (multipart_id_2, mut writer_2) = integration.put_multipart(&location).await.unwrap(); - assert_ne!(multipart_id, multipart_id_2); - writer_2.write_all(&data).await.unwrap(); + let mut u2 = integration.upload(&location).await.unwrap(); + u2.put_part(data).await.unwrap(); let list = flatten_list_stream(&integration, None).await.unwrap(); assert_eq!(list.len(), 0); @@ -1520,11 +1407,13 @@ mod tests { #[cfg(not(target_arch = "wasm32"))] #[cfg(test)] mod not_wasm_tests { - use crate::local::LocalFileSystem; - use crate::{ObjectStore, Path}; use std::time::Duration; + + use bytes::Bytes; use tempfile::TempDir; - use tokio::io::AsyncWriteExt; + + use crate::local::LocalFileSystem; + use crate::{ObjectStore, Path}; #[tokio::test] async fn test_cleanup_intermediate_files() { @@ -1532,12 +1421,13 @@ mod not_wasm_tests { let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap(); let location = Path::from("some_file"); - let (_, mut writer) = integration.put_multipart(&location).await.unwrap(); - writer.write_all(b"hello").await.unwrap(); + let data = Bytes::from_static(b"hello"); + let mut upload = integration.upload(&location).await.unwrap(); + upload.put_part(data).await.unwrap(); let file_count = std::fs::read_dir(root.path()).unwrap().count(); assert_eq!(file_count, 1); - drop(writer); + drop(upload); tokio::time::sleep(Duration::from_millis(1)).await; @@ -1549,13 +1439,15 @@ mod not_wasm_tests { #[cfg(target_family = "unix")] #[cfg(test)] mod unix_test { - use crate::local::LocalFileSystem; - use crate::{ObjectStore, Path}; + use std::fs::OpenOptions; + use nix::sys::stat; use nix::unistd; - use std::fs::OpenOptions; use tempfile::TempDir; + use crate::local::LocalFileSystem; + use crate::{ObjectStore, Path}; + #[tokio::test] async fn test_fifo() { let filename = "some_file"; diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs index 41ee1091a3b2..9e7fc2701b7c 100644 --- a/object_store/src/memory.rs +++ b/object_store/src/memory.rs @@ -16,27 +16,21 @@ // under the License. //! An in-memory object store implementation -use crate::multipart::{MultiPartStore, PartId}; -use crate::util::InvalidGetRange; -use crate::{ - path::Path, GetRange, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, - PutMode, PutOptions, PutResult, Result, UpdateVersion, -}; -use crate::{GetOptions, MultipartId}; +use std::collections::{BTreeMap, BTreeSet, HashMap}; +use std::ops::Range; +use std::sync::Arc; + use async_trait::async_trait; use bytes::Bytes; use chrono::{DateTime, Utc}; use futures::{stream::BoxStream, StreamExt}; use parking_lot::RwLock; use snafu::{OptionExt, ResultExt, Snafu}; -use std::collections::BTreeSet; -use std::collections::{BTreeMap, HashMap}; -use std::io; -use std::ops::Range; -use std::pin::Pin; -use std::sync::Arc; -use std::task::Poll; -use tokio::io::AsyncWrite; + +use crate::{GetRange, GetResult, GetResultPayload, ListResult, MultipartId, ObjectMeta, ObjectStore, path::Path, PutMode, PutOptions, PutResult, Result, UpdateVersion, Upload, UploadPart}; +use crate::GetOptions; +use crate::multipart::{MultiPartStore, PartId}; +use crate::util::InvalidGetRange; /// A specialized `Error` for in-memory object store-related errors #[derive(Debug, Snafu)] @@ -213,23 +207,12 @@ impl ObjectStore for InMemory { }) } - async fn put_multipart( - &self, - location: &Path, - ) -> Result<(MultipartId, Box)> { - Ok(( - String::new(), - Box::new(InMemoryUpload { - location: location.clone(), - data: Vec::new(), - storage: Arc::clone(&self.storage), - }), - )) - } - - async fn abort_multipart(&self, _location: &Path, _multipart_id: &MultipartId) -> Result<()> { - // Nothing to clean up - Ok(()) + async fn upload(&self, location: &Path) -> Result> { + Ok(Box::new(InMemoryUpload { + location: location.clone(), + parts: vec![], + storage: Arc::clone(&self.storage), + })) } async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { @@ -482,45 +465,42 @@ impl InMemory { } } +#[derive(Debug)] struct InMemoryUpload { location: Path, - data: Vec, + parts: Vec, storage: Arc>, } -impl AsyncWrite for InMemoryUpload { - fn poll_write( - mut self: Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - buf: &[u8], - ) -> Poll> { - self.data.extend_from_slice(buf); - Poll::Ready(Ok(buf.len())) +#[async_trait] +impl Upload for InMemoryUpload { + fn put_part(&mut self, data: Bytes) -> UploadPart { + self.parts.push(data); + Box::pin(futures::future::ready(Ok(()))) } - fn poll_flush( - self: Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - ) -> Poll> { - Poll::Ready(Ok(())) + async fn complete(&mut self) -> Result { + let cap = self.parts.iter().map(|x| x.len()).sum(); + let mut buf = Vec::with_capacity(cap); + self.parts.iter().for_each(|x| buf.extend_from_slice(x)); + let etag = self.storage.write().insert(&self.location, buf.into()); + Ok(PutResult { + e_tag: Some(etag.to_string()), + version: None, + }) } - fn poll_shutdown( - mut self: Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - ) -> Poll> { - let data = Bytes::from(std::mem::take(&mut self.data)); - self.storage.write().insert(&self.location, data); - Poll::Ready(Ok(())) + async fn abort(&mut self) -> Result<()> { + Ok(()) } } #[cfg(test)] mod tests { - use super::*; - use crate::tests::*; + use super::*; + #[tokio::test] async fn in_memory_test() { let integration = InMemory::new(); diff --git a/object_store/src/multipart.rs b/object_store/src/multipart.rs index 1dcd5a6f4960..749fdeb9c022 100644 --- a/object_store/src/multipart.rs +++ b/object_store/src/multipart.rs @@ -17,33 +17,15 @@ //! Cloud Multipart Upload //! -//! This crate provides an asynchronous interface for multipart file uploads to cloud storage services. -//! It's designed to offer efficient, non-blocking operations, +//! This crate provides an asynchronous interface for multipart file uploads to +//! cloud storage services. It's designed to offer efficient, non-blocking operations, //! especially useful when dealing with large files or high-throughput systems. use async_trait::async_trait; use bytes::Bytes; -use futures::{stream::FuturesUnordered, Future, StreamExt}; -use std::{io, pin::Pin, sync::Arc, task::Poll}; -use tokio::io::AsyncWrite; -use crate::path::Path; use crate::{MultipartId, PutResult, Result}; - -type BoxedTryFuture = Pin> + Send>>; - -/// A trait used in combination with [`WriteMultiPart`] to implement -/// [`AsyncWrite`] on top of an API for multipart upload -#[async_trait] -pub trait PutPart: Send + Sync + 'static { - /// Upload a single part - async fn put_part(&self, buf: Vec, part_idx: usize) -> Result; - - /// Complete the upload with the provided parts - /// - /// `completed_parts` is in order of part number - async fn complete(&self, completed_parts: Vec) -> Result<()>; -} +use crate::path::Path; /// Represents a part of a file that has been successfully uploaded in a multipart upload process. #[derive(Debug, Clone)] @@ -52,222 +34,6 @@ pub struct PartId { pub content_id: String, } -/// Wrapper around a [`PutPart`] that implements [`AsyncWrite`] -/// -/// Data will be uploaded in fixed size chunks of 10 MiB in parallel, -/// up to the configured maximum concurrency -pub struct WriteMultiPart { - inner: Arc, - /// A list of completed parts, in sequential order. - completed_parts: Vec>, - /// Part upload tasks currently running - tasks: FuturesUnordered>, - /// Maximum number of upload tasks to run concurrently - max_concurrency: usize, - /// Buffer that will be sent in next upload. - current_buffer: Vec, - /// Size of each part. - /// - /// While S3 and Minio support variable part sizes, R2 requires they all be - /// exactly the same size. - part_size: usize, - /// Index of current part - current_part_idx: usize, - /// The completion task - completion_task: Option>, -} - -impl WriteMultiPart { - /// Create a new multipart upload with the implementation and the given maximum concurrency - pub fn new(inner: T, max_concurrency: usize) -> Self { - Self { - inner: Arc::new(inner), - completed_parts: Vec::new(), - tasks: FuturesUnordered::new(), - max_concurrency, - current_buffer: Vec::new(), - // TODO: Should self vary by provider? - // TODO: Should we automatically increase then when part index gets large? - - // Minimum size of 5 MiB - // https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html - // https://cloud.google.com/storage/quotas#requests - part_size: 10 * 1024 * 1024, - current_part_idx: 0, - completion_task: None, - } - } - - // Add data to the current buffer, returning the number of bytes added - fn add_to_buffer(mut self: Pin<&mut Self>, buf: &[u8], offset: usize) -> usize { - let remaining_capacity = self.part_size - self.current_buffer.len(); - let to_copy = std::cmp::min(remaining_capacity, buf.len() - offset); - self.current_buffer - .extend_from_slice(&buf[offset..offset + to_copy]); - to_copy - } - - /// Poll current tasks - fn poll_tasks( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Result<(), io::Error> { - if self.tasks.is_empty() { - return Ok(()); - } - while let Poll::Ready(Some(res)) = self.tasks.poll_next_unpin(cx) { - let (part_idx, part) = res?; - let total_parts = self.completed_parts.len(); - self.completed_parts - .resize(std::cmp::max(part_idx + 1, total_parts), None); - self.completed_parts[part_idx] = Some(part); - } - Ok(()) - } - - // The `poll_flush` function will only flush the in-progress tasks. - // The `final_flush` method called during `poll_shutdown` will flush - // the `current_buffer` along with in-progress tasks. - // Please see https://github.com/apache/arrow-rs/issues/3390 for more details. - fn final_flush( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - // Poll current tasks - self.as_mut().poll_tasks(cx)?; - - // If current_buffer is not empty, see if it can be submitted - if !self.current_buffer.is_empty() && self.tasks.len() < self.max_concurrency { - let out_buffer: Vec = std::mem::take(&mut self.current_buffer); - let inner = Arc::clone(&self.inner); - let part_idx = self.current_part_idx; - self.tasks.push(Box::pin(async move { - let upload_part = inner.put_part(out_buffer, part_idx).await?; - Ok((part_idx, upload_part)) - })); - } - - self.as_mut().poll_tasks(cx)?; - - // If tasks and current_buffer are empty, return Ready - if self.tasks.is_empty() && self.current_buffer.is_empty() { - Poll::Ready(Ok(())) - } else { - Poll::Pending - } - } -} - -impl AsyncWrite for WriteMultiPart { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &[u8], - ) -> Poll> { - // Poll current tasks - self.as_mut().poll_tasks(cx)?; - - let mut offset = 0; - - loop { - // Fill up current buffer - offset += self.as_mut().add_to_buffer(buf, offset); - - // If we don't have a full buffer or we have too many tasks, break - if self.current_buffer.len() < self.part_size - || self.tasks.len() >= self.max_concurrency - { - break; - } - - let new_buffer = Vec::with_capacity(self.part_size); - let out_buffer = std::mem::replace(&mut self.current_buffer, new_buffer); - let inner = Arc::clone(&self.inner); - let part_idx = self.current_part_idx; - self.tasks.push(Box::pin(async move { - let upload_part = inner.put_part(out_buffer, part_idx).await?; - Ok((part_idx, upload_part)) - })); - self.current_part_idx += 1; - - // We need to poll immediately after adding to setup waker - self.as_mut().poll_tasks(cx)?; - } - - // If offset is zero, then we didn't write anything because we didn't - // have capacity for more tasks and our buffer is full. - if offset == 0 && !buf.is_empty() { - Poll::Pending - } else { - Poll::Ready(Ok(offset)) - } - } - - fn poll_flush( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - // Poll current tasks - self.as_mut().poll_tasks(cx)?; - - // If tasks is empty, return Ready - if self.tasks.is_empty() { - Poll::Ready(Ok(())) - } else { - Poll::Pending - } - } - - fn poll_shutdown( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - // First, poll flush - match self.as_mut().final_flush(cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(res) => res?, - }; - - // If shutdown task is not set, set it - let parts = std::mem::take(&mut self.completed_parts); - let parts = parts - .into_iter() - .enumerate() - .map(|(idx, part)| { - part.ok_or_else(|| { - io::Error::new( - io::ErrorKind::Other, - format!("Missing information for upload part {idx}"), - ) - }) - }) - .collect::>()?; - - let inner = Arc::clone(&self.inner); - let completion_task = self.completion_task.get_or_insert_with(|| { - Box::pin(async move { - inner.complete(parts).await?; - Ok(()) - }) - }); - - Pin::new(completion_task).poll(cx) - } -} - -impl std::fmt::Debug for WriteMultiPart { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("WriteMultiPart") - .field("completed_parts", &self.completed_parts) - .field("tasks", &self.tasks) - .field("max_concurrency", &self.max_concurrency) - .field("current_buffer", &self.current_buffer) - .field("part_size", &self.part_size) - .field("current_part_idx", &self.current_part_idx) - .finish() - } -} - /// A low-level interface for interacting with multipart upload APIs /// /// Most use-cases should prefer [`ObjectStore::put_multipart`] as this is supported by more diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs index 38f9b07bbd05..4b38a6131953 100644 --- a/object_store/src/prefix.rs +++ b/object_store/src/prefix.rs @@ -19,12 +19,11 @@ use bytes::Bytes; use futures::{stream::BoxStream, StreamExt, TryStreamExt}; use std::ops::Range; -use tokio::io::AsyncWrite; use crate::path::Path; use crate::{ - GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions, PutResult, - Result, + GetOptions, GetResult, ListResult, ObjectMeta, ObjectStore, PutOptions, PutResult, Result, + Upload, }; #[doc(hidden)] @@ -91,18 +90,11 @@ impl ObjectStore for PrefixStore { self.inner.put_opts(&full_path, bytes, opts).await } - async fn put_multipart( - &self, - location: &Path, - ) -> Result<(MultipartId, Box)> { - let full_path = self.full_path(location); - self.inner.put_multipart(&full_path).await + async fn upload(&self, location: &Path) -> Result> { + let full_path = self.full_path(&location); + self.inner.upload(&full_path).await } - async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> { - let full_path = self.full_path(location); - self.inner.abort_multipart(&full_path, multipart_id).await - } async fn get(&self, location: &Path) -> Result { let full_path = self.full_path(location); self.inner.get(&full_path).await diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs index 252256a4599e..cef74d5db0f6 100644 --- a/object_store/src/throttle.rs +++ b/object_store/src/throttle.rs @@ -20,16 +20,15 @@ use parking_lot::Mutex; use std::ops::Range; use std::{convert::TryInto, sync::Arc}; +use crate::GetOptions; use crate::{ path::Path, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, PutOptions, - PutResult, Result, + PutResult, Result, Upload, }; -use crate::{GetOptions, MultipartId}; use async_trait::async_trait; use bytes::Bytes; use futures::{stream::BoxStream, FutureExt, StreamExt}; use std::time::Duration; -use tokio::io::AsyncWrite; /// Configuration settings for throttled store #[derive(Debug, Default, Clone, Copy)] @@ -158,14 +157,7 @@ impl ObjectStore for ThrottledStore { self.inner.put_opts(location, bytes, opts).await } - async fn put_multipart( - &self, - _location: &Path, - ) -> Result<(MultipartId, Box)> { - Err(super::Error::NotImplemented) - } - - async fn abort_multipart(&self, _location: &Path, _multipart_id: &MultipartId) -> Result<()> { + async fn upload(&self, _location: &Path) -> Result> { Err(super::Error::NotImplemented) } diff --git a/object_store/src/upload.rs b/object_store/src/upload.rs new file mode 100644 index 000000000000..7d4fc62e7c83 --- /dev/null +++ b/object_store/src/upload.rs @@ -0,0 +1,151 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::{PutResult, Result}; +use async_trait::async_trait; +use bytes::Bytes; +use futures::future::BoxFuture; +use tokio::task::JoinSet; + +/// An upload part request +pub type UploadPart = BoxFuture<'static, Result<()>>; + +#[async_trait] +pub trait Upload: Send + std::fmt::Debug { + /// Upload the next part + /// + /// Returns a stream + /// + /// Most stores require that all parts excluding the last are at least 5 MiB, and some + /// further require that all parts excluding the last be the same size, e.g. [R2]. + /// Clients wanting to maximise compatibility should therefore perform writes in + /// fixed size blocks larger than 5 MiB. + /// + /// Implementations may invoke this method multiple times and then await on the + /// returned futures in parallel + /// + /// ```no_run + /// # use futures::StreamExt; + /// # use object_store::Upload; + /// # + /// # async fn test() { + /// # + /// let mut upload: Box<&dyn Upload> = todo!(); + /// let mut p1 = upload.put_part(vec![0; 10 * 1024 * 1024].into()); + /// let mut p2 = upload.put_part(vec![1; 10 * 1024 * 1024].into()); + /// + /// let (u1, u2) = futures::future::join(p1.next(), p2.next()).await; + /// u1.unwrap().unwrap(); + /// u2.unwrap().unwrap(); + /// let result = upload.complete().await.unwrap(); + /// # } + /// ``` + /// + /// [R2]: https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations + fn put_part(&mut self, data: Bytes) -> UploadPart; + + /// Complete the multipart upload + async fn complete(&mut self) -> Result; + + /// Abort the multipart upload + /// + /// It is implementation defined behaviour if called concurrently with [`UploadPart::execute`] + async fn abort(&mut self) -> Result<()>; +} + +/// A synchronous write API for uploading data in parallel +/// +/// Makes use of [`JoinSet`] under the hood to multiplex upload tasks, +/// avoiding issues caused by sharing a single tokio's cooperative task +/// budget across multiple IO operations. +/// +/// The design also takes inspiration from [`Sink`] with [`ChunkedUpload::wait_for_capacity`] +/// allowing back pressure on producers, prior to buffering the next part. However, unlike +/// [`Sink`] this back pressure is optional, allowing integration with synchronous producers +/// +/// [`Sink`]: futures::sink::Sink +#[derive(Debug)] +pub struct ChunkedUpload { + upload: Box, + + buffer: Vec, + + tasks: JoinSet>, +} + +impl ChunkedUpload { + /// Create a new [`ChunkedUpload`] + pub fn new(upload: Box) -> Self { + Self::new_with_capacity(upload, 5 * 1024 * 1024) + } + + /// Create a new [`ChunkedUpload`] that will upload in fixed `capacity` sized chunks + pub fn new_with_capacity(upload: Box, capacity: usize) -> Self { + Self { + upload, + buffer: Vec::with_capacity(capacity), + tasks: Default::default(), + } + } + + /// Wait until there are `max_concurrency` or fewer requests in-flight + pub async fn wait_for_capacity(&mut self, max_concurrency: usize) -> Result<()> { + while self.tasks.len() > max_concurrency { + self.tasks.join_next().await.unwrap()??; + } + Ok(()) + } + + /// Write data to this [`ChunkedUpload`] + /// + /// Back pressure can optionally be applied to producers by calling + /// [`Self::wait_for_capacity`] prior to calling this method + pub fn write(&mut self, mut buf: &[u8]) { + while !buf.is_empty() { + let capacity = self.buffer.capacity(); + let remaining = capacity - self.buffer.len(); + let to_read = buf.len().min(remaining); + self.buffer.extend_from_slice(&buf[..to_read]); + if to_read == remaining { + let part = std::mem::replace(&mut self.buffer, Vec::with_capacity(capacity)); + self.put_part(part.into()) + } + buf = &buf[to_read..] + } + } + + fn put_part(&mut self, part: Bytes) { + self.tasks.spawn(self.upload.put_part(part)); + } + + /// Abort this upload + pub async fn abort(mut self) -> Result<()> { + self.tasks.shutdown().await; + self.upload.abort().await + } + + /// Flush final chunk, and await completion of all in-flight requests + pub async fn finish(mut self) -> Result { + if !self.buffer.is_empty() { + let part = std::mem::take(&mut self.buffer); + self.put_part(part.into()) + } + + self.wait_for_capacity(0).await?; + self.upload.complete().await + } +} From b172a6e5887929c65bc67acca0e676eab913adc6 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Sun, 17 Mar 2024 15:40:53 +1300 Subject: [PATCH 02/15] Make BufWriter abortable --- object_store/src/buffered.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/object_store/src/buffered.rs b/object_store/src/buffered.rs index f86a7cb7ff43..fee81cdaa772 100644 --- a/object_store/src/buffered.rs +++ b/object_store/src/buffered.rs @@ -249,6 +249,19 @@ impl BufWriter { state: BufWriterState::Buffer(path, Vec::new()), } } + + /// Abort this writer, cleaning up any partially uploaded state + /// + /// # Panic + /// + /// Panics if this writer has already been shutdown or aborted + pub async fn abort(&mut self) -> crate::Result<()> { + match &mut self.state { + BufWriterState::Buffer(_, _) | BufWriterState::Prepare(_) => Ok(()), + BufWriterState::Flush(_) => panic!("Already shut down"), + BufWriterState::Write(x) => x.take().unwrap().abort().await, + } + } } impl AsyncWrite for BufWriter { From 1c8a965b2e2490fa39d66a0f0b531ef93ca0d9c3 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 18 Mar 2024 15:38:18 +1300 Subject: [PATCH 03/15] Flesh out cloud implementations --- object_store/src/aws/mod.rs | 87 +++++++++++++++++----------- object_store/src/azure/mod.rs | 65 +++++++++++++-------- object_store/src/buffered.rs | 4 +- object_store/src/http/mod.rs | 3 +- object_store/src/lib.rs | 2 +- object_store/tests/get_range_file.rs | 9 +-- 6 files changed, 99 insertions(+), 71 deletions(-) diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index b11f4513b6df..7419d6463dcc 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -38,18 +38,17 @@ use futures::{StreamExt, TryStreamExt}; use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH}; use reqwest::{Method, StatusCode}; use std::{sync::Arc, time::Duration}; -use tokio::io::AsyncWrite; use url::Url; use crate::aws::client::{RequestError, S3Client}; use crate::client::get::GetClientExt; use crate::client::list::ListClientExt; use crate::client::CredentialProvider; -use crate::multipart::{MultiPartStore, PartId, PutPart, WriteMultiPart}; +use crate::multipart::{MultiPartStore, PartId}; use crate::signer::Signer; use crate::{ Error, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Path, PutMode, - PutOptions, PutResult, Result, + PutOptions, PutResult, Result, Upload, UploadPart, }; static TAGS_HEADER: HeaderName = HeaderName::from_static("x-amz-tagging"); @@ -85,6 +84,7 @@ const STORE: &str = "S3"; /// [`CredentialProvider`] for [`AmazonS3`] pub type AwsCredentialProvider = Arc>; +use crate::client::parts::Parts; pub use credential::{AwsAuthorizer, AwsCredential}; /// Interface for [Amazon S3](https://aws.amazon.com/s3/). @@ -211,25 +211,18 @@ impl ObjectStore for AmazonS3 { } } - async fn put_multipart( - &self, - location: &Path, - ) -> Result<(MultipartId, Box)> { - let id = self.client.create_multipart(location).await?; - - let upload = S3MultiPartUpload { - location: location.clone(), - upload_id: id.clone(), - client: Arc::clone(&self.client), - }; - - Ok((id, Box::new(WriteMultiPart::new(upload, 8)))) - } - - async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> { - self.client - .delete_request(location, &[("uploadId", multipart_id)]) - .await + async fn upload(&self, location: &Path) -> Result> { + let upload_id = self.client.create_multipart(location).await?; + + Ok(Box::new(S3MultiPartUpload { + part_idx: 0, + state: Arc::new(UploadState { + client: Arc::clone(&self.client), + location: location.clone(), + upload_id: upload_id.clone(), + parts: Default::default(), + }), + })) } async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { @@ -319,25 +312,50 @@ impl ObjectStore for AmazonS3 { } } +#[derive(Debug)] struct S3MultiPartUpload { + part_idx: usize, + state: Arc, +} + +#[derive(Debug)] +struct UploadState { + parts: Parts, location: Path, upload_id: String, client: Arc, } #[async_trait] -impl PutPart for S3MultiPartUpload { - async fn put_part(&self, buf: Vec, part_idx: usize) -> Result { - self.client - .put_part(&self.location, &self.upload_id, part_idx, buf.into()) +impl Upload for S3MultiPartUpload { + fn put_part(&mut self, data: Bytes) -> UploadPart { + let idx = self.part_idx; + self.part_idx += 1; + let state = Arc::clone(&self.state); + Box::pin(async move { + let part = state + .client + .put_part(&state.location, &state.upload_id, idx, data) + .await?; + state.parts.put(idx, part); + Ok(()) + }) + } + + async fn complete(&mut self) -> Result { + let parts = self.state.parts.finish(self.part_idx)?; + + self.state + .client + .complete_multipart(&self.state.location, &self.state.upload_id, parts) .await } - async fn complete(&self, completed_parts: Vec) -> Result<()> { - self.client - .complete_multipart(&self.location, &self.upload_id, completed_parts) - .await?; - Ok(()) + async fn abort(&mut self) -> Result<()> { + self.state + .client + .delete_request(&self.state.location, &[("uploadId", &self.state.upload_id)]) + .await } } @@ -377,7 +395,6 @@ mod tests { use crate::{client::get::GetClient, tests::*}; use bytes::Bytes; use hyper::HeaderMap; - use tokio::io::AsyncWriteExt; const NON_EXISTENT_NAME: &str = "nonexistentname"; @@ -542,9 +559,9 @@ mod tests { store.put(&locations[0], data.clone()).await.unwrap(); store.copy(&locations[0], &locations[1]).await.unwrap(); - let (_, mut writer) = store.put_multipart(&locations[2]).await.unwrap(); - writer.write_all(&data).await.unwrap(); - writer.shutdown().await.unwrap(); + let mut upload = store.upload(&locations[2]).await.unwrap(); + upload.put_part(data.clone()).await.unwrap(); + upload.complete().await.unwrap(); for location in &locations { let res = store diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index 423052bfee68..8bf139884820 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -27,7 +27,7 @@ use crate::{ path::Path, signer::Signer, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions, PutResult, - Result, + Result, Upload, UploadPart, }; use async_trait::async_trait; use bytes::Bytes; @@ -36,7 +36,6 @@ use reqwest::Method; use std::fmt::Debug; use std::sync::Arc; use std::time::Duration; -use tokio::io::AsyncWrite; use url::Url; use crate::client::get::GetClientExt; @@ -50,6 +49,8 @@ mod credential; /// [`CredentialProvider`] for [`MicrosoftAzure`] pub type AzureCredentialProvider = Arc>; +use crate::azure::client::AzureClient; +use crate::client::parts::Parts; pub use builder::{AzureConfigKey, MicrosoftAzureBuilder}; pub use credential::AzureCredential; @@ -90,22 +91,17 @@ impl ObjectStore for MicrosoftAzure { self.client.put_blob(location, bytes, opts).await } - async fn put_multipart( - &self, - location: &Path, - ) -> Result<(MultipartId, Box)> { - let inner = AzureMultiPartUpload { - client: Arc::clone(&self.client), - location: location.to_owned(), - }; - Ok((String::new(), Box::new(WriteMultiPart::new(inner, 8)))) + async fn upload(&self, location: &Path) -> Result> { + Ok(Box::new(AzureMultiPartUpload { + part_idx: 0, + state: Arc::new(UploadState { + client: Arc::clone(&self.client), + location: location.clone(), + parts: Default::default(), + }), + })) } - async fn abort_multipart(&self, _location: &Path, _multipart_id: &MultipartId) -> Result<()> { - // There is no way to drop blocks that have been uploaded. Instead, they simply - // expire in 7 days. - Ok(()) - } async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { self.client.get_opts(location, options).await @@ -193,20 +189,43 @@ impl Signer for MicrosoftAzure { /// put_multipart_part -> PUT block /// complete -> PUT block list /// abort -> No equivalent; blocks are simply dropped after 7 days -#[derive(Debug, Clone)] +#[derive(Debug)] struct AzureMultiPartUpload { - client: Arc, + part_idx: usize, + state: Arc, +} + +#[derive(Debug)] +struct UploadState { location: Path, + parts: Parts, + client: Arc, } #[async_trait] -impl PutPart for AzureMultiPartUpload { - async fn put_part(&self, buf: Vec, idx: usize) -> Result { - self.client.put_block(&self.location, idx, buf.into()).await +impl Upload for AzureMultiPartUpload { + fn put_part(&mut self, data: Bytes) -> UploadPart { + let idx = self.part_idx; + self.part_idx += 1; + let state = Arc::clone(&self.state); + Box::pin(async move { + let part = state.client.put_block(&state.location, idx, data).await?; + state.parts.put(idx, part); + Ok(()) + }) + } + + async fn complete(&mut self) -> Result { + let parts = self.state.parts.finish(self.part_idx)?; + + self.state + .client + .put_block_list(&self.state.location, parts) + .await } - async fn complete(&self, parts: Vec) -> Result<()> { - self.client.put_block_list(&self.location, parts).await?; + async fn abort(&mut self) -> Result<()> { + // Nothing to do Ok(()) } } diff --git a/object_store/src/buffered.rs b/object_store/src/buffered.rs index e7f26649da6e..6f6e6824c9b4 100644 --- a/object_store/src/buffered.rs +++ b/object_store/src/buffered.rs @@ -27,7 +27,7 @@ use std::io::{Error, ErrorKind, SeekFrom}; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use tokio::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, AsyncWriteExt, ReadBuf}; +use tokio::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf}; /// The default buffer size used by [`BufReader`] pub const DEFAULT_BUFFER_SIZE: usize = 1024 * 1024; @@ -367,7 +367,7 @@ mod tests { use super::*; use crate::memory::InMemory; use crate::path::Path; - use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt}; + use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; #[tokio::test] async fn test_buf_reader() { diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs index ba549abed783..e5be8ce86106 100644 --- a/object_store/src/http/mod.rs +++ b/object_store/src/http/mod.rs @@ -37,7 +37,6 @@ use futures::stream::BoxStream; use futures::{StreamExt, TryStreamExt}; use itertools::Itertools; use snafu::{OptionExt, ResultExt, Snafu}; -use tokio::io::AsyncWrite; use url::Url; use crate::client::get::GetClientExt; @@ -45,7 +44,7 @@ use crate::client::header::get_etag; use crate::http::client::Client; use crate::path::Path; use crate::{ - ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, + ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult, ObjectMeta, ObjectStore, PutMode, PutOptions, PutResult, Result, RetryConfig, Upload, }; diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 8bcaa45ff4ce..a6f50110e267 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -2197,7 +2197,7 @@ mod tests { pub(crate) async fn tagging(storage: &dyn ObjectStore, validate: bool, get_tags: F) where F: Fn(Path) -> Fut + Send + Sync, - Fut: Future> + Send, + Fut: std::future::Future> + Send, { use bytes::Buf; use serde::Deserialize; diff --git a/object_store/tests/get_range_file.rs b/object_store/tests/get_range_file.rs index f73d78578f08..846648d1d285 100644 --- a/object_store/tests/get_range_file.rs +++ b/object_store/tests/get_range_file.rs @@ -25,7 +25,6 @@ use object_store::path::Path; use object_store::*; use std::fmt::Formatter; use tempfile::tempdir; -use tokio::io::AsyncWrite; #[derive(Debug)] struct MyStore(LocalFileSystem); @@ -42,16 +41,10 @@ impl ObjectStore for MyStore { self.0.put_opts(path, data, opts).await } - async fn put_multipart( - &self, - _: &Path, - ) -> Result<(MultipartId, Box)> { + async fn upload(&self, _location: &Path) -> Result> { todo!() } - async fn abort_multipart(&self, _: &Path, _: &MultipartId) -> Result<()> { - todo!() - } async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { self.0.get_opts(location, options).await From 764289a8e8d2aad95090184d335bb98fccc3c3eb Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 18 Mar 2024 15:49:16 +1300 Subject: [PATCH 04/15] Review feedback --- object_store/src/aws/mod.rs | 17 +++++++-------- object_store/src/azure/mod.rs | 4 ++-- object_store/src/buffered.rs | 2 +- object_store/src/chunked.rs | 4 ++-- object_store/src/gcp/mod.rs | 4 ++-- object_store/src/http/mod.rs | 2 +- object_store/src/lib.rs | 14 ++++++------- object_store/src/limit.rs | 4 ++-- object_store/src/local.rs | 10 ++++----- object_store/src/memory.rs | 2 +- object_store/src/prefix.rs | 4 ++-- object_store/src/throttle.rs | 2 +- object_store/src/upload.rs | 31 ++++++++++++++++++++++++++-- object_store/tests/get_range_file.rs | 2 +- 14 files changed, 63 insertions(+), 39 deletions(-) diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index 7419d6463dcc..fd95f54d5959 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -17,17 +17,14 @@ //! An object store implementation for S3 //! -//! ## Multi-part uploads +//! ## Multipart uploads //! -//! Multi-part uploads can be initiated with the [ObjectStore::put_multipart] method. -//! Data passed to the writer is automatically buffered to meet the minimum size -//! requirements for a part. Multiple parts are uploaded concurrently. +//! Multipart uploads can be initiated with the [ObjectStore::put_multipart] method. //! //! If the writer fails for any reason, you may have parts uploaded to AWS but not -//! used that you may be charged for. Use the [ObjectStore::abort_multipart] method -//! to abort the upload and drop those unneeded parts. In addition, you may wish to -//! consider implementing [automatic cleanup] of unused parts that are older than one -//! week. +//! used that you will be charged for. [`Upload::abort`] may be invoked to drop +//! these unneeded parts, however, it is recommended that you consider implementing +//! [automatic cleanup] of unused parts that are older than some threshold. //! //! [automatic cleanup]: https://aws.amazon.com/blogs/aws/s3-lifecycle-management-update-support-for-multipart-uploads-and-delete-markers/ @@ -211,7 +208,7 @@ impl ObjectStore for AmazonS3 { } } - async fn upload(&self, location: &Path) -> Result> { + async fn put_multipart(&self, location: &Path) -> Result> { let upload_id = self.client.create_multipart(location).await?; Ok(Box::new(S3MultiPartUpload { @@ -559,7 +556,7 @@ mod tests { store.put(&locations[0], data.clone()).await.unwrap(); store.copy(&locations[0], &locations[1]).await.unwrap(); - let mut upload = store.upload(&locations[2]).await.unwrap(); + let mut upload = store.put_multipart(&locations[2]).await.unwrap(); upload.put_part(data.clone()).await.unwrap(); upload.complete().await.unwrap(); diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index 8bf139884820..9ef96311fe03 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -19,7 +19,7 @@ //! //! ## Streaming uploads //! -//! [ObjectStore::upload] will upload data in blocks and write a blob from those blocks. +//! [ObjectStore::put_multipart] will upload data in blocks and write a blob from those blocks. //! //! Unused blocks will automatically be dropped after 7 days. use crate::{ @@ -91,7 +91,7 @@ impl ObjectStore for MicrosoftAzure { self.client.put_blob(location, bytes, opts).await } - async fn upload(&self, location: &Path) -> Result> { + async fn put_multipart(&self, location: &Path) -> Result> { Ok(Box::new(AzureMultiPartUpload { part_idx: 0, state: Arc::new(UploadState { diff --git a/object_store/src/buffered.rs b/object_store/src/buffered.rs index 6f6e6824c9b4..16b191cc5ef2 100644 --- a/object_store/src/buffered.rs +++ b/object_store/src/buffered.rs @@ -294,7 +294,7 @@ impl AsyncWrite for BufWriter { let path = std::mem::take(path); let store = Arc::clone(&self.store); self.state = BufWriterState::Prepare(Box::pin(async move { - let upload = store.upload(&path).await?; + let upload = store.put_multipart(&path).await?; let mut chunked = ChunkedUpload::new(upload); chunked.write(&buffer); Ok(chunked) diff --git a/object_store/src/chunked.rs b/object_store/src/chunked.rs index 8ae3026cb0f2..4ff0d2eb8da0 100644 --- a/object_store/src/chunked.rs +++ b/object_store/src/chunked.rs @@ -66,8 +66,8 @@ impl ObjectStore for ChunkedStore { self.inner.put_opts(location, bytes, opts).await } - async fn upload(&self, location: &Path) -> Result> { - self.inner.upload(location).await + async fn put_multipart(&self, location: &Path) -> Result> { + self.inner.put_multipart(location).await } async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs index 0feb057d1fec..5c9a544f58d1 100644 --- a/object_store/src/gcp/mod.rs +++ b/object_store/src/gcp/mod.rs @@ -20,7 +20,7 @@ //! ## Multi-part uploads //! //! [Multi-part uploads](https://cloud.google.com/storage/docs/multipart-uploads) -//! can be initiated with the [ObjectStore::upload] method. If neither [`Upload::complete`] +//! can be initiated with the [ObjectStore::put_multipart] method. If neither [`Upload::complete`] //! nor [`Upload::abort`] is invoked, you may have parts uploaded to GCS but not used, //! that you will be charged for. It is recommended you configure a [lifecycle rule] to //! abort incomplete multipart uploads after a certain period of time to avoid being @@ -138,7 +138,7 @@ impl ObjectStore for GoogleCloudStorage { self.client.put(location, bytes, opts).await } - async fn upload(&self, location: &Path) -> Result> { + async fn put_multipart(&self, location: &Path) -> Result> { let upload_id = self.client.multipart_initiate(location).await?; Ok(Box::new(GCSMultipartUpload { diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs index e5be8ce86106..64707dd43361 100644 --- a/object_store/src/http/mod.rs +++ b/object_store/src/http/mod.rs @@ -114,7 +114,7 @@ impl ObjectStore for HttpStore { }) } - async fn upload(&self, _location: &Path) -> Result> { + async fn put_multipart(&self, _location: &Path) -> Result> { Err(crate::Error::NotImplemented) } diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index a6f50110e267..253c97dfc1c0 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -548,7 +548,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static { /// /// Client should prefer [`ObjectStore::put`] for small payloads, as streaming uploads /// typically require multiple separate requests. See [`Upload`] for more information - async fn upload(&self, location: &Path) -> Result>; + async fn put_multipart(&self, location: &Path) -> Result>; /// Return the bytes that are stored at the specified location. async fn get(&self, location: &Path) -> Result { @@ -733,8 +733,8 @@ macro_rules! as_ref_impl { self.as_ref().put_opts(location, bytes, opts).await } - async fn upload(&self, location: &Path) -> Result> { - self.as_ref().upload(location).await + async fn put_multipart(&self, location: &Path) -> Result> { + self.as_ref().put_multipart(location).await } async fn get(&self, location: &Path) -> Result { @@ -1881,7 +1881,7 @@ mod tests { // Can write to storage let data = get_chunks(5 * 1024 * 1024, 3); let bytes_expected = data.concat(); - let mut upload = storage.upload(&location).await.unwrap(); + let mut upload = storage.put_multipart(&location).await.unwrap(); let uploads = data.into_iter().map(|x| upload.put_part(x)); futures::future::try_join_all(uploads).await.unwrap(); @@ -1908,7 +1908,7 @@ mod tests { // Sizes chosen to ensure we write three parts let data = get_chunks(3_200_000, 7); let bytes_expected = data.concat(); - let upload = storage.upload(&location).await.unwrap(); + let upload = storage.put_multipart(&location).await.unwrap(); let mut writer = ChunkedUpload::new(upload); for chunk in &data { writer.write(chunk) @@ -1919,7 +1919,7 @@ mod tests { // We can abort an empty write let location = Path::from("test_dir/test_abort_upload.txt"); - let mut upload = storage.upload(&location).await.unwrap(); + let mut upload = storage.put_multipart(&location).await.unwrap(); upload.abort().await.unwrap(); let get_res = storage.get(&location).await; assert!(get_res.is_err()); @@ -1929,7 +1929,7 @@ mod tests { )); // We can abort an in-progress write - let mut upload = storage.upload(&location).await.unwrap(); + let mut upload = storage.put_multipart(&location).await.unwrap(); upload .put_part(data.first().unwrap().clone()) .await diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs index b20a95cadd6f..0da086ffe8ce 100644 --- a/object_store/src/limit.rs +++ b/object_store/src/limit.rs @@ -81,8 +81,8 @@ impl ObjectStore for LimitStore { let _permit = self.semaphore.acquire().await.unwrap(); self.inner.put_opts(location, bytes, opts).await } - async fn upload(&self, location: &Path) -> Result> { - let upload = self.inner.upload(location).await?; + async fn put_multipart(&self, location: &Path) -> Result> { + let upload = self.inner.put_multipart(location).await?; Ok(Box::new(LimitUpload { semaphore: Arc::clone(&self.semaphore), upload, diff --git a/object_store/src/local.rs b/object_store/src/local.rs index 76394d72ecae..f04dc0df5fc9 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -395,7 +395,7 @@ impl ObjectStore for LocalFileSystem { .await } - async fn upload(&self, location: &Path) -> Result> { + async fn put_multipart(&self, location: &Path) -> Result> { let dest = self.path_to_filesystem(location)?; let (file, src) = new_staged_upload(&dest)?; Ok(Box::new(LocalUpload::new(src, dest, file))) @@ -1005,7 +1005,7 @@ mod tests { // Can't use stream_get test as ChunkedUpload uses a tokio JoinSet let p = Path::from("manual_upload"); - let mut upload = integration.upload(&p).await.unwrap(); + let mut upload = integration.put_multipart(&p).await.unwrap(); upload.put_part(Bytes::from_static(b"123")).await.unwrap(); upload.put_part(Bytes::from_static(b"45678")).await.unwrap(); let r = upload.complete().await.unwrap(); @@ -1310,10 +1310,10 @@ mod tests { let location = Path::from("some_file"); let data = Bytes::from("arbitrary data"); - let mut u1 = integration.upload(&location).await.unwrap(); + let mut u1 = integration.put_multipart(&location).await.unwrap(); u1.put_part(data.clone()).await.unwrap(); - let mut u2 = integration.upload(&location).await.unwrap(); + let mut u2 = integration.put_multipart(&location).await.unwrap(); u2.put_part(data).await.unwrap(); let list = flatten_list_stream(&integration, None).await.unwrap(); @@ -1422,7 +1422,7 @@ mod not_wasm_tests { let location = Path::from("some_file"); let data = Bytes::from_static(b"hello"); - let mut upload = integration.upload(&location).await.unwrap(); + let mut upload = integration.put_multipart(&location).await.unwrap(); upload.put_part(data).await.unwrap(); let file_count = std::fs::read_dir(root.path()).unwrap().count(); diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs index 9e7fc2701b7c..aff5b7cba424 100644 --- a/object_store/src/memory.rs +++ b/object_store/src/memory.rs @@ -207,7 +207,7 @@ impl ObjectStore for InMemory { }) } - async fn upload(&self, location: &Path) -> Result> { + async fn put_multipart(&self, location: &Path) -> Result> { Ok(Box::new(InMemoryUpload { location: location.clone(), parts: vec![], diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs index 4b38a6131953..9dc48edf1b61 100644 --- a/object_store/src/prefix.rs +++ b/object_store/src/prefix.rs @@ -90,9 +90,9 @@ impl ObjectStore for PrefixStore { self.inner.put_opts(&full_path, bytes, opts).await } - async fn upload(&self, location: &Path) -> Result> { + async fn put_multipart(&self, location: &Path) -> Result> { let full_path = self.full_path(&location); - self.inner.upload(&full_path).await + self.inner.put_multipart(&full_path).await } async fn get(&self, location: &Path) -> Result { diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs index cef74d5db0f6..b3914b4111e5 100644 --- a/object_store/src/throttle.rs +++ b/object_store/src/throttle.rs @@ -157,7 +157,7 @@ impl ObjectStore for ThrottledStore { self.inner.put_opts(location, bytes, opts).await } - async fn upload(&self, _location: &Path) -> Result> { + async fn put_multipart(&self, _location: &Path) -> Result> { Err(super::Error::NotImplemented) } diff --git a/object_store/src/upload.rs b/object_store/src/upload.rs index 7d4fc62e7c83..19884e9e5c31 100644 --- a/object_store/src/upload.rs +++ b/object_store/src/upload.rs @@ -24,6 +24,18 @@ use tokio::task::JoinSet; /// An upload part request pub type UploadPart = BoxFuture<'static, Result<()>>; +/// A trait allowing writing an object in fixed size chunks +/// +/// Consecutive chunks of data can be written by calling [`Upload::put_part`] and polling +/// the returned futures to completion. Multiple futures returned by [`Upload::put_part`] +/// may be polled in parallel, allowing for concurrent uploads. +/// +/// Once all part uploads have been polled to completion, the upload can be completed by +/// calling [`Upload::complete`]. This will make the entire uploaded object visible +/// as an atomic operation.It is implementation behind behaviour if [`Upload::complete`] +/// is called before all [`UploadPart`] have been polled to completion. +/// +/// If #[async_trait] pub trait Upload: Send + std::fmt::Debug { /// Upload the next part @@ -59,15 +71,30 @@ pub trait Upload: Send + std::fmt::Debug { fn put_part(&mut self, data: Bytes) -> UploadPart; /// Complete the multipart upload + /// + /// It is implementation defined behaviour if this method is called before polling + /// all [`UploadPart`] returned by [`Upload::put_part`] to completion. Additionally, + /// it is implementation defined behaviour to call [`Upload::complete`] on an already + /// completed or aborted [`Upload`]. async fn complete(&mut self) -> Result; /// Abort the multipart upload /// - /// It is implementation defined behaviour if called concurrently with [`UploadPart::execute`] + /// If an [`Upload`] is dropped without [`Upload::complete`] being called, + /// some implementations will automatically reap any uploaded parts. However, + /// this is not always possible, e.g. for S3 and GCS. [`Upload::abort`] can + /// therefore be invoked to perform this cleanup. + /// + /// It is recommended that where possible users configure appropriate lifecycle + /// rules to automatically reap unused parts older than some threshold, as this + /// will more reliably handle different failure modes. + /// + /// It is implementation defined behaviour to call [`Upload::abort`] on an already + /// completed or aborted [`Upload`] async fn abort(&mut self) -> Result<()>; } -/// A synchronous write API for uploading data in parallel +/// A synchronous write API for uploading data in parallel in fixed size chunks /// /// Makes use of [`JoinSet`] under the hood to multiplex upload tasks, /// avoiding issues caused by sharing a single tokio's cooperative task diff --git a/object_store/tests/get_range_file.rs b/object_store/tests/get_range_file.rs index 846648d1d285..e644e9d6def1 100644 --- a/object_store/tests/get_range_file.rs +++ b/object_store/tests/get_range_file.rs @@ -41,7 +41,7 @@ impl ObjectStore for MyStore { self.0.put_opts(path, data, opts).await } - async fn upload(&self, _location: &Path) -> Result> { + async fn put_multipart(&self, _location: &Path) -> Result> { todo!() } From fbc1ec83583f5f6156b6c4ed38d4f5d5d7aa9af8 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 18 Mar 2024 16:25:34 +1300 Subject: [PATCH 05/15] Misc tweaks and fixes --- object_store/src/aws/mod.rs | 4 ++-- object_store/src/azure/mod.rs | 4 ++-- object_store/src/gcp/mod.rs | 4 ++-- object_store/src/lib.rs | 4 ++-- object_store/src/limit.rs | 40 ++--------------------------------- object_store/src/memory.rs | 4 ++-- object_store/src/multipart.rs | 5 +++-- 7 files changed, 15 insertions(+), 50 deletions(-) diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index fd95f54d5959..b797ecd983c6 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -41,7 +41,7 @@ use crate::aws::client::{RequestError, S3Client}; use crate::client::get::GetClientExt; use crate::client::list::ListClientExt; use crate::client::CredentialProvider; -use crate::multipart::{MultiPartStore, PartId}; +use crate::multipart::{MultipartStore, PartId}; use crate::signer::Signer; use crate::{ Error, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Path, PutMode, @@ -357,7 +357,7 @@ impl Upload for S3MultiPartUpload { } #[async_trait] -impl MultiPartStore for AmazonS3 { +impl MultipartStore for AmazonS3 { async fn create_multipart(&self, path: &Path) -> Result { self.client.create_multipart(path).await } diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index 9ef96311fe03..c6f73585dae7 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -23,7 +23,7 @@ //! //! Unused blocks will automatically be dropped after 7 days. use crate::{ - multipart::{MultiPartStore, PartId}, + multipart::{MultipartStore, PartId}, path::Path, signer::Signer, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions, PutResult, @@ -231,7 +231,7 @@ impl Upload for AzureMultiPartUpload { } #[async_trait] -impl MultiPartStore for MicrosoftAzure { +impl MultipartStore for MicrosoftAzure { async fn create_multipart(&self, _: &Path) -> Result { Ok(String::new()) } diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs index 5c9a544f58d1..97461f196ee4 100644 --- a/object_store/src/gcp/mod.rs +++ b/object_store/src/gcp/mod.rs @@ -49,7 +49,7 @@ use futures::stream::BoxStream; use crate::client::get::GetClientExt; use crate::client::list::ListClientExt; use crate::client::parts::Parts; -use crate::multipart::MultiPartStore; +use crate::multipart::MultipartStore; pub use builder::{GoogleCloudStorageBuilder, GoogleConfigKey}; pub use credential::GcpCredential; @@ -186,7 +186,7 @@ impl ObjectStore for GoogleCloudStorage { } #[async_trait] -impl MultiPartStore for GoogleCloudStorage { +impl MultipartStore for GoogleCloudStorage { async fn create_multipart(&self, path: &Path) -> Result { self.client.multipart_initiate(path).await } diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 253c97dfc1c0..d1b1ddd37988 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -1199,7 +1199,7 @@ mod test_util { #[cfg(test)] mod tests { use super::*; - use crate::multipart::MultiPartStore; + use crate::multipart::MultipartStore; use crate::test_util::flatten_list_stream; use chrono::TimeZone; use futures::stream::FuturesUnordered; @@ -2130,7 +2130,7 @@ mod tests { storage.delete(&path2).await.unwrap(); } - pub(crate) async fn multipart(storage: &dyn ObjectStore, multipart: &dyn MultiPartStore) { + pub(crate) async fn multipart(storage: &dyn ObjectStore, multipart: &dyn MultipartStore) { let path = Path::from("test_multipart"); let chunk_size = 5 * 1024 * 1024; diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs index 0da086ffe8ce..d2780ac3a78e 100644 --- a/object_store/src/limit.rs +++ b/object_store/src/limit.rs @@ -24,12 +24,10 @@ use crate::{ use async_trait::async_trait; use bytes::Bytes; use futures::{FutureExt, Stream}; -use std::io::{Error, IoSlice}; use std::ops::Range; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use tokio::io::AsyncWrite; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; /// Store wrapper that wraps an inner store and limits the maximum number of concurrent @@ -215,42 +213,7 @@ impl Stream for PermitWrapper { } } -impl AsyncWrite for PermitWrapper { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - Pin::new(&mut self.inner).poll_write(cx, buf) - } - - fn poll_flush( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - Pin::new(&mut self.inner).poll_flush(cx) - } - - fn poll_shutdown( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - Pin::new(&mut self.inner).poll_shutdown(cx) - } - - fn poll_write_vectored( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - bufs: &[IoSlice<'_>], - ) -> Poll> { - Pin::new(&mut self.inner).poll_write_vectored(cx, bufs) - } - - fn is_write_vectored(&self) -> bool { - self.inner.is_write_vectored() - } -} - +/// An [`Upload`] wrapper that limits the maximum number of concurrent requests #[derive(Debug)] pub struct LimitUpload { upload: Box, @@ -258,6 +221,7 @@ pub struct LimitUpload { } impl LimitUpload { + /// Create a new [`LimitUpload`] limiting `upload` to `max_concurrency` concurrent requests pub fn new(upload: Box, max_concurrency: usize) -> Self { Self { upload, diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs index aff5b7cba424..87569ba26402 100644 --- a/object_store/src/memory.rs +++ b/object_store/src/memory.rs @@ -29,7 +29,7 @@ use snafu::{OptionExt, ResultExt, Snafu}; use crate::{GetRange, GetResult, GetResultPayload, ListResult, MultipartId, ObjectMeta, ObjectStore, path::Path, PutMode, PutOptions, PutResult, Result, UpdateVersion, Upload, UploadPart}; use crate::GetOptions; -use crate::multipart::{MultiPartStore, PartId}; +use crate::multipart::{MultipartStore, PartId}; use crate::util::InvalidGetRange; /// A specialized `Error` for in-memory object store-related errors @@ -374,7 +374,7 @@ impl ObjectStore for InMemory { } #[async_trait] -impl MultiPartStore for InMemory { +impl MultipartStore for InMemory { async fn create_multipart(&self, _path: &Path) -> Result { let mut storage = self.storage.write(); let etag = storage.next_etag; diff --git a/object_store/src/multipart.rs b/object_store/src/multipart.rs index 749fdeb9c022..eb33610eb78b 100644 --- a/object_store/src/multipart.rs +++ b/object_store/src/multipart.rs @@ -43,7 +43,7 @@ pub struct PartId { /// [`ObjectStore::put_multipart`]: crate::ObjectStore::put_multipart /// [`LocalFileSystem`]: crate::local::LocalFileSystem #[async_trait] -pub trait MultiPartStore: Send + Sync + 'static { +pub trait MultipartStore: Send + Sync + 'static { /// Creates a new multipart upload, returning the [`MultipartId`] async fn create_multipart(&self, path: &Path) -> Result; @@ -54,10 +54,11 @@ pub trait MultiPartStore: Send + Sync + 'static { /// /// Most stores require that all parts excluding the last are at least 5 MiB, and some /// further require that all parts excluding the last be the same size, e.g. [R2]. - /// [`WriteMultiPart`] performs writes in fixed size blocks of 10 MiB, and clients wanting + /// [`ChunkedUpload`] performs writes in fixed size blocks of 5 MiB, and clients wanting /// to maximise compatibility should look to do likewise. /// /// [R2]: https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations + /// [`ChunkedUpload`]: crate::upload::ChunkedUpload async fn put_part( &self, path: &Path, From 4c6a2a8a69295048f35e202ff29905b6f39e08f7 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 18 Mar 2024 16:28:19 +1300 Subject: [PATCH 06/15] Format --- object_store/src/azure/mod.rs | 1 - object_store/src/client/parts.rs | 4 ++-- object_store/src/http/mod.rs | 4 ++-- object_store/src/memory.rs | 7 +++++-- object_store/src/multipart.rs | 2 +- object_store/tests/get_range_file.rs | 1 - 6 files changed, 10 insertions(+), 9 deletions(-) diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index c6f73585dae7..7f1b54c42dc0 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -102,7 +102,6 @@ impl ObjectStore for MicrosoftAzure { })) } - async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { self.client.get_opts(location, options).await } diff --git a/object_store/src/client/parts.rs b/object_store/src/client/parts.rs index bea63a1a87a9..9fc301edcf81 100644 --- a/object_store/src/client/parts.rs +++ b/object_store/src/client/parts.rs @@ -15,8 +15,8 @@ // specific language governing permissions and limitations // under the License. -use parking_lot::Mutex; use crate::multipart::PartId; +use parking_lot::Mutex; /// An interior mutable collection of upload parts and their corresponding part index #[derive(Debug, Default)] @@ -45,4 +45,4 @@ impl Parts { parts.sort_unstable_by_key(|(idx, _)| *idx); Ok(parts.drain(..).map(|(_, v)| v).collect()) } -} \ No newline at end of file +} diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs index 64707dd43361..9e406a9789e9 100644 --- a/object_store/src/http/mod.rs +++ b/object_store/src/http/mod.rs @@ -44,8 +44,8 @@ use crate::client::header::get_etag; use crate::http::client::Client; use crate::path::Path; use crate::{ - ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult, ObjectMeta, - ObjectStore, PutMode, PutOptions, PutResult, Result, RetryConfig, Upload, + ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult, ObjectMeta, ObjectStore, + PutMode, PutOptions, PutResult, Result, RetryConfig, Upload, }; mod client; diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs index 87569ba26402..a55583926bc2 100644 --- a/object_store/src/memory.rs +++ b/object_store/src/memory.rs @@ -27,10 +27,13 @@ use futures::{stream::BoxStream, StreamExt}; use parking_lot::RwLock; use snafu::{OptionExt, ResultExt, Snafu}; -use crate::{GetRange, GetResult, GetResultPayload, ListResult, MultipartId, ObjectMeta, ObjectStore, path::Path, PutMode, PutOptions, PutResult, Result, UpdateVersion, Upload, UploadPart}; -use crate::GetOptions; use crate::multipart::{MultipartStore, PartId}; use crate::util::InvalidGetRange; +use crate::GetOptions; +use crate::{ + path::Path, GetRange, GetResult, GetResultPayload, ListResult, MultipartId, ObjectMeta, + ObjectStore, PutMode, PutOptions, PutResult, Result, UpdateVersion, Upload, UploadPart, +}; /// A specialized `Error` for in-memory object store-related errors #[derive(Debug, Snafu)] diff --git a/object_store/src/multipart.rs b/object_store/src/multipart.rs index eb33610eb78b..2ef0b1c65863 100644 --- a/object_store/src/multipart.rs +++ b/object_store/src/multipart.rs @@ -24,8 +24,8 @@ use async_trait::async_trait; use bytes::Bytes; -use crate::{MultipartId, PutResult, Result}; use crate::path::Path; +use crate::{MultipartId, PutResult, Result}; /// Represents a part of a file that has been successfully uploaded in a multipart upload process. #[derive(Debug, Clone)] diff --git a/object_store/tests/get_range_file.rs b/object_store/tests/get_range_file.rs index e644e9d6def1..32dd3b51b500 100644 --- a/object_store/tests/get_range_file.rs +++ b/object_store/tests/get_range_file.rs @@ -45,7 +45,6 @@ impl ObjectStore for MyStore { todo!() } - async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { self.0.get_opts(location, options).await } From 96f77d4f461145e97d6955e70fc9a668d4165f21 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 18 Mar 2024 16:30:47 +1300 Subject: [PATCH 07/15] Replace multi-part with multipart --- object_store/src/gcp/client.rs | 2 +- object_store/src/gcp/mod.rs | 4 ++-- object_store/src/lib.rs | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs index e4b0f9af7d15..def53beefe78 100644 --- a/object_store/src/gcp/client.rs +++ b/object_store/src/gcp/client.rs @@ -272,7 +272,7 @@ impl GoogleCloudStorageClient { }) } - /// Initiate a multi-part upload + /// Initiate a multipart upload pub async fn multipart_initiate(&self, path: &Path) -> Result { let credential = self.get_credential().await?; let url = self.object_url(path); diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs index 97461f196ee4..9f1b59d4eb9a 100644 --- a/object_store/src/gcp/mod.rs +++ b/object_store/src/gcp/mod.rs @@ -17,9 +17,9 @@ //! An object store implementation for Google Cloud Storage //! -//! ## Multi-part uploads +//! ## Multipart uploads //! -//! [Multi-part uploads](https://cloud.google.com/storage/docs/multipart-uploads) +//! [Multipart uploads](https://cloud.google.com/storage/docs/multipart-uploads) //! can be initiated with the [ObjectStore::put_multipart] method. If neither [`Upload::complete`] //! nor [`Upload::abort`] is invoked, you may have parts uploaded to GCS but not used, //! that you will be charged for. It is recommended you configure a [lifecycle rule] to diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index d1b1ddd37988..f39924420e79 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -526,7 +526,7 @@ use std::sync::Arc; /// An alias for a dynamically dispatched object store implementation. pub type DynObjectStore = dyn ObjectStore; -/// Id type for multi-part uploads. +/// Id type for multipart uploads. pub type MultipartId = String; /// Universal API to multiple object store services. From 24627f55f8abed7057299446127cf45835b796ab Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 18 Mar 2024 16:31:27 +1300 Subject: [PATCH 08/15] More docs --- object_store/src/upload.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/object_store/src/upload.rs b/object_store/src/upload.rs index 19884e9e5c31..065555a18671 100644 --- a/object_store/src/upload.rs +++ b/object_store/src/upload.rs @@ -34,8 +34,6 @@ pub type UploadPart = BoxFuture<'static, Result<()>>; /// calling [`Upload::complete`]. This will make the entire uploaded object visible /// as an atomic operation.It is implementation behind behaviour if [`Upload::complete`] /// is called before all [`UploadPart`] have been polled to completion. -/// -/// If #[async_trait] pub trait Upload: Send + std::fmt::Debug { /// Upload the next part From 554decf90d32df177f831dcab5cdaacc7463e858 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 18 Mar 2024 16:32:13 +1300 Subject: [PATCH 09/15] Clippy --- object_store/src/prefix.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs index 9dc48edf1b61..b8d740b622ba 100644 --- a/object_store/src/prefix.rs +++ b/object_store/src/prefix.rs @@ -91,7 +91,7 @@ impl ObjectStore for PrefixStore { } async fn put_multipart(&self, location: &Path) -> Result> { - let full_path = self.full_path(&location); + let full_path = self.full_path(location); self.inner.put_multipart(&full_path).await } From 60b2ae79c3f1a640b0209382a66685f2ac84c0d5 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 18 Mar 2024 16:35:57 +1300 Subject: [PATCH 10/15] Rename to MultipartUpload --- object_store/src/aws/mod.rs | 10 ++++---- object_store/src/azure/mod.rs | 8 +++---- object_store/src/chunked.rs | 6 ++--- object_store/src/gcp/mod.rs | 12 +++++----- object_store/src/http/mod.rs | 6 ++--- object_store/src/lib.rs | 6 ++--- object_store/src/limit.rs | 14 ++++++------ object_store/src/local.rs | 8 +++---- object_store/src/memory.rs | 8 +++---- object_store/src/prefix.rs | 6 ++--- object_store/src/throttle.rs | 6 ++--- object_store/src/upload.rs | 34 ++++++++++++++-------------- object_store/tests/get_range_file.rs | 2 +- 13 files changed, 63 insertions(+), 63 deletions(-) diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index b797ecd983c6..b33771de9a86 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -22,7 +22,7 @@ //! Multipart uploads can be initiated with the [ObjectStore::put_multipart] method. //! //! If the writer fails for any reason, you may have parts uploaded to AWS but not -//! used that you will be charged for. [`Upload::abort`] may be invoked to drop +//! used that you will be charged for. [`MultipartUpload::abort`] may be invoked to drop //! these unneeded parts, however, it is recommended that you consider implementing //! [automatic cleanup] of unused parts that are older than some threshold. //! @@ -44,8 +44,8 @@ use crate::client::CredentialProvider; use crate::multipart::{MultipartStore, PartId}; use crate::signer::Signer; use crate::{ - Error, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Path, PutMode, - PutOptions, PutResult, Result, Upload, UploadPart, + Error, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta, + ObjectStore, Path, PutMode, PutOptions, PutResult, Result, UploadPart, }; static TAGS_HEADER: HeaderName = HeaderName::from_static("x-amz-tagging"); @@ -208,7 +208,7 @@ impl ObjectStore for AmazonS3 { } } - async fn put_multipart(&self, location: &Path) -> Result> { + async fn put_multipart(&self, location: &Path) -> Result> { let upload_id = self.client.create_multipart(location).await?; Ok(Box::new(S3MultiPartUpload { @@ -324,7 +324,7 @@ struct UploadState { } #[async_trait] -impl Upload for S3MultiPartUpload { +impl MultipartUpload for S3MultiPartUpload { fn put_part(&mut self, data: Bytes) -> UploadPart { let idx = self.part_idx; self.part_idx += 1; diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index 7f1b54c42dc0..5d3a405ccc93 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -26,8 +26,8 @@ use crate::{ multipart::{MultipartStore, PartId}, path::Path, signer::Signer, - GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions, PutResult, - Result, Upload, UploadPart, + GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta, ObjectStore, + PutOptions, PutResult, Result, UploadPart, }; use async_trait::async_trait; use bytes::Bytes; @@ -91,7 +91,7 @@ impl ObjectStore for MicrosoftAzure { self.client.put_blob(location, bytes, opts).await } - async fn put_multipart(&self, location: &Path) -> Result> { + async fn put_multipart(&self, location: &Path) -> Result> { Ok(Box::new(AzureMultiPartUpload { part_idx: 0, state: Arc::new(UploadState { @@ -202,7 +202,7 @@ struct UploadState { } #[async_trait] -impl Upload for AzureMultiPartUpload { +impl MultipartUpload for AzureMultiPartUpload { fn put_part(&mut self, data: Bytes) -> UploadPart { let idx = self.part_idx; self.part_idx += 1; diff --git a/object_store/src/chunked.rs b/object_store/src/chunked.rs index 4ff0d2eb8da0..6db7f4b35e24 100644 --- a/object_store/src/chunked.rs +++ b/object_store/src/chunked.rs @@ -29,8 +29,8 @@ use futures::StreamExt; use crate::path::Path; use crate::Result; use crate::{ - GetOptions, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, PutOptions, - PutResult, Upload, + GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore, + PutOptions, PutResult, }; /// Wraps a [`ObjectStore`] and makes its get response return chunks @@ -66,7 +66,7 @@ impl ObjectStore for ChunkedStore { self.inner.put_opts(location, bytes, opts).await } - async fn put_multipart(&self, location: &Path) -> Result> { + async fn put_multipart(&self, location: &Path) -> Result> { self.inner.put_multipart(location).await } diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs index 9f1b59d4eb9a..d371f40f465d 100644 --- a/object_store/src/gcp/mod.rs +++ b/object_store/src/gcp/mod.rs @@ -20,8 +20,8 @@ //! ## Multipart uploads //! //! [Multipart uploads](https://cloud.google.com/storage/docs/multipart-uploads) -//! can be initiated with the [ObjectStore::put_multipart] method. If neither [`Upload::complete`] -//! nor [`Upload::abort`] is invoked, you may have parts uploaded to GCS but not used, +//! can be initiated with the [ObjectStore::put_multipart] method. If neither [`MultipartUpload::complete`] +//! nor [`MultipartUpload::abort`] is invoked, you may have parts uploaded to GCS but not used, //! that you will be charged for. It is recommended you configure a [lifecycle rule] to //! abort incomplete multipart uploads after a certain period of time to avoid being //! charged for storing partial uploads @@ -38,8 +38,8 @@ use std::sync::Arc; use crate::client::CredentialProvider; use crate::{ - multipart::PartId, path::Path, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, - ObjectStore, PutOptions, PutResult, Result, Upload, UploadPart, + multipart::PartId, path::Path, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, + ObjectMeta, ObjectStore, PutOptions, PutResult, Result, UploadPart, }; use async_trait::async_trait; use bytes::Bytes; @@ -100,7 +100,7 @@ struct UploadState { } #[async_trait] -impl Upload for GCSMultipartUpload { +impl MultipartUpload for GCSMultipartUpload { fn put_part(&mut self, data: Bytes) -> UploadPart { let idx = self.part_idx; self.part_idx += 1; @@ -138,7 +138,7 @@ impl ObjectStore for GoogleCloudStorage { self.client.put(location, bytes, opts).await } - async fn put_multipart(&self, location: &Path) -> Result> { + async fn put_multipart(&self, location: &Path) -> Result> { let upload_id = self.client.multipart_initiate(location).await?; Ok(Box::new(GCSMultipartUpload { diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs index 9e406a9789e9..626337df27f9 100644 --- a/object_store/src/http/mod.rs +++ b/object_store/src/http/mod.rs @@ -44,8 +44,8 @@ use crate::client::header::get_etag; use crate::http::client::Client; use crate::path::Path; use crate::{ - ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult, ObjectMeta, ObjectStore, - PutMode, PutOptions, PutResult, Result, RetryConfig, Upload, + ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, + ObjectStore, PutMode, PutOptions, PutResult, Result, RetryConfig, }; mod client; @@ -114,7 +114,7 @@ impl ObjectStore for HttpStore { }) } - async fn put_multipart(&self, _location: &Path) -> Result> { + async fn put_multipart(&self, _location: &Path) -> Result> { Err(crate::Error::NotImplemented) } diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index f39924420e79..920ba67f8595 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -547,8 +547,8 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static { /// Perform a multipart upload /// /// Client should prefer [`ObjectStore::put`] for small payloads, as streaming uploads - /// typically require multiple separate requests. See [`Upload`] for more information - async fn put_multipart(&self, location: &Path) -> Result>; + /// typically require multiple separate requests. See [`MultipartUpload`] for more information + async fn put_multipart(&self, location: &Path) -> Result>; /// Return the bytes that are stored at the specified location. async fn get(&self, location: &Path) -> Result { @@ -733,7 +733,7 @@ macro_rules! as_ref_impl { self.as_ref().put_opts(location, bytes, opts).await } - async fn put_multipart(&self, location: &Path) -> Result> { + async fn put_multipart(&self, location: &Path) -> Result> { self.as_ref().put_multipart(location).await } diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs index d2780ac3a78e..e5f6841638e1 100644 --- a/object_store/src/limit.rs +++ b/object_store/src/limit.rs @@ -18,8 +18,8 @@ //! An object store that limits the maximum concurrency of the wrapped implementation use crate::{ - BoxStream, GetOptions, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, Path, - PutOptions, PutResult, Result, StreamExt, Upload, UploadPart, + BoxStream, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, + ObjectStore, Path, PutOptions, PutResult, Result, StreamExt, UploadPart, }; use async_trait::async_trait; use bytes::Bytes; @@ -79,7 +79,7 @@ impl ObjectStore for LimitStore { let _permit = self.semaphore.acquire().await.unwrap(); self.inner.put_opts(location, bytes, opts).await } - async fn put_multipart(&self, location: &Path) -> Result> { + async fn put_multipart(&self, location: &Path) -> Result> { let upload = self.inner.put_multipart(location).await?; Ok(Box::new(LimitUpload { semaphore: Arc::clone(&self.semaphore), @@ -213,16 +213,16 @@ impl Stream for PermitWrapper { } } -/// An [`Upload`] wrapper that limits the maximum number of concurrent requests +/// An [`MultipartUpload`] wrapper that limits the maximum number of concurrent requests #[derive(Debug)] pub struct LimitUpload { - upload: Box, + upload: Box, semaphore: Arc, } impl LimitUpload { /// Create a new [`LimitUpload`] limiting `upload` to `max_concurrency` concurrent requests - pub fn new(upload: Box, max_concurrency: usize) -> Self { + pub fn new(upload: Box, max_concurrency: usize) -> Self { Self { upload, semaphore: Arc::new(Semaphore::new(max_concurrency)), @@ -231,7 +231,7 @@ impl LimitUpload { } #[async_trait] -impl Upload for LimitUpload { +impl MultipartUpload for LimitUpload { fn put_part(&mut self, data: Bytes) -> UploadPart { let upload = self.upload.put_part(data); let s = Arc::clone(&self.semaphore); diff --git a/object_store/src/local.rs b/object_store/src/local.rs index f04dc0df5fc9..a25b8204bdc3 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -38,8 +38,8 @@ use crate::{ maybe_spawn_blocking, path::{absolute_path_to_url, Path}, util::InvalidGetRange, - GetOptions, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, PutMode, - PutOptions, PutResult, Result, Upload, UploadPart, + GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore, + PutMode, PutOptions, PutResult, Result, UploadPart, }; /// A specialized `Error` for filesystem object store-related errors @@ -395,7 +395,7 @@ impl ObjectStore for LocalFileSystem { .await } - async fn put_multipart(&self, location: &Path) -> Result> { + async fn put_multipart(&self, location: &Path) -> Result> { let dest = self.path_to_filesystem(location)?; let (file, src) = new_staged_upload(&dest)?; Ok(Box::new(LocalUpload::new(src, dest, file))) @@ -714,7 +714,7 @@ impl LocalUpload { } #[async_trait] -impl Upload for LocalUpload { +impl MultipartUpload for LocalUpload { fn put_part(&mut self, data: Bytes) -> UploadPart { let offset = self.offset; self.offset += data.len() as u64; diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs index a55583926bc2..6c960d4f24fb 100644 --- a/object_store/src/memory.rs +++ b/object_store/src/memory.rs @@ -31,8 +31,8 @@ use crate::multipart::{MultipartStore, PartId}; use crate::util::InvalidGetRange; use crate::GetOptions; use crate::{ - path::Path, GetRange, GetResult, GetResultPayload, ListResult, MultipartId, ObjectMeta, - ObjectStore, PutMode, PutOptions, PutResult, Result, UpdateVersion, Upload, UploadPart, + path::Path, GetRange, GetResult, GetResultPayload, ListResult, MultipartId, MultipartUpload, + ObjectMeta, ObjectStore, PutMode, PutOptions, PutResult, Result, UpdateVersion, UploadPart, }; /// A specialized `Error` for in-memory object store-related errors @@ -210,7 +210,7 @@ impl ObjectStore for InMemory { }) } - async fn put_multipart(&self, location: &Path) -> Result> { + async fn put_multipart(&self, location: &Path) -> Result> { Ok(Box::new(InMemoryUpload { location: location.clone(), parts: vec![], @@ -476,7 +476,7 @@ struct InMemoryUpload { } #[async_trait] -impl Upload for InMemoryUpload { +impl MultipartUpload for InMemoryUpload { fn put_part(&mut self, data: Bytes) -> UploadPart { self.parts.push(data); Box::pin(futures::future::ready(Ok(()))) diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs index b8d740b622ba..053f71a2d063 100644 --- a/object_store/src/prefix.rs +++ b/object_store/src/prefix.rs @@ -22,8 +22,8 @@ use std::ops::Range; use crate::path::Path; use crate::{ - GetOptions, GetResult, ListResult, ObjectMeta, ObjectStore, PutOptions, PutResult, Result, - Upload, + GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutOptions, + PutResult, Result, }; #[doc(hidden)] @@ -90,7 +90,7 @@ impl ObjectStore for PrefixStore { self.inner.put_opts(&full_path, bytes, opts).await } - async fn put_multipart(&self, location: &Path) -> Result> { + async fn put_multipart(&self, location: &Path) -> Result> { let full_path = self.full_path(location); self.inner.put_multipart(&full_path).await } diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs index b3914b4111e5..5ca1eedbf739 100644 --- a/object_store/src/throttle.rs +++ b/object_store/src/throttle.rs @@ -22,8 +22,8 @@ use std::{convert::TryInto, sync::Arc}; use crate::GetOptions; use crate::{ - path::Path, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, PutOptions, - PutResult, Result, Upload, + path::Path, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore, + PutOptions, PutResult, Result, }; use async_trait::async_trait; use bytes::Bytes; @@ -157,7 +157,7 @@ impl ObjectStore for ThrottledStore { self.inner.put_opts(location, bytes, opts).await } - async fn put_multipart(&self, _location: &Path) -> Result> { + async fn put_multipart(&self, _location: &Path) -> Result> { Err(super::Error::NotImplemented) } diff --git a/object_store/src/upload.rs b/object_store/src/upload.rs index 065555a18671..b6057560f24a 100644 --- a/object_store/src/upload.rs +++ b/object_store/src/upload.rs @@ -26,16 +26,16 @@ pub type UploadPart = BoxFuture<'static, Result<()>>; /// A trait allowing writing an object in fixed size chunks /// -/// Consecutive chunks of data can be written by calling [`Upload::put_part`] and polling -/// the returned futures to completion. Multiple futures returned by [`Upload::put_part`] +/// Consecutive chunks of data can be written by calling [`MultipartUpload::put_part`] and polling +/// the returned futures to completion. Multiple futures returned by [`MultipartUpload::put_part`] /// may be polled in parallel, allowing for concurrent uploads. /// /// Once all part uploads have been polled to completion, the upload can be completed by -/// calling [`Upload::complete`]. This will make the entire uploaded object visible -/// as an atomic operation.It is implementation behind behaviour if [`Upload::complete`] +/// calling [`MultipartUpload::complete`]. This will make the entire uploaded object visible +/// as an atomic operation.It is implementation behind behaviour if [`MultipartUpload::complete`] /// is called before all [`UploadPart`] have been polled to completion. #[async_trait] -pub trait Upload: Send + std::fmt::Debug { +pub trait MultipartUpload: Send + std::fmt::Debug { /// Upload the next part /// /// Returns a stream @@ -50,11 +50,11 @@ pub trait Upload: Send + std::fmt::Debug { /// /// ```no_run /// # use futures::StreamExt; - /// # use object_store::Upload; + /// # use object_store::MultipartUpload; /// # /// # async fn test() { /// # - /// let mut upload: Box<&dyn Upload> = todo!(); + /// let mut upload: Box<&dyn MultipartUpload> = todo!(); /// let mut p1 = upload.put_part(vec![0; 10 * 1024 * 1024].into()); /// let mut p2 = upload.put_part(vec![1; 10 * 1024 * 1024].into()); /// @@ -71,24 +71,24 @@ pub trait Upload: Send + std::fmt::Debug { /// Complete the multipart upload /// /// It is implementation defined behaviour if this method is called before polling - /// all [`UploadPart`] returned by [`Upload::put_part`] to completion. Additionally, - /// it is implementation defined behaviour to call [`Upload::complete`] on an already - /// completed or aborted [`Upload`]. + /// all [`UploadPart`] returned by [`MultipartUpload::put_part`] to completion. Additionally, + /// it is implementation defined behaviour to call [`MultipartUpload::complete`] on an already + /// completed or aborted [`MultipartUpload`]. async fn complete(&mut self) -> Result; /// Abort the multipart upload /// - /// If an [`Upload`] is dropped without [`Upload::complete`] being called, + /// If an [`MultipartUpload`] is dropped without [`MultipartUpload::complete`] being called, /// some implementations will automatically reap any uploaded parts. However, - /// this is not always possible, e.g. for S3 and GCS. [`Upload::abort`] can + /// this is not always possible, e.g. for S3 and GCS. [`MultipartUpload::abort`] can /// therefore be invoked to perform this cleanup. /// /// It is recommended that where possible users configure appropriate lifecycle /// rules to automatically reap unused parts older than some threshold, as this /// will more reliably handle different failure modes. /// - /// It is implementation defined behaviour to call [`Upload::abort`] on an already - /// completed or aborted [`Upload`] + /// It is implementation defined behaviour to call [`MultipartUpload::abort`] on an already + /// completed or aborted [`MultipartUpload`] async fn abort(&mut self) -> Result<()>; } @@ -105,7 +105,7 @@ pub trait Upload: Send + std::fmt::Debug { /// [`Sink`]: futures::sink::Sink #[derive(Debug)] pub struct ChunkedUpload { - upload: Box, + upload: Box, buffer: Vec, @@ -114,12 +114,12 @@ pub struct ChunkedUpload { impl ChunkedUpload { /// Create a new [`ChunkedUpload`] - pub fn new(upload: Box) -> Self { + pub fn new(upload: Box) -> Self { Self::new_with_capacity(upload, 5 * 1024 * 1024) } /// Create a new [`ChunkedUpload`] that will upload in fixed `capacity` sized chunks - pub fn new_with_capacity(upload: Box, capacity: usize) -> Self { + pub fn new_with_capacity(upload: Box, capacity: usize) -> Self { Self { upload, buffer: Vec::with_capacity(capacity), diff --git a/object_store/tests/get_range_file.rs b/object_store/tests/get_range_file.rs index 32dd3b51b500..309a86d8fe9d 100644 --- a/object_store/tests/get_range_file.rs +++ b/object_store/tests/get_range_file.rs @@ -41,7 +41,7 @@ impl ObjectStore for MyStore { self.0.put_opts(path, data, opts).await } - async fn put_multipart(&self, _location: &Path) -> Result> { + async fn put_multipart(&self, _location: &Path) -> Result> { todo!() } From 2bcf83a2d6b8d2c6713f1c12dd8088f4c796ba32 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 18 Mar 2024 17:04:37 +1300 Subject: [PATCH 11/15] Rename ChunkedUpload to WriteMultipart --- object_store/src/buffered.rs | 8 ++++---- object_store/src/lib.rs | 2 +- object_store/src/local.rs | 2 +- object_store/src/multipart.rs | 4 ++-- object_store/src/upload.rs | 12 ++++++------ 5 files changed, 14 insertions(+), 14 deletions(-) diff --git a/object_store/src/buffered.rs b/object_store/src/buffered.rs index 16b191cc5ef2..c4ee6b32a07b 100644 --- a/object_store/src/buffered.rs +++ b/object_store/src/buffered.rs @@ -18,7 +18,7 @@ //! Utilities for performing tokio-style buffered IO use crate::path::Path; -use crate::{ChunkedUpload, ObjectMeta, ObjectStore}; +use crate::{WriteMultipart, ObjectMeta, ObjectStore}; use bytes::Bytes; use futures::future::{BoxFuture, FutureExt}; use futures::ready; @@ -232,9 +232,9 @@ enum BufWriterState { /// Buffer up to capacity bytes Buffer(Path, Vec), /// [`ObjectStore::put_multipart`] - Prepare(BoxFuture<'static, std::io::Result>), + Prepare(BoxFuture<'static, std::io::Result>), /// Write to a multipart upload - Write(Option), + Write(Option), /// [`ObjectStore::put`] Flush(BoxFuture<'static, std::io::Result<()>>), } @@ -295,7 +295,7 @@ impl AsyncWrite for BufWriter { let store = Arc::clone(&self.store); self.state = BufWriterState::Prepare(Box::pin(async move { let upload = store.put_multipart(&path).await?; - let mut chunked = ChunkedUpload::new(upload); + let mut chunked = WriteMultipart::new(upload); chunked.write(&buffer); Ok(chunked) })); diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 920ba67f8595..8eeb26e69f63 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -1909,7 +1909,7 @@ mod tests { let data = get_chunks(3_200_000, 7); let bytes_expected = data.concat(); let upload = storage.put_multipart(&location).await.unwrap(); - let mut writer = ChunkedUpload::new(upload); + let mut writer = WriteMultipart::new(upload); for chunk in &data { writer.write(chunk) } diff --git a/object_store/src/local.rs b/object_store/src/local.rs index a25b8204bdc3..a7eb4661f686 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -1003,7 +1003,7 @@ mod tests { list_uses_directories_correctly(&integration).await; list_with_delimiter(&integration).await; - // Can't use stream_get test as ChunkedUpload uses a tokio JoinSet + // Can't use stream_get test as WriteMultipart uses a tokio JoinSet let p = Path::from("manual_upload"); let mut upload = integration.put_multipart(&p).await.unwrap(); upload.put_part(Bytes::from_static(b"123")).await.unwrap(); diff --git a/object_store/src/multipart.rs b/object_store/src/multipart.rs index 2ef0b1c65863..26cce3936244 100644 --- a/object_store/src/multipart.rs +++ b/object_store/src/multipart.rs @@ -54,11 +54,11 @@ pub trait MultipartStore: Send + Sync + 'static { /// /// Most stores require that all parts excluding the last are at least 5 MiB, and some /// further require that all parts excluding the last be the same size, e.g. [R2]. - /// [`ChunkedUpload`] performs writes in fixed size blocks of 5 MiB, and clients wanting + /// [`WriteMultipart`] performs writes in fixed size blocks of 5 MiB, and clients wanting /// to maximise compatibility should look to do likewise. /// /// [R2]: https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations - /// [`ChunkedUpload`]: crate::upload::ChunkedUpload + /// [`WriteMultipart`]: crate::upload::WriteMultipart async fn put_part( &self, path: &Path, diff --git a/object_store/src/upload.rs b/object_store/src/upload.rs index b6057560f24a..b9d86b1423eb 100644 --- a/object_store/src/upload.rs +++ b/object_store/src/upload.rs @@ -98,13 +98,13 @@ pub trait MultipartUpload: Send + std::fmt::Debug { /// avoiding issues caused by sharing a single tokio's cooperative task /// budget across multiple IO operations. /// -/// The design also takes inspiration from [`Sink`] with [`ChunkedUpload::wait_for_capacity`] +/// The design also takes inspiration from [`Sink`] with [`WriteMultipart::wait_for_capacity`] /// allowing back pressure on producers, prior to buffering the next part. However, unlike /// [`Sink`] this back pressure is optional, allowing integration with synchronous producers /// /// [`Sink`]: futures::sink::Sink #[derive(Debug)] -pub struct ChunkedUpload { +pub struct WriteMultipart { upload: Box, buffer: Vec, @@ -112,13 +112,13 @@ pub struct ChunkedUpload { tasks: JoinSet>, } -impl ChunkedUpload { - /// Create a new [`ChunkedUpload`] +impl WriteMultipart { + /// Create a new [`WriteMultipart`] pub fn new(upload: Box) -> Self { Self::new_with_capacity(upload, 5 * 1024 * 1024) } - /// Create a new [`ChunkedUpload`] that will upload in fixed `capacity` sized chunks + /// Create a new [`WriteMultipart`] that will upload in fixed `capacity` sized chunks pub fn new_with_capacity(upload: Box, capacity: usize) -> Self { Self { upload, @@ -135,7 +135,7 @@ impl ChunkedUpload { Ok(()) } - /// Write data to this [`ChunkedUpload`] + /// Write data to this [`WriteMultipart`] /// /// Back pressure can optionally be applied to producers by calling /// [`Self::wait_for_capacity`] prior to calling this method From e83c993e2697b95b982da6405b1934894e31b51a Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 18 Mar 2024 17:22:20 +1300 Subject: [PATCH 12/15] Doc tweaks --- object_store/src/buffered.rs | 2 +- object_store/src/gcp/mod.rs | 10 +++++----- object_store/src/lib.rs | 15 ++++++--------- object_store/src/upload.rs | 30 +++++++++++++----------------- 4 files changed, 25 insertions(+), 32 deletions(-) diff --git a/object_store/src/buffered.rs b/object_store/src/buffered.rs index c4ee6b32a07b..39f8eafbef7e 100644 --- a/object_store/src/buffered.rs +++ b/object_store/src/buffered.rs @@ -18,7 +18,7 @@ //! Utilities for performing tokio-style buffered IO use crate::path::Path; -use crate::{WriteMultipart, ObjectMeta, ObjectStore}; +use crate::{ObjectMeta, ObjectStore, WriteMultipart}; use bytes::Bytes; use futures::future::{BoxFuture, FutureExt}; use futures::ready; diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs index d371f40f465d..2058d1f8055b 100644 --- a/object_store/src/gcp/mod.rs +++ b/object_store/src/gcp/mod.rs @@ -20,11 +20,11 @@ //! ## Multipart uploads //! //! [Multipart uploads](https://cloud.google.com/storage/docs/multipart-uploads) -//! can be initiated with the [ObjectStore::put_multipart] method. If neither [`MultipartUpload::complete`] -//! nor [`MultipartUpload::abort`] is invoked, you may have parts uploaded to GCS but not used, -//! that you will be charged for. It is recommended you configure a [lifecycle rule] to -//! abort incomplete multipart uploads after a certain period of time to avoid being -//! charged for storing partial uploads +//! can be initiated with the [ObjectStore::put_multipart] method. If neither +//! [`MultipartUpload::complete`] nor [`MultipartUpload::abort`] is invoked, you may +//! have parts uploaded to GCS but not used, that you will be charged for. It is recommended +//! you configure a [lifecycle rule] to abort incomplete multipart uploads after a certain +//! period of time to avoid being charged for storing partial uploads. //! //! ## Using HTTP/2 //! diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 8eeb26e69f63..e02675d88abe 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -269,12 +269,11 @@ //! //! # Multipart Upload //! -//! Use the [`ObjectStore::put_multipart`] method to atomically write a large amount of data, -//! with implementations automatically handling parallel, chunked upload where appropriate. +//! Use the [`ObjectStore::put_multipart`] method to atomically write a large amount of data //! //! ``` //! # use object_store::local::LocalFileSystem; -//! # use object_store::ObjectStore; +//! # use object_store::{ObjectStore, WriteMultipart}; //! # use std::sync::Arc; //! # use bytes::Bytes; //! # use tokio::io::AsyncWriteExt; @@ -286,12 +285,10 @@ //! # //! let object_store: Arc = get_object_store(); //! let path = Path::from("data/large_file"); -//! let (_id, mut writer) = object_store.put_multipart(&path).await.unwrap(); -//! -//! let bytes = Bytes::from_static(b"hello"); -//! writer.write_all(&bytes).await.unwrap(); -//! writer.flush().await.unwrap(); -//! writer.shutdown().await.unwrap(); +//! let upload = object_store.put_multipart(&path).await.unwrap(); +//! let mut write = WriteMultipart::new(upload); +//! write.write(b"hello"); +//! write.finish().await.unwrap(); //! # } //! ``` //! diff --git a/object_store/src/upload.rs b/object_store/src/upload.rs index b9d86b1423eb..78356774f3c5 100644 --- a/object_store/src/upload.rs +++ b/object_store/src/upload.rs @@ -38,8 +38,6 @@ pub type UploadPart = BoxFuture<'static, Result<()>>; pub trait MultipartUpload: Send + std::fmt::Debug { /// Upload the next part /// - /// Returns a stream - /// /// Most stores require that all parts excluding the last are at least 5 MiB, and some /// further require that all parts excluding the last be the same size, e.g. [R2]. /// Clients wanting to maximise compatibility should therefore perform writes in @@ -55,13 +53,10 @@ pub trait MultipartUpload: Send + std::fmt::Debug { /// # async fn test() { /// # /// let mut upload: Box<&dyn MultipartUpload> = todo!(); - /// let mut p1 = upload.put_part(vec![0; 10 * 1024 * 1024].into()); - /// let mut p2 = upload.put_part(vec![1; 10 * 1024 * 1024].into()); - /// - /// let (u1, u2) = futures::future::join(p1.next(), p2.next()).await; - /// u1.unwrap().unwrap(); - /// u2.unwrap().unwrap(); - /// let result = upload.complete().await.unwrap(); + /// let p1 = upload.put_part(vec![0; 10 * 1024 * 1024].into()); + /// let p2 = upload.put_part(vec![1; 10 * 1024 * 1024].into()); + /// futures::future::try_join(p1, p2).await.unwrap(); + /// upload.complete().await.unwrap(); /// # } /// ``` /// @@ -72,23 +67,24 @@ pub trait MultipartUpload: Send + std::fmt::Debug { /// /// It is implementation defined behaviour if this method is called before polling /// all [`UploadPart`] returned by [`MultipartUpload::put_part`] to completion. Additionally, - /// it is implementation defined behaviour to call [`MultipartUpload::complete`] on an already - /// completed or aborted [`MultipartUpload`]. + /// it is implementation defined behaviour to call [`MultipartUpload::complete`] + /// on an already completed or aborted [`MultipartUpload`]. async fn complete(&mut self) -> Result; /// Abort the multipart upload /// - /// If an [`MultipartUpload`] is dropped without [`MultipartUpload::complete`] being called, + /// If a [`MultipartUpload`] is dropped without calling [`MultipartUpload::complete`], /// some implementations will automatically reap any uploaded parts. However, /// this is not always possible, e.g. for S3 and GCS. [`MultipartUpload::abort`] can /// therefore be invoked to perform this cleanup. /// - /// It is recommended that where possible users configure appropriate lifecycle - /// rules to automatically reap unused parts older than some threshold, as this - /// will more reliably handle different failure modes. + /// It is recommended that where possible users configure lifecycle rules + /// to automatically reap unused parts older than some threshold, as this + /// will more reliably handle different failure modes. See [crate::aws] and + /// [crate::gcp] for more information. /// - /// It is implementation defined behaviour to call [`MultipartUpload::abort`] on an already - /// completed or aborted [`MultipartUpload`] + /// It is implementation defined behaviour to call [`MultipartUpload::abort`] + /// on an already completed or aborted [`MultipartUpload`] async fn abort(&mut self) -> Result<()>; } From 40f7ee36a066ede70a058252a3a8fe9c0c643919 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Wed, 20 Mar 2024 10:54:17 +1300 Subject: [PATCH 13/15] Apply suggestions from code review Co-authored-by: Andrew Lamb --- object_store/src/upload.rs | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/object_store/src/upload.rs b/object_store/src/upload.rs index 78356774f3c5..c9740e5242ea 100644 --- a/object_store/src/upload.rs +++ b/object_store/src/upload.rs @@ -78,10 +78,10 @@ pub trait MultipartUpload: Send + std::fmt::Debug { /// this is not always possible, e.g. for S3 and GCS. [`MultipartUpload::abort`] can /// therefore be invoked to perform this cleanup. /// - /// It is recommended that where possible users configure lifecycle rules - /// to automatically reap unused parts older than some threshold, as this - /// will more reliably handle different failure modes. See [crate::aws] and - /// [crate::gcp] for more information. + /// Given it is not possible call `abort` in all failure scenarios (e.g. if your program is `SIGKILL`ed due to + /// OOM), it is recommended to configure your object store with lifecycle rules + /// to automatically cleanup unused parts older than some threshold. + /// See [crate::aws] and [crate::gcp] for more information. /// /// It is implementation defined behaviour to call [`MultipartUpload::abort`] /// on an already completed or aborted [`MultipartUpload`] @@ -90,9 +90,7 @@ pub trait MultipartUpload: Send + std::fmt::Debug { /// A synchronous write API for uploading data in parallel in fixed size chunks /// -/// Makes use of [`JoinSet`] under the hood to multiplex upload tasks, -/// avoiding issues caused by sharing a single tokio's cooperative task -/// budget across multiple IO operations. +/// Uses multiple tokio tasks in a [`JoinSet`] to multiplex upload tasks in parallel /// /// The design also takes inspiration from [`Sink`] with [`WriteMultipart::wait_for_capacity`] /// allowing back pressure on producers, prior to buffering the next part. However, unlike @@ -109,7 +107,7 @@ pub struct WriteMultipart { } impl WriteMultipart { - /// Create a new [`WriteMultipart`] + /// Create a new [`WriteMultipart`] that will upload using 5MB chunks pub fn new(upload: Box) -> Self { Self::new_with_capacity(upload, 5 * 1024 * 1024) } @@ -133,6 +131,10 @@ impl WriteMultipart { /// Write data to this [`WriteMultipart`] /// + /// Note this method is synchronous (not `async`) and will immediately start new uploads + /// as soon as the internal `capacity` is hit, regardless of + /// how many outstanding uploads are already in progress. + /// /// Back pressure can optionally be applied to producers by calling /// [`Self::wait_for_capacity`] prior to calling this method pub fn write(&mut self, mut buf: &[u8]) { @@ -153,7 +155,7 @@ impl WriteMultipart { self.tasks.spawn(self.upload.put_part(part)); } - /// Abort this upload + /// Abort this upload, attempting to clean up any successfully uploaded parts pub async fn abort(mut self) -> Result<()> { self.tasks.shutdown().await; self.upload.abort().await From 5ea92d46b843215dbfad06b53de375b58ecdd88f Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 20 Mar 2024 12:09:19 +1300 Subject: [PATCH 14/15] Docs --- object_store/src/upload.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/object_store/src/upload.rs b/object_store/src/upload.rs index c9740e5242ea..4818c3781a34 100644 --- a/object_store/src/upload.rs +++ b/object_store/src/upload.rs @@ -74,14 +74,15 @@ pub trait MultipartUpload: Send + std::fmt::Debug { /// Abort the multipart upload /// /// If a [`MultipartUpload`] is dropped without calling [`MultipartUpload::complete`], - /// some implementations will automatically reap any uploaded parts. However, - /// this is not always possible, e.g. for S3 and GCS. [`MultipartUpload::abort`] can - /// therefore be invoked to perform this cleanup. + /// some object stores will automatically clean up any previously uploaded parts. + /// However, some stores, such as S3 and GCS, cannot perform cleanup on drop. + /// As such [`MultipartUpload::abort`] can be invoked to perform this cleanup. /// - /// Given it is not possible call `abort` in all failure scenarios (e.g. if your program is `SIGKILL`ed due to - /// OOM), it is recommended to configure your object store with lifecycle rules - /// to automatically cleanup unused parts older than some threshold. - /// See [crate::aws] and [crate::gcp] for more information. + /// It will not be possible to call `abort` in all failure scenarios, for example + /// non-graceful shutdown of the calling application. It is therefore recommended + /// object stores are configured with lifecycle rules to automatically cleanup + /// unused parts older than some threshold. See [crate::aws] and [crate::gcp] + /// for more information. /// /// It is implementation defined behaviour to call [`MultipartUpload::abort`] /// on an already completed or aborted [`MultipartUpload`] From fd1d1987ded7fb9993f4e1cd536b6df143541fed Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 20 Mar 2024 12:13:56 +1300 Subject: [PATCH 15/15] Format --- object_store/src/upload.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/object_store/src/upload.rs b/object_store/src/upload.rs index 4818c3781a34..6f8bfa8a5f73 100644 --- a/object_store/src/upload.rs +++ b/object_store/src/upload.rs @@ -133,9 +133,9 @@ impl WriteMultipart { /// Write data to this [`WriteMultipart`] /// /// Note this method is synchronous (not `async`) and will immediately start new uploads - /// as soon as the internal `capacity` is hit, regardless of + /// as soon as the internal `capacity` is hit, regardless of /// how many outstanding uploads are already in progress. - /// + /// /// Back pressure can optionally be applied to producers by calling /// [`Self::wait_for_capacity`] prior to calling this method pub fn write(&mut self, mut buf: &[u8]) {