Skip to content

Commit

Permalink
feat(core): add execution labels (#1190)
Browse files Browse the repository at this point in the history
* Add Execution labels

* Execution now features labels as the Flow does.
* The labels are passed in the same way the Flow inputs.
* The label names are prefixed with `label-`.
* Executions can be filtered using the labels.

Room for improvement:

* Index the labels within DB.
* Rework the labels/inputs passing - use a custom binding?
* Add a label filter to the Execution UI.

close #906

Co-authored-by: Loïc Mathieu <loikeseke@gmail.com>
  • Loading branch information
yuri1969 and loicmathieu authored May 16, 2023
1 parent fa52d5e commit e467c2b
Show file tree
Hide file tree
Showing 19 changed files with 237 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ public class Execution implements DeletedInterface {
@With
Map<String, Object> inputs;

@With
Map<String, String> labels;

@With
Map<String, Object> variables;

Expand Down Expand Up @@ -93,6 +96,7 @@ public Execution withState(State.Type state) {
this.flowRevision,
this.taskRunList,
this.inputs,
this.labels,
this.variables,
this.state.withState(state),
this.parentId,
Expand Down Expand Up @@ -122,6 +126,7 @@ public Execution withTaskRun(TaskRun taskRun) throws InternalException {
this.flowRevision,
newTaskRunList,
this.inputs,
this.labels,
this.variables,
this.state,
this.parentId,
Expand All @@ -139,6 +144,7 @@ public Execution childExecution(String childExecutionId, List<TaskRun> taskRunLi
this.flowRevision,
taskRunList,
this.inputs,
this.labels,
this.variables,
state,
childExecutionId != null ? this.getId() : null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ ArrayListTotal<Execution> find(
@Nullable String flowId,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate,
@Nullable List<State.Type> state
@Nullable List<State.Type> state,
@Nullable Map<String, String> labels
);

Flowable<Execution> find(
Expand All @@ -43,7 +44,8 @@ Flowable<Execution> find(
@Nullable String flowId,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate,
@Nullable List<State.Type> state
@Nullable List<State.Type> state,
@Nullable Map<String, String> labels
);

ArrayListTotal<TaskRun> findTaskRun(
Expand Down
32 changes: 22 additions & 10 deletions core/src/main/java/io/kestra/core/runners/RunnerUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -260,41 +260,49 @@ private Map<String, Object> handleNestedInputs(Map<String, Object> inputs) {
}

public Execution runOne(String namespace, String flowId) throws TimeoutException {
return this.runOne(namespace, flowId, null, null, null);
return this.runOne(namespace, flowId, null, null, null, null);
}

public Execution runOne(String namespace, String flowId, Integer revision) throws TimeoutException {
return this.runOne(namespace, flowId, revision, null, null);
return this.runOne(namespace, flowId, revision, null, null, null);
}

public Execution runOne(String namespace, String flowId, Integer revision, BiFunction<Flow, Execution, Map<String, Object>> inputs) throws TimeoutException {
return this.runOne(namespace, flowId, revision, inputs, null);
return this.runOne(namespace, flowId, revision, inputs, null, null);
}

public Execution runOne(String namespace, String flowId, Duration duration) throws TimeoutException {
return this.runOne(namespace, flowId, null, null, duration);
return this.runOne(namespace, flowId, null, null, duration, null);
}

public Execution runOne(String namespace, String flowId, Integer revision, BiFunction<Flow, Execution, Map<String, Object>> inputs, Duration duration) throws TimeoutException {
return this.runOne(namespace, flowId, revision, inputs, duration, null);
}

public Execution runOne(String namespace, String flowId, Integer revision, BiFunction<Flow, Execution, Map<String, Object>> inputs, Duration duration, Map<String, String> labels) throws TimeoutException {
return this.runOne(
flowRepository
.findById(namespace, flowId, revision != null ? Optional.of(revision) : Optional.empty())
.orElseThrow(() -> new IllegalArgumentException("Unable to find flow '" + flowId + "'")),
inputs,
duration
);
duration,
labels);
}

public Execution runOne(Flow flow, BiFunction<Flow, Execution, Map<String, Object>> inputs) throws TimeoutException {
return this.runOne(flow, inputs, null);
return this.runOne(flow, inputs, null, null);
}

public Execution runOne(Flow flow, BiFunction<Flow, Execution, Map<String, Object>> inputs, Duration duration) throws TimeoutException {
return this.runOne(flow, inputs, duration, null);
}

public Execution runOne(Flow flow, BiFunction<Flow, Execution, Map<String, Object>> inputs, Duration duration, Map<String, String> labels) throws TimeoutException {
if (duration == null) {
duration = Duration.ofSeconds(15);
}

Execution execution = this.newExecution(flow, inputs);
Execution execution = this.newExecution(flow, inputs, labels);

return this.awaitExecution(isTerminatedExecution(execution, flow), () -> {
this.executionQueue.emit(execution);
Expand All @@ -320,7 +328,7 @@ public Execution runOneUntilPaused(Flow flow, BiFunction<Flow, Execution, Map<St
duration = Duration.ofSeconds(15);
}

Execution execution = this.newExecution(flow, inputs);
Execution execution = this.newExecution(flow, inputs, null);

return this.awaitExecution(isPausedExecution(execution), () -> {
this.executionQueue.emit(execution);
Expand Down Expand Up @@ -369,7 +377,7 @@ private Predicate<Execution> isTerminatedChildExecution(Execution parentExecutio
return e -> e.getParentId() != null && e.getParentId().equals(parentExecution.getId()) && conditionService.isTerminatedWithListeners(flow, e);
}

public Execution newExecution(Flow flow, BiFunction<Flow, Execution, Map<String, Object>> inputs) {
public Execution newExecution(Flow flow, BiFunction<Flow, Execution, Map<String, Object>> inputs, Map<String, String> labels) {
Execution execution = Execution.builder()
.id(IdUtils.create())
.namespace(flow.getNamespace())
Expand All @@ -382,6 +390,10 @@ public Execution newExecution(Flow flow, BiFunction<Flow, Execution, Map<String,
execution = execution.withInputs(inputs.apply(flow, execution));
}

if (labels != null) {
execution = execution.withLabels(labels);
}

return execution;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,8 @@ public PurgeResult purge(
flowId,
null,
endDate,
state
state,
null
)
.map(execution -> {
PurgeResult.PurgeResultBuilder<?, ?> builder = PurgeResult.builder();
Expand Down
19 changes: 16 additions & 3 deletions core/src/main/java/io/kestra/core/tasks/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,17 @@ public class Flow extends Task implements RunnableTask<Flow.Output> {
private Integer revision;

@Schema(
title = "The input to pass to the triggered flow"
title = "The inputs to pass to the triggered flow"
)
@PluginProperty(dynamic = true)
private Map<String, String> inputs;

@Schema(
title = "The labels to pass to the triggered flow execution"
)
@PluginProperty(dynamic = true)
private Map<String, String> labels;

@Builder.Default
@Schema(
title = "Wait the end of the execution.",
Expand Down Expand Up @@ -112,6 +118,13 @@ public Execution createExecution(RunContext runContext, FlowExecutorInterface fl
}
}

Map<String, String> labels = new HashMap<>();
if (this.labels != null) {
for (Map.Entry<String, String> entry: this.labels.entrySet()) {
labels.put(entry.getKey(), runContext.render(entry.getValue()));
}
}

Map<String, String> flowVars = (Map<String, String>) runContext.getVariables().get("flow");

io.kestra.core.models.flows.Flow flow = flowExecutorInterface.findByIdFromFlowTask(
Expand All @@ -130,8 +143,8 @@ public Execution createExecution(RunContext runContext, FlowExecutorInterface fl
return runnerUtils
.newExecution(
flow,
(f, e) -> runnerUtils.typedInputs(f, e, inputs)
)
(f, e) -> runnerUtils.typedInputs(f, e, inputs),
labels)
.withTrigger(ExecutionTrigger.builder()
.id(this.getId())
.type(this.getType())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ protected boolean isTriggerChild(Flow parent, Flow child) {
List<AbstractTrigger> triggers = ListUtils.emptyOnNull(child.getTriggers());

// simulated execution
Execution execution = runnerUtils.newExecution(parent, (f, e) -> null);
Execution execution = runnerUtils.newExecution(parent, (f, e) -> null, null);

// keep only flow trigger
List<io.kestra.core.models.triggers.types.Flow> flowTriggers = triggers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.kestra.core.models.flows.State;

import java.util.Collections;
import java.util.Map;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -148,4 +149,14 @@ void originalId() {
);
assertThat(restart2.getOriginalId(), is(execution.getId()));
}

@Test
void labels() {
final Execution execution = Execution.builder()
.labels(Map.of("test", "test-value"))
.build();

assertThat(execution.getLabels().size(), is(1));
assertThat(execution.getLabels().get("test"), is("test-value"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,23 +106,23 @@ protected void inject() {
protected void find() {
inject();

ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10), null, null, null, null, null, null);
ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10), null, null, null, null, null, null, null);
assertThat(executions.getTotal(), is(28L));
assertThat(executions.size(), is(10));

executions = executionRepository.find(Pageable.from(1, 10), null, null, null, null, null, List.of(State.Type.RUNNING, State.Type.FAILED));
executions = executionRepository.find(Pageable.from(1, 10), null, null, null, null, null, List.of(State.Type.RUNNING, State.Type.FAILED), null);
assertThat(executions.getTotal(), is(8L));
}

@Test
protected void findWithSort() {
inject();

ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10, Sort.of(Sort.Order.desc("id"))), null, null, null, null, null, null);
ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10, Sort.of(Sort.Order.desc("id"))), null, null, null, null, null, null, null);
assertThat(executions.getTotal(), is(28L));
assertThat(executions.size(), is(10));

executions = executionRepository.find(Pageable.from(1, 10), null, null, null, null, null, List.of(State.Type.RUNNING, State.Type.FAILED));
executions = executionRepository.find(Pageable.from(1, 10), null, null, null, null, null, List.of(State.Type.RUNNING, State.Type.FAILED), null);
assertThat(executions.getTotal(), is(8L));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import jakarta.inject.Singleton;
import org.jooq.Condition;

import java.util.List;
import java.util.Map;

@Singleton
@H2RepositoryEnabled
Expand All @@ -19,7 +19,7 @@ public H2ExecutionRepository(ApplicationContext applicationContext, AbstractJdbc
}

@Override
protected Condition findCondition(String query) {
return this.jdbcRepository.fullTextCondition(List.of("fulltext"), query);
protected Condition findCondition(String query, Map<String, String> labels) {
return H2ExecutionRepositoryService.findCondition(this.jdbcRepository, query, labels);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.kestra.repository.h2;

import io.kestra.core.models.executions.Execution;
import io.kestra.jdbc.AbstractJdbcRepository;
import org.jooq.Condition;
import org.jooq.Field;
import org.jooq.impl.DSL;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public abstract class H2ExecutionRepositoryService {
public static Condition findCondition(AbstractJdbcRepository<Execution> jdbcRepository, String query, Map<String, String> labels) {
List<Condition> conditions = new ArrayList<>();

if (query != null) {
conditions.add(jdbcRepository.fullTextCondition(List.of("fulltext"), query));
}

if (labels != null) {
labels.forEach((key, value) -> {
Field<String> field = DSL.field("JQ_STRING(\"value\", '.labels." + key + "')", String.class);

if (value == null) {
conditions.add(field.isNotNull());
} else {
conditions.add(field.eq(value));
}
});
}

return conditions.size() == 0 ? DSL.trueCondition() : DSL.and(conditions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import jakarta.inject.Singleton;
import org.jooq.Condition;

import java.util.Arrays;
import java.util.Map;

@Singleton
@MysqlRepositoryEnabled
Expand All @@ -19,7 +19,7 @@ public MysqlExecutionRepository(ApplicationContext applicationContext, AbstractJ
}

@Override
protected Condition findCondition(String query) {
return this.jdbcRepository.fullTextCondition(Arrays.asList("namespace", "flow_id", "id"), query);
protected Condition findCondition(String query, Map<String, String> labels) {
return MysqlExecutionRepositoryService.findCondition(this.jdbcRepository, query, labels);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.kestra.repository.mysql;

import io.kestra.core.models.executions.Execution;
import io.kestra.jdbc.AbstractJdbcRepository;
import org.jooq.Condition;
import org.jooq.Field;
import org.jooq.impl.DSL;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public abstract class MysqlExecutionRepositoryService {
public static Condition findCondition(AbstractJdbcRepository<Execution> jdbcRepository, String query, Map<String, String> labels) {
List<Condition> conditions = new ArrayList<>();

if (query != null) {
conditions.add(jdbcRepository.fullTextCondition(Arrays.asList("namespace", "id"), query));
}

if (labels != null) {
labels.forEach((key, value) -> {
Field<String> field = DSL.field("JSON_VALUE(value, '$.labels." + key + "' NULL ON EMPTY)", String.class);

if (value == null) {
conditions.add(field.isNotNull());
} else {
conditions.add(field.eq(value));
}
});
}

return conditions.size() == 0 ? DSL.trueCondition() : DSL.and(conditions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository;
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
import io.micronaut.context.ApplicationContext;
Expand All @@ -12,8 +11,8 @@
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@Singleton
Expand All @@ -37,7 +36,7 @@ protected Condition statesFilter(List<State.Type> state) {
}

@Override
protected Condition findCondition(String query) {
return this.jdbcRepository.fullTextCondition(Collections.singletonList("fulltext"), query);
protected Condition findCondition(String query, Map<String, String> labels) {
return PostgresExecutionRepositoryService.findCondition(this.jdbcRepository, query, labels);
}
}
Loading

0 comments on commit e467c2b

Please sign in to comment.