Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

START_WORKFLOW system task #2870

Merged
merged 5 commits into from
Mar 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public enum TaskType {
JOIN,
DO_WHILE,
SUB_WORKFLOW,
START_WORKFLOW,
EVENT,
WAIT,
USER_DEFINED,
Expand All @@ -54,6 +55,7 @@ public enum TaskType {
public static final String TASK_TYPE_EVENT = "EVENT";
public static final String TASK_TYPE_WAIT = "WAIT";
public static final String TASK_TYPE_SUB_WORKFLOW = "SUB_WORKFLOW";
public static final String TASK_TYPE_START_WORKFLOW = "START_WORKFLOW";
public static final String TASK_TYPE_FORK_JOIN = "FORK_JOIN";
public static final String TASK_TYPE_SIMPLE = "SIMPLE";
public static final String TASK_TYPE_HTTP = "HTTP";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ public static boolean isEnvironmentVariable(String test) {
}
}
String value =
Optional.ofNullable(System.getProperty(test))
.orElseGet(() -> Optional.ofNullable(System.getenv(test)).orElse(null));
Optional.ofNullable(System.getProperty(test)).orElseGet(() -> System.getenv(test));
return value != null;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.core.execution.mapper;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import com.netflix.conductor.common.metadata.tasks.TaskType;
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
import com.netflix.conductor.core.exception.TerminateWorkflowException;
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;

import static com.netflix.conductor.common.metadata.tasks.TaskType.START_WORKFLOW;
import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_START_WORKFLOW;

@Component
public class StartWorkflowTaskMapper implements TaskMapper {

private static final Logger LOGGER = LoggerFactory.getLogger(StartWorkflowTaskMapper.class);

@Override
public TaskType getTaskType() {
return START_WORKFLOW;
}

@Override
public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext)
throws TerminateWorkflowException {
WorkflowTask taskToSchedule = taskMapperContext.getTaskToSchedule();
WorkflowModel workflowInstance = taskMapperContext.getWorkflowInstance();
String taskId = taskMapperContext.getTaskId();

TaskModel startWorkflowTask = new TaskModel();
startWorkflowTask.setTaskType(TASK_TYPE_START_WORKFLOW);
startWorkflowTask.setReferenceTaskName(taskToSchedule.getTaskReferenceName());
startWorkflowTask.setWorkflowInstanceId(workflowInstance.getWorkflowId());
startWorkflowTask.setWorkflowType(workflowInstance.getWorkflowName());
startWorkflowTask.setCorrelationId(workflowInstance.getCorrelationId());
startWorkflowTask.setScheduledTime(System.currentTimeMillis());
startWorkflowTask.setTaskId(taskId);
startWorkflowTask.addInput(taskMapperContext.getTaskInput());
startWorkflowTask.setStatus(TaskModel.Status.SCHEDULED);
startWorkflowTask.setWorkflowTask(taskToSchedule);
startWorkflowTask.setWorkflowPriority(workflowInstance.getPriority());
startWorkflowTask.setCallbackAfterSeconds(taskToSchedule.getStartDelay());
LOGGER.debug("{} created", startWorkflowTask);
return List.of(startWorkflowTask);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ SubWorkflowParams getSubWorkflowParams(WorkflowTask taskToSchedule) {
String reason =
String.format(
"Task %s is defined as sub-workflow and is missing subWorkflowParams. "
+ "Please check the blueprint",
+ "Please check the workflow definition",
taskToSchedule.getName());
LOGGER.error(reason);
return new TerminateWorkflowException(reason);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.core.execution.tasks;

import java.util.Map;

import javax.validation.Validator;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest;
import com.netflix.conductor.core.exception.ApplicationException;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;

import com.fasterxml.jackson.databind.ObjectMapper;

import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_START_WORKFLOW;
import static com.netflix.conductor.model.TaskModel.Status.COMPLETED;
import static com.netflix.conductor.model.TaskModel.Status.FAILED;

@Component(TASK_TYPE_START_WORKFLOW)
public class StartWorkflow extends WorkflowSystemTask {

private static final Logger LOGGER = LoggerFactory.getLogger(StartWorkflow.class);

private static final String WORKFLOW_ID = "workflowId";
private static final String START_WORKFLOW_PARAMETER = "startWorkflow";

private final ObjectMapper objectMapper;
private final Validator validator;

public StartWorkflow(ObjectMapper objectMapper, Validator validator) {
super(TASK_TYPE_START_WORKFLOW);
this.objectMapper = objectMapper;
this.validator = validator;
}

@Override
public void start(
WorkflowModel workflow, TaskModel taskModel, WorkflowExecutor workflowExecutor) {
StartWorkflowRequest request = getRequest(taskModel);
if (request == null) {
return;
}

// set the correlation id of starter workflow, if its empty in the StartWorkflowRequest
request.setCorrelationId(
StringUtils.defaultIfBlank(
request.getCorrelationId(), workflow.getCorrelationId()));

try {
String workflowId = startWorkflow(request, workflowExecutor);
taskModel.addOutput(WORKFLOW_ID, workflowId);
taskModel.setStatus(COMPLETED);
} catch (ApplicationException ae) {
if (ae.isRetryable()) {
LOGGER.info(
"A transient backend error happened when task {} in {} tried to start workflow {}.",
taskModel.getTaskId(),
workflow.toShortString(),
request.getName());
} else {
taskModel.setStatus(FAILED);
taskModel.setReasonForIncompletion(ae.getMessage());
LOGGER.error(
"Error starting workflow: {} from workflow: {}",
request.getName(),
workflow.toShortString(),
ae);
}
} catch (Exception e) {
taskModel.setStatus(FAILED);
taskModel.setReasonForIncompletion(e.getMessage());
LOGGER.error(
"Error starting workflow: {} from workflow: {}",
request.getName(),
workflow.toShortString(),
e);
}
}

private StartWorkflowRequest getRequest(TaskModel taskModel) {
Map<String, Object> taskInput = taskModel.getInputData();

StartWorkflowRequest startWorkflowRequest = null;

if (taskInput.get(START_WORKFLOW_PARAMETER) == null) {
taskModel.setStatus(FAILED);
taskModel.setReasonForIncompletion(
"Missing '" + START_WORKFLOW_PARAMETER + "' in input data.");
} else {
try {
startWorkflowRequest =
objectMapper.convertValue(
taskInput.get(START_WORKFLOW_PARAMETER),
StartWorkflowRequest.class);

var violations = validator.validate(startWorkflowRequest);
if (!violations.isEmpty()) {
StringBuilder reasonForIncompletion =
new StringBuilder(START_WORKFLOW_PARAMETER)
.append(" validation failed. ");
for (var violation : violations) {
reasonForIncompletion
.append("'")
.append(violation.getPropertyPath().toString())
.append("' -> ")
.append(violation.getMessage())
.append(". ");
}
taskModel.setStatus(FAILED);
taskModel.setReasonForIncompletion(reasonForIncompletion.toString());
startWorkflowRequest = null;
}
} catch (IllegalArgumentException e) {
LOGGER.error("Error reading StartWorkflowRequest for {}", taskModel, e);
taskModel.setStatus(FAILED);
taskModel.setReasonForIncompletion(
"Error reading StartWorkflowRequest. " + e.getMessage());
}
}

return startWorkflowRequest;
}

private String startWorkflow(StartWorkflowRequest request, WorkflowExecutor workflowExecutor) {
if (request.getWorkflowDef() == null) {
return workflowExecutor.startWorkflow(
request.getName(),
request.getVersion(),
request.getCorrelationId(),
request.getPriority(),
request.getInput(),
request.getExternalInputPayloadStoragePath(),
null,
request.getTaskToDomain());
} else {
return workflowExecutor.startWorkflow(
request.getWorkflowDef(),
request.getInput(),
request.getExternalInputPayloadStoragePath(),
request.getCorrelationId(),
request.getPriority(),
null,
request.getTaskToDomain());
}
}

@Override
public boolean isAsync() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,6 @@ public String startWorkflow(
WorkflowDef workflowDef) {

if (workflowDef == null) {
workflowDef = metadataService.getWorkflowDef(name, version);
apanicker-nflx marked this conversation as resolved.
Show resolved Hide resolved
if (workflowDef == null) {
throw new ApplicationException(
ApplicationException.Code.NOT_FOUND,
String.format(
"No such workflow found by name: %s, version: %d", name, version));
}

return workflowExecutor.startWorkflow(
name,
version,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.core.execution.tasks


import javax.validation.ConstraintViolation
import javax.validation.Validator

import com.netflix.conductor.common.config.ObjectMapperProvider
import com.netflix.conductor.core.exception.ApplicationException
import com.netflix.conductor.core.execution.WorkflowExecutor
import com.netflix.conductor.model.TaskModel
import com.netflix.conductor.model.WorkflowModel

import spock.lang.Specification
import spock.lang.Subject

import static com.netflix.conductor.core.execution.tasks.StartWorkflow.START_WORKFLOW_PARAMETER
import static com.netflix.conductor.model.TaskModel.Status.FAILED
import static com.netflix.conductor.model.TaskModel.Status.SCHEDULED

/**
* Unit test for StartWorkflow. Success and Javax validation cases are covered by the StartWorkflowSpec in test-harness module.
*/
class StartWorkflowSpec extends Specification {

@Subject
StartWorkflow startWorkflow

WorkflowExecutor workflowExecutor
Validator validator
WorkflowModel workflowModel
TaskModel taskModel

def setup() {
workflowExecutor = Mock(WorkflowExecutor.class)
validator = Mock(Validator.class) {
validate(_) >> new HashSet<ConstraintViolation<Object>>()
}

def inputData = [:]
inputData[START_WORKFLOW_PARAMETER] = ['name': 'some_workflow']
taskModel = new TaskModel(status: SCHEDULED, inputData: inputData)
workflowModel = new WorkflowModel()

startWorkflow = new StartWorkflow(new ObjectMapperProvider().getObjectMapper(), validator)
}

def "StartWorkflow task is asynchronous"() {
expect:
startWorkflow.isAsync()
}

def "startWorkflow parameter is missing"() {
given: "a task with no start_workflow in input"
taskModel.inputData = [:]

when:
startWorkflow.start(workflowModel, taskModel, workflowExecutor)

then:
taskModel.status == FAILED
taskModel.reasonForIncompletion != null
}

def "ObjectMapper throws an IllegalArgumentException"() {
given: "a task with no start_workflow in input"
taskModel.inputData[START_WORKFLOW_PARAMETER] = "I can't be converted to StartWorkflowRequest"

when:
startWorkflow.start(workflowModel, taskModel, workflowExecutor)

then:
taskModel.status == FAILED
taskModel.reasonForIncompletion != null
}

def "WorkflowExecutor throws a retryable exception"() {
when:
startWorkflow.start(workflowModel, taskModel, workflowExecutor)

then:
taskModel.status == SCHEDULED
apanicker-nflx marked this conversation as resolved.
Show resolved Hide resolved
1 * workflowExecutor.startWorkflow(*_) >> { throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, "") }
}

def "WorkflowExecutor throws a non-retryable ApplicationException"() {
when:
startWorkflow.start(workflowModel, taskModel, workflowExecutor)

then:
taskModel.status == FAILED
taskModel.reasonForIncompletion != null
1 * workflowExecutor.startWorkflow(*_) >> { throw new ApplicationException(ApplicationException.Code.NOT_FOUND, "") }
}

def "WorkflowExecutor throws a RuntimeException"() {
when:
startWorkflow.start(workflowModel, taskModel, workflowExecutor)

then:
taskModel.status == FAILED
taskModel.reasonForIncompletion != null
1 * workflowExecutor.startWorkflow(*_) >> { throw new RuntimeException("I am an unexpected exception") }
}
}
Loading