fix: `slice_pushdown` optimization leading to incorrectly sliced row index on parquet file #20508
Conversation
Codecov Report: All modified and coverable lines are covered by tests ✅

@@ Coverage Diff @@
##             main   #20508   +/-   ##
=======================================
  Coverage   79.04%   79.04%
=======================================
  Files        1563     1563
  Lines      220798   220817    +19
  Branches     2502     2502
=======================================
+ Hits       174521   174545    +24
+ Misses      45703    45698     -5
  Partials      574      574

View the full report in Codecov by Sentry.
I wasn't aware of that. It's accounted for in the latest commit. From what I gather, the number of rows scanned and the number of rows actually read were being conflated, which led to the issue in #20485, and also to another issue that caused the test pushed in this pull request to fail when async is enabled. The latest commit fixes these issues by making a distinction between the number of rows scanned and the number of rows read.
@@ -689,15 +689,24 @@ fn rg_to_dfs_par_over_rg(
        .sum();
    let slice_end = slice.0 + slice.1;

    // we distinguish between the number of rows scanned and the number of rows actually
Could you clarify how you distinguish "scanned" from "read"?
Maybe my terminology is not so clear; I'm happy to modify it if it's confusing. In the following, I'll explain what I mean by distinguishing between rows 'scanned' and rows 'read'.

Note: The `slice` input parameter to `rg_to_dfs_par_over_rg()` varies depending on whether slice pushdown is enabled. If slice pushdown is enabled, the `slice` parameter represents the rows we want to slice, i.e., the starting index and the number of rows to slice. If slice pushdown is not enabled, the `slice` parameter represents all the rows, and the slicing is performed later in the physical plan (`SliceExec`).
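The two meanings of the `slice` parameter described in the note can be sketched as follows. This is an illustrative sketch only, not polars code; the function name and signature are hypothetical.

```rust
// Hypothetical sketch: the (offset, len) `slice` parameter means different
// things depending on whether slice pushdown is enabled.
fn effective_slice(
    slice_pushdown: bool,
    requested: (usize, usize), // the slice the query asked for
    total_rows: usize,         // total number of rows in the file
) -> (usize, usize) {
    if slice_pushdown {
        // the reader only materializes the requested rows
        requested
    } else {
        // the reader materializes everything; SliceExec slices later
        (0, total_rows)
    }
}
```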
i) When `slice_pushdown` is not enabled:
We iterate from `row_group_start` to `row_group_end`, and 'scan' through `rg_md.num_rows()` rows on each iteration. As slice pushdown is not enabled, we will (later) 'read' all the rows into dataframes, which eventually get concatenated into a single dataframe. This dataframe gets sliced later in the physical plan (`SliceExec`).
ii) When `slice_pushdown` is enabled:
We iterate from `row_group_start` to `row_group_end`, and 'scan' through `rg_md.num_rows()` rows on each iteration. In this case, however, since slice pushdown is enabled, `split_slice_at_file()` tells us that we only need to 'read' into dataframes the rows that overlap with the row groups.
The number of rows 'scanned' and 'read' can therefore differ: we may have scanned through 10,000 rows but found that only 10 rows need to be read, because only 10 rows from the slice overlap with a given row group.
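The overlap computation described above can be sketched as a simple interval intersection. This is a hedged illustration of the idea, not the actual `split_slice_at_file()` implementation; the helper name and return convention are assumptions.

```rust
// Hypothetical sketch: intersect a global slice (offset, len) over the whole
// file with a row group spanning [rg_start, rg_start + rg_num_rows).
// Returns (local offset within the row group, number of rows to read).
fn slice_overlap_with_row_group(
    slice_offset: usize,
    slice_len: usize,
    rg_start: usize,
    rg_num_rows: usize,
) -> (usize, usize) {
    let slice_end = slice_offset + slice_len;
    let rg_end = rg_start + rg_num_rows;
    // Intersect [slice_offset, slice_end) with [rg_start, rg_end).
    let start = slice_offset.max(rg_start);
    let end = slice_end.min(rg_end);
    if start >= end {
        (0, 0) // no overlap: nothing to read from this row group
    } else {
        (start - rg_start, end - start)
    }
}
```

For example, a slice of 10 rows starting at row 9,995 overlaps a 10,000-row first row group in only its last 5 rows: the whole group is 'scanned', but only 5 rows are 'read'.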
On the other hand, the number of rows scanned (`rows_scanned`) is needed to correctly set the offset the row index should start from for each row group, when adding the row index column to the dataframe (which is why we use it when setting `row_count_start`). That is, we may have read only a subset of the scanned rows into a dataframe, but we must account for all rows scanned when specifying the row index offset, or it will be incorrect.
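The point about the offset can be sketched as follows: each row group's row index must start at that group's global row position, accumulated from rows scanned, regardless of how many rows are actually read from it. The function name and shape here are hypothetical, for illustration only.

```rust
// Hypothetical sketch: compute the row index offset (row_count_start) for
// each row group by accumulating rows *scanned*, not rows *read*.
fn row_index_offsets(row_group_sizes: &[usize], row_index_base: u32) -> Vec<u32> {
    let mut rows_scanned: usize = 0;
    let mut offsets = Vec::with_capacity(row_group_sizes.len());
    for &num_rows in row_group_sizes {
        // offset for this row group: base + all rows scanned so far
        offsets.push(row_index_base + rows_scanned as u32);
        // advance by the full group size, even if only a few rows are read
        rows_scanned += num_rows;
    }
    offsets
}
```

If the second row group contributes only 10 rows to the output, its row index must still begin at the size of the first group, not at the count of rows read so far.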
iii) When `slice_pushdown` and the async Parquet reader are enabled:
This case is similar to ii), but with an added complication: `row_group_start` and `row_group_end` may have been modified by processing that happens before `rg_to_dfs_par_over_rg()` executes (see the call to `compute_row_group_range()` in `BatchedParquetReader.next_batches()`). Specifically, this (pre-)processing may have reduced the number of row groups that need to be checked for overlap with the slice in `rg_to_dfs_par_over_rg()`.
This occurs because each row group's start/end is compared against the slice start/end (which is essentially also an overlap check) in `compute_row_group_range()`, and this can reduce the range of row groups considered in `rg_to_dfs_par_over_rg()` to only those that overlap with the slice.
So, for example, `row_count_start` may not equal `0` in `rg_to_dfs_par_over_rg()`: row groups have been skipped by the earlier processing. But we need to account for that skipping in the number of rows scanned, or the row index that gets added to the dataframe will be incorrect. We need to sum up `file_metadata.row_groups[i].num_rows()` for `i in (0..row_group_start)`, to account for rows scanned/skipped by the earlier processing.
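The initialization described in iii) can be sketched as a sum over the skipped row groups. This is a minimal illustration with a made-up metadata struct, not the polars `FileMetadata` type.

```rust
// Hypothetical metadata type standing in for the per-row-group metadata.
struct RowGroupMeta {
    num_rows: usize,
}

// Sketch: when earlier processing (e.g. compute_row_group_range) has already
// skipped row groups, rows_scanned must start at the total size of the
// skipped groups, not at 0, so the row index column stays globally correct.
fn initial_rows_scanned(row_groups: &[RowGroupMeta], row_group_start: usize) -> usize {
    row_groups[..row_group_start]
        .iter()
        .map(|rg| rg.num_rows)
        .sum()
}
```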
…rows processed from file
…oup_start gt zero when initializing rows_scanned
0c9d80e to ca87683
Refined the comments to make clearer the distinction between rows scanned (for overlap with the slice) versus rows that actually get read into a dataframe. It appears a recent change in the main branch, unrelated to this PR, has broken a doctest, which is causing this PR to fail a check.
Thanks a lot @brifitz
Fixes #20485

The `row_count_start` values in the `rg_to_dfs_par_over_rg()` function were not properly reflecting the number of rows that had already been processed when the `slice_pushdown` optimization was being used.