From 60659ce3b6e0469f122929103a3780a79c33e19f Mon Sep 17 00:00:00 2001 From: zhengchenyu Date: Tue, 22 Oct 2024 19:27:25 +0800 Subject: [PATCH] rename shared to buffered --- .../org/apache/uniffle/common/merger/Merger.java | 10 +++++----- .../apache/uniffle/common/records/RecordsReader.java | 4 ++-- .../apache/uniffle/common/records/RecordsWriter.java | 4 ++-- .../common/serializer/SerializerInstance.java | 4 ++-- ...=> BufferedRawWritableDeserializationStream.java} | 12 ++++++------ ...a => BufferedRawWritableSerializationStream.java} | 4 ++-- .../writable/WritableSerializerInstance.java | 12 ++++++------ 7 files changed, 25 insertions(+), 25 deletions(-) rename common/src/main/java/org/apache/uniffle/common/serializer/writable/{SharedRawWritableDeserializationStream.java => BufferedRawWritableDeserializationStream.java} (88%) rename common/src/main/java/org/apache/uniffle/common/serializer/writable/{SharedRawWritableSerializationStream.java => BufferedRawWritableSerializationStream.java} (94%) diff --git a/common/src/main/java/org/apache/uniffle/common/merger/Merger.java b/common/src/main/java/org/apache/uniffle/common/merger/Merger.java index 5a08fec9bd..c5dc21efaf 100644 --- a/common/src/main/java/org/apache/uniffle/common/merger/Merger.java +++ b/common/src/main/java/org/apache/uniffle/common/merger/Merger.java @@ -43,7 +43,7 @@ public static class MergeQueue extends PriorityQueue implements K private final Class valueClass; private Comparator comparator; private boolean raw; - private boolean shared; + private boolean buffered; private Object currentKey; private Object currentValue; @@ -57,7 +57,7 @@ public MergeQueue( Class valueClass, Comparator comparator, boolean raw, - boolean shared) { + boolean buffered) { this.rssConf = rssConf; this.segments = segments; this.keyClass = keyClass; @@ -67,7 +67,7 @@ public MergeQueue( } this.comparator = comparator; this.raw = raw; - this.shared = shared; + this.buffered = buffered; } public void setPopSegmentHook(Function popSegmentHook) { @@ -77,7 +77,7 @@ public void setPopSegmentHook(Function popSegmentHook) { @Override protected boolean lessThan(Object o1, Object o2) { if (raw) { - if (shared) { + if (buffered) { Segment s1 = (Segment) o1; Segment s2 = (Segment) o2; ByteBuf key1 = (ByteBuf) s1.getCurrentKey(); @@ -189,7 +189,7 @@ private void adjustPriorityQueue(Segment segment) throws IOException { public void merge(SerOutputStream output) throws IOException { RecordsWriter writer = - new RecordsWriter(rssConf, output, keyClass, valueClass, raw, shared); + new RecordsWriter(rssConf, output, keyClass, valueClass, raw, buffered); try { writer.init(); while (this.next()) { diff --git a/common/src/main/java/org/apache/uniffle/common/records/RecordsReader.java b/common/src/main/java/org/apache/uniffle/common/records/RecordsReader.java index 6ff94df68d..0594623a77 100644 --- a/common/src/main/java/org/apache/uniffle/common/records/RecordsReader.java +++ b/common/src/main/java/org/apache/uniffle/common/records/RecordsReader.java @@ -38,12 +38,12 @@ public RecordsReader( Class keyClass, Class valueClass, boolean raw, - boolean shared) { + boolean buffered) { SerializerFactory factory = new SerializerFactory(rssConf); Serializer serializer = factory.getSerializer(keyClass); assert factory.getSerializer(valueClass).getClass().equals(serializer.getClass()); SerializerInstance instance = serializer.newInstance(); - stream = instance.deserializeStream(input, keyClass, valueClass, raw, shared); + stream = instance.deserializeStream(input, keyClass, valueClass, raw, buffered); } public void init() { diff --git a/common/src/main/java/org/apache/uniffle/common/records/RecordsWriter.java b/common/src/main/java/org/apache/uniffle/common/records/RecordsWriter.java index 9ec75267ba..d68add57a5 100644 --- a/common/src/main/java/org/apache/uniffle/common/records/RecordsWriter.java +++ b/common/src/main/java/org/apache/uniffle/common/records/RecordsWriter.java @@ -36,12 +36,12 @@ public RecordsWriter( Class keyClass, Class valueClass, boolean raw, - boolean shared) { + boolean buffered) { SerializerFactory factory = new SerializerFactory(rssConf); Serializer serializer = factory.getSerializer(keyClass); assert factory.getSerializer(valueClass).getClass().equals(serializer.getClass()); SerializerInstance instance = serializer.newInstance(); - stream = instance.serializeStream(out, raw, shared); + stream = instance.serializeStream(out, raw, buffered); } public void init() { diff --git a/common/src/main/java/org/apache/uniffle/common/serializer/SerializerInstance.java b/common/src/main/java/org/apache/uniffle/common/serializer/SerializerInstance.java index 46eaed0786..a00dfebe56 100644 --- a/common/src/main/java/org/apache/uniffle/common/serializer/SerializerInstance.java +++ b/common/src/main/java/org/apache/uniffle/common/serializer/SerializerInstance.java @@ -29,8 +29,8 @@ public abstract class SerializerInstance { public abstract T deserialize(DataInputBuffer buffer, Class vClass) throws IOException; public abstract SerializationStream serializeStream( - SerOutputStream output, boolean raw, boolean shared); + SerOutputStream output, boolean raw, boolean buffered); public abstract DeserializationStream deserializeStream( - SerInputStream input, Class keyClass, Class valueClass, boolean raw, boolean shared); + SerInputStream input, Class keyClass, Class valueClass, boolean raw, boolean buffered); } diff --git a/common/src/main/java/org/apache/uniffle/common/serializer/writable/SharedRawWritableDeserializationStream.java b/common/src/main/java/org/apache/uniffle/common/serializer/writable/BufferedRawWritableDeserializationStream.java similarity index 88% rename from common/src/main/java/org/apache/uniffle/common/serializer/writable/SharedRawWritableDeserializationStream.java rename to common/src/main/java/org/apache/uniffle/common/serializer/writable/BufferedRawWritableDeserializationStream.java index 4d86d81959..55e2d55417 100644 --- a/common/src/main/java/org/apache/uniffle/common/serializer/writable/SharedRawWritableDeserializationStream.java +++ b/common/src/main/java/org/apache/uniffle/common/serializer/writable/BufferedRawWritableDeserializationStream.java @@ -29,16 +29,16 @@ import org.apache.uniffle.common.serializer.SerInputStream; import org.apache.uniffle.common.util.NettyUtils; -// Compare to RawWritableDeserializationStream, SharedRawWritableDeserializationStream use shared +// Compare to RawWritableDeserializationStream, BufferedRawWritableDeserializationStream use shared // buffer to store record. It means that after we use nextRecord, we store the record to shared -// buffer. It means we must copy from this before next nextRecord. -// Usually, SharedRawWritableDeserializationStream is used on the server side and +// buffer. It means we must use this before next nextRecord. +// Usually, BufferedRawWritableDeserializationStream is used on the server side and // RawWritableDeserializationStream is used on the client side. Because the records obtained -// in SharedRawWritableDeserializationStream are quickly used to form merged block, using +// in BufferedRawWritableDeserializationStream are quickly used to form merged block, using // shared buffer can avoid frequent memory requests. On the client side, the records obtained // are generally used for subsequent data processing and must be independent copies, so // RawWritableDeserializationStream is used in client side. -public class SharedRawWritableDeserializationStream +public class BufferedRawWritableDeserializationStream extends DeserializationStream { private static final int INIT_BUFFER_SIZE = 256; @@ -50,7 +50,7 @@ public class SharedRawWritableDeserializationStream extends SerializationStream { +public class BufferedRawWritableSerializationStream extends SerializationStream { // DataOutputStream::size return int, can not support big file which is larger than // Integer.MAX_VALUE. @@ -35,7 +35,7 @@ public class SharedRawWritableSerializationStream extends SerializationStr private SerOutputStream output; private DataOutputStream dataOut; - public SharedRawWritableSerializationStream( + public BufferedRawWritableSerializationStream( WritableSerializerInstance instance, SerOutputStream output) { this.output = output; } diff --git a/common/src/main/java/org/apache/uniffle/common/serializer/writable/WritableSerializerInstance.java b/common/src/main/java/org/apache/uniffle/common/serializer/writable/WritableSerializerInstance.java index 859cd304d1..7a4b84a584 100644 --- a/common/src/main/java/org/apache/uniffle/common/serializer/writable/WritableSerializerInstance.java +++ b/common/src/main/java/org/apache/uniffle/common/serializer/writable/WritableSerializerInstance.java @@ -48,10 +48,10 @@ public T deserialize(DataInputBuffer buffer, Class vClass) throws IOExceptio @Override public SerializationStream serializeStream( - SerOutputStream output, boolean raw, boolean shared) { + SerOutputStream output, boolean raw, boolean buffered) { if (raw) { - if (shared) { - return new SharedRawWritableSerializationStream(this, output); + if (buffered) { + return new BufferedRawWritableSerializationStream(this, output); } else { return new RawWritableSerializationStream(this, output); } @@ -62,10 +62,10 @@ public SerializationStream serializeStream( @Override public DeserializationStream deserializeStream( - SerInputStream input, Class keyClass, Class valueClass, boolean raw, boolean shared) { + SerInputStream input, Class keyClass, Class valueClass, boolean raw, boolean buffered) { if (raw) { - if (shared) { - return new SharedRawWritableDeserializationStream(this, input); + if (buffered) { + return new BufferedRawWritableDeserializationStream(this, input); } else { return new RawWritableDeserializationStream(this, input); }