Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

New workflow metadata endpoint #3670

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ public List<WorkflowDef> getAllWorkflowDefs() {
return cassandraMetadataDAO.getAllWorkflowDefs();
}

@Override
public List<WorkflowDef> getAllWorkflowDefsLatestVersions() {
return cassandraMetadataDAO.getAllWorkflowDefsLatestVersions();
}

private List<TaskDef> refreshTaskDefsCache() {
try {
Cache taskDefsCache = cacheManager.getCache(TASK_DEF_CACHE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@

import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.stream.Collectors;

import org.apache.commons.lang3.tuple.ImmutablePair;
Expand Down Expand Up @@ -60,8 +63,10 @@ public class CassandraMetadataDAO extends CassandraBaseDAO implements MetadataDA
private final PreparedStatement insertTaskDefStatement;

private final PreparedStatement selectWorkflowDefStatement;

private final PreparedStatement selectAllWorkflowDefVersionsByNameStatement;
private final PreparedStatement selectAllWorkflowDefsStatement;
private final PreparedStatement selectAllWorkflowDefsLatestVersionsStatement;
private final PreparedStatement selectTaskDefStatement;
private final PreparedStatement selectAllTaskDefsStatement;

Expand Down Expand Up @@ -97,6 +102,9 @@ public CassandraMetadataDAO(
this.selectAllWorkflowDefsStatement =
session.prepare(statements.getSelectAllWorkflowDefsStatement())
.setConsistencyLevel(properties.getReadConsistencyLevel());
this.selectAllWorkflowDefsLatestVersionsStatement =
session.prepare(statements.getSelectAllWorkflowDefsLatestVersionsStatement())
.setConsistencyLevel(properties.getReadConsistencyLevel());
this.selectTaskDefStatement =
session.prepare(statements.getSelectTaskDefStatement())
.setConsistencyLevel(properties.getReadConsistencyLevel());
Expand Down Expand Up @@ -289,6 +297,48 @@ public List<WorkflowDef> getAllWorkflowDefs() {
}
}

@Override
public List<WorkflowDef> getAllWorkflowDefsLatestVersions() {
try {
ResultSet resultSet =
session.execute(
selectAllWorkflowDefsLatestVersionsStatement.bind(
WORKFLOW_DEF_INDEX_KEY));
List<Row> rows = resultSet.all();
if (rows.size() == 0) {
LOGGER.info("No workflow definitions were found.");
return Collections.EMPTY_LIST;
}
Map<String, PriorityQueue<WorkflowDef>> allWorkflowDefs = new HashMap<>();

for (Row row : rows) {
String defNameVersion = row.getString(WORKFLOW_DEF_NAME_VERSION_KEY);
var nameVersion = getWorkflowNameAndVersion(defNameVersion);
WorkflowDef def =
getWorkflowDef(nameVersion.getLeft(), nameVersion.getRight()).orElse(null);
if (def == null) {
continue;
}
if (allWorkflowDefs.get(def.getName()) == null) {
allWorkflowDefs.put(
def.getName(),
new PriorityQueue<>(
(WorkflowDef w1, WorkflowDef w2) ->
Integer.compare(w2.getVersion(), w1.getVersion())));
}
allWorkflowDefs.get(def.getName()).add(def);
}
return allWorkflowDefs.values().stream()
.map(PriorityQueue::poll)
.collect(Collectors.toList());
} catch (DriverException e) {
Monitors.error(CLASS_NAME, "getAllWorkflowDefsLatestVersions");
String errorMsg = "Error retrieving all workflow defs latest versions";
LOGGER.error(errorMsg, e);
throw new TransientException(errorMsg, e);
}
}

private TaskDef getTaskDefFromDB(String name) {
try {
ResultSet resultSet = session.execute(selectTaskDefStatement.bind(name));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,14 @@ public String getSelectAllWorkflowDefsStatement() {
.getQueryString();
}

public String getSelectAllWorkflowDefsLatestVersionsStatement() {
return QueryBuilder.select()
.all()
.from(keyspace, TABLE_WORKFLOW_DEFS_INDEX)
.where(eq(WORKFLOW_DEF_INDEX_KEY, bindMarker()))
.getQueryString();
}

/**
* @return cql query statement to fetch a task definition by name from the "task_definitions"
* table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,36 @@ class CassandraMetadataDAOSpec extends CassandraSpec {

}

def "Get All WorkflowDef"() {
when:
metadataDAO.removeWorkflowDef("workflow_def_1", 1)
WorkflowDef workflowDef = new WorkflowDef()
workflowDef.setName("workflow_def_1")
workflowDef.setVersion(1)
workflowDef.setOwnerEmail("test@junit.com")
metadataDAO.createWorkflowDef(workflowDef)

workflowDef.setName("workflow_def_2")
metadataDAO.createWorkflowDef(workflowDef)
workflowDef.setVersion(2)
metadataDAO.createWorkflowDef(workflowDef)

workflowDef.setName("workflow_def_3")
workflowDef.setVersion(1)
metadataDAO.createWorkflowDef(workflowDef)
workflowDef.setVersion(2)
metadataDAO.createWorkflowDef(workflowDef)
workflowDef.setVersion(3)
metadataDAO.createWorkflowDef(workflowDef)

then: // fetch the workflow definition
def allDefsLatestVersions = metadataDAO.getAllWorkflowDefsLatestVersions()
Map<String, WorkflowDef> allDefsMap = allDefsLatestVersions.collectEntries {wfDef -> [wfDef.getName(), wfDef]}
allDefsMap.get("workflow_def_1").getVersion() == 1
allDefsMap.get("workflow_def_2").getVersion() == 2
allDefsMap.get("workflow_def_3").getVersion() == 3
}

def "parse index string"() {
expect:
def pair = metadataDAO.getWorkflowNameAndVersion(nameVersionStr)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,16 @@
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;

import com.sun.jersey.api.client.ClientHandler;
import com.sun.jersey.api.client.GenericType;
import com.sun.jersey.api.client.config.ClientConfig;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.api.client.filter.ClientFilter;

public class MetadataClient extends ClientBase {

private static final GenericType<List<WorkflowDef>> workflowDefList =
new GenericType<List<WorkflowDef>>() {};

/** Creates a default metadata client */
public MetadataClient() {
this(new DefaultClientConfig(), new DefaultConductorClientConfiguration(), null);
Expand Down Expand Up @@ -122,6 +126,12 @@ public WorkflowDef getWorkflowDef(String name, Integer version) {
name);
}

/** */
public List<WorkflowDef> getAllWorkflowsWithLatestVersions() {
return getForEntity(
"metadata/workflow/latest-versions", null, workflowDefList, (Object) null);
}

/**
* Removes the workflow definition of a workflow from the conductor server. It does not remove
* associated workflows. Use with caution.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
package com.netflix.conductor.client.http

import com.netflix.conductor.client.exception.ConductorClientException
import com.netflix.conductor.common.metadata.workflow.WorkflowDef

import com.sun.jersey.api.client.ClientResponse
import spock.lang.Subject

class MetadataClientSpec extends ClientSpecification {
Expand Down Expand Up @@ -75,4 +77,18 @@ class MetadataClientSpec extends ClientSpecification {
then:
thrown(IllegalArgumentException.class)
}

def "workflow get all definitions latest version"() {
given:
List<WorkflowDef> result = new ArrayList<WorkflowDef>()
URI uri = createURI("metadata/workflow/latest-versions")

when:
metadataClient.getAllWorkflowsWithLatestVersions()

then:
1 * requestHandler.get(uri) >> Mock(ClientResponse.class) {
getEntity(_) >> result
}
}
}
5 changes: 5 additions & 0 deletions core/src/main/java/com/netflix/conductor/dao/MetadataDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,9 @@ public interface MetadataDAO {
* @return List of all the workflow definitions
*/
List<WorkflowDef> getAllWorkflowDefs();

/**
* @return List the latest versions of the workflow definitions
*/
List<WorkflowDef> getAllWorkflowDefsLatestVersions();
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,4 +154,6 @@ void removeEventHandlerStatus(
List<EventHandler> getEventHandlersForEvent(
@NotEmpty(message = "EventName cannot be null or empty") String event,
boolean activeOnly);

List<WorkflowDef> getWorkflowDefsLatestVersions();
}
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,11 @@ public List<EventHandler> getEventHandlersForEvent(String event, boolean activeO
return eventHandlerDAO.getEventHandlersForEvent(event, activeOnly);
}

@Override
public List<WorkflowDef> getWorkflowDefsLatestVersions() {
return metadataDAO.getAllWorkflowDefsLatestVersions();
}

public Map<String, ? extends Iterable<WorkflowDefSummary>> getWorkflowNamesAndVersions() {
List<WorkflowDef> workflowDefs = metadataDAO.getAllWorkflowDefs();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,27 @@ public List<WorkflowDef> getAllWorkflowDefs() {
return workflows;
}

@Override
public List<WorkflowDef> getAllWorkflowDefsLatestVersions() {
List<WorkflowDef> workflows = new LinkedList<>();

// Get all definitions latest versions from WORKFLOW_DEF_NAMES
recordRedisDaoRequests("getAllWorkflowLatestVersionsDefs");
Set<String> wfNames = jedisProxy.smembers(nsKey(WORKFLOW_DEF_NAMES));
int size = 0;
// Place all workflows into the Priority Queue. The PQ will allow us to grab the latest
// version of the workflows.
for (String wfName : wfNames) {
WorkflowDef def = getLatestWorkflowDef(wfName).orElse(null);
if (def != null) {
workflows.add(def);
size += def.toString().length();
}
}
recordRedisDaoPayloadSize("getAllWorkflowLatestVersionsDefs", size, "n/a", "n/a");
return workflows;
}

private void _createOrUpdate(WorkflowDef workflowDef) {
// First set the workflow def
jedisProxy.hset(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.junit.Before;
Expand Down Expand Up @@ -160,6 +162,45 @@ public void testWorkflowDefOperations() {
assertEquals(workflow.getVersion(), 3);
}

@Test
public void testGetAllWorkflowDefsLatestVersions() {
WorkflowDef def = new WorkflowDef();
def.setName("test1");
def.setVersion(1);
def.setDescription("description");
def.setCreatedBy("unit_test");
def.setCreateTime(1L);
def.setOwnerApp("ownerApp");
def.setUpdatedBy("unit_test2");
def.setUpdateTime(2L);
redisMetadataDAO.createWorkflowDef(def);

def.setName("test2");
redisMetadataDAO.createWorkflowDef(def);
def.setVersion(2);
redisMetadataDAO.createWorkflowDef(def);

def.setName("test3");
def.setVersion(1);
redisMetadataDAO.createWorkflowDef(def);
def.setVersion(2);
redisMetadataDAO.createWorkflowDef(def);
def.setVersion(3);
redisMetadataDAO.createWorkflowDef(def);

// Placed the values in a map because they might not be stored in order of defName.
// To test, needed to confirm that the versions are correct for the definitions.
Map<String, WorkflowDef> allMap =
redisMetadataDAO.getAllWorkflowDefsLatestVersions().stream()
.collect(Collectors.toMap(WorkflowDef::getName, Function.identity()));

assertNotNull(allMap);
assertEquals(3, allMap.size());
assertEquals(1, allMap.get("test1").getVersion());
assertEquals(2, allMap.get("test2").getVersion());
assertEquals(3, allMap.get("test3").getVersion());
}

@Test(expected = NotFoundException.class)
public void removeInvalidWorkflowDef() {
redisMetadataDAO.removeWorkflowDef("hello", 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ public List<WorkflowDef> getAll() {
return metadataService.getWorkflowNamesAndVersions();
}

@Operation(summary = "Returns only the latest version of all workflow definitions")
@GetMapping("/workflow/latest-versions")
public List<WorkflowDef> getAllWorkflowsWithLatestVersions() {
return metadataService.getWorkflowDefsLatestVersions();
}

@DeleteMapping("/workflow/{name}/{version}")
@Operation(
summary =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,20 @@ public void testGetAllWorkflowDef() {
assertEquals(listOfWorkflowDef, metadataResource.getAll());
}

@Test
public void testGetAllWorkflowDefLatestVersions() {
WorkflowDef workflowDef = new WorkflowDef();
workflowDef.setName("test");
workflowDef.setVersion(1);
workflowDef.setDescription("test");

List<WorkflowDef> listOfWorkflowDef = new ArrayList<>();
listOfWorkflowDef.add(workflowDef);

when(mockMetadataService.getWorkflowDefsLatestVersions()).thenReturn(listOfWorkflowDef);
assertEquals(listOfWorkflowDef, metadataResource.getAllWorkflowsWithLatestVersions());
}

@Test
public void testUnregisterWorkflowDef() throws Exception {
metadataResource.unregisterWorkflowDef("test", 1);
Expand Down
Loading