
[SPARK-24063][SS] Control maximum epoch backlog for ContinuousExecution #21392

Closed · wants to merge 3 commits

Conversation

@spaced4ndy commented May 22, 2018

What changes were proposed in this pull request?

This pull request adds a maxEpochBacklog SQL configuration option. EpochCoordinator tracks whether the length of the queue of waiting epochs has exceeded it; when it has, the stream is stopped with an error indicating that too many epochs have stacked up.
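
For illustration, a conf entry of the kind described here could be declared in SQLConf roughly as below; the key name, default value, and doc text are assumptions made for this sketch, not taken from the PR itself.

// Hypothetical SQLConf entry; the key name and default are illustrative only.
val CONTINUOUS_STREAMING_MAX_EPOCH_BACKLOG =
  buildConf("spark.sql.streaming.continuous.maxEpochBacklog")
    .doc("The maximum number of epochs allowed to queue up waiting to be committed " +
      "before the continuous query is stopped with an error.")
    .intConf
    .createWithDefault(10000)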

How was this patch tested?

Existing unit tests

@spaced4ndy (Author) commented May 22, 2018

@jose-torres Hi Jose, could you take a look at this PR please? I had doubts about how to properly implement the error reporting logic we discussed, and this is what I came up with.
Also, please advise on how I can test these changes. I wrote this several weeks ago, so I may have forgotten something, but if my memory doesn't fail me I was thinking of an approach similar to the tests in ContinuousSuite with custom StreamActions. I wasn't completely sure about the implementation, though. Would that be correct?

val maxBacklogExceeded = epochEndpoint.askSync[Boolean](CheckIfMaxBacklogIsExceeded)
if (maxBacklogExceeded) {
  throw new IllegalStateException(
    "Size of the epochs queue has exceeded maximum allowed epoch backlog.")
@yanlin-Lynn commented May 23, 2018

Throwing an exception will make epochUpdateThread stop working, but won't the application keep running?
I think it's better to block and wait for old epochs to be committed.

Contributor

Agreed that the code as written won't shut down the stream. But I think it does make sense to kill the stream rather than waiting for old epochs. If we end up with a large backlog it's almost surely because some partition isn't making any progress, so I wouldn't expect Spark to ever be able to catch up.

} else {
  logDebug(s"Epoch $epoch has received commits from all partitions " +
    s"and is waiting for epoch ${epoch - 1} to be committed first.")
  epochsWaitingToBeCommitted.add(epoch)
@yanlin-Lynn commented May 23, 2018

Once maxEpochBacklogExceeded is set to true, can it never be set back to false again?

@spaced4ndy (Author)

Based on what I discussed with Jose, the stream should be killed if the backlog exceeds the value of a certain config option, so yes, there's no reason to set it back to false later. At least that's how I see it.

@jose-torres (Contributor) left a comment

The simplest way to test the changes would be to use a TestSparkSession with too few cores, e.g. local[1] with 2 input partitions. Then you can start a stream with something like a 1 ms checkpoint interval and a max backlog of 10 to quickly get a failure. (All epochs will end up in the backlog if there aren't enough cores to schedule one of the partitions, since no global commit will ever succeed.)
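
A sketch of what such a test might look like, assuming it lives next to ContinuousSuite and reuses the Spark 2.4-era test harness (ContinuousSuiteBase, TestSparkSession, testStream with useV2Sink); the config key and error message below mirror this PR's proposal and are not part of released Spark.

// Sketch only: relies on the hypothetical maxEpochBacklog config from this PR.
class ContinuousEpochBacklogSuite extends ContinuousSuiteBase {
  import testImplicits._

  // Only one core, so one of the two input partitions can never be scheduled
  // and no global commit ever succeeds; every epoch lands in the backlog.
  override protected def createSparkSession = new TestSparkSession(
    new SparkContext(
      "local[1]",
      "continuous-stream-test-sql-context",
      sparkConf.set("spark.sql.testkey", "true")))

  test("query fails when the epoch backlog exceeds the configured maximum") {
    withSQLConf(("spark.sql.streaming.continuous.maxEpochBacklog", "10")) {
      val df = spark.readStream
        .format("rate")
        .option("numPartitions", "2")
        .option("rowsPerSecond", "500")
        .load()
        .select('value)

      testStream(df, useV2Sink = true)(
        StartStream(Trigger.Continuous(1)),  // ~1 ms checkpoint interval
        ExpectFailure[IllegalStateException] { e =>
          assert(e.getMessage.contains("exceeded maximum allowed epoch backlog"))
        })
    }
  }
}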

/**
 * Returns a boolean indicating whether the size of the epochs queue has exceeded the
 * maximum epoch backlog.
 */
private[sql] case object CheckIfMaxBacklogIsExceeded extends EpochCoordinatorMessage
Contributor

I'm not sure we need to make a side-channel in the RPC handler for this. I'd try to just make the query fail when the condition is reached in the first place.

@spaced4ndy (Author) commented May 23, 2018

Do you mean making the query fail right from EpochCoordinator? If so, I wanted to do that but didn't figure out how to terminate the query with an exception.
EpochCoordinator takes query: ContinuousExecution as a parameter, but I don't see a suitable method on query; the closest I found is stop(), I guess.
Or am I looking in a completely wrong direction? Please give me a hint.

Contributor

I think we'd probably want to add some method like private[streaming] stopWithException(e) to ContinuousExecution.
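
A rough sketch of what such a method on ContinuousExecution might look like, mirroring the existing stop(); the failureReason field and the exact way the error would be surfaced to the query are assumptions, not the actual implementation.

// Sketch only: follows the same shutdown sequence as ContinuousExecution.stop(),
// but records the cause so the query can surface an error instead of appearing
// to have been stopped cleanly. failureReason is a hypothetical field.
private val failureReason = new java.util.concurrent.atomic.AtomicReference[Throwable]()

private[streaming] def stopWithException(exception: Throwable): Unit = {
  failureReason.set(exception)
  logError(s"Query $prettyIdString received fatal error", exception)
  // Mark the state TERMINATED and interrupt the query execution thread,
  // exactly as stop() does.
  state.set(TERMINATED)
  if (queryExecutionThread.isAlive) {
    sparkSession.sparkContext.cancelJobGroup(runId.toString)
    queryExecutionThread.interrupt()
    queryExecutionThread.join()
  }
}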

@spaced4ndy (Author) commented May 24, 2018

Okay, I thought about something like this but wasn't sure if it's fine to do within the scope of this change. Thanks.

@HyukjinKwon (Member)

Hi all, any update on this PR?

@spaced4ndy (Author)

@HyukjinKwon Hi, I stopped working on it a while ago.

@HyukjinKwon (Member)

In that case, would you mind if we leave this closed for now and reopen it when you start working on it again? I am trying to keep only active PRs open.

@spaced4ndy (Author)

Sure, no problem.

@AmplabJenkins

Can one of the admins verify this patch?
