#6 refactor shuffle-daos by abstracting shuffle IO for supporting both synchronous and asynchronous DAOS Object API #8

Merged: 13 commits, Feb 22, 2021
3 changes: 1 addition & 2 deletions .travis.yml
@@ -8,5 +8,4 @@ jobs:
 install:
   - #empty install step
 script:
-  - cd ${TRAVIS_BUILD_DIR}/oap-shuffle/remote-shuffle/
-  - mvn -q test
+  - mvn -q clean install
2 changes: 1 addition & 1 deletion pom.xml
@@ -259,8 +259,8 @@
   </dependencyManagement>
 
   <modules>
-    <module>shuffle-hadoop</module>
     <module>shuffle-daos</module>
+    <module>shuffle-hadoop</module>
   </modules>
 
 </project>
3 changes: 3 additions & 0 deletions shuffle-daos/pom.xml
@@ -121,6 +121,9 @@
       <plugin>
         <groupId>org.scalatest</groupId>
         <artifactId>scalatest-maven-plugin</artifactId>
+        <configuration>
+          <argLine>-Xmx2048m</argLine>
+        </configuration>
         <executions>
           <execution>
             <id>test</id>
290 changes: 127 additions & 163 deletions shuffle-daos/src/main/java/org/apache/spark/shuffle/daos/DaosReader.java
@@ -24,221 +24,185 @@
 package org.apache.spark.shuffle.daos;
 
 import io.daos.obj.DaosObject;
-import io.daos.obj.IODataDesc;
-import io.netty.util.internal.ObjectPool;
+import io.netty.buffer.ByteBuf;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkEnv;
+import org.apache.spark.shuffle.ShuffleReadMetricsReporter;
+import org.apache.spark.storage.BlockId;
+import org.apache.spark.storage.BlockManagerId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Tuple2;
+import scala.Tuple3;
 
+import java.io.IOException;
+import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
 
 /**
- * A class with {@link DaosObject} wrapped to read data from DAOS in either caller's thread or
- * dedicated executor thread. The actual read is performed by {@link DaosObject#fetch(IODataDesc)}.
+ * An abstraction over {@link DaosObject} for reading data from DAOS.
  */
-public class DaosReader {
+public interface DaosReader {
 
-  private DaosObject object;
-
-  private Map<DaosShuffleInputStream.BufferSource, Integer> bufferSourceMap = new ConcurrentHashMap<>();
-
-  private BoundThreadExecutors executors;
-
-  private Map<DaosReader, Integer> readerMap;
-
-  private static Logger logger = LoggerFactory.getLogger(DaosReader.class);
+  DaosObject getObject();
 
   /**
-   * construct DaosReader with <code>object</code> and dedicated read <code>executors</code>.
+   * release resources bound with this reader.
    *
-   * @param object
-   *          opened DaosObject
-   * @param executors
-   *          null means read in caller's thread. Submit {@link ReadTask} to a dedicated executor retrieved by
-   *          {@link #nextReaderExecutor()} otherwise.
+   * @param force
+   *          force close even if there is an ongoing read
    */
-  public DaosReader(DaosObject object, BoundThreadExecutors executors) {
-    this.object = object;
-    this.executors = executors;
-  }
-
-  public DaosObject getObject() {
-    return object;
-  }
-
-  public boolean hasExecutors() {
-    return executors != null;
-  }
+  void close(boolean force);

   /**
-   * next executor. null if no executors are set.
+   * set global <code>readerMap</code> and hook this reader for releasing resources.
    *
-   * @return shareable executor instance. null means no executor set.
+   * @param readerMap
+   *          global reader map
    */
-  public BoundThreadExecutors.SingleThreadExecutor nextReaderExecutor() {
-    if (executors != null) {
-      return executors.nextExecutor();
-    }
-    return null;
-  }
+  void setReaderMap(Map<DaosReader, Integer> readerMap);
 
   /**
-   * release resources of all {@link org.apache.spark.shuffle.daos.DaosShuffleInputStream.BufferSource}
-   * bound with this reader.
+   * prepare the read with some parameters.
    *
+   * @param partSizeMap
+   * @param maxBytesInFlight
+   *          how many bytes can be read concurrently
+   * @param maxReqSizeShuffleToMem
+   *          maximum amount of data that can be put in memory
+   * @param metrics
    */
-  public void close() {
-    // force releasing
-    bufferSourceMap.forEach((k, v) -> k.cleanup(true));
-    bufferSourceMap.clear();
-    if (readerMap != null) {
-      readerMap.remove(this);
-      readerMap = null;
-    }
-  }
-
-  @Override
-  public String toString() {
-    return "DaosReader{" +
-        "object=" + object +
-        '}';
-  }
+  void prepare(LinkedHashMap<Tuple2<Long, Integer>, Tuple3<Long, BlockId, BlockManagerId>> partSizeMap,
+               long maxBytesInFlight, long maxReqSizeShuffleToMem, ShuffleReadMetricsReporter metrics);
 
   /**
-   * register buffer source for resource cleanup.
+   * current map/reduce id being requested.
    *
-   * @param source
-   *          BufferSource instance
+   * @return map/reduce id tuple
    */
-  public void register(DaosShuffleInputStream.BufferSource source) {
-    bufferSourceMap.put(source, 1);
-  }
+  Tuple2<Long, Integer> curMapReduceId();

   /**
-   * unregister buffer source if <code>source</code> is released already.
+   * get the available buffer after iterating the current buffer, the next buffer in the current desc and the next desc.
    *
-   * @param source
-   *          BufferSource instance
+   * @return buffer with data read from DAOS
+   * @throws IOException
    */
-  public void unregister(DaosShuffleInputStream.BufferSource source) {
-    bufferSourceMap.remove(source);
-  }
+  ByteBuf nextBuf() throws IOException;
 
   /**
-   * set global <code>readerMap</code> and hook this reader for releasing resources.
+   * has all data from the current map output been read, so that the reader has reached data
+   * from the next map?
    *
-   * @param readerMap
-   *          global reader map
+   * @return true or false
    */
-  public void setReaderMap(Map<DaosReader, Integer> readerMap) {
-    readerMap.put(this, 0);
-    this.readerMap = readerMap;
-  }
+  boolean isNextMap();
 
   /**
-   * Task to read from DAOS. Task itself is cached to reduce GC time.
-   * To reuse task for different reads, prepare and reset {@link ReadTaskContext} by calling
-   * {@link #newInstance(ReadTaskContext)}.
+   * the upper layer should call this method to read more map output.
    */
-  static final class ReadTask implements Runnable {
-    private ReadTaskContext context;
-    private final ObjectPool.Handle<ReadTask> handle;
+  void setNextMap(boolean b);
 
-    private static final ObjectPool<ReadTask> objectPool = ObjectPool.newPool(handle -> new ReadTask(handle));
+  /**
+   * check if all data from the current map output is read.
+   */
+  void checkPartitionSize() throws IOException;
 
-    private static final Logger log = LoggerFactory.getLogger(ReadTask.class);
+  /**
+   * check if all map outputs are read.
+   *
+   * @throws IOException
+   */
+  void checkTotalPartitions() throws IOException;
 
-    static ReadTask newInstance(ReadTaskContext context) {
-      ReadTask task = objectPool.get();
-      task.context = context;
-      return task;
-    }
+  /**
+   * reader configurations, please check configs prefixed with SHUFFLE_DAOS_READ in {@link package$#MODULE$}.
+   */
+  final class ReaderConfig {
+    private long minReadSize;
+    private long maxBytesInFlight;
+    private long maxMem;
+    private int readBatchSize;
+    private int waitDataTimeMs;
+    private int waitTimeoutTimes;
+    private boolean fromOtherThread;
 
+    private static final Logger log = LoggerFactory.getLogger(ReaderConfig.class);
 
+    public ReaderConfig() {
+      this(true);
+    }
 
-    private ReadTask(ObjectPool.Handle<ReadTask> handle) {
-      this.handle = handle;
+    private ReaderConfig(boolean load) {
+      if (load) {
+        initialize();
+      }
    }
 
-    @Override
-    public void run() {
-      boolean cancelled = context.cancelled;
-      try {
-        if (!cancelled) {
-          context.object.fetch(context.desc);
-        }
-      } catch (Exception e) {
-        log.error("failed to read for " + context.desc, e);
-      } finally {
-        // release desc buffer and keep data buffer
-        context.desc.release(cancelled);
-        context.signal();
-        context = null;
-        handle.recycle(this);
+    private void initialize() {
+      SparkConf conf = SparkEnv.get().conf();
+      minReadSize = (long)conf.get(package$.MODULE$.SHUFFLE_DAOS_READ_MINIMUM_SIZE()) * 1024;
+      this.maxBytesInFlight = -1L;
+      this.maxMem = -1L;
+      this.readBatchSize = (int)conf.get(package$.MODULE$.SHUFFLE_DAOS_READ_BATCH_SIZE());
+      this.waitDataTimeMs = (int)conf.get(package$.MODULE$.SHUFFLE_DAOS_READ_WAIT_DATA_MS());
+      this.waitTimeoutTimes = (int)conf.get(package$.MODULE$.SHUFFLE_DAOS_READ_WAIT_DATA_TIMEOUT_TIMES());
+      this.fromOtherThread = (boolean)conf.get(package$.MODULE$.SHUFFLE_DAOS_READ_FROM_OTHER_THREAD());
+      if (log.isDebugEnabled()) {
+        log.debug("minReadSize: " + minReadSize);
+        log.debug("maxBytesInFlight: " + maxBytesInFlight);
+        log.debug("maxMem: " + maxMem);
+        log.debug("readBatchSize: " + readBatchSize);
+        log.debug("waitDataTimeMs: " + waitDataTimeMs);
+        log.debug("waitTimeoutTimes: " + waitTimeoutTimes);
+        log.debug("fromOtherThread: " + fromOtherThread);
      }
    }
-  }

-  /**
-   * Context for read task. It holds all other objects to read and sync between caller thread and read thread.
-   * It should be cached in caller thread for reusing.
-   */
-  static final class ReadTaskContext extends LinkedTaskContext {
 
-    /**
-     * constructor with all parameters. Some of them can be reused later.
-     *
-     * @param object
-     *          DAOS object to fetch data from DAOS
-     * @param counter
-     *          counter to indicate how much data is ready to be consumed
-     * @param takeLock
-     *          lock to work with <code>notEmpty</code> condition to signal the caller thread there is data ready to be consumed
-     * @param notEmpty
-     *          condition to signal there is some data ready
-     * @param desc
-     *          desc object describing which part of data to fetch and holding the returned data
-     * @param mapReduceId
-     *          to track which map reduce ID this task fetches data for
-     */
-    ReadTaskContext(DaosObject object, AtomicInteger counter, Lock takeLock, Condition notEmpty,
-                    IODataDesc desc, Object mapReduceId) {
-      super(object, counter, takeLock, notEmpty);
-      this.desc = desc;
-      this.morePara = mapReduceId;
+    public ReaderConfig copy(long maxBytesInFlight, long maxMem) {
+      ReaderConfig rc = new ReaderConfig(false);
+      rc.maxMem = maxMem;
+      rc.minReadSize = minReadSize;
+      rc.readBatchSize = readBatchSize;
+      rc.waitDataTimeMs = waitDataTimeMs;
+      rc.waitTimeoutTimes = waitTimeoutTimes;
+      rc.fromOtherThread = fromOtherThread;
+      if (maxBytesInFlight < rc.minReadSize) {
+        rc.maxBytesInFlight = minReadSize;
+      } else {
+        rc.maxBytesInFlight = maxBytesInFlight;
+      }
+      return rc;
    }
 
-    @Override
-    public ReadTaskContext getNext() {
-      return (ReadTaskContext) next;
+    public int getReadBatchSize() {
+      return readBatchSize;
    }
 
-    public Tuple2<Long, Integer> getMapReduceId() {
-      return (Tuple2<Long, Integer>) morePara;
+    public int getWaitDataTimeMs() {
+      return waitDataTimeMs;
    }
-  }

-  /**
-   * Thread factory for DAOS read tasks.
-   */
-  protected static class ReadThreadFactory implements ThreadFactory {
-    private AtomicInteger id = new AtomicInteger(0);
 
-    @Override
-    public Thread newThread(Runnable runnable) {
-      Thread t;
-      String name = "daos_read_" + id.getAndIncrement();
-      if (runnable == null) {
-        t = new Thread(name);
-      } else {
-        t = new Thread(runnable, name);
-      }
-      t.setDaemon(true);
-      t.setUncaughtExceptionHandler((thread, throwable) ->
-          logger.error("exception occurred in thread " + name, throwable));
-      return t;
+    public int getWaitTimeoutTimes() {
+      return waitTimeoutTimes;
    }
 
+    public long getMaxBytesInFlight() {
+      return maxBytesInFlight;
+    }
-  }
 
+    public long getMaxMem() {
+      return maxMem;
+    }
 
+    public long getMinReadSize() {
+      return minReadSize;
+    }
 
+    public boolean isFromOtherThread() {
+      return fromOtherThread;
+    }
+  }
 }
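
For orientation, here is a minimal sketch of how a caller might drive the new DaosReader interface, pieced together from the Javadoc above. The call sequence, the null-return convention assumed for nextBuf(), and the helper names (DaosReaderUsageSketch, readAll, the byte limits) are illustrative assumptions, not code from this PR; the concrete reader and metrics objects would come from elsewhere in the shuffle manager.

package org.apache.spark.shuffle.daos;

import io.netty.buffer.ByteBuf;
import org.apache.spark.shuffle.ShuffleReadMetricsReporter;
import org.apache.spark.storage.BlockId;
import org.apache.spark.storage.BlockManagerId;
import scala.Tuple2;
import scala.Tuple3;

import java.io.IOException;
import java.util.LinkedHashMap;

// Hypothetical driver for the DaosReader interface; nothing here is part of the PR.
public final class DaosReaderUsageSketch {

  static void readAll(DaosReader reader,
                      LinkedHashMap<Tuple2<Long, Integer>, Tuple3<Long, BlockId, BlockManagerId>> partSizeMap,
                      ShuffleReadMetricsReporter metrics) throws IOException {
    // hand the reader the partitions to fetch plus read limits (values illustrative)
    reader.prepare(partSizeMap, 2 * 1024 * 1024, 10 * 1024 * 1024, metrics);
    try {
      ByteBuf buf;
      while ((buf = reader.nextBuf()) != null) {  // assumes null signals no more data
        Tuple2<Long, Integer> ids = reader.curMapReduceId();
        // ... consume buf, which belongs to (mapId, reduceId) = ids ...
        if (reader.isNextMap()) {                 // reached data of the next map output
          reader.checkPartitionSize();            // verify the previous output was fully read
          reader.setNextMap(false);
        }
      }
      reader.checkTotalPartitions();              // verify every expected map output was read
    } finally {
      reader.close(false);                        // regular close; reads are finished
    }
  }
}

Note also that ReaderConfig.copy(maxBytesInFlight, maxMem) clamps a too-small maxBytesInFlight up to the configured minReadSize, so per-task copies never read below the configured minimum.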
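One implementation detail disappears with the removed ReadTask and is worth recording: it recycled task instances through Netty's io.netty.util.internal.ObjectPool, exactly as its Javadoc says, to cut allocation and GC cost. Below is a standalone sketch of that pattern, mirroring the calls visible in the removed code (ObjectPool.newPool, get, Handle.recycle); the names PooledTask and work are illustrative, not from the PR.

import io.netty.util.internal.ObjectPool;

// Sketch of the recycling pattern used by the removed ReadTask: instances are
// pooled, re-armed via newInstance(...), and returned to the pool after run().
public final class PooledTask implements Runnable {
  private static final ObjectPool<PooledTask> POOL =
      ObjectPool.newPool(handle -> new PooledTask(handle));

  private final ObjectPool.Handle<PooledTask> handle;
  private Runnable work;                          // per-use state, cleared on recycle

  private PooledTask(ObjectPool.Handle<PooledTask> handle) {
    this.handle = handle;
  }

  public static PooledTask newInstance(Runnable work) {
    PooledTask task = POOL.get();                 // reuse a pooled instance if available
    task.work = work;
    return task;
  }

  @Override
  public void run() {
    try {
      work.run();
    } finally {
      work = null;                                // drop references before pooling
      handle.recycle(this);                       // hand the instance back to the pool
    }
  }
}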