
SPARK-1706: Allow multiple executors per worker in Standalone mode #731

Closed
wants to merge 27 commits

Conversation

CodingCat (Contributor)

Resubmission of #636, with a totally different algorithm.

https://issues.apache.org/jira/browse/SPARK-1706

In the current implementation, the user has to start multiple workers on a server in order to start multiple executors on that server, which introduces additional overhead from the extra JVM processes...

In this patch, I changed the scheduling logic in the Master to enable the user to start multiple executor processes under the same worker (a single worker JVM).

  1. The user configures spark.executor.maxCoreNumPerExecutor to suggest the maximum number of cores he/she would like to allocate to each executor.
  2. The Master assigns executors to workers based mainly on memoryPerExecutor and worker.freeMemory, and tries to allocate as many cores as possible to each executor: min(min(memoryPerExecutor, worker.freeCore), maxLeftCoreToAssign), where maxLeftCoreToAssign = maxExecutorCanAssign * maxCoreNumPerExecutor (a sketch of the core-capping part follows this list).
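A minimal Scala sketch of the core-capping part of point 2, assuming the intent is to bound the cores handed to each executor by both the worker's free cores and maxLeftCoreToAssign; the names follow the description above, not the merged code.

    // Illustrative only: how many cores the Master could hand to the next
    // executor on a worker, given the caps described above.
    def coresForNextExecutor(
        workerCoresFree: Int,         // worker.freeCore in the description
        maxCoreNumPerExecutor: Int,   // spark.executor.maxCoreNumPerExecutor
        maxExecutorCanAssign: Int): Int = {
      // upper bound on cores that may still be handed out for this app on this worker
      val maxLeftCoreToAssign = maxExecutorCanAssign * maxCoreNumPerExecutor
      math.min(workerCoresFree, maxLeftCoreToAssign)
    }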

Other small changes include

change memoryPerSlave in ApplicationDescription to memoryPerExecutor, as "Slave" is overloaded to represent both worker and executor in the documents... (we had some discussion on this before?)

The reviewed hunk (canUse in Master.scala):

-   worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app)
+   private def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = {
+     worker.memoryFree >= app.desc.memoryPerExecutor && !worker.hasExecutor(app) &&
+       worker.coresFree > 0
Contributor

I am not sure about this, but does the above mean that an application can be scheduled only once to a given worker at any point in time? So even if there are multiple cores, different partitions can't be executed in parallel for an app on that worker?

Contributor Author

Yes, but this function is only called when we want to schedule a single executor on a particular worker.

Contributor

So what happens if the worker is already running one executor for the app? We can't schedule another executor on that worker until the previous one is done (in this or subsequent schedule attempts)?

Contributor Author

I think so... and we can only assign the executor to the same worker in subsequent schedule() calls...

This logic has been here for a long while (at least since 0.8.x); the scheduling mode proposed in this PR just relaxes this constraint.

Contributor

Earlier, since there was a single executor, it meant something else.
Now there is a difference... I don't think this is what we would want.
Though I would defer to others on this... @matrix any thoughts?
On 12-May-2014 5:32 am, "Nan Zhu" notifications@github.com wrote:

In core/src/main/scala/org/apache/spark/deploy/master/Master.scala:

@@ -466,30 +466,14 @@ private[spark] class Master(
   * launched an executor for the app on it (right now the standalone backend doesn't like having
   * two executors on the same worker).
   */
-  def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = {
-    worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app)
+  private def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = {
+    worker.memoryFree >= app.desc.memoryPerExecutor && !worker.hasExecutor(app) &&
+      worker.coresFree > 0

I think so....and we can only assign the executor to the same worker in
the subsequent schedule() calls...

and this logic has been here for a long while (at least since 0.8.x), the
scheduling mode proposed in this PR is just to relax this constraint



Contributor

Maybe there is another scenario: it schedules a single executor to a worker for every application, and the memory depends on the cores assigned to the worker, not on the config 'spark.executor.memory', because for an application the master may assign different numbers of cores to executors, but these executors all have the same memory.

@CodingCat (Contributor Author)

@mridulm thanks for the comments; I addressed them and redefined the confusing maxCoreLeft variable.

@@ -20,7 +20,7 @@ package org.apache.spark.deploy
 private[spark] class ApplicationDescription(
     val name: String,
     val maxCores: Option[Int],
-    val memoryPerSlave: Int,
+    val memoryPerExecutor: Int, // in Mb
Contributor

Maybe just call this memoryPerExecutorMB.

Contributor Author

good point, fixed

@CodingCat (Contributor Author)

ping....

@SparkQA

SparkQA commented Jul 15, 2014

QA tests have started for PR 731. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16693/consoleFull

@SparkQA

SparkQA commented Jul 15, 2014

QA results for PR 731:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16693/consoleFull

@nishkamravi2 (Contributor)

Would it be possible to reuse existing parameters: spark.executor.instances and spark.executor.cores instead of introducing new ones?
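For reference, a hedged sketch of what reusing the existing parameters could look like on the application side. spark.cores.max, spark.executor.cores, and spark.executor.memory are real configuration keys, but whether this exact combination is what ends up driving the standalone scheduling in this PR is an assumption here.

    import org.apache.spark.{SparkConf, SparkContext}

    // Ask for at most 8 cores overall and 2 cores per executor, so the master
    // may launch several executors under a single worker that has enough cores.
    val conf = new SparkConf()
      .setAppName("multi-executor-demo")
      .setMaster("spark://master:7077")   // assumed standalone master URL
      .set("spark.cores.max", "8")
      .set("spark.executor.cores", "2")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)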

@CodingCat (Contributor Author)

@nishkamravi2 en....it's OK to reuse the parameters, I'm just not sure which option is more convenient for the user.....

@CodingCat (Contributor Author)

ping

@nishkamravi2 (Contributor)

I think it would be better to reuse the same parameters to minimize discrepancy across different scheduling modes at the interface level. Also, once this PR gets merged, do we have a compelling use case for starting multiple workers per node, or can we retire params like SPARK_WORKER_INSTANCES?

Minor comment:
// allow user to run multiple executors in the same worker
// (within the same worker JVM process)

could be modified to:

//allow user to run multiple executor processes on a worker node (managed by a single worker daemon/JVM)

Has this PR been tested beyond automated unit tests?

@CodingCat (Contributor Author)

you mean start multiple executors per worker?

@lianhuiwang (Contributor)

If this PR gets merged, I think we can retire params like SPARK_WORKER_INSTANCES. This PR also allows the user to run multiple executors on a node, so multiple workers on a node are unnecessary.

@shivaram (Contributor)

I haven't looked at the code change, but it might not be a good idea to remove support for multiple workers in a machine. When you have really large memory machines it is sometimes better to run multiple JVMs with smaller heaps rather than one big JVM. This is mostly to avoid GC pauses which grow with larger heap sizes.

@nishkamravi2 (Contributor)

Correct me if I'm wrong, but I think this PR intends to facilitate that by allowing multiple executor JVMs to run on the same node (per worker) as opposed to launching multiple worker daemons and one executor per worker. Multiple worker daemons can be considered redundant.

@CodingCat (Contributor Author)

@nishkamravi2 I got your point now... yes, this patch enables the user to run multiple executors under a single worker instead of running multiple workers.

As @shivaram said, this is mainly for better utilization of the memory space.

In my opinion, we should still keep supporting the multiple-workers-per-server mechanism... before 1.0, when I used Spark, one of the big headaches for me was that a worker could die because of an overloaded executor... causing the other executors to die with it... since 1.0 it seems to be much better (maybe because we now kill the executor process in its own thread, but I'm not sure)... multiple workers can prevent this case.

@SparkQA

SparkQA commented Aug 15, 2014

QA tests have started for PR 731. This patch DID NOT merge cleanly!
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18587/consoleFull

@SparkQA

SparkQA commented Aug 15, 2014

QA results for PR 731:
- This patch PASSES unit tests.

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18587/consoleFull

@nchammas (Contributor)

nchammas commented Jan 7, 2015

@CodingCat Do we need to ping anyone specific to look at this PR? It's been many months since the last update.

@CodingCat (Contributor Author)

I will rebase this one and send email to Kay....

@CodingCat (Contributor Author)

@nchammas, sorry, I thought it was SPARK-1143... I think it is supposed to be reviewed by @andrewor14, @pwendell, and @mridulm???

@SparkQA

SparkQA commented Jan 8, 2015

Test build #25251 has finished for PR 731 at commit b4a8a68.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 8, 2015

Test build #25266 has finished for PR 731 at commit 782dfcb.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 9, 2015

Test build #25323 has finished for PR 731 at commit 2608d11.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 10, 2015

Test build #29994 has finished for PR 731 at commit b8ca561.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class FPGrowthModel(JavaModelWrapper):
    • class FPGrowth(object):
  • This patch does not change any dependencies.

@CodingCat (Contributor Author)

@andrewor14, I just found that if we configure the exact number of cores per executor, the current strategy will be incorrect (or, say, a bit user-unfriendly).

e.g. I have 8 cores in total, 2 cores per machine, and an application that would like to use all of them; in spread-out mode we will get an assignment of Array(2, 2, 2, 2). If we set --executor-cores to 3, then the application will get 0 cores, as there is no allocation of at least 3 cores...

We can say that 3 is not a smart choice here, but in this case the user has to understand how spread-out mode and standalone scheduling work to choose a better number (see the sketch below)...
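A tiny Scala illustration of the arithmetic behind this example (illustrative names, not code from the PR): with 2 cores assigned per worker and an exact requirement of 3 cores per executor, no worker can host an executor.

    // Spread-out mode handed 2 cores to each of the 4 workers.
    val assignedPerWorker = Array(2, 2, 2, 2)
    val exactCoresPerExecutor = 3   // --executor-cores 3
    // Executors that can actually launch per worker: 2 / 3 = 0 everywhere.
    val executorsLaunched = assignedPerWorker.map(_ / exactCoresPerExecutor).sum
    // executorsLaunched == 0, so the application ends up with no cores at all.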

Shall we go forward with configuring the exact number but change the allocation algorithm (bringing a much larger patch, like before), or shall we go back to the max-core-number configuration?

@SparkQA

SparkQA commented Apr 10, 2015

Test build #30016 has finished for PR 731 at commit 940cb42.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

@CodingCat (Contributor Author)

IGNORE THIS

After rethinking this patch, I think it is fine to allocate zero cores in the case I mentioned above; we just need to filter out workers whose free cores are less than spark.executor.cores (if it is defined), as in the sketch below.
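A hedged sketch of that filtering idea, assuming it lives in the Master where WorkerInfo (with its coresFree field, as in the diff above) is in scope; the helper name and the Option handling are mine, not the merged code.

    // Keep only workers that can still host one executor of the requested size;
    // coresPerExecutor is Some(n) when spark.executor.cores is set.
    def usableWorkers(workers: Seq[WorkerInfo],
                      coresPerExecutor: Option[Int]): Seq[WorkerInfo] =
      workers.filter(w => w.coresFree >= coresPerExecutor.getOrElse(1))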

@CodingCat (Contributor Author)

Ignore my last comment; I still hold the position that:

  1. if we want to define the exact number of cores per executor, we need to change the allocation algorithm (bringing a larger patch);
  2. otherwise, we go back to defining the maximum number of cores per executor.

@SparkQA

SparkQA commented Apr 12, 2015

Test build #30098 has finished for PR 731 at commit 940cb42.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

@SparkQA

SparkQA commented Apr 12, 2015

Test build #30097 has finished for PR 731 at commit da102d4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

* Can an app use the given worker? True if the worker has enough memory and we haven't already
* launched an executor for the app on it (right now the standalone backend doesn't like having
* two executors on the same worker).
* Schedule executors to be launched on the workers. There are two modes of launching executors.
Contributor

Can you break "There are two modes of..." into a new paragraph?

@andrewor14 (Contributor)

@CodingCat We shouldn't have to worry about the case where the user asks for more resources per executor than are available on each worker. If each worker only has 2 cores, the user shouldn't ask for 3 per executor. This holds regardless of whether spread-out mode is used, since an executor cannot be "split" across machines. The existing approach is fine.

@andrewor14 (Contributor)

@CodingCat I'm merging this into master. Thanks for keeping this patch open for a long time and patiently iterating on the reviews. I think the final solution we have here is much simpler than the one we began with.

@asfgit asfgit closed this in 8f8dc45 Apr 14, 2015
@CodingCat (Contributor Author)

Hey, @andrewor14 , my pleasure, many thanks for your patient review

@SparkQA

SparkQA commented Apr 14, 2015

Test build #30274 has finished for PR 731 at commit 6dee808.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

asfgit pushed a commit that referenced this pull request Mar 15, 2016
## What changes were proposed in this pull request?

This patch contains the functionality to balance the load of the cluster-mode drivers among workers

This patch restores the changes in #1106 which were erased due to the merging of #731

## How was this patch tested?

test with existing test cases

Author: CodingCat <zhunansjtu@gmail.com>

Closes #11702 from CodingCat/SPARK-13803.

(cherry picked from commit bd5365b)
Signed-off-by: Sean Owen <sowen@cloudera.com>