[REMOTE-SHUFFLE-11] Support DAOS Object Async API #14

Merged
merged 20 commits on May 14, 2021
34 changes: 33 additions & 1 deletion README.md
@@ -230,6 +230,8 @@ differences here.

```
spark.shuffle.manager org.apache.spark.shuffle.daos.DaosShuffleManager
spark.shuffle.daos.pool.uuid <POOL UUID>
spark.shuffle.daos.container.uuid <CONTAINER UUID>
```
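The same properties can also be passed at submit time. A sketch of a typical `spark-submit` invocation (the UUIDs are placeholders to fill in for your DAOS setup):

```
spark-submit \
  --conf spark.shuffle.manager=org.apache.spark.shuffle.daos.DaosShuffleManager \
  --conf spark.shuffle.daos.pool.uuid=<POOL UUID> \
  --conf spark.shuffle.daos.container.uuid=<CONTAINER UUID> \
  ...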

### Classpath
@@ -241,4 +243,34 @@ differences here.
spark.driver.extraClassPath $HOME/miniconda2/envs/oapenv/oap_jars/daos-java-<version>.jar
$HOME/miniconda2/envs/oapenv/oap_jars/hadoop-daos-<version>.jar
$HOME/miniconda2/envs/oapenv/oap_jars/shuffle-daos-<version>.jar
```
```

### Configuration

There are some configuration options for tuning shuffle IO. You can find all of them in the package
"org.apache.spark.shuffle.daos". Here are some of them.

```
spark.shuffle.daos.io.async true
```
This shuffle plugin supports both sync IO and async IO. The default is async IO. You can switch to sync IO by setting
this value to "false". Most other config items apply to both sync and async IO. Config items specific to one mode can
be identified from their documentation or their names.


```
spark.shuffle.remove.shuffle.data true
```
All shuffle data is written to the DAOS container and should be deleted after the job is done to save space. If you
want to inspect the shuffle data after the job, set this value to "false".

```
spark.shuffle.daos.read.wait.ms 5000
spark.shuffle.daos.write.wait.ms 5000
```
These are the maximum times, in milliseconds, to wait for read or write completion before a TimedOutException is thrown.
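The wait behind these timeouts polls for completion events until data arrives or the deadline passes. A minimal standalone sketch of that pattern, assuming a generic poll function (`waitForCompletions` and its `poll` supplier are illustrative stand-ins, not part of the plugin API):

```java
import java.util.function.IntSupplier;

public class PollWithTimeout {
  // Polls until `poll` reports at least one completion or the deadline
  // passes. `poll` is a hypothetical stand-in for an event-queue poll
  // such as DaosEventQueue.pollCompleted().
  public static int waitForCompletions(IntSupplier poll, long timeOutMs) {
    long start = System.currentTimeMillis();
    int n = poll.getAsInt();
    while (n == 0) {
      long dur = System.currentTimeMillis() - start;
      if (dur > timeOutMs) {
        // mirrors the TimedOutException thrown by the plugin
        throw new RuntimeException("timed out after " + dur + " ms");
      }
      n = poll.getAsInt();
    }
    return n;
  }

  public static void main(String[] args) {
    // completes on the third poll, well within the timeout
    int[] calls = {0};
    int n = waitForCompletions(() -> (++calls[0] >= 3) ? 2 : 0, 1000);
    System.out.println(n);
  }
}
```

Raising the wait values trades faster failure detection for more tolerance of slow storage.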

```
spark.shuffle.daos.write.buffer 800m
```
Total in-memory buffer size of each map task. You can tune it for your environment.
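Putting the tuning options above together, a spark-defaults.conf fragment might look like this (the values are the defaults discussed above, not tuned recommendations):

```
spark.shuffle.daos.io.async true
spark.shuffle.remove.shuffle.data true
spark.shuffle.daos.read.wait.ms 5000
spark.shuffle.daos.write.wait.ms 5000
spark.shuffle.daos.write.buffer 800m
```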
2 changes: 1 addition & 1 deletion pom.xml
@@ -6,7 +6,7 @@

<groupId>com.intel.oap</groupId>
<artifactId>remote-shuffle-parent</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
<name>OAP Remote Shuffle Parent POM</name>
<packaging>pom</packaging>

2 changes: 1 addition & 1 deletion shuffle-daos/README.md
@@ -3,4 +3,4 @@
A remote shuffle plugin based on DAOS Object API. You can find DAOS and DAOS Java Wrapper in https://github.com/daos-stack/daos and https://github.com/daos-stack/daos/tree/master/src/client/java.
Thanks to DAOS, the plugin is especially good for small shuffle blocks, such as around 200KB.

# TO BE CONTINUED
See Shuffle DAOS related documentation in [Readme under project root](../README.md).
2 changes: 1 addition & 1 deletion shuffle-daos/dev/checkstyle.license
@@ -1,5 +1,5 @@
/*
* (C) Copyright 2018-2020 Intel Corporation.
* (C) Copyright 2018-2021 Intel Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
6 changes: 3 additions & 3 deletions shuffle-daos/pom.xml
@@ -7,7 +7,7 @@
<parent>
<groupId>com.intel.oap</groupId>
<artifactId>remote-shuffle-parent</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
</parent>
<artifactId>shuffle-daos</artifactId>
<name>OAP Remote Shuffle Based on DAOS Object API</name>
@@ -205,7 +205,7 @@
<dependency>
<groupId>io.daos</groupId>
<artifactId>daos-java</artifactId>
<version>1.1.4</version>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>junit</groupId>
@@ -241,4 +241,4 @@
</dependency>
</dependencies>

</project>
</project>
2 changes: 1 addition & 1 deletion shuffle-daos/scalastyle-config.xml
@@ -49,7 +49,7 @@ This file is divided into 3 sections:
<check level="error" class="org.scalastyle.file.HeaderMatchesChecker" enabled="true">
<parameters>
<parameter name="header"><![CDATA[/*
* (C) Copyright 2018-2020 Intel Corporation.
* (C) Copyright 2018-2021 Intel Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -1,5 +1,5 @@
/*
* (C) Copyright 2018-2020 Intel Corporation.
* (C) Copyright 2018-2021 Intel Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,7 +21,6 @@
* portions thereof marked with this legend must also reproduce the markings.
*/


package org.apache.spark.shuffle.daos;

import org.slf4j.Logger;
@@ -1,5 +1,5 @@
/*
* (C) Copyright 2018-2020 Intel Corporation.
* (C) Copyright 2018-2021 Intel Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -84,6 +84,11 @@ void prepare(LinkedHashMap<Tuple2<Long, Integer>, Tuple3<Long, BlockId, BlockMan
*/
Tuple2<Long, Integer> curMapReduceId();

/**
* advance to the next mapReduce id
*/
void nextMapReduceId();

/**
* get available buffer after iterating current buffer, next buffer in current desc and next desc.
*
@@ -128,26 +133,30 @@ final class ReaderConfig {
private int waitDataTimeMs;
private int waitTimeoutTimes;
private boolean fromOtherThread;
private SparkConf conf;

private static final Logger log = LoggerFactory.getLogger(ReaderConfig.class);

public ReaderConfig() {
this(true);
public ReaderConfig(SparkConf conf) {
this(conf, true);
}

private ReaderConfig(boolean load) {
private ReaderConfig(SparkConf conf, boolean load) {
this.conf = conf;
if (load) {
initialize();
}
}

private void initialize() {
SparkConf conf = SparkEnv.get().conf();
if (conf == null) {
this.conf = SparkEnv.get().conf();
}
minReadSize = (long)conf.get(package$.MODULE$.SHUFFLE_DAOS_READ_MINIMUM_SIZE()) * 1024;
this.maxBytesInFlight = -1L;
this.maxMem = -1L;
this.readBatchSize = (int)conf.get(package$.MODULE$.SHUFFLE_DAOS_READ_BATCH_SIZE());
this.waitDataTimeMs = (int)conf.get(package$.MODULE$.SHUFFLE_DAOS_READ_WAIT_DATA_MS());
this.waitDataTimeMs = (int)conf.get(package$.MODULE$.SHUFFLE_DAOS_READ_WAIT_MS());
this.waitTimeoutTimes = (int)conf.get(package$.MODULE$.SHUFFLE_DAOS_READ_WAIT_DATA_TIMEOUT_TIMES());
this.fromOtherThread = (boolean)conf.get(package$.MODULE$.SHUFFLE_DAOS_READ_FROM_OTHER_THREAD());
if (log.isDebugEnabled()) {
@@ -162,7 +171,7 @@ private void initialize() {
}

public ReaderConfig copy(long maxBytesInFlight, long maxMem) {
ReaderConfig rc = new ReaderConfig(false);
ReaderConfig rc = new ReaderConfig(conf, false);
rc.maxMem = maxMem;
rc.minReadSize = minReadSize;
rc.readBatchSize = readBatchSize;
@@ -0,0 +1,187 @@
/*
* (C) Copyright 2018-2021 Intel Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* GOVERNMENT LICENSE RIGHTS-OPEN SOURCE SOFTWARE
* The Government's rights to use, modify, reproduce, release, perform, display,
* or disclose this software are subject to the terms of the Apache License as
* provided in Contract No. B609815.
* Any reproduction of computer software, computer software documentation, or
* portions thereof marked with this legend must also reproduce the markings.
*/

package org.apache.spark.shuffle.daos;

import io.daos.DaosEventQueue;
import io.daos.TimedOutException;
import io.daos.obj.DaosObject;
import io.daos.obj.IODataDescBase;
import io.daos.obj.IOSimpleDDAsync;
import io.netty.buffer.ByteBuf;

import java.io.IOException;
import java.util.*;

public class DaosReaderAsync extends DaosReaderBase {

private DaosEventQueue eq;

private Set<IOSimpleDDAsync> runningDescSet = new LinkedHashSet<>();

private LinkedList<IOSimpleDDAsync> readyList = new LinkedList<>();

private List<DaosEventQueue.Attachment> completedList = new LinkedList<>();

public DaosReaderAsync(DaosObject object, ReaderConfig config) throws IOException {
super(object, config);
eq = DaosEventQueue.getInstance(0);
}

@Override
protected IODataDescBase createFetchDataDesc(String reduceId) throws IOException {
return object.createAsyncDataDescForFetch(reduceId, eq.getEqWrapperHdl());
}

@Override
protected void addFetchEntry(IODataDescBase desc, String mapId, long offset, long readSize) throws IOException {
if (readSize > Integer.MAX_VALUE) {
throw new IllegalArgumentException("readSize should not exceed " + Integer.MAX_VALUE);
}
((IOSimpleDDAsync) desc).addEntryForFetch(mapId, offset, (int)readSize);
}

@Override
public ByteBuf nextBuf() throws IOException {
ByteBuf buf = tryCurrentEntry();
if (buf != null) {
return buf;
}
// next entry
buf = tryCurrentDesc();
if (buf != null) {
return buf;
}
// next desc
entryIdx = 0;
return readFromDaos();
}

private ByteBuf enterNewDesc(IOSimpleDDAsync desc) throws IOException {
if (currentDesc != null) {
currentDesc.release();
}
currentDesc = desc;
return validateLastEntryAndGetBuf(desc.getEntry(entryIdx));
}

private IOSimpleDDAsync nextDesc() throws IOException {
return readyList.poll();
}

private void progress() throws IOException {
if (runningDescSet.isEmpty()) {
return;
}
completedList.clear();
long timeOutMs = config.getWaitDataTimeMs();
long start = System.currentTimeMillis();
int n = eq.pollCompleted(completedList, runningDescSet.size(), timeOutMs);
while (n == 0) {
long dur = System.currentTimeMillis() - start;
if (dur > timeOutMs) {
throw new TimedOutException("timed out after " + dur);
}
n = eq.pollCompleted(completedList, runningDescSet.size(), timeOutMs - dur);
}
verifyCompleted();
}

private ByteBuf readFromDaos() throws IOException {
if (runningDescSet.isEmpty()) {
DaosEventQueue.Event event = null;
TimedOutException te = null;
try {
event = acquireEvent();
} catch (TimedOutException e) {
te = e;
}
IOSimpleDDAsync taskDesc = (IOSimpleDDAsync) createNextDesc(config.getMaxBytesInFlight());
if (taskDesc == null) {
if (event != null) {
event.putBack();
}
return null;
}
if (te != null) { // have data to read, but no event
throw te;
}
runningDescSet.add(taskDesc);
taskDesc.setEvent(event);
try {
object.fetchAsync(taskDesc);
} catch (IOException e) {
taskDesc.release();
runningDescSet.remove(taskDesc);
throw e;
}
}
progress();
IOSimpleDDAsync desc = nextDesc();
if (desc != null) {
return enterNewDesc(desc);
}
return null;
}

private DaosEventQueue.Event acquireEvent() throws IOException {
completedList.clear();
DaosEventQueue.Event event = eq.acquireEventBlocking(config.getWaitDataTimeMs(), completedList);
verifyCompleted();
return event;
}

private void verifyCompleted() throws IOException {
IOSimpleDDAsync failed = null;
int failedCnt = 0;
for (DaosEventQueue.Attachment attachment : completedList) {
if (runningDescSet.contains(attachment)) {
IOSimpleDDAsync desc = (IOSimpleDDAsync) attachment;
runningDescSet.remove(attachment);
if (desc.isSucceeded()) {
readyList.add(desc);
continue;
}
failedCnt++;
if (failed == null) {
failed = desc;
}
}
}
if (failedCnt > 0) {
throw new IOException("failed to read " + failedCnt + " IOSimpleDDAsync. First failed is " + failed);
}
}

@Override
public void close(boolean force) {
readyList.forEach(desc -> desc.release());
runningDescSet.forEach(desc -> desc.release());
if (!(readyList.isEmpty() && runningDescSet.isEmpty())) {
StringBuilder sb = new StringBuilder();
sb.append(readyList.isEmpty() ? "" : "not all data consumed. ");
sb.append(runningDescSet.isEmpty() ? "" : "some data is in flight");
throw new IllegalStateException(sb.toString());
}
}
}