From 08311b9e082d02fd99ce91775937b8ba32943182 Mon Sep 17 00:00:00 2001 From: Philip Sampaio Date: Tue, 6 Aug 2024 03:21:07 -0300 Subject: [PATCH] fix: Fix `CloudWriter` to use buffer before making requests (#18027) --- crates/polars-io/src/cloud/adaptors.rs | 20 +++++++++++--------- examples/write_parquet_cloud/Cargo.toml | 4 +++- examples/write_parquet_cloud/src/main.rs | 1 - 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/crates/polars-io/src/cloud/adaptors.rs b/crates/polars-io/src/cloud/adaptors.rs index e5ff951c9657..cbce6e22b762 100644 --- a/crates/polars-io/src/cloud/adaptors.rs +++ b/crates/polars-io/src/cloud/adaptors.rs @@ -2,21 +2,23 @@ use std::sync::Arc; +use object_store::buffered::BufWriter; use object_store::path::Path; -use object_store::{MultipartUpload, ObjectStore, PutPayload}; +use object_store::ObjectStore; use polars_error::{to_compute_err, PolarsResult}; +use tokio::io::AsyncWriteExt; use super::CloudOptions; use crate::pl_async::get_runtime; -/// Adaptor which wraps the asynchronous interface of [ObjectStore::put_multipart](https://docs.rs/object_store/latest/object_store/trait.ObjectStore.html#tymethod.put_multipart) +/// Adaptor which wraps the interface of [ObjectStore::BufWriter](https://docs.rs/object_store/latest/object_store/buffered/struct.BufWriter.html) /// exposing a synchronous interface which implements `std::io::Write`. /// /// This allows it to be used in sync code which would otherwise write to a simple File or byte stream, /// such as with `polars::prelude::CsvWriter`. pub struct CloudWriter { // Internal writer, constructed at creation - writer: Box, + writer: BufWriter, } impl CloudWriter { @@ -29,7 +31,7 @@ impl CloudWriter { object_store: Arc, path: Path, ) -> PolarsResult { - let writer = object_store.put_multipart(&path).await?; + let writer = BufWriter::new(object_store, path); Ok(CloudWriter { writer }) } @@ -55,28 +57,28 @@ impl std::io::Write for CloudWriter { // async runtime here let buf = unsafe { std::mem::transmute::<&[u8], &'static [u8]>(buf) }; get_runtime().block_on(async { - let res = self.writer.put_part(PutPayload::from_static(buf)).await; + let res = self.writer.write_all(buf).await; if res.is_err() { let _ = self.abort().await; } - Ok(buf.len()) + res.map(|_t| buf.len()) }) } fn flush(&mut self) -> std::io::Result<()> { get_runtime().block_on(async { - let res = self.writer.complete().await; + let res = self.writer.flush().await; if res.is_err() { let _ = self.abort().await; } - Ok(()) + res }) } } impl Drop for CloudWriter { fn drop(&mut self) { - let _ = get_runtime().block_on(self.writer.complete()); + let _ = get_runtime().block_on(self.writer.shutdown()); } } diff --git a/examples/write_parquet_cloud/Cargo.toml b/examples/write_parquet_cloud/Cargo.toml index fe02ad8f8457..1e79b04739b2 100644 --- a/examples/write_parquet_cloud/Cargo.toml +++ b/examples/write_parquet_cloud/Cargo.toml @@ -7,4 +7,6 @@ edition = "2021" [dependencies] aws-creds = "0.36.0" -polars = { path = "../../crates/polars", features = ["lazy", "aws", "parquet", "cloud_write"] } +polars = { path = "../../crates/polars", features = ["lazy", "aws", "parquet", "cloud_write", "streaming"] } + +[workspace] diff --git a/examples/write_parquet_cloud/src/main.rs b/examples/write_parquet_cloud/src/main.rs index 8ed29bc402cf..c928ceb4c765 100644 --- a/examples/write_parquet_cloud/src/main.rs +++ b/examples/write_parquet_cloud/src/main.rs @@ -2,7 +2,6 @@ use awscreds::Credentials; use cloud::AmazonS3ConfigKey as Key; use polars::prelude::*; -// Login to your aws account and then copy the ../datasets/foods1.parquet file to your own bucket. // Adjust the link below. const TEST_S3_LOCATION: &str = "s3://polarstesting/polars_write_example_cloud.parquet";