diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
index 230598d63bed1..6d7de973a52c2 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.kubernetes
 
 import java.io.File
 import java.security.SecureRandom
-import java.util.concurrent.{Executors, TimeUnit}
+import java.util.concurrent.{Executors, TimeoutException, TimeUnit}
 import javax.net.ssl.X509TrustManager
 
 import com.google.common.io.Files
@@ -34,7 +34,7 @@ import scala.concurrent.ExecutionContext
 import scala.concurrent.duration.DurationInt
 import scala.util.Success
 
-import org.apache.spark.{SPARK_VERSION, SparkConf}
+import org.apache.spark.{SPARK_VERSION, SparkConf, SparkException}
 import org.apache.spark.deploy.rest.{AppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, TarGzippedData, UploadedAppResource}
 import org.apache.spark.deploy.rest.kubernetes._
 import org.apache.spark.internal.Logging
@@ -130,8 +130,8 @@ private[spark] class Client(
     val podWatcher = new Watcher[Pod] {
       override def eventReceived(action: Action, t: Pod): Unit = {
         if ((action == Action.ADDED || action == Action.MODIFIED)
-          && t.getStatus.getPhase == "Running"
-          && !submitCompletedFuture.isDone) {
+            && t.getStatus.getPhase == "Running"
+            && !submitCompletedFuture.isDone) {
           t.getStatus
             .getContainerStatuses
             .asScala
@@ -216,8 +216,78 @@ private[spark] class Client(
           .endContainer()
           .endSpec()
           .done()
-        submitCompletedFuture.get(30, TimeUnit.SECONDS)
-      }
+        var submitSucceeded = false
+        try {
+          submitCompletedFuture.get(LAUNCH_TIMEOUT_SECONDS, TimeUnit.SECONDS)
+          submitSucceeded = true
+        } catch {
+          case e: TimeoutException =>
+            val driverPod = try {
+              kubernetesClient.pods().withName(kubernetesAppId).get()
+            } catch {
+              case throwable: Throwable =>
+                logError(s"Timed out while waiting $LAUNCH_TIMEOUT_SECONDS seconds for the" +
+                  " driver pod to start, but an error occurred while fetching the driver" +
+                  " pod's details.", throwable)
+                throw new SparkException(s"Timed out while waiting $LAUNCH_TIMEOUT_SECONDS" +
+                  " seconds for the driver pod to start. Additionally, an error occurred" +
+                  " while fetching the latest state of the driver pod; check the logs for" +
+                  " the error that was thrown while looking it up.", e)
+            }
+            val topLevelMessage = s"The driver pod with name ${driverPod.getMetadata.getName}" +
+              s" in namespace ${driverPod.getMetadata.getNamespace} was not ready in" +
+              s" $LAUNCH_TIMEOUT_SECONDS seconds."
+            val podStatusPhase = if (driverPod.getStatus.getPhase != null) {
+              s"Latest phase from the pod is: ${driverPod.getStatus.getPhase}"
+            } else {
+              "The pod had no final phase."
+            }
+            val podStatusMessage = if (driverPod.getStatus.getMessage != null) {
+              s"Latest message from the pod is: ${driverPod.getStatus.getMessage}"
+            } else {
+              "The pod had no final message."
+            }
+            val failedDriverContainerStatusString = driverPod.getStatus
+              .getContainerStatuses
+              .asScala
+              .find(_.getName == DRIVER_LAUNCHER_CONTAINER_NAME)
+              .map(status => {
+                val lastState = status.getState
+                if (lastState.getRunning != null) {
+                  "Driver container last state: Running\n" +
+                    s"Driver container started at: ${lastState.getRunning.getStartedAt}"
+                } else if (lastState.getWaiting != null) {
+                  "Driver container last state: Waiting\n" +
+                    s"Driver container wait reason: ${lastState.getWaiting.getReason}\n" +
+                    s"Driver container message: ${lastState.getWaiting.getMessage}"
+                } else if (lastState.getTerminated != null) {
+                  "Driver container last state: Terminated\n" +
+                    s"Driver container started at: ${lastState.getTerminated.getStartedAt}\n" +
+                    s"Driver container finished at: ${lastState.getTerminated.getFinishedAt}\n" +
+                    s"Driver container exit reason: ${lastState.getTerminated.getReason}\n" +
+                    s"Driver container exit code: ${lastState.getTerminated.getExitCode}\n" +
+                    s"Driver container message: ${lastState.getTerminated.getMessage}"
+                } else {
+                  "Driver container last state: Unknown"
+                }
+              }).getOrElse("The driver container wasn't found in the pod; expected to find" +
+                s" container with name $DRIVER_LAUNCHER_CONTAINER_NAME")
+            val finalErrorMessage = s"$topLevelMessage\n" +
+              s"$podStatusPhase\n" +
+              s"$podStatusMessage\n\n$failedDriverContainerStatusString"
+            logError(finalErrorMessage, e)
+            throw new SparkException(finalErrorMessage, e)
+        } finally {
+          if (!submitSucceeded) {
+            try {
+              kubernetesClient.pods().withName(kubernetesAppId).delete()
+            } catch {
+              case throwable: Throwable =>
+                logError("Failed to delete driver pod after it failed to run.", throwable)
+            }
+          }
+        }
+      }
 
     Utils.tryWithResource(kubernetesClient
       .pods()
@@ -338,6 +408,7 @@ private object Client {
   private val DRIVER_LAUNCHER_CONTAINER_NAME = "spark-kubernetes-driver-launcher"
   private val SECURE_RANDOM = new SecureRandom()
   private val SPARK_SUBMISSION_SECRET_BASE_DIR = "/var/run/secrets/spark-submission"
+  private val LAUNCH_TIMEOUT_SECONDS = 30
 
   def main(args: Array[String]): Unit = {
     require(args.length >= 2, s"Too few arguments. Usage: ${getClass.getName} " +
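
Note on the pattern introduced above: wait on the submission future with a timeout, enrich a TimeoutException with pod diagnostics before rethrowing, and clean up the driver pod on any failure without letting the cleanup mask the original error. Below is a minimal, self-contained sketch of that pattern, assuming submitCompletedFuture is a Guava SettableFuture (consistent with the get(timeout, unit) call in the patch); awaitDriverLaunch and deleteDriverPod are illustrative names, not part of this change.

import java.util.concurrent.{TimeUnit, TimeoutException}

import com.google.common.util.concurrent.SettableFuture

object LaunchWaitSketch {
  private val LAUNCH_TIMEOUT_SECONDS = 30

  // Block until `launched` completes or the timeout elapses. Only a timeout
  // gets the enriched message; any failure triggers best-effort cleanup.
  def awaitDriverLaunch(launched: SettableFuture[Boolean])(deleteDriverPod: () => Unit): Unit = {
    var submitSucceeded = false
    try {
      launched.get(LAUNCH_TIMEOUT_SECONDS, TimeUnit.SECONDS)
      submitSucceeded = true
    } catch {
      case e: TimeoutException =>
        // Client.scala builds this message from the driver pod's phase,
        // status message, and driver container state before rethrowing.
        throw new RuntimeException(
          s"Driver pod was not ready in $LAUNCH_TIMEOUT_SECONDS seconds.", e)
    } finally {
      if (!submitSucceeded) {
        try {
          deleteDriverPod()
        } catch {
          // Log-and-swallow (the real code logs) so the original launch
          // failure is the exception that propagates to the caller.
          case _: Throwable => ()
        }
      }
    }
  }
}

Tracking success with a local flag rather than catching every exception means the finally block also cleans up after an InterruptedException or ExecutionException, while only TimeoutException receives the detailed pod diagnostics.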