-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-23503][SS] Enforce sequencing of committed epochs for Continuous Execution #20936
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The general approach looks correct to me.
partitionCommits.collect { case ((e, _), msg) if e == nextEpoch => msg } | ||
logDebug(s"Committing epoch $nextEpoch.") | ||
writer.commit(nextEpoch, nextEpochCommits.toArray) | ||
query.commit(nextEpoch) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a bit of duplicated logic here - helper methods would probably be nice.
} else { | ||
logDebug(s"Epoch $epoch has received commits from all partitions" + | ||
s"and is waiting for epoch ${epoch - 1} to be committed first.") | ||
epochsWaitingToBeCommitted.add(epoch) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe swap the order of the if else. I'd forgotten what the condition was for after scrolling down here.
@jose-torres Could you review latest changes for this PR please? |
LGTM, once #20983 is committed and we can pull in the tests from that PR. |
@tdas @jose-torres Hi, I tested this with #20983, can we run jenkins tests now? |
LGTM |
@@ -137,30 +137,65 @@ private[continuous] class EpochCoordinator( | |||
private val partitionOffsets = | |||
mutable.Map[(Long, Int), PartitionOffset]() | |||
|
|||
private var lastCommittedEpoch = startEpoch - 1 | |||
// Remembers epochs that have to wait for previous epochs to be committed first. | |||
private val epochsWaitingToBeCommitted = mutable.HashSet.empty[Long] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is orthogonal to the current PR, but I realized that both this and the commits/offsets maps are unbounded queues. We probably should introduce some SQLConf for the maximum epoch backlog, and report an error when too many stack up. I'll file a JIRA ticket for this.
Jenkins, ok to test |
Test build #89555 has finished for PR 20936 at commit
|
@felixcheung hi, could you merge this please? I'd use this to work on PR for epoch backlog issue Jose pointed out. Since it passes tests and Jose approves, I guess it's good to go. As a side note, could you also add me to whitelist so that I don't waste maintainers' time to launch jenkins tests for future PRs? I don't know if it's usual for new contributors, but I see it's a possible work flow in Spark contributing guide. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good, just requires a little bit more docs.
} | ||
} | ||
} | ||
|
||
private def findCommitsForEpoch(epoch: Long): Iterable[WriterCommitMessage] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add docs explaining what this does? As is, its hard to distinguish just from the name the difference between findCommitsForEpoch
and commitEpoch
. I think the term "commit" is overloaded here - commit
in findCommitsForEpoch
refers to per-partition commits, whereas commit
in commitEpoch
refers to committing the epoch to the offset log. May be its better to differentiate more clearly. commitEpoch
and findPartitionCommitsForEpoch
. And add docs to both methods also helps.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tdas done
Test build #89876 has finished for PR 20936 at commit
|
jenkins retest this please. |
Test build #90245 has finished for PR 20936 at commit
|
@tdas Hi, could you give me some advice on how to resolve this error? I'm not sure how this PR could cause this, especially considering it was passing tests before and failed jenkins test says "It is not a test it is a sbt.testing.SuiteSelector" in description. |
retest this please |
(I agree that your PR isn't responsible here, there's a known problem with that suite.) |
Test build #90281 has finished for PR 20936 at commit
|
@tdas Hi, seems like it's good to go. Could you merge this when you have time? |
Jenkins retest this please. |
@efimpoberezkin I will merge it. Let me do another round of tests. |
retest this please |
@tdas can't start tests |
jenkins retest this please. |
Test build #90758 has finished for PR 20936 at commit
|
ugh that flaky kafka test. It's already reported, and I've been looking into it this week albeit with little luck. |
retest this please |
Test build #4181 has finished for PR 20936 at commit
|
Test build #90764 has finished for PR 20936 at commit
|
I merged this to master. Thanks. And sorry for the delay. |
What changes were proposed in this pull request?
Made changes to EpochCoordinator so that it enforces a commit order. In case a message for epoch n is lost and epoch (n + 1) is ready for commit before epoch n is, epoch (n + 1) will wait for epoch n to be committed first.
How was this patch tested?
Existing tests in ContinuousSuite and EpochCoordinatorSuite.