Skip to content

Commit

Permalink
Move delay calculation to Master
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun committed Dec 5, 2023
1 parent db3d84b commit 5f10044
Show file tree
Hide file tree
Showing 30 changed files with 245 additions and 323 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.master.runner.MasterDelayTaskExecuteRunnableDelayQueue;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterDelayTaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.GlobalMasterTaskExecuteRunnableQueue;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableFactoryBuilder;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutionContextHolder;

Expand All @@ -45,7 +45,7 @@ public class LogicITaskInstanceDispatchOperationFunction
private MasterTaskExecuteRunnableFactoryBuilder masterTaskExecuteRunnableFactoryBuilder;

@Autowired
private MasterDelayTaskExecuteRunnableDelayQueue masterDelayTaskExecuteRunnableDelayQueue;
private GlobalMasterTaskExecuteRunnableQueue globalMasterTaskExecuteRunnableQueue;

@Override
public LogicTaskDispatchResponse operate(LogicTaskDispatchRequest taskDispatchRequest) {
Expand Down Expand Up @@ -79,18 +79,17 @@ public LogicTaskDispatchResponse operate(LogicTaskDispatchRequest taskDispatchRe
return LogicTaskDispatchResponse.success(taskExecutionContext.getTaskInstanceId());
}
}
final MasterDelayTaskExecuteRunnable masterDelayTaskExecuteRunnable =
masterTaskExecuteRunnableFactoryBuilder
.createWorkerDelayTaskExecuteRunnableFactory(taskExecutionContext.getTaskType())
.createWorkerTaskExecuteRunnable(taskExecutionContext);
if (masterDelayTaskExecuteRunnableDelayQueue
.submitMasterDelayTaskExecuteRunnable(masterDelayTaskExecuteRunnable)) {
MasterTaskExecuteRunnable masterTaskExecuteRunnable = masterTaskExecuteRunnableFactoryBuilder
.createWorkerDelayTaskExecuteRunnableFactory(taskExecutionContext.getTaskType())
.createWorkerTaskExecuteRunnable(taskExecutionContext);
if (globalMasterTaskExecuteRunnableQueue
.submitMasterTaskExecuteRunnable(masterTaskExecuteRunnable)) {
log.info("Submit LogicTask: {} to MasterDelayTaskExecuteRunnableDelayQueue success", taskInstanceName);
return LogicTaskDispatchResponse.success(taskInstanceId);
} else {
log.error(
"Submit LogicTask: {} to MasterDelayTaskExecuteRunnableDelayQueue failed, current task waiting queue size: {} is full",
taskInstanceName, masterDelayTaskExecuteRunnableDelayQueue.size());
taskInstanceName, globalMasterTaskExecuteRunnableQueue.size());
return LogicTaskDispatchResponse.failed(taskInstanceId,
"MasterDelayTaskExecuteRunnableDelayQueue is full");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskKillResponse;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException;
import org.apache.dolphinscheduler.server.master.runner.MasterDelayTaskExecuteRunnableDelayQueue;
import org.apache.dolphinscheduler.server.master.runner.GlobalMasterTaskExecuteRunnableQueue;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableHolder;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutionContextHolder;
Expand All @@ -38,7 +38,7 @@ public class LogicITaskInstanceKillOperationFunction
ITaskInstanceOperationFunction<LogicTaskKillRequest, LogicTaskKillResponse> {

@Autowired
private MasterDelayTaskExecuteRunnableDelayQueue masterDelayTaskExecuteRunnableDelayQueue;
private GlobalMasterTaskExecuteRunnableQueue globalMasterTaskExecuteRunnableQueue;

@Override
public LogicTaskKillResponse operate(LogicTaskKillRequest taskKillRequest) {
Expand All @@ -54,8 +54,8 @@ public LogicTaskKillResponse operate(LogicTaskKillRequest taskKillRequest) {
}
try {
masterTaskExecuteRunnable.cancelTask();
masterDelayTaskExecuteRunnableDelayQueue
.removeMasterDelayTaskExecuteRunnable(masterTaskExecuteRunnable);
globalMasterTaskExecuteRunnableQueue
.removeMasterTaskExecuteRunnable(masterTaskExecuteRunnable);
return LogicTaskKillResponse.success();
} catch (MasterTaskExecuteException e) {
log.error("Cancel MasterTaskExecuteRunnable failed ", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,38 +17,38 @@

package org.apache.dolphinscheduler.server.master.runner;

import org.apache.dolphinscheduler.server.master.runner.execute.MasterDelayTaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable;

import java.util.concurrent.DelayQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.springframework.stereotype.Component;

/**
*
*/
@Component
public class MasterDelayTaskExecuteRunnableDelayQueue {
public class GlobalMasterTaskExecuteRunnableQueue {

private final DelayQueue<MasterDelayTaskExecuteRunnable> masterDelayTaskExecuteRunnableDelayQueue =
new DelayQueue<>();
private final BlockingQueue<MasterTaskExecuteRunnable> masterTaskExecuteRunnableBlockingQueue =
new LinkedBlockingQueue<>();

public boolean submitMasterDelayTaskExecuteRunnable(MasterDelayTaskExecuteRunnable masterDelayTaskExecuteRunnable) {
return masterDelayTaskExecuteRunnableDelayQueue.offer(masterDelayTaskExecuteRunnable);
public boolean submitMasterTaskExecuteRunnable(MasterTaskExecuteRunnable masterTaskExecuteRunnable) {
return masterTaskExecuteRunnableBlockingQueue.offer(masterTaskExecuteRunnable);
}

public MasterDelayTaskExecuteRunnable takeMasterDelayTaskExecuteRunnable() throws InterruptedException {
return masterDelayTaskExecuteRunnableDelayQueue.take();
public MasterTaskExecuteRunnable takeMasterTaskExecuteRunnable() throws InterruptedException {
return masterTaskExecuteRunnableBlockingQueue.take();
}

// todo: if we move the delay process to master, than we don't need this method, since dispatchProcess can directly
// submit to thread pool
public boolean removeMasterDelayTaskExecuteRunnable(MasterTaskExecuteRunnable masterTaskExecuteRunnable) {
return masterDelayTaskExecuteRunnableDelayQueue.remove(masterTaskExecuteRunnable);
public boolean removeMasterTaskExecuteRunnable(MasterTaskExecuteRunnable masterTaskExecuteRunnable) {
return masterTaskExecuteRunnableBlockingQueue.remove(masterTaskExecuteRunnable);
}

public int size() {
return masterDelayTaskExecuteRunnableDelayQueue.size();
return masterTaskExecuteRunnableBlockingQueue.size();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.dolphinscheduler.server.master.runner;

import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterDelayTaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableHolder;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableThreadPool;

Expand All @@ -31,17 +31,17 @@

@Slf4j
@Component
public class MasterDelayTaskExecuteRunnableDelayQueueLooper extends BaseDaemonThread implements AutoCloseable {
public class GlobalMasterTaskExecuteRunnableQueueLooper extends BaseDaemonThread implements AutoCloseable {

@Autowired
private MasterDelayTaskExecuteRunnableDelayQueue masterDelayTaskExecuteRunnableDelayQueue;
private GlobalMasterTaskExecuteRunnableQueue globalMasterTaskExecuteRunnableQueue;

@Autowired
private MasterTaskExecuteRunnableThreadPool masterTaskExecuteRunnableThreadPool;

private final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false);

public MasterDelayTaskExecuteRunnableDelayQueueLooper() {
public GlobalMasterTaskExecuteRunnableQueueLooper() {
super("MasterDelayTaskExecuteRunnableDelayQueueLooper");
}

Expand All @@ -61,10 +61,10 @@ public synchronized void start() {
public void run() {
while (RUNNING_FLAG.get()) {
try {
final MasterDelayTaskExecuteRunnable masterDelayTaskExecuteRunnable =
masterDelayTaskExecuteRunnableDelayQueue.takeMasterDelayTaskExecuteRunnable();
masterTaskExecuteRunnableThreadPool.submitMasterTaskExecuteRunnable(masterDelayTaskExecuteRunnable);
MasterTaskExecuteRunnableHolder.putMasterTaskExecuteRunnable(masterDelayTaskExecuteRunnable);
final MasterTaskExecuteRunnable masterTaskExecuteRunnable =
globalMasterTaskExecuteRunnableQueue.takeMasterTaskExecuteRunnable();
masterTaskExecuteRunnableThreadPool.submitMasterTaskExecuteRunnable(masterTaskExecuteRunnable);
MasterTaskExecuteRunnableHolder.putMasterTaskExecuteRunnable(masterTaskExecuteRunnable);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
log.warn("MasterDelayTaskExecuteRunnableDelayQueueLooper has been interrupted, will stop loop");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.DelayQueue;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -29,7 +29,7 @@
@Component
public class GlobalTaskDispatchWaitingQueue {

private final PriorityBlockingQueue<DefaultTaskExecuteRunnable> queue = new PriorityBlockingQueue<>();
private final DelayQueue<DefaultTaskExecuteRunnable> queue = new DelayQueue<>();

public void submitNeedToDispatchTaskExecuteRunnable(DefaultTaskExecuteRunnable priorityTaskExecuteRunnable) {
queue.put(priorityTaskExecuteRunnable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ public class MasterTaskExecutorBootstrap implements AutoCloseable {
private GlobalTaskDispatchWaitingQueueLooper globalTaskDispatchWaitingQueueLooper;

@Autowired
private MasterDelayTaskExecuteRunnableDelayQueueLooper masterDelayTaskExecuteRunnableDelayQueueLooper;
private GlobalMasterTaskExecuteRunnableQueueLooper globalMasterTaskExecuteRunnableQueueLooper;

@Autowired
private AsyncMasterTaskDelayQueueLooper asyncMasterTaskDelayQueueLooper;

public synchronized void start() {
log.info("MasterTaskExecutorBootstrap starting...");
globalTaskDispatchWaitingQueueLooper.start();
masterDelayTaskExecuteRunnableDelayQueueLooper.start();
globalMasterTaskExecuteRunnableQueueLooper.start();
asyncMasterTaskDelayQueueLooper.start();
log.info("MasterTaskExecutorBootstrap started...");
}
Expand All @@ -51,8 +51,8 @@ public void close() throws Exception {
try (
final GlobalTaskDispatchWaitingQueueLooper globalTaskDispatchWaitingQueueLooper1 =
globalTaskDispatchWaitingQueueLooper;
final MasterDelayTaskExecuteRunnableDelayQueueLooper masterDelayTaskExecuteRunnableDelayQueueLooper1 =
masterDelayTaskExecuteRunnableDelayQueueLooper;
final GlobalMasterTaskExecuteRunnableQueueLooper globalMasterTaskExecuteRunnableQueueLooper1 =
globalMasterTaskExecuteRunnableQueueLooper;
final AsyncMasterTaskDelayQueueLooper asyncMasterTaskDelayQueueLooper1 =
asyncMasterTaskDelayQueueLooper) {
// closed the resource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class AsyncMasterDelayTaskExecuteRunnable extends MasterDelayTaskExecuteRunnable {
public class AsyncMasterTaskExecuteRunnable extends MasterTaskExecuteRunnable {

private final AsyncMasterTaskDelayQueue asyncMasterTaskDelayQueue;

public AsyncMasterDelayTaskExecuteRunnable(TaskExecutionContext taskExecutionContext,
LogicTaskPluginFactoryBuilder logicTaskPluginFactoryBuilder,
LogicTaskInstanceExecutionEventSenderManager logicTaskInstanceExecutionEventSenderManager,
AsyncMasterTaskDelayQueue asyncTaskDelayQueue) {
public AsyncMasterTaskExecuteRunnable(TaskExecutionContext taskExecutionContext,
LogicTaskPluginFactoryBuilder logicTaskPluginFactoryBuilder,
LogicTaskInstanceExecutionEventSenderManager logicTaskInstanceExecutionEventSenderManager,
AsyncMasterTaskDelayQueue asyncTaskDelayQueue) {
super(taskExecutionContext, logicTaskPluginFactoryBuilder, logicTaskInstanceExecutionEventSenderManager);
this.asyncMasterTaskDelayQueue = asyncTaskDelayQueue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
import org.springframework.stereotype.Component;

@Component
public class AsyncMasterDelayTaskExecuteRunnableFactory
public class AsyncMasterTaskExecuteRunnableFactory
implements
MasterDelayTaskExecuteRunnableFactory<AsyncMasterDelayTaskExecuteRunnable> {
MasterTaskExecuteRunnableFactory<AsyncMasterTaskExecuteRunnable> {

@Autowired
private LogicTaskPluginFactoryBuilder logicTaskPluginFactoryBuilder;
Expand All @@ -39,8 +39,8 @@ public class AsyncMasterDelayTaskExecuteRunnableFactory
private AsyncMasterTaskDelayQueue asyncTaskDelayQueue;

@Override
public AsyncMasterDelayTaskExecuteRunnable createWorkerTaskExecuteRunnable(TaskExecutionContext taskExecutionContext) {
return new AsyncMasterDelayTaskExecuteRunnable(taskExecutionContext,
public AsyncMasterTaskExecuteRunnable createWorkerTaskExecuteRunnable(TaskExecutionContext taskExecutionContext) {
return new AsyncMasterTaskExecuteRunnable(taskExecutionContext,
logicTaskPluginFactoryBuilder,
logicTaskInstanceExecutionEventSenderManager,
asyncTaskDelayQueue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,41 +26,41 @@
@Slf4j
public class AsyncTaskCallbackFunctionImpl implements AsyncTaskCallbackFunction {

private final AsyncMasterDelayTaskExecuteRunnable asyncMasterDelayTaskExecuteRunnable;
private final AsyncMasterTaskExecuteRunnable asyncMasterTaskExecuteRunnable;

public AsyncTaskCallbackFunctionImpl(@NonNull AsyncMasterDelayTaskExecuteRunnable asyncMasterDelayTaskExecuteRunnable) {
this.asyncMasterDelayTaskExecuteRunnable = asyncMasterDelayTaskExecuteRunnable;
public AsyncTaskCallbackFunctionImpl(@NonNull AsyncMasterTaskExecuteRunnable asyncMasterTaskExecuteRunnable) {
this.asyncMasterTaskExecuteRunnable = asyncMasterTaskExecuteRunnable;
}

@Override
public void executeSuccess() {
asyncMasterDelayTaskExecuteRunnable.getTaskExecutionContext()
asyncMasterTaskExecuteRunnable.getTaskExecutionContext()
.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
executeFinished();
}

@Override
public void executeFailed() {
asyncMasterDelayTaskExecuteRunnable.getTaskExecutionContext()
asyncMasterTaskExecuteRunnable.getTaskExecutionContext()
.setCurrentExecutionStatus(TaskExecutionStatus.FAILURE);
executeFinished();
}

@Override
public void executeThrowing(Throwable throwable) {
asyncMasterDelayTaskExecuteRunnable.afterThrowing(throwable);
asyncMasterTaskExecuteRunnable.afterThrowing(throwable);
}

private void executeFinished() {
TaskInstanceLogHeader.printFinalizeTaskHeader();
int taskInstanceId = asyncMasterDelayTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId();
int taskInstanceId = asyncMasterTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId();
MasterTaskExecutionContextHolder.removeTaskExecutionContext(taskInstanceId);
MasterTaskExecuteRunnableHolder.removeMasterTaskExecuteRunnable(taskInstanceId);
log.info("Task execute finished, removed the TaskExecutionContext");
asyncMasterDelayTaskExecuteRunnable.sendTaskResult();
asyncMasterTaskExecuteRunnable.sendTaskResult();
log.info(
"Execute task finished, will send the task execute result to master, the current task execute result is {}",
asyncMasterDelayTaskExecuteRunnable.getTaskExecutionContext().getCurrentExecutionStatus().name());
asyncMasterDelayTaskExecuteRunnable.closeLogAppender();
asyncMasterTaskExecuteRunnable.getTaskExecutionContext().getCurrentExecutionStatus().name());
asyncMasterTaskExecuteRunnable.closeLogAppender();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.master.runner.execute;

import static com.google.common.base.Preconditions.checkNotNull;

import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;

public abstract class BaseTaskExecuteRunnable implements TaskExecuteRunnable {

protected final ProcessInstance workflowInstance;
protected final TaskInstance taskInstance;
protected final TaskExecutionContext taskExecutionContext;

public BaseTaskExecuteRunnable(ProcessInstance workflowInstance,
TaskInstance taskInstance,
TaskExecutionContext taskExecutionContext) {
this.taskInstance = checkNotNull(taskInstance);
this.workflowInstance = checkNotNull(workflowInstance);
this.taskExecutionContext = checkNotNull(taskExecutionContext);
}

@Override
public ProcessInstance getWorkflowInstance() {
return workflowInstance;
}

@Override
public TaskInstance getTaskInstance() {
return taskInstance;
}

@Override
public TaskExecutionContext getTaskExecutionContext() {
return taskExecutionContext;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.runner.operator.TaskExecuteRunnableOperatorManager;

public class DefaultTaskExecuteRunnable extends PriorityTaskExecuteRunnable {
public class DefaultTaskExecuteRunnable extends PriorityDelayTaskExecuteRunnable {

private final TaskExecuteRunnableOperatorManager taskExecuteRunnableOperatorManager;

Expand Down
Loading

0 comments on commit 5f10044

Please sign in to comment.