Skip to content

Commit

Permalink
slight more refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
HeartSaVioR committed Sep 30, 2024
1 parent 4f14646 commit d50a906
Showing 1 changed file with 16 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1602,6 +1602,18 @@ class StreamingOuterJoinSuite extends StreamingJoinSuite {
val joinedDf = data1.join(data2, Seq("key", "eventTime"), "leftOuter")
.selectExpr("key", "CAST(eventTime AS long) AS eventTime")

def assertLeftRows(expected: Seq[Row]): AssertOnQuery = {
assertStateStoreRows(0L, "left", expected) { df =>
df.selectExpr("value.key", "CAST(value.eventTime AS long)")
}
}

def assertRightRows(expected: Seq[Row]): AssertOnQuery = {
assertStateStoreRows(0L, "right", expected) { df =>
df.selectExpr("value.key", "CAST(value.eventTime AS long)")
}
}

testStream(joinedDf)(
StartStream(checkpointLocation = checkpoint.getCanonicalPath),
// batch 0
Expand All @@ -1611,12 +1623,8 @@ class StreamingOuterJoinSuite extends StreamingJoinSuite {
(memoryStream2, Seq(("b", 2), ("c", 1)))
),
CheckNewAnswer(("b", 2)),
assertStateStoreRows(0L, "left", Seq(Row("a", 1), Row("b", 2))) { df =>
df.selectExpr("value.key", "CAST(value.eventTime AS long)")
},
assertStateStoreRows(0L, "right", Seq(Row("b", 2), Row("c", 1))) { df =>
df.selectExpr("value.key", "CAST(value.eventTime AS long)")
},
assertLeftRows(Seq(Row("a", 1), Row("b", 2))),
assertRightRows(Seq(Row("b", 1), Row("c", 1))),
// batch 1
// WM: late record = 0, eviction = 2
// Before Spark introduces multiple stateful operator, WM for late record was same as
Expand All @@ -1625,12 +1633,8 @@ class StreamingOuterJoinSuite extends StreamingJoinSuite {
// batch 1. Before SPARK-49829, this wasn't producing unmatched row, and it is fixed.
AddData(memoryStream1, ("d", 1)),
CheckNewAnswer(("a", 1), ("d", 1)),
assertStateStoreRows(0L, "left", Seq()) { df =>
df.selectExpr("value.key", "CAST(value.eventTime AS long)")
},
assertStateStoreRows(0L, "right", Seq()) { df =>
df.selectExpr("value.key", "CAST(value.eventTime AS long)")
}
assertLeftRows(Seq()),
assertRightRows(Seq()),
)
}
}
Expand Down

0 comments on commit d50a906

Please sign in to comment.