Skip to content

Commit

Permalink
Add retry functionality to the AmazonCodeBuildScheduler. (#38)
Browse files Browse the repository at this point in the history
Now we have two more parameters: maxRetries and retriesPerTest.
retriesPerTest has a default value of 3, while maxRetries has a default value of 0.
What this means is that nothing will be retried by default and the overall retry value should be explicitly set.

Both parameters can be overridden using the same techniques as all the other parameters: via a -D system variable or via environment variables.

All the retries pass the same random seed as a -Djava.random.seed parameter so the test can pick it up and restart using the exact same values.
  • Loading branch information
derwasp authored Jan 7, 2020
1 parent 3f7459f commit e0eabc0
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 80 deletions.
10 changes: 9 additions & 1 deletion src/main/kotlin/albelli/junit/synnefo/api/SynnefoOptions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,15 @@ annotation class SynnefoOptions(
/**
* @return value indicating whether we should shuffle the backlog of tasks before scheduling them in CodeBuild
*/
val shuffleBacklogBeforeExecution: Boolean = false
val shuffleBacklogBeforeExecution: Boolean = false,
/**
* @return value indicating how many times the whole run can retry
*/
val maxRetries: Int = 0,
/**
* @return value indicating how many times can an individual test be retried
*/
val retriesPerTest: Int = 3
)
@Retention(RetentionPolicy.RUNTIME)
@Target(AnnotationTarget.CLASS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package albelli.junit.synnefo.runtime

import albelli.junit.synnefo.runtime.exceptions.SynnefoException
import albelli.junit.synnefo.runtime.exceptions.SynnefoTestFailureException
import albelli.junit.synnefo.runtime.exceptions.SynnefoTestStoppedException
import albelli.junit.synnefo.runtime.exceptions.SynnefoTestTimedOutException
import kotlinx.coroutines.*
import kotlinx.coroutines.future.await
import org.junit.runner.Description
Expand All @@ -19,6 +21,7 @@ import java.io.File
import java.net.URI
import java.nio.file.Paths
import java.util.*
import kotlin.collections.HashMap

internal class AmazonCodeBuildScheduler(private val classLoader: ClassLoader) {

Expand Down Expand Up @@ -72,42 +75,43 @@ internal class AmazonCodeBuildScheduler(private val classLoader: ClassLoader) {
" discard-paths: yes"

internal data class Job(
val runnerInfos: List<SynnefoRunnerInfo>,
val notifier: RunNotifier
val runnerInfos: List<SynnefoRunnerInfo>,
val notifier: RunNotifier,
val randomSeed : Long = System.currentTimeMillis()
)

/**
 * A single CodeBuild build that has been started for one [SynnefoRunnerInfo].
 *
 * @property originalJob the [Job] this build was started for
 * @property buildId the id taken from the start-build response; the polling loop uses it to look the build up again
 * @property info the runner info the build was started from; it is what gets put back into the backlog on a retry
 * @property junitDescription the JUnit description passed to the run notifier when reporting this build
 */
internal data class ScheduledJob(val originalJob: Job, val buildId: String, val info: SynnefoRunnerInfo, val junitDescription: Description)

internal suspend fun scheduleAndWait(job: Job) {
val uniqueProps = job.runnerInfos.groupBy { it.synnefoOptions }.map { it.key }

val featuresCount = uniqueProps.flatMap { it.featurePaths }.count()
if (featuresCount == 0)
{
if (featuresCount == 0) {
println("No feature paths specified, will do nothing")
return
}
println("Going to run $featuresCount jobs")

val locationMap = uniqueProps.map {
val sourceLocation = uploadToS3AndGetSourcePath(it)
ensureProjectExists(it)
Pair(it, sourceLocation)
}.associateBy ( { it.first }, { it.second } )
}.associateBy({ it.first }, { it.second })

// Use the sum of the threads as the total amount of threads available.
// The downside of this approach is that if an override is used, the same value would be plugged in into both annotations
val threads = uniqueProps.map { it.threads }.sum()
runAndWaitForJobs(job, threads, locationMap)

val retryConfiguration = uniqueProps.map { it to RetryConfiguration(it.maxRetries, it.retriesPerTest) }.toMap()
runAndWaitForJobs(job, threads, locationMap, retryConfiguration)
println("all jobs have finished")

for(prop in uniqueProps) {
s3.deleteS3uploads(prop.bucketName, locationMap[prop] ?: error("For whatever reason we don't have the source location for this setting"))
for (prop in uniqueProps) {
s3.deleteS3uploads(prop.bucketName, locationMap[prop]
?: error("For whatever reason we don't have the source location for this setting"))
}
}

private suspend fun S3AsyncClient.deleteS3uploads(bucketName: String, prefix: String) {
if(prefix.isNullOrWhiteSpace())
if (prefix.isNullOrWhiteSpace())
throw SynnefoException("prefix can't be empty")

val listObjectsRequest = ListObjectsRequest
Expand All @@ -117,9 +121,7 @@ internal class AmazonCodeBuildScheduler(private val classLoader: ClassLoader) {
.build()

val listResponse = s3.listObjects(listObjectsRequest).await()

val identifiers = listResponse.contents().map { ObjectIdentifier.builder().key(it.key()).build() }

val deleteObjectsRequest = DeleteObjectsRequest.builder()
.bucket(bucketName)
.delete { t -> t.objects(identifiers) }
Expand All @@ -128,18 +130,20 @@ internal class AmazonCodeBuildScheduler(private val classLoader: ClassLoader) {
this.deleteObjects(deleteObjectsRequest).await()
}

private suspend fun runAndWaitForJobs(job: Job, threads: Int, sourceLocations: Map<SynnefoProperties, String>) {
private suspend fun runAndWaitForJobs(job: Job, threads: Int, sourceLocations: Map<SynnefoProperties, String>, retryConfiguration: Map<SynnefoProperties, RetryConfiguration>) {
val triesPerTest: MutableMap<String, Int> = HashMap()
val currentQueue = LinkedList<ScheduledJob>()
val backlog = job.runnerInfos.toMutableList()
val codeBuildRequestLimit = 100
val s3Tasks = ArrayList<Deferred<Unit>>()
val notificationTicker = NotificationTicker(15) { println("current running total: ${currentQueue.size}; backlog: ${backlog.size}") }

var periodicalUpdateTicker = 0
println("Going to run ${backlog.count()} jobs")

while (backlog.size > 0 || currentQueue.size > 0) {

if (!currentQueue.isEmpty()) {
val lookupDict: Map<String, ScheduledJob> = currentQueue.associateBy({ it.buildId }, { it })
val buildIdToJobMap: Map<String, ScheduledJob> = currentQueue.associateBy({ it.buildId }, { it })
val dequeuedIds = currentQueue.dequeueUpTo(codeBuildRequestLimit).map { it.buildId }

val request = BatchGetBuildsRequest
Expand All @@ -149,20 +153,44 @@ internal class AmazonCodeBuildScheduler(private val classLoader: ClassLoader) {
val response = codeBuild.batchGetBuilds(request).await()

for (build in response.builds()) {

val originalJob = lookupDict.getValue(build.id())
val originalJob = buildIdToJobMap.getValue(build.id())

when (build.buildStatus()!!) {
STOPPED, TIMED_OUT, FAILED, FAULT -> {
STOPPED -> {
println("build ${originalJob.info.cucumberFeatureLocation} was stopped")
job.notifier.fireTestFailure(Failure(originalJob.junitDescription, SynnefoTestStoppedException("Test ${originalJob.info.cucumberFeatureLocation}")))
}

TIMED_OUT -> {
println("build ${originalJob.info.cucumberFeatureLocation} timed out")
job.notifier.fireTestFailure(Failure(originalJob.junitDescription, SynnefoTestTimedOutException("Test ${originalJob.info.cucumberFeatureLocation}")))
}

FAILED, FAULT -> run {
println("build ${originalJob.info.cucumberFeatureLocation} failed")
val thisTestRetryConfiguration = retryConfiguration[originalJob.info.synnefoOptions]
?: error("Failed to get the retry configuration for ${originalJob.info.cucumberFeatureLocation}")
if (thisTestRetryConfiguration.maxRetries > 0) {
println("It's still possible to retry with ${thisTestRetryConfiguration.maxRetries} total retries left")
val newRetries = triesPerTest.getOrDefault(originalJob.info.cucumberFeatureLocation, 0) + 1
if (newRetries <= thisTestRetryConfiguration.retriesPerTest) {
println("Adding the test back to the backlog.")
triesPerTest[originalJob.info.cucumberFeatureLocation] = newRetries
thisTestRetryConfiguration.maxRetries--
backlog.add(originalJob.info)
return@run
}
println("But this test had exhausted the maximum retries per test")
}

job.notifier.fireTestFailure(Failure(originalJob.junitDescription, SynnefoTestFailureException("Test ${originalJob.info.cucumberFeatureLocation}")))
s3Tasks.add(GlobalScope.async { collectArtifact(originalJob) })
println("build ${originalJob.info.cucumberFeatureLocation} failed")
}

SUCCEEDED -> {
println("build ${originalJob.info.cucumberFeatureLocation} succeeded")
job.notifier.fireTestFinished(originalJob.junitDescription)
s3Tasks.add(GlobalScope.async { collectArtifact(originalJob) })
println("build ${originalJob.info.cucumberFeatureLocation} succeeded")
}

IN_PROGRESS -> currentQueue.addLast(originalJob)
Expand All @@ -173,45 +201,36 @@ internal class AmazonCodeBuildScheduler(private val classLoader: ClassLoader) {
}

val availableSlots = threads - currentQueue.size

val jobsToSpawn = backlog.dequeueUpTo(availableSlots)

val rate = 25
while (jobsToSpawn.isNotEmpty()) {
val currentBatch = jobsToSpawn
.dequeueUpTo(rate)

val scheduledJobs =
currentBatch
.map {
val currentBatch = jobsToSpawn.dequeueUpTo(rate)

val settings = it.synnefoOptions
val location = sourceLocations[settings]
?: error("For whatever reason we don't have the source location for this setting")
val scheduledJobs = currentBatch
.map {

GlobalScope.async { startBuild(job, settings, location, it) }
}
.map { it.await() }
val settings = it.synnefoOptions
val location = sourceLocations[settings]
?: error("For whatever reason we don't have the source location for this setting")
val shouldTriggerNotifier = !triesPerTest.containsKey(it.cucumberFeatureLocation)
GlobalScope.async { startBuild(job, settings, location, it, shouldTriggerNotifier) }
}
.map { it.await() }

currentQueue.addAll(scheduledJobs)
println("started ${currentBatch.count()} jobs")
delay(2500)
}

delay(2000)
periodicalUpdateTicker++
if (periodicalUpdateTicker > 15)
{
println("current running total: ${currentQueue.size}; backlog: ${backlog.size}")
periodicalUpdateTicker = 0
}
notificationTicker.tick()
}

s3Tasks.awaitAll()
}

private suspend fun collectArtifact(result : ScheduledJob)
{
private suspend fun collectArtifact(result: ScheduledJob) {
val targetDirectory = result.info.synnefoOptions.reportTargetDir

val buildId = result.buildId.substring(result.buildId.indexOf(':') + 1)
Expand All @@ -236,7 +255,6 @@ internal class AmazonCodeBuildScheduler(private val classLoader: ClassLoader) {
}

private suspend fun uploadToS3AndGetSourcePath(settings: SynnefoProperties): String {

println("uploadToS3AndGetSourcePath")

val targetDirectory = settings.bucketSourceFolder + UUID.randomUUID() + "/"
Expand All @@ -245,8 +263,7 @@ internal class AmazonCodeBuildScheduler(private val classLoader: ClassLoader) {
val jarFileName = jarPath.fileName.toString()

for (feature in settings.featurePaths) {
if (!feature.scheme.equals("classpath", true))
{
if (!feature.scheme.equals("classpath", true)) {
s3.multipartUploadFile(settings.bucketName, targetDirectory + feature.schemeSpecificPart, feature, 5)
}
}
Expand All @@ -266,7 +283,6 @@ internal class AmazonCodeBuildScheduler(private val classLoader: ClassLoader) {
}

private suspend fun projectExists(projectName: String?): Boolean {

val batchGetProjectsRequest = BatchGetProjectsRequest
.builder()
.names(projectName)
Expand Down Expand Up @@ -318,10 +334,10 @@ internal class AmazonCodeBuildScheduler(private val classLoader: ClassLoader) {
codeBuild.createProject(createRequest).await()
}

private suspend fun startBuild(job: Job, settings: SynnefoProperties, sourceLocation: String, info: SynnefoRunnerInfo): ScheduledJob {
private suspend fun startBuild(job: Job, settings: SynnefoProperties, sourceLocation: String, info: SynnefoRunnerInfo, triggerTestStarted: Boolean): ScheduledJob {
val useStandardImage = settings.image.startsWith("aws/codebuild/standard:2.0")

val buildSpec = generateBuildspecForFeature(Paths.get(settings.classPath).fileName.toString(), info.cucumberFeatureLocation, info.runtimeOptions, useStandardImage)
val buildSpec = generateBuildspecForFeature(Paths.get(settings.classPath).fileName.toString(), info.cucumberFeatureLocation, info.runtimeOptions, useStandardImage, job.randomSeed)

val buildStartRequest = StartBuildRequest.builder()
.projectName(settings.projectName)
Expand All @@ -341,16 +357,17 @@ internal class AmazonCodeBuildScheduler(private val classLoader: ClassLoader) {
.build()


val startBuildResponse = codeBuild.startBuild(buildStartRequest).await()
val startBuildResponse = codeBuild.startBuild(buildStartRequest).await()
val buildId = startBuildResponse.build().id()
val junitDescription = Description.createTestDescription("Synnefo", info.cucumberFeatureLocation)
job.notifier.fireTestStarted(junitDescription)

if (triggerTestStarted)
job.notifier.fireTestStarted(junitDescription)

return ScheduledJob(job, buildId, info, junitDescription)
}

private fun readFileChunks(file: URI, partSizeMb: Int) = sequence {

val connection = file.toValidURL(classLoader).openConnection()
val stream = connection.getInputStream()

Expand Down Expand Up @@ -388,18 +405,18 @@ internal class AmazonCodeBuildScheduler(private val classLoader: ClassLoader) {

val partETags =
chunks.map {
val uploadRequest = UploadPartRequest.builder()
.bucket(bucket)
.key(filteredKey)
.uploadId(response.uploadId())
.partNumber(it.first)
.build()
val data = AsyncRequestBody.fromBytes(it.second)
val uploadRequest = UploadPartRequest.builder()
.bucket(bucket)
.key(filteredKey)
.uploadId(response.uploadId())
.partNumber(it.first)
.build()
val data = AsyncRequestBody.fromBytes(it.second)

val etag = s3clientExt.uploadPart(uploadRequest, data).await().eTag()
val etag = s3clientExt.uploadPart(uploadRequest, data).await().eTag()

CompletedPart.builder().partNumber(it.first).eTag(etag).build()
}
CompletedPart.builder().partNumber(it.first).eTag(etag).build()
}

val completedMultipartUpload = CompletedMultipartUpload.builder().parts(partETags)
.build()
Expand All @@ -413,44 +430,60 @@ internal class AmazonCodeBuildScheduler(private val classLoader: ClassLoader) {
s3clientExt.completeMultipartUpload(completeMultipartUploadRequest).await()
}

private fun generateBuildspecForFeature(jar: String, feature: String, runtimeOptions: List<String>, useStandardImage: Boolean): String {
private fun generateBuildspecForFeature(jar: String, feature: String, runtimeOptions: List<String>, useStandardImage: Boolean, randomSeed : Long): String {
val sb = StringBuilder()
sb.appendWithEscaping("java")
sb.appendWithEscaping("-cp")
sb.appendWithEscaping("./../$jar")
sb.appendWithEscaping(String.format("-D%s=%s", "java.random.seed", randomSeed))
getSystemProperties().forEach { sb.appendWithEscaping(it) }
sb.appendWithEscaping("cucumber.api.cli.Main")
if(feature.startsWith("classpath")) {
if (feature.startsWith("classpath")) {
sb.appendWithEscaping(feature)
}
else {
} else {
sb.appendWithEscaping("./../$feature")
}
runtimeOptions.forEach { sb.appendWithEscaping(it) }

val image = if (useStandardImage) { this.buildSpecTemplateStandard2_0 } else {this.buildSpecTemplate }

val image = if (useStandardImage) {
this.buildSpecTemplateStandard2_0
} else {
this.buildSpecTemplate
}
return String.format(image, sb.toString())
}

private fun getSystemProperties(): List<String> {

val transferrableProperties = System.getProperty("SynnefoTransferrableProperties")
if(transferrableProperties.isNullOrWhiteSpace())
if (transferrableProperties.isNullOrWhiteSpace())
return arrayListOf()

val propertiesList = transferrableProperties.split(';')

return System.getProperties()
.map {
Pair(it.key.toString(), it.value.toString().trim())
}
.filter { pair ->
val isNotIgnored = propertiesList.any { pair.first.startsWith(it, ignoreCase = true) }
val isNotEmpty = !pair.second.isNullOrWhiteSpace()
isNotIgnored && isNotEmpty
}
.map {
.map {
Pair(it.key.toString(), it.value.toString().trim())
}
.filter { pair ->
val isNotIgnored = propertiesList.any { pair.first.startsWith(it, ignoreCase = true) }
val isNotEmpty = !pair.second.isNullOrWhiteSpace()
isNotIgnored && isNotEmpty
}
.map {
String.format("-D%s=%s", it.first, it.second)
}
}

/**
 * Retry budget kept per set of Synnefo options while the scheduler is running.
 *
 * @property maxRetries retries still available for the whole run; mutable because the scheduler
 * decrements it every time a failed build is put back into the backlog
 * @property retriesPerTest upper bound on how many times a single feature may be retried
 */
internal class RetryConfiguration(var maxRetries: Int, val retriesPerTest: Int)

/**
 * Counts calls to [tick] and runs [tickFun] once the counter has reached [times].
 *
 * The counter is compared before it is incremented, so the callback runs on the call
 * that follows [times] non-firing calls. The counter is reset to zero before the
 * callback is invoked.
 */
internal class NotificationTicker(val times: Int, val tickFun: () -> Unit) {
    private var ticksSinceLastFire = 0

    fun tick() {
        // Compare the value from before this call, then count the call.
        val thresholdReached = ticksSinceLastFire >= times
        ticksSinceLastFire += 1
        if (!thresholdReached) return

        ticksSinceLastFire = 0
        tickFun()
    }
}
}
Loading

0 comments on commit e0eabc0

Please sign in to comment.