[SPARK-27355][SS] Make query execution more sensitive to epoch message late or lost #24283
Conversation
cc @jose-torres
Test build #104248 has finished for PR 24283 at commit
Review threads (outdated, resolved) on:
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
...re/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala (two threads)
Test build #104290 has finished for PR 24283 at commit
As Attila pointed out, it contains unnecessary changes, which may have distracted me, but at first glance I don't see the main value.
Not sure I understand the main reasoning. If one Kafka partition has a problem because that specific server is slower than the others, should we just kill the query after 10 epochs? If it's an intermittent problem, I'm not sure that's the right thing to do.
@gaborgsomogyi Thanks for your reply. #23156 introduced a maximum queue threshold before stopping the stream with an error. In #23156, we used the same threshold for different queues, i.e.
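For context, a minimal sketch of the kind of guard #23156 describes, with hypothetical names (`EpochBacklogGuard`, `stopQuery`); this is not the actual EpochCoordinator code. Every internal queue is checked against the same size threshold, which is what makes a single value hard to choose:

```scala
import scala.collection.mutable

// Hypothetical guard: one shared threshold applied to several queues
// that fill at very different rates.
class EpochBacklogGuard(maxQueueSize: Int, stopQuery: String => Unit) {
  private val partitionOffsets = mutable.Queue[Long]()
  private val partitionCommits = mutable.Queue[Long]()
  private val epochsWaiting = mutable.Queue[Long]()

  private def checkBoundaries(): Unit = {
    // The partition-level queues grow roughly with (#partitions x #epochs),
    // while the epoch queue grows with #epochs only, yet all three share
    // the same bound.
    if (partitionOffsets.size > maxQueueSize ||
        partitionCommits.size > maxQueueSize ||
        epochsWaiting.size > maxQueueSize) {
      stopQuery(s"Backlog exceeded $maxQueueSize; stopping the stream.")
    }
  }

  def onPartitionOffset(epoch: Long): Unit = { partitionOffsets += epoch; checkBoundaries() }
  def onPartitionCommit(epoch: Long): Unit = { partitionCommits += epoch; checkBoundaries() }
  def onEpochWaiting(epoch: Long): Unit = { epochsWaiting += epoch; checkBoundaries() }
}
```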
I see the main intention now. I agree that the different queues fill up at different speeds, and this was considered when the configuration was added. The threshold is configurable, which I think makes the current implementation flexible enough to handle this situation. From my point of view, additional fine-tuning doesn't really help but makes this code more complex, and that complexity has to be maintained.
@gaborgsomogyi As you said, different queues fill up at different speeds, so we cannot set one proper value for this config. If we set it too small, the queue fills up quickly when the number of partitions is large enough. But if we set it too large, we may wait for many epochs before failing when the partition count is small, like 1. So can we merge these two configs and keep just the epoch threshold? Then there is no need to check
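To illustrate the proposal, a minimal sketch with hypothetical names (`EpochLagGuard`, `maxEpochBacklog`, `stopQuery`): the check depends only on how far the newest epoch runs ahead of the last committed one, so it is independent of the partition count:

```scala
// Hypothetical unified check: fail as soon as the epoch gap grows past
// the threshold, instead of sizing per-partition queues.
class EpochLagGuard(maxEpochBacklog: Int, stopQuery: String => Unit) {
  private var lastCommittedEpoch: Long = 0L

  def onEpochReady(epoch: Long): Unit = {
    // An epoch can only commit after all earlier epochs; if the gap grows
    // past the threshold, some earlier epoch message is late or lost.
    if (epoch - lastCommittedEpoch > maxEpochBacklog) {
      stopQuery(s"Epoch $epoch is more than $maxEpochBacklog epochs ahead of " +
        s"last committed epoch $lastCommittedEpoch; an epoch message may be late or lost.")
    }
  }

  def onEpochCommitted(epoch: Long): Unit = {
    lastCommittedEpoch = math.max(lastCommittedEpoch, epoch)
  }
}
```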
Test build #104373 has finished for PR 24283 at commit
ping @gaborgsomogyi @zsxwing
@gaborgsomogyi I have unified the two configs. We will only check late epochs, but not check
Test build #104826 has finished for PR 24283 at commit
Test build #104828 has finished for PR 24283 at commit
ping @gaborgsomogyi and @attilapiros
Test build #105197 has finished for PR 24283 at commit
Test build #105201 has finished for PR 24283 at commit
retest this please
Test build #105243 has finished for PR 24283 at commit
We're closing this PR because it hasn't been updated in a while. If you'd like to revive this PR, please reopen it!
What changes were proposed in this pull request?
In SPARK-23503, we enforced sequencing of committed epochs for Continuous Execution. If the message for epoch n is late or lost and epoch (n + 1) becomes ready for commit first, epoch (n + 1) must wait for epoch n to be committed. In the extreme case, the query waits for `epochBacklogQueueSize` (10000 by default) epochs before failing. There is no need to wait that long before failing the query when an epoch message may be late or lost, so this PR makes the failure condition more sensitive (see the sketch below).
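A minimal sketch of the sequencing behavior described above, with hypothetical names (`SequencedCommitter`, `commitEpoch`); it is not the actual EpochCoordinator code. Epoch n + 1, even if fully ready, is buffered until epoch n commits, so a single late or lost message for epoch n stalls everything after it:

```scala
import scala.collection.mutable

// Hypothetical committer that drains ready epochs strictly in order.
class SequencedCommitter(commitEpoch: Long => Unit) {
  private var lastCommitted: Long = 0L
  private val waiting = mutable.SortedSet[Long]()

  def onEpochReady(epoch: Long): Unit = {
    waiting += epoch
    // Commit only the contiguous prefix: epoch n + 1 never commits before n.
    while (waiting.nonEmpty && waiting.head == lastCommitted + 1) {
      val next = waiting.head
      waiting -= next
      commitEpoch(next)
      lastCommitted = next
    }
  }
}
```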
How was this patch tested?
Updated existing unit tests.