Skip to content

Commit

Permalink
[#1757] feat(server): Add block number check on getting shuffle result
Browse files Browse the repository at this point in the history
  • Loading branch information
zuston committed May 30, 2024
1 parent a3a49f0 commit da63e43
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public class ShuffleTaskInfo {

private final AtomicReference<ShuffleSpecification> specification;

private final Map<Integer, Map<Integer, AtomicLong>> partitionBlockCounters;

public ShuffleTaskInfo(String appId) {
this.appId = appId;
this.currentTimes = System.currentTimeMillis();
Expand All @@ -75,6 +77,7 @@ public ShuffleTaskInfo(String appId) {
this.hugePartitionTags = JavaUtils.newConcurrentMap();
this.existHugePartition = new AtomicBoolean(false);
this.specification = new AtomicReference<>();
this.partitionBlockCounters = JavaUtils.newConcurrentMap();
}

public Long getCurrentTimes() {
Expand Down Expand Up @@ -198,6 +201,25 @@ public Set<Integer> getShuffleIds() {
return partitionDataSizes.keySet();
}

/**
 * Increases by one the counter kept for the given shuffle / partition pair.
 *
 * <p>The per-shuffle map and the per-partition counter are created lazily on first use.
 *
 * @param shuffleId the shuffle the partition belongs to
 * @param partitionId the partition whose counter is incremented
 */
public void incrBlockNumber(int shuffleId, int partitionId) {
  Map<Integer, AtomicLong> countersOfShuffle =
      this.partitionBlockCounters.computeIfAbsent(shuffleId, key -> JavaUtils.newConcurrentMap());
  AtomicLong blockCounter = countersOfShuffle.computeIfAbsent(partitionId, key -> new AtomicLong());
  blockCounter.incrementAndGet();
}

/**
 * Reads the current value of the counter kept for the given shuffle / partition pair.
 *
 * @param shuffleId the shuffle the partition belongs to
 * @param partitionId the partition whose counter is read
 * @return the counter value, or {@code 0} when nothing has been recorded for the shuffle or for
 *     the partition
 */
public long getBlockNumber(int shuffleId, int partitionId) {
  Map<Integer, AtomicLong> countersOfShuffle = this.partitionBlockCounters.get(shuffleId);
  // A missing shuffle entry and a missing partition entry are both reported as zero.
  AtomicLong blockCounter = countersOfShuffle == null ? null : countersOfShuffle.get(partitionId);
  return blockCounter == null ? 0L : blockCounter.get();
}

@Override
public String toString() {
return "ShuffleTaskInfo{"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,14 @@ public void addFinishedBlockIds(
+ " bitmaps!");
}

ShuffleTaskInfo taskInfo = getShuffleTaskInfo(appId);
if (taskInfo == null) {
throw new InvalidRequestException(
"ShuffleTaskInfo is not found that should not happen for appId: " + appId);
}
for (Map.Entry<Integer, long[]> entry : partitionToBlockIds.entrySet()) {
Integer partitionId = entry.getKey();
taskInfo.incrBlockNumber(shuffleId, partitionId);
Roaring64NavigableMap bitmap = blockIds[partitionId % bitmapNum];
synchronized (bitmap) {
for (long blockId : entry.getValue()) {
Expand Down Expand Up @@ -553,13 +559,18 @@ public byte[] getFinishedBlockIds(
}
Map<Integer, Roaring64NavigableMap[]> shuffleIdToPartitions = partitionsToBlockIds.get(appId);
if (shuffleIdToPartitions == null) {
LOG.warn("Empty blockIds for app: {}. This should not happen", appId);
return null;
}

Roaring64NavigableMap[] blockIds = shuffleIdToPartitions.get(shuffleId);
if (blockIds == null) {
LOG.warn("Empty blockIds for app: {}, shuffleId: {}", appId, shuffleId);
return new byte[] {};
}

ShuffleTaskInfo taskInfo = getShuffleTaskInfo(appId);
long expectedBlockNumber = 0;
Map<Integer, Set<Integer>> bitmapIndexToPartitions = Maps.newHashMap();
for (int partitionId : partitions) {
int bitmapIndex = partitionId % blockIds.length;
Expand All @@ -569,6 +580,7 @@ public byte[] getFinishedBlockIds(
HashSet<Integer> newHashSet = Sets.newHashSet(partitionId);
bitmapIndexToPartitions.put(bitmapIndex, newHashSet);
}
expectedBlockNumber += taskInfo.getBlockNumber(shuffleId, partitionId);
}

Roaring64NavigableMap res = Roaring64NavigableMap.bitmapOf();
Expand All @@ -577,6 +589,17 @@ public byte[] getFinishedBlockIds(
Roaring64NavigableMap bitmap = blockIds[entry.getKey()];
getBlockIdsByPartitionId(requestPartitions, bitmap, res, blockIdLayout);
}

if (res.getLongCardinality() != expectedBlockNumber) {
throw new RssException(
"Inconsistent block number for partitions: "
+ partitions
+ ". Excepted: "
+ expectedBlockNumber
+ ", actual: "
+ res.getLongCardinality());
}

return RssUtils.serializeBitMap(res);
}

Expand Down

0 comments on commit da63e43

Please sign in to comment.