Skip to content

Commit

Permalink
Remove multipart related code
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <github@xuanwo.io>
  • Loading branch information
Xuanwo committed Mar 1, 2023
1 parent 4b93bac commit 8329a44
Show file tree
Hide file tree
Showing 21 changed files with 2 additions and 1,361 deletions.
57 changes: 0 additions & 57 deletions src/layers/concurrent_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,63 +176,6 @@ impl<A: Accessor> LayeredAccessor for ConcurrentLimitAccessor<A> {
.map(|(rp, s)| (rp, ConcurrentLimitWrapper::new(s, permit)))
}

async fn create_multipart(
&self,
path: &str,
args: OpCreateMultipart,
) -> Result<RpCreateMultipart> {
let _permit = self
.semaphore
.acquire()
.await
.expect("semaphore must be valid");

self.inner.create_multipart(path, args).await
}

async fn write_multipart(
&self,
path: &str,
args: OpWriteMultipart,
r: input::Reader,
) -> Result<RpWriteMultipart> {
let _permit = self
.semaphore
.acquire()
.await
.expect("semaphore must be valid");

self.inner.write_multipart(path, args, r).await
}

async fn complete_multipart(
&self,
path: &str,
args: OpCompleteMultipart,
) -> Result<RpCompleteMultipart> {
let _permit = self
.semaphore
.acquire()
.await
.expect("semaphore must be valid");

self.inner.complete_multipart(path, args).await
}

async fn abort_multipart(
&self,
path: &str,
args: OpAbortMultipart,
) -> Result<RpAbortMultipart> {
let _permit = self
.semaphore
.acquire()
.await
.expect("semaphore must be valid");

self.inner.abort_multipart(path, args).await
}

async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
let _permit = self
.semaphore
Expand Down
61 changes: 0 additions & 61 deletions src/layers/error_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,67 +180,6 @@ impl<A: Accessor> LayeredAccessor for ErrorContextAccessor<A> {
})
}

async fn create_multipart(
&self,
path: &str,
args: OpCreateMultipart,
) -> Result<RpCreateMultipart> {
self.inner
.create_multipart(path, args)
.map_err(|err| {
err.with_operation(Operation::CreateMultipart.into_static())
.with_context("service", self.meta.scheme())
.with_context("path", path)
})
.await
}

async fn write_multipart(
&self,
path: &str,
args: OpWriteMultipart,
r: input::Reader,
) -> Result<RpWriteMultipart> {
self.inner
.write_multipart(path, args, r)
.map_err(|err| {
err.with_operation(Operation::WriteMultipart.into_static())
.with_context("service", self.meta.scheme())
.with_context("path", path)
})
.await
}

async fn complete_multipart(
&self,
path: &str,
args: OpCompleteMultipart,
) -> Result<RpCompleteMultipart> {
self.inner
.complete_multipart(path, args)
.map_err(|err| {
err.with_operation(Operation::CompleteMultipart.into_static())
.with_context("service", self.meta.scheme())
.with_context("path", path)
})
.await
}

async fn abort_multipart(
&self,
path: &str,
args: OpAbortMultipart,
) -> Result<RpAbortMultipart> {
self.inner
.abort_multipart(path, args)
.map_err(|err| {
err.with_operation(Operation::AbortMultipart.into_static())
.with_context("service", self.meta.scheme())
.with_context("path", path)
})
.await
}

async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
self.inner
.batch(args)
Expand Down
187 changes: 0 additions & 187 deletions src/layers/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -573,193 +573,6 @@ impl<A: Accessor> LayeredAccessor for LoggingAccessor<A> {
.await
}

async fn create_multipart(
&self,
path: &str,
args: OpCreateMultipart,
) -> Result<RpCreateMultipart> {
debug!(
target: LOGGING_TARGET,
"service={} operation={} path={} -> started",
self.scheme,
Operation::CreateMultipart,
path
);

self.inner
.create_multipart(path, args.clone())
.inspect(|v| match v {
Ok(_) => {
debug!(
target: LOGGING_TARGET,
"service={} operation={} path={} -> finished",
self.scheme,
Operation::CreateMultipart,
path
);
}
Err(err) => {
if let Some(lvl) = self.err_level(err) {
log!(
target: LOGGING_TARGET,
lvl,
"service={} operation={} path={} -> {}: {err:?}",
self.scheme,
Operation::CreateMultipart,
path,
self.err_status(err)
);
}
}
})
.await
}

async fn write_multipart(
&self,
path: &str,
args: OpWriteMultipart,
r: input::Reader,
) -> Result<RpWriteMultipart> {
debug!(
target: LOGGING_TARGET,
"service={} operation={} path={} upload_id={} part_number={:?} size={:?} -> started",
self.scheme,
Operation::WriteMultipart,
path,
args.upload_id(),
args.part_number(),
args.size()
);

let r = LoggingReader::new(
self.scheme,
Operation::Write,
path,
Some(args.size()),
r,
self.failure_level,
);
let r = Box::new(r);

self.inner
.write_multipart(path, args.clone(), r)
.inspect_ok(|_| {
debug!(
target: LOGGING_TARGET,
"service={} operation={} path={} upload_id={} part_number={:?} size={:?} -> written",
self.scheme,
Operation::WriteMultipart,
path,
args.upload_id(),
args.part_number(),
args.size()
);
})
.inspect_err(|err| {
if let Some(lvl) = self.err_level(err) {
log!(
target: LOGGING_TARGET,
lvl,
"service={} operation={} path={} upload_id={} part_number={:?} size={:?} -> {}: {err:?}",
self.scheme,
Operation::WriteMultipart,
path,
args.upload_id(),
args.part_number(),
args.size(),
self.err_status(err)
);
}
}).await
}

async fn complete_multipart(
&self,
path: &str,
args: OpCompleteMultipart,
) -> Result<RpCompleteMultipart> {
debug!(
target: LOGGING_TARGET,
"service={} operation={} path={} upload_id={} -> started",
self.scheme,
Operation::CompleteMultipart,
path,
args.upload_id(),
);

self.inner
.complete_multipart(path, args.clone())
.inspect_ok(|_| {
debug!(
target: LOGGING_TARGET,
"service={} operation={} path={} upload_id={} -> finished",
self.scheme,
Operation::CompleteMultipart,
path,
args.upload_id()
);
})
.inspect_err(|err| {
if let Some(lvl) = self.err_level(err) {
log!(
target: LOGGING_TARGET,
lvl,
"service={} operation={} path={} upload_id={} -> {}: {err:?}",
self.scheme,
Operation::CompleteMultipart,
path,
args.upload_id(),
self.err_status(err)
);
}
})
.await
}

async fn abort_multipart(
&self,
path: &str,
args: OpAbortMultipart,
) -> Result<RpAbortMultipart> {
debug!(
target: LOGGING_TARGET,
"service={} operation={} path={} upload_id={} -> started",
self.scheme,
Operation::AbortMultipart,
path,
args.upload_id()
);

self.inner
.abort_multipart(path, args.clone())
.inspect_ok(|_| {
debug!(
target: LOGGING_TARGET,
"service={} operation={} path={} upload_id={} -> finished",
self.scheme,
Operation::AbortMultipart,
path,
args.upload_id()
);
})
.inspect_err(|err| {
if let Some(lvl) = self.err_level(err) {
log!(
target: LOGGING_TARGET,
lvl,
"service={} operation={} path={} upload_id={} -> {}: {err:?}",
self.scheme,
Operation::AbortMultipart,
path,
args.upload_id(),
self.err_status(err)
);
}
})
.await
}

fn blocking_create(&self, path: &str, args: OpCreate) -> Result<RpCreate> {
debug!(
target: LOGGING_TARGET,
Expand Down
Loading

1 comment on commit 8329a44

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for opendal ready!

✅ Preview
https://opendal-lz8amukls-databend.vercel.app
https://opendal-git-object-writer.vercel.app

Built with commit 8329a44.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.