diff --git a/database/sql/src/main/scala/cromwell/database/slick/MetadataSlickDatabase.scala b/database/sql/src/main/scala/cromwell/database/slick/MetadataSlickDatabase.scala index 1405f7d101f..0a33e4b6b72 100644 --- a/database/sql/src/main/scala/cromwell/database/slick/MetadataSlickDatabase.scala +++ b/database/sql/src/main/scala/cromwell/database/slick/MetadataSlickDatabase.scala @@ -515,4 +515,9 @@ class MetadataSlickDatabase(originalDatabaseConfig: Config) override def getMetadataTableSizeInformation()(implicit ec: ExecutionContext): Future[Option[InformationSchemaEntry]] = { runAction(dataAccess.metadataTableSizeInformation()) } + + override def getFailedJobsMetadataWithWorkflowId(rootWorkflowId: String)(implicit ec: ExecutionContext): Future[Vector[MetadataEntry]] = { + val isPostgres = databaseConfig.getValue("db.driver").toString.toLowerCase().contains("postgres") + runLobAction(dataAccess.failedJobsMetadataWithWorkflowId(rootWorkflowId, isPostgres)) + } } diff --git a/database/sql/src/main/scala/cromwell/database/slick/tables/MetadataEntryComponent.scala b/database/sql/src/main/scala/cromwell/database/slick/tables/MetadataEntryComponent.scala index 32256c448fb..f4b2724b6a7 100644 --- a/database/sql/src/main/scala/cromwell/database/slick/tables/MetadataEntryComponent.scala +++ b/database/sql/src/main/scala/cromwell/database/slick/tables/MetadataEntryComponent.scala @@ -310,6 +310,84 @@ trait MetadataEntryComponent { }).headOption } + def failedJobsMetadataWithWorkflowId(rootWorkflowId: String, isPostgres: Boolean) = { + val getMetadataEntryResult = GetResult(r => { + MetadataEntry(r.<<, r.<<, r.<<, r.<<, r.<<, r.nextClobOption().map(clob => new SerialClob(clob)), r.<<, r.<<, r.<<) + }) + + def dbIdentifierWrapper(identifier: String, isPostgres: Boolean) = { + if(isPostgres) s"${'"'}$identifier${'"'}" else identifier + } + + def dbMetadataValueColCheckName(isPostgres: Boolean): String = { + if(isPostgres) "obj.data" else "METADATA_VALUE" + } + + def targetCallsSelectStatement(callFqn: String, scatterIndex: String, retryAttempt: String): String = { + s"SELECT ${callFqn}, MAX(COALESCE(${scatterIndex}, 0)) as maxScatter, MAX(COALESCE(${retryAttempt}, 0)) AS maxRetry" + } + + def pgObjectInnerJoinStatement(isPostgres: Boolean, metadataValColName: String): String = { + if(isPostgres) s"INNER JOIN pg_largeobject obj ON me.${metadataValColName} = cast(obj.loid as text)" else "" + } + + val workflowUuid = dbIdentifierWrapper("WORKFLOW_EXECUTION_UUID", isPostgres) + val callFqn = dbIdentifierWrapper("CALL_FQN", isPostgres) + val scatterIndex = dbIdentifierWrapper("JOB_SCATTER_INDEX", isPostgres) + val retryAttempt = dbIdentifierWrapper("JOB_RETRY_ATTEMPT", isPostgres) + val metadataKey = dbIdentifierWrapper("METADATA_KEY", isPostgres) + val metadataValueType = dbIdentifierWrapper("METADATA_VALUE_TYPE", isPostgres) + val metadataTimestamp = dbIdentifierWrapper("METADATA_TIMESTAMP", isPostgres) + val metadataJournalId = dbIdentifierWrapper("METADATA_JOURNAL_ID", isPostgres) + val rootUuid = dbIdentifierWrapper("ROOT_WORKFLOW_EXECUTION_UUID", isPostgres) + val metadataValue = dbIdentifierWrapper("METADATA_VALUE", isPostgres) + val metadataEntry = dbIdentifierWrapper("METADATA_ENTRY", isPostgres) + val wmse = dbIdentifierWrapper("WORKFLOW_METADATA_SUMMARY_ENTRY", isPostgres) + val resultSetColumnNames = s"me.${workflowUuid}, me.${callFqn}, me.${scatterIndex}, me.${retryAttempt}, me.${metadataKey}, me.${metadataValue}, me.${metadataValueType}, me.${metadataTimestamp}, me.${metadataJournalId}" + + val query = sql""" + SELECT #${resultSetColumnNames} + FROM #${metadataEntry} me + INNER JOIN ( + #${targetCallsSelectStatement(callFqn, scatterIndex, retryAttempt)} + FROM #${metadataEntry} me + INNER JOIN #${wmse} wmse + ON wmse.#${workflowUuid} = me.#${workflowUuid} + WHERE (wmse.#${rootUuid} = $rootWorkflowId OR wmse.#${workflowUuid} = $rootWorkflowId) + AND #${callFqn} IS NOT NULL + GROUP BY #${callFqn} + ) AS targetCalls + ON me.#${callFqn} = targetCalls.#${callFqn} + LEFT JOIN ( + SELECT DISTINCT #${callFqn} + FROM #${metadataEntry} me + INNER JOIN #${wmse} wmse + ON wmse.#${workflowUuid} = me.#${workflowUuid} + WHERE (wmse.#${rootUuid} = $rootWorkflowId OR wmse.#${workflowUuid} = $rootWorkflowId) + AND me.#${metadataKey} = 'subWorkflowId' + GROUP BY #${callFqn} + ) AS avoidedCalls + ON me.#${callFqn} = avoidedCalls.#${callFqn} + INNER JOIN #${wmse} wmse + ON wmse.#${workflowUuid} = me.#${workflowUuid} + #${pgObjectInnerJoinStatement(isPostgres, metadataValue)} + WHERE avoidedCalls.#${callFqn} IS NULL + AND (me.#${metadataKey} in ('executionStatus', 'backendStatus') AND #${dbMetadataValueColCheckName(isPostgres)} = 'Failed') + AND ( + (COALESCE(me.#${retryAttempt}, 0) = targetCalls.maxRetry AND me.#${scatterIndex} IS NULL) + OR (COALESCE(me.#${retryAttempt}, 0) = targetCalls.maxRetry AND me.#${scatterIndex} = targetCalls.maxScatter) + ) + GROUP BY #${resultSetColumnNames} + HAVING me.#${workflowUuid} IN ( + SELECT DISTINCT wmse.#${workflowUuid} + FROM #${wmse} wmse + WHERE wmse.#${rootUuid} = $rootWorkflowId OR wmse.#${workflowUuid} = $rootWorkflowId + ) + """ + + query.as(getMetadataEntryResult) + } + private[this] def metadataEntryHasMetadataKeysLike(metadataEntry: MetadataEntries, metadataKeysToFilterFor: List[String], metadataKeysToFilterOut: List[String]): Rep[Boolean] = { diff --git a/database/sql/src/main/scala/cromwell/database/sql/MetadataSqlDatabase.scala b/database/sql/src/main/scala/cromwell/database/sql/MetadataSqlDatabase.scala index 30534818f85..9139c819999 100644 --- a/database/sql/src/main/scala/cromwell/database/sql/MetadataSqlDatabase.scala +++ b/database/sql/src/main/scala/cromwell/database/sql/MetadataSqlDatabase.scala @@ -197,4 +197,6 @@ trait MetadataSqlDatabase extends SqlDatabase { def countWorkflowsLeftToDeleteThatEndedOnOrBeforeThresholdTimestamp(workflowEndTimestampThreshold: Timestamp)(implicit ec: ExecutionContext): Future[Int] def getMetadataTableSizeInformation()(implicit ec: ExecutionContext): Future[Option[InformationSchemaEntry]] + + def getFailedJobsMetadataWithWorkflowId(rootWorkflowId: String)(implicit ec: ExecutionContext): Future[Vector[MetadataEntry]] } diff --git a/docs/api/RESTAPI.md b/docs/api/RESTAPI.md index 7a0585ca08e..64ffe19a3fd 100644 --- a/docs/api/RESTAPI.md +++ b/docs/api/RESTAPI.md @@ -1,5 +1,5 @@