Skip to content

Commit

Permalink
Pivotal ID # 186080414: Execute Submission Persistence On DMs (#791)
Browse files Browse the repository at this point in the history
https://www.pivotaltracker.com/story/show/186080414

- Publish submission task artefact
- Always use specific configuration file to start the applications
- Fix logs output path
- Fix submission task start command
- Avoid duplicated rabbit message
  • Loading branch information
jhoanmanuelms authored Dec 14, 2023
1 parent 695b369 commit 55d754d
Show file tree
Hide file tree
Showing 17 changed files with 183 additions and 106 deletions.
4 changes: 2 additions & 2 deletions ci/stats-reporter.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ deploy-dev-stats-reporter-task:
when: manual
dependencies:
- build-test
script: gradle :scheduler:tasks:submission-stats-reporter-task:bootJar updateStatsReporterArtifact -Penv=codon -PdeployPath=$CODON_APPS_PATH/scheduler/dev/apps
script: gradle :scheduler:tasks:stats-reporter-task:bootJar updateStatsReporterArtifact -Penv=codon -PdeployPath=$CODON_APPS_PATH/scheduler/dev/apps

deploy-beta-stats-reporter-task:
stage: deploy-beta-stats-reporter-task
when: manual
dependencies:
- build-test
script: gradle :scheduler:tasks:submission-stats-reporter-task:bootJar updateStatsReporterArtifact -Penv=codon -PdeployPath=$CODON_APPS_PATH/scheduler/beta/apps
script: gradle :scheduler:tasks:stats-reporter-task:bootJar updateStatsReporterArtifact -Penv=codon -PdeployPath=$CODON_APPS_PATH/scheduler/beta/apps

deploy-prod-stats-reporter-task:
extends: .deploy-prod-stats-reporter-task
Expand Down
30 changes: 30 additions & 0 deletions ci/submission-task.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
.deploy-prod-submission-task:
dependencies:
- build-test
script: gradle :submission:submission-task:bootJar updateSubmissionTaskArtifact -Penv=prod -PdeployPath=$APPS_PATH/submitter/prod

auto-deploy-prod-submission-task:
extends: .deploy-prod-submission-task
stage: auto-deploy-prod-submission-task
only:
refs:
- prod

deploy-dev-submission-task:
stage: deploy-dev-submission-task
when: manual
dependencies:
- build-test
script: gradle :submission:submission-task:bootJar updateSubmissionTaskArtifact -Penv=dev -PdeployPath=$APPS_PATH/submitter/dev

deploy-beta-submission-task:
stage: deploy-beta-submission-task
when: manual
dependencies:
- build-test
script: gradle :submission:submission-task:bootJar updateSubmissionTaskArtifact -Penv=beta -PdeployPath=$APPS_PATH/submitter/beta

deploy-prod-submission-task:
extends: .deploy-prod-submission-task
stage: deploy-prod-submission-task
when: manual
2 changes: 1 addition & 1 deletion ci/update.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@ nohup ${JAVA_HOME}/bin/java JVM_PARAMS \
-Dsun.jnu.encoding=UTF-8 \
-Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=*:DEBUG_PORT,suspend=n \
-jar APP_PATH/APP_NAME \
--spring.config.location=classpath:/application.yml,APP_PATH/application.yml \
--spring.config.location=APP_PATH/application.yml \
--server.port=APP_PORT > start_logs.txt 2>&1 &
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import kotlinx.coroutines.withContext
import mu.KotlinLogging
import uk.ac.ebi.biostd.client.cluster.common.JobResponseParser
import uk.ac.ebi.biostd.client.cluster.common.JobSubmitFailException
import uk.ac.ebi.biostd.client.cluster.exception.FailedJobException
import uk.ac.ebi.biostd.client.cluster.model.Job
import uk.ac.ebi.biostd.client.cluster.model.JobSpec
import java.time.Duration.ofSeconds
Expand All @@ -21,7 +22,7 @@ class RemoteClusterClient(
private val sessionFunction: () -> Session,
) : ClusterClient {
override suspend fun triggerJobAsync(jobSpec: JobSpec): Try<Job> {
val parameters = mutableListOf("bsub -o $logsPath -e $logsPath")
val parameters = mutableListOf("bsub -o $logsPath/%J_OUT -e $logsPath/%J_IN")
parameters.addAll(jobSpec.asParameter())
val command = parameters.joinToString(separator = " ")
logger.info { "Executing command '$command'" }
Expand All @@ -41,30 +42,32 @@ class RemoteClusterClient(
}

override suspend fun jobStatus(jobId: String): String {
logger.info { "Checking Job id ='$jobId' status" }
return runInSession {
val status = executeCommand("bjobs -o STAT -noheader $jobId").second.trimIndent()
logger.info { "Job $jobId. Current status $status" }
logger.info { "Job $jobId status $status" }
status
}
}

private suspend fun await(job: Job, checkJobInterval: Long, maxSecondsDuration: Long): Job {
return runInSession {
var status: String = PEND_STATUS
waitUntil(
interval = ofSeconds(checkJobInterval),
duration = ofSeconds(maxSecondsDuration)
) {
val status = jobStatus(job.id)
val isDone = status == "DONE"
when {
isDone -> logger.info { "Job ${job.id} status is $status. Execution completed" }
status = jobStatus(job.id)
val executionFinished = status == DONE_STATUS || status == EXIT_STATUS
when (status) {
EXIT_STATUS -> logger.error { "Job ${job.id} status is $EXIT_STATUS. Execution failed" }
DONE_STATUS -> logger.info { "Job ${job.id} status is $DONE_STATUS. Execution completed" }
else -> logger.info { "Job ${job.id} status is $status. Waiting for completion" }
}
return@waitUntil isDone

return@waitUntil executionFinished
}

job
if (status == DONE_STATUS) job else throw FailedJobException(job)
}
}

Expand All @@ -75,7 +78,24 @@ class RemoteClusterClient(
return Try.raise(JobSubmitFailException(response))
}

private suspend fun <T> runInSession(exec: suspend CommandRunner.() -> T): T {
return withContext(Dispatchers.IO) {
val session = sessionFunction()

try {
session.connect()
exec(CommandRunner(session))
} finally {
session.disconnect()
}
}
}

companion object {
internal const val DONE_STATUS = "DONE"
internal const val EXIT_STATUS = "EXIT"
internal const val PEND_STATUS = "PEND"

private val responseParser = JobResponseParser()

fun create(sshKey: String, sshMachine: String, logsPath: String): RemoteClusterClient {
Expand All @@ -88,17 +108,4 @@ class RemoteClusterClient(
}
}
}

private suspend fun <T> runInSession(exec: suspend CommandRunner.() -> T): T {
return withContext(Dispatchers.IO) {
val session = sessionFunction()

try {
session.connect()
exec(CommandRunner(session))
} finally {
session.disconnect()
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package uk.ac.ebi.biostd.client.cluster.exception

import uk.ac.ebi.biostd.client.cluster.model.Job

class FailedJobException(job: Job) : RuntimeException("The job ${job.id} execution has failed. Logs: ${job.logsPath}")
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ import java.nio.file.Paths
class FtpClientExtTest(@MockK val testInstance: FtpClient) {

@Test
fun existsWhenExists(@MockK file: FTPFile) {
fun `when exists`(@MockK file: FTPFile) {
val path = Paths.get("/dummy/path")
every { testInstance.listFiles(path) } returns listOf(file)

assertThat(testInstance.exists(path)).isTrue()
}

@Test
fun existsWhenDoesNotExists() {
fun `when doesn't exist`() {
val path = Paths.get("/dummy/path")
every { testInstance.listFiles(path) } returns emptyList()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ebi.ac.uk.extended.mapping.to
import ebi.ac.uk.extended.model.ExtFile
import ebi.ac.uk.extended.model.FireFile
import ebi.ac.uk.extended.model.NfsFile
import ebi.ac.uk.io.ext.size
import ebi.ac.uk.model.BioFile

internal const val TO_FILE_EXTENSIONS = "ebi.ac.uk.extended.mapping.to.ToFileKt"
Expand All @@ -14,9 +13,7 @@ internal const val TO_FILE_EXTENSIONS = "ebi.ac.uk.extended.mapping.to.ToFileKt"
*/
fun ExtFile.toFile(): BioFile {
return when (this) {
is NfsFile ->
BioFile(filePath, file.size(calculateDirectories = false), type.value, attributes.map { it.toAttribute() })

is NfsFile -> BioFile(filePath, size, type.value, attributes.map { it.toAttribute() })
is FireFile -> BioFile(filePath, size, type.value, attributes.map { it.toAttribute() })
}
}
9 changes: 9 additions & 0 deletions deploy.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,15 @@ task updateStatsReporterArtifact {
finalizedBy "deployArtifact"
}

task updateSubmissionTaskArtifact {
doFirst {
project.ext.artifactPath = "submission/submission-task/build/libs"
project.ext.artifactName = "submission-task-1.0.0.jar"
}

finalizedBy "deployArtifact"
}

task deployArtifact {
doLast {
ssh.run {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ internal fun createFile(path: String, file: File, attributes: List<Attribute>):
file = file,
fullPath = file.absolutePath,
md5 = "NOT_CALCULATED",
size = file.size(false),
size = file.size(calculateDirectories = false),
attributes = attributes.toExtAttributes(FILES_RESERVED_ATTRS)
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package ac.uk.ebi.biostd.common.properties
data class SubmissionTaskProperties(
val enabled: Boolean,
val jarLocation: String,
val logsLocation: String,
val configFilePath: String,
val javaLocation: String,
val configFileLocation: String,
)

data class ClusterProperties(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,24 @@ class ExtendedSubmissionSubmitter(
}
}

override suspend fun cleanRequest(accNo: String, version: Int) {
when (submissionTaskProperties.enabled) {
true -> remoteExtSubmissionSubmitter.cleanRequest(accNo, version)
else -> localExtSubmissionSubmitter.cleanRequest(accNo, version)
}
}

override suspend fun processRequest(accNo: String, version: Int) {
when (submissionTaskProperties.enabled) {
true -> remoteExtSubmissionSubmitter.processRequest(accNo, version)
else -> localExtSubmissionSubmitter.processRequest(accNo, version)
}
}

override suspend fun checkReleased(accNo: String, version: Int) {
when (submissionTaskProperties.enabled) {
true -> remoteExtSubmissionSubmitter.checkReleased(accNo, version)
else -> localExtSubmissionSubmitter.checkReleased(accNo, version)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ import ebi.ac.uk.extended.model.ExtSubmission
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import mu.KotlinLogging
import java.io.File
import java.util.UUID
import uk.ac.ebi.biostd.client.cluster.api.ClusterClient
import uk.ac.ebi.biostd.client.cluster.model.DataMoverQueue
import uk.ac.ebi.biostd.client.cluster.model.JobSpec
import uk.ac.ebi.biostd.client.cluster.model.MemorySpec.Companion.SIXTEEN_GB

private val logger = KotlinLogging.logger {}

@Suppress("TooManyFunctions")
class RemoteExtSubmissionSubmitter(
private val clusterClient: ClusterClient,
private val submissionTaskProperties: SubmissionTaskProperties,
) : ExtSubmissionSubmitter {
override suspend fun createRequest(rqt: ExtSubmitRequest): Pair<String, Int> {
Expand All @@ -25,19 +28,19 @@ class RemoteExtSubmissionSubmitter(
}

override suspend fun loadRequest(accNo: String, version: Int) {
executeRemotly(accNo, version, Mode.LOAD)
executeRemotely(accNo, version, Mode.LOAD)
}

override suspend fun cleanRequest(accNo: String, version: Int) {
TODO("Not yet implemented")
executeRemotely(accNo, version, Mode.CLEAN)
}

override suspend fun processRequest(accNo: String, version: Int) {
executeRemotly(accNo, version, Mode.COPY)
executeRemotely(accNo, version, Mode.COPY)
}

override suspend fun checkReleased(accNo: String, version: Int) {
TODO("Not yet implemented")
executeRemotely(accNo, version, Mode.CHECK_RELEASED)
}

override suspend fun saveRequest(accNo: String, version: Int): ExtSubmission {
Expand All @@ -56,27 +59,25 @@ class RemoteExtSubmissionSubmitter(
TODO("Not yet implemented")
}

private suspend fun executeRemotly(accNo: String, version: Int, mode: Mode) = withContext(Dispatchers.IO) {
val pId = UUID.randomUUID()
val logs = File(submissionTaskProperties.logsLocation, "application-$pId.log")
val params = buildList<String> {
add("java")
add("-jar")
add(submissionTaskProperties.jarLocation)
add("--spring.config.location=${submissionTaskProperties.configFilePath}")
add("--accNo=$accNo")
add("--version=$version")
add("--mode=${mode.name}")
private suspend fun executeRemotely(accNo: String, version: Int, mode: Mode) = withContext(Dispatchers.IO) {
val command = buildString {
appendSpaced(submissionTaskProperties.javaLocation)
appendSpaced("-jar")
appendSpaced(submissionTaskProperties.jarLocation)
appendSpaced("--spring.config.location=${submissionTaskProperties.configFileLocation}")
appendSpaced("--accNo=$accNo")
appendSpaced("--version=$version")
appendSpaced("--mode=${mode.name}")
}
logger.info { "$accNo $version process $pId, task ='${params.joinToString(" ")}', logs='${logs.absolutePath}'" }
val exitCode = executeRemotly(logs, params)
if (exitCode != 0) throw IllegalStateException("Failed to process subsmision '$accNo' in process $pId")
val jobSpec = JobSpec(cores = 8, ram = SIXTEEN_GB, DataMoverQueue, command)

// TODO this should be moved back to async after #186657310 is completed
val job = clusterClient.triggerJobSync(jobSpec, maxSecondsDuration = 600)
logger.info { "$accNo Executed submitter task in mode $mode. Job Id: ${job.id}, Logs: ${job.logsPath}" }
}

private fun executeRemotly(logs: File, params: List<String>): Int {
val processBuilder = ProcessBuilder(params)
processBuilder.redirectOutput(logs)
val process = processBuilder.start()
return process.waitFor()
private fun StringBuilder.appendSpaced(value: String) {
append(value)
append(" ")
}
}
7 changes: 5 additions & 2 deletions submission/submission-task/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import Dependencies.KotlinCoroutines
import Dependencies.KotlinLogging
import Dependencies.MySql
import Projects.SubmissionSubmitter
Expand All @@ -23,9 +24,11 @@ dependencies {
api(project(SubmissionSubmitter))

annotationProcessor(SpringBootConfigurationProcessor)
implementation(SpringBootStarter)
implementation(MySql)

implementation(KotlinCoroutines)
implementation(KotlinLogging)
implementation(MySql)
implementation(SpringBootStarter)
}

tasks.named<BootJar>("bootJar") {
Expand Down
Loading

0 comments on commit 55d754d

Please sign in to comment.