diff --git a/crates/polars-pipe/src/executors/sinks/output/file_sink.rs b/crates/polars-pipe/src/executors/sinks/output/file_sink.rs
index 937d88458ed9..0c0a8c4018fe 100644
--- a/crates/polars-pipe/src/executors/sinks/output/file_sink.rs
+++ b/crates/polars-pipe/src/executors/sinks/output/file_sink.rs
@@ -22,8 +22,8 @@ pub(super) fn init_writer_thread(
     // all chunks per push should be collected to determine in which order they should
     // be written
     morsels_per_sink: usize,
-) -> JoinHandle<()> {
-    std::thread::spawn(move || {
+) -> JoinHandle<PolarsResult<()>> {
+    std::thread::spawn(move || -> PolarsResult<()> {
         // keep chunks around until all chunks per sink are written
         // then we write them all at once.
         let mut chunks = Vec::with_capacity(morsels_per_sink);
@@ -51,7 +51,7 @@ pub(super) fn init_writer_thread(
                         if df.n_chunks() > 1 {
                             df.as_single_chunk();
                         }
-                        writer._write_batch(&df).unwrap();
+                        writer._write_batch(&df)?;
                     }
                 }
                 // all chunks are written remove them
@@ -62,13 +62,14 @@ pub(super) fn init_writer_thread(
                         if df.n_chunks() > 1 {
                             df.as_single_chunk();
                         }
-                        writer._write_batch(&df).unwrap();
+                        writer._write_batch(&df)?;
                     }
-                    writer._finish().unwrap();
-                    return;
+                    writer._finish()?;
+                    return Ok(());
                 }
             }
         }
 
+        Ok(())
     })
 }
@@ -76,7 +77,7 @@ pub(super) fn init_writer_thread(
 #[derive(Clone)]
 pub struct FilesSink {
     pub(crate) sender: Sender<Option<DataChunk>>,
-    pub(crate) io_thread_handle: Arc<Option<JoinHandle<()>>>,
+    pub(crate) io_thread_handle: Arc<Option<JoinHandle<PolarsResult<()>>>>,
 }
 
 impl Sink for FilesSink {
@@ -106,7 +107,7 @@ impl Sink for FilesSink {
             .take()
             .unwrap()
             .join()
-            .unwrap();
+            .unwrap()?;
 
         // return a dummy dataframe;
         Ok(FinalizedSink::Finished(Default::default()))
diff --git a/crates/polars-pipe/src/pipeline/dispatcher/mod.rs b/crates/polars-pipe/src/pipeline/dispatcher/mod.rs
index 216dec376c05..17632a17fc23 100644
--- a/crates/polars-pipe/src/pipeline/dispatcher/mod.rs
+++ b/crates/polars-pipe/src/pipeline/dispatcher/mod.rs
@@ -304,7 +304,9 @@ impl PipeLine {
     ) -> PolarsResult<Option<FinalizedSink>> {
         let (sink_shared_count, mut reduced_sink) = self.run_pipeline_no_finalize(ec, pipelines)?;
         assert_eq!(sink_shared_count, 0);
-        Ok(reduced_sink.finalize(ec).ok())
+
+        let finalized_reduced_sink = reduced_sink.finalize(ec)?;
+        Ok(Some(finalized_reduced_sink))
     }
 }
 
diff --git a/py-polars/tests/unit/streaming/test_streaming_io.py b/py-polars/tests/unit/streaming/test_streaming_io.py
index 1f60cd3c6f3e..d825030d9d50 100644
--- a/py-polars/tests/unit/streaming/test_streaming_io.py
+++ b/py-polars/tests/unit/streaming/test_streaming_io.py
@@ -194,6 +194,14 @@ def test_sink_csv_batch_size_zero() -> None:
         lf.sink_csv("test.csv", batch_size=0)
 
 
+def test_sink_csv_nested_data() -> None:
+    lf = pl.LazyFrame({"list": [[1, 2, 3, 4, 5]]})
+    with pytest.raises(
+        pl.exceptions.ComputeError, match="CSV format does not support nested data"
+    ):
+        lf.sink_csv("path")
+
+
 def test_scan_csv_only_header_10792(io_files_path: Path) -> None:
     foods_file_path = io_files_path / "only_header.csv"
     df = pl.scan_csv(foods_file_path).collect(streaming=True)
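
The core of this change is a standard Rust pattern: instead of `unwrap()`ing write errors inside the IO thread (which panics that thread and surfaces only as an opaque join failure), the thread closure returns a `Result`, and the sink propagates it after `join()`. Below is a minimal, self-contained sketch of that pattern, not the polars implementation: `WriteError`, `write_batch`, and the `Option<Vec<i64>>` channel payload are hypothetical stand-ins for polars' `PolarsError`, `SinkWriter::_write_batch`, and `Option<DataChunk>`, and the simplified `init_writer_thread` drops the batching/ordering logic of the real function.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::JoinHandle;

#[derive(Debug)]
struct WriteError(String);

type WriteResult<T> = Result<T, WriteError>;

// Stand-in for `SinkWriter::_write_batch`: fails on an empty batch so the
// error path gets exercised.
fn write_batch(batch: &[i64]) -> WriteResult<()> {
    if batch.is_empty() {
        return Err(WriteError("cannot write an empty batch".to_string()));
    }
    println!("wrote {} rows", batch.len());
    Ok(())
}

// Mirrors the shape of `init_writer_thread` after the change: the closure
// returns a `Result`, so `?` replaces the old `unwrap()` calls and the
// error travels out through the `JoinHandle` instead of panicking the
// IO thread.
fn init_writer_thread(receiver: Receiver<Option<Vec<i64>>>) -> JoinHandle<WriteResult<()>> {
    std::thread::spawn(move || -> WriteResult<()> {
        while let Ok(msg) = receiver.recv() {
            match msg {
                Some(batch) => write_batch(&batch)?, // an Err here ends the thread cleanly
                None => return Ok(()),               // `None` marks the last write
            }
        }
        Ok(())
    })
}

fn main() {
    let (sender, receiver): (Sender<Option<Vec<i64>>>, _) = channel();
    let handle = init_writer_thread(receiver);

    // Sends can fail once the writer has already bailed out with an error
    // (the receiver is dropped), so the send results are ignored here.
    let _ = sender.send(Some(vec![1, 2, 3]));
    let _ = sender.send(Some(vec![])); // triggers the write error
    let _ = sender.send(None);

    // Mirrors `FilesSink::finalize` after the change: `join()` is still
    // `unwrap()`ed (it only fails if the thread panicked), while the
    // writer's own error comes back as the `Err` payload, ready for `?`
    // in a real caller.
    match handle.join().unwrap() {
        Ok(()) => println!("sink finished cleanly"),
        Err(e) => eprintln!("writer thread failed: {e:?}"),
    }
}
```

The payoff of this design is visible in the dispatcher hunk and the new Python test: a writer failure such as "CSV format does not support nested data" now propagates through `finalize` as a regular `PolarsResult` error, which is what `test_sink_csv_nested_data` appears to assert via `pl.exceptions.ComputeError`, rather than aborting the process with a panic in the IO thread.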