
[#1796] fix(spark): Implicitly unregister map output on fetch failure #1797

Draft · wants to merge 23 commits into master
Conversation

@zuston zuston commented Jun 17, 2024

What changes were proposed in this pull request?

  1. Implicitly unregister map output on fetch failure
  2. Introduce a unified RssShuffleStatus to track stage task failures and, based on it, fix the incorrect retry check condition, which previously checked whether the task failure count reached the spark.task.maxFailures value instead of checking partitionId or shuffleServer failures (a minimal sketch follows this list).
  3. Remove the 2-phase RPCs for write/fetch by using a simple RPC for reportFetch/WriteFailure to speed things up.
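A minimal sketch of what such a unified per-shuffle status might look like (the class name comes from this PR, but the members and methods below are illustrative assumptions, not necessarily the exact ones introduced here):

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Tracks task failures and stage-retry state for one shuffle, so the retry decision
// can look at the failures reported for a given stage attempt instead of comparing a
// raw task-failure counter against spark.task.maxFailures.
public class RssShuffleStatus {
  private final int shuffleId;
  private final Set<Integer> retriedStageAttempts = new HashSet<>();
  // stageAttemptNumber -> number of reported task failures for that attempt
  private final Map<Integer, Integer> taskFailureCounts = new HashMap<>();

  public RssShuffleStatus(int shuffleId) {
    this.shuffleId = shuffleId;
  }

  public synchronized int incTaskFailure(int stageAttemptNumber) {
    return taskFailureCounts.merge(stageAttemptNumber, 1, Integer::sum);
  }

  public synchronized boolean isStageAttemptRetried(int stageAttemptNumber) {
    return retriedStageAttempts.contains(stageAttemptNumber);
  }

  public synchronized void markStageAttemptRetried(int stageAttemptNumber) {
    retriedStageAttempts.add(stageAttemptNumber);
  }
}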

Why are the changes needed?

Fix: #1796, #1801, #1798

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit tests

@zuston zuston marked this pull request as draft June 17, 2024 06:36
@zuston zuston requested a review from advancedxy June 17, 2024 06:36

zuston commented Jun 17, 2024

Could you help review this, @advancedxy? If I've understood anything incorrectly, feel free to point it out.


github-actions bot commented Jun 17, 2024

Test Results

2 116 files   -   533  2 116 suites   - 533   2h 28m 20s ⏱️ - 2h 58m 20s
  656 tests  -   289    640 ✅  -   304   1 💤 ±0   2 ❌ + 2   13 🔥 + 13 
9 826 runs   - 1 955  9 586 ✅  - 2 180  15 💤 ±0  30 ❌ +30  195 🔥 +195 

For more details on these failures and errors, see this check.

Results for commit 73e5020. ± Comparison against base commit f8e4329.

This pull request removes 289 tests.
org.apache.hadoop.mapred.SortWriteBufferManagerTest ‑ testCombineBuffer
org.apache.hadoop.mapred.SortWriteBufferManagerTest ‑ testCommitBlocksWhenMemoryShuffleDisabled
org.apache.hadoop.mapred.SortWriteBufferManagerTest ‑ testOnePartition
org.apache.hadoop.mapred.SortWriteBufferManagerTest ‑ testWriteException
org.apache.hadoop.mapred.SortWriteBufferManagerTest ‑ testWriteNormal
org.apache.hadoop.mapred.SortWriteBufferTest ‑ testReadWrite
org.apache.hadoop.mapred.SortWriteBufferTest ‑ testSortBufferIterator
org.apache.hadoop.mapreduce.RssMRUtilsTest ‑ applyDynamicClientConfTest
org.apache.hadoop.mapreduce.RssMRUtilsTest ‑ baskAttemptIdTest
org.apache.hadoop.mapreduce.RssMRUtilsTest ‑ blockConvertTest
…

♻️ This comment has been updated with latest results.

@@ -371,6 +374,19 @@ public static RssException reportRssFetchFailedException(
rssFetchFailedException.getMessage());
RssReportShuffleFetchFailureResponse response = client.reportShuffleFetchFailure(req);
if (response.getReSubmitWholeStage()) {
TaskContext taskContext = TaskContext.get();
RssReassignServersRequest rssReassignServersRequest =
Contributor

Thanks for bringing this up.

I think I made a mistake in the previous impl, which doesn't unregister all the map output on shuffle fetch failure.
I think the right place to unregister the map output is ShuffleManagerGrpcService's reportShuffleFetchFailure. When enough fetch failures are reported, it should unregister all the map output and tell the client to throw a FetchFailedException. Shuffle server reassignment could also be triggered if configured to do so.
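A hedged, illustrative sketch of what that driver-side handling could look like (the class, fields, and helper names here are hypothetical, reusing the RssShuffleStatus sketch from the PR description; the exact Spark API for dropping map output differs by version):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.spark.MapOutputTrackerMaster;
import org.apache.spark.SparkEnv;

// Hypothetical driver-side handler invoked by the report-fetch-failure RPC.
public class FetchFailureReportHandler {
  private final Map<Integer, RssShuffleStatus> shuffleStatuses = new ConcurrentHashMap<>();
  private final int maxFetchFailures;

  public FetchFailureReportHandler(int maxFetchFailures) {
    this.maxFetchFailures = maxFetchFailures;
  }

  // Returns true if the reporting task should throw a FetchFailedException so that
  // Spark retries the stage; the map output has already been unregistered by then.
  public boolean onFetchFailure(int shuffleId, int stageAttemptNumber) {
    RssShuffleStatus status = shuffleStatuses.computeIfAbsent(shuffleId, RssShuffleStatus::new);
    int failures = status.incTaskFailure(stageAttemptNumber);
    if (failures >= maxFetchFailures && !status.isStageAttemptRetried(stageAttemptNumber)) {
      status.markStageAttemptRetried(stageAttemptNumber);
      // Drop all tracked map output for this shuffle so Spark recomputes the parent stage.
      // The exact Spark method differs across versions (e.g. unregisterAllMapOutput vs
      // unregisterAllMapAndMergeOutput).
      MapOutputTrackerMaster tracker =
          (MapOutputTrackerMaster) SparkEnv.get().mapOutputTracker();
      tracker.unregisterAllMapOutput(shuffleId);
      return true;
    }
    return false;
  }
}

Shuffle server reassignment, if enabled, could be kicked off in the same branch before returning.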

Member Author

Good insight. Thanks for your advice; I will proceed with it.

@zuston zuston marked this pull request as ready for review June 19, 2024 09:23

zuston commented Jun 19, 2024

cc @yl09099 Could you help check the write failure logic? I have refactored that part of the code to align it with the fetch failure handling.

LOG.warn("The shuffleId:{}, stageId:{} has been retried. Ignore it.");
return false;
}
if (shuffleStatus.getTaskFailureAttemptCount() >= sparkTaskMaxFailures) {
Contributor

In SMJ (sort-merge join), one stage has two shuffle readers. If a task fails due to two different shuffle readers, the condition readerShuffleStatus.getTaskFailureAttemptCount() >= sparkTaskMaxFailures will not behave as expected.

Member Author

Let me think more about this case. Do you have any further solutions in mind?

Contributor

Should we add a state for the stage that contains all the shuffleStatus instances of this stage? Something like:

class RssShuffleStageFailureState {
   int stageId;
   List<RssShuffleStatus> shuffleStatusList;

   // pseudocode: decide a stage retry from the failures accumulated
   // across all shuffle statuses of this stage
   boolean activateStageRetry();
}

Member Author

Sounds good. StageId is a good trigger condition for the retry check. But I'm not sure whether I've missed anything, especially some corner cases. Could you give some extra advice? @jerqi @advancedxy

Member Author

Done

&& writerShuffleStatus.isStageAttemptRetried(stageAttemptNumber)) {
return true;
}
return false;
}
Contributor

In one case, the reader triggers a retry, and that retry is recorded. Later, after the writer fails to write data several times, a retry should be triggered again. However, this method reports that the retry has already been performed.

Member Author

Yes. A retry for the same stageId + attemptNumber will only occur once; is this incorrect? @yl09099


zuston commented Jun 24, 2024

Could you help review this? @advancedxy

@codecov-commenter

Codecov Report

Attention: Patch coverage is 0% with 48 lines in your changes missing coverage. Please review.

Project coverage is 53.43%. Comparing base (dddcced) to head (afea817).
Report is 20 commits behind head on master.

Files Patch % Lines
...t/request/RssReportShuffleFetchFailureRequest.java 0.00% 15 Missing ⚠️
...t/request/RssReportShuffleWriteFailureRequest.java 0.00% 10 Missing ⚠️
...ffle/common/exception/RssFetchFailedException.java 0.00% 8 Missing ⚠️
...ffle/client/impl/grpc/ShuffleServerGrpcClient.java 0.00% 7 Missing ⚠️
...client/impl/grpc/ShuffleServerGrpcNettyClient.java 0.00% 4 Missing ⚠️
...torage/handler/impl/ComposedClientReadHandler.java 0.00% 4 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #1797      +/-   ##
============================================
- Coverage     53.53%   53.43%   -0.11%     
- Complexity     2356     2395      +39     
============================================
  Files           368      371       +3     
  Lines         16852    17156     +304     
  Branches       1540     1571      +31     
============================================
+ Hits           9022     9167     +145     
- Misses         7303     7451     +148     
- Partials        527      538      +11     

☔ View full report in Codecov by Sentry.

@advancedxy

Could you help review this? @advancedxy

There are CI failures; you may need to take a look at those first.

I'll take a look at this later tonight or tomorrow.


advancedxy commented Jun 25, 2024

Thanks for working on this. I did a quick overview of this change, and I think it's quite large to review. It would be best to keep this PR open and split it into several smaller PRs, namely:

  1. the API/interface change, including shuffle server info in RssFetchFailedException (we may discuss that more in the new PR), RssReportShuffleFetchFailureRequest, and RssShuffleStatus
  2. the rework of the shuffle manager service and RssStageResubmitManager
  3. unified logic for fetch failure and write failure handling
  4. the remaining logic, if necessary


zuston commented Jun 25, 2024

Thanks for working on this. I did a quick overview of this change, and I think it's quite large to review. It would be best to keep this PR open and split it into several smaller PRs, namely:

  1. the API/interface change, including shuffle server info in RssFetchFailedException (we may discuss that more in the new PR), RssReportShuffleFetchFailureRequest, and RssShuffleStatus
  2. the rework of the shuffle manager service and RssStageResubmitManager
  3. unified logic for fetch failure and write failure handling
  4. the remaining logic, if necessary

Emm... Sorry, I don't think this is a huge change; it is mostly based on your previous great work and just fixes some bugs. And I don't have much time to rework it and split it into multiple PRs, so I hope we could review it in this PR to check whether any critical problems exist.

@advancedxy

Emm... Sorry, I don't think this is a huge change; it is mostly based on your previous great work and just fixes some bugs.

I agree it's not huge, but it's large enough that it requires sufficient mental capacity and time to review, which unfortunately I don't have until this weekend.

And I don't have much time to rework it and split it into multiple PRs, so I hope we could review it in this PR to check whether any critical problems exist.

Well understood. However, I think this PR should be split into at least two PRs: one to handle fetch failures and one to handle write failures. Write failure is different from fetch failure and should be decoupled.

@jerqi do you have time to review this by any chance? Otherwise, it will take some time and probably be reviewed this weekend; does that sound right to you? @zuston


zuston commented Jun 25, 2024

Good to know. Thanks for your definitive reply. @advancedxy


zuston commented Jun 25, 2024

However, I think this PR should be split into at least two PRs: one to handle fetch failures and one to handle write failures. Write failure is different from fetch failure and should be decoupled.

Could you explain the difference between fetch failure and write failure?

@advancedxy

Could you explain the difference between fetch failure and write failure?

For starters, you should report shuffle fetch failure and write failure via different request types. The stage retry logic also differs: you need to retry the parent stage for a fetch failure, but the current stage for a write failure. They might share common logic to remove map output data, etc., but they should be handled in separate PRs instead of one; putting them together makes the PR changes huge and hard to review.
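A rough illustration of that distinction (class and method names below are hypothetical placeholders, not the PR's or Uniffle's actual API): a fetch failure invalidates the output of the parent (map) stage that produced the data, while a write failure invalidates the current stage's own output.

// Hypothetical sketch: how a stage-retry manager might treat the two failure kinds differently.
public class StageRetrySketch {
  // A fetch failure means the parent (map) stage's output is bad: drop it so the
  // parent stage is recomputed, after which Spark retries the current (reduce) stage.
  void onFetchFailure(int parentShuffleId) {
    unregisterAllMapOutput(parentShuffleId);   // parent stage's output is invalidated
  }

  // A write failure means this stage's own output is bad: clear what it wrote so far
  // and resubmit the current stage, optionally on reassigned shuffle servers.
  void onWriteFailure(int currentShuffleId) {
    clearPartialOutput(currentShuffleId);      // current stage's output is invalidated
    reassignShuffleServersIfConfigured(currentShuffleId);
  }

  // Placeholders for the real operations; the actual methods in Uniffle/Spark differ.
  void unregisterAllMapOutput(int shuffleId) {}
  void clearPartialOutput(int shuffleId) {}
  void reassignShuffleServersIfConfigured(int shuffleId) {}
}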


zuston commented Jun 27, 2024

After digging into this feature, I found there are many bugs and improvements that need to be done, so I have to split them into small patches. This PR will serve as the collection of those patches to verify the overall availability.

The stage retry logic also differs: you need to retry the parent stage for a fetch failure, but the current stage for a write failure

Yes. This is the difference that I have distinguished in the current PR.

Successfully merging this pull request may close these issues:

[Bug] Remaining map output when enable the stage retry on fetch failure