Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incorrect results due to repartitioning a sorted ParquetExec #8451

Closed
alamb opened this issue Dec 7, 2023 · 5 comments · Fixed by #8517
Closed

Incorrect results due to repartitioning a sorted ParquetExec #8451

alamb opened this issue Dec 7, 2023 · 5 comments · Fixed by #8517
Assignees
Labels
bug Something isn't working

Comments

@alamb
Copy link
Contributor

alamb commented Dec 7, 2023

Describe the bug

We have a case where the EnforceDistribution rule has repatitioned a ParquetExec which parallelized the read (which is good) but that parallelization resulted in destroying the sort order (as it mixes parts of different files together in the same partition). The rest of the plan relies on the output being sorted, and thus since it is no longer sorted we see incorrect results

To Reproduce

The input plan looks like this:

OutputRequirementExec
  ProjectionExec: expr=[tag@1 as tag]
    FilterExec: CAST(field@0 AS Utf8) !=
      ProjectionExec: expr=[field@1 as field, tag@3 as tag]
        DeduplicateExec: [tag@3 ASC,time@2 ASC]
          FilterExec: tag@3 > foo AND time@2 > 2
            ParquetExec: file_groups={2 groups: [[1.parquet], [2.parquet]]}, projection=[__chunk_order, field, time, tag], output_ordering=[tag@3 ASC, time@2 ASC, __chunk_order@0 ASC], ...

The output of EnforceDistirbution looks like this:

2023-12-06T18:40:19.827226Z TRACE datafusion::physical_planner: Optimized physical plan by EnforceDistribution:
OutputRequirementExec
  ProjectionExec: expr=[tag@1 as tag]
    FilterExec: CAST(field@0 AS Utf8) !=
      RepartitionExec: partitioning=RoundRobinBatch(6), input_partitions=1
        ProjectionExec: expr=[field@1 as field, tag@3 as tag]
          DeduplicateExec: [tag@3 ASC,time@2 ASC]
            SortPreservingMergeExec: [tag@3 ASC,time@2 ASC,__chunk_order@0 ASC] <----- This needs the input to be sorted
              FilterExec: tag@3 > foo AND time@2 > 2
                ParquetExec: file_groups={6 groups: [[1.parquet:0..1, 2.parquet:0..16666666], [2.parquet:16666666..33333333], [2.parquet:33333333..50000000], [2.parquet:50000000..66666667], [2.parquet:66666667..83333334], ...]}, ... <---- this file is no longer sorted (as it was repartitioned)

Specifically, the DataFusion planner parallelized the read of the parquet files into multiple partitions and in so doing has destroyed the sort order.

(the 16666666..33333333 annotations mean read that byte range in the file)

This is actually reflected correctly by the ParquetExec (it no longer says "output_ordering" because it is no longer sorted) however, the plan now has a SortPreservingMerge added above it, which implies that the output is sorted, which is incorrect.

Input

ParquetExec: file_groups={2 groups: [[1.parquet], [2.parquet]]}, projection=[__chunk_order, field, time, tag], output_ordering=[tag@3 ASC, time@2 ASC, __chunk_order@0 ASC],....

Output:

ParquetExec: file_groups={6 groups: [[1.parquet:0..1, 2.parquet:0..16666666], [2.parquet:16666666..33333333], [2.parquet:33333333..50000000], [2.parquet:50000000..66666667], [2.parquet:66666667..83333334], ...]}, ...

So things that are wrong:

  1. The output of the scan is no longer sorted but it is being merged using SortPreservingMerge (which avoids the required resort)
  2. It is not right to be repartitioning the sorted input files into multiple partitions in the first place, as that destroys the sort order. There is a config setting that is supposed to control this datafusion.optimizer.prefer_existing_sort and IOx sets it to true:

I am working on a reproducer in DataFusion

Expected behavior

The correct answer should be produced.

I think this means that either:

  1. the ParquetExec should not be repartitioned if it would destroy the sort order,
  2. The parquet exec repartition code should be aware of the repartition and not destroy the sort order

Additional context

We found that setting the config setting datafusion.optimizer.repartition_file_scans and IOx sets to false was a workaround:

@alamb alamb added the bug Something isn't working label Dec 7, 2023
@alamb alamb self-assigned this Dec 7, 2023
@alamb
Copy link
Contributor Author

alamb commented Dec 7, 2023

This could be potentially more subtle as splitting itself doesn't destroy the sort order, what destroys the sort order is if any file group has more than one entry that are not contiguous from the source file (as each entry is effectively appended to the previous one)

In this case, what has happened is that one group has a portion from different files in it, which is what causes the wrong results

@alamb
Copy link
Contributor Author

alamb commented Dec 8, 2023

I have made a reproducer on a branch and I know what is wrong -- I now need to work on the fix https://github.com/alamb/arrow-datafusion/tree/alamb/bad_redistribution

@alamb
Copy link
Contributor Author

alamb commented Dec 11, 2023

Here is a PR that adds tests: #8505

@alamb
Copy link
Contributor Author

alamb commented Dec 11, 2023

Update is that I have a bunch of tests and I understand the issue. I expect to have a PR up with a fix tomorrow

@alamb
Copy link
Contributor Author

alamb commented Dec 12, 2023

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging a pull request may close this issue.

1 participant