[SPARK-3411] Improve load-balancing of concurrently-submitted drivers across workers #1106
Conversation
Can one of the admins verify this patch?
Did you see any performance impact? The current strategy, randomization at the start of every schedule point, is used not only in Master but also in TaskSchedulerImpl... and so far it has worked fine.
You mean the increased shuffles may lead to bad performance?
Yes, potentially... and considering the long-term running of the cluster, the load eventually becomes well balanced with the current strategy. In other words, this commit only helps the case where the user submits a lot of drivers in a single batch and then waits for the results.
Another situation is that the worker list changes frequently, which will make drivers relaunch a lot. And even when no worker has enough resources at a given moment, the waiting-driver array can keep growing as users continue submitting.
I'm not sure about the efficiency of changing to another mode for the extreme case where some worker (exactly the one running a lot of drivers) joins and leaves constantly... For the second case, when no worker has enough resources at the same time, the driver simply cannot be scheduled.
In short, this commit balances the load better when a lot of drivers arrive, while not hurting performance when there are only a few drivers, since the randomization frequency depends on the number of drivers.
@WangTaoTheTonic Can you bring this up to date with master? Also, could you file a JIRA associated with this PR?
for (driver <- waitingDrivers) {
  val aliveWorkers = workers.filter(_.state == WorkerState.ALIVE)
  val shuffledWorkers = Random.shuffle(aliveWorkers) // Randomization helps balance drivers
This creates multiple copies of all the workers and could be slow for large clusters. Maybe it's a better idea to keep track of a list of alive workers instead
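A minimal sketch of that suggestion, assuming the members live inside the Master class next to the existing workers set; the hook names onWorkerAlive / onWorkerDead are illustrative, not the actual Master API:

```scala
import scala.collection.mutable.ArrayBuffer

// Sketch: keep the alive-worker list up to date incrementally, so schedule()
// does not rebuild (filter + copy) it for every pass over waitingDrivers.
private val aliveWorkers = new ArrayBuffer[WorkerInfo]

private def onWorkerAlive(worker: WorkerInfo): Unit =
  aliveWorkers += worker // when a worker registers or recovers

private def onWorkerDead(worker: WorkerInfo): Unit =
  aliveWorkers -= worker // when a worker times out or is removed
```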
Force-pushed from e3d87e2 to e243c5f.
The JIRA is: https://issues.apache.org/jira/browse/SPARK-3411.
Can you give this PR a more descriptive title? "Optimize the schedule procedure in Master" sounds like it could describe many different changes, so it's kind of hard to figure out what this PR might do by reading the current title. I'd go with something like "[SPARK-3411] Improve load-balancing of concurrently-submitted drivers across workers."
I agree that this seems like a bit of a rare corner-case.
@WangTaoTheTonic I looked at this more and I think it will actually be slower with the new changes. Before this patch we shuffle all the workers only once, but here we shuffle it for each waiting driver. If we have thousands of workers, shuffling can become fairly expensive. As for the original problem you set out to solve, it seems to me that the existing code is already doing the randomization correctly. This is because we draw the first worker from a list of shuffled workers, so it will already be random by the time we draw from it.
If the problem is all of the drivers landing on the same randomly-chosen worker, I suppose you could treat the randomized list as a circular buffer and go through it in a round-robin order when picking which worker to launch the next driver on.
Oh I see. Wouldn't it be sufficient to just pop the head of the shuffled worker list?
Yeah, I suppose so, but there was one corner-case that I was concerned about (that is addressed by treating it as a circular buffer): let's say we have a scenario where we are trying to schedule three drivers but there are only two workers that they can run on, and let's also say that there's initially enough capacity to run all three drivers. In that case, I think we would pop the head of the list for the first two drivers and have nothing left to draw from for the third driver, even though one of the two workers may still have capacity for it.

The circular buffer brings its own problems, though: let's say that there are no valid locations where we can schedule the driver. In this case, we should stop looping through the buffer so that we don't go into an infinite loop.
Hm, it looks like (semi-pseudocode)
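A rough, illustrative sketch of the round-robin idea discussed above, with stand-in types and names rather than the actual patch code: the shuffled worker list is treated as a circular buffer, and each driver gives up after one full pass, so the loop cannot spin forever when no worker fits.

```scala
import scala.util.Random
import scala.collection.mutable

// Stand-ins for Spark's WorkerInfo / DriverInfo, for illustration only.
case class WorkerSlot(id: String, var memFree: Int, var coresFree: Int)
case class DriverReq(id: String, mem: Int, cores: Int)

def assignDrivers(workers: Seq[WorkerSlot], drivers: Seq[DriverReq]): Map[String, String] = {
  val shuffled = Random.shuffle(workers).toIndexedSeq
  val n = shuffled.size
  var pos = 0
  val placements = mutable.Map.empty[String, String]
  for (driver <- drivers if n > 0) {
    var visited = 0
    var launched = false
    // Walk the shuffled list as a circular buffer; stop after one full pass
    // so a driver that fits nowhere does not cause an infinite loop.
    while (visited < n && !launched) {
      val w = shuffled(pos)
      if (w.memFree >= driver.mem && w.coresFree >= driver.cores) {
        w.memFree -= driver.mem
        w.coresFree -= driver.cores
        placements(driver.id) = w.id
        launched = true
      }
      pos = (pos + 1) % n
      visited += 1
    }
  }
  placements.toMap
}
```

Because the position persists across drivers, consecutive drivers land on consecutive workers even when the first worker could have held the entire batch.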
To @JoshRosen: the PR title has been modified, and so has the JIRA.
QA tests have started for PR 1106 at commit
QA tests have finished for PR 1106 at commit
QA tests have started for PR 1106 at commit
QA tests have finished for PR 1106 at commit
Force-pushed from 9a7bf94 to 2ca3091.
QA tests have started for PR 1106 at commit
QA tests have finished for PR 1106 at commit
val startPos = curPos
curPos = (curPos + 1) % aliveWorkerNum
var launched = false
while (curPos - 1 != startPos && !launched) {
Somewhat minor, but why don't we just declare `val startPos = curPos` after we increment it? Then we don't need the `-1` here.
@WangTaoTheTonic This mostly looks good. I left a few more minor comments. Anything else to add, @markhamstra and @JoshRosen?
val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
val aliveWorkerNum = shuffledAliveWorkers.size
var curPos = 0
for (driver <- List(waitingDrivers: _*)) { // iterate over a copy of waitingDrivers
This could be just:
for (driver <- waitingDrivers.toList)
QA tests have started for PR 1106 at commit
QA tests have finished for PR 1106 at commit
Force-pushed from 76346bc to d1a928b.
QA tests have started for PR 1106 at commit
QA tests have finished for PR 1106 at commit
Thanks @WangTaoTheTonic, I'm merging this.
I think this issue is caused by #1106.

Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>

Closes #2436 from sarutak/SPARK-3571 and squashes the following commits:

7a4deea [Kousuke Saruta] Modified Master.scala to use numWorkersVisited and numWorkersAlive instead of stopPos
4e51e35 [Kousuke Saruta] Modified Master to prevent a divide by zero
4817ecd [Kousuke Saruta] Brushed up previous change
71e84b6 [Kousuke Saruta] Modified Master to enable scheduling normally
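For context, the fix described in that commit message reshapes the scheduling loop roughly as sketched below; this is an approximation of the loop inside Master.schedule() based on the commit summaries above, not the exact merged code. Counting visited workers both bounds the loop and avoids the modulo when no workers are alive.

```scala
// Approximate shape of the loop after the SPARK-3571 follow-up (sketch only).
val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
val numWorkersAlive = shuffledAliveWorkers.size
var curPos = 0
for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
  var launched = false
  var numWorkersVisited = 0
  // Visit each alive worker at most once per driver; when numWorkersAlive == 0
  // the loop body (and its modulo) is never reached.
  while (numWorkersVisited < numWorkersAlive && !launched) {
    val worker = shuffledAliveWorkers(curPos)
    numWorkersVisited += 1
    if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
      launchDriver(worker, driver)
      waitingDrivers -= driver
      launched = true
    }
    curPos = (curPos + 1) % numWorkersAlive
  }
}
```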
## What changes were proposed in this pull request?

This patch contains the functionality to balance the load of the cluster-mode drivers among workers. It restores the changes in #1106, which were erased by the merging of #731.

## How was this patch tested?

Tested with existing test cases.

Author: CodingCat <zhunansjtu@gmail.com>

Closes #11702 from CodingCat/SPARK-13803.

(cherry picked from commit bd5365b)
Signed-off-by: Sean Owen <sowen@cloudera.com>
If the waiting driver array is too big, the drivers in it will all be dispatched to the first worker we get (if it has enough resources), with or without the randomization.
We should do the randomization every time we dispatch a driver, in order to better balance drivers.
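As a purely illustrative toy example of the failure mode described above (stand-in types, not Spark code): when the worker list is shuffled only once per scheduling pass and every driver takes the first worker that fits, one well-provisioned worker at the head of the shuffled list can absorb the entire batch.

```scala
import scala.util.Random

// Toy stand-ins, for illustration only.
case class Worker(id: String, var memFree: Int, var coresFree: Int)
case class Driver(id: String, mem: Int, cores: Int)

// Shuffle once, then always take the first worker that fits: every driver in
// the batch lands on the same head worker until its resources run out.
def dispatchBatch(workers: Seq[Worker], drivers: Seq[Driver]): Seq[(String, String)] = {
  val shuffled = Random.shuffle(workers)
  drivers.flatMap { d =>
    shuffled.find(w => w.memFree >= d.mem && w.coresFree >= d.cores).map { w =>
      w.memFree -= d.mem
      w.coresFree -= d.cores
      d.id -> w.id
    }
  }
}
```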