Skip to content

Commit

Permalink
WX-1110-query-fix Corrected Query to pull in attributes outside of ex…
Browse files Browse the repository at this point in the history
…ecutionStatus and backendStatus (#7185)
  • Loading branch information
JVThomas authored Jul 27, 2023
1 parent b44554a commit d89e4af
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -323,14 +323,18 @@ trait MetadataEntryComponent {
if(isPostgres) "obj.data" else "METADATA_VALUE"
}

def targetCallsSelectStatement(callFqn: String, scatterIndex: String, retryAttempt: String): String = {
s"SELECT ${callFqn}, MAX(COALESCE(${scatterIndex}, 0)) as maxScatter, MAX(COALESCE(${retryAttempt}, 0)) AS maxRetry"
def attemptAndIndexSelectStatement(callFqn: String, scatterIndex: String, retryAttempt: String, variablePrefix: String): String = {
s"SELECT ${callFqn}, MAX(COALESCE(${scatterIndex}, 0)) as ${variablePrefix}Scatter, MAX(COALESCE(${retryAttempt}, 0)) AS ${variablePrefix}Retry"
}

def pgObjectInnerJoinStatement(isPostgres: Boolean, metadataValColName: String): String = {
if(isPostgres) s"INNER JOIN pg_largeobject obj ON me.${metadataValColName} = cast(obj.loid as text)" else ""
}

def failedTaskGroupByClause(metadataValue: String, callFqn: String): String = {
return s"GROUP BY ${callFqn}, ${metadataValue}"
}

val workflowUuid = dbIdentifierWrapper("WORKFLOW_EXECUTION_UUID", isPostgres)
val callFqn = dbIdentifierWrapper("CALL_FQN", isPostgres)
val scatterIndex = dbIdentifierWrapper("JOB_SCATTER_INDEX", isPostgres)
Expand All @@ -345,19 +349,32 @@ trait MetadataEntryComponent {
val wmse = dbIdentifierWrapper("WORKFLOW_METADATA_SUMMARY_ENTRY", isPostgres)
val resultSetColumnNames = s"me.${workflowUuid}, me.${callFqn}, me.${scatterIndex}, me.${retryAttempt}, me.${metadataKey}, me.${metadataValue}, me.${metadataValueType}, me.${metadataTimestamp}, me.${metadataJournalId}"

val query = sql"""
val query =
sql"""
SELECT #${resultSetColumnNames}
FROM #${metadataEntry} me
INNER JOIN (
#${targetCallsSelectStatement(callFqn, scatterIndex, retryAttempt)}
#${attemptAndIndexSelectStatement(callFqn, scatterIndex, retryAttempt, "failed")}
FROM #${metadataEntry} me
INNER JOIN #${wmse} wmse
ON wmse.#${workflowUuid} = me.#${workflowUuid}
#${pgObjectInnerJoinStatement(isPostgres, metadataValue)}
WHERE (wmse.#${rootUuid} = $rootWorkflowId OR wmse.#${workflowUuid} = $rootWorkflowId)
AND (me.#${metadataKey} in ('executionStatus', 'backendStatus') AND #${dbMetadataValueColCheckName(isPostgres)} = 'Failed')
#${failedTaskGroupByClause(dbMetadataValueColCheckName(isPostgres), callFqn)}
HAVING #${dbMetadataValueColCheckName(isPostgres)} = 'Failed'
) AS failedCalls
ON me.#${callFqn} = failedCalls.#${callFqn}
INNER JOIN (
#${attemptAndIndexSelectStatement(callFqn, scatterIndex, retryAttempt, "max")}
FROM #${metadataEntry} me
INNER JOIN #${wmse} wmse
ON wmse.#${workflowUuid} = me.#${workflowUuid}
WHERE (wmse.#${rootUuid} = $rootWorkflowId OR wmse.#${workflowUuid} = $rootWorkflowId)
AND #${callFqn} IS NOT NULL
GROUP BY #${callFqn}
) AS targetCalls
ON me.#${callFqn} = targetCalls.#${callFqn}
) maxCalls
ON me.#${callFqn} = maxCalls.#${callFqn}
LEFT JOIN (
SELECT DISTINCT #${callFqn}
FROM #${metadataEntry} me
Expand All @@ -370,13 +387,11 @@ trait MetadataEntryComponent {
ON me.#${callFqn} = avoidedCalls.#${callFqn}
INNER JOIN #${wmse} wmse
ON wmse.#${workflowUuid} = me.#${workflowUuid}
#${pgObjectInnerJoinStatement(isPostgres, metadataValue)}
WHERE avoidedCalls.#${callFqn} IS NULL
AND (me.#${metadataKey} in ('executionStatus', 'backendStatus') AND #${dbMetadataValueColCheckName(isPostgres)} = 'Failed')
AND (
(COALESCE(me.#${retryAttempt}, 0) = targetCalls.maxRetry AND me.#${scatterIndex} IS NULL)
OR (COALESCE(me.#${retryAttempt}, 0) = targetCalls.maxRetry AND me.#${scatterIndex} = targetCalls.maxScatter)
)
AND COALESCE(me.#${scatterIndex}, 0) = maxCalls.maxScatter
AND COALESCE(me.#${retryAttempt}, 0) = maxCalls.maxRetry
AND failedCalls.failedScatter = maxCalls.maxScatter
AND failedCalls.failedRetry = maxCalls.maxRetry
GROUP BY #${resultSetColumnNames}
HAVING me.#${workflowUuid} IN (
SELECT DISTINCT wmse.#${workflowUuid}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,16 @@ class MetadataSlickDatabaseSpec extends AnyFlatSpec with CromwellTimeoutSpec wit
val subSubWorkflowCountableId = "subsubworkflow id: countable stuff"

val failedParentWorkflowId = "bbf4c25b-282b-4a18-a914-441f9684b69e"
val ignoredFailedParentWorkflowId = "ccccc-ddddd-3333-42242"
val ignoredFailedChildWorkflowId = "eeeeeee-ggggggg-33333-22222"
val successfulParentWorkflowId = "4c1cf43d-1fbd-47af-944c-c63216f293ae"

val failedChildWorkflowId = "9ff3d855-0585-48e4-b3a1-189101f611e5"
val successfulChildWorkflowId = "73886096-2e06-48f6-ba42-f365dbf23de5"

val failedStatusMetadataValue = Option(new SerialClob("Failed".toCharArray()))
val doneStatusMetadataValue = Option(new SerialClob("Done".toCharArray()))
val stdErrValue = Option(new SerialClob("test value".toCharArray()))

it should "set up the test data" taggedAs DbmsTest in {
database.runTestTransaction(
Expand Down Expand Up @@ -215,19 +218,26 @@ class MetadataSlickDatabaseSpec extends AnyFlatSpec with CromwellTimeoutSpec wit
MetadataEntry(failedParentWorkflowId, Option("failedWorkflowCall"), Option(0), Option(1), "executionStatus", failedStatusMetadataValue, Option("String"), OffsetDateTime.now().toSystemTimestamp, None),
MetadataEntry(failedParentWorkflowId, Option("failedWorkflowCall"), Option(1), Option(0), "executionStatus", failedStatusMetadataValue, Option("String"), OffsetDateTime.now().toSystemTimestamp, None),
MetadataEntry(failedParentWorkflowId, Option("failedWorkflowCall"), Option(1), Option(1), "executionStatus", failedStatusMetadataValue, Option("String"), OffsetDateTime.now().toSystemTimestamp, None),
MetadataEntry(failedParentWorkflowId, Option("failedWorkflowCall"), Option(1), Option(1), "stderr", stdErrValue, Option("String"), OffsetDateTime.now().toSystemTimestamp, None),
MetadataEntry(failedParentWorkflowId, Option("successfulWorkflowCall"), Option(0), Option(0), "executionStatus", doneStatusMetadataValue, Option("String"), OffsetDateTime.now().toSystemTimestamp, None),
MetadataEntry(failedParentWorkflowId, Option("successfulSubWorkflowCall"), Option(0), Option(0), "subWorkflowId", Option(new SerialClob(failedChildWorkflowId.toCharArray)), Option("String"), OffsetDateTime.now().toSystemTimestamp, None),

//ignored failed workflow calls. These should not be fetched since it's not part of the target workflow tree
MetadataEntry(ignoredFailedParentWorkflowId, Option("failedWorkflowCall"), Option(0), Option(0), "executionStatus", failedStatusMetadataValue, Option("String"), OffsetDateTime.now().toSystemTimestamp, None),
MetadataEntry(ignoredFailedChildWorkflowId, Option("failedSubWorkflowCall"), None, Option(1), "executionStatus", failedStatusMetadataValue, Option("String"), OffsetDateTime.now().toSystemTimestamp, None),

//child workflow calls (successful calls and previous failed attempts/shards are mixed in for negative checks)
//notFailedSubWorkflowCall should not be returned since it succeeded on the last attempt and has no scatters
MetadataEntry(failedChildWorkflowId, Option("notActuallyFailedSubWorkflowCall"), None, Option(1), "executionStatus", failedStatusMetadataValue, Option("String"), OffsetDateTime.now().toSystemTimestamp, None),
MetadataEntry(failedChildWorkflowId, Option("notActuallyFailedSubWorkflowCall"), None, Option(2), "backendStatus", doneStatusMetadataValue, Option("String"), OffsetDateTime.now().toSystemTimestamp, None),
MetadataEntry(failedChildWorkflowId, Option("notActuallyFailedSubWorkflowCall"), None, Option(2), "stderr", stdErrValue, Option("String"), OffsetDateTime.now().toSystemTimestamp, None),
MetadataEntry(failedChildWorkflowId, Option("successfulSubWorkflowCall"), Option(0), Option(0), "executionStatus", doneStatusMetadataValue, Option("String"), OffsetDateTime.now().toSystemTimestamp, None),

//Failed child workflow calls (successful calls and previous failed attempts/shards are mixed in for negative checks)
//failedSubWorkflowCall2 should be returned since it never succeeded
//failedSubWorkflowCall should be returned since it never succeeded
MetadataEntry(failedChildWorkflowId, Option("failedSubWorkflowCall"), None, Option(1), "executionStatus", failedStatusMetadataValue, Option("String"), OffsetDateTime.now().toSystemTimestamp, None),
MetadataEntry(failedChildWorkflowId, Option("failedSubWorkflowCall"), None, Option(2), "backendStatus", failedStatusMetadataValue, Option("String"), OffsetDateTime.now().toSystemTimestamp, None),
MetadataEntry(failedChildWorkflowId, Option("failedSubWorkflowCall"), None, Option(2), "stderr", stdErrValue, Option("String"), OffsetDateTime.now().toSystemTimestamp, None),
MetadataEntry(failedChildWorkflowId, Option("successfulSubWorkflowCall2"), Option(0), Option(0), "executionStatus", doneStatusMetadataValue, Option("String"), OffsetDateTime.now().toSystemTimestamp, None),

//Third set of child workflow calls, similar to above however this set consists of retries and scatters
Expand All @@ -237,6 +247,7 @@ class MetadataSlickDatabaseSpec extends AnyFlatSpec with CromwellTimeoutSpec wit
MetadataEntry(failedChildWorkflowId, Option("failedSubWorkflowCall2"), Option(2), Option(1), "backendStatus", failedStatusMetadataValue, Option("String"), OffsetDateTime.now().toSystemTimestamp, None),
MetadataEntry(failedChildWorkflowId, Option("failedSubWorkflowCall2"), Option(1), Option(2), "backendStatus", failedStatusMetadataValue, Option("String"), OffsetDateTime.now().toSystemTimestamp, None),
MetadataEntry(failedChildWorkflowId, Option("failedSubWorkflowCall2"), Option(2), Option(2), "executionStatus", failedStatusMetadataValue, Option("String"), OffsetDateTime.now().toSystemTimestamp, None),
MetadataEntry(failedChildWorkflowId, Option("failedSubWorkflowCall2"), Option(2), Option(2), "stderr", stdErrValue, Option("String"), OffsetDateTime.now().toSystemTimestamp, None),
MetadataEntry(failedChildWorkflowId, Option("successfulSubWorkflowCall3"), Option(0), Option(0), "executionStatus", doneStatusMetadataValue, Option("String"), OffsetDateTime.now().toSystemTimestamp, None),

//Fourth set of child workflow calls
Expand All @@ -245,6 +256,7 @@ class MetadataSlickDatabaseSpec extends AnyFlatSpec with CromwellTimeoutSpec wit
MetadataEntry(failedChildWorkflowId, Option("notActuallySubWorkflowCall2"), Option(2), Option(1), "backendStatus", failedStatusMetadataValue, Option("String"), OffsetDateTime.now().toSystemTimestamp, None),
MetadataEntry(failedChildWorkflowId, Option("notActuallySubWorkflowCall2"), Option(1), Option(2), "executionStatus", doneStatusMetadataValue, Option("String"), OffsetDateTime.now().toSystemTimestamp, None),
MetadataEntry(failedChildWorkflowId, Option("notActuallySubWorkflowCall2"), Option(2), Option(2), "backendStatus", doneStatusMetadataValue, Option("String"), OffsetDateTime.now().toSystemTimestamp, None),
MetadataEntry(failedChildWorkflowId, Option("notActuallySubWorkflowCall2"), Option(2), Option(2), "stderr", stdErrValue, Option("String"), OffsetDateTime.now().toSystemTimestamp, None),
MetadataEntry(failedChildWorkflowId, Option("successfulSubWorkflowCall4"), Option(0), Option(0), "executionStatus", doneStatusMetadataValue, Option("String"), OffsetDateTime.now().toSystemTimestamp, None),

//Successful parent workflow call (negative check)
Expand All @@ -262,7 +274,9 @@ class MetadataSlickDatabaseSpec extends AnyFlatSpec with CromwellTimeoutSpec wit
WorkflowMetadataSummaryEntry(failedParentWorkflowId, Option("failedParentWorkflow"), Option("Failed"), Option(now), Option(now), Option(now), None, None, None, None),
WorkflowMetadataSummaryEntry(failedChildWorkflowId, Option("failedChildWorkflow"), Option("Failed"), Option(now), Option(now), Option(now), Option(failedParentWorkflowId), Option(failedParentWorkflowId), None, None),
WorkflowMetadataSummaryEntry(successfulParentWorkflowId, Option("successfulParentWorkflow"), Option("Succeeded"), Option(now), Option(now), Option(now), None, None, None, None),
WorkflowMetadataSummaryEntry(successfulChildWorkflowId, Option("successfulChildWorkflow"), Option("Succeeded"), Option(now), Option(now), Option(now), Option(successfulParentWorkflowId), Option(successfulParentWorkflowId), None, None)
WorkflowMetadataSummaryEntry(successfulChildWorkflowId, Option("successfulChildWorkflow"), Option("Succeeded"), Option(now), Option(now), Option(now), Option(successfulParentWorkflowId), Option(successfulParentWorkflowId), None, None),
WorkflowMetadataSummaryEntry(ignoredFailedParentWorkflowId, Option("ignoredFailedParentWorkflow"), Option("Failed"), Option(now), Option(now), Option(now), Option(ignoredFailedParentWorkflowId), Option(ignoredFailedParentWorkflowId), None, None),
WorkflowMetadataSummaryEntry(ignoredFailedChildWorkflowId, Option("ignoredFailedChildWorkflow"), Option("Failed"), Option(now), Option(now), Option(now), Option(ignoredFailedChildWorkflowId), Option(ignoredFailedChildWorkflowId), None, None)
)
).futureValue(Timeout(10.seconds))

Expand All @@ -271,11 +285,11 @@ class MetadataSlickDatabaseSpec extends AnyFlatSpec with CromwellTimeoutSpec wit

val recordCount = Map(
failedParentWorkflowId -> scala.collection.mutable.Map(
"expected" -> 1,
"expected" -> 2,
"actual" -> 0
),
failedChildWorkflowId -> scala.collection.mutable.Map(
"expected" -> 2,
"expected" -> 4,
"actual" -> 0
)
)
Expand All @@ -289,23 +303,39 @@ class MetadataSlickDatabaseSpec extends AnyFlatSpec with CromwellTimeoutSpec wit
recordCount(workflowId)("actual") += 1
val metadataValueClob = entry.metadataValue.get
val metadataValueString = metadataValueClob.getSubString(1, metadataValueClob.length().toInt)
metadataValueString should be("Failed")

entry.metadataKey match {
case "stderr" => {
metadataValueString should be("test value")
}
case "backendStatus" => {
metadataValueString should be("Failed")
}
case "executionStatus" => {
metadataValueString should be("Failed")
}
case _ => fail(s"Unexpected key ${entry.metadataKey} was included in result set")
}

entry.metadataKey should not be("subWorkflowId")
entry.callFullyQualifiedName.getOrElse("") match {
case "failedWorkflowCall" => {
entry.jobIndex.get should be(1)
entry.jobAttempt.get should be(1)
entry.metadataKey should be("executionStatus")
val isValidKey = List("executionStatus", "stderr").contains(entry.metadataKey)
isValidKey should be(true)
}
case "failedSubWorkflowCall" => {
entry.jobIndex should be(None)
entry.jobAttempt.get should be(2)
entry.metadataKey should be("backendStatus")
val isValidKey = List("backendStatus", "stderr").contains(entry.metadataKey)
isValidKey should be(true)
}
case "failedSubWorkflowCall2" => {
entry.jobIndex.get should be(2)
entry.jobAttempt.get should be(2)
entry.metadataKey should be("executionStatus")
val isValidKey = List("executionStatus", "stderr").contains(entry.metadataKey)
isValidKey should be(true)
}
case _ => fail(s"Entry ${entry.callFullyQualifiedName.getOrElse("N/A")} | Index: ${entry.jobIndex.get} | Attempt: ${entry.jobAttempt.get} should not be in result set")
}
Expand Down

0 comments on commit d89e4af

Please sign in to comment.