Commit

Merge branch 'master' of https://github.com/apache/spark into zipWithIndex
witgo committed Apr 23, 2014
2 parents daa8f84 + 39f85e0 commit 7bf4d06
Showing 60 changed files with 506 additions and 213 deletions.
2 changes: 2 additions & 0 deletions conf/spark-env.sh.template
@@ -5,6 +5,7 @@

# Options read when launching programs locally with
# ./bin/run-example or ./bin/spark-submit
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
# - SPARK_PUBLIC_DNS, to set the public dns name of the driver program
# - SPARK_CLASSPATH, default classpath entries to append
@@ -17,6 +18,7 @@
# - MESOS_NATIVE_LIBRARY, to point to your libmesos.so if you use Mesos

# Options read in YARN client mode
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
# - SPARK_EXECUTOR_INSTANCES, Number of workers to start (Default: 2)
# - SPARK_EXECUTOR_CORES, Number of cores for the workers (Default: 1).
# - SPARK_EXECUTOR_MEMORY, Memory per Worker (e.g. 1000M, 2G) (Default: 1G)
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -33,7 +33,7 @@ class TaskContext(
val attemptId: Long,
val runningLocally: Boolean = false,
@volatile var interrupted: Boolean = false,
private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty()
private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty
) extends Serializable {

@deprecated("use partitionId", "0.8.1")
@@ -196,7 +196,7 @@ object SparkSubmit {
childArgs ++= appArgs.childArgs
} else if (clusterManager == YARN) {
for (arg <- appArgs.childArgs) {
childArgs += ("--args", arg)
childArgs += ("--arg", arg)
}
}
}
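Note: a minimal sketch of how the forwarding loop above expands, assuming appArgs.childArgs were Seq("arg1", "arg2"); only the flag name changes, from --args to --arg.

import scala.collection.mutable.ArrayBuffer

object ChildArgsSketch extends App {
  val childArgs = new ArrayBuffer[String]()
  for (arg <- Seq("arg1", "arg2")) {
    childArgs += ("--arg", arg)          // one --arg flag per application argument
  }
  println(childArgs.mkString(" "))       // --arg arg1 --arg arg2
}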
@@ -116,6 +116,15 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
if (args.length == 0) printUsageAndExit(-1)
if (primaryResource == null) SparkSubmit.printErrorAndExit("Must specify a primary resource")
if (mainClass == null) SparkSubmit.printErrorAndExit("Must specify a main class with --class")

if (master.startsWith("yarn")) {
val hasHadoopEnv = sys.env.contains("HADOOP_CONF_DIR") || sys.env.contains("YARN_CONF_DIR")
val testing = sys.env.contains("SPARK_TESTING")
if (!hasHadoopEnv && !testing) {
throw new Exception(s"When running with master '$master' " +
"either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.")
}
}
}

override def toString = {
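Note: a hedged sketch of the new YARN validation in isolation (the real check lives in SparkSubmitArguments, shown above); HADOOP_CONF_DIR, YARN_CONF_DIR, and SPARK_TESTING are read as plain environment variables, and the object name is illustrative.

object YarnEnvCheckSketch {
  def validateYarnEnv(master: String): Unit = {
    if (master.startsWith("yarn")) {
      val hasHadoopEnv = sys.env.contains("HADOOP_CONF_DIR") || sys.env.contains("YARN_CONF_DIR")
      val testing = sys.env.contains("SPARK_TESTING")
      if (!hasHadoopEnv && !testing) {
        throw new Exception(s"When running with master '$master' " +
          "either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    validateYarnEnv("local[2]")      // passes: not a YARN master
    validateYarnEnv("yarn-client")   // throws unless HADOOP_CONF_DIR or YARN_CONF_DIR is exported
  }
}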
@@ -84,7 +84,7 @@ class TaskMetrics extends Serializable {
}

private[spark] object TaskMetrics {
def empty(): TaskMetrics = new TaskMetrics
def empty: TaskMetrics = new TaskMetrics
}


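Note: a small standalone sketch of the Scala convention this rename follows — a parameterless, side-effect-free accessor drops the parentheses, so TaskMetrics.empty() becomes TaskMetrics.empty at every call site updated below; Registry is an illustrative name, not a Spark class.

object Registry {
  def empty: Map[String, Int] = Map.empty   // accessor without parentheses: no side effects
}

object AccessorSketch extends App {
  val m = Registry.empty                    // call sites drop the parens too
  println(m.isEmpty)                        // true
}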
25 changes: 25 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -20,6 +20,7 @@ package org.apache.spark.rdd
import java.util.Random

import scala.collection.Map
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.reflect.{classTag, ClassTag}

@@ -229,6 +230,30 @@ abstract class RDD[T: ClassTag](
}
}

/**
* Return the ancestors of the given RDD that are related to it only through a sequence of
* narrow dependencies. This traverses the given RDD's dependency tree using DFS, but maintains
* no ordering on the RDDs returned.
*/
private[spark] def getNarrowAncestors: Seq[RDD[_]] = {
val ancestors = new mutable.HashSet[RDD[_]]

def visit(rdd: RDD[_]) {
val narrowDependencies = rdd.dependencies.filter(_.isInstanceOf[NarrowDependency[_]])
val narrowParents = narrowDependencies.map(_.rdd)
val narrowParentsNotVisited = narrowParents.filterNot(ancestors.contains)
narrowParentsNotVisited.foreach { parent =>
ancestors.add(parent)
visit(parent)
}
}

visit(this)

// In case there is a cycle, do not include the root itself
ancestors.filterNot(_ == this).toSeq
}

/**
* Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
*/
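Note: a hedged sketch of the lineage getNarrowAncestors walks, assuming a local SparkContext named sc; the method itself is private[spark], so the expected result is described in comments rather than called directly.

// map and filter introduce only narrow dependencies, so both `base` and
// `mapped` are narrow ancestors of `filtered`; a shuffle boundary (e.g.
// groupByKey) would cut the chain, and the result carries no ordering.
val base     = sc.parallelize(1 to 100)
val mapped   = base.map(_ * 2)
val filtered = mapped.filter(_ % 3 == 0)
// filtered.getNarrowAncestors would contain `mapped` and `base`, in some
// order, excluding `filtered` itself.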
@@ -207,7 +207,7 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val taskInfo = taskEnd.taskInfo
var taskStatus = "TASK_TYPE=%s".format(taskEnd.taskType)
val taskMetrics = if (taskEnd.taskMetrics != null) taskEnd.taskMetrics else TaskMetrics.empty()
val taskMetrics = if (taskEnd.taskMetrics != null) taskEnd.taskMetrics else TaskMetrics.empty
taskEnd.reason match {
case Success => taskStatus += " STATUS=SUCCESS"
recordTaskMetrics(taskEnd.stageId, taskStatus, taskInfo, taskMetrics)
19 changes: 12 additions & 7 deletions core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -25,7 +25,7 @@ import org.apache.spark.storage.RDDInfo
* Stores information about a stage to pass from the scheduler to SparkListeners.
*/
@DeveloperApi
class StageInfo(val stageId: Int, val name: String, val numTasks: Int, val rddInfo: RDDInfo) {
class StageInfo(val stageId: Int, val name: String, val numTasks: Int, val rddInfos: Seq[RDDInfo]) {
/** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
var submissionTime: Option[Long] = None
/** Time when all tasks in the stage completed or when the stage was cancelled. */
@@ -41,12 +41,17 @@ class StageInfo(val stageId: Int, val name: String, val numTasks: Int, val rddIn
}
}

private[spark]
object StageInfo {
private[spark] object StageInfo {
/**
* Construct a StageInfo from a Stage.
*
* Each Stage is associated with one or many RDDs, with the boundary of a Stage marked by
* shuffle dependencies. Therefore, all ancestor RDDs related to this Stage's RDD through a
* sequence of narrow dependencies should also be associated with this Stage.
*/
def fromStage(stage: Stage): StageInfo = {
val rdd = stage.rdd
val rddName = Option(rdd.name).getOrElse(rdd.id.toString)
val rddInfo = new RDDInfo(rdd.id, rddName, rdd.partitions.size, rdd.getStorageLevel)
new StageInfo(stage.id, stage.name, stage.numTasks, rddInfo)
val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
val rddInfos = Seq(RDDInfo.fromRdd(stage.rdd)) ++ ancestorRddInfos
new StageInfo(stage.id, stage.name, stage.numTasks, rddInfos)
}
}
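Note: a hedged sketch of a SparkListener consuming the new rddInfos field; the listener class name is illustrative, while SparkListener, SparkListenerStageSubmitted, and StageInfo.rddInfos come from the code above. It would be registered with sc.addSparkListener(new StageRddLogger()).

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageSubmitted}

class StageRddLogger extends SparkListener {
  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
    val info = stageSubmitted.stageInfo
    // One entry for the stage's own RDD plus one per narrow ancestor.
    val names = info.rddInfos.map(_.name).mkString(", ")
    println(s"Stage ${info.stageId} (${info.name}) uses RDDs: $names")
  }
}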
55 changes: 55 additions & 0 deletions core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.storage

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils

@DeveloperApi
class RDDInfo(
val id: Int,
val name: String,
val numPartitions: Int,
val storageLevel: StorageLevel)
extends Ordered[RDDInfo] {

var numCachedPartitions = 0
var memSize = 0L
var diskSize = 0L
var tachyonSize = 0L

override def toString = {
import Utils.bytesToString
("RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s; " +
"TachyonSize: %s; DiskSize: %s").format(
name, id, storageLevel.toString, numCachedPartitions, numPartitions,
bytesToString(memSize), bytesToString(tachyonSize), bytesToString(diskSize))
}

override def compare(that: RDDInfo) = {
this.id - that.id
}
}

private[spark] object RDDInfo {
def fromRdd(rdd: RDD[_]): RDDInfo = {
val rddName = Option(rdd.name).getOrElse(rdd.id.toString)
new RDDInfo(rdd.id, rddName, rdd.partitions.size, rdd.getStorageLevel)
}
}
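Note: a sketch of the naming fallback in fromRdd above, assuming a SparkContext named sc and an in-package caller (the RDDInfo companion object is private[spark]); "ratings" is an illustrative RDD name.

val unnamed = sc.parallelize(1 to 10)
val named   = sc.parallelize(1 to 10).setName("ratings")

RDDInfo.fromRdd(unnamed).name   // falls back to the RDD's id as a string, e.g. "0"
RDDInfo.fromRdd(named).name     // "ratings"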
44 changes: 7 additions & 37 deletions core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -21,60 +21,30 @@ import scala.collection.Map
import scala.collection.mutable

import org.apache.spark.SparkContext
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.Utils

private[spark]
class StorageStatus(
/** Storage information for each BlockManager. */
private[spark] class StorageStatus(
val blockManagerId: BlockManagerId,
val maxMem: Long,
val blocks: mutable.Map[BlockId, BlockStatus] = mutable.Map.empty) {

def memUsed() = blocks.values.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
def memUsed = blocks.values.map(_.memSize).reduceOption(_ + _).getOrElse(0L)

def memUsedByRDD(rddId: Int) =
rddBlocks.filterKeys(_.rddId == rddId).values.map(_.memSize).reduceOption(_ + _).getOrElse(0L)

def diskUsed() = blocks.values.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
def diskUsed = blocks.values.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)

def diskUsedByRDD(rddId: Int) =
rddBlocks.filterKeys(_.rddId == rddId).values.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)

def memRemaining : Long = maxMem - memUsed()
def memRemaining: Long = maxMem - memUsed

def rddBlocks = blocks.collect { case (rdd: RDDBlockId, status) => (rdd, status) }
}

@DeveloperApi
private[spark]
class RDDInfo(
val id: Int,
val name: String,
val numPartitions: Int,
val storageLevel: StorageLevel)
extends Ordered[RDDInfo] {

var numCachedPartitions = 0
var memSize = 0L
var diskSize = 0L
var tachyonSize = 0L

override def toString = {
import Utils.bytesToString
("RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s;" +
"TachyonSize: %s; DiskSize: %s").format(
name, id, storageLevel.toString, numCachedPartitions, numPartitions,
bytesToString(memSize), bytesToString(tachyonSize), bytesToString(diskSize))
}

override def compare(that: RDDInfo) = {
this.id - that.id
}
}

/* Helper methods for storage-related objects */
private[spark]
object StorageUtils {
/** Helper methods for storage-related objects. */
private[spark] object StorageUtils {

/**
* Returns basic information of all RDDs persisted in the given SparkContext. This does not
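Note: a minimal sketch of the reduceOption pattern used by memUsed and diskUsed above — it tolerates a BlockManager with no blocks, where a plain reduce would throw on the empty collection.

object ReduceOptionSketch extends App {
  val noBlocks   = Seq.empty[Long]
  val someBlocks = Seq(512L, 1024L)

  println(noBlocks.reduceOption(_ + _).getOrElse(0L))    // 0 (no exception)
  println(someBlocks.reduceOption(_ + _).getOrElse(0L))  // 1536
}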
@@ -32,7 +32,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
def render(request: HttpServletRequest): Seq[Node] = {
val storageStatusList = listener.storageStatusList
val maxMem = storageStatusList.map(_.maxMem).fold(0L)(_ + _)
val memUsed = storageStatusList.map(_.memUsed()).fold(0L)(_ + _)
val memUsed = storageStatusList.map(_.memUsed).fold(0L)(_ + _)
val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).fold(0L)(_ + _)
val execInfo = for (statusId <- 0 until storageStatusList.size) yield getExecInfo(statusId)
val execInfoSorted = execInfo.sortBy(_.getOrElse("Executor ID", ""))
@@ -106,9 +106,9 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
val execId = status.blockManagerId.executorId
val hostPort = status.blockManagerId.hostPort
val rddBlocks = status.blocks.size
val memUsed = status.memUsed()
val memUsed = status.memUsed
val maxMem = status.maxMem
val diskUsed = status.diskUsed()
val diskUsed = status.diskUsed
val activeTasks = listener.executorToTasksActive.getOrElse(execId, 0)
val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0)
val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0)
@@ -66,8 +66,8 @@ private[ui] class StorageListener(storageStatusListener: StorageStatusListener)
}

override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = synchronized {
val rddInfo = stageSubmitted.stageInfo.rddInfo
_rddInfoMap.getOrElseUpdate(rddInfo.id, rddInfo)
val rddInfos = stageSubmitted.stageInfo.rddInfos
rddInfos.foreach { info => _rddInfoMap.getOrElseUpdate(info.id, info) }
}

override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized {
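Note: a small sketch of why getOrElseUpdate is used per RDDInfo above — the first registration for an RDD id wins, so a re-submitted stage does not overwrite info the listener has already accumulated for that RDD.

import scala.collection.mutable

object GetOrElseUpdateSketch extends App {
  val rddInfoMap = mutable.Map[Int, String]()
  rddInfoMap.getOrElseUpdate(1, "first registration")    // inserted
  rddInfoMap.getOrElseUpdate(1, "second registration")   // ignored; existing value kept
  println(rddInfoMap(1))                                 // first registration
}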
22 changes: 12 additions & 10 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -176,7 +176,7 @@ private[spark] object JsonProtocol {
* -------------------------------------------------------------------- */

def stageInfoToJson(stageInfo: StageInfo): JValue = {
val rddInfo = rddInfoToJson(stageInfo.rddInfo)
val rddInfo = JArray(stageInfo.rddInfos.map(rddInfoToJson).toList)
val submissionTime = stageInfo.submissionTime.map(JInt(_)).getOrElse(JNothing)
val completionTime = stageInfo.completionTime.map(JInt(_)).getOrElse(JNothing)
val failureReason = stageInfo.failureReason.map(JString(_)).getOrElse(JNothing)
@@ -208,7 +208,8 @@ private[spark] object JsonProtocol {
taskMetrics.shuffleReadMetrics.map(shuffleReadMetricsToJson).getOrElse(JNothing)
val shuffleWriteMetrics =
taskMetrics.shuffleWriteMetrics.map(shuffleWriteMetricsToJson).getOrElse(JNothing)
val updatedBlocks = taskMetrics.updatedBlocks.map { blocks =>
val updatedBlocks =
taskMetrics.updatedBlocks.map { blocks =>
JArray(blocks.toList.map { case (id, status) =>
("Block ID" -> id.toString) ~
("Status" -> blockStatusToJson(status))
@@ -467,13 +468,13 @@ private[spark] object JsonProtocol {
val stageId = (json \ "Stage ID").extract[Int]
val stageName = (json \ "Stage Name").extract[String]
val numTasks = (json \ "Number of Tasks").extract[Int]
val rddInfo = rddInfoFromJson(json \ "RDD Info")
val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson)
val submissionTime = Utils.jsonOption(json \ "Submission Time").map(_.extract[Long])
val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long])
val failureReason = Utils.jsonOption(json \ "Failure Reason").map(_.extract[String])
val emittedTaskSizeWarning = (json \ "Emitted Task Size Warning").extract[Boolean]

val stageInfo = new StageInfo(stageId, stageName, numTasks, rddInfo)
val stageInfo = new StageInfo(stageId, stageName, numTasks, rddInfos)
stageInfo.submissionTime = submissionTime
stageInfo.completionTime = completionTime
stageInfo.failureReason = failureReason
@@ -518,13 +519,14 @@ private[spark] object JsonProtocol {
Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson)
metrics.shuffleWriteMetrics =
Utils.jsonOption(json \ "Shuffle Write Metrics").map(shuffleWriteMetricsFromJson)
metrics.updatedBlocks = Utils.jsonOption(json \ "Updated Blocks").map { value =>
value.extract[List[JValue]].map { block =>
val id = BlockId((block \ "Block ID").extract[String])
val status = blockStatusFromJson(block \ "Status")
(id, status)
metrics.updatedBlocks =
Utils.jsonOption(json \ "Updated Blocks").map { value =>
value.extract[List[JValue]].map { block =>
val id = BlockId((block \ "Block ID").extract[String])
val status = blockStatusFromJson(block \ "Status")
(id, status)
}
}
}
metrics
}

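Note: a hedged, in-package sketch of the round trip the JSON changes above preserve; JsonProtocol and StageInfo are private[spark], and `stage` stands for a hypothetical Stage built by the DAGScheduler.

val original = StageInfo.fromStage(stage)               // stage: hypothetical scheduler Stage
val json     = JsonProtocol.stageInfoToJson(original)   // "RDD Info" is now a JSON array, one entry per RDD
val restored = JsonProtocol.stageInfoFromJson(json)

assert(restored.rddInfos.map(_.id) == original.rddInfos.map(_.id))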
6 changes: 3 additions & 3 deletions core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -60,7 +60,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar

whenExecuting(blockManager) {
val context = new TaskContext(0, 0, 0, interrupted = false, runningLocally = false,
taskMetrics = TaskMetrics.empty())
taskMetrics = TaskMetrics.empty)
val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
assert(value.toList === List(1, 2, 3, 4))
}
@@ -73,7 +73,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar

whenExecuting(blockManager) {
val context = new TaskContext(0, 0, 0, interrupted = false, runningLocally = false,
taskMetrics = TaskMetrics.empty())
taskMetrics = TaskMetrics.empty)
val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
assert(value.toList === List(5, 6, 7))
}
@@ -87,7 +87,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar

whenExecuting(blockManager) {
val context = new TaskContext(0, 0, 0, runningLocally = true, interrupted = false,
taskMetrics = TaskMetrics.empty())
taskMetrics = TaskMetrics.empty)
val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
assert(value.toList === List(1, 2, 3, 4))
}
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
@@ -179,7 +179,7 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext {
val hadoopPart1 = generateFakeHadoopPartition()
val pipedRdd = new PipedRDD(nums, "printenv " + varName)
val tContext = new TaskContext(0, 0, 0, interrupted = false, runningLocally = false,
taskMetrics = TaskMetrics.empty())
taskMetrics = TaskMetrics.empty)
val rddIter = pipedRdd.compute(hadoopPart1, tContext)
val arr = rddIter.toArray
assert(arr(0) == "/some/path")
@@ -110,7 +110,7 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
childArgsStr should include ("--executor-memory 5g")
childArgsStr should include ("--driver-memory 4g")
childArgsStr should include ("--executor-cores 5")
childArgsStr should include ("--args arg1 --args arg2")
childArgsStr should include ("--arg arg1 --arg arg2")
childArgsStr should include ("--queue thequeue")
childArgsStr should include ("--files file1.txt,file2.txt")
childArgsStr should include ("--archives archive1.txt,archive2.txt")