SPARK-1706: Allow multiple executors per worker in Standalone mode #731
Conversation
- worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app)
+ private def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = {
+   worker.memoryFree >= app.desc.memoryPerExecutor && !worker.hasExecutor(app) &&
+     worker.coresFree > 0
I am not sure about this, but does the above mean that an application can be scheduled only once to a worker at a given point in time?
So even if there are multiple cores, different partitions can't be executed in parallel for an app on that worker?
Yes, but this function is only called when we want to schedule a single executor on a certain worker.
So what happens if the worker is already running one executor for the app: we can't schedule another executor on that worker until the previous one is done? (in this or subsequent schedule attempts)
I think so... and we can still only assign one executor for the app to that worker in subsequent schedule() calls.
This logic has been here for a long while (at least since 0.8.x); the scheduling mode proposed in this PR is just to relax this constraint.
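For readers skimming the thread, here is a minimal, self-contained sketch of the constraint being discussed and of the relaxation this PR argues for. The simplified types are stand-ins for the real ones in org.apache.spark.deploy.master, and the code is illustrative, not the patch itself.

```scala
// Simplified stand-ins for ApplicationInfo / WorkerInfo (illustrative only).
case class AppDesc(memoryPerExecutor: Int)
case class AppInfo(desc: AppDesc)
case class Worker(memoryFree: Int, coresFree: Int, executorsForApp: Int) {
  def hasExecutor(app: AppInfo): Boolean = executorsForApp > 0
}

// Existing behaviour: a worker is usable only if it has enough memory,
// has a free core, and is NOT already running an executor for this app.
def canUseStrict(app: AppInfo, w: Worker): Boolean =
  w.memoryFree >= app.desc.memoryPerExecutor && !w.hasExecutor(app) && w.coresFree > 0

// Relaxed behaviour discussed in this PR: drop the one-executor-per-worker
// check, so a worker with spare memory and cores can host more executors.
def canUseRelaxed(app: AppInfo, w: Worker): Boolean =
  w.memoryFree >= app.desc.memoryPerExecutor && w.coresFree > 0
```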
Earlier, with a single executor, it meant something else.
Now there is a difference... I don't think this is what we would want.
Though I would defer to others on this. @matrix, any thoughts?
Maybe there is another scenario to consider: it schedules a single executor on a worker for every application, and the executor's memory depends on the number of cores assigned on that worker rather than on the config 'spark.executor.memory', because the master may assign different numbers of cores to an application's executors, yet those executors all get the same memory.
@mridulm thanks for the comments, I addressed them and redefined the confusing maxCoreLeft variable.
@@ -20,7 +20,7 @@ package org.apache.spark.deploy
  private[spark] class ApplicationDescription(
    val name: String,
    val maxCores: Option[Int],
-   val memoryPerSlave: Int,
+   val memoryPerExecutor: Int, // in Mb
Maybe just call this memoryPerExecutorMB.
good point, fixed
ping....
QA tests have started for PR 731. This patch merges cleanly.
QA results for PR 731:
Would it be possible to reuse existing parameters, spark.executor.instances and spark.executor.cores, instead of introducing new ones?
@nishkamravi2 hmm... it's OK to reuse the parameters, I'm just not sure which option is more convenient for the user.....
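For illustration, a hedged sketch of what reusing the existing parameters could look like from the application side; the app name, master URL, and values below are made up, and the exact semantics would depend on the final patch.

```scala
import org.apache.spark.SparkConf

// Illustrative configuration only (values are invented):
val conf = new SparkConf()
  .setAppName("multi-executor-demo")      // hypothetical app name
  .setMaster("spark://master:7077")       // hypothetical standalone master URL
  .set("spark.executor.cores", "2")       // cores per executor
  .set("spark.executor.memory", "4g")     // memory per executor
  .set("spark.cores.max", "8")            // total cores for the application
```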
ping
I think it would be better to reuse the same parameters to minimize discrepancy across different scheduling modes at the interface level. Also, once this PR gets merged, do we have a compelling use case for starting multiple workers per node, or can we retire params like SPARK_WORKER_INSTANCES? Minor comment: the comment could be modified to: // allow the user to run multiple executor processes on a worker node (managed by a single worker daemon/JVM). Has this PR been tested beyond automated unit tests?
You mean start multiple executors per worker?
If this PR gets merged, I think we can retire params like SPARK_WORKER_INSTANCES. This PR also allows the user to run multiple executors on a node, so multiple workers on a node are unnecessary.
I haven't looked at the code change, but it might not be a good idea to remove support for multiple workers on a machine. When you have really large-memory machines it is sometimes better to run multiple JVMs with smaller heaps rather than one big JVM. This is mostly to avoid GC pauses, which grow with larger heap sizes.
Correct me if I'm wrong, but I think this PR intends to facilitate that by allowing multiple executor JVMs to run on the same node (per worker), as opposed to launching multiple worker daemons with one executor per worker. Multiple worker daemons can be considered redundant.
@nishkamravi2 I got your point now... yes, this patch is to enable the user to run multiple executors with a single worker instead of running multiple workers. As @shivaram said, this is mainly for a better way of utilizing the memory space. In my opinion, we should still keep supporting the multiple-workers-per-server mechanism... before 1.0, when I used Spark, one of the big headaches for me was that the worker could die due to an overloaded executor..... causing other executors to die together.... since 1.0, it seems to be much better (maybe because we kill the executor process in its own thread now, but I'm not sure).... multiple workers can prevent this case.
QA tests have started for PR 731. This patch DID NOT merge cleanly!
QA results for PR 731:
@CodingCat Do we need to ping anyone specific to look at this PR? It's been many months since the last update.
I will rebase this one and send an email to Kay....
Force-pushed from 61af2e4 to b4a8a68
@nchammas, sorry, I thought it was SPARK-1143... I think it is supposed to be reviewed by @andrewor14, @pwendell and @mridulm?
Test build #25251 has finished for PR 731 at commit
Force-pushed from b4a8a68 to 782dfcb
Test build #25266 has finished for PR 731 at commit
Force-pushed from 782dfcb to 2608d11
Test build #25323 has finished for PR 731 at commit
Force-pushed from 2608d11 to d2f413a
Test build #29994 has finished for PR 731 at commit
@andrewor14, I just found that, if we configure an exact core number per executor, the current strategy will be incorrect (or, say, a bit user-unfriendly). E.g. I have 8 cores, 2 cores per machine, and an application would like to use all of them; in spread mode, we will get an allocation array in which, we can say, 3 is not a smart choice. But in this case the user has to understand how the allocation works. Shall we go forward with the configuration of the exact number but change the allocation algorithm (bringing a much larger patch, like before), or do we go back to the max core number configuration?
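To make the concern concrete, here is a hedged, self-contained sketch of a naive spread-out allocation. The worker count and free-core numbers are invented (not the exact scenario above) and this is not the scheduler's actual code, but it shows how a worker can end up with a core count that does not divide evenly into fixed-size executors.

```scala
// Naive spread-out allocation: hand out cores one at a time, round-robin,
// to any worker that still has free cores.
def spreadOut(coresWanted: Int, freeCores: Array[Int]): Array[Int] = {
  val assigned = Array.fill(freeCores.length)(0)
  var left = coresWanted
  var pos = 0
  while (left > 0 && assigned.zip(freeCores).exists { case (a, f) => a < f }) {
    if (assigned(pos) < freeCores(pos)) {
      assigned(pos) += 1
      left -= 1
    }
    pos = (pos + 1) % freeCores.length
  }
  assigned
}

// Hypothetical cluster: 4 workers with (4, 4, 1, 1) free cores, 8 cores requested.
// spreadOut(8, Array(4, 4, 1, 1)) == Array(3, 3, 1, 1)
// With exactly 2 cores per executor, the workers holding 3 and 1 assigned cores
// cannot turn all of those cores into whole executors.
```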
Test build #30016 has finished for PR 731 at commit
IGNORE THIS: after rethinking about this patch, I think it seems to be fine to allocate zero cores in the case I mentioned above, and we just need to filter out those workers whose freeCore is less than spark.executor.cores (if it is defined).
Ignore my last comment, I still hold the position that,
Test build #30098 has finished for PR 731 at commit
Test build #30097 has finished for PR 731 at commit
- * Can an app use the given worker? True if the worker has enough memory and we haven't already
- * launched an executor for the app on it (right now the standalone backend doesn't like having
- * two executors on the same worker).
+ * Schedule executors to be launched on the workers. There are two modes of launching executors.
Can you break "There are two modes of..." into a new paragraph?
@CodingCat We shouldn't have to worry about the case when the user asks for more resources per executor than are available on each worker. If each worker only has 2 cores, the user shouldn't ask for 3 per executor. This holds regardless of whether spread-out mode is used, since an executor cannot be "split" across machines. The existing approach is fine.
@CodingCat I'm merging this into master. Thanks for keeping this patch open for a long time and patiently iterating on the reviews. I think the final solution we have here is much simpler than the one we began with.
Hey @andrewor14, my pleasure, many thanks for your patient review.
Test build #30274 has finished for PR 731 at commit
## What changes were proposed in this pull request?
This patch contains the functionality to balance the load of the cluster-mode drivers among workers. This patch restores the changes in #1106 which were erased due to the merging of #731.
## How was this patch tested?
test with existing test cases
Author: CodingCat <zhunansjtu@gmail.com>
Closes #11702 from CodingCat/SPARK-13803.
(cherry picked from commit bd5365b)
Signed-off-by: Sean Owen <sowen@cloudera.com>
resubmit of #636 for a totally different algorithm
https://issues.apache.org/jira/browse/SPARK-1706
In the current implementation, the user has to start multiple workers on a server in order to start multiple executors on that server, which introduces additional overhead due to the extra JVM processes...
In this patch, I changed the scheduling logic in the master to enable the user to start multiple executor processes under a single worker process.
min(min(maxCoreNumPerExecutor, worker.freeCore), maxLeftCoreToAssign)
where maxLeftCoreToAssign = maxExecutorCanAssign * maxCoreNumPerExecutor
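A hedged, self-contained sketch of an assignment computation matching the formula above (one possible reading; the parameter names mirror the description and this is not the literal code of the patch):

```scala
// Cores granted to one worker in a scheduling round, following the formula above.
def coresForThisWorker(
    maxCoreNumPerExecutor: Int,  // cores each executor should get
    workerFreeCores: Int,        // worker.freeCore
    maxExecutorCanAssign: Int    // executors the app may still launch
  ): Int = {
  val maxLeftCoreToAssign = maxExecutorCanAssign * maxCoreNumPerExecutor
  math.min(math.min(maxCoreNumPerExecutor, workerFreeCores), maxLeftCoreToAssign)
}

// e.g. 2 cores per executor, a worker with 5 free cores, 3 executors still allowed:
// coresForThisWorker(2, 5, 3) == 2
```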
Other small changes include:
change memoryPerSlave in ApplicationDescription to memoryPerExecutor, as "Slave" is overloaded to represent both worker and executor in the documentation... (we had some discussion on this before?)