Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
fix integ tests
Browse files Browse the repository at this point in the history
  • Loading branch information
apanicker-nflx committed Jan 27, 2022
1 parent 6eaf508 commit 9830fca
Show file tree
Hide file tree
Showing 43 changed files with 143 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class CassandraExecutionDAOSpec extends CassandraSpec {
workflow.setWorkflowId(workflowId)
workflow.setInput(new HashMap<>())
workflow.setStatus(WorkflowModel.Status.RUNNING)
workflow.setCreatedTime(System.currentTimeMillis())
workflow.setCreateTime(System.currentTimeMillis())

when:
// create a new workflow in the datastore
Expand Down Expand Up @@ -120,7 +120,7 @@ class CassandraExecutionDAOSpec extends CassandraSpec {
given: 'we create a workflow'
String workflowId = IDGenerator.generate()
WorkflowDef workflowDef = new WorkflowDef(name: 'def1', version: 1)
WorkflowModel workflow = new WorkflowModel(workflowDefinition: workflowDef, workflowId: workflowId, input: new HashMap(), status: WorkflowModel.Status.RUNNING, createdTime: System.currentTimeMillis())
WorkflowModel workflow = new WorkflowModel(workflowDefinition: workflowDef, workflowId: workflowId, input: new HashMap(), status: WorkflowModel.Status.RUNNING, createTime: System.currentTimeMillis())
executionDAO.createWorkflow(workflow)

and: 'create tasks for this workflow'
Expand Down Expand Up @@ -192,7 +192,7 @@ class CassandraExecutionDAOSpec extends CassandraSpec {
given: 'we create a workflow'
String workflowId = IDGenerator.generate()
WorkflowDef workflowDef = new WorkflowDef(name: 'def1', version: 1)
WorkflowModel workflow = new WorkflowModel(workflowDefinition: workflowDef, workflowId: workflowId, input: new HashMap(), status: WorkflowModel.Status.RUNNING, createdTime: System.currentTimeMillis())
WorkflowModel workflow = new WorkflowModel(workflowDefinition: workflowDef, workflowId: workflowId, input: new HashMap(), status: WorkflowModel.Status.RUNNING, createTime: System.currentTimeMillis())
executionDAO.createWorkflow(workflow)

and: 'create tasks for this workflow'
Expand Down Expand Up @@ -230,7 +230,7 @@ class CassandraExecutionDAOSpec extends CassandraSpec {
given: 'we create a workflow'
String workflowId = IDGenerator.generate()
WorkflowDef workflowDef = new WorkflowDef(name: 'def1', version: 1)
WorkflowModel workflow = new WorkflowModel(workflowDefinition: workflowDef, workflowId: workflowId, input: new HashMap(), status: WorkflowModel.Status.RUNNING, createdTime: System.currentTimeMillis())
WorkflowModel workflow = new WorkflowModel(workflowDefinition: workflowDef, workflowId: workflowId, input: new HashMap(), status: WorkflowModel.Status.RUNNING, createTime: System.currentTimeMillis())
executionDAO.createWorkflow(workflow)

and: 'create tasks for this workflow'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,7 @@ public static void init() throws Exception {
map.put("SomeKey", null);
JSON_RESPONSE = objectMapper.writeValueAsString(map);

final TypeReference<Map<String, Object>> mapOfObj =
new TypeReference<>() {
};
final TypeReference<Map<String, Object>> mapOfObj = new TypeReference<>() {};
MockServerClient client =
new MockServerClient(mockServer.getHost(), mockServer.getServerPort());
client.when(request().withPath("/post").withMethod("POST"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ public String updateWorkflow(WorkflowModel workflow) {
executionDAO.updateWorkflow(leanWorkflow);
if (properties.isAsyncIndexingEnabled()) {
if (workflow.getStatus().isTerminal()
&& workflow.getEndTime() - workflow.getCreatedTime()
&& workflow.getEndTime() - workflow.getCreateTime()
< properties.getAsyncUpdateShortRunningWorkflowDuration().toMillis()) {
final String workflowId = workflow.getWorkflowId();
DelayWorkflowUpdate delayWorkflowUpdate = new DelayWorkflowUpdate(workflowId);
Expand Down Expand Up @@ -429,11 +429,19 @@ public List<Task> getTasksForWorkflow(String workflowId) {
}

public TaskModel getTaskModel(String taskId) {
return modelMapper.getFullCopy(getTaskFromDatastore(taskId));
TaskModel taskModel = getTaskFromDatastore(taskId);
if (taskModel != null) {
return modelMapper.getFullCopy(taskModel);
}
return null;
}

public Task getTask(String taskId) {
return modelMapper.getTask(getTaskFromDatastore(taskId));
TaskModel taskModel = getTaskFromDatastore(taskId);
if (taskModel != null) {
return modelMapper.getTask(taskModel);
}
return null;
}

private TaskModel getTaskFromDatastore(String taskId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,6 @@ Optional<TaskModel> retry(
rescheduled.setStartTime(0);
rescheduled.setEndTime(0);
rescheduled.setWorkerId(null);
rescheduled.setSeq(0);

if (StringUtils.isNotBlank(task.getExternalInputPayloadStoragePath())) {
rescheduled.setExternalInputPayloadStoragePath(
Expand Down Expand Up @@ -620,7 +619,7 @@ void checkWorkflowTimeout(WorkflowModel workflow) {
long elapsedTime =
workflow.getLastRetriedTime() > 0
? now - workflow.getLastRetriedTime()
: now - workflow.getCreatedTime();
: now - workflow.getCreateTime();

if (elapsedTime < timeout) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ public String startWorkflow(
workflow.setParentWorkflowId(parentWorkflowId);
workflow.setParentWorkflowTaskId(parentWorkflowTaskId);
workflow.setOwnerApp(WorkflowContext.get().getClientApp());
workflow.setCreatedTime(System.currentTimeMillis());
workflow.setCreateTime(System.currentTimeMillis());
workflow.setUpdatedBy(null);
workflow.setUpdatedTime(null);
workflow.setEvent(event);
Expand Down Expand Up @@ -568,7 +568,7 @@ public void restart(String workflowId, boolean useLatestDefinitions) {

workflow.getTasks().clear();
workflow.setReasonForIncompletion(null);
workflow.setCreatedTime(System.currentTimeMillis());
workflow.setCreateTime(System.currentTimeMillis());
workflow.setEndTime(0);
workflow.setLastRetriedTime(0);
// Change the status to running
Expand Down Expand Up @@ -869,7 +869,7 @@ WorkflowModel completeWorkflow(WorkflowModel workflow) {
workflowStatusListener.onWorkflowCompletedIfEnabled(workflow);
Monitors.recordWorkflowCompletion(
workflow.getWorkflowName(),
workflow.getEndTime() - workflow.getCreatedTime(),
workflow.getEndTime() - workflow.getCreateTime(),
workflow.getOwnerApp());

if (workflow.hasParent()) {
Expand Down
16 changes: 9 additions & 7 deletions core/src/main/java/com/netflix/conductor/model/TaskModel.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,20 @@
*/
package com.netflix.conductor.model;

import com.google.protobuf.Any;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;

import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;

import com.google.protobuf.Any;

public class TaskModel {

public enum Status {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,13 @@
package com.netflix.conductor.model;

import java.util.*;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;

import com.netflix.conductor.common.metadata.workflow.WorkflowDef;

import com.google.common.base.Preconditions;
import org.springframework.beans.BeanUtils;

public class WorkflowModel {

Expand Down Expand Up @@ -91,7 +90,7 @@ public boolean isSuccessful() {

private String ownerApp;

private Long createdTime;
private Long createTime;

private Long updatedTime;

Expand Down Expand Up @@ -276,12 +275,12 @@ public void setOwnerApp(String ownerApp) {
this.ownerApp = ownerApp;
}

public Long getCreatedTime() {
return createdTime;
public Long getCreateTime() {
return createTime;
}

public void setCreatedTime(Long createdTime) {
this.createdTime = createdTime;
public void setCreateTime(Long createTime) {
this.createTime = createTime;
}

public Long getUpdatedTime() {
Expand Down Expand Up @@ -411,7 +410,7 @@ && getStatus() == that.getStatus()
that.getExternalOutputPayloadStoragePath())
&& Objects.equals(getVariables(), that.getVariables())
&& Objects.equals(getOwnerApp(), that.getOwnerApp())
&& Objects.equals(getCreatedTime(), that.getCreatedTime())
&& Objects.equals(getCreateTime(), that.getCreateTime())
&& Objects.equals(getUpdatedTime(), that.getUpdatedTime())
&& Objects.equals(getCreatedBy(), that.getCreatedBy())
&& Objects.equals(getUpdatedBy(), that.getUpdatedBy());
Expand Down Expand Up @@ -441,7 +440,7 @@ public int hashCode() {
getVariables(),
getLastRetriedTime(),
getOwnerApp(),
getCreatedTime(),
getCreateTime(),
getUpdatedTime(),
getCreatedBy(),
getUpdatedBy());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public void testWorkflowWithNoTasks() throws Exception {

WorkflowModel workflow = new WorkflowModel();
workflow.setWorkflowDefinition(def);
workflow.setCreatedTime(0L);
workflow.setCreateTime(0L);
workflow.getInput().put("param1", "nested");
workflow.getInput().put("param2", "one");

Expand Down Expand Up @@ -214,7 +214,7 @@ public void testWorkflowWithNoTasksWithSwitch() throws Exception {

WorkflowModel workflow = new WorkflowModel();
workflow.setWorkflowDefinition(def);
workflow.setCreatedTime(0L);
workflow.setCreateTime(0L);
workflow.getInput().put("param1", "nested");
workflow.getInput().put("param2", "one");

Expand Down Expand Up @@ -252,7 +252,7 @@ public void testRetries() {
WorkflowModel workflow = new WorkflowModel();
workflow.setWorkflowDefinition(def);
workflow.getInput().put("requestId", 123);
workflow.setCreatedTime(System.currentTimeMillis());
workflow.setCreateTime(System.currentTimeMillis());
DeciderOutcome outcome = deciderService.decide(workflow);
assertNotNull(outcome);

Expand Down Expand Up @@ -319,7 +319,7 @@ public void testRetries() {
workflow = new WorkflowModel();
workflow.setWorkflowDefinition(def);
workflow.getInput().put("requestId", 123);
workflow.setCreatedTime(System.currentTimeMillis());
workflow.setCreateTime(System.currentTimeMillis());

workflow.getInput().put("forks", forks);
workflow.getInput().put("forkedInputs", forkedInputs);
Expand Down Expand Up @@ -385,7 +385,7 @@ public void testOptional() {

WorkflowModel workflow = new WorkflowModel();
workflow.setWorkflowDefinition(def);
workflow.setCreatedTime(System.currentTimeMillis());
workflow.setCreateTime(System.currentTimeMillis());
DeciderOutcome outcome = deciderService.decide(workflow);
assertNotNull(outcome);
assertEquals(1, outcome.tasksToBeScheduled.size());
Expand Down Expand Up @@ -477,7 +477,7 @@ public void testOptionalWithDynamicFork() {
workflow.getInput().put("forks", forks);
workflow.getInput().put("forkedInputs", forkedInputs);

workflow.setCreatedTime(System.currentTimeMillis());
workflow.setCreateTime(System.currentTimeMillis());
DeciderOutcome outcome = deciderService.decide(workflow);
assertNotNull(outcome);
assertEquals(5, outcome.tasksToBeScheduled.size());
Expand Down Expand Up @@ -556,7 +556,7 @@ public void testDecisionCases() {

WorkflowModel workflow = new WorkflowModel();
workflow.setWorkflowDefinition(def);
workflow.setCreatedTime(System.currentTimeMillis());
workflow.setCreateTime(System.currentTimeMillis());
DeciderOutcome outcome = deciderService.decide(workflow);
assertNotNull(outcome);
assertEquals(2, outcome.tasksToBeScheduled.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ public void testCaseStatement() {

WorkflowModel workflow = new WorkflowModel();
workflow.setWorkflowDefinition(def);
workflow.setCreatedTime(0L);
workflow.setCreateTime(0L);
workflow.setWorkflowId("a");
workflow.setCorrelationId("b");
workflow.setStatus(WorkflowModel.Status.RUNNING);
Expand Down Expand Up @@ -760,10 +760,10 @@ public void testWorkflowTaskRetry() {

@Test
public void testLinearBackoff() {
Workflow workflow = createDefaultWorkflow();
WorkflowModel workflow = createDefaultWorkflow();

Task task = new Task();
task.setStatus(Status.FAILED);
TaskModel task = new TaskModel();
task.setStatus(TaskModel.Status.FAILED);
task.setTaskId("t1");

TaskDef taskDef = new TaskDef();
Expand All @@ -772,19 +772,22 @@ public void testLinearBackoff() {
taskDef.setBackoffScaleFactor(2);
WorkflowTask workflowTask = new WorkflowTask();

Optional<Task> task2 = deciderService.retry(taskDef, workflowTask, task, workflow);
Optional<TaskModel> task2 = deciderService.retry(taskDef, workflowTask, task, workflow);
assertEquals(120, task2.get().getCallbackAfterSeconds()); // 60*2*1

Optional<Task> task3 = deciderService.retry(taskDef, workflowTask, task2.get(), workflow);
Optional<TaskModel> task3 =
deciderService.retry(taskDef, workflowTask, task2.get(), workflow);
assertEquals(240, task3.get().getCallbackAfterSeconds()); // 60*2*2

Optional<Task> task4 = deciderService.retry(taskDef, workflowTask, task3.get(), workflow);
Optional<TaskModel> task4 =
deciderService.retry(taskDef, workflowTask, task3.get(), workflow);
// // 60*2*3
assertEquals(360, task4.get().getCallbackAfterSeconds()); // 60*2*3

taskDef.setRetryCount(Integer.MAX_VALUE);
task4.get().setRetryCount(Integer.MAX_VALUE - 100);
Optional<Task> task5 = deciderService.retry(taskDef, workflowTask, task4.get(), workflow);
Optional<TaskModel> task5 =
deciderService.retry(taskDef, workflowTask, task4.get(), workflow);
assertEquals(Integer.MAX_VALUE, task5.get().getCallbackAfterSeconds());
}

Expand Down Expand Up @@ -1145,7 +1148,7 @@ public void testCheckWorkflowTimeout() {
workflowDef.setName("test");
WorkflowModel workflow = new WorkflowModel();
workflow.setOwnerApp("junit");
workflow.setCreatedTime(System.currentTimeMillis() - 10_000);
workflow.setCreateTime(System.currentTimeMillis() - 10_000);
workflow.setWorkflowId("workflow_id");

// no-op
Expand Down
Loading

0 comments on commit 9830fca

Please sign in to comment.