[SPARK-38045][SS][TEST] More strict validation on plan check for stream-stream join unit test

### What changes were proposed in this pull request?

This PR is a follow-up to SPARK-35693. It makes the plan check in the stream-stream join unit test stricter.

### Why are the changes needed?

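The plan check should be strict enough to verify that the distribution requirement of the stream-stream join is fulfilled: both children of the join must be hash-partitioned on the expected columns, with the expected number of partitions.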

### Does this PR introduce _any_ user-facing change?

No, test only.

### How was this patch tested?

Modified test passed.

Closes #35341 from HeartSaVioR/SPARK-35693-followup.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
HeartSaVioR authored and dongjoon-hyun committed Jan 27, 2022
1 parent 88e8006 commit 5289fad
Showing 1 changed file with 15 additions and 1 deletion.
@@ -28,6 +28,8 @@ import org.scalatest.BeforeAndAfter

 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.streaming.{MemoryStream, StatefulOperatorStateInfo, StreamingSymmetricHashJoinExec, StreamingSymmetricHashJoinHelper}
 import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreProviderId}
@@ -583,9 +585,21 @@ class StreamingInnerJoinSuite extends StreamingJoinSuite {
       CheckAnswer(1.to(1000): _*),
       Execute { query =>
         // Verify the query plan
+        def partitionExpressionsColumns(expressions: Seq[Expression]): Seq[String] = {
+          expressions.flatMap {
+            case ref: AttributeReference => Some(ref.name)
+          }
+        }
+
+        val numPartitions = spark.sqlContext.conf.getConf(SQLConf.SHUFFLE_PARTITIONS)
+
         assert(query.lastExecution.executedPlan.collect {
           case j @ StreamingSymmetricHashJoinExec(_, _, _, _, _, _, _, _,
-            _: ShuffleExchangeExec, _: ShuffleExchangeExec) => j
+            ShuffleExchangeExec(opA: HashPartitioning, _, _),
+            ShuffleExchangeExec(opB: HashPartitioning, _, _))
+              if partitionExpressionsColumns(opA.expressions) === Seq("a", "b")
+                && partitionExpressionsColumns(opB.expressions) === Seq("a", "b")
+                && opA.numPartitions == numPartitions && opB.numPartitions == numPartitions => j
         }.size == 1)
       })
   }
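
The difference between the old and the new pattern can be illustrated without Spark. The following is a minimal sketch in plain Scala, assuming toy stand-ins for the physical plan nodes: `HashPart`, `RoundRobinPart`, `Scan`, `Shuffle`, `Join`, `looseCheck` and `strictCheck` are all hypothetical names invented for this illustration and are not Spark APIs. The loose check only requires both join children to be shuffles, while the strict check also inspects the partitioning columns and the partition count, mirroring the guard added in this commit.

```scala
// Toy model of a physical plan; every type here is hypothetical, not a Spark class.
sealed trait Partitioning
final case class HashPart(columns: Seq[String], numPartitions: Int) extends Partitioning
final case class RoundRobinPart(numPartitions: Int) extends Partitioning

sealed trait Plan
final case class Scan(name: String) extends Plan
final case class Shuffle(partitioning: Partitioning, child: Plan) extends Plan
final case class Join(keys: Seq[String], left: Plan, right: Plan) extends Plan

object PlanCheck {
  // Loose check: passes as soon as both children of the join are shuffles,
  // regardless of how those shuffles partition the data.
  def looseCheck(plan: Plan): Boolean = plan match {
    case Join(_, _: Shuffle, _: Shuffle) => true
    case _ => false
  }

  // Strict check: both shuffles must use hash partitioning on exactly the
  // expected columns, with exactly the expected number of partitions.
  def strictCheck(plan: Plan, expectedCols: Seq[String], expectedParts: Int): Boolean =
    plan match {
      case Join(_, Shuffle(l: HashPart, _), Shuffle(r: HashPart, _))
          if l.columns == expectedCols && r.columns == expectedCols &&
            l.numPartitions == expectedParts && r.numPartitions == expectedParts => true
      case _ => false
    }

  def main(args: Array[String]): Unit = {
    val good = Join(Seq("a", "b"),
      Shuffle(HashPart(Seq("a", "b"), 5), Scan("left")),
      Shuffle(HashPart(Seq("a", "b"), 5), Scan("right")))

    // Left side is hash-partitioned on only one of the two expected columns.
    val bad = Join(Seq("a", "b"),
      Shuffle(HashPart(Seq("a"), 5), Scan("left")),
      Shuffle(HashPart(Seq("a", "b"), 5), Scan("right")))

    assert(looseCheck(good) && looseCheck(bad)) // loose check cannot tell them apart
    assert(strictCheck(good, Seq("a", "b"), 5))
    assert(!strictCheck(bad, Seq("a", "b"), 5))
  }
}
```

In this toy model the loose check accepts both plans, so a regression that changes the partitioning columns or the partition count would go unnoticed; the strict check rejects the second plan.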