fix: Properly project unordered column in parquet prefiltered #20189

Merged

Changes from 7 commits
6 changes: 2 additions & 4 deletions crates/polars-core/src/frame/mod.rs
```diff
@@ -3,7 +3,7 @@ use std::sync::OnceLock;
 use std::{mem, ops};
 
 use polars_row::ArrayRef;
-use polars_schema::schema::ensure_matching_schema_names;
+use polars_schema::schema::debug_ensure_matching_schema_names;
 use polars_utils::itertools::Itertools;
 use rayon::prelude::*;
 
@@ -1754,9 +1754,7 @@ impl DataFrame {
         cols: &[PlSmallStr],
         schema: &Schema,
     ) -> PolarsResult<Vec<Column>> {
-        if cfg!(debug_assertions) {
-            ensure_matching_schema_names(schema, &self.schema())?;
-        }
+        debug_ensure_matching_schema_names(schema, &self.schema())?;
 
         cols.iter()
             .map(|name| {
```
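The call site simplifies because the `cfg!(debug_assertions)` guard now lives inside the helper itself (see the `polars-schema` diff below). A minimal sketch of the pattern, with simplified signatures rather than the actual polars types:

```rust
// Sketch: a debug-only validation helper. The `cfg!` test is a compile-time
// constant, so in release builds the whole branch is optimized away and
// callers no longer need their own `if cfg!(debug_assertions)` wrapper.
fn debug_ensure_matching_names(lhs: &[&str], rhs: &[&str]) -> Result<(), String> {
    if cfg!(debug_assertions) && lhs != rhs {
        return Err(format!("lhs: {lhs:?} rhs: {rhs:?}"));
    }
    Ok(())
}

fn main() -> Result<(), String> {
    // Callers invoke it unconditionally:
    debug_ensure_matching_names(&["a", "b"], &["a", "b"])
}
```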
31 changes: 24 additions & 7 deletions crates/polars-io/src/parquet/read/read_impl.rs
```diff
@@ -366,6 +366,7 @@ fn rg_to_dfs_prefiltered(
     }
 
     let mask_setting = PrefilterMaskSetting::init_from_env();
+    let projected_schema = schema.try_project_indices(projection).unwrap();
 
     let dfs: Vec<Option<DataFrame>> = POOL.install(move || {
         // Set partitioned fields to prevent quadratic behavior.
@@ -415,7 +416,8 @@
 
             // Apply the predicate to the live columns and save the dataframe and the bitmask
             let md = &file_metadata.row_groups[rg_idx];
-            let mut df = unsafe { DataFrame::new_no_checks(md.num_rows(), live_columns) };
+            let mut df =
+                unsafe { DataFrame::new_no_checks(md.num_rows(), live_columns.clone()) };
 
             materialize_hive_partitions(
                 &mut df,
@@ -426,6 +428,10 @@
             let s = predicate.evaluate_io(&df)?;
             let mask = s.bool().expect("filter predicates was not of type boolean");
 
+            // Create without hive columns - the first merge phase does not handle hive
+            // partitions. This also saves some unnecessary filtering.
+            let mut df = unsafe { DataFrame::new_no_checks(md.num_rows(), live_columns) };
+
             if let Some(rc) = &row_index {
                 df.with_row_index_mut(rc.name.clone(), Some(rg_offsets[rg_idx] + rc.offset));
             }
@@ -458,6 +464,13 @@
 
             // We don't need to do any further work if there are no dead columns
             if dead_idx_to_col_idx.is_empty() {
+                materialize_hive_partitions(
+                    &mut df,
+                    schema.as_ref(),
+                    hive_partition_columns,
+                    md.num_rows(),
+                );
+
                 return Ok(Some(df));
             }
 
@@ -541,10 +554,7 @@
             let height = df.height();
             let live_columns = df.take_columns();
 
-            assert_eq!(
-                live_columns.len() + dead_columns.len(),
-                projection.len() + hive_partition_columns.map_or(0, |x| x.len())
-            );
+            assert_eq!(live_columns.len() + dead_columns.len(), projection.len());
 
             let mut merged = Vec::with_capacity(live_columns.len() + dead_columns.len());
 
@@ -561,13 +571,20 @@
             hive::merge_sorted_to_schema_order(
                 &mut dead_columns.into_iter(), // df_columns
                 &mut live_columns.into_iter().skip(row_index.is_some() as usize), // hive_columns
-                schema,
+                &projected_schema,
                 &mut merged,
             );
 
             // SAFETY: This is completely based on the schema so all column names are unique
             // and the length is given by the parquet file which should always be the same.
-            let df = unsafe { DataFrame::new_no_checks(height, merged) };
+            let mut df = unsafe { DataFrame::new_no_checks(height, merged) };
+
+            materialize_hive_partitions(
+                &mut df,
+                schema.as_ref(),
```
Collaborator: For a full-projection `scan().collect()` this materializes hive columns that exist in the file at the position given by the file schema. But in the case where projections were pushed down, the resulting columns are not properly ordered, because the columns in `df` no longer match `schema`. This is still fine, because we add a `Select {}` node on top of the scan in projection pushdown to get the correct order.
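For readers unfamiliar with the helper referenced throughout this diff: `hive::merge_sorted_to_schema_order` interleaves two column streams, each already ordered by schema position, back into overall schema order. A toy sketch of that merge, where strings stand in for polars `Column`s (an illustration, not the real implementation):

```rust
// Toy merge of two column streams into schema order, assuming each input
// stream is already ordered by its position in `schema`.
fn merge_sorted_to_schema_order(
    df_columns: &mut dyn Iterator<Item = String>,
    hive_columns: &mut dyn Iterator<Item = String>,
    schema: &[&str],
    out: &mut Vec<String>,
) {
    let mut a = df_columns.next();
    let mut b = hive_columns.next();
    for &name in schema {
        if a.as_deref() == Some(name) {
            out.push(a.take().unwrap());
            a = df_columns.next();
        } else if b.as_deref() == Some(name) {
            out.push(b.take().unwrap());
            b = hive_columns.next();
        }
    }
}

fn main() {
    let mut out = Vec::new();
    merge_sorted_to_schema_order(
        &mut ["a", "c"].map(String::from).into_iter(),
        &mut ["b"].map(String::from).into_iter(),
        &["a", "b", "c"],
        &mut out,
    );
    assert_eq!(out, ["a", "b", "c"]);
}
```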

```diff
+                hive_partition_columns,
+                md.num_rows(),
+            );
 
             PolarsResult::Ok(Some(df))
         })
```
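Taken together, the changes in this file shape the prefiltered path as: decode the live (predicate) columns, attach hive columns only for predicate evaluation, filter the live columns by the resulting mask, decode the dead columns with the mask pushed down, then materialize hive columns once at the end. A self-contained toy of that control flow, with `Vec`s standing in for polars columns and all names hypothetical:

```rust
type Col = (String, Vec<i64>);

// Toy prefilter: `decode_dead` receives the mask so dead columns can be
// decoded pre-filtered, mirroring the prefiltered parquet path sketched above.
fn prefilter(
    live: Vec<Col>,
    predicate: impl Fn(&[Col]) -> Vec<bool>,
    decode_dead: impl Fn(&[bool]) -> Vec<Col>,
) -> Vec<Col> {
    let mask = predicate(&live);
    let mut out: Vec<Col> = live
        .into_iter()
        .map(|(name, vals)| {
            let kept = vals
                .into_iter()
                .zip(&mask)
                .filter_map(|(v, &keep)| keep.then_some(v))
                .collect();
            (name, kept)
        })
        .collect();
    // The real code merges live and dead columns to schema order here;
    // appending is enough for the sketch.
    out.extend(decode_dead(&mask));
    out
}

fn main() {
    let live = vec![("a".to_string(), vec![1, 2, 3])];
    let out = prefilter(
        live,
        |cols| cols[0].1.iter().map(|&v| v > 1).collect(),
        |_mask| vec![("b".to_string(), vec![20, 30])], // pretend decode honored the mask
    );
    assert_eq!(out[0].1, [2, 3]);
}
```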
2 changes: 1 addition & 1 deletion crates/polars-plan/src/plans/file_scan.rs
```diff
@@ -168,7 +168,7 @@ impl FileScan {
             #[cfg(feature = "ipc")]
             Self::Ipc { .. } => _file_options.row_index.is_some(),
             #[cfg(feature = "parquet")]
-            Self::Parquet { .. } => _file_options.row_index.is_some(),
+            Self::Parquet { .. } => false,
             #[allow(unreachable_patterns)]
             _ => false,
         }
```
```diff
@@ -436,7 +436,7 @@ impl ProjectionPushDown {
                     &acc_projections,
                     expr_arena,
                     &file_info.schema,
-                    scan_type.sort_projection(&file_options) || hive_parts.is_some(),
```
Collaborator: revert sorting projections in the projection pushdown optimizer from 29373d1

Collaborator (author): Maybe we should just project unsorted columns in the optimizer. That makes the IO code a lot simpler.

Collaborator: I have changed this for Parquet. I've left the other scans as they may rely on sorted projections.
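To make "sorted projections" concrete: with a file schema of `[a, b, c, d]` and a query selecting `[c, a]`, a sorted projection pushes the columns down in file order, while an unsorted one preserves the requested order. A sketch with a hypothetical helper, reduced to column indices:

```rust
// Illustration of projection ordering. `sort` mirrors what
// `scan_type.sort_projection(..)` decides: when true, the pushed-down
// projection follows file/schema order; when false (now the case for
// Parquet), it keeps the caller's requested order.
fn push_down_projection(requested: &[usize], sort: bool) -> Vec<usize> {
    let mut proj = requested.to_vec();
    if sort {
        proj.sort_unstable();
    }
    proj
}

fn main() {
    assert_eq!(push_down_projection(&[2, 0], true), vec![0, 2]); // file order
    assert_eq!(push_down_projection(&[2, 0], false), vec![2, 0]); // as requested
}
```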

```diff
+                    scan_type.sort_projection(&file_options),
                 )?;
 
                 hive_parts = if let Some(hive_parts) = hive_parts {
@@ -480,10 +480,30 @@
                 // based on its position in the file. This is extremely important for the
                 // new-streaming engine.
 
+                // row_index is separate
+                let opt_row_index_col_name = file_options
+                    .row_index
+                    .as_ref()
+                    .map(|v| &v.name)
+                    .filter(|v| schema.contains(v))
+                    .cloned();
+
+                if let Some(name) = &opt_row_index_col_name {
+                    out.insert_at_index(
+                        0,
+                        name.clone(),
+                        schema.get(name).unwrap().clone(),
+                    )
+                    .unwrap();
+                }
+
                 {
                     let df_fields_iter = &mut schema
                         .iter()
-                        .filter(|fld| !partition_schema.contains(fld.0))
+                        .filter(|fld| {
+                            !partition_schema.contains(fld.0)
+                                && Some(fld.0) != opt_row_index_col_name.as_ref()
+                        })
                         .map(|(a, b)| (a.clone(), b.clone()));
 
                     let hive_fields_iter = &mut partition_schema
```
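The new row-index handling pins the row index column to position 0 of the projected schema and excludes it from the regular per-file field iteration. A rough model with hypothetical names (hive columns are appended here for brevity, whereas the real code merges them to schema order):

```rust
fn output_column_order<'a>(
    file_cols: &[&'a str],
    hive_cols: &[&'a str],
    row_index: Option<&'a str>,
) -> Vec<&'a str> {
    let mut out: Vec<&str> = Vec::new();
    // row_index is separate: it always occupies index 0 when present.
    out.extend(row_index);
    out.extend(
        file_cols
            .iter()
            .filter(|c| Some(**c) != row_index && !hive_cols.contains(*c))
            .copied(),
    );
    out.extend(hive_cols.iter().copied());
    out
}

fn main() {
    assert_eq!(
        output_column_order(&["a", "b"], &["year"], Some("idx")),
        vec!["idx", "a", "b", "year"]
    );
}
```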
33 changes: 7 additions & 26 deletions crates/polars-schema/src/schema.rs
```diff
@@ -401,39 +401,20 @@
     }
 }
 
-pub fn ensure_matching_schema_names<D>(lhs: &Schema<D>, rhs: &Schema<D>) -> PolarsResult<()> {
-    let mut iter_lhs = lhs.iter_names();
-    let mut iter_rhs = rhs.iter_names();
+pub fn debug_ensure_matching_schema_names<D>(lhs: &Schema<D>, rhs: &Schema<D>) -> PolarsResult<()> {
+    if cfg!(debug_assertions) {
+        let lhs = lhs.iter_names().collect::<Vec<_>>();
+        let rhs = rhs.iter_names().collect::<Vec<_>>();
 
-    for i in 0..iter_lhs.len().min(iter_rhs.len()) {
-        let l = iter_lhs.next().unwrap();
-        let r = iter_rhs.next().unwrap();
-
-        if l != r {
+        if lhs != rhs {
             polars_bail!(
                 SchemaMismatch:
-                "schema names differ at position {}: {} != {}",
-                1 + i, l, r
+                "lhs: {:?} rhs: {:?}",
+                lhs, rhs
             )
         }
     }
 
-    if let Some(v) = iter_lhs.next() {
-        polars_bail!(
-            SchemaMismatch:
-            "schema contained extra column: {}",
-            v
-        )
-    }
-
-    if let Some(v) = iter_rhs.next() {
-        polars_bail!(
-            SchemaMismatch:
-            "schema didn't contain column: {}",
-            v
-        )
-    }
-
     Ok(())
 }
```
```diff
@@ -78,7 +78,7 @@ impl RowGroupDecoder {
         }
 
         let mut decoded_cols = Vec::with_capacity(row_group_data.row_group_metadata.n_columns());
-        self.decode_all_columns(
+        self.decode_projected_columns(
            &mut decoded_cols,
            &row_group_data,
            Some(polars_parquet::read::Filter::Range(slice_range.clone())),
@@ -217,7 +217,7 @@
 
    /// Potentially parallelizes based on number of rows & columns. Decoded columns are appended to
    /// `out_vec`.
-    async fn decode_all_columns(
+    async fn decode_projected_columns(
        &self,
        out_vec: &mut Vec<Column>,
        row_group_data: &Arc<RowGroupData>,
@@ -497,7 +497,7 @@
        // for `hive::merge_sorted_to_schema_order`.
        let mut opt_decode_err = None;
 
-        let mut decoded_live_cols_iter = self
+        let decoded_live_cols_iter = self
            .predicate_arrow_field_indices
            .iter()
            .map(|&i| self.projected_arrow_schema.get_at_index(i).unwrap())
@@ -512,26 +512,21 @@
                },
            }
        });
-        let mut hive_cols_iter = shared_file_state.hive_series.iter().map(|s| {
+        let hive_cols_iter = shared_file_state.hive_series.iter().map(|s| {
            debug_assert!(s.len() >= projection_height);
            s.slice(0, projection_height)
        });
 
-        hive::merge_sorted_to_schema_order(
-            &mut decoded_live_cols_iter,
-            &mut hive_cols_iter,
-            &self.reader_schema,
-            &mut live_columns,
-        );
-
+        live_columns.extend(decoded_live_cols_iter);
+        live_columns.extend(hive_cols_iter);
        opt_decode_err.transpose()?;
 
        if let Some(file_path_series) = &shared_file_state.file_path_series {
            debug_assert!(file_path_series.len() >= projection_height);
            live_columns.push(file_path_series.slice(0, projection_height));
        }
 
-        let live_df = unsafe {
+        let mut live_df = unsafe {
            DataFrame::new_no_checks(row_group_data.row_group_metadata.num_rows(), live_columns)
        };
 
@@ -542,6 +537,12 @@
            .evaluate_io(&live_df)?;
        let mask = mask.bool().unwrap();
 
+        unsafe {
+            live_df.get_columns_mut().truncate(
+                self.row_index.is_some() as usize + self.predicate_arrow_field_indices.len(),
+            )
+        }
+
        let filtered =
            unsafe { filter_cols(live_df.take_columns(), mask, self.min_values_per_thread) }
                .await?;
@@ -552,10 +553,38 @@
            mask.num_trues()
        };
 
-        let live_df_filtered = unsafe { DataFrame::new_no_checks(height, filtered) };
+        let mut live_df_filtered = unsafe { DataFrame::new_no_checks(height, filtered) };
+
+        let projection_height = height;
 
        if self.non_predicate_arrow_field_indices.is_empty() {
            // User or test may have explicitly requested prefiltering
+
+            hive::merge_sorted_to_schema_order(
+                unsafe {
+                    &mut live_df_filtered
+                        .get_columns_mut()
+                        .drain(..)
+                        .collect::<Vec<_>>()
+                        .into_iter()
+                },
+                &mut shared_file_state
+                    .hive_series
+                    .iter()
+                    .map(|s| s.slice(0, projection_height)),
+                &self.reader_schema,
+                unsafe { live_df_filtered.get_columns_mut() },
+            );
+
+            unsafe {
+                live_df_filtered.get_columns_mut().extend(
+                    shared_file_state
+                        .file_path_series
+                        .as_ref()
+                        .map(|c| c.slice(0, projection_height)),
+                )
+            }
+
            return Ok(live_df_filtered);
        }
 
@@ -621,13 +650,36 @@
            &mut live_columns
                .into_iter()
                .skip(self.row_index.is_some() as usize), // hive_columns
-            &self.reader_schema,
+            &self.projected_arrow_schema,
            &mut merged,
        );
 
        opt_decode_err.transpose()?;
 
-        let df = unsafe { DataFrame::new_no_checks(expected_num_rows, merged) };
+        let mut out = Vec::with_capacity(
+            merged.len()
+                + shared_file_state.hive_series.len()
+                + shared_file_state.file_path_series.is_some() as usize,
+        );
+
+        hive::merge_sorted_to_schema_order(
+            &mut merged.into_iter(),
+            &mut shared_file_state
+                .hive_series
+                .iter()
+                .map(|s| s.slice(0, projection_height)),
+            &self.reader_schema,
+            &mut out,
+        );
+
+        out.extend(
+            shared_file_state
+                .file_path_series
+                .as_ref()
+                .map(|c| c.slice(0, projection_height)),
+        );
+
+        let df = unsafe { DataFrame::new_no_checks(expected_num_rows, out) };
        Ok(df)
    }
}
```
3 changes: 2 additions & 1 deletion crates/polars-stream/src/physical_plan/lower_expr.rs
```diff
@@ -695,7 +695,8 @@ fn build_select_node_with_ctx(
 
     if let Some(columns) = all_simple_columns {
         let input_schema = ctx.phys_sm[input].output_schema.clone();
-        if input_schema.len() == columns.len()
+        if !cfg!(debug_assertions)
+            && input_schema.len() == columns.len()
             && input_schema.iter_names().zip(&columns).all(|(l, r)| l == r)
         {
             // Input node already has the correct schema, just pass through.
```
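The new `!cfg!(debug_assertions)` arm means debug builds never take the pass-through shortcut: a real select node is always built, so tests exercise the column selection/reordering path even when the input schema already matches. A condensed sketch of the resulting condition (hypothetical free function; the real check lives inline in `build_select_node_with_ctx`):

```rust
// Sketch: pass-through is allowed only in release builds, and only when the
// input schema already matches the selected columns exactly and in order.
fn can_pass_through(input_names: &[&str], selected: &[&str]) -> bool {
    !cfg!(debug_assertions)
        && input_names.len() == selected.len()
        && input_names.iter().zip(selected).all(|(l, r)| l == r)
}

fn main() {
    // True in a matching release build; always false in a debug build.
    println!("{}", can_pass_through(&["a", "b"], &["a", "b"]));
}
```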
1 change: 1 addition & 0 deletions py-polars/tests/unit/io/test_hive.py
```diff
@@ -503,6 +503,7 @@ def test_hive_partition_force_async_17155(tmp_path: Path, monkeypatch: Any) -> None:
     [
         (partial(pl.scan_parquet, parallel="row_groups"), pl.DataFrame.write_parquet),
         (partial(pl.scan_parquet, parallel="columns"), pl.DataFrame.write_parquet),
+        (partial(pl.scan_parquet, parallel="prefiltered"), pl.DataFrame.write_parquet),
         (
             lambda *a, **kw: pl.scan_parquet(*a, parallel="prefiltered", **kw).filter(
                 pl.col("b") == pl.col("b")
```