diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index 7022b986ea025..6beea5646f63b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.deploy
 
+import java.util.concurrent.TimeUnit
+
 import scala.collection.mutable.HashSet
 import scala.concurrent.ExecutionContext
 import scala.reflect.ClassTag
@@ -27,6 +29,7 @@ import org.apache.log4j.Logger
 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.{DriverState, Master}
+import org.apache.spark.deploy.master.DriverState.DriverState
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config.Network.RPC_ASK_TIMEOUT
 import org.apache.spark.resource.ResourceUtils
@@ -61,6 +64,12 @@ private class ClientEndpoint(
   private val lostMasters = new HashSet[RpcAddress]
   private var activeMasterEndpoint: RpcEndpointRef = null
 
+  private val waitAppCompletion = conf.getBoolean("spark.standalone.submit.waitAppCompletion",
+    false)
+  private val REPORT_DRIVER_STATUS_INTERVAL = 10000
+  private var submittedDriverID = ""
+  private var driverStatusReported = false
+
   private def getProperty(key: String, conf: SparkConf): Option[String] = {
     sys.props.get(key).orElse(conf.getOption(key))
   }
@@ -107,8 +116,13 @@ private class ClientEndpoint(
 
       case "kill" =>
         val driverId = driverArgs.driverId
+        submittedDriverID = driverId
        asyncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
     }
+    logInfo("... waiting before polling master for driver state")
+    forwardMessageThread.scheduleAtFixedRate(() => Utils.tryLogNonFatalError {
+      monitorDriverStatus()
+    }, 5000, REPORT_DRIVER_STATUS_INTERVAL, TimeUnit.MILLISECONDS)
   }
 
   /**
@@ -124,58 +138,87 @@ private class ClientEndpoint(
     }
   }
 
-  /* Find out driver status then exit the JVM */
-  def pollAndReportStatus(driverId: String): Unit = {
-    // Since ClientEndpoint is the only RpcEndpoint in the process, blocking the event loop thread
-    // is fine.
-    logInfo("... waiting before polling master for driver state")
-    Thread.sleep(5000)
-    logInfo("... polling master for driver state")
-    val statusResponse =
-      activeMasterEndpoint.askSync[DriverStatusResponse](RequestDriverStatus(driverId))
-    if (statusResponse.found) {
-      logInfo(s"State of $driverId is ${statusResponse.state.get}")
-      // Worker node, if present
-      (statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match {
-        case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) =>
-          logInfo(s"Driver running on $hostPort ($id)")
-        case _ =>
+  private def monitorDriverStatus(): Unit = {
+    if (submittedDriverID != "") {
+      asyncSendToMasterAndForwardReply[DriverStatusResponse](RequestDriverStatus(submittedDriverID))
+    }
+  }
+
+  /**
+   * Processes and reports the driver status, then exits the JVM if
+   * waitAppCompletion is set to false; otherwise keeps reporting the
+   * driver status (at debug level once the initial state has been logged).
+   */
+  def reportDriverStatus(
+      found: Boolean,
+      state: Option[DriverState],
+      workerId: Option[String],
+      workerHostPort: Option[String],
+      exception: Option[Exception]): Unit = {
+    if (found) {
+      // Use driverStatusReported to avoid writing the following logs
+      // again when waitAppCompletion is set to true
+      if (!driverStatusReported) {
+        driverStatusReported = true
+        logInfo(s"State of $submittedDriverID is ${state.get}")
+        // Worker node, if present
+        (workerId, workerHostPort, state) match {
+          case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) =>
+            logInfo(s"Driver running on $hostPort ($id)")
+          case _ =>
+        }
       }
       // Exception, if present
-      statusResponse.exception match {
+      exception match {
         case Some(e) =>
           logError(s"Exception from cluster was: $e")
           e.printStackTrace()
           System.exit(-1)
         case _ =>
-          System.exit(0)
+          state.get match {
+            case DriverState.FINISHED | DriverState.FAILED |
+                 DriverState.ERROR | DriverState.KILLED =>
+              logInfo(s"State of driver $submittedDriverID is ${state.get}, " +
+                s"exiting spark-submit JVM.")
+              System.exit(0)
+            case _ =>
+              if (!waitAppCompletion) {
+                logInfo(s"spark-submit not configured to wait for completion, " +
+                  s"exiting spark-submit JVM.")
+                System.exit(0)
+              } else {
+                logDebug(s"State of driver $submittedDriverID is ${state.get}, " +
+                  s"continuing to monitor driver status.")
+              }
+          }
       }
     } else {
-      logError(s"ERROR: Cluster master did not recognize $driverId")
+      logError(s"ERROR: Cluster master did not recognize $submittedDriverID")
       System.exit(-1)
     }
   }
-
   override def receive: PartialFunction[Any, Unit] = {
     case SubmitDriverResponse(master, success, driverId, message) =>
       logInfo(message)
       if (success) {
         activeMasterEndpoint = master
-        pollAndReportStatus(driverId.get)
+        submittedDriverID = driverId.get
       } else if (!Utils.responseFromBackup(message)) {
         System.exit(-1)
       }
-
     case KillDriverResponse(master, driverId, success, message) =>
       logInfo(message)
       if (success) {
         activeMasterEndpoint = master
-        pollAndReportStatus(driverId)
       } else if (!Utils.responseFromBackup(message)) {
         System.exit(-1)
       }
+
+    case DriverStatusResponse(found, state, workerId, workerHostPort, exception) =>
+      reportDriverStatus(found, state, workerId, workerHostPort, exception)
   }
 
   override def onDisconnected(remoteAddress: RpcAddress): Unit = {
diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md
index 1e6f8c586d546..1f70d46d587a8 100644
--- a/docs/spark-standalone.md
+++ b/docs/spark-standalone.md
@@ -374,6 +374,25 @@ To run an interactive Spark shell against the cluster, run the following command
 
 You can also pass an option `--total-executor-cores <numCores>` to control the number of cores that spark-shell uses on the cluster.
 
+# Client Properties
+
+Spark applications support the following configuration properties specific to standalone mode:
+
+<table class="table">
+  <tr><th>Property Name</th><th>Default Value</th><th>Meaning</th><th>Since Version</th></tr>
+  <tr>
+    <td><code>spark.standalone.submit.waitAppCompletion</code></td>
+    <td><code>false</code></td>
+    <td>
+      In standalone cluster mode, controls whether the client waits to exit until the application completes.
+      If set to <code>true</code>, the client process will stay alive polling the driver's status.
+      Otherwise, the client process will exit after submission.
+    </td>
+    <td>3.1.0</td>
+  </tr>
+</table>
+
 # Launching Spark Applications
 
 The [`spark-submit` script](submitting-applications.html) provides the most straightforward way to
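
For context, this is how the new property would typically be exercised once the patch is applied. This is an illustrative sketch, not part of the patch; the master URL, class name, and application jar are placeholders:

```
# Submit in standalone cluster mode and keep spark-submit alive until the
# driver reaches a terminal state (FINISHED, FAILED, ERROR, or KILLED).
spark-submit \
  --master spark://master:7077 \
  --deploy-mode cluster \
  --conf spark.standalone.submit.waitAppCompletion=true \
  --class com.example.MyApp \
  my-app.jar
```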
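Also for illustration only: a self-contained Scala sketch of the fixed-rate polling pattern the patch relies on. Nothing here is Spark API; `fetchState` is a hypothetical stand-in for the `RequestDriverStatus` round trip to the master, and the 5000/10000 ms values mirror the initial delay and `REPORT_DRIVER_STATUS_INTERVAL` in the diff:

```scala
import java.util.concurrent.{Executors, TimeUnit}

object DriverPollSketch {
  // Simplified stand-in for DriverState; the real code pattern-matches on
  // org.apache.spark.deploy.master.DriverState values.
  sealed trait State
  case object Running extends State
  case object Finished extends State

  def main(args: Array[String]): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    var polls = 0

    // Hypothetical: stands in for asking the master for DriverStatusResponse.
    def fetchState(): State = { polls += 1; if (polls < 3) Running else Finished }

    // First poll after 5 s, then every 10 s, as in the patch.
    scheduler.scheduleAtFixedRate(() => {
      fetchState() match {
        case Finished =>
          println("driver reached a terminal state, stopping the monitor")
          scheduler.shutdown() // no further polls are scheduled; the JVM can exit
        case Running =>
          println("driver still running, continuing to monitor")
      }
    }, 5000, 10000, TimeUnit.MILLISECONDS)
  }
}
```

Scheduling the poll on a dedicated thread, rather than sleeping inside the RPC endpoint as the removed `pollAndReportStatus` did, keeps the client's event loop free while it waits for the driver to finish.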