Skip to content

Commit

Permalink
Merge pull request #4 from apache/master
Browse files Browse the repository at this point in the history
merge upstream changes
  • Loading branch information
nchammas committed Aug 5, 2014
2 parents 0e0245f + 41e0a21 commit 1db5314
Show file tree
Hide file tree
Showing 43 changed files with 1,492 additions and 260 deletions.
19 changes: 15 additions & 4 deletions core/src/main/scala/org/apache/spark/Accumulators.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,21 @@ import org.apache.spark.serializer.JavaSerializer
*
* @param initialValue initial value of accumulator
* @param param helper object defining how to add elements of type `R` and `T`
* @param name human-readable name for use in Spark's web UI
* @tparam R the full accumulated data (result type)
* @tparam T partial data that can be added in
*/
class Accumulable[R, T] (
@transient initialValue: R,
param: AccumulableParam[R, T])
param: AccumulableParam[R, T],
val name: Option[String])
extends Serializable {

val id = Accumulators.newId
def this(@transient initialValue: R, param: AccumulableParam[R, T]) =
this(initialValue, param, None)

val id: Long = Accumulators.newId

@transient private var value_ = initialValue // Current value on master
val zero = param.zero(initialValue) // Zero value to be passed to workers
private var deserialized = false
Expand Down Expand Up @@ -219,8 +225,10 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa
* @param param helper object defining how to add elements of type `T`
* @tparam T result type
*/
class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T])
extends Accumulable[T,T](initialValue, param)
class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T], name: Option[String])
extends Accumulable[T,T](initialValue, param, name) {
def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, None)
}

/**
* A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add
Expand Down Expand Up @@ -281,4 +289,7 @@ private object Accumulators {
}
}
}

def stringifyPartialValue(partialValue: Any) = "%s".format(partialValue)
def stringifyValue(value: Any) = "%s".format(value)
}
107 changes: 90 additions & 17 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,19 @@ import org.apache.spark.deploy.SparkHadoopUtil
* secure the UI if it has data that other users should not be allowed to see. The javax
* servlet filter specified by the user can authenticate the user and then once the user
* is logged in, Spark can compare that user versus the view acls to make sure they are
* authorized to view the UI. The configs 'spark.ui.acls.enable' and 'spark.ui.view.acls'
* authorized to view the UI. The configs 'spark.acls.enable' and 'spark.ui.view.acls'
* control the behavior of the acls. Note that the person who started the application
* always has view access to the UI.
*
* Spark has a set of modify acls (`spark.modify.acls`) that controls which users have permission
* to modify a single application. This would include things like killing the application. By
* default the person who started the application has modify access. For modify access through
* the UI, you must have a filter that does authentication in place for the modify acls to work
* properly.
*
* Spark also has a set of admin acls (`spark.admin.acls`) which is a set of users/administrators
* who always have permission to view or modify the Spark application.
*
* Spark does not currently support encryption after authentication.
*
* At this point spark has multiple communication protocols that need to be secured and
Expand Down Expand Up @@ -137,18 +146,32 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
private val sparkSecretLookupKey = "sparkCookie"

private val authOn = sparkConf.getBoolean("spark.authenticate", false)
private var uiAclsOn = sparkConf.getBoolean("spark.ui.acls.enable", false)
// keep spark.ui.acls.enable for backwards compatibility with 1.0
private var aclsOn = sparkConf.getOption("spark.acls.enable").getOrElse(
sparkConf.get("spark.ui.acls.enable", "false")).toBoolean

// admin acls should be set before view or modify acls
private var adminAcls: Set[String] =
stringToSet(sparkConf.get("spark.admin.acls", ""))

private var viewAcls: Set[String] = _

// list of users who have permission to modify the application. This should
// apply to both UI and CLI for things like killing the application.
private var modifyAcls: Set[String] = _

// always add the current user and SPARK_USER to the viewAcls
private val defaultAclUsers = Seq[String](System.getProperty("user.name", ""),
private val defaultAclUsers = Set[String](System.getProperty("user.name", ""),
Option(System.getenv("SPARK_USER")).getOrElse(""))

setViewAcls(defaultAclUsers, sparkConf.get("spark.ui.view.acls", ""))
setModifyAcls(defaultAclUsers, sparkConf.get("spark.modify.acls", ""))

private val secretKey = generateSecretKey()
logInfo("SecurityManager: authentication " + (if (authOn) "enabled" else "disabled") +
"; ui acls " + (if (uiAclsOn) "enabled" else "disabled") +
"; users with view permissions: " + viewAcls.toString())
"; ui acls " + (if (aclsOn) "enabled" else "disabled") +
"; users with view permissions: " + viewAcls.toString() +
"; users with modify permissions: " + modifyAcls.toString())

// Set our own authenticator to properly negotiate user/password for HTTP connections.
// This is needed by the HTTP client fetching from the HttpServer. Put here so its
Expand All @@ -169,18 +192,51 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
)
}

private[spark] def setViewAcls(defaultUsers: Seq[String], allowedUsers: String) {
viewAcls = (defaultUsers ++ allowedUsers.split(',')).map(_.trim()).filter(!_.isEmpty).toSet
/**
* Split a comma separated String, filter out any empty items, and return a Set of strings
*/
private def stringToSet(list: String): Set[String] = {
list.split(',').map(_.trim).filter(!_.isEmpty).toSet
}

/**
* Admin acls should be set before the view or modify acls. If you modify the admin
* acls you should also set the view and modify acls again to pick up the changes.
*/
def setViewAcls(defaultUsers: Set[String], allowedUsers: String) {
viewAcls = (adminAcls ++ defaultUsers ++ stringToSet(allowedUsers))
logInfo("Changing view acls to: " + viewAcls.mkString(","))
}

private[spark] def setViewAcls(defaultUser: String, allowedUsers: String) {
setViewAcls(Seq[String](defaultUser), allowedUsers)
def setViewAcls(defaultUser: String, allowedUsers: String) {
setViewAcls(Set[String](defaultUser), allowedUsers)
}

def getViewAcls: String = viewAcls.mkString(",")

/**
* Admin acls should be set before the view or modify acls. If you modify the admin
* acls you should also set the view and modify acls again to pick up the changes.
*/
def setModifyAcls(defaultUsers: Set[String], allowedUsers: String) {
modifyAcls = (adminAcls ++ defaultUsers ++ stringToSet(allowedUsers))
logInfo("Changing modify acls to: " + modifyAcls.mkString(","))
}

def getModifyAcls: String = modifyAcls.mkString(",")

/**
* Admin acls should be set before the view or modify acls. If you modify the admin
* acls you should also set the view and modify acls again to pick up the changes.
*/
def setAdminAcls(adminUsers: String) {
adminAcls = stringToSet(adminUsers)
logInfo("Changing admin acls to: " + adminAcls.mkString(","))
}

private[spark] def setUIAcls(aclSetting: Boolean) {
uiAclsOn = aclSetting
logInfo("Changing acls enabled to: " + uiAclsOn)
def setAcls(aclSetting: Boolean) {
aclsOn = aclSetting
logInfo("Changing acls enabled to: " + aclsOn)
}

/**
Expand Down Expand Up @@ -224,22 +280,39 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
* Check to see if Acls for the UI are enabled
* @return true if UI authentication is enabled, otherwise false
*/
def uiAclsEnabled(): Boolean = uiAclsOn
def aclsEnabled(): Boolean = aclsOn

/**
* Checks the given user against the view acl list to see if they have
* authorization to view the UI. If the UI acls must are disabled
* via spark.ui.acls.enable, all users have view access.
* authorization to view the UI. If the UI acls are disabled
* via spark.acls.enable, all users have view access. If the user is null
* it is assumed authentication is off and all users have access.
*
* @param user to see if is authorized
* @return true is the user has permission, otherwise false
*/
def checkUIViewPermissions(user: String): Boolean = {
logDebug("user=" + user + " uiAclsEnabled=" + uiAclsEnabled() + " viewAcls=" +
logDebug("user=" + user + " aclsEnabled=" + aclsEnabled() + " viewAcls=" +
viewAcls.mkString(","))
if (uiAclsEnabled() && (user != null) && (!viewAcls.contains(user))) false else true
if (aclsEnabled() && (user != null) && (!viewAcls.contains(user))) false else true
}

/**
* Checks the given user against the modify acl list to see if they have
* authorization to modify the application. If the UI acls are disabled
* via spark.acls.enable, all users have modify access. If the user is null
* it is assumed authentication isn't turned on and all users have access.
*
* @param user to see if is authorized
* @return true is the user has permission, otherwise false
*/
def checkModifyPermissions(user: String): Boolean = {
logDebug("user=" + user + " aclsEnabled=" + aclsEnabled() + " modifyAcls=" +
modifyAcls.mkString(","))
if (aclsEnabled() && (user != null) && (!modifyAcls.contains(user))) false else true
}


/**
* Check to see if authentication for the Spark communication protocols is enabled
* @return true if authentication is enabled, otherwise false
Expand Down
14 changes: 14 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,20 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
}
}

// Validate memory fractions
val memoryKeys = Seq(
"spark.storage.memoryFraction",
"spark.shuffle.memoryFraction",
"spark.shuffle.safetyFraction",
"spark.storage.unrollFraction",
"spark.storage.safetyFraction")
for (key <- memoryKeys) {
val value = getDouble(key, 0.5)
if (value > 1 || value < 0) {
throw new IllegalArgumentException("$key should be between 0 and 1 (was '$value').")
}
}

// Check for legacy configs
sys.env.get("SPARK_JAVA_OPTS").foreach { value =>
val warning =
Expand Down
19 changes: 19 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,15 @@ class SparkContext(config: SparkConf) extends Logging {
def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]) =
new Accumulator(initialValue, param)

/**
* Create an [[org.apache.spark.Accumulator]] variable of a given type, with a name for display
* in the Spark UI. Tasks can "add" values to the accumulator using the `+=` method. Only the
* driver can access the accumulator's `value`.
*/
def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T]) = {
new Accumulator(initialValue, param, Some(name))
}

/**
* Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values
* with `+=`. Only the driver can access the accumuable's `value`.
Expand All @@ -769,6 +778,16 @@ class SparkContext(config: SparkConf) extends Logging {
def accumulable[T, R](initialValue: T)(implicit param: AccumulableParam[T, R]) =
new Accumulable(initialValue, param)

/**
* Create an [[org.apache.spark.Accumulable]] shared variable, with a name for display in the
* Spark UI. Tasks can add values to the accumuable using the `+=` operator. Only the driver can
* access the accumuable's `value`.
* @tparam T accumulator type
* @tparam R type that can be added to the accumulator
*/
def accumulable[T, R](initialValue: T, name: String)(implicit param: AccumulableParam[T, R]) =
new Accumulable(initialValue, param, Some(name))

/**
* Create an accumulator from a "mutable collection" type.
*
Expand Down
10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.network.ConnectionManager
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.ShuffleManager
import org.apache.spark.shuffle.{ShuffleMemoryManager, ShuffleManager}
import org.apache.spark.storage._
import org.apache.spark.util.{AkkaUtils, Utils}

Expand Down Expand Up @@ -66,12 +66,9 @@ class SparkEnv (
val httpFileServer: HttpFileServer,
val sparkFilesDir: String,
val metricsSystem: MetricsSystem,
val shuffleMemoryManager: ShuffleMemoryManager,
val conf: SparkConf) extends Logging {

// A mapping of thread ID to amount of memory, in bytes, used for shuffle aggregations
// All accesses should be manually synchronized
val shuffleMemoryMap = mutable.HashMap[Long, Long]()

private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()

// A general, soft-reference map for metadata needed during HadoopRDD split computation
Expand Down Expand Up @@ -252,6 +249,8 @@ object SparkEnv extends Logging {
val shuffleManager = instantiateClass[ShuffleManager](
"spark.shuffle.manager", "org.apache.spark.shuffle.hash.HashShuffleManager")

val shuffleMemoryManager = new ShuffleMemoryManager(conf)

// Warn about deprecated spark.cache.class property
if (conf.contains("spark.cache.class")) {
logWarning("The spark.cache.class property is no longer being used! Specify storage " +
Expand All @@ -273,6 +272,7 @@ object SparkEnv extends Logging {
httpFileServer,
sparkFilesDir,
metricsSystem,
shuffleMemoryManager,
conf)
}

Expand Down
Loading

0 comments on commit 1db5314

Please sign in to comment.