diff --git a/crates/polars-core/src/frame/mod.rs b/crates/polars-core/src/frame/mod.rs
index d1d80b7e23fc..e984a1a84bbf 100644
--- a/crates/polars-core/src/frame/mod.rs
+++ b/crates/polars-core/src/frame/mod.rs
@@ -3,7 +3,7 @@ use std::sync::OnceLock;
 use std::{mem, ops};
 
 use polars_row::ArrayRef;
-use polars_schema::schema::ensure_matching_schema_names;
+use polars_schema::schema::debug_ensure_matching_schema_names;
 use polars_utils::itertools::Itertools;
 use rayon::prelude::*;
 
@@ -1754,9 +1754,7 @@ impl DataFrame {
         cols: &[PlSmallStr],
         schema: &Schema,
     ) -> PolarsResult<Vec<Column>> {
-        if cfg!(debug_assertions) {
-            ensure_matching_schema_names(schema, &self.schema())?;
-        }
+        debug_ensure_matching_schema_names(schema, &self.schema())?;
 
         cols.iter()
             .map(|name| {
diff --git a/crates/polars-io/src/parquet/read/read_impl.rs b/crates/polars-io/src/parquet/read/read_impl.rs
index b3b128209708..5f4f381a2e9b 100644
--- a/crates/polars-io/src/parquet/read/read_impl.rs
+++ b/crates/polars-io/src/parquet/read/read_impl.rs
@@ -366,6 +366,7 @@ fn rg_to_dfs_prefiltered(
     }
 
     let mask_setting = PrefilterMaskSetting::init_from_env();
+    let projected_schema = schema.try_project_indices(projection).unwrap();
 
     let dfs: Vec<Option<DataFrame>> = POOL.install(move || {
         // Set partitioned fields to prevent quadratic behavior.
@@ -415,7 +416,8 @@
 
             // Apply the predicate to the live columns and save the dataframe and the bitmask
             let md = &file_metadata.row_groups[rg_idx];
-            let mut df = unsafe { DataFrame::new_no_checks(md.num_rows(), live_columns) };
+            let mut df =
+                unsafe { DataFrame::new_no_checks(md.num_rows(), live_columns.clone()) };
 
             materialize_hive_partitions(
                 &mut df,
@@ -426,6 +428,10 @@
             let s = predicate.evaluate_io(&df)?;
             let mask = s.bool().expect("filter predicates was not of type boolean");
 
+            // Create without hive columns - the first merge phase does not handle hive partitions. This also saves
+            // some unnecessary filtering.
+            let mut df = unsafe { DataFrame::new_no_checks(md.num_rows(), live_columns) };
+
             if let Some(rc) = &row_index {
                 df.with_row_index_mut(rc.name.clone(), Some(rg_offsets[rg_idx] + rc.offset));
             }
@@ -458,6 +464,13 @@
 
             // We don't need to do any further work if there are no dead columns
             if dead_idx_to_col_idx.is_empty() {
+                materialize_hive_partitions(
+                    &mut df,
+                    schema.as_ref(),
+                    hive_partition_columns,
+                    md.num_rows(),
+                );
+
                 return Ok(Some(df));
             }
 
@@ -541,10 +554,7 @@
 
                 let height = df.height();
                 let live_columns = df.take_columns();
 
-                assert_eq!(
-                    live_columns.len() + dead_columns.len(),
-                    projection.len() + hive_partition_columns.map_or(0, |x| x.len())
-                );
+                assert_eq!(live_columns.len() + dead_columns.len(), projection.len());
 
                 let mut merged = Vec::with_capacity(live_columns.len() + dead_columns.len());
@@ -561,13 +571,20 @@
                 hive::merge_sorted_to_schema_order(
                     &mut dead_columns.into_iter(), // df_columns
                     &mut live_columns.into_iter().skip(row_index.is_some() as usize), // hive_columns
-                    schema,
+                    &projected_schema,
                     &mut merged,
                 );
 
                 // SAFETY: This is completely based on the schema so all column names are unique
                 // and the length is given by the parquet file which should always be the same.
-                let df = unsafe { DataFrame::new_no_checks(height, merged) };
+                let mut df = unsafe { DataFrame::new_no_checks(height, merged) };
+
+                materialize_hive_partitions(
+                    &mut df,
+                    schema.as_ref(),
+                    hive_partition_columns,
+                    md.num_rows(),
+                );
 
                 PolarsResult::Ok(Some(df))
             })
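
Note: both filter phases in `rg_to_dfs_prefiltered` now defer hive columns to a final `materialize_hive_partitions` call, and the column merge runs against the projected schema instead of the full file schema. The merge (`hive::merge_sorted_to_schema_order`) interleaves two column streams back into schema order. A minimal standalone sketch of that interleaving, with plain tuples standing in for polars columns (the name and signature here are illustrative, not the actual polars API):

```rust
// Standalone sketch of a schema-order merge: both inputs are already in
// schema-relative order; interleave them so the output follows `schema_order`.
// Illustrative only; not the signature of `hive::merge_sorted_to_schema_order`.
fn merge_to_schema_order<T>(
    df_columns: &mut dyn Iterator<Item = (String, T)>,
    hive_columns: &mut dyn Iterator<Item = (String, T)>,
    schema_order: &[&str],
    out: &mut Vec<(String, T)>,
) {
    let mut a = df_columns.next();
    let mut b = hive_columns.next();
    for &name in schema_order {
        if a.as_ref().is_some_and(|(n, _)| n == name) {
            out.push(a.take().unwrap());
            a = df_columns.next();
        } else if b.as_ref().is_some_and(|(n, _)| n == name) {
            out.push(b.take().unwrap());
            b = hive_columns.next();
        }
    }
}

fn main() {
    // A hive partition column (`year`) sits between the file columns `a` and `b`.
    let schema = ["a", "year", "b"];
    let mut file_cols = vec![("a".to_string(), 1), ("b".to_string(), 2)].into_iter();
    let mut hive_cols = vec![("year".to_string(), 2024)].into_iter();
    let mut merged = Vec::new();
    merge_to_schema_order(&mut file_cols, &mut hive_cols, &schema, &mut merged);
    let names: Vec<&str> = merged.iter().map(|(n, _)| n.as_str()).collect();
    assert_eq!(names, ["a", "year", "b"]);
}
```

When the user's projection is reordered relative to the file, merging against the full file schema puts columns back in file order; switching the merge to `&projected_schema` keeps the requested order, which is what the new tests at the bottom of this diff pin down.
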
diff --git a/crates/polars-plan/src/plans/file_scan.rs b/crates/polars-plan/src/plans/file_scan.rs
index e868b98d2799..655907e3f666 100644
--- a/crates/polars-plan/src/plans/file_scan.rs
+++ b/crates/polars-plan/src/plans/file_scan.rs
@@ -168,7 +168,7 @@ impl FileScan {
             #[cfg(feature = "ipc")]
             Self::Ipc { .. } => _file_options.row_index.is_some(),
             #[cfg(feature = "parquet")]
-            Self::Parquet { .. } => _file_options.row_index.is_some(),
+            Self::Parquet { .. } => false,
             #[allow(unreachable_patterns)]
             _ => false,
         }
diff --git a/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs b/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs
index 40273aebceaf..11eb35487df0 100644
--- a/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs
+++ b/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs
@@ -436,7 +436,7 @@ impl ProjectionPushDown {
                     &acc_projections,
                     expr_arena,
                     &file_info.schema,
-                    scan_type.sort_projection(&file_options) || hive_parts.is_some(),
+                    scan_type.sort_projection(&file_options),
                 )?;
 
                 hive_parts = if let Some(hive_parts) = hive_parts {
@@ -480,10 +480,30 @@
                         // based on its position in the file. This is extremely important for the
                         // new-streaming engine.
 
+                        // row_index is separate
+                        let opt_row_index_col_name = file_options
+                            .row_index
+                            .as_ref()
+                            .map(|v| &v.name)
+                            .filter(|v| schema.contains(v))
+                            .cloned();
+
+                        if let Some(name) = &opt_row_index_col_name {
+                            out.insert_at_index(
+                                0,
+                                name.clone(),
+                                schema.get(name).unwrap().clone(),
+                            )
+                            .unwrap();
+                        }
+
                         {
                             let df_fields_iter = &mut schema
                                 .iter()
-                                .filter(|fld| !partition_schema.contains(fld.0))
+                                .filter(|fld| {
+                                    !partition_schema.contains(fld.0)
+                                        && Some(fld.0) != opt_row_index_col_name.as_ref()
+                                })
                                 .map(|(a, b)| (a.clone(), b.clone()));
 
                             let hive_fields_iter = &mut partition_schema
diff --git a/crates/polars-schema/src/schema.rs b/crates/polars-schema/src/schema.rs
index b7a1a57280a1..d29113635de8 100644
--- a/crates/polars-schema/src/schema.rs
+++ b/crates/polars-schema/src/schema.rs
@@ -401,39 +401,20 @@ where
     }
 }
 
-pub fn ensure_matching_schema_names(lhs: &Schema, rhs: &Schema) -> PolarsResult<()> {
-    let mut iter_lhs = lhs.iter_names();
-    let mut iter_rhs = rhs.iter_names();
+pub fn debug_ensure_matching_schema_names(lhs: &Schema, rhs: &Schema) -> PolarsResult<()> {
+    if cfg!(debug_assertions) {
+        let lhs = lhs.iter_names().collect::<Vec<_>>();
+        let rhs = rhs.iter_names().collect::<Vec<_>>();
 
-    for i in 0..iter_lhs.len().min(iter_rhs.len()) {
-        let l = iter_lhs.next().unwrap();
-        let r = iter_rhs.next().unwrap();
-
-        if l != r {
+        if lhs != rhs {
             polars_bail!(
                 SchemaMismatch:
-                "schema names differ at position {}: {} != {}",
-                1 + i, l, r
+                "lhs: {:?} rhs: {:?}",
+                lhs, rhs
             )
         }
     }
 
-    if let Some(v) = iter_lhs.next() {
-        polars_bail!(
-            SchemaMismatch:
-            "schema contained extra column: {}",
-            v
-        )
-    }
-
-    if let Some(v) = iter_rhs.next() {
-        polars_bail!(
-            SchemaMismatch:
-            "schema didn't contain column: {}",
-            v
-        )
-    }
-
     Ok(())
 }
 
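
Note: `debug_ensure_matching_schema_names` now carries its own `cfg!(debug_assertions)` gate, so call sites like the one in `frame/mod.rs` above stay unconditional. A minimal sketch of the pattern, with illustrative names rather than the polars helper itself:

```rust
// Sketch of the debug-only validation pattern: `cfg!(debug_assertions)` is a
// compile-time boolean, so release builds compile the check away entirely
// while callers keep a single unconditional call.
fn debug_ensure_matching_names(lhs: &[&str], rhs: &[&str]) -> Result<(), String> {
    if cfg!(debug_assertions) && lhs != rhs {
        return Err(format!("schema mismatch: lhs: {lhs:?} rhs: {rhs:?}"));
    }
    Ok(())
}

fn main() {
    assert!(debug_ensure_matching_names(&["a", "b"], &["a", "b"]).is_ok());
    // Mismatches only surface in debug builds; release builds skip the check.
    let r = debug_ensure_matching_names(&["a", "b"], &["b", "a"]);
    assert_eq!(r.is_err(), cfg!(debug_assertions));
}
```

Comparing the collected name lists in one shot also explains the simpler error message: the whole of both schemas is reported instead of the first differing position.
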
diff --git a/crates/polars-stream/src/nodes/io_sources/parquet/row_group_decode.rs b/crates/polars-stream/src/nodes/io_sources/parquet/row_group_decode.rs
index e2d28fac4907..8e4ddce836a1 100644
--- a/crates/polars-stream/src/nodes/io_sources/parquet/row_group_decode.rs
+++ b/crates/polars-stream/src/nodes/io_sources/parquet/row_group_decode.rs
@@ -78,7 +78,7 @@ impl RowGroupDecoder {
         }
 
         let mut decoded_cols = Vec::with_capacity(row_group_data.row_group_metadata.n_columns());
-        self.decode_all_columns(
+        self.decode_projected_columns(
             &mut decoded_cols,
             &row_group_data,
             Some(polars_parquet::read::Filter::Range(slice_range.clone())),
@@ -217,7 +217,7 @@ impl RowGroupDecoder {
 
     /// Potentially parallelizes based on number of rows & columns. Decoded columns are appended to
    /// `out_vec`.
-    async fn decode_all_columns(
+    async fn decode_projected_columns(
        &self,
        out_vec: &mut Vec<Column>,
        row_group_data: &Arc<RowGroupData>,
@@ -497,7 +497,7 @@ impl RowGroupDecoder {
         // for `hive::merge_sorted_to_schema_order`.
         let mut opt_decode_err = None;
 
-        let mut decoded_live_cols_iter = self
+        let decoded_live_cols_iter = self
             .predicate_arrow_field_indices
             .iter()
             .map(|&i| self.projected_arrow_schema.get_at_index(i).unwrap())
@@ -512,18 +512,13 @@ impl RowGroupDecoder {
                 },
             }
         });
-        let mut hive_cols_iter = shared_file_state.hive_series.iter().map(|s| {
+        let hive_cols_iter = shared_file_state.hive_series.iter().map(|s| {
             debug_assert!(s.len() >= projection_height);
             s.slice(0, projection_height)
         });
 
-        hive::merge_sorted_to_schema_order(
-            &mut decoded_live_cols_iter,
-            &mut hive_cols_iter,
-            &self.reader_schema,
-            &mut live_columns,
-        );
-
+        live_columns.extend(decoded_live_cols_iter);
+        live_columns.extend(hive_cols_iter);
         opt_decode_err.transpose()?;
 
         if let Some(file_path_series) = &shared_file_state.file_path_series {
@@ -531,7 +526,7 @@ impl RowGroupDecoder {
             live_columns.push(file_path_series.slice(0, projection_height));
         }
 
-        let live_df = unsafe {
+        let mut live_df = unsafe {
             DataFrame::new_no_checks(row_group_data.row_group_metadata.num_rows(), live_columns)
         };
 
@@ -542,20 +537,52 @@ impl RowGroupDecoder {
             .evaluate_io(&live_df)?;
         let mask = mask.bool().unwrap();
 
+        unsafe {
+            live_df.get_columns_mut().truncate(
+                self.row_index.is_some() as usize + self.predicate_arrow_field_indices.len(),
+            )
+        }
+
         let filtered =
             unsafe { filter_cols(live_df.take_columns(), mask, self.min_values_per_thread) }
                 .await?;
 
-        let height = if let Some(fst) = filtered.first() {
+        let filtered_height = if let Some(fst) = filtered.first() {
             fst.len()
         } else {
             mask.num_trues()
         };
 
-        let live_df_filtered = unsafe { DataFrame::new_no_checks(height, filtered) };
+        let mut live_df_filtered = unsafe { DataFrame::new_no_checks(filtered_height, filtered) };
 
         if self.non_predicate_arrow_field_indices.is_empty() {
             // User or test may have explicitly requested prefiltering
+
+            hive::merge_sorted_to_schema_order(
+                unsafe {
+                    &mut live_df_filtered
+                        .get_columns_mut()
+                        .drain(..)
+                        .collect::<Vec<_>>()
+                        .into_iter()
+                },
+                &mut shared_file_state
+                    .hive_series
+                    .iter()
+                    .map(|s| s.slice(0, filtered_height)),
+                &self.reader_schema,
+                unsafe { live_df_filtered.get_columns_mut() },
+            );
+
+            unsafe {
+                live_df_filtered.get_columns_mut().extend(
+                    shared_file_state
+                        .file_path_series
+                        .as_ref()
+                        .map(|c| c.slice(0, filtered_height)),
+                )
+            }
+
             return Ok(live_df_filtered);
         }
 
@@ -621,13 +648,36 @@
             &mut live_columns
                 .into_iter()
                 .skip(self.row_index.is_some() as usize), // hive_columns
-            &self.reader_schema,
+            &self.projected_arrow_schema,
             &mut merged,
         );
 
         opt_decode_err.transpose()?;
-        let df = unsafe { DataFrame::new_no_checks(expected_num_rows, merged) };
+        let mut out = Vec::with_capacity(
+            merged.len()
+                + shared_file_state.hive_series.len()
+                + shared_file_state.file_path_series.is_some() as usize,
+        );
+
+        hive::merge_sorted_to_schema_order(
+            &mut merged.into_iter(),
+            &mut shared_file_state
+                .hive_series
+                .iter()
+                .map(|s| s.slice(0, filtered_height)),
+            &self.reader_schema,
+            &mut out,
+        );
+
+        out.extend(
+            shared_file_state
+                .file_path_series
+                .as_ref()
+                .map(|c| c.slice(0, filtered_height)),
+        );
+
+        let df = unsafe { DataFrame::new_no_checks(expected_num_rows, out) };
 
         Ok(df)
     }
 }
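
Note: the decoder changes above all serve the prefilter strategy: decode only the predicate ("live") columns, evaluate the predicate to a boolean mask, then filter the remaining ("dead") columns by that mask instead of decoding and discarding whole rows. A simplified standalone sketch of that flow, with plain vectors standing in for decoded Parquet columns (none of these names are the actual polars API):

```rust
// Sketch of the prefilter strategy used by RowGroupDecoder. Illustrative only.
fn prefilter(
    live: Vec<(&'static str, Vec<i64>)>,
    dead: Vec<(&'static str, Vec<i64>)>,
    predicate: impl Fn(&[(&'static str, Vec<i64>)], usize) -> bool,
) -> Vec<(&'static str, Vec<i64>)> {
    // Evaluate the predicate once per row, over the live columns only.
    let height = live.first().map_or(0, |(_, v)| v.len());
    let mask: Vec<bool> = (0..height).map(|i| predicate(&live, i)).collect();

    // Keep only the rows where the mask is true.
    let filter = |(name, values): (&'static str, Vec<i64>)| {
        let kept: Vec<i64> = values
            .into_iter()
            .zip(&mask)
            .filter_map(|(v, &keep)| keep.then_some(v))
            .collect();
        (name, kept)
    };

    // Live columns are filtered first; dead columns never materialize the
    // rows the predicate dropped.
    let mut out: Vec<_> = live.into_iter().map(&filter).collect();
    out.extend(dead.into_iter().map(&filter));
    out
}

fn main() {
    let live = vec![("a", vec![0, 5, 10])];
    let dead = vec![("b", vec![1, 2, 3])];
    let out = prefilter(live, dead, |cols, i| cols[0].1[i] >= 5);
    assert_eq!(out, [("a", vec![5, 10]), ("b", vec![2, 3])]);
}
```

The diff's key ordering point is where hive and file-path columns rejoin: they are appended (or merged) only after filtering, and the live/dead merge now uses `projected_arrow_schema`, i.e. the order the user asked for, rather than the full reader schema.
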
diff --git a/crates/polars-stream/src/physical_plan/lower_expr.rs b/crates/polars-stream/src/physical_plan/lower_expr.rs
index 4d75fd98bfe5..7bf04e198c8c 100644
--- a/crates/polars-stream/src/physical_plan/lower_expr.rs
+++ b/crates/polars-stream/src/physical_plan/lower_expr.rs
@@ -695,7 +695,8 @@ fn build_select_node_with_ctx(
     if let Some(columns) = all_simple_columns {
         let input_schema = ctx.phys_sm[input].output_schema.clone();
 
-        if input_schema.len() == columns.len()
+        if !cfg!(debug_assertions)
+            && input_schema.len() == columns.len()
             && input_schema.iter_names().zip(&columns).all(|(l, r)| l == r)
         {
             // Input node already has the correct schema, just pass through.
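
Note: the `lower_expr.rs` change disables the zero-cost pass-through in debug builds so the full select path, including its schema assertions, is always exercised by tests. A sketch of the gating with illustrative names:

```rust
// Sketch of the pass-through test gated above: a select can only be a no-op
// when the requested names equal the input schema, order included. The
// `!cfg!(debug_assertions)` term turns the shortcut off in debug builds.
fn is_passthrough(input_schema: &[&str], requested: &[&str]) -> bool {
    !cfg!(debug_assertions)
        && input_schema.len() == requested.len()
        && input_schema.iter().zip(requested).all(|(l, r)| l == r)
}

fn main() {
    // True in release builds, false in debug builds (shortcut disabled).
    assert_eq!(
        is_passthrough(&["a", "b"], &["a", "b"]),
        !cfg!(debug_assertions)
    );
    // A reordered projection is never a pass-through.
    assert!(!is_passthrough(&["a", "b"], &["b", "a"]));
}
```
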
diff --git a/py-polars/tests/unit/io/test_hive.py b/py-polars/tests/unit/io/test_hive.py
index f9f60afb209d..4a6384fb5f56 100644
--- a/py-polars/tests/unit/io/test_hive.py
+++ b/py-polars/tests/unit/io/test_hive.py
@@ -503,6 +503,7 @@ def test_hive_partition_force_async_17155(tmp_path: Path, monkeypatch: Any) -> N
     [
         (partial(pl.scan_parquet, parallel="row_groups"), pl.DataFrame.write_parquet),
         (partial(pl.scan_parquet, parallel="columns"), pl.DataFrame.write_parquet),
+        (partial(pl.scan_parquet, parallel="prefiltered"), pl.DataFrame.write_parquet),
         (
             lambda *a, **kw: pl.scan_parquet(*a, parallel="prefiltered", **kw).filter(
                 pl.col("b") == pl.col("b")
diff --git a/py-polars/tests/unit/io/test_parquet.py b/py-polars/tests/unit/io/test_parquet.py
index 3d67e2869da0..b79a8ceff120 100644
--- a/py-polars/tests/unit/io/test_parquet.py
+++ b/py-polars/tests/unit/io/test_parquet.py
@@ -2588,4 +2588,39 @@ def test_utf8_verification_with_slice_20174() -> None:
     )
     f.seek(0)
 
-    pl.scan_parquet(f).head(1).collect()
+    assert_frame_equal(
+        pl.scan_parquet(f).head(1).collect(),
+        pl.Series("s", ["a"]).to_frame(),
+    )
+
+
+@pytest.mark.parametrize("parallel", ["prefiltered", "row_groups"])
+@pytest.mark.parametrize(
+    "projection",
+    [
+        {"a": pl.Int64(), "b": pl.Int64()},
+        {"b": pl.Int64(), "a": pl.Int64()},
+    ],
+)
+def test_parquet_prefiltered_unordered_projection_20175(
+    parallel: str, projection: dict[str, pl.DataType]
+) -> None:
+    df = pl.DataFrame(
+        [
+            pl.Series("a", [0], pl.Int64),
+            pl.Series("b", [0], pl.Int64),
+        ]
+    )
+
+    f = io.BytesIO()
+    df.write_parquet(f)
+
+    f.seek(0)
+    out = (
+        pl.scan_parquet(f, parallel=parallel)  # type: ignore[arg-type]
+        .filter(pl.col.a >= 0)
+        .select(*projection.keys())
+        .collect()
+    )
+
+    assert out.schema == projection
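
For reference, a condensed standalone version of what the new test pins down (issue 20175): with `parallel="prefiltered"`, a projection that does not match the file's column order must still come back in the requested order.

```python
# Condensed reproduction of the behavior covered by
# test_parquet_prefiltered_unordered_projection_20175.
import io

import polars as pl

f = io.BytesIO()
pl.DataFrame({"a": [0], "b": [0]}).write_parquet(f)

f.seek(0)
out = (
    pl.scan_parquet(f, parallel="prefiltered")
    .filter(pl.col("a") >= 0)
    .select("b", "a")  # reversed relative to the file schema
    .collect()
)
assert out.columns == ["b", "a"]
```
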