[REMOTE-SHUFFLE-11] Support DAOS Object Async API (#14)
Reconstruct IO Layer and Support DAOS Object Async API

Signed-off-by: jiafu zhang <jiafu.zhang@intel.com>
jiafuzha authored May 14, 2021
1 parent e435952 commit d2bbc03
Showing 44 changed files with 1,741 additions and 685 deletions.
34 changes: 33 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -228,6 +228,8 @@ differences here.

```
spark.shuffle.manager org.apache.spark.shuffle.daos.DaosShuffleManager
spark.shuffle.daos.pool.uuid <POOL UUID>
spark.shuffle.daos.container.uuid <CONTAINER UUID>
```

### Classpath
@@ -239,4 +241,34 @@ differences here.
spark.driver.extraClassPath $HOME/miniconda2/envs/oapenv/oap_jars/daos-java-<version>.jar
$HOME/miniconda2/envs/oapenv/oap_jars/hadoop-daos-<version>.jar
$HOME/miniconda2/envs/oapenv/oap_jars/shuffle-daos-<version>.jar
```
```

### Configuration

There are several configuration options for tuning shuffle IO. You can find all of them in the package
"org.apache.spark.shuffle.daos". Here are some of them.

```
spark.shuffle.daos.io.async true
```
This shuffle plugin supports both sync IO and async IO. The default is async IO. You can switch to sync IO by setting
this value to "false". Most other config items apply to both sync IO and async IO. Config items that are specific to
one mode are identified as such in their documentation or by their names.


```
spark.shuffle.remove.shuffle.data true
```
All shuffle data is written to the DAOS container. It should be deleted after the job is done to save space. If you
want to inspect the shuffle data after the job, you can set this value to "false".

```
spark.shuffle.daos.read.wait.ms 5000
spark.shuffle.daos.write.wait.ms 5000
```
These are the maximum numbers of milliseconds to wait for read or write completion before a TimedOutException is thrown.
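The wait is a bounded polling loop: the reader repeatedly polls the event queue for completed IO and gives up once the configured time elapses. Below is a minimal sketch of that pattern in plain Java, with no DAOS dependency; `pollUntil` and the simulated completion source are hypothetical names for illustration only.

```java
import java.util.concurrent.TimeoutException;
import java.util.function.IntSupplier;

public class BoundedWaitSketch {

    // Poll until at least one completion arrives, or fail once timeoutMs elapses.
    static int pollUntil(IntSupplier pollCompleted, long timeoutMs) throws TimeoutException {
        long start = System.currentTimeMillis();
        int n = pollCompleted.getAsInt();
        while (n == 0) {
            long dur = System.currentTimeMillis() - start;
            if (dur > timeoutMs) {
                throw new TimeoutException("timed out after " + dur + " ms");
            }
            n = pollCompleted.getAsInt();
        }
        return n;
    }

    public static void main(String[] args) throws Exception {
        // Simulated completion source: empty on the first two polls, then 3 events.
        int[] polls = {0, 0, 3};
        int[] idx = {0};
        int completed = pollUntil(() -> polls[Math.min(idx[0]++, polls.length - 1)], 5000);
        System.out.println(completed); // prints 3
    }
}
```

In the plugin itself the polling is done against `DaosEventQueue.pollCompleted(...)`; this sketch only mirrors the timeout bookkeeping around it.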

```
spark.shuffle.daos.write.buffer 800m
```
The total in-memory buffer size of each map task. You can tune it for your environment.
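Putting the options above together, an illustrative tuning block in `spark-defaults.conf` might look like the following. The values are the defaults shown above, not tuned recommendations.

```
spark.shuffle.manager             org.apache.spark.shuffle.daos.DaosShuffleManager
spark.shuffle.daos.io.async       true
spark.shuffle.remove.shuffle.data true
spark.shuffle.daos.read.wait.ms   5000
spark.shuffle.daos.write.wait.ms  5000
spark.shuffle.daos.write.buffer   800m
```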
2 changes: 1 addition & 1 deletion pom.xml
@@ -6,7 +6,7 @@

<groupId>com.intel.oap</groupId>
<artifactId>remote-shuffle-parent</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
<name>OAP Remote Shuffle Parent POM</name>
<packaging>pom</packaging>

2 changes: 1 addition & 1 deletion shuffle-daos/README.md
@@ -3,4 +3,4 @@
A remote shuffle plugin based on the DAOS Object API. You can find DAOS and the DAOS Java wrapper at https://github.com/daos-stack/daos and https://github.com/daos-stack/daos/tree/master/src/client/java.
Thanks to DAOS, the plugin is especially good for small shuffle blocks, such as those around 200KB.

# TO BE CONTINUED
See Shuffle DAOS related documentation in [Readme under project root](../README.md).
2 changes: 1 addition & 1 deletion shuffle-daos/dev/checkstyle.license
@@ -1,5 +1,5 @@
/*
* (C) Copyright 2018-2020 Intel Corporation.
* (C) Copyright 2018-2021 Intel Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
6 changes: 3 additions & 3 deletions shuffle-daos/pom.xml
@@ -7,7 +7,7 @@
<parent>
<groupId>com.intel.oap</groupId>
<artifactId>remote-shuffle-parent</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
</parent>
<artifactId>shuffle-daos</artifactId>
<name>OAP Remote Shuffle Based on DAOS Object API</name>
@@ -205,7 +205,7 @@
<dependency>
<groupId>io.daos</groupId>
<artifactId>daos-java</artifactId>
<version>1.1.4</version>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>junit</groupId>
@@ -241,4 +241,4 @@
</dependency>
</dependencies>

</project>
</project>
2 changes: 1 addition & 1 deletion shuffle-daos/scalastyle-config.xml
@@ -49,7 +49,7 @@ This file is divided into 3 sections:
<check level="error" class="org.scalastyle.file.HeaderMatchesChecker" enabled="true">
<parameters>
<parameter name="header"><![CDATA[/*
* (C) Copyright 2018-2020 Intel Corporation.
* (C) Copyright 2018-2021 Intel Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -1,5 +1,5 @@
/*
* (C) Copyright 2018-2020 Intel Corporation.
* (C) Copyright 2018-2021 Intel Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,7 +21,6 @@
* portions thereof marked with this legend must also reproduce the markings.
*/


package org.apache.spark.shuffle.daos;

import org.slf4j.Logger;
@@ -1,5 +1,5 @@
/*
* (C) Copyright 2018-2020 Intel Corporation.
* (C) Copyright 2018-2021 Intel Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -84,6 +84,11 @@ void prepare(LinkedHashMap<Tuple2<Long, Integer>, Tuple3<Long, BlockId, BlockMan
*/
Tuple2<Long, Integer> curMapReduceId();

/**
* Find the next mapReduce Id.
*/
void nextMapReduceId();

/**
* get available buffer after iterating current buffer, next buffer in current desc and next desc.
*
@@ -128,26 +133,30 @@ final class ReaderConfig {
private int waitDataTimeMs;
private int waitTimeoutTimes;
private boolean fromOtherThread;
private SparkConf conf;

private static final Logger log = LoggerFactory.getLogger(ReaderConfig.class);

public ReaderConfig() {
this(true);
public ReaderConfig(SparkConf conf) {
this(conf, true);
}

private ReaderConfig(boolean load) {
private ReaderConfig(SparkConf conf, boolean load) {
this.conf = conf;
if (load) {
initialize();
}
}

private void initialize() {
SparkConf conf = SparkEnv.get().conf();
if (conf == null) {
this.conf = SparkEnv.get().conf();
}
minReadSize = (long)conf.get(package$.MODULE$.SHUFFLE_DAOS_READ_MINIMUM_SIZE()) * 1024;
this.maxBytesInFlight = -1L;
this.maxMem = -1L;
this.readBatchSize = (int)conf.get(package$.MODULE$.SHUFFLE_DAOS_READ_BATCH_SIZE());
this.waitDataTimeMs = (int)conf.get(package$.MODULE$.SHUFFLE_DAOS_READ_WAIT_DATA_MS());
this.waitDataTimeMs = (int)conf.get(package$.MODULE$.SHUFFLE_DAOS_READ_WAIT_MS());
this.waitTimeoutTimes = (int)conf.get(package$.MODULE$.SHUFFLE_DAOS_READ_WAIT_DATA_TIMEOUT_TIMES());
this.fromOtherThread = (boolean)conf.get(package$.MODULE$.SHUFFLE_DAOS_READ_FROM_OTHER_THREAD());
if (log.isDebugEnabled()) {
@@ -162,7 +171,7 @@ private void initialize() {
}

public ReaderConfig copy(long maxBytesInFlight, long maxMem) {
ReaderConfig rc = new ReaderConfig(false);
ReaderConfig rc = new ReaderConfig(conf, false);
rc.maxMem = maxMem;
rc.minReadSize = minReadSize;
rc.readBatchSize = readBatchSize;
@@ -0,0 +1,187 @@
/*
* (C) Copyright 2018-2021 Intel Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* GOVERNMENT LICENSE RIGHTS-OPEN SOURCE SOFTWARE
* The Government's rights to use, modify, reproduce, release, perform, display,
* or disclose this software are subject to the terms of the Apache License as
* provided in Contract No. B609815.
* Any reproduction of computer software, computer software documentation, or
* portions thereof marked with this legend must also reproduce the markings.
*/

package org.apache.spark.shuffle.daos;

import io.daos.DaosEventQueue;
import io.daos.TimedOutException;
import io.daos.obj.DaosObject;
import io.daos.obj.IODataDescBase;
import io.daos.obj.IOSimpleDDAsync;
import io.netty.buffer.ByteBuf;

import java.io.IOException;
import java.util.*;

public class DaosReaderAsync extends DaosReaderBase {

private DaosEventQueue eq;

private Set<IOSimpleDDAsync> runningDescSet = new LinkedHashSet<>();

private LinkedList<IOSimpleDDAsync> readyList = new LinkedList<>();

private List<DaosEventQueue.Attachment> completedList = new LinkedList<>();

public DaosReaderAsync(DaosObject object, ReaderConfig config) throws IOException {
super(object, config);
eq = DaosEventQueue.getInstance(0);
}

@Override
protected IODataDescBase createFetchDataDesc(String reduceId) throws IOException {
return object.createAsyncDataDescForFetch(reduceId, eq.getEqWrapperHdl());
}

@Override
protected void addFetchEntry(IODataDescBase desc, String mapId, long offset, long readSize) throws IOException {
if (readSize > Integer.MAX_VALUE) {
throw new IllegalArgumentException("readSize should not exceed " + Integer.MAX_VALUE);
}
((IOSimpleDDAsync) desc).addEntryForFetch(mapId, offset, (int)readSize);
}

@Override
public ByteBuf nextBuf() throws IOException {
ByteBuf buf = tryCurrentEntry();
if (buf != null) {
return buf;
}
// next entry
buf = tryCurrentDesc();
if (buf != null) {
return buf;
}
// next desc
entryIdx = 0;
return readFromDaos();
}

private ByteBuf enterNewDesc(IOSimpleDDAsync desc) throws IOException {
if (currentDesc != null) {
currentDesc.release();
}
currentDesc = desc;
return validateLastEntryAndGetBuf(desc.getEntry(entryIdx));
}

private IOSimpleDDAsync nextDesc() throws IOException {
return readyList.poll();
}

private void progress() throws IOException {
if (runningDescSet.isEmpty()) {
return;
}
completedList.clear();
long timeOutMs = config.getWaitDataTimeMs();
long start = System.currentTimeMillis();
int n = eq.pollCompleted(completedList, runningDescSet.size(), timeOutMs);
while (n == 0) {
long dur = System.currentTimeMillis() - start;
if (dur > timeOutMs) {
throw new TimedOutException("timed out after " + dur);
}
n = eq.pollCompleted(completedList, runningDescSet.size(), timeOutMs - dur);
}
verifyCompleted();
}

private ByteBuf readFromDaos() throws IOException {
if (runningDescSet.isEmpty()) {
DaosEventQueue.Event event = null;
TimedOutException te = null;
try {
event = acquireEvent();
} catch (TimedOutException e) {
te = e;
}
IOSimpleDDAsync taskDesc = (IOSimpleDDAsync) createNextDesc(config.getMaxBytesInFlight());
if (taskDesc == null) {
if (event != null) {
event.putBack();
}
return null;
}
if (te != null) { // have data to read, but no event
throw te;
}
runningDescSet.add(taskDesc);
taskDesc.setEvent(event);
try {
object.fetchAsync(taskDesc);
} catch (IOException e) {
taskDesc.release();
runningDescSet.remove(taskDesc);
throw e;
}
}
progress();
IOSimpleDDAsync desc = nextDesc();
if (desc != null) {
return enterNewDesc(desc);
}
return null;
}

private DaosEventQueue.Event acquireEvent() throws IOException {
completedList.clear();
DaosEventQueue.Event event = eq.acquireEventBlocking(config.getWaitDataTimeMs(), completedList);
verifyCompleted();
return event;
}

private void verifyCompleted() throws IOException {
IOSimpleDDAsync failed = null;
int failedCnt = 0;
for (DaosEventQueue.Attachment attachment : completedList) {
if (runningDescSet.contains(attachment)) {
IOSimpleDDAsync desc = (IOSimpleDDAsync) attachment;
runningDescSet.remove(attachment);
if (desc.isSucceeded()) {
readyList.add(desc);
continue;
}
failedCnt++;
if (failed == null) {
failed = desc;
}
}
}
if (failedCnt > 0) {
throw new IOException("failed to read " + failedCnt + " IOSimpleDDAsync. First failed is " + failed);
}
}

@Override
public void close(boolean force) {
readyList.forEach(desc -> desc.release());
runningDescSet.forEach(desc -> desc.release());
if (!(readyList.isEmpty() && runningDescSet.isEmpty())) {
StringBuilder sb = new StringBuilder();
sb.append(readyList.isEmpty() ? "" : "not all data consumed. ");
sb.append(runningDescSet.isEmpty() ? "" : "some data is in flight");
throw new IllegalStateException(sb.toString());
}
}
}
