fix: Properly project unordered column in parquet prefiltered #20189
Changes from 7 commits:
55050ce, 5dfd19a, 8395a80, 82756c6, 35f6440, 41ae898, 15124be, e2f029e, 43a6457
```diff
@@ -436,7 +436,7 @@ impl ProjectionPushDown {
                     &acc_projections,
                     expr_arena,
                     &file_info.schema,
-                    scan_type.sort_projection(&file_options) || hive_parts.is_some(),
+                    scan_type.sort_projection(&file_options),
                 )?;

                 hive_parts = if let Some(hive_parts) = hive_parts {
```

Review comment: revert sorting projections in the projection pushdown optimizer from 29373d1

Review comment: Maybe we should just project unsorted columns in the optimizer. That makes the IO code a lot simpler.

Review comment: I have changed this for Parquet. I've left the other scans as-is, as they may rely on sorted projections.
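To illustrate what "sorting projections" means here, the sketch below (hypothetical helper names, not the polars API) reorders a pushed-down projection so the columns follow their position in the file schema, which is the behavior the hunk above stops forcing for hive scans:

```rust
// Hypothetical illustration, not the polars API: a "sorted projection"
// reorders the pushed-down column names to match their position in the
// file schema; an unsorted projection keeps the order the query requested.
fn sort_projection_to_file_order(
    file_schema: &[&str],         // column order as stored in the file
    projection: &mut Vec<String>, // columns requested by the pushdown
) {
    // Position of each projected column in the file schema (None sorts first,
    // but every projected column is assumed to exist in the file here).
    let pos = |name: &str| file_schema.iter().position(|c| *c == name);
    projection.sort_by_key(|name| pos(name));
}

fn main() {
    let file_schema = ["a", "b", "c", "d"];
    // The query asked for ["c", "a"]...
    let mut projection = vec!["c".to_string(), "a".to_string()];
    sort_projection_to_file_order(&file_schema, &mut projection);
    // ...but the sorted projection follows file order.
    assert_eq!(projection, ["a", "c"]);
    println!("{projection:?}");
}
```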
```diff
@@ -480,10 +480,30 @@ impl ProjectionPushDown {
         // based on its position in the file. This is extremely important for the
         // new-streaming engine.

+        // row_index is separate
+        let opt_row_index_col_name = file_options
+            .row_index
+            .as_ref()
+            .map(|v| &v.name)
+            .filter(|v| schema.contains(v))
+            .cloned();
+
+        if let Some(name) = &opt_row_index_col_name {
+            out.insert_at_index(
+                0,
+                name.clone(),
+                schema.get(name).unwrap().clone(),
+            )
+            .unwrap();
+        }
+
         {
             let df_fields_iter = &mut schema
                 .iter()
-                .filter(|fld| !partition_schema.contains(fld.0))
+                .filter(|fld| {
+                    !partition_schema.contains(fld.0)
+                        && Some(fld.0) != opt_row_index_col_name.as_ref()
+                })
                 .map(|(a, b)| (a.clone(), b.clone()));

             let hive_fields_iter = &mut partition_schema
```
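The hunk above can be sketched with plain `Vec`s in place of polars' `Schema` (hypothetical helper, assumed shapes): the optional row-index column is pulled out first and inserted at position 0 of the output schema, and the remaining file columns skip both it and the hive partition columns, which are appended last:

```rust
// Minimal sketch of the projection order built in the hunk above.
// `file_schema`, `partition_schema`, and `row_index` stand in for the
// corresponding polars structures; this is an illustration, not the real API.
fn project(
    file_schema: &[&str],
    partition_schema: &[&str],
    row_index: Option<&str>,
) -> Vec<String> {
    let mut out = Vec::new();

    // row_index is separate: if present in the schema, it goes to the front.
    let opt_row_index = row_index.filter(|n| file_schema.contains(n));
    if let Some(name) = opt_row_index {
        out.push(name.to_string());
    }

    // File columns, skipping hive partition columns and the row-index column.
    out.extend(
        file_schema
            .iter()
            .filter(|c| !partition_schema.contains(*c) && Some(**c) != opt_row_index)
            .map(|c| c.to_string()),
    );

    // Hive partition columns come after the file columns.
    out.extend(partition_schema.iter().map(|c| c.to_string()));
    out
}

fn main() {
    let cols = project(&["row_nr", "a", "b"], &["part"], Some("row_nr"));
    assert_eq!(cols, ["row_nr", "a", "b", "part"]);
    println!("{cols:?}");
}
```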
Review comment: For a full-projection `scan().collect()`, this materializes hive columns that exist in the file at the position based on the file schema. But in the case where projections were pushed down, the resulting columns are actually not properly ordered, because the columns in `df` no longer match `schema`. This is still fine, because we add a `Select {}` node on top of the scan in projection pushdown to get the correct order.
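The role of that `Select {}` node can be sketched as a simple reordering step (hypothetical `select` helper, columns modeled as name/value pairs): even if the scan emits columns in file order, the select on top restores the order the query asked for:

```rust
// Hedged sketch, not the polars IR: a Select node reorders the scan's
// output columns to the projection order requested by the query.
fn select(columns: &[(&str, Vec<i32>)], order: &[&str]) -> Vec<(String, Vec<i32>)> {
    order
        .iter()
        .map(|name| {
            let col = columns
                .iter()
                .find(|c| c.0 == *name)
                .expect("projected column exists in scan output");
            (col.0.to_string(), col.1.clone())
        })
        .collect()
}

fn main() {
    // Scan output in file order:
    let scanned = [("a", vec![1]), ("b", vec![2])];
    // The query projected ["b", "a"]; the Select node on top reorders:
    let out = select(&scanned, &["b", "a"]);
    assert_eq!(out[0].0, "b");
    assert_eq!(out[1].0, "a");
    println!("{:?}", out.iter().map(|c| c.0.as_str()).collect::<Vec<_>>());
}
```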