[#2181]improvement(server): Replace ShuffleBlockInfo with ShufflePartitionedBlock in netty #2182
base: master
To ensure upgrade compatibility, only the server-side code was modified.
7e3c3e3
to
c67fa9f
Compare
c67fa9f
to
f1745f3
Compare
Is this an improvement?
In our stress test environment, 80 million blocks are generated. Removing the unnecessary ShuffleBlockInfo objects will reduce JVM GC pressure.
@maobaolong Could you help me review this?
@lwllvyb With pleasure!
@lwllvyb Thanks for this great work. I suggest the following changes for this PR:
- Split the unrelated changes into another PR.
- Create an issue to track the follow-up task and add a TODO comment that links to it at the relevant place.
- Calculate the decodeLength in a simple, clear way rather than computing it everywhere. Maybe you can get it from the change in the ByteBuf position.
long crc = byteBuf.readLong();
long taskAttemptId = byteBuf.readLong();
public static Pair<Integer, ShufflePartitionedBlock> decodeShuffleBlockInfo(ByteBuf byteBuf) {
// partId Int
It would be better to create another issue to track the next plan based on this PR. Adding the issue link here as a TODO comment would make it clear and easy to track.
We should deprecate SEND_SHUFFLE_DATA_REQUEST and remove it in the future when a new major version is released.
When we upgrade to V2, we could send the encoded length to the server, to avoid recomputing the encodedLength on the server side.
@@ -59,19 +59,27 @@ public static int encodeLengthOfShuffleServerInfo(ShuffleServerInfo shuffleServe
+ 2 * Integer.BYTES;
}

public static int encodeLengthShuffleBlockInfoCommon() {
This could be a Constant?
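A minimal sketch of what the constant suggestion could look like. The class name, the constant name, and the field list are hypothetical and chosen only for illustration; they are not necessarily the layout that Uniffle's encoders use.

```java
// Sketch only: a hypothetical fixed-size block header, not Uniffle's actual layout.
final class BlockHeaderLengthSketch {
  // partId (int) + blockId (long) + length (int) + crc (long) + taskAttemptId (long).
  // Every field has a fixed width, so the sum can be computed once as a constant.
  static final int FIXED_HEADER_LENGTH =
      Integer.BYTES + Long.BYTES + Integer.BYTES + Long.BYTES + Long.BYTES;

  private BlockHeaderLengthSketch() {}

  /** Total encoded length of one block: fixed header plus the variable payload. */
  static int encodedLength(int dataLength) {
    return FIXED_HEADER_LENGTH + dataLength;
  }
}
```

Because the fixed part never changes at runtime, a constant avoids a method call per block and makes the fixed-versus-variable split explicit.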
// freeMemory Long
byteBuf.skipBytes(8);

int shuffleBlockInfoLength =
Do you mean renaming shuffleBlockInfoLength to shuffleBlockInfoEncodeLength?
}

public int getEncodedLength() {
  return encodedLength;
This confuses me. What is the difference between getEncodedLength() and encodedLength?
@@ -912,11 +905,9 @@ private ServerRpcAuditContext createAuditContext(
auditLogger = AUDIT_LOGGER;
}
ServerRpcAuditContext auditContext = new ServerRpcAuditContext(auditLogger);
auditContext.withCreationTimeNs(System.nanoTime());
This could be another fix?
+ Encoders.encodeLengthOfShuffleServerInfos(serverInfos);

return Pair.of(
shuffleBlockInfoLength,
It would be better to calculate the decodedLength from the byteBuf position.
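The position-based idea can be sketched as follows. This is an illustration under stated assumptions, not the PR's code: java.nio.ByteBuffer stands in for Netty's ByteBuf so the example runs without dependencies (with ByteBuf, readerIndex() would play the role of position()), and the Block type and field layout are hypothetical.

```java
import java.nio.ByteBuffer;

// Sketch only: ByteBuffer stands in for Netty's ByteBuf, and the wire layout
// (partitionId, blockId, dataLength, data) is hypothetical.
final class DecodedLengthSketch {
  /** Hypothetical decoded block holding only the fields this sketch needs. */
  static final class Block {
    final int partitionId;
    final long blockId;
    final byte[] data;

    Block(int partitionId, long blockId, byte[] data) {
      this.partitionId = partitionId;
      this.blockId = blockId;
      this.data = data;
    }
  }

  /** Decoded block together with the number of bytes consumed from the buffer. */
  static final class Decoded {
    final Block block;
    final int decodedLength;

    Decoded(Block block, int decodedLength) {
      this.block = block;
      this.decodedLength = decodedLength;
    }
  }

  static Decoded decode(ByteBuffer buf) {
    int start = buf.position(); // remember where decoding began
    int partitionId = buf.getInt();
    long blockId = buf.getLong();
    byte[] data = new byte[buf.getInt()];
    buf.get(data);
    // The consumed length is the position delta; no per-field bookkeeping is needed.
    return new Decoded(new Block(partitionId, blockId, data), buf.position() - start);
  }

  /** Helper that builds a buffer in the same hypothetical layout. */
  static ByteBuffer encode(int partitionId, long blockId, byte[] data) {
    ByteBuffer buf =
        ByteBuffer.allocate(Integer.BYTES + Long.BYTES + Integer.BYTES + data.length);
    buf.putInt(partitionId).putLong(blockId).putInt(data.length).put(data);
    buf.flip(); // switch from writing to reading
    return buf;
  }
}
```

Measuring the delta once keeps the length in step with whatever the decoder actually reads, so adding or removing a field cannot leave a hand-computed sum out of date.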
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #2182      +/-   ##
============================================
+ Coverage     52.86%   53.26%   +0.39%
+ Complexity     3385     3221     -164
============================================
  Files           517      481      -36
  Lines         28208    26073    -2135
  Branches       2633     2467     -166
============================================
- Hits          14913    13887    -1026
+ Misses        12332    11281    -1051
+ Partials        963      905      -58
View full report in Codecov by Sentry.
@maobaolong Fixed
7951a9e
to
64af380
Compare
@@ -69,6 +70,32 @@ public static ShuffleBlockInfo decodeShuffleBlockInfo(ByteBuf byteBuf) {
taskAttemptId);
}

public static ShufflePartitionedBlock decodeShuffleBlockInfoV1(ByteBuf byteBuf) {
How about decodeShufflePartitionedBlockV1?
@lwllvyb Maybe you missed this comment.
Thanks for your suggestion. I will change the function name.
@maobaolong Fixed.
Thanks for your update, the code is good enough now.
I commented inline to show the place I am concerned about. Could you show that this change cannot introduce a memory leak?

@Override
public String getOperationType() {
  return "sendShuffleData";
sendShuffleDataV1
Fixed it as you suggested.
@@ -274,7 +273,7 @@ public void handleSendShuffleDataRequest(TransportClient client, SendShuffleData
return;
}
final long start = System.currentTimeMillis();
shuffleBufferManager.releaseMemory(req.encodedLength(), false, true);
shuffleBufferManager.releaseMemory(req.getDecodedLength(), false, true);
I am worried that req.getDecodedLength() is not equal to encodedLength, since the encodedLength is calculated approximately, but req.getDecodedLength() is calculated accurately.
So a test is necessary to ensure that the amount passed to releaseMemory equals the amount passed to requireMemory, so that no memory leak takes place.
There is already a test class, NettyProtocolTest, that makes sure the encodedLength sent from the client is the same as the getDecodedLength received by the server.
Do we need a new test case?
@lwllvyb It is enough, thanks for showing me this.
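To illustrate why the two lengths have to match, here is a minimal accounting sketch. MemoryAccountingSketch and its methods are hypothetical stand-ins and do not reflect the real ShuffleBufferManager API; the point is only that any difference between the required and the released amount accumulates as a leak.

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch only: a hypothetical byte counter, not Uniffle's ShuffleBufferManager.
final class MemoryAccountingSketch {
  private final AtomicLong used = new AtomicLong();

  /** Reserve bytes before the request payload is received. */
  void require(long bytes) {
    used.addAndGet(bytes);
  }

  /** Give bytes back once the request has been handled. */
  void release(long bytes) {
    used.addAndGet(-bytes);
  }

  /** Bytes still accounted as in use; a non-zero value after all requests finish is a leak. */
  long used() {
    return used.get();
  }
}
```

If a request requires 100 bytes and releases 100, used() returns to 0. If it releases only 96 because the decoded length was computed differently, 4 bytes stay accounted for every request, which is the scenario that a test such as NettyProtocolTest guards against by checking that both lengths agree.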
64af380
to
ffb34d5
Compare
@lwllvyb Thanks for this improvement. I left a minor comment before, please take a look; otherwise, LGTM.
I look forward to the next step based on this.
ffb34d5
to
c603565
Compare
Ping @jerqi
…lock with netty mode.
c603565
to
17d4112
Compare
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.util.ByteBufUtils;

public class SendShuffleDataRequestV1 extends RequestMessage {
Why do we use v1 here?
The earliest version modified the original class, which introduced a lot of intrusive code and was not easy to review and understand.
The SendShuffleDataRequestV1 class does not decode useless information into the temporary ShuffleBlockInfo.
Will we have v2? If we don't have v2, maybe we should remove v1.
SendShuffleDataRequestV1 will only be used by the Shuffle Server, and SendShuffleDataRequest will only be used by the client. It is compatible with currently running clusters and serves as a transition.
In the next stage, I will add SendShuffleDataRequestV2, which will be used by both the Shuffle Server and the client. SendShuffleDataRequestV2 will not include the useless information that is sent from the client.
The request of the client is different from the one of the server. It's a little weird.
What changes were proposed in this pull request?
On the server side of the netty mode, when the server receives the sendShuffleData message, it decodes it into an unnecessary ShuffleBlockInfo and then converts it to ShufflePartitionedBlock in handleSendShuffleDataRequest.
Why are the changes needed?
Fix: #2181
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Locally