Skip to content

Commit

Permalink
rename shared to buffered
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengchenyu committed Oct 22, 2024
1 parent ccd9f2e commit 60659ce
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public static class MergeQueue<K, V> extends PriorityQueue<Segment> implements K
private final Class<V> valueClass;
private Comparator comparator;
private boolean raw;
private boolean shared;
private boolean buffered;

private Object currentKey;
private Object currentValue;
Expand All @@ -57,7 +57,7 @@ public MergeQueue(
Class<V> valueClass,
Comparator<K> comparator,
boolean raw,
boolean shared) {
boolean buffered) {
this.rssConf = rssConf;
this.segments = segments;
this.keyClass = keyClass;
Expand All @@ -67,7 +67,7 @@ public MergeQueue(
}
this.comparator = comparator;
this.raw = raw;
this.shared = shared;
this.buffered = buffered;
}

public void setPopSegmentHook(Function<Integer, Segment> popSegmentHook) {
Expand All @@ -77,7 +77,7 @@ public void setPopSegmentHook(Function<Integer, Segment> popSegmentHook) {
@Override
protected boolean lessThan(Object o1, Object o2) {
if (raw) {
if (shared) {
if (buffered) {
Segment s1 = (Segment) o1;
Segment s2 = (Segment) o2;
ByteBuf key1 = (ByteBuf) s1.getCurrentKey();
Expand Down Expand Up @@ -189,7 +189,7 @@ private void adjustPriorityQueue(Segment segment) throws IOException {

public void merge(SerOutputStream output) throws IOException {
RecordsWriter<K, V> writer =
new RecordsWriter<K, V>(rssConf, output, keyClass, valueClass, raw, shared);
new RecordsWriter<K, V>(rssConf, output, keyClass, valueClass, raw, buffered);
try {
writer.init();
while (this.next()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ public RecordsReader(
Class<K> keyClass,
Class<V> valueClass,
boolean raw,
boolean shared) {
boolean buffered) {
SerializerFactory factory = new SerializerFactory(rssConf);
Serializer serializer = factory.getSerializer(keyClass);
assert factory.getSerializer(valueClass).getClass().equals(serializer.getClass());
SerializerInstance instance = serializer.newInstance();
stream = instance.deserializeStream(input, keyClass, valueClass, raw, shared);
stream = instance.deserializeStream(input, keyClass, valueClass, raw, buffered);
}

public void init() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ public RecordsWriter(
Class keyClass,
Class valueClass,
boolean raw,
boolean shared) {
boolean buffered) {
SerializerFactory factory = new SerializerFactory(rssConf);
Serializer serializer = factory.getSerializer(keyClass);
assert factory.getSerializer(valueClass).getClass().equals(serializer.getClass());
SerializerInstance instance = serializer.newInstance();
stream = instance.serializeStream(out, raw, shared);
stream = instance.serializeStream(out, raw, buffered);
}

public void init() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ public abstract class SerializerInstance {
public abstract <T> T deserialize(DataInputBuffer buffer, Class vClass) throws IOException;

public abstract <K, V> SerializationStream serializeStream(
SerOutputStream output, boolean raw, boolean shared);
SerOutputStream output, boolean raw, boolean buffered);

public abstract <K, V> DeserializationStream deserializeStream(
SerInputStream input, Class<K> keyClass, Class<V> valueClass, boolean raw, boolean shared);
SerInputStream input, Class<K> keyClass, Class<V> valueClass, boolean raw, boolean buffered);
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,16 @@
import org.apache.uniffle.common.serializer.SerInputStream;
import org.apache.uniffle.common.util.NettyUtils;

// Compare to RawWritableDeserializationStream, SharedRawWritableDeserializationStream use shared
// Compare to RawWritableDeserializationStream, BufferedRawWritableDeserializationStream use shared
// buffer to store record. It means that after we use nextRecord, we store the record to shared
// buffer. It means we must copy from this before next nextRecord.
// Usually, SharedRawWritableDeserializationStream is used on the server side and
// buffer. It means we must use this before next nextRecord.
// Usually, BufferedRawWritableDeserializationStream is used on the server side and
// RawWritableDeserializationStream is used on the client side. Because the records obtained
// in SharedRawWritableDeserializationStream are quickly used to form merged block, using
// in BufferedRawWritableDeserializationStream are quickly used to form merged block, using
// shared buffer can avoid frequent memory requests. On the client side, the records obtained
// are generally used for subsequent data processing and must be independent copies, so
// RawWritableDeserializationStream is used in client side.
public class SharedRawWritableDeserializationStream<K extends Writable, V extends Writable>
public class BufferedRawWritableDeserializationStream<K extends Writable, V extends Writable>
extends DeserializationStream<ByteBuf, ByteBuf> {

private static final int INIT_BUFFER_SIZE = 256;
Expand All @@ -50,7 +50,7 @@ public class SharedRawWritableDeserializationStream<K extends Writable, V extend
private ByteBuf currentKeyBuffer;
private ByteBuf currentValueBuffer;

public SharedRawWritableDeserializationStream(
public BufferedRawWritableDeserializationStream(
WritableSerializerInstance instance, SerInputStream inputStream) {
this.inputStream = inputStream;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.uniffle.common.serializer.SerOutputStream;
import org.apache.uniffle.common.serializer.SerializationStream;

public class SharedRawWritableSerializationStream<K, V> extends SerializationStream {
public class BufferedRawWritableSerializationStream<K, V> extends SerializationStream {

// DataOutputStream::size return int, can not support big file which is larger than
// Integer.MAX_VALUE.
Expand All @@ -35,7 +35,7 @@ public class SharedRawWritableSerializationStream<K, V> extends SerializationStr
private SerOutputStream output;
private DataOutputStream dataOut;

public SharedRawWritableSerializationStream(
public BufferedRawWritableSerializationStream(
WritableSerializerInstance instance, SerOutputStream output) {
this.output = output;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ public <T> T deserialize(DataInputBuffer buffer, Class vClass) throws IOExceptio

@Override
public <K, V> SerializationStream serializeStream(
SerOutputStream output, boolean raw, boolean shared) {
SerOutputStream output, boolean raw, boolean buffered) {
if (raw) {
if (shared) {
return new SharedRawWritableSerializationStream(this, output);
if (buffered) {
return new BufferedRawWritableSerializationStream(this, output);
} else {
return new RawWritableSerializationStream(this, output);
}
Expand All @@ -62,10 +62,10 @@ public <K, V> SerializationStream serializeStream(

@Override
public <K, V> DeserializationStream deserializeStream(
SerInputStream input, Class<K> keyClass, Class<V> valueClass, boolean raw, boolean shared) {
SerInputStream input, Class<K> keyClass, Class<V> valueClass, boolean raw, boolean buffered) {
if (raw) {
if (shared) {
return new SharedRawWritableDeserializationStream(this, input);
if (buffered) {
return new BufferedRawWritableDeserializationStream(this, input);
} else {
return new RawWritableDeserializationStream(this, input);
}
Expand Down

0 comments on commit 60659ce

Please sign in to comment.