Skip to content

Commit

Permalink
Use bytes instead
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <github@xuanwo.io>
  • Loading branch information
Xuanwo committed Feb 28, 2023
1 parent b21420e commit c527a4e
Show file tree
Hide file tree
Showing 18 changed files with 63 additions and 45 deletions.
6 changes: 3 additions & 3 deletions src/layers/concurrent_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,11 +365,11 @@ impl<R: output::BlockingRead> output::BlockingRead for ConcurrentLimitWrapper<R>

#[async_trait]
impl<R: output::Write> output::Write for ConcurrentLimitWrapper<R> {
async fn write(&mut self, bs: Vec<u8>) -> Result<()> {
async fn write(&mut self, bs: Bytes) -> Result<()> {
self.inner.write(bs).await
}

async fn append(&mut self, bs: Vec<u8>) -> Result<()> {
async fn append(&mut self, bs: Bytes) -> Result<()> {
self.inner.append(bs).await
}

Expand All @@ -379,7 +379,7 @@ impl<R: output::Write> output::Write for ConcurrentLimitWrapper<R> {
}

impl<R: output::BlockingWrite> output::BlockingWrite for ConcurrentLimitWrapper<R> {
fn write(&mut self, bs: Vec<u8>) -> Result<()> {
fn write(&mut self, bs: Bytes) -> Result<()> {
self.inner.write(bs)
}

Expand Down
5 changes: 3 additions & 2 deletions src/object/object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::io::Read;
use std::ops::RangeBounds;
use std::sync::Arc;

use bytes::Bytes;
use flagset::FlagSet;
use futures::AsyncReadExt;
use time::Duration;
Expand Down Expand Up @@ -555,7 +556,7 @@ impl Object {
/// # Ok(())
/// # }
/// ```
pub async fn write_with(&self, args: OpWrite, bs: impl Into<Vec<u8>>) -> Result<()> {
pub async fn write_with(&self, args: OpWrite, bs: impl Into<Bytes>) -> Result<()> {
if !validate_path(self.path(), ObjectMode::FILE) {
return Err(
Error::new(ErrorKind::ObjectIsADirectory, "write path is a directory")
Expand Down Expand Up @@ -621,7 +622,7 @@ impl Object {
/// # Ok(())
/// # }
/// ```
pub fn blocking_write_with(&self, args: OpWrite, bs: impl Into<Vec<u8>>) -> Result<()> {
pub fn blocking_write_with(&self, args: OpWrite, bs: impl Into<Bytes>) -> Result<()> {
if !validate_path(self.path(), ObjectMode::FILE) {
return Err(
Error::new(ErrorKind::ObjectIsADirectory, "write path is a directory")
Expand Down
7 changes: 4 additions & 3 deletions src/raw/adapters/kv/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::sync::Arc;

use async_trait::async_trait;
use bytes::Bytes;

use super::Adapter;
use crate::ops::*;
Expand Down Expand Up @@ -290,13 +291,13 @@ impl<S> KvWriter<S> {

#[async_trait]
impl<S: Adapter> output::Write for KvWriter<S> {
async fn write(&mut self, bs: Vec<u8>) -> Result<()> {
async fn write(&mut self, bs: Bytes) -> Result<()> {
self.kv.set(&self.path, &bs).await?;

Ok(())
}

async fn append(&mut self, bs: Vec<u8>) -> Result<()> {
async fn append(&mut self, bs: Bytes) -> Result<()> {
self.buf.extend(bs);

Ok(())
Expand All @@ -310,7 +311,7 @@ impl<S: Adapter> output::Write for KvWriter<S> {
}

impl<S: Adapter> output::BlockingWrite for KvWriter<S> {
fn write(&mut self, bs: Vec<u8>) -> Result<()> {
fn write(&mut self, bs: Bytes) -> Result<()> {
self.kv.blocking_set(&self.path, &bs)?;

Ok(())
Expand Down
8 changes: 5 additions & 3 deletions src/raw/io/output/blocking_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use bytes::Bytes;

use crate::*;

/// BlockingWriter is a type erased [`BlockingWrite`]
Expand All @@ -24,14 +26,14 @@ pub trait BlockingWrite: Send + Sync + 'static {
/// We consume the writer here to indicate that users should
/// write all content. To append multiple bytes together, use
/// `append` instead.
fn write(&mut self, bs: Vec<u8>) -> Result<()>;
fn write(&mut self, bs: Bytes) -> Result<()>;

/// Close the writer and make sure all data has been flushed.
fn close(&mut self) -> Result<()>;
}

impl BlockingWrite for () {
fn write(&mut self, bs: Vec<u8>) -> Result<()> {
fn write(&mut self, bs: Bytes) -> Result<()> {
let _ = bs;

unimplemented!("write is required to be implemented for output::BlockingWrite")
Expand All @@ -48,7 +50,7 @@ impl BlockingWrite for () {
/// `Box<dyn BlockingWrite>` won't implement `BlockingWrite` automanticly.
/// To make BlockingWriter work as expected, we must add this impl.
impl<T: BlockingWrite + ?Sized> BlockingWrite for Box<T> {
fn write(&mut self, bs: Vec<u8>) -> Result<()> {
fn write(&mut self, bs: Bytes) -> Result<()> {
(**self).write(bs)
}

Expand Down
13 changes: 7 additions & 6 deletions src/raw/io/output/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use crate::*;

use async_trait::async_trait;
use bytes::Bytes;

/// Writer is a type erased [`Write`]
pub type Writer = Box<dyn Write>;
Expand All @@ -25,28 +26,28 @@ pub trait Write: Unpin + Send + Sync {
/// Write whole content at once.
///
/// To append multiple bytes together, use `append` instead.
async fn write(&mut self, bs: Vec<u8>) -> Result<()>;
async fn write(&mut self, bs: Bytes) -> Result<()>;

/// Append bytes to the writer.
///
/// It is highly recommended to align the length of the input bytes
/// into blocks of 4MiB (except the last block) for better performance
/// and compatibility.
async fn append(&mut self, bs: Vec<u8>) -> Result<()>;
async fn append(&mut self, bs: Bytes) -> Result<()>;

/// Close the writer and make sure all data has been flushed.
async fn close(&mut self) -> Result<()>;
}

#[async_trait]
impl Write for () {
async fn write(&mut self, bs: Vec<u8>) -> Result<()> {
async fn write(&mut self, bs: Bytes) -> Result<()> {
let _ = bs;

unimplemented!("write is required to be implemented for output::Write")
}

async fn append(&mut self, bs: Vec<u8>) -> Result<()> {
async fn append(&mut self, bs: Bytes) -> Result<()> {
let _ = bs;

Err(Error::new(
Expand All @@ -67,11 +68,11 @@ impl Write for () {
/// work as expected, we must add this impl.
#[async_trait]
impl<T: Write + ?Sized> Write for Box<T> {
async fn write(&mut self, bs: Vec<u8>) -> Result<()> {
async fn write(&mut self, bs: Bytes) -> Result<()> {
(**self).write(bs).await
}

async fn append(&mut self, bs: Vec<u8>) -> Result<()> {
async fn append(&mut self, bs: Bytes) -> Result<()> {
(**self).append(bs).await
}

Expand Down
5 changes: 3 additions & 2 deletions src/services/azblob/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use async_trait::async_trait;
use bytes::Bytes;
use http::StatusCode;

use super::backend::AzblobBackend;
Expand All @@ -36,7 +37,7 @@ impl AzblobWriter {

#[async_trait]
impl output::Write for AzblobWriter {
async fn write(&mut self, bs: Vec<u8>) -> Result<()> {
async fn write(&mut self, bs: Bytes) -> Result<()> {
let mut req = self.backend.azblob_put_blob_request(
&self.path,
Some(self.op.size()),
Expand All @@ -62,7 +63,7 @@ impl output::Write for AzblobWriter {
}
}

async fn append(&mut self, bs: Vec<u8>) -> Result<()> {
async fn append(&mut self, bs: Bytes) -> Result<()> {
let _ = bs;

Err(Error::new(
Expand Down
5 changes: 3 additions & 2 deletions src/services/azdfs/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use async_trait::async_trait;
use bytes::Bytes;
use http::StatusCode;

use super::backend::AzdfsBackend;
Expand All @@ -36,7 +37,7 @@ impl AzdfsWriter {

#[async_trait]
impl output::Write for AzdfsWriter {
async fn write(&mut self, bs: Vec<u8>) -> Result<()> {
async fn write(&mut self, bs: Bytes) -> Result<()> {
let mut req = self.backend.azdfs_create_request(
&self.path,
"file",
Expand Down Expand Up @@ -89,7 +90,7 @@ impl output::Write for AzdfsWriter {
}
}

async fn append(&mut self, bs: Vec<u8>) -> Result<()> {
async fn append(&mut self, bs: Bytes) -> Result<()> {
let _ = bs;

Err(Error::new(
Expand Down
7 changes: 4 additions & 3 deletions src/services/fs/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::io::Write;
use std::path::PathBuf;

use async_trait::async_trait;
use bytes::Bytes;
use tokio::io::AsyncSeekExt;
use tokio::io::AsyncWriteExt;

Expand Down Expand Up @@ -50,7 +51,7 @@ impl output::Write for FsWriter<tokio::fs::File> {
///
/// File could be partial written, so we will seek to start to make sure
/// we write the same content.
async fn write(&mut self, bs: Vec<u8>) -> Result<()> {
async fn write(&mut self, bs: Bytes) -> Result<()> {
self.f
.seek(SeekFrom::Start(0))
.await
Expand All @@ -64,7 +65,7 @@ impl output::Write for FsWriter<tokio::fs::File> {
///
/// File could be partial written, so we will seek to start to make sure
/// we write the same content.
async fn append(&mut self, bs: Vec<u8>) -> Result<()> {
async fn append(&mut self, bs: Bytes) -> Result<()> {
self.f
.seek(SeekFrom::Start(self.pos))
.await
Expand All @@ -87,7 +88,7 @@ impl output::BlockingWrite for FsWriter<std::fs::File> {
///
/// File could be partial written, so we will seek to start to make sure
/// we write the same content.
fn write(&mut self, bs: Vec<u8>) -> Result<()> {
fn write(&mut self, bs: Bytes) -> Result<()> {
self.f.seek(SeekFrom::Start(0)).map_err(parse_io_error)?;
self.f.write_all(&bs).map_err(parse_io_error)?;

Expand Down
5 changes: 3 additions & 2 deletions src/services/ftp/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use async_trait::async_trait;
use bytes::Bytes;
use futures::AsyncWriteExt;

use super::backend::FtpBackend;
Expand All @@ -37,7 +38,7 @@ impl FtpWriter {

#[async_trait]
impl output::Write for FtpWriter {
async fn write(&mut self, bs: Vec<u8>) -> Result<()> {
async fn write(&mut self, bs: Bytes) -> Result<()> {
let mut ftp_stream = self.backend.ftp_connect(Operation::Write).await?;
let mut data_stream = ftp_stream.append_with_stream(&self.path).await?;
data_stream.write_all(&bs).await.map_err(|err| {
Expand All @@ -49,7 +50,7 @@ impl output::Write for FtpWriter {
Ok(())
}

async fn append(&mut self, bs: Vec<u8>) -> Result<()> {
async fn append(&mut self, bs: Bytes) -> Result<()> {
let _ = bs;

Err(Error::new(
Expand Down
5 changes: 3 additions & 2 deletions src/services/gcs/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use async_trait::async_trait;
use bytes::Bytes;
use http::StatusCode;

use super::backend::GcsBackend;
Expand All @@ -36,7 +37,7 @@ impl GcsWriter {

#[async_trait]
impl output::Write for GcsWriter {
async fn write(&mut self, bs: Vec<u8>) -> Result<()> {
async fn write(&mut self, bs: Bytes) -> Result<()> {
let mut req = self.backend.gcs_insert_object_request(
&self.path,
Some(self.op.size()),
Expand All @@ -62,7 +63,7 @@ impl output::Write for GcsWriter {
}
}

async fn append(&mut self, bs: Vec<u8>) -> Result<()> {
async fn append(&mut self, bs: Bytes) -> Result<()> {
let _ = bs;

Err(Error::new(
Expand Down
5 changes: 3 additions & 2 deletions src/services/ghac/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use async_trait::async_trait;
use bytes::Bytes;

use super::backend::GhacBackend;
use super::error::parse_error;
Expand Down Expand Up @@ -43,7 +44,7 @@ impl GhacWriter {

#[async_trait]
impl output::Write for GhacWriter {
async fn write(&mut self, bs: Vec<u8>) -> Result<()> {
async fn write(&mut self, bs: Bytes) -> Result<()> {
let size = bs.len() as u64;
let req = self
.backend
Expand All @@ -63,7 +64,7 @@ impl output::Write for GhacWriter {
}
}

async fn append(&mut self, bs: Vec<u8>) -> Result<()> {
async fn append(&mut self, bs: Bytes) -> Result<()> {
let _ = bs;

Err(Error::new(
Expand Down
7 changes: 4 additions & 3 deletions src/services/hdfs/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::io::SeekFrom;
use std::io::Write;

use async_trait::async_trait;
use bytes::Bytes;
use futures::AsyncSeekExt;
use futures::AsyncWriteExt;

Expand All @@ -42,7 +43,7 @@ impl output::Write for HdfsWriter<hdrs::AsyncFile> {
///
/// File could be partial written, so we will seek to start to make sure
/// we write the same content.
async fn write(&mut self, bs: Vec<u8>) -> Result<()> {
async fn write(&mut self, bs: Bytes) -> Result<()> {
self.f
.seek(SeekFrom::Start(0))
.await
Expand All @@ -56,7 +57,7 @@ impl output::Write for HdfsWriter<hdrs::AsyncFile> {
///
/// File could be partial written, so we will seek to start to make sure
/// we write the same content.
async fn append(&mut self, bs: Vec<u8>) -> Result<()> {
async fn append(&mut self, bs: Bytes) -> Result<()> {
self.f
.seek(SeekFrom::Start(self.pos))
.await
Expand All @@ -79,7 +80,7 @@ impl output::BlockingWrite for HdfsWriter<hdrs::File> {
///
/// File could be partial written, so we will seek to start to make sure
/// we write the same content.
fn write(&mut self, bs: Vec<u8>) -> Result<()> {
fn write(&mut self, bs: Bytes) -> Result<()> {
self.f.seek(SeekFrom::Start(0)).map_err(parse_io_error)?;
self.f.write_all(&bs).map_err(parse_io_error)?;

Expand Down
5 changes: 3 additions & 2 deletions src/services/ipmfs/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use async_trait::async_trait;
use bytes::Bytes;
use http::StatusCode;

use super::backend::IpmfsBackend;
Expand All @@ -36,7 +37,7 @@ impl IpmfsWriter {

#[async_trait]
impl output::Write for IpmfsWriter {
async fn write(&mut self, bs: Vec<u8>) -> Result<()> {
async fn write(&mut self, bs: Bytes) -> Result<()> {
let resp = self
.backend
.ipmfs_write(
Expand All @@ -56,7 +57,7 @@ impl output::Write for IpmfsWriter {
}
}

async fn append(&mut self, bs: Vec<u8>) -> Result<()> {
async fn append(&mut self, bs: Bytes) -> Result<()> {
let _ = bs;

Err(Error::new(
Expand Down
Loading

1 comment on commit c527a4e

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for opendal ready!

✅ Preview
https://opendal-cobkgav0e-databend.vercel.app
https://opendal-git-object-writer.vercel.app

Built with commit c527a4e.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.