Skip to content

Commit

Permalink
feat(storage): bump opendal to v0.49.0 (#18266)
Browse files Browse the repository at this point in the history
Co-authored-by: congyi <15605187270@163.com>
  • Loading branch information
chenzl25 and wcy-fdu authored Sep 23, 2024
1 parent d5dbc6a commit 27458d8
Show file tree
Hide file tree
Showing 22 changed files with 101 additions and 209 deletions.
100 changes: 16 additions & 84 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,14 @@ otlp-embedded = { git = "https://github.com/risingwavelabs/otlp-embedded", rev =
prost = { version = "0.13" }
prost-build = { version = "0.13" }
# branch rw_patch
icelake = { git = "https://github.com/risingwavelabs/icelake.git", rev = "db4868f9a5de8ff8f6c04ec4c203bcbe59564cbe", features = [
icelake = { git = "https://github.com/risingwavelabs/icelake.git", rev = "490e5af541edab0e9284ba19ddb56c8a16d1c36b", features = [
"prometheus",
] }
# branch dev
iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "84bf51c9d0d5886e4ee306ca4f383f029e1767a4" }
iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "84bf51c9d0d5886e4ee306ca4f383f029e1767a4" }
iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "84bf51c9d0d5886e4ee306ca4f383f029e1767a4" }
opendal = "0.47"
opendal = "0.49"
# used only by arrow-udf-flight
arrow-flight = "52"
arrow-udf-js = "0.4"
Expand Down
9 changes: 3 additions & 6 deletions src/batch/src/spill/spill_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,13 @@ impl SpillOp {

let op = match spill_backend {
SpillBackend::Disk => {
let mut builder = Fs::default();
builder.root(&root);
let builder = Fs::default().root(&root);
Operator::new(builder)?
.layer(RetryLayer::default())
.finish()
}
SpillBackend::Memory => {
let mut builder = Memory::default();
builder.root(&root);
let builder = Memory::default().root(&root);
Operator::new(builder)?
.layer(RetryLayer::default())
.finish()
Expand All @@ -86,8 +84,7 @@ impl SpillOp {
std::env::var(RW_BATCH_SPILL_DIR_ENV).unwrap_or_else(|_| DEFAULT_SPILL_DIR.to_string());
let root = format!("/{}/{}/", spill_dir, RW_MANAGED_SPILL_DIR);

let mut builder = Fs::default();
builder.root(&root);
let builder = Fs::default().root(&root);

let op: Operator = Operator::new(builder)?
.layer(RetryLayer::default())
Expand Down
6 changes: 2 additions & 4 deletions src/connector/src/connector_common/iceberg/mock_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ impl MockCatalog {
fn sparse_table(self: &Arc<Self>) -> Table {
Table::builder_from_catalog(
{
let mut builder = Memory::default();
builder.root("/tmp");
let builder = Memory::default().root("/tmp");
Operator::new(builder).unwrap().finish()
},
self.clone(),
Expand Down Expand Up @@ -124,8 +123,7 @@ impl MockCatalog {
fn range_table(self: &Arc<Self>) -> Table {
Table::builder_from_catalog(
{
let mut builder = Memory::default();
builder.root("/tmp");
let builder = Memory::default().root("/tmp");
Operator::new(builder).unwrap().finish()
},
self.clone(),
Expand Down
7 changes: 3 additions & 4 deletions src/connector/src/connector_common/iceberg/storage_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,16 +117,15 @@ impl StorageCatalog {
/// `table_path`: relative path of table dir under warehouse root.
async fn list_table_metadata_paths(&self, table_path: &str) -> Result<Vec<String>> {
// create s3 operator
let mut builder = opendal::services::S3::default();
builder
let mut builder = opendal::services::S3::default()
.root(&self.warehouse)
.access_key_id(&self.config.access_key)
.secret_access_key(&self.config.secret_key);
if let Some(endpoint) = &self.config.endpoint {
builder.endpoint(endpoint);
builder = builder.endpoint(endpoint);
}
if let Some(region) = &self.config.region {
builder.region(region);
builder = builder.region(region);
}
let op: Operator = Operator::new(builder)
.map_err(|err| Error::new(ErrorKind::Unexpected, err.to_report_string()))?
Expand Down
10 changes: 5 additions & 5 deletions src/connector/src/sink/file_sink/azblob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ impl<S: OpendalSinkBackend> FileSink<S> {
pub fn new_azblob_sink(config: AzblobConfig) -> Result<Operator> {
// Create azblob builder.
let mut builder = Azblob::default();
builder.container(&config.common.container_name);

builder.endpoint(&config.common.endpoint_url);
builder = builder
.container(&config.common.container_name)
.endpoint(&config.common.endpoint_url);

if let Some(account_name) = config.common.account_name {
builder.account_name(&account_name);
builder = builder.account_name(&account_name);
} else {
tracing::warn!(
"account_name azblob is not set, container {}",
Expand All @@ -72,7 +72,7 @@ impl<S: OpendalSinkBackend> FileSink<S> {
}

if let Some(account_key) = config.common.account_key {
builder.account_key(&account_key);
builder = builder.account_key(&account_key);
} else {
tracing::warn!(
"account_key azblob is not set, container {}",
Expand Down
4 changes: 1 addition & 3 deletions src/connector/src/sink/file_sink/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,7 @@ pub const FS_SINK: &str = "fs";
impl<S: OpendalSinkBackend> FileSink<S> {
pub fn new_fs_sink(config: FsConfig) -> Result<Operator> {
// Create fs builder.
let mut builder = Fs::default();
// Create fs backend builder.
builder.root(&config.common.path);
let builder = Fs::default().root(&config.common.path);
let operator: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
.layer(RetryLayer::default())
Expand Down
11 changes: 4 additions & 7 deletions src/connector/src/sink/file_sink/gcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,10 @@ pub const GCS_SINK: &str = "gcs";
impl<S: OpendalSinkBackend> FileSink<S> {
pub fn new_gcs_sink(config: GcsConfig) -> Result<Operator> {
// Create gcs builder.
let mut builder = Gcs::default();

builder.bucket(&config.common.bucket_name);

builder.credential(&config.common.credential);

builder.service_account(&config.common.service_account);
let builder = Gcs::default()
.bucket(&config.common.bucket_name)
.credential(&config.common.credential)
.service_account(&config.common.service_account);

let operator: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
Expand Down
16 changes: 8 additions & 8 deletions src/connector/src/sink/file_sink/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,16 @@ pub const S3_SINK: &str = "s3";
impl<S: OpendalSinkBackend> FileSink<S> {
pub fn new_s3_sink(config: S3Config) -> Result<Operator> {
// Create s3 builder.
let mut builder = S3::default();
builder.bucket(&config.common.bucket_name);
builder.region(&config.common.region_name);
let mut builder = S3::default()
.bucket(&config.common.bucket_name)
.region(&config.common.region_name);

if let Some(endpoint_url) = config.common.endpoint_url {
builder.endpoint(&endpoint_url);
builder = builder.endpoint(&endpoint_url);
}

if let Some(access) = config.common.access {
builder.access_key_id(&access);
builder = builder.access_key_id(&access);
} else {
tracing::error!(
"access key id of aws s3 is not set, bucket {}",
Expand All @@ -79,7 +79,7 @@ impl<S: OpendalSinkBackend> FileSink<S> {
}

if let Some(secret) = config.common.secret {
builder.secret_access_key(&secret);
builder = builder.secret_access_key(&secret);
} else {
tracing::error!(
"secret access key of aws s3 is not set, bucket {}",
Expand All @@ -88,9 +88,9 @@ impl<S: OpendalSinkBackend> FileSink<S> {
}

if let Some(assume_role) = config.common.assume_role {
builder.role_arn(&assume_role);
builder = builder.role_arn(&assume_role);
}
builder.disable_config_load();
builder = builder.disable_config_load();
let operator: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
.layer(RetryLayer::default())
Expand Down
Loading

0 comments on commit 27458d8

Please sign in to comment.