Skip to content

Commit

Permalink
reflect review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
HeartSaVioR committed Oct 11, 2024
1 parent 3c15030 commit d731a1e
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -672,10 +672,10 @@ case class StreamingSymmetricHashJoinExec(
// - Left side: input can be skipped to be added to the state store if it's already matched
// and the join type is left semi.
// For other cases, the input should be added, including the case it's going to be evicted
// in this batch. It hasn't yet evaluated with inputs from right side "for this batch".
// in this batch. It hasn't yet evaluated with inputs from right side for this batch.
// Refer to the classdoc of SteramingSymmetricHashJoinExec about how stream-stream join
// works.
// - Right side: for this side, the evaluation with inputs from left side "for this batch"
// - Right side: for this side, the evaluation with inputs from left side for this batch
// is done at this point. That said, input can be skipped to be added to the state store
// if input is going to be evicted in this batch. Though, input should be added to the
// state store if it's right outer join or full outer join, as unmatched output is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -908,7 +908,7 @@ class MultiStatefulOperatorsSuite
// does not delay the state watermark in stream-stream join).
// Before SPARK-49829, left side does not add the input to state store if it's going to evict
// in this batch, which breaks the match between input from left side and input from right
// side "for this batch".
// side for this batch.
testStream(joined)(
MultiAddData(
(inputStream1, Seq(1L, 2L, 3L, 4L, 5L)),
Expand All @@ -921,6 +921,7 @@ class MultiStatefulOperatorsSuite
(inputStream2, Seq(15L, 16L, 17L, 18L, 19L))
),
// watermark: 15 - 5 = 10 (windows for [0, 10) are completed)
// Before SPARK-49829, the test fails because this row is not produced.
CheckNewAnswer((10L, 15L, 35L)),
MultiAddData(
(inputStream1, Seq(100L)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1630,7 +1630,9 @@ class StreamingOuterJoinSuite extends StreamingJoinSuite {
// Before Spark introduces multiple stateful operator, WM for late record was same as
// WM for eviction, hence ("d", 1) was treated as late record.
// With the multiple state operator, ("d", 1) is added in batch 1 but also evicted in
// batch 1. Before SPARK-49829, this wasn't producing unmatched row, and it is fixed.
// batch 1. Note that the eviction is happening with state watermark: for this join,
// state watermark = state eviction under join condition. Before SPARK-49829, this
// wasn't producing unmatched row, and it is fixed.
AddData(memoryStream1, ("d", 1)),
CheckNewAnswer(("a", 1), ("d", 1)),
assertLeftRows(Seq()),
Expand Down Expand Up @@ -2152,6 +2154,9 @@ class StreamingLeftSemiJoinSuite extends StreamingJoinSuite {
// just trigger a new batch with arbitrary data as the original test relies on no-data
// batch, and we need to check with remaining unmatched outputs
AddData(memoryStream1, (100L, 6)),
// Before SPARK-49829, the test fails because (23, 4, null, null) wasn't produced.
// (The assertion of state for left inputs & right inputs weren't included on the test
// before SPARK-49829.)
CheckNewAnswer(Row(22, 3, null, 3), Row(23, 4, null, null))
)

Expand Down

0 comments on commit d731a1e

Please sign in to comment.