Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WX-1252 Per-backend runtime attributes #7380

Merged
merged 25 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ trait BackendLifecycleActorFactory {
def dockerHashCredentials(workflowDescriptor: BackendWorkflowDescriptor,
initializationDataOption: Option[BackendInitializationData]
): List[Any] = List.empty

/**
* Allows Cromwell to self-identify which cloud it's running on for runtime attribute purposes
*/
def platform: Option[Platform] = None
}

object BackendLifecycleActorFactory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import cromwell.util.JsonFormatting.WomValueJsonFormatter
import common.validation.ErrorOr.ErrorOr
import wom.callable.Callable.InputDefinition
import wom.expression.IoFunctionSet
import wom.values.WomObject
import wom.values.WomValue
import wom.{RuntimeAttributes, WomExpressionException}

Expand All @@ -20,13 +21,57 @@ case class RuntimeAttributeDefinition(name: String, factoryDefault: Option[WomVa

object RuntimeAttributeDefinition {

/**
* "Evaluate" means hydrating the runtime attributes with information from the inputs
* @param unevaluated WOM expressions that may or may not reference inputs
* @param wdlFunctions The set of IO for the current backend
* @param evaluatedInputs The inputs
* @param platform Optional, directs platform-based prioritization
* @return Evaluated
*/
def evaluateRuntimeAttributes(unevaluated: RuntimeAttributes,
wdlFunctions: IoFunctionSet,
evaluatedInputs: Map[InputDefinition, WomValue]
evaluatedInputs: Map[InputDefinition, WomValue],
platform: Option[Platform] = None
): ErrorOr[Map[String, WomValue]] = {
import common.validation.ErrorOr._
val inputsMap = evaluatedInputs map { case (x, y) => x.name -> y }
unevaluated.attributes.traverseValues(_.evaluateValue(inputsMap, wdlFunctions))
val evaluated = unevaluated.attributes.traverseValues(_.evaluateValue(inputsMap, wdlFunctions))

// Platform mapping must come after evaluation because we need to evaluate
// e.g. `gcp: userDefinedObject` to find out what its runtime value is.
// The type system informs us of this because a `WomExpression` in `unevaluated`
// cannot be safely read as a `WomObject` with a `values` map until evaluation
evaluated.map(e => applyPlatform(e, platform))
}

def applyPlatform(attributes: Map[String, WomValue], maybePlatform: Option[Platform]): Map[String, WomValue] = {

def extractPlatformAttributes(platform: Platform): Map[String, WomValue] =
attributes.get(platform.runtimeKey) match {
case Some(obj: WomObject) =>
// WDL spec: "Use objects to avoid collisions"
// https://github.com/openwdl/wdl/blob/wdl-1.1/SPEC.md#conventions-and-best-practices
obj.values
case _ =>
// A malformed non-object override such as gcp: "banana" is ignored
Map.empty
}

val platformAttributes = maybePlatform match {
case Some(platform) =>
extractPlatformAttributes(platform)
case None =>
Map.empty
}

// We've scooped our desired platform, now delete "azure", "gcp", etc.
val originalAttributesWithoutPlatforms: Map[String, WomValue] =
attributes -- Platform.all.map(_.runtimeKey)

// With `++` keys from the RHS overwrite duplicates in LHS, which is what we want
// RHS `Map.empty` is a no-op
originalAttributesWithoutPlatforms ++ platformAttributes
}

def buildMapBasedLookup(evaluatedDeclarations: Map[InputDefinition, Try[WomValue]])(identifier: String): WomValue = {
Expand Down
23 changes: 23 additions & 0 deletions backend/src/main/scala/cromwell/backend/backend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,26 @@
final case class AttemptedLookupResult(name: String, value: Try[WomValue]) {
def toPair = name -> value
}

sealed trait Platform {
def runtimeKey: String
}

object Platform {
def all: Seq[Platform] = Seq(Gcp, Azure, Aws)

def apply(str: String): Option[Platform] =
all.find(_.runtimeKey == str)

Check warning on line 167 in backend/src/main/scala/cromwell/backend/backend.scala

View check run for this annotation

Codecov / codecov/patch

backend/src/main/scala/cromwell/backend/backend.scala#L167

Added line #L167 was not covered by tests
}

object Gcp extends Platform {
override def runtimeKey: String = "gcp"
}

object Azure extends Platform {
override def runtimeKey: String = "azure"
}

object Aws extends Platform {
override def runtimeKey: String = "aws"
}
3 changes: 3 additions & 0 deletions centaur/src/it/scala/centaur/CentaurTestSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ object CentaurTestSuite extends StrictLogging {
}

val cromwellBackends = CentaurCromwellClient.backends.unsafeRunSync().supportedBackends.map(_.toLowerCase)
val defaultBackend = CentaurCromwellClient.backends.unsafeRunSync().defaultBackend.toLowerCase
logger.info(s"Cromwell under test configured with backends ${cromwellBackends.mkString(", ")}")
logger.info(s"Unless overridden by workflow options file, tests use default backend: $defaultBackend")

def isWdlUpgradeTest(testCase: CentaurTestCase): Boolean = testCase.containsTag("wdl_upgrade")

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
name: biscayne_new_runtime_attributes_lifesciences
testFormat: workflowsuccess
tags: ["wdl_biscayne"]

# Will run on a Cromwell that supports any one of these backends
backendsMode: any
backends: [Papi, Papiv2, GCPBatch]

files {
workflow: wdl_biscayne/biscayne_new_runtime_attributes/biscayne_new_runtime_attributes.wdl
}

metadata {
"calls.runtime_attributes_wf.runtime_attributes_task.runtimeAttributes.docker": "rockylinux:9",
"calls.runtime_attributes_wf.runtime_attributes_task.runtimeAttributes.cpu": 4
"calls.runtime_attributes_wf.runtime_attributes_task.runtimeAttributes.memory": "6 GB"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
name: biscayne_new_runtime_attributes_local
testFormat: workflowsuccess
tags: ["wdl_biscayne"]

# This test should only run in the Local suite, on its default `Local` backend. Unfortunately the `Local` backend
# leaks into other suites, so require an irrelevant `LocalNoDocker` backend that is only found in Local suite.
backends: [Local, LocalNoDocker]

files {
workflow: wdl_biscayne/biscayne_new_runtime_attributes/biscayne_new_runtime_attributes.wdl
}

# CPU, memory attributes not applicable for Local backend
metadata {
"calls.runtime_attributes_wf.runtime_attributes_task.runtimeAttributes.docker": "ubuntu:latest",
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
name: biscayne_new_runtime_attributes_tes
testFormat: workflowsuccess
tags: ["wdl_biscayne"]
backends: [TES]

files {
workflow: wdl_biscayne/biscayne_new_runtime_attributes/biscayne_new_runtime_attributes.wdl
}

metadata {
"calls.runtime_attributes_wf.runtime_attributes_task.runtimeAttributes.docker": "debian:latest",
"calls.runtime_attributes_wf.runtime_attributes_task.runtimeAttributes.cpu": 4
"calls.runtime_attributes_wf.runtime_attributes_task.runtimeAttributes.memory": "4 GB"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
version development-1.1

workflow runtime_attributes_wf {
call runtime_attributes_task
output {
String out = runtime_attributes_task.out
}
}

task runtime_attributes_task {

command <<<
echo "Zardoz"
>>>

meta {
volatile: true
}

runtime {
# Meaningless keys are ignored
banana: object {
cpuPlatform: "Banana Lake"
}

gcp: object {
# Platform-specific keys take precedence
docker: "rockylinux:9",
memory: "6 GB"
}

azure: object {
memory: "4 GB",
docker: "debian:latest"
}

# Generic keys are ignored in favor of platform ones
docker: "ubuntu:latest"
memory: "8 GB"

# We still read generic keys that are not overridden
cpu: 4
}

output {
String out = read_string(stdout())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,15 @@ case class CentaurTestCase(workflow: Workflow,

def isIgnored(supportedBackends: List[String]): Boolean = {
val backendSupported = workflow.backends match {
case AllBackendsRequired(allBackends) => allBackends forall supportedBackends.contains
case AnyBackendRequired(anyBackend) => anyBackend exists supportedBackends.contains
case OnlyBackendsAllowed(onlyBackends) => supportedBackends forall onlyBackends.contains
case AllBackendsRequired(testBackends) =>
// Test will run on servers that support all of the test's backends (or more) (default)
testBackends forall supportedBackends.contains
case AnyBackendRequired(testBackends) =>
// Test will run on servers that support at least one of the test's backends (or more)
testBackends exists supportedBackends.contains
case OnlyBackendsAllowed(testBackends) =>
// Test will run on servers that only support backends the test specifies (or fewer)
supportedBackends forall testBackends.contains
}

testOptions.ignore || !backendSupported
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,8 @@ class JobPreparationActor(workflowDescriptor: EngineWorkflowDescriptor,
val unevaluatedRuntimeAttributes = jobKey.call.callable.runtimeAttributes
evaluateRuntimeAttributes(unevaluatedRuntimeAttributes,
expressionLanguageFunctions,
inputEvaluation
inputEvaluation,
factory.platform
) map curriedAddDefaultsToAttributes
}
}
Expand Down
1 change: 1 addition & 0 deletions src/ci/resources/tes_application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ backend {
dockerRoot = "/cromwell-executions"
# TES Endpoint for cromwell to use. Might look something like: "https://lz7388ada396994bb48ea5c87a02eed673689c82c2af423d03.servicebus.windows.net/something/tes/v1/tasks
endpoint = "http://127.0.0.1:9000/v1/tasks"
platform = "azure"
concurrent-job-limit = 1000
# Identity to execute the workflow as. Might look something like: "pet-2676571657071603a9eab"
workflow-execution-identity = ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ package cromwell.backend.impl.aws

import akka.actor.{ActorRef, Props}
import cromwell.backend.{
Aws,
BackendConfigurationDescriptor,
BackendInitializationData,
BackendWorkflowDescriptor,
JobExecutionMap
JobExecutionMap,
Platform
}
import cromwell.backend.standard.{
StandardAsyncExecutionActor,
Expand Down Expand Up @@ -106,4 +108,6 @@ case class AwsBatchBackendLifecycleActorFactory(name: String, configurationDescr

override def backendSingletonActorProps(serviceRegistryActor: ActorRef): Option[Props] =
Option(AwsBatchSingletonActor.props(configuration.awsConfig.region, Option(configuration.awsAuth)))

override def platform: Option[Platform] = Option(Aws)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import cromwell.backend.{
BackendConfigurationDescriptor,
BackendInitializationData,
BackendWorkflowDescriptor,
JobExecutionMap
Gcp,
JobExecutionMap,
Platform
}
import cromwell.cloudsupport.gcp.GoogleConfiguration
import cromwell.core.CallOutputs
Expand Down Expand Up @@ -97,6 +99,8 @@ class GcpBatchBackendLifecycleActorFactory(override val name: String,
GcpBatchBackendSingletonActor.props(requestFactory, serviceRegistryActor = serviceRegistryActor)(requestHandler)
)
}

override def platform: Option[Platform] = Option(Gcp)
}

object GcpBatchBackendLifecycleActorFactory extends StrictLogging {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ abstract class PipelinesApiBackendLifecycleActorFactory(
List(dockerCredentials, googleCredentials).flatten
case _ => List.empty[Any]
}

override def platform: Option[Platform] = Option(Gcp)
}

object PipelinesApiBackendLifecycleActorFactory extends StrictLogging {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,6 @@
restarting: Boolean
): StandardInitializationActorParams =
TesInitializationActorParams(workflowDescriptor, calls, tesConfiguration, serviceRegistryActor)

override def platform: Option[Platform] = tesConfiguration.platform

Check warning on line 28 in supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesBackendLifecycleActorFactory.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesBackendLifecycleActorFactory.scala#L28

Added line #L28 was not covered by tests
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package cromwell.backend.impl.tes

import com.typesafe.config.Config
import cromwell.backend.BackendConfigurationDescriptor
import cromwell.backend.{BackendConfigurationDescriptor, Platform}
import cromwell.core.retry.SimpleExponentialBackoff
import net.ceedubs.ficus.Ficus._

Expand All @@ -12,6 +12,8 @@ class TesConfiguration(val configurationDescriptor: BackendConfigurationDescript

val endpointURL = configurationDescriptor.backendConfig.getString("endpoint")
val runtimeConfig = configurationDescriptor.backendRuntimeAttributesConfig
val platform: Option[Platform] =
configurationDescriptor.backendConfig.as[Option[String]]("platform").flatMap(Platform(_))
val useBackendParameters =
configurationDescriptor.backendConfig
.as[Option[Boolean]](TesConfiguration.useBackendParametersKey)
Expand Down
Loading