Skip to content

Commit

Permalink
Merge pull request #7371 from Xuanwo/bump-opendal
Browse files Browse the repository at this point in the history
feat: Bump OpenDAL to v0.14
  • Loading branch information
mergify[bot] authored Aug 30, 2022
2 parents eb1c26e + 80f0a44 commit 869344f
Show file tree
Hide file tree
Showing 16 changed files with 54 additions and 48 deletions.
24 changes: 17 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/common/contexts/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ test = false
common-base = { path = "../base" }

async-trait = "0.1.56"
opendal = { version = "0.13.1", features = ["layers-retry"] }
opendal = { version = "0.14.1", features = ["layers-retry"] }
42 changes: 19 additions & 23 deletions src/common/contexts/src/dal/dal_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@ use std::time::Instant;

use async_trait::async_trait;
use opendal::io_util::observe_read;
use opendal::io_util::observe_write;
use opendal::io_util::ReadEvent;
use opendal::io_util::WriteEvent;
use opendal::ops::OpCreate;
use opendal::ops::OpDelete;
use opendal::ops::OpList;
Expand All @@ -33,7 +31,6 @@ use opendal::ops::PresignedRequest;
use opendal::Accessor;
use opendal::AccessorMetadata;
use opendal::BytesReader;
use opendal::BytesWriter;
use opendal::DirStreamer;
use opendal::Layer;
use opendal::ObjectMetadata;
Expand Down Expand Up @@ -120,30 +117,29 @@ impl Accessor for DalContext {
})
}

async fn write(&self, args: &OpWrite) -> Result<BytesWriter> {
async fn write(&self, args: &OpWrite, r: BytesReader) -> Result<u64> {
let metric = self.metrics.clone();

self.get_inner()?.write(args).await.map(|w| {
let mut last_pending = None;
let w = observe_write(w, move |e| {
let start = match last_pending {
None => Instant::now(),
Some(t) => t,
};
match e {
WriteEvent::Pending => last_pending = Some(start),
WriteEvent::Written(n) => {
last_pending = None;
metric.inc_write_bytes(n);
}
WriteEvent::Error(_) => last_pending = None,
_ => {}
let mut last_pending = None;

let r = observe_read(r, move |e| {
let start = match last_pending {
None => Instant::now(),
Some(t) => t,
};
match e {
ReadEvent::Pending => last_pending = Some(start),
ReadEvent::Read(n) => {
last_pending = None;
metric.inc_write_bytes(n);
}
metric.inc_write_bytes_cost(start.elapsed().as_millis() as u64);
});
ReadEvent::Error(_) => last_pending = None,
_ => {}
}
metric.inc_write_bytes_cost(start.elapsed().as_millis() as u64);
});

Box::new(w) as BytesWriter
})
self.get_inner()?.write(args, Box::new(r)).await
}

async fn stat(&self, args: &OpStat) -> Result<ObjectMetadata> {
Expand Down
5 changes: 2 additions & 3 deletions src/common/contexts/src/dal/dal_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use opendal::ops::PresignedRequest;
use opendal::Accessor;
use opendal::AccessorMetadata;
use opendal::BytesReader;
use opendal::BytesWriter;
use opendal::DirStreamer;
use opendal::Layer;
use opendal::ObjectMetadata;
Expand Down Expand Up @@ -106,11 +105,11 @@ impl Accessor for DalRuntime {
.expect("join must success")
}

async fn write(&self, args: &OpWrite) -> Result<BytesWriter> {
async fn write(&self, args: &OpWrite, r: BytesReader) -> Result<u64> {
let op = self.get_inner()?;
let args = args.clone();
self.runtime
.spawn(async move { op.write(&args).await })
.spawn(async move { op.write(&args, r).await })
.await
.expect("join must success")
}
Expand Down
2 changes: 1 addition & 1 deletion src/common/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ anyhow = "1.0.58"
backon = "0.0.2"
globiter = "0.1.0"
once_cell = "1.12.0"
opendal = { version = "0.13.1", features = [
opendal = { version = "0.14.1", features = [
"layers-retry",
"services-http",
"layers-tracing",
Expand Down
2 changes: 1 addition & 1 deletion src/query/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@ common-users = { path = "../users" }

async-trait = "0.1.56"
dyn-clone = "1.0.6"
opendal = { version = "0.13.1", features = ["layers-retry"] }
opendal = { version = "0.14.1", features = ["layers-retry"] }
parking_lot = "0.12.1"
2 changes: 1 addition & 1 deletion src/query/config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ common-tracing = { path = "../../common/tracing" }

clap = { version = "3.2.5", features = ["derive", "env"] }
once_cell = "1.12.0"
opendal = { version = "0.13.1", features = ["layers-retry", "compress"], optional = true }
opendal = { version = "0.14.1", features = ["layers-retry", "compress"], optional = true }
semver = "1.0.10"
serde = { version = "1.0.137", features = ["derive"] }
serfig = "0.0.2"
Expand Down
2 changes: 1 addition & 1 deletion src/query/pipeline/sources/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ common-streams = { path = "../../streams" }
async-trait = { version = "0.1.0", package = "async-trait-fn" }
futures = "0.3.21"
futures-util = "0.3.21"
opendal = { version = "0.13.1", features = ["layers-retry", "compress"] }
opendal = { version = "0.14.1", features = ["layers-retry", "compress"] }
parking_lot = "0.12.1"
2 changes: 1 addition & 1 deletion src/query/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ num = "0.4.0"
num_cpus = "1.13.1"
octocrab = "0.16.0"
once_cell = "1.12.0"
opendal = { version = "0.13.1", features = ["layers-retry", "layers-tracing", "layers-metrics", "compress"] }
opendal = { version = "0.14.1", features = ["layers-retry", "layers-tracing", "layers-metrics", "compress"] }
opensrv-mysql = "0.2.0"
openssl = { version = "0.10.40", features = ["vendored"] }
parking_lot = "0.12.1"
Expand Down
3 changes: 1 addition & 2 deletions src/query/service/tests/it/storages/fuse/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use opendal::ops::OpRead;
use opendal::ops::OpWrite;
use opendal::Accessor;
use opendal::BytesReader;
use opendal::BytesWriter;
use opendal::Operator;
use parking_lot::Mutex;
use uuid::Uuid;
Expand Down Expand Up @@ -235,7 +234,7 @@ where
Err(err)
}

async fn write(&self, _args: &OpWrite) -> std::io::Result<BytesWriter> {
async fn write(&self, _args: &OpWrite, _r: BytesReader) -> std::io::Result<u64> {
let v = &mut (*self.err.lock());
let err = v.take_err();
v.increase_write_op_count();
Expand Down
2 changes: 1 addition & 1 deletion src/query/storages/fuse/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ backoff = { version = "0.4.0", features = ["futures", "tokio"] }
chrono = "0.4.19"
futures = "0.3.21"
futures-util = "0.3.21"
opendal = { version = "0.13.1", features = ["layers-retry"] }
opendal = { version = "0.14.1", features = ["layers-retry"] }
serde = { version = "1.0.137", features = ["derive"] }
serde_json = "1.0.81"
tracing = "0.1.35"
Expand Down
7 changes: 4 additions & 3 deletions src/query/storages/fuse/src/io/write/meta_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io::Error;

use common_exception::Result;
use opendal::Operator;
use serde::Serialize;
Expand All @@ -22,12 +24,11 @@ use crate::io::retry::Retryable;

pub async fn write_meta<T>(data_accessor: &Operator, location: &str, meta: &T) -> Result<()>
where T: Serialize {
let bytes = &serde_json::to_vec(&meta)?;
let op = || async {
data_accessor.object(location).write(bytes).await?;
let bs = serde_json::to_vec(&meta).map_err(Error::other)?;
data_accessor
.object(location)
.write(bytes)
.write(bs)
.await
.map_err(retry::from_io_error)
};
Expand Down
1 change: 1 addition & 0 deletions src/query/storages/fuse/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#![feature(generic_associated_types)]
#![feature(type_alias_impl_trait)]
#![feature(io_error_other)]
#![deny(unused_crate_dependencies)]

mod constants;
Expand Down
2 changes: 1 addition & 1 deletion src/query/storages/hive/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ common-storages-util = { path = "../util" }
async-recursion = "1.0.0"
async-trait = "0.1.56"
futures = "0.3.21"
opendal = { version = "0.13.1", features = ["layers-retry"] }
opendal = { version = "0.14.1", features = ["layers-retry"] }
serde = { version = "1.0.137", features = ["derive"] }
thrift = "0.15.0"
tracing = "0.1.35"
Expand Down
2 changes: 1 addition & 1 deletion src/query/storages/preludes/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ futures-util = "0.3.21"
itertools = "0.10.3"
num_cpus = "1.13.1"
once_cell = "1.12.0"
opendal = { version = "0.13.1", features = ["layers-retry"] }
opendal = { version = "0.14.1", features = ["layers-retry"] }
parking_lot = "0.12.1"
reqwest = "0.11.11"
semver = "1.0.10"
Expand Down
2 changes: 1 addition & 1 deletion src/query/streams/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,4 @@ serde_json = { version = "1.0.81", default-features = false, features = ["preser
tempfile = "3.3.0"

[dev-dependencies]
opendal = { version = "0.13.1", features = ["layers-retry", "compress"] }
opendal = { version = "0.14.1", features = ["layers-retry", "compress"] }

1 comment on commit 869344f

@vercel
Copy link

@vercel vercel bot commented on 869344f Aug 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend-git-main-databend.vercel.app
databend.rs
databend.vercel.app
databend-databend.vercel.app

Please sign in to comment.