fix: slice_pushdown optimization leading to incorrectly sliced row index on parquet file #20508

Merged

Conversation

brifitz (Contributor) commented Dec 30, 2024

Fixes #20485

The row_count_start values in the rg_to_dfs_par_over_rg() function did not properly reflect the number of rows that had already been processed when the slice_pushdown optimization was used.
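
As a rough sketch (illustrative names only, not the actual rg_to_dfs_par_over_rg code), the row index offset of each row group should be the cumulative size of all preceding row groups, i.e. the rows already scanned, regardless of how many of those rows the pushed-down slice keeps:

```rust
// Illustrative sketch only (not the polars implementation): compute the row index
// offset each row group starts at. The offset advances by the full row-group size
// (rows scanned), even if slice pushdown means only a few of those rows are read.
fn row_index_offsets(row_group_sizes: &[usize]) -> Vec<usize> {
    let mut offsets = Vec::with_capacity(row_group_sizes.len());
    let mut rows_scanned = 0usize;
    for &n in row_group_sizes {
        offsets.push(rows_scanned);
        rows_scanned += n;
    }
    offsets
}

fn main() {
    // e.g. three row groups of 10_000 rows each
    assert_eq!(row_index_offsets(&[10_000, 10_000, 10_000]), vec![0, 10_000, 20_000]);
}
```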

brifitz changed the title from "Fix slice_pushdown optimization leading to incorrectly sliced row index on parquet file" to "Fix(rust): slice_pushdown optimization leading to incorrectly sliced row index on parquet file" on Dec 30, 2024
brifitz changed the title from "Fix(rust): slice_pushdown optimization leading to incorrectly sliced row index on parquet file" to "fix(rust): slice_pushdown optimization leading to incorrectly sliced row index on parquet file" on Dec 30, 2024

codecov bot commented Dec 30, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 79.04%. Comparing base (25fab78) to head (ca87683).
Report is 2 commits behind head on main.

Additional details and impacted files
@@           Coverage Diff           @@
##             main   #20508   +/-   ##
=======================================
  Coverage   79.04%   79.04%           
=======================================
  Files        1563     1563           
  Lines      220798   220817   +19     
  Branches     2502     2502           
=======================================
+ Hits       174521   174545   +24     
+ Misses      45703    45698    -5     
  Partials      574      574           


brifitz (Contributor, Author) commented Dec 30, 2024

I wasn't aware of the POLARS_FORCE_ASYNC flag before opening this pull request.

It's accounted for in the latest commit.

From what I gather, the number of rows scanned and the number of rows actually read were being conflated, which led to the issue in #20485 and also caused the test added in this pull request to fail when async is enabled.

The latest commit fixes these issues by making a distinction between the number of rows scanned and the number of rows read.

@@ -689,15 +689,24 @@ fn rg_to_dfs_par_over_rg(
.sum();
let slice_end = slice.0 + slice.1;

// we distinguish between the number of rows scanned and the number of rows actually
Member

Could you clarify how you distinguish "scanned" from "read"?

brifitz (Contributor, Author) commented Dec 31, 2024

Maybe my terminology is not so clear; I'm happy to modify it if it's confusing. In the following, I'll explain what I mean by distinguishing between rows 'scanned' and rows 'read'.

Note: The 'slice' input parameter to rg_to_dfs_par_over_rg() varies depending on whether slice pushdown is enabled or not. If slice pushdown is enabled, the slice input parameter actually represents the rows we want to slice, i.e., starting index and number of rows to slice. If slice pushdown is not enabled, the slice parameter represents all the rows, and the slicing will be performed later on in the physical plan (SliceExec).

i) When slice_pushdown is not enabled:
We iterate from row_group_start to row_group_end, and 'scan' through rg_md.num_rows() on each iteration. As slice pushdown is not enabled, in this case we are going to (later) 'read' all the rows into dataframes, which eventually get concatenated into a single dataframe. This dataframe will get sliced later on in the physical plan (SliceExec).

ii) When slice_pushdown is enabled:
We iterate from row_group_start to row_group_end, and 'scan' through rg_md.num_rows() on each iteration. In this case however, since slice pushdown is enabled, split_slice_at_file() lets us know that we need to only 'read' the number of rows that overlap with the row groups into dataframes later on.

The number of rows 'scanned' and 'read' can differ in this case, i.e., we may have scanned through 10,000 rows but found that only 10 rows need to be read, as only 10 rows from the slice overlap with a given row group.

On the other hand, the number of rows scanned (rows_scanned) is needed to correctly set the offset the row index should start from for each row group when adding the row index column to the dataframe (which is why we use it when setting row_count_start). I.e., we may have read only a subset of the scanned rows into a dataframe, but we need to account for all the rows that have been scanned when specifying the row index offset, or it will be incorrect (a small sketch of this bookkeeping follows case iii below).

iii) When slice_pushdown and async Parquet reader are enabled:
This case is similar to ii), but there is an added complication. That is, row_group_start and row_group_end may have been modified due to processing that happens before rg_to_dfs_par_over_rg() executes (see call to compute_row_group_range() in BatchedParquetReader.next_batches()). Specifically, the number of row groups that need to be checked for overlap with the slice in rg_to_dfs_par_over_rg() may have been reduced by this (pre-)processing.

This occurs because each row group's start/end is compared against the slice start/end (which is essentially also an overlap check) in compute_row_group_range(), and this can reduce the range of row groups that need to be considered in rg_to_dfs_par_over_rg() to only those that overlap with the slice.

So, for example, row_count_start may not be 0 in rg_to_dfs_par_over_rg(); row groups have essentially been skipped because of the earlier processing. But we need to account for that skipping in the number of rows scanned, or the row index that gets added to the dataframe will be incorrect. We need to sum up file_metadata.row_groups[i].num_rows() for i in (0..row_group_start) to account for the rows scanned/skipped by the earlier processing.
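
To make the scanned-vs-read distinction and the async bookkeeping concrete, here is a minimal, self-contained sketch; the names and the helper are mine (illustrative only), not the actual rg_to_dfs_par_over_rg / split_slice_at_file / compute_row_group_range code:

```rust
/// Illustrative helper (not the polars API): the part of the slice that overlaps a
/// row group, returned as (offset within the row group, number of rows to read).
fn overlap_with_slice(
    rows_scanned_before: usize, // rows of all row groups scanned so far
    rg_num_rows: usize,         // rows in the current row group
    slice_start: usize,
    slice_end: usize,
) -> (usize, usize) {
    let rg_start = rows_scanned_before;
    let rg_end = rg_start + rg_num_rows;
    let start = rg_start.max(slice_start);
    let end = rg_end.min(slice_end);
    if start >= end {
        (0, 0) // no overlap: the whole row group is scanned but nothing is read
    } else {
        (start - rg_start, end - start)
    }
}

fn main() {
    // Hypothetical file: three row groups of 10_000 rows, slice = rows 15_000..15_010.
    let rg_sizes = [10_000usize, 10_000, 10_000];
    let (slice_start, slice_len) = (15_000usize, 10);
    let slice_end = slice_start + slice_len;

    // Async case: pre-processing already skipped row groups 0..row_group_start,
    // but their rows must still count as "scanned" for the row index offset.
    let row_group_start = 1;
    let mut rows_scanned: usize = rg_sizes[..row_group_start].iter().sum();

    for &n in &rg_sizes[row_group_start..] {
        // `rows_scanned` is what row_count_start must be based on, even when only
        // a handful of rows (or none) are read from this row group.
        let (offset_in_rg, n_read) =
            overlap_with_slice(rows_scanned, n, slice_start, slice_end);
        println!("row index offset {rows_scanned}: read {n_read} rows at offset {offset_in_rg}");
        rows_scanned += n; // advance by rows scanned, not by rows read
    }
}
```

In this example only 10 rows are read from the second row group, yet the row index offset for the third row group is still 20_000, and the 10_000 rows of the skipped leading row group are counted up front.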

brifitz force-pushed the fix-slice-pushdown-incorrect-row-index branch from 0c9d80e to ca87683 on January 1, 2025 at 10:58

brifitz (Contributor, Author) commented Jan 1, 2025

Refined the comments to make clearer the distinction between rows scanned (for overlap with the slice) and rows that actually get read into a dataframe.

It appears a recent change in the main branch that is unrelated to this PR has broken a doctest, which is causing this PR to fail a check.

ritchie46 (Member)

Thanks a lot @brifitz

ritchie46 merged commit ed37ace into pola-rs:main on Jan 2, 2025
25 of 26 checks passed
ritchie46 changed the title from "fix(rust): slice_pushdown optimization leading to incorrectly sliced row index on parquet file" to "fix: slice_pushdown optimization leading to incorrectly sliced row index on parquet file" on Jan 2, 2025
Successfully merging this pull request may close these issues.

slice_pushdown optimization leads to incorrect sliced row index on parquet file