SPARK-4136. Under dynamic allocation, cancel outstanding executor requests when no longer needed #4168
Conversation
Test build #25993 has started for PR 4168 at commit
Test build #25993 has finished for PR 4168 at commit
Test FAILed.
    val now = clock.getTimeMillis
    if (addTime != NOT_SET && now >= addTime) {
      addExecutors()
    if (maxNumNeededExecutors < numExecutorsPending + executorIds.size) {
Do we need to exclude executorsPendingToRemove.size? Because YarnAllocator has already killed the executors pending removal, but ExecutorAllocationManager may not have received the onExecutorRemoved message yet, so at that point executorIds still contains removed executors.
Good point, I think you are right.
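The guard in the snippet above compares demand against executors that are already live or requested. A minimal sketch of that bound, with illustrative names (`tasksPerExecutor`, `shouldAdd` are not the PR's exact fields), assuming demand is ceil((running + pending tasks) / tasks per executor):

```java
// Sketch of the demand bound discussed in this thread; names are
// illustrative, not the PR's exact fields.
public class AllocationBound {
    // Maximum executors the current load could keep busy:
    // ceil((running + pending tasks) / tasks per executor).
    static int maxNumNeededExecutors(int runningTasks, int pendingTasks,
                                     int tasksPerExecutor) {
        int totalTasks = runningTasks + pendingTasks;
        return (totalTasks + tasksPerExecutor - 1) / tasksPerExecutor;
    }

    // Add executors only while demand still meets or exceeds what is
    // already live or requested (mirroring the guard in the snippet above).
    static boolean shouldAdd(int runningTasks, int pendingTasks,
                             int tasksPerExecutor,
                             int numExecutorsPending, int numLiveExecutors) {
        return maxNumNeededExecutors(runningTasks, pendingTasks, tasksPerExecutor)
                >= numExecutorsPending + numLiveExecutors;
    }
}
```

Per the review exchange above, a fuller sketch would also subtract executors pending removal from the live count.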
Force-pushed from 79763d9 to 9ba0e01.
Test build #26168 has started for PR 4168 at commit
Test build #26168 has finished for PR 4168 at commit
Test FAILed.
@sryza looks like the test failures are legit?
@@ -438,6 +444,7 @@ private[spark] class ExecutorAllocationManager(
  }

  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    numRunningTasks += 1
I think we should make access to numRunningTasks synchronized, because the schedule thread and the listener thread are two different threads.
Good point. It doesn't need synchronization because only one thread is writing to it, but we should make it volatile.
I take that back on further inspection. It should be synchronized.
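The hazard under discussion is that `numRunningTasks += 1` is a read-modify-write, so the listener thread's updates can race with the schedule thread's accesses. A minimal sketch of the synchronized form the review settles on (class and method names are illustrative, not the PR's code):

```java
// Illustrative counter guarded the way the review suggests: every
// access goes through one lock, since `+= 1` is not atomic and two
// threads (listener and schedule) touch the field.
public class RunningTaskCounter {
    private final Object lock = new Object();
    private int numRunningTasks = 0;

    // Called from the listener thread (task start/end events).
    void taskStarted() { synchronized (lock) { numRunningTasks++; } }
    void taskEnded()   { synchronized (lock) { numRunningTasks--; } }

    // Called from the schedule thread when computing demand.
    int current() { synchronized (lock) { return numRunningTasks; } }
}
```

Marking the field volatile alone would make reads fresh but would not make the increment atomic, which is why synchronization wins here.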
@sryza is this still WIP? Are we aiming for this to go into 1.3?
@andrewor14 it's no longer a WIP, and I am aiming for it for 1.3. I just updated the title - sorry for the confusion.
retest this please
Test build #27073 has started for PR 4168 at commit
Force-pushed from 9ba0e01 to 16db9f4.
Test build #27077 has started for PR 4168 at commit
/**
 * If the add time has expired, request new executors and refresh the add time.
 * If the remove time for an existing executor has expired, kill the executor.
 * This is called at a fixed interval to relegate the number of pending executor requests
I don't think relegate here is the right word. Did you mean regulate?
Oops definitely
Test build #27073 has finished for PR 4168 at commit
Force-pushed from f80b7ec to 37ce77d.
Test build #27170 has started for PR 4168 at commit
Test build #27170 has finished for PR 4168 at commit
Test FAILed.
 * Request an additional number of executors from the cluster manager.
 * Return whether the request is acknowledged by the cluster manager.
 * This is currently only supported in YARN mode. Return whether the request is received.
If we want to add in the javadocs that it's only supported in YARN mode, we should do it for all the methods here or just move this to the class javadocs. I prefer the latter.
Hey @sryza thanks for iterating quickly on the reviews. I left 1 question but other than that this looks pretty close.
Test build #27168 has finished for PR 4168 at commit
Test PASSed.
(I only looked at the public APIs, but those look fine to me now - there are none!)
Hey @sryza thanks a lot for fixing this. I will merge this into master and 1.3 after fixing the last batch of comments I pointed out, when I merge.
 * result in canceling pending requests or filing additional requests.
 * This is currently only supported in Yarn mode. Return whether the request is received.
 */
@DeveloperApi
not developer api if it's private[spark]
…uests when no longer needed

This takes advantage of the changes made in SPARK-4337 to cancel pending requests to YARN when they are no longer needed.

Each time the timer in `ExecutorAllocationManager` strikes, we compute `maxNumNeededExecutors`, the maximum number of executors we could fill with the current load. This is calculated as the total number of running and pending tasks divided by the number of cores per executor. If `maxNumNeededExecutors` is below the total number of running and pending executors, we call `requestTotalExecutors(maxNumNeededExecutors)` to let the cluster manager know that it should cancel any pending requests above this amount. If not, `maxNumNeededExecutors` is just used as a bound alongside the configured `maxExecutors` to limit the number of new requests.

The patch modifies the API exposed by `ExecutorAllocationClient` for requesting additional executors by moving from `requestExecutors` to `requestTotalExecutors`. This makes the communication between the `ExecutorAllocationManager` and the `YarnAllocator` easier to reason about and removes some state that needed to be kept in the `CoarseGrainedSchedulerBackend`. I think an argument can be made that this makes for a less attractive user-facing API in `SparkContext`, but I'm having trouble envisioning situations where a user would want to use either of these APIs.

This will likely break some tests, but I wanted to get feedback on the approach before adding tests and polishing.

Author: Sandy Ryza <sandy@cloudera.com>

Closes #4168 from sryza/sandy-spark-4136 and squashes the following commits:

37ce77d [Sandy Ryza] Warn on negative number
cd3b2ff [Sandy Ryza] SPARK-4136

(cherry picked from commit 69bc3bb)
Signed-off-by: Andrew Or <andrew@databricks.com>
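The core behavioral change described in the commit message is that the manager now communicates an absolute target (`requestTotalExecutors`) rather than a delta (`requestExecutors`), so the cluster manager can cancel surplus pending requests itself. A minimal sketch of that reconciliation (class and field names are illustrative, not Spark's code):

```java
// Sketch of the delta-vs-total request semantics described above.
// An allocator given an absolute target can cancel surplus pending
// requests on its own; one given only deltas cannot.
public class SketchAllocator {
    int liveExecutors;
    int pendingRequests = 0;

    SketchAllocator(int liveExecutors) { this.liveExecutors = liveExecutors; }

    // The caller states the total it wants; the allocator reconciles,
    // canceling any outstanding requests above the target.
    void requestTotalExecutors(int total) {
        pendingRequests = Math.max(total - liveExecutors, 0);
    }
}
```

With only delta-style requests, the caller would have to track the cluster manager's outstanding request count itself, which is exactly the `CoarseGrainedSchedulerBackend` state the patch removes.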
Test build #27220 has started for PR 4168 at commit
Test build #27220 has finished for PR 4168 at commit
Test FAILed.