Skip to content

Commit

Permalink
Merge pull request #12 from yifeih/yh/ess-metadata-v1
Browse files Browse the repository at this point in the history
ESS mapoutput metadata
  • Loading branch information
ifilonenko authored Jan 18, 2019
2 parents c2231a0 + 4a12c93 commit 40ab79f
Show file tree
Hide file tree
Showing 27 changed files with 390 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ public RegisterShuffleIndex(

@Override
public boolean equals(Object other) {
if (other != null && other instanceof UploadShufflePartitionStream) {
UploadShufflePartitionStream o = (UploadShufflePartitionStream) other;
if (other != null && other instanceof RegisterShuffleIndex) {
RegisterShuffleIndex o = (RegisterShuffleIndex) other;
return Objects.equal(appId, o.appId)
&& shuffleId == o.shuffleId
&& mapId == o.mapId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ public UploadShuffleIndex(

@Override
public boolean equals(Object other) {
if (other != null && other instanceof UploadShufflePartitionStream) {
UploadShufflePartitionStream o = (UploadShufflePartitionStream) other;
if (other != null && other instanceof UploadShuffleIndex) {
UploadShuffleIndex o = (UploadShuffleIndex) other;
return Objects.equal(appId, o.appId)
&& shuffleId == o.shuffleId
&& mapId == o.mapId;
Expand Down
5 changes: 5 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,11 @@
<artifactId>py4j</artifactId>
<version>0.10.8.1</version>
</dependency>
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-java8-compat_${scala.binary.version}</artifactId>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.apache.spark.shuffle.api;

import org.apache.spark.storage.ShuffleLocation;

import java.util.Optional;

public interface CommittedPartition {

/**
* Indicates the number of bytes written in a committed partition.
* Note that returning the length is mainly for backwards compatibility
* and should be removed in a more polished variant. After this method
* is called, the writer will be discarded; it's expected that the
* implementation will close any underlying resources.
*/
long length();

/**
* Indicates the shuffle location to which this partition was written.
* Some implementations may not need to specify a shuffle location.
*/
Optional<ShuffleLocation> shuffleLocation();
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,9 @@ public interface ShufflePartitionWriter {

/**
* Indicate that the partition was written successfully and there are no more incoming bytes.
* Returns the length of the partition that is written. Note that returning the length is
* mainly for backwards compatibility and should be removed in a more polished variant.
* After this method is called, the writer will be discarded; it's expected that the
* implementation will close any underlying resources.
* Returns a {@link CommittedPartition} indicating information about that written partition.
*/
long commitAndGetTotalLength();
CommittedPartition commitPartition();

/**
* Indicate that the write has failed for some reason and the implementation can handle the
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.apache.spark.shuffle.external;

import org.apache.spark.shuffle.api.CommittedPartition;
import org.apache.spark.storage.ShuffleLocation;

import java.util.Optional;

public class ExternalCommittedPartition implements CommittedPartition {

private final long length;
private final Optional<ShuffleLocation> shuffleLocation;

public ExternalCommittedPartition(long length) {
this.length = length;
this.shuffleLocation = Optional.empty();
}

public ExternalCommittedPartition(long length, ShuffleLocation shuffleLocation) {
this.length = length;
this.shuffleLocation = Optional.of(shuffleLocation);
}

@Override
public long length() {
return length;
}

@Override
public Optional<ShuffleLocation> shuffleLocation() {
return shuffleLocation;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.apache.spark.shuffle.external;

import org.apache.spark.MapOutputTracker;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv;
import org.apache.spark.network.TransportContext;
Expand All @@ -20,6 +21,7 @@ public class ExternalShuffleDataIO implements ShuffleDataIO {
private static SecurityManager securityManager;
private static String hostname;
private static int port;
private static MapOutputTracker mapOutputTracker;

public ExternalShuffleDataIO(
SparkConf sparkConf) {
Expand All @@ -35,14 +37,15 @@ public void initialize() {
securityManager = env.securityManager();
hostname = blockManager.getRandomShuffleHost();
port = blockManager.getRandomShufflePort();
mapOutputTracker = env.mapOutputTracker();
// TODO: Register Driver and Executor
}

@Override
public ShuffleReadSupport readSupport() {
return new ExternalShuffleReadSupport(
conf, context, securityManager.isAuthenticationEnabled(),
securityManager, hostname, port);
conf, context, securityManager.isAuthenticationEnabled(),
securityManager, mapOutputTracker);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package org.apache.spark.shuffle.external;

import org.apache.hadoop.mapreduce.task.reduce.Shuffle;
import org.apache.spark.network.protocol.Encoders;
import org.apache.spark.storage.ShuffleLocation;

import java.io.*;

public class ExternalShuffleLocation implements ShuffleLocation {

private String shuffleHostname;
private int shufflePort;

public ExternalShuffleLocation() { /* for serialization */ }

public ExternalShuffleLocation(String shuffleHostname, int shufflePort) {
this.shuffleHostname = shuffleHostname;
this.shufflePort = shufflePort;
}

@Override
public void writeExternal(ObjectOutput out) throws IOException {
out.writeUTF(shuffleHostname);
out.writeInt(shufflePort);
}

@Override
public void readExternal(ObjectInput in) throws IOException {
this.shuffleHostname = in.readUTF();
this.shufflePort = in.readInt();
}

public String getShuffleHostname() {
return this.shuffleHostname;
}

public int getShufflePort() {
return this.shufflePort;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import java.nio.ByteBuffer;


public class ExternalShuffleMapOutputWriter implements ShuffleMapOutputWriter {

private final TransportClientFactory clientFactory;
Expand Down Expand Up @@ -79,8 +80,8 @@ public void commitAllPartitions() {
logger.info("clientid: " + client.getClientId() + " " + client.isActive());
client.sendRpcSync(uploadShuffleIndex, 60000);
} catch (Exception e) {
client.close();
logger.error("Encountered error while creating transport client", e);
client.close();
throw new RuntimeException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.shuffle.protocol.UploadShufflePartitionStream;
import org.apache.spark.shuffle.api.CommittedPartition;
import org.apache.spark.shuffle.api.ShufflePartitionWriter;
import org.apache.spark.storage.ShuffleLocation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Optional;

public class ExternalShufflePartitionWriter implements ShufflePartitionWriter {

Expand Down Expand Up @@ -51,7 +54,7 @@ public ExternalShufflePartitionWriter(
public OutputStream openPartitionStream() { return partitionBuffer; }

@Override
public long commitAndGetTotalLength() {
public CommittedPartition commitPartition() {
RpcResponseCallback callback = new RpcResponseCallback() {
@Override
public void onSuccess(ByteBuffer response) {
Expand Down Expand Up @@ -88,12 +91,11 @@ public void onFailure(Throwable e) {
} finally {
logger.info("Successfully sent partition to ESS");
}
return totalLength;
return new ExternalCommittedPartition(totalLength, new ExternalShuffleLocation(hostName, port));
}

@Override
public void abort(Exception failureReason) {
clientFactory.close();
try {
this.partitionBuffer.close();
} catch(IOException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.apache.spark.shuffle.external;

import com.google.common.collect.Lists;
import org.apache.spark.MapOutputTracker;
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.client.TransportClientBootstrap;
import org.apache.spark.network.client.TransportClientFactory;
Expand All @@ -9,10 +10,13 @@
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.shuffle.api.ShufflePartitionReader;
import org.apache.spark.shuffle.api.ShuffleReadSupport;
import org.apache.spark.storage.ShuffleLocation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.compat.java8.OptionConverters;

import java.util.List;
import java.util.Optional;

public class ExternalShuffleReadSupport implements ShuffleReadSupport {

Expand All @@ -22,22 +26,19 @@ public class ExternalShuffleReadSupport implements ShuffleReadSupport {
private final TransportContext context;
private final boolean authEnabled;
private final SecretKeyHolder secretKeyHolder;
private final String hostName;
private final int port;
private final MapOutputTracker mapOutputTracker;

public ExternalShuffleReadSupport(
TransportConf conf,
TransportContext context,
boolean authEnabled,
SecretKeyHolder secretKeyHolder,
String hostName,
int port) {
MapOutputTracker mapOutputTracker) {
this.conf = conf;
this.context = context;
this.authEnabled = authEnabled;
this.secretKeyHolder = secretKeyHolder;
this.hostName = hostName;
this.port = port;
this.mapOutputTracker = mapOutputTracker;
}

@Override
Expand All @@ -47,10 +48,20 @@ public ShufflePartitionReader newPartitionReader(String appId, int shuffleId, in
if (authEnabled) {
bootstraps.add(new AuthClientBootstrap(conf, appId, secretKeyHolder));
}
Optional<ShuffleLocation> maybeShuffleLocation = OptionConverters.toJava(mapOutputTracker.getShuffleLocation(shuffleId, mapId, 0));
assert maybeShuffleLocation.isPresent();
ExternalShuffleLocation externalShuffleLocation = (ExternalShuffleLocation) maybeShuffleLocation.get();
logger.info(String.format("Found external shuffle location on node: %s:%d",
externalShuffleLocation.getShuffleHostname(),
externalShuffleLocation.getShufflePort()));
TransportClientFactory clientFactory = context.createClientFactory(bootstraps);
try {
return new ExternalShufflePartitionReader(clientFactory,
hostName, port, appId, shuffleId, mapId);
externalShuffleLocation.getShuffleHostname(),
externalShuffleLocation.getShufflePort(),
appId,
shuffleId,
mapId);
} catch (Exception e) {
clientFactory.close();
logger.error("Encountered creating transport client for partition reader");
Expand Down
Loading

0 comments on commit 40ab79f

Please sign in to comment.