Merge pull request #4 from markhamstra/master-csd
Catching up with Apache branch-0.8
markhamstra committed Dec 6, 2013
2 parents 084f7fb + fe1717c commit 9149495
Showing 63 changed files with 2,066 additions and 1,975 deletions.
1,373 changes: 245 additions & 1,128 deletions CHANGES.txt

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -99,7 +99,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
override def ready(atMost: Duration)(implicit permit: CanAwait): SimpleFutureAction.this.type = {
if (!atMost.isFinite()) {
awaitResult()
} else {
} else jobWaiter.synchronized {
val finishTime = System.currentTimeMillis() + atMost.toMillis
while (!isCompleted) {
val time = System.currentTimeMillis()
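The only change in this hunk is that the timed branch of ready() now runs inside jobWaiter.synchronized. That matters because Object.wait must be called while holding the monitor of the object being waited on. A minimal sketch of the pattern this enables, assuming (it is not shown in this hunk) that the loop body waits on jobWaiter:

// Sketch of the timed-wait pattern; the wait call is an assumption based on the
// synchronized block added above, not a verbatim copy of the Spark code.
def awaitWithTimeout(jobWaiter: AnyRef, isCompleted: => Boolean, atMostMillis: Long): Unit =
  jobWaiter.synchronized {
    val finishTime = System.currentTimeMillis() + atMostMillis
    while (!isCompleted) {
      val remaining = finishTime - System.currentTimeMillis()
      if (remaining <= 0) throw new java.util.concurrent.TimeoutException("job did not finish in time")
      jobWaiter.wait(remaining)  // releases the monitor while blocked, reacquires when notified
    }
  }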
21 changes: 18 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -145,6 +145,14 @@ class SparkContext(
executorEnvs ++= environment
}

// Set SPARK_USER for user who is running SparkContext.
val sparkUser = Option {
Option(System.getProperty("user.name")).getOrElse(System.getenv("SPARK_USER"))
}.getOrElse {
SparkContext.SPARK_UNKNOWN_USER
}
executorEnvs("SPARK_USER") = sparkUser

// Create and start the scheduler
private[spark] var taskScheduler: TaskScheduler = {
// Regular expression used for local[N] master format
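The nested Option above encodes a three-step fallback for the effective user: the JVM's user.name system property, then the SPARK_USER environment variable, and finally the SPARK_UNKNOWN_USER placeholder. An equivalent sketch using orElse, shown only to make the precedence explicit:

// Same fallback chain as the nested Option above (sketch):
// 1. system property "user.name"  2. env var SPARK_USER  3. "<unknown>"
val sparkUser: String =
  Option(System.getProperty("user.name"))
    .orElse(Option(System.getenv("SPARK_USER")))
    .getOrElse(SparkContext.SPARK_UNKNOWN_USER)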
@@ -275,6 +283,12 @@ class SparkContext(
override protected def childValue(parent: Properties): Properties = new Properties(parent)
}

private[spark] def getLocalProperties(): Properties = localProperties.get()

private[spark] def setLocalProperties(props: Properties) {
localProperties.set(props)
}

def initLocalProperties() {
localProperties.set(new Properties())
}
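getLocalProperties and setLocalProperties expose the thread-local job properties so that per-thread settings (job group, description, and so on) captured on one thread can be re-applied on another. A hedged usage sketch; both methods are private[spark], so this is illustrative only and the thread setup is assumed:

// Capture properties on the configuring thread, restore them on a worker thread (sketch).
val props = sc.getLocalProperties()
new Thread(new Runnable {
  def run(): Unit = {
    sc.setLocalProperties(props)  // jobs submitted here now inherit the same group/description
    // ... submit jobs ...
  }
}).start()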
@@ -296,7 +310,7 @@ class SparkContext(
/** Set a human readable description of the current job. */
@deprecated("use setJobGroup", "0.8.1")
def setJobDescription(value: String) {
setJobGroup("", value)
setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value)
}
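The deprecated setJobDescription used to call setJobGroup("", value), which silently cleared any job group id already in effect; it now only sets the description property. Callers that want both should use setJobGroup directly, as the deprecation note says. A short sketch (the group id and description strings are made up for illustration):

// Set a group id and a human-readable description together (sketch):
sc.setJobGroup("nightly-etl", "Load and aggregate click logs")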

/**
@@ -799,11 +813,10 @@ class SparkContext(
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite)
val start = System.nanoTime
val result = dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
resultHandler, localProperties.get)
logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
rdd.doCheckpoint()
result
}

/**
@@ -985,6 +998,8 @@ object SparkContext {

private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"

private[spark] val SPARK_UNKNOWN_USER = "<unknown>"

implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
def addInPlace(t1: Double, t2: Double): Double = t1 + t2
def zero(initialValue: Double) = 0.0
31 changes: 27 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -17,16 +17,39 @@

package org.apache.spark.deploy

import java.security.PrivilegedExceptionAction

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.SparkException
import org.apache.spark.{SparkContext, SparkException}

/**
* Contains util methods to interact with Hadoop from Spark.
*/
private[spark]
class SparkHadoopUtil {
val conf = newConfiguration()
UserGroupInformation.setConfiguration(conf)

def runAsUser(user: String)(func: () => Unit) {
// if we are already running as the user intended there is no reason to do the doAs. It
// will actually break secure HDFS access as it doesn't fill in the credentials. Also if
// the user is UNKNOWN then we shouldn't be creating a remote unknown user
// (this is actually the path spark on yarn takes) since SPARK_USER is initialized only
// in SparkContext.
val currentUser = Option(System.getProperty("user.name")).
getOrElse(SparkContext.SPARK_UNKNOWN_USER)
if (user != SparkContext.SPARK_UNKNOWN_USER && currentUser != user) {
val ugi = UserGroupInformation.createRemoteUser(user)
ugi.doAs(new PrivilegedExceptionAction[Unit] {
def run: Unit = func()
})
} else {
func()
}
}

/**
* Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
@@ -42,9 +65,9 @@ class SparkHadoopUtil {

def isYarnMode(): Boolean = { false }
}

object SparkHadoopUtil {
private val hadoop = {
private val hadoop = {
val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
if (yarnMode) {
try {
@@ -56,7 +79,7 @@ object SparkHadoopUtil {
new SparkHadoopUtil
}
}

def get: SparkHadoopUtil = {
hadoop
}
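The new runAsUser helper only drops into UserGroupInformation.doAs when the requested user is known and differs from the current one; otherwise it invokes the thunk directly, since re-wrapping the current user would lose the credentials needed for secure HDFS. A usage sketch mirroring how Executor.run is changed later in this commit; doSomeHadoopWork is a placeholder, not a real API:

// Run executor-side work as the user who launched the SparkContext (sketch).
val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
  doSomeHadoopWork()  // hypothetical; runs under ugi.doAs only when the users differ
}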
@@ -104,7 +104,7 @@ private[spark] class ExecutorRunner(
// SPARK-698: do not call the run.cmd script, as process.destroy()
// fails to kill a process tree on Windows
Seq(runner) ++ buildJavaOpts() ++ Seq(command.mainClass) ++
command.arguments.map(substituteVariables)
(command.arguments ++ Seq(appId)).map(substituteVariables)
}

/**
@@ -111,7 +111,7 @@ private[spark] object CoarseGrainedExecutorBackend {

def main(args: Array[String]) {
if (args.length < 4) {
//the reason we allow the last frameworkId argument is to make it easy to kill rogue executors
//the reason we allow the last appid argument is to make it easy to kill rogue executors
System.err.println(
"Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> <cores> " +
"[<appid>]")
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -25,8 +25,9 @@ import java.util.concurrent._
import scala.collection.JavaConversions._
import scala.collection.mutable.HashMap

import org.apache.spark.scheduler._
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler._
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
import org.apache.spark.util.Utils

@@ -129,6 +130,8 @@ private[spark] class Executor(
// Maintains the list of running tasks.
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)

def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
val tr = new TaskRunner(context, taskId, serializedTask)
runningTasks.put(taskId, tr)
@@ -176,7 +179,7 @@ private[spark] class Executor(
}
}

override def run() {
override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
val startTime = System.currentTimeMillis()
SparkEnv.set(env)
Thread.currentThread.setContextClassLoader(replClassLoader)
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -52,7 +52,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
* sources in HBase, or S3).
*
* @param sc The SparkContext to associate the RDD with.
* @param broadCastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed
* @param broadcastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed
variable references an instance of JobConf, then that JobConf will be used for the Hadoop job.
* Otherwise, a new JobConf will be created on each slave using the enclosed Configuration.
* @param initLocalJobConfFuncOpt Optional closure used to initialize any JobConf that HadoopRDD
@@ -132,6 +132,8 @@ class HadoopRDD[K, V](

override def getPartitions: Array[Partition] = {
val jobConf = getJobConf()
// add the credentials here as this can be called before SparkContext initialized
SparkHadoopUtil.get.addCredentials(jobConf)
val inputFormat = getInputFormat(jobConf)
if (inputFormat.isInstanceOf[Configurable]) {
inputFormat.asInstanceOf[Configurable].setConf(jobConf)
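getPartitions now pushes credentials into the JobConf before asking the InputFormat for splits, because this path can run before SparkContext has finished its own security setup. The body of addCredentials is not part of this diff; the sketch below is an assumption about what a secure (YARN/Kerberos) implementation of such a hook might do, with the base class free to leave it as a no-op:

import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.UserGroupInformation

// Hypothetical credentials hook: merge the current user's delegation tokens into the job conf.
def addCredentials(conf: JobConf): Unit = {
  conf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
}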
10 changes: 4 additions & 6 deletions core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
@@ -20,18 +20,16 @@ package org.apache.spark.rdd
import org.apache.spark.{Partition, TaskContext}


private[spark]
class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
private[spark] class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
prev: RDD[T],
f: Iterator[T] => Iterator[U],
f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator)
preservesPartitioning: Boolean = false)
extends RDD[U](prev) {

override val partitioner =
if (preservesPartitioning) firstParent[T].partitioner else None
override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None

override def getPartitions: Array[Partition] = firstParent[T].partitions

override def compute(split: Partition, context: TaskContext) =
f(firstParent[T].iterator(split, context))
f(context, split.index, firstParent[T].iterator(split, context))
}
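MapPartitionsRDD's stored function now receives the TaskContext and the partition index along with the element iterator, so operators that only care about the iterator adapt by discarding the extra arguments. A sketch of how a caller inside the spark package might construct it after this change (the wrapper name is made up):

// Wrap a plain Iterator => Iterator function into the new three-argument shape (sketch).
def mapPartitionsLike[T: ClassManifest, U: ClassManifest](
    prev: RDD[T], f: Iterator[T] => Iterator[U]): RDD[U] =
  new MapPartitionsRDD(prev, (context: TaskContext, index: Int, iter: Iterator[T]) => f(iter))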

This file was deleted.

@@ -33,11 +33,13 @@ class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boo
extends NarrowDependency[T](rdd) {

@transient
val partitions: Array[Partition] = rdd.partitions.zipWithIndex
.filter(s => partitionFilterFunc(s._2))
val partitions: Array[Partition] = rdd.partitions
.filter(s => partitionFilterFunc(s.index)).zipWithIndex
.map { case(split, idx) => new PartitionPruningRDDPartition(idx, split) : Partition }

override def getParents(partitionId: Int) = List(partitions(partitionId).index)
override def getParents(partitionId: Int) = {
List(partitions(partitionId).asInstanceOf[PartitionPruningRDDPartition].parentSplit.index)
}
}


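The rewritten PruneDependency re-indexes the surviving partitions from zero and wraps each one in a PartitionPruningRDDPartition that keeps a reference to its parent split, so getParents can translate a pruned partition id back into the parent RDD's index. A small worked example, assuming a parent with four partitions and a filter that keeps indices 1 and 3:

// Assumed setup: parent has partitions 0..3, partitionFilterFunc keeps 1 and 3.
val keep = Set(1, 3)
val kept = (0 until 4).filter(keep)   // Vector(1, 3): surviving parent indices
val pruned = kept.zipWithIndex        // Vector((1,0), (3,1)): (parent index, new index)
// Pruned partition 0 wraps parent split 1 and pruned partition 1 wraps parent split 3,
// so getParents(0) == List(1) and getParents(1) == List(3).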
