Commit a773bba

Merge remote-tracking branch 'origin/master' into analysisException
Conflicts:
	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
marmbrus committed Feb 10, 2015
2 parents f88079f + f98707c commit a773bba
Showing 186 changed files with 9,843 additions and 4,045 deletions.
28 changes: 28 additions & 0 deletions build/sbt
@@ -125,4 +125,32 @@ loadConfigFile() {
[[ -f "$etc_sbt_opts_file" ]] && set -- $(loadConfigFile "$etc_sbt_opts_file") "$@"
[[ -f "$sbt_opts_file" ]] && set -- $(loadConfigFile "$sbt_opts_file") "$@"

exit_status=127
saved_stty=""

restoreSttySettings() {
stty $saved_stty
saved_stty=""
}

onExit() {
if [[ "$saved_stty" != "" ]]; then
restoreSttySettings
fi
exit $exit_status
}

saveSttySettings() {
saved_stty=$(stty -g 2>/dev/null)
if [[ ! $? ]]; then
saved_stty=""
fi
}

saveSttySettings
trap onExit INT

run "$@"

exit_status=$?
onExit
2 changes: 1 addition & 1 deletion build/sbt-launch-lib.bash
@@ -81,7 +81,7 @@ execRunner () {
echo ""
}

exec "$@"
"$@"
}

addJava () {
8 changes: 8 additions & 0 deletions core/pom.xml
@@ -243,6 +243,14 @@
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-graphite</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_2.10</artifactId>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
6 changes: 6 additions & 0 deletions core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -103,6 +103,12 @@ span.expand-details {
float: right;
}

span.rest-uri {
font-size: 10pt;
font-style: italic;
color: gray;
}

pre {
font-size: 0.8em;
}
11 changes: 8 additions & 3 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -47,10 +47,15 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
val inputMetrics = blockResult.inputMetrics
val existingMetrics = context.taskMetrics
.getInputMetricsForReadMethod(inputMetrics.readMethod)
existingMetrics.addBytesRead(inputMetrics.bytesRead)

new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
existingMetrics.incBytesRead(inputMetrics.bytesRead)

val iter = blockResult.data.asInstanceOf[Iterator[T]]
new InterruptibleIterator[T](context, iter) {
override def next(): T = {
existingMetrics.incRecordsRead(1)
delegate.next()
}
}
case None =>
// Acquire a lock for loading this partition
// If another thread already holds the lock, wait for it to finish return its results
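The new CacheManager code above wraps the cached block's iterator so that each call to next() also bumps the task's records-read metric. For illustration only, a minimal standalone sketch of that wrap-and-count pattern using a plain delegating iterator instead of Spark's InterruptibleIterator (the RecordCounter name is hypothetical, not part of Spark):

    // Count how many records flow through an iterator by delegating next().
    class RecordCounter[T](delegate: Iterator[T]) extends Iterator[T] {
      private var recordsRead = 0L

      override def hasNext: Boolean = delegate.hasNext

      override def next(): T = {
        recordsRead += 1          // analogous to existingMetrics.incRecordsRead(1) above
        delegate.next()
      }

      def count: Long = recordsRead
    }

    // The wrapper is transparent to consumers of the iterator.
    val counted = new RecordCounter(Iterator(1, 2, 3))
    val total = counted.sum       // total == 6, and counted.count == 3
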
core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -19,24 +19,32 @@ package org.apache.spark

/**
* A client that communicates with the cluster manager to request or kill executors.
* This is currently supported only in YARN mode.
*/
private[spark] trait ExecutorAllocationClient {

/**
* Express a preference to the cluster manager for a given total number of executors.
* This can result in canceling pending requests or filing additional requests.
* @return whether the request is acknowledged by the cluster manager.
*/
private[spark] def requestTotalExecutors(numExecutors: Int): Boolean

/**
* Request an additional number of executors from the cluster manager.
* Return whether the request is acknowledged by the cluster manager.
* @return whether the request is acknowledged by the cluster manager.
*/
def requestExecutors(numAdditionalExecutors: Int): Boolean

/**
* Request that the cluster manager kill the specified executors.
* Return whether the request is acknowledged by the cluster manager.
* @return whether the request is acknowledged by the cluster manager.
*/
def killExecutors(executorIds: Seq[String]): Boolean

/**
* Request that the cluster manager kill the specified executor.
* Return whether the request is acknowledged by the cluster manager.
* @return whether the request is acknowledged by the cluster manager.
*/
def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId))
}
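The trait above is the allocation API the rest of this commit builds on: requestTotalExecutors expresses a desired total, requestExecutors asks for additional executors, and killExecutors asks for specific ones to be removed. As an illustration of the contract only (not part of Spark), a toy implementation that acknowledges every request and records the last requested total, e.g. for a test; note that killExecutor needs no override because the trait forwards it to killExecutors:

    package org.apache.spark

    // Hypothetical, illustration-only client: acknowledges everything it is asked to do.
    private[spark] class LoggingAllocationClient extends ExecutorAllocationClient {
      var lastRequestedTotal: Option[Int] = None

      override private[spark] def requestTotalExecutors(numExecutors: Int): Boolean = {
        lastRequestedTotal = Some(numExecutors)   // remember the desired total
        true                                      // pretend the cluster manager acknowledged
      }

      override def requestExecutors(numAdditionalExecutors: Int): Boolean = true

      override def killExecutors(executorIds: Seq[String]): Boolean =
        executorIds.nonEmpty                      // acknowledge only non-empty kill requests
      // killExecutor(id) is inherited: the trait delegates it to killExecutors(Seq(id)).
    }
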
149 changes: 104 additions & 45 deletions core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -201,18 +201,34 @@ private[spark] class ExecutorAllocationManager(
}

/**
* If the add time has expired, request new executors and refresh the add time.
* If the remove time for an existing executor has expired, kill the executor.
* The number of executors we would have if the cluster manager were to fulfill all our existing
* requests.
*/
private def targetNumExecutors(): Int =
numExecutorsPending + executorIds.size - executorsPendingToRemove.size

/**
* The maximum number of executors we would need under the current load to satisfy all running
* and pending tasks, rounded up.
*/
private def maxNumExecutorsNeeded(): Int = {
val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
(numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
}

/**
* This is called at a fixed interval to regulate the number of pending executor requests
* and number of executors running.
*
* First, adjust our requested executors based on the add time and our current needs.
* Then, if the remove time for an existing executor has expired, kill the executor.
*
* This is factored out into its own method for testing.
*/
private def schedule(): Unit = synchronized {
val now = clock.getTimeMillis
if (addTime != NOT_SET && now >= addTime) {
addExecutors()
logDebug(s"Starting timer to add more executors (to " +
s"expire in $sustainedSchedulerBacklogTimeout seconds)")
addTime += sustainedSchedulerBacklogTimeout * 1000
}

addOrCancelExecutorRequests(now)

removeTimes.retain { case (executorId, expireTime) =>
val expired = now >= expireTime
@@ -223,59 +239,89 @@
}
}

/**
* Check to see whether our existing allocation and the requests we've made previously exceed our
* current needs. If so, let the cluster manager know so that it can cancel pending requests that
* are unneeded.
*
* If not, and the add time has expired, see if we can request new executors and refresh the add
* time.
*
* @return the delta in the target number of executors.
*/
private def addOrCancelExecutorRequests(now: Long): Int = synchronized {
val currentTarget = targetNumExecutors
val maxNeeded = maxNumExecutorsNeeded

if (maxNeeded < currentTarget) {
// The target number exceeds the number we actually need, so stop adding new
// executors and inform the cluster manager to cancel the extra pending requests.
val newTotalExecutors = math.max(maxNeeded, minNumExecutors)
client.requestTotalExecutors(newTotalExecutors)
numExecutorsToAdd = 1
updateNumExecutorsPending(newTotalExecutors)
} else if (addTime != NOT_SET && now >= addTime) {
val delta = addExecutors(maxNeeded)
logDebug(s"Starting timer to add more executors (to " +
s"expire in $sustainedSchedulerBacklogTimeout seconds)")
addTime += sustainedSchedulerBacklogTimeout * 1000
delta
} else {
0
}
}

/**
* Request a number of executors from the cluster manager.
* If the cap on the number of executors is reached, give up and reset the
* number of executors to add next round instead of continuing to double it.
* Return the number actually requested.
*
* @param maxNumExecutorsNeeded the maximum number of executors all currently running or pending
* tasks could fill
* @return the number of additional executors actually requested.
*/
private def addExecutors(): Int = synchronized {
// Do not request more executors if we have already reached the upper bound
val numExistingExecutors = executorIds.size + numExecutorsPending
if (numExistingExecutors >= maxNumExecutors) {
private def addExecutors(maxNumExecutorsNeeded: Int): Int = {
// Do not request more executors if it would put our target over the upper bound
val currentTarget = targetNumExecutors
if (currentTarget >= maxNumExecutors) {
logDebug(s"Not adding executors because there are already ${executorIds.size} " +
s"registered and $numExecutorsPending pending executor(s) (limit $maxNumExecutors)")
numExecutorsToAdd = 1
return 0
}

// The number of executors needed to satisfy all pending tasks is the number of tasks pending
// divided by the number of tasks each executor can fit, rounded up.
val maxNumExecutorsPending =
(listener.totalPendingTasks() + tasksPerExecutor - 1) / tasksPerExecutor
if (numExecutorsPending >= maxNumExecutorsPending) {
logDebug(s"Not adding executors because there are already $numExecutorsPending " +
s"pending and pending tasks could only fill $maxNumExecutorsPending")
numExecutorsToAdd = 1
return 0
}

// It's never useful to request more executors than could satisfy all the pending tasks, so
// cap request at that amount.
// Also cap request with respect to the configured upper bound.
val maxNumExecutorsToAdd = math.min(
maxNumExecutorsPending - numExecutorsPending,
maxNumExecutors - numExistingExecutors)
assert(maxNumExecutorsToAdd > 0)

val actualNumExecutorsToAdd = math.min(numExecutorsToAdd, maxNumExecutorsToAdd)

val newTotalExecutors = numExistingExecutors + actualNumExecutorsToAdd
val addRequestAcknowledged = testing || client.requestExecutors(actualNumExecutorsToAdd)
val actualMaxNumExecutors = math.min(maxNumExecutors, maxNumExecutorsNeeded)
val newTotalExecutors = math.min(currentTarget + numExecutorsToAdd, actualMaxNumExecutors)
val addRequestAcknowledged = testing || client.requestTotalExecutors(newTotalExecutors)
if (addRequestAcknowledged) {
logInfo(s"Requesting $actualNumExecutorsToAdd new executor(s) because " +
s"tasks are backlogged (new desired total will be $newTotalExecutors)")
numExecutorsToAdd =
if (actualNumExecutorsToAdd == numExecutorsToAdd) numExecutorsToAdd * 2 else 1
numExecutorsPending += actualNumExecutorsToAdd
actualNumExecutorsToAdd
val delta = updateNumExecutorsPending(newTotalExecutors)
logInfo(s"Requesting $delta new executor(s) because tasks are backlogged" +
s" (new desired total will be $newTotalExecutors)")
numExecutorsToAdd = if (delta == numExecutorsToAdd) {
numExecutorsToAdd * 2
} else {
1
}
delta
} else {
logWarning(s"Unable to reach the cluster manager " +
s"to request $actualNumExecutorsToAdd executors!")
logWarning(
s"Unable to reach the cluster manager to request $newTotalExecutors total executors!")
0
}
}

/**
* Given the new target number of executors, update the number of pending executor requests,
* and return the delta from the old number of pending requests.
*/
private def updateNumExecutorsPending(newTotalExecutors: Int): Int = {
val newNumExecutorsPending =
newTotalExecutors - executorIds.size + executorsPendingToRemove.size
val delta = newNumExecutorsPending - numExecutorsPending
numExecutorsPending = newNumExecutorsPending
delta
}

/**
* Request the cluster manager to remove the given executor.
* Return whether the request is received.
@@ -415,6 +461,8 @@ private[spark] class ExecutorAllocationManager(
private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]
// Number of tasks currently running on the cluster. Should be 0 when no stages are active.
private var numRunningTasks: Int = _

override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
val stageId = stageSubmitted.stageInfo.stageId
@@ -435,6 +483,10 @@ private[spark] class ExecutorAllocationManager(
// This is needed in case the stage is aborted for any reason
if (stageIdToNumTasks.isEmpty) {
allocationManager.onSchedulerQueueEmpty()
if (numRunningTasks != 0) {
logWarning("No stages are running, but numRunningTasks != 0")
numRunningTasks = 0
}
}
}
}
@@ -446,6 +498,7 @@ private[spark] class ExecutorAllocationManager(
val executorId = taskStart.taskInfo.executorId

allocationManager.synchronized {
numRunningTasks += 1
// This guards against the race condition in which the `SparkListenerTaskStart`
// event is posted before the `SparkListenerBlockManagerAdded` event, which is
// possible because these events are posted in different threads. (see SPARK-4951)
@@ -475,7 +528,8 @@ private[spark] class ExecutorAllocationManager(
val executorId = taskEnd.taskInfo.executorId
val taskId = taskEnd.taskInfo.taskId
allocationManager.synchronized {
// If the executor is no longer running scheduled any tasks, mark it as idle
numRunningTasks -= 1
// If the executor is no longer running any scheduled tasks, mark it as idle
if (executorIdToTaskIds.contains(executorId)) {
executorIdToTaskIds(executorId) -= taskId
if (executorIdToTaskIds(executorId).isEmpty) {
@@ -514,6 +568,11 @@ private[spark] class ExecutorAllocationManager(
}.sum
}

/**
* The number of tasks currently running across all stages.
*/
def totalRunningTasks(): Int = numRunningTasks

/**
* Return true if an executor is not currently running a task, and false otherwise.
*
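To make the arithmetic introduced above easier to follow, here is a stripped-down, illustration-only model of the target calculations: the ceiling division behind maxNumExecutorsNeeded, the target and pending-delta bookkeeping of targetNumExecutors and updateNumExecutorsPending, and the ramp-up rule that doubles numExecutorsToAdd only while a full batch is granted. The AllocationMath object and the example numbers are hypothetical; the real class also carries timers, listener wiring, and the min/max bounds checks shown in the diff.

    // Illustration-only model of the executor-target arithmetic in this commit.
    object AllocationMath {
      // maxNumExecutorsNeeded: running + pending tasks over slots per executor, rounded up.
      def maxNumExecutorsNeeded(pendingTasks: Int, runningTasks: Int, tasksPerExecutor: Int): Int =
        (pendingTasks + runningTasks + tasksPerExecutor - 1) / tasksPerExecutor

      // targetNumExecutors: what we would have if every outstanding request were fulfilled.
      def targetNumExecutors(pending: Int, registered: Int, pendingToRemove: Int): Int =
        pending + registered - pendingToRemove

      // updateNumExecutorsPending: derive the new pending count from a new desired total
      // and report the delta relative to the old pending count.
      def pendingDelta(newTotal: Int, registered: Int, pendingToRemove: Int, oldPending: Int): Int =
        (newTotal - registered + pendingToRemove) - oldPending

      // numExecutorsToAdd ramp-up: double only if the full batch was granted, else reset to 1.
      def nextBatchSize(requested: Int, granted: Int): Int =
        if (granted == requested) requested * 2 else 1
    }

    // Example: 30 pending + 10 running tasks at 4 tasks per executor need ceil(40 / 4) = 10 executors.
    // AllocationMath.maxNumExecutorsNeeded(30, 10, 4) == 10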