[SPARK-24063][SS] Control maximum epoch backlog for ContinuousExecution #21392

Status: Closed · wants to merge 3 commits
SQLConf.scala
@@ -1271,6 +1271,14 @@ object SQLConf {
    .intConf
    .createWithDefault(Int.MaxValue)

  val MAX_EPOCH_BACKLOG = buildConf("spark.sql.streaming.continuous.maxEpochBacklog")
    .internal()
    .doc("The maximum number of epochs to keep queued while waiting for late epochs. " +
      "If the size of the queue exceeds this value, the stream is stopped with an error " +
      "indicating that too many epochs have stacked up.")
    .intConf
    .createWithDefault(10000)

  object Deprecated {
    val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
  }
@@ -1641,6 +1649,8 @@ class SQLConf extends Serializable with Logging {
  def partitionOverwriteMode: PartitionOverwriteMode.Value =
    PartitionOverwriteMode.withName(getConf(PARTITION_OVERWRITE_MODE))

  def maxEpochBacklog: Int = getConf(MAX_EPOCH_BACKLOG)

  /** ********************** SQLConf functionality methods ************ */

  /** Set Spark SQL configuration properties. */
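For context, here is a minimal sketch of how the new option could be set on a continuous query. It assumes a Spark build that includes this patch; since the conf is internal, it is set like any other spark.sql.* property. The rate-to-console pipeline, the app name, and the value 100 are illustrative only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object MaxEpochBacklogSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("max-epoch-backlog-sketch")
      // Fail the stream once 100 fully committed epochs are queued behind a
      // straggler, instead of the default 10000.
      .config("spark.sql.streaming.continuous.maxEpochBacklog", "100")
      .getOrCreate()

    val query = spark.readStream
      .format("rate")
      .load()
      .writeStream
      .format("console")
      .trigger(Trigger.Continuous("1 second"))
      .start()

    query.awaitTermination()
  }
}
```

Because EpochCoordinator reads the value from the session's SQLConf when the query starts, the setting has to be in place before start() is called.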
ContinuousExecution.scala
@@ -17,8 +17,10 @@

package org.apache.spark.sql.execution.streaming.continuous

import java.lang.Thread.UncaughtExceptionHandler
import java.util.UUID
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
import java.util.function.UnaryOperator

import scala.collection.JavaConverters._
@@ -233,9 +235,15 @@ class ContinuousExecution(
              }
              false
            } else if (isActive) {
              val maxBacklogExceeded = epochEndpoint.askSync[Boolean](CheckIfMaxBacklogIsExceeded)
              if (maxBacklogExceeded) {
                throw new IllegalStateException(
                  "Size of the epochs queue has exceeded maximum allowed epoch backlog.")
@yanlin-Lynn commented on May 23, 2018:
Throwing an exception will make epochUpdateThread stop working, but won't the application keep running?
I think it's better to block and wait for the old epochs to be committed.

Contributor replied:
Agreed that the code as written won't shut down the stream. But I think it does make sense to kill the stream rather than waiting for old epochs. If we end up with a large backlog, it's almost surely because some partition isn't making any progress, so I wouldn't expect Spark to ever be able to catch up.

              } else {
                currentBatchId = epochEndpoint.askSync[Long](IncrementAndGetEpoch)
                logInfo(s"New epoch $currentBatchId is starting.")
                true
              }
            } else {
              false
            }
@@ -248,7 +256,12 @@
      }
    }, s"epoch update thread for $prettyIdString")

    val throwableReference: AtomicReference[Throwable] = new AtomicReference[Throwable]()
    try {
      epochUpdateThread.setUncaughtExceptionHandler(new UncaughtExceptionHandler {
        override def uncaughtException(thread: Thread, throwable: Throwable): Unit =
          throwableReference.set(throwable)
      })
      epochUpdateThread.setDaemon(true)
      epochUpdateThread.start()
@@ -268,6 +281,11 @@
      epochUpdateThread.interrupt()
      epochUpdateThread.join()

      val throwable: Throwable = throwableReference.get()
      if (throwable != null && throwable.isInstanceOf[IllegalStateException]) {
        throw throwable.asInstanceOf[IllegalStateException]
      }

      stopSources()
      sparkSession.sparkContext.cancelJobGroup(runId.toString)
    }
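The error-propagation pattern used above can be exercised in isolation. The sketch below is not Spark code; it only mirrors the mechanism: a daemon worker's uncaught exception is captured in an AtomicReference and rethrown by the owning thread after join(), which is what lets the finally block in runContinuous surface the backlog error.

```scala
import java.lang.Thread.UncaughtExceptionHandler
import java.util.concurrent.atomic.AtomicReference

object RethrowWorkerFailureSketch {
  def main(args: Array[String]): Unit = {
    val failure = new AtomicReference[Throwable]()

    val worker = new Thread(new Runnable {
      // Stands in for the epoch update thread hitting the backlog limit.
      override def run(): Unit =
        throw new IllegalStateException(
          "Size of the epochs queue has exceeded maximum allowed epoch backlog.")
    }, "epoch-update-thread-sketch")

    // Capture the worker's failure instead of letting it die silently.
    worker.setUncaughtExceptionHandler(new UncaughtExceptionHandler {
      override def uncaughtException(t: Thread, e: Throwable): Unit = failure.set(e)
    })
    worker.setDaemon(true)
    worker.start()
    worker.join()

    // Surface the failure on the owning thread, as the finally block above does.
    val captured = failure.get()
    if (captured != null) throw captured
  }
}
```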
EpochCoordinator.scala
@@ -45,6 +45,11 @@ private[sql] case object IncrementAndGetEpoch extends EpochCoordinatorMessage
 */
private[sql] case object StopContinuousExecutionWrites extends EpochCoordinatorMessage

/**
 * Returns a boolean indicating whether the size of the epochs queue has exceeded the
 * maximum epoch backlog.
 */
private[sql] case object CheckIfMaxBacklogIsExceeded extends EpochCoordinatorMessage
Contributor commented:
I'm not sure we need to make a side-channel in the RPC handler for this. I'd try to just make the query fail when the condition is reached in the first place.

@spaced4ndy (author) replied on May 23, 2018:
Do you mean make the query fail right from EpochCoordinator? If so, I wanted to do that, but didn't figure out how to terminate the query with an exception.
EpochCoordinator takes query: ContinuousExecution as a parameter, but I don't see a suitable method on query; the closest I found is stop(), I guess.
Or am I looking in a completely wrong direction? Please give a hint.

Contributor replied:
I think we'd probably want to add some method like private[streaming] stopWithException(e) to ContinuousExecution.

@spaced4ndy (author) replied on May 24, 2018:
Okay, I thought about something like this but wasn't sure if it's fine to do in the scope of this change. Thanks.
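To make the suggestion concrete, here is a toy, self-contained sketch of the stopWithException idea. None of these classes or methods exist in Spark or in this PR; they only model the control flow the reviewer describes, where the coordinator fails the query directly instead of exposing a CheckIfMaxBacklogIsExceeded RPC to be polled.

```scala
import java.util.concurrent.atomic.AtomicReference

// Toy stand-ins for ContinuousExecution and EpochCoordinator; not Spark API.
class ToyContinuousExecution {
  private val failure = new AtomicReference[Throwable]()
  @volatile private var active = true

  // Analogue of the suggested private[streaming] stopWithException(e).
  def stopWithException(e: Throwable): Unit = {
    failure.set(e)
    active = false
  }

  def run(): Unit = {
    while (active) { Thread.sleep(10) } // stand-in for the query loop
    val e = failure.get()
    if (e != null) throw e // the query terminates with the recorded error
  }
}

class ToyEpochCoordinator(query: ToyContinuousExecution, maxEpochBacklog: Int) {
  private var backlog = 0

  def epochStuckBehindStraggler(): Unit = {
    backlog += 1
    if (backlog > maxEpochBacklog) {
      // Fail the query directly rather than waiting to be polled over RPC.
      query.stopWithException(new IllegalStateException(
        "Size of the epochs queue has exceeded maximum allowed epoch backlog."))
    }
  }
}

object StopWithExceptionSketch {
  def main(args: Array[String]): Unit = {
    val query = new ToyContinuousExecution
    val coordinator = new ToyEpochCoordinator(query, maxEpochBacklog = 3)
    new Thread(new Runnable {
      override def run(): Unit = (1 to 5).foreach(_ => coordinator.epochStuckBehindStraggler())
    }).start()
    query.run() // throws IllegalStateException once the limit is exceeded
  }
}
```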


// Init messages
/**
 * Set the reader and writer partition counts. Tasks may not be started until the coordinator

@@ -123,6 +128,9 @@ private[continuous] class EpochCoordinator(
    override val rpcEnv: RpcEnv)
  extends ThreadSafeRpcEndpoint with Logging {

  private val maxEpochBacklog = session.sqlContext.conf.maxEpochBacklog

  private var maxEpochBacklogExceeded: Boolean = false
  private var queryWritesStopped: Boolean = false

  private var numReaderPartitions: Int = _

@@ -153,9 +161,13 @@
    // If not, add the epoch being currently processed to epochs waiting to be committed,
    // otherwise commit it.
    if (lastCommittedEpoch != epoch - 1) {
      if (epochsWaitingToBeCommitted.size == maxEpochBacklog) {
        maxEpochBacklogExceeded = true
      } else {
        logDebug(s"Epoch $epoch has received commits from all partitions " +
          s"and is waiting for epoch ${epoch - 1} to be committed first.")
        epochsWaitingToBeCommitted.add(epoch)
@yanlin-Lynn commented on May 23, 2018:
Once maxEpochBacklogExceeded is set to true, can it never be set back to false again?

Author replied:
Based on what I discussed with Jose, the stream should be killed if the backlog exceeds the value of a certain config option, so yes: why set it back to false later? At least that's how I see it.

      }
    } else {
      commitEpoch(epoch, thisEpochCommits)
      lastCommittedEpoch = epoch
@@ -246,5 +258,8 @@ private[continuous] class EpochCoordinator(
    case StopContinuousExecutionWrites =>
      queryWritesStopped = true
      context.reply(())

    case CheckIfMaxBacklogIsExceeded =>
      context.reply(maxEpochBacklogExceeded)
  }
}
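Finally, to make the coordinator-side policy easier to reason about, here is a small self-contained sketch of the bookkeeping. It is a toy model, not the real EpochCoordinator: epochs whose commits arrive out of order are parked in a queue, the queue is drained once the missing epoch lands, and the exceeded flag is raised when the queue has already reached the configured cap.

```scala
import scala.collection.mutable

// Toy model of the backlog policy discussed in this PR; names are illustrative.
class BacklogTracker(maxEpochBacklog: Int) {
  private var lastCommittedEpoch = 0L
  private val waiting = mutable.SortedSet.empty[Long]
  var maxEpochBacklogExceeded = false

  // Called when every partition has committed `epoch`.
  def onEpochFullyCommitted(epoch: Long): Unit = {
    if (lastCommittedEpoch != epoch - 1) {
      if (waiting.size == maxEpochBacklog) {
        maxEpochBacklogExceeded = true   // signal that the query should fail
      } else {
        waiting += epoch                 // park it until earlier epochs land
      }
    } else {
      lastCommittedEpoch = epoch
      // Drain any queued epochs that are now contiguous with the committed one.
      while (waiting.nonEmpty && waiting.head == lastCommittedEpoch + 1) {
        lastCommittedEpoch = waiting.head
        waiting -= waiting.head
      }
    }
  }
}

object BacklogTrackerDemo {
  def main(args: Array[String]): Unit = {
    val tracker = new BacklogTracker(maxEpochBacklog = 2)
    // Epoch 1 is stuck; epochs 2, 3 and 4 complete and pile up behind it.
    Seq(2L, 3L, 4L).foreach(tracker.onEpochFullyCommitted)
    println(tracker.maxEpochBacklogExceeded) // true: two queued, the third hit the cap
  }
}
```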