Skip to content

Commit

Permalink
fix(rust, python): Raise errors instead of panicking when sink_csv
Browse files Browse the repository at this point in the history
…fails (#17313)
  • Loading branch information
JamesCE2001 authored Jul 1, 2024
1 parent 71e43b6 commit 26f84cf
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 9 deletions.
17 changes: 9 additions & 8 deletions crates/polars-pipe/src/executors/sinks/output/file_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ pub(super) fn init_writer_thread(
// all chunks per push should be collected to determine in which order they should
// be written
morsels_per_sink: usize,
) -> JoinHandle<()> {
std::thread::spawn(move || {
) -> JoinHandle<PolarsResult<()>> {
std::thread::spawn(move || -> PolarsResult<()> {
// keep chunks around until all chunks per sink are written
// then we write them all at once.
let mut chunks = Vec::with_capacity(morsels_per_sink);
Expand Down Expand Up @@ -51,7 +51,7 @@ pub(super) fn init_writer_thread(
if df.n_chunks() > 1 {
df.as_single_chunk();
}
writer._write_batch(&df).unwrap();
writer._write_batch(&df)?;
}
}
// all chunks are written remove them
Expand All @@ -62,21 +62,22 @@ pub(super) fn init_writer_thread(
if df.n_chunks() > 1 {
df.as_single_chunk();
}
writer._write_batch(&df).unwrap();
writer._write_batch(&df)?;
}
writer._finish().unwrap();
return;
writer._finish()?;
return Ok(());
}
}
}
Ok(())
})
}

// Ensure the data is returned in the order it was streamed
#[derive(Clone)]
pub struct FilesSink {
pub(crate) sender: Sender<Option<DataChunk>>,
pub(crate) io_thread_handle: Arc<Option<JoinHandle<()>>>,
pub(crate) io_thread_handle: Arc<Option<JoinHandle<PolarsResult<()>>>>,
}

impl Sink for FilesSink {
Expand Down Expand Up @@ -106,7 +107,7 @@ impl Sink for FilesSink {
.take()
.unwrap()
.join()
.unwrap();
.unwrap()?;

// return a dummy dataframe;
Ok(FinalizedSink::Finished(Default::default()))
Expand Down
4 changes: 3 additions & 1 deletion crates/polars-pipe/src/pipeline/dispatcher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,9 @@ impl PipeLine {
) -> PolarsResult<Option<FinalizedSink>> {
let (sink_shared_count, mut reduced_sink) = self.run_pipeline_no_finalize(ec, pipelines)?;
assert_eq!(sink_shared_count, 0);
Ok(reduced_sink.finalize(ec).ok())

let finalized_reduced_sink = reduced_sink.finalize(ec)?;
Ok(Some(finalized_reduced_sink))
}
}

Expand Down
8 changes: 8 additions & 0 deletions py-polars/tests/unit/streaming/test_streaming_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,14 @@ def test_sink_csv_batch_size_zero() -> None:
lf.sink_csv("test.csv", batch_size=0)


def test_sink_csv_nested_data() -> None:
    """Sinking a frame containing a nested (list) column to CSV must raise a ComputeError."""
    frame = pl.LazyFrame({"list": [[1, 2, 3, 4, 5]]})
    # CSV is a flat format; the streaming sink must reject nested dtypes up front.
    expected_msg = "CSV format does not support nested data"
    with pytest.raises(pl.exceptions.ComputeError, match=expected_msg):
        frame.sink_csv("path")


def test_scan_csv_only_header_10792(io_files_path: Path) -> None:
foods_file_path = io_files_path / "only_header.csv"
df = pl.scan_csv(foods_file_path).collect(streaming=True)
Expand Down

0 comments on commit 26f84cf

Please sign in to comment.