Skip to content

Commit

Permalink
withSQLConf used and test commented
Browse files Browse the repository at this point in the history
  • Loading branch information
gaborgsomogyi committed Jan 28, 2019
1 parent 357f834 commit f6bc301
Showing 1 changed file with 18 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -347,27 +347,29 @@ class ContinuousMetaSuite extends ContinuousSuiteBase {
class ContinuousEpochBacklogSuite extends ContinuousSuiteBase {
import testImplicits._

// We need to specify spark.sql.streaming.continuous.epochBacklogQueueSize.
override protected def createSparkSession = new TestSparkSession(
new SparkContext(
"local[1]",
"continuous-stream-test-sql-context",
sparkConf.set("spark.sql.testkey", "true")
.set("spark.sql.streaming.continuous.epochBacklogQueueSize", "10")))
sparkConf.set("spark.sql.testkey", "true")))

// This test forces the backlog to overflow by not standing up enough executors for the query
// to make progress.
test("epoch backlog overflow") {
val df = spark.readStream
.format("rate")
.option("numPartitions", "2")
.option("rowsPerSecond", "500")
.load()
.select('value)

testStream(df, useV2Sink = true)(
StartStream(Trigger.Continuous(1)),
ExpectFailure[IllegalStateException] { e =>
e.getMessage.contains("queue has exceeded it's maximum")
}
)
withSQLConf(("spark.sql.streaming.continuous.epochBacklogQueueSize", "10")) {
val df = spark.readStream
.format("rate")
.option("numPartitions", "2")
.option("rowsPerSecond", "500")
.load()
.select('value)

testStream(df, useV2Sink = true)(
StartStream(Trigger.Continuous(1)),
ExpectFailure[IllegalStateException] { e =>
e.getMessage.contains("queue has exceeded it's maximum")
}
)
}
}
}

0 comments on commit f6bc301

Please sign in to comment.