Skip to content

Commit

Permalink
fix(core): killing paused without subtask should transition to KILLED
Browse files Browse the repository at this point in the history
Fixes #6243

When we kill an execution that is running a Pause task that didn't have any subtask, we must transition the task run to KILLED immediatly or the executor will process the Pause task and transition it to SUCCESS.
  • Loading branch information
loicmathieu committed Jan 3, 2025
1 parent ce81990 commit e37fd76
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 4 deletions.
13 changes: 9 additions & 4 deletions core/src/main/java/io/kestra/core/services/ExecutionService.java
Original file line number Diff line number Diff line change
Expand Up @@ -322,13 +322,18 @@ private Execution markAs(final Execution execution, Flow flow, String taskRunId,
newTaskRun = newTaskRun.withOutputs(pauseTask.generateOutputs(onResumeInputs));
}

if (task instanceof Pause pauseTask && pauseTask.getTasks() == null && newState == State.Type.RUNNING) {
newTaskRun = newTaskRun.withState(State.Type.SUCCESS);
// if it's a Pause task with no subtask, we terminate the task
if (task instanceof Pause pauseTask && pauseTask.getTasks() == null) {
if (newState == State.Type.RUNNING) {
newTaskRun = newTaskRun.withState(State.Type.SUCCESS);
} else if (newState == State.Type.KILLING) {
newTaskRun = newTaskRun.withState(State.Type.KILLED);
}
}

if (originalTaskRun.getAttempts() != null && !originalTaskRun.getAttempts().isEmpty()) {
ArrayList<TaskRunAttempt> attempts = new ArrayList<>(originalTaskRun.getAttempts());
attempts.set(attempts.size() - 1, attempts.get(attempts.size() - 1).withState(newState));
attempts.set(attempts.size() - 1, attempts.getLast().withState(newState));
newTaskRun = newTaskRun.withAttempts(attempts);
}

Expand Down Expand Up @@ -450,7 +455,7 @@ public void delete(
* The execution must be paused or this call will be a no-op.
*
* @param execution the execution to resume
* @param newState should be RUNNING or KILLING, other states may lead to undefined behaviour
* @param newState should be RUNNING or KILLING, other states may lead to undefined behavior
* @param flow the flow of the execution
* @return the execution in the new state.
* @throws Exception if the state of the execution cannot be updated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,4 +368,20 @@ void deleteExecutionKeepLogs() throws TimeoutException, QueueException, IOExcept
assertThat(executionRepository.findById(execution.getTenantId(),execution.getId()), is(Optional.empty()));
assertThat(logRepository.findByExecutionId(execution.getTenantId(),execution.getId(), Level.INFO), hasSize(4));
}

@Test
@LoadFlows({"flows/valids/pause_no_tasks.yaml"})
void shouldKillPausedExecutions() throws Exception {
Execution execution = runnerUtils.runOneUntilPaused(null, "io.kestra.tests", "pause_no_tasks");
Flow flow = flowRepository.findByExecution(execution);

assertThat(execution.getTaskRunList(), hasSize(1));
assertThat(execution.getState().getCurrent(), is(State.Type.PAUSED));

Execution killed = executionService.kill(execution, flow);

assertThat(killed.getState().getCurrent(), is(State.Type.RESTARTED));
assertThat(killed.findTaskRunsByTaskId("pause").getFirst().getState().getCurrent(), is(State.Type.KILLED));
assertThat(killed.getState().getHistories(), hasSize(4));
}
}

0 comments on commit e37fd76

Please sign in to comment.