
Commit

Merge branch 'master' into SPARK-6263
Lewuathe committed Jun 29, 2015
2 parents 6084e9c + 660c6ce commit 3fc27e7
Showing 121 changed files with 2,064 additions and 1,458 deletions.
2 changes: 1 addition & 1 deletion LICENSE
@@ -948,6 +948,6 @@ The following components are provided under the MIT License. See project link fo
(MIT License) SLF4J LOG4J-12 Binding (org.slf4j:slf4j-log4j12:1.7.5 - http://www.slf4j.org)
(MIT License) pyrolite (org.spark-project:pyrolite:2.0.1 - http://pythonhosted.org/Pyro4/)
(MIT License) scopt (com.github.scopt:scopt_2.10:3.2.0 - https://github.com/scopt/scopt)
(The MIT License) Mockito (org.mockito:mockito-all:1.8.5 - http://www.mockito.org)
(The MIT License) Mockito (org.mockito:mockito-core:1.8.5 - http://www.mockito.org)
(MIT License) jquery (https://jquery.org/license/)
(MIT License) AnchorJS (https://github.com/bryanbraun/anchorjs)
2 changes: 1 addition & 1 deletion R/pkg/R/client.R
@@ -57,7 +57,7 @@ generateSparkSubmitArgs <- function(args, sparkHome, jars, sparkSubmitOpts, pack
}

launchBackend <- function(args, sparkHome, jars, sparkSubmitOpts, packages) {
sparkSubmitBin <- determineSparkSubmitBin()
sparkSubmitBinName <- determineSparkSubmitBin()
if (sparkHome != "") {
sparkSubmitBin <- file.path(sparkHome, "bin", sparkSubmitBinName)
} else {
2 changes: 1 addition & 1 deletion R/pkg/R/sparkR.R
@@ -132,7 +132,7 @@ sparkR.init <- function(
sparkHome = sparkHome,
jars = jars,
sparkSubmitOpts = Sys.getenv("SPARKR_SUBMIT_ARGS", "sparkr-shell"),
sparkPackages = sparkPackages)
packages = sparkPackages)
# wait atmost 100 seconds for JVM to launch
wait <- 0.1
for (i in 1:25) {
Binary file added R/pkg/inst/test_support/sparktestjar_2.10-1.0.jar
32 changes: 32 additions & 0 deletions R/pkg/inst/tests/jarTest.R
@@ -0,0 +1,32 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
library(SparkR)

sc <- sparkR.init()

helloTest <- SparkR:::callJStatic("sparkR.test.hello",
"helloWorld",
"Dave")

basicFunction <- SparkR:::callJStatic("sparkR.test.basicFunction",
"addStuff",
2L,
2L)

sparkR.stop()
output <- c(helloTest, basicFunction)
writeLines(output)
37 changes: 37 additions & 0 deletions R/pkg/inst/tests/test_includeJAR.R
@@ -0,0 +1,37 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
context("include an external JAR in SparkContext")

runScript <- function() {
sparkHome <- Sys.getenv("SPARK_HOME")
jarPath <- paste("--jars",
shQuote(file.path(sparkHome, "R/lib/SparkR/test_support/sparktestjar_2.10-1.0.jar")))
scriptPath <- file.path(sparkHome, "R/lib/SparkR/tests/jarTest.R")
submitPath <- file.path(sparkHome, "bin/spark-submit")
res <- system2(command = submitPath,
args = c(jarPath, scriptPath),
stdout = TRUE)
tail(res, 2)
}

test_that("sparkJars tag in SparkContext", {
testOutput <- runScript()
helloTest <- testOutput[1]
expect_true(helloTest == "Hello, Dave")
basicFunction <- testOutput[2]
expect_true(basicFunction == 4L)
})
15 changes: 12 additions & 3 deletions R/pkg/inst/tests/test_sparkSQL.R
@@ -19,6 +19,14 @@ library(testthat)

context("SparkSQL functions")

# Utility function for easily checking the values of a StructField
checkStructField <- function(actual, expectedName, expectedType, expectedNullable) {
expect_equal(class(actual), "structField")
expect_equal(actual$name(), expectedName)
expect_equal(actual$dataType.toString(), expectedType)
expect_equal(actual$nullable(), expectedNullable)
}

# Tests for SparkSQL functions in SparkR

sc <- sparkR.init()
@@ -52,9 +60,10 @@ test_that("infer types", {
list(type = 'array', elementType = "integer", containsNull = TRUE))
expect_equal(infer_type(list(1L, 2L)),
list(type = 'array', elementType = "integer", containsNull = TRUE))
expect_equal(infer_type(list(a = 1L, b = "2")),
structType(structField(x = "a", type = "integer", nullable = TRUE),
structField(x = "b", type = "string", nullable = TRUE)))
testStruct <- infer_type(list(a = 1L, b = "2"))
expect_true(class(testStruct) == "structType")
checkStructField(testStruct$fields()[[1]], "a", "IntegerType", TRUE)
checkStructField(testStruct$fields()[[2]], "b", "StringType", TRUE)
e <- new.env()
assign("a", 1L, envir = e)
expect_equal(infer_type(e),
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -354,7 +354,7 @@
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
1 change: 0 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -545,7 +545,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli

// Post init
_taskScheduler.postStartHook()
_env.metricsSystem.registerSource(new DAGSchedulerSource(dagScheduler))
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
_executorAllocationManager.foreach { e =>
_env.metricsSystem.registerSource(e.executorAllocationManagerSource)
17 changes: 16 additions & 1 deletion core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
@@ -88,6 +88,21 @@ private[r] class RBackendHandler(server: RBackend)
ctx.close()
}

// Looks up a class given a class name. This function first checks the
// current class loader and if a class is not found, it looks up the class
// in the context class loader. Address [SPARK-5185]
def getStaticClass(objId: String): Class[_] = {
try {
val clsCurrent = Class.forName(objId)
clsCurrent
} catch {
// Use contextLoader if we can't find the JAR in the system class loader
case e: ClassNotFoundException =>
val clsContext = Class.forName(objId, true, Thread.currentThread().getContextClassLoader)
clsContext
}
}

def handleMethodCall(
isStatic: Boolean,
objId: String,
@@ -98,7 +113,7 @@ private[r] class RBackendHandler(server: RBackend)
var obj: Object = null
try {
val cls = if (isStatic) {
Class.forName(objId)
getStaticClass(objId)
} else {
JVMObjectTracker.get(objId) match {
case None => throw new IllegalArgumentException("Object not found " + objId)
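The new getStaticClass above falls back to the thread's context class loader when the normal lookup fails. Below is a minimal standalone sketch of that lookup pattern; the object and parameter names are illustrative, not part of this commit.

object ClassLookupSketch {
  // Try the default class loader first; if the class is only visible to the
  // context class loader (e.g. a JAR passed via --jars), retry there.
  def lookupClass(className: String): Class[_] =
    try {
      Class.forName(className)
    } catch {
      case _: ClassNotFoundException =>
        Class.forName(className, true, Thread.currentThread().getContextClassLoader)
    }
}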
55 changes: 38 additions & 17 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -81,6 +81,8 @@ class DAGScheduler(

def this(sc: SparkContext) = this(sc, sc.taskScheduler)

private[scheduler] val metricsSource: DAGSchedulerSource = new DAGSchedulerSource(this)

private[scheduler] val nextJobId = new AtomicInteger(0)
private[scheduler] def numTotalJobs: Int = nextJobId.get()
private val nextStageId = new AtomicInteger(0)
@@ -905,22 +907,29 @@ class DAGScheduler(
return
}

val tasks: Seq[Task[_]] = stage match {
case stage: ShuffleMapStage =>
partitionsToCompute.map { id =>
val locs = getPreferredLocs(stage.rdd, id)
val part = stage.rdd.partitions(id)
new ShuffleMapTask(stage.id, taskBinary, part, locs)
}
val tasks: Seq[Task[_]] = try {
stage match {
case stage: ShuffleMapStage =>
partitionsToCompute.map { id =>
val locs = getPreferredLocs(stage.rdd, id)
val part = stage.rdd.partitions(id)
new ShuffleMapTask(stage.id, taskBinary, part, locs)
}

case stage: ResultStage =>
val job = stage.resultOfJob.get
partitionsToCompute.map { id =>
val p: Int = job.partitions(id)
val part = stage.rdd.partitions(p)
val locs = getPreferredLocs(stage.rdd, p)
new ResultTask(stage.id, taskBinary, part, locs, id)
}
case stage: ResultStage =>
val job = stage.resultOfJob.get
partitionsToCompute.map { id =>
val p: Int = job.partitions(id)
val part = stage.rdd.partitions(p)
val locs = getPreferredLocs(stage.rdd, p)
new ResultTask(stage.id, taskBinary, part, locs, id)
}
}
} catch {
case NonFatal(e) =>
abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}")
runningStages -= stage
return
}

if (tasks.size > 0) {
@@ -1438,17 +1447,29 @@ class DAGScheduler(
taskScheduler.stop()
}

// Start the event thread at the end of the constructor
// Start the event thread and register the metrics source at the end of the constructor
env.metricsSystem.registerSource(metricsSource)
eventProcessLoop.start()
}

private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {

private[this] val timer = dagScheduler.metricsSource.messageProcessingTimer

/**
* The main event loop of the DAG scheduler.
*/
override def onReceive(event: DAGSchedulerEvent): Unit = event match {
override def onReceive(event: DAGSchedulerEvent): Unit = {
val timerContext = timer.time()
try {
doOnReceive(event)
} finally {
timerContext.stop()
}
}

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
listener, properties)
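The task-creation block above is now wrapped in try/catch so that a non-fatal error aborts only the affected stage rather than killing the scheduler's event loop. A rough sketch of that guard pattern in isolation, with illustrative names that are not from this commit:

import scala.util.control.NonFatal

object StageGuardSketch {
  // Run `body`; if it throws a non-fatal exception, report it via `onError`
  // and return None instead of letting the error escape the calling thread.
  def safely[T](onError: Throwable => Unit)(body: => T): Option[T] =
    try {
      Some(body)
    } catch {
      case NonFatal(e) =>
        onError(e)
        None
    }
}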
core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
@@ -17,11 +17,11 @@

package org.apache.spark.scheduler

import com.codahale.metrics.{Gauge, MetricRegistry}
import com.codahale.metrics.{Gauge, MetricRegistry, Timer}

import org.apache.spark.metrics.source.Source

private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler)
private[scheduler] class DAGSchedulerSource(val dagScheduler: DAGScheduler)
extends Source {
override val metricRegistry = new MetricRegistry()
override val sourceName = "DAGScheduler"
@@ -45,4 +45,8 @@ private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler)
metricRegistry.register(MetricRegistry.name("job", "activeJobs"), new Gauge[Int] {
override def getValue: Int = dagScheduler.activeJobs.size
})

/** Timer that tracks the time to process messages in the DAGScheduler's event loop */
val messageProcessingTimer: Timer =
metricRegistry.timer(MetricRegistry.name("messageProcessingTime"))
}
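messageProcessingTime is a Codahale Metrics Timer: the event loop opens a timer context before handling an event and stops it in a finally block. A small self-contained sketch of that pattern, assuming the com.codahale.metrics API and illustrative names:

import com.codahale.metrics.MetricRegistry

object TimerSketch {
  private val registry = new MetricRegistry()
  private val timer = registry.timer(MetricRegistry.name("messageProcessingTime"))

  // Time an arbitrary block; stopping the context in `finally` records the
  // sample even when the block throws.
  def timed[T](body: => T): T = {
    val timerContext = timer.time()
    try body finally timerContext.stop()
  }
}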
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -36,7 +36,7 @@ import org.apache.spark.network.nio.{GetBlock, GotBlock, PutBlock}
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
import org.apache.spark.storage._
import org.apache.spark.util.BoundedPriorityQueue
import org.apache.spark.util.{BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf}
import org.apache.spark.util.collection.CompactBuffer

/**
@@ -94,8 +94,10 @@ class KryoSerializer(conf: SparkConf)
// For results returned by asJavaIterable. See JavaIterableWrapperSerializer.
kryo.register(JavaIterableWrapperSerializer.wrapperClass, new JavaIterableWrapperSerializer)

// Allow sending SerializableWritable
// Allow sending classes with custom Java serializers
kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer())
kryo.register(classOf[SerializableConfiguration], new KryoJavaSerializer())
kryo.register(classOf[SerializableJobConf], new KryoJavaSerializer())
kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
kryo.register(classOf[PythonBroadcast], new KryoJavaSerializer())

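The registrations above route SerializableConfiguration and SerializableJobConf through their Java serialization hooks when Kryo is in use. For an application's own classes, the usual route is to register them on the SparkConf; a hedged sketch, where MyRecord is an illustrative class rather than anything in this commit:

import org.apache.spark.SparkConf

// Illustrative application class.
case class MyRecord(id: Int, name: String)

object KryoConfSketch {
  // Switch the application to Kryo and register its classes up front so Kryo
  // can write compact class identifiers instead of full class names.
  val conf: SparkConf = new SparkConf()
    .setAppName("kryo-example")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .registerKryoClasses(Array(classOf[MyRecord]))
}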
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -784,6 +784,37 @@ class DAGSchedulerSuite
assert(sc.parallelize(1 to 10, 2).first() === 1)
}

test("getPartitions exceptions should not crash DAGScheduler and SparkContext (SPARK-8606)") {
val e1 = intercept[DAGSchedulerSuiteDummyException] {
val rdd = new MyRDD(sc, 2, Nil) {
override def getPartitions: Array[Partition] = {
throw new DAGSchedulerSuiteDummyException
}
}
rdd.reduceByKey(_ + _, 1).count()
}

// Make sure we can still run local commands as well as cluster commands.
assert(sc.parallelize(1 to 10, 2).count() === 10)
assert(sc.parallelize(1 to 10, 2).first() === 1)
}

test("getPreferredLocations errors should not crash DAGScheduler and SparkContext (SPARK-8606)") {
val e1 = intercept[SparkException] {
val rdd = new MyRDD(sc, 2, Nil) {
override def getPreferredLocations(split: Partition): Seq[String] = {
throw new DAGSchedulerSuiteDummyException
}
}
rdd.count()
}
assert(e1.getMessage.contains(classOf[DAGSchedulerSuiteDummyException].getName))

// Make sure we can still run local commands as well as cluster commands.
assert(sc.parallelize(1 to 10, 2).count() === 10)
assert(sc.parallelize(1 to 10, 2).first() === 1)
}

test("accumulator not calculated for resubmitted result stage") {
// just for register
val accum = new Accumulator[Int](0, AccumulatorParam.IntAccumulatorParam)
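Both new tests rely on ScalaTest's intercept to capture the exception thrown inside a job so its message can be checked afterwards. A minimal sketch of that pattern outside Spark, with an illustrative suite and exception message:

import org.scalatest.FunSuite

class InterceptSketch extends FunSuite {
  test("intercept returns the thrown exception for later assertions") {
    val e = intercept[IllegalStateException] {
      throw new IllegalStateException("boom")
    }
    assert(e.getMessage.contains("boom"))
  }
}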
3 changes: 2 additions & 1 deletion dev/lint-python
@@ -19,7 +19,8 @@

SCRIPT_DIR="$( cd "$( dirname "$0" )" && pwd )"
SPARK_ROOT_DIR="$(dirname "$SCRIPT_DIR")"
PATHS_TO_CHECK="./python/pyspark/ ./ec2/spark_ec2.py ./examples/src/main/python/"
PATHS_TO_CHECK="./python/pyspark/ ./ec2/spark_ec2.py ./examples/src/main/python/ ./dev/sparktestsupport"
PATHS_TO_CHECK="$PATHS_TO_CHECK ./dev/run-tests.py ./python/run-tests.py"
PYTHON_LINT_REPORT_PATH="$SPARK_ROOT_DIR/dev/python-lint-report.txt"

cd "$SPARK_ROOT_DIR"
4 changes: 3 additions & 1 deletion dev/merge_spark_pr.py
@@ -431,6 +431,8 @@ def main():

if __name__ == "__main__":
import doctest
doctest.testmod()
(failure_count, test_count) = doctest.testmod()
if failure_count:
exit(-1)

main()