Review fixes:
 * CONSTANT.key used in tests
 * Removed newline hell
gaborgsomogyi committed Feb 22, 2019
1 parent f6bc301 commit 96152da
Showing 2 changed files with 4 additions and 10 deletions.
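The first review fix swaps the hard-coded configuration key for the corresponding SQLConf entry, so the tests reference the key through the constant instead of repeating the string. A minimal sketch of the pattern, assuming a suite that mixes in Spark's usual SQL test helpers (which provide withSQLConf); the value "10" is only an example:

    import org.apache.spark.sql.internal.SQLConf.CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE

    // The entry's .key resolves to the full string name,
    // "spark.sql.streaming.continuous.epochBacklogQueueSize", so a mistyped or
    // renamed reference now fails at compile time instead of silently setting an
    // unknown key.
    withSQLConf((CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE.key, "10")) {
      // run the streaming query under test with the reduced backlog size
    }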
@@ -25,6 +25,7 @@ import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous._
import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf.CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE
import org.apache.spark.sql.streaming.{StreamTest, Trigger}
import org.apache.spark.sql.test.TestSparkSession

@@ -356,7 +357,7 @@ class ContinuousEpochBacklogSuite extends ContinuousSuiteBase {
  // This test forces the backlog to overflow by not standing up enough executors for the query
  // to make progress.
  test("epoch backlog overflow") {
    withSQLConf(("spark.sql.streaming.continuous.epochBacklogQueueSize", "10")) {
    withSQLConf((CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE.key, "10")) {
      val df = spark.readStream
        .format("rate")
        .option("numPartitions", "2")

@@ -27,6 +27,7 @@ import org.apache.spark._
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.sql.LocalSparkSession
import org.apache.spark.sql.execution.streaming.continuous._
import org.apache.spark.sql.internal.SQLConf.CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, PartitionOffset}
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
@@ -55,7 +56,7 @@ class EpochCoordinatorSuite
      new SparkContext(
        "local[2]", "test-sql-context",
        new SparkConf().set("spark.sql.testkey", "true")
          .set("spark.sql.streaming.continuous.epochBacklogQueueSize",
          .set(CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE.key,
            epochBacklogQueueSize.toString)))

    epochCoordinator
@@ -193,14 +194,11 @@ class EpochCoordinatorSuite
  }

  test("several epochs, max epoch backlog reached by partitionOffsets") {

    setWriterPartitions(1)
    setReaderPartitions(1)

    reportPartitionOffset(0, 1)

    // Commit messages not arriving

    for (i <- 2 to epochBacklogQueueSize + 1) {
      reportPartitionOffset(0, i)
    }
@@ -214,14 +212,11 @@ class EpochCoordinatorSuite
  }

  test("several epochs, max epoch backlog reached by partitionCommits") {

    setWriterPartitions(1)
    setReaderPartitions(1)

    commitPartitionEpoch(0, 1)

    // Offset messages not arriving

    for (i <- 2 to epochBacklogQueueSize + 1) {
      commitPartitionEpoch(0, i)
    }
@@ -235,15 +230,13 @@ class EpochCoordinatorSuite
  }

  test("several epochs, max epoch backlog reached by epochsWaitingToBeCommitted") {

    setWriterPartitions(2)
    setReaderPartitions(2)

    commitPartitionEpoch(0, 1)
    reportPartitionOffset(0, 1)

    // For partition 2 epoch 1 messages never arriving

    // +2 because the first epoch not yet arrived
    for (i <- 2 to epochBacklogQueueSize + 2) {
      commitPartitionEpoch(0, i)

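EpochCoordinatorSuite builds its own TestSparkSession, so there the same entry is applied through SparkConf rather than withSQLConf, as the second file's first hunk shows. A rough sketch of that variant; the backlog size of 10 is illustrative only, the suite passes its own epochBacklogQueueSize value:

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.internal.SQLConf.CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE

    // Setting the entry's key on SparkConf before the context is created makes the
    // override part of the session defaults for the whole suite.
    val conf = new SparkConf()
      .set("spark.sql.testkey", "true")
      .set(CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE.key, "10")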