Skip to content

Commit

Permalink
[#1472][part-5] Use UnpooledByteBufAllocator to obtain accurate ByteBuf sizes
Browse files Browse the repository at this point in the history
  • Loading branch information
rickyma committed Feb 21, 2024
1 parent 59aa30d commit bcbd1be
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.util.ByteBufUtils;
import org.apache.uniffle.common.util.NettyUtils;

public class Decoders {
public static ShuffleServerInfo decodeShuffleServerInfo(ByteBuf byteBuf) {
Expand All @@ -46,7 +47,8 @@ public static ShuffleBlockInfo decodeShuffleBlockInfo(ByteBuf byteBuf) {
long crc = byteBuf.readLong();
long taskAttemptId = byteBuf.readLong();
int dataLength = byteBuf.readInt();
ByteBuf data = byteBuf.retain().readSlice(dataLength);
ByteBuf data = NettyUtils.getNettyBufferAllocator().directBuffer(dataLength);
data.writeBytes(byteBuf, dataLength);
int lengthOfShuffleServers = byteBuf.readInt();
List<ShuffleServerInfo> serverInfos = Lists.newArrayList();
for (int k = 0; k < lengthOfShuffleServers; k++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ public long getRequireId() {
return requireId;
}

/**
 * Overwrites the require id held by this object.
 *
 * @param requireId the new value for the {@code requireId} field
 */
public void setRequireId(long requireId) {
this.requireId = requireId;
}

/**
 * Returns the {@code partitionToBlocks} map exactly as stored; no copy is made.
 *
 * @return block lists keyed by an integer (presumably the partition id, going by the field name)
 */
public Map<Integer, List<ShuffleBlockInfo>> getPartitionToBlocks() {
return partitionToBlocks;
}
Expand Down
24 changes: 12 additions & 12 deletions common/src/main/java/org/apache/uniffle/common/util/NettyUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

import java.util.concurrent.ThreadFactory;

import io.netty.buffer.AbstractByteBufAllocator;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -39,6 +41,8 @@
public class NettyUtils {
private static final Logger logger = LoggerFactory.getLogger(NettyUtils.class);

private static final long MAX_DIRECT_MEMORY_IN_BYTES = PlatformDependent.maxDirectMemory();

/** Creates a Netty EventLoopGroup based on the IOMode. */
public static EventLoopGroup createEventLoop(IOMode mode, int numThreads, String threadPrefix) {
ThreadFactory threadFactory = ThreadUtils.getNettyThreadFactory(threadPrefix);
Expand Down Expand Up @@ -114,22 +118,18 @@ public static String getServerConnectionInfo(Channel channel) {
}

private static class AllocatorHolder {
private static final PooledByteBufAllocator INSTANCE = createAllocator();
private static final AbstractByteBufAllocator INSTANCE = createUnpooledByteBufAllocator(true);
}

public static PooledByteBufAllocator getNettyBufferAllocator() {
public static AbstractByteBufAllocator getNettyBufferAllocator() {
return AllocatorHolder.INSTANCE;
}

private static PooledByteBufAllocator createAllocator() {
return new PooledByteBufAllocator(
true,
PooledByteBufAllocator.defaultNumHeapArena(),
PooledByteBufAllocator.defaultNumDirectArena(),
PooledByteBufAllocator.defaultPageSize(),
PooledByteBufAllocator.defaultMaxOrder(),
0,
0,
PooledByteBufAllocator.defaultUseCacheForAllThreads());
public static UnpooledByteBufAllocator createUnpooledByteBufAllocator(boolean preferDirect) {
return new UnpooledByteBufAllocator(preferDirect);
}

/**
 * Returns the value cached in {@code MAX_DIRECT_MEMORY_IN_BYTES}, which is read from
 * {@code PlatformDependent.maxDirectMemory()} once, when this class is initialized.
 *
 * @return the cached direct-memory limit, in bytes
 */
public static long getMaxDirectMemory() {
return MAX_DIRECT_MEMORY_IN_BYTES;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,15 @@ public RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest requ
}
}

int allocateSize = size;
SendShuffleDataRequest sendShuffleDataRequest =
new SendShuffleDataRequest(
requestId(),
request.getAppId(),
shuffleId,
0L,
stb.getValue(),
System.currentTimeMillis());
int allocateSize = size + sendShuffleDataRequest.encodedLength();
int finalBlockNum = blockNum;
try {
RetryUtils.retryWithCondition(
Expand All @@ -122,14 +130,7 @@ public RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest requ
allocateSize, host, port));
}

SendShuffleDataRequest sendShuffleDataRequest =
new SendShuffleDataRequest(
requestId(),
request.getAppId(),
shuffleId,
requireId,
stb.getValue(),
System.currentTimeMillis());
sendShuffleDataRequest.setRequireId(requireId);
long start = System.currentTimeMillis();
RpcResponse rpcResponse =
transportClient.sendRpcSync(sendShuffleDataRequest, rpcTimeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.util.NettyUtils;
import org.apache.uniffle.common.util.ThreadUtils;

public class NettyDirectMemoryTracker {
Expand Down Expand Up @@ -55,19 +54,10 @@ public void start() {
() -> {
try {
long usedDirectMemory = PlatformDependent.usedDirectMemory();
long allocatedDirectMemory =
NettyUtils.getNettyBufferAllocator().metric().usedDirectMemory();
long pinnedDirectMemory = NettyUtils.getNettyBufferAllocator().pinnedDirectMemory();
if (LOG.isDebugEnabled()) {
LOG.debug(
"Current usedDirectMemory:{}, allocatedDirectMemory:{}, pinnedDirectMemory:{}",
usedDirectMemory,
allocatedDirectMemory,
pinnedDirectMemory);
LOG.debug("Current usedDirectMemory:{}", usedDirectMemory);
}
ShuffleServerMetrics.gaugeUsedDirectMemorySize.set(usedDirectMemory);
ShuffleServerMetrics.gaugeAllocatedDirectMemorySize.set(allocatedDirectMemory);
ShuffleServerMetrics.gaugePinnedDirectMemorySize.set(pinnedDirectMemory);
} catch (Throwable t) {
LOG.error("Failed to report direct memory.", t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ public void sendShuffleData(
final long start = System.currentTimeMillis();
List<ShufflePartitionedData> shufflePartitionedData = toPartitionedData(req);
long alreadyReleasedSize = 0;
boolean isFailureOccurs = false;
for (ShufflePartitionedData spd : shufflePartitionedData) {
String shuffleDataInfo =
"appId["
Expand All @@ -275,6 +276,7 @@ public void sendShuffleData(
+ ret;
LOG.error(errorMsg);
responseMessage = errorMsg;
isFailureOccurs = true;
break;
} else {
long toReleasedSize = spd.getTotalBlockSize();
Expand All @@ -293,9 +295,13 @@ public void sendShuffleData(
ret = StatusCode.INTERNAL_ERROR;
responseMessage = errorMsg;
LOG.error(errorMsg);
isFailureOccurs = true;
break;
}
}
if (isFailureOccurs) {
shuffleServer.getShuffleBufferManager().releaseMemory(info.getRequireSize(), false, false);
}
// since the required buffer id is only used once, the shuffle client would try to require
// another buffer whether
// current connection succeeded or not. Therefore, the preAllocatedBuffer is first get and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,6 @@ public class ShuffleServerMetrics {
public static Gauge.Child gaugeUsedBufferSize;
public static Gauge.Child gaugeReadBufferUsedSize;
public static Gauge.Child gaugeUsedDirectMemorySize;
public static Gauge.Child gaugeAllocatedDirectMemorySize;
public static Gauge.Child gaugePinnedDirectMemorySize;
public static Gauge.Child gaugeWriteHandler;
public static Gauge.Child gaugeEventQueueSize;
public static Gauge.Child gaugeHadoopFlushThreadPoolQueueSize;
Expand Down Expand Up @@ -384,8 +382,6 @@ private static void setUpMetrics() {
gaugeUsedBufferSize = metricsManager.addLabeledGauge(USED_BUFFER_SIZE);
gaugeReadBufferUsedSize = metricsManager.addLabeledGauge(READ_USED_BUFFER_SIZE);
gaugeUsedDirectMemorySize = metricsManager.addLabeledGauge(USED_DIRECT_MEMORY_SIZE);
gaugeAllocatedDirectMemorySize = metricsManager.addLabeledGauge(ALLOCATED_DIRECT_MEMORY_SIZE);
gaugePinnedDirectMemorySize = metricsManager.addLabeledGauge(PINNED_DIRECT_MEMORY_SIZE);
gaugeWriteHandler = metricsManager.addLabeledGauge(TOTAL_WRITE_HANDLER);
gaugeEventQueueSize = metricsManager.addLabeledGauge(EVENT_QUEUE_SIZE);
gaugeHadoopFlushThreadPoolQueueSize =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.google.common.collect.RangeMap;
import com.google.common.collect.Sets;
import com.google.common.collect.TreeRangeMap;
import io.netty.util.internal.PlatformDependent;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -41,6 +42,7 @@
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.NettyUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.server.ShuffleDataFlushEvent;
import org.apache.uniffle.server.ShuffleFlushManager;
Expand Down Expand Up @@ -68,6 +70,7 @@ public class ShuffleBufferManager {
// Huge partition vars
private long hugePartitionSizeThreshold;
private long hugePartitionMemoryLimitSize;
private boolean nettyServerEnabled;

protected long bufferSize = 0;
protected AtomicLong preAllocatedSize = new AtomicLong(0L);
Expand All @@ -80,11 +83,16 @@ public class ShuffleBufferManager {
protected Map<String, Map<Integer, AtomicLong>> shuffleSizeMap = JavaUtils.newConcurrentMap();

public ShuffleBufferManager(ShuffleServerConf conf, ShuffleFlushManager shuffleFlushManager) {
this.nettyServerEnabled = conf.get(ShuffleServerConf.NETTY_SERVER_PORT) >= 0;
long heapSize = Runtime.getRuntime().maxMemory();
this.capacity = conf.getSizeAsBytes(ShuffleServerConf.SERVER_BUFFER_CAPACITY);
if (this.capacity < 0) {
this.capacity =
(long) (heapSize * conf.getDouble(ShuffleServerConf.SERVER_BUFFER_CAPACITY_RATIO));
nettyServerEnabled
? (long)
(NettyUtils.getMaxDirectMemory()
* conf.getDouble(ShuffleServerConf.SERVER_BUFFER_CAPACITY_RATIO))
: (long) (heapSize * conf.getDouble(ShuffleServerConf.SERVER_BUFFER_CAPACITY_RATIO));
}
this.readCapacity = conf.getSizeAsBytes(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY);
if (this.readCapacity < 0) {
Expand Down Expand Up @@ -321,6 +329,25 @@ public synchronized boolean requireMemory(long size, boolean isPreAllocated) {
if (isPreAllocated) {
requirePreAllocatedSize(size);
}
if (LOG.isDebugEnabled()) {
long usedDirectMemory = PlatformDependent.usedDirectMemory();
long usedHeapMemory =
Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
LOG.debug(
"Require memory succeeded with "
+ size
+ " bytes, usedMemory["
+ usedMemory.get()
+ "] include preAllocation["
+ preAllocatedSize.get()
+ "], inFlushSize["
+ inFlushSize.get()
+ "], usedDirectMemory["
+ usedDirectMemory
+ "], usedHeapMemory["
+ usedHeapMemory
+ "]");
}
return true;
}
if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -372,7 +399,7 @@ private void releaseFlushMemory(long size) {
+ inFlushSize.get()
+ "] is less than released["
+ size
+ "], set allocated memory to 0");
+ "], set in flush memory to 0");
inFlushSize.set(0L);
}
ShuffleServerMetrics.gaugeInFlushBufferSize.set(inFlushSize.get());
Expand Down Expand Up @@ -465,7 +492,17 @@ void requirePreAllocatedSize(long delta) {
}

/**
 * Gives back {@code delta} bytes of pre-allocated memory and refreshes the allocated-buffer
 * gauge.
 *
 * <p>The counter is clamped at zero: when {@code delta} exceeds the current value, a warning is
 * logged and the counter becomes 0 instead of going negative. The decrement is applied exactly
 * once, and the compare-and-subtract is performed as a single atomic update so that concurrent
 * callers cannot interleave between the check and the write.
 *
 * @param delta number of bytes to release from the pre-allocated total
 */
public void releasePreAllocatedSize(long delta) {
  // getAndUpdate returns the value observed before the update, so the warning below reports the
  // same number the clamping decision was based on.
  long previous =
      preAllocatedSize.getAndUpdate(current -> current >= delta ? current - delta : 0L);
  if (previous < delta) {
    LOG.warn(
        "Current pre-allocated memory["
            + previous
            + "] is less than released["
            + delta
            + "], set pre-allocated memory to 0");
  }
  ShuffleServerMetrics.gaugeAllocatedBufferSize.set(preAllocatedSize.get());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.uniffle.server.ShuffleServerMetrics;
import org.apache.uniffle.server.ShuffleTaskManager;
import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
import org.apache.uniffle.server.buffer.ShuffleBufferManager;
import org.apache.uniffle.storage.common.Storage;
import org.apache.uniffle.storage.common.StorageReadMetrics;
import org.apache.uniffle.storage.util.ShuffleStorageUtils;
Expand Down Expand Up @@ -114,11 +115,13 @@ public void handleSendShuffleDataRequest(TransportClient client, SendShuffleData
}
}
int requireSize = shuffleServer.getShuffleTaskManager().getRequireBufferSize(requireBufferId);
int requireBlocksSize =
requireSize - req.encodedLength() < 0 ? 0 : requireSize - req.encodedLength();

StatusCode ret = StatusCode.SUCCESS;
String responseMessage = "OK";
if (req.getPartitionToBlocks().size() > 0) {
ShuffleServerMetrics.counterTotalReceivedDataSize.inc(requireSize);
ShuffleServerMetrics.counterTotalReceivedDataSize.inc(requireBlocksSize);
ShuffleTaskManager manager = shuffleServer.getShuffleTaskManager();
PreAllocatedBufferInfo info = manager.getAndRemovePreAllocatedBuffer(requireBufferId);
boolean isPreAllocated = info != null;
Expand All @@ -134,15 +137,18 @@ public void handleSendShuffleDataRequest(TransportClient client, SendShuffleData
+ appId
+ "], shuffleId["
+ shuffleId
+ "]";
+ "], probably due to the pre-allocated buffer has expired. "
+ "Please increase the expiration time using "
+ ShuffleServerConf.SERVER_PRE_ALLOCATION_EXPIRED.key()
+ " in ShuffleServer's configuration";
LOG.warn(errorMsg);
responseMessage = errorMsg;
rpcResponse =
new RpcResponse(req.getRequestId(), StatusCode.INTERNAL_ERROR, responseMessage);
rpcResponse = new RpcResponse(req.getRequestId(), StatusCode.INTERNAL_ERROR, errorMsg);
client.getChannel().writeAndFlush(rpcResponse);
return;
}
final long start = System.currentTimeMillis();
ShuffleBufferManager shuffleBufferManager = shuffleServer.getShuffleBufferManager();
shuffleBufferManager.releaseMemory(req.encodedLength(), false, true);
List<ShufflePartitionedData> shufflePartitionedData = toPartitionedData(req);
long alreadyReleasedSize = 0;
boolean isFailureOccurs = false;
Expand Down Expand Up @@ -191,6 +197,7 @@ public void handleSendShuffleDataRequest(TransportClient client, SendShuffleData
// Once the cache failure occurs, we should explicitly release data held by byteBuf
if (isFailureOccurs) {
Arrays.stream(spd.getBlockList()).forEach(block -> block.getData().release());
shuffleBufferManager.releaseMemory(spd.getTotalBlockSize(), false, false);
}
}
}
Expand All @@ -199,8 +206,8 @@ public void handleSendShuffleDataRequest(TransportClient client, SendShuffleData
// current connection succeeded or not. Therefore, the preAllocatedBuffer is first get and
// removed, then after
// cacheShuffleData finishes, the preAllocatedSize should be updated accordingly.
if (info.getRequireSize() > alreadyReleasedSize) {
manager.releasePreAllocatedSize(info.getRequireSize() - alreadyReleasedSize);
if (requireBlocksSize > alreadyReleasedSize) {
manager.releasePreAllocatedSize(requireBlocksSize - alreadyReleasedSize);
}
rpcResponse = new RpcResponse(req.getRequestId(), ret, responseMessage);
long costTime = System.currentTimeMillis() - start;
Expand All @@ -218,7 +225,7 @@ public void handleSendShuffleDataRequest(TransportClient client, SendShuffleData
+ " ms with "
+ shufflePartitionedData.size()
+ " blocks and "
+ requireSize
+ requireBlocksSize
+ " bytes");
}
} else {
Expand Down

0 comments on commit bcbd1be

Please sign in to comment.