Skip to content

Commit

Permalink
[#1472][part-4] feature(server): Add metrics for Netty's pinnedDirect…
Browse files Browse the repository at this point in the history
…Memory and usedDirectMemory (#1524)

### What changes were proposed in this pull request?

We need to know the exact direct memory usage of `PooledByteBufAllocator` in `Netty`.
So we should introduce metrics for Netty's `pinnedDirectMemory` and `usedDirectMemory`.

### Why are the changes needed?

A sub PR for: #1519

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing UTs.
  • Loading branch information
rickyma authored Feb 14, 2024
1 parent d87dc90 commit 72d85a7
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.util.NettyUtils;
import org.apache.uniffle.common.util.ThreadUtils;

public class NettyDirectMemoryTracker {
Expand Down Expand Up @@ -54,10 +55,19 @@ public void start() {
() -> {
try {
long usedDirectMemory = PlatformDependent.usedDirectMemory();
long allocatedDirectMemory =
NettyUtils.getNettyBufferAllocator().metric().usedDirectMemory();
long pinnedDirectMemory = NettyUtils.getNettyBufferAllocator().pinnedDirectMemory();
if (LOG.isDebugEnabled()) {
LOG.debug("Current direct memory usage: {}", usedDirectMemory);
LOG.debug(
"Current usedDirectMemory:{}, allocatedDirectMemory:{}, pinnedDirectMemory:{}",
usedDirectMemory,
allocatedDirectMemory,
pinnedDirectMemory);
}
ShuffleServerMetrics.gaugeUsedDirectMemorySize.set(usedDirectMemory);
ShuffleServerMetrics.gaugeAllocatedDirectMemorySize.set(allocatedDirectMemory);
ShuffleServerMetrics.gaugePinnedDirectMemorySize.set(pinnedDirectMemory);
} catch (Throwable t) {
LOG.error("Failed to report direct memory.", t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ public class ShuffleServerMetrics {
private static final String USED_BUFFER_SIZE = "used_buffer_size";
private static final String READ_USED_BUFFER_SIZE = "read_used_buffer_size";
private static final String USED_DIRECT_MEMORY_SIZE = "used_direct_memory_size";
private static final String ALLOCATED_DIRECT_MEMORY_SIZE = "allocated_direct_memory_size";
private static final String PINNED_DIRECT_MEMORY_SIZE = "pinned_direct_memory_size";
private static final String TOTAL_FAILED_WRITTEN_EVENT_NUM = "total_failed_written_event_num";
private static final String TOTAL_DROPPED_EVENT_NUM = "total_dropped_event_num";
private static final String TOTAL_HADOOP_WRITE_DATA = "total_hadoop_write_data";
Expand Down Expand Up @@ -184,6 +186,8 @@ public class ShuffleServerMetrics {
public static Gauge.Child gaugeUsedBufferSize;
public static Gauge.Child gaugeReadBufferUsedSize;
public static Gauge.Child gaugeUsedDirectMemorySize;
public static Gauge.Child gaugeAllocatedDirectMemorySize;
public static Gauge.Child gaugePinnedDirectMemorySize;
public static Gauge.Child gaugeWriteHandler;
public static Gauge.Child gaugeEventQueueSize;
public static Gauge.Child gaugeHadoopFlushThreadPoolQueueSize;
Expand Down Expand Up @@ -380,6 +384,8 @@ private static void setUpMetrics() {
gaugeUsedBufferSize = metricsManager.addLabeledGauge(USED_BUFFER_SIZE);
gaugeReadBufferUsedSize = metricsManager.addLabeledGauge(READ_USED_BUFFER_SIZE);
gaugeUsedDirectMemorySize = metricsManager.addLabeledGauge(USED_DIRECT_MEMORY_SIZE);
gaugeAllocatedDirectMemorySize = metricsManager.addLabeledGauge(ALLOCATED_DIRECT_MEMORY_SIZE);
gaugePinnedDirectMemorySize = metricsManager.addLabeledGauge(PINNED_DIRECT_MEMORY_SIZE);
gaugeWriteHandler = metricsManager.addLabeledGauge(TOTAL_WRITE_HANDLER);
gaugeEventQueueSize = metricsManager.addLabeledGauge(EVENT_QUEUE_SIZE);
gaugeHadoopFlushThreadPoolQueueSize =
Expand Down

0 comments on commit 72d85a7

Please sign in to comment.