Skip to content

Commit

Permalink
Cleanup api
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <github@xuanwo.io>
  • Loading branch information
Xuanwo committed Mar 1, 2023
1 parent 3515430 commit 4b93bac
Show file tree
Hide file tree
Showing 25 changed files with 48 additions and 78 deletions.
24 changes: 8 additions & 16 deletions src/layers/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,26 +291,22 @@ impl<A: Accessor> LayeredAccessor for LoggingAccessor<A> {
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
debug!(
target: LOGGING_TARGET,
"service={} operation={} path={} size={:?} -> started",
"service={} operation={} path={} -> started",
self.scheme,
Operation::Write,
path,
args.size()
path
);

let size = args.size();

self.inner
.write(path, args)
.await
.map(|v| {
debug!(
target: LOGGING_TARGET,
"service={} operation={} path={} size={:?} -> written",
"service={} operation={} path={} -> start writing",
self.scheme,
Operation::Write,
path,
size
);
v
})
Expand All @@ -319,11 +315,10 @@ impl<A: Accessor> LayeredAccessor for LoggingAccessor<A> {
log!(
target: LOGGING_TARGET,
lvl,
"service={} operation={} path={} size={:?} -> {}: {err:?}",
"service={} operation={} path={} -> {}: {err:?}",
self.scheme,
Operation::Write,
path,
size,
self.err_status(&err)
)
};
Expand Down Expand Up @@ -853,23 +848,21 @@ impl<A: Accessor> LayeredAccessor for LoggingAccessor<A> {
fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
debug!(
target: LOGGING_TARGET,
"service={} operation={} path={} size={:?} -> started",
"service={} operation={} path={} -> started",
self.scheme,
Operation::BlockingWrite,
path,
args.size()
);

self.inner
.blocking_write(path, args.clone())
.blocking_write(path, args)
.map(|v| {
debug!(
target: LOGGING_TARGET,
"service={} operation={} path={} size={:?} -> written",
"service={} operation={} path={} -> written",
self.scheme,
Operation::BlockingWrite,
path,
args.size()
);
v
})
Expand All @@ -878,11 +871,10 @@ impl<A: Accessor> LayeredAccessor for LoggingAccessor<A> {
log!(
target: LOGGING_TARGET,
lvl,
"service={} operation={} path={} size={:?} -> {}: {err:?}",
"service={} operation={} path={} -> {}: {err:?}",
self.scheme,
Operation::BlockingWrite,
path,
args.size(),
self.err_status(&err)
);
}
Expand Down
12 changes: 4 additions & 8 deletions src/object/object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,9 +529,7 @@ impl Object {
/// # }
/// ```
pub async fn write(&self, bs: impl Into<Bytes>) -> Result<()> {
let bs = bs.into();
let op = OpWrite::new(bs.len() as u64);
self.write_with(op, bs).await
self.write_with(OpWrite::new(), bs).await
}

/// Write multiple bytes into object.
Expand Down Expand Up @@ -633,10 +631,8 @@ impl Object {
/// # Ok(())
/// # }
/// ```
pub fn blocking_write(&self, bs: impl Into<Vec<u8>>) -> Result<()> {
let bs: Vec<u8> = bs.into();
let op = OpWrite::new(bs.len() as u64);
self.blocking_write_with(op, bs)
pub fn blocking_write(&self, bs: impl Into<Bytes>) -> Result<()> {
self.blocking_write_with(OpWrite::new(), bs)
}

/// Write multiple bytes into object.
Expand Down Expand Up @@ -1380,7 +1376,7 @@ impl Object {
/// curl -X PUT "https://s3.amazonaws.com/examplebucket/test.txt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=access_key_id/20130721/us-east-1/s3/aws4_request&X-Amz-Date=20130721T201207Z&X-Amz-Expires=86400&X-Amz-SignedHeaders=host&X-Amz-Signature=<signature-value>" -d "Hello, World!"
/// ```
pub fn presign_write(&self, expire: Duration) -> Result<PresignedRequest> {
self.presign_write_with(OpWrite::new(0), expire)
self.presign_write_with(OpWrite::new(), expire)
}

/// Presign an operation for write with option described in OpenDAL [rfc-0661](../../docs/rfcs/0661-path-in-accessor.md)
Expand Down
9 changes: 1 addition & 8 deletions src/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,6 @@ impl OpStat {
pub struct OpWrite {
append: bool,

size: u64,
content_type: Option<String>,
content_disposition: Option<String>,
}
Expand All @@ -371,21 +370,15 @@ impl OpWrite {
/// Create a new `OpWrite`.
///
/// If input path is not a file path, an error will be returned.
pub fn new(size: u64) -> Self {
pub fn new() -> Self {
Self {
append: false,

size,
content_type: None,
content_disposition: None,
}
}

/// Get size from option.
pub fn size(&self) -> u64 {
self.size
}

pub(crate) fn with_append(mut self) -> Self {
self.append = true;
self
Expand Down
8 changes: 4 additions & 4 deletions src/raw/adapters/kv/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,16 +114,16 @@ impl<S: Adapter> Accessor for Backend<S> {
Ok((RpRead::new(bs.len() as u64), output::Cursor::from(bs)))
}

async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let p = build_abs_path(&self.root, path);

Ok((RpWrite::new(args.size()), KvWriter::new(self.kv.clone(), p)))
Ok((RpWrite::new(), KvWriter::new(self.kv.clone(), p)))
}

fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
fn blocking_write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
let p = build_abs_path(&self.root, path);

Ok((RpWrite::new(args.size()), KvWriter::new(self.kv.clone(), p)))
Ok((RpWrite::new(), KvWriter::new(self.kv.clone(), p)))
}

async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
Expand Down
13 changes: 3 additions & 10 deletions src/raw/rps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,19 +275,12 @@ impl RpStat {

/// Reply for `write` operation.
#[derive(Debug, Clone, Default)]
pub struct RpWrite {
written: u64,
}
pub struct RpWrite {}

impl RpWrite {
/// Create a new reply for write.
pub fn new(written: u64) -> Self {
Self { written }
}

/// Get the written size (in bytes) of write operation.
pub fn written(&self) -> u64 {
self.written
pub fn new() -> Self {
Self {}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/services/azblob/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ impl AzblobBackend {
pub fn azblob_put_blob_request(
&self,
path: &str,
size: Option<u64>,
size: Option<usize>,
content_type: Option<&str>,
body: AsyncBody,
) -> Result<Request<AsyncBody>> {
Expand Down
2 changes: 1 addition & 1 deletion src/services/azblob/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl output::Write for AzblobWriter {
async fn write(&mut self, bs: Bytes) -> Result<()> {
let mut req = self.backend.azblob_put_blob_request(
&self.path,
Some(self.op.size()),
Some(bs.len()),
self.op.content_type(),
AsyncBody::Bytes(bs),
)?;
Expand Down
2 changes: 1 addition & 1 deletion src/services/azdfs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ impl AzdfsBackend {
pub fn azdfs_update_request(
&self,
path: &str,
size: Option<u64>,
size: Option<usize>,
body: AsyncBody,
) -> Result<Request<AsyncBody>> {
let p = build_abs_path(&self.root, path);
Expand Down
8 changes: 3 additions & 5 deletions src/services/azdfs/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,9 @@ impl output::Write for AzdfsWriter {
}
}

let mut req = self.backend.azdfs_update_request(
&self.path,
Some(self.op.size()),
AsyncBody::Bytes(bs),
)?;
let mut req =
self.backend
.azdfs_update_request(&self.path, Some(bs.len()), AsyncBody::Bytes(bs))?;

self.backend
.signer
Expand Down
4 changes: 2 additions & 2 deletions src/services/fs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ impl Accessor for FsBackend {
.await
.map_err(parse_io_error)?;

Ok((RpWrite::new(0), FsWriter::new(target_path, tmp_path, f)))
Ok((RpWrite::new(), FsWriter::new(target_path, tmp_path, f)))
}

async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
Expand Down Expand Up @@ -628,7 +628,7 @@ impl Accessor for FsBackend {
.open(path)
.map_err(parse_io_error)?;

Ok((RpWrite::new(0), FsWriter::new(target_path, tmp_path, f)))
Ok((RpWrite::new(), FsWriter::new(target_path, tmp_path, f)))
}

fn blocking_stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
Expand Down
2 changes: 1 addition & 1 deletion src/services/ftp/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ impl Accessor for FtpBackend {

async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
Ok((
RpWrite::new(0),
RpWrite::new(),
FtpWriter::new(self.clone(), path.to_string()),
))
}
Expand Down
2 changes: 1 addition & 1 deletion src/services/gcs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ impl GcsBackend {
pub fn gcs_insert_object_request(
&self,
path: &str,
size: Option<u64>,
size: Option<usize>,
content_type: Option<&str>,
body: AsyncBody,
) -> Result<Request<AsyncBody>> {
Expand Down
2 changes: 1 addition & 1 deletion src/services/gcs/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl output::Write for GcsWriter {
async fn write(&mut self, bs: Bytes) -> Result<()> {
let mut req = self.backend.gcs_insert_object_request(
&self.path,
Some(self.op.size()),
Some(bs.len()),
self.op.content_type(),
AsyncBody::Bytes(bs),
)?;
Expand Down
10 changes: 4 additions & 6 deletions src/services/ghac/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ impl Accessor for GhacBackend {
));
}

let req = self.ghac_reserve(path, 1).await?;
let req = self.ghac_reserve(path).await?;

let resp = self.client.send_async(req).await?;

Expand Down Expand Up @@ -392,8 +392,8 @@ impl Accessor for GhacBackend {
}
}

async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let req = self.ghac_reserve(path, args.size()).await?;
async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let req = self.ghac_reserve(path).await?;

let resp = self.client.send_async(req).await?;

Expand Down Expand Up @@ -523,15 +523,14 @@ impl GhacBackend {
.map_err(new_request_build_error)
}

async fn ghac_reserve(&self, path: &str, size: u64) -> Result<Request<AsyncBody>> {
async fn ghac_reserve(&self, path: &str) -> Result<Request<AsyncBody>> {
let p = build_abs_path(&self.root, path);

let url = format!("{}{CACHE_URL_BASE}/caches", self.cache_url);

let bs = serde_json::to_vec(&GhacReserveRequest {
key: p,
version: self.version.to_string(),
cache_size: size,
})
.map_err(new_json_serialize_error)?;

Expand Down Expand Up @@ -628,7 +627,6 @@ struct GhacQueryResponse {
struct GhacReserveRequest {
key: String,
version: String,
cache_size: u64,
}

#[derive(Deserialize)]
Expand Down
4 changes: 2 additions & 2 deletions src/services/hdfs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ impl Accessor for HdfsBackend {
.await
.map_err(parse_io_error)?;

Ok((RpWrite::new(0), HdfsWriter::new(f)))
Ok((RpWrite::new(), HdfsWriter::new(f)))
}

async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
Expand Down Expand Up @@ -512,7 +512,7 @@ impl Accessor for HdfsBackend {
.open(&p)
.map_err(parse_io_error)?;

Ok((RpWrite::new(0), HdfsWriter::new(f)))
Ok((RpWrite::new(), HdfsWriter::new(f)))
}

fn blocking_stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
Expand Down
2 changes: 1 addition & 1 deletion src/services/obs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ impl ObsBackend {
pub fn obs_put_object_request(
&self,
path: &str,
size: Option<u64>,
size: Option<usize>,
content_type: Option<&str>,
body: AsyncBody,
) -> Result<Request<AsyncBody>> {
Expand Down
2 changes: 1 addition & 1 deletion src/services/obs/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl output::Write for ObsWriter {
async fn write(&mut self, bs: Bytes) -> Result<()> {
let mut req = self.backend.obs_put_object_request(
&self.path,
Some(self.op.size()),
Some(bs.len()),
self.op.content_type(),
AsyncBody::Bytes(bs),
)?;
Expand Down
4 changes: 2 additions & 2 deletions src/services/oss/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ impl OssBackend {
pub fn oss_put_object_request(
&self,
path: &str,
size: Option<u64>,
size: Option<usize>,
content_type: Option<&str>,
content_disposition: Option<&str>,
body: AsyncBody,
Expand Down Expand Up @@ -735,7 +735,7 @@ impl OssBackend {
async fn oss_put_object(
&self,
path: &str,
size: Option<u64>,
size: Option<usize>,
content_type: Option<&str>,
content_disposition: Option<&str>,
body: AsyncBody,
Expand Down
2 changes: 1 addition & 1 deletion src/services/oss/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl output::Write for OssWriter {
async fn write(&mut self, bs: Bytes) -> Result<()> {
let mut req = self.backend.oss_put_object_request(
&self.path,
Some(self.op.size()),
Some(bs.len()),
self.op.content_type(),
self.op.content_disposition(),
AsyncBody::Bytes(bs),
Expand Down
2 changes: 1 addition & 1 deletion src/services/s3/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1366,7 +1366,7 @@ impl S3Backend {
pub fn s3_put_object_request(
&self,
path: &str,
size: Option<u64>,
size: Option<usize>,
content_type: Option<&str>,
content_disposition: Option<&str>,
body: AsyncBody,
Expand Down
Loading

0 comments on commit 4b93bac

Please sign in to comment.