[SPARK-38045][SS][TEST] More strict validation on plan check for stream-stream join unit test #35341

Closed
HeartSaVioR
Contributor

What changes were proposed in this pull request?

This PR is a follow-up of SPARK-35693 that makes the plan check in the stream-stream join unit test stricter.
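
To illustrate the difference between a loose and a strict plan check, here is a minimal sketch on a toy plan tree. The `Plan`, `Source`, `Shuffle`, `Join` and `PlanCheck` names are hypothetical stand-ins invented for this example; they are not Spark's physical operators, and the real test matches on Spark's own plan nodes instead.

```scala
// Toy stand-ins for physical plan nodes; these are NOT Spark's classes.
sealed trait Plan { def children: Seq[Plan] }
case class Source(name: String) extends Plan {
  val children: Seq[Plan] = Nil
}
case class Shuffle(keys: Seq[String], numPartitions: Int, child: Plan) extends Plan {
  val children: Seq[Plan] = Seq(child)
}
case class Join(keys: Seq[String], left: Plan, right: Plan) extends Plan {
  val children: Seq[Plan] = Seq(left, right)
}

object PlanCheck {
  // Collect a result for every node in the tree where the partial function is defined.
  def collect[T](plan: Plan)(pf: PartialFunction[Plan, T]): Seq[T] =
    pf.lift(plan).toSeq ++ plan.children.flatMap(c => collect(c)(pf))

  // Loose check: only asserts that exactly one join exists in the plan.
  def looseCheck(plan: Plan): Boolean =
    collect(plan) { case j: Join => j }.size == 1

  // Strict check: both children of the join must be shuffles on exactly the
  // expected keys, with exactly the expected number of partitions.
  def strictCheck(plan: Plan, keys: Seq[String], numPartitions: Int): Boolean =
    collect(plan) {
      case j @ Join(_, Shuffle(l, ln, _), Shuffle(r, rn, _))
          if l == keys && r == keys && ln == numPartitions && rn == numPartitions => j
    }.size == 1
}
```

In this sketch, a plan whose left side is shuffled only on `a` passes `looseCheck` but fails `strictCheck` for keys `a, b`, which is the kind of regression a stricter check is meant to catch.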

Why are the changes needed?

We want the plan check to be stricter so that it verifies the distribution requirement of the stream-stream join is actually fulfilled.

Does this PR introduce any user-facing change?

No, test only.

How was this patch tested?

Modified test passed.

@HeartSaVioR
Contributor Author

cc. @tdas @zsxwing @viirya @xuanyuanking @cloud-fan Please take a look. Thanks!

@HeartSaVioR
Contributor Author

The test doesn't fail against the existing codebase (with SPARK-35703) because it doesn't read from a bucketed source. I'll see whether I can run the same test against a bucketed source.

}
}

val numPartitions = spark.sqlContext.conf.getConf(SQLConf.SHUFFLE_PARTITIONS)
Contributor Author

This relies on the precondition that the number of state partitions is the same as the number of shuffle partitions. If the state could maintain its own number of partitions, we would have to change the distribution requirement of the stateful operator, and hence this check as well.
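
A small sketch of why this precondition matters: if a key is assigned to a partition by taking a hash modulo the partition count, changing the count moves keys to different partitions, so rows would no longer meet the state stored for them. The `StatePartitioning` object and its methods are hypothetical, and `Int.hashCode` is only a stand-in for whatever hash the engine applies to the key columns.

```scala
object StatePartitioning {
  // Non-negative modulo, so negative hashes still map into [0, mod).
  def nonNegativeMod(x: Int, mod: Int): Int = ((x % mod) + mod) % mod

  // Assumption: Int.hashCode stands in for the engine's hash of the key columns.
  def partitionFor(key: Int, numPartitions: Int): Int =
    nonNegativeMod(key.hashCode, numPartitions)

  // Keys whose partition id differs between the state's partition count
  // and the shuffle's partition count.
  def movedKeys(keys: Seq[Int], stateParts: Int, shuffleParts: Int): Seq[Int] =
    keys.filter(k => partitionFor(k, stateParts) != partitionFor(k, shuffleParts))
}
```

With equal counts, `movedKeys` is empty for any input; with different counts, some keys generally land in a different partition than the one holding their state.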

@HeartSaVioR
Contributor Author

I tried a managed table backed by the file source, which supports bucketing, but it looks like the file stream source does not pick up the bucket info even when it goes through the managed table. Hence the output partitioning of the source is UnknownPartitioning.

  test("aaa - streaming join should require HashClusteredDistribution from children") {
    withTempDir { dir =>
      val randPostfix = Random.nextInt(100000)
      val tbl1Name = s"tbl1a$randPostfix"
      val tbl2Name = s"tbl2a$randPostfix"

      val batchDf = 1.to(1000).toDF("value")
        .select('value as 'a, 'value * 2 as 'b, 'value * 3 as 'c)
      batchDf.write.bucketBy(7, "a", "b").mode(SaveMode.Overwrite).saveAsTable(tbl1Name)

      val batchDf2 = 1.to(1000).toDF("value")
        .select('value as 'a, 'value * 3 as 'b, 'value * 4 as 'c)
      batchDf2.write.bucketBy(7, "a", "b").mode(SaveMode.Overwrite).saveAsTable(tbl2Name)

      sql(s"DESCRIBE EXTENDED $tbl1Name").show(numRows = 21, truncate = false)
      sql(s"DESCRIBE EXTENDED $tbl2Name").show(numRows = 21, truncate = false)

      val input1 = spark.readStream.table(tbl1Name)
      val input2 = spark.readStream.table(tbl2Name)

      val df1 = input1.toDF
      val df2 = input2.toDF // .repartition('b)
      val joined = df1.join(df2, Seq("a", "b")).select('a)

      testStream(joined)(
        // CheckAnswer(1.to(1000): _*),
        ProcessAllAvailable(),
        Execute { query =>
          // Verify the query plan
          def partitionExpressionsColumns(expressions: Seq[Expression]): Seq[String] = {
            expressions.flatMap {
              case ref: AttributeReference => Some(ref.name)
            }
          }

          val numPartitions = spark.sqlContext.conf.getConf(SQLConf.SHUFFLE_PARTITIONS)

          assert(query.lastExecution.executedPlan.collect {
            case j @ StreamingSymmetricHashJoinExec(_, _, _, _, _, _, _, _,
                ShuffleExchangeExec(opA: HashPartitioning, _, _),
                ShuffleExchangeExec(opB: HashPartitioning, _, _))
                if partitionExpressionsColumns(opA.expressions) === Seq("a", "b")
                  && partitionExpressionsColumns(opB.expressions) === Seq("a", "b")
                  && opA.numPartitions == numPartitions && opB.numPartitions == numPartitions => j
          }.size == 1)
        })
    }
  }

If we want to test with SPARK-35703, we may need a streaming source that supports bucketed scans. I'm not sure any built-in source does.
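
As a rough model of why a source with unknown output partitioning cannot exercise this case: a planner only has a chance to skip the shuffle when the child already reports a hash partitioning. The sketch below is a simplified illustration under that assumption; `OutputPartitioning`, `Unknown`, `HashBy` and `ShuffleDecision` are hypothetical names, not Spark's classes, and the rule shown is the strict requirement discussed here rather than Spark's actual planning logic.

```scala
// Simplified model; the names below are illustrative and are not Spark's classes.
sealed trait OutputPartitioning
case object Unknown extends OutputPartitioning
case class HashBy(columns: Seq[String], numPartitions: Int) extends OutputPartitioning

object ShuffleDecision {
  // Strict requirement assumed for a stateful join: the child must be
  // hash-partitioned on exactly the join keys with exactly the expected count.
  def satisfies(child: OutputPartitioning, keys: Seq[String], numPartitions: Int): Boolean =
    child match {
      case HashBy(cols, n) => cols == keys && n == numPartitions
      case Unknown         => false
    }

  // A shuffle is needed whenever the child does not already satisfy the requirement.
  def needsShuffle(child: OutputPartitioning, keys: Seq[String], numPartitions: Int): Boolean =
    !satisfies(child, keys, numPartitions)
}
```

Under this model an `Unknown` child always gets a shuffle, so the plan check passes regardless of how the requirement is defined. Only a child reporting something like `HashBy(Seq("a", "b"), 7)` would show whether a looser requirement wrongly skips the shuffle.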

@dongjoon-hyun dongjoon-hyun changed the title [SPARK-35693][SS][TEST] More strict validation on plan check for stream-stream join unit test [SPARK-38045][SS][TEST] More strict validation on plan check for stream-stream join unit test Jan 27, 2022
Member

@dongjoon-hyun dongjoon-hyun left a comment

Although this is a kind of follow-up, I created a new JIRA for it because the previous one was already released in 3.2.0 and this one will be resolved in 3.2.2. This gives us better traceability.

@dongjoon-hyun
Member

Let me merge this first, because the following is worth a separate JIRA. Thank you, @HeartSaVioR and all.

> If we want to test with SPARK-35703, we may need a streaming source that supports bucketed scans. I'm not sure any built-in source does.

@dongjoon-hyun
Member

Could you make a backport PR to branch-3.2, @HeartSaVioR? There is a small conflict in branch-3.2.

@HeartSaVioR
Contributor Author

Thanks for reviewing and merging! I'll submit a PR against 3.2 sooner rather than later.

HeartSaVioR added a commit to HeartSaVioR/spark that referenced this pull request Jan 28, 2022
…am-stream join unit test

Closes apache#35341 from HeartSaVioR/SPARK-35693-followup.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
@HeartSaVioR
Contributor Author

HeartSaVioR commented Jan 28, 2022

(just for better traceability) #35347 for 3.2

senthh pushed a commit to senthh/spark-1 that referenced this pull request Feb 3, 2022
…am-stream join unit test
