From 7f56f96c2652881246f9943d84faaaae5728b905 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Tue, 5 Dec 2023 13:56:00 +0800 Subject: [PATCH] Move delay calculation to Master --- .../event/TaskTimeoutStateEventHandler.java | 2 +- ...TaskInstanceDispatchOperationFunction.java | 43 +++-------- ...gicITaskInstanceKillOperationFunction.java | 22 +++--- ...icITaskInstancePauseOperationFunction.java | 14 ++-- .../master/runner/BaseTaskDispatcher.java | 1 - .../runner/BaseTaskExecuteRunnable.java | 55 ++++++++++++++ .../DefaultTaskExecuteRunnable.java | 4 +- ...GlobalMasterTaskExecuteRunnableQueue.java} | 26 +++---- ...MasterTaskExecuteRunnableQueueLooper.java} | 24 +++--- .../GlobalTaskDispatchWaitingQueue.java | 6 +- .../GlobalTaskDispatchWaitingQueueLooper.java | 1 - .../runner/MasterTaskExecutorBootstrap.java | 8 +- ... => PriorityDelayTaskExecuteRunnable.java} | 73 +++++++------------ .../runner/StreamTaskExecuteRunnable.java | 2 - .../{execute => }/TaskExecuteRunnable.java | 4 +- .../TaskExecuteRunnableFactory.java | 7 +- .../TaskExecutionContextFactory.java | 2 +- .../runner/WorkflowExecuteRunnable.java | 2 - .../dispatcher/MasterTaskDispatcher.java | 2 +- .../runner/dispatcher/TaskDispatcher.java | 2 +- .../dispatcher/WorkerTaskDispatcher.java | 2 +- ...able.java => AsyncMasterTaskExecutor.java} | 10 +-- ...va => AsyncMasterTaskExecutorFactory.java} | 8 +- .../AsyncTaskCallbackFunctionImpl.java | 22 +++--- .../DefaultTaskExecuteRunnableFactory.java | 3 + .../MasterDelayTaskExecuteRunnable.java | 68 ----------------- ...eRunnable.java => MasterTaskExecutor.java} | 10 +-- ...ry.java => MasterTaskExecutorFactory.java} | 4 +- ... => MasterTaskExecutorFactoryBuilder.java} | 15 ++-- ...der.java => MasterTaskExecutorHolder.java} | 14 ++-- ...java => MasterTaskExecutorThreadPool.java} | 6 +- ...nable.java => SyncMasterTaskExecutor.java} | 10 +-- ...ava => SyncMasterTaskExecutorFactory.java} | 8 +- ...seTaskExecuteRunnableDispatchOperator.java | 26 ++++++- .../BaseTaskExecuteRunnableKillOperator.java | 2 +- .../BaseTaskExecuteRunnablePauseOperator.java | 2 +- ...aseTaskExecuteRunnableTimeoutOperator.java | 2 +- ...icTaskExecuteRunnableDispatchOperator.java | 6 +- .../TaskExecuteRunnableDispatchOperator.java | 14 ++-- .../operator/TaskExecuteRunnableOperator.java | 2 +- .../TaskExecuteRunnableOperatorManager.java | 2 +- .../TaskExecuteRunnablePauseOperator.java | 2 +- .../runner/WorkflowExecuteRunnableTest.java | 1 - .../dispatcher/MasterTaskDispatcherTest.java | 2 +- .../dispatcher/WorkerTaskDispatcherTest.java | 2 +- ...PriorityDelayTaskExecuteRunnableTest.java} | 8 +- .../BaseLinuxShellInterceptorBuilder.java | 7 +- .../server/worker/WorkerServer.java | 6 +- .../registry/WorkerWaitingStrategy.java | 6 +- .../StreamingTaskInstanceOperatorImpl.java | 8 +- ...WorkerDelayTaskExecuteRunnableFactory.java | 59 --------------- ...le.java => DefaultWorkerTaskExecutor.java} | 14 ++-- ... => DefaultWorkerTaskExecutorFactory.java} | 44 ++++++----- ...va => GlobalTaskInstanceWaitingQueue.java} | 4 +- ...GlobalTaskInstanceWaitingQueueLooper.java} | 34 ++------- .../WorkerDelayTaskExecuteRunnable.java | 69 ------------------ .../worker/runner/WorkerExecService.java | 14 +--- .../worker/runner/WorkerManagerThread.java | 23 +++--- ...eRunnable.java => WorkerTaskExecutor.java} | 18 ++--- ...ry.java => WorkerTaskExecutorFactory.java} | 4 +- ... => WorkerTaskExecutorFactoryBuilder.java} | 16 ++-- ...TaskInstanceDispatchOperationFunction.java | 6 +- .../TaskInstanceKillOperationFunction.java | 8 +- ...ava => DefaultWorkerTaskExecutorTest.java} | 10 +-- 64 files changed, 363 insertions(+), 538 deletions(-) create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/BaseTaskExecuteRunnable.java rename dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/{execute => }/DefaultTaskExecuteRunnable.java (94%) rename dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/{MasterDelayTaskExecuteRunnableDelayQueue.java => GlobalMasterTaskExecuteRunnableQueue.java} (54%) rename dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/{MasterDelayTaskExecuteRunnableDelayQueueLooper.java => GlobalMasterTaskExecuteRunnableQueueLooper.java} (74%) rename dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/{execute/PriorityTaskExecuteRunnable.java => PriorityDelayTaskExecuteRunnable.java} (57%) rename dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/{execute => }/TaskExecuteRunnable.java (90%) rename dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/{execute => }/TaskExecuteRunnableFactory.java (89%) rename dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/{execute => }/TaskExecutionContextFactory.java (99%) rename dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/{AsyncMasterDelayTaskExecuteRunnable.java => AsyncMasterTaskExecutor.java} (82%) rename dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/{AsyncMasterDelayTaskExecuteRunnableFactory.java => AsyncMasterTaskExecutorFactory.java} (83%) delete mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterDelayTaskExecuteRunnable.java rename dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/{MasterTaskExecuteRunnable.java => MasterTaskExecutor.java} (94%) rename dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/{MasterDelayTaskExecuteRunnableFactory.java => MasterTaskExecutorFactory.java} (83%) rename dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/{MasterTaskExecuteRunnableFactoryBuilder.java => MasterTaskExecutorFactoryBuilder.java} (75%) rename dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/{MasterTaskExecuteRunnableHolder.java => MasterTaskExecutorHolder.java} (66%) rename dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/{MasterTaskExecuteRunnableThreadPool.java => MasterTaskExecutorThreadPool.java} (89%) rename dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/{SyncMasterDelayTaskExecuteRunnable.java => SyncMasterTaskExecutor.java} (82%) rename dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/{SyncMasterDelayTaskExecuteRunnableFactory.java => SyncMasterTaskExecutorFactory.java} (81%) rename dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/{PriorityTaskExecuteRunnableTest.java => PriorityDelayTaskExecuteRunnableTest.java} (89%) delete mode 100644 dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableFactory.java rename dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/{DefaultWorkerDelayTaskExecuteRunnable.java => DefaultWorkerTaskExecutor.java} (76%) rename dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/{WorkerDelayTaskExecuteRunnableFactory.java => DefaultWorkerTaskExecutorFactory.java} (55%) rename dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/{GlobalTaskInstanceDispatchQueue.java => GlobalTaskInstanceWaitingQueue.java} (95%) rename dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/{GlobalTaskInstanceDispatchQueueLooper.java => GlobalTaskInstanceWaitingQueueLooper.java} (66%) delete mode 100644 dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnable.java rename dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/{WorkerTaskExecuteRunnable.java => WorkerTaskExecutor.java} (95%) rename dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/{WorkerTaskExecuteRunnableFactory.java => WorkerTaskExecutorFactory.java} (90%) rename dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/{WorkerTaskExecuteRunnableFactoryBuilder.java => WorkerTaskExecutorFactoryBuilder.java} (72%) rename dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/{DefaultWorkerDelayTaskExecuteRunnableTest.java => DefaultWorkerTaskExecutorTest.java} (91%) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java index 16476ad7f73ad..c04eb9c338bbc 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java @@ -22,8 +22,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics; +import org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; -import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable; import java.util.Map; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceDispatchOperationFunction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceDispatchOperationFunction.java index c60e860aa7354..f69ff4fb77eb0 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceDispatchOperationFunction.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceDispatchOperationFunction.java @@ -17,18 +17,14 @@ package org.apache.dolphinscheduler.server.master.rpc; -import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskDispatchRequest; import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskDispatchResponse; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; -import org.apache.dolphinscheduler.server.master.runner.MasterDelayTaskExecuteRunnableDelayQueue; -import org.apache.dolphinscheduler.server.master.runner.execute.MasterDelayTaskExecuteRunnable; -import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableFactoryBuilder; +import org.apache.dolphinscheduler.server.master.runner.GlobalMasterTaskExecuteRunnableQueue; import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutionContextHolder; - -import java.util.concurrent.TimeUnit; +import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor; +import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorFactoryBuilder; import lombok.extern.slf4j.Slf4j; @@ -42,10 +38,10 @@ public class LogicITaskInstanceDispatchOperationFunction ITaskInstanceOperationFunction { @Autowired - private MasterTaskExecuteRunnableFactoryBuilder masterTaskExecuteRunnableFactoryBuilder; + private MasterTaskExecutorFactoryBuilder masterTaskExecutorFactoryBuilder; @Autowired - private MasterDelayTaskExecuteRunnableDelayQueue masterDelayTaskExecuteRunnableDelayQueue; + private GlobalMasterTaskExecuteRunnableQueue globalMasterTaskExecuteRunnableQueue; @Override public LogicTaskDispatchResponse operate(LogicTaskDispatchRequest taskDispatchRequest) { @@ -63,34 +59,17 @@ public LogicTaskDispatchResponse operate(LogicTaskDispatchRequest taskDispatchRe MasterTaskExecutionContextHolder.putTaskExecutionContext(taskExecutionContext); - int delayTime = taskExecutionContext.getDelayTime(); - if (delayTime > 0) { - // todo: calculate the delay in master dispatcher then we don't need to use a queue to store the task - final long remainTime = - DateUtils.getRemainTime(DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()), - TimeUnit.SECONDS.toMillis(delayTime)); - if (remainTime > 0) { - log.info( - "Current taskInstance: {} is choosing delay execution, delay time: {}/ms, remainTime: {}/ms", - taskExecutionContext.getTaskName(), - TimeUnit.SECONDS.toMillis(taskExecutionContext.getDelayTime()), remainTime); - taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.DELAY_EXECUTION); - // todo: send delay execution message - return LogicTaskDispatchResponse.success(taskExecutionContext.getTaskInstanceId()); - } - } - final MasterDelayTaskExecuteRunnable masterDelayTaskExecuteRunnable = - masterTaskExecuteRunnableFactoryBuilder - .createWorkerDelayTaskExecuteRunnableFactory(taskExecutionContext.getTaskType()) - .createWorkerTaskExecuteRunnable(taskExecutionContext); - if (masterDelayTaskExecuteRunnableDelayQueue - .submitMasterDelayTaskExecuteRunnable(masterDelayTaskExecuteRunnable)) { + MasterTaskExecutor masterTaskExecutor = masterTaskExecutorFactoryBuilder + .createMasterTaskExecutorFactory(taskExecutionContext.getTaskType()) + .createMasterTaskExecutor(taskExecutionContext); + if (globalMasterTaskExecuteRunnableQueue + .submitMasterTaskExecuteRunnable(masterTaskExecutor)) { log.info("Submit LogicTask: {} to MasterDelayTaskExecuteRunnableDelayQueue success", taskInstanceName); return LogicTaskDispatchResponse.success(taskInstanceId); } else { log.error( "Submit LogicTask: {} to MasterDelayTaskExecuteRunnableDelayQueue failed, current task waiting queue size: {} is full", - taskInstanceName, masterDelayTaskExecuteRunnableDelayQueue.size()); + taskInstanceName, globalMasterTaskExecuteRunnableQueue.size()); return LogicTaskDispatchResponse.failed(taskInstanceId, "MasterDelayTaskExecuteRunnableDelayQueue is full"); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceKillOperationFunction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceKillOperationFunction.java index 6b43a7690a891..27fc52333d365 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceKillOperationFunction.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceKillOperationFunction.java @@ -21,10 +21,10 @@ import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskKillResponse; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException; -import org.apache.dolphinscheduler.server.master.runner.MasterDelayTaskExecuteRunnableDelayQueue; -import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable; -import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableHolder; +import org.apache.dolphinscheduler.server.master.runner.GlobalMasterTaskExecuteRunnableQueue; import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutionContextHolder; +import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor; +import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorHolder; import lombok.extern.slf4j.Slf4j; @@ -38,7 +38,7 @@ public class LogicITaskInstanceKillOperationFunction ITaskInstanceOperationFunction { @Autowired - private MasterDelayTaskExecuteRunnableDelayQueue masterDelayTaskExecuteRunnableDelayQueue; + private GlobalMasterTaskExecuteRunnableQueue globalMasterTaskExecuteRunnableQueue; @Override public LogicTaskKillResponse operate(LogicTaskKillRequest taskKillRequest) { @@ -46,16 +46,16 @@ public LogicTaskKillResponse operate(LogicTaskKillRequest taskKillRequest) { try { LogUtils.setTaskInstanceIdMDC(taskKillRequest.getTaskInstanceId()); log.info("Received killLogicTask request: {}", taskKillRequest); - final MasterTaskExecuteRunnable masterTaskExecuteRunnable = - MasterTaskExecuteRunnableHolder.getMasterTaskExecuteRunnable(taskInstanceId); - if (masterTaskExecuteRunnable == null) { + final MasterTaskExecutor masterTaskExecutor = + MasterTaskExecutorHolder.getMasterTaskExecutor(taskInstanceId); + if (masterTaskExecutor == null) { log.error("Cannot find the MasterTaskExecuteRunnable, this task may already been killed"); return LogicTaskKillResponse.fail("Cannot find the MasterTaskExecuteRunnable"); } try { - masterTaskExecuteRunnable.cancelTask(); - masterDelayTaskExecuteRunnableDelayQueue - .removeMasterDelayTaskExecuteRunnable(masterTaskExecuteRunnable); + masterTaskExecutor.cancelTask(); + globalMasterTaskExecuteRunnableQueue + .removeMasterTaskExecuteRunnable(masterTaskExecutor); return LogicTaskKillResponse.success(); } catch (MasterTaskExecuteException e) { log.error("Cancel MasterTaskExecuteRunnable failed ", e); @@ -63,7 +63,7 @@ public LogicTaskKillResponse operate(LogicTaskKillRequest taskKillRequest) { } finally { // todo: If cancel failed, we cannot remove the context? MasterTaskExecutionContextHolder.removeTaskExecutionContext(taskInstanceId); - MasterTaskExecuteRunnableHolder.removeMasterTaskExecuteRunnable(taskInstanceId); + MasterTaskExecutorHolder.removeMasterTaskExecutor(taskInstanceId); } } finally { LogUtils.removeTaskInstanceIdMDC(); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstancePauseOperationFunction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstancePauseOperationFunction.java index 912193b91e9f4..a95d2fc667b85 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstancePauseOperationFunction.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstancePauseOperationFunction.java @@ -22,8 +22,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException; -import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable; -import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableHolder; +import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor; +import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorHolder; import lombok.extern.slf4j.Slf4j; @@ -39,16 +39,16 @@ public class LogicITaskInstancePauseOperationFunction public LogicTaskPauseResponse operate(LogicTaskPauseRequest taskPauseRequest) { try { LogUtils.setTaskInstanceIdMDC(taskPauseRequest.getTaskInstanceId()); - final MasterTaskExecuteRunnable masterTaskExecuteRunnable = - MasterTaskExecuteRunnableHolder.getMasterTaskExecuteRunnable(taskPauseRequest.getTaskInstanceId()); - if (masterTaskExecuteRunnable == null) { + final MasterTaskExecutor masterTaskExecutor = + MasterTaskExecutorHolder.getMasterTaskExecutor(taskPauseRequest.getTaskInstanceId()); + if (masterTaskExecutor == null) { log.info("Cannot find the MasterTaskExecuteRunnable"); return LogicTaskPauseResponse.fail("Cannot find the MasterTaskExecuteRunnable"); } - final TaskExecutionContext taskExecutionContext = masterTaskExecuteRunnable.getTaskExecutionContext(); + final TaskExecutionContext taskExecutionContext = masterTaskExecutor.getTaskExecutionContext(); LogUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId()); - masterTaskExecuteRunnable.pauseTask(); + masterTaskExecutor.pauseTask(); return LogicTaskPauseResponse.success(); } catch (MasterTaskExecuteException e) { log.error("Pause MasterTaskExecuteRunnable failed", e); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/BaseTaskDispatcher.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/BaseTaskDispatcher.java index 0ad515e93a036..30ab8fadec195 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/BaseTaskDispatcher.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/BaseTaskDispatcher.java @@ -30,7 +30,6 @@ import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService; import org.apache.dolphinscheduler.server.master.runner.dispatcher.TaskDispatcher; -import org.apache.dolphinscheduler.server.master.runner.execute.TaskExecuteRunnable; import java.util.Date; import java.util.Optional; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/BaseTaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/BaseTaskExecuteRunnable.java new file mode 100644 index 0000000000000..fefdbf3493848 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/BaseTaskExecuteRunnable.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; + +public abstract class BaseTaskExecuteRunnable implements TaskExecuteRunnable { + + protected final ProcessInstance workflowInstance; + protected final TaskInstance taskInstance; + protected final TaskExecutionContext taskExecutionContext; + + public BaseTaskExecuteRunnable(ProcessInstance workflowInstance, + TaskInstance taskInstance, + TaskExecutionContext taskExecutionContext) { + this.taskInstance = checkNotNull(taskInstance); + this.workflowInstance = checkNotNull(workflowInstance); + this.taskExecutionContext = checkNotNull(taskExecutionContext); + } + + @Override + public ProcessInstance getWorkflowInstance() { + return workflowInstance; + } + + @Override + public TaskInstance getTaskInstance() { + return taskInstance; + } + + @Override + public TaskExecutionContext getTaskExecutionContext() { + return taskExecutionContext; + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/DefaultTaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/DefaultTaskExecuteRunnable.java similarity index 94% rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/DefaultTaskExecuteRunnable.java rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/DefaultTaskExecuteRunnable.java index 6f736139b6bca..c1b13717bdd31 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/DefaultTaskExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/DefaultTaskExecuteRunnable.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.server.master.runner.execute; +package org.apache.dolphinscheduler.server.master.runner; import static com.google.common.base.Preconditions.checkNotNull; @@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.server.master.runner.operator.TaskExecuteRunnableOperatorManager; -public class DefaultTaskExecuteRunnable extends PriorityTaskExecuteRunnable { +public class DefaultTaskExecuteRunnable extends PriorityDelayTaskExecuteRunnable { private final TaskExecuteRunnableOperatorManager taskExecuteRunnableOperatorManager; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterDelayTaskExecuteRunnableDelayQueue.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueue.java similarity index 54% rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterDelayTaskExecuteRunnableDelayQueue.java rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueue.java index bdd1510527958..1087acfcdfbf5 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterDelayTaskExecuteRunnableDelayQueue.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueue.java @@ -17,10 +17,10 @@ package org.apache.dolphinscheduler.server.master.runner; -import org.apache.dolphinscheduler.server.master.runner.execute.MasterDelayTaskExecuteRunnable; -import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable; +import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor; -import java.util.concurrent.DelayQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import org.springframework.stereotype.Component; @@ -28,27 +28,27 @@ * */ @Component -public class MasterDelayTaskExecuteRunnableDelayQueue { +public class GlobalMasterTaskExecuteRunnableQueue { - private final DelayQueue masterDelayTaskExecuteRunnableDelayQueue = - new DelayQueue<>(); + private final BlockingQueue masterTaskExecutorBlockingQueue = + new LinkedBlockingQueue<>(); - public boolean submitMasterDelayTaskExecuteRunnable(MasterDelayTaskExecuteRunnable masterDelayTaskExecuteRunnable) { - return masterDelayTaskExecuteRunnableDelayQueue.offer(masterDelayTaskExecuteRunnable); + public boolean submitMasterTaskExecuteRunnable(MasterTaskExecutor masterTaskExecutor) { + return masterTaskExecutorBlockingQueue.offer(masterTaskExecutor); } - public MasterDelayTaskExecuteRunnable takeMasterDelayTaskExecuteRunnable() throws InterruptedException { - return masterDelayTaskExecuteRunnableDelayQueue.take(); + public MasterTaskExecutor takeMasterTaskExecuteRunnable() throws InterruptedException { + return masterTaskExecutorBlockingQueue.take(); } // todo: if we move the delay process to master, than we don't need this method, since dispatchProcess can directly // submit to thread pool - public boolean removeMasterDelayTaskExecuteRunnable(MasterTaskExecuteRunnable masterTaskExecuteRunnable) { - return masterDelayTaskExecuteRunnableDelayQueue.remove(masterTaskExecuteRunnable); + public boolean removeMasterTaskExecuteRunnable(MasterTaskExecutor masterTaskExecutor) { + return masterTaskExecutorBlockingQueue.remove(masterTaskExecutor); } public int size() { - return masterDelayTaskExecuteRunnableDelayQueue.size(); + return masterTaskExecutorBlockingQueue.size(); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterDelayTaskExecuteRunnableDelayQueueLooper.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueueLooper.java similarity index 74% rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterDelayTaskExecuteRunnableDelayQueueLooper.java rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueueLooper.java index 557f2ca447cfd..e0b1f80704d14 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterDelayTaskExecuteRunnableDelayQueueLooper.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueueLooper.java @@ -18,9 +18,9 @@ package org.apache.dolphinscheduler.server.master.runner; import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; -import org.apache.dolphinscheduler.server.master.runner.execute.MasterDelayTaskExecuteRunnable; -import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableHolder; -import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableThreadPool; +import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor; +import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorHolder; +import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorThreadPool; import java.util.concurrent.atomic.AtomicBoolean; @@ -31,17 +31,17 @@ @Slf4j @Component -public class MasterDelayTaskExecuteRunnableDelayQueueLooper extends BaseDaemonThread implements AutoCloseable { +public class GlobalMasterTaskExecuteRunnableQueueLooper extends BaseDaemonThread implements AutoCloseable { @Autowired - private MasterDelayTaskExecuteRunnableDelayQueue masterDelayTaskExecuteRunnableDelayQueue; + private GlobalMasterTaskExecuteRunnableQueue globalMasterTaskExecuteRunnableQueue; @Autowired - private MasterTaskExecuteRunnableThreadPool masterTaskExecuteRunnableThreadPool; + private MasterTaskExecutorThreadPool masterTaskExecutorThreadPool; private final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false); - public MasterDelayTaskExecuteRunnableDelayQueueLooper() { + public GlobalMasterTaskExecuteRunnableQueueLooper() { super("MasterDelayTaskExecuteRunnableDelayQueueLooper"); } @@ -53,7 +53,7 @@ public synchronized void start() { } log.info("MasterDelayTaskExecuteRunnableDelayQueueLooper starting..."); super.start(); - masterTaskExecuteRunnableThreadPool.start(); + masterTaskExecutorThreadPool.start(); log.info("MasterDelayTaskExecuteRunnableDelayQueueLooper started..."); } @@ -61,10 +61,10 @@ public synchronized void start() { public void run() { while (RUNNING_FLAG.get()) { try { - final MasterDelayTaskExecuteRunnable masterDelayTaskExecuteRunnable = - masterDelayTaskExecuteRunnableDelayQueue.takeMasterDelayTaskExecuteRunnable(); - masterTaskExecuteRunnableThreadPool.submitMasterTaskExecuteRunnable(masterDelayTaskExecuteRunnable); - MasterTaskExecuteRunnableHolder.putMasterTaskExecuteRunnable(masterDelayTaskExecuteRunnable); + final MasterTaskExecutor masterTaskExecutor = + globalMasterTaskExecuteRunnableQueue.takeMasterTaskExecuteRunnable(); + masterTaskExecutorThreadPool.submitMasterTaskExecutor(masterTaskExecutor); + MasterTaskExecutorHolder.putMasterTaskExecuteRunnable(masterTaskExecutor); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); log.warn("MasterDelayTaskExecuteRunnableDelayQueueLooper has been interrupted, will stop loop"); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java index a8f8f884986b6..f4d50537c6600 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java @@ -17,9 +17,7 @@ package org.apache.dolphinscheduler.server.master.runner; -import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable; - -import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.DelayQueue; import lombok.extern.slf4j.Slf4j; @@ -29,7 +27,7 @@ @Component public class GlobalTaskDispatchWaitingQueue { - private final PriorityBlockingQueue queue = new PriorityBlockingQueue<>(); + private final DelayQueue queue = new DelayQueue<>(); public void submitNeedToDispatchTaskExecuteRunnable(DefaultTaskExecuteRunnable priorityTaskExecuteRunnable) { queue.put(priorityTaskExecuteRunnable); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java index c54aa5aff8949..b496bea5a5ad1 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java @@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.server.master.runner.dispatcher.TaskDispatchFactory; import org.apache.dolphinscheduler.server.master.runner.dispatcher.TaskDispatcher; -import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecutorBootstrap.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecutorBootstrap.java index 744e560c007b8..3e99d2141cf69 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecutorBootstrap.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecutorBootstrap.java @@ -32,7 +32,7 @@ public class MasterTaskExecutorBootstrap implements AutoCloseable { private GlobalTaskDispatchWaitingQueueLooper globalTaskDispatchWaitingQueueLooper; @Autowired - private MasterDelayTaskExecuteRunnableDelayQueueLooper masterDelayTaskExecuteRunnableDelayQueueLooper; + private GlobalMasterTaskExecuteRunnableQueueLooper globalMasterTaskExecuteRunnableQueueLooper; @Autowired private AsyncMasterTaskDelayQueueLooper asyncMasterTaskDelayQueueLooper; @@ -40,7 +40,7 @@ public class MasterTaskExecutorBootstrap implements AutoCloseable { public synchronized void start() { log.info("MasterTaskExecutorBootstrap starting..."); globalTaskDispatchWaitingQueueLooper.start(); - masterDelayTaskExecuteRunnableDelayQueueLooper.start(); + globalMasterTaskExecuteRunnableQueueLooper.start(); asyncMasterTaskDelayQueueLooper.start(); log.info("MasterTaskExecutorBootstrap started..."); } @@ -51,8 +51,8 @@ public void close() throws Exception { try ( final GlobalTaskDispatchWaitingQueueLooper globalTaskDispatchWaitingQueueLooper1 = globalTaskDispatchWaitingQueueLooper; - final MasterDelayTaskExecuteRunnableDelayQueueLooper masterDelayTaskExecuteRunnableDelayQueueLooper1 = - masterDelayTaskExecuteRunnableDelayQueueLooper; + final GlobalMasterTaskExecuteRunnableQueueLooper globalMasterTaskExecuteRunnableQueueLooper1 = + globalMasterTaskExecuteRunnableQueueLooper; final AsyncMasterTaskDelayQueueLooper asyncMasterTaskDelayQueueLooper1 = asyncMasterTaskDelayQueueLooper) { // closed the resource diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityTaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/PriorityDelayTaskExecuteRunnable.java similarity index 57% rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityTaskExecuteRunnable.java rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/PriorityDelayTaskExecuteRunnable.java index 2e23feb7bd845..255ec6c8ac2c1 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityTaskExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/PriorityDelayTaskExecuteRunnable.java @@ -15,62 +15,57 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.server.master.runner.execute; - -import static com.google.common.base.Preconditions.checkNotNull; +package org.apache.dolphinscheduler.server.master.runner; +import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.jetbrains.annotations.NotNull; - -public abstract class PriorityTaskExecuteRunnable implements TaskExecuteRunnable, Comparable { - - private final ProcessInstance workflowInstance; - private final TaskInstance taskInstance; - private final TaskExecutionContext taskExecutionContext; - - public PriorityTaskExecuteRunnable(ProcessInstance workflowInstance, - TaskInstance taskInstance, - TaskExecutionContext taskExecutionContext) { - this.taskInstance = checkNotNull(taskInstance); - this.workflowInstance = checkNotNull(workflowInstance); - this.taskExecutionContext = checkNotNull(taskExecutionContext); - } +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; - @Override - public ProcessInstance getWorkflowInstance() { - return workflowInstance; - } +public abstract class PriorityDelayTaskExecuteRunnable extends BaseTaskExecuteRunnable implements Delayed { - @Override - public TaskInstance getTaskInstance() { - return taskInstance; + public PriorityDelayTaskExecuteRunnable(ProcessInstance workflowInstance, + TaskInstance taskInstance, + TaskExecutionContext taskExecutionContext) { + super(workflowInstance, taskInstance, taskExecutionContext); } @Override - public TaskExecutionContext getTaskExecutionContext() { - return taskExecutionContext; + public long getDelay(TimeUnit unit) { + return unit.convert( + DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), + taskExecutionContext.getDelayTime() * 60L), + TimeUnit.SECONDS); } @Override - public int compareTo(@NotNull TaskExecuteRunnable other) { + public int compareTo(Delayed o) { + if (o == null) { + return 1; + } + int delayTimeCompareResult = + Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS)); + if (delayTimeCompareResult != 0) { + return delayTimeCompareResult; + } + PriorityDelayTaskExecuteRunnable other = (PriorityDelayTaskExecuteRunnable) o; // the smaller dispatch fail times, the higher priority int dispatchFailTimesCompareResult = taskExecutionContext.getDispatchFailTimes() - other.getTaskExecutionContext().getDispatchFailTimes(); if (dispatchFailTimesCompareResult != 0) { return dispatchFailTimesCompareResult; } - int workflowInstancePriorityCompareResult = workflowInstance.getProcessInstancePriority().getCode() - other.getWorkflowInstance().getProcessInstancePriority().getCode(); if (workflowInstancePriorityCompareResult != 0) { return workflowInstancePriorityCompareResult; } - int workflowInstanceIdCompareResult = workflowInstance.getId() - other.getWorkflowInstance().getId(); + long workflowInstanceIdCompareResult = workflowInstance.getId().compareTo(other.getWorkflowInstance().getId()); if (workflowInstanceIdCompareResult != 0) { - return workflowInstanceIdCompareResult; + return workflowInstancePriorityCompareResult; } int taskInstancePriorityCompareResult = taskInstance.getTaskInstancePriority().getCode() - other.getTaskInstance().getTaskInstancePriority().getCode(); @@ -84,21 +79,7 @@ public int compareTo(@NotNull TaskExecuteRunnable other) { return -taskGroupPriorityCompareResult; } // The task instance shouldn't be equals - return taskInstance.getId() - other.getTaskInstance().getId(); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof PriorityTaskExecuteRunnable) { - PriorityTaskExecuteRunnable other = (PriorityTaskExecuteRunnable) obj; - return compareTo(other) == 0; - } - return false; - } - - @Override - public int hashCode() { - return taskInstance.getId(); + return taskInstance.getId().compareTo(other.getTaskInstance().getId()); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java index 24e524d171d15..ab6fc555a6a83 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java @@ -57,9 +57,7 @@ import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics; import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent; import org.apache.dolphinscheduler.server.master.runner.dispatcher.WorkerTaskDispatcher; -import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable; import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnableFactory; -import org.apache.dolphinscheduler.server.master.runner.execute.TaskExecutionContextFactory; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.spi.enums.ResourceType; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnable.java similarity index 90% rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecuteRunnable.java rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnable.java index 02980aff49fa1..8f661896171e7 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnable.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.server.master.runner.execute; +package org.apache.dolphinscheduler.server.master.runner; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; @@ -25,7 +25,7 @@ * This interface is used to define a task which is executing. * todo: split to MasterTaskExecuteRunnable and WorkerTaskExecuteRunnable */ -public interface TaskExecuteRunnable extends Comparable { +public interface TaskExecuteRunnable { void dispatch(); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecuteRunnableFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnableFactory.java similarity index 89% rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecuteRunnableFactory.java rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnableFactory.java index e3afb2d6b358a..43ee971160a88 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecuteRunnableFactory.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnableFactory.java @@ -15,11 +15,16 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.server.master.runner.execute; +package org.apache.dolphinscheduler.server.master.runner; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.master.exception.TaskExecuteRunnableCreateException; +/** + * Use to create TaskExecuteRunnable + * + * @param TaskExecuteRunnable + */ public interface TaskExecuteRunnableFactory { T createTaskExecuteRunnable(TaskInstance taskInstance) throws TaskExecuteRunnableCreateException; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecutionContextFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecutionContextFactory.java similarity index 99% rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecutionContextFactory.java rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecutionContextFactory.java index d8f72564bd136..3fb2a9c850f2c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecutionContextFactory.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecutionContextFactory.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.server.master.runner.execute; +package org.apache.dolphinscheduler.server.master.runner; import static org.apache.dolphinscheduler.common.constants.Constants.ADDRESS; import static org.apache.dolphinscheduler.common.constants.Constants.DATABASE; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 00c1515648296..35ad1e8c82aed 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -80,9 +80,7 @@ import org.apache.dolphinscheduler.server.master.event.WorkflowStateEvent; import org.apache.dolphinscheduler.server.master.graph.IWorkflowGraph; import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics; -import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable; import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnableFactory; -import org.apache.dolphinscheduler.server.master.runner.execute.TaskExecuteRunnable; import org.apache.dolphinscheduler.server.master.utils.TaskUtils; import org.apache.dolphinscheduler.server.master.utils.WorkflowInstanceUtils; import org.apache.dolphinscheduler.service.alert.ListenerEventAlertManager; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/MasterTaskDispatcher.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/MasterTaskDispatcher.java index d30a1e554d863..cca7112975539 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/MasterTaskDispatcher.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/MasterTaskDispatcher.java @@ -27,7 +27,7 @@ import org.apache.dolphinscheduler.server.master.exception.TaskDispatchException; import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService; import org.apache.dolphinscheduler.server.master.runner.BaseTaskDispatcher; -import org.apache.dolphinscheduler.server.master.runner.execute.TaskExecuteRunnable; +import org.apache.dolphinscheduler.server.master.runner.TaskExecuteRunnable; import java.util.Optional; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/TaskDispatcher.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/TaskDispatcher.java index 32a195fb5afb0..f595d5a490f6b 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/TaskDispatcher.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/TaskDispatcher.java @@ -19,7 +19,7 @@ import org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException; import org.apache.dolphinscheduler.server.master.exception.TaskDispatchException; -import org.apache.dolphinscheduler.server.master.runner.execute.TaskExecuteRunnable; +import org.apache.dolphinscheduler.server.master.runner.TaskExecuteRunnable; /** * Used to do task dispatcher. diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/WorkerTaskDispatcher.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/WorkerTaskDispatcher.java index 36739a116394b..33200826947ba 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/WorkerTaskDispatcher.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/WorkerTaskDispatcher.java @@ -31,7 +31,7 @@ import org.apache.dolphinscheduler.server.master.exception.TaskDispatchException; import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService; import org.apache.dolphinscheduler.server.master.runner.BaseTaskDispatcher; -import org.apache.dolphinscheduler.server.master.runner.execute.TaskExecuteRunnable; +import org.apache.dolphinscheduler.server.master.runner.TaskExecuteRunnable; import java.util.Optional; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterDelayTaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskExecutor.java similarity index 82% rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterDelayTaskExecuteRunnable.java rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskExecutor.java index 8a6e6d8e870e8..d289a318daa3b 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterDelayTaskExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskExecutor.java @@ -26,14 +26,14 @@ import lombok.extern.slf4j.Slf4j; @Slf4j -public class AsyncMasterDelayTaskExecuteRunnable extends MasterDelayTaskExecuteRunnable { +public class AsyncMasterTaskExecutor extends MasterTaskExecutor { private final AsyncMasterTaskDelayQueue asyncMasterTaskDelayQueue; - public AsyncMasterDelayTaskExecuteRunnable(TaskExecutionContext taskExecutionContext, - LogicTaskPluginFactoryBuilder logicTaskPluginFactoryBuilder, - LogicTaskInstanceExecutionEventSenderManager logicTaskInstanceExecutionEventSenderManager, - AsyncMasterTaskDelayQueue asyncTaskDelayQueue) { + public AsyncMasterTaskExecutor(TaskExecutionContext taskExecutionContext, + LogicTaskPluginFactoryBuilder logicTaskPluginFactoryBuilder, + LogicTaskInstanceExecutionEventSenderManager logicTaskInstanceExecutionEventSenderManager, + AsyncMasterTaskDelayQueue asyncTaskDelayQueue) { super(taskExecutionContext, logicTaskPluginFactoryBuilder, logicTaskInstanceExecutionEventSenderManager); this.asyncMasterTaskDelayQueue = asyncTaskDelayQueue; } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterDelayTaskExecuteRunnableFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskExecutorFactory.java similarity index 83% rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterDelayTaskExecuteRunnableFactory.java rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskExecutorFactory.java index a71f394b7d120..6c81cd78626dc 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterDelayTaskExecuteRunnableFactory.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskExecutorFactory.java @@ -25,9 +25,9 @@ import org.springframework.stereotype.Component; @Component -public class AsyncMasterDelayTaskExecuteRunnableFactory +public class AsyncMasterTaskExecutorFactory implements - MasterDelayTaskExecuteRunnableFactory { + MasterTaskExecutorFactory { @Autowired private LogicTaskPluginFactoryBuilder logicTaskPluginFactoryBuilder; @@ -39,8 +39,8 @@ public class AsyncMasterDelayTaskExecuteRunnableFactory private AsyncMasterTaskDelayQueue asyncTaskDelayQueue; @Override - public AsyncMasterDelayTaskExecuteRunnable createWorkerTaskExecuteRunnable(TaskExecutionContext taskExecutionContext) { - return new AsyncMasterDelayTaskExecuteRunnable(taskExecutionContext, + public AsyncMasterTaskExecutor createMasterTaskExecutor(TaskExecutionContext taskExecutionContext) { + return new AsyncMasterTaskExecutor(taskExecutionContext, logicTaskPluginFactoryBuilder, logicTaskInstanceExecutionEventSenderManager, asyncTaskDelayQueue); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncTaskCallbackFunctionImpl.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncTaskCallbackFunctionImpl.java index e9d4fe4430a6a..6c83a3ccb9484 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncTaskCallbackFunctionImpl.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncTaskCallbackFunctionImpl.java @@ -26,41 +26,41 @@ @Slf4j public class AsyncTaskCallbackFunctionImpl implements AsyncTaskCallbackFunction { - private final AsyncMasterDelayTaskExecuteRunnable asyncMasterDelayTaskExecuteRunnable; + private final AsyncMasterTaskExecutor asyncMasterTaskExecuteRunnable; - public AsyncTaskCallbackFunctionImpl(@NonNull AsyncMasterDelayTaskExecuteRunnable asyncMasterDelayTaskExecuteRunnable) { - this.asyncMasterDelayTaskExecuteRunnable = asyncMasterDelayTaskExecuteRunnable; + public AsyncTaskCallbackFunctionImpl(@NonNull AsyncMasterTaskExecutor asyncMasterTaskExecuteRunnable) { + this.asyncMasterTaskExecuteRunnable = asyncMasterTaskExecuteRunnable; } @Override public void executeSuccess() { - asyncMasterDelayTaskExecuteRunnable.getTaskExecutionContext() + asyncMasterTaskExecuteRunnable.getTaskExecutionContext() .setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS); executeFinished(); } @Override public void executeFailed() { - asyncMasterDelayTaskExecuteRunnable.getTaskExecutionContext() + asyncMasterTaskExecuteRunnable.getTaskExecutionContext() .setCurrentExecutionStatus(TaskExecutionStatus.FAILURE); executeFinished(); } @Override public void executeThrowing(Throwable throwable) { - asyncMasterDelayTaskExecuteRunnable.afterThrowing(throwable); + asyncMasterTaskExecuteRunnable.afterThrowing(throwable); } private void executeFinished() { TaskInstanceLogHeader.printFinalizeTaskHeader(); - int taskInstanceId = asyncMasterDelayTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId(); + int taskInstanceId = asyncMasterTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId(); MasterTaskExecutionContextHolder.removeTaskExecutionContext(taskInstanceId); - MasterTaskExecuteRunnableHolder.removeMasterTaskExecuteRunnable(taskInstanceId); + MasterTaskExecutorHolder.removeMasterTaskExecutor(taskInstanceId); log.info("Task execute finished, removed the TaskExecutionContext"); - asyncMasterDelayTaskExecuteRunnable.sendTaskResult(); + asyncMasterTaskExecuteRunnable.sendTaskResult(); log.info( "Execute task finished, will send the task execute result to master, the current task execute result is {}", - asyncMasterDelayTaskExecuteRunnable.getTaskExecutionContext().getCurrentExecutionStatus().name()); - asyncMasterDelayTaskExecuteRunnable.closeLogAppender(); + asyncMasterTaskExecuteRunnable.getTaskExecutionContext().getCurrentExecutionStatus().name()); + asyncMasterTaskExecuteRunnable.closeLogAppender(); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/DefaultTaskExecuteRunnableFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/DefaultTaskExecuteRunnableFactory.java index 64443855fa2a2..ab749b58615d4 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/DefaultTaskExecuteRunnableFactory.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/DefaultTaskExecuteRunnableFactory.java @@ -21,6 +21,9 @@ import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.exception.TaskExecuteRunnableCreateException; import org.apache.dolphinscheduler.server.master.exception.TaskExecutionContextCreateException; +import org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable; +import org.apache.dolphinscheduler.server.master.runner.TaskExecuteRunnableFactory; +import org.apache.dolphinscheduler.server.master.runner.TaskExecutionContextFactory; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; import org.apache.dolphinscheduler.server.master.runner.operator.TaskExecuteRunnableOperatorManager; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterDelayTaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterDelayTaskExecuteRunnable.java deleted file mode 100644 index 70f3e93521331..0000000000000 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterDelayTaskExecuteRunnable.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.master.runner.execute; - -import org.apache.dolphinscheduler.common.utils.DateUtils; -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.server.master.runner.message.LogicTaskInstanceExecutionEventSenderManager; -import org.apache.dolphinscheduler.server.master.runner.task.LogicTaskPluginFactoryBuilder; - -import java.util.concurrent.Delayed; -import java.util.concurrent.TimeUnit; - -public abstract class MasterDelayTaskExecuteRunnable extends MasterTaskExecuteRunnable implements Delayed { - - public MasterDelayTaskExecuteRunnable(TaskExecutionContext taskExecutionContext, - LogicTaskPluginFactoryBuilder logicTaskPluginFactoryBuilder, - LogicTaskInstanceExecutionEventSenderManager logicTaskInstanceExecutionEventSenderManager) { - super(taskExecutionContext, logicTaskPluginFactoryBuilder, logicTaskInstanceExecutionEventSenderManager); - } - - @Override - public boolean equals(Object obj) { - if (!(obj instanceof MasterDelayTaskExecuteRunnable)) { - return false; - } - MasterDelayTaskExecuteRunnable other = (MasterDelayTaskExecuteRunnable) obj; - return other.getTaskExecutionContext().getTaskInstanceId() == this.getTaskExecutionContext() - .getTaskInstanceId(); - } - - @Override - public int hashCode() { - return this.getTaskExecutionContext().getTaskInstanceId(); - } - - @Override - public long getDelay(TimeUnit unit) { - TaskExecutionContext taskExecutionContext = getTaskExecutionContext(); - return unit.convert( - DateUtils.getRemainTime( - taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getDelayTime() * 60L), - TimeUnit.SECONDS); - } - - @Override - public int compareTo(Delayed o) { - if (o == null) { - return 1; - } - return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS)); - } - -} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutor.java similarity index 94% rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnable.java rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutor.java index 9e08c1a4d2077..8c1e2feaba191 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutor.java @@ -35,16 +35,16 @@ import lombok.extern.slf4j.Slf4j; @Slf4j -public abstract class MasterTaskExecuteRunnable implements Runnable { +public abstract class MasterTaskExecutor implements Runnable { protected final TaskExecutionContext taskExecutionContext; protected final LogicTaskPluginFactoryBuilder logicTaskPluginFactoryBuilder; protected final LogicTaskInstanceExecutionEventSenderManager logicTaskInstanceExecutionEventSenderManager; protected ILogicTask logicTask; - public MasterTaskExecuteRunnable(TaskExecutionContext taskExecutionContext, - LogicTaskPluginFactoryBuilder logicTaskPluginFactoryBuilder, - LogicTaskInstanceExecutionEventSenderManager logicTaskInstanceExecutionEventSenderManager) { + public MasterTaskExecutor(TaskExecutionContext taskExecutionContext, + LogicTaskPluginFactoryBuilder logicTaskPluginFactoryBuilder, + LogicTaskInstanceExecutionEventSenderManager logicTaskInstanceExecutionEventSenderManager) { this.taskExecutionContext = taskExecutionContext; this.logicTaskPluginFactoryBuilder = logicTaskPluginFactoryBuilder; this.logicTaskInstanceExecutionEventSenderManager = logicTaskInstanceExecutionEventSenderManager; @@ -68,7 +68,7 @@ protected void afterThrowing(Throwable throwable) { "Get a exception when execute the task, sent the task execute result to master, the current task execute result is {}", taskExecutionContext.getCurrentExecutionStatus()); MasterTaskExecutionContextHolder.removeTaskExecutionContext(taskExecutionContext.getTaskInstanceId()); - MasterTaskExecuteRunnableHolder.removeMasterTaskExecuteRunnable(taskExecutionContext.getTaskInstanceId()); + MasterTaskExecutorHolder.removeMasterTaskExecutor(taskExecutionContext.getTaskInstanceId()); log.info("Get a exception when execute the task, removed the TaskExecutionContext"); closeLogAppender(); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterDelayTaskExecuteRunnableFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorFactory.java similarity index 83% rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterDelayTaskExecuteRunnableFactory.java rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorFactory.java index 0fd79dfabe545..d1f76aedbc707 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterDelayTaskExecuteRunnableFactory.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorFactory.java @@ -19,8 +19,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -public interface MasterDelayTaskExecuteRunnableFactory { +public interface MasterTaskExecutorFactory { - T createWorkerTaskExecuteRunnable(TaskExecutionContext taskExecutionContext); + T createMasterTaskExecutor(TaskExecutionContext taskExecutionContext); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableFactoryBuilder.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorFactoryBuilder.java similarity index 75% rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableFactoryBuilder.java rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorFactoryBuilder.java index c5689f6a1b0f9..84c342c074e1f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableFactoryBuilder.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorFactoryBuilder.java @@ -28,24 +28,27 @@ import com.google.common.collect.Sets; +/** + * Use to create MasterTaskExecutorFactory + */ @Component -public class MasterTaskExecuteRunnableFactoryBuilder { +public class MasterTaskExecutorFactoryBuilder { @Autowired - private AsyncMasterDelayTaskExecuteRunnableFactory asyncMasterDelayTaskExecuteRunnableFactory; + private AsyncMasterTaskExecutorFactory asyncMasterTaskExecutorFactory; @Autowired - private SyncMasterDelayTaskExecuteRunnableFactory syncMasterDelayTaskExecuteRunnableFactory; + private SyncMasterTaskExecutorFactory syncMasterTaskExecutorFactory; private static final Set ASYNC_TASK_TYPE = Sets.newHashSet( DependentLogicTask.TASK_TYPE, SubWorkflowLogicTask.TASK_TYPE, DynamicLogicTask.TASK_TYPE); - public MasterDelayTaskExecuteRunnableFactory createWorkerDelayTaskExecuteRunnableFactory(String taskType) { + public MasterTaskExecutorFactory createMasterTaskExecutorFactory(String taskType) { if (ASYNC_TASK_TYPE.contains(taskType)) { - return asyncMasterDelayTaskExecuteRunnableFactory; + return asyncMasterTaskExecutorFactory; } - return syncMasterDelayTaskExecuteRunnableFactory; + return syncMasterTaskExecutorFactory; } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableHolder.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorHolder.java similarity index 66% rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableHolder.java rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorHolder.java index 6b298976118da..983542cae2f3f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableHolder.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorHolder.java @@ -25,20 +25,20 @@ @Slf4j @UtilityClass -public class MasterTaskExecuteRunnableHolder { +public class MasterTaskExecutorHolder { - private static final Map SUBMITTED_MASTER_TASK_MAP = new ConcurrentHashMap<>(); + private static final Map SUBMITTED_MASTER_TASK_MAP = new ConcurrentHashMap<>(); - public void putMasterTaskExecuteRunnable(MasterTaskExecuteRunnable masterTaskExecuteRunnable) { - SUBMITTED_MASTER_TASK_MAP.put(masterTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId(), - masterTaskExecuteRunnable); + public void putMasterTaskExecuteRunnable(MasterTaskExecutor masterTaskExecutor) { + SUBMITTED_MASTER_TASK_MAP.put(masterTaskExecutor.getTaskExecutionContext().getTaskInstanceId(), + masterTaskExecutor); } - public MasterTaskExecuteRunnable getMasterTaskExecuteRunnable(Integer taskInstanceId) { + public MasterTaskExecutor getMasterTaskExecutor(Integer taskInstanceId) { return SUBMITTED_MASTER_TASK_MAP.get(taskInstanceId); } - public void removeMasterTaskExecuteRunnable(Integer taskInstanceId) { + public void removeMasterTaskExecutor(Integer taskInstanceId) { SUBMITTED_MASTER_TASK_MAP.remove(taskInstanceId); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPool.java similarity index 89% rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableThreadPool.java rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPool.java index 4f542556e41d9..d61d058d22e04 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableThreadPool.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPool.java @@ -30,7 +30,7 @@ @Slf4j @Component -public class MasterTaskExecuteRunnableThreadPool { +public class MasterTaskExecutorThreadPool { @Autowired private MasterConfig masterConfig; @@ -44,8 +44,8 @@ public synchronized void start() { log.info("MasterTaskExecuteRunnableThreadPool started..."); } - public void submitMasterTaskExecuteRunnable(MasterTaskExecuteRunnable masterTaskExecuteRunnable) { - listeningExecutorService.submit(masterTaskExecuteRunnable); + public void submitMasterTaskExecutor(MasterTaskExecutor masterTaskExecutor) { + listeningExecutorService.submit(masterTaskExecutor); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterDelayTaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterTaskExecutor.java similarity index 82% rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterDelayTaskExecuteRunnable.java rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterTaskExecutor.java index 62365f78cba55..7f303487efed3 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterDelayTaskExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterTaskExecutor.java @@ -27,11 +27,11 @@ import lombok.extern.slf4j.Slf4j; @Slf4j -public class SyncMasterDelayTaskExecuteRunnable extends MasterDelayTaskExecuteRunnable { +public class SyncMasterTaskExecutor extends MasterTaskExecutor { - public SyncMasterDelayTaskExecuteRunnable(TaskExecutionContext taskExecutionContext, - LogicTaskPluginFactoryBuilder logicTaskPluginFactoryBuilder, - LogicTaskInstanceExecutionEventSenderManager logicTaskInstanceExecutionEventSenderManager) { + public SyncMasterTaskExecutor(TaskExecutionContext taskExecutionContext, + LogicTaskPluginFactoryBuilder logicTaskPluginFactoryBuilder, + LogicTaskInstanceExecutionEventSenderManager logicTaskInstanceExecutionEventSenderManager) { super(taskExecutionContext, logicTaskPluginFactoryBuilder, logicTaskInstanceExecutionEventSenderManager); } @@ -56,7 +56,7 @@ protected void afterExecute() throws MasterTaskExecuteException { taskExecutionContext.getCurrentExecutionStatus().name()); closeLogAppender(); MasterTaskExecutionContextHolder.removeTaskExecutionContext(taskExecutionContext.getTaskInstanceId()); - MasterTaskExecuteRunnableHolder.removeMasterTaskExecuteRunnable(taskExecutionContext.getTaskInstanceId()); + MasterTaskExecutorHolder.removeMasterTaskExecutor(taskExecutionContext.getTaskInstanceId()); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterDelayTaskExecuteRunnableFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterTaskExecutorFactory.java similarity index 81% rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterDelayTaskExecuteRunnableFactory.java rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterTaskExecutorFactory.java index 2bf829c02132d..b591711e3d02f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterDelayTaskExecuteRunnableFactory.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterTaskExecutorFactory.java @@ -28,9 +28,9 @@ @Slf4j @Component -public class SyncMasterDelayTaskExecuteRunnableFactory +public class SyncMasterTaskExecutorFactory implements - MasterDelayTaskExecuteRunnableFactory { + MasterTaskExecutorFactory { @Autowired private LogicTaskPluginFactoryBuilder logicTaskPluginFactoryBuilder; @@ -38,8 +38,8 @@ public class SyncMasterDelayTaskExecuteRunnableFactory private LogicTaskInstanceExecutionEventSenderManager logicTaskInstanceExecutionEventSenderManager; @Override - public SyncMasterDelayTaskExecuteRunnable createWorkerTaskExecuteRunnable(TaskExecutionContext taskExecutionContext) { - return new SyncMasterDelayTaskExecuteRunnable(taskExecutionContext, logicTaskPluginFactoryBuilder, + public SyncMasterTaskExecutor createMasterTaskExecutor(TaskExecutionContext taskExecutionContext) { + return new SyncMasterTaskExecutor(taskExecutionContext, logicTaskPluginFactoryBuilder, logicTaskInstanceExecutionEventSenderManager); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java index 6294f581ecce2..dce716e71492f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java @@ -17,19 +17,41 @@ package org.apache.dolphinscheduler.server.master.runner.operator; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; +import org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable; import org.apache.dolphinscheduler.server.master.runner.GlobalTaskDispatchWaitingQueue; -import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable; +import java.util.concurrent.TimeUnit; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j public abstract class BaseTaskExecuteRunnableDispatchOperator implements TaskExecuteRunnableOperator { private final GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue; - public BaseTaskExecuteRunnableDispatchOperator(GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue) { + private final TaskInstanceDao taskInstanceDao; + + public BaseTaskExecuteRunnableDispatchOperator( + GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue, + TaskInstanceDao taskInstanceDao) { this.globalTaskDispatchWaitingQueue = globalTaskDispatchWaitingQueue; + this.taskInstanceDao = taskInstanceDao; } @Override public void operate(DefaultTaskExecuteRunnable taskExecuteRunnable) { + long remainTime = taskExecuteRunnable.getDelay(TimeUnit.SECONDS); + TaskInstance taskInstance = taskExecuteRunnable.getTaskInstance(); + if (remainTime > 0) { + taskInstance.setState(TaskExecutionStatus.DELAY_EXECUTION); + taskInstanceDao.updateById(taskInstance); + log.info("Current taskInstance: {} is choose delay execution, delay time: {}/s, remainTime: {}/s", + taskInstance.getName(), + TimeUnit.SECONDS.toMillis(taskInstance.getDelayTime()), remainTime); + } globalTaskDispatchWaitingQueue.submitNeedToDispatchTaskExecuteRunnable(taskExecuteRunnable); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableKillOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableKillOperator.java index a1d0a893fc56d..1b7a92db98d29 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableKillOperator.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableKillOperator.java @@ -20,7 +20,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; -import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable; +import org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable; import java.util.Date; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnablePauseOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnablePauseOperator.java index 383752d3f3a1d..8163817afc665 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnablePauseOperator.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnablePauseOperator.java @@ -18,7 +18,7 @@ package org.apache.dolphinscheduler.server.master.runner.operator; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable; +import org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable; import lombok.extern.slf4j.Slf4j; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableTimeoutOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableTimeoutOperator.java index 4068bfa082050..ef9dc80901191 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableTimeoutOperator.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableTimeoutOperator.java @@ -21,7 +21,7 @@ import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; -import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable; +import org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable; import java.util.Date; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableDispatchOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableDispatchOperator.java index d5b0802372882..ed1b777aebb14 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableDispatchOperator.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableDispatchOperator.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.master.runner.operator; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.server.master.runner.GlobalTaskDispatchWaitingQueue; import org.springframework.stereotype.Component; @@ -24,8 +25,9 @@ @Component public class LogicTaskExecuteRunnableDispatchOperator extends BaseTaskExecuteRunnableDispatchOperator { - public LogicTaskExecuteRunnableDispatchOperator(GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue) { - super(globalTaskDispatchWaitingQueue); + public LogicTaskExecuteRunnableDispatchOperator(GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue, + TaskInstanceDao taskInstanceDao) { + super(globalTaskDispatchWaitingQueue, taskInstanceDao); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableDispatchOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableDispatchOperator.java index 5a9f070641648..5a31f1138df64 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableDispatchOperator.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableDispatchOperator.java @@ -17,20 +17,16 @@ package org.apache.dolphinscheduler.server.master.runner.operator; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.server.master.runner.GlobalTaskDispatchWaitingQueue; -import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component -public class TaskExecuteRunnableDispatchOperator implements TaskExecuteRunnableOperator { +public class TaskExecuteRunnableDispatchOperator extends BaseTaskExecuteRunnableDispatchOperator { - @Autowired - private GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue; - - @Override - public void operate(DefaultTaskExecuteRunnable taskExecuteRunnable) { - globalTaskDispatchWaitingQueue.submitNeedToDispatchTaskExecuteRunnable(taskExecuteRunnable); + public TaskExecuteRunnableDispatchOperator(GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue, + TaskInstanceDao taskInstanceDao) { + super(globalTaskDispatchWaitingQueue, taskInstanceDao); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableOperator.java index d270103da18f1..1d397e3575a01 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableOperator.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableOperator.java @@ -17,7 +17,7 @@ package org.apache.dolphinscheduler.server.master.runner.operator; -import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable; +import org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable; public interface TaskExecuteRunnableOperator { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableOperatorManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableOperatorManager.java index b217aff8664a9..1b92f5e75ca90 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableOperatorManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableOperatorManager.java @@ -17,7 +17,7 @@ package org.apache.dolphinscheduler.server.master.runner.operator; -import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable; +import org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable; import org.apache.dolphinscheduler.server.master.utils.TaskUtils; import org.springframework.beans.factory.annotation.Autowired; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnablePauseOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnablePauseOperator.java index 39896bb0cbc35..f450044377f20 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnablePauseOperator.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnablePauseOperator.java @@ -22,7 +22,7 @@ import org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator; import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstancePauseRequest; import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstancePauseResponse; -import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable; +import org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable; import org.apache.commons.lang3.StringUtils; diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java index d121d7e198925..28c0010e3b01b 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java @@ -39,7 +39,6 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.graph.IWorkflowGraph; import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnableFactory; -import org.apache.dolphinscheduler.server.master.runner.execute.TaskExecuteRunnable; import org.apache.dolphinscheduler.service.alert.ListenerEventAlertManager; import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/MasterTaskDispatcherTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/MasterTaskDispatcherTest.java index c9710209e520d..f57c9b6a68ddc 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/MasterTaskDispatcherTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/MasterTaskDispatcherTest.java @@ -20,7 +20,7 @@ import org.apache.dolphinscheduler.extract.base.utils.Host; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService; -import org.apache.dolphinscheduler.server.master.runner.execute.TaskExecuteRunnable; +import org.apache.dolphinscheduler.server.master.runner.TaskExecuteRunnable; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/WorkerTaskDispatcherTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/WorkerTaskDispatcherTest.java index 9b447aada392a..46c4f53e1a3e0 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/WorkerTaskDispatcherTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/WorkerTaskDispatcherTest.java @@ -23,7 +23,7 @@ import org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException; import org.apache.dolphinscheduler.server.master.dispatch.host.HostManager; import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService; -import org.apache.dolphinscheduler.server.master.runner.execute.TaskExecuteRunnable; +import org.apache.dolphinscheduler.server.master.runner.TaskExecuteRunnable; import java.util.Optional; diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityTaskExecuteRunnableTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityDelayTaskExecuteRunnableTest.java similarity index 89% rename from dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityTaskExecuteRunnableTest.java rename to dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityDelayTaskExecuteRunnableTest.java index 3a3a80b3cae2a..778884e066a20 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityTaskExecuteRunnableTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityDelayTaskExecuteRunnableTest.java @@ -21,12 +21,14 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable; +import org.apache.dolphinscheduler.server.master.runner.PriorityDelayTaskExecuteRunnable; import org.apache.dolphinscheduler.server.master.runner.operator.TaskExecuteRunnableOperatorManager; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -public class PriorityTaskExecuteRunnableTest { +public class PriorityDelayTaskExecuteRunnableTest { @Test public void testCompareTo() { @@ -46,9 +48,9 @@ public void testCompareTo() { TaskExecutionContext context1 = new TaskExecutionContext(); TaskExecutionContext context2 = new TaskExecutionContext(); - PriorityTaskExecuteRunnable p1 = + PriorityDelayTaskExecuteRunnable p1 = new DefaultTaskExecuteRunnable(workflowInstance, t1, context1, taskOperatorManager); - PriorityTaskExecuteRunnable p2 = + PriorityDelayTaskExecuteRunnable p2 = new DefaultTaskExecuteRunnable(workflowInstance, t2, context2, taskOperatorManager); Assertions.assertEquals(0, p1.compareTo(p2)); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseLinuxShellInterceptorBuilder.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseLinuxShellInterceptorBuilder.java index 3c24977c4a09c..d5a858a437f0b 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseLinuxShellInterceptorBuilder.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseLinuxShellInterceptorBuilder.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.plugin.task.api.shell; +import org.apache.dolphinscheduler.common.constants.TenantConstants; import org.apache.dolphinscheduler.common.exception.FileOperateException; import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.PropertyUtils; @@ -67,8 +68,10 @@ protected void generateShellScript() throws IOException { protected List generateBootstrapCommand() throws FileOperateException { if (sudoEnable) { - // Set the tenant owner as the working directory - FileUtils.setDirectoryOwner(Paths.get(shellDirectory), runUser); + if (!TenantConstants.BOOTSTRAPT_SYSTEM_USER.equals(runUser)) { + // Set the tenant owner as the working directory + FileUtils.setDirectoryOwner(Paths.get(shellDirectory), runUser); + } return bootstrapCommandInSudoMode(); } return bootstrapCommandInNormalMode(); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index 082d784aab805..e61409c9d23ff 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -29,7 +29,7 @@ import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient; import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer; -import org.apache.dolphinscheduler.server.worker.runner.GlobalTaskInstanceDispatchQueueLooper; +import org.apache.dolphinscheduler.server.worker.runner.GlobalTaskInstanceWaitingQueueLooper; import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.commons.collections4.CollectionUtils; @@ -68,7 +68,7 @@ public class WorkerServer implements IStoppable { private MessageRetryRunner messageRetryRunner; @Autowired - private GlobalTaskInstanceDispatchQueueLooper globalTaskInstanceDispatchQueueLooper; + private GlobalTaskInstanceWaitingQueueLooper globalTaskInstanceWaitingQueueLooper; /** * worker server startup, not use web service @@ -91,7 +91,7 @@ public void run() { this.workerManagerThread.start(); this.messageRetryRunner.start(); - this.globalTaskInstanceDispatchQueueLooper.start(); + this.globalTaskInstanceWaitingQueueLooper.start(); /* * registry hooks, which are called before the process exits diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java index 2ab6ed8ecdffd..dc97a2c9a4694 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java @@ -25,7 +25,7 @@ import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner; import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer; -import org.apache.dolphinscheduler.server.worker.runner.GlobalTaskInstanceDispatchQueue; +import org.apache.dolphinscheduler.server.worker.runner.GlobalTaskInstanceWaitingQueue; import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import java.time.Duration; @@ -57,7 +57,7 @@ public class WorkerWaitingStrategy implements WorkerConnectStrategy { private WorkerManagerThread workerManagerThread; @Autowired - private GlobalTaskInstanceDispatchQueue globalTaskInstanceDispatchQueue; + private GlobalTaskInstanceWaitingQueue globalTaskInstanceWaitingQueue; @Override public void disconnect() { @@ -121,7 +121,7 @@ private void clearWorkerResource() { workerRpcServer.close(); log.warn("Worker server close the RPC server due to lost connection from registry"); workerManagerThread.clearTask(); - globalTaskInstanceDispatchQueue.clearTask(); + globalTaskInstanceWaitingQueue.clearTask(); log.warn("Worker server clear the tasks due to lost connection from registry"); messageRetryRunner.clearMessage(); log.warn("Worker server clear the retry message due to lost connection from registry"); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/StreamingTaskInstanceOperatorImpl.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/StreamingTaskInstanceOperatorImpl.java index 3466d52c28e52..c57ee2efca719 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/StreamingTaskInstanceOperatorImpl.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/StreamingTaskInstanceOperatorImpl.java @@ -26,7 +26,7 @@ import org.apache.dolphinscheduler.plugin.task.api.stream.StreamTask; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; -import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnable; +import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor; import lombok.extern.slf4j.Slf4j; @@ -53,12 +53,12 @@ public TaskInstanceTriggerSavepointResponse triggerSavepoint(TaskInstanceTrigger log.error("Cannot find TaskExecutionContext for taskInstance: {}", taskInstanceId); return TaskInstanceTriggerSavepointResponse.fail("Cannot find TaskExecutionContext"); } - WorkerTaskExecuteRunnable workerTaskExecuteRunnable = workerManager.getTaskExecuteThread(taskInstanceId); - if (workerTaskExecuteRunnable == null) { + WorkerTaskExecutor workerTaskExecutor = workerManager.getTaskExecuteThread(taskInstanceId); + if (workerTaskExecutor == null) { log.error("Cannot find WorkerTaskExecuteRunnable for taskInstance: {}", taskInstanceId); return TaskInstanceTriggerSavepointResponse.fail("Cannot find WorkerTaskExecuteRunnable"); } - AbstractTask task = workerTaskExecuteRunnable.getTask(); + AbstractTask task = workerTaskExecutor.getTask(); if (task == null) { log.error("Cannot find StreamTask for taskInstance:{}", taskInstanceId); return TaskInstanceTriggerSavepointResponse.fail("Cannot find StreamTask"); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableFactory.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableFactory.java deleted file mode 100644 index 631b29cc8e550..0000000000000 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableFactory.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.worker.runner; - -import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate; -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; -import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; -import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient; -import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender; - -import javax.annotation.Nullable; - -import lombok.NonNull; - -public class DefaultWorkerDelayTaskExecuteRunnableFactory - extends - WorkerDelayTaskExecuteRunnableFactory { - - protected DefaultWorkerDelayTaskExecuteRunnableFactory(@NonNull TaskExecutionContext taskExecutionContext, - @NonNull WorkerConfig workerConfig, - @NonNull WorkerMessageSender workerMessageSender, - @NonNull TaskPluginManager taskPluginManager, - @Nullable StorageOperate storageOperate, - @NonNull WorkerRegistryClient workerRegistryClient) { - super(taskExecutionContext, - workerConfig, - workerMessageSender, - taskPluginManager, - storageOperate, - workerRegistryClient); - } - - @Override - public DefaultWorkerDelayTaskExecuteRunnable createWorkerTaskExecuteRunnable() { - return new DefaultWorkerDelayTaskExecuteRunnable( - taskExecutionContext, - workerConfig, - workerMessageSender, - taskPluginManager, - storageOperate, - workerRegistryClient); - } -} diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnable.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutor.java similarity index 76% rename from dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnable.java rename to dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutor.java index 82b91e16baaaa..19421ee05e663 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnable.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutor.java @@ -30,14 +30,14 @@ import lombok.NonNull; -public class DefaultWorkerDelayTaskExecuteRunnable extends WorkerDelayTaskExecuteRunnable { +public class DefaultWorkerTaskExecutor extends WorkerTaskExecutor { - public DefaultWorkerDelayTaskExecuteRunnable(@NonNull TaskExecutionContext taskExecutionContext, - @NonNull WorkerConfig workerConfig, - @NonNull WorkerMessageSender workerMessageSender, - @NonNull TaskPluginManager taskPluginManager, - @Nullable StorageOperate storageOperate, - @NonNull WorkerRegistryClient workerRegistryClient) { + public DefaultWorkerTaskExecutor(@NonNull TaskExecutionContext taskExecutionContext, + @NonNull WorkerConfig workerConfig, + @NonNull WorkerMessageSender workerMessageSender, + @NonNull TaskPluginManager taskPluginManager, + @Nullable StorageOperate storageOperate, + @NonNull WorkerRegistryClient workerRegistryClient) { super(taskExecutionContext, workerConfig, workerMessageSender, diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnableFactory.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutorFactory.java similarity index 55% rename from dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnableFactory.java rename to dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutorFactory.java index f2d39835b1459..0141a5cd177bf 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnableFactory.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutorFactory.java @@ -28,24 +28,23 @@ import lombok.NonNull; -public abstract class WorkerDelayTaskExecuteRunnableFactory +public class DefaultWorkerTaskExecutorFactory implements - WorkerTaskExecuteRunnableFactory { - - protected final @NonNull TaskExecutionContext taskExecutionContext; - protected final @NonNull WorkerConfig workerConfig; - protected final @NonNull WorkerMessageSender workerMessageSender; - protected final @NonNull TaskPluginManager taskPluginManager; - protected final @Nullable StorageOperate storageOperate; - protected final @NonNull WorkerRegistryClient workerRegistryClient; - - protected WorkerDelayTaskExecuteRunnableFactory( - @NonNull TaskExecutionContext taskExecutionContext, - @NonNull WorkerConfig workerConfig, - @NonNull WorkerMessageSender workerMessageSender, - @NonNull TaskPluginManager taskPluginManager, - @Nullable StorageOperate storageOperate, - @NonNull WorkerRegistryClient workerRegistryClient) { + WorkerTaskExecutorFactory { + + private final @NonNull TaskExecutionContext taskExecutionContext; + private final @NonNull WorkerConfig workerConfig; + private final @NonNull WorkerMessageSender workerMessageSender; + private final @NonNull TaskPluginManager taskPluginManager; + private final @Nullable StorageOperate storageOperate; + private final @NonNull WorkerRegistryClient workerRegistryClient; + + public DefaultWorkerTaskExecutorFactory(@NonNull TaskExecutionContext taskExecutionContext, + @NonNull WorkerConfig workerConfig, + @NonNull WorkerMessageSender workerMessageSender, + @NonNull TaskPluginManager taskPluginManager, + @Nullable StorageOperate storageOperate, + @NonNull WorkerRegistryClient workerRegistryClient) { this.taskExecutionContext = taskExecutionContext; this.workerConfig = workerConfig; this.workerMessageSender = workerMessageSender; @@ -54,5 +53,14 @@ protected WorkerDelayTaskExecuteRunnableFactory( this.workerRegistryClient = workerRegistryClient; } - public abstract T createWorkerTaskExecuteRunnable(); + @Override + public DefaultWorkerTaskExecutor createWorkerTaskExecutor() { + return new DefaultWorkerTaskExecutor( + taskExecutionContext, + workerConfig, + workerMessageSender, + taskPluginManager, + storageOperate, + workerRegistryClient); + } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceDispatchQueue.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueue.java similarity index 95% rename from dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceDispatchQueue.java rename to dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueue.java index 2b894913a7474..e1ac97b2d3187 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceDispatchQueue.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueue.java @@ -31,13 +31,13 @@ @Slf4j @Component -public class GlobalTaskInstanceDispatchQueue { +public class GlobalTaskInstanceWaitingQueue { private final WorkerConfig workerConfig; private final BlockingQueue blockingQueue; - public GlobalTaskInstanceDispatchQueue(WorkerConfig workerConfig) { + public GlobalTaskInstanceWaitingQueue(WorkerConfig workerConfig) { this.workerConfig = workerConfig; this.blockingQueue = new ArrayBlockingQueue<>(workerConfig.getExecThreads()); } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceDispatchQueueLooper.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueueLooper.java similarity index 66% rename from dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceDispatchQueueLooper.java rename to dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueueLooper.java index 654a1dd9fec90..f54ed33c8602c 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceDispatchQueueLooper.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueueLooper.java @@ -18,12 +18,9 @@ package org.apache.dolphinscheduler.server.worker.runner; import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; -import org.apache.dolphinscheduler.common.utils.DateUtils; -import org.apache.dolphinscheduler.extract.master.transportor.ITaskInstanceExecutionEvent; import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; -import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient; @@ -36,10 +33,10 @@ @Slf4j @Component -public class GlobalTaskInstanceDispatchQueueLooper extends BaseDaemonThread { +public class GlobalTaskInstanceWaitingQueueLooper extends BaseDaemonThread { @Autowired - private GlobalTaskInstanceDispatchQueue globalTaskInstanceDispatchQueue; + private GlobalTaskInstanceWaitingQueue globalTaskInstanceWaitingQueue; @Autowired private WorkerConfig workerConfig; @@ -59,7 +56,7 @@ public class GlobalTaskInstanceDispatchQueueLooper extends BaseDaemonThread { @Autowired private WorkerRegistryClient workerRegistryClient; - protected GlobalTaskInstanceDispatchQueueLooper() { + protected GlobalTaskInstanceWaitingQueueLooper() { super("GlobalTaskDispatchQueueLooper"); } @@ -72,35 +69,20 @@ public synchronized void start() { public void run() { while (true) { try { - TaskExecutionContext taskExecutionContext = globalTaskInstanceDispatchQueue.take(); + TaskExecutionContext taskExecutionContext = globalTaskInstanceWaitingQueue.take(); LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath()); LogUtils.setTaskInstanceIdMDC(taskExecutionContext.getTaskInstanceId()); - int delayTime = taskExecutionContext.getDelayTime(); - if (delayTime > 0) { - // delay task process - long remainTime = - DateUtils.getRemainTime( - DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()), - delayTime * 60L); - if (remainTime > 0) { - log.info("Current taskInstance is choose delay execution, delay time: {}s", remainTime); - taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.DELAY_EXECUTION); - // todo: use delay running event - workerMessageSender.sendMessage(taskExecutionContext, - ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.FINISH); - } - } - WorkerDelayTaskExecuteRunnable workerTaskExecuteRunnable = WorkerTaskExecuteRunnableFactoryBuilder - .createWorkerDelayTaskExecuteRunnableFactory( + WorkerTaskExecutor workerTaskExecutor = WorkerTaskExecutorFactoryBuilder + .createWorkerTaskExecutorFactory( taskExecutionContext, workerConfig, workerMessageSender, taskPluginManager, storageOperate, workerRegistryClient) - .createWorkerTaskExecuteRunnable(); - if (workerManager.offer(workerTaskExecuteRunnable)) { + .createWorkerTaskExecutor(); + if (workerManager.offer(workerTaskExecutor)) { log.info("Success submit WorkerDelayTaskExecuteRunnable to WorkerManagerThread's waiting queue"); } } catch (InterruptedException e) { diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnable.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnable.java deleted file mode 100644 index 07d85dc57b5ea..0000000000000 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnable.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.worker.runner; - -import org.apache.dolphinscheduler.common.utils.DateUtils; -import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate; -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; -import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; -import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient; -import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender; - -import java.util.concurrent.Delayed; -import java.util.concurrent.TimeUnit; - -import javax.annotation.Nullable; - -import lombok.NonNull; - -public abstract class WorkerDelayTaskExecuteRunnable extends WorkerTaskExecuteRunnable implements Delayed { - - protected WorkerDelayTaskExecuteRunnable(@NonNull TaskExecutionContext taskExecutionContext, - @NonNull WorkerConfig workerConfig, - @NonNull WorkerMessageSender workerMessageSender, - @NonNull TaskPluginManager taskPluginManager, - @Nullable StorageOperate storageOperate, - @NonNull WorkerRegistryClient workerRegistryClient) { - super(taskExecutionContext, - workerConfig, - workerMessageSender, - taskPluginManager, - storageOperate, - workerRegistryClient); - } - - @Override - public long getDelay(TimeUnit unit) { - TaskExecutionContext taskExecutionContext = getTaskExecutionContext(); - return unit.convert( - DateUtils.getRemainTime( - DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()), - taskExecutionContext.getDelayTime() * 60L), - TimeUnit.SECONDS); - } - - @Override - public int compareTo(Delayed o) { - if (o == null) { - return 1; - } - return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS)); - } - -} diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java index f16e887bec8be..2a6b7feec2153 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java @@ -37,25 +37,19 @@ public class WorkerExecService { private final ListeningExecutorService listeningExecutorService; - /** - * thread executor service - */ private final ExecutorService execService; - /** - * running task - */ - private final ConcurrentHashMap taskExecuteThreadMap; + private final ConcurrentHashMap taskExecuteThreadMap; public WorkerExecService(ExecutorService execService, - ConcurrentHashMap taskExecuteThreadMap) { + ConcurrentHashMap taskExecuteThreadMap) { this.execService = execService; this.listeningExecutorService = MoreExecutors.listeningDecorator(this.execService); this.taskExecuteThreadMap = taskExecuteThreadMap; WorkerServerMetrics.registerWorkerTaskTotalGauge(taskExecuteThreadMap::size); } - public void submit(final WorkerTaskExecuteRunnable taskExecuteThread) { + public void submit(final WorkerTaskExecutor taskExecuteThread) { taskExecuteThreadMap.put(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(), taskExecuteThread); ListenableFuture future = this.listeningExecutorService.submit(taskExecuteThread); FutureCallback futureCallback = new FutureCallback() { @@ -90,7 +84,7 @@ public int getActiveExecThreadCount() { return ((ThreadPoolExecutor) this.execService).getActiveCount(); } - public Map getTaskExecuteThreadMap() { + public Map getTaskExecuteThreadMap() { return taskExecuteThreadMap; } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java index 3fdcec4e891b0..4b666cfb20cb2 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java @@ -25,8 +25,9 @@ import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.DelayQueue; +import java.util.concurrent.LinkedBlockingQueue; import javax.annotation.Nullable; @@ -41,28 +42,22 @@ @Slf4j public class WorkerManagerThread implements Runnable { - private final DelayQueue waitSubmitQueue; + private final BlockingQueue waitSubmitQueue; private final WorkerExecService workerExecService; - private final WorkerConfig workerConfig; private final int workerExecThreads; - /** - * running task - */ - private final ConcurrentHashMap taskExecuteThreadMap = - new ConcurrentHashMap<>(); + private final ConcurrentHashMap taskExecuteThreadMap = new ConcurrentHashMap<>(); public WorkerManagerThread(WorkerConfig workerConfig) { - this.workerConfig = workerConfig; workerExecThreads = workerConfig.getExecThreads(); - this.waitSubmitQueue = new DelayQueue<>(); + this.waitSubmitQueue = new LinkedBlockingQueue<>(); workerExecService = new WorkerExecService( ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getExecThreads()), taskExecuteThreadMap); } - public @Nullable WorkerTaskExecuteRunnable getTaskExecuteThread(Integer taskInstanceId) { + public @Nullable WorkerTaskExecutor getTaskExecuteThread(Integer taskInstanceId) { return taskExecuteThreadMap.get(taskInstanceId); } @@ -95,7 +90,7 @@ public void killTaskBeforeExecuteByInstanceId(Integer taskInstanceId) { .forEach(waitSubmitQueue::remove); } - public boolean offer(WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable) { + public boolean offer(WorkerTaskExecutor workerDelayTaskExecuteRunnable) { return waitSubmitQueue.add(workerDelayTaskExecuteRunnable); } @@ -122,8 +117,8 @@ public void run() { Thread.sleep(Constants.SLEEP_TIME_MILLIS); } if (this.getThreadPoolQueueSize() <= workerExecThreads) { - final WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable = waitSubmitQueue.take(); - workerExecService.submit(workerDelayTaskExecuteRunnable); + WorkerTaskExecutor workerTaskExecutor = waitSubmitQueue.take(); + workerExecService.submit(workerTaskExecutor); } else { WorkerServerMetrics.incWorkerOverloadCount(); log.info("Exec queue is full, waiting submit queue {}, waiting exec queue size {}", diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java similarity index 95% rename from dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java rename to dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java index 15141ed7479da..c0c7c35b7d70d 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java @@ -67,9 +67,9 @@ import com.google.common.base.Strings; -public abstract class WorkerTaskExecuteRunnable implements Runnable { +public abstract class WorkerTaskExecutor implements Runnable { - protected static final Logger log = LoggerFactory.getLogger(WorkerTaskExecuteRunnable.class); + protected static final Logger log = LoggerFactory.getLogger(WorkerTaskExecutor.class); protected final TaskExecutionContext taskExecutionContext; protected final WorkerConfig workerConfig; @@ -80,13 +80,13 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable { protected @Nullable AbstractTask task; - protected WorkerTaskExecuteRunnable( - @NonNull TaskExecutionContext taskExecutionContext, - @NonNull WorkerConfig workerConfig, - @NonNull WorkerMessageSender workerMessageSender, - @NonNull TaskPluginManager taskPluginManager, - @Nullable StorageOperate storageOperate, - @NonNull WorkerRegistryClient workerRegistryClient) { + protected WorkerTaskExecutor( + @NonNull TaskExecutionContext taskExecutionContext, + @NonNull WorkerConfig workerConfig, + @NonNull WorkerMessageSender workerMessageSender, + @NonNull TaskPluginManager taskPluginManager, + @Nullable StorageOperate storageOperate, + @NonNull WorkerRegistryClient workerRegistryClient) { this.taskExecutionContext = taskExecutionContext; this.workerConfig = workerConfig; this.workerMessageSender = workerMessageSender; diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactory.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactory.java similarity index 90% rename from dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactory.java rename to dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactory.java index 441662f4bce28..5a185dbba0c3a 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactory.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactory.java @@ -17,7 +17,7 @@ package org.apache.dolphinscheduler.server.worker.runner; -public interface WorkerTaskExecuteRunnableFactory { +public interface WorkerTaskExecutorFactory { - T createWorkerTaskExecuteRunnable(); + T createWorkerTaskExecutor(); } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactoryBuilder.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java similarity index 72% rename from dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactoryBuilder.java rename to dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java index 603462e5b0dba..c2efdc9c7adba 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactoryBuilder.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java @@ -30,15 +30,15 @@ import lombok.experimental.UtilityClass; @UtilityClass -public class WorkerTaskExecuteRunnableFactoryBuilder { +public class WorkerTaskExecutorFactoryBuilder { - public static WorkerDelayTaskExecuteRunnableFactory createWorkerDelayTaskExecuteRunnableFactory(@NonNull TaskExecutionContext taskExecutionContext, - @NonNull WorkerConfig workerConfig, - @NonNull WorkerMessageSender workerMessageSender, - @NonNull TaskPluginManager taskPluginManager, - @Nullable StorageOperate storageOperate, - @NonNull WorkerRegistryClient workerRegistryClient) { - return new DefaultWorkerDelayTaskExecuteRunnableFactory(taskExecutionContext, + public static WorkerTaskExecutorFactory createWorkerTaskExecutorFactory(@NonNull TaskExecutionContext taskExecutionContext, + @NonNull WorkerConfig workerConfig, + @NonNull WorkerMessageSender workerMessageSender, + @NonNull TaskPluginManager taskPluginManager, + @Nullable StorageOperate storageOperate, + @NonNull WorkerRegistryClient workerRegistryClient) { + return new DefaultWorkerTaskExecutorFactory(taskExecutionContext, workerConfig, workerMessageSender, taskPluginManager, diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java index 3ff110c45999b..4d3ebc9aa93bb 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java @@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.metrics.TaskMetrics; -import org.apache.dolphinscheduler.server.worker.runner.GlobalTaskInstanceDispatchQueue; +import org.apache.dolphinscheduler.server.worker.runner.GlobalTaskInstanceWaitingQueue; import lombok.extern.slf4j.Slf4j; @@ -41,7 +41,7 @@ public class TaskInstanceDispatchOperationFunction private WorkerConfig workerConfig; @Autowired - private GlobalTaskInstanceDispatchQueue globalTaskInstanceDispatchQueue; + private GlobalTaskInstanceWaitingQueue globalTaskInstanceWaitingQueue; @Override public TaskInstanceDispatchResponse operate(TaskInstanceDispatchRequest taskInstanceDispatchRequest) { @@ -57,7 +57,7 @@ public TaskInstanceDispatchResponse operate(TaskInstanceDispatchRequest taskInst taskExecutionContext.getTaskInstanceId()); TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType()); - if (!globalTaskInstanceDispatchQueue.addDispatchTask(taskExecutionContext)) { + if (!globalTaskInstanceWaitingQueue.addDispatchTask(taskExecutionContext)) { log.error("Submit task: {} to wait queue error, current queue size: {} is full", taskExecutionContext.getTaskName(), workerConfig.getExecThreads()); return TaskInstanceDispatchResponse.failed(taskExecutionContext.getTaskInstanceId(), diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java index 347a52fa50d46..c5f4ffb78bbaf 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java @@ -29,7 +29,7 @@ import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils; import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner; import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; -import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnable; +import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor; import lombok.extern.slf4j.Slf4j; @@ -103,12 +103,12 @@ private boolean doKill(TaskExecutionContext taskExecutionContext) { } protected void cancelApplication(int taskInstanceId) { - WorkerTaskExecuteRunnable workerTaskExecuteRunnable = workerManager.getTaskExecuteThread(taskInstanceId); - if (workerTaskExecuteRunnable == null) { + WorkerTaskExecutor workerTaskExecutor = workerManager.getTaskExecuteThread(taskInstanceId); + if (workerTaskExecutor == null) { log.warn("taskExecuteThread not found, taskInstanceId:{}", taskInstanceId); return; } - AbstractTask task = workerTaskExecuteRunnable.getTask(); + AbstractTask task = workerTaskExecutor.getTask(); if (task == null) { log.warn("task not found, taskInstanceId:{}", taskInstanceId); return; diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutorTest.java similarity index 91% rename from dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableTest.java rename to dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutorTest.java index 79d04edae1290..43ef6f87d08e2 100644 --- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableTest.java +++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutorTest.java @@ -30,7 +30,7 @@ import org.junit.jupiter.api.Test; import org.mockito.Mockito; -public class DefaultWorkerDelayTaskExecuteRunnableTest { +public class DefaultWorkerTaskExecutorTest { private TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class); @@ -54,7 +54,7 @@ public void testDryRun() { .processDefineId(0) .firstSubmitTime(System.currentTimeMillis()) .build(); - WorkerTaskExecuteRunnable workerTaskExecuteRunnable = new DefaultWorkerDelayTaskExecuteRunnable( + WorkerTaskExecutor workerTaskExecutor = new DefaultWorkerTaskExecutor( taskExecutionContext, workerConfig, workerMessageSender, @@ -62,7 +62,7 @@ public void testDryRun() { storageOperate, workerRegistryClient); - Assertions.assertAll(workerTaskExecuteRunnable::run); + Assertions.assertAll(workerTaskExecutor::run); Assertions.assertEquals(TaskExecutionStatus.SUCCESS, taskExecutionContext.getCurrentExecutionStatus()); } @@ -78,7 +78,7 @@ public void testErrorboundTestDataSource() { .taskParams( "{\"localParams\":[],\"resourceList\":[],\"type\":\"POSTGRESQL\",\"datasource\":null,\"sql\":\"select * from t_ds_user\",\"sqlType\":\"0\",\"preStatements\":[],\"postStatements\":[],\"segmentSeparator\":\"\",\"displayRows\":10,\"conditionResult\":\"null\",\"dependence\":\"null\",\"switchResult\":\"null\",\"waitStartTimeout\":null}") .build(); - WorkerTaskExecuteRunnable workerTaskExecuteRunnable = new DefaultWorkerDelayTaskExecuteRunnable( + WorkerTaskExecutor workerTaskExecutor = new DefaultWorkerTaskExecutor( taskExecutionContext, workerConfig, workerMessageSender, @@ -86,7 +86,7 @@ public void testErrorboundTestDataSource() { storageOperate, workerRegistryClient); - Assertions.assertAll(workerTaskExecuteRunnable::run); + Assertions.assertAll(workerTaskExecutor::run); Assertions.assertEquals(TaskExecutionStatus.FAILURE, taskExecutionContext.getCurrentExecutionStatus()); } }