Skip to content

Commit

Permalink
[SPARK-6428] Added explicit types for all public methods in core.
Browse files Browse the repository at this point in the history
Author: Reynold Xin <rxin@databricks.com>

Closes #5125 from rxin/core-explicit-type and squashes the following commits:

f471415 [Reynold Xin] Revert style checker changes.
81b66e4 [Reynold Xin] Code review feedback.
a7533e3 [Reynold Xin] Mima excludes.
1d795f5 [Reynold Xin] [SPARK-6428] Added explicit types for all public methods in core.
  • Loading branch information
rxin committed Mar 24, 2015
1 parent bfd3ee9 commit 4ce2782
Show file tree
Hide file tree
Showing 142 changed files with 597 additions and 526 deletions.
23 changes: 11 additions & 12 deletions core/src/main/scala/org/apache/spark/Accumulators.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
package org.apache.spark

import java.io.{ObjectInputStream, Serializable}
import java.util.concurrent.atomic.AtomicLong
import java.lang.ThreadLocal

import scala.collection.generic.Growable
import scala.collection.mutable.Map
Expand Down Expand Up @@ -109,7 +107,7 @@ class Accumulable[R, T] (
* The typical use of this method is to directly mutate the local value, eg., to add
* an element to a Set.
*/
def localValue = value_
def localValue: R = value_

/**
* Set the accumulator's value; only allowed on master.
Expand Down Expand Up @@ -137,7 +135,7 @@ class Accumulable[R, T] (
Accumulators.register(this, false)
}

override def toString = if (value_ == null) "null" else value_.toString
override def toString: String = if (value_ == null) "null" else value_.toString
}

/**
Expand Down Expand Up @@ -257,22 +255,22 @@ object AccumulatorParam {

implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
def addInPlace(t1: Double, t2: Double): Double = t1 + t2
def zero(initialValue: Double) = 0.0
def zero(initialValue: Double): Double = 0.0
}

implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
def addInPlace(t1: Int, t2: Int): Int = t1 + t2
def zero(initialValue: Int) = 0
def zero(initialValue: Int): Int = 0
}

implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
def addInPlace(t1: Long, t2: Long) = t1 + t2
def zero(initialValue: Long) = 0L
def addInPlace(t1: Long, t2: Long): Long = t1 + t2
def zero(initialValue: Long): Long = 0L
}

implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
def addInPlace(t1: Float, t2: Float) = t1 + t2
def zero(initialValue: Float) = 0f
def addInPlace(t1: Float, t2: Float): Float = t1 + t2
def zero(initialValue: Float): Float = 0f
}

// TODO: Add AccumulatorParams for other types, e.g. lists and strings
Expand Down Expand Up @@ -351,6 +349,7 @@ private[spark] object Accumulators extends Logging {
}
}

def stringifyPartialValue(partialValue: Any) = "%s".format(partialValue)
def stringifyValue(value: Any) = "%s".format(value)
def stringifyPartialValue(partialValue: Any): String = "%s".format(partialValue)

This comment has been minimized.

Copy link
@rxin

rxin May 20, 2015

Author Contributor

Sorry?


def stringifyValue(value: Any): String = "%s".format(value)
}
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/Dependency.scala
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class ShuffleDependency[K, V, C](
val mapSideCombine: Boolean = false)
extends Dependency[Product2[K, V]] {

override def rdd = _rdd.asInstanceOf[RDD[Product2[K, V]]]
override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

val shuffleId: Int = _rdd.context.newShuffleId()

Expand All @@ -91,7 +91,7 @@ class ShuffleDependency[K, V, C](
*/
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int) = List(partitionId)
override def getParents(partitionId: Int): List[Int] = List(partitionId)
}


Expand All @@ -107,7 +107,7 @@ class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
extends NarrowDependency[T](rdd) {

override def getParents(partitionId: Int) = {
override def getParents(partitionId: Int): List[Int] = {
if (partitionId >= outStart && partitionId < outStart + length) {
List(partitionId - outStart + inStart)
} else {
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/FutureAction.scala
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
}
}

def jobIds = Seq(jobWaiter.jobId)
def jobIds: Seq[Int] = Seq(jobWaiter.jobId)
}


Expand Down Expand Up @@ -276,7 +276,7 @@ class ComplexFutureAction[T] extends FutureAction[T] {

override def value: Option[Try[T]] = p.future.value

def jobIds = jobs
def jobIds: Seq[Int] = jobs

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule
super.preStart()
}

override def receiveWithLogging = {
override def receiveWithLogging: PartialFunction[Any, Unit] = {
case Heartbeat(executorId, taskMetrics, blockManagerId) =>
val unknownExecutor = !scheduler.executorHeartbeatReceived(
executorId, taskMetrics, blockManagerId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
extends Actor with ActorLogReceive with Logging {
val maxAkkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)

override def receiveWithLogging = {
override def receiveWithLogging: PartialFunction[Any, Unit] = {
case GetMapOutputStatuses(shuffleId: Int) =>
val hostPort = sender.path.address.hostPort
logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + hostPort)
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/Partitioner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ object Partitioner {
* produce an unexpected or incorrect result.
*/
class HashPartitioner(partitions: Int) extends Partitioner {
def numPartitions = partitions
def numPartitions: Int = partitions

def getPartition(key: Any): Int = key match {
case null => 0
Expand Down Expand Up @@ -154,7 +154,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
}
}

def numPartitions = rangeBounds.length + 1
def numPartitions: Int = rangeBounds.length + 1

private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ import org.apache.spark.util.Utils

@DeveloperApi
class SerializableWritable[T <: Writable](@transient var t: T) extends Serializable {
def value = t
override def toString = t.toString

def value: T = t

override def toString: String = t.toString

private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
out.defaultWriteObject()
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
}

/** Set multiple parameters together */
def setAll(settings: Traversable[(String, String)]) = {
def setAll(settings: Traversable[(String, String)]): SparkConf = {
this.settings.putAll(settings.toMap.asJava)
this
}
Expand Down
48 changes: 26 additions & 22 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -986,15 +986,15 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
union(Seq(first) ++ rest)

/** Get an RDD that has no partitions or elements. */
def emptyRDD[T: ClassTag] = new EmptyRDD[T](this)
def emptyRDD[T: ClassTag]: EmptyRDD[T] = new EmptyRDD[T](this)

// Methods for creating shared variables

/**
* Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
* values to using the `+=` method. Only the driver can access the accumulator's `value`.
*/
def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]) =
def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T] =
{
val acc = new Accumulator(initialValue, param)
cleaner.foreach(_.registerAccumulatorForCleanup(acc))
Expand All @@ -1006,7 +1006,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* in the Spark UI. Tasks can "add" values to the accumulator using the `+=` method. Only the
* driver can access the accumulator's `value`.
*/
def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T]) = {
def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T])
: Accumulator[T] = {
val acc = new Accumulator(initialValue, param, Some(name))
cleaner.foreach(_.registerAccumulatorForCleanup(acc))
acc
Expand All @@ -1018,7 +1019,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @tparam R accumulator result type
* @tparam T type that can be added to the accumulator
*/
def accumulable[R, T](initialValue: R)(implicit param: AccumulableParam[R, T]) = {
def accumulable[R, T](initialValue: R)(implicit param: AccumulableParam[R, T])
: Accumulable[R, T] = {
val acc = new Accumulable(initialValue, param)
cleaner.foreach(_.registerAccumulatorForCleanup(acc))
acc
Expand All @@ -1031,7 +1033,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @tparam R accumulator result type
* @tparam T type that can be added to the accumulator
*/
def accumulable[R, T](initialValue: R, name: String)(implicit param: AccumulableParam[R, T]) = {
def accumulable[R, T](initialValue: R, name: String)(implicit param: AccumulableParam[R, T])
: Accumulable[R, T] = {
val acc = new Accumulable(initialValue, param, Some(name))
cleaner.foreach(_.registerAccumulatorForCleanup(acc))
acc
Expand Down Expand Up @@ -1209,7 +1212,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
override def killExecutor(executorId: String): Boolean = super.killExecutor(executorId)

/** The version of Spark on which this application is running. */
def version = SPARK_VERSION
def version: String = SPARK_VERSION

/**
* Return a map from the slave to the max memory available for caching and the remaining
Expand Down Expand Up @@ -1659,7 +1662,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
}

def getCheckpointDir = checkpointDir
def getCheckpointDir: Option[String] = checkpointDir

/** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
def defaultParallelism: Int = {
Expand Down Expand Up @@ -1900,28 +1903,28 @@ object SparkContext extends Logging {
"backward compatibility.", "1.3.0")
object DoubleAccumulatorParam extends AccumulatorParam[Double] {
def addInPlace(t1: Double, t2: Double): Double = t1 + t2
def zero(initialValue: Double) = 0.0
def zero(initialValue: Double): Double = 0.0
}

@deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " +
"backward compatibility.", "1.3.0")
object IntAccumulatorParam extends AccumulatorParam[Int] {
def addInPlace(t1: Int, t2: Int): Int = t1 + t2
def zero(initialValue: Int) = 0
def zero(initialValue: Int): Int = 0
}

@deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " +
"backward compatibility.", "1.3.0")
object LongAccumulatorParam extends AccumulatorParam[Long] {
def addInPlace(t1: Long, t2: Long) = t1 + t2
def zero(initialValue: Long) = 0L
def addInPlace(t1: Long, t2: Long): Long = t1 + t2
def zero(initialValue: Long): Long = 0L
}

@deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " +
"backward compatibility.", "1.3.0")
object FloatAccumulatorParam extends AccumulatorParam[Float] {
def addInPlace(t1: Float, t2: Float) = t1 + t2
def zero(initialValue: Float) = 0f
def addInPlace(t1: Float, t2: Float): Float = t1 + t2
def zero(initialValue: Float): Float = 0f
}

// The following deprecated functions have already been moved to `object RDD` to
Expand All @@ -1931,18 +1934,18 @@ object SparkContext extends Logging {
@deprecated("Replaced by implicit functions in the RDD companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = {
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] =
RDD.rddToPairRDDFunctions(rdd)
}

@deprecated("Replaced by implicit functions in the RDD companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = RDD.rddToAsyncRDDActions(rdd)
def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]): AsyncRDDActions[T] =
RDD.rddToAsyncRDDActions(rdd)

@deprecated("Replaced by implicit functions in the RDD companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
rdd: RDD[(K, V)]) = {
rdd: RDD[(K, V)]): SequenceFileRDDFunctions[K, V] = {
val kf = implicitly[K => Writable]
val vf = implicitly[V => Writable]
// Set the Writable class to null and `SequenceFileRDDFunctions` will use Reflection to get it
Expand All @@ -1954,16 +1957,17 @@ object SparkContext extends Logging {
@deprecated("Replaced by implicit functions in the RDD companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](
rdd: RDD[(K, V)]) =
rdd: RDD[(K, V)]): OrderedRDDFunctions[K, V, (K, V)] =
RDD.rddToOrderedRDDFunctions(rdd)

@deprecated("Replaced by implicit functions in the RDD companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = RDD.doubleRDDToDoubleRDDFunctions(rdd)
def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]): DoubleRDDFunctions =
RDD.doubleRDDToDoubleRDDFunctions(rdd)

@deprecated("Replaced by implicit functions in the RDD companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) =
def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]): DoubleRDDFunctions =
RDD.numericRDDToDoubleRDDFunctions(rdd)

// The following deprecated functions have already been moved to `object WritableFactory` to
Expand Down Expand Up @@ -2134,7 +2138,7 @@ object SparkContext extends Logging {
(backend, scheduler)

case LOCAL_N_REGEX(threads) =>
def localCpuCount = Runtime.getRuntime.availableProcessors()
def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
// local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
val threadCount = if (threads == "*") localCpuCount else threads.toInt
if (threadCount <= 0) {
Expand All @@ -2146,7 +2150,7 @@ object SparkContext extends Logging {
(backend, scheduler)

case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
def localCpuCount = Runtime.getRuntime.availableProcessors()
def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
// local[*, M] means the number of cores on the computer with M failures
// local[N, M] means exactly N threads with M failures
val threadCount = if (threads == "*") localCpuCount else threads.toInt
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/TaskState.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ private[spark] object TaskState extends Enumeration {

type TaskState = Value

def isFailed(state: TaskState) = (LOST == state) || (FAILED == state)
def isFailed(state: TaskState): Boolean = (LOST == state) || (FAILED == state)

def isFinished(state: TaskState) = FINISHED_STATES.contains(state)
def isFinished(state: TaskState): Boolean = FINISHED_STATES.contains(state)

def toMesos(state: TaskState): MesosTaskState = state match {
case LAUNCHING => MesosTaskState.TASK_STARTING
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/TestUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ private[spark] object TestUtils {

private class JavaSourceFromString(val name: String, val code: String)
extends SimpleJavaFileObject(createURI(name), SOURCE) {
override def getCharContent(ignoreEncodingErrors: Boolean) = code
override def getCharContent(ignoreEncodingErrors: Boolean): String = code
}

/** Creates a compiled class with the given name. Class file will be placed in destDir. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
def subtract(other: JavaRDD[T], p: Partitioner): JavaRDD[T] =
wrapRDD(rdd.subtract(other, p))

override def toString = rdd.toString
override def toString: String = rdd.toString

/** Assign a name to this RDD */
def setName(name: String): JavaRDD[T] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class JavaSparkContext(val sc: SparkContext)

private[spark] val env = sc.env

def statusTracker = new JavaSparkStatusTracker(sc)
def statusTracker: JavaSparkStatusTracker = new JavaSparkStatusTracker(sc)

def isLocal: java.lang.Boolean = sc.isLocal

Expand Down
Loading

0 comments on commit 4ce2782

Please sign in to comment.