diff --git a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraMetadataDAO.java b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraMetadataDAO.java index 10bbe2dd51..0187034b51 100644 --- a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraMetadataDAO.java +++ b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraMetadataDAO.java @@ -24,10 +24,12 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import org.apache.commons.lang3.tuple.ImmutablePair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.netflix.conductor.annotations.Trace; +import com.netflix.conductor.annotations.VisibleForTesting; import com.netflix.conductor.cassandra.config.CassandraProperties; import com.netflix.conductor.cassandra.util.Statements; import com.netflix.conductor.common.metadata.tasks.TaskDef; @@ -42,7 +44,6 @@ import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.annotations.VisibleForTesting; import static com.netflix.conductor.cassandra.util.Constants.TASK_DEFINITION_KEY; import static com.netflix.conductor.cassandra.util.Constants.TASK_DEFS_KEY; @@ -292,8 +293,8 @@ public List getAllWorkflowDefs() { row -> { String defNameVersion = row.getString(WORKFLOW_DEF_NAME_VERSION_KEY); - String[] tokens = defNameVersion.split(INDEX_DELIMITER); - return getWorkflowDef(tokens[0], Integer.parseInt(tokens[1])) + var nameVersion = getWorkflowNameAndVersion(defNameVersion); + return getWorkflowDef(nameVersion.getLeft(), nameVersion.getRight()) .orElse(null); }) .filter(Objects::nonNull) @@ -404,4 +405,27 @@ private String insertOrUpdateTaskDef(TaskDef taskDef) { String getWorkflowDefIndexValue(String name, int version) { return name + INDEX_DELIMITER + version; } + + @VisibleForTesting + ImmutablePair getWorkflowNameAndVersion(String nameVersionStr) { + int lastIndexOfDelimiter = nameVersionStr.lastIndexOf(INDEX_DELIMITER); + + if (lastIndexOfDelimiter == -1) { + throw new IllegalStateException( + nameVersionStr + + " is not in the 'workflowName" + + INDEX_DELIMITER + + "version' pattern."); + } + + String workflowName = nameVersionStr.substring(0, lastIndexOfDelimiter); + String versionStr = nameVersionStr.substring(lastIndexOfDelimiter + 1); + + try { + return new ImmutablePair<>(workflowName, Integer.parseInt(versionStr)); + } catch (NumberFormatException e) { + throw new IllegalStateException( + versionStr + " in " + nameVersionStr + " is not a valid number."); + } + } } diff --git a/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraMetadataDAOSpec.groovy b/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraMetadataDAOSpec.groovy index 5c06efe550..afca61fa7f 100644 --- a/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraMetadataDAOSpec.groovy +++ b/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraMetadataDAOSpec.groovy @@ -69,7 +69,7 @@ class CassandraMetadataDAOSpec extends CassandraSpec { defOptional.get() == workflowDef when: // modify the definition - workflowDef.setOwnerEmail("junit@test.com") + workflowDef.setOwnerEmail("test@junit.com") metadataDAO.updateWorkflowDef(workflowDef) defOptional = metadataDAO.getWorkflowDef(name, higherVersion) @@ -138,4 +138,39 @@ class CassandraMetadataDAOSpec extends CassandraSpec { // fetch deleted task def metadataDAO.getTaskDef(task2Name) == null } + + def "parse index string"() { + expect: + def pair = metadataDAO.getWorkflowNameAndVersion(nameVersionStr) + pair.left == workflowName + pair.right == version + + where: + nameVersionStr << ['name/1', 'namespace/name/3', '/namespace/name_with_lodash/2', 'name//4', 'name-with$%/895'] + workflowName << ['name', 'namespace/name', '/namespace/name_with_lodash', 'name/', 'name-with$%'] + version << [1, 3, 2, 4, 895] + } + + def "parse index string - incorrect values"() { + when: + metadataDAO.getWorkflowNameAndVersion("name_with_no_version") + + then: + def ex = thrown(IllegalStateException.class) + println(ex.message) + + when: + metadataDAO.getWorkflowNameAndVersion("name_with_no_version/") + + then: + ex = thrown(IllegalStateException.class) + println(ex.message) + + when: + metadataDAO.getWorkflowNameAndVersion("name/non_number_version") + + then: + ex = thrown(IllegalStateException.class) + println(ex.message) + } }