
[SPARK-24063][SS] Add maximum epoch queue threshold for ContinuousExecution #23156

Closed
wants to merge 10 commits

Conversation

gaborgsomogyi
Contributor

What changes were proposed in this pull request?

Continuous processing waits on epochs which are not yet complete (for example, when one partition is not making progress) and stores pending items in queues. These queues are unbounded and can easily consume all the memory. In this PR I've added a spark.sql.streaming.continuous.epochBacklogQueueSize configuration option to make them bounded. If the related threshold is reached, the query stops with an IllegalStateException.

How was this patch tested?

Existing + additional unit tests.
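The guard described above can be sketched roughly as follows. This is a minimal illustration of the bounded-backlog idea, not the actual EpochCoordinator code; the class name BoundedEpochBacklog and its methods are assumptions, while the config name and the IllegalStateException behavior come from the PR description.

```scala
import scala.collection.mutable

// Minimal sketch of the bounded-backlog idea: once the queue holds
// maxBacklogSize entries, adding another one fails the query instead of
// letting the queue grow without bound. Illustration only, not Spark code.
class BoundedEpochBacklog(maxBacklogSize: Int) {
  private val epochsWaitingToBeCommitted = mutable.Queue[Long]()

  def enqueue(epoch: Long): Unit = {
    if (epochsWaitingToBeCommitted.size >= maxBacklogSize) {
      // Mirrors the PR behavior: stop with IllegalStateException rather
      // than consuming unbounded memory.
      throw new IllegalStateException(
        s"Size of the epoch queue has exceeded maximum allowed epoch backlog: $maxBacklogSize")
    }
    epochsWaitingToBeCommitted.enqueue(epoch)
  }

  def size: Int = epochsWaitingToBeCommitted.size
}
```

In the PR itself the same threshold bounds all three internal queues (epochsWaitingToBeCommitted, partitionOffsets, partitionCommits), as discussed below.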

@SparkQA

SparkQA commented Nov 27, 2018

Test build #99328 has finished for PR 23156 at commit 72733c5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi
Contributor Author

cc @jose-torres @HeartSaVioR

@arunmahadevan
Contributor

Rather than controlling the queue sizes, it would be better to limit the max epoch backlog and fail the query once that threshold is reached. There already seems to be a patch that attempted to address this: #21392

@gaborgsomogyi
Contributor Author

@arunmahadevan I don't fully understand your comment:

Rather than controlling the queue sizes it would be better to limit the max epoch backlog and fail the query once that threshold is reached.

I've written the following in the PR description:

If the related threshold is reached, the query will stop with an IllegalStateException.

AFAIK, max epoch backlog == epochsWaitingToBeCommitted, which is a queue,
but that's not the only unbounded part of EpochCoordinator (please see the additional unit tests).
As a result, I've limited partitionOffsets and partitionCommits as well.

@arunmahadevan
Contributor

@gaborgsomogyi What I meant was: rather than exposing a config to control the internal queue sizes, we could have a higher-level config like the max pending epochs. This would act as a back-pressure mechanism to stop further processing until the pending epochs are committed. I assume this would also put a limit on the three queues.

@gaborgsomogyi
Contributor Author

gaborgsomogyi commented Dec 6, 2018

@arunmahadevan As I understand it, this is more like renaming the config than changing what the PR basically does. Have I understood it correctly?

Stopping the query rather than applying backpressure has already been agreed on in other PRs, please check them. If the backlog reaches 10k items, there is no way back.

@SparkQA

SparkQA commented Dec 7, 2018

Test build #99822 has finished for PR 23156 at commit b0c5056.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor

HeartSaVioR commented Dec 10, 2018

I'd avoid jumping into anything regarding continuous mode until the overall design (including aggregation and join) of continuous mode is clarified and stabilized.

@gaborgsomogyi
Contributor Author

I thought this part was not affected. Who leads it? I'm asking because I haven't seen progress anywhere.

@HeartSaVioR
Contributor

I think @jose-torres previously led the feature.

@gaborgsomogyi
Contributor Author

Ah, ok. This solution was agreed with him on #20936.

@gaborgsomogyi
Contributor Author

BTW, I'm coming back to your cleanup PR, but it takes some time to switch context :)

@HeartSaVioR
Contributor

HeartSaVioR commented Dec 10, 2018

@gaborgsomogyi No problem :) When you have some time, please take a look at my other PRs as well. There are some shorter ones too.

@SparkQA

SparkQA commented Jan 22, 2019

Test build #101550 has finished for PR 23156 at commit b0c5056.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 24, 2019

Test build #101640 has finished for PR 23156 at commit 357f834.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi
Contributor Author

ping @jose-torres

Contributor

@jose-torres jose-torres left a comment


A few comments, but I agree with the general strategy.

@gaborgsomogyi
Contributor Author

Yay @jose-torres, congratulations on becoming a committer! 🙂
I'll address the suggestions next week.

@SparkQA

SparkQA commented Jan 28, 2019

Test build #101760 has finished for PR 23156 at commit f6bc301.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi
Contributor Author

ping @jose-torres

@gaborgsomogyi
Contributor Author

cc @dongjoon-hyun

@dongjoon-hyun
Member

Thank you for pinging me, @gaborgsomogyi. Retest this please.

@SparkQA

SparkQA commented Feb 10, 2019

Test build #102158 has finished for PR 23156 at commit f6bc301.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi
Contributor Author

cc @vanzin

@gaborgsomogyi
Contributor Author

This has been sitting here for a long time, and I think I've resolved all the comments. Can someone pick this up?

@gaborgsomogyi
Contributor Author

@vanzin thanks for your time!

@SparkQA

SparkQA commented Feb 22, 2019

Test build #102657 has finished for PR 23156 at commit 96152da.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 22, 2019

Test build #102662 has finished for PR 23156 at commit 43e61ef.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 22, 2019

Test build #102664 has finished for PR 23156 at commit 9324c90.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 25, 2019

Test build #102746 has finished for PR 23156 at commit 8a9b67f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jose-torres
Contributor

(The PR looks fine to me modulo @vanzin's review comments. Sorry I dropped it for so long.)

Contributor

@vanzin vanzin left a comment


Just minor things. I'm not a big fan of checking exact exception messages in tests, but in this case it seems ok.
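For illustration, an exact-message check of the kind discussed here can be written as follows. This is a hypothetical sketch, not the test code added in this PR; the helper name and the message text are assumptions.

```scala
// Hypothetical helper that captures any exception thrown by the body.
// Asserting on the exact message pins down which failure path fired,
// at the cost of coupling the test to the message wording.
def captureError(body: => Unit): Option[Throwable] =
  try { body; None } catch { case t: Throwable => Some(t) }

val err = captureError {
  // Stand-in for the query failure; the message text is an assumption.
  throw new IllegalStateException(
    "Size of the epoch queue has exceeded maximum allowed epoch backlog: 10000")
}

assert(err.exists(_.isInstanceOf[IllegalStateException]))
assert(err.exists(_.getMessage.contains("epoch backlog")))
```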

@SparkQA

SparkQA commented Feb 27, 2019

Test build #102820 has finished for PR 23156 at commit d67db64.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin
Contributor

vanzin commented Feb 27, 2019

Merging to master.

@vanzin vanzin closed this in c4bbfd1 Feb 27, 2019
Member

@zsxwing zsxwing left a comment


My 2 cents: when a query fails due to the queue capacity, it's already falling behind. What the user will probably do is just restart the query, and it will take time to bring the query back, which will make the situation worse.

A better way to solve this problem would be to make continuous processing support backpressure.

def stopInNewThread(error: Throwable): Unit = {
  if (failure.compareAndSet(null, error)) {
    logError(s"Query $prettyIdString received exception $error")
    stopInNewThread()
  }
}

Looks like there is a race here. The query stop may happen before the continuous execution checks failure, and the query will then just stop without any exception, as if someone had stopped it manually.
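The failure-flag pattern under discussion can be sketched as below. This is a simplified illustration, not the actual ContinuousExecution code; the class FailureFlag and its method names are assumptions, while the compareAndSet-on-null idea comes from the excerpt above.

```scala
import java.util.concurrent.atomic.AtomicReference

// Simplified sketch of the failure-flag pattern: the first recorded error
// wins, and whoever stops the query must re-check the flag to surface it.
// The race noted above is that a stop can complete before that re-check.
class FailureFlag {
  private val failure = new AtomicReference[Throwable](null)

  // Returns true if this call recorded the error (first writer wins).
  def record(error: Throwable): Boolean = failure.compareAndSet(null, error)

  // Callers should check this after stopping; if the stop raced ahead of
  // the check, the query ends without rethrowing the recorded exception.
  def pending: Option[Throwable] = Option(failure.get())
}
```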

mccheah pushed a commit to palantir/spark that referenced this pull request May 15, 2019
…cution

Closes apache#23156 from gaborgsomogyi/SPARK-24063.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>