Skip to content

Commit

Permalink
Add WorkflowExecuteContext (#14544)
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun authored Jul 14, 2023
1 parent c812bf9 commit e6d9463
Show file tree
Hide file tree
Showing 39 changed files with 1,035 additions and 753 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.api.dto.gantt;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* gantt DTO
*/
import lombok.Data;

@Data
public class GanttDto {

/**
Expand All @@ -37,9 +38,9 @@ public class GanttDto {
private List<Task> tasks = new ArrayList<>();

/**
* task name list
* task code list
*/
private List<String> taskNames;
private List<Long> taskNames;

/**
* task status map
Expand All @@ -50,48 +51,19 @@ public GanttDto() {
this.taskStatus = new HashMap<>();
taskStatus.put("success", "success");
}
public GanttDto(int height, List<Task> tasks, List<String> taskNames) {

public GanttDto(int height, List<Task> tasks, List<Long> taskNames) {
this();
this.height = height;
this.tasks = tasks;
this.taskNames = taskNames;
}
public GanttDto(int height, List<Task> tasks, List<String> taskNames, Map<String, String> taskStatus) {
this.height = height;
this.tasks = tasks;
this.taskNames = taskNames;
this.taskStatus = taskStatus;
}

public int getHeight() {
return height;
}

public void setHeight(int height) {
public GanttDto(int height, List<Task> tasks, List<Long> taskNames, Map<String, String> taskStatus) {
this.height = height;
}

public List<Task> getTasks() {
return tasks;
}

public void setTasks(List<Task> tasks) {
this.tasks = tasks;
}

public List<String> getTaskNames() {
return taskNames;
}

public void setTaskNames(List<String> taskNames) {
this.taskNames = taskNames;
}

public Map<String, String> getTaskStatus() {
return taskStatus;
}

public void setTaskStatus(Map<String, String> taskStatus) {
this.taskStatus = taskStatus;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1894,12 +1894,12 @@ public Map<String, Object> viewTree(User loginUser, long projectCode, long code,
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
return result;
}
DAG<String, TaskNode, TaskNodeRelation> dag = processService.genDagGraph(processDefinition);
DAG<Long, TaskNode, TaskNodeRelation> dag = processService.genDagGraph(processDefinition);
// nodes that are running
Map<String, List<TreeViewDto>> runningNodeMap = new ConcurrentHashMap<>();
Map<Long, List<TreeViewDto>> runningNodeMap = new ConcurrentHashMap<>();

// nodes that are waiting to run
Map<String, List<TreeViewDto>> waitingRunningNodeMap = new ConcurrentHashMap<>();
Map<Long, List<TreeViewDto>> waitingRunningNodeMap = new ConcurrentHashMap<>();

// List of process instances
List<ProcessInstance> processInstanceList = processInstanceService.queryByProcessDefineCode(code, limit);
Expand Down Expand Up @@ -1937,16 +1937,16 @@ public Map<String, Object> viewTree(User loginUser, long projectCode, long code,
List<TreeViewDto> parentTreeViewDtoList = new ArrayList<>();
parentTreeViewDtoList.add(parentTreeViewDto);
// Here is the encapsulation task instance
for (String startNode : dag.getBeginNode()) {
for (Long startNode : dag.getBeginNode()) {
runningNodeMap.put(startNode, parentTreeViewDtoList);
}

while (!ServerLifeCycleManager.isStopped()) {
Set<String> postNodeList;
Iterator<Map.Entry<String, List<TreeViewDto>>> iter = runningNodeMap.entrySet().iterator();
Set<Long> postNodeList;
Iterator<Map.Entry<Long, List<TreeViewDto>>> iter = runningNodeMap.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, List<TreeViewDto>> en = iter.next();
String nodeCode = en.getKey();
Map.Entry<Long, List<TreeViewDto>> en = iter.next();
Long nodeCode = en.getKey();
parentTreeViewDtoList = en.getValue();

TreeViewDto treeViewDto = new TreeViewDto();
Expand All @@ -1957,8 +1957,8 @@ public Map<String, Object> viewTree(User loginUser, long projectCode, long code,
// set treeViewDto instances
for (int i = limit - 1; i >= 0; i--) {
ProcessInstance processInstance = processInstanceList.get(i);
TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndCode(processInstance.getId(),
Long.parseLong(nodeCode));
TaskInstance taskInstance =
taskInstanceMapper.queryByInstanceIdAndCode(processInstance.getId(), nodeCode);
if (taskInstance == null) {
treeViewDto.getInstances().add(new Instance(-1, "not running", 0, "null"));
} else {
Expand All @@ -1985,7 +1985,7 @@ public Map<String, Object> viewTree(User loginUser, long projectCode, long code,
}
postNodeList = dag.getSubsequentNodes(nodeCode);
if (CollectionUtils.isNotEmpty(postNodeList)) {
for (String nextNodeCode : postNodeList) {
for (Long nextNodeCode : postNodeList) {
List<TreeViewDto> treeViewDtoList = waitingRunningNodeMap.get(nextNodeCode);
if (CollectionUtils.isEmpty(treeViewDtoList)) {
treeViewDtoList = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1016,22 +1016,20 @@ public Map<String, Object> viewGantt(long projectCode, Integer processInstanceId
return result;
}
GanttDto ganttDto = new GanttDto();
DAG<String, TaskNode, TaskNodeRelation> dag = processService.genDagGraph(processDefinition);
DAG<Long, TaskNode, TaskNodeRelation> dag = processService.genDagGraph(processDefinition);
// topological sort
List<String> nodeList = dag.topologicalSort();
List<Long> nodeList = dag.topologicalSort();

ganttDto.setTaskNames(nodeList);

List<Task> taskList = new ArrayList<>();
if (CollectionUtils.isNotEmpty(nodeList)) {
List<Long> taskCodes = nodeList.stream().map(Long::parseLong).collect(Collectors.toList());
List<TaskInstance> taskInstances = taskInstanceMapper.queryByProcessInstanceIdsAndTaskCodes(
Collections.singletonList(processInstanceId), taskCodes);
for (String node : nodeList) {
Collections.singletonList(processInstanceId), nodeList);
for (Long node : nodeList) {
TaskInstance taskInstance = null;
for (TaskInstance instance : taskInstances) {
if (instance.getProcessInstanceId() == processInstanceId
&& instance.getTaskCode() == Long.parseLong(node)) {
if (instance.getProcessInstanceId() == processInstanceId && instance.getTaskCode() == node) {
taskInstance = instance;
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -745,9 +745,9 @@ public void testViewGantt() throws Exception {
processInstance.getProcessDefinitionVersion())).thenReturn(new ProcessDefinitionLog());
when(processInstanceMapper.queryDetailById(1)).thenReturn(processInstance);
when(taskInstanceMapper.queryByInstanceIdAndName(Mockito.anyInt(), Mockito.any())).thenReturn(taskInstance);
DAG<String, TaskNode, TaskNodeRelation> graph = new DAG<>();
for (int i = 1; i <= 7; ++i) {
graph.addNode(i + "", new TaskNode());
DAG<Long, TaskNode, TaskNodeRelation> graph = new DAG<>();
for (long i = 1; i <= 7; ++i) {
graph.addNode(i, new TaskNode());
}

when(processService.genDagGraph(Mockito.any(ProcessDefinition.class)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,63 +17,23 @@

package org.apache.dolphinscheduler.common.model;

import java.util.Objects;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class TaskNodeRelation {

/**
* task start node name
* task start node Code
*/
private String startNode;
private Long startNode;

/**
* task end node name
* task end node Code
*/
private String endNode;
private Long endNode;

public TaskNodeRelation() {
}

public TaskNodeRelation(String startNode, String endNode) {
this.startNode = startNode;
this.endNode = endNode;
}

public String getStartNode() {
return startNode;
}

public void setStartNode(String startNode) {
this.startNode = startNode;
}

public String getEndNode() {
return endNode;
}

public void setEndNode(String endNode) {
this.endNode = endNode;
}

@Override
public boolean equals(Object o) {
if (!(o instanceof TaskNodeRelation)) {
return false;
}
TaskNodeRelation relation = (TaskNodeRelation) o;
return (relation.getStartNode().equals(this.startNode) && relation.getEndNode().equals(this.endNode));
}

@Override
public String toString() {
return "TaskNodeRelation{"
+ "startNode='" + startNode + '\''
+ ", endNode='" + endNode + '\''
+ '}';
}

@Override
public int hashCode() {
return Objects.hash(startNode, endNode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.server.master.runner.WorkflowSubmitStatus;
import org.apache.dolphinscheduler.server.master.runner.WorkflowStartStatus;

import java.util.concurrent.CompletableFuture;

Expand Down Expand Up @@ -56,17 +56,18 @@ public void handleWorkflowEvent(final WorkflowEvent workflowEvent) throws Workfl
throw new WorkflowEventHandleError(
"The workflow start event is invalid, cannot find the workflow instance from cache");
}
ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance();
ProcessInstance processInstance =
workflowExecuteRunnable.getWorkflowExecuteContext().getWorkflowInstance();
ProcessInstanceMetrics.incProcessInstanceByStateAndProcessDefinitionCode("submit",
processInstance.getProcessDefinitionCode().toString());
CompletableFuture.supplyAsync(workflowExecuteRunnable::call, workflowExecuteThreadPool)
.thenAccept(workflowSubmitStatus -> {
if (WorkflowSubmitStatus.SUCCESS == workflowSubmitStatus) {
.thenAccept(workflowStartStatus -> {
if (WorkflowStartStatus.SUCCESS == workflowStartStatus) {
log.info("Success submit the workflow instance");
if (processInstance.getTimeout() > 0) {
stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
}
} else if (WorkflowSubmitStatus.FAILED == workflowSubmitStatus) {
} else if (WorkflowStartStatus.FAILED == workflowStartStatus) {
log.error(
"Failed to submit the workflow instance, will resend the workflow start event: {}",
workflowEvent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public class WorkflowStateEventHandler implements StateEventHandler {
public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable,
StateEvent stateEvent) throws StateEventHandleException {
WorkflowStateEvent workflowStateEvent = (WorkflowStateEvent) stateEvent;
ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance();
ProcessInstance processInstance =
workflowExecuteRunnable.getWorkflowExecuteContext().getWorkflowInstance();
ProcessDefinition processDefinition = processInstance.getProcessDefinition();
measureProcessState(workflowStateEvent, processInstance.getProcessDefinitionCode().toString());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public class WorkflowTimeoutStateEventHandler implements StateEventHandler {
@Override
public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent) {
log.info("Handle workflow instance timeout event");
ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance();
ProcessInstance processInstance =
workflowExecuteRunnable.getWorkflowExecuteContext().getWorkflowInstance();
ProcessInstanceMetrics.incProcessInstanceByStateAndProcessDefinitionCode("timeout",
processInstance.getProcessDefinitionCode().toString());
workflowExecuteRunnable.processTimeout();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.master.graph;

import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.service.model.TaskNode;

public interface IWorkflowGraph {

TaskNode getTaskNodeByCode(Long taskCode);

// todo: refactor DAG class
DAG<Long, TaskNode, TaskNodeRelation> getDag();

boolean isForbiddenTask(Long taskCode);

}
Loading

0 comments on commit e6d9463

Please sign in to comment.