From 037755cb0d90df0dfc4afc95e066ce7695fd1326 Mon Sep 17 00:00:00 2001 From: Patrick Wendell <pwendell@gmail.com> Date: Mon, 7 Apr 2014 18:44:45 -0700 Subject: [PATCH] Some changes after working with andrew or --- .../org/apache/spark/TaskEndReason.scala | 27 ++++++++++++------- .../apache/spark/executor/TaskMetrics.scala | 4 +-- .../apache/spark/scheduler/JobResult.scala | 7 ++--- .../apache/spark/scheduler/StageInfo.scala | 2 +- .../org/apache/spark/scheduler/TaskInfo.scala | 12 ++++----- .../apache/spark/scheduler/TaskLocality.scala | 3 ++- .../apache/spark/storage/StorageUtils.scala | 3 ++- .../spark/util/collection/AppendOnlyMap.scala | 6 ++--- .../collection/ExternalAppendOnlyMap.scala | 4 +-- .../spark/util/collection/OpenHashMap.scala | 2 +- 10 files changed, 41 insertions(+), 29 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala index f1a753b6ab8a9..626f1260cff04 100644 --- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala +++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala @@ -21,25 +21,30 @@ import org.apache.spark.executor.TaskMetrics import org.apache.spark.storage.BlockManagerId /** + * <span class="developer badge">Developer API</span> * Various possible reasons why a task ended. The low-level TaskScheduler is supposed to retry * tasks several times for "ephemeral" failures, and only report back failures that require some * old stages to be resubmitted, such as shuffle map fetch failures. */ -private[spark] sealed trait TaskEndReason -private[spark] case object Success extends TaskEndReason +sealed trait TaskEndReason -private[spark] +/** <span class="developer badge">Developer API</span> */ +case object Success extends TaskEndReason + +/** <span class="developer badge">Developer API</span> */ case object Resubmitted extends TaskEndReason // Task was finished earlier but we've now lost it -private[spark] case class FetchFailed( +/** <span class="developer badge">Developer API</span> */ +case class FetchFailed( bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int) extends TaskEndReason -private[spark] case class ExceptionFailure( +/** <span class="developer badge">Developer API</span> */ +case class ExceptionFailure( className: String, description: String, stackTrace: Array[StackTraceElement], @@ -47,21 +52,25 @@ private[spark] case class ExceptionFailure( extends TaskEndReason /** + * <span class="developer badge">Developer API</span> * The task finished successfully, but the result was lost from the executor's block manager before * it was fetched. */ -private[spark] case object TaskResultLost extends TaskEndReason +case object TaskResultLost extends TaskEndReason -private[spark] case object TaskKilled extends TaskEndReason +/** <span class="developer badge">Developer API</span> */ +case object TaskKilled extends TaskEndReason /** + * <span class="developer badge">Developer API</span> * The task failed because the executor that it was running on was lost. This may happen because * the task crashed the JVM. */ -private[spark] case object ExecutorLostFailure extends TaskEndReason +case object ExecutorLostFailure extends TaskEndReason /** + * <span class="developer badge">Developer API</span> * We don't know why the task ended -- for example, because of a ClassNotFound exception when * deserializing the task result. */ -private[spark] case object UnknownReason extends TaskEndReason +case object UnknownReason extends TaskEndReason diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index e844149ea1ec8..af8ff39313187 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -81,8 +81,8 @@ class TaskMetrics extends Serializable { var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None } -object TaskMetrics { - private[spark] def empty(): TaskMetrics = new TaskMetrics +private[spark] object TaskMetrics { + def empty(): TaskMetrics = new TaskMetrics } diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala b/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala index 3cf4e3077e4a4..1fb6196718445 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala @@ -18,11 +18,12 @@ package org.apache.spark.scheduler /** + * <span class="developer badge">Developer API</span> * A result of a job in the DAGScheduler. */ -private[spark] sealed trait JobResult +sealed trait JobResult -private[spark] case object JobSucceeded extends JobResult +case object JobSucceeded extends JobResult // A failed stage ID of -1 means there is not a particular stage that caused the failure -private[spark] case class JobFailed(exception: Exception, failedStageId: Int) extends JobResult +case class JobFailed(exception: Exception, failedStageId: Int) extends JobResult diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala index 8115a7ed7896d..419cd96376c04 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala @@ -20,9 +20,9 @@ package org.apache.spark.scheduler import org.apache.spark.storage.RDDInfo /** + * <span class="developer badge">Developer API</span> * Stores information about a stage to pass from the scheduler to SparkListeners. */ -private[spark] class StageInfo(val stageId: Int, val name: String, val numTasks: Int, val rddInfo: RDDInfo) { /** When this stage was submitted from the DAGScheduler to a TaskScheduler. */ var submissionTime: Option[Long] = None diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala index 6183b125def99..515755a93c6e6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala @@ -18,9 +18,9 @@ package org.apache.spark.scheduler /** + * <span class="developer badge">Developer API</span> * Information about a running task attempt inside a TaskSet. */ -private[spark] class TaskInfo( val taskId: Long, val index: Int, @@ -46,15 +46,15 @@ class TaskInfo( var serializedSize: Int = 0 - def markGettingResult(time: Long = System.currentTimeMillis) { + private[spark] def markGettingResult(time: Long = System.currentTimeMillis) { gettingResultTime = time } - def markSuccessful(time: Long = System.currentTimeMillis) { + private[spark] def markSuccessful(time: Long = System.currentTimeMillis) { finishTime = time } - def markFailed(time: Long = System.currentTimeMillis) { + private[spark] def markFailed(time: Long = System.currentTimeMillis) { finishTime = time failed = true } @@ -83,11 +83,11 @@ class TaskInfo( def duration: Long = { if (!finished) { - throw new UnsupportedOperationException("duration() called on unfinished tasks") + throw new UnsupportedOperationException("duration() called on unfinished task") } else { finishTime - launchTime } } - def timeRunning(currentTime: Long): Long = currentTime - launchTime + private[spark] def timeRunning(currentTime: Long): Long = currentTime - launchTime } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala index 308edb12edd5c..d2d05a0b81bba 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala @@ -17,7 +17,8 @@ package org.apache.spark.scheduler -private[spark] object TaskLocality extends Enumeration { +/** <span class="developer badge">Developer API</span> */ +object TaskLocality extends Enumeration { // Process local is expected to be used ONLY within TaskSetManager for now. val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala index 6153dfe0b7e13..63be6917d5131 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala @@ -47,7 +47,8 @@ class StorageStatus( } } -private[spark] + +/** <span class="developer badge">Developer API</span> */ class RDDInfo(val id: Int, val name: String, val numPartitions: Int, val storageLevel: StorageLevel) extends Ordered[RDDInfo] { diff --git a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala index b8c852b4ff5c7..6ee1d96bbc894 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala @@ -20,6 +20,7 @@ package org.apache.spark.util.collection import java.util.{Arrays, Comparator} /** + * <span class="developer badge">Developer API</span> * A simple open hash table optimized for the append-only use case, where keys * are never removed, but the value for each key may be changed. * @@ -29,9 +30,8 @@ import java.util.{Arrays, Comparator} * * TODO: Cache the hash values of each key? java.util.HashMap does that. */ -private[spark] -class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, - V)] with Serializable { +class AppendOnlyMap[K, V](initialCapacity: Int = 64) + extends Iterable[(K, V)] with Serializable { require(initialCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements") require(initialCapacity >= 1, "Invalid initial capacity") diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index caa06d5b445b4..f255b258889c0 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -31,6 +31,7 @@ import org.apache.spark.serializer.Serializer import org.apache.spark.storage.{BlockId, BlockManager} /** + * <span class="developer badge">Developer API</span> * An append-only map that spills sorted content to disk when there is insufficient space for it * to grow. * @@ -55,8 +56,7 @@ import org.apache.spark.storage.{BlockId, BlockManager} * `spark.shuffle.safetyFraction` specifies an additional margin of safety as a fraction of * this threshold, in case map size estimation is not sufficiently accurate. */ - -private[spark] class ExternalAppendOnlyMap[K, V, C]( +class ExternalAppendOnlyMap[K, V, C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala index c26f23d50024a..959fe44f5132a 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala @@ -20,13 +20,13 @@ package org.apache.spark.util.collection import scala.reflect.ClassTag /** + * <span class="developer badge">Developer API</span> * A fast hash map implementation for nullable keys. This hash map supports insertions and updates, * but not deletions. This map is about 5X faster than java.util.HashMap, while using much less * space overhead. * * Under the hood, it uses our OpenHashSet implementation. */ -private[spark] class OpenHashMap[K >: Null : ClassTag, @specialized(Long, Int, Double) V: ClassTag]( initialCapacity: Int) extends Iterable[(K, V)]