Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #433 from Netflix/cleanup
Browse files Browse the repository at this point in the history
Logger statements and clean up
  • Loading branch information
pctreddy committed Feb 23, 2018
2 parents 4df0f91 + 17abc49 commit 504bd40
Show file tree
Hide file tree
Showing 24 changed files with 356 additions and 306 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,8 @@
public class TaskResult {

public enum Status {

IN_PROGRESS, FAILED, COMPLETED, SCHEDULED; //SCHEDULED is added for the backward compatibility and should NOT be used when updating the task result
};
IN_PROGRESS, FAILED, COMPLETED, SCHEDULED; //SCHEDULED is added for the backward compatibility and should NOT be used when updating the task result
}

private String workflowInstanceId;

Expand Down Expand Up @@ -100,7 +99,7 @@ public long getCallbackAfterSeconds() {
/**
* When set to non-zero values, the task remains in the queue for the specified seconds before sent back to the worker when polled.
* Useful for the long running task, where the task is updated as IN_PROGRESS and should not be polled out of the queue for a specified amount of time. (delayed queue implementation)
* @param callbackAfterSeconds. Amount of time in seconds the task should be held in the queue before giving it to a polling worker.
* @param callbackAfterSeconds Amount of time in seconds the task should be held in the queue before giving it to a polling worker.
*/
public void setCallbackAfterSeconds(long callbackAfterSeconds) {
this.callbackAfterSeconds = callbackAfterSeconds;
Expand Down Expand Up @@ -219,7 +218,6 @@ public static TaskResult newTaskResult(Status status) {
result.setStatus(status);
return result;
}




}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class EventQueues {

private static Logger logger = LoggerFactory.getLogger(EventQueues.class);

private static ParametersUtils pu = new ParametersUtils();
private static ParametersUtils parametersUtils = new ParametersUtils();

private static Map<String, EventQueueProvider> providers = new HashMap<>();

Expand All @@ -54,7 +54,7 @@ public static List<String> providers() {
}

public static ObservableQueue getQueue(String eventt, boolean throwException) {
String event = pu.replace(eventt).toString();
String event = parametersUtils.replace(eventt).toString();
String type = event.substring(0, event.indexOf(':'));
String queueURI = event.substring(event.indexOf(':') + 1);
EventQueueProvider provider = providers.get(type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
@SuppressWarnings("serial")
public class ApplicationException extends RuntimeException {

public static enum Code {
public enum Code {
INVALID_INPUT(400), INTERNAL_ERROR(500), NOT_FOUND(404), CONFLICT(409), UNAUTHORIZED(403), BACKEND_ERROR(500);

private int statusCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public DeciderService(MetadataDAO metadataDAO, @Named("TaskMappers") Map<String,
}

//QQ public method validation of the input params
public DeciderOutcome decide(Workflow workflow, WorkflowDef def) throws TerminateWorkflow {
public DeciderOutcome decide(Workflow workflow, WorkflowDef def) throws TerminateWorkflowException {

workflow.setSchemaVersion(def.getSchemaVersion());
//In case of a new workflow the list of tasks will be empty
Expand All @@ -91,7 +91,7 @@ public DeciderOutcome decide(Workflow workflow, WorkflowDef def) throws Terminat
return decide(def, workflow, tasksToBeScheduled);
}

private DeciderOutcome decide(final WorkflowDef def, final Workflow workflow, List<Task> preScheduledTasks) throws TerminateWorkflow {
private DeciderOutcome decide(final WorkflowDef def, final Workflow workflow, List<Task> preScheduledTasks) throws TerminateWorkflowException {

DeciderOutcome outcome = new DeciderOutcome();

Expand Down Expand Up @@ -193,7 +193,7 @@ private DeciderOutcome decide(final WorkflowDef def, final Workflow workflow, Li

}

private List<Task> startWorkflow(Workflow workflow, WorkflowDef def) throws TerminateWorkflow {
private List<Task> startWorkflow(Workflow workflow, WorkflowDef def) throws TerminateWorkflowException {

logger.debug("Starting workflow " + def.getName() + "/" + workflow.getWorkflowId());
//The tasks will be empty in case of new workflow
Expand All @@ -202,7 +202,7 @@ private List<Task> startWorkflow(Workflow workflow, WorkflowDef def) throws Term
if (workflow.getReRunFromWorkflowId() == null || tasks.isEmpty()) {

if (def.getTasks().isEmpty()) {
throw new TerminateWorkflow("No tasks found to be executed", WorkflowStatus.COMPLETED);
throw new TerminateWorkflowException("No tasks found to be executed", WorkflowStatus.COMPLETED);
}

WorkflowTask taskToSchedule = def.getTasks().getFirst(); //Nothing isSystemTask running yet - so schedule the first task
Expand All @@ -227,7 +227,7 @@ private List<Task> startWorkflow(Workflow workflow, WorkflowDef def) throws Term
.orElseThrow(() -> {
String reason = String.format("The workflow %s isSystemTask marked for re-run from %s but could not find the starting task",
workflow.getWorkflowId(), workflow.getReRunFromWorkflowId());
return new TerminateWorkflow(reason);
return new TerminateWorkflowException(reason);
});

return Arrays.asList(rerunFromTask);
Expand All @@ -251,7 +251,7 @@ private void updateOutput(final WorkflowDef def, final Workflow workflow) {
workflow.setOutput(output);
}

private boolean checkForWorkflowCompletion(final WorkflowDef def, final Workflow workflow) throws TerminateWorkflow {
private boolean checkForWorkflowCompletion(final WorkflowDef def, final Workflow workflow) throws TerminateWorkflowException {

List<Task> allTasks = workflow.getTasks();
if (allTasks.isEmpty()) {
Expand Down Expand Up @@ -319,13 +319,13 @@ private String getNextTasksToBeScheduled(WorkflowDef def, Workflow workflow, Tas
}

@VisibleForTesting
Task retry(TaskDef taskDefinition, WorkflowTask workflowTask, Task task, Workflow workflow) throws TerminateWorkflow {
Task retry(TaskDef taskDefinition, WorkflowTask workflowTask, Task task, Workflow workflow) throws TerminateWorkflowException {

int retryCount = task.getRetryCount();
if (!task.getStatus().isRetriable() || SystemTaskType.isBuiltIn(task.getTaskType()) || taskDefinition == null || taskDefinition.getRetryCount() <= retryCount) {
WorkflowStatus status = task.getStatus().equals(Status.TIMED_OUT) ? WorkflowStatus.TIMED_OUT : WorkflowStatus.FAILED;
task.setRetried(true);
throw new TerminateWorkflow(task.getReasonForIncompletion(), status, task);
throw new TerminateWorkflowException(task.getReasonForIncompletion(), status, task);
}

// retry... - but not immediately - put a delay...
Expand Down Expand Up @@ -393,7 +393,7 @@ void checkForTimeout(TaskDef taskType, Task task) {
case TIME_OUT_WF:
task.setStatus(Status.TIMED_OUT);
task.setReasonForIncompletion(reason);
throw new TerminateWorkflow(reason, WorkflowStatus.TIMED_OUT, task);
throw new TerminateWorkflowException(reason, WorkflowStatus.TIMED_OUT, task);
}
}

Expand Down Expand Up @@ -428,8 +428,10 @@ public List<Task> getTasksToBeScheduled(WorkflowDef def, Workflow workflow,

public List<Task> getTasksToBeScheduled(WorkflowDef workflowDefinition, Workflow workflowInstance,
WorkflowTask taskToSchedule, int retryCount, String retriedTaskId) {

Map<String, Object> input = parametersUtils.getTaskInput(taskToSchedule.getInputParameters(),
workflowInstance, null, null);

Type taskType = Type.USER_DEFINED;
String type = taskToSchedule.getType();
if (Type.isSystemTask(type)) {
Expand All @@ -455,7 +457,7 @@ private boolean isTaskSkipped(WorkflowTask taskToSchedule, Workflow workflow) {
}
return retval;
} catch (Exception e) {
throw new TerminateWorkflow(e.getMessage());
throw new TerminateWorkflowException(e.getMessage());
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,21 @@
*
*/
@SuppressWarnings("serial")
public class TerminateWorkflow extends RuntimeException {
public class TerminateWorkflowException extends RuntimeException {

WorkflowStatus workflowStatus;

Task task;

public TerminateWorkflow(String reason) {
public TerminateWorkflowException(String reason) {
this(reason, WorkflowStatus.FAILED);
}

public TerminateWorkflow(String reason, WorkflowStatus workflowStatus) {
public TerminateWorkflowException(String reason, WorkflowStatus workflowStatus) {
this(reason, workflowStatus, null);
}

public TerminateWorkflow(String reason, WorkflowStatus workflowStatus, Task task) {
public TerminateWorkflowException(String reason, WorkflowStatus workflowStatus, Task task) {
super(reason);
this.workflowStatus = workflowStatus;
this.task = task;
Expand Down
Loading

0 comments on commit 504bd40

Please sign in to comment.