Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Support for Retry with LINEAR_BACKOFF #2699

Merged
merged 3 commits into from
Jan 27, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public enum TimeoutPolicy {
@ProtoEnum
public enum RetryLogic {
FIXED,
LINEAR_BACKOFF,
EXPONENTIAL_BACKOFF
}

Expand Down Expand Up @@ -120,6 +121,10 @@ public enum RetryLogic {
@Min(value = 0, message = "TaskDef pollTimeoutSeconds: {value} must be >= 0")
private Integer pollTimeoutSeconds;

@ProtoField(id = 20)
@Min(value = 1, message = "Backoff scale factor. Applicable for LINEAR_BACKOFF")
private Integer backoffScaleFactor = 1;

public TaskDef() {}

public TaskDef(String name) {
Expand Down Expand Up @@ -355,6 +360,16 @@ public Integer getPollTimeoutSeconds() {
return pollTimeoutSeconds;
}

/** @param backoffScaleFactor the backoff rate to set */
public void setBackoffScaleFactor(Integer backoffScaleFactor) {
this.backoffScaleFactor = backoffScaleFactor;
}

/** @return the backoff rate of this task definition */
public Integer getBackoffScaleFactor() {
return backoffScaleFactor;
}

@Override
public String toString() {
return name;
Expand All @@ -372,6 +387,7 @@ public boolean equals(Object o) {
return getRetryCount() == taskDef.getRetryCount()
&& getTimeoutSeconds() == taskDef.getTimeoutSeconds()
&& getRetryDelaySeconds() == taskDef.getRetryDelaySeconds()
&& getBackoffScaleFactor() == taskDef.getBackoffScaleFactor()
&& getResponseTimeoutSeconds() == taskDef.getResponseTimeoutSeconds()
&& Objects.equals(getName(), taskDef.getName())
&& Objects.equals(getDescription(), taskDef.getDescription())
Expand Down Expand Up @@ -400,6 +416,7 @@ public int hashCode() {
getTimeoutPolicy(),
getRetryLogic(),
getRetryDelaySeconds(),
getBackoffScaleFactor(),
getResponseTimeoutSeconds(),
getConcurrentExecLimit(),
getRateLimitPerFrequency(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -549,12 +549,24 @@ Optional<Task> retry(
case FIXED:
startDelay = taskDefinition.getRetryDelaySeconds();
break;
case LINEAR_BACKOFF:
int linearRetryDelaySeconds =
taskDefinition.getRetryDelaySeconds()
* taskDefinition.getBackoffScaleFactor()
* (task.getRetryCount() + 1);
// Reset integer overflow to max value
startDelay =
linearRetryDelaySeconds < 0 ? Integer.MAX_VALUE : linearRetryDelaySeconds;
break;
case EXPONENTIAL_BACKOFF:
int retryDelaySeconds =
int exponentialRetryDelaySeconds =
taskDefinition.getRetryDelaySeconds()
* (int) Math.pow(2, task.getRetryCount());
// Reset integer overflow to max value
startDelay = retryDelaySeconds < 0 ? Integer.MAX_VALUE : retryDelaySeconds;
startDelay =
exponentialRetryDelaySeconds < 0
? Integer.MAX_VALUE
: exponentialRetryDelaySeconds;
break;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,36 @@ public void testWorkflowTaskRetry() {
assertEquals(WorkflowStatus.FAILED, workflow.getStatus());
}

@Test
public void testLinearBackoff() {
Workflow workflow = createDefaultWorkflow();

Task task = new Task();
task.setStatus(Status.FAILED);
task.setTaskId("t1");

TaskDef taskDef = new TaskDef();
taskDef.setRetryDelaySeconds(60);
taskDef.setRetryLogic(TaskDef.RetryLogic.LINEAR_BACKOFF);
taskDef.setBackoffScaleFactor(2);
WorkflowTask workflowTask = new WorkflowTask();

Optional<Task> task2 = deciderService.retry(taskDef, workflowTask, task, workflow);
assertEquals(120, task2.get().getCallbackAfterSeconds()); // 60*2*1

Optional<Task> task3 = deciderService.retry(taskDef, workflowTask, task2.get(), workflow);
assertEquals(240, task3.get().getCallbackAfterSeconds()); // 60*2*2

Optional<Task> task4 = deciderService.retry(taskDef, workflowTask, task3.get(), workflow);
// // 60*2*3
assertEquals(360, task4.get().getCallbackAfterSeconds()); // 60*2*3

taskDef.setRetryCount(Integer.MAX_VALUE);
task4.get().setRetryCount(Integer.MAX_VALUE - 100);
Optional<Task> task5 = deciderService.retry(taskDef, workflowTask, task4.get(), workflow);
assertEquals(Integer.MAX_VALUE, task5.get().getCallbackAfterSeconds());
}

@Test
public void testExponentialBackoff() {
Workflow workflow = createDefaultWorkflow();
Expand Down
2 changes: 2 additions & 0 deletions docs/docs/configuration/taskdef.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ Conductor maintains a registry of worker tasks. A task MUST be registered befor
|timeoutSeconds|Time in seconds, after which the task is marked as `TIMED_OUT` if not completed after transitioning to `IN_PROGRESS` status for the first time|No timeouts if set to 0|
|pollTimeoutSeconds|Time in seconds, after which the task is marked as `TIMED_OUT` if not polled by a worker|No timeouts if set to 0|
|responseTimeoutSeconds|Must be greater than 0 and less than timeoutSeconds. The task is rescheduled if not updated with a status after this time (heartbeat mechanism). Useful when the worker polls for the task but fails to complete due to errors/network failure.|defaults to 3600|
|backoffScaleFactor|Must be greater than 0. Scale factor for linearity of the backoff|defaults to 1|
|inputKeys|Array of keys of task's expected input. Used for documenting task's input. See [Using inputKeys and outputKeys](#using-inputkeys-and-outputkeys). |optional|
|outputKeys|Array of keys of task's expected output. Used for documenting task's output|optional|
|inputTemplate|See [Using inputTemplate](#using-inputtemplate) below.|optional|
Expand All @@ -51,6 +52,7 @@ Conductor maintains a registry of worker tasks. A task MUST be registered befor
### Retry Logic

* FIXED : Reschedule the task after the ```retryDelaySeconds```
* LINEAR_BACKOFF : Reschedule after ```retryDelaySeconds * backoffRate * attemptNumber```
* EXPONENTIAL_BACKOFF : Reschedule after ```retryDelaySeconds * attemptNumber```

### Timeout Policy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,9 @@ public TaskDefPb.TaskDef toProto(TaskDef from) {
if (from.getPollTimeoutSeconds() != null) {
to.setPollTimeoutSeconds( from.getPollTimeoutSeconds() );
}
if (from.getBackoffScaleFactor() != null) {
to.setBackoffScaleFactor( from.getBackoffScaleFactor() );
}
return to.build();
}

Expand Down Expand Up @@ -738,13 +741,15 @@ public TaskDef fromProto(TaskDefPb.TaskDef from) {
to.setExecutionNameSpace( from.getExecutionNameSpace() );
to.setOwnerEmail( from.getOwnerEmail() );
to.setPollTimeoutSeconds( from.getPollTimeoutSeconds() );
to.setBackoffScaleFactor( from.getBackoffScaleFactor() );
return to;
}

public TaskDefPb.TaskDef.RetryLogic toProto(TaskDef.RetryLogic from) {
TaskDefPb.TaskDef.RetryLogic to;
switch (from) {
case FIXED: to = TaskDefPb.TaskDef.RetryLogic.FIXED; break;
case LINEAR_BACKOFF: to = TaskDefPb.TaskDef.RetryLogic.LINEAR_BACKOFF; break;
case EXPONENTIAL_BACKOFF: to = TaskDefPb.TaskDef.RetryLogic.EXPONENTIAL_BACKOFF; break;
default: throw new IllegalArgumentException("Unexpected enum constant: " + from);
}
Expand All @@ -755,6 +760,7 @@ public TaskDef.RetryLogic fromProto(TaskDefPb.TaskDef.RetryLogic from) {
TaskDef.RetryLogic to;
switch (from) {
case FIXED: to = TaskDef.RetryLogic.FIXED; break;
case LINEAR_BACKOFF: to = TaskDef.RetryLogic.LINEAR_BACKOFF; break;
case EXPONENTIAL_BACKOFF: to = TaskDef.RetryLogic.EXPONENTIAL_BACKOFF; break;
default: throw new IllegalArgumentException("Unexpected enum constant: " + from);
}
Expand Down
4 changes: 3 additions & 1 deletion grpc/src/main/proto/model/taskdef.proto
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ option go_package = "github.com/netflix/conductor/client/gogrpc/conductor/model"
message TaskDef {
enum RetryLogic {
FIXED = 0;
EXPONENTIAL_BACKOFF = 1;
LINEAR_BACKOFF = 1;
venkag marked this conversation as resolved.
Show resolved Hide resolved
EXPONENTIAL_BACKOFF = 2;
}
enum TimeoutPolicy {
RETRY = 0;
Expand All @@ -35,4 +36,5 @@ message TaskDef {
string execution_name_space = 17;
string owner_email = 18;
int32 poll_timeout_seconds = 19;
int32 backoff_scale_factor = 20;
}