Merge remote-tracking branch 'origin/master' into df-self-join
cloud-fan committed Mar 29, 2016
2 parents 86a8877 + 38326ca commit b35f980
Showing 107 changed files with 6,599 additions and 1,163 deletions.
1 change: 1 addition & 0 deletions LICENSE
@@ -238,6 +238,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(BSD 3 Clause) netlib core (com.github.fommil.netlib:core:1.1.2 - https://github.com/fommil/netlib-java/core)
(BSD 3 Clause) JPMML-Model (org.jpmml:pmml-model:1.2.7 - https://github.com/jpmml/jpmml-model)
(BSD License) AntLR Parser Generator (antlr:antlr:2.7.7 - http://www.antlr.org/)
(BSD License) ANTLR 4.5.2-1 (org.antlr:antlr4:4.5.2-1 - http://wwww.antlr.org/)
(BSD licence) ANTLR ST4 4.0.4 (org.antlr:ST4:4.0.4 - http://www.stringtemplate.org)
(BSD licence) ANTLR StringTemplate (org.antlr:stringtemplate:3.2.1 - http://www.stringtemplate.org)
(BSD License) Javolution (javolution:javolution:5.5.1 - http://javolution.org)
@@ -24,6 +24,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.api.*;
import org.slf4j.Logger;
@@ -118,7 +119,7 @@ protected void serviceInit(Configuration conf) {
// an application was stopped while the NM was down, we expect yarn to call stopApplication()
// when it comes back
registeredExecutorFile =
findRegisteredExecutorFile(conf.getStrings("yarn.nodemanager.local-dirs"));
findRegisteredExecutorFile(conf.getTrimmedStrings("yarn.nodemanager.local-dirs"));

TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
// If authentication is enabled, set up the shuffle server to use a
@@ -191,12 +192,12 @@ public void stopContainer(ContainerTerminationContext context) {

private File findRegisteredExecutorFile(String[] localDirs) {
for (String dir: localDirs) {
File f = new File(dir, "registeredExecutors.ldb");
File f = new File(new Path(dir).toUri().getPath(), "registeredExecutors.ldb");
if (f.exists()) {
return f;
}
}
return new File(localDirs[0], "registeredExecutors.ldb");
return new File(new Path(localDirs[0]).toUri().getPath(), "registeredExecutors.ldb");
}

/**
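The two hunks above change serviceInit and findRegisteredExecutorFile to read yarn.nodemanager.local-dirs with getTrimmedStrings (dropping stray whitespace around the comma-separated entries) and to run each entry through Hadoop's Path before building a File, so a directory configured as a URI still resolves to a real local path. A minimal sketch of why the Path round-trip matters; the directory value is a hypothetical example, not taken from the commit, and the output comments assume a Unix-like filesystem:

```java
import java.io.File;
import org.apache.hadoop.fs.Path;

public class LocalDirSketch {
  public static void main(String[] args) {
    // Hypothetical yarn.nodemanager.local-dirs entry written as a URI.
    String dir = "file:///tmp/yarn-local";

    // Using the raw string keeps the scheme inside the file path.
    File raw = new File(dir, "registeredExecutors.ldb");
    System.out.println(raw);    // file:/tmp/yarn-local/registeredExecutors.ldb -- not a usable directory

    // Round-tripping through Hadoop's Path strips the scheme, as the patched code does.
    File fixed = new File(new Path(dir).toUri().getPath(), "registeredExecutors.ldb");
    System.out.println(fixed);  // /tmp/yarn-local/registeredExecutors.ldb
  }
}
```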
113 changes: 73 additions & 40 deletions core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -56,9 +56,10 @@
* Bytes 4 to 8: len(k)
* Bytes 8 to 8 + len(k): key data
* Bytes 8 + len(k) to 8 + len(k) + len(v): value data
* Bytes 8 + len(k) + len(v) to 8 + len(k) + len(v) + 8: pointer to next pair
*
* This means that the first four bytes store the entire record (key + value) length. This format
* is consistent with {@link org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter},
* is compatible with {@link org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter},
* so we can pass records from this map directly into the sorter to sort records in place.
*/
public final class BytesToBytesMap extends MemoryConsumer {
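The updated format comment above adds an 8-byte pointer after each value ([total length][key length][key][value][next pointer]), which is why the iterator and spill loops later in this diff advance by 4 + totalLength + 8 per record. Below is a small self-contained sketch of that layout arithmetic over a plain ByteBuffer; it is a simplified model for illustration, not the off-heap Platform code the map actually uses:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class RecordLayoutSketch {
  // Append one record in the layout described above and return the new write position.
  static int writeRecord(ByteBuffer page, int pos, byte[] key, byte[] value) {
    page.position(pos);
    page.putInt(key.length + value.length + 4); // total length = len(k) + len(v) + 4
    page.putInt(key.length);                    // len(k)
    page.put(key).put(value);
    page.putLong(0L);                           // pointer to next pair, 0 = end of chain
    return page.position();
  }

  // Walk records the way the iterator does: 4-byte length prefix, payload, 8-byte pointer.
  static void walk(ByteBuffer page, int numRecords) {
    int offset = 0;
    while (numRecords-- > 0) {
      int totalLength = page.getInt(offset);
      int keyLength = page.getInt(offset + 4);
      int valueLength = totalLength - 4 - keyLength;
      System.out.println("key bytes = " + keyLength + ", value bytes = " + valueLength);
      offset += 4 + totalLength + 8;            // matches the updated iterator/spill stride
    }
  }

  public static void main(String[] args) {
    ByteBuffer page = ByteBuffer.allocate(256).order(ByteOrder.nativeOrder());
    int pos = writeRecord(page, 0, new byte[8], new byte[16]);
    writeRecord(page, pos, new byte[8], new byte[8]);
    walk(page, 2);
  }
}
```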
@@ -132,7 +133,12 @@ public final class BytesToBytesMap extends MemoryConsumer {
/**
* Number of keys defined in the map.
*/
private int numElements;
private int numKeys;

/**
* Number of values defined in the map. A key could have multiple values.
*/
private int numValues;

/**
* The map will be expanded once the number of keys exceeds this threshold.
@@ -223,7 +229,12 @@ public BytesToBytesMap(
/**
* Returns the number of keys defined in the map.
*/
public int numElements() { return numElements; }
public int numKeys() { return numKeys; }

/**
* Returns the number of values defined in the map. A key could have multiple values.
*/
public int numValues() { return numValues; }

public final class MapIterator implements Iterator<Location> {

@@ -311,7 +322,8 @@ public Location next() {
if (currentPage != null) {
int totalLength = Platform.getInt(pageBaseObject, offsetInPage);
loc.with(currentPage, offsetInPage);
offsetInPage += 4 + totalLength;
// [total size] [key size] [key] [value] [pointer to next]
offsetInPage += 4 + totalLength + 8;
recordsInPage --;
return loc;
} else {
@@ -361,7 +373,7 @@ public long spill(long numBytes) throws IOException {
while (numRecords > 0) {
int length = Platform.getInt(base, offset);
writer.write(base, offset + 4, length, 0);
offset += 4 + length;
offset += 4 + length + 8;
numRecords--;
}
writer.close();
@@ -395,7 +407,7 @@ public void remove() {
* `lookup()`, the behavior of the returned iterator is undefined.
*/
public MapIterator iterator() {
return new MapIterator(numElements, loc, false);
return new MapIterator(numValues, loc, false);
}

/**
@@ -409,7 +421,7 @@ public MapIterator iterator() {
* `lookup()`, the behavior of the returned iterator is undefined.
*/
public MapIterator destructiveIterator() {
return new MapIterator(numElements, loc, true);
return new MapIterator(numValues, loc, true);
}

/**
@@ -559,6 +571,20 @@ private Location with(Object base, long offset, int length) {
return this;
}

/**
* Find the next pair that has the same key as current one.
*/
public boolean nextValue() {
assert isDefined;
long nextAddr = Platform.getLong(baseObject, valueOffset + valueLength);
if (nextAddr == 0) {
return false;
} else {
updateAddressesAndSizes(nextAddr);
return true;
}
}

/**
* Returns the memory page that contains the current record.
* This is only valid if this is returned by {@link BytesToBytesMap#iterator()}.
@@ -625,10 +651,9 @@ public int getValueLength() {
}

/**
* Store a new key and value. This method may only be called once for a given key; if you want
* to update the value associated with a key, then you can directly manipulate the bytes stored
* at the value address. The return value indicates whether the put succeeded or whether it
* failed because additional memory could not be acquired.
* Append a new value for the key. This method could be called multiple times for a given key.
* The return value indicates whether the put succeeded or whether it failed because additional
* memory could not be acquired.
* <p>
* It is only valid to call this method immediately after calling `lookup()` using the same key.
* </p>
@@ -637,15 +662,15 @@ public int getValueLength() {
* </p>
* <p>
* After calling this method, calls to `get[Key|Value]Address()` and `get[Key|Value]Length`
* will return information on the data stored by this `putNewKey` call.
* will return information on the data stored by this `append` call.
* </p>
* <p>
* As an example usage, here's the proper way to store a new key:
* </p>
* <pre>
* Location loc = map.lookup(keyBase, keyOffset, keyLength);
* if (!loc.isDefined()) {
* if (!loc.putNewKey(keyBase, keyOffset, keyLength, ...)) {
* if (!loc.append(keyBase, keyOffset, keyLength, ...)) {
* // handle failure to grow map (by spilling, for example)
* }
* }
@@ -657,26 +682,23 @@ public int getValueLength() {
* @return true if the put() was successful and false if the put() failed because memory could
* not be acquired.
*/
public boolean putNewKey(Object keyBase, long keyOffset, int keyLength,
Object valueBase, long valueOffset, int valueLength) {
assert (!isDefined) : "Can only set value once for a key";
assert (keyLength % 8 == 0);
assert (valueLength % 8 == 0);
assert(longArray != null);

public boolean append(Object kbase, long koff, int klen, Object vbase, long voff, int vlen) {
assert (klen % 8 == 0);
assert (vlen % 8 == 0);
assert (longArray != null);

if (numElements == MAX_CAPACITY
if (numKeys == MAX_CAPACITY
// The map could be reused from last spill (because of no enough memory to grow),
// then we don't try to grow again if hit the `growthThreshold`.
|| !canGrowArray && numElements > growthThreshold) {
|| !canGrowArray && numKeys > growthThreshold) {
return false;
}

// Here, we'll copy the data into our data pages. Because we only store a relative offset from
// the key address instead of storing the absolute address of the value, the key and value
// must be stored in the same memory page.
// (8 byte key length) (key) (value)
final long recordLength = 8 + keyLength + valueLength;
// (8 byte key length) (key) (value) (8 byte pointer to next value)
final long recordLength = 8 + klen + vlen + 8;
if (currentPage == null || currentPage.size() - pageCursor < recordLength) {
if (!acquireNewPage(recordLength + 4L)) {
return false;
@@ -687,30 +709,40 @@ public boolean putNewKey(Object keyBase, long keyOffset, int keyLength,
final Object base = currentPage.getBaseObject();
long offset = currentPage.getBaseOffset() + pageCursor;
final long recordOffset = offset;
Platform.putInt(base, offset, keyLength + valueLength + 4);
Platform.putInt(base, offset + 4, keyLength);
Platform.putInt(base, offset, klen + vlen + 4);
Platform.putInt(base, offset + 4, klen);
offset += 8;
Platform.copyMemory(keyBase, keyOffset, base, offset, keyLength);
offset += keyLength;
Platform.copyMemory(valueBase, valueOffset, base, offset, valueLength);
Platform.copyMemory(kbase, koff, base, offset, klen);
offset += klen;
Platform.copyMemory(vbase, voff, base, offset, vlen);
offset += vlen;
Platform.putLong(base, offset, 0);

// --- Update bookkeeping data structures ----------------------------------------------------
offset = currentPage.getBaseOffset();
Platform.putInt(base, offset, Platform.getInt(base, offset) + 1);
pageCursor += recordLength;
numElements++;
final long storedKeyAddress = taskMemoryManager.encodePageNumberAndOffset(
currentPage, recordOffset);
longArray.set(pos * 2, storedKeyAddress);
longArray.set(pos * 2 + 1, keyHashcode);
updateAddressesAndSizes(storedKeyAddress);
isDefined = true;
numValues++;
if (isDefined) {
// put this pair at the end of chain
while (nextValue()) { /* do nothing */ }
Platform.putLong(baseObject, valueOffset + valueLength, storedKeyAddress);
nextValue(); // point to new added value
} else {
numKeys++;
longArray.set(pos * 2, storedKeyAddress);
longArray.set(pos * 2 + 1, keyHashcode);
updateAddressesAndSizes(storedKeyAddress);
isDefined = true;

if (numElements > growthThreshold && longArray.size() < MAX_CAPACITY) {
try {
growAndRehash();
} catch (OutOfMemoryError oom) {
canGrowArray = false;
if (numKeys > growthThreshold && longArray.size() < MAX_CAPACITY) {
try {
growAndRehash();
} catch (OutOfMemoryError oom) {
canGrowArray = false;
}
}
}
return true;
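The append hunk above turns the old single-value putNewKey into a chain: the first value for a key is registered in the long array, and each later append walks nextValue() to the end of the chain and stores the new record's address in the previous tail's 8-byte next pointer. A hedged usage sketch of the resulting API follows; the long[]-backed key/value buffers and the Platform.LONG_ARRAY_OFFSET bookkeeping are illustrative assumptions, not code from this commit:

```java
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.map.BytesToBytesMap;

public class MultiValueSketch {
  // Append `value` under `key`, then print how many values the key currently holds.
  static void appendAndCount(BytesToBytesMap map, long[] key, long[] value) {
    BytesToBytesMap.Location loc =
        map.lookup(key, Platform.LONG_ARRAY_OFFSET, key.length * 8);
    boolean ok = loc.append(
        key, Platform.LONG_ARRAY_OFFSET, key.length * 8,
        value, Platform.LONG_ARRAY_OFFSET, value.length * 8);
    if (!ok) {
      throw new IllegalStateException("could not acquire memory; caller should spill");
    }

    // Re-look-up the key and walk the chain with nextValue() to visit every stored value.
    BytesToBytesMap.Location cursor =
        map.lookup(key, Platform.LONG_ARRAY_OFFSET, key.length * 8);
    int count = 0;
    if (cursor.isDefined()) {
      do {
        count++;                      // cursor.getValueAddress()/getValueLength() expose the bytes
      } while (cursor.nextValue());   // returns false once the 8-byte next pointer is 0
    }
    System.out.println("values for key: " + count);
  }
}
```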
@@ -866,7 +898,8 @@ public LongArray getArray() {
* Reset this map to initialized state.
*/
public void reset() {
numElements = 0;
numKeys = 0;
numValues = 0;
longArray.zeroOut();

while (dataPages.size() > 0) {
@@ -113,9 +113,15 @@ private[spark] class CoarseGrainedExecutorBackend(

case Shutdown =>
stopping.set(true)
executor.stop()
stop()
rpcEnv.shutdown()
new Thread("CoarseGrainedExecutorBackend-stop-executor") {
override def run(): Unit = {
// executor.stop() will call `SparkEnv.stop()` which waits until RpcEnv stops totally.
// However, if `executor.stop()` runs in some thread of RpcEnv, RpcEnv won't be able to
// stop until `executor.stop()` returns, which becomes a dead-lock (See SPARK-14180).
// Therefore, we put this line in a new thread.
executor.stop()
}
}.start()
}

override def onDisconnected(remoteAddress: RpcAddress): Unit = {
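The Shutdown handler above moves executor.stop() onto a dedicated thread because, per the inline comment, SparkEnv.stop() waits for the RpcEnv to stop completely, so calling it from an RpcEnv thread deadlocks (SPARK-14180). The same self-shutdown hazard exists for any thread pool; the following is a generic java.util.concurrent illustration of the hand-off pattern, not Spark code:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SelfShutdownHandOff {
  public static void main(String[] args) {
    ExecutorService pool = Executors.newSingleThreadExecutor();

    pool.submit(() -> {
      // Awaiting the pool's own termination from a pool thread cannot complete while
      // this task is still running, so hand the blocking shutdown off to a new thread.
      new Thread(() -> {
        pool.shutdown();
        try {
          if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
            pool.shutdownNow();
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }, "pool-stop-thread").start();
      // This task returns immediately, so the pool can actually terminate.
    });
  }
}
```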
@@ -356,20 +356,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

/**
* Reset the state of CoarseGrainedSchedulerBackend to the initial state. Currently it will only
* be called in the yarn-client mode when AM re-registers after a failure, also dynamic
* allocation is enabled.
* be called in the yarn-client mode when AM re-registers after a failure.
* */
protected def reset(): Unit = synchronized {
if (Utils.isDynamicAllocationEnabled(conf)) {
numPendingExecutors = 0
executorsPendingToRemove.clear()

// Remove all the lingering executors that should be removed but not yet. The reason might be
// because (1) disconnected event is not yet received; (2) executors die silently.
executorDataMap.toMap.foreach { case (eid, _) =>
driverEndpoint.askWithRetry[Boolean](
RemoveExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered.")))
}
numPendingExecutors = 0
executorsPendingToRemove.clear()

// Remove all the lingering executors that should be removed but not yet. The reason might be
// because (1) disconnected event is not yet received; (2) executors die silently.
executorDataMap.toMap.foreach { case (eid, _) =>
driverEndpoint.askWithRetry[Boolean](
RemoveExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered.")))
}
}

@@ -66,17 +66,6 @@ class StorageStatusListener(conf: SparkConf) extends SparkListener {
}
}

override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
val info = taskEnd.taskInfo
val metrics = taskEnd.taskMetrics
if (info != null && metrics != null) {
val updatedBlocks = metrics.updatedBlockStatuses
if (updatedBlocks.length > 0) {
updateStorageStatus(info.executorId, updatedBlocks)
}
}
}

override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = synchronized {
updateStorageStatus(unpersistRDD.rddId)
}
@@ -102,4 +91,14 @@
}
}
}

override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
val executorId = blockUpdated.blockUpdatedInfo.blockManagerId.executorId
val blockId = blockUpdated.blockUpdatedInfo.blockId
val storageLevel = blockUpdated.blockUpdatedInfo.storageLevel
val memSize = blockUpdated.blockUpdatedInfo.memSize
val diskSize = blockUpdated.blockUpdatedInfo.diskSize
val blockStatus = BlockStatus(storageLevel, memSize, diskSize)
updateStorageStatus(executorId, Seq((blockId, blockStatus)))
}
}
21 changes: 10 additions & 11 deletions core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
@@ -57,17 +57,6 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends Bloc
StorageUtils.updateRddInfo(rddInfosToUpdate, activeStorageStatusList)
}

/**
* Assumes the storage status list is fully up-to-date. This implies the corresponding
* StorageStatusSparkListener must process the SparkListenerTaskEnd event before this listener.
*/
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
val metrics = taskEnd.taskMetrics
if (metrics != null && metrics.updatedBlockStatuses.nonEmpty) {
updateRDDInfo(metrics.updatedBlockStatuses)
}
}

override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized {
val rddInfos = stageSubmitted.stageInfo.rddInfos
rddInfos.foreach { info => _rddInfoMap.getOrElseUpdate(info.id, info) }
@@ -84,4 +73,14 @@
override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = synchronized {
_rddInfoMap.remove(unpersistRDD.rddId)
}

override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
super.onBlockUpdated(blockUpdated)
val blockId = blockUpdated.blockUpdatedInfo.blockId
val storageLevel = blockUpdated.blockUpdatedInfo.storageLevel
val memSize = blockUpdated.blockUpdatedInfo.memSize
val diskSize = blockUpdated.blockUpdatedInfo.diskSize
val blockStatus = BlockStatus(storageLevel, memSize, diskSize)
updateRDDInfo(Seq((blockId, blockStatus)))
}
}