-
Notifications
You must be signed in to change notification settings - Fork 148
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[#1757] feat(server): Add block number check on getting shuffle result #1758
Changes from 7 commits
c2a1a8f
db0e92c
9193803
673652d
8e8c3d6
b82f3e4
07d9668
c8ad64d
1c7908e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -506,14 +506,24 @@ public void reportShuffleResult( | |
"appId[" + appId + "], shuffleId[" + shuffleId + "], taskAttemptId[" + taskAttemptId + "]"; | ||
|
||
try { | ||
int expected = | ||
partitionToBlockIds.values().stream().map(x -> x.length).reduce(0, (a, b) -> a + b); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is more readable: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds good. |
||
LOG.info( | ||
"Report " | ||
+ partitionToBlockIds.size() | ||
+ " blocks as shuffle result for the task of " | ||
+ requestInfo); | ||
shuffleServer | ||
.getShuffleTaskManager() | ||
.addFinishedBlockIds(appId, shuffleId, partitionToBlockIds, bitmapNum); | ||
"Accepted {} blockIds report of {} partitions as shuffle result for the task of {}", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the log could be more readable:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds good. |
||
expected, | ||
partitionToBlockIds.size(), | ||
request); | ||
int updated = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
shuffleServer | ||
.getShuffleTaskManager() | ||
.addFinishedBlockIds(appId, shuffleId, partitionToBlockIds, bitmapNum); | ||
if (expected != updated) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
LOG.warn( | ||
"Existing {} duplicated blockIds on blockId report for appId: {}, shuffleId: {}", | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Unfortunately, we have been seeing this WARN-level log recently, and it worries us. Does it indicate that something is wrong? Can you explain what rare scenario can make updatedBlockCount less than expectedBlockCount? Why are there duplicated blockIds in a blockId report? |
||
expected - updated, | ||
appId, | ||
shuffleId); | ||
} | ||
} catch (Exception e) { | ||
status = StatusCode.INTERNAL_ERROR; | ||
msg = "error happened when report shuffle result, check shuffle server for detail"; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -387,7 +387,16 @@ public StatusCode commitShuffle(String appId, int shuffleId) throws Exception { | |
return StatusCode.SUCCESS; | ||
} | ||
|
||
public void addFinishedBlockIds( | ||
/** | ||
* Add finished blockIds from client | ||
* | ||
* @param appId | ||
* @param shuffleId | ||
* @param partitionToBlockIds | ||
* @param bitmapNum | ||
* @return the number of added blockIds | ||
*/ | ||
public int addFinishedBlockIds( | ||
String appId, Integer shuffleId, Map<Integer, long[]> partitionToBlockIds, int bitmapNum) { | ||
refreshAppId(appId); | ||
Map<Integer, Roaring64NavigableMap[]> shuffleIdToPartitions = partitionsToBlockIds.get(appId); | ||
|
@@ -413,15 +422,28 @@ public void addFinishedBlockIds( | |
+ " bitmaps!"); | ||
} | ||
|
||
ShuffleTaskInfo taskInfo = getShuffleTaskInfo(appId); | ||
if (taskInfo == null) { | ||
throw new InvalidRequestException( | ||
"ShuffleTaskInfo is not found that should not happen for appId: " + appId); | ||
} | ||
int totalUpdated = 0; | ||
for (Map.Entry<Integer, long[]> entry : partitionToBlockIds.entrySet()) { | ||
Integer partitionId = entry.getKey(); | ||
Roaring64NavigableMap bitmap = blockIds[partitionId % bitmapNum]; | ||
int updated = 0; | ||
synchronized (bitmap) { | ||
for (long blockId : entry.getValue()) { | ||
bitmap.addLong(blockId); | ||
if (!bitmap.contains(blockId)) { | ||
bitmap.addLong(blockId); | ||
updated++; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Refer to @jerqi 's suggestion, you may also rename this: |
||
totalUpdated++; | ||
} | ||
} | ||
} | ||
taskInfo.incBlockNumber(shuffleId, partitionId, updated); | ||
} | ||
return totalUpdated; | ||
} | ||
|
||
public int updateAndGetCommitCount(String appId, int shuffleId) { | ||
|
@@ -553,13 +575,18 @@ public byte[] getFinishedBlockIds( | |
} | ||
Map<Integer, Roaring64NavigableMap[]> shuffleIdToPartitions = partitionsToBlockIds.get(appId); | ||
if (shuffleIdToPartitions == null) { | ||
LOG.warn("Empty blockIds for app: {}. This should not happen", appId); | ||
return null; | ||
} | ||
|
||
Roaring64NavigableMap[] blockIds = shuffleIdToPartitions.get(shuffleId); | ||
if (blockIds == null) { | ||
LOG.warn("Empty blockIds for app: {}, shuffleId: {}", appId, shuffleId); | ||
return new byte[] {}; | ||
} | ||
|
||
ShuffleTaskInfo taskInfo = getShuffleTaskInfo(appId); | ||
long expectedBlockNumber = 0; | ||
Map<Integer, Set<Integer>> bitmapIndexToPartitions = Maps.newHashMap(); | ||
for (int partitionId : partitions) { | ||
int bitmapIndex = partitionId % blockIds.length; | ||
|
@@ -569,6 +596,7 @@ public byte[] getFinishedBlockIds( | |
HashSet<Integer> newHashSet = Sets.newHashSet(partitionId); | ||
bitmapIndexToPartitions.put(bitmapIndex, newHashSet); | ||
} | ||
expectedBlockNumber += taskInfo.getBlockNumber(shuffleId, partitionId); | ||
} | ||
|
||
Roaring64NavigableMap res = Roaring64NavigableMap.bitmapOf(); | ||
|
@@ -577,6 +605,17 @@ public byte[] getFinishedBlockIds( | |
Roaring64NavigableMap bitmap = blockIds[entry.getKey()]; | ||
getBlockIdsByPartitionId(requestPartitions, bitmap, res, blockIdLayout); | ||
} | ||
|
||
if (res.getLongCardinality() != expectedBlockNumber) { | ||
throw new RssException( | ||
"Inconsistent block number for partitions: " | ||
+ partitions | ||
+ ". Excepted: " | ||
+ expectedBlockNumber | ||
+ ", actual: " | ||
+ res.getLongCardinality()); | ||
} | ||
|
||
return RssUtils.serializeBitMap(res); | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe `expectedNum` and `updatedNum`?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need. The type already indicates everything.