Skip to content

Commit

Permalink
fix: Fix CloudWriter to use buffer before making requests
Browse files Browse the repository at this point in the history
The issue started after the `object_store` crate was bumped to v0.10. Before that,
`object_store` buffered writes internally.

The implementation now uses `object_store::buffered::BufWriter`, which
performs a single "put" request if the size of the data is below its "capacity".
Otherwise it performs a "put multipart" upload instead.

Fixes pola-rs#17172
  • Loading branch information
philss committed Aug 2, 2024
1 parent 618a710 commit 7850e30
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 11 deletions.
20 changes: 11 additions & 9 deletions crates/polars-io/src/cloud/adaptors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,23 @@
use std::sync::Arc;

use object_store::buffered::BufWriter;
use object_store::path::Path;
use object_store::{MultipartUpload, ObjectStore, PutPayload};
use object_store::ObjectStore;
use polars_error::{to_compute_err, PolarsResult};
use tokio::io::AsyncWriteExt;

use super::CloudOptions;
use crate::pl_async::get_runtime;

/// Adaptor which wraps the asynchronous interface of [ObjectStore::put_multipart](https://docs.rs/object_store/latest/object_store/trait.ObjectStore.html#tymethod.put_multipart)
/// Adaptor which wraps the interface of [object_store::buffered::BufWriter](https://docs.rs/object_store/latest/object_store/buffered/struct.BufWriter.html)
/// exposing a synchronous interface which implements `std::io::Write`.
///
/// This allows it to be used in sync code which would otherwise write to a simple File or byte stream,
/// such as with `polars::prelude::CsvWriter`.
pub struct CloudWriter {
// Internal writer, constructed at creation
writer: Box<dyn MultipartUpload>,
writer: BufWriter,
}

impl CloudWriter {
Expand All @@ -29,7 +31,7 @@ impl CloudWriter {
object_store: Arc<dyn ObjectStore>,
path: Path,
) -> PolarsResult<Self> {
let writer = object_store.put_multipart(&path).await?;
let writer = BufWriter::new(object_store, path);
Ok(CloudWriter { writer })
}

Expand All @@ -55,28 +57,28 @@ impl std::io::Write for CloudWriter {
// async runtime here
let buf = unsafe { std::mem::transmute::<&[u8], &'static [u8]>(buf) };
get_runtime().block_on(async {
let res = self.writer.put_part(PutPayload::from_static(buf)).await;
let res = self.writer.write_all(buf).await;
if res.is_err() {
let _ = self.abort().await;
}
Ok(buf.len())
res.map(|_t| buf.len())
})
}

fn flush(&mut self) -> std::io::Result<()> {
get_runtime().block_on(async {
let res = self.writer.complete().await;
let res = self.writer.flush().await;
if res.is_err() {
let _ = self.abort().await;
}
Ok(())
res
})
}
}

impl Drop for CloudWriter {
fn drop(&mut self) {
let _ = get_runtime().block_on(self.writer.complete());
let _ = get_runtime().block_on(self.writer.shutdown());
}
}

Expand Down
4 changes: 3 additions & 1 deletion examples/write_parquet_cloud/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ edition = "2021"

[dependencies]
aws-creds = "0.36.0"
polars = { path = "../../crates/polars", features = ["lazy", "aws", "parquet", "cloud_write"] }
polars = { path = "../../crates/polars", features = ["lazy", "aws", "parquet", "cloud_write", "streaming"] }

[workspace]
1 change: 0 additions & 1 deletion examples/write_parquet_cloud/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use awscreds::Credentials;
use cloud::AmazonS3ConfigKey as Key;
use polars::prelude::*;

// Login to your aws account and then copy the ../datasets/foods1.parquet file to your own bucket.
// Adjust the link below.
const TEST_S3_LOCATION: &str = "s3://polarstesting/polars_write_example_cloud.parquet";

Expand Down

0 comments on commit 7850e30

Please sign in to comment.