Skip to content

Commit

Permalink
loadbalancer-experimental: consolidate outlier detector concerns into…
Browse files Browse the repository at this point in the history
… the OutlierDetectorConfig

Motivation:

We currently have two different outlier detector implementations:
- the xDS compatible implementation
- the L4 connection failure implementation.
Right now their configuration is split awkwardly: the L4 implementation is
configured directly on LoadBalancerBuilder, while the xDS implementation is
configured through an OutlierDetectorConfig passed to XdsOutlierDetectorFactory.

Modifications:

- move L4 configuration options to OutlierDetectorConfig so everything is in one place
- make it easier to convert OutlierDetectorConfig to a builder
  • Loading branch information
bryce-anderson committed Mar 14, 2024
1 parent d4012e6 commit 7a1b5fe
Show file tree
Hide file tree
Showing 14 changed files with 414 additions and 208 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import io.servicetalk.loadbalancer.LoadBalancers;
import io.servicetalk.loadbalancer.OutlierDetectorConfig;
import io.servicetalk.loadbalancer.P2CLoadBalancingPolicy;
import io.servicetalk.loadbalancer.XdsOutlierDetectorFactory;
import io.servicetalk.transport.api.HostAndPort;

import java.net.InetSocketAddress;
Expand Down Expand Up @@ -70,7 +69,7 @@ private static LoadBalancerFactory<InetSocketAddress, FilterableStreamingHttpLoa
// Whether to try to use a host regardless of health status (default: false)
.failOpen(true)
.build())
.outlierDetectorFactory(new XdsOutlierDetectorFactory<>(
.outlierDetectorConfig(
// xDS compatible outlier detection has a number of tuning knobs. There are multiple detection
// algorithms described in more detail below. In addition to the limits appropriate to each
// algorithm there are also parameters to tune the chances of enforcement when a particular
Expand All @@ -97,6 +96,6 @@ private static LoadBalancerFactory<InetSocketAddress, FilterableStreamingHttpLoa
// allow at most 80% of hosts to be marked unhealthy (ejected) at any one time
.maxEjectionPercentage(80)
.build()
)).build();
).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,7 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio
private final int linearSearchSpace;
@Nullable
private final HealthCheckConfig healthCheckConfig;
@Nullable
private final OutlierDetector<ResolvedAddress, C> healthChecker;
private final OutlierDetector<ResolvedAddress, C> outlierDetector;
private final LoadBalancerObserver loadBalancerObserver;
private final ListenableAsyncCloseable asyncCloseable;

Expand All @@ -133,7 +132,7 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio
final int linearSearchSpace,
final LoadBalancerObserver loadBalancerObserver,
@Nullable final HealthCheckConfig healthCheckConfig,
@Nullable final Function<String, OutlierDetector<ResolvedAddress, C>> outlierDetectorFactory) {
final Function<String, OutlierDetector<ResolvedAddress, C>> outlierDetectorFactory) {
this.targetResource = requireNonNull(targetResourceName);
this.lbDescription = makeDescription(id, targetResource);
this.hostSelector = requireNonNull(hostSelector, "hostSelector");
Expand All @@ -149,7 +148,7 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio
this.asyncCloseable = toAsyncCloseable(this::doClose);
// Maintain a Subscriber so signals are always delivered to replay and new Subscribers get the latest signal.
eventStream.ignoreElements().subscribe();
this.healthChecker = outlierDetectorFactory == null ? null : outlierDetectorFactory.apply(lbDescription);
this.outlierDetector = requireNonNull(outlierDetectorFactory, "outlierDetectorFactory").apply(lbDescription);
// We subscribe to events as the very last step so that if we subscribe to an eager service discoverer
// we already have all the fields initialized.
subscribeToEvents(false);
Expand Down Expand Up @@ -178,9 +177,7 @@ private Completable doClose(final boolean graceful) {
if (!isClosed) {
discoveryCancellable.cancel();
eventStreamProcessor.onComplete();
if (healthChecker != null) {
healthChecker.cancel();
}
outlierDetector.cancel();
}
isClosed = true;
List<Host<ResolvedAddress, C>> currentList = usedHosts;
Expand Down Expand Up @@ -387,8 +384,7 @@ private void sequentialOnNext(Collection<? extends ServiceDiscovererEvent<Resolv
private Host<ResolvedAddress, C> createHost(ResolvedAddress addr) {
final LoadBalancerObserver.HostObserver hostObserver = loadBalancerObserver.hostObserver(addr);
// All hosts will share the health check config of the parent load balancer.
final HealthIndicator indicator = healthChecker == null ?
null : healthChecker.newHealthIndicator(addr, hostObserver);
final HealthIndicator indicator = outlierDetector.newHealthIndicator(addr, hostObserver);
final Host<ResolvedAddress, C> host = new DefaultHost<>(lbDescription, addr, connectionFactory,
linearSearchSpace, hostObserver, healthCheckConfig, indicator);
if (indicator != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,7 @@ final class DefaultLoadBalancerBuilder<ResolvedAddress, C extends LoadBalancedCo
private Executor backgroundExecutor;
@Nullable
private LoadBalancerObserver loadBalancerObserver;
@Nullable
private OutlierDetectorFactory<ResolvedAddress, C> outlierDetectorFactory;
private Duration healthCheckInterval = DEFAULT_HEALTH_CHECK_INTERVAL;
private Duration healthCheckJitter = DEFAULT_HEALTH_CHECK_JITTER;
private int healthCheckFailedConnectionsThreshold = DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD;
private OutlierDetectorConfig outlierDetectorConfig = OutlierDetectorConfig.DEFAULT_CONFIG;
private Duration healthCheckResubscribeInterval = DEFAULT_HEALTH_CHECK_RESUBSCRIBE_INTERVAL;
private Duration healthCheckResubscribeJitter = DEFAULT_HEALTH_CHECK_JITTER;

Expand Down Expand Up @@ -85,9 +81,10 @@ public LoadBalancerBuilder<ResolvedAddress, C> loadBalancerObserver(
}

@Override
public LoadBalancerBuilder<ResolvedAddress, C> outlierDetectorFactory(
OutlierDetectorFactory<ResolvedAddress, C> outlierDetectorFactory) {
this.outlierDetectorFactory = outlierDetectorFactory;
public LoadBalancerBuilder<ResolvedAddress, C> outlierDetectorConfig(
@Nullable OutlierDetectorConfig outlierDetectorConfig) {
this.outlierDetectorConfig = outlierDetectorConfig == null ?
OutlierDetectorConfig.DEFAULT_CONFIG : outlierDetectorConfig;
return this;
}

Expand All @@ -97,14 +94,6 @@ public LoadBalancerBuilder<ResolvedAddress, C> backgroundExecutor(Executor backg
return this;
}

@Override
public LoadBalancerBuilder<ResolvedAddress, C> healthCheckInterval(Duration interval, Duration jitter) {
validateHealthCheckIntervals(interval, jitter);
this.healthCheckInterval = interval;
this.healthCheckJitter = jitter;
return this;
}

@Override
public LoadBalancerBuilder<ResolvedAddress, C> healthCheckResubscribeInterval(
Duration interval, Duration jitter) {
Expand All @@ -114,39 +103,34 @@ public LoadBalancerBuilder<ResolvedAddress, C> healthCheckResubscribeInterval(
return this;
}

@Override
public LoadBalancerBuilder<ResolvedAddress, C> healthCheckFailedConnectionsThreshold(
int threshold) {
if (threshold == 0) {
throw new IllegalArgumentException("Invalid health-check failed connections (expected != 0)");
}
this.healthCheckFailedConnectionsThreshold = threshold;
return this;
}

@Override
public LoadBalancerFactory<ResolvedAddress, C> build() {
final HealthCheckConfig healthCheckConfig;
if (this.healthCheckFailedConnectionsThreshold < 0) {
final Executor executor = getExecutor();
if (outlierDetectorConfig.failedConnectionsThreshold() < 0) {
healthCheckConfig = null;
} else {
healthCheckConfig = new HealthCheckConfig(getExecutor(),
healthCheckInterval, healthCheckJitter, healthCheckFailedConnectionsThreshold,
healthCheckConfig = new HealthCheckConfig(executor,
outlierDetectorConfig.failedConnectionsProbeInterval(),
outlierDetectorConfig.failedConnectionsProbeJitter(),
outlierDetectorConfig.failedConnectionsThreshold(),
healthCheckResubscribeInterval, healthCheckResubscribeJitter);
}
final LoadBalancerObserver loadBalancerObserver = this.loadBalancerObserver != null ?
this.loadBalancerObserver : NoopLoadBalancerObserver.instance();
Function<String, OutlierDetector<ResolvedAddress, C>> healthCheckerSupplier;
if (outlierDetectorFactory == null) {
healthCheckerSupplier = null;

final Function<String, OutlierDetector<ResolvedAddress, C>> outlierDetectorFactory;
if (outlierDetectorConfig.enforcingSuccessRate() != 0 ||
outlierDetectorConfig.enforcingConsecutive5xx() != 0 ||
outlierDetectorConfig.enforcingFailurePercentage() != 0) {
outlierDetectorFactory = (lbDescription) ->
new XdsOutlierDetector<ResolvedAddress, C>(executor, outlierDetectorConfig, lbDescription);
} else {
final Executor executor = getExecutor();
healthCheckerSupplier = (String lbDescrption) ->
outlierDetectorFactory.newHealthChecker(executor, lbDescrption);
outlierDetectorFactory = (lbDescription) ->
new NoopOutlierDetector<>(outlierDetectorConfig.ewmaHalfLife(), executor);
}

return new DefaultLoadBalancerFactory<>(id, loadBalancingPolicy, linearSearchSpace, healthCheckConfig,
loadBalancerObserver, healthCheckerSupplier);
loadBalancerObserver, outlierDetectorFactory);
}

private static final class DefaultLoadBalancerFactory<ResolvedAddress, C extends LoadBalancedConnection>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.servicetalk.client.api.ScoreSupplier;

import java.time.Duration;
import java.util.concurrent.locks.StampedLock;

import static io.servicetalk.utils.internal.NumberUtils.ensurePositive;
Expand All @@ -39,6 +40,8 @@
*/
abstract class DefaultRequestTracker implements RequestTracker, ScoreSupplier {
private static final long MAX_MS_TO_NS = NANOSECONDS.convert(MAX_VALUE, MILLISECONDS);

static final Duration DEFAULT_EWMA_HALF_LIFE = Duration.ofSeconds(10);
static final long DEFAULT_CANCEL_PENALTY = 5L;
static final long DEFAULT_ERROR_PENALTY = 10L;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ public LoadBalancerBuilder<ResolvedAddress, C> loadBalancerObserver(
}

@Override
public LoadBalancerBuilder<ResolvedAddress, C> outlierDetectorFactory(
OutlierDetectorFactory<ResolvedAddress, C> outlierDetectorFactory) {
delegate = delegate.outlierDetectorFactory(outlierDetectorFactory);
public LoadBalancerBuilder<ResolvedAddress, C> outlierDetectorConfig(
@Nullable OutlierDetectorConfig outlierDetectorConfig) {
delegate = delegate.outlierDetectorConfig(outlierDetectorConfig);
return this;
}

Expand All @@ -86,24 +86,12 @@ public LoadBalancerBuilder<ResolvedAddress, C> linearSearchSpace(int linearSearc
return this;
}

@Override
public LoadBalancerBuilder<ResolvedAddress, C> healthCheckInterval(Duration interval, Duration jitter) {
delegate = delegate.healthCheckInterval(interval, jitter);
return this;
}

@Override
public LoadBalancerBuilder<ResolvedAddress, C> healthCheckResubscribeInterval(Duration interval, Duration jitter) {
delegate = delegate.healthCheckResubscribeInterval(interval, jitter);
return this;
}

@Override
public LoadBalancerBuilder<ResolvedAddress, C> healthCheckFailedConnectionsThreshold(int threshold) {
delegate = delegate.healthCheckFailedConnectionsThreshold(threshold);
return this;
}

@Override
public LoadBalancerFactory<ResolvedAddress, C> build() {
return delegate.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,28 +83,20 @@ LoadBalancerBuilder<ResolvedAddress, C> loadBalancingPolicy(
LoadBalancerBuilder<ResolvedAddress, C> loadBalancerObserver(@Nullable LoadBalancerObserver loadBalancerObserver);

/**
* Set the {@link OutlierDetectorFactory} to use with this load balancer.
* @param outlierDetectorFactory the {@link OutlierDetectorFactory} to use, or {@code null} to not use a
* {@link OutlierDetector}.
* Set the {@link OutlierDetectorConfig} to use with this load balancer.
* @param outlierDetectorConfig the {@link OutlierDetectorConfig} to use, or {@code null} to use the default
* outlier detection.
* @return {@code this}
*/
LoadBalancerBuilder<ResolvedAddress, C> outlierDetectorFactory(
OutlierDetectorFactory<ResolvedAddress, C> outlierDetectorFactory);
LoadBalancerBuilder<ResolvedAddress, C> outlierDetectorConfig(
@Nullable OutlierDetectorConfig outlierDetectorConfig);

/**
* This {@link LoadBalancer} may monitor hosts to which connection establishment has failed
* using health checks that run in the background. The health check tries to establish a new connection
* and if it succeeds, the host is returned to the load balancing pool. As long as the connection
* establishment fails, the host is not considered for opening new connections for processed requests.
* If an {@link Executor} is not provided using this method, a default shared instance is used
* for all {@link LoadBalancer LoadBalancers} created by this factory.
* <p>
* {@link #healthCheckFailedConnectionsThreshold(int)} can be used to disable this mechanism and always
* consider all hosts for establishing new connections.
* Set the background {@link Executor} to use for determining time and scheduling background tasks such
* as those associated with outlier detection.
*
* @param backgroundExecutor {@link Executor} on which to schedule health checking.
* @param backgroundExecutor {@link Executor} to use as a time source and for scheduling background tasks.
* @return {@code this}.
* @see #healthCheckFailedConnectionsThreshold(int)
*/
LoadBalancerBuilder<ResolvedAddress, C> backgroundExecutor(Executor backgroundExecutor);

Expand All @@ -124,22 +116,6 @@ LoadBalancerBuilder<ResolvedAddress, C> outlierDetectorFactory(
*/
LoadBalancerBuilder<ResolvedAddress, C> linearSearchSpace(int linearSearchSpace);

// TODO: these healthCheck* methods should be moved into their own OutlierDetection configuration instance
// and much like the LoadBalancingPolicy, we should be able to add `OutlierDetectionPolicy`s
/**
* Configure an interval for health checking a host that failed to open connections. If no interval is provided
* using this method, a default value will be used.
* <p>
* {@link #healthCheckFailedConnectionsThreshold(int)} can be used to disable the health checking mechanism
* and always consider all hosts for establishing new connections.
*
* @param interval interval at which a background health check will be scheduled.
* @param jitter the amount of jitter to apply to each retry {@code interval}.
* @return {@code this}.
* @see #healthCheckFailedConnectionsThreshold(int)
*/
LoadBalancerBuilder<ResolvedAddress, C> healthCheckInterval(Duration interval, Duration jitter);

/**
* Configure an interval for re-subscribing to the original events stream in case all existing hosts become
* unhealthy.
Expand All @@ -148,32 +124,15 @@ LoadBalancerBuilder<ResolvedAddress, C> outlierDetectorFactory(
* known hosts become unhealthy, which could happen due to intermediate caching layers, re-subscribe to the
* events stream can help to exit from a dead state.
* <p>
* {@link #healthCheckFailedConnectionsThreshold(int)} can be used to disable the health checking mechanism
* and always consider all hosts for establishing new connections.
* Note: setting the interval to {@code Duration.ofNanos(Long.MAX_VALUE)} will effectively disable health check
* resubscribes.
*
* @param interval interval at which re-subscribes will be scheduled.
* @param jitter the amount of jitter to apply to each re-subscribe {@code interval}.
* @return {@code this}.
* @see #healthCheckFailedConnectionsThreshold(int)
*/
LoadBalancerBuilder<ResolvedAddress, C> healthCheckResubscribeInterval(Duration interval, Duration jitter);

/**
* Configure a threshold for consecutive connection failures to a host. When the {@link LoadBalancer}
* consecutively fails to open connections in the amount greater or equal to the specified value,
* the host will be marked as unhealthy and connection establishment will take place in the background
* repeatedly until a connection is established. During that time, the host will not take part in
* load balancing selection.
* <p>
* Use a negative value of the argument to disable health checking.
*
* @param threshold number of consecutive connection failures to consider a host unhealthy and eligible for
* background health checking. Use negative value to disable the health checking mechanism.
* @return {@code this}.
* @see #backgroundExecutor(Executor)
*/
LoadBalancerBuilder<ResolvedAddress, C> healthCheckFailedConnectionsThreshold(int threshold);

/**
* Builds the {@link LoadBalancerFactory} configured by this builder.
*
Expand Down
Loading

0 comments on commit 7a1b5fe

Please sign in to comment.