A set of shuffle map output related changes #587

Merged 11 commits on May 4, 2013
12 changes: 9 additions & 3 deletions core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
@@ -1,14 +1,19 @@
package spark

import executor.{ShuffleReadMetrics, TaskMetrics}
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap

import spark.executor.{ShuffleReadMetrics, TaskMetrics}
import spark.serializer.Serializer
import spark.storage.BlockManagerId
import spark.util.CompletionIterator


private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
override def fetch[K, V](shuffleId: Int, reduceId: Int, metrics: TaskMetrics) = {

override def fetch[K, V](
shuffleId: Int, reduceId: Int, metrics: TaskMetrics, serializer: Serializer) = {

logDebug("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId))
val blockManager = SparkEnv.get.blockManager

@@ -48,8 +53,9 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin
}
}

val blockFetcherItr = blockManager.getMultiple(blocksByAddress)
val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer)
val itr = blockFetcherItr.flatMap(unpackBlock)

CompletionIterator[(K,V), Iterator[(K,V)]](itr, {
val shuffleMetrics = new ShuffleReadMetrics
shuffleMetrics.remoteFetchTime = blockFetcherItr.remoteFetchTime
4 changes: 3 additions & 1 deletion core/src/main/scala/spark/Dependency.scala
@@ -25,10 +25,12 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {
* @param shuffleId the shuffle id
* @param rdd the parent RDD
* @param partitioner partitioner used to partition the shuffle output
* @param serializerClass class name of the serializer to use
*/
class ShuffleDependency[K, V](
@transient rdd: RDD[(K, V)],
val partitioner: Partitioner)
val partitioner: Partitioner,
val serializerClass: String = null)
extends Dependency(rdd) {

val shuffleId: Int = rdd.context.newShuffleId()
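The new optional constructor argument lets a ShuffleDependency name its own serializer class, with the default `null` falling back to the SparkEnv-wide serializer. A minimal usage sketch follows; the local-mode setup, object name, and the `spark.KryoSerializer` class name are assumptions for illustration, not part of this diff:

```scala
import spark.{HashPartitioner, ShuffleDependency, SparkContext}

object ShuffleDependencySketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "shuffle-dependency-sketch")
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
    // Third argument is the new serializerClass; passing null (the default)
    // keeps the SparkEnv-wide serializer.
    val dep = new ShuffleDependency[String, Int](
      pairs, new HashPartitioner(4), "spark.KryoSerializer")
    println("shuffle id: " + dep.shuffleId)
    sc.stop()
  }
}
```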
11 changes: 6 additions & 5 deletions core/src/main/scala/spark/PairRDDFunctions.scala
@@ -52,7 +52,8 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true): RDD[(K, C)] = {
mapSideCombine: Boolean = true,
serializerClass: String = null): RDD[(K, C)] = {
if (getKeyClass().isArray) {
if (mapSideCombine) {
throw new SparkException("Cannot use map-side combining with array keys.")
@@ -67,13 +68,13 @@
self.mapPartitions(aggregator.combineValuesByKey(_), true)
} else if (mapSideCombine) {
val mapSideCombined = self.mapPartitions(aggregator.combineValuesByKey(_), true)
val partitioned = new ShuffledRDD[K, C](mapSideCombined, partitioner)
val partitioned = new ShuffledRDD[K, C](mapSideCombined, partitioner, serializerClass)
partitioned.mapPartitions(aggregator.combineCombinersByKey(_), true)
} else {
// Don't apply map-side combiner.
// A sanity check to make sure mergeCombiners is not defined.
assert(mergeCombiners == null)
val values = new ShuffledRDD[K, V](self, partitioner)
val values = new ShuffledRDD[K, V](self, partitioner, serializerClass)
values.mapPartitions(aggregator.combineValuesByKey(_), true)
}
}
@@ -469,7 +470,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](

/**
* Return an RDD with the pairs from `this` whose keys are not in `other`.
*
*
* Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
* RDD will be <= us.
*/
@@ -645,7 +646,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
* Return an RDD with the keys of each tuple.
*/
def keys: RDD[K] = self.map(_._1)

/**
* Return an RDD with the values of each tuple.
*/
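Threading the parameter through `combineByKey` means a caller can route a single shuffle through a specific serializer without touching the global setting. A hedged sketch of that call shape (the Kryo class name and the local-mode setup are assumptions):

```scala
import spark.{HashPartitioner, SparkContext}
import spark.SparkContext._  // implicit conversion to PairRDDFunctions

object CombineByKeySketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "combine-by-key-sketch")
    val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
    // Sum values per key; the shuffle between map-side and reduce-side
    // combining uses the serializer named in the last argument.
    val sums = pairs.combineByKey[Int](
      (v: Int) => v,
      (c: Int, v: Int) => c + v,
      (c1: Int, c2: Int) => c1 + c2,
      new HashPartitioner(2),
      mapSideCombine = true,
      serializerClass = "spark.KryoSerializer")
    sums.collect().foreach(println)
    sc.stop()
  }
}
```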
7 changes: 5 additions & 2 deletions core/src/main/scala/spark/ShuffleFetcher.scala
@@ -1,13 +1,16 @@
package spark

import executor.TaskMetrics
import spark.executor.TaskMetrics
import spark.serializer.Serializer


private[spark] abstract class ShuffleFetcher {
/**
* Fetch the shuffle outputs for a given ShuffleDependency.
* @return An iterator over the elements of the fetched shuffle outputs.
*/
def fetch[K, V](shuffleId: Int, reduceId: Int, metrics: TaskMetrics) : Iterator[(K,V)]
def fetch[K, V](shuffleId: Int, reduceId: Int, metrics: TaskMetrics,
serializer: Serializer = SparkEnv.get.serializerManager.default): Iterator[(K,V)]

/** Stop the fetcher */
def stop() {}
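Because the new serializer argument defaults to the SparkEnv default, existing call sites keep compiling, while CoGroupedRDD, ShuffledRDD, and SubtractedRDD can pass an explicit serializer. A rough, non-functional sketch of a fetcher honoring the new signature (assumed to be compiled inside the spark package, since ShuffleFetcher is private[spark]):

```scala
package spark

import spark.executor.TaskMetrics
import spark.serializer.Serializer

// Illustration only: a fetcher that returns nothing, showing the shape of the
// updated fetch signature. The serializer default is declared on the abstract
// method, so callers that omit the argument still get the SparkEnv default.
private[spark] class NoopShuffleFetcher extends ShuffleFetcher with Logging {
  override def fetch[K, V](shuffleId: Int, reduceId: Int, metrics: TaskMetrics,
      serializer: Serializer): Iterator[(K, V)] = {
    logDebug("Pretending to fetch shuffle %d, reduce %d".format(shuffleId, reduceId))
    Iterator.empty
  }
}
```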
20 changes: 13 additions & 7 deletions core/src/main/scala/spark/SparkEnv.scala
@@ -3,13 +3,14 @@ package spark
import akka.actor.{Actor, ActorRef, Props, ActorSystemImpl, ActorSystem}
import akka.remote.RemoteActorRefProvider

import serializer.Serializer
import spark.broadcast.BroadcastManager
import spark.storage.BlockManager
import spark.storage.BlockManagerMaster
import spark.network.ConnectionManager
import spark.serializer.{Serializer, SerializerManager}
import spark.util.AkkaUtils


/**
* Holds all the runtime environment objects for a running Spark instance (either master or worker),
* including the serializer, Akka actor system, block manager, map output tracker, etc. Currently
@@ -20,6 +21,7 @@ import spark.util.AkkaUtils
class SparkEnv (
val executorId: String,
val actorSystem: ActorSystem,
val serializerManager: SerializerManager,
val serializer: Serializer,
val closureSerializer: Serializer,
val cacheManager: CacheManager,
@@ -91,8 +93,14 @@ object SparkEnv extends Logging {
Class.forName(name, true, classLoader).newInstance().asInstanceOf[T]
}

val serializer = instantiateClass[Serializer]("spark.serializer", "spark.JavaSerializer")

val serializerManager = new SerializerManager

val serializer = serializerManager.setDefault(
System.getProperty("spark.serializer", "spark.JavaSerializer"))

val closureSerializer = serializerManager.get(
System.getProperty("spark.closure.serializer", "spark.JavaSerializer"))

def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
if (isDriver) {
logInfo("Registering " + name)
@@ -116,9 +124,6 @@

val broadcastManager = new BroadcastManager(isDriver)

val closureSerializer = instantiateClass[Serializer](
"spark.closure.serializer", "spark.JavaSerializer")

val cacheManager = new CacheManager(blockManager)

// Have to assign trackerActor after initialization as MapOutputTrackerActor
@@ -153,6 +158,7 @@ object SparkEnv extends Logging {
new SparkEnv(
executorId,
actorSystem,
serializerManager,
serializer,
closureSerializer,
cacheManager,
@@ -164,5 +170,5 @@
httpFileServer,
sparkFilesDir)
}

}
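SparkEnv now builds a SerializerManager and resolves both the default and closure serializers through it. The manager itself is not part of this diff; a rough sketch of the behavior its call sites (setDefault, get, default) imply, assuming instances are cached by class name, could look like this:

```scala
import java.util.concurrent.ConcurrentHashMap
import spark.serializer.Serializer

// Sketch only, not the PR's implementation: instantiate each Serializer class
// at most once and remember one instance as the default.
class SerializerManagerSketch {
  private val serializers = new ConcurrentHashMap[String, Serializer]
  @volatile private var _default: Serializer = _

  def default: Serializer = _default

  def setDefault(clsName: String): Serializer = {
    _default = get(clsName)
    _default
  }

  def get(clsName: String): Serializer = {
    if (clsName == null) {
      default
    } else {
      // Reuse a cached instance if another thread created one first.
      val cached = serializers.get(clsName)
      if (cached != null) {
        cached
      } else {
        val ser = Class.forName(clsName).newInstance().asInstanceOf[Serializer]
        val prev = serializers.putIfAbsent(clsName, ser)
        if (prev != null) prev else ser
      }
    }
  }
}
```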
12 changes: 7 additions & 5 deletions core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -54,7 +54,8 @@ private[spark] class CoGroupAggregator
class CoGroupedRDD[K](
@transient var rdds: Seq[RDD[(K, _)]],
part: Partitioner,
val mapSideCombine: Boolean = true)
val mapSideCombine: Boolean = true,
val serializerClass: String = null)
extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {

private val aggr = new CoGroupAggregator
@@ -68,9 +69,9 @@ class CoGroupedRDD[K](
logInfo("Adding shuffle dependency with " + rdd)
if (mapSideCombine) {
val mapSideCombinedRDD = rdd.mapPartitions(aggr.combineValuesByKey(_), true)
new ShuffleDependency[Any, ArrayBuffer[Any]](mapSideCombinedRDD, part)
new ShuffleDependency[Any, ArrayBuffer[Any]](mapSideCombinedRDD, part, serializerClass)
} else {
new ShuffleDependency[Any, Any](rdd.asInstanceOf[RDD[(Any, Any)]], part)
new ShuffleDependency[Any, Any](rdd.asInstanceOf[RDD[(Any, Any)]], part, serializerClass)
}
}
}
@@ -112,6 +113,7 @@ class CoGroupedRDD[K](
}
}

val ser = SparkEnv.get.serializerManager.get(serializerClass)
for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
case NarrowCoGroupSplitDep(rdd, _, itsSplit) => {
// Read them from the parent
@@ -124,12 +126,12 @@
val fetcher = SparkEnv.get.shuffleFetcher
if (mapSideCombine) {
// With map side combine on, for each key, the shuffle fetcher returns a list of values.
fetcher.fetch[K, Seq[Any]](shuffleId, split.index, context.taskMetrics).foreach {
fetcher.fetch[K, Seq[Any]](shuffleId, split.index, context.taskMetrics, ser).foreach {
case (key, values) => getSeq(key)(depNum) ++= values
}
} else {
// With map side combine off, for each key the shuffle fetcher returns a single value.
fetcher.fetch[K, Any](shuffleId, split.index, context.taskMetrics).foreach {
fetcher.fetch[K, Any](shuffleId, split.index, context.taskMetrics, ser).foreach {
case (key, value) => getSeq(key)(depNum) += value
}
}
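Both branches of the shuffle dependency now carry serializerClass, and compute() resolves it once per task through the SerializerManager before fetching. A hedged construction sketch; the casts mirror how cogroup builds this RDD internally, and the Kryo class name is an assumption:

```scala
import spark.{HashPartitioner, RDD, SparkContext}
import spark.rdd.CoGroupedRDD

object CoGroupedRDDSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "cogroup-sketch")
    val ages  = sc.parallelize(Seq(("alice", 30), ("bob", 25)))
    val towns = sc.parallelize(Seq(("alice", "Berkeley"), ("carol", "Boston")))
    // RDD is invariant, so the inputs are cast to RDD[(K, _)] just as
    // PairRDDFunctions.cogroup does.
    val grouped = new CoGroupedRDD[String](
      Seq(ages.asInstanceOf[RDD[(String, _)]], towns.asInstanceOf[RDD[(String, _)]]),
      new HashPartitioner(2),
      mapSideCombine = true,
      serializerClass = "spark.KryoSerializer")
    // Each record is (key, Seq(values from ages, values from towns)).
    grouped.collect().foreach(println)
    sc.stop()
  }
}
```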
10 changes: 7 additions & 3 deletions core/src/main/scala/spark/rdd/ShuffledRDD.scala
@@ -3,6 +3,7 @@ package spark.rdd
import spark.{Partitioner, RDD, SparkEnv, ShuffleDependency, Partition, TaskContext}
import spark.SparkContext._


private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
override val index = idx
override def hashCode(): Int = idx
@@ -12,13 +13,15 @@ private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
* The resulting RDD from a shuffle (e.g. repartitioning of data).
* @param prev the parent RDD.
* @param part the partitioner used to partition the RDD
* @param serializerClass class name of the serializer to use.
* @tparam K the key class.
* @tparam V the value class.
*/
class ShuffledRDD[K, V](
@transient prev: RDD[(K, V)],
part: Partitioner)
extends RDD[(K, V)](prev.context, List(new ShuffleDependency(prev, part))) {
part: Partitioner,
serializerClass: String = null)
extends RDD[(K, V)](prev.context, List(new ShuffleDependency(prev, part, serializerClass))) {

override val partitioner = Some(part)

@@ -28,6 +31,7 @@ class ShuffledRDD[K, V](

override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
val shuffledId = dependencies.head.asInstanceOf[ShuffleDependency[K, V]].shuffleId
SparkEnv.get.shuffleFetcher.fetch[K, V](shuffledId, split.index, context.taskMetrics)
SparkEnv.get.shuffleFetcher.fetch[K, V](shuffledId, split.index, context.taskMetrics,
SparkEnv.get.serializerManager.get(serializerClass))
}
}
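A short usage sketch of the extended ShuffledRDD constructor; passing null (the default) keeps the SparkEnv serializer, and the Kryo class name plus the local-mode setup are assumptions:

```scala
import spark.{HashPartitioner, SparkContext}
import spark.rdd.ShuffledRDD

object ShuffledRDDSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "shuffled-rdd-sketch")
    val pairs = sc.parallelize(1 to 100).map(i => (i % 10, i))
    // Repartition into 4 partitions, shuffling the data through Kryo.
    val shuffled = new ShuffledRDD[Int, Int](
      pairs, new HashPartitioner(4), "spark.KryoSerializer")
    println("partitions: " + shuffled.partitions.size)
    sc.stop()
  }
}
```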
18 changes: 13 additions & 5 deletions core/src/main/scala/spark/rdd/SubtractedRDD.scala
@@ -12,6 +12,7 @@ import spark.SparkEnv
import spark.ShuffleDependency
import spark.OneToOneDependency


/**
* An optimized version of cogroup for set difference/subtraction.
*
@@ -31,7 +32,9 @@ import spark.OneToOneDependency
private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest, W: ClassManifest](
@transient var rdd1: RDD[(K, V)],
@transient var rdd2: RDD[(K, W)],
part: Partitioner) extends RDD[(K, V)](rdd1.context, Nil) {
part: Partitioner,
val serializerClass: String = null)
extends RDD[(K, V)](rdd1.context, Nil) {

override def getDependencies: Seq[Dependency[_]] = {
Seq(rdd1, rdd2).map { rdd =>
@@ -40,7 +43,7 @@ private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest, W: ClassM
new OneToOneDependency(rdd)
} else {
logInfo("Adding shuffle dependency with " + rdd)
new ShuffleDependency(rdd.asInstanceOf[RDD[(K, Any)]], part)
new ShuffleDependency(rdd.asInstanceOf[RDD[(K, Any)]], part, serializerClass)
}
}
}
@@ -65,6 +68,7 @@ private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest, W: ClassM

override def compute(p: Partition, context: TaskContext): Iterator[(K, V)] = {
val partition = p.asInstanceOf[CoGroupPartition]
val serializer = SparkEnv.get.serializerManager.get(serializerClass)
val map = new JHashMap[K, ArrayBuffer[V]]
def getSeq(k: K): ArrayBuffer[V] = {
val seq = map.get(k)
@@ -77,12 +81,16 @@ private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest, W: ClassM
}
}
def integrate(dep: CoGroupSplitDep, op: ((K, V)) => Unit) = dep match {
case NarrowCoGroupSplitDep(rdd, _, itsSplit) =>
case NarrowCoGroupSplitDep(rdd, _, itsSplit) => {
for (t <- rdd.iterator(itsSplit, context))
op(t.asInstanceOf[(K, V)])
case ShuffleCoGroupSplitDep(shuffleId) =>
for (t <- SparkEnv.get.shuffleFetcher.fetch(shuffleId, partition.index, context.taskMetrics))
}
case ShuffleCoGroupSplitDep(shuffleId) => {
val iter = SparkEnv.get.shuffleFetcher.fetch(shuffleId, partition.index,
context.taskMetrics, serializer)
for (t <- iter)
op(t.asInstanceOf[(K, V)])
}
}
// the first dep is rdd1; add all values to the map
integrate(partition.deps(0), t => getSeq(t._1) += t._2)
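SubtractedRDD is private[spark], so this construction sketch is assumed to be compiled inside the spark package; the Kryo class name is likewise an assumption:

```scala
package spark

import spark.rdd.SubtractedRDD

// Sketch only: keep pairs from rdd1 whose keys are absent from rdd2,
// shuffling both sides through the named serializer.
object SubtractedRDDSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "subtracted-rdd-sketch")
    val rdd1 = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
    val rdd2 = sc.parallelize(Seq(("b", true)))
    val result = new SubtractedRDD[String, Int, Boolean](
      rdd1, rdd2, new HashPartitioner(2), "spark.KryoSerializer")
    result.collect().foreach(println)  // ("a",1) and ("c",3) survive
    sc.stop()
  }
}
```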