From 514728b8828d1543c5daff140ebe29ade51b63dc Mon Sep 17 00:00:00 2001 From: nameexhaustion Date: Mon, 24 Jun 2024 20:44:08 +1000 Subject: [PATCH] fix: Fix corrupted reads for hive parts from cloud and projection pushdown failure on hive parts (#17152) --- crates/polars-io/src/predicates.rs | 16 ++- .../src/executors/scan/parquet.rs | 46 ++----- .../src/executors/sources/parquet.rs | 4 +- crates/polars-plan/src/plans/hive.rs | 34 ++++- crates/polars-plan/src/plans/ir/mod.rs | 2 +- crates/polars-plan/src/plans/mod.rs | 2 +- .../plans/optimizer/predicate_pushdown/mod.rs | 4 +- .../optimizer/projection_pushdown/mod.rs | 32 ++++- py-polars/tests/unit/io/test_hive.py | 121 ++++++++++++++++-- 9 files changed, 196 insertions(+), 65 deletions(-) diff --git a/crates/polars-io/src/predicates.rs b/crates/polars-io/src/predicates.rs index 2da06f908769..e3cd636f5b94 100644 --- a/crates/polars-io/src/predicates.rs +++ b/crates/polars-io/src/predicates.rs @@ -44,7 +44,7 @@ pub fn apply_predicate( /// - Null count /// - Minimum value /// - Maximum value -#[derive(Debug)] +#[derive(Debug, Clone)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct ColumnStats { field: Field, @@ -91,6 +91,10 @@ impl ColumnStats { } } + pub fn field_name(&self) -> &SmartString { + self.field.name() + } + /// Returns the [`DataType`] of the column. pub fn dtype(&self) -> &DataType { self.field.data_type() @@ -195,7 +199,7 @@ fn use_min_max(dtype: &DataType) -> bool { /// A collection of column stats with a known schema. #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct BatchStats { schema: SchemaRef, stats: Vec, @@ -238,4 +242,12 @@ impl BatchStats { pub fn num_rows(&self) -> Option { self.num_rows } + + pub fn with_schema(&mut self, schema: SchemaRef) { + self.schema = schema; + } + + pub fn take_indices(&mut self, indices: &[usize]) { + self.stats = indices.iter().map(|&i| self.stats[i].clone()).collect(); + } } diff --git a/crates/polars-mem-engine/src/executors/scan/parquet.rs b/crates/polars-mem-engine/src/executors/scan/parquet.rs index 69a4e2348fcb..30c2a5a1fc27 100644 --- a/crates/polars-mem-engine/src/executors/scan/parquet.rs +++ b/crates/polars-mem-engine/src/executors/scan/parquet.rs @@ -7,7 +7,6 @@ use polars_core::config::{get_file_prefetch_size, verbose}; use polars_core::utils::accumulate_dataframes_vertical; use polars_io::cloud::CloudOptions; use polars_io::parquet::metadata::FileMetaDataRef; -use polars_io::parquet::read::materialize_empty_df; use polars_io::utils::is_cloud_url; use polars_io::RowIndex; @@ -16,7 +15,7 @@ use super::*; pub struct ParquetExec { paths: Arc<[PathBuf]>, file_info: FileInfo, - hive_parts: Option>>, + hive_parts: Option>, predicate: Option>, options: ParquetOptions, #[allow(dead_code)] @@ -31,7 +30,7 @@ impl ParquetExec { pub(crate) fn new( paths: Arc<[PathBuf]>, file_info: FileInfo, - hive_parts: Option>>, + hive_parts: Option>, predicate: Option>, options: ParquetOptions, cloud_options: Option, @@ -186,7 +185,12 @@ impl ParquetExec { let mut remaining_rows_to_read = self.file_options.n_rows.unwrap_or(usize::MAX); let mut base_row_index = self.file_options.row_index.take(); let mut processed = 0; - for (batch_idx, paths) in self.paths.chunks(batch_size).enumerate() { + + for batch_start in (0..self.paths.len()).step_by(batch_size) { + let end = std::cmp::min(batch_start.saturating_add(batch_size), self.paths.len()); + let paths = &self.paths[batch_start..end]; + let hive_parts = self.hive_parts.as_ref().map(|x| &x[batch_start..end]); + if remaining_rows_to_read == 0 && !result.is_empty() { return Ok(result); } @@ -201,7 +205,7 @@ impl ParquetExec { // First initialize the readers and get the metadata concurrently. let iter = paths.iter().enumerate().map(|(i, path)| async move { - let first_file = batch_idx == 0 && i == 0; + let first_file = batch_start == 0 && i == 0; // use the cached one as this saves a cloud call let (metadata, schema) = if first_file { (first_metadata.clone(), Some((*first_schema).clone())) @@ -255,8 +259,7 @@ impl ParquetExec { let iter = readers_and_metadata.into_iter().enumerate().map( |(i, (num_rows_this_file, reader))| { let (remaining_rows_to_read, cumulative_read) = &rows_statistics[i]; - let hive_partitions = self - .hive_parts + let hive_partitions = hive_parts .as_ref() .map(|x| x[i].materialize_partition_columns()); @@ -324,34 +327,7 @@ impl ParquetExec { .and_then(|_| self.predicate.take()) .map(phys_expr_to_io_expr); - let is_cloud = match self.paths.first() { - Some(p) => is_cloud_url(p.as_path()), - None => { - let hive_partitions = self - .hive_parts - .as_ref() - .filter(|x| !x.is_empty()) - .map(|x| x.first().unwrap().materialize_partition_columns()); - let (projection, _) = prepare_scan_args( - None, - &mut self.file_options.with_columns, - &mut self.file_info.schema, - self.file_options.row_index.is_some(), - hive_partitions.as_deref(), - ); - return Ok(materialize_empty_df( - projection.as_deref(), - self.file_info - .reader_schema - .as_ref() - .unwrap() - .as_ref() - .unwrap_left(), - hive_partitions.as_deref(), - self.file_options.row_index.as_ref(), - )); - }, - }; + let is_cloud = is_cloud_url(self.paths.first().unwrap()); let force_async = config::force_async(); let out = if is_cloud || force_async { diff --git a/crates/polars-pipe/src/executors/sources/parquet.rs b/crates/polars-pipe/src/executors/sources/parquet.rs index c4222d07c206..781a38749981 100644 --- a/crates/polars-pipe/src/executors/sources/parquet.rs +++ b/crates/polars-pipe/src/executors/sources/parquet.rs @@ -40,7 +40,7 @@ pub struct ParquetSource { cloud_options: Option, metadata: Option, file_info: FileInfo, - hive_parts: Option>>, + hive_parts: Option>, verbose: bool, run_async: bool, prefetch_size: usize, @@ -192,7 +192,7 @@ impl ParquetSource { metadata: Option, file_options: FileScanOptions, file_info: FileInfo, - hive_parts: Option>>, + hive_parts: Option>, verbose: bool, predicate: Option>, ) -> PolarsResult { diff --git a/crates/polars-plan/src/plans/hive.rs b/crates/polars-plan/src/plans/hive.rs index 505e404fea36..e0c2b0282d71 100644 --- a/crates/polars-plan/src/plans/hive.rs +++ b/crates/polars-plan/src/plans/hive.rs @@ -9,7 +9,7 @@ use polars_io::utils::{BOOLEAN_RE, FLOAT_RE, INTEGER_RE}; use serde::{Deserialize, Serialize}; #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct HivePartitions { /// Single value Series that can be used to run the predicate against. /// They are to be broadcasted if the predicates don't filter them out. @@ -17,6 +17,32 @@ pub struct HivePartitions { } impl HivePartitions { + pub fn get_projection_schema_and_indices>( + &self, + names: &[T], + ) -> (SchemaRef, Vec) { + let names = names.iter().map(T::as_ref).collect::>(); + let mut out_schema = Schema::with_capacity(self.stats.schema().len()); + let mut out_indices = Vec::with_capacity(self.stats.column_stats().len()); + + for (i, cs) in self.stats.column_stats().iter().enumerate() { + let name = cs.field_name(); + if names.contains(name.as_str()) { + out_indices.push(i); + out_schema + .insert_at_index(out_schema.len(), name.clone(), cs.dtype().clone()) + .unwrap(); + } + } + + (out_schema.into(), out_indices) + } + + pub fn apply_projection(&mut self, new_schema: SchemaRef, column_indices: &[usize]) { + self.stats.with_schema(new_schema); + self.stats.take_indices(column_indices); + } + pub fn get_statistics(&self) -> &BatchStats { &self.stats } @@ -40,7 +66,7 @@ pub fn hive_partitions_from_paths( paths: &[PathBuf], hive_start_idx: usize, schema: Option, -) -> PolarsResult>>> { +) -> PolarsResult>> { let Some(path) = paths.first() else { return Ok(None); }; @@ -131,10 +157,10 @@ pub fn hive_partitions_from_paths( let stats = BatchStats::new(hive_schema.clone(), column_stats, None); - hive_partitions.push(Arc::new(HivePartitions { stats })); + hive_partitions.push(HivePartitions { stats }); } - Ok(Some(hive_partitions)) + Ok(Some(Arc::from(hive_partitions))) } /// Determine the path separator for identifying Hive partitions. diff --git a/crates/polars-plan/src/plans/ir/mod.rs b/crates/polars-plan/src/plans/ir/mod.rs index 815e292e0b17..e4e09954255e 100644 --- a/crates/polars-plan/src/plans/ir/mod.rs +++ b/crates/polars-plan/src/plans/ir/mod.rs @@ -51,7 +51,7 @@ pub enum IR { Scan { paths: Arc<[PathBuf]>, file_info: FileInfo, - hive_parts: Option>>, + hive_parts: Option>, predicate: Option, /// schema of the projected file output_schema: Option, diff --git a/crates/polars-plan/src/plans/mod.rs b/crates/polars-plan/src/plans/mod.rs index ca9acc44cf53..c92f3b8bc48b 100644 --- a/crates/polars-plan/src/plans/mod.rs +++ b/crates/polars-plan/src/plans/mod.rs @@ -81,7 +81,7 @@ pub enum DslPlan { paths: Arc<[PathBuf]>, // Option as this is mostly materialized on the IR phase. file_info: Option, - hive_parts: Option>>, + hive_parts: Option>, predicate: Option, file_options: FileScanOptions, scan_type: FileScan, diff --git a/crates/polars-plan/src/plans/optimizer/predicate_pushdown/mod.rs b/crates/polars-plan/src/plans/optimizer/predicate_pushdown/mod.rs index 6d07ff86ddf7..42a505a92b80 100644 --- a/crates/polars-plan/src/plans/optimizer/predicate_pushdown/mod.rs +++ b/crates/polars-plan/src/plans/optimizer/predicate_pushdown/mod.rs @@ -378,7 +378,7 @@ impl<'a> PredicatePushDown<'a> { } scan_type.remove_metadata(); } - if paths.is_empty() { + if new_paths.is_empty() { let schema = output_schema.as_ref().unwrap_or(&file_info.schema); let df = DataFrame::from(schema.as_ref()); @@ -390,7 +390,7 @@ impl<'a> PredicatePushDown<'a> { }); } else { paths = Arc::from(new_paths); - scan_hive_parts = Some(new_hive_parts); + scan_hive_parts = Some(Arc::from(new_hive_parts)); } } } diff --git a/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs b/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs index 288ebe8768e3..e7a65b50ec79 100644 --- a/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs +++ b/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs @@ -395,7 +395,7 @@ impl ProjectionPushDown { Scan { paths, file_info, - hive_parts, + mut hive_parts, scan_type, predicate, mut file_options, @@ -422,19 +422,39 @@ impl ProjectionPushDown { file_options.row_index.as_ref(), ); - output_schema = if file_options.with_columns.is_none() { - None - } else { + output_schema = if let Some(ref with_columns) = file_options.with_columns { let mut schema = update_scan_schema( &acc_projections, expr_arena, &file_info.schema, scan_type.sort_projection(&file_options), )?; + + hive_parts = if let Some(hive_parts) = hive_parts { + let (new_schema, projected_indices) = hive_parts[0] + .get_projection_schema_and_indices(with_columns.as_ref()); + + Some( + hive_parts + .iter() + .cloned() + .map(|mut hp| { + hp.apply_projection( + new_schema.clone(), + projected_indices.as_ref(), + ); + hp + }) + .collect::>(), + ) + } else { + hive_parts + }; + // Hive partitions are created AFTER the projection, so the output // schema is incorrect. Here we ensure the columns that are projected and hive // parts are added at the proper place in the schema, which is at the end. - if let Some(ref hive_parts) = hive_parts { + if let Some(ref mut hive_parts) = hive_parts { let partition_schema = hive_parts.first().unwrap().schema(); for (name, _) in partition_schema.iter() { @@ -444,6 +464,8 @@ impl ProjectionPushDown { } } Some(Arc::new(schema)) + } else { + None }; } diff --git a/py-polars/tests/unit/io/test_hive.py b/py-polars/tests/unit/io/test_hive.py index af870de093ea..78ac01336f18 100644 --- a/py-polars/tests/unit/io/test_hive.py +++ b/py-polars/tests/unit/io/test_hive.py @@ -1,6 +1,8 @@ +import os import warnings from collections import OrderedDict from functools import partial +from multiprocessing import get_context from pathlib import Path from typing import Any, Callable @@ -12,13 +14,11 @@ from polars.testing import assert_frame_equal, assert_series_equal -@pytest.mark.skip( - reason="Broken by pyarrow 15 release: https://github.com/pola-rs/polars/issues/13892" -) -@pytest.mark.xdist_group("streaming") -@pytest.mark.write_disk() -def test_hive_partitioned_predicate_pushdown( - io_files_path: Path, tmp_path: Path, monkeypatch: Any, capfd: Any +def impl_test_hive_partitioned_predicate_pushdown( + io_files_path: Path, + tmp_path: Path, + monkeypatch: Any, + capfd: Any, ) -> None: monkeypatch.setenv("POLARS_VERBOSE", "1") df = pl.read_ipc(io_files_path / "*.ipc") @@ -72,6 +72,79 @@ def test_hive_partitioned_predicate_pushdown( ) +@pytest.mark.xdist_group("streaming") +@pytest.mark.write_disk() +def test_hive_partitioned_predicate_pushdown( + io_files_path: Path, + tmp_path: Path, + monkeypatch: Any, + capfd: Any, +) -> None: + impl_test_hive_partitioned_predicate_pushdown( + io_files_path, + tmp_path, + monkeypatch, + capfd, + ) + + +def init_env_spawned_single_threaded_async() -> None: + os.environ["SPAWNED_PROCESS"] = "1" + os.environ["POLARS_MAX_THREADS"] = "1" + os.environ["POLARS_PREFETCH_SIZE"] = "1" + + +@pytest.mark.xdist_group("streaming") +@pytest.mark.write_disk() +def test_hive_partitioned_predicate_pushdown_single_threaded_async( + io_files_path: Path, + tmp_path: Path, + monkeypatch: Any, + capfd: Any, +) -> None: + # We need to run this in a separate process to avoid leakage of + # `POLARS_MAX_THREADS`. You can test this locally (on a + # system with > 1 threads) by removing the process-spawning logic and + # directly calling `init_env_spawned_single_threaded_async`, and then + # running: + # ``` + # python -m pytest py-polars/tests/unit/io/ -m '' -k \ + # test_hive_partitioned_predicate_pushdown + # ``` + # And observe that the below assertion of `thread_pool_size` will fail. + if "SPAWNED_PROCESS" not in os.environ: + with get_context("spawn").Pool( + 1, initializer=init_env_spawned_single_threaded_async + ) as p: + pytest_path = Path(__file__).relative_to(Path.cwd()) + pytest_path: str = f"{pytest_path}::test_hive_partitioned_predicate_pushdown_single_threaded_async" # type: ignore[no-redef] + + assert ( + p.map( + pytest.main, # type: ignore[arg-type] + [ + [ + pytest_path, + "-m", + "", + ] + ], + )[0] + == 0 + ) + + return + + assert pl.thread_pool_size() == 1 + + impl_test_hive_partitioned_predicate_pushdown( + io_files_path, + tmp_path, + monkeypatch, + capfd, + ) + + @pytest.mark.write_disk() def test_hive_partitioned_predicate_pushdown_skips_correct_number_of_files( io_files_path: Path, tmp_path: Path, monkeypatch: Any, capfd: Any @@ -102,9 +175,6 @@ def test_hive_partitioned_predicate_pushdown_skips_correct_number_of_files( assert result.to_dict(as_series=False) == expected -@pytest.mark.skip( - reason="Broken by pyarrow 15 release: https://github.com/pola-rs/polars/issues/13892" -) @pytest.mark.xdist_group("streaming") @pytest.mark.write_disk() def test_hive_partitioned_slice_pushdown(io_files_path: Path, tmp_path: Path) -> None: @@ -139,9 +209,6 @@ def test_hive_partitioned_slice_pushdown(io_files_path: Path, tmp_path: Path) -> ] -@pytest.mark.skip( - reason="Broken by pyarrow 15 release: https://github.com/pola-rs/polars/issues/13892" -) @pytest.mark.xdist_group("streaming") @pytest.mark.write_disk() def test_hive_partitioned_projection_pushdown( @@ -443,3 +510,31 @@ def test_hive_partition_schema_inference(tmp_path: Path) -> None: out = pl.scan_parquet(tmp_path).collect() assert_series_equal(out["a"], expected[i]) + + +@pytest.mark.write_disk() +def test_hive_partition_force_async_17155(tmp_path: Path, monkeypatch: Any) -> None: + monkeypatch.setenv("POLARS_FORCE_ASYNC", "1") + monkeypatch.setenv("POLARS_PREFETCH_SIZE", "1") + + dfs = [ + pl.DataFrame({"x": 1}), + pl.DataFrame({"x": 2}), + pl.DataFrame({"x": 3}), + ] + + paths = [ + tmp_path / "a=1/b=1/data.bin", + tmp_path / "a=2/b=2/data.bin", + tmp_path / "a=3/b=3/data.bin", + ] + + for i in range(3): + paths[i].parent.mkdir(exist_ok=True, parents=True) + dfs[i].write_parquet(paths[i]) + + lf = pl.scan_parquet(tmp_path) + + assert_frame_equal( + lf.collect(), pl.DataFrame({k: [1, 2, 3] for k in ["x", "a", "b"]}) + )