Commit

Ensure row_count_start values in rg_to_dfs_par_over_rg() reflect num rows processed from file
brifitz committed Dec 30, 2024
1 parent c88911f commit ea97383
Showing 2 changed files with 31 additions and 2 deletions.
crates/polars-io/src/parquet/read/read_impl.rs (5 changes: 3 additions & 2 deletions)
@@ -692,10 +692,11 @@ fn rg_to_dfs_par_over_rg(
     for i in row_group_start..row_group_end {
         let row_count_start = *previous_row_count;
         let rg_md = &file_metadata.row_groups[i];
+        let n_rows_this_file = rg_md.num_rows();
         let rg_slice =
-            split_slice_at_file(&mut n_rows_processed, rg_md.num_rows(), slice.0, slice_end);
+            split_slice_at_file(&mut n_rows_processed, n_rows_this_file, slice.0, slice_end);
         *previous_row_count = previous_row_count
-            .checked_add(rg_slice.1 as IdxSize)
+            .checked_add(n_rows_this_file as IdxSize)
             .ok_or(ROW_COUNT_OVERFLOW_ERR)?;

         if rg_slice.1 == 0 {
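
What the fix changes: *previous_row_count previously advanced by rg_slice.1, the number of rows the slice actually takes from a row group, so any row group the slice skipped or only partially covered left row_count_start for the following row groups smaller than their true starting offset in the file, and the emitted row index drifted. Advancing by n_rows_this_file counts every row passed, in or out of the slice. Below is a minimal standalone sketch of that bookkeeping; the numbers mirror the regression test, and the in-slice row count is a simplified stand-in for split_slice_at_file, not the polars source.

// Hypothetical model of the row-index bookkeeping in rg_to_dfs_par_over_rg();
// plain numbers stand in for the parquet metadata. Not the polars source.
fn main() {
    let row_group_sizes = [100_000u64, 100_000, 100_000];
    let (slice_start, slice_len) = (199_995u64, 10u64); // spans row groups 1 and 2
    let slice_end = slice_start + slice_len;

    let mut previous_row_count = 0u64; // file rows seen before the current row group
    for (i, &n_rows_this_file) in row_group_sizes.iter().enumerate() {
        let row_count_start = previous_row_count;
        // Rows of this row group that fall inside the requested slice
        // (simplified stand-in for split_slice_at_file).
        let lo = slice_start.clamp(row_count_start, row_count_start + n_rows_this_file);
        let hi = slice_end.clamp(row_count_start, row_count_start + n_rows_this_file);
        let rows_in_slice = hi - lo;

        // Buggy accumulation: previous_row_count += rows_in_slice;
        // row group 2 would then get row_count_start = 5 (only the 5 rows taken
        // from row group 1 counted) instead of 200_000.
        previous_row_count += n_rows_this_file; // the fix: count every row passed

        println!("row group {i}: row_count_start={row_count_start}, rows_in_slice={rows_in_slice}");
    }
}

Under this model the old accumulation gives the third row group a row_count_start of 5 rather than 200_000, which is the row-index drift the test below guards against.
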
py-polars/tests/unit/io/test_lazy_parquet.py (28 changes: 28 additions & 0 deletions)
@@ -564,6 +564,34 @@ def trim_to_metadata(path: str | Path) -> None:
     )


+@pytest.mark.write_disk
+def test_predicate_slice_pushdown_row_index_20485(tmp_path: Path) -> None:
+    tmp_path.mkdir(exist_ok=True)
+
+    file_path = tmp_path / "slice_pushdown.parquet"
+    row_group_size = 100000
+    num_row_groups = 3
+
+    df = pl.select(ref=pl.int_range(num_row_groups * row_group_size))
+    df.write_parquet(file_path, row_group_size=row_group_size)
+
+    # Use a slice that starts near the end of one row group and extends into the next
+    # to test handling of slices that span multiple row groups.
+    slice_start = 199995
+    slice_len = 10
+    ldf = pl.scan_parquet(file_path)
+    sliced_df = ldf.with_row_index().slice(slice_start, slice_len).collect()
+    sliced_df_no_pushdown = (
+        ldf.with_row_index().slice(slice_start, slice_len).collect(slice_pushdown=False)
+    )
+
+    expected_index = list(range(slice_start, slice_start + slice_len))
+    actual_index = list(sliced_df["index"])
+    assert actual_index == expected_index
+
+    assert_frame_equal(sliced_df, sliced_df_no_pushdown)
+
+
 @pytest.mark.write_disk
 @pytest.mark.parametrize("streaming", [True, False])
 def test_parquet_row_groups_shift_bug_18739(tmp_path: Path, streaming: bool) -> None:
