
Commit

Merge branch 'master' of github.com:apache/spark into handle-configs-bash
andrewor14 committed Aug 9, 2014
2 parents 2732ac0 + ec79063 commit aeb79c7
Showing 34 changed files with 662 additions and 221 deletions.
27 changes: 18 additions & 9 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -156,11 +156,9 @@ object SparkEnv extends Logging {
conf.set("spark.driver.port", boundPort.toString)
}

// Create an instance of the class named by the given Java system property, or by
// defaultClassName if the property is not set, and return it as a T
def instantiateClass[T](propertyName: String, defaultClassName: String): T = {
val name = conf.get(propertyName, defaultClassName)
val cls = Class.forName(name, true, Utils.getContextOrSparkClassLoader)
// Create an instance of the class with the given name, possibly initializing it with our conf
def instantiateClass[T](className: String): T = {
val cls = Class.forName(className, true, Utils.getContextOrSparkClassLoader)
// Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
// SparkConf, then one taking no arguments
try {
@@ -178,11 +176,17 @@
}
}

val serializer = instantiateClass[Serializer](
// Create an instance of the class named by the given SparkConf property, or defaultClassName
// if the property is not set, possibly initializing it with our conf
def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = {
instantiateClass[T](conf.get(propertyName, defaultClassName))
}

val serializer = instantiateClassFromConf[Serializer](
"spark.serializer", "org.apache.spark.serializer.JavaSerializer")
logDebug(s"Using serializer: ${serializer.getClass}")

val closureSerializer = instantiateClass[Serializer](
val closureSerializer = instantiateClassFromConf[Serializer](
"spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer")

def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
@@ -246,8 +250,13 @@ object SparkEnv extends Logging {
"."
}

val shuffleManager = instantiateClass[ShuffleManager](
"spark.shuffle.manager", "org.apache.spark.shuffle.hash.HashShuffleManager")
// Let the user specify short names for shuffle managers
val shortShuffleMgrNames = Map(
"hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
"sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

val shuffleMemoryManager = new ShuffleMemoryManager(conf)

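With the short-name map above, `spark.shuffle.manager` now accepts `hash` or `sort` in addition to a fully qualified class name; `instantiateClassFromConf` resolves the property and `instantiateClass` loads the resulting class reflectively. A minimal sketch of the resolution step (the `SparkConf` set-up here is illustrative, not part of the commit):

```scala
import org.apache.spark.SparkConf

// Mirrors the lookup added above: users may now write the short name
// conf.set("spark.shuffle.manager", "sort") instead of the full class name.
val shortShuffleMgrNames = Map(
  "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
  "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")

val conf = new SparkConf().set("spark.shuffle.manager", "sort")
val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
// Unknown values fall through unchanged, so a fully qualified class name still works.
val shuffleMgrClass =
  shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
// shuffleMgrClass == "org.apache.spark.shuffle.sort.SortShuffleManager"
```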
@@ -17,14 +17,15 @@

package org.apache.spark.broadcast

import java.io.{ByteArrayInputStream, ObjectInputStream, ObjectOutputStream}
import java.io.{ByteArrayOutputStream, ByteArrayInputStream, InputStream,
ObjectInputStream, ObjectOutputStream, OutputStream}

import scala.reflect.ClassTag
import scala.util.Random

import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
import org.apache.spark.util.Utils

/**
* A [[org.apache.spark.broadcast.Broadcast]] implementation that uses a BitTorrent-like
@@ -214,11 +215,15 @@ private[broadcast] object TorrentBroadcast extends Logging {
private lazy val BLOCK_SIZE = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
private var initialized = false
private var conf: SparkConf = null
private var compress: Boolean = false
private var compressionCodec: CompressionCodec = null

def initialize(_isDriver: Boolean, conf: SparkConf) {
TorrentBroadcast.conf = conf // TODO: we might have to fix it in tests
synchronized {
if (!initialized) {
compress = conf.getBoolean("spark.broadcast.compress", true)
compressionCodec = CompressionCodec.createCodec(conf)
initialized = true
}
}
@@ -228,8 +233,13 @@ private[broadcast] object TorrentBroadcast extends Logging {
initialized = false
}

def blockifyObject[T](obj: T): TorrentInfo = {
val byteArray = Utils.serialize[T](obj)
def blockifyObject[T: ClassTag](obj: T): TorrentInfo = {
val bos = new ByteArrayOutputStream()
val out: OutputStream = if (compress) compressionCodec.compressedOutputStream(bos) else bos
val ser = SparkEnv.get.serializer.newInstance()
val serOut = ser.serializeStream(out)
serOut.writeObject[T](obj).close()
val byteArray = bos.toByteArray
val bais = new ByteArrayInputStream(byteArray)

var blockNum = byteArray.length / BLOCK_SIZE
@@ -255,7 +265,7 @@
info
}

def unBlockifyObject[T](
def unBlockifyObject[T: ClassTag](
arrayOfBlocks: Array[TorrentBlock],
totalBytes: Int,
totalBlocks: Int): T = {
@@ -264,7 +274,16 @@
System.arraycopy(arrayOfBlocks(i).byteArray, 0, retByteArray,
i * BLOCK_SIZE, arrayOfBlocks(i).byteArray.length)
}
Utils.deserialize[T](retByteArray, Thread.currentThread.getContextClassLoader)

val in: InputStream = {
val arrIn = new ByteArrayInputStream(retByteArray)
if (compress) compressionCodec.compressedInputStream(arrIn) else arrIn
}
val ser = SparkEnv.get.serializer.newInstance()
val serIn = ser.deserializeStream(in)
val obj = serIn.readObject[T]()
serIn.close()
obj
}

/**
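`blockifyObject` now serializes through the SparkEnv serializer and, when `spark.broadcast.compress` is enabled, wraps the output stream in the configured compression codec before chunking the bytes; `unBlockifyObject` reverses the wrapping. Below is a Spark-free sketch of the same stream-wrapping round trip, with Java serialization and GZIP standing in for the configured serializer and codec (the names and the 4 KB block size are illustrative; the default `spark.broadcast.blockSize` above works out to 4 MB):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

val compress = true          // stand-in for spark.broadcast.compress
val blockSize = 4 * 1024     // 4 KB blocks, just for the sketch

def blockify(obj: AnyRef): Array[Array[Byte]] = {
  val bos = new ByteArrayOutputStream()
  val out = if (compress) new GZIPOutputStream(bos) else bos
  val serOut = new ObjectOutputStream(out)
  serOut.writeObject(obj)
  serOut.close()                              // flushes through the compression layer
  bos.toByteArray.grouped(blockSize).toArray  // chunk the serialized bytes into blocks
}

def unBlockify[T](blocks: Array[Array[Byte]]): T = {
  val bis = new ByteArrayInputStream(blocks.flatten)  // reassemble the byte stream
  val in = if (compress) new GZIPInputStream(bis) else bis
  val serIn = new ObjectInputStream(in)
  val obj = serIn.readObject().asInstanceOf[T]
  serIn.close()
  obj
}

val blocks = blockify("hello broadcast")
assert(unBlockify[String](blocks) == "hello broadcast")
```

Closing the serialization stream before reading the byte array matters: the compression layer only writes its trailer on close.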
@@ -228,7 +228,6 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {

/** Fill in values by parsing user options. */
private def parseOpts(opts: Seq[String]): Unit = {
var inSparkOpts = true
val EQ_SEPARATED_OPT="""(--[^=]+)=(.+)""".r

// Delineates parsing of Spark options from parsing of user options.
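The `inSparkOpts` flag is dropped, while the `EQ_SEPARATED_OPT` regex above remains the piece that lets `--key=value` be handled like `--key value`. A small sketch of that splitting on a hypothetical argument list (the `flatMap` normalization is illustrative, not the actual parse loop):

```scala
// Same pattern as EQ_SEPARATED_OPT above: captures the option name and its value.
val EqSeparatedOpt = """(--[^=]+)=(.+)""".r

val args = Seq("--master=local[4]", "--name", "myApp")
val normalized = args.flatMap {
  case EqSeparatedOpt(opt, value) => Seq(opt, value)  // "--master=local[4]" -> "--master", "local[4]"
  case other => Seq(other)
}
// normalized == Seq("--master", "local[4]", "--name", "myApp")
```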
@@ -374,6 +374,7 @@ private[spark] class Executor(
for (taskRunner <- runningTasks.values()) {
if (!taskRunner.attemptedTask.isEmpty) {
Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
metrics.updateShuffleReadMetrics
tasksMetrics += ((taskRunner.taskId, metrics))
}
}
55 changes: 42 additions & 13 deletions core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -17,6 +17,8 @@

package org.apache.spark.executor

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.storage.{BlockId, BlockStatus}

@@ -81,12 +83,27 @@ class TaskMetrics extends Serializable {
var inputMetrics: Option[InputMetrics] = None

/**
* If this task reads from shuffle output, metrics on getting shuffle data will be collected here
* If this task reads from shuffle output, metrics on getting shuffle data will be collected here.
* This includes read metrics aggregated over all the task's shuffle dependencies.
*/
private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None

def shuffleReadMetrics = _shuffleReadMetrics

/**
* This should only be used when recreating TaskMetrics, not when updating read metrics in
* executors.
*/
private[spark] def setShuffleReadMetrics(shuffleReadMetrics: Option[ShuffleReadMetrics]) {
_shuffleReadMetrics = shuffleReadMetrics
}

/**
* ShuffleReadMetrics per dependency for collecting independently while task is in progress.
*/
@transient private lazy val depsShuffleReadMetrics: ArrayBuffer[ShuffleReadMetrics] =
new ArrayBuffer[ShuffleReadMetrics]()

/**
* If this task writes to shuffle output, metrics on the written shuffle data will be collected
* here
@@ -98,19 +115,31 @@
*/
var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None

/** Adds the given ShuffleReadMetrics to any existing shuffle metrics for this task. */
def updateShuffleReadMetrics(newMetrics: ShuffleReadMetrics) = synchronized {
_shuffleReadMetrics match {
case Some(existingMetrics) =>
existingMetrics.shuffleFinishTime = math.max(
existingMetrics.shuffleFinishTime, newMetrics.shuffleFinishTime)
existingMetrics.fetchWaitTime += newMetrics.fetchWaitTime
existingMetrics.localBlocksFetched += newMetrics.localBlocksFetched
existingMetrics.remoteBlocksFetched += newMetrics.remoteBlocksFetched
existingMetrics.remoteBytesRead += newMetrics.remoteBytesRead
case None =>
_shuffleReadMetrics = Some(newMetrics)
/**
* A task may have multiple shuffle readers for multiple dependencies. To avoid synchronization
* issues from readers in different threads, in-progress tasks use a ShuffleReadMetrics for each
* dependency, and merge these metrics before reporting them to the driver. This method returns
* a ShuffleReadMetrics for a dependency and registers it for merging later.
*/
private [spark] def createShuffleReadMetricsForDependency(): ShuffleReadMetrics = synchronized {
val readMetrics = new ShuffleReadMetrics()
depsShuffleReadMetrics += readMetrics
readMetrics
}

/**
* Aggregates shuffle read metrics for all registered dependencies into shuffleReadMetrics.
*/
private[spark] def updateShuffleReadMetrics() = synchronized {
val merged = new ShuffleReadMetrics()
for (depMetrics <- depsShuffleReadMetrics) {
merged.fetchWaitTime += depMetrics.fetchWaitTime
merged.localBlocksFetched += depMetrics.localBlocksFetched
merged.remoteBlocksFetched += depMetrics.remoteBlocksFetched
merged.remoteBytesRead += depMetrics.remoteBytesRead
merged.shuffleFinishTime = math.max(merged.shuffleFinishTime, depMetrics.shuffleFinishTime)
}
_shuffleReadMetrics = Some(merged)
}
}

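Shuffle readers can run in different threads for different dependencies of the same task, so each reader now accumulates into its own `ShuffleReadMetrics` and the task folds them together once before reporting. A simplified, self-contained sketch of that accumulate-then-merge pattern (the class and member names below are stand-ins, not Spark's):

```scala
import scala.collection.mutable.ArrayBuffer

// Stand-in for ShuffleReadMetrics, mirroring the fields merged above.
class ReadMetrics {
  var shuffleFinishTime: Long = 0L
  var fetchWaitTime: Long = 0L
  var localBlocksFetched: Int = 0
  var remoteBlocksFetched: Int = 0
  var remoteBytesRead: Long = 0L
}

class TaskMetricsSketch {
  private val perDependency = ArrayBuffer.empty[ReadMetrics]
  var merged: Option[ReadMetrics] = None

  // Each reader gets its own object, so concurrent readers never share mutable state.
  def createForDependency(): ReadMetrics = synchronized {
    val m = new ReadMetrics
    perDependency += m
    m
  }

  // Called once before reporting to the driver: fold every per-dependency object into one.
  def mergeAll(): Unit = synchronized {
    val total = new ReadMetrics
    for (dep <- perDependency) {
      total.fetchWaitTime += dep.fetchWaitTime
      total.localBlocksFetched += dep.localBlocksFetched
      total.remoteBlocksFetched += dep.remoteBlocksFetched
      total.remoteBytesRead += dep.remoteBytesRead
      total.shuffleFinishTime = math.max(total.shuffleFinishTime, dep.shuffleFinishTime)
    }
    merged = Some(total)
  }
}
```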
5 changes: 5 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1233,6 +1233,11 @@ abstract class RDD[T: ClassTag](
dependencies.head.rdd.asInstanceOf[RDD[U]]
}

/** Returns the jth parent RDD: e.g. rdd.parent[T](0) is equivalent to rdd.firstParent[T] */
protected[spark] def parent[U: ClassTag](j: Int) = {
dependencies(j).rdd.asInstanceOf[RDD[U]]
}

/** The [[org.apache.spark.SparkContext]] that this RDD was created on. */
def context = sc

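A hedged usage sketch of the new helper: inside a custom RDD, `parent[T](0)` is interchangeable with `firstParent[T]`, and higher indices address later entries in the dependency list. `PassThroughRDD` below is hypothetical and assumes the Spark 1.x RDD API:

```scala
import scala.reflect.ClassTag
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical one-parent RDD that reads its parent via the new helper.
class PassThroughRDD[T: ClassTag](prev: RDD[T]) extends RDD[T](prev) {
  // parent[T](0) is the same RDD that firstParent[T] would return.
  override def getPartitions: Array[Partition] = parent[T](0).partitions
  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    parent[T](0).iterator(split, context)
}
```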
@@ -32,7 +32,8 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
shuffleId: Int,
reduceId: Int,
context: TaskContext,
serializer: Serializer)
serializer: Serializer,
shuffleMetrics: ShuffleReadMetrics)
: Iterator[T] =
{
logDebug("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId))
@@ -73,17 +74,11 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
}
}

val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer)
val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer, shuffleMetrics)
val itr = blockFetcherItr.flatMap(unpackBlock)

val completionIter = CompletionIterator[T, Iterator[T]](itr, {
val shuffleMetrics = new ShuffleReadMetrics
shuffleMetrics.shuffleFinishTime = System.currentTimeMillis
shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime
shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead
shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks
shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks
context.taskMetrics.updateShuffleReadMetrics(shuffleMetrics)
context.taskMetrics.updateShuffleReadMetrics()
})

new InterruptibleIterator[T](context, completionIter)
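The per-block read counters are now recorded by the fetch iterator itself, and the aggregated `updateShuffleReadMetrics()` call runs only once the iterator is exhausted, via `CompletionIterator`. A generic sketch of that completion-callback pattern (not Spark's `CompletionIterator`, just the idea):

```scala
// Wrap an iterator so that a callback runs exactly once, after the last element is consumed.
def withCompletion[A](it: Iterator[A])(onComplete: => Unit): Iterator[A] = new Iterator[A] {
  private var completed = false
  def hasNext: Boolean = {
    val more = it.hasNext
    if (!more && !completed) { completed = true; onComplete }
    more
  }
  def next(): A = it.next()
}

// Hypothetical usage: merge the per-dependency metrics after the last shuffle block is read.
val blocks = Iterator("block-1", "block-2")
val tracked = withCompletion(blocks) { println("taskMetrics.updateShuffleReadMetrics()") }
tracked.foreach(println)   // prints both blocks, then the completion message
```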
@@ -36,8 +36,10 @@ private[spark] class HashShuffleReader[K, C](

/** Read the combined key-values for this reduce task */
override def read(): Iterator[Product2[K, C]] = {
val readMetrics = context.taskMetrics.createShuffleReadMetricsForDependency()
val ser = Serializer.getSerializer(dep.serializer)
val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser)
val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser,
readMetrics)

val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
if (dep.mapSideCombine) {
@@ -58,7 +60,7 @@
// Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,
// the ExternalSorter won't spill to disk.
val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))
sorter.write(aggregatedIter)
sorter.insertAll(aggregatedIter)
context.taskMetrics.memoryBytesSpilled += sorter.memoryBytesSpilled
context.taskMetrics.diskBytesSpilled += sorter.diskBytesSpilled
sorter.iterator
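The call site switches from `sorter.write` to `sorter.insertAll`, a name that makes explicit that the sorter consumes the entire record iterator before any sorted output is available. A toy in-memory stand-in to illustrate the call shape (no spilling, not Spark's `ExternalSorter`):

```scala
import scala.collection.mutable.ArrayBuffer

// Toy sorter: insertAll drains the input; iterator then yields records sorted by key.
class ToySorter[K: Ordering, V] {
  private val buf = ArrayBuffer.empty[(K, V)]
  def insertAll(records: Iterator[Product2[K, V]]): Unit =
    records.foreach { kv => buf += ((kv._1, kv._2)) }
  def iterator: Iterator[(K, V)] = buf.sortBy(_._1).iterator
}

val sorter = new ToySorter[Int, String]
sorter.insertAll(Iterator((2, "b"), (1, "a")))   // formerly sorter.write(...)
sorter.iterator.foreach(println)                 // (1,a) then (2,b)
```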
(Remaining changed files not shown.)
