diff --git a/Cargo.lock b/Cargo.lock index 7cb51ae8e895..9ce454688c38 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2110,7 +2110,7 @@ dependencies = [ "bitflags 2.6.0", "cexpr", "clang-sys", - "itertools 0.12.1", + "itertools 0.10.5", "lazy_static", "lazycell", "log", @@ -6055,7 +6055,7 @@ dependencies = [ "moka", "murmur3", "once_cell", - "opendal 0.49.0", + "opendal", "ordered-float 4.1.1", "parquet 52.0.0", "paste", @@ -6113,7 +6113,7 @@ dependencies = [ [[package]] name = "icelake" version = "0.3.141592654" -source = "git+https://github.com/risingwavelabs/icelake.git?rev=db4868f9a5de8ff8f6c04ec4c203bcbe59564cbe#db4868f9a5de8ff8f6c04ec4c203bcbe59564cbe" +source = "git+https://github.com/risingwavelabs/icelake.git?rev=490e5af541edab0e9284ba19ddb56c8a16d1c36b#490e5af541edab0e9284ba19ddb56c8a16d1c36b" dependencies = [ "anyhow", "apache-avro 0.17.0 (git+https://github.com/apache/avro.git)", @@ -6139,7 +6139,7 @@ dependencies = [ "log", "murmur3", "once_cell", - "opendal 0.47.2", + "opendal", "ordered-float 3.9.1", "parquet 52.0.0", "prometheus", @@ -7911,9 +7911,9 @@ checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" [[package]] name = "opendal" -version = "0.47.2" +version = "0.49.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff159a2da374ef2d64848a6547943cf1af7d2ceada5ae77be175e1389aa07ae3" +checksum = "9b04d09b9822c2f75a1d2fc513a2c1279c70e91e7407936fffdf6a6976ec530a" dependencies = [ "anyhow", "async-trait", @@ -7931,42 +7931,12 @@ dependencies = [ "once_cell", "percent-encoding", "prometheus", - "quick-xml 0.31.0", - "reqsign 0.15.2", - "reqwest 0.12.4", - "serde", - "serde_json", - "sha2", - "tokio", - "uuid", -] - -[[package]] -name = "opendal" -version = "0.49.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39d516adf7db912c38af382c3e92c27cd62fbbc240e630920555d784c2ab1494" -dependencies = [ - "anyhow", - "async-trait", - "backon", - "base64 0.22.0", - "bytes", - "chrono", - "crc32c", - "flagset", - "futures", - "getrandom", - "http 1.1.0", - "log", - "md-5", - "once_cell", - "percent-encoding", "quick-xml 0.36.1", - "reqsign 0.16.0", + "reqsign", "reqwest 0.12.4", "serde", "serde_json", + "sha2", "tokio", "uuid", ] @@ -9227,7 +9197,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" dependencies = [ "anyhow", - "itertools 0.12.1", + "itertools 0.10.5", "proc-macro2", "quote", "syn 2.0.66", @@ -9419,7 +9389,7 @@ dependencies = [ "indoc", "libc", "memoffset", - "parking_lot 0.12.1", + "parking_lot 0.11.2", "portable-atomic", "pyo3-build-config", "pyo3-ffi", @@ -9518,16 +9488,6 @@ dependencies = [ "serde", ] -[[package]] -name = "quick-xml" -version = "0.31.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1004a344b30a54e2ee58d66a71b32d2db2feb0a31f9a2d302bf0536f15de2a33" -dependencies = [ - "memchr", - "serde", -] - [[package]] name = "quick-xml" version = "0.35.0" @@ -9811,37 +9771,6 @@ dependencies = [ "bytecheck", ] -[[package]] -name = "reqsign" -version = "0.15.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70fe66d4cd0b5ed9b1abbfe639bf6baeaaf509f7da2d51b31111ba945be59286" -dependencies = [ - "anyhow", - "async-trait", - "base64 0.22.0", - "chrono", - "form_urlencoded", - "getrandom", - "hex", - "hmac", - "home", - "http 1.1.0", - "jsonwebtoken", - "log", - "once_cell", - "percent-encoding", - "quick-xml 0.31.0", - "rand", - "reqwest 0.12.4", - "rsa", - "rust-ini", - "serde", - "serde_json", - "sha1", - "sha2", -] - [[package]] name = "reqsign" version = "0.16.0" @@ -9858,11 +9787,14 @@ dependencies = [ "hmac", "home", "http 1.1.0", + "jsonwebtoken", "log", + "once_cell", "percent-encoding", "quick-xml 0.35.0", "rand", "reqwest 0.12.4", + "rsa", "rust-ini", "serde", "serde_json", @@ -10154,7 +10086,7 @@ dependencies = [ "madsim-tokio", "madsim-tonic", "memcomparable", - "opendal 0.47.2", + "opendal", "parking_lot 0.12.1", "parquet 52.0.0", "paste", @@ -10684,7 +10616,7 @@ dependencies = [ "mysql_common", "nexmark", "num-bigint", - "opendal 0.47.2", + "opendal", "openssl", "parking_lot 0.12.1", "parquet 52.0.0", @@ -11445,7 +11377,7 @@ dependencies = [ "madsim", "madsim-aws-sdk-s3", "madsim-tokio", - "opendal 0.47.2", + "opendal", "prometheus", "reqwest 0.12.4", "risingwave_common", diff --git a/Cargo.toml b/Cargo.toml index 456bf4bd1fbd..148c8f64587a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -137,14 +137,14 @@ otlp-embedded = { git = "https://github.com/risingwavelabs/otlp-embedded", rev = prost = { version = "0.13" } prost-build = { version = "0.13" } # branch rw_patch -icelake = { git = "https://github.com/risingwavelabs/icelake.git", rev = "db4868f9a5de8ff8f6c04ec4c203bcbe59564cbe", features = [ +icelake = { git = "https://github.com/risingwavelabs/icelake.git", rev = "490e5af541edab0e9284ba19ddb56c8a16d1c36b", features = [ "prometheus", ] } # branch dev iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "84bf51c9d0d5886e4ee306ca4f383f029e1767a4" } iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "84bf51c9d0d5886e4ee306ca4f383f029e1767a4" } iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "84bf51c9d0d5886e4ee306ca4f383f029e1767a4" } -opendal = "0.47" +opendal = "0.49" # used only by arrow-udf-flight arrow-flight = "52" arrow-udf-js = "0.4" diff --git a/src/batch/src/spill/spill_op.rs b/src/batch/src/spill/spill_op.rs index b3e842a269ec..6b3f8a739404 100644 --- a/src/batch/src/spill/spill_op.rs +++ b/src/batch/src/spill/spill_op.rs @@ -61,15 +61,13 @@ impl SpillOp { let op = match spill_backend { SpillBackend::Disk => { - let mut builder = Fs::default(); - builder.root(&root); + let builder = Fs::default().root(&root); Operator::new(builder)? .layer(RetryLayer::default()) .finish() } SpillBackend::Memory => { - let mut builder = Memory::default(); - builder.root(&root); + let builder = Memory::default().root(&root); Operator::new(builder)? .layer(RetryLayer::default()) .finish() @@ -86,8 +84,7 @@ impl SpillOp { std::env::var(RW_BATCH_SPILL_DIR_ENV).unwrap_or_else(|_| DEFAULT_SPILL_DIR.to_string()); let root = format!("/{}/{}/", spill_dir, RW_MANAGED_SPILL_DIR); - let mut builder = Fs::default(); - builder.root(&root); + let builder = Fs::default().root(&root); let op: Operator = Operator::new(builder)? .layer(RetryLayer::default()) diff --git a/src/connector/src/connector_common/iceberg/mock_catalog.rs b/src/connector/src/connector_common/iceberg/mock_catalog.rs index f9d60965b1d3..de598b65fff6 100644 --- a/src/connector/src/connector_common/iceberg/mock_catalog.rs +++ b/src/connector/src/connector_common/iceberg/mock_catalog.rs @@ -32,8 +32,7 @@ impl MockCatalog { fn sparse_table(self: &Arc) -> Table { Table::builder_from_catalog( { - let mut builder = Memory::default(); - builder.root("/tmp"); + let builder = Memory::default().root("/tmp"); Operator::new(builder).unwrap().finish() }, self.clone(), @@ -124,8 +123,7 @@ impl MockCatalog { fn range_table(self: &Arc) -> Table { Table::builder_from_catalog( { - let mut builder = Memory::default(); - builder.root("/tmp"); + let builder = Memory::default().root("/tmp"); Operator::new(builder).unwrap().finish() }, self.clone(), diff --git a/src/connector/src/connector_common/iceberg/storage_catalog.rs b/src/connector/src/connector_common/iceberg/storage_catalog.rs index 18e2ff0e036f..cd7bb2ca4ba0 100644 --- a/src/connector/src/connector_common/iceberg/storage_catalog.rs +++ b/src/connector/src/connector_common/iceberg/storage_catalog.rs @@ -117,16 +117,15 @@ impl StorageCatalog { /// `table_path`: relative path of table dir under warehouse root. async fn list_table_metadata_paths(&self, table_path: &str) -> Result> { // create s3 operator - let mut builder = opendal::services::S3::default(); - builder + let mut builder = opendal::services::S3::default() .root(&self.warehouse) .access_key_id(&self.config.access_key) .secret_access_key(&self.config.secret_key); if let Some(endpoint) = &self.config.endpoint { - builder.endpoint(endpoint); + builder = builder.endpoint(endpoint); } if let Some(region) = &self.config.region { - builder.region(region); + builder = builder.region(region); } let op: Operator = Operator::new(builder) .map_err(|err| Error::new(ErrorKind::Unexpected, err.to_report_string()))? diff --git a/src/connector/src/sink/file_sink/azblob.rs b/src/connector/src/sink/file_sink/azblob.rs index 3a600994639a..182f469902ec 100644 --- a/src/connector/src/sink/file_sink/azblob.rs +++ b/src/connector/src/sink/file_sink/azblob.rs @@ -58,12 +58,12 @@ impl FileSink { pub fn new_azblob_sink(config: AzblobConfig) -> Result { // Create azblob builder. let mut builder = Azblob::default(); - builder.container(&config.common.container_name); - - builder.endpoint(&config.common.endpoint_url); + builder = builder + .container(&config.common.container_name) + .endpoint(&config.common.endpoint_url); if let Some(account_name) = config.common.account_name { - builder.account_name(&account_name); + builder = builder.account_name(&account_name); } else { tracing::warn!( "account_name azblob is not set, container {}", @@ -72,7 +72,7 @@ impl FileSink { } if let Some(account_key) = config.common.account_key { - builder.account_key(&account_key); + builder = builder.account_key(&account_key); } else { tracing::warn!( "account_key azblob is not set, container {}", diff --git a/src/connector/src/sink/file_sink/fs.rs b/src/connector/src/sink/file_sink/fs.rs index 581f66ec7a79..5570bab561f2 100644 --- a/src/connector/src/sink/file_sink/fs.rs +++ b/src/connector/src/sink/file_sink/fs.rs @@ -55,9 +55,7 @@ pub const FS_SINK: &str = "fs"; impl FileSink { pub fn new_fs_sink(config: FsConfig) -> Result { // Create fs builder. - let mut builder = Fs::default(); - // Create fs backend builder. - builder.root(&config.common.path); + let builder = Fs::default().root(&config.common.path); let operator: Operator = Operator::new(builder)? .layer(LoggingLayer::default()) .layer(RetryLayer::default()) diff --git a/src/connector/src/sink/file_sink/gcs.rs b/src/connector/src/sink/file_sink/gcs.rs index c38669909c73..bddf962aa3bd 100644 --- a/src/connector/src/sink/file_sink/gcs.rs +++ b/src/connector/src/sink/file_sink/gcs.rs @@ -67,13 +67,10 @@ pub const GCS_SINK: &str = "gcs"; impl FileSink { pub fn new_gcs_sink(config: GcsConfig) -> Result { // Create gcs builder. - let mut builder = Gcs::default(); - - builder.bucket(&config.common.bucket_name); - - builder.credential(&config.common.credential); - - builder.service_account(&config.common.service_account); + let builder = Gcs::default() + .bucket(&config.common.bucket_name) + .credential(&config.common.credential) + .service_account(&config.common.service_account); let operator: Operator = Operator::new(builder)? .layer(LoggingLayer::default()) diff --git a/src/connector/src/sink/file_sink/s3.rs b/src/connector/src/sink/file_sink/s3.rs index 417094600e61..652057080e9e 100644 --- a/src/connector/src/sink/file_sink/s3.rs +++ b/src/connector/src/sink/file_sink/s3.rs @@ -61,16 +61,16 @@ pub const S3_SINK: &str = "s3"; impl FileSink { pub fn new_s3_sink(config: S3Config) -> Result { // Create s3 builder. - let mut builder = S3::default(); - builder.bucket(&config.common.bucket_name); - builder.region(&config.common.region_name); + let mut builder = S3::default() + .bucket(&config.common.bucket_name) + .region(&config.common.region_name); if let Some(endpoint_url) = config.common.endpoint_url { - builder.endpoint(&endpoint_url); + builder = builder.endpoint(&endpoint_url); } if let Some(access) = config.common.access { - builder.access_key_id(&access); + builder = builder.access_key_id(&access); } else { tracing::error!( "access key id of aws s3 is not set, bucket {}", @@ -79,7 +79,7 @@ impl FileSink { } if let Some(secret) = config.common.secret { - builder.secret_access_key(&secret); + builder = builder.secret_access_key(&secret); } else { tracing::error!( "secret access key of aws s3 is not set, bucket {}", @@ -88,9 +88,9 @@ impl FileSink { } if let Some(assume_role) = config.common.assume_role { - builder.role_arn(&assume_role); + builder = builder.role_arn(&assume_role); } - builder.disable_config_load(); + builder = builder.disable_config_load(); let operator: Operator = Operator::new(builder)? .layer(LoggingLayer::default()) .layer(RetryLayer::default()) diff --git a/src/connector/src/sink/file_sink/webhdfs.rs b/src/connector/src/sink/file_sink/webhdfs.rs index b41a27e12db1..31ed904e9083 100644 --- a/src/connector/src/sink/file_sink/webhdfs.rs +++ b/src/connector/src/sink/file_sink/webhdfs.rs @@ -51,10 +51,9 @@ pub const WEBHDFS_SINK: &str = "webhdfs"; impl FileSink { pub fn new_webhdfs_sink(config: WebhdfsConfig) -> Result { // Create webhdfs backend builder. - let mut builder = Webhdfs::default(); - // Set the name node for hdfs. - builder.endpoint(&config.common.endpoint); - builder.root(&config.common.path); + let builder = Webhdfs::default() + .endpoint(&config.common.endpoint) + .root(&config.common.path); let operator: Operator = Operator::new(builder)? .layer(LoggingLayer::default()) diff --git a/src/connector/src/source/filesystem/opendal_source/azblob_source.rs b/src/connector/src/source/filesystem/opendal_source/azblob_source.rs index 8c6dac01ab87..e2ceae4ecc6b 100644 --- a/src/connector/src/source/filesystem/opendal_source/azblob_source.rs +++ b/src/connector/src/source/filesystem/opendal_source/azblob_source.rs @@ -28,14 +28,12 @@ impl OpendalEnumerator { /// create opendal azblob source. pub fn new_azblob_source(azblob_properties: AzblobProperties) -> ConnectorResult { // Create azblob builder. - let mut builder = Azblob::default(); - - builder.container(&azblob_properties.container_name); - - builder.endpoint(&azblob_properties.endpoint_url); + let mut builder = Azblob::default() + .container(&azblob_properties.container_name) + .endpoint(&azblob_properties.endpoint_url); if let Some(account_name) = azblob_properties.account_name { - builder.account_name(&account_name); + builder = builder.account_name(&account_name); } else { tracing::warn!( "account_name azblob is not set, container {}", @@ -44,7 +42,7 @@ impl OpendalEnumerator { } if let Some(account_key) = azblob_properties.account_key { - builder.account_key(&account_key); + builder = builder.account_key(&account_key); } else { tracing::warn!( "account_key azblob is not set, container {}", diff --git a/src/connector/src/source/filesystem/opendal_source/gcs_source.rs b/src/connector/src/source/filesystem/opendal_source/gcs_source.rs index 9a6d883f3c92..ff05f88ad173 100644 --- a/src/connector/src/source/filesystem/opendal_source/gcs_source.rs +++ b/src/connector/src/source/filesystem/opendal_source/gcs_source.rs @@ -28,22 +28,19 @@ impl OpendalEnumerator { /// create opendal gcs source. pub fn new_gcs_source(gcs_properties: GcsProperties) -> ConnectorResult { // Create gcs builder. - let mut builder = Gcs::default(); - - builder.bucket(&gcs_properties.bucket_name); - + let mut builder = Gcs::default().bucket(&gcs_properties.bucket_name); // if credential env is set, use it. Otherwise, ADC will be used. if let Some(cred) = gcs_properties.credential { - builder.credential(&cred); + builder = builder.credential(&cred); } else { let cred = std::env::var("GOOGLE_APPLICATION_CREDENTIALS"); if let Ok(cred) = cred { - builder.credential(&cred); + builder = builder.credential(&cred); } } if let Some(service_account) = gcs_properties.service_account { - builder.service_account(&service_account); + builder = builder.service_account(&service_account); } let op: Operator = Operator::new(builder)? .layer(LoggingLayer::default()) diff --git a/src/connector/src/source/filesystem/opendal_source/posix_fs_source.rs b/src/connector/src/source/filesystem/opendal_source/posix_fs_source.rs index a7a984da663c..ec0901fec8c6 100644 --- a/src/connector/src/source/filesystem/opendal_source/posix_fs_source.rs +++ b/src/connector/src/source/filesystem/opendal_source/posix_fs_source.rs @@ -31,10 +31,7 @@ impl OpendalEnumerator { /// create opendal posix fs source. pub fn new_posix_fs_source(posix_fs_properties: PosixFsProperties) -> ConnectorResult { // Create Fs builder. - let mut builder = Fs::default(); - - builder.root(&posix_fs_properties.root); - + let builder = Fs::default().root(&posix_fs_properties.root); let op: Operator = Operator::new(builder)? .layer(LoggingLayer::default()) .layer(RetryLayer::default()) diff --git a/src/connector/src/source/filesystem/opendal_source/s3_source.rs b/src/connector/src/source/filesystem/opendal_source/s3_source.rs index 2eb6b7a29250..7230e28dfd4f 100644 --- a/src/connector/src/source/filesystem/opendal_source/s3_source.rs +++ b/src/connector/src/source/filesystem/opendal_source/s3_source.rs @@ -32,16 +32,16 @@ impl OpendalEnumerator { assume_role: Option, ) -> ConnectorResult { // Create s3 builder. - let mut builder = S3::default(); - builder.bucket(&s3_properties.bucket_name); - builder.region(&s3_properties.region_name); + let mut builder = S3::default() + .bucket(&s3_properties.bucket_name) + .region(&s3_properties.region_name); if let Some(endpoint_url) = s3_properties.endpoint_url { - builder.endpoint(&endpoint_url); + builder = builder.endpoint(&endpoint_url); } if let Some(access) = s3_properties.access { - builder.access_key_id(&access); + builder = builder.access_key_id(&access); } else { tracing::error!( "access key id of aws s3 is not set, bucket {}", @@ -50,7 +50,7 @@ impl OpendalEnumerator { } if let Some(secret) = s3_properties.secret { - builder.secret_access_key(&secret); + builder = builder.secret_access_key(&secret); } else { tracing::error!( "secret access key of aws s3 is not set, bucket {}", @@ -59,10 +59,10 @@ impl OpendalEnumerator { } if let Some(assume_role) = assume_role { - builder.role_arn(&assume_role); + builder = builder.role_arn(&assume_role); } - builder.disable_config_load(); + builder = builder.disable_config_load(); let (prefix, matcher) = if let Some(pattern) = s3_properties.match_pattern.as_ref() { let prefix = get_prefix(pattern); let matcher = glob::Pattern::new(pattern) diff --git a/src/connector/src/source/iceberg/parquet_file_reader.rs b/src/connector/src/source/iceberg/parquet_file_reader.rs index 6e323f4aec9b..eb98b2fdad21 100644 --- a/src/connector/src/source/iceberg/parquet_file_reader.rs +++ b/src/connector/src/source/iceberg/parquet_file_reader.rs @@ -106,7 +106,7 @@ pub async fn list_s3_directory( let prefix = format!("s3://{}/", bucket); if dir.starts_with(&prefix) { let mut builder = S3::default(); - builder + builder = builder .region(&s3_region) .access_key_id(&s3_access_key) .secret_access_key(&s3_secret_key) diff --git a/src/object_store/src/object/opendal_engine/azblob.rs b/src/object_store/src/object/opendal_engine/azblob.rs index 24ccacb3c649..9a03f1452145 100644 --- a/src/object_store/src/object/opendal_engine/azblob.rs +++ b/src/object_store/src/object/opendal_engine/azblob.rs @@ -33,14 +33,12 @@ impl OpendalObjectStore { metrics: Arc, ) -> ObjectResult { // Create azblob backend builder. - let mut builder = Azblob::default(); - builder.root(&root); - builder.container(&container_name); + let mut builder = Azblob::default().root(&root).container(&container_name); let endpoint = std::env::var(AZBLOB_ENDPOINT) .unwrap_or_else(|_| panic!("AZBLOB_ENDPOINT not found from environment variables")); - builder.endpoint(&endpoint); + builder = builder.endpoint(&endpoint); let op: Operator = Operator::new(builder)? .layer(LoggingLayer::default()) diff --git a/src/object_store/src/object/opendal_engine/fs.rs b/src/object_store/src/object/opendal_engine/fs.rs index 3792151ff474..f013e5510662 100644 --- a/src/object_store/src/object/opendal_engine/fs.rs +++ b/src/object_store/src/object/opendal_engine/fs.rs @@ -32,11 +32,10 @@ impl OpendalObjectStore { metrics: Arc, ) -> ObjectResult { // Create fs backend builder. - let mut builder = Fs::default(); - builder.root(&root); + let mut builder = Fs::default().root(&root); if config.set_atomic_write_dir { let atomic_write_dir = format!("{}/{}", root, ATOMIC_WRITE_DIR); - builder.atomic_write_dir(&atomic_write_dir); + builder = builder.atomic_write_dir(&atomic_write_dir); } let op: Operator = Operator::new(builder)? diff --git a/src/object_store/src/object/opendal_engine/gcs.rs b/src/object_store/src/object/opendal_engine/gcs.rs index ee0d155058dd..c12428e72171 100644 --- a/src/object_store/src/object/opendal_engine/gcs.rs +++ b/src/object_store/src/object/opendal_engine/gcs.rs @@ -32,16 +32,12 @@ impl OpendalObjectStore { metrics: Arc, ) -> ObjectResult { // Create gcs backend builder. - let mut builder = Gcs::default(); - - builder.bucket(&bucket); - - builder.root(&root); + let mut builder = Gcs::default().bucket(&bucket).root(&root); // if credential env is set, use it. Otherwise, ADC will be used. let cred = std::env::var("GOOGLE_APPLICATION_CREDENTIALS"); if let Ok(cred) = cred { - builder.credential(&cred); + builder = builder.credential(&cred); } let op: Operator = Operator::new(builder)? diff --git a/src/object_store/src/object/opendal_engine/obs.rs b/src/object_store/src/object/opendal_engine/obs.rs index 31c86109c820..310742577ae8 100644 --- a/src/object_store/src/object/opendal_engine/obs.rs +++ b/src/object_store/src/object/opendal_engine/obs.rs @@ -32,11 +32,7 @@ impl OpendalObjectStore { metrics: Arc, ) -> ObjectResult { // Create obs backend builder. - let mut builder = Obs::default(); - - builder.bucket(&bucket); - - builder.root(&root); + let mut builder = Obs::default().bucket(&bucket).root(&root); let endpoint = std::env::var("OBS_ENDPOINT") .unwrap_or_else(|_| panic!("OBS_ENDPOINT not found from environment variables")); @@ -46,9 +42,10 @@ impl OpendalObjectStore { panic!("OBS_SECRET_ACCESS_KEY not found from environment variables") }); - builder.endpoint(&endpoint); - builder.access_key_id(&access_key_id); - builder.secret_access_key(&secret_access_key); + builder = builder + .endpoint(&endpoint) + .access_key_id(&access_key_id) + .secret_access_key(&secret_access_key); let op: Operator = Operator::new(builder)? .layer(LoggingLayer::default()) diff --git a/src/object_store/src/object/opendal_engine/opendal_s3.rs b/src/object_store/src/object/opendal_engine/opendal_s3.rs index 43629bbf5157..d44b1f745df7 100644 --- a/src/object_store/src/object/opendal_engine/opendal_s3.rs +++ b/src/object_store/src/object/opendal_engine/opendal_s3.rs @@ -33,19 +33,18 @@ impl OpendalObjectStore { metrics: Arc, ) -> ObjectResult { // Create s3 builder. - let mut builder = S3::default(); - builder.bucket(&bucket); + let mut builder = S3::default().bucket(&bucket); // For AWS S3, there is no need to set an endpoint; for other S3 compatible object stores, it is necessary to set this field. if let Ok(endpoint_url) = std::env::var("RW_S3_ENDPOINT") { - builder.endpoint(&endpoint_url); + builder = builder.endpoint(&endpoint_url); } if std::env::var("RW_IS_FORCE_PATH_STYLE").is_err() { - builder.enable_virtual_host_style(); + builder = builder.enable_virtual_host_style(); } let http_client = Self::new_http_client(&config)?; - builder.http_client(http_client); + builder = builder.http_client(http_client); let op: Operator = Operator::new(builder)? .layer(LoggingLayer::default()) @@ -80,17 +79,14 @@ impl OpendalObjectStore { }; let (address, bucket) = rest.split_once('/').unwrap(); - let mut builder = S3::default(); - builder + let builder = S3::default() .bucket(bucket) .region("custom") .access_key_id(access_key_id) .secret_access_key(secret_access_key) - .endpoint(&format!("{}{}", endpoint_prefix, address)); - - builder.disable_config_load(); - let http_client = Self::new_http_client(&config)?; - builder.http_client(http_client); + .endpoint(&format!("{}{}", endpoint_prefix, address)) + .disable_config_load() + .http_client(Self::new_http_client(&config)?); let op: Operator = Operator::new(builder)? .layer(LoggingLayer::default()) .finish(); @@ -128,17 +124,14 @@ impl OpendalObjectStore { aws_region: &str, ) -> ObjectResult { // Create s3 builder with credentials. - let mut builder = S3::default(); - - // set credentials for s3 sink - builder.bucket(bucket); - builder.access_key_id(aws_access_key_id); - builder.secret_access_key(aws_secret_access_key); - builder.region(aws_region); - builder.disable_config_load(); - - let http_client = Self::new_http_client(config.as_ref())?; - builder.http_client(http_client); + let builder = S3::default() + // set credentials for s3 sink + .bucket(bucket) + .access_key_id(aws_access_key_id) + .secret_access_key(aws_secret_access_key) + .region(aws_region) + .disable_config_load() + .http_client(Self::new_http_client(config.as_ref())?); let op: Operator = Operator::new(builder)? .layer(LoggingLayer::default()) diff --git a/src/object_store/src/object/opendal_engine/oss.rs b/src/object_store/src/object/opendal_engine/oss.rs index cc0ecdfb7a94..e58e51fd64d8 100644 --- a/src/object_store/src/object/opendal_engine/oss.rs +++ b/src/object_store/src/object/opendal_engine/oss.rs @@ -32,11 +32,7 @@ impl OpendalObjectStore { metrics: Arc, ) -> ObjectResult { // Create oss backend builder. - let mut builder = Oss::default(); - - builder.bucket(&bucket); - - builder.root(&root); + let mut builder = Oss::default().bucket(&bucket).root(&root); let endpoint = std::env::var("OSS_ENDPOINT") .unwrap_or_else(|_| panic!("OSS_ENDPOINT not found from environment variables")); @@ -46,9 +42,10 @@ impl OpendalObjectStore { panic!("OSS_ACCESS_KEY_SECRET not found from environment variables") }); - builder.endpoint(&endpoint); - builder.access_key_id(&access_key_id); - builder.access_key_secret(&access_key_secret); + builder = builder + .endpoint(&endpoint) + .access_key_id(&access_key_id) + .access_key_secret(&access_key_secret); let op: Operator = Operator::new(builder)? .layer(LoggingLayer::default()) diff --git a/src/object_store/src/object/opendal_engine/webhdfs.rs b/src/object_store/src/object/opendal_engine/webhdfs.rs index b214bcfad2cc..9314a442f8bb 100644 --- a/src/object_store/src/object/opendal_engine/webhdfs.rs +++ b/src/object_store/src/object/opendal_engine/webhdfs.rs @@ -35,13 +35,13 @@ impl OpendalObjectStore { // Create webhdfs backend builder. let mut builder = Webhdfs::default(); // Set the name node for webhdfs. - builder.endpoint(&endpoint); + builder = builder.endpoint(&endpoint); // Set the root for hdfs, all operations will happen under this root. // NOTE: the root must be absolute path. - builder.root(&root); + builder = builder.root(&root); let atomic_write_dir = format!("{}/{}", root, ATOMIC_WRITE_DIR); - builder.atomic_write_dir(&atomic_write_dir); + builder = builder.atomic_write_dir(&atomic_write_dir); let op: Operator = Operator::new(builder)? .layer(LoggingLayer::default()) .finish();