From 5f10044ac27cecccc4033eab0421b7494477cc6e Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Tue, 5 Dec 2023 13:56:00 +0800 Subject: [PATCH] Move delay calculation to Master --- ...TaskInstanceDispatchOperationFunction.java | 19 +++-- ...gicITaskInstanceKillOperationFunction.java | 8 +-- ...GlobalMasterTaskExecuteRunnableQueue.java} | 24 +++---- ...MasterTaskExecuteRunnableQueueLooper.java} | 16 ++--- .../GlobalTaskDispatchWaitingQueue.java | 4 +- .../runner/MasterTaskExecutorBootstrap.java | 8 +-- ...va => AsyncMasterTaskExecuteRunnable.java} | 10 +-- ...syncMasterTaskExecuteRunnableFactory.java} | 8 +-- .../AsyncTaskCallbackFunctionImpl.java | 20 +++--- .../execute/BaseTaskExecuteRunnable.java | 55 ++++++++++++++ .../execute/DefaultTaskExecuteRunnable.java | 2 +- .../MasterDelayTaskExecuteRunnable.java | 68 ------------------ ... => MasterTaskExecuteRunnableFactory.java} | 2 +- ...sterTaskExecuteRunnableFactoryBuilder.java | 6 +- ... => PriorityDelayTaskExecuteRunnable.java} | 71 +++++++------------ ...ava => SyncMasterTaskExecuteRunnable.java} | 8 +-- ...SyncMasterTaskExecuteRunnableFactory.java} | 8 +-- .../runner/execute/TaskExecuteRunnable.java | 2 +- ...seTaskExecuteRunnableDispatchOperator.java | 24 ++++++- ...icTaskExecuteRunnableDispatchOperator.java | 6 +- .../TaskExecuteRunnableDispatchOperator.java | 14 ++-- ...PriorityDelayTaskExecuteRunnableTest.java} | 6 +- .../BaseLinuxShellInterceptorBuilder.java | 7 +- ...WorkerDelayTaskExecuteRunnableFactory.java | 59 --------------- ... => DefaultWorkerTaskExecuteRunnable.java} | 14 ++-- ...aultWorkerTaskExecuteRunnableFactory.java} | 44 +++++++----- ...GlobalTaskInstanceDispatchQueueLooper.java | 22 +----- .../worker/runner/WorkerManagerThread.java | 13 ++-- ...rkerTaskExecuteRunnableFactoryBuilder.java | 14 ++-- ...DefaultWorkerTaskExecuteRunnableTest.java} | 6 +- 30 files changed, 245 insertions(+), 323 deletions(-) rename dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/{MasterDelayTaskExecuteRunnableDelayQueue.java => GlobalMasterTaskExecuteRunnableQueue.java} (55%) rename dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/{MasterDelayTaskExecuteRunnableDelayQueueLooper.java => GlobalMasterTaskExecuteRunnableQueueLooper.java} (83%) rename dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/{AsyncMasterDelayTaskExecuteRunnable.java => AsyncMasterTaskExecuteRunnable.java} (82%) rename dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/{AsyncMasterDelayTaskExecuteRunnableFactory.java => AsyncMasterTaskExecuteRunnableFactory.java} (83%) create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/BaseTaskExecuteRunnable.java delete mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterDelayTaskExecuteRunnable.java rename dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/{MasterDelayTaskExecuteRunnableFactory.java => MasterTaskExecuteRunnableFactory.java} (91%) rename dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/{PriorityTaskExecuteRunnable.java => PriorityDelayTaskExecuteRunnable.java} (58%) rename dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/{SyncMasterDelayTaskExecuteRunnable.java => SyncMasterTaskExecuteRunnable.java} (86%) rename dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/{SyncMasterDelayTaskExecuteRunnableFactory.java => SyncMasterTaskExecuteRunnableFactory.java} (81%) rename dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/{PriorityTaskExecuteRunnableTest.java => PriorityDelayTaskExecuteRunnableTest.java} (94%) delete mode 100644 dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableFactory.java rename dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/{DefaultWorkerDelayTaskExecuteRunnable.java => DefaultWorkerTaskExecuteRunnable.java} (76%) rename dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/{WorkerDelayTaskExecuteRunnableFactory.java => DefaultWorkerTaskExecuteRunnableFactory.java} (53%) rename dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/{DefaultWorkerDelayTaskExecuteRunnableTest.java => DefaultWorkerTaskExecuteRunnableTest.java} (96%) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceDispatchOperationFunction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceDispatchOperationFunction.java index c60e860aa7354..3019228da6e6b 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceDispatchOperationFunction.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceDispatchOperationFunction.java @@ -23,8 +23,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; -import org.apache.dolphinscheduler.server.master.runner.MasterDelayTaskExecuteRunnableDelayQueue; -import org.apache.dolphinscheduler.server.master.runner.execute.MasterDelayTaskExecuteRunnable; +import org.apache.dolphinscheduler.server.master.runner.GlobalMasterTaskExecuteRunnableQueue; +import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable; import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableFactoryBuilder; import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutionContextHolder; @@ -45,7 +45,7 @@ public class LogicITaskInstanceDispatchOperationFunction private MasterTaskExecuteRunnableFactoryBuilder masterTaskExecuteRunnableFactoryBuilder; @Autowired - private MasterDelayTaskExecuteRunnableDelayQueue masterDelayTaskExecuteRunnableDelayQueue; + private GlobalMasterTaskExecuteRunnableQueue globalMasterTaskExecuteRunnableQueue; @Override public LogicTaskDispatchResponse operate(LogicTaskDispatchRequest taskDispatchRequest) { @@ -79,18 +79,17 @@ public LogicTaskDispatchResponse operate(LogicTaskDispatchRequest taskDispatchRe return LogicTaskDispatchResponse.success(taskExecutionContext.getTaskInstanceId()); } } - final MasterDelayTaskExecuteRunnable masterDelayTaskExecuteRunnable = - masterTaskExecuteRunnableFactoryBuilder - .createWorkerDelayTaskExecuteRunnableFactory(taskExecutionContext.getTaskType()) - .createWorkerTaskExecuteRunnable(taskExecutionContext); - if (masterDelayTaskExecuteRunnableDelayQueue - .submitMasterDelayTaskExecuteRunnable(masterDelayTaskExecuteRunnable)) { + MasterTaskExecuteRunnable masterTaskExecuteRunnable = masterTaskExecuteRunnableFactoryBuilder + .createWorkerDelayTaskExecuteRunnableFactory(taskExecutionContext.getTaskType()) + .createWorkerTaskExecuteRunnable(taskExecutionContext); + if (globalMasterTaskExecuteRunnableQueue + .submitMasterTaskExecuteRunnable(masterTaskExecuteRunnable)) { log.info("Submit LogicTask: {} to MasterDelayTaskExecuteRunnableDelayQueue success", taskInstanceName); return LogicTaskDispatchResponse.success(taskInstanceId); } else { log.error( "Submit LogicTask: {} to MasterDelayTaskExecuteRunnableDelayQueue failed, current task waiting queue size: {} is full", - taskInstanceName, masterDelayTaskExecuteRunnableDelayQueue.size()); + taskInstanceName, globalMasterTaskExecuteRunnableQueue.size()); return LogicTaskDispatchResponse.failed(taskInstanceId, "MasterDelayTaskExecuteRunnableDelayQueue is full"); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceKillOperationFunction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceKillOperationFunction.java index 6b43a7690a891..927fa3627a91d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceKillOperationFunction.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceKillOperationFunction.java @@ -21,7 +21,7 @@ import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskKillResponse; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException; -import org.apache.dolphinscheduler.server.master.runner.MasterDelayTaskExecuteRunnableDelayQueue; +import org.apache.dolphinscheduler.server.master.runner.GlobalMasterTaskExecuteRunnableQueue; import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable; import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableHolder; import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutionContextHolder; @@ -38,7 +38,7 @@ public class LogicITaskInstanceKillOperationFunction ITaskInstanceOperationFunction { @Autowired - private MasterDelayTaskExecuteRunnableDelayQueue masterDelayTaskExecuteRunnableDelayQueue; + private GlobalMasterTaskExecuteRunnableQueue globalMasterTaskExecuteRunnableQueue; @Override public LogicTaskKillResponse operate(LogicTaskKillRequest taskKillRequest) { @@ -54,8 +54,8 @@ public LogicTaskKillResponse operate(LogicTaskKillRequest taskKillRequest) { } try { masterTaskExecuteRunnable.cancelTask(); - masterDelayTaskExecuteRunnableDelayQueue - .removeMasterDelayTaskExecuteRunnable(masterTaskExecuteRunnable); + globalMasterTaskExecuteRunnableQueue + .removeMasterTaskExecuteRunnable(masterTaskExecuteRunnable); return LogicTaskKillResponse.success(); } catch (MasterTaskExecuteException e) { log.error("Cancel MasterTaskExecuteRunnable failed ", e); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterDelayTaskExecuteRunnableDelayQueue.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueue.java similarity index 55% rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterDelayTaskExecuteRunnableDelayQueue.java rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueue.java index bdd1510527958..6356d3b515afc 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterDelayTaskExecuteRunnableDelayQueue.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueue.java @@ -17,10 +17,10 @@ package org.apache.dolphinscheduler.server.master.runner; -import org.apache.dolphinscheduler.server.master.runner.execute.MasterDelayTaskExecuteRunnable; import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable; -import java.util.concurrent.DelayQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import org.springframework.stereotype.Component; @@ -28,27 +28,27 @@ * */ @Component -public class MasterDelayTaskExecuteRunnableDelayQueue { +public class GlobalMasterTaskExecuteRunnableQueue { - private final DelayQueue masterDelayTaskExecuteRunnableDelayQueue = - new DelayQueue<>(); + private final BlockingQueue masterTaskExecuteRunnableBlockingQueue = + new LinkedBlockingQueue<>(); - public boolean submitMasterDelayTaskExecuteRunnable(MasterDelayTaskExecuteRunnable masterDelayTaskExecuteRunnable) { - return masterDelayTaskExecuteRunnableDelayQueue.offer(masterDelayTaskExecuteRunnable); + public boolean submitMasterTaskExecuteRunnable(MasterTaskExecuteRunnable masterTaskExecuteRunnable) { + return masterTaskExecuteRunnableBlockingQueue.offer(masterTaskExecuteRunnable); } - public MasterDelayTaskExecuteRunnable takeMasterDelayTaskExecuteRunnable() throws InterruptedException { - return masterDelayTaskExecuteRunnableDelayQueue.take(); + public MasterTaskExecuteRunnable takeMasterTaskExecuteRunnable() throws InterruptedException { + return masterTaskExecuteRunnableBlockingQueue.take(); } // todo: if we move the delay process to master, than we don't need this method, since dispatchProcess can directly // submit to thread pool - public boolean removeMasterDelayTaskExecuteRunnable(MasterTaskExecuteRunnable masterTaskExecuteRunnable) { - return masterDelayTaskExecuteRunnableDelayQueue.remove(masterTaskExecuteRunnable); + public boolean removeMasterTaskExecuteRunnable(MasterTaskExecuteRunnable masterTaskExecuteRunnable) { + return masterTaskExecuteRunnableBlockingQueue.remove(masterTaskExecuteRunnable); } public int size() { - return masterDelayTaskExecuteRunnableDelayQueue.size(); + return masterTaskExecuteRunnableBlockingQueue.size(); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterDelayTaskExecuteRunnableDelayQueueLooper.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueueLooper.java similarity index 83% rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterDelayTaskExecuteRunnableDelayQueueLooper.java rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueueLooper.java index 557f2ca447cfd..98525a1411b16 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterDelayTaskExecuteRunnableDelayQueueLooper.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueueLooper.java @@ -18,7 +18,7 @@ package org.apache.dolphinscheduler.server.master.runner; import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; -import org.apache.dolphinscheduler.server.master.runner.execute.MasterDelayTaskExecuteRunnable; +import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable; import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableHolder; import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableThreadPool; @@ -31,17 +31,17 @@ @Slf4j @Component -public class MasterDelayTaskExecuteRunnableDelayQueueLooper extends BaseDaemonThread implements AutoCloseable { +public class GlobalMasterTaskExecuteRunnableQueueLooper extends BaseDaemonThread implements AutoCloseable { @Autowired - private MasterDelayTaskExecuteRunnableDelayQueue masterDelayTaskExecuteRunnableDelayQueue; + private GlobalMasterTaskExecuteRunnableQueue globalMasterTaskExecuteRunnableQueue; @Autowired private MasterTaskExecuteRunnableThreadPool masterTaskExecuteRunnableThreadPool; private final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false); - public MasterDelayTaskExecuteRunnableDelayQueueLooper() { + public GlobalMasterTaskExecuteRunnableQueueLooper() { super("MasterDelayTaskExecuteRunnableDelayQueueLooper"); } @@ -61,10 +61,10 @@ public synchronized void start() { public void run() { while (RUNNING_FLAG.get()) { try { - final MasterDelayTaskExecuteRunnable masterDelayTaskExecuteRunnable = - masterDelayTaskExecuteRunnableDelayQueue.takeMasterDelayTaskExecuteRunnable(); - masterTaskExecuteRunnableThreadPool.submitMasterTaskExecuteRunnable(masterDelayTaskExecuteRunnable); - MasterTaskExecuteRunnableHolder.putMasterTaskExecuteRunnable(masterDelayTaskExecuteRunnable); + final MasterTaskExecuteRunnable masterTaskExecuteRunnable = + globalMasterTaskExecuteRunnableQueue.takeMasterTaskExecuteRunnable(); + masterTaskExecuteRunnableThreadPool.submitMasterTaskExecuteRunnable(masterTaskExecuteRunnable); + MasterTaskExecuteRunnableHolder.putMasterTaskExecuteRunnable(masterTaskExecuteRunnable); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); log.warn("MasterDelayTaskExecuteRunnableDelayQueueLooper has been interrupted, will stop loop"); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java index a8f8f884986b6..a8ae551ba8dd6 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java @@ -19,7 +19,7 @@ import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable; -import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.DelayQueue; import lombok.extern.slf4j.Slf4j; @@ -29,7 +29,7 @@ @Component public class GlobalTaskDispatchWaitingQueue { - private final PriorityBlockingQueue queue = new PriorityBlockingQueue<>(); + private final DelayQueue queue = new DelayQueue<>(); public void submitNeedToDispatchTaskExecuteRunnable(DefaultTaskExecuteRunnable priorityTaskExecuteRunnable) { queue.put(priorityTaskExecuteRunnable); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecutorBootstrap.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecutorBootstrap.java index 744e560c007b8..3e99d2141cf69 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecutorBootstrap.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecutorBootstrap.java @@ -32,7 +32,7 @@ public class MasterTaskExecutorBootstrap implements AutoCloseable { private GlobalTaskDispatchWaitingQueueLooper globalTaskDispatchWaitingQueueLooper; @Autowired - private MasterDelayTaskExecuteRunnableDelayQueueLooper masterDelayTaskExecuteRunnableDelayQueueLooper; + private GlobalMasterTaskExecuteRunnableQueueLooper globalMasterTaskExecuteRunnableQueueLooper; @Autowired private AsyncMasterTaskDelayQueueLooper asyncMasterTaskDelayQueueLooper; @@ -40,7 +40,7 @@ public class MasterTaskExecutorBootstrap implements AutoCloseable { public synchronized void start() { log.info("MasterTaskExecutorBootstrap starting..."); globalTaskDispatchWaitingQueueLooper.start(); - masterDelayTaskExecuteRunnableDelayQueueLooper.start(); + globalMasterTaskExecuteRunnableQueueLooper.start(); asyncMasterTaskDelayQueueLooper.start(); log.info("MasterTaskExecutorBootstrap started..."); } @@ -51,8 +51,8 @@ public void close() throws Exception { try ( final GlobalTaskDispatchWaitingQueueLooper globalTaskDispatchWaitingQueueLooper1 = globalTaskDispatchWaitingQueueLooper; - final MasterDelayTaskExecuteRunnableDelayQueueLooper masterDelayTaskExecuteRunnableDelayQueueLooper1 = - masterDelayTaskExecuteRunnableDelayQueueLooper; + final GlobalMasterTaskExecuteRunnableQueueLooper globalMasterTaskExecuteRunnableQueueLooper1 = + globalMasterTaskExecuteRunnableQueueLooper; final AsyncMasterTaskDelayQueueLooper asyncMasterTaskDelayQueueLooper1 = asyncMasterTaskDelayQueueLooper) { // closed the resource diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterDelayTaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskExecuteRunnable.java similarity index 82% rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterDelayTaskExecuteRunnable.java rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskExecuteRunnable.java index 8a6e6d8e870e8..f8c81cccb7374 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterDelayTaskExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskExecuteRunnable.java @@ -26,14 +26,14 @@ import lombok.extern.slf4j.Slf4j; @Slf4j -public class AsyncMasterDelayTaskExecuteRunnable extends MasterDelayTaskExecuteRunnable { +public class AsyncMasterTaskExecuteRunnable extends MasterTaskExecuteRunnable { private final AsyncMasterTaskDelayQueue asyncMasterTaskDelayQueue; - public AsyncMasterDelayTaskExecuteRunnable(TaskExecutionContext taskExecutionContext, - LogicTaskPluginFactoryBuilder logicTaskPluginFactoryBuilder, - LogicTaskInstanceExecutionEventSenderManager logicTaskInstanceExecutionEventSenderManager, - AsyncMasterTaskDelayQueue asyncTaskDelayQueue) { + public AsyncMasterTaskExecuteRunnable(TaskExecutionContext taskExecutionContext, + LogicTaskPluginFactoryBuilder logicTaskPluginFactoryBuilder, + LogicTaskInstanceExecutionEventSenderManager logicTaskInstanceExecutionEventSenderManager, + AsyncMasterTaskDelayQueue asyncTaskDelayQueue) { super(taskExecutionContext, logicTaskPluginFactoryBuilder, logicTaskInstanceExecutionEventSenderManager); this.asyncMasterTaskDelayQueue = asyncTaskDelayQueue; } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterDelayTaskExecuteRunnableFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskExecuteRunnableFactory.java similarity index 83% rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterDelayTaskExecuteRunnableFactory.java rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskExecuteRunnableFactory.java index a71f394b7d120..95b8df6fd5481 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterDelayTaskExecuteRunnableFactory.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskExecuteRunnableFactory.java @@ -25,9 +25,9 @@ import org.springframework.stereotype.Component; @Component -public class AsyncMasterDelayTaskExecuteRunnableFactory +public class AsyncMasterTaskExecuteRunnableFactory implements - MasterDelayTaskExecuteRunnableFactory { + MasterTaskExecuteRunnableFactory { @Autowired private LogicTaskPluginFactoryBuilder logicTaskPluginFactoryBuilder; @@ -39,8 +39,8 @@ public class AsyncMasterDelayTaskExecuteRunnableFactory private AsyncMasterTaskDelayQueue asyncTaskDelayQueue; @Override - public AsyncMasterDelayTaskExecuteRunnable createWorkerTaskExecuteRunnable(TaskExecutionContext taskExecutionContext) { - return new AsyncMasterDelayTaskExecuteRunnable(taskExecutionContext, + public AsyncMasterTaskExecuteRunnable createWorkerTaskExecuteRunnable(TaskExecutionContext taskExecutionContext) { + return new AsyncMasterTaskExecuteRunnable(taskExecutionContext, logicTaskPluginFactoryBuilder, logicTaskInstanceExecutionEventSenderManager, asyncTaskDelayQueue); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncTaskCallbackFunctionImpl.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncTaskCallbackFunctionImpl.java index e9d4fe4430a6a..29478f2e0aaf5 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncTaskCallbackFunctionImpl.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncTaskCallbackFunctionImpl.java @@ -26,41 +26,41 @@ @Slf4j public class AsyncTaskCallbackFunctionImpl implements AsyncTaskCallbackFunction { - private final AsyncMasterDelayTaskExecuteRunnable asyncMasterDelayTaskExecuteRunnable; + private final AsyncMasterTaskExecuteRunnable asyncMasterTaskExecuteRunnable; - public AsyncTaskCallbackFunctionImpl(@NonNull AsyncMasterDelayTaskExecuteRunnable asyncMasterDelayTaskExecuteRunnable) { - this.asyncMasterDelayTaskExecuteRunnable = asyncMasterDelayTaskExecuteRunnable; + public AsyncTaskCallbackFunctionImpl(@NonNull AsyncMasterTaskExecuteRunnable asyncMasterTaskExecuteRunnable) { + this.asyncMasterTaskExecuteRunnable = asyncMasterTaskExecuteRunnable; } @Override public void executeSuccess() { - asyncMasterDelayTaskExecuteRunnable.getTaskExecutionContext() + asyncMasterTaskExecuteRunnable.getTaskExecutionContext() .setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS); executeFinished(); } @Override public void executeFailed() { - asyncMasterDelayTaskExecuteRunnable.getTaskExecutionContext() + asyncMasterTaskExecuteRunnable.getTaskExecutionContext() .setCurrentExecutionStatus(TaskExecutionStatus.FAILURE); executeFinished(); } @Override public void executeThrowing(Throwable throwable) { - asyncMasterDelayTaskExecuteRunnable.afterThrowing(throwable); + asyncMasterTaskExecuteRunnable.afterThrowing(throwable); } private void executeFinished() { TaskInstanceLogHeader.printFinalizeTaskHeader(); - int taskInstanceId = asyncMasterDelayTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId(); + int taskInstanceId = asyncMasterTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId(); MasterTaskExecutionContextHolder.removeTaskExecutionContext(taskInstanceId); MasterTaskExecuteRunnableHolder.removeMasterTaskExecuteRunnable(taskInstanceId); log.info("Task execute finished, removed the TaskExecutionContext"); - asyncMasterDelayTaskExecuteRunnable.sendTaskResult(); + asyncMasterTaskExecuteRunnable.sendTaskResult(); log.info( "Execute task finished, will send the task execute result to master, the current task execute result is {}", - asyncMasterDelayTaskExecuteRunnable.getTaskExecutionContext().getCurrentExecutionStatus().name()); - asyncMasterDelayTaskExecuteRunnable.closeLogAppender(); + asyncMasterTaskExecuteRunnable.getTaskExecutionContext().getCurrentExecutionStatus().name()); + asyncMasterTaskExecuteRunnable.closeLogAppender(); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/BaseTaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/BaseTaskExecuteRunnable.java new file mode 100644 index 0000000000000..a574c271b7bae --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/BaseTaskExecuteRunnable.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner.execute; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; + +public abstract class BaseTaskExecuteRunnable implements TaskExecuteRunnable { + + protected final ProcessInstance workflowInstance; + protected final TaskInstance taskInstance; + protected final TaskExecutionContext taskExecutionContext; + + public BaseTaskExecuteRunnable(ProcessInstance workflowInstance, + TaskInstance taskInstance, + TaskExecutionContext taskExecutionContext) { + this.taskInstance = checkNotNull(taskInstance); + this.workflowInstance = checkNotNull(workflowInstance); + this.taskExecutionContext = checkNotNull(taskExecutionContext); + } + + @Override + public ProcessInstance getWorkflowInstance() { + return workflowInstance; + } + + @Override + public TaskInstance getTaskInstance() { + return taskInstance; + } + + @Override + public TaskExecutionContext getTaskExecutionContext() { + return taskExecutionContext; + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/DefaultTaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/DefaultTaskExecuteRunnable.java index 6f736139b6bca..4fa9d918729a9 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/DefaultTaskExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/DefaultTaskExecuteRunnable.java @@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.server.master.runner.operator.TaskExecuteRunnableOperatorManager; -public class DefaultTaskExecuteRunnable extends PriorityTaskExecuteRunnable { +public class DefaultTaskExecuteRunnable extends PriorityDelayTaskExecuteRunnable { private final TaskExecuteRunnableOperatorManager taskExecuteRunnableOperatorManager; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterDelayTaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterDelayTaskExecuteRunnable.java deleted file mode 100644 index 70f3e93521331..0000000000000 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterDelayTaskExecuteRunnable.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.master.runner.execute; - -import org.apache.dolphinscheduler.common.utils.DateUtils; -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.server.master.runner.message.LogicTaskInstanceExecutionEventSenderManager; -import org.apache.dolphinscheduler.server.master.runner.task.LogicTaskPluginFactoryBuilder; - -import java.util.concurrent.Delayed; -import java.util.concurrent.TimeUnit; - -public abstract class MasterDelayTaskExecuteRunnable extends MasterTaskExecuteRunnable implements Delayed { - - public MasterDelayTaskExecuteRunnable(TaskExecutionContext taskExecutionContext, - LogicTaskPluginFactoryBuilder logicTaskPluginFactoryBuilder, - LogicTaskInstanceExecutionEventSenderManager logicTaskInstanceExecutionEventSenderManager) { - super(taskExecutionContext, logicTaskPluginFactoryBuilder, logicTaskInstanceExecutionEventSenderManager); - } - - @Override - public boolean equals(Object obj) { - if (!(obj instanceof MasterDelayTaskExecuteRunnable)) { - return false; - } - MasterDelayTaskExecuteRunnable other = (MasterDelayTaskExecuteRunnable) obj; - return other.getTaskExecutionContext().getTaskInstanceId() == this.getTaskExecutionContext() - .getTaskInstanceId(); - } - - @Override - public int hashCode() { - return this.getTaskExecutionContext().getTaskInstanceId(); - } - - @Override - public long getDelay(TimeUnit unit) { - TaskExecutionContext taskExecutionContext = getTaskExecutionContext(); - return unit.convert( - DateUtils.getRemainTime( - taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getDelayTime() * 60L), - TimeUnit.SECONDS); - } - - @Override - public int compareTo(Delayed o) { - if (o == null) { - return 1; - } - return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS)); - } - -} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterDelayTaskExecuteRunnableFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableFactory.java similarity index 91% rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterDelayTaskExecuteRunnableFactory.java rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableFactory.java index 0fd79dfabe545..105b1f7c2c4f8 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterDelayTaskExecuteRunnableFactory.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableFactory.java @@ -19,7 +19,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -public interface MasterDelayTaskExecuteRunnableFactory { +public interface MasterTaskExecuteRunnableFactory { T createWorkerTaskExecuteRunnable(TaskExecutionContext taskExecutionContext); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableFactoryBuilder.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableFactoryBuilder.java index c5689f6a1b0f9..b8328ac983f6e 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableFactoryBuilder.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableFactoryBuilder.java @@ -32,17 +32,17 @@ public class MasterTaskExecuteRunnableFactoryBuilder { @Autowired - private AsyncMasterDelayTaskExecuteRunnableFactory asyncMasterDelayTaskExecuteRunnableFactory; + private AsyncMasterTaskExecuteRunnableFactory asyncMasterDelayTaskExecuteRunnableFactory; @Autowired - private SyncMasterDelayTaskExecuteRunnableFactory syncMasterDelayTaskExecuteRunnableFactory; + private SyncMasterTaskExecuteRunnableFactory syncMasterDelayTaskExecuteRunnableFactory; private static final Set ASYNC_TASK_TYPE = Sets.newHashSet( DependentLogicTask.TASK_TYPE, SubWorkflowLogicTask.TASK_TYPE, DynamicLogicTask.TASK_TYPE); - public MasterDelayTaskExecuteRunnableFactory createWorkerDelayTaskExecuteRunnableFactory(String taskType) { + public MasterTaskExecuteRunnableFactory createWorkerDelayTaskExecuteRunnableFactory(String taskType) { if (ASYNC_TASK_TYPE.contains(taskType)) { return asyncMasterDelayTaskExecuteRunnableFactory; } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityTaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityDelayTaskExecuteRunnable.java similarity index 58% rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityTaskExecuteRunnable.java rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityDelayTaskExecuteRunnable.java index 2e23feb7bd845..576deb5c7b5a8 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityTaskExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityDelayTaskExecuteRunnable.java @@ -17,60 +17,55 @@ package org.apache.dolphinscheduler.server.master.runner.execute; -import static com.google.common.base.Preconditions.checkNotNull; - +import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.jetbrains.annotations.NotNull; - -public abstract class PriorityTaskExecuteRunnable implements TaskExecuteRunnable, Comparable { - - private final ProcessInstance workflowInstance; - private final TaskInstance taskInstance; - private final TaskExecutionContext taskExecutionContext; - - public PriorityTaskExecuteRunnable(ProcessInstance workflowInstance, - TaskInstance taskInstance, - TaskExecutionContext taskExecutionContext) { - this.taskInstance = checkNotNull(taskInstance); - this.workflowInstance = checkNotNull(workflowInstance); - this.taskExecutionContext = checkNotNull(taskExecutionContext); - } +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; - @Override - public ProcessInstance getWorkflowInstance() { - return workflowInstance; - } +public abstract class PriorityDelayTaskExecuteRunnable extends BaseTaskExecuteRunnable implements Delayed { - @Override - public TaskInstance getTaskInstance() { - return taskInstance; + public PriorityDelayTaskExecuteRunnable(ProcessInstance workflowInstance, + TaskInstance taskInstance, + TaskExecutionContext taskExecutionContext) { + super(workflowInstance, taskInstance, taskExecutionContext); } @Override - public TaskExecutionContext getTaskExecutionContext() { - return taskExecutionContext; + public long getDelay(TimeUnit unit) { + return unit.convert( + DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), + taskExecutionContext.getDelayTime() * 60L), + TimeUnit.SECONDS); } @Override - public int compareTo(@NotNull TaskExecuteRunnable other) { + public int compareTo(Delayed o) { + if (o == null) { + return 1; + } + int delayTimeCompareResult = + Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS)); + if (delayTimeCompareResult != 0) { + return delayTimeCompareResult; + } + PriorityDelayTaskExecuteRunnable other = (PriorityDelayTaskExecuteRunnable) o; // the smaller dispatch fail times, the higher priority int dispatchFailTimesCompareResult = taskExecutionContext.getDispatchFailTimes() - other.getTaskExecutionContext().getDispatchFailTimes(); if (dispatchFailTimesCompareResult != 0) { return dispatchFailTimesCompareResult; } - int workflowInstancePriorityCompareResult = workflowInstance.getProcessInstancePriority().getCode() - other.getWorkflowInstance().getProcessInstancePriority().getCode(); if (workflowInstancePriorityCompareResult != 0) { return workflowInstancePriorityCompareResult; } - int workflowInstanceIdCompareResult = workflowInstance.getId() - other.getWorkflowInstance().getId(); + long workflowInstanceIdCompareResult = workflowInstance.getId().compareTo(other.getWorkflowInstance().getId()); if (workflowInstanceIdCompareResult != 0) { - return workflowInstanceIdCompareResult; + return workflowInstancePriorityCompareResult; } int taskInstancePriorityCompareResult = taskInstance.getTaskInstancePriority().getCode() - other.getTaskInstance().getTaskInstancePriority().getCode(); @@ -84,21 +79,7 @@ public int compareTo(@NotNull TaskExecuteRunnable other) { return -taskGroupPriorityCompareResult; } // The task instance shouldn't be equals - return taskInstance.getId() - other.getTaskInstance().getId(); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof PriorityTaskExecuteRunnable) { - PriorityTaskExecuteRunnable other = (PriorityTaskExecuteRunnable) obj; - return compareTo(other) == 0; - } - return false; - } - - @Override - public int hashCode() { - return taskInstance.getId(); + return taskInstance.getId().compareTo(other.getTaskInstance().getId()); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterDelayTaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterTaskExecuteRunnable.java similarity index 86% rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterDelayTaskExecuteRunnable.java rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterTaskExecuteRunnable.java index 62365f78cba55..2e3fcc6cf6db3 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterDelayTaskExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterTaskExecuteRunnable.java @@ -27,11 +27,11 @@ import lombok.extern.slf4j.Slf4j; @Slf4j -public class SyncMasterDelayTaskExecuteRunnable extends MasterDelayTaskExecuteRunnable { +public class SyncMasterTaskExecuteRunnable extends MasterTaskExecuteRunnable { - public SyncMasterDelayTaskExecuteRunnable(TaskExecutionContext taskExecutionContext, - LogicTaskPluginFactoryBuilder logicTaskPluginFactoryBuilder, - LogicTaskInstanceExecutionEventSenderManager logicTaskInstanceExecutionEventSenderManager) { + public SyncMasterTaskExecuteRunnable(TaskExecutionContext taskExecutionContext, + LogicTaskPluginFactoryBuilder logicTaskPluginFactoryBuilder, + LogicTaskInstanceExecutionEventSenderManager logicTaskInstanceExecutionEventSenderManager) { super(taskExecutionContext, logicTaskPluginFactoryBuilder, logicTaskInstanceExecutionEventSenderManager); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterDelayTaskExecuteRunnableFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterTaskExecuteRunnableFactory.java similarity index 81% rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterDelayTaskExecuteRunnableFactory.java rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterTaskExecuteRunnableFactory.java index 2bf829c02132d..79dc7b18755d7 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterDelayTaskExecuteRunnableFactory.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterTaskExecuteRunnableFactory.java @@ -28,9 +28,9 @@ @Slf4j @Component -public class SyncMasterDelayTaskExecuteRunnableFactory +public class SyncMasterTaskExecuteRunnableFactory implements - MasterDelayTaskExecuteRunnableFactory { + MasterTaskExecuteRunnableFactory { @Autowired private LogicTaskPluginFactoryBuilder logicTaskPluginFactoryBuilder; @@ -38,8 +38,8 @@ public class SyncMasterDelayTaskExecuteRunnableFactory private LogicTaskInstanceExecutionEventSenderManager logicTaskInstanceExecutionEventSenderManager; @Override - public SyncMasterDelayTaskExecuteRunnable createWorkerTaskExecuteRunnable(TaskExecutionContext taskExecutionContext) { - return new SyncMasterDelayTaskExecuteRunnable(taskExecutionContext, logicTaskPluginFactoryBuilder, + public SyncMasterTaskExecuteRunnable createWorkerTaskExecuteRunnable(TaskExecutionContext taskExecutionContext) { + return new SyncMasterTaskExecuteRunnable(taskExecutionContext, logicTaskPluginFactoryBuilder, logicTaskInstanceExecutionEventSenderManager); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecuteRunnable.java index 02980aff49fa1..96df56abb7224 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecuteRunnable.java @@ -25,7 +25,7 @@ * This interface is used to define a task which is executing. * todo: split to MasterTaskExecuteRunnable and WorkerTaskExecuteRunnable */ -public interface TaskExecuteRunnable extends Comparable { +public interface TaskExecuteRunnable { void dispatch(); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java index 6294f581ecce2..3c96b268dbe64 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java @@ -17,19 +17,41 @@ package org.apache.dolphinscheduler.server.master.runner.operator; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.server.master.runner.GlobalTaskDispatchWaitingQueue; import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable; +import java.util.concurrent.TimeUnit; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j public abstract class BaseTaskExecuteRunnableDispatchOperator implements TaskExecuteRunnableOperator { private final GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue; - public BaseTaskExecuteRunnableDispatchOperator(GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue) { + private final TaskInstanceDao taskInstanceDao; + + public BaseTaskExecuteRunnableDispatchOperator( + GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue, + TaskInstanceDao taskInstanceDao) { this.globalTaskDispatchWaitingQueue = globalTaskDispatchWaitingQueue; + this.taskInstanceDao = taskInstanceDao; } @Override public void operate(DefaultTaskExecuteRunnable taskExecuteRunnable) { + long remainTime = taskExecuteRunnable.getDelay(TimeUnit.SECONDS); + TaskInstance taskInstance = taskExecuteRunnable.getTaskInstance(); + if (remainTime > 0) { + taskInstance.setState(TaskExecutionStatus.DELAY_EXECUTION); + taskInstanceDao.updateById(taskInstance); + log.info("Current taskInstance: {} is choose delay execution, delay time: {}/s, remainTime: {}/s", + taskInstance.getName(), + TimeUnit.SECONDS.toMillis(taskInstance.getDelayTime()), remainTime); + } globalTaskDispatchWaitingQueue.submitNeedToDispatchTaskExecuteRunnable(taskExecuteRunnable); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableDispatchOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableDispatchOperator.java index d5b0802372882..ed1b777aebb14 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableDispatchOperator.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableDispatchOperator.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.master.runner.operator; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.server.master.runner.GlobalTaskDispatchWaitingQueue; import org.springframework.stereotype.Component; @@ -24,8 +25,9 @@ @Component public class LogicTaskExecuteRunnableDispatchOperator extends BaseTaskExecuteRunnableDispatchOperator { - public LogicTaskExecuteRunnableDispatchOperator(GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue) { - super(globalTaskDispatchWaitingQueue); + public LogicTaskExecuteRunnableDispatchOperator(GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue, + TaskInstanceDao taskInstanceDao) { + super(globalTaskDispatchWaitingQueue, taskInstanceDao); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableDispatchOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableDispatchOperator.java index 5a9f070641648..5a31f1138df64 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableDispatchOperator.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableDispatchOperator.java @@ -17,20 +17,16 @@ package org.apache.dolphinscheduler.server.master.runner.operator; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.server.master.runner.GlobalTaskDispatchWaitingQueue; -import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component -public class TaskExecuteRunnableDispatchOperator implements TaskExecuteRunnableOperator { +public class TaskExecuteRunnableDispatchOperator extends BaseTaskExecuteRunnableDispatchOperator { - @Autowired - private GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue; - - @Override - public void operate(DefaultTaskExecuteRunnable taskExecuteRunnable) { - globalTaskDispatchWaitingQueue.submitNeedToDispatchTaskExecuteRunnable(taskExecuteRunnable); + public TaskExecuteRunnableDispatchOperator(GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue, + TaskInstanceDao taskInstanceDao) { + super(globalTaskDispatchWaitingQueue, taskInstanceDao); } } diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityTaskExecuteRunnableTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityDelayTaskExecuteRunnableTest.java similarity index 94% rename from dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityTaskExecuteRunnableTest.java rename to dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityDelayTaskExecuteRunnableTest.java index 3a3a80b3cae2a..79fe1439d4207 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityTaskExecuteRunnableTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityDelayTaskExecuteRunnableTest.java @@ -26,7 +26,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -public class PriorityTaskExecuteRunnableTest { +public class PriorityDelayTaskExecuteRunnableTest { @Test public void testCompareTo() { @@ -46,9 +46,9 @@ public void testCompareTo() { TaskExecutionContext context1 = new TaskExecutionContext(); TaskExecutionContext context2 = new TaskExecutionContext(); - PriorityTaskExecuteRunnable p1 = + PriorityDelayTaskExecuteRunnable p1 = new DefaultTaskExecuteRunnable(workflowInstance, t1, context1, taskOperatorManager); - PriorityTaskExecuteRunnable p2 = + PriorityDelayTaskExecuteRunnable p2 = new DefaultTaskExecuteRunnable(workflowInstance, t2, context2, taskOperatorManager); Assertions.assertEquals(0, p1.compareTo(p2)); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseLinuxShellInterceptorBuilder.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseLinuxShellInterceptorBuilder.java index 3c24977c4a09c..d5a858a437f0b 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseLinuxShellInterceptorBuilder.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseLinuxShellInterceptorBuilder.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.plugin.task.api.shell; +import org.apache.dolphinscheduler.common.constants.TenantConstants; import org.apache.dolphinscheduler.common.exception.FileOperateException; import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.PropertyUtils; @@ -67,8 +68,10 @@ protected void generateShellScript() throws IOException { protected List generateBootstrapCommand() throws FileOperateException { if (sudoEnable) { - // Set the tenant owner as the working directory - FileUtils.setDirectoryOwner(Paths.get(shellDirectory), runUser); + if (!TenantConstants.BOOTSTRAPT_SYSTEM_USER.equals(runUser)) { + // Set the tenant owner as the working directory + FileUtils.setDirectoryOwner(Paths.get(shellDirectory), runUser); + } return bootstrapCommandInSudoMode(); } return bootstrapCommandInNormalMode(); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableFactory.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableFactory.java deleted file mode 100644 index 631b29cc8e550..0000000000000 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableFactory.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.worker.runner; - -import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate; -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; -import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; -import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient; -import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender; - -import javax.annotation.Nullable; - -import lombok.NonNull; - -public class DefaultWorkerDelayTaskExecuteRunnableFactory - extends - WorkerDelayTaskExecuteRunnableFactory { - - protected DefaultWorkerDelayTaskExecuteRunnableFactory(@NonNull TaskExecutionContext taskExecutionContext, - @NonNull WorkerConfig workerConfig, - @NonNull WorkerMessageSender workerMessageSender, - @NonNull TaskPluginManager taskPluginManager, - @Nullable StorageOperate storageOperate, - @NonNull WorkerRegistryClient workerRegistryClient) { - super(taskExecutionContext, - workerConfig, - workerMessageSender, - taskPluginManager, - storageOperate, - workerRegistryClient); - } - - @Override - public DefaultWorkerDelayTaskExecuteRunnable createWorkerTaskExecuteRunnable() { - return new DefaultWorkerDelayTaskExecuteRunnable( - taskExecutionContext, - workerConfig, - workerMessageSender, - taskPluginManager, - storageOperate, - workerRegistryClient); - } -} diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnable.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecuteRunnable.java similarity index 76% rename from dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnable.java rename to dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecuteRunnable.java index 82b91e16baaaa..1be333b750aa2 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnable.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecuteRunnable.java @@ -30,14 +30,14 @@ import lombok.NonNull; -public class DefaultWorkerDelayTaskExecuteRunnable extends WorkerDelayTaskExecuteRunnable { +public class DefaultWorkerTaskExecuteRunnable extends WorkerTaskExecuteRunnable { - public DefaultWorkerDelayTaskExecuteRunnable(@NonNull TaskExecutionContext taskExecutionContext, - @NonNull WorkerConfig workerConfig, - @NonNull WorkerMessageSender workerMessageSender, - @NonNull TaskPluginManager taskPluginManager, - @Nullable StorageOperate storageOperate, - @NonNull WorkerRegistryClient workerRegistryClient) { + public DefaultWorkerTaskExecuteRunnable(@NonNull TaskExecutionContext taskExecutionContext, + @NonNull WorkerConfig workerConfig, + @NonNull WorkerMessageSender workerMessageSender, + @NonNull TaskPluginManager taskPluginManager, + @Nullable StorageOperate storageOperate, + @NonNull WorkerRegistryClient workerRegistryClient) { super(taskExecutionContext, workerConfig, workerMessageSender, diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnableFactory.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecuteRunnableFactory.java similarity index 53% rename from dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnableFactory.java rename to dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecuteRunnableFactory.java index f2d39835b1459..245809f2bda2c 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnableFactory.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecuteRunnableFactory.java @@ -28,24 +28,23 @@ import lombok.NonNull; -public abstract class WorkerDelayTaskExecuteRunnableFactory +public class DefaultWorkerTaskExecuteRunnableFactory implements - WorkerTaskExecuteRunnableFactory { - - protected final @NonNull TaskExecutionContext taskExecutionContext; - protected final @NonNull WorkerConfig workerConfig; - protected final @NonNull WorkerMessageSender workerMessageSender; - protected final @NonNull TaskPluginManager taskPluginManager; - protected final @Nullable StorageOperate storageOperate; - protected final @NonNull WorkerRegistryClient workerRegistryClient; - - protected WorkerDelayTaskExecuteRunnableFactory( - @NonNull TaskExecutionContext taskExecutionContext, - @NonNull WorkerConfig workerConfig, - @NonNull WorkerMessageSender workerMessageSender, - @NonNull TaskPluginManager taskPluginManager, - @Nullable StorageOperate storageOperate, - @NonNull WorkerRegistryClient workerRegistryClient) { + WorkerTaskExecuteRunnableFactory { + + private final @NonNull TaskExecutionContext taskExecutionContext; + private final @NonNull WorkerConfig workerConfig; + private final @NonNull WorkerMessageSender workerMessageSender; + private final @NonNull TaskPluginManager taskPluginManager; + private final @Nullable StorageOperate storageOperate; + private final @NonNull WorkerRegistryClient workerRegistryClient; + + public DefaultWorkerTaskExecuteRunnableFactory(@NonNull TaskExecutionContext taskExecutionContext, + @NonNull WorkerConfig workerConfig, + @NonNull WorkerMessageSender workerMessageSender, + @NonNull TaskPluginManager taskPluginManager, + @Nullable StorageOperate storageOperate, + @NonNull WorkerRegistryClient workerRegistryClient) { this.taskExecutionContext = taskExecutionContext; this.workerConfig = workerConfig; this.workerMessageSender = workerMessageSender; @@ -54,5 +53,14 @@ protected WorkerDelayTaskExecuteRunnableFactory( this.workerRegistryClient = workerRegistryClient; } - public abstract T createWorkerTaskExecuteRunnable(); + @Override + public DefaultWorkerTaskExecuteRunnable createWorkerTaskExecuteRunnable() { + return new DefaultWorkerTaskExecuteRunnable( + taskExecutionContext, + workerConfig, + workerMessageSender, + taskPluginManager, + storageOperate, + workerRegistryClient); + } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceDispatchQueueLooper.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceDispatchQueueLooper.java index 654a1dd9fec90..6cda398f76c02 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceDispatchQueueLooper.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceDispatchQueueLooper.java @@ -18,12 +18,9 @@ package org.apache.dolphinscheduler.server.worker.runner; import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; -import org.apache.dolphinscheduler.common.utils.DateUtils; -import org.apache.dolphinscheduler.extract.master.transportor.ITaskInstanceExecutionEvent; import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; -import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient; @@ -76,23 +73,8 @@ public void run() { LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath()); LogUtils.setTaskInstanceIdMDC(taskExecutionContext.getTaskInstanceId()); - int delayTime = taskExecutionContext.getDelayTime(); - if (delayTime > 0) { - // delay task process - long remainTime = - DateUtils.getRemainTime( - DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()), - delayTime * 60L); - if (remainTime > 0) { - log.info("Current taskInstance is choose delay execution, delay time: {}s", remainTime); - taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.DELAY_EXECUTION); - // todo: use delay running event - workerMessageSender.sendMessage(taskExecutionContext, - ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.FINISH); - } - } - WorkerDelayTaskExecuteRunnable workerTaskExecuteRunnable = WorkerTaskExecuteRunnableFactoryBuilder - .createWorkerDelayTaskExecuteRunnableFactory( + WorkerTaskExecuteRunnable workerTaskExecuteRunnable = WorkerTaskExecuteRunnableFactoryBuilder + .createWorkerTaskExecuteRunnableFactory( taskExecutionContext, workerConfig, workerMessageSender, diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java index 3fdcec4e891b0..a6a9fbe9b775d 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java @@ -25,8 +25,9 @@ import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.DelayQueue; +import java.util.concurrent.LinkedBlockingQueue; import javax.annotation.Nullable; @@ -41,7 +42,7 @@ @Slf4j public class WorkerManagerThread implements Runnable { - private final DelayQueue waitSubmitQueue; + private final BlockingQueue waitSubmitQueue; private final WorkerExecService workerExecService; private final WorkerConfig workerConfig; @@ -56,7 +57,7 @@ public class WorkerManagerThread implements Runnable { public WorkerManagerThread(WorkerConfig workerConfig) { this.workerConfig = workerConfig; workerExecThreads = workerConfig.getExecThreads(); - this.waitSubmitQueue = new DelayQueue<>(); + this.waitSubmitQueue = new LinkedBlockingQueue<>(); workerExecService = new WorkerExecService( ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getExecThreads()), taskExecuteThreadMap); @@ -95,7 +96,7 @@ public void killTaskBeforeExecuteByInstanceId(Integer taskInstanceId) { .forEach(waitSubmitQueue::remove); } - public boolean offer(WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable) { + public boolean offer(WorkerTaskExecuteRunnable workerDelayTaskExecuteRunnable) { return waitSubmitQueue.add(workerDelayTaskExecuteRunnable); } @@ -122,8 +123,8 @@ public void run() { Thread.sleep(Constants.SLEEP_TIME_MILLIS); } if (this.getThreadPoolQueueSize() <= workerExecThreads) { - final WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable = waitSubmitQueue.take(); - workerExecService.submit(workerDelayTaskExecuteRunnable); + WorkerTaskExecuteRunnable workerTaskExecuteRunnable = waitSubmitQueue.take(); + workerExecService.submit(workerTaskExecuteRunnable); } else { WorkerServerMetrics.incWorkerOverloadCount(); log.info("Exec queue is full, waiting submit queue {}, waiting exec queue size {}", diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactoryBuilder.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactoryBuilder.java index 603462e5b0dba..ba48f7c3a9601 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactoryBuilder.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactoryBuilder.java @@ -32,13 +32,13 @@ @UtilityClass public class WorkerTaskExecuteRunnableFactoryBuilder { - public static WorkerDelayTaskExecuteRunnableFactory createWorkerDelayTaskExecuteRunnableFactory(@NonNull TaskExecutionContext taskExecutionContext, - @NonNull WorkerConfig workerConfig, - @NonNull WorkerMessageSender workerMessageSender, - @NonNull TaskPluginManager taskPluginManager, - @Nullable StorageOperate storageOperate, - @NonNull WorkerRegistryClient workerRegistryClient) { - return new DefaultWorkerDelayTaskExecuteRunnableFactory(taskExecutionContext, + public static WorkerTaskExecuteRunnableFactory createWorkerTaskExecuteRunnableFactory(@NonNull TaskExecutionContext taskExecutionContext, + @NonNull WorkerConfig workerConfig, + @NonNull WorkerMessageSender workerMessageSender, + @NonNull TaskPluginManager taskPluginManager, + @Nullable StorageOperate storageOperate, + @NonNull WorkerRegistryClient workerRegistryClient) { + return new DefaultWorkerTaskExecuteRunnableFactory(taskExecutionContext, workerConfig, workerMessageSender, taskPluginManager, diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecuteRunnableTest.java similarity index 96% rename from dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableTest.java rename to dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecuteRunnableTest.java index 79d04edae1290..662d879217013 100644 --- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableTest.java +++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecuteRunnableTest.java @@ -30,7 +30,7 @@ import org.junit.jupiter.api.Test; import org.mockito.Mockito; -public class DefaultWorkerDelayTaskExecuteRunnableTest { +public class DefaultWorkerTaskExecuteRunnableTest { private TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class); @@ -54,7 +54,7 @@ public void testDryRun() { .processDefineId(0) .firstSubmitTime(System.currentTimeMillis()) .build(); - WorkerTaskExecuteRunnable workerTaskExecuteRunnable = new DefaultWorkerDelayTaskExecuteRunnable( + WorkerTaskExecuteRunnable workerTaskExecuteRunnable = new DefaultWorkerTaskExecuteRunnable( taskExecutionContext, workerConfig, workerMessageSender, @@ -78,7 +78,7 @@ public void testErrorboundTestDataSource() { .taskParams( "{\"localParams\":[],\"resourceList\":[],\"type\":\"POSTGRESQL\",\"datasource\":null,\"sql\":\"select * from t_ds_user\",\"sqlType\":\"0\",\"preStatements\":[],\"postStatements\":[],\"segmentSeparator\":\"\",\"displayRows\":10,\"conditionResult\":\"null\",\"dependence\":\"null\",\"switchResult\":\"null\",\"waitStartTimeout\":null}") .build(); - WorkerTaskExecuteRunnable workerTaskExecuteRunnable = new DefaultWorkerDelayTaskExecuteRunnable( + WorkerTaskExecuteRunnable workerTaskExecuteRunnable = new DefaultWorkerTaskExecuteRunnable( taskExecutionContext, workerConfig, workerMessageSender,