Skip to content

Commit

Permalink
Update diff
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Apr 23, 2024
1 parent 099fcdf commit 9767acc
Showing 1 changed file with 36 additions and 0 deletions.
36 changes: 36 additions & 0 deletions dev/diffs/3.4.2.diff
Original file line number Diff line number Diff line change
Expand Up @@ -1570,6 +1570,42 @@ index 2a2a83d35e1..e3b7b290b3e 100644
val initialStateDS = Seq(("keyInStateAndData", new RunningCount(1))).toDS()
val initialState: KeyValueGroupedDataset[String, RunningCount] =
initialStateDS.groupByKey(_._1).mapValues(_._2)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index ef5b8a769fe..84fe1bfabc9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.plans.logical.{Range, RepartitionByExpression}
import org.apache.spark.sql.catalyst.streaming.{InternalOutputModes, StreamingRelationV2}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.comet.CometLocalLimitExec
import org.apache.spark.sql.execution.{LocalLimitExec, SimpleMode, SparkPlan}
import org.apache.spark.sql.execution.command.ExplainCommand
import org.apache.spark.sql.execution.streaming._
@@ -1103,11 +1104,12 @@ class StreamSuite extends StreamTest {
val localLimits = execPlan.collect {
case l: LocalLimitExec => l
case l: StreamingLocalLimitExec => l
+ case l: CometLocalLimitExec => l
}

require(
localLimits.size == 1,
- s"Cant verify local limit optimization with this plan:\n$execPlan")
+ s"Cant verify local limit optimization ${localLimits.size} with this plan:\n$execPlan")

if (expectStreamingLimit) {
assert(
@@ -1115,7 +1117,8 @@ class StreamSuite extends StreamTest {
s"Local limit was not StreamingLocalLimitExec:\n$execPlan")
} else {
assert(
- localLimits.head.isInstanceOf[LocalLimitExec],
+ localLimits.head.isInstanceOf[LocalLimitExec] ||
+ localLimits.head.isInstanceOf[CometLocalLimitExec],
s"Local limit was not LocalLimitExec:\n$execPlan")
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
index abe606ad9c1..2d930b64cca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
Expand Down

0 comments on commit 9767acc

Please sign in to comment.