diff --git a/src/main/kotlin/albelli/junit/synnefo/api/SynnefoOptions.kt b/src/main/kotlin/albelli/junit/synnefo/api/SynnefoOptions.kt index 1d987bb..09e6754 100644 --- a/src/main/kotlin/albelli/junit/synnefo/api/SynnefoOptions.kt +++ b/src/main/kotlin/albelli/junit/synnefo/api/SynnefoOptions.kt @@ -74,7 +74,15 @@ annotation class SynnefoOptions( /** * @return value indicating whether we should shuffle the backlog of tasks before scheduling them in CodeBuild */ - val shuffleBacklogBeforeExecution: Boolean = false + val shuffleBacklogBeforeExecution: Boolean = false, + /** + * @return value indicating how many test retries are allowed in total across the whole run + */ + val maxRetries: Int = 0, + /** + * @return value indicating how many times an individual test can be retried + */ + val retriesPerTest: Int = 3 ) @Retention(RetentionPolicy.RUNTIME) @Target(AnnotationTarget.CLASS) diff --git a/src/main/kotlin/albelli/junit/synnefo/runtime/AmazonCodeBuildScheduler.kt b/src/main/kotlin/albelli/junit/synnefo/runtime/AmazonCodeBuildScheduler.kt index 73a9855..078d065 100644 --- a/src/main/kotlin/albelli/junit/synnefo/runtime/AmazonCodeBuildScheduler.kt +++ b/src/main/kotlin/albelli/junit/synnefo/runtime/AmazonCodeBuildScheduler.kt @@ -2,6 +2,8 @@ package albelli.junit.synnefo.runtime import albelli.junit.synnefo.runtime.exceptions.SynnefoException import albelli.junit.synnefo.runtime.exceptions.SynnefoTestFailureException +import albelli.junit.synnefo.runtime.exceptions.SynnefoTestStoppedException +import albelli.junit.synnefo.runtime.exceptions.SynnefoTestTimedOutException import kotlinx.coroutines.* import kotlinx.coroutines.future.await import org.junit.runner.Description @@ -19,6 +21,7 @@ import java.io.File import java.net.URI import java.nio.file.Paths import java.util.* +import kotlin.collections.HashMap internal class AmazonCodeBuildScheduler(private val classLoader: ClassLoader) { @@ -72,42 +75,43 @@ internal class AmazonCodeBuildScheduler(private val classLoader: ClassLoader) { " 
discard-paths: yes" internal data class Job( - val runnerInfos: List, - val notifier: RunNotifier + val runnerInfos: List, + val notifier: RunNotifier, + val randomSeed : Long = System.currentTimeMillis() ) internal data class ScheduledJob(val originalJob: Job, val buildId: String, val info: SynnefoRunnerInfo, val junitDescription: Description) internal suspend fun scheduleAndWait(job: Job) { val uniqueProps = job.runnerInfos.groupBy { it.synnefoOptions }.map { it.key } - val featuresCount = uniqueProps.flatMap { it.featurePaths }.count() - if (featuresCount == 0) - { + if (featuresCount == 0) { println("No feature paths specified, will do nothing") return } - println("Going to run $featuresCount jobs") val locationMap = uniqueProps.map { val sourceLocation = uploadToS3AndGetSourcePath(it) ensureProjectExists(it) Pair(it, sourceLocation) - }.associateBy ( { it.first }, { it.second } ) + }.associateBy({ it.first }, { it.second }) // Use the sum of the threads as the total amount of threads available. 
// The downside of this approach is that if an override is used, the same value would be plugged in into both annotations val threads = uniqueProps.map { it.threads }.sum() - runAndWaitForJobs(job, threads, locationMap) + + val retryConfiguration = uniqueProps.map { it to RetryConfiguration(it.maxRetries, it.retriesPerTest) }.toMap() + runAndWaitForJobs(job, threads, locationMap, retryConfiguration) println("all jobs have finished") - for(prop in uniqueProps) { - s3.deleteS3uploads(prop.bucketName, locationMap[prop] ?: error("For whatever reason we don't have the source location for this setting")) + for (prop in uniqueProps) { + s3.deleteS3uploads(prop.bucketName, locationMap[prop] + ?: error("For whatever reason we don't have the source location for this setting")) } } private suspend fun S3AsyncClient.deleteS3uploads(bucketName: String, prefix: String) { - if(prefix.isNullOrWhiteSpace()) + if (prefix.isNullOrWhiteSpace()) throw SynnefoException("prefix can't be empty") val listObjectsRequest = ListObjectsRequest @@ -117,9 +121,7 @@ internal class AmazonCodeBuildScheduler(private val classLoader: ClassLoader) { .build() val listResponse = s3.listObjects(listObjectsRequest).await() - val identifiers = listResponse.contents().map { ObjectIdentifier.builder().key(it.key()).build() } - val deleteObjectsRequest = DeleteObjectsRequest.builder() .bucket(bucketName) .delete { t -> t.objects(identifiers) } @@ -128,18 +130,20 @@ internal class AmazonCodeBuildScheduler(private val classLoader: ClassLoader) { this.deleteObjects(deleteObjectsRequest).await() } - private suspend fun runAndWaitForJobs(job: Job, threads: Int, sourceLocations: Map) { + private suspend fun runAndWaitForJobs(job: Job, threads: Int, sourceLocations: Map, retryConfiguration: Map) { + val triesPerTest: MutableMap = HashMap() val currentQueue = LinkedList() val backlog = job.runnerInfos.toMutableList() val codeBuildRequestLimit = 100 val s3Tasks = ArrayList>() + val notificationTicker = 
NotificationTicker(15) { println("current running total: ${currentQueue.size}; backlog: ${backlog.size}") } - var periodicalUpdateTicker = 0 + println("Going to run ${backlog.count()} jobs") while (backlog.size > 0 || currentQueue.size > 0) { if (!currentQueue.isEmpty()) { - val lookupDict: Map = currentQueue.associateBy({ it.buildId }, { it }) + val buildIdToJobMap: Map = currentQueue.associateBy({ it.buildId }, { it }) val dequeuedIds = currentQueue.dequeueUpTo(codeBuildRequestLimit).map { it.buildId } val request = BatchGetBuildsRequest @@ -149,20 +153,44 @@ internal class AmazonCodeBuildScheduler(private val classLoader: ClassLoader) { val response = codeBuild.batchGetBuilds(request).await() for (build in response.builds()) { - - val originalJob = lookupDict.getValue(build.id()) + val originalJob = buildIdToJobMap.getValue(build.id()) when (build.buildStatus()!!) { - STOPPED, TIMED_OUT, FAILED, FAULT -> { + STOPPED -> { + println("build ${originalJob.info.cucumberFeatureLocation} was stopped") + job.notifier.fireTestFailure(Failure(originalJob.junitDescription, SynnefoTestStoppedException("Test ${originalJob.info.cucumberFeatureLocation}"))) + } + + TIMED_OUT -> { + println("build ${originalJob.info.cucumberFeatureLocation} timed out") + job.notifier.fireTestFailure(Failure(originalJob.junitDescription, SynnefoTestTimedOutException("Test ${originalJob.info.cucumberFeatureLocation}"))) + } + + FAILED, FAULT -> run { + println("build ${originalJob.info.cucumberFeatureLocation} failed") + val thisTestRetryConfiguration = retryConfiguration[originalJob.info.synnefoOptions] + ?: error("Failed to get the retry configuration for ${originalJob.info.cucumberFeatureLocation}") + if (thisTestRetryConfiguration.maxRetries > 0) { + println("It's still possible to retry with ${thisTestRetryConfiguration.maxRetries} total retries left") + val newRetries = triesPerTest.getOrDefault(originalJob.info.cucumberFeatureLocation, 0) + 1 + if (newRetries <= 
thisTestRetryConfiguration.retriesPerTest) { + println("Adding the test back to the backlog.") + triesPerTest[originalJob.info.cucumberFeatureLocation] = newRetries + thisTestRetryConfiguration.maxRetries-- + backlog.add(originalJob.info) + return@run + } + println("But this test had exhausted the maximum retries per test") + } + job.notifier.fireTestFailure(Failure(originalJob.junitDescription, SynnefoTestFailureException("Test ${originalJob.info.cucumberFeatureLocation}"))) s3Tasks.add(GlobalScope.async { collectArtifact(originalJob) }) - println("build ${originalJob.info.cucumberFeatureLocation} failed") } SUCCEEDED -> { + println("build ${originalJob.info.cucumberFeatureLocation} succeeded") job.notifier.fireTestFinished(originalJob.junitDescription) s3Tasks.add(GlobalScope.async { collectArtifact(originalJob) }) - println("build ${originalJob.info.cucumberFeatureLocation} succeeded") } IN_PROGRESS -> currentQueue.addLast(originalJob) @@ -173,25 +201,22 @@ internal class AmazonCodeBuildScheduler(private val classLoader: ClassLoader) { } val availableSlots = threads - currentQueue.size - val jobsToSpawn = backlog.dequeueUpTo(availableSlots) val rate = 25 while (jobsToSpawn.isNotEmpty()) { - val currentBatch = jobsToSpawn - .dequeueUpTo(rate) - - val scheduledJobs = - currentBatch - .map { + val currentBatch = jobsToSpawn.dequeueUpTo(rate) - val settings = it.synnefoOptions - val location = sourceLocations[settings] - ?: error("For whatever reason we don't have the source location for this setting") + val scheduledJobs = currentBatch + .map { - GlobalScope.async { startBuild(job, settings, location, it) } - } - .map { it.await() } + val settings = it.synnefoOptions + val location = sourceLocations[settings] + ?: error("For whatever reason we don't have the source location for this setting") + val shouldTriggerNotifier = !triesPerTest.containsKey(it.cucumberFeatureLocation) + GlobalScope.async { startBuild(job, settings, location, it, shouldTriggerNotifier) } + } 
+ .map { it.await() } currentQueue.addAll(scheduledJobs) println("started ${currentBatch.count()} jobs") @@ -199,19 +224,13 @@ internal class AmazonCodeBuildScheduler(private val classLoader: ClassLoader) { } delay(2000) - periodicalUpdateTicker++ - if (periodicalUpdateTicker > 15) - { - println("current running total: ${currentQueue.size}; backlog: ${backlog.size}") - periodicalUpdateTicker = 0 - } + notificationTicker.tick() } s3Tasks.awaitAll() } - private suspend fun collectArtifact(result : ScheduledJob) - { + private suspend fun collectArtifact(result: ScheduledJob) { val targetDirectory = result.info.synnefoOptions.reportTargetDir val buildId = result.buildId.substring(result.buildId.indexOf(':') + 1) @@ -236,7 +255,6 @@ internal class AmazonCodeBuildScheduler(private val classLoader: ClassLoader) { } private suspend fun uploadToS3AndGetSourcePath(settings: SynnefoProperties): String { - println("uploadToS3AndGetSourcePath") val targetDirectory = settings.bucketSourceFolder + UUID.randomUUID() + "/" @@ -245,8 +263,7 @@ internal class AmazonCodeBuildScheduler(private val classLoader: ClassLoader) { val jarFileName = jarPath.fileName.toString() for (feature in settings.featurePaths) { - if (!feature.scheme.equals("classpath", true)) - { + if (!feature.scheme.equals("classpath", true)) { s3.multipartUploadFile(settings.bucketName, targetDirectory + feature.schemeSpecificPart, feature, 5) } } @@ -266,7 +283,6 @@ internal class AmazonCodeBuildScheduler(private val classLoader: ClassLoader) { } private suspend fun projectExists(projectName: String?): Boolean { - val batchGetProjectsRequest = BatchGetProjectsRequest .builder() .names(projectName) @@ -318,10 +334,10 @@ internal class AmazonCodeBuildScheduler(private val classLoader: ClassLoader) { codeBuild.createProject(createRequest).await() } - private suspend fun startBuild(job: Job, settings: SynnefoProperties, sourceLocation: String, info: SynnefoRunnerInfo): ScheduledJob { + private suspend fun 
startBuild(job: Job, settings: SynnefoProperties, sourceLocation: String, info: SynnefoRunnerInfo, triggerTestStarted: Boolean): ScheduledJob { val useStandardImage = settings.image.startsWith("aws/codebuild/standard:2.0") - val buildSpec = generateBuildspecForFeature(Paths.get(settings.classPath).fileName.toString(), info.cucumberFeatureLocation, info.runtimeOptions, useStandardImage) + val buildSpec = generateBuildspecForFeature(Paths.get(settings.classPath).fileName.toString(), info.cucumberFeatureLocation, info.runtimeOptions, useStandardImage, job.randomSeed) val buildStartRequest = StartBuildRequest.builder() .projectName(settings.projectName) @@ -341,16 +357,17 @@ internal class AmazonCodeBuildScheduler(private val classLoader: ClassLoader) { .build() - val startBuildResponse = codeBuild.startBuild(buildStartRequest).await() + val startBuildResponse = codeBuild.startBuild(buildStartRequest).await() val buildId = startBuildResponse.build().id() val junitDescription = Description.createTestDescription("Synnefo", info.cucumberFeatureLocation) - job.notifier.fireTestStarted(junitDescription) + + if (triggerTestStarted) + job.notifier.fireTestStarted(junitDescription) return ScheduledJob(job, buildId, info, junitDescription) } private fun readFileChunks(file: URI, partSizeMb: Int) = sequence { - val connection = file.toValidURL(classLoader).openConnection() val stream = connection.getInputStream() @@ -388,18 +405,18 @@ internal class AmazonCodeBuildScheduler(private val classLoader: ClassLoader) { val partETags = chunks.map { - val uploadRequest = UploadPartRequest.builder() - .bucket(bucket) - .key(filteredKey) - .uploadId(response.uploadId()) - .partNumber(it.first) - .build() - val data = AsyncRequestBody.fromBytes(it.second) + val uploadRequest = UploadPartRequest.builder() + .bucket(bucket) + .key(filteredKey) + .uploadId(response.uploadId()) + .partNumber(it.first) + .build() + val data = AsyncRequestBody.fromBytes(it.second) - val etag = 
s3clientExt.uploadPart(uploadRequest, data).await().eTag() + val etag = s3clientExt.uploadPart(uploadRequest, data).await().eTag() - CompletedPart.builder().partNumber(it.first).eTag(etag).build() - } + CompletedPart.builder().partNumber(it.first).eTag(etag).build() + } val completedMultipartUpload = CompletedMultipartUpload.builder().parts(partETags) .build() @@ -413,44 +430,60 @@ internal class AmazonCodeBuildScheduler(private val classLoader: ClassLoader) { s3clientExt.completeMultipartUpload(completeMultipartUploadRequest).await() } - private fun generateBuildspecForFeature(jar: String, feature: String, runtimeOptions: List, useStandardImage: Boolean): String { + private fun generateBuildspecForFeature(jar: String, feature: String, runtimeOptions: List, useStandardImage: Boolean, randomSeed : Long): String { val sb = StringBuilder() sb.appendWithEscaping("java") sb.appendWithEscaping("-cp") sb.appendWithEscaping("./../$jar") + sb.appendWithEscaping(String.format("-D%s=%s", "java.random.seed", randomSeed)) getSystemProperties().forEach { sb.appendWithEscaping(it) } sb.appendWithEscaping("cucumber.api.cli.Main") - if(feature.startsWith("classpath")) { + if (feature.startsWith("classpath")) { sb.appendWithEscaping(feature) - } - else { + } else { sb.appendWithEscaping("./../$feature") } runtimeOptions.forEach { sb.appendWithEscaping(it) } - val image = if (useStandardImage) { this.buildSpecTemplateStandard2_0 } else {this.buildSpecTemplate } + + val image = if (useStandardImage) { + this.buildSpecTemplateStandard2_0 + } else { + this.buildSpecTemplate + } return String.format(image, sb.toString()) } private fun getSystemProperties(): List { - val transferrableProperties = System.getProperty("SynnefoTransferrableProperties") - if(transferrableProperties.isNullOrWhiteSpace()) + if (transferrableProperties.isNullOrWhiteSpace()) return arrayListOf() val propertiesList = transferrableProperties.split(';') return System.getProperties() - .map { - Pair(it.key.toString(), 
it.value.toString().trim()) - } - .filter { pair -> - val isNotIgnored = propertiesList.any { pair.first.startsWith(it, ignoreCase = true) } - val isNotEmpty = !pair.second.isNullOrWhiteSpace() - isNotIgnored && isNotEmpty - } - .map { + .map { + Pair(it.key.toString(), it.value.toString().trim()) + } + .filter { pair -> + val isNotIgnored = propertiesList.any { pair.first.startsWith(it, ignoreCase = true) } + val isNotEmpty = !pair.second.isNullOrWhiteSpace() + isNotIgnored && isNotEmpty + } + .map { String.format("-D%s=%s", it.first, it.second) + } + } + + internal class RetryConfiguration(var maxRetries: Int, val retriesPerTest: Int) + + internal class NotificationTicker(val times: Int, val tickFun: () -> Unit) { + private var internalTicker = 0 + fun tick() { + if (internalTicker++ >= times) { + internalTicker = 0 + tickFun() } + } } } diff --git a/src/main/kotlin/albelli/junit/synnefo/runtime/SynnefoProperties.kt b/src/main/kotlin/albelli/junit/synnefo/runtime/SynnefoProperties.kt index ea1dc24..eb39c0c 100644 --- a/src/main/kotlin/albelli/junit/synnefo/runtime/SynnefoProperties.kt +++ b/src/main/kotlin/albelli/junit/synnefo/runtime/SynnefoProperties.kt @@ -4,7 +4,6 @@ import albelli.junit.synnefo.api.SynnefoOptions import albelli.junit.synnefo.api.SynnefoRunLevel import albelli.junit.synnefo.runtime.exceptions.SynnefoException import cucumber.api.CucumberOptions -import software.amazon.awssdk.core.interceptor.Context import java.net.URI internal class SynnefoProperties( @@ -22,6 +21,8 @@ internal class SynnefoProperties( val outputFileName: String, val cucumberForcedTags: String, val shuffleBacklogBeforeExecution: Boolean, + val maxRetries: Int, + val retriesPerTest: Int, val classPath: String, val featurePaths: List) { @@ -40,6 +41,8 @@ internal class SynnefoProperties( getAnyVar("outputFileName", opt.outputFileName), opt.cucumberForcedTags, getAnyVar("shuffleBacklogBeforeExecution", opt.shuffleBacklogBeforeExecution), + getAnyVar("maxRetries", 
opt.maxRetries), + getAnyVar("retriesPerTest", opt.retriesPerTest), "", listOf() ) @@ -59,6 +62,8 @@ internal class SynnefoProperties( opt.outputFileName, opt.cucumberForcedTags, opt.shuffleBacklogBeforeExecution, + opt.maxRetries, + opt.retriesPerTest, classPath, featurePaths ) diff --git a/src/main/kotlin/albelli/junit/synnefo/runtime/exceptions/SynnefoTestFailureException.kt b/src/main/kotlin/albelli/junit/synnefo/runtime/exceptions/SynnefoTestFailureException.kt index d1fcf27..c2a6a4a 100644 --- a/src/main/kotlin/albelli/junit/synnefo/runtime/exceptions/SynnefoTestFailureException.kt +++ b/src/main/kotlin/albelli/junit/synnefo/runtime/exceptions/SynnefoTestFailureException.kt @@ -6,4 +6,5 @@ class SynnefoTestFailureException(message: String) : RuntimeException(message) { override fun fillInStackTrace(): Throwable { return this } -} \ No newline at end of file +} + diff --git a/src/main/kotlin/albelli/junit/synnefo/runtime/exceptions/SynnefoTestStoppedException.kt b/src/main/kotlin/albelli/junit/synnefo/runtime/exceptions/SynnefoTestStoppedException.kt new file mode 100644 index 0000000..05ce0f6 --- /dev/null +++ b/src/main/kotlin/albelli/junit/synnefo/runtime/exceptions/SynnefoTestStoppedException.kt @@ -0,0 +1,9 @@ +package albelli.junit.synnefo.runtime.exceptions + +class SynnefoTestStoppedException(message: String) : RuntimeException(message) { + @Synchronized + override fun fillInStackTrace(): Throwable { + return this + } +} + diff --git a/src/main/kotlin/albelli/junit/synnefo/runtime/exceptions/SynnefoTestTimedOutException.kt b/src/main/kotlin/albelli/junit/synnefo/runtime/exceptions/SynnefoTestTimedOutException.kt new file mode 100644 index 0000000..b38e679 --- /dev/null +++ b/src/main/kotlin/albelli/junit/synnefo/runtime/exceptions/SynnefoTestTimedOutException.kt @@ -0,0 +1,8 @@ +package albelli.junit.synnefo.runtime.exceptions + +class SynnefoTestTimedOutException(message: String) : RuntimeException(message) { + @Synchronized + override fun 
fillInStackTrace(): Throwable { + return this + } +} \ No newline at end of file