Merge remote-tracking branch 'origin/master' into countDistinctPartial
marmbrus committed Aug 18, 2014
2 parents 57ae3b1 + 66ade00 commit 27984d0
Showing 50 changed files with 946 additions and 314 deletions.
@@ -697,7 +697,7 @@ private[spark] class Master(
         appIdToUI(app.id) = ui
         webUi.attachSparkUI(ui)
         // Application UI is successfully rebuilt, so link the Master UI to it
-        app.desc.appUiUrl = ui.basePath
+        app.desc.appUiUrl = ui.getBasePath
         true
       } catch {
         case e: Exception =>
41 changes: 14 additions & 27 deletions core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -72,7 +72,6 @@ private[spark] class Worker(
   val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)

   val testing: Boolean = sys.props.contains("spark.testing")
-  val masterLock: Object = new Object()
   var master: ActorSelection = null
   var masterAddress: Address = null
   var activeMasterUrl: String = ""
@@ -145,18 +144,16 @@ private[spark] class Worker(
   }

   def changeMaster(url: String, uiUrl: String) {
-    masterLock.synchronized {
-      activeMasterUrl = url
-      activeMasterWebUiUrl = uiUrl
-      master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
-      masterAddress = activeMasterUrl match {
-        case Master.sparkUrlRegex(_host, _port) =>
-          Address("akka.tcp", Master.systemName, _host, _port.toInt)
-        case x =>
-          throw new SparkException("Invalid spark URL: " + x)
-      }
-      connected = true
-    }
+    activeMasterUrl = url
+    activeMasterWebUiUrl = uiUrl
+    master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
+    masterAddress = activeMasterUrl match {
+      case Master.sparkUrlRegex(_host, _port) =>
+        Address("akka.tcp", Master.systemName, _host, _port.toInt)
+      case x =>
+        throw new SparkException("Invalid spark URL: " + x)
+    }
+    connected = true
   }

   def tryRegisterAllMasters() {
@@ -199,9 +196,7 @@ private[spark] class Worker(
       }

     case SendHeartbeat =>
-      masterLock.synchronized {
-        if (connected) { master ! Heartbeat(workerId) }
-      }
+      if (connected) { master ! Heartbeat(workerId) }

     case WorkDirCleanup =>
       // Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor
@@ -244,27 +239,21 @@ private[spark] class Worker(
           manager.start()
           coresUsed += cores_
           memoryUsed += memory_
-          masterLock.synchronized {
-            master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
-          }
+          master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
         } catch {
           case e: Exception => {
             logError("Failed to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
             if (executors.contains(appId + "/" + execId)) {
               executors(appId + "/" + execId).kill()
               executors -= appId + "/" + execId
             }
-            masterLock.synchronized {
-              master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
-            }
+            master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
           }
         }
       }

     case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
-      masterLock.synchronized {
-        master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
-      }
+      master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
       val fullId = appId + "/" + execId
       if (ExecutorState.isFinished(state)) {
         executors.get(fullId) match {
@@ -330,9 +319,7 @@ private[spark] class Worker(
         case _ =>
           logDebug(s"Driver $driverId changed state to $state")
       }
-      masterLock.synchronized {
-        master ! DriverStateChanged(driverId, state, exception)
-      }
+      master ! DriverStateChanged(driverId, state, exception)
       val driver = drivers.remove(driverId).get
       finishedDrivers(driverId) = driver
       memoryUsed -= driver.driverDesc.mem
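Every masterLock.synchronized block above is deleted without a replacement because Worker is an Akka actor: all of these sends already run on the actor's single message-processing thread, so the lock was never protecting against concurrent access. A minimal standalone sketch of that reasoning, using a plain single-threaded executor in place of Akka's dispatcher (all names below are illustrative, not Spark code):

import java.util.concurrent.Executors

object SingleThreadedNoLock {
  // A single-threaded executor stands in for an actor's mailbox: messages are
  // handled one at a time, so state touched only inside handle() needs no lock.
  private val mailbox = Executors.newSingleThreadExecutor()
  private var connected = false // mutated and read only on the mailbox thread

  def handle(msg: String): Unit = mailbox.execute(new Runnable {
    def run(): Unit = msg match {
      case "RegisteredWorker" => connected = true
      case "SendHeartbeat"    => if (connected) println("master ! Heartbeat")
      case _                  => ()
    }
  })

  def main(args: Array[String]): Unit = {
    handle("RegisteredWorker")
    handle("SendHeartbeat")
    mailbox.shutdown()
  }
}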
15 changes: 11 additions & 4 deletions core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -95,7 +95,12 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
    * If the elements in RDD do not vary (max == min) always returns a single bucket.
    */
   def histogram(bucketCount: Int): Pair[Array[Double], Array[Long]] = {
-    // Compute the minimum and the maxium
+    // Scala's built-in range has issues. See #SI-8782
+    def customRange(min: Double, max: Double, steps: Int): IndexedSeq[Double] = {
+      val span = max - min
+      Range.Int(0, steps, 1).map(s => min + (s * span) / steps) :+ max
+    }
+    // Compute the minimum and the maximum
     val (max: Double, min: Double) = self.mapPartitions { items =>
       Iterator(items.foldRight(Double.NegativeInfinity,
         Double.PositiveInfinity)((e: Double, x: Pair[Double, Double]) =>
@@ -107,9 +112,11 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
       throw new UnsupportedOperationException(
         "Histogram on either an empty RDD or RDD containing +/-infinity or NaN")
     }
-    val increment = (max-min)/bucketCount.toDouble
-    val range = if (increment != 0) {
-      Range.Double.inclusive(min, max, increment)
+    val range = if (min != max) {
+      // Range.Double.inclusive(min, max, increment)
+      // The above code doesn't always work. See Scala bug #SI-8782.
+      // https://issues.scala-lang.org/browse/SI-8782
+      customRange(min, max, bucketCount)
     } else {
       List(min, min)
     }
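The helper avoids SI-8782 because the only repeated floating-point work is derived from an exact integer range, and max is appended outright instead of being reached by accumulating increments. A standalone sketch of the same helper (hypothetical object name; assumes a pre-2.13 Scala, where Range.Int is still available):

object CustomRangeSketch {
  // Same shape as the customRange introduced above: integer stepping, one
  // multiply/divide per boundary, and an exact max appended at the end.
  def customRange(min: Double, max: Double, steps: Int): IndexedSeq[Double] = {
    val span = max - min
    Range.Int(0, steps, 1).map(s => min + (s * span) / steps) :+ max
  }

  def main(args: Array[String]): Unit = {
    val r = customRange(0.0, 1.0, 10)
    println(r.length)      // always steps + 1 = 11 bucket boundaries
    println(r.head == 0.0) // true: the endpoints are exact, not accumulated
    println(r.last == 1.0) // true
  }
}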
9 changes: 9 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -76,6 +76,8 @@ private[spark] class SparkUI(
     }
   }

+  def getAppName = appName
+
   /** Set the app name for this UI. */
   def setAppName(name: String) {
     appName = name
@@ -100,6 +102,13 @@ private[spark] class SparkUI(
   private[spark] def appUIAddress = s"http://$appUIHostPort"
 }

+private[spark] abstract class SparkUITab(parent: SparkUI, prefix: String)
+  extends WebUITab(parent, prefix) {
+
+  def appName: String = parent.getAppName
+
+}
+
 private[spark] object SparkUI {
   val DEFAULT_PORT = 4040
   val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"
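The point of SparkUITab is that tabs stop caching appName at construction time and instead read it through the parent UI on every access, so a later setAppName is visible everywhere. A self-contained toy model of that read-through pattern (illustrative names, not Spark's actual classes):

abstract class ToyUI {
  private var appName: String = "(uninitialized)"
  def getAppName: String = appName
  def setAppName(name: String): Unit = { appName = name }
}

abstract class ToyTab(parent: ToyUI, val prefix: String) {
  // Read-through accessor, mirroring SparkUITab.appName: no cached copy,
  // so renames on the parent are seen immediately.
  def appName: String = parent.getAppName
}

object ToyTabDemo {
  def main(args: Array[String]): Unit = {
    val ui = new ToyUI {}
    val tab = new ToyTab(ui, "stages") {}
    ui.setAppName("my-app")
    println(tab.appName) // prints "my-app", not a stale "(uninitialized)"
  }
}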
12 changes: 5 additions & 7 deletions core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -163,17 +163,15 @@ private[spark] object UIUtils extends Logging {

   /** Returns a spark page with correctly formatted headers */
   def headerSparkPage(
-      content: => Seq[Node],
-      basePath: String,
-      appName: String,
       title: String,
-      tabs: Seq[WebUITab],
-      activeTab: WebUITab,
+      content: => Seq[Node],
+      activeTab: SparkUITab,
       refreshInterval: Option[Int] = None): Seq[Node] = {

-    val header = tabs.map { tab =>
+    val appName = activeTab.appName
+    val header = activeTab.headerTabs.map { tab =>
       <li class={if (tab == activeTab) "active" else ""}>
-        <a href={prependBaseUri(basePath, "/" + tab.prefix)}>{tab.name}</a>
+        <a href={prependBaseUri(activeTab.basePath, "/" + tab.prefix)}>{tab.name}</a>
       </li>
     }

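The net effect at call sites, visible in the pages below, is that headerSparkPage(content, basePath, appName, title, tabs, activeTab) collapses to headerSparkPage(title, content, activeTab), since the active tab can supply the app name, base path, and sibling tabs itself. A toy rendering of the new contract (strings instead of XML Nodes; illustrative types only):

trait ToySparkUITab {
  def appName: String
  def basePath: String
  def prefix: String
  def headerTabs: Seq[ToySparkUITab]
}

object ToyUIUtils {
  // Everything header-related is derived from activeTab, as in the new signature.
  def headerSparkPage(
      title: String,
      content: => Seq[String],
      activeTab: ToySparkUITab): Seq[String] = {
    val header = activeTab.headerTabs.map { tab =>
      val marker = if (tab == activeTab) "*" else " "
      s"$marker ${activeTab.basePath}/${tab.prefix}"
    }
    s"${activeTab.appName} - $title" +: (header ++ content)
  }
}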
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -50,6 +50,7 @@ private[spark] abstract class WebUI(
   protected val publicHostName = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
   private val className = Utils.getFormattedClassName(this)

+  def getBasePath: String = basePath
   def getTabs: Seq[WebUITab] = tabs.toSeq
   def getHandlers: Seq[ServletContextHandler] = handlers.toSeq
   def getSecurityManager: SecurityManager = securityManager
@@ -135,6 +136,8 @@ private[spark] abstract class WebUITab(parent: WebUI, val prefix: String) {

   /** Get a list of header tabs from the parent UI. */
   def headerTabs: Seq[WebUITab] = parent.getTabs
+
+  def basePath: String = parent.getBasePath
 }

@@ -24,8 +24,6 @@ import scala.xml.Node
 import org.apache.spark.ui.{UIUtils, WebUIPage}

 private[ui] class EnvironmentPage(parent: EnvironmentTab) extends WebUIPage("") {
-  private val appName = parent.appName
-  private val basePath = parent.basePath
   private val listener = parent.listener

   def render(request: HttpServletRequest): Seq[Node] = {
@@ -45,7 +43,7 @@ private[ui] class EnvironmentPage(parent: EnvironmentTab) extends WebUIPage("") {
       <h4>Classpath Entries</h4> {classpathEntriesTable}
     </span>

-    UIUtils.headerSparkPage(content, basePath, appName, "Environment", parent.headerTabs, parent)
+    UIUtils.headerSparkPage("Environment", content, parent)
   }

   private def propertyHeader = Seq("Name", "Value")
@@ -21,9 +21,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.scheduler._
 import org.apache.spark.ui._

-private[ui] class EnvironmentTab(parent: SparkUI) extends WebUITab(parent, "environment") {
-  val appName = parent.appName
-  val basePath = parent.basePath
+private[ui] class EnvironmentTab(parent: SparkUI) extends SparkUITab(parent, "environment") {
   val listener = new EnvironmentListener

   attachPage(new EnvironmentPage(this))
@@ -43,8 +43,6 @@ private case class ExecutorSummaryInfo(
     maxMemory: Long)

 private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
-  private val appName = parent.appName
-  private val basePath = parent.basePath
   private val listener = parent.listener

   def render(request: HttpServletRequest): Seq[Node] = {
@@ -101,8 +99,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
       </div>
     </div>;

-    UIUtils.headerSparkPage(content, basePath, appName, "Executors (" + execInfo.size + ")",
-      parent.headerTabs, parent)
+    UIUtils.headerSparkPage("Executors (" + execInfo.size + ")", content, parent)
   }

   /** Render an HTML row representing an executor */
@@ -23,11 +23,9 @@ import org.apache.spark.ExceptionFailure
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.StorageStatusListener
-import org.apache.spark.ui.{SparkUI, WebUITab}
+import org.apache.spark.ui.{SparkUI, SparkUITab}

-private[ui] class ExecutorsTab(parent: SparkUI) extends WebUITab(parent, "executors") {
-  val appName = parent.appName
-  val basePath = parent.basePath
+private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "executors") {
   val listener = new ExecutorsListener(parent.storageStatusListener)

   attachPage(new ExecutorsPage(this))
@@ -200,6 +200,12 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
         stageData.shuffleReadBytes += shuffleReadDelta
         execSummary.shuffleRead += shuffleReadDelta

+        val inputBytesDelta =
+          (taskMetrics.inputMetrics.map(_.bytesRead).getOrElse(0L)
+          - oldMetrics.flatMap(_.inputMetrics).map(_.bytesRead).getOrElse(0L))
+        stageData.inputBytes += inputBytesDelta
+        execSummary.inputBytes += inputBytesDelta
+
         val diskSpillDelta =
           taskMetrics.diskBytesSpilled - oldMetrics.map(_.diskBytesSpilled).getOrElse(0L)
         stageData.diskBytesSpilled += diskSpillDelta
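The added block follows the same bookkeeping pattern as the shuffle-read and spill deltas around it: subtract the previous cumulative reading from the latest one and add only the increment to the running totals, with a missing reading treated as 0L. A self-contained sketch of that arithmetic (toy case classes standing in for Spark's metrics types):

case class ToyInputMetrics(bytesRead: Long)
case class ToyTaskMetrics(inputMetrics: Option[ToyInputMetrics])

object InputBytesDeltaSketch {
  def main(args: Array[String]): Unit = {
    val taskMetrics = ToyTaskMetrics(Some(ToyInputMetrics(300L)))  // latest cumulative reading
    val oldMetrics: Option[ToyTaskMetrics] =
      Some(ToyTaskMetrics(Some(ToyInputMetrics(120L))))            // previous reading, if any

    val inputBytesDelta =
      (taskMetrics.inputMetrics.map(_.bytesRead).getOrElse(0L)
      - oldMetrics.flatMap(_.inputMetrics).map(_.bytesRead).getOrElse(0L))

    println(inputBytesDelta) // 180: only the increment is added to the running totals
  }
}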
@@ -26,8 +26,6 @@ import org.apache.spark.ui.{WebUIPage, UIUtils}

 /** Page showing list of all ongoing and recently finished stages and pools */
 private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("") {
-  private val appName = parent.appName
-  private val basePath = parent.basePath
   private val live = parent.live
   private val sc = parent.sc
   private val listener = parent.listener
@@ -94,7 +92,7 @@ private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("") {
           <h4 id ="failed">Failed Stages ({failedStages.size})</h4> ++
           failedStagesTable.toNodeSeq

-    UIUtils.headerSparkPage(content, basePath, appName, "Spark Stages", parent.headerTabs, parent)
+    UIUtils.headerSparkPage("Spark Stages", content, parent)
     }
   }
 }
@@ -21,12 +21,10 @@ import javax.servlet.http.HttpServletRequest

 import org.apache.spark.SparkConf
 import org.apache.spark.scheduler.SchedulingMode
-import org.apache.spark.ui.{SparkUI, WebUITab}
+import org.apache.spark.ui.{SparkUI, SparkUITab}

 /** Web UI showing progress status of all jobs in the given SparkContext. */
-private[ui] class JobProgressTab(parent: SparkUI) extends WebUITab(parent, "stages") {
-  val appName = parent.appName
-  val basePath = parent.basePath
+private[ui] class JobProgressTab(parent: SparkUI) extends SparkUITab(parent, "stages") {
   val live = parent.live
   val sc = parent.sc
   val conf = if (live) sc.conf else new SparkConf
@@ -53,4 +51,5 @@ private[ui] class JobProgressTab(parent: SparkUI) extends WebUITab(parent, "stages") {
       Thread.sleep(100)
     }
   }
+
 }
5 changes: 1 addition & 4 deletions core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
@@ -26,8 +26,6 @@ import org.apache.spark.ui.{WebUIPage, UIUtils}

 /** Page showing specific pool details */
 private[ui] class PoolPage(parent: JobProgressTab) extends WebUIPage("pool") {
-  private val appName = parent.appName
-  private val basePath = parent.basePath
   private val live = parent.live
   private val sc = parent.sc
   private val listener = parent.listener
@@ -51,8 +49,7 @@ private[ui] class PoolPage(parent: JobProgressTab) extends WebUIPage("pool") {
         <h4>Summary </h4> ++ poolTable.toNodeSeq ++
         <h4>{activeStages.size} Active Stages</h4> ++ activeStagesTable.toNodeSeq

-    UIUtils.headerSparkPage(content, basePath, appName, "Fair Scheduler Pool: " + poolName,
-      parent.headerTabs, parent)
+    UIUtils.headerSparkPage("Fair Scheduler Pool: " + poolName, content, parent)
     }
   }
 }
7 changes: 3 additions & 4 deletions core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
@@ -25,7 +25,6 @@ import org.apache.spark.ui.UIUtils

 /** Table showing list of pools */
 private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressTab) {
-  private val basePath = parent.basePath
   private val listener = parent.listener

   def toNodeSeq: Seq[Node] = {
@@ -59,11 +58,11 @@ private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressTab) {
       case Some(stages) => stages.size
       case None => 0
     }
+    val href = "%s/stages/pool?poolname=%s"
+      .format(UIUtils.prependBaseUri(parent.basePath), p.name)
     <tr>
       <td>
-        <a href={"%s/stages/pool?poolname=%s".format(UIUtils.prependBaseUri(basePath), p.name)}>
-          {p.name}
-        </a>
+        <a href={href}>{p.name}</a>
       </td>
       <td>{p.minShare}</td>
       <td>{p.weight}</td>
8 changes: 2 additions & 6 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -29,8 +29,6 @@ import org.apache.spark.scheduler.AccumulableInfo

 /** Page showing statistics and task list for a given stage */
 private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
-  private val appName = parent.appName
-  private val basePath = parent.basePath
   private val listener = parent.listener

   def render(request: HttpServletRequest): Seq[Node] = {
@@ -44,8 +42,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
           <h4>Summary Metrics</h4> No tasks have started yet
           <h4>Tasks</h4> No tasks have started yet
         </div>
-      return UIUtils.headerSparkPage(content, basePath, appName,
-        "Details for Stage %s".format(stageId), parent.headerTabs, parent)
+      return UIUtils.headerSparkPage("Details for Stage %s".format(stageId), content, parent)
     }

     val stageData = stageDataOption.get
@@ -227,8 +224,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
       maybeAccumulableTable ++
       <h4>Tasks</h4> ++ taskTable

-    UIUtils.headerSparkPage(content, basePath, appName, "Details for Stage %d".format(stageId),
-      parent.headerTabs, parent)
+    UIUtils.headerSparkPage("Details for Stage %d".format(stageId), content, parent)
   }
 }
