Skip to content

Commit

Permalink
WX-1252 Per-backend runtime attributes (#7380)
Browse files Browse the repository at this point in the history
  • Loading branch information
aednichols authored Mar 11, 2024
1 parent 79c2bff commit 6e669cf
Show file tree
Hide file tree
Showing 16 changed files with 202 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ trait BackendLifecycleActorFactory {
def dockerHashCredentials(workflowDescriptor: BackendWorkflowDescriptor,
initializationDataOption: Option[BackendInitializationData]
): List[Any] = List.empty

/**
* Allows Cromwell to self-identify which cloud it's running on for runtime attribute purposes
*/
def platform: Option[Platform] = None
}

object BackendLifecycleActorFactory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import cromwell.util.JsonFormatting.WomValueJsonFormatter
import common.validation.ErrorOr.ErrorOr
import wom.callable.Callable.InputDefinition
import wom.expression.IoFunctionSet
import wom.values.WomObject
import wom.values.WomValue
import wom.{RuntimeAttributes, WomExpressionException}

Expand All @@ -20,13 +21,57 @@ case class RuntimeAttributeDefinition(name: String, factoryDefault: Option[WomVa

object RuntimeAttributeDefinition {

/**
* "Evaluate" means hydrating the runtime attributes with information from the inputs
* @param unevaluated WOM expressions that may or may not reference inputs
* @param wdlFunctions The set of IO for the current backend
* @param evaluatedInputs The inputs
* @param platform Optional, directs platform-based prioritization
* @return Evaluated
*/
def evaluateRuntimeAttributes(unevaluated: RuntimeAttributes,
wdlFunctions: IoFunctionSet,
evaluatedInputs: Map[InputDefinition, WomValue]
evaluatedInputs: Map[InputDefinition, WomValue],
platform: Option[Platform] = None
): ErrorOr[Map[String, WomValue]] = {
import common.validation.ErrorOr._
val inputsMap = evaluatedInputs map { case (x, y) => x.name -> y }
unevaluated.attributes.traverseValues(_.evaluateValue(inputsMap, wdlFunctions))
val evaluated = unevaluated.attributes.traverseValues(_.evaluateValue(inputsMap, wdlFunctions))

// Platform mapping must come after evaluation because we need to evaluate
// e.g. `gcp: userDefinedObject` to find out what its runtime value is.
// The type system informs us of this because a `WomExpression` in `unevaluated`
// cannot be safely read as a `WomObject` with a `values` map until evaluation
evaluated.map(e => applyPlatform(e, platform))
}

def applyPlatform(attributes: Map[String, WomValue], maybePlatform: Option[Platform]): Map[String, WomValue] = {

def extractPlatformAttributes(platform: Platform): Map[String, WomValue] =
attributes.get(platform.runtimeKey) match {
case Some(obj: WomObject) =>
// WDL spec: "Use objects to avoid collisions"
// https://github.com/openwdl/wdl/blob/wdl-1.1/SPEC.md#conventions-and-best-practices
obj.values
case _ =>
// A malformed non-object override such as gcp: "banana" is ignored
Map.empty
}

val platformAttributes = maybePlatform match {
case Some(platform) =>
extractPlatformAttributes(platform)
case None =>
Map.empty
}

// We've scooped our desired platform, now delete "azure", "gcp", etc.
val originalAttributesWithoutPlatforms: Map[String, WomValue] =
attributes -- Platform.all.map(_.runtimeKey)

// With `++` keys from the RHS overwrite duplicates in LHS, which is what we want
// RHS `Map.empty` is a no-op
originalAttributesWithoutPlatforms ++ platformAttributes
}

def buildMapBasedLookup(evaluatedDeclarations: Map[InputDefinition, Try[WomValue]])(identifier: String): WomValue = {
Expand Down
23 changes: 23 additions & 0 deletions backend/src/main/scala/cromwell/backend/backend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,26 @@ object CommonBackendConfigurationAttributes {
final case class AttemptedLookupResult(name: String, value: Try[WomValue]) {
def toPair = name -> value
}

sealed trait Platform {
def runtimeKey: String
}

object Platform {
def all: Seq[Platform] = Seq(Gcp, Azure, Aws)

def apply(str: String): Option[Platform] =
all.find(_.runtimeKey == str)
}

object Gcp extends Platform {
override def runtimeKey: String = "gcp"
}

object Azure extends Platform {
override def runtimeKey: String = "azure"
}

object Aws extends Platform {
override def runtimeKey: String = "aws"
}
3 changes: 3 additions & 0 deletions centaur/src/it/scala/centaur/CentaurTestSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ object CentaurTestSuite extends StrictLogging {
}

val cromwellBackends = CentaurCromwellClient.backends.unsafeRunSync().supportedBackends.map(_.toLowerCase)
val defaultBackend = CentaurCromwellClient.backends.unsafeRunSync().defaultBackend.toLowerCase
logger.info(s"Cromwell under test configured with backends ${cromwellBackends.mkString(", ")}")
logger.info(s"Unless overridden by workflow options file, tests use default backend: $defaultBackend")

def isWdlUpgradeTest(testCase: CentaurTestCase): Boolean = testCase.containsTag("wdl_upgrade")

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
name: biscayne_new_runtime_attributes_lifesciences
testFormat: workflowsuccess
tags: ["wdl_biscayne"]

# Will run on a Cromwell that supports any one of these backends
backendsMode: any
backends: [Papi, Papiv2, GCPBatch]

files {
workflow: wdl_biscayne/biscayne_new_runtime_attributes/biscayne_new_runtime_attributes.wdl
}

metadata {
"calls.runtime_attributes_wf.runtime_attributes_task.runtimeAttributes.docker": "rockylinux:9",
"calls.runtime_attributes_wf.runtime_attributes_task.runtimeAttributes.cpu": 4
"calls.runtime_attributes_wf.runtime_attributes_task.runtimeAttributes.memory": "6 GB"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
name: biscayne_new_runtime_attributes_local
testFormat: workflowsuccess
tags: ["wdl_biscayne"]

# This test should only run in the Local suite, on its default `Local` backend. Unfortunately the `Local` backend
# leaks into other suites, so require an irrelevant `LocalNoDocker` backend that is only found in Local suite.
backends: [Local, LocalNoDocker]

files {
workflow: wdl_biscayne/biscayne_new_runtime_attributes/biscayne_new_runtime_attributes.wdl
}

# CPU, memory attributes not applicable for Local backend
metadata {
"calls.runtime_attributes_wf.runtime_attributes_task.runtimeAttributes.docker": "ubuntu:latest",
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
name: biscayne_new_runtime_attributes_tes
testFormat: workflowsuccess
tags: ["wdl_biscayne"]
backends: [TES]

files {
workflow: wdl_biscayne/biscayne_new_runtime_attributes/biscayne_new_runtime_attributes.wdl
}

metadata {
"calls.runtime_attributes_wf.runtime_attributes_task.runtimeAttributes.docker": "debian:latest",
"calls.runtime_attributes_wf.runtime_attributes_task.runtimeAttributes.cpu": 4
"calls.runtime_attributes_wf.runtime_attributes_task.runtimeAttributes.memory": "4 GB"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
version development-1.1

workflow runtime_attributes_wf {
call runtime_attributes_task
output {
String out = runtime_attributes_task.out
}
}
task runtime_attributes_task {

command <<<
echo "Zardoz"
>>>

meta {
volatile: true
}

runtime {
# Meaningless keys are ignored
banana: object {
cpuPlatform: "Banana Lake"
}

gcp: object {
# Platform-specific keys take precedence
docker: "rockylinux:9",
memory: "6 GB"
}

azure: object {
memory: "4 GB",
docker: "debian:latest"
}

# Generic keys are ignored in favor of platform ones
docker: "ubuntu:latest"
memory: "8 GB"

# We still read generic keys that are not overridden
cpu: 4
}

output {
String out = read_string(stdout())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,15 @@ case class CentaurTestCase(workflow: Workflow,

def isIgnored(supportedBackends: List[String]): Boolean = {
val backendSupported = workflow.backends match {
case AllBackendsRequired(allBackends) => allBackends forall supportedBackends.contains
case AnyBackendRequired(anyBackend) => anyBackend exists supportedBackends.contains
case OnlyBackendsAllowed(onlyBackends) => supportedBackends forall onlyBackends.contains
case AllBackendsRequired(testBackends) =>
// Test will run on servers that support all of the test's backends (or more) (default)
testBackends forall supportedBackends.contains
case AnyBackendRequired(testBackends) =>
// Test will run on servers that support at least one of the test's backends (or more)
testBackends exists supportedBackends.contains
case OnlyBackendsAllowed(testBackends) =>
// Test will run on servers that only support backends the test specifies (or fewer)
supportedBackends forall testBackends.contains
}

testOptions.ignore || !backendSupported
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,8 @@ class JobPreparationActor(workflowDescriptor: EngineWorkflowDescriptor,
val unevaluatedRuntimeAttributes = jobKey.call.callable.runtimeAttributes
evaluateRuntimeAttributes(unevaluatedRuntimeAttributes,
expressionLanguageFunctions,
inputEvaluation
inputEvaluation,
factory.platform
) map curriedAddDefaultsToAttributes
}
}
Expand Down
1 change: 1 addition & 0 deletions src/ci/resources/tes_application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ backend {
dockerRoot = "/cromwell-executions"
# TES Endpoint for cromwell to use. Might look something like: "https://lz7388ada396994bb48ea5c87a02eed673689c82c2af423d03.servicebus.windows.net/something/tes/v1/tasks
endpoint = "http://127.0.0.1:9000/v1/tasks"
platform = "azure"
concurrent-job-limit = 1000
# Identity to execute the workflow as. Might look something like: "pet-2676571657071603a9eab"
workflow-execution-identity = ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ package cromwell.backend.impl.aws

import akka.actor.{ActorRef, Props}
import cromwell.backend.{
Aws,
BackendConfigurationDescriptor,
BackendInitializationData,
BackendWorkflowDescriptor,
JobExecutionMap
JobExecutionMap,
Platform
}
import cromwell.backend.standard.{
StandardAsyncExecutionActor,
Expand Down Expand Up @@ -106,4 +108,6 @@ case class AwsBatchBackendLifecycleActorFactory(name: String, configurationDescr

override def backendSingletonActorProps(serviceRegistryActor: ActorRef): Option[Props] =
Option(AwsBatchSingletonActor.props(configuration.awsConfig.region, Option(configuration.awsAuth)))

override def platform: Option[Platform] = Option(Aws)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import cromwell.backend.{
BackendConfigurationDescriptor,
BackendInitializationData,
BackendWorkflowDescriptor,
JobExecutionMap
Gcp,
JobExecutionMap,
Platform
}
import cromwell.cloudsupport.gcp.GoogleConfiguration
import cromwell.core.CallOutputs
Expand Down Expand Up @@ -97,6 +99,8 @@ class GcpBatchBackendLifecycleActorFactory(override val name: String,
GcpBatchBackendSingletonActor.props(requestFactory, serviceRegistryActor = serviceRegistryActor)(requestHandler)
)
}

override def platform: Option[Platform] = Option(Gcp)
}

object GcpBatchBackendLifecycleActorFactory extends StrictLogging {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ abstract class PipelinesApiBackendLifecycleActorFactory(
List(dockerCredentials, googleCredentials).flatten
case _ => List.empty[Any]
}

override def platform: Option[Platform] = Option(Gcp)
}

object PipelinesApiBackendLifecycleActorFactory extends StrictLogging {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,6 @@ case class TesBackendLifecycleActorFactory(name: String, configurationDescriptor
restarting: Boolean
): StandardInitializationActorParams =
TesInitializationActorParams(workflowDescriptor, calls, tesConfiguration, serviceRegistryActor)

override def platform: Option[Platform] = tesConfiguration.platform
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package cromwell.backend.impl.tes

import com.typesafe.config.Config
import cromwell.backend.BackendConfigurationDescriptor
import cromwell.backend.{BackendConfigurationDescriptor, Platform}
import cromwell.core.retry.SimpleExponentialBackoff
import net.ceedubs.ficus.Ficus._

Expand All @@ -12,6 +12,8 @@ class TesConfiguration(val configurationDescriptor: BackendConfigurationDescript

val endpointURL = configurationDescriptor.backendConfig.getString("endpoint")
val runtimeConfig = configurationDescriptor.backendRuntimeAttributesConfig
val platform: Option[Platform] =
configurationDescriptor.backendConfig.as[Option[String]]("platform").flatMap(Platform(_))
val useBackendParameters =
configurationDescriptor.backendConfig
.as[Option[Boolean]](TesConfiguration.useBackendParametersKey)
Expand Down

0 comments on commit 6e669cf

Please sign in to comment.