Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Master] Add task caching mechanism to improve the running speed of repetitive tasks #13194

Merged
merged 22 commits into from
Dec 18, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@

import static org.apache.dolphinscheduler.api.enums.Status.FORCE_TASK_SUCCESS_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_LIST_PAGING_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.REMOVE_TASK_INSTANCE_CACHE_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.TASK_SAVEPOINT_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.TASK_STOP_ERROR;

import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse;
import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.TaskInstanceService;
import org.apache.dolphinscheduler.api.utils.Result;
Expand All @@ -34,6 +36,7 @@

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
Expand Down Expand Up @@ -188,4 +191,26 @@ public Result<Object> stopTask(@Parameter(hidden = true) @RequestAttribute(value
@PathVariable(value = "id") Integer id) {
return taskInstanceService.stopTask(loginUser, projectCode, id);
}

/**
 * Remove the cache of a task instance.
 *
 * <p>Exposed as {@code DELETE /{id}/remove-cache}; the work itself is delegated to
 * {@link TaskInstanceService#removeTaskInstanceCache}.
 *
 * @param loginUser   login user, read from the session request attribute
 * @param projectCode project code
 * @param id          task instance id
 * @return the result code and msg, together with the removed cache information
 */
@Operation(summary = "remove-task-instance-cache", description = "REMOVE_TASK_INSTANCE_CACHE")
@Parameters({
        @Parameter(name = "id", description = "TASK_INSTANCE_ID", required = true, schema = @Schema(implementation = int.class, example = "12"))
})
@DeleteMapping(value = "/{id}/remove-cache")
@ResponseStatus(HttpStatus.OK)
@ApiException(REMOVE_TASK_INSTANCE_CACHE_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public TaskInstanceRemoveCacheResponse removeTaskInstanceCache(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
                                                               @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
                                                               @PathVariable(value = "id") Integer id) {
    // Pure delegation: permission checks and the cache removal happen in the service layer.
    final TaskInstanceRemoveCacheResponse removeCacheResponse =
            taskInstanceService.removeTaskInstanceCache(loginUser, projectCode, id);
    return removeCacheResponse;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.api.dto.taskInstance;

import org.apache.dolphinscheduler.api.utils.Result;

import java.util.List;

import lombok.Data;

/**
 * Response of the remove-task-instance-cache operation: the code and msg copied from a
 * {@link Result}, plus the ids of the task instances whose cache was removed and the
 * cache key concerned.
 */
@Data
public class TaskInstanceRemoveCacheResponse extends Result {

    /** ids of the task instances whose cache key was removed */
    private List<Integer> deleteTaskInstanceCacheIds;

    /** the cache key that was removed */
    private String cacheKey;

    /**
     * Build a response carrying only the code and msg of {@code result};
     * the id list and the cache key stay {@code null}.
     *
     * @param result result to copy code and msg from
     */
    public TaskInstanceRemoveCacheResponse(Result result) {
        this(result, null, null);
    }

    /**
     * Build a response carrying the code and msg of {@code result} plus the removal details.
     *
     * @param result                     result to copy code and msg from
     * @param deleteTaskInstanceCacheIds ids of the task instances whose cache was removed
     * @param cacheKey                   the cache key that was removed
     */
    public TaskInstanceRemoveCacheResponse(Result result, List<Integer> deleteTaskInstanceCacheIds, String cacheKey) {
        super();
        setCode(result.getCode());
        setMsg(result.getMsg());
        this.deleteTaskInstanceCacheIds = deleteTaskInstanceCacheIds;
        this.cacheKey = cacheKey;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -278,13 +278,16 @@ public enum Status {
UDF_RESOURCE_IS_BOUND(20013, "udf resource file is bound by UDF functions:{0}", "udf函数绑定了资源文件[{0}]"),
RESOURCE_IS_USED(20014, "resource file is used by process definition", "资源文件被上线的流程定义使用了"),
PARENT_RESOURCE_NOT_EXIST(20015, "parent resource not exist", "父资源文件不存在"),

RESOURCE_NOT_EXIST_OR_NO_PERMISSION(20016,
"resource not exist or no permission,please view the task node and remove error resource",
"请检查任务节点并移除无权限或者已删除的资源"),
RESOURCE_IS_AUTHORIZED(20017, "resource is authorized to user {0},suffix not allowed to be modified",
"资源文件已授权其他用户[{0}],后缀不允许修改"),
RESOURCE_HAS_FOLDER(20018, "There are files or folders in the current directory:{0}", "当前目录下有文件或文件夹[{0}]"),

REMOVE_TASK_INSTANCE_CACHE_ERROR(20019, "remove task instance cache error", "删除任务实例缓存错误"),

USER_NO_OPERATION_PERM(30001, "user has no operation privilege", "当前用户没有操作权限"),
USER_NO_OPERATION_PROJECT_PERM(30002, "user {0} is not has project {1} permission", "当前用户[{0}]没有[{1}]项目的操作权限"),
USER_NO_WRITE_PROJECT_PERM(30003, "user [{0}] does not have write permission for project [{1}]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.dolphinscheduler.api.service;

import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.enums.TaskExecuteType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
Expand Down Expand Up @@ -100,4 +101,13 @@ Result forceTaskSuccess(User loginUser,
* @return the result code and msg
*/
TaskInstance queryTaskInstanceById(User loginUser, long projectCode, Long taskInstanceId);

/**
 * remove task instance cache
 *
 * @param loginUser login user
 * @param projectCode project code
 * @param taskInstanceId task instance id
 * @return the result code and msg, plus the ids of the task instances whose cache was removed and the cache key
 */
TaskInstanceRemoveCacheResponse removeTaskInstanceCache(User loginUser, long projectCode, Integer taskInstanceId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
package org.apache.dolphinscheduler.api.service.impl;

import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.FORCED_SUCCESS;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_UPDATE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_INSTANCE;

import org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant;
import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
import org.apache.dolphinscheduler.api.service.ProjectService;
Expand All @@ -38,13 +41,16 @@
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskSavePointRequestCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.process.ProcessService;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -81,6 +87,9 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
@Autowired
TaskInstanceMapper taskInstanceMapper;

@Autowired
TaskInstanceDao taskInstanceDao;

@Autowired
ProcessInstanceService processInstanceService;

Expand Down Expand Up @@ -319,4 +328,39 @@ public TaskInstance queryTaskInstanceById(User loginUser, long projectCode, Long
}
return taskInstance;
}

@Override
public TaskInstanceRemoveCacheResponse removeTaskInstanceCache(User loginUser, long projectCode,
Integer taskInstanceId) {
Result result = new Result();

Project project = projectMapper.queryByCode(projectCode);
projectService.checkProjectAndAuthThrowException(loginUser, project,
ApiFuncIdentificationConstant.map.get(INSTANCE_UPDATE));
Fixed Show fixed Hide fixed

TaskInstance taskInstance = taskInstanceMapper.selectById(taskInstanceId);
if (taskInstance == null) {
logger.error("Task definition can not be found, projectCode:{}, taskInstanceId:{}.", projectCode,
taskInstanceId);
putMsg(result, Status.TASK_INSTANCE_NOT_FOUND);
return new TaskInstanceRemoveCacheResponse(result);
}
String tagCacheKey = taskInstance.getCacheKey();
String cacheKey = TaskCacheUtils.revertCacheKey(tagCacheKey);
List<Integer> cacheTaskInstanceIds = new ArrayList<>();
while (true) {
TaskInstance cacheTaskInstance = taskInstanceDao.findTaskInstanceByCacheKey(cacheKey);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it's not a loop action.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There may be one cacheKey for multiple pieces of data. For example, two cache tasks will same cache key run almost simultaneously.

if (cacheTaskInstance == null) {
break;
}
cacheTaskInstance.setCacheKey(null);
boolean r = taskInstanceDao.updateTaskInstance(cacheTaskInstance);
logger.info("remove task instance cache, taskInstanceId:{}, cacheKey:{}, result:{}",
cacheTaskInstance.getId(), cacheKey, r);
cacheTaskInstanceIds.add(cacheTaskInstance.getId());
}
putMsg(result, Status.SUCCESS);
return new TaskInstanceRemoveCacheResponse(result, cacheTaskInstanceIds, cacheKey);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.apache.dolphinscheduler.common.enums;

import com.baomidou.mybatisplus.annotation.EnumValue;

public enum IsCache {
jieguangzhou marked this conversation as resolved.
Show resolved Hide resolved

/**
* 0 no
* 1 yes
*/
NO(0, "no"),
YES(1, "yes");

IsCache(int code, String descp) {
this.code = code;
this.descp = descp;
}

@EnumValue
private final int code;
private final String descp;

public int getCode() {
return code;
}

public String getDescp() {
return descp;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,7 @@ public enum TaskEventType {
DELAY,
RUNNING,
RESULT,
WORKER_REJECT
WORKER_REJECT,

CACHE,
jieguangzhou marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.IsCache;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TaskExecuteType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
Expand Down Expand Up @@ -115,6 +116,11 @@ public class TaskDefinition {
*/
private Flag flag;

/**
* task is cache: yes/no
*/
private IsCache isCache;
jieguangzhou marked this conversation as resolved.
Show resolved Hide resolved

/**
* task priority
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.IsCache;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TaskExecuteType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
Expand All @@ -40,6 +41,7 @@

import lombok.Data;

import com.baomidou.mybatisplus.annotation.FieldStrategy;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
Expand Down Expand Up @@ -187,6 +189,19 @@ public class TaskInstance implements Serializable {
*/
private Flag flag;

/**
* task is cache: yes/no
*/
private IsCache isCache;
jieguangzhou marked this conversation as resolved.
Show resolved Hide resolved

/**
* cache key of the task instance; null when it has no cache or its cache has been removed
*/
jieguangzhou marked this conversation as resolved.
Show resolved Hide resolved
@TableField(updateStrategy = FieldStrategy.IGNORED)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why add the ignored update strategy for it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default value of cache_key is null, so removing a cache entry means setting cache_key back to null. That is why the field uses the IGNORED update strategy: without it, the null would not be written on update, which means the cache data would never be removed.

private String cacheKey;

@TableField(exist = false)
private String tmpCacheKey;
jieguangzhou marked this conversation as resolved.
Show resolved Hide resolved
/**
* dependency
*/
Expand Down Expand Up @@ -409,4 +424,5 @@ public boolean retryTaskIntervalOverTime() {
// task retry does not over time, return false
return getRetryInterval() * SEC_2_MINUTES_TIME_UNIT < failedTimeInterval;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ TaskInstance queryByInstanceIdAndName(@Param("processInstanceId") int processIns
TaskInstance queryByInstanceIdAndCode(@Param("processInstanceId") int processInstanceId,
@Param("taskCode") Long taskCode);

/**
 * query a task instance by cache key
 *
 * @param cacheKey cache key
 * @return a task instance holding the given cache key.
 *         NOTE(review): presumably null when no instance matches, and a single row even when
 *         several instances share the key - confirm against the mapper XML
 */
TaskInstance queryByCacheKey(@Param("cacheKey") String cacheKey);

List<TaskInstance> queryByProcessInstanceIdsAndTaskCodes(@Param("processInstanceIds") List<Integer> processInstanceIds,
@Param("taskCodes") List<Long> taskCodes);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ public interface TaskInstanceDao {
*/
TaskInstance findTaskInstanceById(Integer taskId);

/**
 * find task instance by cache key
 * @param cacheKey cache key
 * @return task instance; callers treat null as "no task instance holds this cache key"
 */
TaskInstance findTaskInstanceByCacheKey(String cacheKey);

/**
* find task instance list by id list
* @param idList task id list
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.Date;
Expand Down Expand Up @@ -164,6 +165,14 @@ public TaskInstance findTaskInstanceById(Integer taskId) {
return taskInstanceMapper.selectById(taskId);
}

/**
 * Find a task instance by its cache key.
 *
 * @param cacheKey cache key
 * @return the task instance returned by the mapper, or null when the cache key is null or empty
 */
@Override
public TaskInstance findTaskInstanceByCacheKey(String cacheKey) {
    // A null or empty key is not looked up, so the mapper is not called for it.
    return StringUtils.isEmpty(cacheKey)
            ? null
            : taskInstanceMapper.queryByCacheKey(cacheKey);
}

@Override
public List<TaskInstance> findTaskInstanceByIdList(List<Integer> idList) {
if (CollectionUtils.isEmpty(idList)) {
Expand Down
Loading