Skip to content

Commit

Permalink
[MINOR] feat(server): Introduce metrics related bitmaps(committedBloc…
Browse files Browse the repository at this point in the history
…kIds/cachedBlockIds/partitionToBlockIds) (#2186)

### What changes were proposed in this pull request?

Introduce metrics for the block ID bitmaps (committed, reported and cached block IDs).

### Why are the changes needed?

To provide insight into the number of block IDs held in each of these bitmaps.

Fix: # (issue)

### Does this PR introduce _any_ user-facing change?

Yes, three new gauge metrics are exposed:

- committed_block_count
- reported_block_count
- cached_block_count

### How was this patch tested?

Locally.

<img width="2512" alt="image" src="https://github.com/user-attachments/assets/c57706db-a13f-44f5-93fd-235825122f5d">
  • Loading branch information
maobaolong authored Oct 24, 2024
1 parent b5290b2 commit 6b93819
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.uniffle.storage.request.CreateShuffleWriteHandlerRequest;

import static org.apache.uniffle.server.ShuffleServerConf.SERVER_MAX_CONCURRENCY_OF_ONE_PARTITION;
import static org.apache.uniffle.server.ShuffleServerMetrics.COMMITTED_BLOCK_COUNT;

public class ShuffleFlushManager {

Expand Down Expand Up @@ -91,6 +92,15 @@ public ShuffleFlushManager(
shuffleServerConf, storageManager, shuffleServer, this::processFlushEvent);
isStorageAuditLogEnabled =
this.shuffleServerConf.getBoolean(ShuffleServerConf.SERVER_STORAGE_AUDIT_LOG_ENABLED);

ShuffleServerMetrics.addLabeledCacheGauge(
COMMITTED_BLOCK_COUNT,
() ->
committedBlockIds.values().stream()
.flatMap(innerMap -> innerMap.values().stream())
.mapToLong(bitmap -> bitmap.getLongCardinality())
.sum(),
2 * 60 * 1000L /* 2 minutes */);
}

public void addToFlushQueue(ShuffleDataFlushEvent event) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ public class ShuffleServerMetrics {
public static final String BUFFER_COUNT_IN_BUFFER_POOL = "buffer_count_in_buffer_pool";
public static final String SHUFFLE_COUNT_IN_BUFFER_POOL = "shuffle_count_in_buffer_pool";

// Gauge names for the block-ID bitmap metrics. Each gauge reports the summed
// cardinality (getLongCardinality) of the bitmaps it covers.
// Registered in ShuffleFlushManager over the bitmaps in committedBlockIds.
public static final String COMMITTED_BLOCK_COUNT = "committed_block_count";
// Registered in ShuffleTaskManager over the bitmap arrays in partitionsToBlockIds.
public static final String REPORTED_BLOCK_COUNT = "reported_block_count";
// Registered in ShuffleTaskManager over each ShuffleTaskInfo's getCachedBlockIds() bitmaps.
public static final String CACHED_BLOCK_COUNT = "cached_block_count";

public static Counter.Child counterTotalAppNum;
public static Counter.Child counterTotalAppWithHugePartitionNum;
public static Counter.Child counterTotalPartitionNum;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@

import static org.apache.uniffle.server.ShuffleServerConf.CLIENT_MAX_CONCURRENCY_LIMITATION_OF_ONE_PARTITION;
import static org.apache.uniffle.server.ShuffleServerConf.SERVER_MAX_CONCURRENCY_OF_ONE_PARTITION;
import static org.apache.uniffle.server.ShuffleServerMetrics.CACHED_BLOCK_COUNT;
import static org.apache.uniffle.server.ShuffleServerMetrics.REPORTED_BLOCK_COUNT;
import static org.apache.uniffle.server.ShuffleServerMetrics.REQUIRE_BUFFER_COUNT;

public class ShuffleTaskManager {
Expand Down Expand Up @@ -244,6 +246,26 @@ public ShuffleTaskManager(
topNShuffleDataSizeOfAppCalcTask.start();

ShuffleServerMetrics.addLabeledGauge(REQUIRE_BUFFER_COUNT, requireBufferIds::size);
ShuffleServerMetrics.addLabeledCacheGauge(
REPORTED_BLOCK_COUNT,
() ->
partitionsToBlockIds.values().stream()
.flatMap(innerMap -> innerMap.values().stream())
.flatMapToLong(
arr ->
java.util.Arrays.stream(arr)
.mapToLong(Roaring64NavigableMap::getLongCardinality))
.sum(),
2 * 60 * 1000L /* 2 minutes */);
ShuffleServerMetrics.addLabeledCacheGauge(
CACHED_BLOCK_COUNT,
() ->
shuffleTaskInfos.values().stream()
.map(ShuffleTaskInfo::getCachedBlockIds)
.flatMap(map -> map.values().stream())
.mapToLong(Roaring64NavigableMap::getLongCardinality)
.sum(),
2 * 60 * 1000L /* 2 minutes */);
}

public ReentrantReadWriteLock.WriteLock getAppWriteLock(String appId) {
Expand Down

0 comments on commit 6b93819

Please sign in to comment.