Skip to content

Commit

Permalink
Add StickyTaskQueueDrainTimeout
Browse files Browse the repository at this point in the history
Add an option to drain sticky task queue
during graceful shutdown.
  • Loading branch information
Quinn-With-Two-Ns committed Mar 26, 2024
1 parent f205d1c commit 14b9861
Show file tree
Hide file tree
Showing 8 changed files with 212 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,23 @@ public CompletableFuture<Void> waitForSemaphorePermitsReleaseUntimed(
return future;
}

/**
 * Switches {@code balancer} to sticky-only polling and then waits out a fixed drain window.
 *
 * <p>Normal task queue polling is disabled immediately. The returned future is completed by a
 * task scheduled on {@code scheduledExecutorService} once {@code timeout} has elapsed; it is
 * purely time based and does not observe whether the sticky workflows actually finished.
 *
 * @param balancer balancer whose normal task queue polling is disabled
 * @param timeout length of the drain window; converted to milliseconds for scheduling
 * @return a future that completes with {@code null} after {@code timeout}
 */
public CompletableFuture<Void> waitForStickyQueueBalancer(
    StickyQueueBalancer balancer, Duration timeout) {
  balancer.disableNormalPoll();

  CompletableFuture<Void> drainWindowElapsed = new CompletableFuture<>();
  // Typed as Runnable so the same schedule(Runnable, long, TimeUnit) overload is selected.
  Runnable markElapsed =
      () -> {
        drainWindowElapsed.complete(null);
      };
  scheduledExecutorService.schedule(markElapsed, timeout.toMillis(), TimeUnit.MILLISECONDS);
  return drainWindowElapsed;
}

/**
* Wait for {@code executorToShutdown} to terminate. Only completes the returned CompletableFuture
* when the executor is terminated.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public static final class Builder {
private long defaultDeadlockDetectionTimeout;
private Duration maxHeartbeatThrottleInterval;
private Duration defaultHeartbeatThrottleInterval;
private Duration drainStickyTaskQueueTimeout;

private Builder() {}

Expand All @@ -80,6 +81,7 @@ private Builder(SingleWorkerOptions options) {
this.defaultHeartbeatThrottleInterval = options.getDefaultHeartbeatThrottleInterval();
this.buildId = options.getBuildId();
this.useBuildIdForVersioning = options.isUsingBuildIdForVersioning();
this.drainStickyTaskQueueTimeout = options.getDrainStickyTaskQueueTimeout();
}

public Builder setIdentity(String identity) {
Expand Down Expand Up @@ -161,6 +163,11 @@ public Builder setUseBuildIdForVersioning(boolean useBuildIdForVersioning) {
return this;
}

/**
* Sets the sticky task queue drain timeout. The value is stored as-is; {@link #build()} replaces
* a {@code null} value with a zero duration.
*
* <p>NOTE(review): the setter is named {@code StickyTaskQueueDrain} while the field and the
* getter on the built options use {@code DrainStickyTaskQueue}; both refer to the same value.
*
* @param drainStickyTaskQueueTimeout drain timeout, may be {@code null}
* @return this builder
*/
public Builder setStickyTaskQueueDrainTimeout(Duration drainStickyTaskQueueTimeout) {
this.drainStickyTaskQueueTimeout = drainStickyTaskQueueTimeout;
return this;
}

public SingleWorkerOptions build() {
PollerOptions pollerOptions = this.pollerOptions;
if (pollerOptions == null) {
Expand All @@ -177,6 +184,11 @@ public SingleWorkerOptions build() {
metricsScope = new NoopScope();
}

Duration drainStickyTaskQueueTimeout = this.drainStickyTaskQueueTimeout;
if (drainStickyTaskQueueTimeout == null) {
drainStickyTaskQueueTimeout = Duration.ofSeconds(0);
}

return new SingleWorkerOptions(
this.identity,
this.binaryChecksum,
Expand All @@ -192,7 +204,8 @@ public SingleWorkerOptions build() {
this.stickyQueueScheduleToStartTimeout,
this.defaultDeadlockDetectionTimeout,
this.maxHeartbeatThrottleInterval,
this.defaultHeartbeatThrottleInterval);
this.defaultHeartbeatThrottleInterval,
drainStickyTaskQueueTimeout);
}
}

Expand All @@ -211,6 +224,7 @@ public SingleWorkerOptions build() {
private final long defaultDeadlockDetectionTimeout;
private final Duration maxHeartbeatThrottleInterval;
private final Duration defaultHeartbeatThrottleInterval;
private final Duration drainStickyTaskQueueTimeout;

private SingleWorkerOptions(
String identity,
Expand All @@ -227,7 +241,8 @@ private SingleWorkerOptions(
Duration stickyQueueScheduleToStartTimeout,
long defaultDeadlockDetectionTimeout,
Duration maxHeartbeatThrottleInterval,
Duration defaultHeartbeatThrottleInterval) {
Duration defaultHeartbeatThrottleInterval,
Duration drainStickyTaskQueueTimeout) {
this.identity = identity;
this.binaryChecksum = binaryChecksum;
this.buildId = buildId;
Expand All @@ -243,6 +258,7 @@ private SingleWorkerOptions(
this.defaultDeadlockDetectionTimeout = defaultDeadlockDetectionTimeout;
this.maxHeartbeatThrottleInterval = maxHeartbeatThrottleInterval;
this.defaultHeartbeatThrottleInterval = defaultHeartbeatThrottleInterval;
this.drainStickyTaskQueueTimeout = drainStickyTaskQueueTimeout;
}

public String getIdentity() {
Expand All @@ -265,6 +281,10 @@ public boolean isUsingBuildIdForVersioning() {
return useBuildIdForVersioning;
}

/**
* Returns the sticky task queue drain timeout these options were built with.
*
* @return the drain timeout; a zero duration when none was set on the builder, since the
*     builder's {@code build()} substitutes zero for {@code null}
*/
public Duration getDrainStickyTaskQueueTimeout() {
return drainStickyTaskQueueTimeout;
}

/**
* Returns the {@link DataConverter} held by these options.
*
* @return the configured data converter
*/
public DataConverter getDataConverter() {
return dataConverter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package io.temporal.internal.worker;

import io.temporal.api.enums.v1.TaskQueueKind;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.concurrent.ThreadSafe;

Expand All @@ -30,6 +31,7 @@ public class StickyQueueBalancer {
private final boolean stickyQueueEnabled;
private final AtomicInteger stickyPollers = new AtomicInteger(0);
private final AtomicInteger normalPollers = new AtomicInteger(0);
private final AtomicBoolean disableNormalPoll = new AtomicBoolean(false);

private volatile long stickyBacklogSize = 0;

Expand All @@ -43,6 +45,10 @@ public StickyQueueBalancer(int pollersCount, boolean stickyQueueEnabled) {
*/
public TaskQueueKind makePoll() {
if (stickyQueueEnabled) {
if (disableNormalPoll.get()) {
stickyPollers.incrementAndGet();
return TaskQueueKind.TASK_QUEUE_KIND_STICKY;
}
// If pollersCount >= stickyBacklogSize > 0 we want to go back to a normal ratio to avoid a
// situation that too many pollers (all of them in the worst case) will open only sticky queue
// polls observing a stickyBacklogSize == 1 for example (which actually can be 0 already at
Expand Down Expand Up @@ -83,4 +89,12 @@ public void finishPoll(TaskQueueKind taskQueueKind, long backlogSize) {
stickyBacklogSize = backlogSize;
}
}

/**
* Turns off polling of the normal task queue. Once the flag is set, {@link #makePoll()} returns
* {@code TASK_QUEUE_KIND_STICKY} for every poll as long as the sticky queue is enabled.
*
* <p>NOTE(review): no method that clears this flag again is visible here, so the switch appears
* to be one-way -- confirm against the rest of the class.
*/
public void disableNormalPoll() {
disableNormalPoll.set(true);
}

/**
* Returns the current value of the normal poller counter.
*
* <p>NOTE(review): presumably the number of in-flight polls on the normal task queue -- confirm
* against how {@code makePoll} and {@code finishPoll} update the counter.
*
* @return current value of {@code normalPollers}
*/
public int getNormalPollerCount() {
return normalPollers.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public boolean start() {
@Override
public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
return shutdownManager
// we want to shutdown heartbeatExecutor before activity worker, so in-flight activities
// we want to shut down heartbeatExecutor before activity worker, so in-flight activities
// could get an ActivityWorkerShutdownException from their heartbeat
.shutdownExecutor(heartbeatExecutor, this + "#heartbeatExecutor", Duration.ofSeconds(5))
.thenCompose(r -> worker.shutdown(shutdownManager, interruptTasks))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ final class WorkflowWorker implements SuspendableWorker {
// Currently the implementation looks safe without volatile, but it's brittle.
@Nonnull private SuspendableWorker poller = new NoopWorker();

private StickyQueueBalancer stickyQueueBalancer;

public WorkflowWorker(
@Nonnull WorkflowServiceStubs service,
@Nonnull String namespace,
Expand Down Expand Up @@ -118,7 +120,7 @@ public boolean start() {
options.getTaskExecutorThreadPoolSize(),
workerMetricsScope,
true);
StickyQueueBalancer stickyQueueBalancer =
stickyQueueBalancer =
new StickyQueueBalancer(
options.getPollerOptions().getPollThreadCount(), stickyTaskQueueName != null);

Expand Down Expand Up @@ -153,8 +155,21 @@ public boolean start() {
@Override
public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
String semaphoreName = this + "#executorSlotsSemaphore";
return poller
.shutdown(shutdownManager, interruptTasks)

boolean stickyQueueBalancerDrainEnabled =
!interruptTasks
&& !options.getDrainStickyTaskQueueTimeout().isZero()
&& stickyTaskQueueName != null
&& stickyQueueBalancer != null;

return CompletableFuture.completedFuture(null)
.thenCompose(
ignore ->
stickyQueueBalancerDrainEnabled
? shutdownManager.waitForStickyQueueBalancer(
stickyQueueBalancer, options.getDrainStickyTaskQueueTimeout())
: CompletableFuture.completedFuture(null))
.thenCompose(ignore -> poller.shutdown(shutdownManager, interruptTasks))
.thenCompose(
ignore ->
!interruptTasks
Expand Down
1 change: 1 addition & 0 deletions temporal-sdk/src/main/java/io/temporal/worker/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,7 @@ private static SingleWorkerOptions toWorkflowWorkerOptions(
PollerOptions.newBuilder().setPollThreadCount(maxConcurrentWorkflowTaskPollers).build())
.setTaskExecutorThreadPoolSize(options.getMaxConcurrentWorkflowTaskExecutionSize())
.setStickyQueueScheduleToStartTimeout(stickyQueueScheduleToStartTimeout)
.setStickyTaskQueueDrainTimeout(options.getStickyTaskQueueDrainTimeout())
.setDefaultDeadlockDetectionTimeout(options.getDefaultDeadlockDetectionTimeout())
.setMetricsScope(metricsScope.tagged(tags))
.build();
Expand Down
49 changes: 44 additions & 5 deletions temporal-sdk/src/main/java/io/temporal/worker/WorkerOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import com.google.common.base.Preconditions;
import io.temporal.common.Experimental;
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
import java.time.Duration;
import java.util.Objects;
import javax.annotation.Nonnull;
Expand All @@ -45,6 +46,8 @@ public static WorkerOptions getDefaultInstance() {

static final Duration DEFAULT_STICKY_SCHEDULE_TO_START_TIMEOUT = Duration.ofSeconds(5);

static final Duration DEFAULT_STICKY_TASK_QUEUE_DRAIN_TIMEOUT = Duration.ofSeconds(0);

private static final WorkerOptions DEFAULT_INSTANCE;

static {
Expand Down Expand Up @@ -78,6 +81,7 @@ public static final class Builder {
private boolean disableEagerExecution;
private String buildId;
private boolean useBuildIdForVersioning;
private Duration stickyTaskQueueDrainTimeout;

private Builder() {}

Expand All @@ -100,6 +104,7 @@ private Builder(WorkerOptions o) {
this.disableEagerExecution = o.disableEagerExecution;
this.useBuildIdForVersioning = o.useBuildIdForVersioning;
this.buildId = o.buildId;
this.stickyTaskQueueDrainTimeout = o.stickyTaskQueueDrainTimeout;
}

/**
Expand Down Expand Up @@ -349,6 +354,22 @@ public Builder setBuildId(String buildId) {
return this;
}

/**
* During graceful shutdown, such as when calling {@link WorkerFactory#shutdown()}, and if the
* "sticky worker" is enabled, this timeout controls how long to wait for the sticky task queue
* to drain before shutting down the worker. If set, the worker will stop making new poll
* requests on the normal task queue, but will continue to poll the sticky task queue until the
* timeout is reached. This value should always be greater than the client's RPC long poll
* timeout, which can be set via
* {@link WorkflowServiceStubsOptions.Builder#setRpcLongPollTimeout(Duration)}.
*
* <p>Default is not to wait (a zero duration). A negative value is rejected by {@link
* #validateAndBuildWithDefaults()}, which also substitutes the default for {@code null}.
*
* @param stickyTaskQueueDrainTimeout how long to keep draining the sticky task queue; {@code
*     null} means use the default
* @return this builder
*/
@Experimental
public Builder setStickyTaskQueueDrainTimeout(Duration stickyTaskQueueDrainTimeout) {
this.stickyTaskQueueDrainTimeout = stickyTaskQueueDrainTimeout;
return this;
}

public WorkerOptions build() {
return new WorkerOptions(
maxWorkerActivitiesPerSecond,
Expand All @@ -365,7 +386,8 @@ public WorkerOptions build() {
stickyQueueScheduleToStartTimeout,
disableEagerExecution,
useBuildIdForVersioning,
buildId);
buildId,
stickyTaskQueueDrainTimeout);
}

public WorkerOptions validateAndBuildWithDefaults() {
Expand Down Expand Up @@ -396,6 +418,9 @@ public WorkerOptions validateAndBuildWithDefaults() {
buildId != null && !buildId.isEmpty(),
"buildId must be set non-empty if useBuildIdForVersioning is set true");
}
Preconditions.checkState(
stickyTaskQueueDrainTimeout == null || !stickyTaskQueueDrainTimeout.isNegative(),
"negative stickyTaskQueueDrainTimeout");

return new WorkerOptions(
maxWorkerActivitiesPerSecond,
Expand Down Expand Up @@ -430,7 +455,10 @@ public WorkerOptions validateAndBuildWithDefaults() {
: stickyQueueScheduleToStartTimeout,
disableEagerExecution,
useBuildIdForVersioning,
buildId);
buildId,
stickyTaskQueueDrainTimeout == null
? DEFAULT_STICKY_TASK_QUEUE_DRAIN_TIMEOUT
: stickyTaskQueueDrainTimeout);
}
}

Expand All @@ -449,6 +477,7 @@ public WorkerOptions validateAndBuildWithDefaults() {
private final boolean disableEagerExecution;
private final boolean useBuildIdForVersioning;
private final String buildId;
private final Duration stickyTaskQueueDrainTimeout;

private WorkerOptions(
double maxWorkerActivitiesPerSecond,
Expand All @@ -465,7 +494,8 @@ private WorkerOptions(
@Nonnull Duration stickyQueueScheduleToStartTimeout,
boolean disableEagerExecution,
boolean useBuildIdForVersioning,
String buildId) {
String buildId,
Duration stickyTaskQueueDrainTimeout) {
this.maxWorkerActivitiesPerSecond = maxWorkerActivitiesPerSecond;
this.maxConcurrentActivityExecutionSize = maxConcurrentActivityExecutionSize;
this.maxConcurrentWorkflowTaskExecutionSize = maxConcurrentWorkflowExecutionSize;
Expand All @@ -481,6 +511,7 @@ private WorkerOptions(
this.disableEagerExecution = disableEagerExecution;
this.useBuildIdForVersioning = useBuildIdForVersioning;
this.buildId = buildId;
this.stickyTaskQueueDrainTimeout = stickyTaskQueueDrainTimeout;
}

public double getMaxWorkerActivitiesPerSecond() {
Expand Down Expand Up @@ -560,6 +591,10 @@ public String getBuildId() {
return buildId;
}

/**
* Returns how long the worker keeps draining the sticky task queue during graceful shutdown.
*
* @return the configured timeout. Options produced by {@code validateAndBuildWithDefaults()}
*     carry a zero duration when nothing was set; options produced by plain {@code build()}
*     receive the builder value unchanged, so this may be {@code null} in that case.
*/
public Duration getStickyTaskQueueDrainTimeout() {
return stickyTaskQueueDrainTimeout;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -579,7 +614,8 @@ && compare(that.maxTaskQueueActivitiesPerSecond, maxTaskQueueActivitiesPerSecond
&& Objects.equals(stickyQueueScheduleToStartTimeout, that.stickyQueueScheduleToStartTimeout)
&& disableEagerExecution == that.disableEagerExecution
&& useBuildIdForVersioning == that.useBuildIdForVersioning
&& Objects.equals(that.buildId, buildId);
&& Objects.equals(that.buildId, buildId)
&& Objects.equals(stickyTaskQueueDrainTimeout, that.stickyTaskQueueDrainTimeout);
}

@Override
Expand All @@ -599,7 +635,8 @@ public int hashCode() {
stickyQueueScheduleToStartTimeout,
disableEagerExecution,
useBuildIdForVersioning,
buildId);
buildId,
stickyTaskQueueDrainTimeout);
}

@Override
Expand Down Expand Up @@ -635,6 +672,8 @@ public String toString() {
+ useBuildIdForVersioning
+ ", buildId='"
+ buildId
+ ", stickyTaskQueueDrainTimeout='"
+ stickyTaskQueueDrainTimeout
+ '}';
}
}
Loading

0 comments on commit 14b9861

Please sign in to comment.