[SPARK-6490][Core] Add spark.rpc.* and deprecate spark.akka.*
Deprecated `spark.akka.num.retries`, `spark.akka.retry.wait`, `spark.akka.askTimeout`,  `spark.akka.lookupTimeout`, and added `spark.rpc.num.retries`, `spark.rpc.retry.wait`, `spark.rpc.askTimeout`, `spark.rpc.lookupTimeout`.
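For anyone updating their own configuration, here is a minimal sketch of the intended usage. It is illustrative only and not part of this commit; note that the alternate-config table added in `SparkConf.scala` below registers the retry count under `spark.rpc.numRetries`.

```scala
import org.apache.spark.SparkConf

// Illustrative: configure the RPC layer through the new spark.rpc.* keys.
val conf = new SparkConf()
  .set("spark.rpc.askTimeout", "30s")     // was spark.akka.askTimeout
  .set("spark.rpc.lookupTimeout", "30s")  // was spark.akka.lookupTimeout
  .set("spark.rpc.numRetries", "3")       // was spark.akka.num.retries
  .set("spark.rpc.retry.wait", "3s")      // was spark.akka.retry.wait

// The old keys keep working: SparkConf resolves them through its
// alternate-config table (see the SparkConf.scala hunk below) and logs
// a deprecation warning pointing at the spark.rpc.* replacement.
conf.set("spark.akka.askTimeout", "10")
```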

Author: zsxwing <zsxwing@gmail.com>

Closes apache#5595 from zsxwing/SPARK-6490 and squashes the following commits:

e0d80a9 [zsxwing] Use getTimeAsMs and getTimeAsSeconds and other minor fixes
31dbe69 [zsxwing] Add spark.rpc.* and deprecate spark.akka.*
zsxwing authored and rxin committed Apr 21, 2015
1 parent c736220 commit 8136810
Showing 15 changed files with 86 additions and 53 deletions.
10 changes: 9 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -431,7 +431,15 @@ private[spark] object SparkConf extends Logging {
"spark.yarn.am.waitTime" -> Seq(
AlternateConfig("spark.yarn.applicationMaster.waitTries", "1.3",
// Translate old value to a duration, with 10s wait time per try.
-translation = s => s"${s.toLong * 10}s"))
+translation = s => s"${s.toLong * 10}s")),
+"spark.rpc.numRetries" -> Seq(
+AlternateConfig("spark.akka.num.retries", "1.4")),
+"spark.rpc.retry.wait" -> Seq(
+AlternateConfig("spark.akka.retry.wait", "1.4")),
+"spark.rpc.askTimeout" -> Seq(
+AlternateConfig("spark.akka.askTimeout", "1.4")),
+"spark.rpc.lookupTimeout" -> Seq(
+AlternateConfig("spark.akka.lookupTimeout", "1.4"))
)

/**
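Taken together with the `akka deprecated configs` test added to `SparkConfSuite.scala` further down, these `AlternateConfig` entries mean that a job which only sets an old `spark.akka.*` key still feeds the new readers. A rough sketch of that behaviour, inferred from the test rather than taken from this patch:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.util.RpcUtils

// Only the deprecated key is set...
val conf = new SparkConf().set("spark.akka.askTimeout", "3")

// ...but a read through the new key falls back to it via the
// configsWithAlternatives table, so existing deployments keep working.
assert(RpcUtils.askTimeout(conf).toSeconds == 3L)
```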
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -27,7 +27,7 @@ import org.apache.log4j.{Level, Logger}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.{DriverState, Master}
-import org.apache.spark.util.{ActorLogReceive, AkkaUtils, Utils}
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils, RpcUtils, Utils}

/**
* Proxy that relays messages to the driver.
@@ -36,7 +36,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
extends Actor with ActorLogReceive with Logging {

var masterActor: ActorSelection = _
-val timeout = AkkaUtils.askTimeout(conf)
+val timeout = RpcUtils.askTimeout(conf)

override def preStart(): Unit = {
masterActor = context.actorSelection(
@@ -155,7 +155,7 @@ object Client {
if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
conf.set("spark.akka.logLifecycleEvents", "true")
}
-conf.set("spark.akka.askTimeout", "10")
+conf.set("spark.rpc.askTimeout", "10")
conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING"))
Logger.getRootLogger.setLevel(driverArgs.logLevel)

core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -30,7 +30,7 @@ import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
-import org.apache.spark.util.{ActorLogReceive, Utils, AkkaUtils}
+import org.apache.spark.util.{ActorLogReceive, RpcUtils, Utils, AkkaUtils}

/**
* Interface allowing applications to speak with a Spark deploy cluster. Takes a master URL,
@@ -193,7 +193,7 @@ private[spark] class AppClient(
def stop() {
if (actor != null) {
try {
-val timeout = AkkaUtils.askTimeout(conf)
+val timeout = RpcUtils.askTimeout(conf)
val future = actor.ask(StopAppClient)(timeout)
Await.result(future, timeout)
} catch {
core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -47,7 +47,7 @@ import org.apache.spark.deploy.rest.StandaloneRestServer
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.scheduler.{EventLoggingListener, ReplayListenerBus}
import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils}
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils, RpcUtils, SignalLogger, Utils}

private[master] class Master(
host: String,
@@ -931,7 +931,7 @@ private[deploy] object Master extends Logging {
securityManager = securityMgr)
val actor = actorSystem.actorOf(
Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)
-val timeout = AkkaUtils.askTimeout(conf)
+val timeout = RpcUtils.askTimeout(conf)
val portsRequest = actor.ask(BoundPortsRequest)(timeout)
val portsResponse = Await.result(portsRequest, timeout).asInstanceOf[BoundPortsResponse]
(actorSystem, boundPort, portsResponse.webUIPort, portsResponse.restPort)
core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -21,7 +21,7 @@ import org.apache.spark.Logging
import org.apache.spark.deploy.master.Master
import org.apache.spark.ui.{SparkUI, WebUI}
import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.util.RpcUtils

/**
* Web UI server for the standalone master.
@@ -31,7 +31,7 @@ class MasterWebUI(val master: Master, requestedPort: Int)
extends WebUI(master.securityMgr, requestedPort, master.conf, name = "MasterUI") with Logging {

val masterActorRef = master.self
-val timeout = AkkaUtils.askTimeout(master.conf)
+val timeout = RpcUtils.askTimeout(master.conf)
val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true)

initialize()
core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
@@ -32,7 +32,7 @@ import org.json4s._
import org.json4s.jackson.JsonMethods._

import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion}
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{AkkaUtils, RpcUtils, Utils}
import org.apache.spark.deploy.{Command, DeployMessages, DriverDescription}
import org.apache.spark.deploy.ClientArguments._

@@ -223,7 +223,7 @@ private[rest] class KillRequestServlet(masterActor: ActorRef, conf: SparkConf)
}

protected def handleKill(submissionId: String): KillSubmissionResponse = {
-val askTimeout = AkkaUtils.askTimeout(conf)
+val askTimeout = RpcUtils.askTimeout(conf)
val response = AkkaUtils.askWithReply[DeployMessages.KillDriverResponse](
DeployMessages.RequestKillDriver(submissionId), masterActor, askTimeout)
val k = new KillSubmissionResponse
@@ -257,7 +257,7 @@ private[rest] class StatusRequestServlet(masterActor: ActorRef, conf: SparkConf)
}

protected def handleStatus(submissionId: String): SubmissionStatusResponse = {
-val askTimeout = AkkaUtils.askTimeout(conf)
+val askTimeout = RpcUtils.askTimeout(conf)
val response = AkkaUtils.askWithReply[DeployMessages.DriverStatusResponse](
DeployMessages.RequestDriverStatus(submissionId), masterActor, askTimeout)
val message = response.exception.map { s"Exception from the cluster:\n" + formatException(_) }
@@ -321,7 +321,7 @@ private[rest] class SubmitRequestServlet(
responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
requestMessage match {
case submitRequest: CreateSubmissionRequest =>
-val askTimeout = AkkaUtils.askTimeout(conf)
+val askTimeout = RpcUtils.askTimeout(conf)
val driverDescription = buildDriverDescription(submitRequest)
val response = AkkaUtils.askWithReply[DeployMessages.SubmitDriverResponse](
DeployMessages.RequestSubmitDriver(driverDescription), masterActor, askTimeout)
core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -25,7 +25,7 @@ import org.apache.spark.deploy.worker.Worker
import org.apache.spark.deploy.worker.ui.WorkerWebUI._
import org.apache.spark.ui.{SparkUI, WebUI}
import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.util.RpcUtils

/**
* Web UI server for the standalone worker.
@@ -38,7 +38,7 @@ class WorkerWebUI(
extends WebUI(worker.securityMgr, requestedPort, worker.conf, name = "WorkerUI")
with Logging {

-private[ui] val timeout = AkkaUtils.askTimeout(worker.conf)
+private[ui] val timeout = RpcUtils.askTimeout(worker.conf)

initialize()

10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
@@ -25,7 +25,7 @@ import scala.language.postfixOps
import scala.reflect.ClassTag

import org.apache.spark.{Logging, SparkException, SecurityManager, SparkConf}
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{RpcUtils, Utils}

/**
* An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to
@@ -38,7 +38,7 @@ import org.apache.spark.util.{AkkaUtils, Utils}
*/
private[spark] abstract class RpcEnv(conf: SparkConf) {

-private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf)
+private[spark] val defaultLookupTimeout = RpcUtils.lookupTimeout(conf)

/**
* Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement
@@ -282,9 +282,9 @@ trait ThreadSafeRpcEndpoint extends RpcEndpoint
private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
extends Serializable with Logging {

-private[this] val maxRetries = conf.getInt("spark.akka.num.retries", 3)
-private[this] val retryWaitMs = conf.getLong("spark.akka.retry.wait", 3000)
-private[this] val defaultAskTimeout = conf.getLong("spark.akka.askTimeout", 30) seconds
+private[this] val maxRetries = RpcUtils.numRetries(conf)
+private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
+private[this] val defaultAskTimeout = RpcUtils.askTimeout(conf)

/**
* return the address for the [[RpcEndpointRef]]
core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -24,7 +24,7 @@ import org.apache.spark.rpc._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.ui.JettyUtils
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{RpcUtils, Utils}

import scala.util.control.NonFatal

@@ -46,7 +46,7 @@ private[spark] abstract class YarnSchedulerBackend(
private val yarnSchedulerEndpoint = rpcEnv.setupEndpoint(
YarnSchedulerBackend.ENDPOINT_NAME, new YarnSchedulerEndpoint(rpcEnv))

-private implicit val askTimeout = AkkaUtils.askTimeout(sc.conf)
+private implicit val askTimeout = RpcUtils.askTimeout(sc.conf)

/**
* Request executors from the ApplicationMaster by specifying the total number desired.
core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -23,7 +23,7 @@ import scala.concurrent.ExecutionContext.Implicits.global
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.storage.BlockManagerMessages._
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.util.RpcUtils

private[spark]
class BlockManagerMaster(
@@ -32,7 +32,7 @@ class BlockManagerMaster(
isDriver: Boolean)
extends Logging {

-val timeout = AkkaUtils.askTimeout(conf)
+val timeout = RpcUtils.askTimeout(conf)

/** Remove a dead executor from the driver endpoint. This is only called on the driver side. */
def removeExecutor(execId: String) {
26 changes: 3 additions & 23 deletions core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -19,7 +19,7 @@ package org.apache.spark.util

import scala.collection.JavaConversions.mapAsJavaMap
import scala.concurrent.Await
-import scala.concurrent.duration.{Duration, FiniteDuration}
+import scala.concurrent.duration.FiniteDuration

import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
import akka.pattern.ask
@@ -125,16 +125,6 @@ private[spark] object AkkaUtils extends Logging {
(actorSystem, boundPort)
}

-/** Returns the default Spark timeout to use for Akka ask operations. */
-def askTimeout(conf: SparkConf): FiniteDuration = {
-Duration.create(conf.getLong("spark.akka.askTimeout", 30), "seconds")
-}
-
-/** Returns the default Spark timeout to use for Akka remote actor lookup. */
-def lookupTimeout(conf: SparkConf): FiniteDuration = {
-Duration.create(conf.getLong("spark.akka.lookupTimeout", 30), "seconds")
-}

private val AKKA_MAX_FRAME_SIZE_IN_MB = Int.MaxValue / 1024 / 1024

/** Returns the configured max frame size for Akka messages in bytes. */
@@ -150,16 +140,6 @@ private[spark] object AkkaUtils extends Logging {
/** Space reserved for extra data in an Akka message besides serialized task or task result. */
val reservedSizeBytes = 200 * 1024

-/** Returns the configured number of times to retry connecting */
-def numRetries(conf: SparkConf): Int = {
-conf.getInt("spark.akka.num.retries", 3)
-}
-
-/** Returns the configured number of milliseconds to wait on each retry */
-def retryWaitMs(conf: SparkConf): Int = {
-conf.getInt("spark.akka.retry.wait", 3000)
-}

/**
* Send a message to the given actor and get its result within a default timeout, or
* throw a SparkException if this fails.
@@ -216,7 +196,7 @@ private[spark] object AkkaUtils extends Logging {
val driverPort: Int = conf.getInt("spark.driver.port", 7077)
Utils.checkHost(driverHost, "Expected hostname")
val url = address(protocol(actorSystem), driverActorSystemName, driverHost, driverPort, name)
-val timeout = AkkaUtils.lookupTimeout(conf)
+val timeout = RpcUtils.lookupTimeout(conf)
logInfo(s"Connecting to $name: $url")
Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
}
@@ -230,7 +210,7 @@
val executorActorSystemName = SparkEnv.executorActorSystemName
Utils.checkHost(host, "Expected hostname")
val url = address(protocol(actorSystem), executorActorSystemName, host, port, name)
-val timeout = AkkaUtils.lookupTimeout(conf)
+val timeout = RpcUtils.lookupTimeout(conf)
logInfo(s"Connecting to $name: $url")
Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
}
23 changes: 23 additions & 0 deletions core/src/main/scala/org/apache/spark/util/RpcUtils.scala
@@ -17,6 +17,9 @@

package org.apache.spark.util

+import scala.concurrent.duration._
+import scala.language.postfixOps

import org.apache.spark.{SparkEnv, SparkConf}
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}

@@ -32,4 +35,24 @@ object RpcUtils {
Utils.checkHost(driverHost, "Expected hostname")
rpcEnv.setupEndpointRef(driverActorSystemName, RpcAddress(driverHost, driverPort), name)
}

+/** Returns the configured number of times to retry connecting */
+def numRetries(conf: SparkConf): Int = {
+conf.getInt("spark.rpc.numRetries", 3)
+}
+
+/** Returns the configured number of milliseconds to wait on each retry */
+def retryWaitMs(conf: SparkConf): Long = {
+conf.getTimeAsMs("spark.rpc.retry.wait", "3s")
+}
+
+/** Returns the default Spark timeout to use for RPC ask operations. */
+def askTimeout(conf: SparkConf): FiniteDuration = {
+conf.getTimeAsSeconds("spark.rpc.askTimeout", "30s") seconds
+}
+
+/** Returns the default Spark timeout to use for RPC remote endpoint lookup. */
+def lookupTimeout(conf: SparkConf): FiniteDuration = {
+conf.getTimeAsSeconds("spark.rpc.lookupTimeout", "30s") seconds
+}
}
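A usage note on the new helpers, sketched from my reading of `getTimeAsMs`/`getTimeAsSeconds` and consistent with the updated tests below: values may carry a unit suffix, and a bare number falls back to the accessor's natural unit, milliseconds for `retryWaitMs` and seconds for the two timeouts.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.util.RpcUtils

val conf = new SparkConf()
  .set("spark.rpc.retry.wait", "2s")   // explicit unit suffix
  .set("spark.rpc.askTimeout", "120")  // bare number, read as seconds here

assert(RpcUtils.retryWaitMs(conf) == 2000L)          // getTimeAsMs => millis as Long
assert(RpcUtils.askTimeout(conf).toSeconds == 120L)  // getTimeAsSeconds => FiniteDuration
```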
core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -154,7 +154,7 @@ class MapOutputTrackerSuite extends FunSuite {
test("remote fetch below akka frame size") {
val newConf = new SparkConf
newConf.set("spark.akka.frameSize", "1")
-newConf.set("spark.akka.askTimeout", "1") // Fail fast
+newConf.set("spark.rpc.askTimeout", "1") // Fail fast

val masterTracker = new MapOutputTrackerMaster(conf)
val rpcEnv = createRpcEnv("spark")
@@ -180,7 +180,7 @@ class MapOutputTrackerSuite extends FunSuite {
test("remote fetch exceeds akka frame size") {
val newConf = new SparkConf
newConf.set("spark.akka.frameSize", "1")
-newConf.set("spark.akka.askTimeout", "1") // Fail fast
+newConf.set("spark.rpc.askTimeout", "1") // Fail fast

val masterTracker = new MapOutputTrackerMaster(conf)
val rpcEnv = createRpcEnv("test")
24 changes: 23 additions & 1 deletion core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -19,11 +19,13 @@ package org.apache.spark

import java.util.concurrent.{TimeUnit, Executors}

+import scala.concurrent.duration._
+import scala.language.postfixOps
import scala.util.{Try, Random}

import org.scalatest.FunSuite
import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
-import org.apache.spark.util.ResetSystemProperties
+import org.apache.spark.util.{RpcUtils, ResetSystemProperties}
import com.esotericsoftware.kryo.Kryo

class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemProperties {
@@ -222,6 +224,26 @@ class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemProperties {
assert(conf.getTimeAsSeconds("spark.yarn.am.waitTime") === 420)
}

+test("akka deprecated configs") {
+val conf = new SparkConf()
+
+assert(!conf.contains("spark.rpc.num.retries"))
+assert(!conf.contains("spark.rpc.retry.wait"))
+assert(!conf.contains("spark.rpc.askTimeout"))
+assert(!conf.contains("spark.rpc.lookupTimeout"))
+
+conf.set("spark.akka.num.retries", "1")
+assert(RpcUtils.numRetries(conf) === 1)
+
+conf.set("spark.akka.retry.wait", "2")
+assert(RpcUtils.retryWaitMs(conf) === 2L)
+
+conf.set("spark.akka.askTimeout", "3")
+assert(RpcUtils.askTimeout(conf) === (3 seconds))
+
+conf.set("spark.akka.lookupTimeout", "4")
+assert(RpcUtils.lookupTimeout(conf) === (4 seconds))
+}
}

class Class1 {}
4 changes: 2 additions & 2 deletions core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -155,8 +155,8 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
})

val conf = new SparkConf()
-conf.set("spark.akka.retry.wait", "0")
-conf.set("spark.akka.num.retries", "1")
+conf.set("spark.rpc.retry.wait", "0")
+conf.set("spark.rpc.num.retries", "1")
val anotherEnv = createRpcEnv(conf, "remote", 13345)
// Use anotherEnv to find out the RpcEndpointRef
val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "ask-timeout")