diff --git a/Cargo.lock b/Cargo.lock index 8a690d6cfb9e..c09d03a98253 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -330,7 +330,7 @@ dependencies = [ "fastrand", "hex", "http 0.2.12", - "hyper", + "hyper 0.14.28", "ring", "time", "tokio", @@ -601,16 +601,16 @@ dependencies = [ "aws-smithy-types", "bytes", "fastrand", - "h2", + "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", "http-body 1.0.0", - "hyper", - "hyper-rustls", + "hyper 0.14.28", + "hyper-rustls 0.24.2", "once_cell", "pin-project-lite", "pin-utils", - "rustls", + "rustls 0.21.12", "tokio", "tracing", ] @@ -1663,6 +1663,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "h2" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa82e28a107a8cc405f0839610bdc9b15f1e25ec7d696aa5cf173edbcb1486ab" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http 1.1.0", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "half" version = "2.4.1" @@ -1828,7 +1847,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", + "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", "httparse", @@ -1842,6 +1861,26 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe575dd17d0862a9a33781c8c4696a55c320909004a67a00fb286ba8b1bc496d" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "h2 0.4.5", + "http 1.1.0", + "http-body 1.0.0", + "httparse", + "itoa", + "pin-project-lite", + "smallvec", + "tokio", + "want", +] + [[package]] name = "hyper-rustls" version = "0.24.2" @@ -1850,12 +1889,49 @@ checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" dependencies = [ "futures-util", "http 0.2.12", - "hyper", + "hyper 0.14.28", "log", - "rustls", - "rustls-native-certs", + "rustls 0.21.12", + "rustls-native-certs 0.6.3", + "tokio", + "tokio-rustls 0.24.1", +] + +[[package]] +name = "hyper-rustls" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c" +dependencies = [ + "futures-util", + "http 1.1.0", + "hyper 1.3.1", + "hyper-util", + "rustls 0.22.4", + "rustls-pki-types", + "tokio", + "tokio-rustls 0.25.0", + "tower-service", +] + +[[package]] +name = "hyper-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b875924a60b96e5d7b9ae7b066540b1dd1cbd90d1828f54c92e02a283351c56" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "hyper 1.3.1", + "pin-project-lite", + "socket2", "tokio", - "tokio-rustls", + "tower", + "tower-service", + "tracing", ] [[package]] @@ -2597,24 +2673,24 @@ dependencies = [ [[package]] name = "object_store" -version = "0.9.1" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8718f8b65fdf67a45108d1548347d4af7d71fb81ce727bbf9e3b2535e079db3" +checksum = "fbebfd32c213ba1907fa7a9c9138015a8de2b43e30c5aa45b18f7deb46786ad6" dependencies = [ "async-trait", - "base64 0.21.7", + "base64 0.22.1", "bytes", "chrono", "futures", "humantime", - "hyper", + "hyper 1.3.1", "itertools 0.12.1", "md-5", "parking_lot", "percent-encoding", "quick-xml", "rand", - "reqwest", + "reqwest 0.12.4", "ring", "rustls-pemfile 2.1.2", "serde", @@ -2747,6 +2823,26 @@ dependencies = [ "siphasher", ] +[[package]] +name = "pin-project" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.66", +] + [[package]] name = "pin-project-lite" version = "0.2.14" @@ -2970,7 +3066,7 @@ dependencies = [ "chrono", "polars", "rand", - "reqwest", + "reqwest 0.11.27", "tokio", ] @@ -3045,7 +3141,7 @@ dependencies = [ "polars-utils", "rayon", "regex", - "reqwest", + "reqwest 0.11.27", "ryu", "serde", "serde_json", @@ -3724,11 +3820,10 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2", + "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", - "hyper", - "hyper-rustls", + "hyper 0.14.28", "ipnet", "js-sys", "log", @@ -3736,16 +3831,54 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls", - "rustls-native-certs", - "rustls-pemfile 1.0.4", "serde", "serde_json", "serde_urlencoded", "sync_wrapper", "system-configuration", "tokio", - "tokio-rustls", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "winreg 0.50.0", +] + +[[package]] +name = "reqwest" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "566cafdd92868e0939d3fb961bd0dc25fcfaaed179291093b3d43e6b3150ea10" +dependencies = [ + "base64 0.22.1", + "bytes", + "futures-core", + "futures-util", + "h2 0.4.5", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.3.1", + "hyper-rustls 0.26.0", + "hyper-util", + "ipnet", + "js-sys", + "log", + "mime", + "once_cell", + "percent-encoding", + "pin-project-lite", + "rustls 0.22.4", + "rustls-native-certs 0.7.0", + "rustls-pemfile 2.1.2", + "rustls-pki-types", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tokio-rustls 0.25.0", "tokio-util", "tower-service", "url", @@ -3753,7 +3886,7 @@ dependencies = [ "wasm-bindgen-futures", "wasm-streams", "web-sys", - "winreg", + "winreg 0.52.0", ] [[package]] @@ -3830,10 +3963,24 @@ checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" dependencies = [ "log", "ring", - "rustls-webpki", + "rustls-webpki 0.101.7", "sct", ] +[[package]] +name = "rustls" +version = "0.22.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432" +dependencies = [ + "log", + "ring", + "rustls-pki-types", + "rustls-webpki 0.102.4", + "subtle", + "zeroize", +] + [[package]] name = "rustls-native-certs" version = "0.6.3" @@ -3846,6 +3993,19 @@ dependencies = [ "security-framework", ] +[[package]] +name = "rustls-native-certs" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f1fb85efa936c42c6d5fc28d2629bb51e4b2f4b8a5211e297d599cc5a093792" +dependencies = [ + "openssl-probe", + "rustls-pemfile 2.1.2", + "rustls-pki-types", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pemfile" version = "1.0.4" @@ -3881,6 +4041,17 @@ dependencies = [ "untrusted", ] +[[package]] +name = "rustls-webpki" +version = "0.102.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff448f7e92e913c4b7d4c6d8e4540a1724b319b4152b8aef6d4cf8339712b33e" +dependencies = [ + "ring", + "rustls-pki-types", + "untrusted", +] + [[package]] name = "rustversion" version = "1.0.17" @@ -4538,7 +4709,18 @@ version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ - "rustls", + "rustls 0.21.12", + "tokio", +] + +[[package]] +name = "tokio-rustls" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" +dependencies = [ + "rustls 0.22.4", + "rustls-pki-types", "tokio", ] @@ -4590,6 +4772,27 @@ dependencies = [ "winnow", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tokio", + "tower-layer", + "tower-service", +] + +[[package]] +name = "tower-layer" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" + [[package]] name = "tower-service" version = "0.3.2" @@ -5108,6 +5311,16 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "winreg" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a277a57398d4bfa075df44f501a17cfdf8542d224f0d36095a2adc7aee4ef0a5" +dependencies = [ + "cfg-if", + "windows-sys 0.48.0", +] + [[package]] name = "x11rb" version = "0.13.1" diff --git a/Cargo.toml b/Cargo.toml index 10ad6c852655..98550ca22299 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,7 +58,7 @@ memchr = "2.6" multiversion = "0.7" ndarray = { version = "0.15", default-features = false } num-traits = "0.2" -object_store = { version = "0.9", default-features = false } +object_store = { version = "0.10", default-features = false } once_cell = "1" parking_lot = "0.12" percent-encoding = "2.3" diff --git a/crates/polars-io/src/cloud/adaptors.rs b/crates/polars-io/src/cloud/adaptors.rs index 9bb0c181883e..40a926dd1ff6 100644 --- a/crates/polars-io/src/cloud/adaptors.rs +++ b/crates/polars-io/src/cloud/adaptors.rs @@ -3,9 +3,8 @@ use std::sync::Arc; use object_store::path::Path; -use object_store::{MultipartId, ObjectStore}; +use object_store::{MultipartUpload, ObjectStore, PutPayload}; use polars_error::{to_compute_err, PolarsResult}; -use tokio::io::{AsyncWrite, AsyncWriteExt}; use super::CloudOptions; use crate::pl_async::get_runtime; @@ -16,14 +15,8 @@ use crate::pl_async::get_runtime; /// This allows it to be used in sync code which would otherwise write to a simple File or byte stream, /// such as with `polars::prelude::CsvWriter`. pub struct CloudWriter { - // Hold a reference to the store - object_store: Arc, - // The path in the object_store which we want to write to - path: Path, - // ID of a partially-done upload, used to abort the upload on error - multipart_id: MultipartId, // Internal writer, constructed at creation - writer: Box, + writer: Box, } impl CloudWriter { @@ -36,13 +29,8 @@ impl CloudWriter { object_store: Arc, path: Path, ) -> PolarsResult { - let (multipart_id, writer) = Self::build_writer(&object_store, &path).await?; - Ok(CloudWriter { - object_store, - path, - multipart_id, - writer, - }) + let writer = object_store.put_multipart(&path).await?; + Ok(CloudWriter { writer }) } /// Constructs a new CloudWriter from a path and an optional set of CloudOptions. @@ -55,47 +43,40 @@ impl CloudWriter { Self::new_with_object_store(object_store, cloud_location.prefix.into()).await } - async fn build_writer( - object_store: &Arc, - path: &Path, - ) -> object_store::Result<(MultipartId, Box)> { - let (multipart_id, s3_writer) = object_store.put_multipart(path).await?; - Ok((multipart_id, s3_writer)) - } - - async fn abort(&self) -> PolarsResult<()> { - self.object_store - .abort_multipart(&self.path, &self.multipart_id) - .await - .map_err(to_compute_err) + async fn abort(&mut self) -> PolarsResult<()> { + self.writer.abort().await.map_err(to_compute_err) } } impl std::io::Write for CloudWriter { fn write(&mut self, buf: &[u8]) -> std::io::Result { + // SAFETY: + // We extend the lifetime for the duration of this function. This is safe as well block the + // async runtime here + let buf = unsafe { std::mem::transmute::<&[u8], &'static [u8]>(buf) }; get_runtime().block_on(async { - let res = self.writer.write(buf).await; + let res = self.writer.put_part(PutPayload::from_static(buf)).await; if res.is_err() { let _ = self.abort().await; } - res + Ok(buf.len()) }) } fn flush(&mut self) -> std::io::Result<()> { get_runtime().block_on(async { - let res = self.writer.flush().await; + let res = self.writer.complete().await; if res.is_err() { let _ = self.abort().await; } - res + Ok(()) }) } } impl Drop for CloudWriter { fn drop(&mut self) { - let _ = get_runtime().block_on(self.writer.shutdown()); + let _ = get_runtime().block_on(self.writer.complete()); } }