[SPARK-24063][SS] Add maximum epoch queue threshold for ContinuousExecution #23156
Conversation
Test build #99328 has finished for PR 23156 at commit
Rather than controlling the queue sizes, it would be better to limit the max epoch backlog and fail the query once that threshold is reached. There already seems to be a patch that attempted to address this: #21392
@arunmahadevan I don't fully understand your comment:
I've written the following in the PR description:
AFAIK
@gaborgsomogyi what I meant was rather than exposing a config to control the internal queue sizes, we could have a higher level config like the max pending epochs. This would act as a back pressure mechanism to stop further processing until the pending epochs are committed. I assume this would also put a limit on the three queues.
@arunmahadevan as I understand it, this is more like renaming the config than changing what the PR basically does; have I understood it well? Not having backpressure but stopping the query was already agreed on other PRs, please check them. If the backlog reaches 10k items there is no way back.
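A minimal, framework-free sketch of the fail-fast behaviour being agreed here; `EpochBacklog` and its methods are invented for illustration and are not Spark's internals:

```scala
import scala.collection.mutable

// Illustrative sketch only: reject further epochs once the backlog exceeds
// a configured threshold, rather than letting the queues grow unbounded.
class EpochBacklog(maxBacklog: Int) {
  private val pending = mutable.Queue.empty[Long]

  def add(epoch: Long): Unit = {
    pending.enqueue(epoch)
    if (pending.size > maxBacklog) {
      // Failing the query here is the "no way back" point discussed above.
      throw new IllegalStateException(
        s"Size of the epoch backlog exceeded its maximum of $maxBacklog")
    }
  }

  // Committing an epoch drains the backlog and makes room again.
  def commitOldest(): Option[Long] =
    if (pending.nonEmpty) Some(pending.dequeue()) else None

  def size: Int = pending.size
}
```

The key design point is that the check happens on every enqueue, so a stuck partition surfaces as a query failure instead of as silent memory growth.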
Test build #99822 has finished for PR 23156 at commit
I'd avoid jumping into anything regarding continuous mode unless the overall design of continuous mode (including aggregation and join) is clarified and stabilized.
I thought this part is not affected. Who leads it? Asking because I haven't seen progress anywhere.
I think @jose-torres previously led the feature. |
Ah, ok. This solution was agreed with him on #20936. |
BTW, I'm coming back to your clean-up PR, but it takes some time to switch context :)
@gaborgsomogyi No problem :) When you get some time, please take a look at my other PRs as well. Some of them are quite short.
Test build #101550 has finished for PR 23156 at commit
Test build #101640 has finished for PR 23156 at commit
ping @jose-torres
A few comments, but I agree with the general strategy.
Yay @jose-torres, congratulations on becoming a committer! 🙂
Test build #101760 has finished for PR 23156 at commit
ping @jose-torres
Thank you for pinging me, @gaborgsomogyi. Retest this please.
Test build #102158 has finished for PR 23156 at commit
cc @vanzin |
This has been standing here for a long time, and I think I've resolved all the comments. Can someone pick this up?
* `CONSTANT.key` used in tests
* Removed newline hell
@vanzin thanks for your time!
Test build #102657 has finished for PR 23156 at commit
Test build #102662 has finished for PR 23156 at commit
Test build #102664 has finished for PR 23156 at commit
Test build #102746 has finished for PR 23156 at commit
(The PR looks fine to me modulo @vanzin's review comments. Sorry I dropped it for so long.)
Just minor things. I'm not a big fan of checking exact exception messages in tests, but in this case it seems ok.
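The message-checking pattern discussed here can be sketched without any test framework; the method name and message text below are illustrative, not the PR's actual code:

```scala
// Illustrative only: capture an exception and assert on a stable fragment
// of its message rather than the exact full text, which is less brittle.
def overflowingAdd(): Unit =
  throw new IllegalStateException(
    "Size of the epoch queue has exceeded its maximum")

val message =
  try { overflowingAdd(); "" }
  catch { case e: IllegalStateException => e.getMessage }
```

Matching on a substring keeps the test valid even if the full message is later reworded around it.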
Test build #102820 has finished for PR 23156 at commit
Merging to master. |
My 2 cents: when a query fails due to the queue capacity limit, it's already falling behind. What the user will probably do is just restart the query, and it will take time to bring the query back, which makes the situation worse.
A better way to solve this problem is making continuous processing support backpressure.
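For contrast with the fail-fast approach, a bounded blocking queue shows what a backpressure-style alternative could look like; this is a plain `java.util.concurrent` sketch, not Spark's actual mechanism:

```scala
import java.util.concurrent.ArrayBlockingQueue

// Illustrative backpressure sketch: a bounded blocking queue makes the
// producer wait instead of failing the query. put() blocks when the queue
// is full; offer() is the non-blocking probe used here to demonstrate it.
val epochQueue = new ArrayBlockingQueue[Long](3)

epochQueue.put(1L)
epochQueue.put(2L)
epochQueue.put(3L)
val accepted = epochQueue.offer(4L) // queue is full, so this is rejected
val committed = epochQueue.take()   // consumer commits the oldest epoch,
                                    // making room for the producer again
```

The trade-off is the one debated in this thread: backpressure keeps the query alive but silently stalls upstream processing, while the fail-fast threshold surfaces the problem immediately.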
```scala
def stopInNewThread(error: Throwable): Unit = {
  if (failure.compareAndSet(null, error)) {
    logError(s"Query $prettyIdString received exception $error")
    stopInNewThread()
```
Looks like there is a race here. The query stop may happen before the continuous execution checks `failure`, and the query will then just stop without any exception, as if someone had stopped it manually.
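The first-failure-wins idea behind `failure.compareAndSet(null, error)` can be reduced to a small sketch; note that the CAS only guarantees a single recorded error, and by itself says nothing about ordering relative to a concurrent manual stop, which is the race pointed out above:

```scala
import java.util.concurrent.atomic.AtomicReference

// Sketch of the first-failure-wins pattern: only the first error is
// recorded; later callers see the CAS fail and become no-ops.
val failure = new AtomicReference[Throwable](null)

def recordFailure(error: Throwable): Boolean =
  failure.compareAndSet(null, error)
```

Any code path that stops the query must re-check this reference after stopping; otherwise the stop can win the race and the recorded error is never surfaced.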
What changes were proposed in this pull request?

Continuous processing waits on epochs which are not yet complete (for example, when one partition is not making progress) and stores pending items in queues. These queues are unbounded and can easily consume all the memory. This PR adds the `spark.sql.streaming.continuous.epochBacklogQueueSize` configuration to make them bounded. If the threshold is reached, the query stops with an `IllegalStateException`.

How was this patch tested?

Existing + additional unit tests.

Closes apache#23156 from gaborgsomogyi/SPARK-24063.
Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
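A hypothetical way to set the threshold added by this PR; an existing `SparkSession` named `spark` is assumed, and the value is an arbitrary example, not a recommendation:

```scala
// Assumes an existing SparkSession named `spark`; 1000 is an arbitrary
// example threshold for the pending-epoch queues.
spark.conf.set("spark.sql.streaming.continuous.epochBacklogQueueSize", "1000")
```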