From 430046040cf611dfed7cfce0ed2ce583032f3b06 Mon Sep 17 00:00:00 2001 From: xianjingfeng Date: Mon, 17 Jun 2024 15:17:42 +0800 Subject: [PATCH] [#1708] feat(server): support use skip list to store shuffleBuffer in memory (#1763) ### What changes were proposed in this pull request? Support use skip list to store shuffleBuffer in memory. ### Why are the changes needed? If we assign a lot of memory to store shuffle data, it will help to improve the performance(The system load of the shuffle server will be reduced.) Fix: #1708 ### Does this PR introduce any user-facing change? set rss.server.shuffleBuffer.type to SKIP_LIST ### How was this patch tested? UTs and manual testing --- .../uniffle/server/ShuffleServerConf.java | 13 + .../server/buffer/AbstractShuffleBuffer.java | 188 ++++++++++ .../uniffle/server/buffer/ShuffleBuffer.java | 343 +----------------- .../server/buffer/ShuffleBufferManager.java | 13 +- .../server/buffer/ShuffleBufferType.java | 23 ++ .../buffer/ShuffleBufferWithLinkedList.java | 246 +++++++++++++ .../buffer/ShuffleBufferWithSkipList.java | 230 ++++++++++++ .../uniffle/server/buffer/BufferTestBase.java | 19 +- ...a => ShuffleBufferWithLinkedListTest.java} | 34 +- .../buffer/ShuffleBufferWithSkipListTest.java | 208 +++++++++++ 10 files changed, 967 insertions(+), 350 deletions(-) create mode 100644 server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java create mode 100644 server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferType.java create mode 100644 server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java create mode 100644 server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java rename server/src/test/java/org/apache/uniffle/server/buffer/{ShuffleBufferTest.java => ShuffleBufferWithLinkedListTest.java} (96%) create mode 100644 server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipListTest.java diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java index 064a6b28b2..ebc4aaccb5 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java @@ -25,6 +25,7 @@ import org.apache.uniffle.common.config.ConfigOptions; import org.apache.uniffle.common.config.ConfigUtils; import org.apache.uniffle.common.config.RssBaseConf; +import org.apache.uniffle.server.buffer.ShuffleBufferType; public class ShuffleServerConf extends RssBaseConf { @@ -434,6 +435,18 @@ public class ShuffleServerConf extends RssBaseConf { "The interval of trigger shuffle buffer manager to flush data to persistent storage. If <= 0" + ", then this flush check would be disabled."); + public static final ConfigOption SERVER_SHUFFLE_BUFFER_TYPE = + ConfigOptions.key("rss.server.shuffleBuffer.type") + .enumType(ShuffleBufferType.class) + .defaultValue(ShuffleBufferType.LINKED_LIST) + .withDescription( + "The type for shuffle buffers. Setting as LINKED_LIST or SKIP_LIST." + + " The default value is LINKED_LIST. SKIP_LIST will help to improve" + + " the performance when there are a large number of blocks in memory" + + " or when the memory occupied by the blocks is very large." + + " The cpu usage of the shuffle server will be reduced." + + " But SKIP_LIST doesn't support the slow-start feature of MR."); + public static final ConfigOption SERVER_SHUFFLE_FLUSH_THRESHOLD = ConfigOptions.key("rss.server.shuffle.flush.threshold") .longType() diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java b/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java new file mode 100644 index 0000000000..15197bc86e --- /dev/null +++ b/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.server.buffer; + +import java.util.Collection; +import java.util.List; +import java.util.function.Supplier; + +import com.google.common.collect.Lists; +import io.netty.buffer.CompositeByteBuf; +import org.roaringbitmap.longlong.Roaring64NavigableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.uniffle.common.BufferSegment; +import org.apache.uniffle.common.ShuffleDataDistributionType; +import org.apache.uniffle.common.ShuffleDataResult; +import org.apache.uniffle.common.ShufflePartitionedBlock; +import org.apache.uniffle.common.util.Constants; +import org.apache.uniffle.common.util.NettyUtils; +import org.apache.uniffle.server.ShuffleDataFlushEvent; + +public abstract class AbstractShuffleBuffer implements ShuffleBuffer { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractShuffleBuffer.class); + + private final long capacity; + protected long size; + + public AbstractShuffleBuffer(long capacity) { + this.capacity = capacity; + this.size = 0; + } + + /** Only for test */ + @Override + public synchronized ShuffleDataFlushEvent toFlushEvent( + String appId, + int shuffleId, + int startPartition, + int endPartition, + Supplier isValid) { + return toFlushEvent( + appId, + shuffleId, + startPartition, + endPartition, + isValid, + ShuffleDataDistributionType.NORMAL); + } + + @Override + public long getSize() { + return size; + } + + @Override + public boolean isFull() { + return size > capacity; + } + + @Override + public synchronized ShuffleDataResult getShuffleData(long lastBlockId, int readBufferSize) { + return getShuffleData(lastBlockId, readBufferSize, null); + } + + // 1. generate buffer segments and other info: if blockId exist, start with which eventId + // 2. according to info from step 1, generate data + // todo: if block was flushed, it's possible to get duplicated data + @Override + public synchronized ShuffleDataResult getShuffleData( + long lastBlockId, int readBufferSize, Roaring64NavigableMap expectedTaskIds) { + try { + List bufferSegments = Lists.newArrayList(); + List readBlocks = Lists.newArrayList(); + updateBufferSegmentsAndResultBlocks( + lastBlockId, readBufferSize, bufferSegments, readBlocks, expectedTaskIds); + if (!bufferSegments.isEmpty()) { + CompositeByteBuf byteBuf = + new CompositeByteBuf( + NettyUtils.getNettyBufferAllocator(), + true, + Constants.COMPOSITE_BYTE_BUF_MAX_COMPONENTS); + // copy result data + updateShuffleData(readBlocks, byteBuf); + return new ShuffleDataResult(byteBuf, bufferSegments); + } + } catch (Exception e) { + LOG.error("Exception happened when getShuffleData in buffer", e); + } + return new ShuffleDataResult(); + } + + // here is the rule to read data in memory: + // 1. read from inFlushBlockMap order by eventId asc, then from blocks + // 2. if can't find lastBlockId, means related data may be flushed to storage, repeat step 1 + protected abstract void updateBufferSegmentsAndResultBlocks( + long lastBlockId, + long readBufferSize, + List bufferSegments, + List resultBlocks, + Roaring64NavigableMap expectedTaskIds); + + protected int calculateDataLength(List bufferSegments) { + BufferSegment bufferSegment = bufferSegments.get(bufferSegments.size() - 1); + return bufferSegment.getOffset() + bufferSegment.getLength(); + } + + private void updateShuffleData(List readBlocks, CompositeByteBuf data) { + int offset = 0; + for (ShufflePartitionedBlock block : readBlocks) { + // fill shuffle data + try { + data.addComponent(true, block.getData().retain()); + } catch (Exception e) { + LOG.error( + "Unexpected exception for System.arraycopy, length[" + + block.getLength() + + "], offset[" + + offset + + "], dataLength[" + + data.capacity() + + "]", + e); + throw e; + } + offset += block.getLength(); + } + } + + protected List sortFlushingEventId(List eventIdList) { + eventIdList.sort( + (id1, id2) -> { + if (id1 > id2) { + return 1; + } + return -1; + }); + return eventIdList; + } + + protected void updateSegmentsWithoutBlockId( + int offset, + Collection cachedBlocks, + long readBufferSize, + List bufferSegments, + List readBlocks, + Roaring64NavigableMap expectedTaskIds) { + int currentOffset = offset; + // read from first block + for (ShufflePartitionedBlock block : cachedBlocks) { + if (expectedTaskIds != null && !expectedTaskIds.contains(block.getTaskAttemptId())) { + continue; + } + // add bufferSegment with block + bufferSegments.add( + new BufferSegment( + block.getBlockId(), + currentOffset, + block.getLength(), + block.getUncompressLength(), + block.getCrc(), + block.getTaskAttemptId())); + readBlocks.add(block); + // update offset + currentOffset += block.getLength(); + // check if length >= request buffer size + if (currentOffset >= readBufferSize) { + break; + } + } + } +} diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java index 4a9a215a77..cb01bb0a91 100644 --- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java +++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java @@ -17,359 +17,50 @@ package org.apache.uniffle.server.buffer; -import java.util.Comparator; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.function.Supplier; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; -import io.netty.buffer.CompositeByteBuf; import org.roaringbitmap.longlong.Roaring64NavigableMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.uniffle.common.BufferSegment; import org.apache.uniffle.common.ShuffleDataDistributionType; import org.apache.uniffle.common.ShuffleDataResult; import org.apache.uniffle.common.ShufflePartitionedBlock; import org.apache.uniffle.common.ShufflePartitionedData; -import org.apache.uniffle.common.util.Constants; -import org.apache.uniffle.common.util.JavaUtils; -import org.apache.uniffle.common.util.NettyUtils; import org.apache.uniffle.server.ShuffleDataFlushEvent; -import org.apache.uniffle.server.ShuffleFlushManager; -public class ShuffleBuffer { +public interface ShuffleBuffer { + long append(ShufflePartitionedData data); - private static final Logger LOG = LoggerFactory.getLogger(ShuffleBuffer.class); - - private final long capacity; - private long size; - // blocks will be added to inFlushBlockMap as pair - // it will be removed after flush to storage - // the strategy ensure that shuffle is in memory or storage - private List blocks; - private Map> inFlushBlockMap; - - public ShuffleBuffer(long capacity) { - this.capacity = capacity; - this.size = 0; - this.blocks = new LinkedList<>(); - this.inFlushBlockMap = JavaUtils.newConcurrentMap(); - } - - public long append(ShufflePartitionedData data) { - long mSize = 0; - - synchronized (this) { - for (ShufflePartitionedBlock block : data.getBlockList()) { - blocks.add(block); - mSize += block.getSize(); - } - size += mSize; - } - - return mSize; - } - - public synchronized ShuffleDataFlushEvent toFlushEvent( + ShuffleDataFlushEvent toFlushEvent( String appId, int shuffleId, int startPartition, int endPartition, Supplier isValid, - ShuffleDataDistributionType dataDistributionType) { - if (blocks.isEmpty()) { - return null; - } - // buffer will be cleared, and new list must be created for async flush - List spBlocks = new LinkedList<>(blocks); - List inFlushedQueueBlocks = spBlocks; - if (dataDistributionType == ShuffleDataDistributionType.LOCAL_ORDER) { - /** - * When reordering the blocks, it will break down the original reads sequence to cause the - * data lost in some cases. So we should create a reference copy to avoid this. - */ - inFlushedQueueBlocks = new LinkedList<>(spBlocks); - spBlocks.sort(Comparator.comparingLong(ShufflePartitionedBlock::getTaskAttemptId)); - } - long eventId = ShuffleFlushManager.ATOMIC_EVENT_ID.getAndIncrement(); - final ShuffleDataFlushEvent event = - new ShuffleDataFlushEvent( - eventId, appId, shuffleId, startPartition, endPartition, size, spBlocks, isValid, this); - event.addCleanupCallback( - () -> { - this.clearInFlushBuffer(event.getEventId()); - spBlocks.forEach(spb -> spb.getData().release()); - }); - inFlushBlockMap.put(eventId, inFlushedQueueBlocks); - blocks.clear(); - size = 0; - return event; - } + ShuffleDataDistributionType dataDistributionType); /** Only for test */ - public synchronized ShuffleDataFlushEvent toFlushEvent( - String appId, - int shuffleId, - int startPartition, - int endPartition, - Supplier isValid) { - return toFlushEvent( - appId, - shuffleId, - startPartition, - endPartition, - isValid, - ShuffleDataDistributionType.NORMAL); - } - - public List getBlocks() { - return blocks; - } - - public long getSize() { - return size; - } + ShuffleDataFlushEvent toFlushEvent( + String appId, int shuffleId, int startPartition, int endPartition, Supplier isValid); - public boolean isFull() { - return size > capacity; - } + ShuffleDataResult getShuffleData(long lastBlockId, int readBufferSize); - public synchronized void clearInFlushBuffer(long eventId) { - inFlushBlockMap.remove(eventId); - } - - @VisibleForTesting - public Map> getInFlushBlockMap() { - return inFlushBlockMap; - } + ShuffleDataResult getShuffleData( + long lastBlockId, int readBufferSize, Roaring64NavigableMap expectedTaskIds); - public synchronized ShuffleDataResult getShuffleData(long lastBlockId, int readBufferSize) { - return getShuffleData(lastBlockId, readBufferSize, null); - } + long getSize(); - // 1. generate buffer segments and other info: if blockId exist, start with which eventId - // 2. according to info from step 1, generate data - // todo: if block was flushed, it's possible to get duplicated data - public synchronized ShuffleDataResult getShuffleData( - long lastBlockId, int readBufferSize, Roaring64NavigableMap expectedTaskIds) { - try { - List bufferSegments = Lists.newArrayList(); - List readBlocks = Lists.newArrayList(); - updateBufferSegmentsAndResultBlocks( - lastBlockId, readBufferSize, bufferSegments, readBlocks, expectedTaskIds); - if (!bufferSegments.isEmpty()) { - CompositeByteBuf byteBuf = - new CompositeByteBuf( - NettyUtils.getNettyBufferAllocator(), - true, - Constants.COMPOSITE_BYTE_BUF_MAX_COMPONENTS); - // copy result data - updateShuffleData(readBlocks, byteBuf); - return new ShuffleDataResult(byteBuf, bufferSegments); - } - } catch (Exception e) { - LOG.error("Exception happened when getShuffleData in buffer", e); - } - return new ShuffleDataResult(); - } + boolean isFull(); - // here is the rule to read data in memory: - // 1. read from inFlushBlockMap order by eventId asc, then from blocks - // 2. if can't find lastBlockId, means related data may be flushed to storage, repeat step 1 - private void updateBufferSegmentsAndResultBlocks( - long lastBlockId, - long readBufferSize, - List bufferSegments, - List resultBlocks, - Roaring64NavigableMap expectedTaskIds) { - long nextBlockId = lastBlockId; - List sortedEventId = sortFlushingEventId(); - int offset = 0; - boolean hasLastBlockId = false; - // read from inFlushBlockMap first to make sure the order of - // data read is according to the order of data received - // The number of events means how many batches are in flushing status, - // it should be less than 5, or there has some problem with storage - if (!inFlushBlockMap.isEmpty()) { - for (Long eventId : sortedEventId) { - // update bufferSegments with different strategy according to lastBlockId - if (nextBlockId == Constants.INVALID_BLOCK_ID) { - updateSegmentsWithoutBlockId( - offset, - inFlushBlockMap.get(eventId), - readBufferSize, - bufferSegments, - resultBlocks, - expectedTaskIds); - hasLastBlockId = true; - } else { - hasLastBlockId = - updateSegmentsWithBlockId( - offset, - inFlushBlockMap.get(eventId), - readBufferSize, - nextBlockId, - bufferSegments, - resultBlocks, - expectedTaskIds); - // if last blockId is found, read from begin with next cached blocks - if (hasLastBlockId) { - // reset blockId to read from begin in next cached blocks - nextBlockId = Constants.INVALID_BLOCK_ID; - } - } - if (!bufferSegments.isEmpty()) { - offset = calculateDataLength(bufferSegments); - } - if (offset >= readBufferSize) { - break; - } - } - } - // try to read from cached blocks which is not in flush queue - if (blocks.size() > 0 && offset < readBufferSize) { - if (nextBlockId == Constants.INVALID_BLOCK_ID) { - updateSegmentsWithoutBlockId( - offset, blocks, readBufferSize, bufferSegments, resultBlocks, expectedTaskIds); - hasLastBlockId = true; - } else { - hasLastBlockId = - updateSegmentsWithBlockId( - offset, - blocks, - readBufferSize, - nextBlockId, - bufferSegments, - resultBlocks, - expectedTaskIds); - } - } - if ((!inFlushBlockMap.isEmpty() || blocks.size() > 0) && offset == 0 && !hasLastBlockId) { - // can't find lastBlockId, it should be flushed - // but there still has data in memory - // try read again with blockId = Constants.INVALID_BLOCK_ID - updateBufferSegmentsAndResultBlocks( - Constants.INVALID_BLOCK_ID, - readBufferSize, - bufferSegments, - resultBlocks, - expectedTaskIds); - } - } - - private int calculateDataLength(List bufferSegments) { - BufferSegment bufferSegment = bufferSegments.get(bufferSegments.size() - 1); - return bufferSegment.getOffset() + bufferSegment.getLength(); - } - - private void updateShuffleData(List readBlocks, CompositeByteBuf data) { - int offset = 0; - for (ShufflePartitionedBlock block : readBlocks) { - // fill shuffle data - try { - data.addComponent(true, block.getData().retain()); - } catch (Exception e) { - LOG.error( - "Unexpected exception for System.arraycopy, length[" - + block.getLength() - + "], offset[" - + offset - + "], dataLength[" - + data.capacity() - + "]", - e); - throw e; - } - offset += block.getLength(); - } - } + /** Only for test */ + List getBlocks(); - private List sortFlushingEventId() { - List eventIdList = Lists.newArrayList(inFlushBlockMap.keySet()); - eventIdList.sort( - (id1, id2) -> { - if (id1 > id2) { - return 1; - } - return -1; - }); - return eventIdList; - } + void release(); - private void updateSegmentsWithoutBlockId( - int offset, - List cachedBlocks, - long readBufferSize, - List bufferSegments, - List readBlocks, - Roaring64NavigableMap expectedTaskIds) { - int currentOffset = offset; - // read from first block - for (ShufflePartitionedBlock block : cachedBlocks) { - if (expectedTaskIds != null && !expectedTaskIds.contains(block.getTaskAttemptId())) { - continue; - } - // add bufferSegment with block - bufferSegments.add( - new BufferSegment( - block.getBlockId(), - currentOffset, - block.getLength(), - block.getUncompressLength(), - block.getCrc(), - block.getTaskAttemptId())); - readBlocks.add(block); - // update offset - currentOffset += block.getLength(); - // check if length >= request buffer size - if (currentOffset >= readBufferSize) { - break; - } - } - } + void clearInFlushBuffer(long eventId); - private boolean updateSegmentsWithBlockId( - int offset, - List cachedBlocks, - long readBufferSize, - long lastBlockId, - List bufferSegments, - List readBlocks, - Roaring64NavigableMap expectedTaskIds) { - int currentOffset = offset; - // find lastBlockId, then read from next block - boolean foundBlockId = false; - for (ShufflePartitionedBlock block : cachedBlocks) { - if (!foundBlockId) { - // find lastBlockId - if (block.getBlockId() == lastBlockId) { - foundBlockId = true; - } - continue; - } - if (expectedTaskIds != null && !expectedTaskIds.contains(block.getTaskAttemptId())) { - continue; - } - // add bufferSegment with block - bufferSegments.add( - new BufferSegment( - block.getBlockId(), - currentOffset, - block.getLength(), - block.getUncompressLength(), - block.getCrc(), - block.getTaskAttemptId())); - readBlocks.add(block); - // update offset - currentOffset += block.getLength(); - if (currentOffset >= readBufferSize) { - break; - } - } - return foundBlockId; - } + @VisibleForTesting + Map> getInFlushBlockMap(); } diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java index e85d2eae40..c3506921dc 100644 --- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java +++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java @@ -57,6 +57,7 @@ public class ShuffleBufferManager { private static final Logger LOG = LoggerFactory.getLogger(ShuffleBufferManager.class); + private final ShuffleBufferType shuffleBufferType; private ShuffleTaskManager shuffleTaskManager; private final ShuffleFlushManager shuffleFlushManager; private long capacity; @@ -137,6 +138,7 @@ public ShuffleBufferManager( capacity * conf.get(ShuffleServerConf.HUGE_PARTITION_MEMORY_USAGE_LIMITATION_RATIO)); appBlockSizeMetricEnabled = conf.getBoolean(ShuffleServerConf.APP_LEVEL_SHUFFLE_BLOCK_SIZE_METRIC_ENABLED); + shuffleBufferType = conf.get(ShuffleServerConf.SERVER_SHUFFLE_BUFFER_TYPE); } public void setShuffleTaskManager(ShuffleTaskManager taskManager) { @@ -152,7 +154,13 @@ public StatusCode registerBuffer( if (bufferRangeMap.get(startPartition) == null) { ShuffleServerMetrics.counterTotalPartitionNum.inc(); ShuffleServerMetrics.gaugeTotalPartitionNum.inc(); - bufferRangeMap.put(Range.closed(startPartition, endPartition), new ShuffleBuffer(bufferSize)); + ShuffleBuffer shuffleBuffer; + if (shuffleBufferType == ShuffleBufferType.SKIP_LIST) { + shuffleBuffer = new ShuffleBufferWithSkipList(bufferSize); + } else { + shuffleBuffer = new ShuffleBufferWithLinkedList(bufferSize); + } + bufferRangeMap.put(Range.closed(startPartition, endPartition), shuffleBuffer); } else { LOG.warn( "Already register for appId[" @@ -282,7 +290,6 @@ void flushSingleBufferIfNecessary( buffer.getBlocks().size()); } flushBuffer(buffer, appId, shuffleId, startPartition, endPartition, isHugePartition); - return; } } @@ -712,7 +719,7 @@ public void removeBufferByShuffleId(String appId, Collection shuffleIds Collection buffers = bufferRangeMap.asMapOfRanges().values(); if (buffers != null) { for (ShuffleBuffer buffer : buffers) { - buffer.getBlocks().forEach(spb -> spb.getData().release()); + buffer.release(); ShuffleServerMetrics.gaugeTotalPartitionNum.dec(); size += buffer.getSize(); } diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferType.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferType.java new file mode 100644 index 0000000000..ad1fa04ff7 --- /dev/null +++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferType.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.server.buffer; + +public enum ShuffleBufferType { + SKIP_LIST, + LINKED_LIST +} diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java new file mode 100644 index 0000000000..c01bb08231 --- /dev/null +++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.server.buffer; + +import java.util.Comparator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +import com.google.common.collect.Lists; +import org.roaringbitmap.longlong.Roaring64NavigableMap; + +import org.apache.uniffle.common.BufferSegment; +import org.apache.uniffle.common.ShuffleDataDistributionType; +import org.apache.uniffle.common.ShufflePartitionedBlock; +import org.apache.uniffle.common.ShufflePartitionedData; +import org.apache.uniffle.common.util.Constants; +import org.apache.uniffle.common.util.JavaUtils; +import org.apache.uniffle.server.ShuffleDataFlushEvent; +import org.apache.uniffle.server.ShuffleFlushManager; + +public class ShuffleBufferWithLinkedList extends AbstractShuffleBuffer { + // blocks will be added to inFlushBlockMap as pair + // it will be removed after flush to storage + // the strategy ensure that shuffle is in memory or storage + private List blocks; + private Map> inFlushBlockMap; + + public ShuffleBufferWithLinkedList(long capacity) { + super(capacity); + this.blocks = new LinkedList<>(); + this.inFlushBlockMap = JavaUtils.newConcurrentMap(); + } + + @Override + public long append(ShufflePartitionedData data) { + long mSize = 0; + + synchronized (this) { + for (ShufflePartitionedBlock block : data.getBlockList()) { + blocks.add(block); + mSize += block.getSize(); + } + size += mSize; + } + + return mSize; + } + + @Override + public synchronized ShuffleDataFlushEvent toFlushEvent( + String appId, + int shuffleId, + int startPartition, + int endPartition, + Supplier isValid, + ShuffleDataDistributionType dataDistributionType) { + if (blocks.isEmpty()) { + return null; + } + // buffer will be cleared, and new list must be created for async flush + List spBlocks = new LinkedList<>(blocks); + List inFlushedQueueBlocks = spBlocks; + if (dataDistributionType == ShuffleDataDistributionType.LOCAL_ORDER) { + /** + * When reordering the blocks, it will break down the original reads sequence to cause the + * data lost in some cases. So we should create a reference copy to avoid this. + */ + inFlushedQueueBlocks = new LinkedList<>(spBlocks); + spBlocks.sort(Comparator.comparingLong(ShufflePartitionedBlock::getTaskAttemptId)); + } + long eventId = ShuffleFlushManager.ATOMIC_EVENT_ID.getAndIncrement(); + final ShuffleDataFlushEvent event = + new ShuffleDataFlushEvent( + eventId, appId, shuffleId, startPartition, endPartition, size, spBlocks, isValid, this); + event.addCleanupCallback( + () -> { + this.clearInFlushBuffer(event.getEventId()); + spBlocks.forEach(spb -> spb.getData().release()); + }); + inFlushBlockMap.put(eventId, inFlushedQueueBlocks); + blocks.clear(); + size = 0; + return event; + } + + @Override + public List getBlocks() { + return blocks; + } + + @Override + public void release() { + blocks.forEach(spb -> spb.getData().release()); + } + + @Override + public synchronized void clearInFlushBuffer(long eventId) { + inFlushBlockMap.remove(eventId); + } + + @Override + public Map> getInFlushBlockMap() { + return inFlushBlockMap; + } + + @Override + protected void updateBufferSegmentsAndResultBlocks( + long lastBlockId, + long readBufferSize, + List bufferSegments, + List resultBlocks, + Roaring64NavigableMap expectedTaskIds) { + long nextBlockId = lastBlockId; + List eventIdList = Lists.newArrayList(inFlushBlockMap.keySet()); + List sortedEventId = sortFlushingEventId(eventIdList); + int offset = 0; + boolean hasLastBlockId = false; + // read from inFlushBlockMap first to make sure the order of + // data read is according to the order of data received + // The number of events means how many batches are in flushing status, + // it should be less than 5, or there has some problem with storage + if (!inFlushBlockMap.isEmpty()) { + for (Long eventId : sortedEventId) { + // update bufferSegments with different strategy according to lastBlockId + if (nextBlockId == Constants.INVALID_BLOCK_ID) { + updateSegmentsWithoutBlockId( + offset, + inFlushBlockMap.get(eventId), + readBufferSize, + bufferSegments, + resultBlocks, + expectedTaskIds); + hasLastBlockId = true; + } else { + hasLastBlockId = + updateSegmentsWithBlockId( + offset, + inFlushBlockMap.get(eventId), + readBufferSize, + nextBlockId, + bufferSegments, + resultBlocks, + expectedTaskIds); + // if last blockId is found, read from begin with next cached blocks + if (hasLastBlockId) { + // reset blockId to read from begin in next cached blocks + nextBlockId = Constants.INVALID_BLOCK_ID; + } + } + if (!bufferSegments.isEmpty()) { + offset = calculateDataLength(bufferSegments); + } + if (offset >= readBufferSize) { + break; + } + } + } + // try to read from cached blocks which is not in flush queue + if (blocks.size() > 0 && offset < readBufferSize) { + if (nextBlockId == Constants.INVALID_BLOCK_ID) { + updateSegmentsWithoutBlockId( + offset, blocks, readBufferSize, bufferSegments, resultBlocks, expectedTaskIds); + hasLastBlockId = true; + } else { + hasLastBlockId = + updateSegmentsWithBlockId( + offset, + blocks, + readBufferSize, + nextBlockId, + bufferSegments, + resultBlocks, + expectedTaskIds); + } + } + if ((!inFlushBlockMap.isEmpty() || blocks.size() > 0) && offset == 0 && !hasLastBlockId) { + // can't find lastBlockId, it should be flushed + // but there still has data in memory + // try read again with blockId = Constants.INVALID_BLOCK_ID + updateBufferSegmentsAndResultBlocks( + Constants.INVALID_BLOCK_ID, + readBufferSize, + bufferSegments, + resultBlocks, + expectedTaskIds); + } + } + + private boolean updateSegmentsWithBlockId( + int offset, + List cachedBlocks, + long readBufferSize, + long lastBlockId, + List bufferSegments, + List readBlocks, + Roaring64NavigableMap expectedTaskIds) { + int currentOffset = offset; + // find lastBlockId, then read from next block + boolean foundBlockId = false; + for (ShufflePartitionedBlock block : cachedBlocks) { + if (!foundBlockId) { + // find lastBlockId + if (block.getBlockId() == lastBlockId) { + foundBlockId = true; + } + continue; + } + if (expectedTaskIds != null && !expectedTaskIds.contains(block.getTaskAttemptId())) { + continue; + } + // add bufferSegment with block + bufferSegments.add( + new BufferSegment( + block.getBlockId(), + currentOffset, + block.getLength(), + block.getUncompressLength(), + block.getCrc(), + block.getTaskAttemptId())); + readBlocks.add(block); + // update offset + currentOffset += block.getLength(); + if (currentOffset >= readBufferSize) { + break; + } + } + return foundBlockId; + } +} diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java new file mode 100644 index 0000000000..4783b4f56d --- /dev/null +++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.server.buffer; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import com.google.common.collect.Lists; +import org.roaringbitmap.longlong.Roaring64NavigableMap; + +import org.apache.uniffle.common.BufferSegment; +import org.apache.uniffle.common.ShuffleDataDistributionType; +import org.apache.uniffle.common.ShufflePartitionedBlock; +import org.apache.uniffle.common.ShufflePartitionedData; +import org.apache.uniffle.common.util.Constants; +import org.apache.uniffle.common.util.JavaUtils; +import org.apache.uniffle.server.ShuffleDataFlushEvent; +import org.apache.uniffle.server.ShuffleFlushManager; + +public class ShuffleBufferWithSkipList extends AbstractShuffleBuffer { + private ConcurrentSkipListMap blocksMap; + private final Map> inFlushBlockMap; + + public ShuffleBufferWithSkipList(long capacity) { + super(capacity); + this.blocksMap = newConcurrentSkipListMap(); + this.inFlushBlockMap = JavaUtils.newConcurrentMap(); + } + + private ConcurrentSkipListMap newConcurrentSkipListMap() { + // We just need to ensure the order of taskAttemptId here for we need sort blocks when flush. + // taskAttemptId is in the lowest bits of blockId, so we should reverse it when making + // comparisons. + return new ConcurrentSkipListMap<>(Comparator.comparingLong(Long::reverse)); + } + + @Override + public long append(ShufflePartitionedData data) { + long mSize = 0; + + synchronized (this) { + for (ShufflePartitionedBlock block : data.getBlockList()) { + blocksMap.put(block.getBlockId(), block); + mSize += block.getSize(); + } + size += mSize; + } + + return mSize; + } + + @Override + public synchronized ShuffleDataFlushEvent toFlushEvent( + String appId, + int shuffleId, + int startPartition, + int endPartition, + Supplier isValid, + ShuffleDataDistributionType dataDistributionType) { + if (blocksMap.isEmpty()) { + return null; + } + List spBlocks = new LinkedList<>(blocksMap.values()); + long eventId = ShuffleFlushManager.ATOMIC_EVENT_ID.getAndIncrement(); + final ShuffleDataFlushEvent event = + new ShuffleDataFlushEvent( + eventId, appId, shuffleId, startPartition, endPartition, size, spBlocks, isValid, this); + event.addCleanupCallback( + () -> { + this.clearInFlushBuffer(event.getEventId()); + spBlocks.forEach(spb -> spb.getData().release()); + }); + inFlushBlockMap.put(eventId, blocksMap); + blocksMap = newConcurrentSkipListMap(); + size = 0; + return event; + } + + @Override + public List getBlocks() { + return new LinkedList<>(blocksMap.values()); + } + + @Override + public void release() { + blocksMap.values().forEach(spb -> spb.getData().release()); + } + + @Override + public synchronized void clearInFlushBuffer(long eventId) { + inFlushBlockMap.remove(eventId); + } + + @Override + public Map> getInFlushBlockMap() { + return inFlushBlockMap.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> new ArrayList<>(e.getValue().values()))); + } + + @Override + protected void updateBufferSegmentsAndResultBlocks( + long lastBlockId, + long readBufferSize, + List bufferSegments, + List resultBlocks, + Roaring64NavigableMap expectedTaskIds) { + long nextBlockId = lastBlockId; + List eventIdList = Lists.newArrayList(inFlushBlockMap.keySet()); + List sortedEventId = sortFlushingEventId(eventIdList); + int offset = 0; + boolean hasLastBlockId = false; + // read from inFlushBlockMap first to make sure the order of + // data read is according to the order of data received + // The number of events means how many batches are in flushing status, + // it should be less than 5, or there has some problem with storage + if (!inFlushBlockMap.isEmpty()) { + for (Long eventId : sortedEventId) { + hasLastBlockId = + updateSegments( + offset, + inFlushBlockMap.get(eventId), + readBufferSize, + nextBlockId, + bufferSegments, + resultBlocks, + expectedTaskIds); + // if last blockId is found, read from begin with next cached blocks + if (hasLastBlockId) { + // reset blockId to read from begin in next cached blocks + nextBlockId = Constants.INVALID_BLOCK_ID; + } + if (!bufferSegments.isEmpty()) { + offset = calculateDataLength(bufferSegments); + } + if (offset >= readBufferSize) { + break; + } + } + } + // try to read from cached blocks which is not in flush queue + if (!blocksMap.isEmpty() && offset < readBufferSize) { + hasLastBlockId = + updateSegments( + offset, + blocksMap, + readBufferSize, + nextBlockId, + bufferSegments, + resultBlocks, + expectedTaskIds); + } + if ((!inFlushBlockMap.isEmpty() || !blocksMap.isEmpty()) && offset == 0 && !hasLastBlockId) { + // can't find lastBlockId, it should be flushed + // but there still has data in memory + // try read again with blockId = Constants.INVALID_BLOCK_ID + updateBufferSegmentsAndResultBlocks( + Constants.INVALID_BLOCK_ID, + readBufferSize, + bufferSegments, + resultBlocks, + expectedTaskIds); + } + } + + private boolean updateSegments( + int offset, + ConcurrentSkipListMap cachedBlocks, + long readBufferSize, + long lastBlockId, + List bufferSegments, + List readBlocks, + Roaring64NavigableMap expectedTaskIds) { + int currentOffset = offset; + ConcurrentNavigableMap remainingBlocks; + boolean hasLastBlockId; + if (lastBlockId == Constants.INVALID_BLOCK_ID) { + remainingBlocks = cachedBlocks; + } else { + if (cachedBlocks.get(lastBlockId) == null) { + return false; + } + remainingBlocks = cachedBlocks.tailMap(lastBlockId, false); + } + + hasLastBlockId = true; + for (ShufflePartitionedBlock block : remainingBlocks.values()) { + if (expectedTaskIds != null && !expectedTaskIds.contains(block.getTaskAttemptId())) { + continue; + } + // add bufferSegment with block + bufferSegments.add( + new BufferSegment( + block.getBlockId(), + currentOffset, + block.getLength(), + block.getUncompressLength(), + block.getCrc(), + block.getTaskAttemptId())); + readBlocks.add(block); + // update offset + currentOffset += block.getLength(); + if (currentOffset >= readBufferSize) { + break; + } + } + return hasLastBlockId; + } +} diff --git a/server/src/test/java/org/apache/uniffle/server/buffer/BufferTestBase.java b/server/src/test/java/org/apache/uniffle/server/buffer/BufferTestBase.java index 5e53a80b27..2314a1b6cc 100644 --- a/server/src/test/java/org/apache/uniffle/server/buffer/BufferTestBase.java +++ b/server/src/test/java/org/apache/uniffle/server/buffer/BufferTestBase.java @@ -18,13 +18,14 @@ package org.apache.uniffle.server.buffer; import java.util.Random; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.apache.uniffle.common.ShufflePartitionedBlock; import org.apache.uniffle.common.ShufflePartitionedData; +import org.apache.uniffle.common.util.BlockIdLayout; import org.apache.uniffle.common.util.ChecksumUtils; import org.apache.uniffle.server.ShuffleServerMetrics; @@ -40,7 +41,7 @@ public static void clear() { ShuffleServerMetrics.clear(); } - private static AtomicLong atomBlockId = new AtomicLong(0); + private static AtomicInteger atomSequenceNo = new AtomicInteger(0); protected ShufflePartitionedData createData(int len) { return createData(1, len); @@ -53,16 +54,18 @@ protected ShufflePartitionedData createData(int partitionId, int len) { protected ShufflePartitionedData createData(int partitionId, int taskAttemptId, int len) { byte[] buf = new byte[len]; new Random().nextBytes(buf); + long blockId = + BlockIdLayout.DEFAULT.getBlockId( + getAtomSequenceNo().incrementAndGet(), partitionId, taskAttemptId); ShufflePartitionedBlock block = new ShufflePartitionedBlock( - len, - len, - ChecksumUtils.getCrc32(buf), - atomBlockId.incrementAndGet(), - taskAttemptId, - buf); + len, len, ChecksumUtils.getCrc32(buf), blockId, taskAttemptId, buf); ShufflePartitionedData data = new ShufflePartitionedData(partitionId, new ShufflePartitionedBlock[] {block}); return data; } + + protected AtomicInteger getAtomSequenceNo() { + return atomSequenceNo; + } } diff --git a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferTest.java b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java similarity index 96% rename from server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferTest.java rename to server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java index 5eb1bcfa72..90ac86c9fa 100644 --- a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferTest.java +++ b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java @@ -18,6 +18,7 @@ package org.apache.uniffle.server.buffer; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import com.google.common.collect.Lists; import org.junit.jupiter.api.Test; @@ -38,11 +39,13 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; -public class ShuffleBufferTest extends BufferTestBase { +public class ShuffleBufferWithLinkedListTest extends BufferTestBase { + + private static AtomicInteger atomSequenceNo = new AtomicInteger(0); @Test public void appendTest() { - ShuffleBuffer shuffleBuffer = new ShuffleBuffer(100); + ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList(100); shuffleBuffer.append(createData(10)); // ShufflePartitionedBlock has constant 32 bytes overhead assertEquals(42, shuffleBuffer.getSize()); @@ -59,7 +62,7 @@ public void appendTest() { @Test public void appendMultiBlocksTest() { - ShuffleBuffer shuffleBuffer = new ShuffleBuffer(100); + ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList(100); ShufflePartitionedData data1 = createData(10); ShufflePartitionedData data2 = createData(10); ShufflePartitionedBlock[] dataCombine = new ShufflePartitionedBlock[2]; @@ -71,7 +74,7 @@ public void appendMultiBlocksTest() { @Test public void toFlushEventTest() { - ShuffleBuffer shuffleBuffer = new ShuffleBuffer(100); + ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList(100); ShuffleDataFlushEvent event = shuffleBuffer.toFlushEvent("appId", 0, 0, 1, null); assertNull(event); shuffleBuffer.append(createData(10)); @@ -85,7 +88,7 @@ public void toFlushEventTest() { @Test public void getShuffleDataWithExpectedTaskIdsFilterTest() { /** case1: all blocks in cached(or in flushed map) and size < readBufferSize */ - ShuffleBuffer shuffleBuffer = new ShuffleBuffer(100); + ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList(100); ShufflePartitionedData spd1 = createData(1, 1, 15); ShufflePartitionedData spd2 = createData(1, 0, 15); ShufflePartitionedData spd3 = createData(1, 2, 55); @@ -197,7 +200,7 @@ public void getShuffleDataWithExpectedTaskIdsFilterTest() { @Test public void getShuffleDataWithLocalOrderTest() { - ShuffleBuffer shuffleBuffer = new ShuffleBuffer(200); + ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList(200); ShufflePartitionedData spd1 = createData(1, 1, 15); ShufflePartitionedData spd2 = createData(1, 0, 15); ShufflePartitionedData spd3 = createData(1, 2, 15); @@ -235,7 +238,7 @@ public void getShuffleDataWithLocalOrderTest() { @Test public void getShuffleDataTest() { - ShuffleBuffer shuffleBuffer = new ShuffleBuffer(200); + ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList(200); // case1: cached data only, blockId = -1, readBufferSize > buffer size ShufflePartitionedData spd1 = createData(10); ShufflePartitionedData spd2 = createData(20); @@ -247,7 +250,7 @@ public void getShuffleDataTest() { assertArrayEquals(expectedData, sdr.getData()); // case2: cached data only, blockId = -1, readBufferSize = buffer size - shuffleBuffer = new ShuffleBuffer(200); + shuffleBuffer = new ShuffleBufferWithLinkedList(200); spd1 = createData(20); spd2 = createData(20); shuffleBuffer.append(spd1); @@ -258,7 +261,7 @@ public void getShuffleDataTest() { assertArrayEquals(expectedData, sdr.getData()); // case3-1: cached data only, blockId = -1, readBufferSize < buffer size - shuffleBuffer = new ShuffleBuffer(200); + shuffleBuffer = new ShuffleBufferWithLinkedList(200); spd1 = createData(20); spd2 = createData(21); shuffleBuffer.append(spd1); @@ -269,7 +272,7 @@ public void getShuffleDataTest() { assertArrayEquals(expectedData, sdr.getData()); // case3-2: cached data only, blockId = -1, readBufferSize < buffer size - shuffleBuffer = new ShuffleBuffer(200); + shuffleBuffer = new ShuffleBufferWithLinkedList(200); spd1 = createData(15); spd2 = createData(15); ShufflePartitionedData spd3 = createData(15); @@ -289,7 +292,7 @@ public void getShuffleDataTest() { assertArrayEquals(expectedData, sdr.getData()); // case5: flush data only, blockId = -1, readBufferSize < buffer size - shuffleBuffer = new ShuffleBuffer(200); + shuffleBuffer = new ShuffleBufferWithLinkedList(200); spd1 = createData(15); spd2 = createData(15); shuffleBuffer.append(spd1); @@ -307,13 +310,13 @@ public void getShuffleDataTest() { assertEquals(0, sdr.getBufferSegments().size()); // case6: no data in buffer & flush buffer - shuffleBuffer = new ShuffleBuffer(200); + shuffleBuffer = new ShuffleBufferWithLinkedList(200); sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 10); assertEquals(0, sdr.getBufferSegments().size()); assertEquals(0, sdr.getDataLength()); // case7: get data with multiple flush buffer and cached buffer - shuffleBuffer = new ShuffleBuffer(200); + shuffleBuffer = new ShuffleBufferWithLinkedList(200); spd1 = createData(15); spd2 = createData(15); spd3 = createData(15); @@ -596,4 +599,9 @@ private void compareBufferSegment( segmentIndex++; } } + + @Override + protected AtomicInteger getAtomSequenceNo() { + return atomSequenceNo; + } } diff --git a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipListTest.java b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipListTest.java new file mode 100644 index 0000000000..bf5040304b --- /dev/null +++ b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipListTest.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.server.buffer; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.Test; +import org.roaringbitmap.longlong.Roaring64NavigableMap; + +import org.apache.uniffle.common.BufferSegment; +import org.apache.uniffle.common.ShuffleDataDistributionType; +import org.apache.uniffle.common.ShuffleDataResult; +import org.apache.uniffle.common.ShufflePartitionedBlock; +import org.apache.uniffle.common.ShufflePartitionedData; +import org.apache.uniffle.common.util.Constants; +import org.apache.uniffle.server.ShuffleDataFlushEvent; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ShuffleBufferWithSkipListTest extends BufferTestBase { + private static AtomicInteger atomSequenceNo = new AtomicInteger(0); + + @Test + public void appendTest() { + ShuffleBuffer shuffleBuffer = new ShuffleBufferWithSkipList(100); + shuffleBuffer.append(createData(10)); + // ShufflePartitionedBlock has constant 32 bytes overhead + assertEquals(42, shuffleBuffer.getSize()); + assertFalse(shuffleBuffer.isFull()); + + shuffleBuffer.append(createData(26)); + assertEquals(100, shuffleBuffer.getSize()); + assertFalse(shuffleBuffer.isFull()); + + shuffleBuffer.append(createData(1)); + assertEquals(133, shuffleBuffer.getSize()); + assertTrue(shuffleBuffer.isFull()); + } + + @Test + public void appendMultiBlocksTest() { + ShuffleBuffer shuffleBuffer = new ShuffleBufferWithSkipList(100); + ShufflePartitionedData data1 = createData(10); + ShufflePartitionedData data2 = createData(10); + ShufflePartitionedBlock[] dataCombine = new ShufflePartitionedBlock[2]; + dataCombine[0] = data1.getBlockList()[0]; + dataCombine[1] = data2.getBlockList()[0]; + shuffleBuffer.append(new ShufflePartitionedData(1, dataCombine)); + assertEquals(84, shuffleBuffer.getSize()); + } + + @Test + public void toFlushEventTest() { + ShuffleBuffer shuffleBuffer = new ShuffleBufferWithSkipList(100); + ShuffleDataFlushEvent event = shuffleBuffer.toFlushEvent("appId", 0, 0, 1, null); + assertNull(event); + shuffleBuffer.append(createData(10)); + assertEquals(42, shuffleBuffer.getSize()); + event = shuffleBuffer.toFlushEvent("appId", 0, 0, 1, null); + assertEquals(42, event.getSize()); + assertEquals(0, shuffleBuffer.getSize()); + assertEquals(0, shuffleBuffer.getBlocks().size()); + } + + @Test + public void getShuffleDataWithExpectedTaskIdsFilterTest() { + /** case1: all blocks in cached(or in flushed map) and size < readBufferSize */ + ShuffleBuffer shuffleBuffer = new ShuffleBufferWithSkipList(100); + ShufflePartitionedData spd1 = createData(1, 1, 15); + ShufflePartitionedData spd2 = createData(1, 0, 15); + ShufflePartitionedData spd3 = createData(1, 2, 55); + ShufflePartitionedData spd4 = createData(1, 1, 45); + shuffleBuffer.append(spd1); + shuffleBuffer.append(spd2); + shuffleBuffer.append(spd3); + shuffleBuffer.append(spd4); + + Roaring64NavigableMap expectedTasks = Roaring64NavigableMap.bitmapOf(1, 2); + ShuffleDataResult result = + shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 1000, expectedTasks); + assertEquals(3, result.getBufferSegments().size()); + for (BufferSegment segment : result.getBufferSegments()) { + assertTrue(expectedTasks.contains(segment.getTaskAttemptId())); + } + assertEquals(0, result.getBufferSegments().get(0).getOffset()); + // Currently, if we use skip_list, we can't guarantee that the reading order is same as + // writing order. So only check the total segment size of taskAttempt 1. + assertEquals( + 60, + result.getBufferSegments().get(0).getLength() + + result.getBufferSegments().get(1).getLength()); + assertEquals(60, result.getBufferSegments().get(2).getOffset()); + assertEquals(55, result.getBufferSegments().get(2).getLength()); + + expectedTasks = Roaring64NavigableMap.bitmapOf(0); + result = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 1000, expectedTasks); + assertEquals(1, result.getBufferSegments().size()); + assertEquals(15, result.getBufferSegments().get(0).getLength()); + + /** + * case2: all blocks in cached(or in flushed map) and size > readBufferSize, so it will read + * multiple times. + * + *

required blocks size list: 15+45, 55 + */ + expectedTasks = Roaring64NavigableMap.bitmapOf(1, 2); + result = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 60, expectedTasks); + assertEquals(2, result.getBufferSegments().size()); + assertEquals(0, result.getBufferSegments().get(0).getOffset()); + assertEquals( + 60, + result.getBufferSegments().get(0).getLength() + + result.getBufferSegments().get(1).getLength()); + + // 2nd read + long lastBlockId = result.getBufferSegments().get(1).getBlockId(); + result = shuffleBuffer.getShuffleData(lastBlockId, 60, expectedTasks); + assertEquals(1, result.getBufferSegments().size()); + assertEquals(0, result.getBufferSegments().get(0).getOffset()); + assertEquals(55, result.getBufferSegments().get(0).getLength()); + + /** case3: all blocks in flushed map and size < readBufferSize */ + expectedTasks = Roaring64NavigableMap.bitmapOf(1, 2); + ShuffleDataFlushEvent event1 = + shuffleBuffer.toFlushEvent("appId", 0, 0, 1, null, ShuffleDataDistributionType.LOCAL_ORDER); + result = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 1000, expectedTasks); + assertEquals(3, result.getBufferSegments().size()); + for (BufferSegment segment : result.getBufferSegments()) { + assertTrue(expectedTasks.contains(segment.getTaskAttemptId())); + } + assertEquals(0, result.getBufferSegments().get(0).getOffset()); + // Currently, if we use skip_list, we can't guarantee that the reading order is same as + // writing order. So only check the total segment size of taskAttempt 1. + assertEquals( + 60, + result.getBufferSegments().get(0).getLength() + + result.getBufferSegments().get(1).getLength()); + assertEquals(60, result.getBufferSegments().get(2).getOffset()); + assertEquals(55, result.getBufferSegments().get(2).getLength()); + + /** case4: all blocks in flushed map and size > readBufferSize, it will read multiple times */ + expectedTasks = Roaring64NavigableMap.bitmapOf(1, 2); + result = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 60, expectedTasks); + assertEquals(2, result.getBufferSegments().size()); + assertEquals(0, result.getBufferSegments().get(0).getOffset()); + assertEquals( + 60, + result.getBufferSegments().get(0).getLength() + + result.getBufferSegments().get(1).getLength()); + + // 2nd read + lastBlockId = result.getBufferSegments().get(1).getBlockId(); + result = shuffleBuffer.getShuffleData(lastBlockId, 60, expectedTasks); + assertEquals(1, result.getBufferSegments().size()); + assertEquals(0, result.getBufferSegments().get(0).getOffset()); + assertEquals(55, result.getBufferSegments().get(0).getLength()); + + /** + * case5: partial blocks in cache and another in flushedMap, and it will read multiple times. + * + *

required size: 15, 55, 45 (in flushed map) 55, 45, 5, 25(in cached) + */ + ShufflePartitionedData spd5 = createData(1, 2, 55); + ShufflePartitionedData spd6 = createData(1, 1, 45); + ShufflePartitionedData spd7 = createData(1, 1, 5); + ShufflePartitionedData spd8 = createData(1, 1, 25); + shuffleBuffer.append(spd5); + shuffleBuffer.append(spd6); + shuffleBuffer.append(spd7); + shuffleBuffer.append(spd8); + + expectedTasks = Roaring64NavigableMap.bitmapOf(1, 2); + result = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 60, expectedTasks); + assertEquals(2, result.getBufferSegments().size()); + + // 2nd read + lastBlockId = result.getBufferSegments().get(1).getBlockId(); + result = shuffleBuffer.getShuffleData(lastBlockId, 60, expectedTasks); + assertEquals(2, result.getBufferSegments().size()); + // 3rd read + lastBlockId = result.getBufferSegments().get(1).getBlockId(); + result = shuffleBuffer.getShuffleData(lastBlockId, 60, expectedTasks); + assertEquals(3, result.getBufferSegments().size()); + } + + @Override + protected AtomicInteger getAtomSequenceNo() { + return atomSequenceNo; + } +}