Skip to content

Commit

Permalink
Fallback to Spark for offseted cases
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Feb 23, 2024
1 parent 39aba19 commit 3ee53ad
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,8 @@ class CometSparkSessionExtensions
case s: TakeOrderedAndProjectExec
if isCometNative(s.child) && isCometOperatorEnabled(conf, "takeOrderedAndProjectExec")
&& isCometShuffleEnabled(conf) &&
CometTakeOrderedAndProjectExec.isSupported(s.projectList, s.sortOrder, s.child) =>
CometTakeOrderedAndProjectExec.isSupported(s.projectList, s.sortOrder, s.child) &&
s.offset == 0 =>
// TODO: support offset for Spark 3.4
QueryPlanSerde.operator2Proto(s) match {
case Some(nativeOp) =>
Expand Down
18 changes: 18 additions & 0 deletions spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1033,6 +1033,24 @@ class CometExecSuite extends CometTestBase {
}
})
}

test("Fallback to Spark for TakeOrderedAndProjectExec with offset") {
Seq("true", "false").foreach(aqeEnabled =>
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) {
withTable("t1") {
val numRows = 10
spark
.range(numRows)
.selectExpr("if (id % 2 = 0, null, id) AS a", s"$numRows - id AS b")
.repartition(3) // Force repartition to test data will come to single partition
.write
.saveAsTable("t1")

val df = sql("SELECT * FROM t1 ORDER BY a, b LIMIT 3").offset(1).groupBy($"a").sum("b")
checkSparkAnswer(df)
}
})
}
}

case class BucketedTableTestSpec(
Expand Down

0 comments on commit 3ee53ad

Please sign in to comment.