Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement sink for services s3 #2508

Merged
merged 11 commits into from
Jun 23, 2023
3 changes: 2 additions & 1 deletion bindings/haskell/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ use std::ffi::CStr;
use std::mem;
use std::os::raw::c_char;
use std::str::FromStr;
use types::{ByteSlice, Metadata};

use ::opendal as od;
use result::FFIResult;
use types::ByteSlice;
use types::Metadata;

/// # Safety
///
Expand Down
3 changes: 2 additions & 1 deletion bindings/haskell/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
// specific language governing permissions and limitations
// under the License.

use std::ffi::c_char;

use ::opendal as od;
use chrono::SecondsFormat;
use std::ffi::c_char;

#[repr(C)]
#[derive(Debug)]
Expand Down
1 change: 1 addition & 0 deletions core/src/services/s3/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,7 @@ impl Accessor for S3Backend {
read_with_override_content_disposition: true,

write: true,
write_can_sink: true,
write_with_cache_control: true,
write_with_content_type: true,
write_without_content_length: true,
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/s3/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ impl S3Core {
pub fn s3_put_object_request(
&self,
path: &str,
size: Option<usize>,
size: Option<u64>,
content_type: Option<&str>,
content_disposition: Option<&str>,
cache_control: Option<&str>,
Expand Down
24 changes: 15 additions & 9 deletions core/src/services/s3/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ impl S3Writer {
}
}

async fn write_oneshot(&self, bs: Bytes) -> Result<()> {
async fn write_oneshot(&self, size: u64, body: AsyncBody) -> Result<()> {
let mut req = self.core.s3_put_object_request(
&self.path,
Some(bs.len()),
Some(size),
self.op.content_type(),
self.op.content_disposition(),
self.op.cache_control(),
AsyncBody::Bytes(bs),
body,
)?;

self.core.sign(&mut req).await?;
Expand Down Expand Up @@ -154,7 +154,9 @@ impl oio::Write for S3Writer {
Some(upload_id) => upload_id,
None => {
if self.op.content_length().unwrap_or_default() == bs.len() as u64 {
return self.write_oneshot(bs).await;
return self
.write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs))
.await;
} else {
let upload_id = self.initiate_upload().await?;
self.upload_id = Some(upload_id);
Expand Down Expand Up @@ -192,11 +194,15 @@ impl oio::Write for S3Writer {
}
}

async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
))
async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
if self.op.content_length().unwrap_or_default() == size {
return self.write_oneshot(size, AsyncBody::Stream(s)).await;
suyanhanx marked this conversation as resolved.
Show resolved Hide resolved
} else {
return Err(Error::new(
ErrorKind::Unsupported,
"S3 does not support streaming multipart upload",
));
}
}

async fn abort(&mut self) -> Result<()> {
Expand Down
8 changes: 7 additions & 1 deletion core/src/services/supabase/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,13 @@ pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
let (parts, body) = resp.into_parts();
let bs = body.bytes().await?;

let (mut kind, mut retryable) = (ErrorKind::Unexpected, false);
// Check the HTTP status code first.
let (mut kind, mut retryable) = match parts.status.as_u16() {
500 | 502 | 503 | 504 => (ErrorKind::Unexpected, true),
_ => (ErrorKind::Unexpected, false),
};

// Then extract the error message.
let (message, _) = from_slice::<SupabaseError>(&bs)
.map(|sb_err| {
(kind, retryable) = parse_supabase_error(&sb_err);
Expand Down
2 changes: 2 additions & 0 deletions core/src/types/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ pub struct Capability {

/// If operator supports write natively, it will be true.
pub write: bool,
/// If operator supports write by sink a stream into, it will be true.
pub write_can_sink: bool,
/// If operator supports write with without content length, it will
/// be true.
///
Expand Down
51 changes: 51 additions & 0 deletions core/src/types/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use bytes::Bytes;
use futures::future::BoxFuture;
use futures::AsyncWrite;
use futures::FutureExt;
use futures::TryStreamExt;

use crate::raw::oio::Write;
use crate::raw::*;
Expand Down Expand Up @@ -90,6 +91,56 @@ impl Writer {
}
}

/// Sink into writer.
///
/// `sink` reads data from the given streamer and writes it into the writer
/// directly, without an extra in-memory buffer.
///
/// # Notes
///
/// - Sink can't be used together with `write` concurrently.
/// - Sink requires the content length to be set in advance for now.
///
/// # Examples
///
/// ```no_run
/// use std::io::Result;
///
/// use bytes::Bytes;
/// use futures::stream;
/// use futures::StreamExt;
/// use opendal::Operator;
///
/// #[tokio::main]
/// async fn sink_example(op: Operator) -> Result<()> {
///     let mut w = op
///         .writer_with("path/to/file")
///         .content_length(2 * 4096)
///         .await?;
///     let stream = stream::iter(vec![vec![0; 4096], vec![1; 4096]]).map(Ok);
///     w.sink(2 * 4096, stream).await?;
///     w.close().await?;
///     Ok(())
/// }
/// ```
pub async fn sink<S, T>(&mut self, size: u64, sink_from: S) -> Result<()>
where
    S: futures::Stream<Item = Result<T>> + Send + Sync + Unpin + 'static,
    T: Into<Bytes>,
{
    if let State::Idle(Some(w)) = &mut self.state {
        // Convert the user-facing futures stream into the internal
        // `oio::Streamer` of `Bytes` chunks.
        let s = Box::new(oio::into_stream::from_futures_stream(
            sink_from.map_ok(|v| v.into()),
        ));
        w.sink(size, s).await
    } else {
        // `sink` must only be called while the writer is idle; any other
        // state indicates a bug in the state machine.
        unreachable!(
            "writer state invalid while sink, expect Idle, actual {}",
            self.state
        );
    }
}

/// Abort the writer and clean up all written data.
///
/// ## Notes
Expand Down
42 changes: 42 additions & 0 deletions core/tests/behavior/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::time::Duration;
use anyhow::Result;
use futures::io::BufReader;
use futures::io::Cursor;
use futures::stream;
use futures::AsyncReadExt;
use futures::AsyncSeekExt;
use futures::StreamExt;
Expand Down Expand Up @@ -111,6 +112,7 @@ macro_rules! behavior_write_tests {
test_delete_stream,
test_remove_one_file,
test_writer_write,
test_writer_sink,
test_writer_abort,
test_writer_futures_copy,
test_fuzz_unsized_writer,
Expand Down Expand Up @@ -1121,6 +1123,46 @@ pub async fn test_writer_write(op: Operator) -> Result<()> {
Ok(())
}

/// Streaming data into writer
pub async fn test_writer_sink(op: Operator) -> Result<()> {
let cap = op.info().capability();
if !(cap.write && cap.write_can_sink) {
return Ok(());
}

let path = uuid::Uuid::new_v4().to_string();
let size = 5 * 1024 * 1024; // write file with 5 MiB
let content_a = gen_fixed_bytes(size);
let content_b = gen_fixed_bytes(size);
let stream = stream::iter(vec![content_a.clone(), content_b.clone()]).map(Ok);

let mut w = op
.writer_with(&path)
.content_length(2 * size as u64)
.await?;
w.sink(2 * size as u64, stream).await?;
w.close().await?;

let meta = op.stat(&path).await.expect("stat must succeed");
assert_eq!(meta.content_length(), (size * 2) as u64);

let bs = op.read(&path).await?;
assert_eq!(bs.len(), size * 2, "read size");
assert_eq!(
format!("{:x}", Sha256::digest(&bs[..size])),
format!("{:x}", Sha256::digest(content_a)),
"read content a"
);
assert_eq!(
format!("{:x}", Sha256::digest(&bs[size..])),
format!("{:x}", Sha256::digest(content_b)),
"read content b"
);

op.delete(&path).await.expect("delete must succeed");
Ok(())
}

/// Copy data from reader to writer
pub async fn test_writer_futures_copy(op: Operator) -> Result<()> {
let path = uuid::Uuid::new_v4().to_string();
Expand Down