Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#1757] feat(server): Add block number check on getting shuffle result #1758

Merged
merged 9 commits into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -671,10 +671,10 @@ public static Stream<Arguments> testBlockIdLayouts() {
@ParameterizedTest
@MethodSource("testBlockIdLayouts")
public void multipleShuffleResultTest(BlockIdLayout layout) throws Exception {
String appId = "multipleShuffleResultTest_" + layout.sequenceNoBits;
Set<Long> expectedBlockIds = Sets.newConcurrentHashSet();
RssRegisterShuffleRequest rrsr =
new RssRegisterShuffleRequest(
"multipleShuffleResultTest", 100, Lists.newArrayList(new PartitionRange(0, 1)), "");
new RssRegisterShuffleRequest(appId, 100, Lists.newArrayList(new PartitionRange(0, 1)), "");
grpcShuffleServerClient.registerShuffle(rrsr);

Runnable r1 =
Expand All @@ -687,7 +687,7 @@ public void multipleShuffleResultTest(BlockIdLayout layout) throws Exception {
blockIds.add(blockId);
ptbs.put(1, blockIds);
RssReportShuffleResultRequest req1 =
new RssReportShuffleResultRequest("multipleShuffleResultTest", 1, 0, ptbs, 1);
new RssReportShuffleResultRequest(appId, 1, 0, ptbs, 1);
grpcShuffleServerClient.reportShuffleResult(req1);
}
};
Expand All @@ -701,7 +701,7 @@ public void multipleShuffleResultTest(BlockIdLayout layout) throws Exception {
blockIds.add(blockId);
ptbs.put(1, blockIds);
RssReportShuffleResultRequest req1 =
new RssReportShuffleResultRequest("multipleShuffleResultTest", 1, 1, ptbs, 1);
new RssReportShuffleResultRequest(appId, 1, 1, ptbs, 1);
grpcShuffleServerClient.reportShuffleResult(req1);
}
};
Expand All @@ -715,7 +715,7 @@ public void multipleShuffleResultTest(BlockIdLayout layout) throws Exception {
blockIds.add(blockId);
ptbs.put(1, blockIds);
RssReportShuffleResultRequest req1 =
new RssReportShuffleResultRequest("multipleShuffleResultTest", 1, 2, ptbs, 1);
new RssReportShuffleResultRequest(appId, 1, 2, ptbs, 1);
grpcShuffleServerClient.reportShuffleResult(req1);
}
};
Expand All @@ -734,8 +734,7 @@ public void multipleShuffleResultTest(BlockIdLayout layout) throws Exception {
blockIdBitmap.addLong(blockId);
}

RssGetShuffleResultRequest req =
new RssGetShuffleResultRequest("multipleShuffleResultTest", 1, 1, layout);
RssGetShuffleResultRequest req = new RssGetShuffleResultRequest(appId, 1, 1, layout);
RssGetShuffleResultResponse result = grpcShuffleServerClient.getShuffleResult(req);
Roaring64NavigableMap actualBlockIdBitmap = result.getBlockIdBitmap();
assertEquals(blockIdBitmap, actualBlockIdBitmap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,14 +506,23 @@ public void reportShuffleResult(
"appId[" + appId + "], shuffleId[" + shuffleId + "], taskAttemptId[" + taskAttemptId + "]";

try {
int expectedBlockCount = partitionToBlockIds.values().stream().mapToInt(x -> x.length).sum();
LOG.info(
"Report "
+ partitionToBlockIds.size()
+ " blocks as shuffle result for the task of "
+ requestInfo);
shuffleServer
.getShuffleTaskManager()
.addFinishedBlockIds(appId, shuffleId, partitionToBlockIds, bitmapNum);
"Accepted blockIds report for {} blocks across {} partitions as shuffle result for task {}",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This log is verbose and fills the server log. Is there a better way to reduce it, or do you think the DEBUG level would be better?

expectedBlockCount,
partitionToBlockIds.size(),
request);
int updatedBlockCount =
shuffleServer
.getShuffleTaskManager()
.addFinishedBlockIds(appId, shuffleId, partitionToBlockIds, bitmapNum);
if (expectedBlockCount != updatedBlockCount) {
LOG.warn(
"Existing {} duplicated blockIds on blockId report for appId: {}, shuffleId: {}",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, we have been encountering this WARN-level log lately, and it is making us nervous.

Does it indicate that something is wrong? Can you explain what rare scenario can make updatedBlockCount less than expectedBlockCount?

Why are there duplicated blockIds in a blockId report?

expectedBlockCount - updatedBlockCount,
appId,
shuffleId);
}
} catch (Exception e) {
status = StatusCode.INTERNAL_ERROR;
msg = "error happened when report shuffle result, check shuffle server for detail";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public class ShuffleTaskInfo {

private final AtomicReference<ShuffleSpecification> specification;

private final Map<Integer, Map<Integer, AtomicLong>> partitionBlockCounters;

public ShuffleTaskInfo(String appId) {
this.appId = appId;
this.currentTimes = System.currentTimeMillis();
Expand All @@ -75,6 +77,7 @@ public ShuffleTaskInfo(String appId) {
this.hugePartitionTags = JavaUtils.newConcurrentMap();
this.existHugePartition = new AtomicBoolean(false);
this.specification = new AtomicReference<>();
this.partitionBlockCounters = JavaUtils.newConcurrentMap();
}

public Long getCurrentTimes() {
Expand Down Expand Up @@ -198,6 +201,25 @@ public Set<Integer> getShuffleIds() {
return partitionDataSizes.keySet();
}

/**
 * Adds {@code delta} to the block counter kept for the given shuffle and partition.
 *
 * <p>The per-shuffle map and the per-partition counter are created on first use.
 *
 * @param shuffleId the shuffle whose counter map is looked up (or created)
 * @param partitionId the partition whose counter is looked up (or created)
 * @param delta the amount added to the counter
 */
public void incBlockNumber(int shuffleId, int partitionId, int delta) {
  Map<Integer, AtomicLong> countersOfShuffle =
      this.partitionBlockCounters.computeIfAbsent(
          shuffleId, ignored -> JavaUtils.newConcurrentMap());
  AtomicLong counter = countersOfShuffle.computeIfAbsent(partitionId, ignored -> new AtomicLong());
  counter.addAndGet(delta);
}

/**
 * Returns the block count currently recorded for the given shuffle and partition.
 *
 * @param shuffleId the shuffle whose counter map is looked up
 * @param partitionId the partition whose counter is read
 * @return the counter value, or {@code 0} when no counter map exists for the shuffle or no
 *     counter exists for the partition
 */
public long getBlockNumber(int shuffleId, int partitionId) {
// Note: this local intentionally-or-not shadows the field of the same name; it holds only the
// per-partition counters of the requested shuffle.
Map<Integer, AtomicLong> partitionBlockCounters = this.partitionBlockCounters.get(shuffleId);
if (partitionBlockCounters == null) {
zuston marked this conversation as resolved.
Show resolved Hide resolved
return 0L;
}
AtomicLong counter = partitionBlockCounters.get(partitionId);
// A missing counter is reported as zero rather than being created here.
if (counter == null) {
return 0L;
}
return counter.get();
}

@Override
public String toString() {
return "ShuffleTaskInfo{"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,16 @@ public StatusCode commitShuffle(String appId, int shuffleId) throws Exception {
return StatusCode.SUCCESS;
}

public void addFinishedBlockIds(
/**
* Add finished blockIds from client
*
* @param appId
* @param shuffleId
* @param partitionToBlockIds
* @param bitmapNum
* @return the number of added blockIds
*/
public int addFinishedBlockIds(
String appId, Integer shuffleId, Map<Integer, long[]> partitionToBlockIds, int bitmapNum) {
refreshAppId(appId);
Map<Integer, Roaring64NavigableMap[]> shuffleIdToPartitions = partitionsToBlockIds.get(appId);
Expand All @@ -413,15 +422,28 @@ public void addFinishedBlockIds(
+ " bitmaps!");
}

ShuffleTaskInfo taskInfo = getShuffleTaskInfo(appId);
if (taskInfo == null) {
throw new InvalidRequestException(
"ShuffleTaskInfo is not found that should not happen for appId: " + appId);
}
int totalUpdatedBlockCount = 0;
for (Map.Entry<Integer, long[]> entry : partitionToBlockIds.entrySet()) {
Integer partitionId = entry.getKey();
Roaring64NavigableMap bitmap = blockIds[partitionId % bitmapNum];
int updatedBlockCount = 0;
synchronized (bitmap) {
for (long blockId : entry.getValue()) {
bitmap.addLong(blockId);
if (!bitmap.contains(blockId)) {
bitmap.addLong(blockId);
updatedBlockCount++;
totalUpdatedBlockCount++;
}
}
}
taskInfo.incBlockNumber(shuffleId, partitionId, updatedBlockCount);
}
return totalUpdatedBlockCount;
}

public int updateAndGetCommitCount(String appId, int shuffleId) {
Expand Down Expand Up @@ -553,13 +575,18 @@ public byte[] getFinishedBlockIds(
}
Map<Integer, Roaring64NavigableMap[]> shuffleIdToPartitions = partitionsToBlockIds.get(appId);
if (shuffleIdToPartitions == null) {
LOG.warn("Empty blockIds for app: {}. This should not happen", appId);
return null;
}

Roaring64NavigableMap[] blockIds = shuffleIdToPartitions.get(shuffleId);
if (blockIds == null) {
LOG.warn("Empty blockIds for app: {}, shuffleId: {}", appId, shuffleId);
return new byte[] {};
}

ShuffleTaskInfo taskInfo = getShuffleTaskInfo(appId);
long expectedBlockNumber = 0;
Map<Integer, Set<Integer>> bitmapIndexToPartitions = Maps.newHashMap();
for (int partitionId : partitions) {
int bitmapIndex = partitionId % blockIds.length;
Expand All @@ -569,6 +596,7 @@ public byte[] getFinishedBlockIds(
HashSet<Integer> newHashSet = Sets.newHashSet(partitionId);
bitmapIndexToPartitions.put(bitmapIndex, newHashSet);
}
expectedBlockNumber += taskInfo.getBlockNumber(shuffleId, partitionId);
}

Roaring64NavigableMap res = Roaring64NavigableMap.bitmapOf();
Expand All @@ -577,6 +605,17 @@ public byte[] getFinishedBlockIds(
Roaring64NavigableMap bitmap = blockIds[entry.getKey()];
getBlockIdsByPartitionId(requestPartitions, bitmap, res, blockIdLayout);
}

if (res.getLongCardinality() != expectedBlockNumber) {
throw new RssException(
"Inconsistent block number for partitions: "
+ partitions
+ ". Excepted: "
+ expectedBlockNumber
+ ", actual: "
+ res.getLongCardinality());
}

return RssUtils.serializeBitMap(res);
}

Expand Down
Loading