[Feature][Master] Add task caching mechanism to improve the running speed of repetitive tasks #13194

Merged
merged 22 commits into from
Dec 18, 2022

Conversation

jieguangzhou
Member

@jieguangzhou jieguangzhou commented Dec 14, 2022

Purpose of the pull request

close #13133

This is similar to Flyte caching.

In machine learning workflows, caching the results of some tasks lets the workflows run faster.

For a task that runs with caching enabled, how do we determine whether it has already been cached, that is, whether it can reuse the run result of another task?

For a task marked as Cache Execution, a cache key is generated when the task starts. The key is a hash of the following fields:

  • task definition: the id of the task definition that the task instance belongs to
  • task version: the version of the task definition that the task instance belongs to
  • task input parameters: the parameters passed in by upstream nodes and the global parameters, the parameters referenced by the parameter list of the task definition, and the parameters the task definition uses via ${}
  • environment configuration: the actual configuration content stored under the environment name, that is, the content configured in Security - Environment Management
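
The key composition above can be sketched roughly as follows. This is a minimal illustration, not the TaskCacheUtils code from this PR: the class name CacheKeySketch, the method signature, the field separator, and the choice of SHA-256 are all assumptions made for the example.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of cache key generation; not the PR's TaskCacheUtils.
public class CacheKeySketch {

    // Builds a key from the four fields listed in the PR description.
    public static String cacheKey(long taskDefinitionId, int taskVersion,
                                  Map<String, String> inputParams, String environmentConfig) {
        StringBuilder sb = new StringBuilder();
        sb.append(taskDefinitionId).append('\n').append(taskVersion).append('\n');
        // Sort parameters by name so the key does not depend on map iteration order.
        for (Map.Entry<String, String> e : new TreeMap<>(inputParams).entrySet()) {
            sb.append(e.getKey()).append('=').append(e.getValue()).append('\n');
        }
        sb.append(environmentConfig == null ? "" : environmentConfig);
        return sha256Hex(sb.toString());
    }

    // Hashes the joined fields; SHA-256 is an assumption, the PR does not name the hash here.
    static String sha256Hex(String s) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            byte[] digest = md.digest(s.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException ex) {
            // SHA-256 is required to be available on every Java platform.
            throw new IllegalStateException(ex);
        }
    }
}
```

With this construction, changing any one field (for example the task version or a parameter value) yields a different key, while reordering the same parameters does not.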

When a task marked for caching runs, it checks whether the database contains data with the same cache key:

  • If it does, the cached task instance is copied and the corresponding data is updated
  • If not, the task runs as usual, and the task instance data is stored in the cache when the task completes

If you no longer want the cached result, right-click the node in the workflow instance and run Clear cache. This clears the cached data for the current input parameters under this version.
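
The lookup, store, and clear behaviour described above can be sketched as follows. This is a simplified illustration under stated assumptions: an in-memory map stands in for the database, a string stands in for the task instance data, and the class CacheLookupSketch and its methods are hypothetical names, not part of the PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch of the cache-or-run decision; not the PR's implementation.
public class CacheLookupSketch {

    // In-memory stand-in for task instance data stored by cache key.
    private final Map<String, String> store = new HashMap<>();
    private int executions = 0;

    // Returns the cached result if the key is known, otherwise runs the task
    // and stores its result only after it completes without throwing.
    public String runWithCache(String cacheKey, Supplier<String> task) {
        String cached = store.get(cacheKey);
        if (cached != null) {
            return cached;
        }
        executions++;
        String result = task.get();
        store.put(cacheKey, result);
        return result;
    }

    // Mirrors "Clear cache": removes the entry for one cache key only.
    public void clearCache(String cacheKey) {
        store.remove(cacheKey);
    }

    // Number of times a task was actually executed rather than served from cache.
    public int executions() {
        return executions;
    }
}
```

In this sketch a second call with the same key does not execute the task again, and clearing the key forces the next call to run it.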

Brief change log

Front end

  • Add a useCache flag to all task plugins except the logical components


  • Add a handleRemoveTaskInstanceCache method to clear the cache of a task instance


Backend

  • Add a removeTaskInstanceCache API to clear the task instance cache
  • Add TaskCacheUtils to manage the cache key
  • Add checkIsCacheExecution before dispatcher.dispatch sends the task to a worker
  • Add TaskCacheEventHandler to handle a cached task instead of dispatching it
  • Add saveCacheTaskInstance after a cache task runs successfully

Database

  • Add is_cache to the tables t_ds_task_definition, t_ds_task_definition_log, and t_ds_task_instance
  • Add cache_key to t_ds_task_instance

Doc

  • Add an introduction to the Cache Execution parameter
  • Add an FAQ entry about the task cache

Verify this pull request

This pull request is code cleanup without any test coverage.

(or)

This pull request is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(or)

If your pull request contains an incompatible change, you should also add it to docs/docs/en/guide/upgrede/incompatible.md

@github-actions github-actions bot added backend UI ui and front end related labels Dec 14, 2022
@jieguangzhou jieguangzhou self-assigned this Dec 14, 2022
@jieguangzhou jieguangzhou added feature new feature 3.2.0 for 3.2.0 version labels Dec 14, 2022
@jieguangzhou jieguangzhou added this to the 3.2.0 milestone Dec 14, 2022
@jieguangzhou jieguangzhou changed the title Supports task instance cache operation [Feature][Master] Add task caching mechanism to improve the running speed of repetitive tasks Dec 14, 2022
@codecov-commenter

codecov-commenter commented Dec 15, 2022

Codecov Report

Merging #13194 (e7ac78e) into dev (43e6ac3) will increase coverage by 0.09%.
The diff coverage is 65.64%.

@@             Coverage Diff              @@
##                dev   #13194      +/-   ##
============================================
+ Coverage     39.37%   39.46%   +0.09%     
- Complexity     4278     4294      +16     
============================================
  Files          1066     1069       +3     
  Lines         40479    40641     +162     
  Branches       4657     4673      +16     
============================================
+ Hits          15937    16040     +103     
- Misses        22755    22812      +57     
- Partials       1787     1789       +2     
Impacted Files Coverage Δ
...heduler/api/controller/TaskInstanceController.java 45.45% <0.00%> (-4.55%) ⬇️
...org/apache/dolphinscheduler/common/enums/Flag.java 0.00% <ø> (ø)
...e/dolphinscheduler/common/enums/TaskEventType.java 0.00% <0.00%> (ø)
...he/dolphinscheduler/dao/entity/TaskDefinition.java 31.57% <ø> (ø)
...dolphinscheduler/dao/entity/TaskDefinitionLog.java 19.04% <0.00%> (-0.47%) ⬇️
...ache/dolphinscheduler/dao/entity/TaskInstance.java 45.45% <ø> (ø)
...duler/dao/repository/impl/TaskInstanceDaoImpl.java 3.17% <0.00%> (-0.16%) ⬇️
...ver/master/consumer/TaskPriorityQueueConsumer.java 0.00% <0.00%> (ø)
...duler/server/master/processor/queue/TaskEvent.java 60.00% <0.00%> (-9.24%) ⬇️
.../server/master/runner/WorkflowExecuteRunnable.java 10.31% <0.00%> (-0.12%) ⬇️
... and 30 more


Comment on lines 349 to 350
while (true) {
TaskInstance cacheTaskInstance = taskInstanceDao.findTaskInstanceByCacheKey(cacheKey);
Contributor

Maybe this does not need to be a loop.

Member Author

Multiple records may share one cacheKey. For example, two cache tasks with the same cache key may run almost simultaneously.

if (tagCacheKey.contains(MERGE_TAG)) {
String[] split = tagCacheKey.split(MERGE_TAG);
if (split.length == 2) {
taskIdAndCacheKey = Pair.of(Integer.parseInt(split[0]), split[1]);

Check notice

Code scanning / CodeQL

Missing catch of NumberFormatException

Potential uncaught 'java.lang.NumberFormatException'.
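
One possible way to address the notice above is to treat a malformed id as "no match" instead of letting the exception propagate. The sketch below is an assumption-laden illustration, not the fix applied in this PR: the class TagCacheKeySketch, the value "-" for MERGE_TAG, and the use of Optional with Map.Entry in place of Pair are all hypothetical.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

// Hypothetical sketch of parsing "<taskId><MERGE_TAG><cacheKey>" defensively.
public class TagCacheKeySketch {

    // Assumed separator; the actual MERGE_TAG value is not shown in the excerpt.
    static final String MERGE_TAG = "-";

    // Returns (taskId, cacheKey) when the input has exactly two parts and a numeric id.
    public static Optional<Map.Entry<Integer, String>> parse(String tagCacheKey) {
        if (tagCacheKey == null || !tagCacheKey.contains(MERGE_TAG)) {
            return Optional.empty();
        }
        // Quote the separator so it is matched literally, not as a regex.
        String[] split = tagCacheKey.split(Pattern.quote(MERGE_TAG));
        if (split.length != 2) {
            return Optional.empty();
        }
        try {
            int taskId = Integer.parseInt(split[0]);
            return Optional.of(new SimpleImmutableEntry<>(taskId, split[1]));
        } catch (NumberFormatException e) {
            // A non-numeric id is treated as an unparseable key rather than an error.
            return Optional.empty();
        }
    }
}
```

Whether an unparseable key should be ignored, logged, or rejected is a design decision for the maintainers; the sketch only shows the ignore variant.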
@jieguangzhou jieguangzhou marked this pull request as ready for review December 15, 2022 13:32
@jieguangzhou
Member Author

@Amy0104 @songjianet please help to review the front-end code

Amy0104
Amy0104 previously approved these changes Dec 16, 2022
Member

@Amy0104 Amy0104 left a comment

The front end side LGTM.

zhongjiajie
zhongjiajie previously approved these changes Dec 16, 2022
Contributor

@caishunfeng caishunfeng left a comment

LGTM 👍

@sonarqubecloud

SonarCloud Quality Gate failed.

0 Bugs (rating A)
0 Vulnerabilities (rating A)
0 Security Hotspots (rating A)
11 Code Smells (rating A)

59.6% Coverage
2.5% Duplication

@jieguangzhou
Member Author

@caishunfeng @ruanwenjun please PTAL, thanks

Labels
3.2.0 for 3.2.0 version backend document feature new feature UI ui and front end related
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Feature][Master] Add task caching mechanism to improve the running speed of repetitive tasks
6 participants