
[SPARK-25299] Simpler scheduler integration #555

Closed
wants to merge 5 commits

Conversation

yifeih commented May 22, 2019

No description provided.

yifeih commented May 22, 2019

This is a much simpler, less invasive version of #548, but with limited functionality (and I'm not entirely sure it works... see below for my reasoning).

To better support async and individual file server implementations, it would be helpful to be able to retrigger map tasks when a fetch failure happens (if we don't support retriggering map tasks, a fetch failure will always result in the entire job failing). This change allows retriggering at the simplest level: it only invalidates the mapper associated with the FetchFailure, and doesn't attempt to remove other MapStatuses on the same host or execId.
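To make "only invalidates the mapper associated with the FetchFailure" concrete, here's a toy model of the driver-side bookkeeping (toy classes only, not the real MapOutputTrackerMaster and not the exact diff in this PR):

```scala
// Toy model of the driver-side map-output bookkeeping. Real Spark keeps
// MapStatus objects in MapOutputTrackerMaster; the point here is just the
// difference between invalidating one map output vs. a whole host's outputs.
case class BlockManagerId(execId: String, host: String)

class ToyMapOutputTracker(numMaps: Int) {
  // mapIndex -> where that map task's output currently lives (None = must rerun)
  private val statuses = Array.fill[Option[BlockManagerId]](numMaps)(None)

  def registerMapOutput(mapIndex: Int, loc: BlockManagerId): Unit =
    statuses(mapIndex) = Some(loc)

  // What this PR does on FetchFailed: drop only the one MapStatus that the
  // failed fetch was reading from.
  def unregisterMapOutput(mapIndex: Int, loc: BlockManagerId): Unit =
    if (statuses(mapIndex).contains(loc)) statuses(mapIndex) = None

  // What it deliberately does NOT do: drop every output on the failed host;
  // those have to be invalidated by other reducers' own FetchFailed exceptions.
  def removeOutputsOnHost(host: String): Unit =
    statuses.indices.foreach { i =>
      if (statuses(i).exists(_.host == host)) statuses(i) = None
    }

  def missingPartitions: Seq[Int] = statuses.indices.filter(statuses(_).isEmpty)
}
```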

In this scenario, we'd expect other MapStatuses on the same hosts/execIds to be removed by other FetchFailed exceptions from other reducers. However, I'm not entirely sure it currently works like that. I noticed that a FetchFailed exception is ignored if the attemptId it carries isn't the current stage attempt (https://github.com/palantir/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1519). And based on the code in submitMissingTasks() (https://github.com/palantir/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1110), I think it looks at whatever MapStatuses are missing in the MapOutputTracker and only resubmits those.

So if we're waiting on other executors to report FetchFailed from the same source, and those FetchFailed errors don't come back quickly enough, a resubmission of missing tasks between every FetchFailure could end up maxing out the stage attempt count before we've marked all the MapStatuses for the same host/execId as needing a retry.
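As a toy illustration of that race (made-up numbers, not real scheduler code, just the shape of the problem):

```scala
// Toy timeline of the worry above: five map outputs sit on one dead host, but
// each FetchFailed only invalidates the single map it was reading, and each
// round of resubmission bumps the stage attempt counter. With Spark's default
// cap of 4 consecutive failed attempts, the job can abort before all five
// stale MapStatuses have been cleared.
object FetchFailureRace extends App {
  val maxConsecutiveStageAttempts = 4            // Spark's default retry cap
  val staleMapsOnDeadHost = scala.collection.mutable.Set(0, 1, 2, 3, 4)
  var failedAttempts = 0

  while (staleMapsOnDeadHost.nonEmpty && failedAttempts < maxConsecutiveStageAttempts) {
    // one reducer hits the dead host and reports FetchFailed for exactly one map
    val failedMap = staleMapsOnDeadHost.head
    failedAttempts += 1
    staleMapsOnDeadHost -= failedMap             // only that MapStatus is unregistered
    println(s"attempt $failedAttempts: rerun map $failedMap, " +
      s"stale maps still registered: $staleMapsOnDeadHost")
  }
  if (staleMapsOnDeadHost.nonEmpty)
    println(s"aborted after $failedAttempts failed attempts; never cleared: $staleMapsOnDeadHost")
}
```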

Hopefully that made sense? At least, I think that's how it works, in which case we'd probably need another type of solution...

@squito @mccheah for comments?

squito commented May 24, 2019

yes, I think your description of the problem is correct -- but I just commented on your google doc that I think you have the same problem with the use of unregisterOtherMapStatusesOnFetchFailure in the async case.

I'm wondering if the driver really needs to handle a new message, UpdatedShuffleBlockLocation, which would let async replicas, rebalancers, etc. tell the driver where shuffle data has moved. (But I'm not sure we need to do that now either.)
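Very roughly (hypothetical only -- none of this exists today, and the names are made up), something like:

```scala
// Hypothetical sketch of such a message: an async replicator or rebalancer
// could send this to the driver when it moves shuffle data, instead of the
// driver only finding out via FetchFailed.
case class UpdatedShuffleBlockLocation(
    shuffleId: Int,
    mapId: Int,
    reduceId: Int,        // per-block granularity; could be dropped if we track per map output
    newHost: String,      // or a BlockManagerId / file-server URI in a real version
    newExecId: String)

// and, conceptually, a driver-side handler (pseudocode; updateMapOutputLocation
// is a made-up helper on the map output tracker):
//
//   case UpdatedShuffleBlockLocation(shuffleId, mapId, reduceId, host, execId) =>
//     mapOutputTracker.updateMapOutputLocation(shuffleId, mapId, reduceId, host, execId)
```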

I'm still a little stuck, on this and #548, on why we need to allow each shuffle block to go to a different location, instead of sending the entire output of one map task to the same destination. Obviously that would be more flexible, but not doing that still seems to allow a lot of the designs we've been considering, while reducing the complexity a lot, so it seems like the right step for incremental improvement.
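To make that trade-off concrete (toy types only, nothing from either PR):

```scala
object LocationGranularity {
  case class Location(host: String, port: Int)

  // One location per map task's entire output: the simpler option. The driver
  // tracks O(numMaps) entries per shuffle, and a lost location invalidates
  // exactly one map task.
  type PerMapOutput = Map[(Int, Int), Location]        // (shuffleId, mapId)

  // One location per individual block: more flexible, but driver-side
  // bookkeeping and failure handling grow to O(numMaps * numReduces) entries
  // per shuffle, and invalidation/retry logic has to reason per block.
  type PerBlock = Map[(Int, Int, Int), Location]       // (shuffleId, mapId, reduceId)
}
```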

yifeih closed this on Aug 6, 2019