make query execution more sensitive to late or lost epoch messages
uncleGen committed Apr 3, 2019
1 parent 3286bff commit 8c71b2f
Showing 3 changed files with 39 additions and 23 deletions.
SQLConf.scala
@@ -1453,6 +1453,13 @@ object SQLConf {

val CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE =
buildConf("spark.sql.streaming.continuous.epochBacklogQueueSize")
.doc("The max number of entries to be stored in queue to wait for late epochs. " +
"If this parameter is exceeded by the size of the queue, stream will stop with an error.")
.intConf
.createWithDefault(10)

+ val CONTINUOUS_STREAMING_EPOCH_MSG_BACKLOG_QUEUE_SIZE =
+ buildConf("spark.sql.streaming.continuous.epochMessageBacklogQueueSize")
+ .doc("The max number of entries to be stored in queue to wait for late epochs. " +
+ "If this parameter is exceeded by the size of the queue, stream will stop with an error.")
+ .intConf
@@ -2116,6 +2123,9 @@ class SQLConf extends Serializable with Logging {
def continuousStreamingEpochBacklogQueueSize: Int =
getConf(CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE)

+ def continuousStreamingEpochMsgBacklogQueueSize: Int =
+ getConf(CONTINUOUS_STREAMING_EPOCH_MSG_BACKLOG_QUEUE_SIZE)

def continuousStreamingExecutorQueueSize: Int = getConf(CONTINUOUS_STREAMING_EXECUTOR_QUEUE_SIZE)

def continuousStreamingExecutorPollIntervalMs: Long =
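For orientation, a minimal sketch (not part of the commit) of how the two queue-size settings above could be supplied when building a session. The config keys come from the diff; the session setup and the value 1000 are illustrative assumptions.

import org.apache.spark.sql.SparkSession

// Sketch only: the keys are from the diff above; master, app name and the value 1000 are assumptions.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("continuous-epoch-backlog-sketch")
  // After this commit, bounds the queue of epochs that are complete but cannot commit yet.
  .config("spark.sql.streaming.continuous.epochBacklogQueueSize", "1000")
  // New in this commit: bounds the per-partition offset and commit message backlogs.
  .config("spark.sql.streaming.continuous.epochMessageBacklogQueueSize", "1000")
  .getOrCreate()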
EpochCoordinator.scala
@@ -117,7 +117,7 @@ private[sql] object EpochCoordinatorRef extends Logging {
private[continuous] class EpochCoordinator(
writeSupport: StreamingWrite,
stream: ContinuousStream,
- query: ContinuousExecution,
+ queryExecution: ContinuousExecution,
startEpoch: Long,
session: SparkSession,
override val rpcEnv: RpcEnv)
@@ -126,6 +126,9 @@ private[continuous] class EpochCoordinator(
private val epochBacklogQueueSize =
session.sqlContext.conf.continuousStreamingEpochBacklogQueueSize

+ private val epochMsgBacklogQueueSize =
+ session.sqlContext.conf.continuousStreamingEpochMsgBacklogQueueSize

private var queryWritesStopped: Boolean = false

private var numReaderPartitions: Int = _
@@ -202,7 +205,7 @@ private[continuous] class EpochCoordinator(
// Sequencing is important here. We must commit to the writer before recording the commit
// in the query, or we will end up dropping the commit if we restart in the middle.
writeSupport.commit(epoch, messages.toArray)
- query.commit(epoch)
+ queryExecution.commit(epoch)
}

override def receive: PartialFunction[Any, Unit] = {
@@ -224,24 +227,24 @@ private[continuous] class EpochCoordinator(
partitionOffsets.collect { case ((e, _), o) if e == epoch => o }
if (thisEpochOffsets.size == numReaderPartitions) {
logDebug(s"Epoch $epoch has offsets reported from all partitions: $thisEpochOffsets")
- query.addOffset(epoch, stream, thisEpochOffsets.toSeq)
+ queryExecution.addOffset(epoch, stream, thisEpochOffsets.toSeq)
resolveCommitsAtEpoch(epoch)
}
checkProcessingQueueBoundaries()
}

private def checkProcessingQueueBoundaries() = {
- if (partitionOffsets.size > epochBacklogQueueSize) {
- query.stopInNewThread(new IllegalStateException("Size of the partition offset queue has " +
- "exceeded its maximum"))
+ if (partitionOffsets.size > epochMsgBacklogQueueSize) {
+ queryExecution.stopInNewThread(new IllegalStateException("Size of the partition offset " +
+ "queue has exceeded its maximum"))
}
- if (partitionCommits.size > epochBacklogQueueSize) {
- query.stopInNewThread(new IllegalStateException("Size of the partition commit queue has " +
- "exceeded its maximum"))
+ if (partitionCommits.size > epochMsgBacklogQueueSize) {
+ queryExecution.stopInNewThread(new IllegalStateException("Size of the partition commit " +
+ "queue has exceeded its maximum"))
}
if (epochsWaitingToBeCommitted.size > epochBacklogQueueSize) {
query.stopInNewThread(new IllegalStateException("Size of the epoch queue has " +
"exceeded its maximum"))
queryExecution.stopInNewThread(new IllegalStateException(s"Epoch ${lastCommittedEpoch + 1} " +
s"is late for more than ${epochsWaitingToBeCommitted.max - lastCommittedEpoch} epochs."))
}
}

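To make the split between the two limits easier to follow, here is a self-contained, simplified restatement of checkProcessingQueueBoundaries() as it reads after this change. Field and method names mirror the coordinator, but this is an illustration under those assumptions, not the actual Spark class.

import scala.collection.mutable

// Simplified stand-in for EpochCoordinator's backlog bookkeeping (illustration only).
class BacklogBoundsSketch(epochBacklogQueueSize: Int, epochMsgBacklogQueueSize: Int) {
  // (epoch, partitionId) -> reported offset / commit message, mirroring the coordinator's maps.
  val partitionOffsets = mutable.Map[(Long, Int), Long]()
  val partitionCommits = mutable.Map[(Long, Int), String]()
  // Epochs whose messages have all arrived but that cannot commit because an earlier epoch is late.
  val epochsWaitingToBeCommitted = mutable.Set[Long]()
  var lastCommittedEpoch: Long = 0L

  // The two message queues are bounded by the new message backlog size; the epoch queue keeps
  // the original epoch backlog size and now reports which epoch is holding everything up.
  def checkProcessingQueueBoundaries(stop: IllegalStateException => Unit): Unit = {
    if (partitionOffsets.size > epochMsgBacklogQueueSize) {
      stop(new IllegalStateException("Size of the partition offset queue has exceeded its maximum"))
    }
    if (partitionCommits.size > epochMsgBacklogQueueSize) {
      stop(new IllegalStateException("Size of the partition commit queue has exceeded its maximum"))
    }
    if (epochsWaitingToBeCommitted.size > epochBacklogQueueSize) {
      stop(new IllegalStateException(
        s"Epoch ${lastCommittedEpoch + 1} is late for more than " +
          s"${epochsWaitingToBeCommitted.max - lastCommittedEpoch} epochs."))
    }
  }
}

The net effect is that a pile-up of individual offset or commit messages is policed separately from a pile-up of whole epochs, and the epoch-level failure now names the epoch that is blocking progress instead of a generic queue-size message.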
EpochCoordinatorSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark._
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.sql.LocalSparkSession
import org.apache.spark.sql.execution.streaming.continuous._
- import org.apache.spark.sql.internal.SQLConf.CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE
+ import org.apache.spark.sql.internal.SQLConf.{CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE, CONTINUOUS_STREAMING_EPOCH_MSG_BACKLOG_QUEUE_SIZE}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset}
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
@@ -44,7 +44,8 @@ class EpochCoordinatorSuite
private var writeSupport: StreamingWrite = _
private var query: ContinuousExecution = _
private var orderVerifier: InOrder = _
- private val epochBacklogQueueSize = 10
+ private val epochBacklogQueueSize = 2
+ private val epochMsgBacklogQueueSize = 10

override def beforeEach(): Unit = {
val stream = mock[ContinuousStream]
@@ -55,8 +56,10 @@ class EpochCoordinatorSuite
spark = new TestSparkSession(
new SparkContext(
"local[2]", "test-sql-context",
- new SparkConf().set("spark.sql.testkey", "true")
- .set(CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE, epochBacklogQueueSize)))
+ new SparkConf()
+ .set("spark.sql.testkey", "true")
+ .set(CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE, epochBacklogQueueSize)
+ .set(CONTINUOUS_STREAMING_EPOCH_MSG_BACKLOG_QUEUE_SIZE, epochMsgBacklogQueueSize)))

epochCoordinator
= EpochCoordinatorRef.create(writeSupport, stream, query, "test", 1, spark, SparkEnv.get)
@@ -192,37 +195,37 @@ class EpochCoordinatorSuite
verifyCommitsInOrderOf(List(1, 2, 3, 4, 5))
}

test("several epochs, max epoch backlog reached by partitionOffsets") {
test("several epochs, max epoch msg backlog reached by partitionOffsets") {
setWriterPartitions(1)
setReaderPartitions(1)

reportPartitionOffset(0, 1)
// Commit messages not arriving
- for (i <- 2 to epochBacklogQueueSize + 1) {
+ for (i <- 2 to epochMsgBacklogQueueSize + 1) {
reportPartitionOffset(0, i)
}

makeSynchronousCall()

- for (i <- 1 to epochBacklogQueueSize + 1) {
+ for (i <- 1 to epochMsgBacklogQueueSize + 1) {
verifyNoCommitFor(i)
}
verifyStoppedWithException("Size of the partition offset queue has exceeded its maximum")
}

test("several epochs, max epoch backlog reached by partitionCommits") {
test("several epochs, max epoch msg backlog reached by partitionCommits") {
setWriterPartitions(1)
setReaderPartitions(1)

commitPartitionEpoch(0, 1)
// Offset messages not arriving
- for (i <- 2 to epochBacklogQueueSize + 1) {
+ for (i <- 2 to epochMsgBacklogQueueSize + 1) {
commitPartitionEpoch(0, i)
}

makeSynchronousCall()

- for (i <- 1 to epochBacklogQueueSize + 1) {
+ for (i <- 1 to epochMsgBacklogQueueSize + 1) {
verifyNoCommitFor(i)
}
verifyStoppedWithException("Size of the partition commit queue has exceeded its maximum")
@@ -235,7 +238,7 @@ class EpochCoordinatorSuite
commitPartitionEpoch(0, 1)
reportPartitionOffset(0, 1)

- // For partition 2 epoch 1 messages never arriving
+ // For partition 1 epoch 1 messages never arriving
// +2 because the first epoch not yet arrived
for (i <- 2 to epochBacklogQueueSize + 2) {
commitPartitionEpoch(0, i)
@@ -249,7 +252,7 @@ class EpochCoordinatorSuite
for (i <- 1 to epochBacklogQueueSize + 2) {
verifyNoCommitFor(i)
}
verifyStoppedWithException("Size of the epoch queue has exceeded its maximum")
verifyStoppedWithException(s"Epoch 1 is late for more than $epochBacklogQueueSize epochs.")
}

private def setWriterPartitions(numPartitions: Int): Unit = {
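As a rough end-to-end illustration (not part of the commit), this is the kind of continuous query whose EpochCoordinator the limits above protect. The rate source, console sink, trigger interval and queue-size value are assumptions chosen for the example.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

// Illustration only: a continuous query in which the coordinator enforces the limits above.
object ContinuousBacklogDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("continuous-backlog-demo")
      .config("spark.sql.streaming.continuous.epochMessageBacklogQueueSize", "10000")
      .getOrCreate()

    val query = spark.readStream
      .format("rate")
      .load()
      .writeStream
      .format("console")
      .trigger(Trigger.Continuous("1 second"))
      .start()

    // If epoch messages arrive late or are lost for long enough that one of the coordinator's
    // queues exceeds its configured maximum, the query is stopped with one of the
    // IllegalStateException messages shown in the diff above.
    query.awaitTermination()
  }
}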
