
SPARK-4136. Under dynamic allocation, cancel outstanding executor requests when no longer needed #4168

Closed · wants to merge 3 commits

Conversation

@sryza (Contributor) commented Jan 23, 2015

This takes advantage of the changes made in SPARK-4337 to cancel pending requests to YARN when they are no longer needed.

Each time the timer in ExecutorAllocationManager strikes, we compute maxNumNeededExecutors, the maximum number of executors we could fill with the current load. This is calculated as the total number of running and pending tasks divided by the number of cores per executor. If maxNumNeededExecutors is below the total number of running and pending executors, we call requestTotalExecutors(maxNumNeededExecutors) to let the cluster manager know that it should cancel any pending requests above this amount. If not, maxNumNeededExecutors is just used as a bound alongside the configured maxExecutors to limit the number of new requests.
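
For illustration, a minimal standalone sketch of this bound (all names are hypothetical stand-ins for the manager's internal state, not the patch itself); the division rounds up so a partial executor's worth of tasks still claims a full executor:

```scala
// Minimal sketch of the per-tick bound described above. The divisor is
// tasks per executor, which equals cores per executor when each task
// uses one core.
object AllocationBoundSketch {
  def maxNumNeededExecutors(
      totalPendingTasks: Int,
      totalRunningTasks: Int,
      tasksPerExecutor: Int): Int = {
    val outstanding = totalPendingTasks + totalRunningTasks
    // Ceiling division: 9 outstanding tasks at 4 tasks per executor
    // still need 3 executors, not 2.
    (outstanding + tasksPerExecutor - 1) / tasksPerExecutor
  }

  def main(args: Array[String]): Unit = {
    // 5 pending + 4 running tasks, 4 tasks per executor => 3 executors
    println(maxNumNeededExecutors(5, 4, 4))
  }
}
```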

The patch modifies the API exposed by ExecutorAllocationClient for requesting additional executors by moving from requestExecutors to requestTotalExecutors. This makes the communication between the ExecutorAllocationManager and the YarnAllocator easier to reason about and removes some state that needed to be kept in the CoarseGrainedSchedulerBackend. I think an argument can be made that this makes for a less attractive user-facing API in SparkContext, but I'm having trouble envisioning situations where a user would want to use either of these APIs.
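
As a rough sketch of the shape this API change describes (the trait name and signature here are illustrative, not copied from the patch):

```scala
package org.apache.spark

// Illustrative shape of the change described above: a single
// "desired total" call replaces incremental requestExecutors deltas.
private[spark] trait ExecutorAllocationClientSketch {
  /**
   * Express a target total number of executors. A total below the current
   * running + pending count lets the cluster manager cancel outstanding
   * requests; a higher total files new ones. Returns whether the request
   * was acknowledged.
   */
  def requestTotalExecutors(numExecutors: Int): Boolean
}
```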

This will likely break some tests, but I wanted to get feedback on the approach before adding tests and polishing.

@SparkQA commented Jan 23, 2015

Test build #25993 has started for PR 4168 at commit 79763d9.

  • This patch merges cleanly.

@SparkQA commented Jan 23, 2015

Test build #25993 has finished for PR 4168 at commit 79763d9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25993/

val now = clock.getTimeMillis
if (addTime != NOT_SET && now >= addTime) {
  addExecutors()
  if (maxNumNeededExecutors < numExecutorsPending + executorIds.size) {
Contributor

Do we need to exclude executorsPendingToRemove.size? The YarnAllocator may have already killed the executors pending removal, but the ExecutorAllocationManager may not have received the onExecutorRemoved message yet, so at that point executorIds still includes the removed executors.

Contributor Author

Good point, I think you are right.
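
For concreteness, a hypothetical sketch of the adjusted check; the parameter names stand in for the manager's state:

```scala
// Hypothetical sketch of the adjustment agreed on above: executors the
// cluster manager is already tearing down should not count toward the
// total that maxNumNeededExecutors is compared against.
object CancelCheckSketch {
  def shouldLowerTarget(
      maxNumNeededExecutors: Int,
      numExecutorsPending: Int,
      numExistingExecutors: Int,
      numExecutorsPendingToRemove: Int): Boolean = {
    val effectiveTotal =
      numExecutorsPending + numExistingExecutors - numExecutorsPendingToRemove
    maxNumNeededExecutors < effectiveTotal
  }
}
```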

@SparkQA commented Jan 27, 2015

Test build #26168 has started for PR 4168 at commit 9ba0e01.

  • This patch merges cleanly.

@SparkQA commented Jan 27, 2015

Test build #26168 has finished for PR 4168 at commit 9ba0e01.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26168/

@vanzin (Contributor) commented Jan 28, 2015

@sryza looks like the test failures are legit?

@@ -438,6 +444,7 @@ private[spark] class ExecutorAllocationManager(
  }

  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    numRunningTasks += 1
Contributor

I think access to numRunningTasks should be synchronized, because the schedule thread and the listener thread are two different threads.

Contributor Author

Good point. It doesn't need synchronization because only one thread is writing to it, but we should make it volatile.

Contributor Author

I take that back on further inspection. It should be synchronized.
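
A minimal sketch of the pattern this thread settles on, with hypothetical names: the listener thread's writes and the allocation timer thread's reads go through the same lock, since volatile alone would not make a read-modify-write like += atomic if a second writer ever appeared, and synchronized also keeps the count consistent with any other guarded state:

```scala
// Sketch of the agreed fix: one lock shared by the writer (listener
// thread) and the reader (allocation timer thread).
class RunningTaskCounter {
  private var numRunningTasks = 0

  def onTaskStart(): Unit = synchronized { numRunningTasks += 1 }
  def onTaskEnd(): Unit = synchronized { numRunningTasks -= 1 }
  def current: Int = synchronized { numRunningTasks }
}
```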

@andrewor14 (Contributor)

@sryza is this still WIP? Are we aiming for this to go into 1.3?

@sryza changed the title from "SPARK-4136. Under dynamic allocation, cancel outstanding executor requests when no longer needed [WIP]" to "SPARK-4136. Under dynamic allocation, cancel outstanding executor requests when no longer needed" on Feb 5, 2015
@sryza (Contributor, Author) commented Feb 5, 2015

@andrewor14 it's no longer a WIP, and I am aiming for it for 1.3. I just updated the title - sorry for the confusion.

@andrewor14 (Contributor)

retest this please

@SparkQA commented Feb 9, 2015

Test build #27073 has started for PR 4168 at commit 9ba0e01.

  • This patch does not merge cleanly.

@SparkQA commented Feb 9, 2015

Test build #27077 has started for PR 4168 at commit 16db9f4.

  • This patch merges cleanly.

/**
 * If the add time has expired, request new executors and refresh the add time.
 * If the remove time for an existing executor has expired, kill the executor.
 * This is called at a fixed interval to relegate the number of pending executor requests
Contributor

I don't think relegate here is the right word. Did you mean regulate?

Contributor Author

Oops, definitely.

@SparkQA commented Feb 9, 2015

Test build #27073 has finished for PR 4168 at commit 9ba0e01.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 10, 2015

Test build #27170 has started for PR 4168 at commit 37ce77d.

  • This patch merges cleanly.

@SparkQA commented Feb 10, 2015

Test build #27170 has finished for PR 4168 at commit 37ce77d.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27170/

* Request an additional number of executors from the cluster manager.
* Return whether the request is acknowledged by the cluster manager.
* This is currently only supported in YARN mode. Return whether the request is received.
Contributor

If we want to add in the javadocs that it's only supported in YARN mode, we should do it for all the methods here or just move this to the class javadocs. I prefer the latter.

@andrewor14 (Contributor)

Hey @sryza, thanks for iterating quickly on the reviews. I left one question, but other than that this looks pretty close.

@SparkQA commented Feb 10, 2015

Test build #27168 has finished for PR 4168 at commit f80b7ec.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class GetField(child: Expression, field: StructField, ordinal: Int) extends UnaryExpression

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27168/

@pwendell (Contributor)

(I only looked at the public APIs, but those look fine to me now - there are none!)

@andrewor14 (Contributor)

Hey @sryza, thanks a lot for fixing this. I will merge this into master and 1.3 after fixing the last batch of comments I pointed out.

* result in canceling pending requests or filing additional requests.
* This is currently only supported in Yarn mode. Return whether the request is received.
*/
@DeveloperApi
Contributor

This shouldn't be @DeveloperApi if it's private[spark].
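
To illustrate the note, a hypothetical contrast of the two markers (the class and methods here are made up for illustration):

```scala
package org.apache.spark

import org.apache.spark.annotation.DeveloperApi

// @DeveloperApi flags a user-visible but unstable API, so it carries no
// meaning on a member that private[spark] already hides from users.
class VisibilitySketch {
  @DeveloperApi
  def userFacingButUnstable(): Unit = ()       // annotation meaningful here

  private[spark] def internalOnly(): Unit = () // annotation would be redundant
}
```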

asfgit pushed a commit that referenced this pull request Feb 10, 2015
…uests when no longer needed


Author: Sandy Ryza <sandy@cloudera.com>

Closes #4168 from sryza/sandy-spark-4136 and squashes the following commits:

37ce77d [Sandy Ryza] Warn on negative number
cd3b2ff [Sandy Ryza] SPARK-4136

(cherry picked from commit 69bc3bb)
Signed-off-by: Andrew Or <andrew@databricks.com>
@SparkQA commented Feb 10, 2015

Test build #27220 has started for PR 4168 at commit 3cca880.

  • This patch merges cleanly.

@asfgit closed this in 69bc3bb on Feb 10, 2015
@SparkQA commented Feb 10, 2015

Test build #27220 has finished for PR 4168 at commit 3cca880.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27220/

@andrewor14 (Contributor)

@sryza looks like I narrowly missed your last commit when I merged this. I have copied the changes there to this HOTFIX commit: b640c84
