Skip to content

Commit

Permalink
WX-1110 created method to handle identifier parsing for postgres
Browse files Browse the repository at this point in the history
  • Loading branch information
JVThomas committed Jun 28, 2023
1 parent 3f648a3 commit ff6b590
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,7 @@ class MetadataSlickDatabase(originalDatabaseConfig: Config)
}

override def getFailedJobsMetadataWithWorkflowId(rootWorkflowId: String)(implicit ec: ExecutionContext): Future[Vector[MetadataEntry]] = {
runAction(dataAccess.failedJobsMetadataWithWorkflowId(rootWorkflowId))
val isPostgres = databaseConfig.getValue("db.driver").toString.toLowerCase().contains("postgres")
runAction(dataAccess.failedJobsMetadataWithWorkflowId(rootWorkflowId, isPostgres))

Check warning on line 521 in database/sql/src/main/scala/cromwell/database/slick/MetadataSlickDatabase.scala

View check run for this annotation

Codecov / codecov/patch

database/sql/src/main/scala/cromwell/database/slick/MetadataSlickDatabase.scala#L520-L521

Added lines #L520 - L521 were not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -310,40 +310,59 @@ trait MetadataEntryComponent {
}).headOption
}

def failedJobsMetadataWithWorkflowId(rootWorkflowId: String) = {
def failedJobsMetadataWithWorkflowId(rootWorkflowId: String, isPostgres: Boolean) = {
val getMetadataEntryResult = GetResult(r => {
MetadataEntry(r.<<, r.<<, r.<<, r.<<, r.<<, r.nextClobOption().map(clob => new SerialClob(clob)), r.<<, r.<<, r.<<)

Check warning on line 315 in database/sql/src/main/scala/cromwell/database/slick/tables/MetadataEntryComponent.scala

View check run for this annotation

Codecov / codecov/patch

database/sql/src/main/scala/cromwell/database/slick/tables/MetadataEntryComponent.scala#L314-L315

Added lines #L314 - L315 were not covered by tests
})

def pgIdentifierWrapper(identifier: String, isPostgres: Boolean) = {
if(isPostgres) s"${'"'}$identifier${'"'}" else identifier

Check warning on line 319 in database/sql/src/main/scala/cromwell/database/slick/tables/MetadataEntryComponent.scala

View check run for this annotation

Codecov / codecov/patch

database/sql/src/main/scala/cromwell/database/slick/tables/MetadataEntryComponent.scala#L319

Added line #L319 was not covered by tests
}

val workflowUuid = pgIdentifierWrapper("WORKFLOW_EXECUTION_UUID", isPostgres)
val callFqn = pgIdentifierWrapper("CALL_FQN", isPostgres)
val scatterIndex = pgIdentifierWrapper("JOB_SCATTER_INDEX", isPostgres)
val retryAttempt = pgIdentifierWrapper("JOB_RETRY_ATTEMPT", isPostgres)
val metadataKey = pgIdentifierWrapper("METADATA_KEY", isPostgres)
val metadataValue = pgIdentifierWrapper("METADATA_VALUE", isPostgres)
val metadataValueType = pgIdentifierWrapper("METADATA_VALUE_TYPE", isPostgres)
val metadataTimestamp = pgIdentifierWrapper("METADATA_TIMESTAMP", isPostgres)
val metadataJournalId = pgIdentifierWrapper("METADATA_JOURNAL_ID", isPostgres)
val rootUuid = pgIdentifierWrapper("ROOT_WORKFLOW_EXECUTION_UUID", isPostgres)

Check warning on line 331 in database/sql/src/main/scala/cromwell/database/slick/tables/MetadataEntryComponent.scala

View check run for this annotation

Codecov / codecov/patch

database/sql/src/main/scala/cromwell/database/slick/tables/MetadataEntryComponent.scala#L322-L331

Added lines #L322 - L331 were not covered by tests

val metadataEntry = pgIdentifierWrapper("METADATA_ENTRY", isPostgres)
val wmse = pgIdentifierWrapper("WORKFLOW_METADATA_SUMMARY_ENTRY", isPostgres)

Check warning on line 334 in database/sql/src/main/scala/cromwell/database/slick/tables/MetadataEntryComponent.scala

View check run for this annotation

Codecov / codecov/patch

database/sql/src/main/scala/cromwell/database/slick/tables/MetadataEntryComponent.scala#L333-L334

Added lines #L333 - L334 were not covered by tests

sql"""
SELECT me.WORKFLOW_EXECUTION_UUID, me.CALL_FQN, me.JOB_SCATTER_INDEX, me.JOB_RETRY_ATTEMPT, me.METADATA_KEY, me.METADATA_VALUE, me.METADATA_VALUE_TYPE, me.METADATA_TIMESTAMP, me.METADATA_JOURNAL_ID
FROM METADATA_ENTRY me
SELECT me.#${workflowUuid}, me.#${callFqn}, me.#${scatterIndex}, me.#${retryAttempt}, me.#${metadataKey}, me.#${metadataValue}, me.#${metadataValueType}, me.#${metadataTimestamp}, me.#${metadataJournalId}
FROM #${metadataEntry} me
INNER JOIN (
SELECT DISTINCT CALL_FQN, MAX(COALESCE(JOB_SCATTER_INDEX, 0)) as maxScatter, MAX(COALESCE(JOB_RETRY_ATTEMPT, 0)) AS maxRetry
FROM METADATA_ENTRY me
INNER JOIN WORKFLOW_METADATA_SUMMARY_ENTRY wmse
ON wmse.WORKFLOW_EXECUTION_UUID = me.WORKFLOW_EXECUTION_UUID
WHERE (wmse.ROOT_WORKFLOW_EXECUTION_UUID = $rootWorkflowId OR wmse.WORKFLOW_EXECUTION_UUID = $rootWorkflowId)
AND (me.METADATA_KEY in ('executionStatus', 'backendStatus') AND METADATA_VALUE = 'Failed')
AND CALL_FQN IS NOT NULL
GROUP BY CALL_FQN
SELECT DISTINCT #${callFqn}, MAX(COALESCE(#${scatterIndex}, 0)) as maxScatter, MAX(COALESCE(#${retryAttempt}, 0)) AS maxRetry
FROM #${metadataEntry} me
INNER JOIN #${wmse} wmse
ON wmse.#${workflowUuid} = me.#${workflowUuid}
WHERE (wmse.#${rootUuid} = $rootWorkflowId OR wmse.#${workflowUuid} = $rootWorkflowId)
AND (me.#${metadataKey} in ('executionStatus', 'backendStatus') AND #${metadataValue} = 'Failed')
AND #${callFqn} IS NOT NULL
GROUP BY #${callFqn}
) AS targetCalls
ON me.CALL_FQN = targetCalls.CALL_FQN
ON me.#${callFqn} = targetCalls.#${callFqn}
LEFT JOIN (
SELECT DISTINCT CALL_FQN
FROM METADATA_ENTRY me
INNER JOIN WORKFLOW_METADATA_SUMMARY_ENTRY wmse
ON wmse.WORKFLOW_EXECUTION_UUID = me.WORKFLOW_EXECUTION_UUID
WHERE (wmse.ROOT_WORKFLOW_EXECUTION_UUID = $rootWorkflowId OR wmse.WORKFLOW_EXECUTION_UUID = $rootWorkflowId)
AND me.METADATA_KEY = 'subWorkflowId'
GROUP BY CALL_FQN
SELECT DISTINCT #${callFqn}
FROM #${metadataEntry} me
INNER JOIN #${wmse} wmse
ON wmse.#${workflowUuid} = me.#${workflowUuid}
WHERE (wmse.#${rootUuid} = $rootWorkflowId OR wmse.#${workflowUuid} = $rootWorkflowId)
AND me.#${metadataKey} = 'subWorkflowId'
GROUP BY #${callFqn}
) AS avoidedCalls
ON me.CALL_FQN = avoidedCalls.CALL_FQN
INNER JOIN WORKFLOW_METADATA_SUMMARY_ENTRY wmse
ON wmse.WORKFLOW_EXECUTION_UUID = me.WORKFLOW_EXECUTION_UUID
WHERE avoidedCalls.CALL_FQN IS NULL
AND COALESCE(me.JOB_SCATTER_INDEX,0) = targetCalls.maxScatter
AND COALESCE(me.JOB_RETRY_ATTEMPT, 0) = targetCalls.maxRetry
AND (wmse.ROOT_WORKFLOW_EXECUTION_UUID = $rootWorkflowId OR wmse.WORKFLOW_EXECUTION_UUID = $rootWorkflowId)
ON me.#${callFqn} = avoidedCalls.#${callFqn}
INNER JOIN #${wmse} wmse
ON wmse.#${workflowUuid} = me.#${workflowUuid}
WHERE avoidedCalls.#${callFqn} IS NULL
AND COALESCE(me.#${scatterIndex},0) = targetCalls.maxScatter
AND COALESCE(me.#${retryAttempt}, 0) = targetCalls.maxRetry
AND (wmse.#${rootUuid} = $rootWorkflowId OR wmse.#${workflowUuid} = $rootWorkflowId)
""".as(getMetadataEntryResult)

Check warning on line 366 in database/sql/src/main/scala/cromwell/database/slick/tables/MetadataEntryComponent.scala

View check run for this annotation

Codecov / codecov/patch

database/sql/src/main/scala/cromwell/database/slick/tables/MetadataEntryComponent.scala#L366

Added line #L366 was not covered by tests
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ class MetadataSlickDatabaseSpec extends AnyFlatSpec with CromwellTimeoutSpec wit
}

it should "fetch failed tasks from a failed workflow" taggedAs DbmsTest in {
// ensure that db is in a blank state to avoid side effects from any previous unit tests
// ensure that db is in a blank state to avoid side effects from any previous unit tests
database.runTestTransaction(database.dataAccess.workflowMetadataSummaryEntries.schema.truncate)
database.runTestTransaction(database.dataAccess.metadataEntries.schema.truncate)

Expand Down Expand Up @@ -258,14 +258,14 @@ class MetadataSlickDatabaseSpec extends AnyFlatSpec with CromwellTimeoutSpec wit
entry.metadataKey should not be("subWorkflowId")
entry.callFullyQualifiedName.getOrElse("") match {
case "failedWorkflowCall" => {
entry.jobIndex should equal(1)
entry.jobAttempt should equal(1)
entry.metadataKey should equal("executionStatus")
entry.jobIndex should be(1)
entry.jobAttempt should be(1)
entry.metadataKey should be("executionStatus")
}
case "failedSubWorkflowCall" => {
entry.jobIndex should equal(1)
entry.jobAttempt should equal(1)
entry.metadataKey should be ("backendStatus")
entry.jobIndex should be(1)
entry.jobAttempt should be(1)
entry.metadataKey should be("backendStatus")
}
case _ => fail(s"Entry ${entry.callFullyQualifiedName} | Index: ${entry.jobIndex} | Attempt: ${entry.jobAttempt} should not be in result set")
}
Expand Down

0 comments on commit ff6b590

Please sign in to comment.