Skip to content

Commit

Permalink
Merge branch 'master' into cromwell-83
Browse files Browse the repository at this point in the history
  • Loading branch information
henriqueribeiro authored Aug 16, 2022
2 parents b3f8906 + 3262085 commit d40342a
Show file tree
Hide file tree
Showing 45 changed files with 2,034 additions and 505 deletions.
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
[![Build Status](https://travis-ci.com/broadinstitute/cromwell.svg?branch=develop)](https://travis-ci.com/broadinstitute/cromwell?branch=develop)
[![codecov](https://codecov.io/gh/broadinstitute/cromwell/branch/develop/graph/badge.svg)](https://codecov.io/gh/broadinstitute/cromwell)

## Welcome to Cromwell
## Welcome to the "AWS-friendly" Cromwell

More information regarding AWS features can be found [here](https://github.com/henriqueribeiro/cromwell/tree/master/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws)

Contact: henrique [at] loka [dot] com

Cromwell is an open-source Workflow Management System for bioinformatics. Licensing is [BSD 3-Clause](LICENSE.txt).

Expand Down
2 changes: 2 additions & 0 deletions backend/src/main/scala/cromwell/backend/backend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ object CommonBackendConfigurationAttributes {
"default-runtime-attributes.noAddress",
"default-runtime-attributes.docker",
"default-runtime-attributes.queueArn",
"default-runtime-attributes.awsBatchRetryAttempts",
"default-runtime-attributes.ulimits",
"default-runtime-attributes.failOnStderr",
"slow-job-warning-time",
"dockerhub",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,9 @@ trait StandardAsyncExecutionActor
/** Any custom code that should be run within commandScriptContents before the instantiated command. */
def scriptPreamble: String = ""

/** Any custom code that should be run within commandScriptContents right before exiting. */
def scriptClosure: String = ""

def cwd: Path = commandDirectory
def rcPath: Path = cwd./(jobPaths.returnCodeFilename)

Expand Down Expand Up @@ -382,7 +385,7 @@ trait StandardAsyncExecutionActor
def commandScriptContents: ErrorOr[String] = {
val commandString = instantiatedCommand.commandString
val commandStringAbbreviated = StringUtils.abbreviateMiddle(commandString, "...", abbreviateCommandLength)
jobLogger.info(s"`$commandStringAbbreviated`")
jobLogger.debug(s"`$commandStringAbbreviated`")
tellMetadata(Map(CallMetadataKeys.CommandLine -> commandStringAbbreviated))

val cwd = commandDirectory
Expand Down Expand Up @@ -447,12 +450,14 @@ trait StandardAsyncExecutionActor
|touch $stdoutRedirection $stderrRedirection
|tee $stdoutRedirection < "$$$out" &
|tee $stderrRedirection < "$$$err" >&2 &
|set -x
|(
|cd ${cwd.pathAsString}
|ENVIRONMENT_VARIABLES
|INSTANTIATED_COMMAND
|) $stdinRedirection > "$$$out" 2> "$$$err"
|echo $$? > $rcTmpPath
|set +x
|$emptyDirectoryFillCommand
|(
|cd ${cwd.pathAsString}
Expand All @@ -461,12 +466,14 @@ trait StandardAsyncExecutionActor
|${directoryScripts(directoryOutputs)}
|)
|mv $rcTmpPath $rcPath
|SCRIPT_CLOSURE
|""".stripMargin
.replace("SCRIPT_PREAMBLE", scriptPreamble)
.replace("ENVIRONMENT_VARIABLES", environmentVariables)
.replace("INSTANTIATED_COMMAND", commandString)
.replace("SCRIPT_EPILOGUE", scriptEpilogue)
.replace("DOCKER_OUTPUT_DIR_LINK", dockerOutputDir))
.replace("DOCKER_OUTPUT_DIR_LINK", dockerOutputDir)
.replace("SCRIPT_CLOSURE", scriptClosure))
}

def runtimeEnvironmentPathMapper(env: RuntimeEnvironment): RuntimeEnvironment = {
Expand Down Expand Up @@ -1258,48 +1265,17 @@ trait StandardAsyncExecutionActor
*/
def handleExecutionResult(status: StandardAsyncRunState,
oldHandle: StandardAsyncPendingExecutionHandle): Future[ExecutionHandle] = {

def memoryRetryRC: Future[Boolean] = {
def returnCodeAsBoolean(codeAsOption: Option[String]): Boolean = {
codeAsOption match {
case Some(codeAsString) =>
Try(codeAsString.trim.toInt) match {
case Success(code) => code match {
case StderrContainsRetryKeysCode => true
case _ => false
}
case Failure(e) =>
log.error(s"'CheckingForMemoryRetry' action exited with code '$codeAsString' which couldn't be " +
s"converted to an Integer. Task will not be retried with more memory. Error: ${ExceptionUtils.getMessage(e)}")
false
}
case None => false
}
}

def readMemoryRetryRCFile(fileExists: Boolean): Future[Option[String]] = {
if (fileExists)
asyncIo.contentAsStringAsync(jobPaths.memoryRetryRC, None, failOnOverflow = false).map(Option(_))
else
Future.successful(None)
}

for {
fileExists <- asyncIo.existsAsync(jobPaths.memoryRetryRC)
retryCheckRCAsOption <- readMemoryRetryRCFile(fileExists)
retryWithMoreMemory = returnCodeAsBoolean(retryCheckRCAsOption)
} yield retryWithMoreMemory
}


// get path to stderr
val stderr = jobPaths.standardPaths.error
lazy val stderrAsOption: Option[Path] = Option(stderr)

// get the three needed variables, using helper functions below, or direct assignment.
val stderrSizeAndReturnCodeAndMemoryRetry = for {
returnCodeAsString <- asyncIo.contentAsStringAsync(jobPaths.returnCode, None, failOnOverflow = false)
returnCodeAsString <- JobExitCode
// Only check stderr size if we need to, otherwise this results in a lot of unnecessary I/O that
// may fail due to race conditions on quickly-executing jobs.
stderrSize <- if (failOnStdErr) asyncIo.sizeAsync(stderr) else Future.successful(0L)
retryWithMoreMemory <- memoryRetryRC
retryWithMoreMemory <- memoryRetryRC(oldHandle.pendingJob)
} yield (stderrSize, returnCodeAsString, retryWithMoreMemory)

stderrSizeAndReturnCodeAndMemoryRetry flatMap {
Expand All @@ -1308,25 +1284,36 @@ trait StandardAsyncExecutionActor

if (isDone(status)) {
tryReturnCodeAsInt match {
// stderr not empty : retry
case Success(returnCodeAsInt) if failOnStdErr && stderrSize.intValue > 0 =>
val executionHandle = Future.successful(FailedNonRetryableExecutionHandle(StderrNonEmpty(jobDescriptor.key.tag, stderrSize, stderrAsOption), Option(returnCodeAsInt), None))
retryElseFail(executionHandle)
case Success(returnCodeAsInt) if isAbort(returnCodeAsInt) =>
// job was aborted (cancelled by user?)
// on AWS, OOM kills exit with code 137 : check retryWithMoreMemory here
case Success(returnCodeAsInt) if isAbort(returnCodeAsInt) && !retryWithMoreMemory =>
jobLogger.debug(s"Job was aborted, code was : '${returnCodeAsString}'")
Future.successful(AbortedExecutionHandle)
// job considered ok by accepted exit code
case Success(returnCodeAsInt) if continueOnReturnCode.continueFor(returnCodeAsInt) =>
handleExecutionSuccess(status, oldHandle, returnCodeAsInt)
// job failed on out-of-memory : retry
case Success(returnCodeAsInt) if retryWithMoreMemory =>
jobLogger.warn(s"Retrying job due to OOM with exit code : '${returnCodeAsString}' ")
val executionHandle = Future.successful(FailedNonRetryableExecutionHandle(RetryWithMoreMemory(jobDescriptor.key.tag, stderrAsOption, memoryRetryErrorKeys, log), Option(returnCodeAsInt), None))
retryElseFail(executionHandle, retryWithMoreMemory)
// unaccepted return code : retry.
case Success(returnCodeAsInt) =>
jobLogger.debug(s"Retrying with wrong exit code : '${returnCodeAsString}'")
val executionHandle = Future.successful(FailedNonRetryableExecutionHandle(WrongReturnCode(jobDescriptor.key.tag, returnCodeAsInt, stderrAsOption), Option(returnCodeAsInt), None))
retryElseFail(executionHandle)
case Failure(_) =>
jobLogger.warn(s"General failure of job with exit code : '${returnCodeAsString}'")
Future.successful(FailedNonRetryableExecutionHandle(ReturnCodeIsNotAnInt(jobDescriptor.key.tag, returnCodeAsString, stderrAsOption), kvPairsToSave = None))
}
} else {
tryReturnCodeAsInt match {
case Success(returnCodeAsInt) if retryWithMoreMemory && !continueOnReturnCode.continueFor(returnCodeAsInt) =>
jobLogger.debug(s"job not done but retrying already? : ${status.toString()}")
val executionHandle = Future.successful(FailedNonRetryableExecutionHandle(RetryWithMoreMemory(jobDescriptor.key.tag, stderrAsOption, memoryRetryErrorKeys, log), Option(returnCodeAsInt), None))
retryElseFail(executionHandle, retryWithMoreMemory)
case _ =>
Expand All @@ -1344,6 +1331,63 @@ trait StandardAsyncExecutionActor
}
}

// helper function for handleExecutionResult : get the exit code of the job.
/**
  * Helper for handleExecutionResult: yields the contents of the job's return code file.
  *
  * If the return code file does not exist, a warning is logged and the placeholder value "1" is yielded
  * instead of failing the future (per the log message, the intent is to mark the job as failed so that it
  * can be retried).
  *
  * @return the raw (untrimmed) contents of the return code file, or "1" when the file is absent
  */
def JobExitCode: Future[String] = {
  asyncIo.existsAsync(jobPaths.returnCode) flatMap {
    case true =>
      asyncIo.contentAsStringAsync(jobPaths.returnCode, None, failOnOverflow = false)
    case false =>
      jobLogger.warn("RC file not found. Setting job to failed & retry.")
      // The fallback value is already known, so there is nothing to schedule on the execution context.
      Future.successful("1")
  }
}

// helper function for handleExecutionResult : get the memory retry code.
/**
  * Helper for handleExecutionResult: tells whether the memory-retry return code file asks for a retry.
  *
  * Yields true only when the file at `jobPaths.memoryRetryRC` exists and its trimmed contents parse to the
  * integer `StderrContainsRetryKeysCode`. A missing file, null contents, any other integer, or contents that
  * are not an integer all yield false (the last case is also logged as an error).
  *
  * @param job the job being inspected; here it is only used for the debug log line
  * @return a future holding true when the job should be retried with more memory
  */
def memoryRetryRC(job: StandardAsyncJob): Future[Boolean] = {
  // Per the original author's note, `job` is needed by the AWS override of this method;
  // it is referenced here so that the parameter is not reported as unused at compile time.
  log.debug(s"Looking for memoryRetry in job '${job.jobId}'")

  // Interprets the raw file contents: only StderrContainsRetryKeysCode means "retry".
  def signalsRetry(rawCode: String): Boolean =
    Try(rawCode.trim.toInt) match {
      case Success(parsedCode) => parsedCode == StderrContainsRetryKeysCode
      case Failure(e) =>
        log.error(s"'CheckingForMemoryRetry' action exited with code '$rawCode' which couldn't be " +
          s"converted to an Integer. Task will not be retried with more memory. Error: ${ExceptionUtils.getMessage(e)}")
        false
    }

  asyncIo.existsAsync(jobPaths.memoryRetryRC) flatMap { rcFilePresent =>
    if (rcFilePresent) {
      // Option(...) guards against null contents, which count as "no retry".
      asyncIo.contentAsStringAsync(jobPaths.memoryRetryRC, None, failOnOverflow = false) map { contents =>
        Option(contents).exists(signalsRetry)
      }
    } else {
      Future.successful(false)
    }
  }
}


/**
* Send the job id of the running job to the key value store.
*
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,8 @@ docker {
}
dockerhub.num-threads = 10
quay.num-threads = 10
ecr.num-threads = 10
ecr-public.num-threads = 10
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import cromwell.core.actor.StreamIntegration.{BackPressure, StreamContext}
import cromwell.core.{Dispatcher, DockerConfiguration}
import cromwell.docker.DockerInfoActor._
import cromwell.docker.registryv2.DockerRegistryV2Abstract
import cromwell.docker.registryv2.flows.aws.{AmazonEcr, AmazonEcrPublic}
import cromwell.docker.registryv2.flows.dockerhub.DockerHubRegistry
import cromwell.docker.registryv2.flows.google.GoogleRegistry
import cromwell.docker.registryv2.flows.quay.QuayRegistry
Expand Down Expand Up @@ -234,7 +235,9 @@ object DockerInfoActor {
List(
("dockerhub", { c: DockerRegistryConfig => new DockerHubRegistry(c) }),
("google", { c: DockerRegistryConfig => new GoogleRegistry(c) }),
("quay", { c: DockerRegistryConfig => new QuayRegistry(c) })
("quay", { c: DockerRegistryConfig => new QuayRegistry(c) }),
("ecr", {c: DockerRegistryConfig => new AmazonEcr(c)}),
("ecr-public", {c: DockerRegistryConfig => new AmazonEcrPublic(c)})
).traverse[ErrorOr, DockerRegistry]({
case (configPath, constructor) => DockerRegistryConfig.fromConfig(config.as[Config](configPath)).map(constructor)
}).unsafe("Docker registry configuration")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,17 +109,25 @@ object DockerCliFlow {
/** Utility for converting the flow image id to the format output by the docker cli. */
private def cliKeyFromImageId(context: DockerInfoContext): DockerCliKey = {
val imageId = context.dockerImageID
(imageId.host, imageId.repository) match {
case (None, None) =>
// For docker hub images (host == None), and don't include "library".
val repository = imageId.image
val tag = imageId.reference
DockerCliKey(repository, tag)
case _ =>
// For all other images, include the host and repository.
val repository = s"${imageId.hostAsString}${imageId.nameWithDefaultRepository}"
val tag = imageId.reference
DockerCliKey(repository, tag)
// Private AWS ECR images have no default "library" repository: detect ECR from the docker host.
if ( imageId.hostAsString.matches(raw"\d+\.dkr\.ecr\..+\.amazonaws\.com/") ) {
val repository = s"${imageId.hostAsString}${imageId.image}"
val tag = imageId.reference
DockerCliKey(repository, tag)
} else {
(imageId.host, imageId.repository) match {
case (None, None) =>
// For docker hub images (host == None), and don't include "library".
val repository = imageId.image
val tag = imageId.reference
DockerCliKey(repository, tag)
case _ =>
// For all other images, include the host and repository.
val repository = s"${imageId.hostAsString}${imageId.nameWithDefaultRepository}"
val tag = imageId.reference
DockerCliKey(repository, tag)

}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ abstract class DockerRegistryV2Abstract(override val config: DockerRegistryConfi
implicit val cs = IO.contextShift(ec)
implicit val timer = IO.timer(ec)

protected val authorizationScheme: AuthScheme = AuthScheme.Bearer

/**
* This is the main function. Given a docker context and an http client, retrieve information about the docker image.
*/
Expand Down Expand Up @@ -204,7 +206,7 @@ abstract class DockerRegistryV2Abstract(override val config: DockerRegistryConfi
* Request to get the manifest, using the auth token if provided
*/
private def manifestRequest(token: Option[String], imageId: DockerImageIdentifier): IO[Request[IO]] = {
val authorizationHeader = token.map(t => Authorization(Credentials.Token(AuthScheme.Bearer, t)))
val authorizationHeader = token.map(t => Authorization(Credentials.Token(authorizationScheme, t)))
val request = Method.GET(
buildManifestUri(imageId),
List(
Expand Down Expand Up @@ -268,7 +270,7 @@ abstract class DockerRegistryV2Abstract(override val config: DockerRegistryConfi
}
}

private def getDigestFromResponse(response: Response[IO]): IO[DockerHashResult] = response match {
protected def getDigestFromResponse(response: Response[IO]): IO[DockerHashResult] = response match {
case Status.Successful(r) => extractDigestFromHeaders(r.headers)
case Status.Unauthorized(_) => IO.raiseError(new Unauthorized)
case Status.NotFound(_) => IO.raiseError(new NotFound)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package cromwell.docker.registryv2.flows.aws

import cats.effect.IO
import cromwell.docker.{DockerImageIdentifier, DockerInfoActor, DockerRegistryConfig}
import org.http4s.AuthScheme
import org.http4s.client.Client
import org.slf4j.{Logger, LoggerFactory}
import software.amazon.awssdk.services.ecr.EcrClient

import scala.compat.java8.OptionConverters._
import scala.concurrent.Future

/**
  * Docker registry flow for private Amazon ECR registries.
  *
  * Overrides the authorization scheme to `Basic` and obtains the token from the supplied [[EcrClient]].
  *
  * @param config    registry configuration, passed through to [[AmazonEcrAbstract]]
  * @param ecrClient AWS SDK client used to fetch authorization tokens; defaults to `EcrClient.create()`
  */
class AmazonEcr(override val config: DockerRegistryConfig, ecrClient: EcrClient = EcrClient.create()) extends AmazonEcrAbstract(config) {
  private val logger: Logger = LoggerFactory.getLogger(this.getClass)

  override protected val authorizationScheme: AuthScheme = AuthScheme.Basic

  /**
    * Registry host name without its trailing slash,
    * e.g 123456789012.dkr.ecr.us-east-1.amazonaws.com
    *
    * An empty host is returned unchanged (the previous index arithmetic threw on an empty string).
    */
  override protected def registryHostName(dockerImageIdentifier: DockerImageIdentifier): String =
    dockerImageIdentifier.hostAsString.stripSuffix("/")

  /**
    * Returns true if this flow is able to process this docker image,
    * false otherwise
    */
  override def accepts(dockerImageIdentifier: DockerImageIdentifier): Boolean = dockerImageIdentifier.hostAsString.contains("amazonaws.com")

  /**
    * Fetches an ECR authorization token for the image's registry.
    *
    * @return an IO yielding the token of the first authorization data entry returned by the ECR client,
    *         or None when the client returns no entry
    */
  override protected def getToken(dockerInfoContext: DockerInfoActor.DockerInfoContext)(implicit client: Client[IO]): IO[Option[String]] = {
    logger.info("obtaining access token for '{}'", dockerInfoContext.dockerImageID.fullName)

    // The Future is created inside the suspended IO so that the AWS call only starts when the IO is run,
    // and is performed again (instead of replaying a memoised result) if the IO is re-run.
    IO.fromFuture(IO {
      Future {
        ecrClient.getAuthorizationToken
          .authorizationData()
          .stream()
          .findFirst()
          .asScala
          .map(_.authorizationToken())
      }
    })
  }
}
Loading

0 comments on commit d40342a

Please sign in to comment.