From 96152da2ff2f0ee1a285ea2788407a458e7cd09f Mon Sep 17 00:00:00 2001
From: Gabor Somogyi
Date: Fri, 22 Feb 2019 14:44:19 +0100
Subject: [PATCH] Review fixes:

* CONSTANT.key used in tests
* Removed newline hell
---
 .../sql/streaming/continuous/ContinuousSuite.scala   |  3 ++-
 .../streaming/continuous/EpochCoordinatorSuite.scala | 11 ++---------
 2 files changed, 4 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index 5fdce89e95978..f0e6b4f7fbfeb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous._
 import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf.CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE
 import org.apache.spark.sql.streaming.{StreamTest, Trigger}
 import org.apache.spark.sql.test.TestSparkSession
 
@@ -356,7 +357,7 @@ class ContinuousEpochBacklogSuite extends ContinuousSuiteBase {
   // This test forces the backlog to overflow by not standing up enough executors for the query
   // to make progress.
   test("epoch backlog overflow") {
-    withSQLConf(("spark.sql.streaming.continuous.epochBacklogQueueSize", "10")) {
+    withSQLConf((CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE.key, "10")) {
       val df = spark.readStream
         .format("rate")
         .option("numPartitions", "2")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
index 07e21e794006d..74f55f5a8ed5c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark._
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.sql.LocalSparkSession
 import org.apache.spark.sql.execution.streaming.continuous._
+import org.apache.spark.sql.internal.SQLConf.CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, PartitionOffset}
 import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
@@ -55,7 +56,7 @@ class EpochCoordinatorSuite
       new SparkContext(
         "local[2]", "test-sql-context",
         new SparkConf().set("spark.sql.testkey", "true")
-          .set("spark.sql.streaming.continuous.epochBacklogQueueSize",
+          .set(CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE.key,
             epochBacklogQueueSize.toString)))
 
     epochCoordinator
@@ -193,14 +194,11 @@ class EpochCoordinatorSuite
   }
 
   test("several epochs, max epoch backlog reached by partitionOffsets") {
-
     setWriterPartitions(1)
     setReaderPartitions(1)
 
     reportPartitionOffset(0, 1)
-
     // Commit messages not arriving
-
     for (i <- 2 to epochBacklogQueueSize + 1) {
       reportPartitionOffset(0, i)
     }
@@ -214,14 +212,11 @@ class EpochCoordinatorSuite
   }
 
   test("several epochs, max epoch backlog reached by partitionCommits") {
-
     setWriterPartitions(1)
     setReaderPartitions(1)
 
     commitPartitionEpoch(0, 1)
-
     // Offset messages not arriving
-
     for (i <- 2 to epochBacklogQueueSize + 1) {
       commitPartitionEpoch(0, i)
     }
@@ -235,7 +230,6 @@ class EpochCoordinatorSuite
   }
 
   test("several epochs, max epoch backlog reached by epochsWaitingToBeCommitted") {
-
     setWriterPartitions(2)
     setReaderPartitions(2)
 
@@ -243,7 +237,6 @@ class EpochCoordinatorSuite
     commitPartitionEpoch(0, 1)
     reportPartitionOffset(0, 1)
     // For partition 2 epoch 1 messages never arriving
-
     // +2 because the first epoch not yet arrived
     for (i <- 2 to epochBacklogQueueSize + 2) {
       commitPartitionEpoch(0, i)