Skip to content

Commit

Permalink
Add support for querying externalExecutionId
Browse files Browse the repository at this point in the history
resolves #863
  • Loading branch information
cppwfs committed Sep 11, 2023
1 parent 9b4d7f0 commit 40566a5
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2019 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -46,6 +46,15 @@ public interface TaskExplorer {
*/
Page<TaskExecution> findRunningTaskExecutions(String taskName, Pageable pageable);

/**
* Retrieve a collection of taskExecutions that contain the provided external
* execution id.
* @param externalExecutionId the external execution id of the tasks
* @param pageable the constraints for the search
* @return the set of task executions for tasks with the external execution id
*/
Page<TaskExecution> findTaskExecutionsByExecutionId(String externalExecutionId, Pageable pageable);

/**
* Retrieve a list of available task names.
* @return the set of task names that have been executed
Expand All @@ -71,6 +80,13 @@ public interface TaskExplorer {
*/
long getRunningTaskExecutionCount();

/**
* Retrieves current number of task executions by external executionId.
* @param externalExecutionId The externalExecutionId to be searched.
* @return current number of task executions for a specific externalExecutionId.
*/
long getTaskExecutionCountByExternalExecutionId(String externalExecutionId);

/**
* Get a collection/page of executions.
* @param taskName the name of the task to be searched
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2021 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -85,6 +85,11 @@ public class JdbcTaskExecutionDao implements TaskExecutionDao {
*/
public static final String TASK_NAME_WHERE_CLAUSE = "where TASK_NAME = :taskName ";

/**
* WHERE clause for external execution id.
*/
public static final String EXTERNAL_EXECUTION_ID_WHERE_CLAUSE = "where EXTERNAL_EXECUTION_ID = :externalExecutionId ";

private static final String SAVE_TASK_EXECUTION = "INSERT into %PREFIX%EXECUTION"
+ "(TASK_EXECUTION_ID, EXIT_CODE, START_TIME, TASK_NAME, LAST_UPDATED, EXTERNAL_EXECUTION_ID, PARENT_EXECUTION_ID)"
+ "values (:taskExecutionId, :exitCode, :startTime, "
Expand Down Expand Up @@ -126,6 +131,9 @@ public class JdbcTaskExecutionDao implements TaskExecutionDao {
private static final String TASK_EXECUTION_COUNT_BY_NAME = "SELECT COUNT(*) FROM "
+ "%PREFIX%EXECUTION where TASK_NAME = :taskName";

private static final String TASK_EXECUTION_COUNT_BY_EXTERNAL_EXECUTION_ID = "SELECT COUNT(*) FROM "
+ "%PREFIX%EXECUTION where EXTERNAL_EXECUTION_ID = :externalExecutionId";

private static final String RUNNING_TASK_EXECUTION_COUNT_BY_NAME = "SELECT COUNT(*) FROM "
+ "%PREFIX%EXECUTION where TASK_NAME = :taskName AND END_TIME IS NULL ";

Expand Down Expand Up @@ -407,6 +415,27 @@ public Page<TaskExecution> findRunningTaskExecutions(String taskName, Pageable p
new MapSqlParameterSource("taskName", taskName), getRunningTaskExecutionCountByTaskName(taskName));
}

@Override
public Page<TaskExecution> findTaskExecutionsByExternalExecutionId(String externalExecutionId, Pageable pageable) {
return queryForPageableResults(pageable, SELECT_CLAUSE, FROM_CLAUSE, EXTERNAL_EXECUTION_ID_WHERE_CLAUSE,
new MapSqlParameterSource("externalExecutionId", externalExecutionId),
getTaskExecutionCountByExternalExecutionId(externalExecutionId));
}

@Override
public long getTaskExecutionCountByExternalExecutionId(String externalExecutionId) {
final MapSqlParameterSource queryParameters = new MapSqlParameterSource().addValue("externalExecutionId",
externalExecutionId, Types.VARCHAR);

try {
return this.jdbcTemplate.queryForObject(getQuery(TASK_EXECUTION_COUNT_BY_EXTERNAL_EXECUTION_ID),
queryParameters, Long.class);
}
catch (EmptyResultDataAccessException e) {
return 0;
}
}

@Override
public Page<TaskExecution> findTaskExecutionsByName(String taskName, Pageable pageable) {
return queryForPageableResults(pageable, SELECT_CLAUSE, FROM_CLAUSE, TASK_NAME_WHERE_CLAUSE,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2019 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -128,6 +128,17 @@ public long getTaskExecutionCountByTaskName(String taskName) {
return count;
}

@Override
public long getTaskExecutionCountByExternalExecutionId(String externalExecutionId) {
int count = 0;
for (Map.Entry<Long, TaskExecution> entry : this.taskExecutions.entrySet()) {
if (entry.getValue().getExternalExecutionId().equals(externalExecutionId)) {
count++;
}
}
return count;
}

@Override
public long getRunningTaskExecutionCountByTaskName(String taskName) {
int count = 0;
Expand Down Expand Up @@ -166,6 +177,18 @@ public Page<TaskExecution> findRunningTaskExecutions(String taskName, Pageable p
return getPageFromList(new ArrayList<>(result), pageable, getRunningTaskExecutionCountByTaskName(taskName));
}

@Override
public Page<TaskExecution> findTaskExecutionsByExternalExecutionId(String externalExecutionId, Pageable pageable) {
Set<TaskExecution> result = getTaskExecutionTreeSet();
for (Map.Entry<Long, TaskExecution> entry : this.taskExecutions.entrySet()) {
if (entry.getValue().getExternalExecutionId().equals(externalExecutionId)) {
result.add(entry.getValue());
}
}
return getPageFromList(new ArrayList<>(result), pageable,
getTaskExecutionCountByExternalExecutionId(externalExecutionId));
}

@Override
public Page<TaskExecution> findTaskExecutionsByName(String taskName, Pageable pageable) {
Set<TaskExecution> filteredSet = getTaskExecutionTreeSet();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2019 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -150,6 +150,24 @@ void completeTaskExecution(long executionId, Integer exitCode, LocalDateTime end
*/
Page<TaskExecution> findRunningTaskExecutions(String taskName, Pageable pageable);

/**
* Retrieve a collection of taskExecutions that contain the provided external
* execution id.
* @param externalExecutionId the external execution id of the tasks
* @param pageable the constraints for the search
* @return the set of task executions for tasks with the externalExecutionId
*/
Page<TaskExecution> findTaskExecutionsByExternalExecutionId(String externalExecutionId, Pageable pageable);

/**
* Retrieves current number of task executions for a externalTaskExecutionId.
* @param externalExecutionId the external execution id of the task to search for in
* the repository.
* @return current number of task executions for the externalExecutionId.
*/

long getTaskExecutionCountByExternalExecutionId(String externalExecutionId);

/**
* Retrieves a subset of task executions by task name, start location and size.
* @param taskName the name of the task to search for in the repository.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2019 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -59,6 +59,11 @@ public Page<TaskExecution> findRunningTaskExecutions(String taskName, Pageable p
return this.taskExecutionDao.findRunningTaskExecutions(taskName, pageable);
}

@Override
public Page<TaskExecution> findTaskExecutionsByExecutionId(String externalExecutionId, Pageable pageable) {
return this.taskExecutionDao.findTaskExecutionsByExternalExecutionId(externalExecutionId, pageable);
}

@Override
public List<String> getTaskNames() {
return this.taskExecutionDao.getTaskNames();
Expand All @@ -79,6 +84,11 @@ public long getRunningTaskExecutionCount() {
return this.taskExecutionDao.getRunningTaskExecutionCount();
}

@Override
public long getTaskExecutionCountByExternalExecutionId(String externalExecutionId) {
return this.taskExecutionDao.getTaskExecutionCountByExternalExecutionId(externalExecutionId);
}

@Override
public Page<TaskExecution> findTaskExecutionsByName(String taskName, Pageable pageable) {
return this.taskExecutionDao.findTaskExecutionsByName(taskName, pageable);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2019 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -151,7 +151,7 @@ public void findRunningTasks(DaoType testType) {
final int COMPLETE_COUNT = 5;

Map<Long, TaskExecution> expectedResults = new HashMap<>();
// Store completed jobs
// Store completed task executions
int i = 0;
for (; i < COMPLETE_COUNT; i++) {
createAndSaveTaskExecution(i);
Expand All @@ -178,6 +178,55 @@ public void findRunningTasks(DaoType testType) {
}
}

@ParameterizedTest
@MethodSource("data")
public void findTasksByExternalExecutionId(DaoType testType) {
testDefaultContext(testType);
Map<Long, TaskExecution> sampleDataSet = createSampleDataSet(33);
sampleDataSet.values().forEach(taskExecution -> {
Page<TaskExecution> taskExecutionsByExecutionId = this.taskExplorer
.findTaskExecutionsByExecutionId(taskExecution.getExternalExecutionId(), PageRequest.of(0, 5));
assertThat(taskExecutionsByExecutionId.getTotalElements()).isEqualTo(1);
assertThat(this.taskExplorer
.getTaskExecutionCountByExternalExecutionId(taskExecution.getExternalExecutionId())).isEqualTo(1);
TaskExecution resultTaskExecution = taskExecutionsByExecutionId.getContent().get(0);
assertThat(resultTaskExecution.getExecutionId()).isEqualTo(taskExecution.getExecutionId());
});
}

@ParameterizedTest
@MethodSource("data")
public void findTasksByExternalExecutionIdMultipleEntry(DaoType testType) {
testDefaultContext(testType);

testDefaultContext(testType);
final int SAME_EXTERNAL_ID_COUNT = 2;
final int UNIQUE_COUNT = 3;

Map<Long, TaskExecution> expectedResults = new HashMap<>();
// Store task executions each with a unique external execution id
int i = 0;
for (; i < UNIQUE_COUNT; i++) {
createAndSaveTaskExecution(i);
}
// Create task execution with same external execution id
for (; i < (UNIQUE_COUNT + SAME_EXTERNAL_ID_COUNT); i++) {
TaskExecution expectedTaskExecution = this.taskRepository.createTaskExecution(getSimpleTaskExecution());
expectedResults.put(expectedTaskExecution.getExecutionId(), expectedTaskExecution);
}
Pageable pageable = PageRequest.of(0, 10);
Page<TaskExecution> resultSet = this.taskExplorer.findTaskExecutionsByExecutionId(EXTERNAL_EXECUTION_ID,
pageable);
assertThat(resultSet.getTotalElements()).isEqualTo(SAME_EXTERNAL_ID_COUNT);
List<TaskExecution> taskExecutions = resultSet.getContent();
taskExecutions.forEach(taskExecution -> {
assertThat(expectedResults.keySet()).contains(taskExecution.getExecutionId());
});
assertThat(this.taskExplorer.getTaskExecutionCountByExternalExecutionId(EXTERNAL_EXECUTION_ID))
.isEqualTo(SAME_EXTERNAL_ID_COUNT);

}

@ParameterizedTest
@MethodSource("data")
public void findTasksByName(DaoType testType) {
Expand All @@ -186,7 +235,7 @@ public void findTasksByName(DaoType testType) {
final int COMPLETE_COUNT = 7;

Map<Long, TaskExecution> expectedResults = new HashMap<>();
// Store completed jobs
// Store completed task executions
for (int i = 0; i < COMPLETE_COUNT; i++) {
createAndSaveTaskExecution(i);
}
Expand Down

0 comments on commit 40566a5

Please sign in to comment.