-
Notifications
You must be signed in to change notification settings - Fork 148
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[#2173] feat(remote merge): support netty for remote merge. #2202
Conversation
Test Results 2 926 files ± 0 2 926 suites ±0 6h 26m 6s ⏱️ + 16m 25s Results for commit 60659ce. ± Comparison against base commit 87f9b6f. This pull request removes 20 and adds 59 tests. Note that renamed tests count towards both.
♻️ This comment has been updated with latest results. |
8b6c234
to
3a68303
Compare
@jerqi Can you please review this PR? |
if (raw) { | ||
return new RawWritableSerializationStream(this, output); | ||
if (shared) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we use a shared serialization stream?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In client side, the parsed record will be used by reduce, so we need a deep copy instance, every record have their own buffer. if we use shared buffer, error will occur.
But in server side, we write the record to the mergedblock immediately after parsing the record, so there is no need for each record to have a separate memory copy. For a segment/block, we can use only two shared buffer, this saves more memory.
BTW, although this PR is about Netty, a lot of work has actually been done on saving memory
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Name makes me confused. Because SharedSerializationStream
needs to be operated by multiple threads. It will need the lock.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A SharedSerializationStream corresponds to a block. SharedSerializationStream will not be accessed by multiple threads, can only be executed under the merge thread corresponding to one partition. Here, 'Shared' means use shared buffer to merge.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we give a better name for it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we give a better name for it?
In fact, I used to use shallow
as name before. But I change to 'shared'.
Compare to RawWritableDeserializationStream
, SharedRawWritableDeserializationStream
use shared buffer to store record, but RawWritableDeserializationStream
allocates a new buffer for each record.
If we need changed, RawWritableDeserializationStream
rename to DeepRawWritableDeserializationStream
, and SharedRawWritableDeserializationStream
rename to ShallowRawWritableDeserializationStream
. How about it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BufferDeserializationStream and PartitionDeserializationStream may be better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have three stream:
- WritableSerializationStream
- RawWritableDeserializationStream
- SharedRawWritableDeserializationStream
WritableSerializationStream
is used to parse bytes into Java objects, mainly used on the reduce side of tez and spark.
RawWritableSerializationStream
directly copies bytes without doing any actual deserialization. It is mainly used on the reduce side of mr, because mr requires raw interface.
SharedRawWritableDeserializationStream
is similar to RawWritableSerializationStream
, but uses some memory optimization methods. Mainly used for server-side merge. So deserialization is no needed.
The Raw
prefix means that the bytes are copied directly without unnecessary serialization. I think it should not be deleted.
Now that, I think WritableSerializationStream and RawWritableDeserializationStream names are not changed, rename SharedRawWritableDeserializationStream to BufferRawDeserializationStream. How about this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok for me. Buffer
-> Buffered
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
What changes were proposed in this pull request?
Support netty for remote merge. Use direct ByteBuf to replace with byte[] when netty is enable. And optimized code structure to avoid memory leaks
Why are the changes needed?
Fix: #2173
Does this PR introduce any user-facing change?
No.
How was this patch tested?
unit test, integration test, real job in cluster.