Skip to content

Commit

Permalink
[seatunnel-1947][seatunnel-server] add kill instance. (apache#2616)
Browse files Browse the repository at this point in the history
* [seatunnel-1947][seatunnel-server] add kill instance.

* [seatunnel-1947][seatunnel-server] add kill instance.
  • Loading branch information
dijiekstra authored and MRYOG committed Sep 15, 2022
1 parent f8d7bec commit f929d98
Show file tree
Hide file tree
Showing 11 changed files with 78 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
Expand Down Expand Up @@ -69,4 +70,11 @@ Result<PageInfo<InstanceSimpleInfoRes>> listInstance(@RequestBody @NotNull Insta
Result<InstanceSimpleInfoRes> tmpExecute(@RequestBody @NotNull ExecuteReq req) {
return Result.success(iTaskService.tmpExecute(req));
}

@PostMapping("/kill")
@ApiOperation(value = "kill running instance", httpMethod = "POST")
Result<Void> kill(@RequestParam Long instanceId) {
iTaskService.kill(instanceId);
return Result.success();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,6 @@ public interface ITaskService {
PageInfo<InstanceSimpleInfoRes> listInstance(InstanceListReq req);

InstanceSimpleInfoRes tmpExecute(ExecuteReq req);

void kill(Long instanceId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,11 @@ public InstanceSimpleInfoRes tmpExecute(ExecuteReq req) {
return this.translate(iJobService.execute(dto));
}

@Override
public void kill(Long instanceId) {
iJobService.kill(instanceId);
}

private JobSimpleInfoRes translate(JobSimpleInfoDto dto) {
return JobSimpleInfoRes.builder()
.jobId(dto.getJobId())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.scheduler.dolphinscheduler;

public enum ExecuteTypeEnum {
NONE,
REPEAT_RUNNING,
RECOVER_SUSPENDED_PROCESS,
START_FAILURE_TASK_PROCESS,
STOP,
PAUSE
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,6 @@ public interface IDolphinschedulerService {
PageData<TaskInstanceDto> listTaskInstance(ListProcessInstanceDto dto);

void deleteProcessDefinition(long code);

void killProcessInstance(long processInstanceId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class DolphinschedulerConstants {
public static final String SCHEDULE_ONLINE = "/projects/%s/schedules/%s/online";
public static final String SCHEDULE_OFFLINE = "/projects/%s/schedules/%s/offline";
public static final String DELETE_PROCESS_DEFINITION = "/projects/%s/process-definition/%s";
public static final String EXECUTE = "/projects/%s/executors/execute";

/**
* request param
Expand Down Expand Up @@ -167,6 +168,8 @@ public class DolphinschedulerConstants {
public static final String EXEC_TYPE_DEFAULT = "START_PROCESS";
public static final String EXEC_TYPE_COMPLEMENT = "COMPLEMENT_DATA";
public static final String DEPENDENT_MODE_DEFAULT = "OFF_MODE";
public static final String PROCESS_INSTANCE_ID = "processInstanceId";
public static final String EXECUTE_TYPE = "executeType";

/**
* response param
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class TaskInstanceDto {
private boolean firstRun;
private int dryRun;
private String flag;
private int environmentCode;
private long environmentCode;
private String processInstance;
private int pid;
private String taskParams;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import static org.apache.seatunnel.scheduler.dolphinscheduler.constants.DolphinschedulerConstants.END_TIME;
import static org.apache.seatunnel.scheduler.dolphinscheduler.constants.DolphinschedulerConstants.ENVIRONMENT_CODE;
import static org.apache.seatunnel.scheduler.dolphinscheduler.constants.DolphinschedulerConstants.ENVIRONMENT_CODE_DEFAULT;
import static org.apache.seatunnel.scheduler.dolphinscheduler.constants.DolphinschedulerConstants.EXECUTE;
import static org.apache.seatunnel.scheduler.dolphinscheduler.constants.DolphinschedulerConstants.EXECUTE_TYPE;
import static org.apache.seatunnel.scheduler.dolphinscheduler.constants.DolphinschedulerConstants.FAILED_NODE_DEFAULT;
import static org.apache.seatunnel.scheduler.dolphinscheduler.constants.DolphinschedulerConstants.FAILURE_STRATEGY;
import static org.apache.seatunnel.scheduler.dolphinscheduler.constants.DolphinschedulerConstants.FAILURE_STRATEGY_DEFAULT;
Expand Down Expand Up @@ -62,6 +64,7 @@
import static org.apache.seatunnel.scheduler.dolphinscheduler.constants.DolphinschedulerConstants.PROCESS_DEFINITION;
import static org.apache.seatunnel.scheduler.dolphinscheduler.constants.DolphinschedulerConstants.PROCESS_DEFINITION_CODE;
import static org.apache.seatunnel.scheduler.dolphinscheduler.constants.DolphinschedulerConstants.PROCESS_DEFINITION_NAME;
import static org.apache.seatunnel.scheduler.dolphinscheduler.constants.DolphinschedulerConstants.PROCESS_INSTANCE_ID;
import static org.apache.seatunnel.scheduler.dolphinscheduler.constants.DolphinschedulerConstants.PROCESS_INSTANCE_NAME;
import static org.apache.seatunnel.scheduler.dolphinscheduler.constants.DolphinschedulerConstants.PROCESS_INSTANCE_PRIORITY;
import static org.apache.seatunnel.scheduler.dolphinscheduler.constants.DolphinschedulerConstants.PROCESS_INSTANCE_PRIORITY_DEFAULT;
Expand Down Expand Up @@ -116,6 +119,7 @@
import static org.apache.seatunnel.server.common.SeatunnelErrorEnum.NO_MATCHED_PROJECT;
import static org.apache.seatunnel.server.common.SeatunnelErrorEnum.UNEXPECTED_RETURN_CODE;

import org.apache.seatunnel.scheduler.dolphinscheduler.ExecuteTypeEnum;
import org.apache.seatunnel.scheduler.dolphinscheduler.IDolphinschedulerService;
import org.apache.seatunnel.scheduler.dolphinscheduler.dto.ConditionResult;
import org.apache.seatunnel.scheduler.dolphinscheduler.dto.ListProcessDefinitionDto;
Expand Down Expand Up @@ -187,7 +191,7 @@ public void afterPropertiesSet() throws Exception {
}

@Override
public ProcessDefinitionDto createOrUpdateProcessDefinition(UpdateProcessDefinitionDto dto) {
public ProcessDefinitionDto createOrUpdateProcessDefinition(UpdateProcessDefinitionDto dto) {
// gen task code
final List<Long> taskCodes = genTaskCodes(defaultProjectCode, GEN_NUM_DEFAULT);

Expand Down Expand Up @@ -442,6 +446,11 @@ public void deleteProcessDefinition(long code) {
checkResult(result, false);
}

@Override
public void killProcessInstance(long processInstanceId) {
execute(processInstanceId, ExecuteTypeEnum.STOP);
}

private ProjectDto queryProjectCodeByName(String projectName) throws IOException {
final Map result = HttpUtils.builder()
.withUrl(apiPrefix.concat(QUERY_PROJECT_LIST_PAGING))
Expand All @@ -459,6 +468,17 @@ private ProjectDto queryProjectCodeByName(String projectName) throws IOException
return projectDto;
}

private void execute(long processInstanceId, ExecuteTypeEnum executeType) {
final Map result = HttpUtils.builder()
.withUrl(apiPrefix.concat(String.format(EXECUTE, defaultProjectCode)))
.withMethod(Connection.Method.POST)
.withRequestBody(this.objectToString(null))
.withData(createParamMap(PROCESS_INSTANCE_ID, processInstanceId, EXECUTE_TYPE, executeType.name()))
.withToken(TOKEN, token)
.execute(Map.class);
checkResult(result, false);
}

private TaskDefinitionDto buildTaskDefinitionJson(Long taskCode, TaskDescriptionDto taskDescriptionDto) {
final ResourceDto resourceDto = createOrUpdateScriptContent(taskDescriptionDto.getName(), taskDescriptionDto.getContent());
final TaskDefinitionDto taskDefinitionDto = new TaskDefinitionDto();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ public PageData<InstanceDto> list(InstanceListDto dto) {
final PageData<TaskInstanceDto> instancePageData = iDolphinschedulerService.listTaskInstance(listDto);

final List<InstanceDto> data = instancePageData.getData().stream().map(t -> InstanceDto.builder()
.instanceId(t.getId())
// use processInstanceId instead of origin task instance id.
.instanceId(t.getProcessInstanceId())
.instanceCode(t.getProcessInstanceId())
.instanceName(t.getProcessInstanceName())
.status(t.getState())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,11 @@ public InstanceDto execute(ExecuteDto dto) {
}
}

@Override
public void kill(Long instanceId) {
iDolphinschedulerService.killProcessInstance(instanceId);
}

private ProcessDefinitionDto getProcessDefinitionDto(JobDto dto) {
final TaskDescriptionDto taskDescriptionDto = TaskDescriptionDto.builder()
.name(dto.getJobName())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,6 @@ public interface IJobService {
PageData<JobSimpleInfoDto> list(JobListDto dto);

InstanceDto execute(ExecuteDto dto);

void kill(Long instanceId);
}

0 comments on commit f929d98

Please sign in to comment.