Skip to content

Commit

Permalink
Flesh out cloud implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Mar 18, 2024
1 parent f641773 commit 1c8a965
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 71 deletions.
87 changes: 52 additions & 35 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,17 @@ use futures::{StreamExt, TryStreamExt};
use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH};
use reqwest::{Method, StatusCode};
use std::{sync::Arc, time::Duration};
use tokio::io::AsyncWrite;
use url::Url;

use crate::aws::client::{RequestError, S3Client};
use crate::client::get::GetClientExt;
use crate::client::list::ListClientExt;
use crate::client::CredentialProvider;
use crate::multipart::{MultiPartStore, PartId, PutPart, WriteMultiPart};
use crate::multipart::{MultiPartStore, PartId};
use crate::signer::Signer;
use crate::{
Error, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Path, PutMode,
PutOptions, PutResult, Result,
PutOptions, PutResult, Result, Upload, UploadPart,
};

static TAGS_HEADER: HeaderName = HeaderName::from_static("x-amz-tagging");
Expand Down Expand Up @@ -85,6 +84,7 @@ const STORE: &str = "S3";

/// [`CredentialProvider`] for [`AmazonS3`]
pub type AwsCredentialProvider = Arc<dyn CredentialProvider<Credential = AwsCredential>>;
use crate::client::parts::Parts;
pub use credential::{AwsAuthorizer, AwsCredential};

/// Interface for [Amazon S3](https://aws.amazon.com/s3/).
Expand Down Expand Up @@ -211,25 +211,18 @@ impl ObjectStore for AmazonS3 {
}
}

async fn put_multipart(
&self,
location: &Path,
) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
let id = self.client.create_multipart(location).await?;

let upload = S3MultiPartUpload {
location: location.clone(),
upload_id: id.clone(),
client: Arc::clone(&self.client),
};

Ok((id, Box::new(WriteMultiPart::new(upload, 8))))
}

async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> {
self.client
.delete_request(location, &[("uploadId", multipart_id)])
.await
async fn upload(&self, location: &Path) -> Result<Box<dyn Upload>> {
let upload_id = self.client.create_multipart(location).await?;

Ok(Box::new(S3MultiPartUpload {
part_idx: 0,
state: Arc::new(UploadState {
client: Arc::clone(&self.client),
location: location.clone(),
upload_id: upload_id.clone(),
parts: Default::default(),
}),
}))
}

async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
Expand Down Expand Up @@ -319,25 +312,50 @@ impl ObjectStore for AmazonS3 {
}
}

#[derive(Debug)]
struct S3MultiPartUpload {
part_idx: usize,
state: Arc<UploadState>,
}

#[derive(Debug)]
struct UploadState {
parts: Parts,
location: Path,
upload_id: String,
client: Arc<S3Client>,
}

#[async_trait]
impl PutPart for S3MultiPartUpload {
async fn put_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<PartId> {
self.client
.put_part(&self.location, &self.upload_id, part_idx, buf.into())
impl Upload for S3MultiPartUpload {
fn put_part(&mut self, data: Bytes) -> UploadPart {
let idx = self.part_idx;
self.part_idx += 1;
let state = Arc::clone(&self.state);
Box::pin(async move {
let part = state
.client
.put_part(&state.location, &state.upload_id, idx, data)
.await?;
state.parts.put(idx, part);
Ok(())
})
}

async fn complete(&mut self) -> Result<PutResult> {
let parts = self.state.parts.finish(self.part_idx)?;

self.state
.client
.complete_multipart(&self.state.location, &self.state.upload_id, parts)
.await
}

async fn complete(&self, completed_parts: Vec<PartId>) -> Result<()> {
self.client
.complete_multipart(&self.location, &self.upload_id, completed_parts)
.await?;
Ok(())
async fn abort(&mut self) -> Result<()> {
self.state
.client
.delete_request(&self.state.location, &[("uploadId", &self.state.upload_id)])
.await
}
}

Expand Down Expand Up @@ -377,7 +395,6 @@ mod tests {
use crate::{client::get::GetClient, tests::*};
use bytes::Bytes;
use hyper::HeaderMap;
use tokio::io::AsyncWriteExt;

const NON_EXISTENT_NAME: &str = "nonexistentname";

Expand Down Expand Up @@ -542,9 +559,9 @@ mod tests {
store.put(&locations[0], data.clone()).await.unwrap();
store.copy(&locations[0], &locations[1]).await.unwrap();

let (_, mut writer) = store.put_multipart(&locations[2]).await.unwrap();
writer.write_all(&data).await.unwrap();
writer.shutdown().await.unwrap();
let mut upload = store.upload(&locations[2]).await.unwrap();
upload.put_part(data.clone()).await.unwrap();
upload.complete().await.unwrap();

for location in &locations {
let res = store
Expand Down
65 changes: 42 additions & 23 deletions object_store/src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::{
path::Path,
signer::Signer,
GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions, PutResult,
Result,
Result, Upload, UploadPart,
};
use async_trait::async_trait;
use bytes::Bytes;
Expand All @@ -36,7 +36,6 @@ use reqwest::Method;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::AsyncWrite;
use url::Url;

use crate::client::get::GetClientExt;
Expand All @@ -50,6 +49,8 @@ mod credential;

/// [`CredentialProvider`] for [`MicrosoftAzure`]
pub type AzureCredentialProvider = Arc<dyn CredentialProvider<Credential = AzureCredential>>;
use crate::azure::client::AzureClient;
use crate::client::parts::Parts;
pub use builder::{AzureConfigKey, MicrosoftAzureBuilder};
pub use credential::AzureCredential;

Expand Down Expand Up @@ -90,22 +91,17 @@ impl ObjectStore for MicrosoftAzure {
self.client.put_blob(location, bytes, opts).await
}

async fn put_multipart(
&self,
location: &Path,
) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
let inner = AzureMultiPartUpload {
client: Arc::clone(&self.client),
location: location.to_owned(),
};
Ok((String::new(), Box::new(WriteMultiPart::new(inner, 8))))
async fn upload(&self, location: &Path) -> Result<Box<dyn Upload>> {
Ok(Box::new(AzureMultiPartUpload {
part_idx: 0,
state: Arc::new(UploadState {
client: Arc::clone(&self.client),
location: location.clone(),
parts: Default::default(),
}),
}))
}

async fn abort_multipart(&self, _location: &Path, _multipart_id: &MultipartId) -> Result<()> {
// There is no way to drop blocks that have been uploaded. Instead, they simply
// expire in 7 days.
Ok(())
}

async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
self.client.get_opts(location, options).await
Expand Down Expand Up @@ -193,20 +189,43 @@ impl Signer for MicrosoftAzure {
/// put_multipart_part -> PUT block
/// complete -> PUT block list
/// abort -> No equivalent; blocks are simply dropped after 7 days
#[derive(Debug, Clone)]
#[derive(Debug)]
struct AzureMultiPartUpload {
client: Arc<client::AzureClient>,
part_idx: usize,
state: Arc<UploadState>,
}

#[derive(Debug)]
struct UploadState {
location: Path,
parts: Parts,
client: Arc<AzureClient>,
}

#[async_trait]
impl PutPart for AzureMultiPartUpload {
async fn put_part(&self, buf: Vec<u8>, idx: usize) -> Result<PartId> {
self.client.put_block(&self.location, idx, buf.into()).await
impl Upload for AzureMultiPartUpload {
fn put_part(&mut self, data: Bytes) -> UploadPart {
let idx = self.part_idx;
self.part_idx += 1;
let state = Arc::clone(&self.state);
Box::pin(async move {
let part = state.client.put_block(&state.location, idx, data).await?;
state.parts.put(idx, part);
Ok(())
})
}

async fn complete(&mut self) -> Result<PutResult> {
let parts = self.state.parts.finish(self.part_idx)?;

self.state
.client
.put_block_list(&self.state.location, parts)
.await
}

async fn complete(&self, parts: Vec<PartId>) -> Result<()> {
self.client.put_block_list(&self.location, parts).await?;
async fn abort(&mut self) -> Result<()> {
// Nothing to do
Ok(())
}
}
Expand Down
4 changes: 2 additions & 2 deletions object_store/src/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use std::io::{Error, ErrorKind, SeekFrom};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, AsyncWriteExt, ReadBuf};
use tokio::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};

/// The default buffer size used by [`BufReader`]
pub const DEFAULT_BUFFER_SIZE: usize = 1024 * 1024;
Expand Down Expand Up @@ -367,7 +367,7 @@ mod tests {
use super::*;
use crate::memory::InMemory;
use crate::path::Path;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt};
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};

#[tokio::test]
async fn test_buf_reader() {
Expand Down
3 changes: 1 addition & 2 deletions object_store/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,14 @@ use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use itertools::Itertools;
use snafu::{OptionExt, ResultExt, Snafu};
use tokio::io::AsyncWrite;
use url::Url;

use crate::client::get::GetClientExt;
use crate::client::header::get_etag;
use crate::http::client::Client;
use crate::path::Path;
use crate::{
ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult, ObjectMeta,
ObjectStore, PutMode, PutOptions, PutResult, Result, RetryConfig, Upload,
};

Expand Down
2 changes: 1 addition & 1 deletion object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2197,7 +2197,7 @@ mod tests {
pub(crate) async fn tagging<F, Fut>(storage: &dyn ObjectStore, validate: bool, get_tags: F)
where
F: Fn(Path) -> Fut + Send + Sync,
Fut: Future<Output = Result<reqwest::Response>> + Send,
Fut: std::future::Future<Output = Result<reqwest::Response>> + Send,
{
use bytes::Buf;
use serde::Deserialize;
Expand Down
9 changes: 1 addition & 8 deletions object_store/tests/get_range_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use object_store::path::Path;
use object_store::*;
use std::fmt::Formatter;
use tempfile::tempdir;
use tokio::io::AsyncWrite;

#[derive(Debug)]
struct MyStore(LocalFileSystem);
Expand All @@ -42,16 +41,10 @@ impl ObjectStore for MyStore {
self.0.put_opts(path, data, opts).await
}

async fn put_multipart(
&self,
_: &Path,
) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
async fn upload(&self, _location: &Path) -> Result<Box<dyn Upload>> {
todo!()
}

async fn abort_multipart(&self, _: &Path, _: &MultipartId) -> Result<()> {
todo!()
}

async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
self.0.get_opts(location, options).await
Expand Down

0 comments on commit 1c8a965

Please sign in to comment.