diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/TaskController.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/TaskController.java index d9110ebcae7..68edd6052c1 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/TaskController.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/TaskController.java @@ -33,6 +33,7 @@ import org.springframework.web.bind.annotation.PutMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; @@ -69,4 +70,11 @@ Result> listInstance(@RequestBody @NotNull Insta Result tmpExecute(@RequestBody @NotNull ExecuteReq req) { return Result.success(iTaskService.tmpExecute(req)); } + + @PostMapping("/kill") + @ApiOperation(value = "kill running instance", httpMethod = "POST") + Result kill(@RequestParam Long instanceId) { + iTaskService.kill(instanceId); + return Result.success(); + } } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITaskService.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITaskService.java index 475da0e512c..78960c72796 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITaskService.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITaskService.java @@ -36,4 +36,6 @@ public interface ITaskService { PageInfo listInstance(InstanceListReq req); InstanceSimpleInfoRes tmpExecute(ExecuteReq req); + + void kill(Long instanceId); } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TaskServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TaskServiceImpl.java index 7b779d81db6..b12d8c2b7a9 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TaskServiceImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TaskServiceImpl.java @@ -224,6 +224,11 @@ public InstanceSimpleInfoRes tmpExecute(ExecuteReq req) { return this.translate(iJobService.execute(dto)); } + @Override + public void kill(Long instanceId) { + iJobService.kill(instanceId); + } + private JobSimpleInfoRes translate(JobSimpleInfoDto dto) { return JobSimpleInfoRes.builder() .jobId(dto.getJobId()) diff --git a/seatunnel-server/seatunnel-scheduler/seatunnel-scheduler-dolphinscheduler/src/main/java/org/apache/seatunnel/scheduler/dolphinscheduler/ExecuteTypeEnum.java b/seatunnel-server/seatunnel-scheduler/seatunnel-scheduler-dolphinscheduler/src/main/java/org/apache/seatunnel/scheduler/dolphinscheduler/ExecuteTypeEnum.java new file mode 100644 index 00000000000..f8b77a877fb --- /dev/null +++ b/seatunnel-server/seatunnel-scheduler/seatunnel-scheduler-dolphinscheduler/src/main/java/org/apache/seatunnel/scheduler/dolphinscheduler/ExecuteTypeEnum.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.scheduler.dolphinscheduler; + +public enum ExecuteTypeEnum { + NONE, + REPEAT_RUNNING, + RECOVER_SUSPENDED_PROCESS, + START_FAILURE_TASK_PROCESS, + STOP, + PAUSE +} diff --git a/seatunnel-server/seatunnel-scheduler/seatunnel-scheduler-dolphinscheduler/src/main/java/org/apache/seatunnel/scheduler/dolphinscheduler/IDolphinschedulerService.java b/seatunnel-server/seatunnel-scheduler/seatunnel-scheduler-dolphinscheduler/src/main/java/org/apache/seatunnel/scheduler/dolphinscheduler/IDolphinschedulerService.java index 8e0769caee4..8085a69ad9f 100644 --- a/seatunnel-server/seatunnel-scheduler/seatunnel-scheduler-dolphinscheduler/src/main/java/org/apache/seatunnel/scheduler/dolphinscheduler/IDolphinschedulerService.java +++ b/seatunnel-server/seatunnel-scheduler/seatunnel-scheduler-dolphinscheduler/src/main/java/org/apache/seatunnel/scheduler/dolphinscheduler/IDolphinschedulerService.java @@ -57,4 +57,6 @@ public interface IDolphinschedulerService { PageData listTaskInstance(ListProcessInstanceDto dto); void deleteProcessDefinition(long code); + + void killProcessInstance(long processInstanceId); } diff --git a/seatunnel-server/seatunnel-scheduler/seatunnel-scheduler-dolphinscheduler/src/main/java/org/apache/seatunnel/scheduler/dolphinscheduler/constants/DolphinschedulerConstants.java b/seatunnel-server/seatunnel-scheduler/seatunnel-scheduler-dolphinscheduler/src/main/java/org/apache/seatunnel/scheduler/dolphinscheduler/constants/DolphinschedulerConstants.java index 110a190cedd..d6733a3b9a2 100644 --- a/seatunnel-server/seatunnel-scheduler/seatunnel-scheduler-dolphinscheduler/src/main/java/org/apache/seatunnel/scheduler/dolphinscheduler/constants/DolphinschedulerConstants.java +++ b/seatunnel-server/seatunnel-scheduler/seatunnel-scheduler-dolphinscheduler/src/main/java/org/apache/seatunnel/scheduler/dolphinscheduler/constants/DolphinschedulerConstants.java @@ -44,6 +44,7 @@ public class DolphinschedulerConstants { public static final String SCHEDULE_ONLINE = "/projects/%s/schedules/%s/online"; public static final String SCHEDULE_OFFLINE = "/projects/%s/schedules/%s/offline"; public static final String DELETE_PROCESS_DEFINITION = "/projects/%s/process-definition/%s"; + public static final String EXECUTE = "/projects/%s/executors/execute"; /** * request param @@ -167,6 +168,8 @@ public class DolphinschedulerConstants { public static final String EXEC_TYPE_DEFAULT = "START_PROCESS"; public static final String EXEC_TYPE_COMPLEMENT = "COMPLEMENT_DATA"; public static final String DEPENDENT_MODE_DEFAULT = "OFF_MODE"; + public static final String PROCESS_INSTANCE_ID = "processInstanceId"; + public static final String EXECUTE_TYPE = "executeType"; /** * response param diff --git a/seatunnel-server/seatunnel-scheduler/seatunnel-scheduler-dolphinscheduler/src/main/java/org/apache/seatunnel/scheduler/dolphinscheduler/dto/TaskInstanceDto.java b/seatunnel-server/seatunnel-scheduler/seatunnel-scheduler-dolphinscheduler/src/main/java/org/apache/seatunnel/scheduler/dolphinscheduler/dto/TaskInstanceDto.java index 2e49196f64f..c036df943db 100644 --- a/seatunnel-server/seatunnel-scheduler/seatunnel-scheduler-dolphinscheduler/src/main/java/org/apache/seatunnel/scheduler/dolphinscheduler/dto/TaskInstanceDto.java +++ b/seatunnel-server/seatunnel-scheduler/seatunnel-scheduler-dolphinscheduler/src/main/java/org/apache/seatunnel/scheduler/dolphinscheduler/dto/TaskInstanceDto.java @@ -32,7 +32,7 @@ public class TaskInstanceDto { private boolean firstRun; private int dryRun; private String flag; - private int environmentCode; + private long environmentCode; private String processInstance; private int pid; private String taskParams; diff --git a/seatunnel-server/seatunnel-scheduler/seatunnel-scheduler-dolphinscheduler/src/main/java/org/apache/seatunnel/scheduler/dolphinscheduler/impl/DolphinschedulerServiceImpl.java b/seatunnel-server/seatunnel-scheduler/seatunnel-scheduler-dolphinscheduler/src/main/java/org/apache/seatunnel/scheduler/dolphinscheduler/impl/DolphinschedulerServiceImpl.java index 746192a9fe7..d4de2383f22 100644 --- a/seatunnel-server/seatunnel-scheduler/seatunnel-scheduler-dolphinscheduler/src/main/java/org/apache/seatunnel/scheduler/dolphinscheduler/impl/DolphinschedulerServiceImpl.java +++ b/seatunnel-server/seatunnel-scheduler/seatunnel-scheduler-dolphinscheduler/src/main/java/org/apache/seatunnel/scheduler/dolphinscheduler/impl/DolphinschedulerServiceImpl.java @@ -34,6 +34,8 @@ import static org.apache.seatunnel.scheduler.dolphinscheduler.constants.DolphinschedulerConstants.END_TIME; import static org.apache.seatunnel.scheduler.dolphinscheduler.constants.DolphinschedulerConstants.ENVIRONMENT_CODE; import static org.apache.seatunnel.scheduler.dolphinscheduler.constants.DolphinschedulerConstants.ENVIRONMENT_CODE_DEFAULT; +import static org.apache.seatunnel.scheduler.dolphinscheduler.constants.DolphinschedulerConstants.EXECUTE; +import static org.apache.seatunnel.scheduler.dolphinscheduler.constants.DolphinschedulerConstants.EXECUTE_TYPE; import static org.apache.seatunnel.scheduler.dolphinscheduler.constants.DolphinschedulerConstants.FAILED_NODE_DEFAULT; import static org.apache.seatunnel.scheduler.dolphinscheduler.constants.DolphinschedulerConstants.FAILURE_STRATEGY; import static org.apache.seatunnel.scheduler.dolphinscheduler.constants.DolphinschedulerConstants.FAILURE_STRATEGY_DEFAULT; @@ -62,6 +64,7 @@ import static org.apache.seatunnel.scheduler.dolphinscheduler.constants.DolphinschedulerConstants.PROCESS_DEFINITION; import static org.apache.seatunnel.scheduler.dolphinscheduler.constants.DolphinschedulerConstants.PROCESS_DEFINITION_CODE; import static org.apache.seatunnel.scheduler.dolphinscheduler.constants.DolphinschedulerConstants.PROCESS_DEFINITION_NAME; +import static org.apache.seatunnel.scheduler.dolphinscheduler.constants.DolphinschedulerConstants.PROCESS_INSTANCE_ID; import static org.apache.seatunnel.scheduler.dolphinscheduler.constants.DolphinschedulerConstants.PROCESS_INSTANCE_NAME; import static org.apache.seatunnel.scheduler.dolphinscheduler.constants.DolphinschedulerConstants.PROCESS_INSTANCE_PRIORITY; import static org.apache.seatunnel.scheduler.dolphinscheduler.constants.DolphinschedulerConstants.PROCESS_INSTANCE_PRIORITY_DEFAULT; @@ -116,6 +119,7 @@ import static org.apache.seatunnel.server.common.SeatunnelErrorEnum.NO_MATCHED_PROJECT; import static org.apache.seatunnel.server.common.SeatunnelErrorEnum.UNEXPECTED_RETURN_CODE; +import org.apache.seatunnel.scheduler.dolphinscheduler.ExecuteTypeEnum; import org.apache.seatunnel.scheduler.dolphinscheduler.IDolphinschedulerService; import org.apache.seatunnel.scheduler.dolphinscheduler.dto.ConditionResult; import org.apache.seatunnel.scheduler.dolphinscheduler.dto.ListProcessDefinitionDto; @@ -187,7 +191,7 @@ public void afterPropertiesSet() throws Exception { } @Override - public ProcessDefinitionDto createOrUpdateProcessDefinition(UpdateProcessDefinitionDto dto) { + public ProcessDefinitionDto createOrUpdateProcessDefinition(UpdateProcessDefinitionDto dto) { // gen task code final List taskCodes = genTaskCodes(defaultProjectCode, GEN_NUM_DEFAULT); @@ -442,6 +446,11 @@ public void deleteProcessDefinition(long code) { checkResult(result, false); } + @Override + public void killProcessInstance(long processInstanceId) { + execute(processInstanceId, ExecuteTypeEnum.STOP); + } + private ProjectDto queryProjectCodeByName(String projectName) throws IOException { final Map result = HttpUtils.builder() .withUrl(apiPrefix.concat(QUERY_PROJECT_LIST_PAGING)) @@ -459,6 +468,17 @@ private ProjectDto queryProjectCodeByName(String projectName) throws IOException return projectDto; } + private void execute(long processInstanceId, ExecuteTypeEnum executeType) { + final Map result = HttpUtils.builder() + .withUrl(apiPrefix.concat(String.format(EXECUTE, defaultProjectCode))) + .withMethod(Connection.Method.POST) + .withRequestBody(this.objectToString(null)) + .withData(createParamMap(PROCESS_INSTANCE_ID, processInstanceId, EXECUTE_TYPE, executeType.name())) + .withToken(TOKEN, token) + .execute(Map.class); + checkResult(result, false); + } + private TaskDefinitionDto buildTaskDefinitionJson(Long taskCode, TaskDescriptionDto taskDescriptionDto) { final ResourceDto resourceDto = createOrUpdateScriptContent(taskDescriptionDto.getName(), taskDescriptionDto.getContent()); final TaskDefinitionDto taskDefinitionDto = new TaskDefinitionDto(); diff --git a/seatunnel-server/seatunnel-scheduler/seatunnel-scheduler-dolphinscheduler/src/main/java/org/apache/seatunnel/scheduler/dolphinscheduler/impl/InstanceServiceImpl.java b/seatunnel-server/seatunnel-scheduler/seatunnel-scheduler-dolphinscheduler/src/main/java/org/apache/seatunnel/scheduler/dolphinscheduler/impl/InstanceServiceImpl.java index 1c7f7555dc7..ff295d8ddad 100644 --- a/seatunnel-server/seatunnel-scheduler/seatunnel-scheduler-dolphinscheduler/src/main/java/org/apache/seatunnel/scheduler/dolphinscheduler/impl/InstanceServiceImpl.java +++ b/seatunnel-server/seatunnel-scheduler/seatunnel-scheduler-dolphinscheduler/src/main/java/org/apache/seatunnel/scheduler/dolphinscheduler/impl/InstanceServiceImpl.java @@ -51,7 +51,8 @@ public PageData list(InstanceListDto dto) { final PageData instancePageData = iDolphinschedulerService.listTaskInstance(listDto); final List data = instancePageData.getData().stream().map(t -> InstanceDto.builder() - .instanceId(t.getId()) + // use processInstanceId instead of origin task instance id. + .instanceId(t.getProcessInstanceId()) .instanceCode(t.getProcessInstanceId()) .instanceName(t.getProcessInstanceName()) .status(t.getState()) diff --git a/seatunnel-server/seatunnel-scheduler/seatunnel-scheduler-dolphinscheduler/src/main/java/org/apache/seatunnel/scheduler/dolphinscheduler/impl/JobServiceImpl.java b/seatunnel-server/seatunnel-scheduler/seatunnel-scheduler-dolphinscheduler/src/main/java/org/apache/seatunnel/scheduler/dolphinscheduler/impl/JobServiceImpl.java index be898a437ce..b3d8f525e3d 100644 --- a/seatunnel-server/seatunnel-scheduler/seatunnel-scheduler-dolphinscheduler/src/main/java/org/apache/seatunnel/scheduler/dolphinscheduler/impl/JobServiceImpl.java +++ b/seatunnel-server/seatunnel-scheduler/seatunnel-scheduler-dolphinscheduler/src/main/java/org/apache/seatunnel/scheduler/dolphinscheduler/impl/JobServiceImpl.java @@ -214,6 +214,11 @@ public InstanceDto execute(ExecuteDto dto) { } } + @Override + public void kill(Long instanceId) { + iDolphinschedulerService.killProcessInstance(instanceId); + } + private ProcessDefinitionDto getProcessDefinitionDto(JobDto dto) { final TaskDescriptionDto taskDescriptionDto = TaskDescriptionDto.builder() .name(dto.getJobName()) diff --git a/seatunnel-server/seatunnel-spi/src/main/java/org/apache/seatunnel/spi/scheduler/IJobService.java b/seatunnel-server/seatunnel-spi/src/main/java/org/apache/seatunnel/spi/scheduler/IJobService.java index e160fc36d98..7a629124074 100644 --- a/seatunnel-server/seatunnel-spi/src/main/java/org/apache/seatunnel/spi/scheduler/IJobService.java +++ b/seatunnel-server/seatunnel-spi/src/main/java/org/apache/seatunnel/spi/scheduler/IJobService.java @@ -33,4 +33,6 @@ public interface IJobService { PageData list(JobListDto dto); InstanceDto execute(ExecuteDto dto); + + void kill(Long instanceId); }