Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WX-1792] Emit Cost Per Hour #7550

Merged
merged 51 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
5d5f06f
Improve doc
jgainerdewar Sep 4, 2024
8346395
Merge branch 'develop' into jgd_WX-1784_realCostResponse
jgainerdewar Sep 7, 2024
0024b74
Terrible but functional
jgainerdewar Sep 9, 2024
d0b3164
Remove support for includeTaskBreakdown, includeSubworkflowBreakdown
jgainerdewar Sep 9, 2024
a29e9b7
Nicer json walking
jgainerdewar Sep 11, 2024
2ca8c3f
Better status handling
jgainerdewar Sep 11, 2024
9fccf2b
Report cost computation errors
jgainerdewar Sep 11, 2024
f5b5b9b
More better error handling
jgainerdewar Sep 11, 2024
c02b264
Merge branch 'develop' into jgd_WX-1784_realCostResponse
jgainerdewar Sep 12, 2024
520ff4a
Remove dead code
jgainerdewar Sep 12, 2024
df6dcc1
Merge branch 'jgd_WX-1784_realCostResponse' of github.com:broadinstit…
jgainerdewar Sep 12, 2024
8baf043
Interpret BigDecimal as a number in metadata
jgainerdewar Sep 13, 2024
39f6d3d
MetadataBuilderActor unit tests
jgainerdewar Sep 13, 2024
e5a51ae
Standardize on BigD
jgainerdewar Sep 13, 2024
ceb077c
Oops there's more
jgainerdewar Sep 13, 2024
da531e9
Merge branch 'develop' into jgd_WX-1784_realCostResponse
THWiseman Sep 16, 2024
cbacb80
More tests
jgainerdewar Sep 16, 2024
39ec889
Merge branch 'jgd_WX-1784_realCostResponse' of github.com:broadinstit…
jgainerdewar Sep 16, 2024
984e50e
Use a real currency
jgainerdewar Sep 17, 2024
7de9ca1
Merge branch 'develop' into jgd_WX-1784_realCostResponse
jgainerdewar Sep 18, 2024
c044228
the gist
THWiseman Sep 18, 2024
331aeb0
Merge branch 'develop' into WX-1792-emit-cost-per-hour
THWiseman Sep 18, 2024
ac5f731
bonus debug logging
THWiseman Sep 19, 2024
26e841d
RAM usage
THWiseman Sep 19, 2024
df7abf7
Bug fixes
jgainerdewar Sep 23, 2024
21c0376
Cost catalog massaging
jgainerdewar Sep 25, 2024
7c18124
Standardize resource type
jgainerdewar Sep 25, 2024
a8ed112
Drop cost catalog entries we won't use
jgainerdewar Sep 25, 2024
b1f7237
Fall back to predefined sku for N1 custom
jgainerdewar Sep 25, 2024
d59c321
Rearrange
jgainerdewar Sep 26, 2024
7b8452f
Ignore all SKUs except the ones we know we care about
jgainerdewar Sep 26, 2024
40ba5fa
Check for cost catalog key collisions
jgainerdewar Sep 26, 2024
8796a94
Fix tests
jgainerdewar Sep 27, 2024
7a3b4f5
Refactor cost calculations
jgainerdewar Sep 27, 2024
6467f04
Try -> ErrorOr
jgainerdewar Sep 27, 2024
30df098
Require logger
jgainerdewar Sep 27, 2024
248e16d
Fixes
jgainerdewar Sep 30, 2024
f88ebff
Unit tests
jgainerdewar Sep 30, 2024
2baf2c0
Logging improvements
jgainerdewar Sep 30, 2024
6efa7df
More informative collision checking
jgainerdewar Sep 30, 2024
2f51c50
Prevent future hair-tearing due to case changes
jgainerdewar Sep 30, 2024
fbcd649
Condense tests
jgainerdewar Sep 30, 2024
65eac4f
Merge branch 'develop' into WX-1792-emit-cost-per-hour
jgainerdewar Sep 30, 2024
2c760b4
Don't be concerned with GCP-specific things in PollResultMonitorActor
jgainerdewar Sep 30, 2024
5b5a054
Merge branch 'WX-1792-emit-cost-per-hour' of github.com:broadinstitut…
jgainerdewar Sep 30, 2024
e3086f9
Scalafmt
jgainerdewar Sep 30, 2024
b31a6b8
Merge branch 'develop' into WX-1792-emit-cost-per-hour
jgainerdewar Sep 30, 2024
e93b078
Clearer log message
jgainerdewar Oct 1, 2024
de345c2
Enable disablement of cost catalog, only keep google client open as n…
jgainerdewar Oct 2, 2024
1d49347
Fix tests?
jgainerdewar Oct 4, 2024
c0c52af
Merge branch 'develop' into WX-1792-emit-cost-per-hour
jgainerdewar Oct 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
package cromwell.backend.standard.pollmonitoring

import akka.actor.{Actor, ActorRef}
import cromwell.backend.{BackendJobDescriptor, BackendWorkflowDescriptor, Platform}
import cromwell.backend.validation.{
Expand All @@ -9,6 +10,7 @@ import cromwell.backend.validation.{
ValidatedRuntimeAttributes
}
import cromwell.core.logging.JobLogger
import cromwell.services.cost.InstantiatedVmInfo
import cromwell.services.metadata.CallMetadataKeys
import cromwell.services.metrics.bard.BardEventing.BardEventRequest
import cromwell.services.metrics.bard.model.TaskSummaryEvent
Expand All @@ -26,7 +28,7 @@ case class PollMonitorParameters(
jobDescriptor: BackendJobDescriptor,
validatedRuntimeAttributes: ValidatedRuntimeAttributes,
platform: Option[Platform],
logger: Option[JobLogger]
logger: JobLogger
)

/**
Expand All @@ -42,6 +44,9 @@ trait PollResultMonitorActor[PollResultType] extends Actor {
// Time that the user VM started spending money.
def extractStartTimeFromRunState(pollStatus: PollResultType): Option[OffsetDateTime]

// Used to kick off a cost calculation
def extractVmInfoFromRunState(pollStatus: PollResultType): Option[InstantiatedVmInfo]

// Time that the user VM stopped spending money.
def extractEndTimeFromRunState(pollStatus: PollResultType): Option[OffsetDateTime]

Expand Down Expand Up @@ -99,6 +104,7 @@ trait PollResultMonitorActor[PollResultType] extends Actor {
Option.empty
private var vmStartTime: Option[OffsetDateTime] = Option.empty
private var vmEndTime: Option[OffsetDateTime] = Option.empty
protected var vmCostPerHour: Option[BigDecimal] = Option.empty

def processPollResult(pollStatus: PollResultType): Unit = {
// Make sure jobStartTime remains the earliest event time ever seen
Expand All @@ -122,8 +128,16 @@ trait PollResultMonitorActor[PollResultType] extends Actor {
tellMetadata(Map(CallMetadataKeys.VmEndTime -> end))
}
}
// If we don't yet have a cost per hour and we can extract VM info, send a cost request to the catalog service.
// We expect it to reply with an answer, which is handled in receive.
// NB: Due to the nature of async code, we may send a few cost requests before we get a response back.
if (vmCostPerHour.isEmpty) {
extractVmInfoFromRunState(pollStatus).foreach(handleVmCostLookup)
}
}

def handleVmCostLookup(vmInfo: InstantiatedVmInfo): Unit

// When a job finishes, the bard actor needs to know about the timing in order to record metrics.
// Cost related metadata should already have been handled in processPollResult.
def handleAsyncJobFinish(terminalStateName: String): Unit =
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -607,9 +607,11 @@ services {
}
}

CostCatalogService {
// When enabled, Cromwell will store vmCostPerHour metadata for GCP tasks
GcpCostCatalogService {
class = "cromwell.services.cost.GcpCostCatalogService"
config {
enabled = false
catalogExpirySeconds = 86400
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package cromwell.services.cost
import com.typesafe.config.Config
import net.ceedubs.ficus.Ficus._

final case class CostCatalogConfig(catalogExpirySeconds: Int)
final case class CostCatalogConfig(enabled: Boolean, catalogExpirySeconds: Int)

object CostCatalogConfig {
def apply(config: Config): CostCatalogConfig = CostCatalogConfig(config.as[Int]("catalogExpirySeconds"))
def apply(config: Config): CostCatalogConfig =
CostCatalogConfig(config.as[Boolean]("enabled"), config.as[Int]("catalogExpirySeconds"))
}
Original file line number Diff line number Diff line change
@@ -1,31 +1,121 @@
package cromwell.services.cost

import akka.actor.{Actor, ActorRef}
import cats.implicits.catsSyntaxValidatedId
import com.google.`type`.Money
import com.google.cloud.billing.v1._
import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging
import common.util.StringUtil.EnhancedToStringable
import common.validation.ErrorOr._
import common.validation.ErrorOr.ErrorOr
import cromwell.services.ServiceRegistryActor.ServiceRegistryMessage
import cromwell.services.cost.GcpCostCatalogService.{COMPUTE_ENGINE_SERVICE_NAME, DEFAULT_CURRENCY_CODE}
import cromwell.util.GracefulShutdownHelper.ShutdownCommand

import java.time.{Duration, Instant}
import scala.jdk.CollectionConverters.IterableHasAsScala
import java.time.temporal.ChronoUnit.SECONDS
import scala.util.Using

case class CostCatalogKey(machineType: Option[MachineType],
usageType: Option[UsageType],
machineCustomization: Option[MachineCustomization],
resourceGroup: Option[ResourceGroup]
case class CostCatalogKey(machineType: MachineType,
usageType: UsageType,
machineCustomization: MachineCustomization,
resourceType: ResourceType,
region: String
)

object CostCatalogKey {

// Specifically support only the SKUs that we know we can use. This is brittle and I hate it, but the more structured
// fields available in the SKU don't give us enough information without relying on the human-readable descriptions.
//
// N1: We usually use custom machines but SKUs are only available for predefined; we'll fall back to these SKUs.
// N2 and N2D: We only use custom machines.

// Use this regex to filter down to just the SKUs we are interested in.
// NB: This should be updated if we add new machine types or the cost catalog descriptions change
final val expectedSku =
(".*?N1 Predefined Instance (Core|Ram) .*|" +
".*?N2 Custom Instance (Core|Ram) .*|" +
".*?N2D AMD Custom Instance (Core|Ram) .*").r

Check warning on line 41 in services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala

View check run for this annotation

Codecov / codecov/patch

services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala#L40-L41

Added lines #L40 - L41 were not covered by tests

def apply(sku: Sku): List[CostCatalogKey] =
for {
_ <- expectedSku.findFirstIn(sku.getDescription).toList
machineType <- MachineType.fromSku(sku).toList
resourceType <- ResourceType.fromSku(sku).toList
usageType <- UsageType.fromSku(sku).toList
machineCustomization <- MachineCustomization.fromSku(sku).toList
region <- sku.getServiceRegionsList.asScala.toList
} yield CostCatalogKey(machineType, usageType, machineCustomization, resourceType, region)

Check warning on line 51 in services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala

View check run for this annotation

Codecov / codecov/patch

services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala#L45-L51

Added lines #L45 - L51 were not covered by tests

def apply(instantiatedVmInfo: InstantiatedVmInfo, resourceType: ResourceType): ErrorOr[CostCatalogKey] =
MachineType.fromGoogleMachineTypeString(instantiatedVmInfo.machineType).map { mType =>
CostCatalogKey(

Check warning on line 55 in services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala

View check run for this annotation

Codecov / codecov/patch

services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala#L54-L55

Added lines #L54 - L55 were not covered by tests
mType,
UsageType.fromBoolean(instantiatedVmInfo.preemptible),
MachineCustomization.fromMachineTypeString(instantiatedVmInfo.machineType),

Check warning on line 58 in services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala

View check run for this annotation

Codecov / codecov/patch

services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala#L57-L58

Added lines #L57 - L58 were not covered by tests
resourceType,
instantiatedVmInfo.region

Check warning on line 60 in services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala

View check run for this annotation

Codecov / codecov/patch

services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala#L60

Added line #L60 was not covered by tests
)
}
}

case class GcpCostLookupRequest(vmInfo: InstantiatedVmInfo, replyTo: ActorRef) extends ServiceRegistryMessage {
override def serviceName: String = "GcpCostCatalogService"
}
case class GcpCostLookupResponse(vmInfo: InstantiatedVmInfo, calculatedCost: ErrorOr[BigDecimal])
case class CostCatalogValue(catalogObject: Sku)
case class ExpiringGcpCostCatalog(catalog: Map[CostCatalogKey, CostCatalogValue], fetchTime: Instant)
object ExpiringGcpCostCatalog {
def empty: ExpiringGcpCostCatalog = ExpiringGcpCostCatalog(Map.empty, Instant.MIN)
}

object GcpCostCatalogService {
// Can be gleaned by using googleClient.listServices
private val COMPUTE_ENGINE_SERVICE_NAME = "services/6F81-5844-456A"

// ISO 4217 https://developers.google.com/adsense/management/appendix/currencies
private val DEFAULT_CURRENCY_CODE = "USD"

def getMostRecentPricingInfo(sku: Sku): PricingInfo = {
val mostRecentPricingInfoIndex = sku.getPricingInfoCount - 1
sku.getPricingInfo(mostRecentPricingInfoIndex)

Check warning on line 84 in services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala

View check run for this annotation

Codecov / codecov/patch

services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala#L83-L84

Added lines #L83 - L84 were not covered by tests
}

// See: https://cloud.google.com/billing/v1/how-tos/catalog-api
def calculateCpuPricePerHour(cpuSku: Sku, coreCount: Int): ErrorOr[BigDecimal] = {
val pricingInfo = getMostRecentPricingInfo(cpuSku)
val usageUnit = pricingInfo.getPricingExpression.getUsageUnit
if (usageUnit == "h") {

Check warning on line 91 in services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala

View check run for this annotation

Codecov / codecov/patch

services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala#L89-L91

Added lines #L89 - L91 were not covered by tests
// Price per hour of a single core
// NB: Ignoring "TieredRates" here (the idea that stuff gets cheaper the more you use).
// Technically, we should write code that determines which tier(s) to use.
// In practice, from what I've seen, CPU cores and RAM don't have more than a single tier.
val costPerUnit: Money = pricingInfo.getPricingExpression.getTieredRates(0).getUnitPrice

Check warning on line 96 in services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala

View check run for this annotation

Codecov / codecov/patch

services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala#L96

Added line #L96 was not covered by tests
val costPerCorePerHour: BigDecimal =
costPerUnit.getUnits + (costPerUnit.getNanos * 10e-9) // Same as above, but as a big decimal
val result = costPerCorePerHour * coreCount
result.validNel

Check warning on line 100 in services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala

View check run for this annotation

Codecov / codecov/patch

services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala#L98-L100

Added lines #L98 - L100 were not covered by tests
} else {
s"Expected usage units of CPUs to be 'h'. Got ${usageUnit}".invalidNel

Check warning on line 102 in services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala

View check run for this annotation

Codecov / codecov/patch

services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala#L102

Added line #L102 was not covered by tests
}
}

def calculateRamPricePerHour(ramSku: Sku, ramGbCount: Double): ErrorOr[BigDecimal] = {
val pricingInfo = getMostRecentPricingInfo(ramSku)
val usageUnit = pricingInfo.getPricingExpression.getUsageUnit
if (usageUnit == "GiBy.h") {
val costPerUnit: Money = pricingInfo.getPricingExpression.getTieredRates(0).getUnitPrice

Check warning on line 110 in services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala

View check run for this annotation

Codecov / codecov/patch

services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala#L107-L110

Added lines #L107 - L110 were not covered by tests
val costPerGbHour: BigDecimal =
costPerUnit.getUnits + (costPerUnit.getNanos * 10e-9) // Same as above, but as a big decimal
val result = costPerGbHour * ramGbCount
result.validNel

Check warning on line 114 in services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala

View check run for this annotation

Codecov / codecov/patch

services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala#L112-L114

Added lines #L112 - L114 were not covered by tests
} else {
s"Expected usage units of RAM to be 'GiBy.h'. Got ${usageUnit}".invalidNel

Check warning on line 116 in services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala

View check run for this annotation

Codecov / codecov/patch

services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala#L116

Added line #L116 was not covered by tests
}
}
}

/**
Expand All @@ -36,37 +126,40 @@
extends Actor
with LazyLogging {

private val maxCatalogLifetime: Duration =
Duration.of(CostCatalogConfig(serviceConfig).catalogExpirySeconds.longValue, SECONDS)
private val costCatalogConfig = CostCatalogConfig(serviceConfig)

private var googleClient: Option[CloudCatalogClient] = Option.empty
private val maxCatalogLifetime: Duration =
Duration.of(costCatalogConfig.catalogExpirySeconds.longValue, SECONDS)

// Cached catalog. Refreshed lazily when older than maxCatalogLifetime.
private var costCatalog: Option[ExpiringGcpCostCatalog] = Option.empty
private var costCatalog: ExpiringGcpCostCatalog = ExpiringGcpCostCatalog.empty

/**
* Returns the SKU for a given key, if it exists
*/
def getSku(key: CostCatalogKey): Option[CostCatalogValue] = getOrFetchCachedCatalog().get(key)

protected def fetchNewCatalog: Iterable[Sku] = {
if (googleClient.isEmpty) {
// We use option rather than lazy here so that the client isn't created when it is told to shutdown (see receive override)
googleClient = Some(CloudCatalogClient.create)
protected def fetchSkuIterable(googleClient: CloudCatalogClient): Iterable[Sku] =
makeInitialWebRequest(googleClient).iterateAll().asScala

Check warning on line 143 in services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala

View check run for this annotation

Codecov / codecov/patch

services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala#L143

Added line #L143 was not covered by tests

protected def makeCatalog(skus: Iterable[Sku]): ExpiringGcpCostCatalog =
ExpiringGcpCostCatalog(processCostCatalog(skus), Instant.now())

Check warning on line 146 in services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala

View check run for this annotation

Codecov / codecov/patch

services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala#L146

Added line #L146 was not covered by tests

protected def fetchNewCatalog: ExpiringGcpCostCatalog =
Using.resource(CloudCatalogClient.create) { googleClient =>
makeCatalog(makeInitialWebRequest(googleClient).iterateAll().asScala)

Check warning on line 150 in services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala

View check run for this annotation

Codecov / codecov/patch

services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala#L149-L150

Added lines #L149 - L150 were not covered by tests
}
makeInitialWebRequest(googleClient.get).iterateAll().asScala
}

def getCatalogAge: Duration =
Duration.between(costCatalog.map(c => c.fetchTime).getOrElse(Instant.ofEpochMilli(0)), Instant.now())
private def isCurrentCatalogExpired: Boolean = getCatalogAge.toNanos > maxCatalogLifetime.toNanos
def getCatalogAge: Duration = Duration.between(costCatalog.fetchTime, Instant.now())

Check warning on line 153 in services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala

View check run for this annotation

Codecov / codecov/patch

services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala#L153

Added line #L153 was not covered by tests

private def isCurrentCatalogExpired: Boolean = getCatalogAge.toSeconds > maxCatalogLifetime.toSeconds

Check warning on line 155 in services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala

View check run for this annotation

Codecov / codecov/patch

services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala#L155

Added line #L155 was not covered by tests

private def getOrFetchCachedCatalog(): Map[CostCatalogKey, CostCatalogValue] = {
if (costCatalog.isEmpty || isCurrentCatalogExpired) {
if (isCurrentCatalogExpired) {

Check warning on line 158 in services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala

View check run for this annotation

Codecov / codecov/patch

services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala#L158

Added line #L158 was not covered by tests
logger.info("Fetching a new GCP public cost catalog.")
costCatalog = Some(ExpiringGcpCostCatalog(processCostCatalog(fetchNewCatalog), Instant.now()))
costCatalog = fetchNewCatalog

Check warning on line 160 in services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala

View check run for this annotation

Codecov / codecov/patch

services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala#L160

Added line #L160 was not covered by tests
}
costCatalog.map(expiringCatalog => expiringCatalog.catalog).getOrElse(Map.empty)
costCatalog.catalog

Check warning on line 162 in services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala

View check run for this annotation

Codecov / codecov/patch

services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala#L162

Added line #L162 was not covered by tests
}

/**
Expand All @@ -88,23 +181,63 @@
* Ideally, we don't want to have an entire, unprocessed, cost catalog in memory at once since it's ~20MB.
*/
private def processCostCatalog(skus: Iterable[Sku]): Map[CostCatalogKey, CostCatalogValue] =
// TODO: Account for key collisions (same key can be in multiple regions)
// TODO: reduce memory footprint of returned map (don't store entire SKU object)
skus.foldLeft(Map.empty[CostCatalogKey, CostCatalogValue]) { case (acc, sku) =>
acc + convertSkuToKeyValuePair(sku)
val keys = CostCatalogKey(sku)

Check warning on line 185 in services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala

View check run for this annotation

Codecov / codecov/patch

services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala#L185

Added line #L185 was not covered by tests

// We expect that every cost catalog key is unique, but changes to the SKUs returned by Google may
// break this assumption. Check and log an error if we find collisions.
val collisions = keys.flatMap(acc.get(_).toList).map(_.catalogObject.getDescription)
if (collisions.nonEmpty)
logger.error(

Check warning on line 191 in services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala

View check run for this annotation

Codecov / codecov/patch

services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala#L189-L191

Added lines #L189 - L191 were not covered by tests
s"Found SKU key collision when adding ${sku.getDescription}, collides with ${collisions.mkString(", ")}"
)

acc ++ keys.map(k => (k, CostCatalogValue(sku)))

Check warning on line 195 in services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala

View check run for this annotation

Codecov / codecov/patch

services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala#L195

Added line #L195 was not covered by tests
}

private def convertSkuToKeyValuePair(sku: Sku): (CostCatalogKey, CostCatalogValue) = CostCatalogKey(
machineType = MachineType.fromSku(sku),
usageType = UsageType.fromSku(sku),
machineCustomization = MachineCustomization.fromSku(sku),
resourceGroup = ResourceGroup.fromSku(sku)
) -> CostCatalogValue(sku)
def lookUpSku(instantiatedVmInfo: InstantiatedVmInfo, resourceType: ResourceType): ErrorOr[Sku] =
CostCatalogKey(instantiatedVmInfo, resourceType).flatMap { key =>

Check warning on line 199 in services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala

View check run for this annotation

Codecov / codecov/patch

services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala#L199

Added line #L199 was not covered by tests
// As of Sept 2024 the cost catalog does not contain entries for custom N1 machines. If we're using N1, attempt
// to fall back to predefined.
lazy val n1PredefinedKey =
(key.machineType, key.machineCustomization) match {
case (N1, Custom) => Option(key.copy(machineCustomization = Predefined))
case _ => None
}
val sku = getSku(key).orElse(n1PredefinedKey.flatMap(getSku)).map(_.catalogObject)

Check warning on line 207 in services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala

View check run for this annotation

Codecov / codecov/patch

services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala#L207

Added line #L207 was not covered by tests
sku match {
case Some(sku) => sku.validNel
case None => s"Failed to look up ${resourceType} SKU for ${instantiatedVmInfo}".invalidNel

Check warning on line 210 in services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala

View check run for this annotation

Codecov / codecov/patch

services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala#L209-L210

Added lines #L209 - L210 were not covered by tests
}
}

// TODO consider caching this, answers won't change until we reload the SKUs
def calculateVmCostPerHour(instantiatedVmInfo: InstantiatedVmInfo): ErrorOr[BigDecimal] =
for {
cpuSku <- lookUpSku(instantiatedVmInfo, Cpu)
coreCount <- MachineType.extractCoreCountFromMachineTypeString(instantiatedVmInfo.machineType)
cpuPricePerHour <- GcpCostCatalogService.calculateCpuPricePerHour(cpuSku, coreCount)
ramSku <- lookUpSku(instantiatedVmInfo, Ram)
ramMbCount <- MachineType.extractRamMbFromMachineTypeString(instantiatedVmInfo.machineType)
ramGbCount = ramMbCount / 1024d // need sub-integer resolution
ramPricePerHour <- GcpCostCatalogService.calculateRamPricePerHour(ramSku, ramGbCount)
totalCost = cpuPricePerHour + ramPricePerHour

Check warning on line 224 in services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala

View check run for this annotation

Codecov / codecov/patch

services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala#L217-L224

Added lines #L217 - L224 were not covered by tests
_ = logger.info(
s"Calculated vmCostPerHour of ${totalCost} " +
s"(CPU ${cpuPricePerHour} for ${coreCount} cores [${cpuSku.getDescription}], " +
s"RAM ${ramPricePerHour} for ${ramGbCount} Gb [${ramSku.getDescription}]) " +
s"for ${instantiatedVmInfo}"
)
} yield totalCost

Check warning on line 231 in services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala

View check run for this annotation

Codecov / codecov/patch

services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala#L231

Added line #L231 was not covered by tests

def serviceRegistryActor: ActorRef = serviceRegistry
override def receive: Receive = {
case GcpCostLookupRequest(vmInfo, replyTo) if costCatalogConfig.enabled =>
val calculatedCost = calculateVmCostPerHour(vmInfo)
val response = GcpCostLookupResponse(vmInfo, calculatedCost)
replyTo ! response

Check warning on line 238 in services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala

View check run for this annotation

Codecov / codecov/patch

services/src/main/scala/cromwell/services/cost/GcpCostCatalogService.scala#L236-L238

Added lines #L236 - L238 were not covered by tests
case GcpCostLookupRequest(_, _) => // do nothing if we're disabled
case ShutdownCommand =>
googleClient.foreach(client => client.shutdownNow())
context stop self
case other =>
logger.error(
Expand Down
Loading
Loading