[SPARK-24063][SS] Control maximum epoch backlog for ContinuousExecution #21392
@@ -45,6 +45,11 @@ private[sql] case object IncrementAndGetEpoch extends EpochCoordinatorMessage
  */
 private[sql] case object StopContinuousExecutionWrites extends EpochCoordinatorMessage
 
+/**
+ * Returns boolean indicating if size of the epochs queue has exceeded maximum epoch backlog.
+ */
+private[sql] case object CheckIfMaxBacklogIsExceeded extends EpochCoordinatorMessage
I'm not sure we need to make a side-channel in the RPC handler for this. I'd try to just make the query fail when the condition is reached in the first place.

Do you mean making the query fail right from EpochCoordinator? If so, that is what I wanted to do, but I didn't figure out how to terminate the query with an exception.

I think we'd probably want to add some method like private[streaming] stopWithException(e) to ContinuousExecution.

Okay, I thought about something like this, but wasn't sure it was in scope for this change. Thanks.
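A minimal sketch of the stopWithException(e) helper suggested above, written as if it were added inside ContinuousExecution. The terminalException holder and the overall wiring are assumptions for illustration, not the committed implementation:

    // Sketch only: remember the terminal error so it can later be surfaced through
    // the query's exception/awaitTermination APIs, then shut the stream down via the
    // normal stop path. `terminalException` is an assumed field; a real change would
    // need to plug the error into StreamExecution's existing failure reporting.
    @volatile private var terminalException: Option[Throwable] = None

    private[streaming] def stopWithException(e: Throwable): Unit = {
      terminalException = Some(e)   // record why the query is being killed
      stop()                        // then stop the stream as usual
    }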
+
 // Init messages
 /**
  * Set the reader and writer partition counts. Tasks may not be started until the coordinator
@@ -123,6 +128,9 @@ private[continuous] class EpochCoordinator(
     override val rpcEnv: RpcEnv)
   extends ThreadSafeRpcEndpoint with Logging {
 
+  private val maxEpochBacklog = session.sqlContext.conf.maxEpochBacklog
+
+  private var maxEpochBacklogExceeded: Boolean = false
   private var queryWritesStopped: Boolean = false
 
   private var numReaderPartitions: Int = _
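For context, conf.maxEpochBacklog implies a SQLConf entry backing the new setting. Below is a sketch of what such an entry could look like, as if added to SQLConf; the config key, doc text, and default value are assumptions, not part of this diff:

    // Illustrative SQLConf entry (sketch, not from this PR): key name and default
    // of 10000 are assumed for the example.
    val CONTINUOUS_STREAMING_MAX_EPOCH_BACKLOG =
      buildConf("spark.sql.streaming.continuous.epochBacklogQueueSize")
        .doc("The max number of entries to be stored in the queue to wait for late epochs. " +
          "If this parameter is exceeded by the size of the queue, the stream stops with an error.")
        .intConf
        .createWithDefault(10000)

    // ...and the accessor that EpochCoordinator would read:
    def maxEpochBacklog: Int = getConf(CONTINUOUS_STREAMING_MAX_EPOCH_BACKLOG)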
@@ -153,9 +161,13 @@ private[continuous] class EpochCoordinator(
       // If not, add the epoch being currently processed to epochs waiting to be committed,
       // otherwise commit it.
       if (lastCommittedEpoch != epoch - 1) {
-        logDebug(s"Epoch $epoch has received commits from all partitions " +
-          s"and is waiting for epoch ${epoch - 1} to be committed first.")
-        epochsWaitingToBeCommitted.add(epoch)
+        if (epochsWaitingToBeCommitted.size == maxEpochBacklog) {
+          maxEpochBacklogExceeded = true
+        } else {
+          logDebug(s"Epoch $epoch has received commits from all partitions " +
+            s"and is waiting for epoch ${epoch - 1} to be committed first.")
+          epochsWaitingToBeCommitted.add(epoch)
Once maxEpochBacklogExceeded is set to true, can it never be set back to false again?

Based on what I discussed with Jose, the stream should be killed if the backlog exceeds the value of the config option, so yes: there is no reason to set it back to false later. At least that's how I see it.
+        }
       } else {
         commitEpoch(epoch, thisEpochCommits)
         lastCommittedEpoch = epoch
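For reference, the queue only grows while an older epoch is outstanding because commits must happen in order; once the blocking epoch commits, the coordinator drains any queued successors. Roughly, as a paraphrased sketch of the class's existing catch-up logic rather than an exact quote of the source:

    // After committing `epoch`, keep committing queued epochs as long as the next
    // one in sequence has already received all of its partition commits.
    var nextEpoch = lastCommittedEpoch + 1
    while (epochsWaitingToBeCommitted.contains(nextEpoch)) {
      val nextEpochCommits =
        partitionCommits.collect { case ((e, _), msg) if e == nextEpoch => msg }
      commitEpoch(nextEpoch, nextEpochCommits)
      epochsWaitingToBeCommitted.remove(nextEpoch)
      lastCommittedEpoch = nextEpoch
      nextEpoch = lastCommittedEpoch + 1
    }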
@@ -246,5 +258,8 @@ private[continuous] class EpochCoordinator(
     case StopContinuousExecutionWrites =>
       queryWritesStopped = true
       context.reply(())
+
+    case CheckIfMaxBacklogIsExceeded =>
+      context.reply(maxEpochBacklogExceeded)
   }
 }
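One way the driver side could consume the new message is for ContinuousExecution's epoch-advancing loop to poll the coordinator and fail the query when the flag is set. A sketch under that assumption; the exception type and message text are illustrative and not part of this diff:

    // Hypothetical check in the epoch-advancing loop: ask the coordinator whether
    // the backlog limit was hit and, if so, fail the query instead of scheduling
    // further epochs. `epochEndpoint` is the EpochCoordinator's RpcEndpointRef.
    if (epochEndpoint.askSync[Boolean](CheckIfMaxBacklogIsExceeded)) {
      throw new IllegalStateException(
        "Size of the epoch queue has exceeded maximum allowed epoch backlog.")
    }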
Throwing an exception will make epochUpdateThread stop working, but the application will keep running, won't it? I think it would be better to block and wait for the old epochs to be committed.
Agreed that the code as written won't shut down the stream. But I think it does make sense to kill the stream rather than waiting for old epochs. If we end up with a large backlog it's almost surely because some partition isn't making any progress, so I wouldn't expect Spark to ever be able to catch up.