Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into exchange-fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed Aug 8, 2015
2 parents fee65c4 + 998f4ff commit 2c7e126
Show file tree
Hide file tree
Showing 179 changed files with 5,853 additions and 4,000 deletions.
2 changes: 1 addition & 1 deletion R/run-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ FAILED=0
LOGFILE=$FWDIR/unit-tests.out
rm -f $LOGFILE

SPARK_TESTING=1 $FWDIR/../bin/sparkR --conf spark.buffer.pageSize=4m --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
SPARK_TESTING=1 $FWDIR/../bin/sparkR --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
FAILED=$((PIPESTATUS[0]||$FAILED))

if [[ $FAILED != 0 ]]; then
Expand Down
9 changes: 3 additions & 6 deletions build/sbt-launch-lib.bash
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ dlog () {

acquire_sbt_jar () {
SBT_VERSION=`awk -F "=" '/sbt\.version/ {print $2}' ./project/build.properties`
URL1=http://typesafe.artifactoryonline.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar
URL2=http://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar
URL1=https://dl.bintray.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar
JAR=build/sbt-launch-${SBT_VERSION}.jar

sbt_jar=$JAR
Expand All @@ -51,12 +50,10 @@ acquire_sbt_jar () {
printf "Attempting to fetch sbt\n"
JAR_DL="${JAR}.part"
if [ $(command -v curl) ]; then
(curl --fail --location --silent ${URL1} > "${JAR_DL}" ||\
(rm -f "${JAR_DL}" && curl --fail --location --silent ${URL2} > "${JAR_DL}")) &&\
curl --fail --location --silent ${URL1} > "${JAR_DL}" &&\
mv "${JAR_DL}" "${JAR}"
elif [ $(command -v wget) ]; then
(wget --quiet ${URL1} -O "${JAR_DL}" ||\
(rm -f "${JAR_DL}" && wget --quiet ${URL2} -O "${JAR_DL}")) &&\
wget --quiet ${URL1} -O "${JAR_DL}" &&\
mv "${JAR_DL}" "${JAR}"
else
printf "You do not have curl or wget installed, please install sbt manually from http://www.scala-sbt.org/\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

package org.apache.spark.shuffle.unsafe;

import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.LinkedList;
import javax.annotation.Nullable;

import scala.Tuple2;

Expand All @@ -34,8 +34,11 @@
import org.apache.spark.serializer.DummySerializerInstance;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.shuffle.ShuffleMemoryManager;
import org.apache.spark.storage.*;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.DiskBlockObjectWriter;
import org.apache.spark.storage.TempShuffleBlockId;
import org.apache.spark.unsafe.PlatformDependent;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.memory.TaskMemoryManager;
import org.apache.spark.util.Utils;
Expand Down Expand Up @@ -68,7 +71,7 @@ final class UnsafeShuffleExternalSorter {
private final int pageSizeBytes;
@VisibleForTesting
final int maxRecordSizeBytes;
private final TaskMemoryManager memoryManager;
private final TaskMemoryManager taskMemoryManager;
private final ShuffleMemoryManager shuffleMemoryManager;
private final BlockManager blockManager;
private final TaskContext taskContext;
Expand All @@ -91,7 +94,7 @@ final class UnsafeShuffleExternalSorter {
private long peakMemoryUsedBytes;

// These variables are reset after spilling:
@Nullable private UnsafeShuffleInMemorySorter sorter;
@Nullable private UnsafeShuffleInMemorySorter inMemSorter;
@Nullable private MemoryBlock currentPage = null;
private long currentPagePosition = -1;
private long freeSpaceInCurrentPage = 0;
Expand All @@ -105,7 +108,7 @@ public UnsafeShuffleExternalSorter(
int numPartitions,
SparkConf conf,
ShuffleWriteMetrics writeMetrics) throws IOException {
this.memoryManager = memoryManager;
this.taskMemoryManager = memoryManager;
this.shuffleMemoryManager = shuffleMemoryManager;
this.blockManager = blockManager;
this.taskContext = taskContext;
Expand All @@ -115,8 +118,7 @@ public UnsafeShuffleExternalSorter(
// Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
this.pageSizeBytes = (int) Math.min(
PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES,
conf.getSizeAsBytes("spark.buffer.pageSize", "64m"));
PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES, shuffleMemoryManager.pageSizeBytes());
this.maxRecordSizeBytes = pageSizeBytes - 4;
this.writeMetrics = writeMetrics;
initializeForWriting();
Expand All @@ -134,7 +136,7 @@ private void initializeForWriting() throws IOException {
throw new IOException("Could not acquire " + memoryRequested + " bytes of memory");
}

this.sorter = new UnsafeShuffleInMemorySorter(initialSize);
this.inMemSorter = new UnsafeShuffleInMemorySorter(initialSize);
}

/**
Expand All @@ -161,7 +163,7 @@ private void writeSortedFile(boolean isLastFile) throws IOException {

// This call performs the actual sort.
final UnsafeShuffleInMemorySorter.UnsafeShuffleSorterIterator sortedRecords =
sorter.getSortedIterator();
inMemSorter.getSortedIterator();

// Currently, we need to open a new DiskBlockObjectWriter for each partition; we can avoid this
// after SPARK-5581 is fixed.
Expand Down Expand Up @@ -207,8 +209,8 @@ private void writeSortedFile(boolean isLastFile) throws IOException {
}

final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
final Object recordPage = memoryManager.getPage(recordPointer);
final long recordOffsetInPage = memoryManager.getOffsetInPage(recordPointer);
final Object recordPage = taskMemoryManager.getPage(recordPointer);
final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer);
int dataRemaining = PlatformDependent.UNSAFE.getInt(recordPage, recordOffsetInPage);
long recordReadPosition = recordOffsetInPage + 4; // skip over record length
while (dataRemaining > 0) {
Expand Down Expand Up @@ -270,9 +272,9 @@ void spill() throws IOException {
spills.size() > 1 ? " times" : " time");

writeSortedFile(false);
final long sorterMemoryUsage = sorter.getMemoryUsage();
sorter = null;
shuffleMemoryManager.release(sorterMemoryUsage);
final long inMemSorterMemoryUsage = inMemSorter.getMemoryUsage();
inMemSorter = null;
shuffleMemoryManager.release(inMemSorterMemoryUsage);
final long spillSize = freeMemory();
taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);

Expand All @@ -284,7 +286,7 @@ private long getMemoryUsage() {
for (MemoryBlock page : allocatedPages) {
totalPageSize += page.size();
}
return ((sorter == null) ? 0 : sorter.getMemoryUsage()) + totalPageSize;
return ((inMemSorter == null) ? 0 : inMemSorter.getMemoryUsage()) + totalPageSize;
}

private void updatePeakMemoryUsed() {
Expand All @@ -306,7 +308,7 @@ private long freeMemory() {
updatePeakMemoryUsed();
long memoryFreed = 0;
for (MemoryBlock block : allocatedPages) {
memoryManager.freePage(block);
taskMemoryManager.freePage(block);
shuffleMemoryManager.release(block.size());
memoryFreed += block.size();
}
Expand All @@ -320,54 +322,53 @@ private long freeMemory() {
/**
* Force all memory and spill files to be deleted; called by shuffle error-handling code.
*/
public void cleanupAfterError() {
public void cleanupResources() {
freeMemory();
for (SpillInfo spill : spills) {
if (spill.file.exists() && !spill.file.delete()) {
logger.error("Unable to delete spill file {}", spill.file.getPath());
}
}
if (sorter != null) {
shuffleMemoryManager.release(sorter.getMemoryUsage());
sorter = null;
if (inMemSorter != null) {
shuffleMemoryManager.release(inMemSorter.getMemoryUsage());
inMemSorter = null;
}
}

/**
* Checks whether there is enough space to insert a new record into the sorter.
*
* @param requiredSpace the required space in the data page, in bytes, including space for storing
* the record size.
* @return true if the record can be inserted without requiring more allocations, false otherwise.
*/
private boolean haveSpaceForRecord(int requiredSpace) {
assert (requiredSpace > 0);
return (sorter.hasSpaceForAnotherRecord() && (requiredSpace <= freeSpaceInCurrentPage));
}

/**
* Allocates more memory in order to insert an additional record. This will request additional
* memory from the {@link ShuffleMemoryManager} and spill if the requested memory can not be
* obtained.
*
* @param requiredSpace the required space in the data page, in bytes, including space for storing
* the record size.
* Checks whether there is enough space to insert an additional record in to the sort pointer
* array and grows the array if additional space is required. If the required space cannot be
* obtained, then the in-memory data will be spilled to disk.
*/
private void allocateSpaceForRecord(int requiredSpace) throws IOException {
if (!sorter.hasSpaceForAnotherRecord()) {
private void growPointerArrayIfNecessary() throws IOException {
assert(inMemSorter != null);
if (!inMemSorter.hasSpaceForAnotherRecord()) {
logger.debug("Attempting to expand sort pointer array");
final long oldPointerArrayMemoryUsage = sorter.getMemoryUsage();
final long oldPointerArrayMemoryUsage = inMemSorter.getMemoryUsage();
final long memoryToGrowPointerArray = oldPointerArrayMemoryUsage * 2;
final long memoryAcquired = shuffleMemoryManager.tryToAcquire(memoryToGrowPointerArray);
if (memoryAcquired < memoryToGrowPointerArray) {
shuffleMemoryManager.release(memoryAcquired);
spill();
} else {
sorter.expandPointerArray();
inMemSorter.expandPointerArray();
shuffleMemoryManager.release(oldPointerArrayMemoryUsage);
}
}
}

/**
* Allocates more memory in order to insert an additional record. This will request additional
* memory from the {@link ShuffleMemoryManager} and spill if the requested memory can not be
* obtained.
*
* @param requiredSpace the required space in the data page, in bytes, including space for storing
* the record size. This must be less than or equal to the page size (records
* that exceed the page size are handled via a different code path which uses
* special overflow pages).
*/
private void acquireNewPageIfNecessary(int requiredSpace) throws IOException {
growPointerArrayIfNecessary();
if (requiredSpace > freeSpaceInCurrentPage) {
logger.trace("Required space {} is less than free space in current page ({})", requiredSpace,
freeSpaceInCurrentPage);
Expand All @@ -388,7 +389,7 @@ private void allocateSpaceForRecord(int requiredSpace) throws IOException {
throw new IOException("Unable to acquire " + pageSizeBytes + " bytes of memory");
}
}
currentPage = memoryManager.allocatePage(pageSizeBytes);
currentPage = taskMemoryManager.allocatePage(pageSizeBytes);
currentPagePosition = currentPage.getBaseOffset();
freeSpaceInCurrentPage = pageSizeBytes;
allocatedPages.add(currentPage);
Expand All @@ -404,27 +405,58 @@ public void insertRecord(
long recordBaseOffset,
int lengthInBytes,
int partitionId) throws IOException {

growPointerArrayIfNecessary();
// Need 4 bytes to store the record length.
final int totalSpaceRequired = lengthInBytes + 4;
if (!haveSpaceForRecord(totalSpaceRequired)) {
allocateSpaceForRecord(totalSpaceRequired);

// --- Figure out where to insert the new record ----------------------------------------------

final MemoryBlock dataPage;
long dataPagePosition;
boolean useOverflowPage = totalSpaceRequired > pageSizeBytes;
if (useOverflowPage) {
long overflowPageSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(totalSpaceRequired);
// The record is larger than the page size, so allocate a special overflow page just to hold
// that record.
final long memoryGranted = shuffleMemoryManager.tryToAcquire(overflowPageSize);
if (memoryGranted != overflowPageSize) {
shuffleMemoryManager.release(memoryGranted);
spill();
final long memoryGrantedAfterSpill = shuffleMemoryManager.tryToAcquire(overflowPageSize);
if (memoryGrantedAfterSpill != overflowPageSize) {
shuffleMemoryManager.release(memoryGrantedAfterSpill);
throw new IOException("Unable to acquire " + overflowPageSize + " bytes of memory");
}
}
MemoryBlock overflowPage = taskMemoryManager.allocatePage(overflowPageSize);
allocatedPages.add(overflowPage);
dataPage = overflowPage;
dataPagePosition = overflowPage.getBaseOffset();
} else {
// The record is small enough to fit in a regular data page, but the current page might not
// have enough space to hold it (or no pages have been allocated yet).
acquireNewPageIfNecessary(totalSpaceRequired);
dataPage = currentPage;
dataPagePosition = currentPagePosition;
// Update bookkeeping information
freeSpaceInCurrentPage -= totalSpaceRequired;
currentPagePosition += totalSpaceRequired;
}
final Object dataPageBaseObject = dataPage.getBaseObject();

final long recordAddress =
memoryManager.encodePageNumberAndOffset(currentPage, currentPagePosition);
final Object dataPageBaseObject = currentPage.getBaseObject();
PlatformDependent.UNSAFE.putInt(dataPageBaseObject, currentPagePosition, lengthInBytes);
currentPagePosition += 4;
freeSpaceInCurrentPage -= 4;
taskMemoryManager.encodePageNumberAndOffset(dataPage, dataPagePosition);
PlatformDependent.UNSAFE.putInt(dataPageBaseObject, dataPagePosition, lengthInBytes);
dataPagePosition += 4;
PlatformDependent.copyMemory(
recordBaseObject,
recordBaseOffset,
dataPageBaseObject,
currentPagePosition,
dataPagePosition,
lengthInBytes);
currentPagePosition += lengthInBytes;
freeSpaceInCurrentPage -= lengthInBytes;
sorter.insertRecord(recordAddress, partitionId);
assert(inMemSorter != null);
inMemSorter.insertRecord(recordAddress, partitionId);
}

/**
Expand All @@ -436,14 +468,14 @@ public void insertRecord(
*/
public SpillInfo[] closeAndGetSpills() throws IOException {
try {
if (sorter != null) {
if (inMemSorter != null) {
// Do not count the final file towards the spill count.
writeSortedFile(true);
freeMemory();
}
return spills.toArray(new SpillInfo[spills.size()]);
} catch (IOException e) {
cleanupAfterError();
cleanupResources();
throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@

package org.apache.spark.shuffle.unsafe;

import javax.annotation.Nullable;
import java.io.*;
import java.nio.channels.FileChannel;
import java.util.Iterator;
import javax.annotation.Nullable;

import scala.Option;
import scala.Product2;
import scala.collection.JavaConversions;
import scala.collection.immutable.Map;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.collection.immutable.Map;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.ByteStreams;
Expand All @@ -38,10 +38,10 @@

import org.apache.spark.*;
import org.apache.spark.annotation.Private;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.io.CompressionCodec;
import org.apache.spark.io.CompressionCodec$;
import org.apache.spark.io.LZFCompressionCodec;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.network.util.LimitedInputStream;
import org.apache.spark.scheduler.MapStatus;
import org.apache.spark.scheduler.MapStatus$;
Expand Down Expand Up @@ -178,7 +178,7 @@ public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOEx
} finally {
if (sorter != null) {
try {
sorter.cleanupAfterError();
sorter.cleanupResources();
} catch (Exception e) {
// Only throw this error if we won't be masking another
// error.
Expand Down Expand Up @@ -482,7 +482,7 @@ public Option<MapStatus> stop(boolean success) {
if (sorter != null) {
// If sorter is non-null, then this implies that we called stop() in response to an error,
// so we need to clean up memory and spill files created by the sorter
sorter.cleanupAfterError();
sorter.cleanupResources();
}
}
}
Expand Down
Loading

0 comments on commit 2c7e126

Please sign in to comment.