Skip to content

Commit

Permalink
loadbalancer-experimental: allow configuring the pending request pena…
Browse files Browse the repository at this point in the history
…lty (#2991)

Motivation:

The pending request penalty is a way to try to avoid connections with
lots of pending requests, which results in a more even request distribution.
However, the penalty is currently fixed, which doesn't allow us to choose
how to prioritize between the fastest hosts vs the most fair request
distribution.

Modifications:

Make the penalty factor configurable.
  • Loading branch information
bryce-anderson authored Jul 2, 2024
1 parent 2e5726f commit c4198c2
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ abstract class DefaultRequestTracker implements RequestTracker, ScoreSupplier {
private final double invTau;
private final long cancelPenalty;
private final long errorPenalty;
private final long concurrentRequestPenalty;

/**
* Last inserted value to compute weight.
Expand All @@ -56,14 +57,16 @@ abstract class DefaultRequestTracker implements RequestTracker, ScoreSupplier {
* Current weighted average.
*/
private int ewma;
private int pendingCount;
private long pendingStamp = Long.MIN_VALUE;
private int concurrentCount;
private long concurrentStamp = Long.MIN_VALUE;

DefaultRequestTracker(final long halfLifeNanos, final long cancelPenalty, final long errorPenalty) {
DefaultRequestTracker(final long halfLifeNanos, final long cancelPenalty, final long errorPenalty,
final long concurrentRequestPenalty) {
ensurePositive(halfLifeNanos, "halfLifeNanos");
this.invTau = Math.pow((halfLifeNanos / log(2)), -1);
this.cancelPenalty = cancelPenalty;
this.errorPenalty = errorPenalty;
this.concurrentRequestPenalty = concurrentRequestPenalty;
}

/**
Expand All @@ -77,10 +80,10 @@ public final long beforeRequestStart() {
final long stamp = lock.writeLock();
try {
long timestamp = currentTimeNanos();
pendingCount++;
if (pendingStamp == Long.MIN_VALUE) {
// only update the pending timestamp if it doesn't already have a value.
pendingStamp = timestamp;
concurrentCount++;
if (concurrentStamp == Long.MIN_VALUE) {
// only update the concurrent timestamp if it doesn't already have a value.
concurrentStamp = timestamp;
}
return timestamp;
} finally {
Expand All @@ -101,10 +104,10 @@ public void onRequestError(final long startTimeNanos, ErrorClass errorClass) {
private void onComplete(final long startTimeNanos, long penalty) {
final long stamp = lock.writeLock();
try {
pendingCount--;
concurrentCount--;
// Unconditionally clear the timestamp because we don't know which request set it. This is an acceptable
// 'error' since otherwise we need to keep a collection of start timestamps.
pendingStamp = Long.MIN_VALUE;
concurrentStamp = Long.MIN_VALUE;
updateEwma(penalty, startTimeNanos);
} finally {
lock.unlockWrite(stamp);
Expand All @@ -114,16 +117,16 @@ private void onComplete(final long startTimeNanos, long penalty) {
@Override
public final int score() {
final long lastTimeNanos;
final int cPending;
final long pendingStamp;
final int concurrentCount;
final long concurrentStamp;
int currentEWMA;
// read all the relevant state using the read lock
final long stamp = lock.readLock();
try {
currentEWMA = ewma;
lastTimeNanos = this.lastTimeNanos;
cPending = pendingCount;
pendingStamp = this.pendingStamp;
concurrentCount = this.concurrentCount;
concurrentStamp = this.concurrentStamp;
} finally {
lock.unlockRead(stamp);
}
Expand All @@ -139,26 +142,27 @@ public final int score() {
}

if (currentEWMA == 0) {
// If EWMA has decayed to 0 (or isn't yet initialized) and there are no pending requests we return the
// maximum score to increase the likelihood this entity is selected. If there are pending requests we
// If EWMA has decayed to 0 (or isn't yet initialized) and there are no concurrent requests we return the
// maximum score to increase the likelihood this entity is selected. If there are concurrent requests we
// don't yet know the latency characteristics so we return the minimum score to decrease the
// likelihood this entity is selected.
return cPending == 0 ? 0 : MIN_VALUE;
return concurrentCount == 0 ? 0 : MIN_VALUE;
}

if (cPending > 0 && pendingStamp != Long.MIN_VALUE) {
// If we have a request outstanding we should consider how long it has been outstanding so that sudden
if (concurrentCount > 0 && concurrentStamp != Long.MIN_VALUE) {
// If we have a request concurrent we should consider how long it has been concurrent so that sudden
// interruptions don't have to wait for timeouts before our scores can be adjusted.
currentEWMA = max(currentEWMA, nanoToMillis(currentTimeNanos - pendingStamp));
currentEWMA = max(currentEWMA, nanoToMillis(currentTimeNanos - concurrentStamp));
}

// Add penalty for pending requests to account for "unaccounted" load.
// Add penalty for concurrent requests to account for "unaccounted" load.
// Penalty is the observed latency if known, else an arbitrarily high value which makes entities for which
// no latency data has yet been received (eg: request sent but not received), un-selectable.
final int pendingPenalty = (int) min(MAX_VALUE, (long) cPending * currentEWMA);
final int concurrentPenalty = (int) min(MAX_VALUE,
(long) concurrentCount * concurrentRequestPenalty * currentEWMA);
// Since we are measuring latencies and lower latencies are better, we turn the score as negative such that
// lower the latency, higher the score.
return MAX_VALUE - currentEWMA <= pendingPenalty ? MIN_VALUE : -(currentEWMA + pendingPenalty);
return MAX_VALUE - currentEWMA <= concurrentPenalty ? MIN_VALUE : -(currentEWMA + concurrentPenalty);
}

private static int applyPenalty(int currentEWMA, int currentLatency, long penalty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ private final class BasicHealthIndicator extends DefaultRequestTracker

/**
 * Creates the indicator, forwarding the EWMA half-life and the cancellation, error and concurrent request
 * penalties read from {@code outlierDetectorConfig} to the request tracker super constructor.
 */
BasicHealthIndicator() {
    // Argument order of the super constructor: halfLifeNanos, cancelPenalty, errorPenalty,
    // concurrentRequestPenalty. The error slot must receive the configured error penalty, not a second copy
    // of the cancellation penalty, otherwise the configured error penalty is never used.
    super(outlierDetectorConfig.ewmaHalfLife().toNanos(), outlierDetectorConfig.ewmaCancellationPenalty(),
            outlierDetectorConfig.ewmaErrorPenalty(), outlierDetectorConfig.concurrentRequestPenalty());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public final class OutlierDetectorConfig {
private final Duration ewmaHalfLife;
private final long ewmaCancellationPenalty;
private final long ewmaErrorPenalty;
private final long concurrentRequestPenalty;
private final boolean cancellationIsError;
private final int failedConnectionsThreshold;
private final Duration failureDetectorIntervalJitter;
Expand All @@ -69,7 +70,7 @@ public final class OutlierDetectorConfig {
private final Duration maxEjectionTime;

OutlierDetectorConfig(final Duration ewmaHalfLife, final long ewmaCancellationPenalty, final long ewmaErrorPenalty,
final boolean cancellationIsError, int failedConnectionsThreshold,
final long concurrentRequestPenalty, final boolean cancellationIsError, int failedConnectionsThreshold,
final Duration failureDetectorIntervalJitter,
final Duration serviceDiscoveryResubscribeInterval, final Duration serviceDiscoveryResubscribeJitter,
// true xDS settings
Expand All @@ -83,6 +84,7 @@ public final class OutlierDetectorConfig {
this.ewmaHalfLife = requireNonNull(ewmaHalfLife, "ewmaHalfLife");
this.ewmaCancellationPenalty = ensureNonNegative(ewmaCancellationPenalty, "ewmaCancellationPenalty");
this.ewmaErrorPenalty = ensureNonNegative(ewmaErrorPenalty, "ewmaErrorPenalty");
this.concurrentRequestPenalty = ensureNonNegative(concurrentRequestPenalty, "concurrentRequestPenalty");
this.cancellationIsError = cancellationIsError;
this.failedConnectionsThreshold = failedConnectionsThreshold;
this.failureDetectorIntervalJitter = requireNonNull(
Expand Down Expand Up @@ -145,6 +147,16 @@ public long ewmaErrorPenalty() {
return ewmaErrorPenalty;
}

/**
 * The penalty factor to apply to concurrent requests.
 * The EWMA penalty to apply to endpoints when there are concurrent requests. By penalizing endpoints with
 * concurrent load the traffic distribution will be smoother for algorithms that consider load metrics.
 * @return the penalty factor to use for concurrent load.
 */
public long concurrentRequestPenalty() {
return concurrentRequestPenalty;
}

/**
* The threshold for consecutive connection failures to a host.
* @return the threshold for consecutive connection failures to a host.
Expand Down Expand Up @@ -356,6 +368,7 @@ public static final class Builder {
static final Duration DEFAULT_EWMA_HALF_LIFE = Duration.ofSeconds(10);
static final long DEFAULT_CANCEL_PENALTY = 5L;
static final long DEFAULT_ERROR_PENALTY = 10L;
static final long DEFAULT_CONCURRENT_REQUEST_PENALTY = 1L;
private boolean cancellationIsError = true;

// Default xDS outlier detector settings.
Expand All @@ -378,6 +391,7 @@ public static final class Builder {
private Duration ewmaHalfLife = DEFAULT_EWMA_HALF_LIFE;
private long ewmaCancellationPenalty = DEFAULT_CANCEL_PENALTY;
private long ewmaErrorPenalty = DEFAULT_ERROR_PENALTY;
private long concurrentRequestPenalty = DEFAULT_CONCURRENT_REQUEST_PENALTY;
private int failedConnectionsThreshold = DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD;
private Duration intervalJitter = DEFAULT_HEALTH_CHECK_JITTER;
private Duration serviceDiscoveryResubscribeInterval = DEFAULT_HEALTH_CHECK_RESUBSCRIBE_INTERVAL;
Expand Down Expand Up @@ -448,7 +462,7 @@ public Builder() {
*/
public OutlierDetectorConfig build() {
return new OutlierDetectorConfig(ewmaHalfLife, ewmaCancellationPenalty, ewmaErrorPenalty,
cancellationIsError, failedConnectionsThreshold, intervalJitter,
concurrentRequestPenalty, cancellationIsError, failedConnectionsThreshold, intervalJitter,
serviceDiscoveryResubscribeInterval, serviceDiscoveryResubscribeJitter,
// xDS settings
consecutive5xx, failureDetectorInterval, baseEjectionTime,
Expand Down Expand Up @@ -512,6 +526,23 @@ public Builder cancellationIsError(final boolean cancellationIsError) {
return this;
}

/**
 * Set the penalty factor to apply to concurrent requests.
 * The EWMA penalty to apply to endpoints when there are concurrent requests. By penalizing endpoints with
 * concurrent load the traffic distribution will be more fair for algorithms that consider load metrics.
 * Larger penalties will favor a more even request distribution while lower penalties will bias traffic toward
 * endpoints with better performance. A value of 0 disables the penalty, 1 is an intermediate value, and larger
 * values such as 10 or more will strongly favor fairness over performance.
 * Defaults to {@value DEFAULT_CONCURRENT_REQUEST_PENALTY}.
 * @param ewmaConcurrentRequestPenalty the penalty factor to apply for concurrent load.
 * @return {@code this}
 */
public Builder ewmaConcurrentRequestPenalty(final long ewmaConcurrentRequestPenalty) {
this.concurrentRequestPenalty = ensureNonNegative(
ewmaConcurrentRequestPenalty, "ewmaConcurrentRequestPenalty");
return this;
}

/**
* Configure an interval for re-subscribing to the original events stream in case all existing hosts become
* unhealthy.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,13 @@ abstract class XdsHealthIndicator<ResolvedAddress, C extends LoadBalancedConnect

XdsHealthIndicator(final SequentialExecutor sequentialExecutor, final Executor executor,
final Duration ewmaHalfLife, final long cancellationPenalty, final long errorPenalty,
final long pendingRequestPenalty,
final boolean cancellationIsError, final ResolvedAddress address, String lbDescription,
final HostObserver hostObserver) {
super(requireNonNull(ewmaHalfLife, "ewmaHalfLife").toNanos(),
ensureNonNegative(cancellationPenalty, "cancellationPenalty"),
ensureNonNegative(errorPenalty, "errorPenalty"));
ensureNonNegative(errorPenalty, "errorPenalty"),
ensureNonNegative(pendingRequestPenalty, "pendingRequestPenalty"));
this.cancellationIsError = cancellationIsError;
this.sequentialExecutor = requireNonNull(sequentialExecutor, "sequentialExecutor");
this.executor = requireNonNull(executor, "executor");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ private final class XdsHealthIndicatorImpl extends XdsHealthIndicator<ResolvedAd
HostObserver hostObserver) {
super(sequentialExecutor, executor, outlierDetectorConfig.ewmaHalfLife(),
outlierDetectorConfig.ewmaCancellationPenalty(), outlierDetectorConfig.ewmaErrorPenalty(),
outlierDetectorConfig.cancellationIsError(), address, lbDescription, hostObserver);
outlierDetectorConfig.concurrentRequestPenalty(), outlierDetectorConfig.cancellationIsError(),
address, lbDescription, hostObserver);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.function.LongUnaryOperator;

import static io.servicetalk.loadbalancer.OutlierDetectorConfig.Builder.DEFAULT_CANCEL_PENALTY;
import static io.servicetalk.loadbalancer.OutlierDetectorConfig.Builder.DEFAULT_CONCURRENT_REQUEST_PENALTY;
import static io.servicetalk.loadbalancer.OutlierDetectorConfig.Builder.DEFAULT_ERROR_PENALTY;
import static java.time.Duration.ofSeconds;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -93,7 +94,8 @@ static final class TestRequestTracker extends DefaultRequestTracker {
private long lastValue;

TestRequestTracker(Duration measurementHalfLife, final LongUnaryOperator nextValueProvider) {
super(measurementHalfLife.toNanos(), DEFAULT_CANCEL_PENALTY, DEFAULT_ERROR_PENALTY);
super(measurementHalfLife.toNanos(), DEFAULT_CANCEL_PENALTY, DEFAULT_ERROR_PENALTY,
DEFAULT_CONCURRENT_REQUEST_PENALTY);
this.nextValueProvider = nextValueProvider;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,8 @@ private class TestIndicator extends XdsHealthIndicator<String, TestLoadBalancedC

TestIndicator(final OutlierDetectorConfig config) {
super(sequentialExecutor, new NormalizedTimeSourceExecutor(testExecutor), ofSeconds(10),
config.ewmaCancellationPenalty(), config.ewmaErrorPenalty(), config.cancellationIsError(),
config.ewmaCancellationPenalty(), config.ewmaErrorPenalty(), config.concurrentRequestPenalty(),
config.cancellationIsError(),
"address", "description", NoopLoadBalancerObserver.<String>instance().hostObserver("address"));
this.config = config;
}
Expand Down

0 comments on commit c4198c2

Please sign in to comment.