Skip to content

Commit

Permalink
feat(taskController): add query timeouts when retrieving execution body
Browse files Browse the repository at this point in the history
  • Loading branch information
apoorv-mahajan authored and kirangodishala committed Nov 28, 2024
1 parent 1cf55e1 commit 40fd05d
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,12 @@ class DualExecutionRepository(

override fun retrievePipelineExecutionDetailsForApplication(
@Nonnull application: String,
pipelineConfigIds: List<String>): Collection<PipelineExecution> {
pipelineConfigIds: List<String>,
queryTimeoutSeconds: Int
): Collection<PipelineExecution> {
return (
primary.retrievePipelineExecutionDetailsForApplication(application, pipelineConfigIds) +
previous.retrievePipelineExecutionDetailsForApplication(application, pipelineConfigIds)
primary.retrievePipelineExecutionDetailsForApplication(application, pipelineConfigIds, queryTimeoutSeconds) +
previous.retrievePipelineExecutionDetailsForApplication(application, pipelineConfigIds, queryTimeoutSeconds)
).distinctBy { it.id }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ Collection<String> retrieveAndFilterPipelineExecutionIdsForApplication(

@Nonnull
Collection<PipelineExecution> retrievePipelineExecutionDetailsForApplication(
@Nonnull String application, @Nonnull List<String> pipelineConfigIds);
@Nonnull String application,
@Nonnull List<String> pipelineConfigIds,
int queryTimeoutSeconds);

/**
* Returns executions in the time boundary. Redis impl does not respect pageSize or offset params,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,9 @@ class InMemoryExecutionRepository : ExecutionRepository {

override fun retrievePipelineExecutionDetailsForApplication(
application: String,
pipelineConfigIds: List<String>): Collection<PipelineExecution> {
pipelineConfigIds: List<String>,
queryTimeoutSeconds: Int
): Collection<PipelineExecution> {
return pipelines.values
.filter { it.application == application && pipelineConfigIds.contains(it.pipelineConfigId) }
.distinctBy { it.id }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,9 @@ public void delete(@Nonnull ExecutionType type, @Nonnull List<String> idsToDelet

@Override
public @Nonnull List<PipelineExecution> retrievePipelineExecutionDetailsForApplication(
@Nonnull String application, @Nonnull List<String> pipelineExecutionIds) {
@Nonnull String application,
@Nonnull List<String> pipelineExecutionIds,
int queryTimeoutSeconds) {
// TODO: not implemented yet - this method, at present, is primarily meant for the
// SqlExecutionRepository
// implementation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,9 @@ class SqlExecutionRepository(
*/
override fun retrievePipelineExecutionDetailsForApplication(
application: String,
pipelineExecutions: List<String>): Collection<PipelineExecution> {
pipelineExecutions: List<String>,
queryTimeoutSeconds: Int
): Collection<PipelineExecution> {
withPool(poolName) {
val baseQuery = jooq.select(selectExecutionFields(compressionProperties))
.from(
Expand All @@ -565,6 +567,7 @@ class SqlExecutionRepository(
field("application").eq(application)
.and(field("id").`in`(*pipelineExecutions.toTypedArray()))
)
.queryTimeout(queryTimeoutSeconds) // add an explicit timeout so that the query doesn't run forever
.fetch()

log.info("getting stage information for all the executions found so far")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,9 @@ import java.nio.charset.Charset
import java.time.Clock
import java.time.ZoneOffset
import java.util.concurrent.Callable
import java.util.concurrent.CancellationException
import java.util.concurrent.Executors
import java.util.concurrent.ExecutorService
import java.util.concurrent.Future
import java.util.concurrent.TimeoutException
import java.util.concurrent.TimeUnit
import java.util.stream.Collectors

Expand Down Expand Up @@ -889,7 +887,7 @@ class TaskController {
*
*<p>
* 3. It then processes n pipeline executions at a time to retrieve the complete execution details. In addition,
* we make use of a configured thread pool so that multiple batches of n executions can be processed parallelly.
* we make use of a configured thread pool to process multiple batches of n executions in parallel.
*/
private List<PipelineExecution> optimizedGetPipelineExecutions(String application,
List<String> front50PipelineConfigIds, ExecutionCriteria executionCriteria) {
Expand Down Expand Up @@ -946,34 +944,33 @@ class TaskController {
filteredPipelineExecutionIds
.collate(this.configurationProperties.getMaxNumberOfPipelineExecutionsToProcess())
.each { List<String> chunkedExecutions ->
futures.add(
executorService.submit({
List<PipelineExecution> result = executionRepository.retrievePipelineExecutionDetailsForApplication(
application, chunkedExecutions
)
log.debug("completed execution retrieval for ${result.size()} executions")
return result
} as Callable<Collection<PipelineExecution>>)
)
futures.add(executorService.submit({
try {
List<PipelineExecution> result = executionRepository.retrievePipelineExecutionDetailsForApplication(
application,
chunkedExecutions,
this.configurationProperties.getExecutionRetrievalTimeoutSeconds()
)
log.debug("completed execution retrieval for ${result.size()} executions")
return result
} catch (Exception e) { // handle exceptions such as query timeouts etc.
log.error("error occurred while retrieving these executions: ${chunkedExecutions.toString()} " +
"for application: ${application}.", e)
// in case of errors, this will return partial results. We are going with this best-effort approach
// because the UI keeps refreshing the executions view frequently. Hence, the user will eventually see
// these executions via one of the subsequent calls. Partial data is better than an exception at this
// point since the latter will result in a UI devoid of any executions.
//
return []
}
} as Callable<Collection<PipelineExecution>>))
}

futures.each {
Future<Collection<PipelineExecution>> future ->
try {
finalResult.addAll(
future.get(this.configurationProperties.getExecutionRetrievalTimeoutSeconds(), TimeUnit.SECONDS)
)
} catch (Exception e) {
// no need to fail the entire thing if one thread fails. This means the final output will simply not
// contain any of these failed executions.
log.error("Task failed with error", e)
}
}
futures.each { Future<Collection<PipelineExecution>> future -> finalResult.addAll(future.get()) }
return finalResult
} finally {
// attempt to shutdown the executor service
try {
executorService.shutdownNow()
executorService.shutdownNow() // attempt to shutdown the executor service
} catch (Exception e) {
log.error("shutting down the executor service failed", e)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ public class TaskControllerConfigurationProperties {
int maxNumberOfPipelineExecutionsToProcess = 150;

/**
* only applicable if optimizeExecutionRetrieval = true. No retrieval thread should take more than
* 60s to complete.
* only applicable if optimizeExecutionRetrieval = true. It specifies the max time after which the
* execution retrieval query will timeout.
*/
long executionRetrievalTimeoutSeconds = 60;
int executionRetrievalTimeoutSeconds = 60;

/** moved this to here. Earlier definition was in the {@link TaskController} class */
int daysOfExecutionHistory = 14;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,28 +30,31 @@ import com.netflix.spinnaker.orca.sql.pipeline.persistence.SqlExecutionRepositor
import com.nhaarman.mockito_kotlin.mock
import dev.minutest.junit.JUnit5Minutests
import dev.minutest.rootContext
import org.jooq.exception.DataAccessException
import org.jooq.impl.DSL.field
import org.jooq.impl.DSL.table
import org.junit.Assert.assertThrows
import org.junit.jupiter.api.assertThrows
import org.mockito.Mockito
import org.springframework.test.web.servlet.MockMvc
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get
import org.springframework.test.web.servlet.setup.MockMvcBuilders
import strikt.api.expectCatching
import strikt.api.expectThat
import strikt.assertions.isA
import strikt.assertions.isEqualTo
import strikt.assertions.isTrue
import strikt.assertions.isFailure
import java.time.Clock
import java.time.Instant
import java.time.ZoneId
import java.time.temporal.ChronoUnit

class TaskControllerTest : JUnit5Minutests {
data class Fixture(val optimizeExecution: Boolean, val timeout: Double = 60.0) {
data class Fixture(val optimizeExecution: Boolean) {

private val clock: Clock = Clock.fixed(Instant.now(), ZoneId.systemDefault())
val database: SqlTestUtil.TestDatabase = SqlTestUtil.initTcMysqlDatabase()!!



private val executionRepository: SqlExecutionRepository = SqlExecutionRepository(
partitionName = "test",
jooq = database.context,
Expand All @@ -64,7 +67,6 @@ class TaskControllerTest : JUnit5Minutests {
private val taskControllerConfigurationProperties: TaskControllerConfigurationProperties = TaskControllerConfigurationProperties()
.apply {
optimizeExecutionRetrieval = optimizeExecution
executionRetrievalTimeoutSeconds = timeout.toLong()
}

private val daysOfExecutionHistory: Long = taskControllerConfigurationProperties.daysOfExecutionHistory.toLong()
Expand Down Expand Up @@ -238,28 +240,20 @@ class TaskControllerTest : JUnit5Minutests {
}
}

context("execution retrieval with optimization having timeouts") {
context("test query having explicit query timeouts") {
fixture {
Fixture(true, 0.1)
Fixture(true)
}

before { setup() }
after { cleanUp() }

test("retrieve executions with limit = 2 & expand = false") {
expectThat(database.context.fetchCount(table("pipelines"))).isEqualTo(5)
val response = subject.perform(get("/applications/test-app/pipelines?limit=2&expand=false")).andReturn().response
val results = OrcaObjectMapper.getInstance().readValue(response.contentAsString, object : TypeReference<List<PipelineExecution>>() {})
expectThat(results.isEmpty()).isTrue()
}

test("retrieve executions with limit = 2 & expand = false with statuses") {
expectThat(database.context.fetchCount(table("pipelines"))).isEqualTo(5)
val response = subject.perform(get(
"/applications/test-app/pipelines?limit=2&expand=false&statuses=RUNNING,SUSPENDED,PAUSED,NOT_STARTED")
).andReturn().response
val results = OrcaObjectMapper.getInstance().readValue(response.contentAsString, object : TypeReference<List<PipelineExecution>>() {})
expectThat(results.isEmpty()).isTrue()
test("it returns a DataAccessException on query timeout") {
expectCatching {
database.context.select(field("sleep(10)")).queryTimeout(1).execute()
}
.isFailure()
.isA<DataAccessException>()
}
}
}
Expand Down

0 comments on commit 40fd05d

Please sign in to comment.