@@ -69,31 +70,33 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
{UIUtils.formatDuration(stageData.executorRunTime)}
- {if (hasInput) {
+ {if (stageData.hasInput) {
}}
- {if (hasOutput) {
+ {if (stageData.hasOutput) {
}}
- {if (hasShuffleRead) {
+ {if (stageData.hasShuffleRead) {
}}
- {if (hasShuffleWrite) {
+ {if (stageData.hasShuffleWrite) {
}}
- {if (hasBytesSpilled) {
+ {if (stageData.hasBytesSpilled) {
- {if (hasShuffleRead) {
+ {if (stageData.hasShuffleRead) {
@@ -174,25 +177,32 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
("Result Serialization Time", TaskDetailsClassNames.RESULT_SERIALIZATION_TIME),
("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME)) ++
{if (hasAccumulators) Seq(("Accumulators", "")) else Nil} ++
- {if (hasInput) Seq(("Input", "")) else Nil} ++
- {if (hasOutput) Seq(("Output", "")) else Nil} ++
- {if (hasShuffleRead) {
+ {if (stageData.hasInput) Seq(("Input Size / Records", "")) else Nil} ++
+ {if (stageData.hasOutput) Seq(("Output Size / Records", "")) else Nil} ++
+ {if (stageData.hasShuffleRead) {
Seq(("Shuffle Read Blocked Time", TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME),
- ("Shuffle Read", ""))
+ ("Shuffle Read Size / Records", ""))
+ } else {
+ Nil
+ }} ++
+ {if (stageData.hasShuffleWrite) {
+ Seq(("Write Time", ""), ("Shuffle Write Size / Records", ""))
+ } else {
+ Nil
+ }} ++
+ {if (stageData.hasBytesSpilled) {
+ Seq(("Shuffle Spill (Memory)", ""), ("Shuffle Spill (Disk)", ""))
} else {
Nil
}} ++
- {if (hasShuffleWrite) Seq(("Write Time", ""), ("Shuffle Write", "")) else Nil} ++
- {if (hasBytesSpilled) Seq(("Shuffle Spill (Memory)", ""), ("Shuffle Spill (Disk)", ""))
- else Nil} ++
Seq(("Errors", ""))
val unzipped = taskHeadersAndCssClasses.unzip
val taskTable = UIUtils.listingTable(
unzipped._1,
- taskRow(hasAccumulators, hasInput, hasOutput, hasShuffleRead, hasShuffleWrite,
- hasBytesSpilled),
+ taskRow(hasAccumulators, stageData.hasInput, stageData.hasOutput,
+ stageData.hasShuffleRead, stageData.hasShuffleWrite, stageData.hasBytesSpilled),
tasks,
headerClasses = unzipped._2)
// Excludes tasks which failed and have incomplete metrics
@@ -203,8 +213,11 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
None
}
else {
+ def getDistributionQuantiles(data: Seq[Double]): IndexedSeq[Double] =
+ Distribution(data).get.getQuantiles()
+
def getFormattedTimeQuantiles(times: Seq[Double]): Seq[Node] = {
- Distribution(times).get.getQuantiles().map { millis =>
+ getDistributionQuantiles(times).map { millis =>
<td>{UIUtils.formatDuration(millis.toLong)}</td>
}
}
@@ -273,17 +286,36 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
getFormattedTimeQuantiles(schedulerDelays)
def getFormattedSizeQuantiles(data: Seq[Double]) =
- Distribution(data).get.getQuantiles().map(d => <td>{Utils.bytesToString(d.toLong)}</td>)
+ getDistributionQuantiles(data).map(d => <td>{Utils.bytesToString(d.toLong)}</td>)
+
+ def getFormattedSizeQuantilesWithRecords(data: Seq[Double], records: Seq[Double]) = {
+ val recordDist = getDistributionQuantiles(records).iterator
+ getDistributionQuantiles(data).map(d =>
+ {s"${Utils.bytesToString(d.toLong)} / ${recordDist.next().toLong}"} |
+ )
+ }
val inputSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.inputMetrics.map(_.bytesRead).getOrElse(0L).toDouble
}
- val inputQuantiles = <td>Input</td> +: getFormattedSizeQuantiles(inputSizes)
+
+ val inputRecords = validTasks.map { case TaskUIData(_, metrics, _) =>
+ metrics.get.inputMetrics.map(_.recordsRead).getOrElse(0L).toDouble
+ }
+
+ val inputQuantiles = <td>Input Size / Records</td> +:
+ getFormattedSizeQuantilesWithRecords(inputSizes, inputRecords)
val outputSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.outputMetrics.map(_.bytesWritten).getOrElse(0L).toDouble
}
- val outputQuantiles = <td>Output</td> +: getFormattedSizeQuantiles(outputSizes)
+
+ val outputRecords = validTasks.map { case TaskUIData(_, metrics, _) =>
+ metrics.get.outputMetrics.map(_.recordsWritten).getOrElse(0L).toDouble
+ }
+
+ val outputQuantiles = <td>Output Size / Records</td> +:
+ getFormattedSizeQuantilesWithRecords(outputSizes, outputRecords)
val shuffleReadBlockedTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.shuffleReadMetrics.map(_.fetchWaitTime).getOrElse(0L).toDouble
@@ -294,14 +326,24 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val shuffleReadSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
}
- val shuffleReadQuantiles = <td>Shuffle Read (Remote)</td> +:
- getFormattedSizeQuantiles(shuffleReadSizes)
+
+ val shuffleReadRecords = validTasks.map { case TaskUIData(_, metrics, _) =>
+ metrics.get.shuffleReadMetrics.map(_.recordsRead).getOrElse(0L).toDouble
+ }
+
+ val shuffleReadQuantiles = <td>Shuffle Read Size / Records (Remote)</td> +:
+ getFormattedSizeQuantilesWithRecords(shuffleReadSizes, shuffleReadRecords)
val shuffleWriteSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble
}
- val shuffleWriteQuantiles = <td>Shuffle Write</td> +:
- getFormattedSizeQuantiles(shuffleWriteSizes)
+
+ val shuffleWriteRecords = validTasks.map { case TaskUIData(_, metrics, _) =>
+ metrics.get.shuffleWriteMetrics.map(_.shuffleRecordsWritten).getOrElse(0L).toDouble
+ }
+
+ val shuffleWriteQuantiles = <td>Shuffle Write Size / Records</td> +:
+ getFormattedSizeQuantilesWithRecords(shuffleWriteSizes, shuffleWriteRecords)
val memoryBytesSpilledSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.memoryBytesSpilled.toDouble
@@ -326,9 +368,9 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
{serializationQuantiles}
,
{gettingResultQuantiles}
,
- if (hasInput) <tr>{inputQuantiles}</tr> else Nil,
- if (hasOutput) <tr>{outputQuantiles}</tr> else Nil,
- if (hasShuffleRead) {
+ if (stageData.hasInput) <tr>{inputQuantiles}</tr> else Nil,
+ if (stageData.hasOutput) <tr>{outputQuantiles}</tr> else Nil,
+ if (stageData.hasShuffleRead) {
{shuffleReadBlockedQuantiles}
@@ -336,9 +378,9 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
} else {
Nil
},
- if (hasShuffleWrite) <tr>{shuffleWriteQuantiles}</tr> else Nil,
- if (hasBytesSpilled) <tr>{memoryBytesSpilledQuantiles}</tr> else Nil,
- if (hasBytesSpilled) <tr>{diskBytesSpilledQuantiles}</tr> else Nil)
+ if (stageData.hasShuffleWrite) <tr>{shuffleWriteQuantiles}</tr> else Nil,
+ if (stageData.hasBytesSpilled) <tr>{memoryBytesSpilledQuantiles}</tr> else Nil,
+ if (stageData.hasBytesSpilled) <tr>{diskBytesSpilledQuantiles}</tr> else Nil)
val quantileHeaders = Seq("Metric", "Min", "25th percentile",
"Median", "75th percentile", "Max")
@@ -397,26 +439,32 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val inputReadable = maybeInput
.map(m => s"${Utils.bytesToString(m.bytesRead)} (${m.readMethod.toString.toLowerCase()})")
.getOrElse("")
+ val inputRecords = maybeInput.map(_.recordsRead.toString).getOrElse("")
val maybeOutput = metrics.flatMap(_.outputMetrics)
val outputSortable = maybeOutput.map(_.bytesWritten.toString).getOrElse("")
val outputReadable = maybeOutput
.map(m => s"${Utils.bytesToString(m.bytesWritten)}")
.getOrElse("")
+ val outputRecords = maybeOutput.map(_.recordsWritten.toString).getOrElse("")
- val maybeShuffleReadBlockedTime = metrics.flatMap(_.shuffleReadMetrics).map(_.fetchWaitTime)
- val shuffleReadBlockedTimeSortable = maybeShuffleReadBlockedTime.map(_.toString).getOrElse("")
+ val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics)
+ val shuffleReadBlockedTimeSortable = maybeShuffleRead
+ .map(_.fetchWaitTime.toString).getOrElse("")
val shuffleReadBlockedTimeReadable =
- maybeShuffleReadBlockedTime.map(ms => UIUtils.formatDuration(ms)).getOrElse("")
+ maybeShuffleRead.map(ms => UIUtils.formatDuration(ms.fetchWaitTime)).getOrElse("")
- val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead)
- val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("")
- val shuffleReadReadable = maybeShuffleRead.map(Utils.bytesToString).getOrElse("")
+ val shuffleReadSortable = maybeShuffleRead.map(_.remoteBytesRead.toString).getOrElse("")
+ val shuffleReadReadable = maybeShuffleRead
+ .map(m => s"${Utils.bytesToString(m.remoteBytesRead)}").getOrElse("")
+ val shuffleReadRecords = maybeShuffleRead.map(_.recordsRead.toString).getOrElse("")
- val maybeShuffleWrite =
- metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten)
- val shuffleWriteSortable = maybeShuffleWrite.map(_.toString).getOrElse("")
- val shuffleWriteReadable = maybeShuffleWrite.map(Utils.bytesToString).getOrElse("")
+ val maybeShuffleWrite = metrics.flatMap(_.shuffleWriteMetrics)
+ val shuffleWriteSortable = maybeShuffleWrite.map(_.shuffleBytesWritten.toString).getOrElse("")
+ val shuffleWriteReadable = maybeShuffleWrite
+ .map(m => s"${Utils.bytesToString(m.shuffleBytesWritten)}").getOrElse("")
+ val shuffleWriteRecords = maybeShuffleWrite
+ .map(_.shuffleRecordsWritten.toString).getOrElse("")
val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleWriteTime)
val writeTimeSortable = maybeWriteTime.map(_.toString).getOrElse("")
@@ -472,12 +520,12 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
}}
{if (hasInput) {
- {inputReadable}
+ {s"$inputReadable / $inputRecords"}
|
}}
{if (hasOutput) {
- {outputReadable}
+ {s"$outputReadable / $outputRecords"}
|
}}
{if (hasShuffleRead) {
@@ -486,7 +534,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
{shuffleReadBlockedTimeReadable}
- {shuffleReadReadable}
+ {s"$shuffleReadReadable / $shuffleReadRecords"}
|
}}
{if (hasShuffleWrite) {
@@ -494,7 +542,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
{writeTimeReadable}
- {shuffleWriteReadable}
+ {s"$shuffleWriteReadable / $shuffleWriteRecords"}
|
}}
{if (hasBytesSpilled) {
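
The new "Size / Records" summary cells in the quantile table above are built by walking two quantile distributions in lockstep, one over bytes and one over record counts, as `getFormattedSizeQuantilesWithRecords` does with an iterator. Below is a small standalone sketch of that pairing pattern; the `quantiles` helper is a simplified stand-in for Spark's `Distribution(...).getQuantiles()` and all names here are illustrative only.

```scala
object SizeWithRecordsQuantiles {
  // Simplified stand-in for Distribution(data).getQuantiles(): nearest-rank quantiles
  // at the same five probabilities the stage summary table reports.
  def quantiles(data: Seq[Double], probs: Seq[Double] = Seq(0.0, 0.25, 0.5, 0.75, 1.0)): Seq[Double] = {
    val sorted = data.sorted
    probs.map(p => sorted(((sorted.size - 1) * p).toInt))
  }

  // Pair the byte quantiles with the record quantiles positionally, mirroring
  // getFormattedSizeQuantilesWithRecords above.
  def formatted(sizes: Seq[Double], records: Seq[Double]): Seq[String] = {
    val recordDist = quantiles(records).iterator
    quantiles(sizes).map(s => s"${s.toLong} B / ${recordDist.next().toLong}")
  }

  def main(args: Array[String]): Unit = {
    val bytes = Seq(100.0, 200.0, 400.0, 800.0)
    val recs = Seq(1.0, 2.0, 4.0, 8.0)
    formatted(bytes, recs).foreach(println) // "100 B / 1" ... "800 B / 8"
  }
}
```

Driving the record side with an iterator keeps the pairing positional, so the N-th size percentile is always printed next to the N-th record percentile.
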
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index 01f7e23212c3d..69aac6c862de5 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -31,9 +31,13 @@ private[jobs] object UIData {
var failedTasks : Int = 0
var succeededTasks : Int = 0
var inputBytes : Long = 0
+ var inputRecords : Long = 0
var outputBytes : Long = 0
+ var outputRecords : Long = 0
var shuffleRead : Long = 0
+ var shuffleReadRecords : Long = 0
var shuffleWrite : Long = 0
+ var shuffleWriteRecords : Long = 0
var memoryBytesSpilled : Long = 0
var diskBytesSpilled : Long = 0
}
@@ -73,9 +77,13 @@ private[jobs] object UIData {
var executorRunTime: Long = _
var inputBytes: Long = _
+ var inputRecords: Long = _
var outputBytes: Long = _
+ var outputRecords: Long = _
var shuffleReadBytes: Long = _
+ var shuffleReadRecords : Long = _
var shuffleWriteBytes: Long = _
+ var shuffleWriteRecords: Long = _
var memoryBytesSpilled: Long = _
var diskBytesSpilled: Long = _
@@ -85,6 +93,12 @@ private[jobs] object UIData {
var accumulables = new HashMap[Long, AccumulableInfo]
var taskData = new HashMap[Long, TaskUIData]
var executorSummary = new HashMap[String, ExecutorSummary]
+
+ def hasInput = inputBytes > 0
+ def hasOutput = outputBytes > 0
+ def hasShuffleRead = shuffleReadBytes > 0
+ def hasShuffleWrite = shuffleWriteBytes > 0
+ def hasBytesSpilled = memoryBytesSpilled > 0 && diskBytesSpilled > 0
}
/**
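
With `hasInput`, `hasOutput`, `hasShuffleRead`, `hasShuffleWrite`, and `hasBytesSpilled` now defined on `StageUIData`, the page code can test the stage-level aggregates directly instead of re-deriving those flags. A rough sketch of the pattern follows, using a trimmed-down stand-in class; the field set and names are assumed for illustration and are not Spark's actual types.

```scala
// Trimmed-down stand-in for StageUIData (assumed, simplified field set).
class StageSummarySketch {
  var inputBytes: Long = 0L
  var inputRecords: Long = 0L
  var shuffleWriteBytes: Long = 0L
  var shuffleWriteRecords: Long = 0L

  // Same idea as the new helpers: an optional column exists only if the aggregate is non-zero.
  def hasInput: Boolean = inputBytes > 0
  def hasShuffleWrite: Boolean = shuffleWriteBytes > 0
}

object ConditionalHeaders {
  // Mirrors how StagePage appends optional headers based on the stage-level flags.
  def taskHeaders(stage: StageSummarySketch): Seq[String] =
    Seq("Duration", "GC Time") ++
      (if (stage.hasInput) Seq("Input Size / Records") else Nil) ++
      (if (stage.hasShuffleWrite) Seq("Shuffle Write Size / Records") else Nil)

  def main(args: Array[String]): Unit = {
    val stage = new StageSummarySketch
    stage.inputBytes = 1024L
    stage.inputRecords = 10L
    println(taskHeaders(stage)) // List(Duration, GC Time, Input Size / Records)
  }
}
```
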
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
index 12d23a92878cf..199f731b92bcc 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
@@ -30,7 +30,10 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
private val listener = parent.listener
def render(request: HttpServletRequest): Seq[Node] = {
- val rddId = request.getParameter("id").toInt
+ val parameterId = request.getParameter("id")
+ require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
+
+ val rddId = parameterId.toInt
val storageStatusList = listener.storageStatusList
val rddInfo = listener.rddInfoList.find(_.id == rddId).getOrElse {
// Rather than crashing, render an "RDD Not Found" page
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index c8407bbcb780b..b0b545640f5aa 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -293,22 +293,26 @@ private[spark] object JsonProtocol {
("Remote Blocks Fetched" -> shuffleReadMetrics.remoteBlocksFetched) ~
("Local Blocks Fetched" -> shuffleReadMetrics.localBlocksFetched) ~
("Fetch Wait Time" -> shuffleReadMetrics.fetchWaitTime) ~
- ("Remote Bytes Read" -> shuffleReadMetrics.remoteBytesRead)
+ ("Remote Bytes Read" -> shuffleReadMetrics.remoteBytesRead) ~
+ ("Total Records Read" -> shuffleReadMetrics.recordsRead)
}
def shuffleWriteMetricsToJson(shuffleWriteMetrics: ShuffleWriteMetrics): JValue = {
("Shuffle Bytes Written" -> shuffleWriteMetrics.shuffleBytesWritten) ~
- ("Shuffle Write Time" -> shuffleWriteMetrics.shuffleWriteTime)
+ ("Shuffle Write Time" -> shuffleWriteMetrics.shuffleWriteTime) ~
+ ("Shuffle Records Written" -> shuffleWriteMetrics.shuffleRecordsWritten)
}
def inputMetricsToJson(inputMetrics: InputMetrics): JValue = {
("Data Read Method" -> inputMetrics.readMethod.toString) ~
- ("Bytes Read" -> inputMetrics.bytesRead)
+ ("Bytes Read" -> inputMetrics.bytesRead) ~
+ ("Records Read" -> inputMetrics.recordsRead)
}
def outputMetricsToJson(outputMetrics: OutputMetrics): JValue = {
("Data Write Method" -> outputMetrics.writeMethod.toString) ~
- ("Bytes Written" -> outputMetrics.bytesWritten)
+ ("Bytes Written" -> outputMetrics.bytesWritten) ~
+ ("Records Written" -> outputMetrics.recordsWritten)
}
def taskEndReasonToJson(taskEndReason: TaskEndReason): JValue = {
@@ -670,6 +674,7 @@ private[spark] object JsonProtocol {
metrics.incLocalBlocksFetched((json \ "Local Blocks Fetched").extract[Int])
metrics.incFetchWaitTime((json \ "Fetch Wait Time").extract[Long])
metrics.incRemoteBytesRead((json \ "Remote Bytes Read").extract[Long])
+ metrics.incRecordsRead((json \ "Total Records Read").extractOpt[Long].getOrElse(0))
metrics
}
@@ -677,13 +682,16 @@ private[spark] object JsonProtocol {
val metrics = new ShuffleWriteMetrics
metrics.incShuffleBytesWritten((json \ "Shuffle Bytes Written").extract[Long])
metrics.incShuffleWriteTime((json \ "Shuffle Write Time").extract[Long])
+ metrics.setShuffleRecordsWritten((json \ "Shuffle Records Written")
+ .extractOpt[Long].getOrElse(0))
metrics
}
def inputMetricsFromJson(json: JValue): InputMetrics = {
val metrics = new InputMetrics(
DataReadMethod.withName((json \ "Data Read Method").extract[String]))
- metrics.addBytesRead((json \ "Bytes Read").extract[Long])
+ metrics.incBytesRead((json \ "Bytes Read").extract[Long])
+ metrics.incRecordsRead((json \ "Records Read").extractOpt[Long].getOrElse(0))
metrics
}
@@ -691,6 +699,7 @@ private[spark] object JsonProtocol {
val metrics = new OutputMetrics(
DataWriteMethod.withName((json \ "Data Write Method").extract[String]))
metrics.setBytesWritten((json \ "Bytes Written").extract[Long])
+ metrics.setRecordsWritten((json \ "Records Written").extractOpt[Long].getOrElse(0))
metrics
}
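
Note the asymmetry in the JsonProtocol changes: the new record counters are always written, but they are read back with `extractOpt(...).getOrElse(0)` so that event logs produced before these fields existed still deserialize, defaulting the missing value to zero. A small json4s sketch of that backward-compatibility pattern (standalone, not Spark's actual JsonProtocol object):

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods._

object RecordsReadCompat {
  implicit val formats: Formats = DefaultFormats

  // Missing field => None => 0, matching the extractOpt pattern in the patch above.
  def recordsRead(json: JValue): Long =
    (json \ "Total Records Read").extractOpt[Long].getOrElse(0L)

  def main(args: Array[String]): Unit = {
    val oldEvent = parse("""{"Remote Bytes Read": 2048}""")
    val newEvent = parse("""{"Remote Bytes Read": 2048, "Total Records Read": 17}""")
    println(recordsRead(oldEvent)) // 0 (field absent in older logs)
    println(recordsRead(newEvent)) // 17
  }
}
```
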
diff --git a/core/src/main/scala/org/apache/spark/util/MutableURLClassLoader.scala b/core/src/main/scala/org/apache/spark/util/MutableURLClassLoader.scala
new file mode 100644
index 0000000000000..d9c7103b2f3bf
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/MutableURLClassLoader.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.net.{URLClassLoader, URL}
+import java.util.Enumeration
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConversions._
+
+import org.apache.spark.util.ParentClassLoader
+
+/**
+ * URL class loader that exposes the `addURL` and `getURLs` methods in URLClassLoader.
+ */
+private[spark] class MutableURLClassLoader(urls: Array[URL], parent: ClassLoader)
+ extends URLClassLoader(urls, parent) {
+
+ override def addURL(url: URL): Unit = {
+ super.addURL(url)
+ }
+
+ override def getURLs(): Array[URL] = {
+ super.getURLs()
+ }
+
+}
+
+/**
+ * A mutable class loader that gives preference to its own URLs over the parent class loader
+ * when loading classes and resources.
+ */
+private[spark] class ChildFirstURLClassLoader(urls: Array[URL], parent: ClassLoader)
+ extends MutableURLClassLoader(urls, null) {
+
+ private val parentClassLoader = new ParentClassLoader(parent)
+
+ /**
+ * Used to implement fine-grained class loading locks similar to what is done by Java 7. This
+ * prevents deadlock issues when using non-hierarchical class loaders.
+ *
+ * Note that due to Java 6 compatibility (and some issues with implementing class loaders in
+ * Scala), Java 7's `ClassLoader.registerAsParallelCapable` method is not called.
+ */
+ private val locks = new ConcurrentHashMap[String, Object]()
+
+ override def loadClass(name: String, resolve: Boolean): Class[_] = {
+ var lock = locks.get(name)
+ if (lock == null) {
+ val newLock = new Object()
+ lock = locks.putIfAbsent(name, newLock)
+ if (lock == null) {
+ lock = newLock
+ }
+ }
+
+ lock.synchronized {
+ try {
+ super.loadClass(name, resolve)
+ } catch {
+ case e: ClassNotFoundException =>
+ parentClassLoader.loadClass(name, resolve)
+ }
+ }
+ }
+
+ override def getResource(name: String): URL = {
+ val url = super.findResource(name)
+ val res = if (url != null) url else parentClassLoader.getResource(name)
+ res
+ }
+
+ override def getResources(name: String): Enumeration[URL] = {
+ val urls = super.findResources(name)
+ val res =
+ if (urls != null && urls.hasMoreElements()) {
+ urls
+ } else {
+ parentClassLoader.getResources(name)
+ }
+ res
+ }
+
+ override def addURL(url: URL) {
+ super.addURL(url)
+ }
+
+}
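
`ChildFirstURLClassLoader` inverts the usual parent-delegation order: `loadClass` tries the loader's own URLs first and only falls back to the wrapped parent on a `ClassNotFoundException`, with `getResource`/`getResources` following the same child-first rule. A hypothetical usage sketch is below; the jar path and class name are placeholders, and Spark wires this loader up internally rather than like this.

```scala
package org.apache.spark.util // the loader is private[spark], so the sketch lives in its package

import java.io.File
import java.net.URL

object ChildFirstExample {
  def main(args: Array[String]): Unit = {
    val userJars: Array[URL] = Array(new File("user-app.jar").toURI.toURL)
    val loader = new ChildFirstURLClassLoader(userJars, getClass.getClassLoader)

    // Lookup order: user-app.jar first, then (via ParentClassLoader) the parent loader.
    val clazz = loader.loadClass("com.example.UserProvidedClass") // placeholder class name
    println(clazz.getClassLoader) // the child-first loader, if the class came from the user jar
  }
}
```
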
diff --git a/core/src/main/scala/org/apache/spark/util/ParentClassLoader.scala b/core/src/main/scala/org/apache/spark/util/ParentClassLoader.scala
index 3abc12681fe9a..6d8d9e8da3678 100644
--- a/core/src/main/scala/org/apache/spark/util/ParentClassLoader.scala
+++ b/core/src/main/scala/org/apache/spark/util/ParentClassLoader.scala
@@ -18,7 +18,7 @@
package org.apache.spark.util
/**
- * A class loader which makes findClass accesible to the child
+ * A class loader which makes some protected methods in ClassLoader accessible.
*/
private[spark] class ParentClassLoader(parent: ClassLoader) extends ClassLoader(parent) {
@@ -29,4 +29,9 @@ private[spark] class ParentClassLoader(parent: ClassLoader) extends ClassLoader(
override def loadClass(name: String): Class[_] = {
super.loadClass(name)
}
+
+ override def loadClass(name: String, resolve: Boolean): Class[_] = {
+ super.loadClass(name, resolve)
+ }
+
}
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 6ba03841f746b..eaec5a71e6819 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -763,6 +763,7 @@ private[spark] class ExternalSorter[K, V, C](
if (curWriteMetrics != null) {
m.incShuffleBytesWritten(curWriteMetrics.shuffleBytesWritten)
m.incShuffleWriteTime(curWriteMetrics.shuffleWriteTime)
+ m.incShuffleRecordsWritten(curWriteMetrics.shuffleRecordsWritten)
}
}
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 9eb87f016068d..d3123e854016b 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -175,6 +175,33 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
assert(numExecutorsPending(manager) === 9)
}
+ test("cancel pending executors when no longer needed") {
+ sc = createSparkContext(1, 10)
+ val manager = sc.executorAllocationManager.get
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 5)))
+
+ assert(numExecutorsPending(manager) === 0)
+ assert(numExecutorsToAdd(manager) === 1)
+ assert(addExecutors(manager) === 1)
+ assert(numExecutorsPending(manager) === 1)
+ assert(numExecutorsToAdd(manager) === 2)
+ assert(addExecutors(manager) === 2)
+ assert(numExecutorsPending(manager) === 3)
+
+ val task1Info = createTaskInfo(0, 0, "executor-1")
+ sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, task1Info))
+
+ assert(numExecutorsToAdd(manager) === 4)
+ assert(addExecutors(manager) === 2)
+
+ val task2Info = createTaskInfo(1, 0, "executor-1")
+ sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, task2Info))
+ sc.listenerBus.postToAll(SparkListenerTaskEnd(2, 0, null, null, task1Info, null))
+ sc.listenerBus.postToAll(SparkListenerTaskEnd(2, 0, null, null, task2Info, null))
+
+ assert(adjustRequestedExecutors(manager) === -1)
+ }
+
test("remove executors") {
sc = createSparkContext(5, 10)
val manager = sc.executorAllocationManager.get
@@ -270,15 +297,15 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
assert(removeExecutor(manager, "5"))
assert(removeExecutor(manager, "6"))
assert(executorIds(manager).size === 10)
- assert(addExecutors(manager) === 0) // still at upper limit
+ assert(addExecutors(manager) === 1)
onExecutorRemoved(manager, "3")
onExecutorRemoved(manager, "4")
assert(executorIds(manager).size === 8)
// Add succeeds again, now that we are no longer at the upper limit
// Number of executors added restarts at 1
- assert(addExecutors(manager) === 1)
- assert(addExecutors(manager) === 1) // upper limit reached again
+ assert(addExecutors(manager) === 2)
+ assert(addExecutors(manager) === 1) // upper limit reached
assert(addExecutors(manager) === 0)
assert(executorIds(manager).size === 8)
onExecutorRemoved(manager, "5")
@@ -286,9 +313,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
onExecutorAdded(manager, "13")
onExecutorAdded(manager, "14")
assert(executorIds(manager).size === 8)
- assert(addExecutors(manager) === 1)
- assert(addExecutors(manager) === 1) // upper limit reached again
- assert(addExecutors(manager) === 0)
+ assert(addExecutors(manager) === 0) // still at upper limit
onExecutorAdded(manager, "15")
onExecutorAdded(manager, "16")
assert(executorIds(manager).size === 10)
@@ -679,6 +704,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
private val _numExecutorsToAdd = PrivateMethod[Int]('numExecutorsToAdd)
private val _numExecutorsPending = PrivateMethod[Int]('numExecutorsPending)
+ private val _maxNumExecutorsNeeded = PrivateMethod[Int]('maxNumExecutorsNeeded)
private val _executorsPendingToRemove =
PrivateMethod[collection.Set[String]]('executorsPendingToRemove)
private val _executorIds = PrivateMethod[collection.Set[String]]('executorIds)
@@ -686,6 +712,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
private val _removeTimes = PrivateMethod[collection.Map[String, Long]]('removeTimes)
private val _schedule = PrivateMethod[Unit]('schedule)
private val _addExecutors = PrivateMethod[Int]('addExecutors)
+ private val _addOrCancelExecutorRequests = PrivateMethod[Int]('addOrCancelExecutorRequests)
private val _removeExecutor = PrivateMethod[Boolean]('removeExecutor)
private val _onExecutorAdded = PrivateMethod[Unit]('onExecutorAdded)
private val _onExecutorRemoved = PrivateMethod[Unit]('onExecutorRemoved)
@@ -724,7 +751,12 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
}
private def addExecutors(manager: ExecutorAllocationManager): Int = {
- manager invokePrivate _addExecutors()
+ val maxNumExecutorsNeeded = manager invokePrivate _maxNumExecutorsNeeded()
+ manager invokePrivate _addExecutors(maxNumExecutorsNeeded)
+ }
+
+ private def adjustRequestedExecutors(manager: ExecutorAllocationManager): Int = {
+ manager invokePrivate _addOrCancelExecutorRequests(0L)
}
private def removeExecutor(manager: ExecutorAllocationManager, id: String): Boolean = {
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index e08210ae60d17..ea6b73bc68b34 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -197,6 +197,18 @@ class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemPro
serializer.newInstance().serialize(new StringBuffer())
}
+ test("deprecated config keys") {
+ val conf = new SparkConf()
+ .set("spark.files.userClassPathFirst", "true")
+ .set("spark.yarn.user.classpath.first", "true")
+ assert(conf.contains("spark.files.userClassPathFirst"))
+ assert(conf.contains("spark.executor.userClassPathFirst"))
+ assert(conf.contains("spark.yarn.user.classpath.first"))
+ assert(conf.getBoolean("spark.files.userClassPathFirst", false))
+ assert(conf.getBoolean("spark.executor.userClassPathFirst", false))
+ assert(conf.getBoolean("spark.yarn.user.classpath.first", false))
+ }
+
}
class Class1 {}
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index ed02ca81e405c..e955636cf5b59 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -68,7 +68,8 @@ class JsonProtocolSuite extends FunSuite {
val completedApps = Array[ApplicationInfo]()
val activeDrivers = Array(createDriverInfo())
val completedDrivers = Array(createDriverInfo())
- val stateResponse = new MasterStateResponse("host", 8080, workers, activeApps, completedApps,
+ val stateResponse = new MasterStateResponse(
+ "host", 8080, None, workers, activeApps, completedApps,
activeDrivers, completedDrivers, RecoveryState.ALIVE)
val output = JsonProtocol.writeMasterState(stateResponse)
assertValidJson(output)
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 3f1355f82893e..46d745c4ecbfa 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -21,6 +21,8 @@ import java.io._
import scala.collection.mutable.ArrayBuffer
+import com.google.common.base.Charsets.UTF_8
+import com.google.common.io.ByteStreams
import org.scalatest.FunSuite
import org.scalatest.Matchers
import org.scalatest.concurrent.Timeouts
@@ -141,7 +143,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
"thejar.jar",
"arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
- val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
+ val (childArgs, classpath, sysProps, mainClass) = prepareSubmitEnvironment(appArgs)
val childArgsStr = childArgs.mkString(" ")
childArgsStr should include ("--class org.SomeClass")
childArgsStr should include ("--executor-memory 5g")
@@ -180,7 +182,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
"thejar.jar",
"arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
- val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
+ val (childArgs, classpath, sysProps, mainClass) = prepareSubmitEnvironment(appArgs)
childArgs.mkString(" ") should be ("arg1 arg2")
mainClass should be ("org.SomeClass")
classpath should have length (4)
@@ -201,6 +203,18 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
}
test("handles standalone cluster mode") {
+ testStandaloneCluster(useRest = true)
+ }
+
+ test("handles legacy standalone cluster mode") {
+ testStandaloneCluster(useRest = false)
+ }
+
+ /**
+ * Test whether the launch environment is correctly set up in standalone cluster mode.
+ * @param useRest whether to use the REST submission gateway introduced in Spark 1.3
+ */
+ private def testStandaloneCluster(useRest: Boolean): Unit = {
val clArgs = Seq(
"--deploy-mode", "cluster",
"--master", "spark://h:p",
@@ -212,17 +226,26 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
"thejar.jar",
"arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
- val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
+ appArgs.useRest = useRest
+ val (childArgs, classpath, sysProps, mainClass) = prepareSubmitEnvironment(appArgs)
val childArgsStr = childArgs.mkString(" ")
- childArgsStr should startWith ("--memory 4g --cores 5 --supervise")
- childArgsStr should include regex ("launch spark://h:p .*thejar.jar org.SomeClass arg1 arg2")
- mainClass should be ("org.apache.spark.deploy.Client")
- classpath should have size (0)
- sysProps should have size (5)
+ if (useRest) {
+ childArgsStr should endWith ("thejar.jar org.SomeClass arg1 arg2")
+ mainClass should be ("org.apache.spark.deploy.rest.StandaloneRestClient")
+ } else {
+ childArgsStr should startWith ("--supervise --memory 4g --cores 5")
+ childArgsStr should include regex "launch spark://h:p .*thejar.jar org.SomeClass arg1 arg2"
+ mainClass should be ("org.apache.spark.deploy.Client")
+ }
+ classpath should have size 0
+ sysProps should have size 8
sysProps.keys should contain ("SPARK_SUBMIT")
sysProps.keys should contain ("spark.master")
sysProps.keys should contain ("spark.app.name")
sysProps.keys should contain ("spark.jars")
+ sysProps.keys should contain ("spark.driver.memory")
+ sysProps.keys should contain ("spark.driver.cores")
+ sysProps.keys should contain ("spark.driver.supervise")
sysProps.keys should contain ("spark.shuffle.spill")
sysProps("spark.shuffle.spill") should be ("false")
}
@@ -239,7 +262,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
"thejar.jar",
"arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
- val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
+ val (childArgs, classpath, sysProps, mainClass) = prepareSubmitEnvironment(appArgs)
childArgs.mkString(" ") should be ("arg1 arg2")
mainClass should be ("org.SomeClass")
classpath should have length (1)
@@ -261,7 +284,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
"thejar.jar",
"arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
- val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
+ val (childArgs, classpath, sysProps, mainClass) = prepareSubmitEnvironment(appArgs)
childArgs.mkString(" ") should be ("arg1 arg2")
mainClass should be ("org.SomeClass")
classpath should have length (1)
@@ -281,7 +304,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
"thejar.jar",
"arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
- val (_, _, sysProps, mainClass) = createLaunchEnv(appArgs)
+ val (_, _, sysProps, mainClass) = prepareSubmitEnvironment(appArgs)
sysProps("spark.executor.memory") should be ("5g")
sysProps("spark.master") should be ("yarn-cluster")
mainClass should be ("org.apache.spark.deploy.yarn.Client")
@@ -339,7 +362,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
"--files", files,
"thejar.jar")
val appArgs = new SparkSubmitArguments(clArgs)
- val sysProps = SparkSubmit.createLaunchEnv(appArgs)._3
+ val sysProps = SparkSubmit.prepareSubmitEnvironment(appArgs)._3
appArgs.jars should be (Utils.resolveURIs(jars))
appArgs.files should be (Utils.resolveURIs(files))
sysProps("spark.jars") should be (Utils.resolveURIs(jars + ",thejar.jar"))
@@ -354,7 +377,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
"thejar.jar"
)
val appArgs2 = new SparkSubmitArguments(clArgs2)
- val sysProps2 = SparkSubmit.createLaunchEnv(appArgs2)._3
+ val sysProps2 = SparkSubmit.prepareSubmitEnvironment(appArgs2)._3
appArgs2.files should be (Utils.resolveURIs(files))
appArgs2.archives should be (Utils.resolveURIs(archives))
sysProps2("spark.yarn.dist.files") should be (Utils.resolveURIs(files))
@@ -367,7 +390,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
"mister.py"
)
val appArgs3 = new SparkSubmitArguments(clArgs3)
- val sysProps3 = SparkSubmit.createLaunchEnv(appArgs3)._3
+ val sysProps3 = SparkSubmit.prepareSubmitEnvironment(appArgs3)._3
appArgs3.pyFiles should be (Utils.resolveURIs(pyFiles))
sysProps3("spark.submit.pyFiles") should be (
PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
@@ -392,7 +415,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
"thejar.jar"
)
val appArgs = new SparkSubmitArguments(clArgs)
- val sysProps = SparkSubmit.createLaunchEnv(appArgs)._3
+ val sysProps = SparkSubmit.prepareSubmitEnvironment(appArgs)._3
sysProps("spark.jars") should be(Utils.resolveURIs(jars + ",thejar.jar"))
sysProps("spark.files") should be(Utils.resolveURIs(files))
@@ -409,7 +432,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
"thejar.jar"
)
val appArgs2 = new SparkSubmitArguments(clArgs2)
- val sysProps2 = SparkSubmit.createLaunchEnv(appArgs2)._3
+ val sysProps2 = SparkSubmit.prepareSubmitEnvironment(appArgs2)._3
sysProps2("spark.yarn.dist.files") should be(Utils.resolveURIs(files))
sysProps2("spark.yarn.dist.archives") should be(Utils.resolveURIs(archives))
@@ -424,11 +447,24 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
"mister.py"
)
val appArgs3 = new SparkSubmitArguments(clArgs3)
- val sysProps3 = SparkSubmit.createLaunchEnv(appArgs3)._3
+ val sysProps3 = SparkSubmit.prepareSubmitEnvironment(appArgs3)._3
sysProps3("spark.submit.pyFiles") should be(
PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
}
+ test("user classpath first in driver") {
+ val systemJar = TestUtils.createJarWithFiles(Map("test.resource" -> "SYSTEM"))
+ val userJar = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"))
+ val args = Seq(
+ "--class", UserClasspathFirstTest.getClass.getName.stripSuffix("$"),
+ "--name", "testApp",
+ "--master", "local",
+ "--conf", "spark.driver.extraClassPath=" + systemJar,
+ "--conf", "spark.driver.userClassPathFirst=true",
+ userJar.toString)
+ runSparkSubmit(args)
+ }
+
test("SPARK_CONF_DIR overrides spark-defaults.conf") {
forConfDir(Map("spark.executor.memory" -> "2.3g")) { path =>
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
@@ -440,7 +476,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
val appArgs = new SparkSubmitArguments(args, Map("SPARK_CONF_DIR" -> path))
assert(appArgs.propertiesFile != null)
assert(appArgs.propertiesFile.startsWith(path))
- appArgs.executorMemory should be ("2.3g")
+ appArgs.executorMemory should be ("2.3g")
}
}
@@ -520,3 +556,15 @@ object SimpleApplicationTest {
}
}
}
+
+object UserClasspathFirstTest {
+ def main(args: Array[String]) {
+ val ccl = Thread.currentThread().getContextClassLoader()
+ val resource = ccl.getResourceAsStream("test.resource")
+ val bytes = ByteStreams.toByteArray(resource)
+ val contents = new String(bytes, 0, bytes.length, UTF_8)
+ if (contents != "USER") {
+ throw new SparkException("Should have read user resource, but instead read: " + contents)
+ }
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 1d95432258111..85939eaadccc7 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -37,13 +37,8 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
private var testDir: File = null
- private var provider: FsHistoryProvider = null
-
before {
testDir = Utils.createTempDir()
- provider = new FsHistoryProvider(new SparkConf()
- .set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
- .set("spark.history.fs.updateInterval", "0"))
}
after {
@@ -51,40 +46,41 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
}
test("Parse new and old application logs") {
- val conf = new SparkConf()
- .set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
- .set("spark.history.fs.updateInterval", "0")
- val provider = new FsHistoryProvider(conf)
+ val provider = new FsHistoryProvider(createTestConf())
// Write a new-style application log.
- val logFile1 = new File(testDir, "new1")
- writeFile(logFile1, true, None,
- SparkListenerApplicationStart("app1-1", None, 1L, "test"),
- SparkListenerApplicationEnd(2L)
+ val newAppComplete = new File(testDir, "new1")
+ writeFile(newAppComplete, true, None,
+ SparkListenerApplicationStart("new-app-complete", None, 1L, "test"),
+ SparkListenerApplicationEnd(4L)
)
// Write an unfinished app, new-style.
- val logFile2 = new File(testDir, "new2" + EventLoggingListener.IN_PROGRESS)
- writeFile(logFile2, true, None,
- SparkListenerApplicationStart("app2-2", None, 1L, "test")
+ val newAppIncomplete = new File(testDir, "new2" + EventLoggingListener.IN_PROGRESS)
+ writeFile(newAppIncomplete, true, None,
+ SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test")
)
// Write an old-style application log.
- val oldLog = new File(testDir, "old1")
- oldLog.mkdir()
- createEmptyFile(new File(oldLog, provider.SPARK_VERSION_PREFIX + "1.0"))
- writeFile(new File(oldLog, provider.LOG_PREFIX + "1"), false, None,
- SparkListenerApplicationStart("app3", None, 2L, "test"),
+ val oldAppComplete = new File(testDir, "old1")
+ oldAppComplete.mkdir()
+ createEmptyFile(new File(oldAppComplete, provider.SPARK_VERSION_PREFIX + "1.0"))
+ writeFile(new File(oldAppComplete, provider.LOG_PREFIX + "1"), false, None,
+ SparkListenerApplicationStart("old-app-complete", None, 2L, "test"),
SparkListenerApplicationEnd(3L)
)
- createEmptyFile(new File(oldLog, provider.APPLICATION_COMPLETE))
+ createEmptyFile(new File(oldAppComplete, provider.APPLICATION_COMPLETE))
+
+ // Check for logs so that we force the older unfinished app to be loaded, to make
+ // sure unfinished apps are also sorted correctly.
+ provider.checkForLogs()
// Write an unfinished app, old-style.
- val oldLog2 = new File(testDir, "old2")
- oldLog2.mkdir()
- createEmptyFile(new File(oldLog2, provider.SPARK_VERSION_PREFIX + "1.0"))
- writeFile(new File(oldLog2, provider.LOG_PREFIX + "1"), false, None,
- SparkListenerApplicationStart("app4", None, 2L, "test")
+ val oldAppIncomplete = new File(testDir, "old2")
+ oldAppIncomplete.mkdir()
+ createEmptyFile(new File(oldAppIncomplete, provider.SPARK_VERSION_PREFIX + "1.0"))
+ writeFile(new File(oldAppIncomplete, provider.LOG_PREFIX + "1"), false, None,
+ SparkListenerApplicationStart("old-app-incomplete", None, 2L, "test")
)
// Force a reload of data from the log directory, and check that both logs are loaded.
@@ -96,14 +92,14 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
list.size should be (4)
list.count(e => e.completed) should be (2)
- list(0) should be (ApplicationHistoryInfo(oldLog.getName(), "app3", 2L, 3L,
- oldLog.lastModified(), "test", true))
- list(1) should be (ApplicationHistoryInfo(logFile1.getName(), "app1-1", 1L, 2L,
- logFile1.lastModified(), "test", true))
- list(2) should be (ApplicationHistoryInfo(oldLog2.getName(), "app4", 2L, -1L,
- oldLog2.lastModified(), "test", false))
- list(3) should be (ApplicationHistoryInfo(logFile2.getName(), "app2-2", 1L, -1L,
- logFile2.lastModified(), "test", false))
+ list(0) should be (ApplicationHistoryInfo(newAppComplete.getName(), "new-app-complete", 1L, 4L,
+ newAppComplete.lastModified(), "test", true))
+ list(1) should be (ApplicationHistoryInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
+ oldAppComplete.lastModified(), "test", true))
+ list(2) should be (ApplicationHistoryInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L,
+ -1L, oldAppIncomplete.lastModified(), "test", false))
+ list(3) should be (ApplicationHistoryInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L,
+ -1L, newAppIncomplete.lastModified(), "test", false))
// Make sure the UI can be rendered.
list.foreach { case info =>
@@ -113,6 +109,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
}
test("Parse legacy logs with compression codec set") {
+ val provider = new FsHistoryProvider(createTestConf())
val testCodecs = List((classOf[LZFCompressionCodec].getName(), true),
(classOf[SnappyCompressionCodec].getName(), true),
("invalid.codec", false))
@@ -156,10 +153,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
)
logFile2.setReadable(false, false)
- val conf = new SparkConf()
- .set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
- .set("spark.history.fs.updateInterval", "0")
- val provider = new FsHistoryProvider(conf)
+ val provider = new FsHistoryProvider(createTestConf())
provider.checkForLogs()
val list = provider.getListing().toSeq
@@ -168,10 +162,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
}
test("history file is renamed from inprogress to completed") {
- val conf = new SparkConf()
- .set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
- .set("spark.testing", "true")
- val provider = new FsHistoryProvider(conf)
+ val provider = new FsHistoryProvider(createTestConf())
val logFile1 = new File(testDir, "app1" + EventLoggingListener.IN_PROGRESS)
writeFile(logFile1, true, None,
@@ -191,9 +182,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
}
test("SPARK-5582: empty log directory") {
- val conf = new SparkConf()
- .set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
- val provider = new FsHistoryProvider(conf)
+ val provider = new FsHistoryProvider(createTestConf())
val logFile1 = new File(testDir, "app1" + EventLoggingListener.IN_PROGRESS)
writeFile(logFile1, true, None,
@@ -229,4 +218,8 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
new FileOutputStream(file).close()
}
+ private def createTestConf(): SparkConf = {
+ new SparkConf().set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
+ }
+
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
new file mode 100644
index 0000000000000..29aed89b67aa7
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.rest
+
+import java.io.{File, FileInputStream, FileOutputStream, PrintWriter}
+import java.util.jar.{JarEntry, JarOutputStream}
+import java.util.zip.ZipEntry
+
+import scala.collection.mutable.ArrayBuffer
+import scala.io.Source
+
+import akka.actor.ActorSystem
+import com.google.common.io.ByteStreams
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
+import org.scalatest.exceptions.TestFailedException
+
+import org.apache.spark._
+import org.apache.spark.util.Utils
+import org.apache.spark.deploy.{SparkSubmit, SparkSubmitArguments}
+import org.apache.spark.deploy.master.{DriverState, Master}
+import org.apache.spark.deploy.worker.Worker
+
+/**
+ * End-to-end tests for the REST application submission protocol in standalone mode.
+ */
+class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach {
+ private val systemsToStop = new ArrayBuffer[ActorSystem]
+ private val masterRestUrl = startLocalCluster()
+ private val client = new StandaloneRestClient
+ private val mainJar = StandaloneRestSubmitSuite.createJar()
+ private val mainClass = StandaloneRestApp.getClass.getName.stripSuffix("$")
+
+ override def afterAll() {
+ systemsToStop.foreach(_.shutdown())
+ }
+
+ test("simple submit until completion") {
+ val resultsFile = File.createTempFile("test-submit", ".txt")
+ val numbers = Seq(1, 2, 3)
+ val size = 500
+ val submissionId = submitApplication(resultsFile, numbers, size)
+ waitUntilFinished(submissionId)
+ validateResult(resultsFile, numbers, size)
+ }
+
+ test("kill empty submission") {
+ val response = client.killSubmission(masterRestUrl, "submission-that-does-not-exist")
+ val killResponse = getKillResponse(response)
+ val killSuccess = killResponse.success
+ assert(!killSuccess)
+ }
+
+ test("kill running submission") {
+ val resultsFile = File.createTempFile("test-kill", ".txt")
+ val numbers = Seq(1, 2, 3)
+ val size = 500
+ val submissionId = submitApplication(resultsFile, numbers, size)
+ val response = client.killSubmission(masterRestUrl, submissionId)
+ val killResponse = getKillResponse(response)
+ val killSuccess = killResponse.success
+ waitUntilFinished(submissionId)
+ val response2 = client.requestSubmissionStatus(masterRestUrl, submissionId)
+ val statusResponse = getStatusResponse(response2)
+ val statusSuccess = statusResponse.success
+ val driverState = statusResponse.driverState
+ assert(killSuccess)
+ assert(statusSuccess)
+ assert(driverState === DriverState.KILLED.toString)
+ // we should not see the expected results because we killed the submission
+ intercept[TestFailedException] { validateResult(resultsFile, numbers, size) }
+ }
+
+ test("request status for empty submission") {
+ val response = client.requestSubmissionStatus(masterRestUrl, "submission-that-does-not-exist")
+ val statusResponse = getStatusResponse(response)
+ val statusSuccess = statusResponse.success
+ assert(!statusSuccess)
+ }
+
+ /**
+ * Start a local cluster containing one Master and a few Workers.
+ * Do not use [[org.apache.spark.deploy.LocalSparkCluster]] here because we want the REST URL.
+ * Return the Master's REST URL to which applications should be submitted.
+ */
+ private def startLocalCluster(): String = {
+ val conf = new SparkConf(false)
+ .set("spark.master.rest.enabled", "true")
+ .set("spark.master.rest.port", "0")
+ val (numWorkers, coresPerWorker, memPerWorker) = (2, 1, 512)
+ val localHostName = Utils.localHostName()
+ val (masterSystem, masterPort, _, _masterRestPort) =
+ Master.startSystemAndActor(localHostName, 0, 0, conf)
+ val masterRestPort = _masterRestPort.getOrElse { fail("REST server not started on Master!") }
+ val masterUrl = "spark://" + localHostName + ":" + masterPort
+ val masterRestUrl = "spark://" + localHostName + ":" + masterRestPort
+ (1 to numWorkers).foreach { n =>
+ val (workerSystem, _) = Worker.startSystemAndActor(
+ localHostName, 0, 0, coresPerWorker, memPerWorker, Array(masterUrl), null, Some(n))
+ systemsToStop.append(workerSystem)
+ }
+ systemsToStop.append(masterSystem)
+ masterRestUrl
+ }
+
+ /** Submit the [[StandaloneRestApp]] and return the corresponding submission ID. */
+ private def submitApplication(resultsFile: File, numbers: Seq[Int], size: Int): String = {
+ val appArgs = Seq(resultsFile.getAbsolutePath) ++ numbers.map(_.toString) ++ Seq(size.toString)
+ val commandLineArgs = Array(
+ "--deploy-mode", "cluster",
+ "--master", masterRestUrl,
+ "--name", mainClass,
+ "--class", mainClass,
+ mainJar) ++ appArgs
+ val args = new SparkSubmitArguments(commandLineArgs)
+ val (_, _, sparkProperties, _) = SparkSubmit.prepareSubmitEnvironment(args)
+ val request = client.constructSubmitRequest(
+ mainJar, mainClass, appArgs.toArray, sparkProperties.toMap, Map.empty)
+ val response = client.createSubmission(masterRestUrl, request)
+ val submitResponse = getSubmitResponse(response)
+ val submissionId = submitResponse.submissionId
+ assert(submissionId != null, "Application submission was unsuccessful!")
+ submissionId
+ }
+
+ /** Wait until the given submission has finished running up to the specified timeout. */
+ private def waitUntilFinished(submissionId: String, maxSeconds: Int = 30): Unit = {
+ var finished = false
+ val expireTime = System.currentTimeMillis + maxSeconds * 1000
+ while (!finished) {
+ val response = client.requestSubmissionStatus(masterRestUrl, submissionId)
+ val statusResponse = getStatusResponse(response)
+ val driverState = statusResponse.driverState
+ finished =
+ driverState != DriverState.SUBMITTED.toString &&
+ driverState != DriverState.RUNNING.toString
+ if (System.currentTimeMillis > expireTime) {
+ fail(s"Driver $submissionId did not finish within $maxSeconds seconds.")
+ }
+ }
+ }
+
+ /** Return the response as a submit response, or fail with error otherwise. */
+ private def getSubmitResponse(response: SubmitRestProtocolResponse): CreateSubmissionResponse = {
+ response match {
+ case s: CreateSubmissionResponse => s
+ case e: ErrorResponse => fail(s"Server returned error: ${e.message}")
+ case r => fail(s"Expected submit response. Actual: ${r.toJson}")
+ }
+ }
+
+ /** Return the response as a kill response, or fail with error otherwise. */
+ private def getKillResponse(response: SubmitRestProtocolResponse): KillSubmissionResponse = {
+ response match {
+ case k: KillSubmissionResponse => k
+ case e: ErrorResponse => fail(s"Server returned error: ${e.message}")
+ case r => fail(s"Expected kill response. Actual: ${r.toJson}")
+ }
+ }
+
+ /** Return the response as a status response, or fail with error otherwise. */
+ private def getStatusResponse(response: SubmitRestProtocolResponse): SubmissionStatusResponse = {
+ response match {
+ case s: SubmissionStatusResponse => s
+ case e: ErrorResponse => fail(s"Server returned error: ${e.message}")
+ case r => fail(s"Expected status response. Actual: ${r.toJson}")
+ }
+ }
+
+ /** Validate whether the application produced the correct output. */
+ private def validateResult(resultsFile: File, numbers: Seq[Int], size: Int): Unit = {
+ val lines = Source.fromFile(resultsFile.getAbsolutePath).getLines().toSeq
+ val unexpectedContent =
+ if (lines.nonEmpty) {
+ "[\n" + lines.map { l => " " + l }.mkString("\n") + "\n]"
+ } else {
+ "[EMPTY]"
+ }
+ assert(lines.size === 2, s"Unexpected content in file: $unexpectedContent")
+ assert(lines(0).toInt === numbers.sum, s"Sum of ${numbers.mkString(",")} is incorrect")
+ assert(lines(1).toInt === (size / 2) + 1, "Result of Spark job is incorrect")
+ }
+}
+
+private object StandaloneRestSubmitSuite {
+ private val pathPrefix = this.getClass.getPackage.getName.replaceAll("\\.", "/")
+
+ /**
+ * Create a jar that contains all the class files needed for running the [[StandaloneRestApp]].
+ * Return the absolute path to that jar.
+ */
+ def createJar(): String = {
+ val jarFile = File.createTempFile("test-standalone-rest-protocol", ".jar")
+ val jarFileStream = new FileOutputStream(jarFile)
+ val jarStream = new JarOutputStream(jarFileStream, new java.util.jar.Manifest)
+ jarStream.putNextEntry(new ZipEntry(pathPrefix))
+ getClassFiles.foreach { cf =>
+ jarStream.putNextEntry(new JarEntry(pathPrefix + "/" + cf.getName))
+ val in = new FileInputStream(cf)
+ ByteStreams.copy(in, jarStream)
+ in.close()
+ }
+ jarStream.close()
+ jarFileStream.close()
+ jarFile.getAbsolutePath
+ }
+
+ /**
+ * Return a list of class files compiled for [[StandaloneRestApp]].
+ * This includes all the anonymous classes used in the application.
+ */
+ private def getClassFiles: Seq[File] = {
+ val className = Utils.getFormattedClassName(StandaloneRestApp)
+ val clazz = StandaloneRestApp.getClass
+ val basePath = clazz.getProtectionDomain.getCodeSource.getLocation.toURI.getPath
+ val baseDir = new File(basePath + "/" + pathPrefix)
+ baseDir.listFiles().filter(_.getName.contains(className))
+ }
+}
+
+/**
+ * Sample application to be submitted to the cluster using the REST gateway.
+ * All relevant classes will be packaged into a jar at run time.
+ */
+object StandaloneRestApp {
+ // Usage: [path to results file] [num1] [num2] [num3] [rddSize]
+ // The first line of the results file should be (num1 + num2 + num3)
+ // The second line should be (rddSize / 2) + 1
+ def main(args: Array[String]) {
+ assert(args.size == 5, s"Expected exactly 5 arguments: ${args.mkString(",")}")
+ val resultFile = new File(args(0))
+ val writer = new PrintWriter(resultFile)
+ try {
+ val conf = new SparkConf()
+ val sc = new SparkContext(conf)
+ val firstLine = args(1).toInt + args(2).toInt + args(3).toInt
+ val secondLine = sc.parallelize(1 to args(4).toInt)
+ .map { i => (i / 2, i) }
+ .reduceByKey(_ + _)
+ .count()
+ writer.println(firstLine)
+ writer.println(secondLine)
+ } catch {
+ case e: Exception =>
+ writer.println(e)
+ e.getStackTrace.foreach { l => writer.println(" " + l) }
+ } finally {
+ writer.close()
+ }
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala
new file mode 100644
index 0000000000000..1d64ec201e647
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.rest
+
+import java.lang.Boolean
+import java.lang.Integer
+
+import org.json4s.jackson.JsonMethods._
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkConf
+
+/**
+ * Tests for the REST application submission protocol.
+ */
+class SubmitRestProtocolSuite extends FunSuite {
+
+ test("validate") {
+ val request = new DummyRequest
+ intercept[SubmitRestProtocolException] { request.validate() } // missing everything
+ request.clientSparkVersion = "1.2.3"
+ intercept[SubmitRestProtocolException] { request.validate() } // missing name and age
+ request.name = "something"
+ intercept[SubmitRestProtocolException] { request.validate() } // missing only age
+ request.age = 2
+ intercept[SubmitRestProtocolException] { request.validate() } // age too low
+ request.age = 10
+ request.validate() // everything is set properly
+ request.clientSparkVersion = null
+ intercept[SubmitRestProtocolException] { request.validate() } // missing only Spark version
+ request.clientSparkVersion = "1.2.3"
+ request.name = null
+ intercept[SubmitRestProtocolException] { request.validate() } // missing only name
+ request.message = "not-setting-name"
+ intercept[SubmitRestProtocolException] { request.validate() } // still missing name
+ }
+
+ test("request to and from JSON") {
+ val request = new DummyRequest
+ intercept[SubmitRestProtocolException] { request.toJson } // implicit validation
+ request.clientSparkVersion = "1.2.3"
+ request.active = true
+ request.age = 25
+ request.name = "jung"
+ val json = request.toJson
+ assertJsonEquals(json, dummyRequestJson)
+ val newRequest = SubmitRestProtocolMessage.fromJson(json, classOf[DummyRequest])
+ assert(newRequest.clientSparkVersion === "1.2.3")
+ assert(newRequest.active)
+ assert(newRequest.age === 25)
+ assert(newRequest.name === "jung")
+ assert(newRequest.message === null)
+ }
+
+ test("response to and from JSON") {
+ val response = new DummyResponse
+ response.serverSparkVersion = "3.3.4"
+ response.success = true
+ val json = response.toJson
+ assertJsonEquals(json, dummyResponseJson)
+ val newResponse = SubmitRestProtocolMessage.fromJson(json, classOf[DummyResponse])
+ assert(newResponse.serverSparkVersion === "3.3.4")
+ assert(newResponse.success)
+ assert(newResponse.message === null)
+ }
+
+ test("CreateSubmissionRequest") {
+ val message = new CreateSubmissionRequest
+ intercept[SubmitRestProtocolException] { message.validate() }
+ message.clientSparkVersion = "1.2.3"
+ message.appResource = "honey-walnut-cherry.jar"
+ message.mainClass = "org.apache.spark.examples.SparkPie"
+ val conf = new SparkConf(false)
+ conf.set("spark.app.name", "SparkPie")
+ message.sparkProperties = conf.getAll.toMap
+ message.validate()
+ // optional fields
+ conf.set("spark.jars", "mayonnaise.jar,ketchup.jar")
+ conf.set("spark.files", "fireball.png")
+ conf.set("spark.driver.memory", "512m")
+ conf.set("spark.driver.cores", "180")
+ conf.set("spark.driver.extraJavaOptions", " -Dslices=5 -Dcolor=mostly_red")
+ conf.set("spark.driver.extraClassPath", "food-coloring.jar")
+ conf.set("spark.driver.extraLibraryPath", "pickle.jar")
+ conf.set("spark.driver.supervise", "false")
+ conf.set("spark.executor.memory", "256m")
+ conf.set("spark.cores.max", "10000")
+ message.sparkProperties = conf.getAll.toMap
+ message.appArgs = Array("two slices", "a hint of cinnamon")
+ message.environmentVariables = Map("PATH" -> "/dev/null")
+ message.validate()
+ // bad fields
+ var badConf = conf.clone().set("spark.driver.cores", "one hundred feet")
+ message.sparkProperties = badConf.getAll.toMap
+ intercept[SubmitRestProtocolException] { message.validate() }
+ badConf = conf.clone().set("spark.driver.supervise", "nope, never")
+ message.sparkProperties = badConf.getAll.toMap
+ intercept[SubmitRestProtocolException] { message.validate() }
+ badConf = conf.clone().set("spark.cores.max", "two men")
+ message.sparkProperties = badConf.getAll.toMap
+ intercept[SubmitRestProtocolException] { message.validate() }
+ message.sparkProperties = conf.getAll.toMap
+ // test JSON
+ val json = message.toJson
+ assertJsonEquals(json, submitDriverRequestJson)
+ val newMessage = SubmitRestProtocolMessage.fromJson(json, classOf[CreateSubmissionRequest])
+ assert(newMessage.clientSparkVersion === "1.2.3")
+ assert(newMessage.appResource === "honey-walnut-cherry.jar")
+ assert(newMessage.mainClass === "org.apache.spark.examples.SparkPie")
+ assert(newMessage.sparkProperties("spark.app.name") === "SparkPie")
+ assert(newMessage.sparkProperties("spark.jars") === "mayonnaise.jar,ketchup.jar")
+ assert(newMessage.sparkProperties("spark.files") === "fireball.png")
+ assert(newMessage.sparkProperties("spark.driver.memory") === "512m")
+ assert(newMessage.sparkProperties("spark.driver.cores") === "180")
+ assert(newMessage.sparkProperties("spark.driver.extraJavaOptions") === " -Dslices=5 -Dcolor=mostly_red")
+ assert(newMessage.sparkProperties("spark.driver.extraClassPath") === "food-coloring.jar")
+ assert(newMessage.sparkProperties("spark.driver.extraLibraryPath") === "pickle.jar")
+ assert(newMessage.sparkProperties("spark.driver.supervise") === "false")
+ assert(newMessage.sparkProperties("spark.executor.memory") === "256m")
+ assert(newMessage.sparkProperties("spark.cores.max") === "10000")
+ assert(newMessage.appArgs === message.appArgs)
+ assert(newMessage.sparkProperties === message.sparkProperties)
+ assert(newMessage.environmentVariables === message.environmentVariables)
+ }
+
+ test("CreateSubmissionResponse") {
+ val message = new CreateSubmissionResponse
+ intercept[SubmitRestProtocolException] { message.validate() }
+ message.serverSparkVersion = "1.2.3"
+ message.submissionId = "driver_123"
+ message.success = true
+ message.validate()
+ // test JSON
+ val json = message.toJson
+ assertJsonEquals(json, submitDriverResponseJson)
+ val newMessage = SubmitRestProtocolMessage.fromJson(json, classOf[CreateSubmissionResponse])
+ assert(newMessage.serverSparkVersion === "1.2.3")
+ assert(newMessage.submissionId === "driver_123")
+ assert(newMessage.success)
+ }
+
+ test("KillSubmissionResponse") {
+ val message = new KillSubmissionResponse
+ intercept[SubmitRestProtocolException] { message.validate() }
+ message.serverSparkVersion = "1.2.3"
+ message.submissionId = "driver_123"
+ message.success = true
+ message.validate()
+ // test JSON
+ val json = message.toJson
+ assertJsonEquals(json, killDriverResponseJson)
+ val newMessage = SubmitRestProtocolMessage.fromJson(json, classOf[KillSubmissionResponse])
+ assert(newMessage.serverSparkVersion === "1.2.3")
+ assert(newMessage.submissionId === "driver_123")
+ assert(newMessage.success)
+ }
+
+ test("SubmissionStatusResponse") {
+ val message = new SubmissionStatusResponse
+ intercept[SubmitRestProtocolException] { message.validate() }
+ message.serverSparkVersion = "1.2.3"
+ message.submissionId = "driver_123"
+ message.success = true
+ message.validate()
+ // optional fields
+ message.driverState = "RUNNING"
+ message.workerId = "worker_123"
+ message.workerHostPort = "1.2.3.4:7780"
+ // test JSON
+ val json = message.toJson
+ assertJsonEquals(json, driverStatusResponseJson)
+ val newMessage = SubmitRestProtocolMessage.fromJson(json, classOf[SubmissionStatusResponse])
+ assert(newMessage.serverSparkVersion === "1.2.3")
+ assert(newMessage.submissionId === "driver_123")
+ assert(newMessage.driverState === "RUNNING")
+ assert(newMessage.success)
+ assert(newMessage.workerId === "worker_123")
+ assert(newMessage.workerHostPort === "1.2.3.4:7780")
+ }
+
+ test("ErrorResponse") {
+ val message = new ErrorResponse
+ intercept[SubmitRestProtocolException] { message.validate() }
+ message.serverSparkVersion = "1.2.3"
+ message.message = "Field not found in submit request: X"
+ message.validate()
+ // test JSON
+ val json = message.toJson
+ assertJsonEquals(json, errorJson)
+ val newMessage = SubmitRestProtocolMessage.fromJson(json, classOf[ErrorResponse])
+ assert(newMessage.serverSparkVersion === "1.2.3")
+ assert(newMessage.message === "Field not found in submit request: X")
+ }
+
+ private val dummyRequestJson =
+ """
+ |{
+ | "action" : "DummyRequest",
+ | "active" : true,
+ | "age" : 25,
+ | "clientSparkVersion" : "1.2.3",
+ | "name" : "jung"
+ |}
+ """.stripMargin
+
+ private val dummyResponseJson =
+ """
+ |{
+ | "action" : "DummyResponse",
+ | "serverSparkVersion" : "3.3.4",
+ | "success" : true
+ |}
+ """.stripMargin
+
+ private val submitDriverRequestJson =
+ """
+ |{
+ | "action" : "CreateSubmissionRequest",
+ | "appArgs" : [ "two slices", "a hint of cinnamon" ],
+ | "appResource" : "honey-walnut-cherry.jar",
+ | "clientSparkVersion" : "1.2.3",
+ | "environmentVariables" : {
+ | "PATH" : "/dev/null"
+ | },
+ | "mainClass" : "org.apache.spark.examples.SparkPie",
+ | "sparkProperties" : {
+ | "spark.driver.extraLibraryPath" : "pickle.jar",
+ | "spark.jars" : "mayonnaise.jar,ketchup.jar",
+ | "spark.driver.supervise" : "false",
+ | "spark.app.name" : "SparkPie",
+ | "spark.cores.max" : "10000",
+ | "spark.driver.memory" : "512m",
+ | "spark.files" : "fireball.png",
+ | "spark.driver.cores" : "180",
+ | "spark.driver.extraJavaOptions" : " -Dslices=5 -Dcolor=mostly_red",
+ | "spark.executor.memory" : "256m",
+ | "spark.driver.extraClassPath" : "food-coloring.jar"
+ | }
+ |}
+ """.stripMargin
+
+ private val submitDriverResponseJson =
+ """
+ |{
+ | "action" : "CreateSubmissionResponse",
+ | "serverSparkVersion" : "1.2.3",
+ | "submissionId" : "driver_123",
+ | "success" : true
+ |}
+ """.stripMargin
+
+ private val killDriverResponseJson =
+ """
+ |{
+ | "action" : "KillSubmissionResponse",
+ | "serverSparkVersion" : "1.2.3",
+ | "submissionId" : "driver_123",
+ | "success" : true
+ |}
+ """.stripMargin
+
+ private val driverStatusResponseJson =
+ """
+ |{
+ | "action" : "SubmissionStatusResponse",
+ | "driverState" : "RUNNING",
+ | "serverSparkVersion" : "1.2.3",
+ | "submissionId" : "driver_123",
+ | "success" : true,
+ | "workerHostPort" : "1.2.3.4:7780",
+ | "workerId" : "worker_123"
+ |}
+ """.stripMargin
+
+ private val errorJson =
+ """
+ |{
+ | "action" : "ErrorResponse",
+ | "message" : "Field not found in submit request: X",
+ | "serverSparkVersion" : "1.2.3"
+ |}
+ """.stripMargin
+
+ /** Assert that the contents in the two JSON strings are equal after ignoring whitespace. */
+ private def assertJsonEquals(jsonString1: String, jsonString2: String): Unit = {
+ val trimmedJson1 = jsonString1.trim
+ val trimmedJson2 = jsonString2.trim
+ val json1 = compact(render(parse(trimmedJson1)))
+ val json2 = compact(render(parse(trimmedJson2)))
+ // Put this on a separate line to avoid printing comparison twice when test fails
+ val equals = json1 == json2
+ assert(equals, "\"[%s]\" did not equal \"[%s]\"".format(trimmedJson1, trimmedJson2))
+ }
+}
+
+private class DummyResponse extends SubmitRestProtocolResponse
+private class DummyRequest extends SubmitRestProtocolRequest {
+ var active: Boolean = null
+ var age: Integer = null
+ var name: String = null
+ protected override def doValidate(): Unit = {
+ super.doValidate()
+ assertFieldIsSet(name, "name")
+ assertFieldIsSet(age, "age")
+ assert(age > 5, "Not old enough!")
+ }
+}
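For orientation, a minimal sketch of building and serializing the submit message these tests round-trip, assuming the same package-private access the suite has; the resource, class, and property values below are hypothetical, and posting the resulting JSON to the REST endpoint is outside the scope of this suite:

    // Sketch only: construct, validate, and serialize a CreateSubmissionRequest.
    val request = new CreateSubmissionRequest
    request.clientSparkVersion = "1.2.3"
    request.appResource = "hdfs://path/to/app.jar"               // hypothetical application jar
    request.mainClass = "com.example.Main"                       // hypothetical main class
    request.sparkProperties = Map("spark.app.name" -> "example")
    request.appArgs = Array("arg1", "arg2")
    request.environmentVariables = Map("SPARK_ENV" -> "value")
    request.validate()          // throws SubmitRestProtocolException if a required field is missing
    val json = request.toJson   // JSON body for the REST gateway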
diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
new file mode 100644
index 0000000000000..326e203afe136
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import org.scalatest.FunSuite
+
+class TaskMetricsSuite extends FunSuite {
+ test("[SPARK-5701] updateShuffleReadMetrics: ShuffleReadMetrics not added when no shuffle deps") {
+ val taskMetrics = new TaskMetrics()
+ taskMetrics.updateShuffleReadMetrics()
+ assert(taskMetrics.shuffleReadMetrics.isEmpty)
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
index 81db66ae17464..78fa98a3b9065 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -21,44 +21,46 @@ import java.io.{File, FileWriter, PrintWriter}
import scala.collection.mutable.ArrayBuffer
-import org.scalatest.FunSuite
-
+import org.apache.commons.lang.math.RandomUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{LongWritable, Text}
-import org.apache.hadoop.mapred.{FileSplit => OldFileSplit, InputSplit => OldInputSplit, JobConf,
- LineRecordReader => OldLineRecordReader, RecordReader => OldRecordReader, Reporter,
- TextInputFormat => OldTextInputFormat}
import org.apache.hadoop.mapred.lib.{CombineFileInputFormat => OldCombineFileInputFormat,
- CombineFileSplit => OldCombineFileSplit, CombineFileRecordReader => OldCombineFileRecordReader}
-import org.apache.hadoop.mapreduce.{InputSplit => NewInputSplit, RecordReader => NewRecordReader,
- TaskAttemptContext}
+ CombineFileRecordReader => OldCombineFileRecordReader, CombineFileSplit => OldCombineFileSplit}
+import org.apache.hadoop.mapred.{JobConf, Reporter, FileSplit => OldFileSplit,
+ InputSplit => OldInputSplit, LineRecordReader => OldLineRecordReader,
+ RecordReader => OldRecordReader, TextInputFormat => OldTextInputFormat}
import org.apache.hadoop.mapreduce.lib.input.{CombineFileInputFormat => NewCombineFileInputFormat,
CombineFileRecordReader => NewCombineFileRecordReader, CombineFileSplit => NewCombineFileSplit,
FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat}
+import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
+import org.apache.hadoop.mapreduce.{TaskAttemptContext, InputSplit => NewInputSplit,
+ RecordReader => NewRecordReader}
+import org.scalatest.{BeforeAndAfter, FunSuite}
import org.apache.spark.SharedSparkContext
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.util.Utils
-class InputOutputMetricsSuite extends FunSuite with SharedSparkContext {
+class InputOutputMetricsSuite extends FunSuite with SharedSparkContext
+ with BeforeAndAfter {
@transient var tmpDir: File = _
@transient var tmpFile: File = _
@transient var tmpFilePath: String = _
+ @transient val numRecords: Int = 100000
+ @transient val numBuckets: Int = 10
- override def beforeAll() {
- super.beforeAll()
-
+ before {
tmpDir = Utils.createTempDir()
val testTempDir = new File(tmpDir, "test")
testTempDir.mkdir()
tmpFile = new File(testTempDir, getClass.getSimpleName + ".txt")
val pw = new PrintWriter(new FileWriter(tmpFile))
- for (x <- 1 to 1000000) {
- pw.println("s")
+ for (x <- 1 to numRecords) {
+ pw.println(RandomUtils.nextInt(numBuckets))
}
pw.close()
@@ -66,8 +68,7 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext {
tmpFilePath = "file://" + tmpFile.getAbsolutePath
}
- override def afterAll() {
- super.afterAll()
+ after {
Utils.deleteRecursively(tmpDir)
}
@@ -155,6 +156,101 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext {
assert(bytesRead >= tmpFile.length())
}
+ test("input metrics on records read - simple") {
+ val records = runAndReturnRecordsRead {
+ sc.textFile(tmpFilePath, 4).count()
+ }
+ assert(records == numRecords)
+ }
+
+ test("input metrics on records read - more stages") {
+ val records = runAndReturnRecordsRead {
+ sc.textFile(tmpFilePath, 4)
+ .map(key => (key.length, 1))
+ .reduceByKey(_ + _)
+ .count()
+ }
+ assert(records == numRecords)
+ }
+
+ test("input metrics on records read - New Hadoop API") {
+ val records = runAndReturnRecordsRead {
+ sc.newAPIHadoopFile(tmpFilePath, classOf[NewTextInputFormat], classOf[LongWritable],
+ classOf[Text]).count()
+ }
+ assert(records == numRecords)
+ }
+
+ test("input metrics on records read with cache") {
+ // prime the cache manager
+ val rdd = sc.textFile(tmpFilePath, 4).cache()
+ rdd.collect()
+
+ val records = runAndReturnRecordsRead {
+ rdd.count()
+ }
+
+ assert(records == numRecords)
+ }
+
+ test("shuffle records read metrics") {
+ val recordsRead = runAndReturnShuffleRecordsRead {
+ sc.textFile(tmpFilePath, 4)
+ .map(key => (key, 1))
+ .groupByKey()
+ .collect()
+ }
+ assert(recordsRead == numRecords)
+ }
+
+ test("shuffle records written metrics") {
+ val recordsWritten = runAndReturnShuffleRecordsWritten {
+ sc.textFile(tmpFilePath, 4)
+ .map(key => (key, 1))
+ .groupByKey()
+ .collect()
+ }
+ assert(recordsWritten == numRecords)
+ }
+
+ /**
+ * Tests the metrics from end to end.
+ * 1) reading a Hadoop file,
+ * 2) shuffling and writing to a Hadoop file,
+ * 3) writing to a Hadoop file.
+ */
+ test("input read/write and shuffle read/write metrics all line up") {
+ var inputRead = 0L
+ var outputWritten = 0L
+ var shuffleRead = 0L
+ var shuffleWritten = 0L
+ sc.addSparkListener(new SparkListener() {
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+ val metrics = taskEnd.taskMetrics
+ metrics.inputMetrics.foreach(inputRead += _.recordsRead)
+ metrics.outputMetrics.foreach(outputWritten += _.recordsWritten)
+ metrics.shuffleReadMetrics.foreach(shuffleRead += _.recordsRead)
+ metrics.shuffleWriteMetrics.foreach(shuffleWritten += _.shuffleRecordsWritten)
+ }
+ })
+
+ val tmpFile = new File(tmpDir, getClass.getSimpleName)
+
+ sc.textFile(tmpFilePath, 4)
+ .map(key => (key, 1))
+ .reduceByKey(_ + _)
+ .saveAsTextFile("file://" + tmpFile.getAbsolutePath)
+
+ sc.listenerBus.waitUntilEmpty(500)
+ assert(inputRead == numRecords)
+
+ // Only supported on newer Hadoop
+ if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().isDefined) {
+ assert(outputWritten == numBuckets)
+ }
+ assert(shuffleRead == shuffleWritten)
+ }
+
test("input metrics with interleaved reads") {
val numPartitions = 2
val cartVector = 0 to 9
@@ -193,18 +289,66 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext {
assert(cartesianBytes == firstSize * numPartitions + (cartVector.length * secondSize))
}
- private def runAndReturnBytesRead(job : => Unit): Long = {
- val taskBytesRead = new ArrayBuffer[Long]()
+ private def runAndReturnBytesRead(job: => Unit): Long = {
+ runAndReturnMetrics(job, _.taskMetrics.inputMetrics.map(_.bytesRead))
+ }
+
+ private def runAndReturnRecordsRead(job: => Unit): Long = {
+ runAndReturnMetrics(job, _.taskMetrics.inputMetrics.map(_.recordsRead))
+ }
+
+ private def runAndReturnRecordsWritten(job: => Unit): Long = {
+ runAndReturnMetrics(job, _.taskMetrics.outputMetrics.map(_.recordsWritten))
+ }
+
+ private def runAndReturnShuffleRecordsRead(job: => Unit): Long = {
+ runAndReturnMetrics(job, _.taskMetrics.shuffleReadMetrics.map(_.recordsRead))
+ }
+
+ private def runAndReturnShuffleRecordsWritten(job: => Unit): Long = {
+ runAndReturnMetrics(job, _.taskMetrics.shuffleWriteMetrics.map(_.shuffleRecordsWritten))
+ }
+
+ private def runAndReturnMetrics(job: => Unit,
+ collector: (SparkListenerTaskEnd) => Option[Long]): Long = {
+ val taskMetrics = new ArrayBuffer[Long]()
sc.addSparkListener(new SparkListener() {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
- taskBytesRead += taskEnd.taskMetrics.inputMetrics.get.bytesRead
+ collector(taskEnd).foreach(taskMetrics += _)
}
})
job
sc.listenerBus.waitUntilEmpty(500)
- taskBytesRead.sum
+ taskMetrics.sum
+ }
+
+ test("output metrics on records written") {
+ // Only supported on newer Hadoop
+ if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().isDefined) {
+ val file = new File(tmpDir, getClass.getSimpleName)
+ val filePath = "file://" + file.getAbsolutePath
+
+ val records = runAndReturnRecordsWritten {
+ sc.parallelize(1 to numRecords).saveAsTextFile(filePath)
+ }
+ assert(records == numRecords)
+ }
+ }
+
+ test("output metrics on records written - new Hadoop API") {
+ // Only supported on newer Hadoop
+ if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().isDefined) {
+ val file = new File(tmpDir, getClass.getSimpleName)
+ val filePath = "file://" + file.getAbsolutePath
+
+ val records = runAndReturnRecordsWritten {
+ sc.parallelize(1 to numRecords).map(key => (key.toString, key.toString))
+ .saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](filePath)
+ }
+ assert(records == numRecords)
+ }
}
test("output metrics when writing text file") {
@@ -318,4 +462,4 @@ class NewCombineTextRecordReaderWrapper(
override def getCurrentValue(): Text = delegate.getCurrentValue
override def getProgress(): Float = delegate.getProgress
override def close(): Unit = delegate.close()
-}
\ No newline at end of file
+}
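For orientation, a minimal sketch of reading the new record-level metrics from application code the same way this suite does, assuming an existing SparkContext sc and a hypothetical input path:

    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

    var recordsRead = 0L
    sc.addSparkListener(new SparkListener() {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
        // inputMetrics is an Option; recordsRead is the counter added by this patch
        taskEnd.taskMetrics.inputMetrics.foreach(recordsRead += _.recordsRead)
      }
    })
    sc.textFile("file:///path/to/input.txt").count()   // hypothetical input
    // recordsRead now totals the records read by the job's tasks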
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
index 855f1b6276089..054a4c64897a9 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
@@ -29,9 +29,9 @@ class KryoSerializerDistributedSuite extends FunSuite {
test("kryo objects are serialised consistently in different processes") {
val conf = new SparkConf(false)
- conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- conf.set("spark.kryo.registrator", classOf[AppJarRegistrator].getName)
- conf.set("spark.task.maxFailures", "1")
+ .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .set("spark.kryo.registrator", classOf[AppJarRegistrator].getName)
+ .set("spark.task.maxFailures", "1")
val jar = TestUtils.createJarWithClasses(List(AppJarRegistrator.customClassName))
conf.setJars(List(jar.getPath))
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala
index bbc7e1357b90d..c21c92b63ad13 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala
@@ -31,6 +31,8 @@ class BlockObjectWriterSuite extends FunSuite {
new JavaSerializer(new SparkConf()), 1024, os => os, true, writeMetrics)
writer.write(Long.box(20))
+ // Record metrics update on every write
+ assert(writeMetrics.shuffleRecordsWritten === 1)
// Metrics don't update on every write
assert(writeMetrics.shuffleBytesWritten == 0)
// After 32 writes, metrics should update
@@ -39,6 +41,7 @@ class BlockObjectWriterSuite extends FunSuite {
writer.write(Long.box(i))
}
assert(writeMetrics.shuffleBytesWritten > 0)
+ assert(writeMetrics.shuffleRecordsWritten === 33)
writer.commitAndClose()
assert(file.length() == writeMetrics.shuffleBytesWritten)
}
@@ -51,6 +54,8 @@ class BlockObjectWriterSuite extends FunSuite {
new JavaSerializer(new SparkConf()), 1024, os => os, true, writeMetrics)
writer.write(Long.box(20))
+ // Record metrics update on every write
+ assert(writeMetrics.shuffleRecordsWritten === 1)
// Metrics don't update on every write
assert(writeMetrics.shuffleBytesWritten == 0)
// After 32 writes, metrics should update
@@ -59,7 +64,23 @@ class BlockObjectWriterSuite extends FunSuite {
writer.write(Long.box(i))
}
assert(writeMetrics.shuffleBytesWritten > 0)
+ assert(writeMetrics.shuffleRecordsWritten === 33)
writer.revertPartialWritesAndClose()
assert(writeMetrics.shuffleBytesWritten == 0)
+ assert(writeMetrics.shuffleRecordsWritten == 0)
+ }
+
+ test("Reopening a closed block writer") {
+ val file = new File("somefile")
+ file.deleteOnExit()
+ val writeMetrics = new ShuffleWriteMetrics()
+ val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
+ new JavaSerializer(new SparkConf()), 1024, os => os, true, writeMetrics)
+
+ writer.open()
+ writer.close()
+ intercept[IllegalStateException] {
+ writer.open()
+ }
}
}
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 68074ae32a672..e8405baa8e3ea 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -234,7 +234,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
taskMetrics.incMemoryBytesSpilled(base + 6)
val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
taskMetrics.setInputMetrics(Some(inputMetrics))
- inputMetrics.addBytesRead(base + 7)
+ inputMetrics.incBytesRead(base + 7)
val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
taskMetrics.outputMetrics = Some(outputMetrics)
outputMetrics.setBytesWritten(base + 8)
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 842f54529baf0..f3017dc42cd5c 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -189,6 +189,34 @@ class JsonProtocolSuite extends FunSuite {
assert(newMetrics.inputMetrics.isEmpty)
}
+ test("Input/Output records backwards compatibility") {
+ // records read were added after 1.2
+ val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6,
+ hasHadoopInput = true, hasOutput = true, hasRecords = false)
+ assert(metrics.inputMetrics.nonEmpty)
+ assert(metrics.outputMetrics.nonEmpty)
+ val newJson = JsonProtocol.taskMetricsToJson(metrics)
+ val oldJson = newJson.removeField { case (field, _) => field == "Records Read" }
+ .removeField { case (field, _) => field == "Records Written" }
+ val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
+ assert(newMetrics.inputMetrics.get.recordsRead == 0)
+ assert(newMetrics.outputMetrics.get.recordsWritten == 0)
+ }
+
+ test("Shuffle Read/Write records backwards compatibility") {
+ // records read were added after 1.2
+ val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6,
+ hasHadoopInput = false, hasOutput = false, hasRecords = false)
+ assert(metrics.shuffleReadMetrics.nonEmpty)
+ assert(metrics.shuffleWriteMetrics.nonEmpty)
+ val newJson = JsonProtocol.taskMetricsToJson(metrics)
+ val oldJson = newJson.removeField { case (field, _) => field == "Total Records Read" }
+ .removeField { case (field, _) => field == "Shuffle Records Written" }
+ val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
+ assert(newMetrics.shuffleReadMetrics.get.recordsRead == 0)
+ assert(newMetrics.shuffleWriteMetrics.get.shuffleRecordsWritten == 0)
+ }
+
test("OutputMetrics backward compatibility") {
// OutputMetrics were added after 1.1
val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = false, hasOutput = true)
@@ -644,7 +672,8 @@ class JsonProtocolSuite extends FunSuite {
e: Int,
f: Int,
hasHadoopInput: Boolean,
- hasOutput: Boolean) = {
+ hasOutput: Boolean,
+ hasRecords: Boolean = true) = {
val t = new TaskMetrics
t.setHostname("localhost")
t.setExecutorDeserializeTime(a)
@@ -656,7 +685,8 @@ class JsonProtocolSuite extends FunSuite {
if (hasHadoopInput) {
val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
- inputMetrics.addBytesRead(d + e + f)
+ inputMetrics.incBytesRead(d + e + f)
+ inputMetrics.incRecordsRead(if (hasRecords) (d + e + f) / 100 else -1)
t.setInputMetrics(Some(inputMetrics))
} else {
val sr = new ShuffleReadMetrics
@@ -664,16 +694,19 @@ class JsonProtocolSuite extends FunSuite {
sr.incLocalBlocksFetched(e)
sr.incFetchWaitTime(a + d)
sr.incRemoteBlocksFetched(f)
+ sr.incRecordsRead(if (hasRecords) (b + d) / 100 else -1)
t.setShuffleReadMetrics(Some(sr))
}
if (hasOutput) {
val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
outputMetrics.setBytesWritten(a + b + c)
+ outputMetrics.setRecordsWritten(if (hasRecords) (a + b + c) / 100 else -1)
t.outputMetrics = Some(outputMetrics)
} else {
val sw = new ShuffleWriteMetrics
sw.incShuffleBytesWritten(a + b + c)
sw.incShuffleWriteTime(b + c + d)
+ sw.setShuffleRecordsWritten(if (hasRecords) (a + b + c) / 100 else -1)
t.shuffleWriteMetrics = Some(sw)
}
// Make at most 6 blocks
@@ -907,11 +940,13 @@ class JsonProtocolSuite extends FunSuite {
| "Remote Blocks Fetched": 800,
| "Local Blocks Fetched": 700,
| "Fetch Wait Time": 900,
- | "Remote Bytes Read": 1000
+ | "Remote Bytes Read": 1000,
+ | "Total Records Read" : 10
| },
| "Shuffle Write Metrics": {
| "Shuffle Bytes Written": 1200,
- | "Shuffle Write Time": 1500
+ | "Shuffle Write Time": 1500,
+ | "Shuffle Records Written": 12
| },
| "Updated Blocks": [
| {
@@ -988,11 +1023,13 @@ class JsonProtocolSuite extends FunSuite {
| "Disk Bytes Spilled": 0,
| "Shuffle Write Metrics": {
| "Shuffle Bytes Written": 1200,
- | "Shuffle Write Time": 1500
+ | "Shuffle Write Time": 1500,
+ | "Shuffle Records Written": 12
| },
| "Input Metrics": {
| "Data Read Method": "Hadoop",
- | "Bytes Read": 2100
+ | "Bytes Read": 2100,
+ | "Records Read": 21
| },
| "Updated Blocks": [
| {
@@ -1069,11 +1106,13 @@ class JsonProtocolSuite extends FunSuite {
| "Disk Bytes Spilled": 0,
| "Input Metrics": {
| "Data Read Method": "Hadoop",
- | "Bytes Read": 2100
+ | "Bytes Read": 2100,
+ | "Records Read": 21
| },
| "Output Metrics": {
| "Data Write Method": "Hadoop",
- | "Bytes Written": 1200
+ | "Bytes Written": 1200,
+ | "Records Written": 12
| },
| "Updated Blocks": [
| {
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorURLClassLoaderSuite.scala b/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala
similarity index 90%
rename from core/src/test/scala/org/apache/spark/executor/ExecutorURLClassLoaderSuite.scala
rename to core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala
index b7912c09d1410..31e3b7e7bb71b 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorURLClassLoaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.executor
+package org.apache.spark.util
import java.net.URLClassLoader
@@ -24,7 +24,7 @@ import org.scalatest.FunSuite
import org.apache.spark.{LocalSparkContext, SparkContext, SparkException, TestUtils}
import org.apache.spark.util.Utils
-class ExecutorURLClassLoaderSuite extends FunSuite {
+class MutableURLClassLoaderSuite extends FunSuite {
val urls2 = List(TestUtils.createJarWithClasses(
classNames = Seq("FakeClass1", "FakeClass2", "FakeClass3"),
@@ -37,7 +37,7 @@ class ExecutorURLClassLoaderSuite extends FunSuite {
test("child first") {
val parentLoader = new URLClassLoader(urls2, null)
- val classLoader = new ChildExecutorURLClassLoader(urls, parentLoader)
+ val classLoader = new ChildFirstURLClassLoader(urls, parentLoader)
val fakeClass = classLoader.loadClass("FakeClass2").newInstance()
val fakeClassVersion = fakeClass.toString
assert(fakeClassVersion === "1")
@@ -47,7 +47,7 @@ class ExecutorURLClassLoaderSuite extends FunSuite {
test("parent first") {
val parentLoader = new URLClassLoader(urls2, null)
- val classLoader = new ExecutorURLClassLoader(urls, parentLoader)
+ val classLoader = new MutableURLClassLoader(urls, parentLoader)
val fakeClass = classLoader.loadClass("FakeClass1").newInstance()
val fakeClassVersion = fakeClass.toString
assert(fakeClassVersion === "2")
@@ -57,7 +57,7 @@ class ExecutorURLClassLoaderSuite extends FunSuite {
test("child first can fall back") {
val parentLoader = new URLClassLoader(urls2, null)
- val classLoader = new ChildExecutorURLClassLoader(urls, parentLoader)
+ val classLoader = new ChildFirstURLClassLoader(urls, parentLoader)
val fakeClass = classLoader.loadClass("FakeClass3").newInstance()
val fakeClassVersion = fakeClass.toString
assert(fakeClassVersion === "2")
@@ -65,7 +65,7 @@ class ExecutorURLClassLoaderSuite extends FunSuite {
test("child first can fail") {
val parentLoader = new URLClassLoader(urls2, null)
- val classLoader = new ChildExecutorURLClassLoader(urls, parentLoader)
+ val classLoader = new ChildFirstURLClassLoader(urls, parentLoader)
intercept[java.lang.ClassNotFoundException] {
classLoader.loadClass("FakeClassDoesNotExist").newInstance()
}
diff --git a/data/mllib/sample_lda_data.txt b/data/mllib/sample_lda_data.txt
new file mode 100644
index 0000000000000..2e76702ca9d67
--- /dev/null
+++ b/data/mllib/sample_lda_data.txt
@@ -0,0 +1,12 @@
+1 2 6 0 2 3 1 1 0 0 3
+1 3 0 1 3 0 0 2 0 0 1
+1 4 1 0 0 4 9 0 1 2 0
+2 1 0 3 0 0 5 0 2 3 9
+3 1 1 9 3 0 2 0 0 1 3
+4 2 0 3 4 5 1 1 1 4 0
+2 1 0 3 0 0 5 0 2 2 9
+1 1 1 9 2 1 2 0 0 1 3
+4 4 0 3 4 2 1 3 0 0 0
+2 8 2 0 3 0 2 0 2 7 2
+1 1 1 9 0 2 2 0 0 3 3
+4 1 0 0 4 5 1 3 0 1 0
diff --git a/dev/run-tests b/dev/run-tests
index 2257a566bb1bb..483958757a2dd 100755
--- a/dev/run-tests
+++ b/dev/run-tests
@@ -36,7 +36,7 @@ function handle_error () {
}
-# Build against the right verison of Hadoop.
+# Build against the right version of Hadoop.
{
if [ -n "$AMPLAB_JENKINS_BUILD_PROFILE" ]; then
if [ "$AMPLAB_JENKINS_BUILD_PROFILE" = "hadoop1.0" ]; then
@@ -77,7 +77,7 @@ export SBT_MAVEN_PROFILES_ARGS="$SBT_MAVEN_PROFILES_ARGS -Pkinesis-asl"
fi
}
-# Only run Hive tests if there are sql changes.
+# Only run Hive tests if there are SQL changes.
# Partial solution for SPARK-1455.
if [ -n "$AMPLAB_JENKINS" ]; then
git fetch origin master:master
@@ -183,7 +183,7 @@ CURRENT_BLOCK=$BLOCK_SPARK_UNIT_TESTS
if [ -n "$_SQL_TESTS_ONLY" ]; then
# This must be an array of individual arguments. Otherwise, having one long string
# will be interpreted as a single test, which doesn't work.
- SBT_MAVEN_TEST_ARGS=("catalyst/test" "sql/test" "hive/test" "mllib/test")
+ SBT_MAVEN_TEST_ARGS=("catalyst/test" "sql/test" "hive/test" "hive-thriftserver/test" "mllib/test")
else
SBT_MAVEN_TEST_ARGS=("test")
fi
diff --git a/docs/configuration.md b/docs/configuration.md
index 00e973c245005..eb0d6d33c97d9 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -230,6 +230,15 @@ Apart from these, the following properties are also available, and may be useful
Set a special library path to use when launching the driver JVM.
+
+ spark.driver.userClassPathFirst |
+ false |
+
+ (Experimental) Whether to give user-added jars precedence over Spark's own jars when loading
+ classes in the driver. This feature can be used to mitigate conflicts between Spark's
+ dependencies and user dependencies. It is currently an experimental feature.
+ |
+
spark.executor.extraJavaOptions |
(none) |
@@ -297,13 +306,11 @@ Apart from these, the following properties are also available, and may be useful
- spark.files.userClassPathFirst |
+ spark.executor.userClassPathFirst |
false |
- (Experimental) Whether to give user-added jars precedence over Spark's own jars when
- loading classes in Executors. This feature can be used to mitigate conflicts between
- Spark's dependencies and user dependencies. It is currently an experimental feature.
- (Currently, this setting does not work for YARN, see SPARK-2996 for more details).
+ (Experimental) Same functionality as spark.driver.userClassPathFirst , but
+ applied to executor instances.
|
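For reference, a minimal sketch of enabling both of these properties programmatically, assuming a hypothetical application name (the same keys can also be passed with --conf on spark-submit):

    import org.apache.spark.SparkConf

    // Give user-added jars precedence on both the driver and the executors.
    val conf = new SparkConf()
      .setAppName("user-classpath-first-example")        // hypothetical app name
      .set("spark.driver.userClassPathFirst", "true")
      .set("spark.executor.userClassPathFirst", "true")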
@@ -865,8 +872,8 @@ Apart from these, the following properties are also available, and may be useful
spark.network.timeout |
120 |
- Default timeout for all network interactions, in seconds. This config will be used in
- place of spark.core.connection.ack.wait.timeout , spark.akka.timeout ,
+ Default timeout for all network interactions, in seconds. This config will be used in
+ place of spark.core.connection.ack.wait.timeout , spark.akka.timeout ,
spark.storage.blockManagerSlaveTimeoutMs or
spark.shuffle.io.connectionTimeout , if they are not configured.
|
@@ -911,8 +918,8 @@ Apart from these, the following properties are also available, and may be useful
spark.shuffle.io.preferDirectBufs |
true |
- (Netty only) Off-heap buffers are used to reduce garbage collection during shuffle and cache
- block transfer. For environments where off-heap memory is tightly limited, users may wish to
+ (Netty only) Off-heap buffers are used to reduce garbage collection during shuffle and cache
+ block transfer. For environments where off-heap memory is tightly limited, users may wish to
turn this off to force all allocations from Netty to be on-heap.
|
@@ -920,7 +927,7 @@ Apart from these, the following properties are also available, and may be useful
spark.shuffle.io.numConnectionsPerPeer |
1 |
- (Netty only) Connections between hosts are reused in order to reduce connection buildup for
+ (Netty only) Connections between hosts are reused in order to reduce connection buildup for
large clusters. For clusters with many hard disks and few hosts, this may result in insufficient
concurrency to saturate all disks, and so users may consider increasing this value.
|
@@ -930,7 +937,7 @@ Apart from these, the following properties are also available, and may be useful
3 |
(Netty only) Fetches that fail due to IO-related exceptions are automatically retried if this is
- set to a non-zero value. This retry logic helps stabilize large shuffles in the face of long GC
+ set to a non-zero value. This retry logic helps stabilize large shuffles in the face of long GC
pauses or transient network connectivity issues.
|
@@ -939,7 +946,7 @@ Apart from these, the following properties are also available, and may be useful
5 |
(Netty only) Seconds to wait between retries of fetches. The maximum delay caused by retrying
- is simply maxRetries * retryWait , by default 15 seconds.
+ is simply maxRetries * retryWait , by default 15 seconds.
|