Skip to content

Commit

Permalink
remove refcount (#449)
Browse files Browse the repository at this point in the history
  • Loading branch information
qingwen220 authored Jan 18, 2025
1 parent a421b12 commit 9789ee2
Show file tree
Hide file tree
Showing 17 changed files with 24 additions and 200 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ public class ForwardOutputDesc<T> implements IOutputDesc, Serializable {
private final int edgeId;
// Partition number.
private final int numPartitions;
// Data ref count, for data disposal.
private final int refCount;
// Name of the output edge.
private final String edgeName;
// Data exchange mode.
Expand All @@ -47,7 +45,6 @@ public ForwardOutputDesc(
int vertexId,
int edgeId,
int numPartitions,
int refCount,
String edgeName,
DataExchangeMode dataExchangeMode,
List<Integer> targetTaskIndices,
Expand All @@ -56,7 +53,6 @@ public ForwardOutputDesc(
this.vertexId = vertexId;
this.edgeId = edgeId;
this.numPartitions = numPartitions;
this.refCount = refCount;
this.edgeName = edgeName;
this.dataExchangeMode = dataExchangeMode;
this.targetTaskIndices = targetTaskIndices;
Expand All @@ -76,10 +72,6 @@ public int getNumPartitions() {
return this.numPartitions;
}

public int getRefCount() {
return this.refCount;
}

public String getEdgeName() {
return this.edgeName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,7 @@ public void update(UpdateEmitterRequest request) {
.setTaskName(taskArgs.getTaskName())
.setChannelNum(forwardOutputDesc.getTargetTaskIndices().size())
.setEncoder(encoder)
.setDataExchangeMode(forwardOutputDesc.getDataExchangeMode())
.setRefCount(forwardOutputDesc.getRefCount());
.setDataExchangeMode(forwardOutputDesc.getDataExchangeMode());
pipeRecordWriter.init(writerContext);

AtomicBoolean flag = new AtomicBoolean(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,4 @@ public interface IWriterContext extends Serializable {

DataExchangeMode getDataExchangeMode();

int getRefCount();

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ public void init(IWriterContext writerContext) {
}

@Override
protected PipelineSlice newSlice(String taskLogTag, SliceId sliceId, int refCount) {
return new PipelineSlice(taskLogTag, sliceId, refCount);
protected PipelineSlice newSlice(String taskLogTag, SliceId sliceId) {
return new PipelineSlice(taskLogTag, sliceId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void init(IWriterContext writerContext) {
this.maxBufferSize = this.shuffleConfig.getFlushBufferSizeBytes();

this.buffers = this.buildBufferBuilder(this.targetChannels);
this.resultSlices = this.buildResultSlices(this.targetChannels, writerContext.getRefCount());
this.resultSlices = this.buildResultSlices(this.targetChannels);
this.recordSerializer = this.getRecordSerializer();
}

Expand All @@ -89,20 +89,20 @@ private BufferBuilder[] buildBufferBuilder(int channels) {
return buffers;
}

protected IPipelineSlice[] buildResultSlices(int channels, int refCount) {
protected IPipelineSlice[] buildResultSlices(int channels) {
IPipelineSlice[] slices = new IPipelineSlice[channels];
WriterId writerId = new WriterId(this.pipelineId, this.edgeId, this.taskIndex);
SliceManager sliceManager = ShuffleManager.getInstance().getSliceManager();
for (int i = 0; i < channels; i++) {
SliceId sliceId = new SliceId(writerId, i);
IPipelineSlice slice = this.newSlice(this.taskLogTag, sliceId, refCount);
IPipelineSlice slice = this.newSlice(this.taskLogTag, sliceId);
slices[i] = slice;
sliceManager.register(sliceId, slice);
}
return slices;
}

protected abstract IPipelineSlice newSlice(String taskLogTag, SliceId sliceId, int refCount);
protected abstract IPipelineSlice newSlice(String taskLogTag, SliceId sliceId);

@SuppressWarnings("unchecked")
private IRecordSerializer<T> getRecordSerializer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ public SpillableShardWriter(ShuffleAddress shuffleAddress) {
}

@Override
protected IPipelineSlice newSlice(String taskLogTag, SliceId sliceId, int refCount) {
return new SpillablePipelineSlice(taskLogTag, sliceId, refCount);
protected IPipelineSlice newSlice(String taskLogTag, SliceId sliceId) {
return new SpillablePipelineSlice(taskLogTag, sliceId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ public class WriterContext implements IWriterContext {
private DataExchangeMode dataExchangeMode;
private int targetChannels;
private ShuffleConfig config;
private int refCount;
private IEncoder<?> encoder;

public WriterContext(long pipelineId, String pipelineName) {
Expand Down Expand Up @@ -82,11 +81,6 @@ public WriterContext setEncoder(IEncoder<?> encoder) {
return this;
}

public WriterContext setRefCount(int refCount) {
this.refCount = refCount;
return this;
}

@Override
public PipelineInfo getPipelineInfo() {
return pipelineInfo;
Expand Down Expand Up @@ -137,11 +131,6 @@ public DataExchangeMode getDataExchangeMode() {
return this.dataExchangeMode;
}

@Override
public int getRefCount() {
return this.refCount;
}

public static WriterContextBuilder newBuilder() {
return new WriterContextBuilder();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ public WriteNextMessageIfPossibleListener(PipeBuffer pipeBuffer) {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
try {
if (buffer != null && buffer.isDisposable()) {
if (buffer != null) {
buffer.release();
}
if (future.isSuccess()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
public abstract class AbstractBuffer implements OutBuffer {

private final ShuffleMemoryTracker memoryTracker;
protected int refCount;

public AbstractBuffer(boolean enableMemoryTrack) {
this.memoryTracker = enableMemoryTrack
Expand All @@ -30,16 +29,6 @@ public AbstractBuffer(ShuffleMemoryTracker memoryTracker) {
this.memoryTracker = memoryTracker;
}

@Override
public void setRefCount(int refCount) {
this.refCount = refCount;
}

@Override
public boolean isDisposable() {
return this.refCount <= 0;
}

protected void requireMemory(long dataSize) {
if (this.memoryTracker != null) {
memoryTracker.requireMemory(dataSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,6 @@ public interface OutBuffer {
*/
void write(OutputStream outputStream) throws IOException;

/**
* Set ref count, the number of consumer which handle this buffer.
*
* @param refCount ref count.
*/
void setRefCount(int refCount);

/**
* Check if this buffer disposable.
*
* @return if disposable.
*/
boolean isDisposable();

/**
* Release this buffer.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,14 @@ public abstract class AbstractSlice implements IPipelineSlice {

protected final SliceId sliceId;
protected final String taskLogTag;
protected int refCount;
protected int totalBufferCount;
protected ArrayDeque<PipeBuffer> buffers;
protected PipelineSliceReader sliceReader;
protected volatile boolean isReleased;

public AbstractSlice(String taskLogTag, SliceId sliceId, int refCount) {
public AbstractSlice(String taskLogTag, SliceId sliceId) {
this.sliceId = sliceId;
this.taskLogTag = taskLogTag;
this.refCount = refCount;
this.totalBufferCount = 0;
this.buffers = new ArrayDeque<>();
}
Expand All @@ -61,23 +59,17 @@ public PipelineSliceReader createSliceReader(long startBatchId, PipelineSliceLis
throw new GeaflowRuntimeException("slice is already created:" + sliceId);
}

refCount--;
LOGGER.info("creating reader for {} {} with startBatch:{} refCount:{}",
taskLogTag, sliceId, startBatchId, refCount);
LOGGER.info("creating reader for {} {} with startBatch:{}",
taskLogTag, sliceId, startBatchId);

// multiple repeatable readers can exist at the same time.
if (refCount >= 1) {
sliceReader = new RepeatableSliceReader(this, startBatchId, listener);
} else {
sliceReader = new DisposableSliceReader(this, startBatchId, listener);
}
sliceReader = new DisposableSliceReader(this, startBatchId, listener);
return sliceReader;
}
}

@Override
public boolean canRelease() {
return refCount == 0 && !hasNext();
return !hasNext();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import com.antgroup.geaflow.shuffle.message.SliceId;
import com.antgroup.geaflow.shuffle.pipeline.buffer.PipeBuffer;
import java.util.Iterator;

public interface IPipelineSlice {

Expand Down Expand Up @@ -60,9 +59,4 @@ public interface IPipelineSlice {
*/
PipeBuffer next();

/**
* Get slice buffer iterator.
*/
Iterator<PipeBuffer> getBufferIterator();

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.antgroup.geaflow.shuffle.message.SliceId;
import com.antgroup.geaflow.shuffle.pipeline.buffer.PipeBuffer;
import com.google.common.base.Preconditions;
import java.util.Iterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -28,11 +27,7 @@ public class PipelineSlice extends AbstractSlice {
private boolean flushRequested;

public PipelineSlice(String taskLogTag, SliceId sliceId) {
super(taskLogTag, sliceId, 1);
}

public PipelineSlice(String taskLogTag, SliceId sliceId, int refCount) {
super(taskLogTag, sliceId, refCount);
super(taskLogTag, sliceId);
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -87,12 +82,6 @@ private void notifyDataAvailable(long batchId) {
}
}

public void setRefCount(int refCount) {
synchronized (buffers) {
this.refCount = refCount;
}
}

// ------------------------------------------------------------------------
// Consume
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -133,11 +122,6 @@ private void updateFlushRequested(boolean flushRequested) {
this.flushRequested = flushRequested;
}

@Override
public Iterator<PipeBuffer> getBufferIterator() {
return buffers.iterator();
}

@Override
public void release() {
final PipelineSliceReader reader;
Expand Down

This file was deleted.

Loading

0 comments on commit 9789ee2

Please sign in to comment.