Skip to content

Commit

Permalink
[apache#1708] feat(server): support use skip list to store shuffleBuf…
Browse files Browse the repository at this point in the history
…fer in memory (apache#1763)

### What changes were proposed in this pull request?
Support use skip list to store shuffleBuffer in memory.

### Why are the changes needed?
If we assign a lot of memory to store shuffle data, it will help to improve the performance(The system load of the shuffle server will be reduced.)
Fix: apache#1708

### Does this PR introduce any user-facing change?
set rss.server.shuffleBuffer.type to SKIP_LIST

### How was this patch tested?
UTs and manual testing
  • Loading branch information
xianjingfeng authored Jun 17, 2024
1 parent 8eff38e commit 4300460
Show file tree
Hide file tree
Showing 10 changed files with 967 additions and 350 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.uniffle.common.config.ConfigOptions;
import org.apache.uniffle.common.config.ConfigUtils;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.server.buffer.ShuffleBufferType;

public class ShuffleServerConf extends RssBaseConf {

Expand Down Expand Up @@ -434,6 +435,18 @@ public class ShuffleServerConf extends RssBaseConf {
"The interval of trigger shuffle buffer manager to flush data to persistent storage. If <= 0"
+ ", then this flush check would be disabled.");

public static final ConfigOption<ShuffleBufferType> SERVER_SHUFFLE_BUFFER_TYPE =
ConfigOptions.key("rss.server.shuffleBuffer.type")
.enumType(ShuffleBufferType.class)
.defaultValue(ShuffleBufferType.LINKED_LIST)
.withDescription(
"The type for shuffle buffers. Setting as LINKED_LIST or SKIP_LIST."
+ " The default value is LINKED_LIST. SKIP_LIST will help to improve"
+ " the performance when there are a large number of blocks in memory"
+ " or when the memory occupied by the blocks is very large."
+ " The cpu usage of the shuffle server will be reduced."
+ " But SKIP_LIST doesn't support the slow-start feature of MR.");

public static final ConfigOption<Long> SERVER_SHUFFLE_FLUSH_THRESHOLD =
ConfigOptions.key("rss.server.shuffle.flush.threshold")
.longType()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.server.buffer;

import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;

import com.google.common.collect.Lists;
import io.netty.buffer.CompositeByteBuf;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.NettyUtils;
import org.apache.uniffle.server.ShuffleDataFlushEvent;

public abstract class AbstractShuffleBuffer implements ShuffleBuffer {

private static final Logger LOG = LoggerFactory.getLogger(AbstractShuffleBuffer.class);

private final long capacity;
protected long size;

public AbstractShuffleBuffer(long capacity) {
this.capacity = capacity;
this.size = 0;
}

/** Only for test */
@Override
public synchronized ShuffleDataFlushEvent toFlushEvent(
String appId,
int shuffleId,
int startPartition,
int endPartition,
Supplier<Boolean> isValid) {
return toFlushEvent(
appId,
shuffleId,
startPartition,
endPartition,
isValid,
ShuffleDataDistributionType.NORMAL);
}

@Override
public long getSize() {
return size;
}

@Override
public boolean isFull() {
return size > capacity;
}

@Override
public synchronized ShuffleDataResult getShuffleData(long lastBlockId, int readBufferSize) {
return getShuffleData(lastBlockId, readBufferSize, null);
}

// 1. generate buffer segments and other info: if blockId exist, start with which eventId
// 2. according to info from step 1, generate data
// todo: if block was flushed, it's possible to get duplicated data
@Override
public synchronized ShuffleDataResult getShuffleData(
long lastBlockId, int readBufferSize, Roaring64NavigableMap expectedTaskIds) {
try {
List<BufferSegment> bufferSegments = Lists.newArrayList();
List<ShufflePartitionedBlock> readBlocks = Lists.newArrayList();
updateBufferSegmentsAndResultBlocks(
lastBlockId, readBufferSize, bufferSegments, readBlocks, expectedTaskIds);
if (!bufferSegments.isEmpty()) {
CompositeByteBuf byteBuf =
new CompositeByteBuf(
NettyUtils.getNettyBufferAllocator(),
true,
Constants.COMPOSITE_BYTE_BUF_MAX_COMPONENTS);
// copy result data
updateShuffleData(readBlocks, byteBuf);
return new ShuffleDataResult(byteBuf, bufferSegments);
}
} catch (Exception e) {
LOG.error("Exception happened when getShuffleData in buffer", e);
}
return new ShuffleDataResult();
}

// here is the rule to read data in memory:
// 1. read from inFlushBlockMap order by eventId asc, then from blocks
// 2. if can't find lastBlockId, means related data may be flushed to storage, repeat step 1
protected abstract void updateBufferSegmentsAndResultBlocks(
long lastBlockId,
long readBufferSize,
List<BufferSegment> bufferSegments,
List<ShufflePartitionedBlock> resultBlocks,
Roaring64NavigableMap expectedTaskIds);

protected int calculateDataLength(List<BufferSegment> bufferSegments) {
BufferSegment bufferSegment = bufferSegments.get(bufferSegments.size() - 1);
return bufferSegment.getOffset() + bufferSegment.getLength();
}

private void updateShuffleData(List<ShufflePartitionedBlock> readBlocks, CompositeByteBuf data) {
int offset = 0;
for (ShufflePartitionedBlock block : readBlocks) {
// fill shuffle data
try {
data.addComponent(true, block.getData().retain());
} catch (Exception e) {
LOG.error(
"Unexpected exception for System.arraycopy, length["
+ block.getLength()
+ "], offset["
+ offset
+ "], dataLength["
+ data.capacity()
+ "]",
e);
throw e;
}
offset += block.getLength();
}
}

protected List<Long> sortFlushingEventId(List<Long> eventIdList) {
eventIdList.sort(
(id1, id2) -> {
if (id1 > id2) {
return 1;
}
return -1;
});
return eventIdList;
}

protected void updateSegmentsWithoutBlockId(
int offset,
Collection<ShufflePartitionedBlock> cachedBlocks,
long readBufferSize,
List<BufferSegment> bufferSegments,
List<ShufflePartitionedBlock> readBlocks,
Roaring64NavigableMap expectedTaskIds) {
int currentOffset = offset;
// read from first block
for (ShufflePartitionedBlock block : cachedBlocks) {
if (expectedTaskIds != null && !expectedTaskIds.contains(block.getTaskAttemptId())) {
continue;
}
// add bufferSegment with block
bufferSegments.add(
new BufferSegment(
block.getBlockId(),
currentOffset,
block.getLength(),
block.getUncompressLength(),
block.getCrc(),
block.getTaskAttemptId()));
readBlocks.add(block);
// update offset
currentOffset += block.getLength();
// check if length >= request buffer size
if (currentOffset >= readBufferSize) {
break;
}
}
}
}
Loading

0 comments on commit 4300460

Please sign in to comment.