Skip to content

Commit

Permalink
loadbalancer-experimental: properly capture consecutive error signals (
Browse files Browse the repository at this point in the history
…#2984)

Motivation:

We have signals from the OutlierDetector to the DefaultLoadBalancer that indicate when our endpoints' health changes. However, for XdsOutlierDetector, we compute this by comparing each endpoint's health before and after every scan. Consecutive 5xx ejection is tripped immediately rather than during a scan, so it may not be captured, and neither will recoveries, which are most likely going to flip outside of an outlier detector scan.

Modifications:

- Instead of checking before and after each scan, save the last health status and check if that changed since the last scan.

Result:

We properly capture health state transitions at the point where the event is emitted.
  • Loading branch information
bryce-anderson authored Jun 24, 2024
1 parent 4e6ce9b commit 7e19105
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ final class FailurePercentageXdsOutlierDetectorAlgorithm<ResolvedAddress, C exte

@Override
public void detectOutliers(final OutlierDetectorConfig config,
final Collection<XdsHealthIndicator<ResolvedAddress, C>> indicators) {
final Collection<? extends XdsHealthIndicator<ResolvedAddress, C>> indicators) {
final long[] failurePercentages = new long[indicators.size()];
int i = 0;
int enoughVolumeHosts = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ final class SuccessRateXdsOutlierDetectorAlgorithm<ResolvedAddress, C extends Lo

@Override
public void detectOutliers(final OutlierDetectorConfig config,
final Collection<XdsHealthIndicator<ResolvedAddress, C>> indicators) {
final Collection<? extends XdsHealthIndicator<ResolvedAddress, C>> indicators) {
LOGGER.debug("Started outlier detection.");
final double[] successRates = new double[indicators.size()];
int i = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -76,8 +76,7 @@ final class XdsOutlierDetector<ResolvedAddress, C extends LoadBalancedConnection
private final Kernel kernel;
private final AtomicInteger indicatorCount = new AtomicInteger();
// Protected by `sequentialExecutor`.
// Note that this is a LinkedHashSet so as to preserve the iteration order.
private final Set<XdsHealthIndicator<ResolvedAddress, C>> indicators = new LinkedHashSet<>();
private final Set<XdsHealthIndicatorImpl> indicators = new HashSet<>();
// reads and writes are protected by `sequentialExecutor`.
private int ejectedHostCount;

Expand All @@ -100,7 +99,7 @@ final class XdsOutlierDetector<ResolvedAddress, C extends LoadBalancedConnection

@Override
public HealthIndicator<ResolvedAddress, C> newHealthIndicator(ResolvedAddress address, HostObserver hostObserver) {
XdsHealthIndicator<ResolvedAddress, C> result = new XdsHealthIndicatorImpl(
XdsHealthIndicatorImpl result = new XdsHealthIndicatorImpl(
address, kernel.config, hostObserver);
sequentialExecutor.execute(() -> indicators.add(result));
indicatorCount.incrementAndGet();
Expand Down Expand Up @@ -133,6 +132,9 @@ int ejectedHostCount() {

private final class XdsHealthIndicatorImpl extends XdsHealthIndicator<ResolvedAddress, C> {

// Protected by `sequentialExecutor`.
private boolean lastObservedHealthy = true;

XdsHealthIndicatorImpl(final ResolvedAddress address, OutlierDetectorConfig outlierDetectorConfig,
HostObserver hostObserver) {
super(sequentialExecutor, executor, outlierDetectorConfig.ewmaHalfLife(),
Expand Down Expand Up @@ -201,24 +203,25 @@ private Cancellable scheduleNextOutliersCheck(OutlierDetectorConfig currentConfi

private void sequentialCheckOutliers() {
assert sequentialExecutor.isCurrentThreadDraining();
boolean[] beforeState = new boolean[indicators.size()];
int i = 0;
for (HealthIndicator<?, ?> indicator : indicators) {
beforeState[i++] = indicator.isHealthy();
}

for (XdsOutlierDetectorAlgorithm<ResolvedAddress, C> outlierDetector : algorithms) {
outlierDetector.detectOutliers(config, indicators);
}
cancellable.nextCancellable(scheduleNextOutliersCheck(config));

// now check to see if any of our health states changed
i = 0;
for (HealthIndicator<?, ?> indicator : indicators) {
if (beforeState[i++] != indicator.isHealthy()) {
healthStatusChangeProcessor.onNext(null);
break;
// Check to see if any of our health states changed from the previous scan and fire an event if they did.
boolean emitChange = false;
for (XdsHealthIndicatorImpl indicator : indicators) {
boolean currentlyIsHealthy = indicator.isHealthy();
if (indicator.lastObservedHealthy != currentlyIsHealthy) {
indicator.lastObservedHealthy = currentlyIsHealthy;
emitChange = true;
}
}
if (emitChange) {
LOGGER.debug("Health status change observed. Emitting event.");
healthStatusChangeProcessor.onNext(null);
}
}
}

Expand All @@ -242,7 +245,7 @@ private static final class AlwaysHealthyOutlierDetectorAlgorithm<ResolvedAddress

@Override
public void detectOutliers(final OutlierDetectorConfig config,
final Collection<XdsHealthIndicator<ResolvedAddress, C>> indicators) {
final Collection<? extends XdsHealthIndicator<ResolvedAddress, C>> indicators) {
int unhealthy = 0;
for (XdsHealthIndicator indicator : indicators) {
// Hosts can still be marked unhealthy due to consecutive failures.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,6 @@ interface XdsOutlierDetectorAlgorithm<ResolvedAddress, C extends LoadBalancedCon
* @param config the current {@link OutlierDetectorConfig} to use.
* @param indicators the collection of {@link HealthIndicator} instances to collect stats from.
*/
void detectOutliers(OutlierDetectorConfig config, Collection<XdsHealthIndicator<ResolvedAddress, C>> indicators);
void detectOutliers(OutlierDetectorConfig config,
Collection<? extends XdsHealthIndicator<ResolvedAddress, C>> indicators);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,19 @@

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

class XdsOutlierDetectorTest {
final class XdsOutlierDetectorTest {

private final TestExecutor executor = new TestExecutor();
OutlierDetectorConfig config = new OutlierDetectorConfig.Builder()
.failureDetectorInterval(Duration.ofSeconds(5), Duration.ZERO)
.ejectionTimeJitter(Duration.ZERO)
.baseEjectionTime(Duration.ofSeconds(2))
.build();

@Nullable
Expand Down Expand Up @@ -59,7 +61,7 @@ void cancellationOfEvictedHealthIndicatorMarksHostUnejected() {
init();
HealthIndicator<String, TestLoadBalancedConnection> healthIndicator = xdsOutlierDetector.newHealthIndicator(
"addr-1", NoopLoadBalancerObserver.instance().hostObserver("addr-1"));
eject(healthIndicator);
consecutiveFailureEject(healthIndicator);
assertThat(healthIndicator.isHealthy(), equalTo(false));
assertThat(xdsOutlierDetector.ejectedHostCount(), equalTo(1));
healthIndicator.cancel();
Expand All @@ -77,10 +79,10 @@ void maxHostRemovalIsHonored() {
"addr-1", NoopLoadBalancerObserver.instance().hostObserver("addr-1"));
HealthIndicator<String, TestLoadBalancedConnection> indicator2 = xdsOutlierDetector.newHealthIndicator(
"addr-2", NoopLoadBalancerObserver.instance().hostObserver("addr-2"));
eject(indicator1);
consecutiveFailureEject(indicator1);
assertThat(xdsOutlierDetector.ejectedHostCount(), equalTo(1));
assertThat(indicator1.isHealthy(), equalTo(false));
eject(indicator2);
consecutiveFailureEject(indicator2);
assertThat(xdsOutlierDetector.ejectedHostCount(), equalTo(1));
assertThat(indicator2.isHealthy(), equalTo(true));

Expand All @@ -101,13 +103,37 @@ void hostRevival() {
init();
HealthIndicator<String, TestLoadBalancedConnection> indicator = xdsOutlierDetector.newHealthIndicator(
"addr-1", NoopLoadBalancerObserver.instance().hostObserver("addr-1"));
eject(indicator);
consecutiveFailureEject(indicator);
assertThat(indicator.isHealthy(), equalTo(false));
executor.advanceTimeBy(config.baseEjectionTime().toNanos(), TimeUnit.NANOSECONDS);
assertThat(indicator.isHealthy(), equalTo(true));
}

private void eject(HealthIndicator<String, TestLoadBalancedConnection> indicator) {
@Test
void consecutiveFailuresTriggersHealthChangeSignal() {
config = new OutlierDetectorConfig.Builder(config)
// make it longer than the failure detector interval
.baseEjectionTime(config.failureDetectorInterval().multipliedBy(2))
.build();
init();
AtomicInteger healthChanges = new AtomicInteger();
xdsOutlierDetector.healthStatusChanged().forEach(ignored -> healthChanges.incrementAndGet());
HealthIndicator<String, TestLoadBalancedConnection> indicator = xdsOutlierDetector.newHealthIndicator(
"addr-1", NoopLoadBalancerObserver.instance().hostObserver("addr-1"));
consecutiveFailureEject(indicator);
assertThat(healthChanges.get(), equalTo(0));
assertThat(indicator.isHealthy(), equalTo(false));
executor.advanceTimeBy(config.failureDetectorInterval().toNanos(), TimeUnit.NANOSECONDS);
assertThat(indicator.isHealthy(), equalTo(false));
assertThat(healthChanges.get(), equalTo(1));

// We should revive after another interval
executor.advanceTimeBy(config.failureDetectorInterval().toNanos(), TimeUnit.NANOSECONDS);
assertThat(indicator.isHealthy(), equalTo(true));
assertThat(healthChanges.get(), equalTo(2));
}

private void consecutiveFailureEject(HealthIndicator<String, TestLoadBalancedConnection> indicator) {
for (int i = 0; i < config.consecutive5xx(); i++) {
indicator.onRequestError(indicator.beforeConnectStart(),
RequestTracker.ErrorClass.EXT_ORIGIN_REQUEST_FAILED);
Expand Down

0 comments on commit 7e19105

Please sign in to comment.