diff --git a/backend/src/main/scala/cromwell/backend/async/KnownJobFailureException.scala b/backend/src/main/scala/cromwell/backend/async/KnownJobFailureException.scala
index 8dd9aef3544..97b76c77f5e 100644
--- a/backend/src/main/scala/cromwell/backend/async/KnownJobFailureException.scala
+++ b/backend/src/main/scala/cromwell/backend/async/KnownJobFailureException.scala
@@ -13,6 +13,10 @@ final case class WrongReturnCode(jobTag: String, returnCode: Int, stderrPath: Op
   override def getMessage = s"Job $jobTag exited with return code $returnCode which has not been declared as a valid return code. See 'continueOnReturnCode' runtime attribute for more details."
 }
 
+final case class UnExpectedStatus(jobTag: String, returnCode: Int, jobStatus: String, stderrPath: Option[Path]) extends KnownJobFailureException {
+  override def getMessage = s"Job $jobTag exited with accepted return code '$returnCode' but failed status '$jobStatus'. Suspected spot kill; retrying."
+}
+
 final case class ReturnCodeIsNotAnInt(jobTag: String, returnCode: String, stderrPath: Option[Path]) extends KnownJobFailureException {
   override def getMessage = {
     if (returnCode.isEmpty)
diff --git a/backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala b/backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala
index 418abdcf2a9..d136acd905c 100644
--- a/backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala
+++ b/backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala
@@ -364,7 +364,7 @@ trait StandardAsyncExecutionActor
    * to re-do this before sending the response.
    */
   private var jobPathsUpdated: Boolean = false
-  private def updateJobPaths(): Unit = if (!jobPathsUpdated) {
+  def updateJobPaths(): Unit = if (!jobPathsUpdated) {
     // .get's are safe on stdout and stderr after falling back to default names above.
     jobPaths.standardPaths = StandardPaths(
       output = hostPathFromContainerPath(executionStdout),
@@ -1162,7 +1162,7 @@ trait StandardAsyncExecutionActor
     configurationDescriptor.slowJobWarningAfter foreach { duration => self ! WarnAboutSlownessAfter(handle.pendingJob.jobId, duration) }
 
     tellKvJobId(handle.pendingJob) map { _ =>
-      if (logJobIds) jobLogger.info(s"job id: ${handle.pendingJob.jobId}")
+      if (logJobIds) jobLogger.debug(s"job id: ${handle.pendingJob.jobId}")
       tellMetadata(Map(CallMetadataKeys.JobId -> handle.pendingJob.jobId))
       /* NOTE: Because of the async nature of the Scala Futures, there is a point in time where we have submitted this or
@@ -1281,7 +1281,7 @@ trait StandardAsyncExecutionActor
     stderrSizeAndReturnCodeAndMemoryRetry flatMap {
       case (stderrSize, returnCodeAsString, retryWithMoreMemory) =>
         val tryReturnCodeAsInt = Try(returnCodeAsString.trim.toInt)
-
+        jobLogger.debug(s"Handling execution result with status '${status.toString()}' and return code ${returnCodeAsString}")
         if (isDone(status)) {
           tryReturnCodeAsInt match {
             // stderr not empty : retry
@@ -1293,12 +1293,17 @@ trait StandardAsyncExecutionActor
             case Success(returnCodeAsInt) if isAbort(returnCodeAsInt) && !retryWithMoreMemory =>
              jobLogger.debug(s"Job was aborted, code was : '${returnCodeAsString}'")
              Future.successful(AbortedExecutionHandle)
+            // edge case: instance killed after RC.txt creation, so status == Failed while returnCode is in the accepted values => retry.
+            case Success(returnCodeAsInt) if status.toString() == "Failed" && continueOnReturnCode.continueFor(returnCodeAsInt) =>
+              jobLogger.debug(s"Suspected spot kill due to status/RC mismatch")
+              val executionHandle = Future.successful(FailedNonRetryableExecutionHandle(UnExpectedStatus(jobDescriptor.key.tag, returnCodeAsInt, status.toString(), stderrAsOption), Option(returnCodeAsInt), None))
+              retryElseFail(executionHandle)
             // job considered ok by accepted exit code
             case Success(returnCodeAsInt) if continueOnReturnCode.continueFor(returnCodeAsInt) =>
              handleExecutionSuccess(status, oldHandle, returnCodeAsInt)
             // job failed on out-of-memory : retry
             case Success(returnCodeAsInt) if retryWithMoreMemory =>
-              jobLogger.warn(s"Retrying job due to OOM with exit code : '${returnCodeAsString}' ")
+              jobLogger.info(s"Retrying job due to OOM with exit code : '${returnCodeAsString}' ")
              val executionHandle = Future.successful(FailedNonRetryableExecutionHandle(RetryWithMoreMemory(jobDescriptor.key.tag, stderrAsOption, memoryRetryErrorKeys, log), Option(returnCodeAsInt), None))
              retryElseFail(executionHandle, retryWithMoreMemory)
             // unaccepted return code : retry.
@@ -1339,8 +1344,7 @@ trait StandardAsyncExecutionActor
         if (fileExists)
           asyncIo.contentAsStringAsync(jobPaths.returnCode, None, failOnOverflow = false)
         else {
-          jobLogger.warn("RC file not found. Setting job to failed & retry.")
-          //Thread.sleep(300000)
+          jobLogger.debug("RC file not found. Setting job to failed.")
           Future("1")
         }
       }
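The ordering of the new case matters: the Failed-status guard has to sit above the plain continueOnReturnCode case, otherwise a spot-killed job whose RC.txt already holds an accepted code would be reported as a success. A minimal, self-contained Scala sketch of that ordering (the Outcome values and the continueFor stand-in are illustrative only, not Cromwell's real RunStatus or ContinueOnReturnCode API):

    object SpotKillOrderingSketch extends App {
      sealed trait Outcome
      case object Succeeded extends Outcome
      case object RetriedAsSuspectedSpotKill extends Outcome
      case object FailedBadReturnCode extends Outcome

      // Stand-in for continueOnReturnCode.continueFor: accept only exit code 0.
      def continueFor(rc: Int): Boolean = rc == 0

      // Mirrors the case ordering in handleExecutionResult: the Failed-status
      // check must be evaluated before the plain "accepted return code" check.
      def handle(status: String, rc: Int): Outcome = (status, rc) match {
        case ("Failed", code) if continueFor(code) => RetriedAsSuspectedSpotKill
        case (_, code) if continueFor(code)        => Succeeded
        case _                                     => FailedBadReturnCode
      }

      assert(handle("Succeeded", 0) == Succeeded)
      assert(handle("Failed", 0) == RetriedAsSuspectedSpotKill) // RC written, then instance reclaimed
      assert(handle("Failed", 1) == FailedBadReturnCode)
      println("case ordering behaves as expected")
    }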
diff --git a/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/job/EngineJobExecutionActor.scala b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/job/EngineJobExecutionActor.scala
index 3bbdb5eb1f2..f2bd2dd4b3d 100644
--- a/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/job/EngineJobExecutionActor.scala
+++ b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/job/EngineJobExecutionActor.scala
@@ -262,7 +262,7 @@ class EngineJobExecutionActor(replyTo: ActorRef,
       log.info(template, jobTag, data.failedCopyAttempts, callCachingParameters.maxFailedCopyAttempts, data.aggregatedHashString)
     } else {
       log.info(s"BT-322 {} cache hit copying nomatch: could not find a suitable cache hit.", jobTag)
-      workflowLogger.info("Could not copy a suitable cache hit for {}. No copy attempts were made.", arg = jobTag)
+      workflowLogger.debug("Could not copy a suitable cache hit for {}. No copy attempts were made.", arg = jobTag)
     }
 
     runJob(data)
diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActor.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActor.scala
index 8ede8fb48b4..4ccc6272fc0 100755
--- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActor.scala
+++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActor.scala
@@ -37,11 +37,16 @@ import java.io.FileNotFoundException
 import akka.actor.ActorRef
 import akka.pattern.AskSupport
 import akka.util.Timeout
+
+import cats.implicits._
+
+import common.exception.MessageAggregation
 import common.collections.EnhancedCollections._
 import common.util.StringUtil._
 import common.validation.Validation._
+
 import cromwell.backend._
-import cromwell.backend.async.{ExecutionHandle, PendingExecutionHandle}
+import cromwell.backend.async._ //{ExecutionHandle, PendingExecutionHandle}
 import cromwell.backend.impl.aws.IntervalLimitedAwsJobSubmitActor.SubmitAwsJobRequest
 import cromwell.backend.impl.aws.OccasionalStatusPollingActor.{NotifyOfStatus, WhatsMyStatus}
 import cromwell.backend.impl.aws.RunStatus.{Initializing, TerminalRunStatus}
@@ -49,16 +54,21 @@ import cromwell.backend.impl.aws.io._
 import cromwell.backend.io.DirectoryFunctions
 import cromwell.backend.io.JobPaths
 import cromwell.backend.standard.{StandardAsyncExecutionActor, StandardAsyncExecutionActorParams, StandardAsyncJob}
+import cromwell.backend.OutputEvaluator._
+
 import cromwell.core._
+import cromwell.core.path.Path
 import cromwell.core.path.{DefaultPathBuilder, Path, PathBuilder, PathFactory}
 import cromwell.core.io.{DefaultIoCommandBuilder, IoCommandBuilder}
 import cromwell.core.retry.SimpleExponentialBackoff
+
 import cromwell.filesystems.s3.S3Path
 import cromwell.filesystems.s3.batch.S3BatchCommandBuilder
+
 import cromwell.services.keyvalue.KvClient
+
 import org.slf4j.{Logger, LoggerFactory}
 import software.amazon.awssdk.services.batch.BatchClient
-//import software.amazon.awssdk.services.batch.model.{BatchException, SubmitJobResponse}
 import software.amazon.awssdk.services.batch.model._
 
 import wom.callable.Callable.OutputDefinition
@@ -67,7 +77,7 @@ import wom.expression.NoIoFunctionSet
 import wom.types.{WomArrayType, WomSingleFileType}
 import wom.values._
 
-import scala.concurrent.{Future, Promise}
+import scala.concurrent._
 import scala.concurrent.duration._
 import scala.language.postfixOps
 import scala.util.control.NoStackTrace
@@ -510,7 +520,7 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
       case NotifyOfStatus(_, _, Some(value)) => Future.successful(value)
       case NotifyOfStatus(_, _, None) =>
-        jobLogger.info("Having to fall back to AWS query for status")
+        jobLogger.debug("Having to fall back to AWS query for status")
         Future.fromTry(job.status(jobId))
       case other =>
         val message = s"Programmer Error (please report this): Received an unexpected message from the OccasionalPollingActor: $other"
@@ -536,8 +546,24 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
     val describeJobsResponse = batchClient.describeJobs(DescribeJobsRequest.builder.jobs(job.jobId).build)
     val jobDetail = describeJobsResponse.jobs.get(0) //OrElse(throw new RuntimeException(s"Could not get job details for job '${job.jobId}'"))
     val nrAttempts = jobDetail.attempts.size
-    val lastattempt = jobDetail.attempts.get(nrAttempts-1)
-    var containerRC = lastattempt.container.exitCode
+    // If the job is terminated/cancelled before starting, there are no attempts.
+    val lastattempt =
+      try {
+        jobDetail.attempts.get(nrAttempts-1)
+      } catch {
+        case _ : Throwable => null
+      }
+    if (lastattempt == null ) {
+      Log.info(s"No attempts were made for job '${job.jobId}'; no memory-related retry needed.")
+      false
+    }
+
+    var containerRC =
+      try {
+        lastattempt.container.exitCode
+      } catch {
+        case _ : Throwable => null
+      }
     // if missing, set to failed.
     if (containerRC == null ) {
       Log.debug(s"No RC found for job '${job.jobId}', most likely a spot kill")
@@ -621,8 +647,9 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
   override def getTerminalEvents(runStatus: RunStatus): Seq[ExecutionEvent] = {
     runStatus match {
       case successStatus: RunStatus.Succeeded => successStatus.eventList
-      case unknown =>
-        throw new RuntimeException(s"handleExecutionSuccess not called with RunStatus.Success. Instead got $unknown")
+      case unknown => {
+        throw new RuntimeException(s"handleExecutionSuccess not called with RunStatus.Success. Instead got $unknown")
+      }
     }
   }
@@ -642,4 +669,38 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
       }
     )
   }
+
+  override def handleExecutionSuccess(runStatus: StandardAsyncRunState,
+                                      handle: StandardAsyncPendingExecutionHandle,
+                                      returnCode: Int)(implicit ec: ExecutionContext): Future[ExecutionHandle] = {
+    evaluateOutputs() map {
+      case ValidJobOutputs(outputs) =>
+        // Need to make sure the paths are up to date before sending the detritus back in the response
+        updateJobPaths()
+        // If the instance is terminated while copying stdout/stderr, the status is Failed while the job outputs are ok
+        // => retryable
+        if (runStatus.toString().equals("Failed")) {
+          jobLogger.warn("Got Failed RunStatus for a successful execution")
+
+          val exception = new MessageAggregation {
+            override def exceptionContext: String = "Got Failed RunStatus for a successful execution"
+            override def errorMessages: Traversable[String] = Array("Got Failed RunStatus for a successful execution")
+          }
+          FailedNonRetryableExecutionHandle(exception, kvPairsToSave = None)
+        } else {
+          SuccessfulExecutionHandle(outputs, returnCode, jobPaths.detritusPaths, getTerminalEvents(runStatus))
+        }
+      case InvalidJobOutputs(errors) =>
+        val exception = new MessageAggregation {
+          override def exceptionContext: String = "Failed to evaluate job outputs"
+          override def errorMessages: Traversable[String] = errors.toList
+        }
+        FailedNonRetryableExecutionHandle(exception, kvPairsToSave = None)
+      case JobOutputsEvaluationException(exception: Exception) if retryEvaluateOutputsAggregated(exception) =>
+        // Return the execution handle in this case to retry the operation
+        handle
+      case JobOutputsEvaluationException(ex) => FailedNonRetryableExecutionHandle(ex, kvPairsToSave = None)
+    }
+  }
+
 }
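The DescribeJobs probing added above reaches for the last attempt and its container exit code through try/catch and null checks; a missing attempt or exit code is what signals a likely spot kill. An equivalent Option-based sketch of that lookup (a hypothetical helper, not part of the patch; it only assumes the AWS SDK v2 BatchClient already used by this backend):

    import scala.collection.JavaConverters._
    import software.amazon.awssdk.services.batch.BatchClient
    import software.amazon.awssdk.services.batch.model.DescribeJobsRequest

    object LastExitCodeSketch {
      /** Exit code of the job's last attempt, or None when the job never ran
        * an attempt or the container reported no exit code (e.g. spot kill). */
      def lastExitCode(client: BatchClient, jobId: String): Option[Int] = {
        val response = client.describeJobs(DescribeJobsRequest.builder.jobs(jobId).build)
        for {
          detail   <- response.jobs.asScala.headOption
          attempt  <- detail.attempts.asScala.lastOption                         // empty list: cancelled before starting
          exitCode <- Option(attempt.container).flatMap(c => Option(c.exitCode)) // null: no RC, suspected spot kill
        } yield exitCode.intValue
      }
    }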
diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala
index 5c97886130f..0b5648c96fc 100755
--- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala
+++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala
@@ -195,7 +195,6 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL
         |
         |  # get the multipart chunk size
         |  chunk_size=$$(_get_multipart_chunk_size $$local_path)
-        |  echo "chunk size : $$chunk_size bytes"
         |  local MP_THRESHOLD=${mp_threshold}
         |  # then set them
         |  $awsCmd configure set default.s3.multipart_threshold $$MP_THRESHOLD
@@ -378,7 +377,8 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL
     //calls the client to submit the job
     def callClient(definitionArn: String, awsBatchAttributes: AwsBatchAttributes): Aws[F, SubmitJobResponse] = {
-      Log.info(s"Submitting taskId: $taskId, job definition : $definitionArn, script: $batch_script")
+      Log.debug(s"Submitting taskId: $taskId, job definition : $definitionArn, script: $batch_script")
+      Log.info(s"Submitting taskId: $taskId, script: $batch_script")
 
       val submit: F[SubmitJobResponse] =
         async.delay(batchClient.submitJob(
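The container script above tunes the AWS CLI multipart settings per file before copying outputs to S3. Purely as an illustration of the constraint such a helper has to respect (S3 caps multipart uploads at 10,000 parts with a 5 MiB minimum part size; the real logic lives in the _get_multipart_chunk_size shell function inside the reconfiguration block, so this Scala sketch is only illustrative):

    object MultipartChunkSizeSketch extends App {
      val MiB: Long         = 1024L * 1024
      val MinPartSize: Long = 5 * MiB   // S3 minimum part size
      val MaxParts: Long    = 10000L    // S3 maximum number of parts per upload

      /** Smallest whole-MiB chunk size that keeps a file of the given size under the 10,000-part cap. */
      def chunkSizeFor(fileSizeBytes: Long): Long = {
        val needed = math.max(MinPartSize, math.ceil(fileSizeBytes.toDouble / MaxParts).toLong)
        ((needed + MiB - 1) / MiB) * MiB
      }

      println(chunkSizeFor(100L * MiB) / MiB)        // 5  -> small files keep the 5 MiB default
      println(chunkSizeFor(600L * 1024 * MiB) / MiB) // 62 -> a 600 GiB file needs ~62 MiB chunks
    }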