Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve-#13045] after a submit failure, stop the processInstance to avoid an endless loop #13051

Merged
merged 10 commits into from
Feb 20, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ public enum StateEventType {
TASK_TIMEOUT(3, "task timeout"),
WAKE_UP_TASK_GROUP(4, "wait task group"),
TASK_RETRY(5, "task retry"),
PROCESS_BLOCKED(6, "process blocked");
PROCESS_BLOCKED(6, "process blocked"),
PROCESS_SUBMIT_FAILED(7, "process submit failed");

StateEventType(int code, String descp) {
this.code = code;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.dolphinscheduler.server.master.event;

import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
Expand Down Expand Up @@ -66,11 +68,16 @@ public void handleWorkflowEvent(final WorkflowEvent workflowEvent) throws Workfl
if (processInstance.getTimeout() > 0) {
stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
}
} else {
// submit failed will resend the event to workflow event queue
log.error("Failed to submit the workflow instance, will resend the workflow start event: {}",
} else if (WorkflowSubmitStatue.FAILED == workflowSubmitStatue) {
log.error(
"Failed to submit the workflow instance, will resend the workflow start event: {}",
workflowEvent);
workflowEventQueue.addEvent(workflowEvent);
fuchanghai marked this conversation as resolved.
Show resolved Hide resolved
WorkflowStateEvent stateEvent = WorkflowStateEvent.builder()
.processInstanceId(processInstance.getId())
.type(StateEventType.PROCESS_SUBMIT_FAILED)
.status(WorkflowExecutionStatus.FAILURE)
.build();
workflowExecuteRunnable.addStateEvent(stateEvent);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable,
return true;
}
if (workflowStateEvent.getStatus().isFinished()) {
if (workflowStateEvent.getType().equals(StateEventType.PROCESS_SUBMIT_FAILED)) {
workflowExecuteRunnable.updateProcessInstanceState(workflowStateEvent);
}
workflowExecuteRunnable.endProcess();
}
if (processInstance.getState().isReadyStop()) {
Expand Down