Merge branch 'master' into issues/SPARK-18937
ueshin committed Feb 13, 2017
2 parents ffc4912 + bc0a0e6 commit a455f4f
Showing 88 changed files with 2,278 additions and 2,689 deletions.
3 changes: 2 additions & 1 deletion R/pkg/R/DataFrame.R
@@ -415,7 +415,7 @@ setMethod("coltypes",
type <- PRIMITIVE_TYPES[[specialtype]]
}
}
type
type[[1]]
})

# Find which types don't have mapping to R
@@ -1136,6 +1136,7 @@ setMethod("collect",
if (!is.null(PRIMITIVE_TYPES[[colType]]) && colType != "binary") {
vec <- do.call(c, col)
stopifnot(class(vec) != "list")
class(vec) <- PRIMITIVE_TYPES[[colType]]
df[[colIndex]] <- vec
} else {
df[[colIndex]] <- col
2 changes: 1 addition & 1 deletion R/pkg/R/types.R
@@ -29,7 +29,7 @@ PRIMITIVE_TYPES <- as.environment(list(
"string" = "character",
"binary" = "raw",
"boolean" = "logical",
"timestamp" = "POSIXct",
"timestamp" = c("POSIXct", "POSIXt"),
"date" = "Date",
# following types are not SQL types returned by dtypes(). They are listed here for usage
# by checkType() in schema.R.
42 changes: 40 additions & 2 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1306,9 +1306,9 @@ test_that("column functions", {

# Test first(), last()
df <- read.json(jsonPath)
expect_equal(collect(select(df, first(df$age)))[[1]], NA)
expect_equal(collect(select(df, first(df$age)))[[1]], NA_real_)
expect_equal(collect(select(df, first(df$age, TRUE)))[[1]], 30)
expect_equal(collect(select(df, first("age")))[[1]], NA)
expect_equal(collect(select(df, first("age")))[[1]], NA_real_)
expect_equal(collect(select(df, first("age", TRUE)))[[1]], 30)
expect_equal(collect(select(df, last(df$age)))[[1]], 19)
expect_equal(collect(select(df, last(df$age, TRUE)))[[1]], 19)
@@ -2777,6 +2777,44 @@ test_that("Call DataFrameWriter.load() API in Java without path and check argume
"Unnamed arguments ignored: 2, 3, a.")
})

test_that("Collect on DataFrame when NAs exists at the top of a timestamp column", {
ldf <- data.frame(col1 = c(0, 1, 2),
col2 = c(as.POSIXct("2017-01-01 00:00:01"),
NA,
as.POSIXct("2017-01-01 12:00:01")),
col3 = c(as.POSIXlt("2016-01-01 00:59:59"),
NA,
as.POSIXlt("2016-01-01 12:01:01")))
sdf1 <- createDataFrame(ldf)
ldf1 <- collect(sdf1)
expect_equal(dtypes(sdf1), list(c("col1", "double"),
c("col2", "timestamp"),
c("col3", "timestamp")))
expect_equal(class(ldf1$col1), "numeric")
expect_equal(class(ldf1$col2), c("POSIXct", "POSIXt"))
expect_equal(class(ldf1$col3), c("POSIXct", "POSIXt"))

# Columns with NAs at the top
sdf2 <- filter(sdf1, "col1 > 1")
ldf2 <- collect(sdf2)
expect_equal(dtypes(sdf2), list(c("col1", "double"),
c("col2", "timestamp"),
c("col3", "timestamp")))
expect_equal(class(ldf2$col1), "numeric")
expect_equal(class(ldf2$col2), c("POSIXct", "POSIXt"))
expect_equal(class(ldf2$col3), c("POSIXct", "POSIXt"))

# Columns with only NAs, the type will also be cast to PRIMITIVE_TYPE
sdf3 <- filter(sdf1, "col1 == 0")
ldf3 <- collect(sdf3)
expect_equal(dtypes(sdf3), list(c("col1", "double"),
c("col2", "timestamp"),
c("col3", "timestamp")))
expect_equal(class(ldf3$col1), "numeric")
expect_equal(class(ldf3$col2), c("POSIXct", "POSIXt"))
expect_equal(class(ldf3$col3), c("POSIXct", "POSIXt"))
})

unlink(parquetPath)
unlink(orcPath)
unlink(jsonPath)
core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -54,11 +54,30 @@ private[spark] trait ExecutorAllocationClient {

/**
* Request that the cluster manager kill the specified executors.
*
* When asking the executor to be replaced, the executor loss is considered a failure, and
* killed tasks that are running on the executor will count towards the failure limits. If no
* replacement is being requested, then the tasks will not count towards the limit.
*
* @param executorIds identifiers of executors to kill
* @param replace whether to replace the killed executors with new ones, default false
* @param force whether to force kill busy executors, default false
* @return the ids of the executors acknowledged by the cluster manager to be removed.
*/
def killExecutors(executorIds: Seq[String]): Seq[String]
def killExecutors(
executorIds: Seq[String],
replace: Boolean = false,
force: Boolean = false): Seq[String]

/**
* Request that the cluster manager kill every executor on the specified host.
* Results in a call to killExecutors for each executor on the host, with the replace
* and force arguments set to true.
* @return whether the request is acknowledged by the cluster manager.
*/
def killExecutorsOnHost(host: String): Boolean

/**
* Request that the cluster manager kill the specified executor.
* @return whether the request is acknowledged by the cluster manager.
*/
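To make the new kill semantics concrete, a minimal self-contained sketch follows. The trait below is a toy stand-in with the same shape as the updated `ExecutorAllocationClient` (which is `private[spark]`); the name, stub behavior, and executor ids are invented for illustration.

```scala
// Toy stand-in mirroring the updated trait shape. Spark's real client is backed
// by the cluster manager; this stub only acknowledges requests.
trait ToyAllocationClient {
  def killExecutors(
      executorIds: Seq[String],
      replace: Boolean = false,
      force: Boolean = false): Seq[String]

  def killExecutorsOnHost(host: String): Boolean
}

object ToyAllocationClientExample {
  private val client = new ToyAllocationClient {
    override def killExecutors(
        executorIds: Seq[String],
        replace: Boolean,
        force: Boolean): Seq[String] = {
      println(s"killing ${executorIds.mkString(", ")} (replace=$replace, force=$force)")
      executorIds // pretend the cluster manager acknowledged every id
    }

    // Per the new doc: killing everything on a host uses replace = true and force = true.
    override def killExecutorsOnHost(host: String): Boolean =
      killExecutors(Seq(s"<executors on $host>"), replace = true, force = true).nonEmpty
  }

  def main(args: Array[String]): Unit = {
    client.killExecutors(Seq("1", "2"))      // defaults: no replacement, busy executors spared
    client.killExecutors(Seq("3"), replace = true, force = true)
    client.killExecutorsOnHost("host-a")
  }
}
```

The defaults keep existing `killExecutors(ids)` callers on the old behavior: no replacement is requested and busy executors are not force-killed.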
9 changes: 9 additions & 0 deletions core/src/main/scala/org/apache/spark/SSLOptions.scala
@@ -34,6 +34,8 @@ import org.apache.spark.internal.Logging
*
* @param enabled enables or disables SSL; if it is set to false, the rest of the
* settings are disregarded
* @param port the port where to bind the SSL server; if not defined, it will be
* based on the non-SSL port for the same service.
* @param keyStore a path to the key-store file
* @param keyStorePassword a password to access the key-store file
* @param keyPassword a password to access the private key in the key-store
@@ -47,6 +49,7 @@ import org.apache.spark.internal.Logging
*/
private[spark] case class SSLOptions(
enabled: Boolean = false,
port: Option[Int] = None,
keyStore: Option[File] = None,
keyStorePassword: Option[String] = None,
keyPassword: Option[String] = None,
@@ -164,6 +167,11 @@ private[spark] object SSLOptions extends Logging {
def parse(conf: SparkConf, ns: String, defaults: Option[SSLOptions] = None): SSLOptions = {
val enabled = conf.getBoolean(s"$ns.enabled", defaultValue = defaults.exists(_.enabled))

val port = conf.getOption(s"$ns.port").map(_.toInt)
port.foreach { p =>
require(p >= 0, "Port number must be a non-negative value.")
}

val keyStore = conf.getOption(s"$ns.keyStore").map(new File(_))
.orElse(defaults.flatMap(_.keyStore))

@@ -198,6 +206,7 @@

new SSLOptions(
enabled,
port,
keyStore,
keyStorePassword,
keyPassword,
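A hedged sketch of supplying the new `$ns.port` setting through `SparkConf`. `SSLOptions.parse` itself is `private[spark]`; the `spark.ssl.ui` namespace, the port value, the keystore path, and the password below are illustrative assumptions, not values taken from this change.

```scala
import org.apache.spark.SparkConf

object SslPortConfigExample {
  def main(args: Array[String]): Unit = {
    // Settings that SSLOptions.parse(conf, "spark.ssl.ui") would pick up. If the
    // port were omitted, it would be derived from the service's non-SSL port;
    // negative values fail the new require(p >= 0) check.
    val conf = new SparkConf()
      .set("spark.ssl.ui.enabled", "true")
      .set("spark.ssl.ui.port", "4441")                        // assumed example port
      .set("spark.ssl.ui.keyStore", "/path/to/keystore.jks")   // assumed example path
      .set("spark.ssl.ui.keyStorePassword", "changeit")        // assumed example password
    println(conf.getOption("spark.ssl.ui.port"))
  }
}
```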
55 changes: 43 additions & 12 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2207,10 +2207,32 @@ class SparkContext(config: SparkConf) extends Logging {
* Cancel a given job if it's scheduled or running.
*
* @param jobId the job ID to cancel
* @param reason optional reason for cancellation
* @note Throws `InterruptedException` if the cancel message cannot be sent
*/
def cancelJob(jobId: Int) {
dagScheduler.cancelJob(jobId)
def cancelJob(jobId: Int, reason: String): Unit = {
dagScheduler.cancelJob(jobId, Option(reason))
}

/**
* Cancel a given job if it's scheduled or running.
*
* @param jobId the job ID to cancel
* @note Throws `InterruptedException` if the cancel message cannot be sent
*/
def cancelJob(jobId: Int): Unit = {
dagScheduler.cancelJob(jobId, None)
}

/**
* Cancel a given stage and all jobs associated with it.
*
* @param stageId the stage ID to cancel
* @param reason reason for cancellation
* @note Throws `InterruptedException` if the cancel message cannot be sent
*/
def cancelStage(stageId: Int, reason: String): Unit = {
dagScheduler.cancelStage(stageId, Option(reason))
}

/**
@@ -2219,8 +2241,8 @@ class SparkContext(config: SparkConf) extends Logging {
* @param stageId the stage ID to cancel
* @note Throws `InterruptedException` if the cancel message cannot be sent
*/
def cancelStage(stageId: Int) {
dagScheduler.cancelStage(stageId)
def cancelStage(stageId: Int): Unit = {
dagScheduler.cancelStage(stageId, None)
}

/**
@@ -2489,6 +2511,13 @@ object SparkContext extends Logging {
}
}

/** Return the current active [[SparkContext]] if any. */
private[spark] def getActive: Option[SparkContext] = {
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
Option(activeContext.get())
}
}

/**
* Called at the beginning of the SparkContext constructor to ensure that no SparkContext is
* running. Throws an exception if a running context is detected and logs a warning if another
@@ -2745,11 +2774,12 @@ private object SparkMasterRegex {
}

/**
* A class encapsulating how to convert some type T to Writable. It stores both the Writable class
* corresponding to T (e.g. IntWritable for Int) and a function for doing the conversion.
* The getter for the writable class takes a ClassTag[T] in case this is a generic object
* that doesn't know the type of T when it is created. This sounds strange but is necessary to
* support converting subclasses of Writable to themselves (writableWritableConverter).
* A class encapsulating how to convert some type `T` from `Writable`. It stores both the `Writable`
* class corresponding to `T` (e.g. `IntWritable` for `Int`) and a function for doing the
* conversion.
* The getter for the writable class takes a `ClassTag[T]` in case this is a generic object
* that doesn't know the type of `T` when it is created. This sounds strange but is necessary to
* support converting subclasses of `Writable` to themselves (`writableWritableConverter()`).
*/
private[spark] class WritableConverter[T](
val writableClass: ClassTag[T] => Class[_ <: Writable],
@@ -2800,9 +2830,10 @@ object WritableConverter {
}

/**
* A class encapsulating how to convert some type T to Writable. It stores both the Writable class
* corresponding to T (e.g. IntWritable for Int) and a function for doing the conversion.
* The Writable class will be used in `SequenceFileRDDFunctions`.
* A class encapsulating how to convert some type `T` to `Writable`. It stores both the `Writable`
* class corresponding to `T` (e.g. `IntWritable` for `Int`) and a function for doing the
* conversion.
* The `Writable` class will be used in `SequenceFileRDDFunctions`.
*/
private[spark] class WritableFactory[T](
val writableClass: ClassTag[T] => Class[_ <: Writable],
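A small driver-side sketch of the new cancellation overloads. The app name, master, background workload, and reason strings are invented, and polling the status tracker is just one convenient way to find active job and stage ids to cancel.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import org.apache.spark.{SparkConf, SparkContext}

object CancelWithReasonExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("cancel-with-reason").setMaster("local[2]"))
    try {
      // Run a slow job in the background so there is something to cancel.
      Future {
        sc.parallelize(1 to 100000, 4).map { i => Thread.sleep(1); i }.count()
      }
      Thread.sleep(2000)

      // The new overloads forward the reason to the DAGScheduler, so it can
      // surface in the failure message seen by the cancelled job.
      sc.statusTracker.getActiveJobIds().foreach { jobId =>
        sc.cancelJob(jobId, "cancelled by the operator for demonstration")
      }
      sc.statusTracker.getActiveStageIds().foreach { stageId =>
        sc.cancelStage(stageId, "cancelled by the operator for demonstration")
      }
    } finally {
      sc.stop()
    }
  }
}
```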
core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
@@ -90,6 +90,14 @@ private[spark] class TypedConfigBuilder[T](
new TypedConfigBuilder(parent, s => fn(converter(s)), stringConverter)
}

/** Checks if the user-provided value for the config matches the validator. */
def checkValue(validator: T => Boolean, errorMsg: String): TypedConfigBuilder[T] = {
transform { v =>
if (!validator(v)) throw new IllegalArgumentException(errorMsg)
v
}
}

/** Check that user-provided values for the config match a pre-defined set. */
def checkValues(validValues: Set[T]): TypedConfigBuilder[T] = {
transform { v =>
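A minimal sketch of how the new `checkValue` hook composes with the existing builder chain. Because `ConfigBuilder` is internal to Spark, the sketch places itself in the `org.apache.spark.internal.config` package purely so it can see the builder; the config key, bound, and default are invented.

```scala
package org.apache.spark.internal.config

// Hypothetical internal config entry. A user-supplied value that fails the
// predicate raises IllegalArgumentException with the given message when the
// entry is read.
object ExampleConfigEntries {
  val EXAMPLE_MAX_RETRIES = ConfigBuilder("spark.example.maxRetries")
    .intConf
    .checkValue(_ >= 0, "The maximum number of retries must not be negative.")
    .createWithDefault(3)
}
```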
core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -139,6 +139,11 @@ package object config {
.timeConf(TimeUnit.MILLISECONDS)
.createOptional

private[spark] val BLACKLIST_KILL_ENABLED =
ConfigBuilder("spark.blacklist.killBlacklistedExecutors")
.booleanConf
.createWithDefault(false)

private[spark] val BLACKLIST_LEGACY_TIMEOUT_CONF =
ConfigBuilder("spark.scheduler.executorTaskBlacklistTime")
.internal()
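One way a user might opt in to the new kill behavior from the configuration side. `spark.blacklist.enabled` is the pre-existing switch for the blacklist feature; whether executors can actually be killed still depends on the scheduler backend providing an allocation client, as the `BlacklistTracker` change below shows.

```scala
import org.apache.spark.SparkConf

object BlacklistKillConfigExample {
  def main(args: Array[String]): Unit = {
    // spark.blacklist.killBlacklistedExecutors defaults to false; turning it on
    // lets the BlacklistTracker ask the allocation client to kill blacklisted
    // executors (and all executors on a blacklisted node).
    val conf = new SparkConf()
      .set("spark.blacklist.enabled", "true")
      .set("spark.blacklist.killBlacklistedExecutors", "true")
    println(conf.get("spark.blacklist.killBlacklistedExecutors"))
  }
}
```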
core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicReference

import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.{ExecutorAllocationClient, SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
import org.apache.spark.util.{Clock, SystemClock, Utils}
@@ -50,10 +50,11 @@ import org.apache.spark.util.{Clock, SystemClock, Utils}
private[scheduler] class BlacklistTracker (
private val listenerBus: LiveListenerBus,
conf: SparkConf,
allocationClient: Option[ExecutorAllocationClient],
clock: Clock = new SystemClock()) extends Logging {

def this(sc: SparkContext) = {
this(sc.listenerBus, sc.conf)
def this(sc: SparkContext, allocationClient: Option[ExecutorAllocationClient]) = {
this(sc.listenerBus, sc.conf, allocationClient)
}

BlacklistTracker.validateBlacklistConfs(conf)
@@ -173,6 +174,17 @@ private[scheduler] class BlacklistTracker (
listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, newTotal))
executorIdToFailureList.remove(exec)
updateNextExpiryTime()
if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
allocationClient match {
case Some(allocationClient) =>
logInfo(s"Killing blacklisted executor id $exec " +
s"since spark.blacklist.killBlacklistedExecutors is set.")
allocationClient.killExecutors(Seq(exec), true, true)
case None =>
logWarning(s"Not attempting to kill blacklisted executor id $exec " +
s"since allocation client is not defined.")
}
}

// In addition to blacklisting the executor, we also update the data for failures on the
// node, and potentially put the entire node into a blacklist as well.
@@ -187,6 +199,19 @@ private[scheduler] class BlacklistTracker (
nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size))
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
allocationClient match {
case Some(allocationClient) =>
logInfo(s"Killing all executors on blacklisted host $node " +
s"since spark.blacklist.killBlacklistedExecutors is set.")
if (allocationClient.killExecutorsOnHost(node) == false) {
logError(s"Killing executors on node $node failed.")
}
case None =>
logWarning(s"Not attempting to kill executors on blacklisted host $node " +
s"since allocation client is not defined.")
}
}
}
}
}
