diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index f7930460fb222..5fdce89e95978 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -347,27 +347,29 @@ class ContinuousMetaSuite extends ContinuousSuiteBase {
 class ContinuousEpochBacklogSuite extends ContinuousSuiteBase {
   import testImplicits._
 
-  // We need to specify spark.sql.streaming.continuous.epochBacklogQueueSize.
   override protected def createSparkSession = new TestSparkSession(
     new SparkContext(
       "local[1]",
       "continuous-stream-test-sql-context",
-      sparkConf.set("spark.sql.testkey", "true")
-        .set("spark.sql.streaming.continuous.epochBacklogQueueSize", "10")))
+      sparkConf.set("spark.sql.testkey", "true")))
 
+  // This test forces the backlog to overflow by not standing up enough executors for the query
+  // to make progress.
   test("epoch backlog overflow") {
-    val df = spark.readStream
-      .format("rate")
-      .option("numPartitions", "2")
-      .option("rowsPerSecond", "500")
-      .load()
-      .select('value)
-
-    testStream(df, useV2Sink = true)(
-      StartStream(Trigger.Continuous(1)),
-      ExpectFailure[IllegalStateException] { e =>
-        e.getMessage.contains("queue has exceeded it's maximum")
-      }
-    )
+    withSQLConf(("spark.sql.streaming.continuous.epochBacklogQueueSize", "10")) {
+      val df = spark.readStream
+        .format("rate")
+        .option("numPartitions", "2")
+        .option("rowsPerSecond", "500")
+        .load()
+        .select('value)
+
+      testStream(df, useV2Sink = true)(
+        StartStream(Trigger.Continuous(1)),
+        ExpectFailure[IllegalStateException] { e =>
+          e.getMessage.contains("queue has exceeded it's maximum")
+        }
+      )
+    }
   }
 }