Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#1472][part-5] Use UnpooledByteBufAllocator to obtain accurate ByteBuf sizes to fix inaccurate usedMemory issue causing OOM #1534

Merged
merged 2 commits into from
Feb 23, 2024

Conversation

rickyma
Copy link
Contributor

@rickyma rickyma commented Feb 21, 2024

What changes were proposed in this pull request?

When we use UnpooledByteBufAllocator to allocate off-heap ByteBuf, Netty directly requests off-heap memory from the operating system instead of allocating it according to pageSize and chunkSize. This way, we can obtain the exact ByteBuf size during the pre-allocation of memory, avoiding distortion of metrics such as usedMemory.

Moreover, we have restored the code submission of the PR #1521. We ensure that there is sufficient direct memory for the Netty server during decoding sendShuffleDataRequest by taking into account the encodedLength of ByteBuf in advance during the pre-allocation of memory, thus avoiding OOM during decoding sendShuffleDataRequest.

Since we are not using PooledByteBufAllocator, the PR #1524 is no longer needed.

Why are the changes needed?

A sub PR for: #1519

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing UTs.

@rickyma
Copy link
Contributor Author

rickyma commented Feb 21, 2024

@jerqi @zuston PTAL.

@codecov-commenter
Copy link

codecov-commenter commented Feb 21, 2024

Codecov Report

Attention: 43 lines in your changes are missing coverage. Please review.

Comparison is base (59aa30d) 54.17% compared to head (d84cf09) 55.12%.

Files Patch % Lines
...niffle/server/netty/ShuffleServerNettyHandler.java 0.00% 15 Missing ⚠️
...he/uniffle/server/buffer/ShuffleBufferManager.java 18.75% 10 Missing and 3 partials ⚠️
...client/impl/grpc/ShuffleServerGrpcNettyClient.java 0.00% 7 Missing ⚠️
...pache/uniffle/server/ShuffleServerGrpcService.java 20.00% 3 Missing and 1 partial ⚠️
.../common/netty/protocol/SendShuffleDataRequest.java 0.00% 2 Missing ⚠️
...ava/org/apache/uniffle/common/util/NettyUtils.java 75.00% 1 Missing ⚠️
...pache/uniffle/server/NettyDirectMemoryTracker.java 0.00% 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #1534      +/-   ##
============================================
+ Coverage     54.17%   55.12%   +0.94%     
- Complexity     2822     2824       +2     
============================================
  Files           435      415      -20     
  Lines         24501    22155    -2346     
  Branches       2074     2079       +5     
============================================
- Hits          13274    12212    -1062     
+ Misses        10397     9179    -1218     
+ Partials        830      764      -66     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@rickyma
Copy link
Contributor Author

rickyma commented Feb 21, 2024

Why is Rust CI always failed?

Copy link

github-actions bot commented Feb 21, 2024

Test Results

2 438 files  +1  2 438 suites  +1   4h 43m 24s ⏱️ + 1m 4s
  823 tests ±0    822 ✅ +1   1 💤 ±0  0 ❌ ±0 
9 743 runs  +1  9 729 ✅ +2  14 💤 ±0  0 ❌ ±0 

Results for commit d84cf09. ± Comparison against base commit 59aa30d.

♻️ This comment has been updated with latest results.

@zuston
Copy link
Member

zuston commented Feb 21, 2024

What's the initial motivation of using PooledByteBufAllocator? @leixm

@XuQianJin-Stars
Copy link

In the previous implementation, the pool cache was not used either.

@@ -80,11 +83,16 @@ public class ShuffleBufferManager {
protected Map<String, Map<Integer, AtomicLong>> shuffleSizeMap = JavaUtils.newConcurrentMap();

public ShuffleBufferManager(ShuffleServerConf conf, ShuffleFlushManager shuffleFlushManager) {
this.nettyServerEnabled = conf.get(ShuffleServerConf.NETTY_SERVER_PORT) >= 0;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use RPC_SERVER_TYPE=GRPC_NETTY to judge?

Copy link
Contributor Author

@rickyma rickyma Feb 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -80,11 +83,16 @@ public class ShuffleBufferManager {
protected Map<String, Map<Integer, AtomicLong>> shuffleSizeMap = JavaUtils.newConcurrentMap();

public ShuffleBufferManager(ShuffleServerConf conf, ShuffleFlushManager shuffleFlushManager) {
this.nettyServerEnabled = conf.get(ShuffleServerConf.NETTY_SERVER_PORT) >= 0;
long heapSize = Runtime.getRuntime().maxMemory();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PlatformDependent.maxDirectMemory() This method contains the logic of Runtime.getRuntime().maxMemory(). Can it be merged?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We allow users to configure the maximum value of the direct memory through the variable MAX_DIRECT_MEMORY_SIZE in the rss-env.sh script, so the value of PlatformDependent.maxDirectMemory() may be different from the value of Runtime.getRuntime().maxMemory(), so we cannot merge this part of the code.

@rickyma
Copy link
Contributor Author

rickyma commented Feb 21, 2024

In the previous implementation, the pool cache was not used either.

Yeah, that seems right. Because the parameters smallCacheSize and normalCacheSize are set to 0 when initializing PooledByteBufAllocator.

@rickyma rickyma force-pushed the issue-1472-part-5-new branch 3 times, most recently from c53111c to 408ffcf Compare February 21, 2024 15:43
Copy link
Member

@zuston zuston left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM. Left some minor comments

@@ -256,6 +256,7 @@ public void sendShuffleData(
final long start = System.currentTimeMillis();
List<ShufflePartitionedData> shufflePartitionedData = toPartitionedData(req);
long alreadyReleasedSize = 0;
boolean hasFailureOccurred = false;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not related with this PR. You can submit another PR to fix this.

Copy link
Contributor Author

@rickyma rickyma Feb 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code logic of GRPC and Netty is basically the same. In order to solve the problem of usedMemory being inaccurate in the Netty scenario, I fixed the Netty scenario and also made the same changes to the GRPC side.

So, do we keep the changes for hasFailureOccurred in Netty? It is a bit weird to only modify the same logic code for the Netty part?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. Let's reserve this.

@@ -80,11 +84,16 @@ public class ShuffleBufferManager {
protected Map<String, Map<Integer, AtomicLong>> shuffleSizeMap = JavaUtils.newConcurrentMap();

public ShuffleBufferManager(ShuffleServerConf conf, ShuffleFlushManager shuffleFlushManager) {
this.nettyServerEnabled = conf.get(ShuffleServerConf.RPC_SERVER_TYPE) == ServerType.GRPC_NETTY;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be defined by the ShuffleServer

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You wanna pass nettyServerEnabled from ShuffleServer into ShuffleBufferManager?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be fixed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would add an extra parameter in ShuffleBufferManager's constructor.

image

It is acceptable? Because the parameter ShuffleServerConf has already contained the value of nettyServerEnabled indirectly.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The single source principle should be ensured. And the checking logic is not same, you can see the current code in shuffleServer.java

    nettyServerEnabled = shuffleServerConf.get(ShuffleServerConf.NETTY_SERVER_PORT) >= 0;
    if (nettyServerEnabled) {
      streamServer = new StreamServer(this);
    }

Copy link
Contributor Author

@rickyma rickyma Feb 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this. I'll fix this in the next PR which will fix #1008 together(mentioned in #1531). I've tried before. If I change this in the current PR, plenty of UTs will be failed. (really plenty of UTs, not kidding) @zuston

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. let me merge this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The single source principle should be ensured. And the checking logic is not same, you can see the current code in shuffleServer.java

    nettyServerEnabled = shuffleServerConf.get(ShuffleServerConf.NETTY_SERVER_PORT) >= 0;
    if (nettyServerEnabled) {
      streamServer = new StreamServer(this);
    }

Done in #1540.

@@ -465,7 +493,17 @@ void requirePreAllocatedSize(long delta) {
}

public void releasePreAllocatedSize(long delta) {
preAllocatedSize.addAndGet(-delta);
if (preAllocatedSize.get() >= delta) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it could be optimized by like this:

int allocated = preAllocatedSize.addAndGet(-delta);
if (allocated < 0) {
LOG.warn(
          "Current pre-allocated memory["
              + preAllocatedSize.get()
              + "] is less than released["
              + delta
              + "], set pre-allocated memory to 0");
      preAllocatedSize.set(0L);
}

Copy link
Contributor Author

@rickyma rickyma Feb 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm just following the same pattern in releaseMemory, releaseFlushMemory and releaseReadMemory.

Maybe we can refactor the pattern in all the above methods like you said later in another PR?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok

@rickyma rickyma requested a review from zuston February 22, 2024 06:23
@jerqi
Copy link
Contributor

jerqi commented Feb 22, 2024

There is a flaky test. You can see https://github.com/apache/incubator-uniffle/actions/runs/7983021033/job/21797479973

Job aborted due to stage failure: Task 0 in stage 5.0 failed 1 times, most recent failure: Lost task 0.0 in stage 5.0 (TID 14, localhost, executor driver): org.apache.uniffle.common.exception.RssFetchFailedException: Failed to read shuffle data from HOT handler
	at org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler.readShuffleData(ComposedClientReadHandler.java:124)
	at org.apache.uniffle.client.impl.ShuffleReadClientImpl.read(ShuffleReadClientImpl.java:273)
	at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:185)
	at org.apache.spark.shuffle.reader.RssShuffleDataIterator.hasNext(RssShuffleDataIterator.java:115)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
	at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:156)
	at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
	at org.apache.spark.shuffle.reader.RssShuffleReader.read(RssShuffleReader.java:155)
	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
	at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:83)
	at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:148)
	at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:101)
	at org.apache.uniffle.common.netty.buffer.NettyManagedBuffer.release(NettyManagedBuffer.java:59)
	at org.apache.uniffle.common.ShuffleIndexResult.release(ShuffleIndexResult.java:75)
	at org.apache.uniffle.storage.handler.impl.DataSkippableReadHandler.readShuffleData(DataSkippableReadHandler.java:82)
	at org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler.readShuffleData(ComposedClientReadHandler.java:113)
	... 19 more

@rickyma
Copy link
Contributor Author

rickyma commented Feb 22, 2024

There is a flaky test. You can see https://github.com/apache/incubator-uniffle/actions/runs/7983021033/job/21797479973

Job aborted due to stage failure: Task 0 in stage 5.0 failed 1 times, most recent failure: Lost task 0.0 in stage 5.0 (TID 14, localhost, executor driver): org.apache.uniffle.common.exception.RssFetchFailedException: Failed to read shuffle data from HOT handler
	at org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler.readShuffleData(ComposedClientReadHandler.java:124)
	at org.apache.uniffle.client.impl.ShuffleReadClientImpl.read(ShuffleReadClientImpl.java:273)
	at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:185)
	at org.apache.spark.shuffle.reader.RssShuffleDataIterator.hasNext(RssShuffleDataIterator.java:115)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
	at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:156)
	at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
	at org.apache.spark.shuffle.reader.RssShuffleReader.read(RssShuffleReader.java:155)
	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
	at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:83)
	at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:148)
	at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:101)
	at org.apache.uniffle.common.netty.buffer.NettyManagedBuffer.release(NettyManagedBuffer.java:59)
	at org.apache.uniffle.common.ShuffleIndexResult.release(ShuffleIndexResult.java:75)
	at org.apache.uniffle.storage.handler.impl.DataSkippableReadHandler.readShuffleData(DataSkippableReadHandler.java:82)
	at org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler.readShuffleData(ComposedClientReadHandler.java:113)
	... 19 more

I don't think it's related to this PR. It does not happen everytime, I'll take a deep look in the next couple of days.
I think maybe #1522 does not solve the issue completely. Perhaps there are other places where potential leaks may exist.

@rickyma
Copy link
Contributor Author

rickyma commented Feb 22, 2024

There is a flaky test. You can see https://github.com/apache/incubator-uniffle/actions/runs/7983021033/job/21797479973

Job aborted due to stage failure: Task 0 in stage 5.0 failed 1 times, most recent failure: Lost task 0.0 in stage 5.0 (TID 14, localhost, executor driver): org.apache.uniffle.common.exception.RssFetchFailedException: Failed to read shuffle data from HOT handler
	at org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler.readShuffleData(ComposedClientReadHandler.java:124)
	at org.apache.uniffle.client.impl.ShuffleReadClientImpl.read(ShuffleReadClientImpl.java:273)
	at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:185)
	at org.apache.spark.shuffle.reader.RssShuffleDataIterator.hasNext(RssShuffleDataIterator.java:115)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
	at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:156)
	at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
	at org.apache.spark.shuffle.reader.RssShuffleReader.read(RssShuffleReader.java:155)
	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
	at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:83)
	at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:148)
	at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:101)
	at org.apache.uniffle.common.netty.buffer.NettyManagedBuffer.release(NettyManagedBuffer.java:59)
	at org.apache.uniffle.common.ShuffleIndexResult.release(ShuffleIndexResult.java:75)
	at org.apache.uniffle.storage.handler.impl.DataSkippableReadHandler.readShuffleData(DataSkippableReadHandler.java:82)
	at org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler.readShuffleData(ComposedClientReadHandler.java:113)
	... 19 more

Done by #1536. @jerqi

@zuston zuston merged commit 9142516 into apache:master Feb 23, 2024
40 checks passed
zuston pushed a commit that referenced this pull request Feb 29, 2024
…le data requests (#1551)

### What changes were proposed in this pull request?

Refresh `timestamp` when sending `SendShuffleDataRequest`.

### Why are the changes needed?

A follow-up PR for: #1534

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing UTs.
zuston pushed a commit that referenced this pull request Mar 25, 2024
… when failing to cache shuffle data (#1597)

### What changes were proposed in this pull request?

Release memory more accurately when failing to cache shuffle data.

### Why are the changes needed?

A follow-up PR for: #1534.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing UTs.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants