-
Notifications
You must be signed in to change notification settings - Fork 148
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[#1757] feat(server): Add block number check on getting shuffle result #1758
Conversation
@@ -198,6 +201,25 @@ public Set<Integer> getShuffleIds() { | |||
return partitionDataSizes.keySet(); | |||
} | |||
|
|||
public void incrBlockNumber(int shuffleId, int partitionId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
incrBlockNumber -> incBlockNumber
This should be a better naming.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In which scenario do you expect the block number to disalign? What are you trying to guard against here? One side we add block ids and on the other side we retrieve block ids. You suspect incorrect usage of the API by the client or a bug server side? Sounds like that should be guarded against by testing so we are confident, not in production runtime.
server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
Outdated
Show resolved
Hide resolved
We suspect the race condition may exist on the process of reporting shuffle result, which may cause the blockId miss. But this is not determinzed, and so I want to introduce the extra blockId counter check for every partition. Anyway, fast fail is better than the silcent incorrect succeed. |
Maybe |
Yes. Let me optimize this part tomorrow. |
server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
Outdated
Show resolved
Hide resolved
@@ -506,14 +506,24 @@ public void reportShuffleResult( | |||
"appId[" + appId + "], shuffleId[" + shuffleId + "], taskAttemptId[" + taskAttemptId + "]"; | |||
|
|||
try { | |||
int expected = | |||
partitionToBlockIds.values().stream().map(x -> x.length).reduce(0, (a, b) -> a + b); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is more readable:
int expected = partitionToBlockIds.values().stream().mapToInt(x -> x.length).sum();
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good.
@@ -506,14 +506,24 @@ public void reportShuffleResult( | |||
"appId[" + appId + "], shuffleId[" + shuffleId + "], taskAttemptId[" + taskAttemptId + "]"; | |||
|
|||
try { | |||
int expected = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe expectedNum
and updatedNum
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needn't. The type has indicated everything.
shuffleServer | ||
.getShuffleTaskManager() | ||
.addFinishedBlockIds(appId, shuffleId, partitionToBlockIds, bitmapNum); | ||
"Accepted {} blockIds report of {} partitions as shuffle result for the task of {}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the log could be more readable:
LOG.info(
"Accepted blockIds report for {} blocks across {} partitions as shuffle result for task {}",
expectedNum,
partitionToBlockIds.size(),
request);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good.
expected, | ||
partitionToBlockIds.size(), | ||
request); | ||
int updated = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
-> updatedBlockCount
shuffleServer | ||
.getShuffleTaskManager() | ||
.addFinishedBlockIds(appId, shuffleId, partitionToBlockIds, bitmapNum); | ||
if (expected != updated) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
expected
-> expectedBlockCount
bitmap.addLong(blockId); | ||
if (!bitmap.contains(blockId)) { | ||
bitmap.addLong(blockId); | ||
updated++; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refer to @jerqi 's suggestion, you may also rename this:
updatedBlockCount
totalUpdatedBlockCount
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
Thanks everyone for your review. Merged. |
…ng shuffle result apache#1758 LINK: apache#1758
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@zuston I have couple of question inline, would you please take a look?
shuffleServer | ||
.getShuffleTaskManager() | ||
.addFinishedBlockIds(appId, shuffleId, partitionToBlockIds, bitmapNum); | ||
"Accepted blockIds report for {} blocks across {} partitions as shuffle result for task {}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This log is verbose and fill server log, is there a better way to reduce it or do you think DEBUG
level can be better?
.addFinishedBlockIds(appId, shuffleId, partitionToBlockIds, bitmapNum); | ||
if (expectedBlockCount != updatedBlockCount) { | ||
LOG.warn( | ||
"Existing {} duplicated blockIds on blockId report for appId: {}, shuffleId: {}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately, we encountered this warn level log these days, it make us stressed.
Is there indicate something wrong? Can you explain what rare condition scenario can make updatedBlockCount less than expectedBlockCount?
Why there are duplicated blockIds on blockId report?
What changes were proposed in this pull request?
Add block number check on getting shuffle result
Why are the changes needed?
Data validation, ensure data stable and correct
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing tests.