[SPARK-32767][SQL] Bucket join should work if spark.sql.shuffle.partitions is larger than bucket number

### What changes were proposed in this pull request?

Bucket join should work if `spark.sql.shuffle.partitions` is larger than the bucket number, for example:

```scala
spark.range(1000).write.bucketBy(432, "id").saveAsTable("t1")
spark.range(1000).write.bucketBy(34, "id").saveAsTable("t2")
sql("set spark.sql.shuffle.partitions=600")
sql("set spark.sql.autoBroadcastJoinThreshold=-1")
sql("select * from t1 join t2 on t1.id = t2.id").explain()
```

Before this PR:
```
== Physical Plan ==
*(5) SortMergeJoin [id#26L], [id#27L], Inner
:- *(2) Sort [id#26L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id#26L, 600), true
:     +- *(1) Filter isnotnull(id#26L)
:        +- *(1) ColumnarToRow
:           +- FileScan parquet default.t1[id#26L] Batched: true, DataFilters: [isnotnull(id#26L)], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 432 out of 432
+- *(4) Sort [id#27L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#27L, 600), true
      +- *(3) Filter isnotnull(id#27L)
         +- *(3) ColumnarToRow
            +- FileScan parquet default.t2[id#27L] Batched: true, DataFilters: [isnotnull(id#27L)], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 34 out of 34
```

After this PR:
```
== Physical Plan ==
*(4) SortMergeJoin [id#26L], [id#27L], Inner
:- *(1) Sort [id#26L ASC NULLS FIRST], false, 0
:  +- *(1) Filter isnotnull(id#26L)
:     +- *(1) ColumnarToRow
:        +- FileScan parquet default.t1[id#26L] Batched: true, DataFilters: [isnotnull(id#26L)], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 432 out of 432
+- *(3) Sort [id#27L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#27L, 432), true
      +- *(2) Filter isnotnull(id#27L)
         +- *(2) ColumnarToRow
            +- FileScan parquet default.t2[id#27L] Batched: true, DataFilters: [isnotnull(id#27L)], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 34 out of 34
```

### Why are the changes needed?

Spark 2.4 supports this.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #29612 from wangyum/SPARK-32767.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
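The effect visible in the two plans above can be sketched as a small decision function: when one join side is bucketed, shuffle the other side to that side's bucket count instead of `spark.sql.shuffle.partitions`, so the bucketed side needs no Exchange at all. This is a minimal, hedged illustration of the idea only, not Spark's actual `EnsureRequirements` code; the object and method names here are invented for the sketch.

```scala
// Hypothetical sketch of the partition-count decision behind this PR.
// Not Spark source code: names and return shape are assumptions.
object BucketJoinSketch {
  // Returns (numPartitions, shuffleLeft, shuffleRight):
  // - numPartitions: the partition count used for any Exchange inserted
  // - shuffleLeft/shuffleRight: whether that side needs an Exchange
  def chooseNumPartitions(
      leftBuckets: Option[Int],
      rightBuckets: Option[Int],
      shufflePartitions: Int): (Int, Boolean, Boolean) =
    (leftBuckets, rightBuckets) match {
      case (Some(l), Some(r)) =>
        // Both sides bucketed: align on the larger bucket count and
        // shuffle only the side whose bucket count differs from it.
        val target = math.max(l, r)
        (target, l != target, r != target)
      case (Some(l), None) => (l, false, true)  // shuffle right to left's buckets
      case (None, Some(r)) => (r, true, false)  // shuffle left to right's buckets
      case (None, None)    => (shufflePartitions, true, true)
    }

  def main(args: Array[String]): Unit = {
    // The PR's example: t1 has 432 buckets, t2 has 34, shuffle.partitions = 600.
    // Only t2 is shuffled, to 432 partitions, matching the "after" plan.
    println(BucketJoinSketch.chooseNumPartitions(Some(432), Some(34), 600))
  }
}
```

Under this sketch, the example from the PR yields `(432, false, true)`: t1 stays as scanned (432 buckets, no Exchange) and t2 is re-shuffled to 432 partitions, which is exactly the `Exchange hashpartitioning(id#27L, 432)` in the "after" plan.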