Skip to content

Commit

Permalink
fix(core): don't crash the worker if it receive a flowable task
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu authored and tchiotludo committed May 29, 2023
1 parent 1de9135 commit ec733c5
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 2 deletions.
14 changes: 12 additions & 2 deletions core/src/main/java/io/kestra/core/runners/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -309,14 +309,24 @@ private void logTerminated(WorkerTask workerTask) {
}

private WorkerTask runAttempt(WorkerTask workerTask) {
RunnableTask<?> task = (RunnableTask<?>) workerTask.getTask();

RunContext runContext = workerTask
.getRunContext()
.forWorker(this.applicationContext, workerTask);

Logger logger = runContext.logger();

if(!(workerTask.getTask() instanceof RunnableTask)) {
// This should never happen but better to deal with it than crashing the Worker
TaskRunAttempt attempt = TaskRunAttempt.builder().state(new State().withState(State.Type.FAILED)).build();
List<TaskRunAttempt> attempts = this.addAttempt(workerTask, attempt);
TaskRun taskRun = workerTask.getTaskRun().withAttempts(attempts);
logger.error("Unable to execute the task '" + workerTask.getTask().getId() +
"': only runnable tasks can be executed by the worker but the task is of type " + workerTask.getTask().getClass());
return workerTask.withTaskRun(taskRun);
}

RunnableTask<?> task = (RunnableTask<?>) workerTask.getTask();

TaskRunAttempt.TaskRunAttemptBuilder builder = TaskRunAttempt.builder()
.state(new State().withState(State.Type.RUNNING));

Expand Down
48 changes: 48 additions & 0 deletions core/src/test/java/io/kestra/core/runners/WorkerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.tasks.flows.Pause;
import io.micronaut.context.ApplicationContext;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -73,6 +74,53 @@ void success() throws TimeoutException {
assertThat(workerTaskResult.get().getTaskRun().getState().getHistories().size(), is(3));
}

@Test
void failOnWorkerTaskWithFlowable() throws TimeoutException {
Worker worker = new Worker(applicationContext, 8);
worker.run();

AtomicReference<WorkerTaskResult> workerTaskResult = new AtomicReference<>(null);
workerTaskResultQueue.receive(workerTaskResult::set);

Pause pause = Pause.builder()
.type(Pause.class.getName())
.delay(Duration.ofSeconds(1))
.id("unit-test")
.build();

io.kestra.core.tasks.flows.Worker theWorkerTask = io.kestra.core.tasks.flows.Worker.builder()
.type(io.kestra.core.tasks.flows.Worker.class.getName())
.id("worker-unit-test")
.tasks(List.of(pause))
.build();

Flow flow = Flow.builder()
.id(IdUtils.create())
.namespace("io.kestra.unit-test")
.tasks(Collections.singletonList(theWorkerTask))
.build();

Execution execution = TestsUtils.mockExecution(flow, ImmutableMap.of());

ResolvedTask resolvedTask = ResolvedTask.of(pause);

WorkerTask workerTask = WorkerTask.builder()
.runContext(runContextFactory.of(ImmutableMap.of("key", "value")))
.task(theWorkerTask)
.taskRun(TaskRun.of(execution, resolvedTask))
.build();

workerTaskQueue.emit(workerTask);

Await.until(
() -> workerTaskResult.get() != null && workerTaskResult.get().getTaskRun().getState().isFailed(),
Duration.ofMillis(100),
Duration.ofMinutes(1)
);

assertThat(workerTaskResult.get().getTaskRun().getState().getHistories().size(), is(3));
}

@Test
void killed() throws InterruptedException, TimeoutException {
List<LogEntry> logs = new ArrayList<>();
Expand Down

0 comments on commit ec733c5

Please sign in to comment.