From 6b2b65e84c71c89560638e9241782ea8cdb0b797 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Wed, 3 Jan 2024 11:30:23 -0700 Subject: [PATCH] loadbalancer: selectors consider health first and have configurable fail-open behavior (#2787) Motivation: The health status of a connection is a course grained indicator of whether a host is likely to be able to serve traffic and should be the first consideration to selectors when picking hosts. A second issue is that it's not obvious what the desired behavior is when a healthy host cannot be found: it's going to be user specific whether to fail closed or just give it a try and see what happens. Modifications: - Switch the RR and P2C selectors to consider health first when picking hosts. - Add fail open behavior to both round robin and P2C: if a healthy host cannot be found we will try the first active candidate evaluated. Results: - Health is now considered first. This doesn't change much right now but will make L7 health status much more useful. - Fail open is supported, although off by default. --- .../loadbalancer/BaseHostSelector.java | 32 +++- .../servicetalk/loadbalancer/DefaultHost.java | 17 +- .../loadbalancer/DefaultLoadBalancer.java | 5 +- .../io/servicetalk/loadbalancer/Host.java | 14 +- .../loadbalancer/HostSelector.java | 10 +- .../loadbalancer/LoadBalancerObserver.java | 5 +- .../NoopLoadBalancerObserver.java | 2 +- .../loadbalancer/P2CLoadBalancingPolicy.java | 19 +- .../servicetalk/loadbalancer/P2CSelector.java | 108 +++++++---- .../RoundRobinLoadBalancerFactory.java | 2 +- .../RoundRobinLoadBalancingPolicy.java | 22 ++- .../loadbalancer/RoundRobinSelector.java | 45 +++-- .../loadbalancer/DefaultHostTest.java | 51 ++++- .../loadbalancer/DefaultLoadBalancerTest.java | 4 +- .../loadbalancer/P2CSelectorTest.java | 179 ++++++++++++------ .../loadbalancer/RoundRobinSelectorTest.java | 117 ++++++++++++ .../loadbalancer/SelectorTestHelpers.java | 53 ++++++ 17 files changed, 513 insertions(+), 172 deletions(-) create mode 100644 servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinSelectorTest.java create mode 100644 servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/SelectorTestHelpers.java diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/BaseHostSelector.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/BaseHostSelector.java index cf1056c7ca..8f4aa02bf2 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/BaseHostSelector.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/BaseHostSelector.java @@ -24,6 +24,7 @@ import javax.annotation.Nullable; import static io.servicetalk.concurrent.api.Single.failed; +import static io.servicetalk.concurrent.api.Single.succeeded; import static java.util.Objects.requireNonNull; abstract class BaseHostSelector @@ -51,10 +52,10 @@ public final int hostSetSize() { } @Override - public final boolean isUnHealthy() { + public final boolean isHealthy() { // TODO: in the future we may want to make this more of a "are at least X hosts available" question // so that we can compose a group of selectors into a priority set. - return allUnhealthy(hosts); + return anyHealthy(hosts); } protected final String getTargetResource() { @@ -67,21 +68,36 @@ protected final Single noActiveHostsFailure(List> us this.getClass(), "selectConnection(...)")); } + // This method assumes the host is considered healthy. + protected final @Nullable Single selectFromHost(Host host, Predicate selector, + boolean forceNewConnectionAndReserve, @Nullable ContextMap contextMap) { + // First see if we can get an existing connection regardless of health status. + if (!forceNewConnectionAndReserve) { + C c = host.pickConnection(selector, contextMap); + if (c != null) { + return succeeded(c); + } + } + // We can only create a new connection if the host is active. It's possible for it to think that + // it's healthy based on having connections but not being active but we weren't able to pick an + // existing connection. + return host.canMakeNewConnections() ? + host.newConnection(selector, forceNewConnectionAndReserve, contextMap) : null; + } + private Single noHostsFailure() { return failed(Exceptions.StacklessNoAvailableHostException.newInstance( "No hosts are available to connect for " + targetResource + ".", this.getClass(), "selectConnection(...)")); } - private static boolean allUnhealthy( + private static boolean anyHealthy( final List> usedHosts) { - boolean allUnhealthy = !usedHosts.isEmpty(); for (Host host : usedHosts) { - if (!host.isUnhealthy()) { - allUnhealthy = false; - break; + if (host.isHealthy()) { + return true; } } - return allUnhealthy; + return usedHosts.isEmpty(); } } diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/DefaultHost.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/DefaultHost.java index ea493f8492..a240fe061e 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/DefaultHost.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/DefaultHost.java @@ -68,7 +68,6 @@ final class DefaultHost implements Host< */ private static final float RANDOM_SEARCH_FACTOR = 0.75f; - private static final Object[] EMPTY_ARRAY = new Object[0]; private static final Logger LOGGER = LoggerFactory.getLogger(DefaultHost.class); private enum State { @@ -312,13 +311,15 @@ private void markUnhealthy(final Throwable cause) { } @Override - public boolean isActiveAndHealthy() { - return connState.isActive(); + public boolean isHealthy() { + final State state = connState.state; + return state != State.UNHEALTHY && state != State.CLOSED; } @Override - public boolean isUnhealthy() { - return connState.isUnhealthy(); + public boolean canMakeNewConnections() { + final State state = connState.state; + return state != State.EXPIRED && state != State.CLOSED; } private boolean addConnection(final C connection, final @Nullable HealthCheck currentHealthCheck) { @@ -380,8 +381,8 @@ private boolean addConnection(final C connection, final @Nullable HealthCheck cu // remove the connection (previously considered as the last one) from the array // in the next iteration. && connStateUpdater.compareAndSet(this, currentConnState, nextState.toClosed())) { - this.closeAsync().subscribe(); - hostObserver.onExpiredHostRemoved(address); + closeAsync().subscribe(); + hostObserver.onExpiredHostRemoved(address, nextState.connections.size()); break; } } else { @@ -435,6 +436,8 @@ private Completable doClose(final boolean graceful) { lbDescription, oldState.connections.size(), graceful ? "" : "un", address); if (oldState.state == State.ACTIVE) { hostObserver.onActiveHostRemoved(address, oldState.connections.size()); + } else if (oldState.state == State.EXPIRED) { + hostObserver.onExpiredHostRemoved(address, oldState.connections.size()); } final List connections = oldState.connections; return (connections.isEmpty() ? completed() : diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancer.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancer.java index eec45139dd..225961295d 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancer.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancer.java @@ -390,7 +390,6 @@ private Host createHost(ResolvedAddress addr) { currentHosts, host); // we only need to do anything else if we actually removed the host if (nextHosts.size() != currentHosts.size()) { - loadBalancerObserver.hostObserver().onExpiredHostRemoved(host.address()); sequentialUpdateUsedHosts(nextHosts); if (nextHosts.isEmpty()) { // We transitioned from non-empty to empty. That means we're not ready. @@ -472,7 +471,7 @@ private Single selectConnection0(final Predicate selector, @Nullable final Single result = currentHostSelector.selectConnection(selector, context, forceNewConnectionAndReserve); return result.beforeOnError(exn -> { if (exn instanceof NoActiveHostException) { - if (currentHostSelector.isUnHealthy()) { + if (!currentHostSelector.isHealthy()) { final long currNextResubscribeTime = nextResubscribeTime; if (currNextResubscribeTime >= 0 && healthCheckConfig.executor.currentTime(NANOSECONDS) >= currNextResubscribeTime && @@ -561,7 +560,7 @@ public HostSelector rebuildWithHosts(List extends Listen ResolvedAddress address(); /** - * Whether the host is both considered active by service discovery and healthy by the failure - * detection mechanisms. - * @return whether the host is both active and healthy + * Determine the health status of this host. + * @return whether the host considers itself healthy enough to serve traffic. This is best effort and does not + * guarantee that the request will succeed. */ - boolean isActiveAndHealthy(); + boolean isHealthy(); /** - * Whether the host is considered unhealthy bo the failure detection mechanisms. - * @return whether the host is considered unhealthy. + * Determine whether the host is in a state where it can make new connections. + * @return whether the host is in a state where it can make new connections. */ - boolean isUnhealthy(); + boolean canMakeNewConnections(); /** * Signal to the host that it has been re-discovered by the service-discovery mechanism and is expected diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/HostSelector.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/HostSelector.java index 9c5d6ffe02..521ffd23d0 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/HostSelector.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/HostSelector.java @@ -61,14 +61,14 @@ Single selectConnection(Predicate selector, @Nullable ContextMap context, HostSelector rebuildWithHosts(List> hosts); /** - * Whether the load balancer believes itself to unhealthy for serving traffic. + * Whether the load balancer believes itself to be healthy enough for serving traffic. *

* Note that this is both racy and best effort: just because a {@link HostSelector} is - * unhealthy doesn't guarantee that a request will fail nor does a healthy status indicate - * that this selector is guaranteed to successfully serve a request. - * @return whether the load balancer believes itself unhealthy enough and unlikely to successfully serve traffic. + * healthy doesn't guarantee that a request will succeed nor does an unhealthy status + * indicate that this selector is guaranteed to fail a request. + * @return whether the load balancer believes itself healthy enough and likely to successfully serve traffic. */ - boolean isUnHealthy(); + boolean isHealthy(); /** * The size of the host candidate pool for this host selector. diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/LoadBalancerObserver.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/LoadBalancerObserver.java index 7b4805632f..86158e7f07 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/LoadBalancerObserver.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/LoadBalancerObserver.java @@ -65,7 +65,7 @@ interface HostObserver { /** * Callback for when a host is removed by service discovery. * @param address the resolved address. - * @param connectionCount the number of connections that were associated with the host. + * @param connectionCount the number of open connections when the host was removed. */ void onActiveHostRemoved(ResolvedAddress address, int connectionCount); @@ -79,8 +79,9 @@ interface HostObserver { /** * Callback for when an expired host is removed. * @param address the resolved address. + * @param connectionCount the number of open connections when the host was removed. */ - void onExpiredHostRemoved(ResolvedAddress address); + void onExpiredHostRemoved(ResolvedAddress address, int connectionCount); /** * Callback for when a host is created. diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/NoopLoadBalancerObserver.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/NoopLoadBalancerObserver.java index baf5dfbf80..9aa1026f63 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/NoopLoadBalancerObserver.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/NoopLoadBalancerObserver.java @@ -63,7 +63,7 @@ public void onHostMarkedExpired(ResolvedAddress resolvedAddress, int connectionC } @Override - public void onExpiredHostRemoved(ResolvedAddress resolvedAddress) { + public void onExpiredHostRemoved(ResolvedAddress resolvedAddress, int connectionCount) { // noop } diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/P2CLoadBalancingPolicy.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/P2CLoadBalancingPolicy.java index 6fbf9e6e65..51c6e16ac7 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/P2CLoadBalancingPolicy.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/P2CLoadBalancingPolicy.java @@ -39,17 +39,19 @@ final class P2CLoadBalancingPolicy { private final int maxEffort; + private final boolean failOpen; @Nullable private final Random random; - private P2CLoadBalancingPolicy(final int maxEffort, @Nullable final Random random) { + private P2CLoadBalancingPolicy(final int maxEffort, final boolean failOpen, @Nullable final Random random) { this.maxEffort = maxEffort; + this.failOpen = failOpen; this.random = random; } @Override public HostSelector buildSelector(List> hosts, String targetResource) { - return new P2CSelector<>(hosts, targetResource, maxEffort, random); + return new P2CSelector<>(hosts, targetResource, maxEffort, failOpen, random); } @Override @@ -70,6 +72,7 @@ public static final class Builder { private static final int DEFAULT_MAX_EFFORT = 5; private int maxEffort = DEFAULT_MAX_EFFORT; + private boolean failOpen; @Nullable private Random random; @@ -87,6 +90,16 @@ public Builder maxEffort(final int maxEffort) { return this; } + /** + * Set whether the host selector should attempt to use an unhealthy {@link Host} as a last resort. + * @param failOpen whether the host selector should attempt to use an unhealthy {@link Host} as a last resort. + * @return this {@link Builder}. + */ + public Builder failOpen(final boolean failOpen) { + this.failOpen = failOpen; + return this; + } + // For testing purposes only. Builder random(Random random) { this.random = random; @@ -100,7 +113,7 @@ Builder random(Random random) { * @return the concrete {@link P2CLoadBalancingPolicy}. */ public P2CLoadBalancingPolicy build() { - return new P2CLoadBalancingPolicy<>(maxEffort, random); + return new P2CLoadBalancingPolicy<>(maxEffort, failOpen, random); } } } diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/P2CSelector.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/P2CSelector.java index bf13770443..5aa68b7d98 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/P2CSelector.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/P2CSelector.java @@ -26,8 +26,6 @@ import java.util.function.Predicate; import javax.annotation.Nullable; -import static io.servicetalk.concurrent.api.Single.succeeded; - /** * This {@link LoadBalancer} selection algorithm is based on work by Michael David Mitzenmacher in The Power of Two * Choices in Randomized Load Balancing. @@ -38,22 +36,24 @@ final class P2CSelector extends BaseHostSelector { + private final List> hosts; @Nullable private final Random random; private final int maxEffort; - private final List> hosts; + private final boolean failOpen; - P2CSelector(List> hosts, - final String targetResource, final int maxEffort, @Nullable final Random random) { + P2CSelector(List> hosts, final String targetResource, final int maxEffort, + final boolean failOpen, @Nullable final Random random) { super(hosts, targetResource); this.hosts = hosts; this.maxEffort = maxEffort; + this.failOpen = failOpen; this.random = random; } @Override public HostSelector rebuildWithHosts(List> hosts) { - return new P2CSelector<>(hosts, getTargetResource(), maxEffort, random); + return new P2CSelector<>(hosts, getTargetResource(), maxEffort, failOpen, random); } @Override @@ -67,8 +67,17 @@ protected Single selectConnection0(Predicate selector, @Nullable ContextMa " received an empty host set"); case 1: // There is only a single host, so we don't need to do any of the looping or comparison logic. - Single connection = selectFromHost(hosts.get(0), selector, forceNewConnectionAndReserve, context); - return connection == null ? noActiveHostsFailure(hosts) : connection; + Host host = hosts.get(0); + // If we're going to fail open we just yo-lo it, otherwise check if it's considered + // healthy. + if (failOpen || host.isHealthy()) { + Single result = selectFromHost( + host, selector, forceNewConnectionAndReserve, context); + if (result != null) { + return result; + } + } + return noActiveHostsFailure(hosts); default: return p2c(size, hosts, getRandom(), selector, forceNewConnectionAndReserve, context); } @@ -78,58 +87,73 @@ private Single p2c(int size, List> hosts, Random ran boolean forceNewConnectionAndReserve, @Nullable ContextMap contextMap) { // If there are only two hosts we only try once since there is no chance we'll select different hosts // on further iterations. + Host failOpenHost = null; for (int j = hosts.size() == 2 ? 1 : maxEffort; j > 0; j--) { // Pick two random indexes that don't collide. Limit the range on the second index to 1 less than // the max value so that if there is a collision we can safety increment. We also increment if - // i2 > i1 to avoid biased toward lower numbers since we limited the range by 1. + // i2 > i1 to avoid bias toward lower numbers since we limited the range by 1. final int i1 = random.nextInt(size); int i2 = random.nextInt(size - 1); if (i2 >= i1) { ++i2; } + Host t1 = hosts.get(i1); Host t2 = hosts.get(i2); - // Make t1 the preferred host by score to make the logic below a bit cleaner. - if (t1.score() < t2.score()) { - Host tmp = t1; - t1 = t2; - t2 = tmp; - } - - // Attempt to get a connection from t1 first since it's 'better'. If we can't, then try t2. - Single result = selectFromHost(t1, selector, forceNewConnectionAndReserve, contextMap); - if (result != null) { - return result; + final boolean t1Healthy = t1.isHealthy(); + final boolean t2Healthy = t2.isHealthy(); + // Priority of selection: health > score > failOpen + // Only if both hosts are healthy do we consider score. + if (t1Healthy && t2Healthy) { + // both are healthy. Select based on score, using t1 if equal. + if (t1.score() < t2.score()) { + Host tmp = t1; + t1 = t2; + t2 = tmp; + } + Single result = selectFromHost( + t1, selector, forceNewConnectionAndReserve, contextMap); + // We didn't get a connection from the first host: maybe it is inactive + // and we couldn't reserve a connection. Try the second host. + if (result == null) { + result = selectFromHost(t2, selector, forceNewConnectionAndReserve, contextMap); + } + // If we have a connection we're good to go. Otherwise fall through for another round. + // Since we didn't get a connection from either of them there is no reason to think they'll + // yield a connection if we make them the fallback. + if (result != null) { + return result; + } + } else if (t2Healthy) { + Single result = selectFromHost(t2, selector, forceNewConnectionAndReserve, contextMap); + if (result != null) { + return result; + } + } else if (t1Healthy) { + Single result = selectFromHost(t1, selector, forceNewConnectionAndReserve, contextMap); + if (result != null) { + return result; + } + } else if (failOpen && failOpenHost == null) { + // Both are unhealthy. If one of them can make new connections then it can be a backup. + if (t1.canMakeNewConnections()) { + failOpenHost = t1; + } else if (t2.canMakeNewConnections()) { + failOpenHost = t2; + } } - result = selectFromHost(t2, selector, forceNewConnectionAndReserve, contextMap); + } + // Max effort exhausted. We failed to find a healthy and active host. If we want to fail open and + // found an active host but it was considered unhealthy, try it anyway. + if (failOpenHost != null) { + Single result = selectFromHost(failOpenHost, selector, forceNewConnectionAndReserve, contextMap); if (result != null) { return result; } - // Neither t1 nor t2 yielded a connection. Fall through, potentially for another attempt. } - // Max effort exhausted. We failed to find a healthy and active host. return noActiveHostsFailure(hosts); } - @Nullable - private Single selectFromHost(Host host, Predicate selector, - boolean forceNewConnectionAndReserve, @Nullable ContextMap contextMap) { - // First see if we can get an existing connection regardless of health status. - if (!forceNewConnectionAndReserve) { - C c = host.pickConnection(selector, contextMap); - if (c != null) { - return succeeded(c); - } - } - // We need to make a new connection to the host but we'll only do so if it's considered healthy. - if (host.isActiveAndHealthy()) { - return host.newConnection(selector, forceNewConnectionAndReserve, contextMap); - } - - // no selectable active connections and the host is unhealthy, so we return `null`. - return null; - } - private Random getRandom() { return random == null ? ThreadLocalRandom.current() : random; } diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java index a77604eddf..ce79e63249 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java @@ -97,7 +97,7 @@ private LoadBalancer useNewRoundRobinLoadBalancer( final Publisher>> eventPublisher, final ConnectionFactory connectionFactory) { return new DefaultLoadBalancer<>(id, targetResource, eventPublisher, - new RoundRobinSelector<>(Collections.emptyList(), targetResource), + new RoundRobinSelector<>(Collections.emptyList(), targetResource, false), connectionFactory, linearSearchSpace, healthCheckConfig, null); } diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancingPolicy.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancingPolicy.java index 83b336276a..8ed1fe9892 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancingPolicy.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancingPolicy.java @@ -29,15 +29,16 @@ final class RoundRobinLoadBalancingPolicy implements LoadBalancingPolicy { - private static final RoundRobinLoadBalancingPolicy DEFAULT_POLICY = new RoundRobinLoadBalancingPolicy<>(); + private final boolean failOpen; - private RoundRobinLoadBalancingPolicy() { + private RoundRobinLoadBalancingPolicy(final boolean failOpen) { + this.failOpen = failOpen; } @Override public HostSelector buildSelector(final List> hosts, final String targetResource) { - return new RoundRobinSelector<>(hosts, targetResource); + return new RoundRobinSelector<>(hosts, targetResource, failOpen); } @Override @@ -50,6 +51,18 @@ public String name() { */ public static final class Builder { + private boolean failOpen; + + /** + * Set whether the host selector should attempt to use an unhealthy {@link Host} as a last resort. + * @param failOpen whether the host selector should attempt to use an unhealthy {@link Host} as a last resort. + * @return this {@link P2CLoadBalancingPolicy.Builder}. + */ + public RoundRobinLoadBalancingPolicy.Builder failOpen(final boolean failOpen) { + this.failOpen = failOpen; + return this; + } + /** * Construct the immutable {@link RoundRobinLoadBalancingPolicy}. * @param the type of the resolved address. @@ -58,8 +71,7 @@ public static final class Builder { */ public RoundRobinLoadBalancingPolicy build() { - // Right now there aren't any configurations for round-robin. - return (RoundRobinLoadBalancingPolicy) DEFAULT_POLICY; + return new RoundRobinLoadBalancingPolicy<>(failOpen); } } } diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinSelector.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinSelector.java index c59e7157f8..3f491fb9f6 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinSelector.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinSelector.java @@ -25,23 +25,24 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; -import static io.servicetalk.concurrent.api.Single.succeeded; - final class RoundRobinSelector extends BaseHostSelector { private final AtomicInteger index; private final List> usedHosts; + private final boolean failOpen; - RoundRobinSelector(final List> usedHosts, final String targetResource) { - this(new AtomicInteger(), usedHosts, targetResource); + RoundRobinSelector(final List> usedHosts, final String targetResource, + final boolean failOpen) { + this(new AtomicInteger(), usedHosts, targetResource, failOpen); } private RoundRobinSelector(final AtomicInteger index, final List> usedHosts, - final String targetResource) { + final String targetResource, final boolean failOpen) { super(usedHosts, targetResource); this.index = index; this.usedHosts = usedHosts; + this.failOpen = failOpen; } @Override @@ -50,37 +51,35 @@ protected Single selectConnection0( final boolean forceNewConnectionAndReserve) { // try one loop over hosts and if all are expired, give up final int cursor = (index.getAndIncrement() & Integer.MAX_VALUE) % usedHosts.size(); - Host pickedHost = null; + Host failOpenHost = null; for (int i = 0; i < usedHosts.size(); ++i) { // for a particular iteration we maintain a local cursor without contention with other requests final int localCursor = (cursor + i) % usedHosts.size(); final Host host = usedHosts.get(localCursor); - assert host != null : "Host can't be null."; - - if (!forceNewConnectionAndReserve) { - // First see if an existing connection can be used - C connection = host.pickConnection(selector, context); - if (connection != null) { - return succeeded(connection); + if (host.isHealthy()) { + Single result = selectFromHost(host, selector, forceNewConnectionAndReserve, context); + if (result != null) { + return result; } } - // Don't open new connections for expired or unhealthy hosts, try a different one. - // Unhealthy hosts have no open connections – that's why we don't fail earlier, the loop will not progress. - if (host.isActiveAndHealthy()) { - pickedHost = host; - break; + // If the host is active we can use it for backup. + if (failOpen && failOpenHost == null && host.canMakeNewConnections()) { + failOpenHost = host; } } - if (pickedHost == null) { - return noActiveHostsFailure(usedHosts); + if (failOpenHost != null) { + Single result = selectFromHost(failOpenHost, selector, forceNewConnectionAndReserve, context); + if (result != null) { + return result; + } } - // We have a host but no connection was selected: create a new one. - return pickedHost.newConnection(selector, forceNewConnectionAndReserve, context); + // We were unable to find a suitable host. + return noActiveHostsFailure(usedHosts); } @Override public HostSelector rebuildWithHosts(@Nonnull List> hosts) { - return new RoundRobinSelector<>(index, hosts, getTargetResource()); + return new RoundRobinSelector<>(index, hosts, getTargetResource(), failOpen); } } diff --git a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/DefaultHostTest.java b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/DefaultHostTest.java index 251b2c1540..57b716f9b5 100644 --- a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/DefaultHostTest.java +++ b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/DefaultHostTest.java @@ -53,6 +53,10 @@ class DefaultHostTest { private HealthCheckConfig healthCheckConfig; private DefaultHost host; + static Predicate any() { + return __ -> true; + } + @BeforeEach void init() { mockHostObserver = MockLoadBalancerObserver.mockObserver().hostObserver(); @@ -99,6 +103,7 @@ void activeHostExpires() { verify(mockHostObserver, times(1)).onHostCreated(DEFAULT_ADDRESS); host.markExpired(); verify(mockHostObserver, times(1)).onHostMarkedExpired(DEFAULT_ADDRESS, 0); + verify(mockHostObserver, times(1)).onExpiredHostRemoved(DEFAULT_ADDRESS, 0); assertThat(host.onClose().toFuture().isDone(), is(true)); } @@ -112,7 +117,7 @@ void expiredHostClosesAfterLastConnectionClosed() throws Exception { assertThat(host.onClose().toFuture().isDone(), is(false)); cxn.closeAsync().toFuture().get(); assertThat(host.onClose().toFuture().isDone(), is(true)); - verify(mockHostObserver).onExpiredHostRemoved(DEFAULT_ADDRESS); + verify(mockHostObserver).onExpiredHostRemoved(DEFAULT_ADDRESS, 0); // shouldn't able to revive it. assertThat(host.markActiveIfNotClosed(), is(false)); } @@ -158,7 +163,47 @@ void l4ConsecutiveFailuresAreDetected() throws Exception { verify(mockHostObserver, times(1)).onHostRevived(DEFAULT_ADDRESS); } - static Predicate any() { - return __ -> true; + @Test + void hostStatus() throws Exception { + TestLoadBalancedConnection testLoadBalancedConnection = TestLoadBalancedConnection.mockConnection( + DEFAULT_ADDRESS); + UnhealthyHostConnectionFactory unhealthyHostConnectionFactory = new UnhealthyHostConnectionFactory( + DEFAULT_ADDRESS, 1, succeeded(testLoadBalancedConnection)); + connectionFactory = unhealthyHostConnectionFactory.createFactory(); + healthCheckConfig = new HealthCheckConfig(testExecutor, + Duration.ofSeconds(1), + Duration.ZERO, + DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD, + Duration.ofMillis(1), + Duration.ZERO); + buildHost(); + verify(mockHostObserver, times(1)).onHostCreated(DEFAULT_ADDRESS); + + assertThat(host.isHealthy(), is(true)); + assertThat(host.canMakeNewConnections(), is(true)); + + for (int i = 0; i < DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD; i++) { + assertThrows(ExecutionException.class, + () -> host.newConnection(any(), false, null).toFuture().get()); + } + verify(mockHostObserver, times(1)).onHostMarkedUnhealthy(DEFAULT_ADDRESS, UNHEALTHY_HOST_EXCEPTION); + assertThat(host.isHealthy(), is(false)); + assertThat(host.canMakeNewConnections(), is(true)); + + // now revive and we should see the event and be able to get the connection. + unhealthyHostConnectionFactory.advanceTime(testExecutor); + verify(mockHostObserver, times(1)).onHostRevived(DEFAULT_ADDRESS); + assertThat(host.isHealthy(), is(true)); + assertThat(host.canMakeNewConnections(), is(true)); + + host.markExpired(); + verify(mockHostObserver, times(1)).onHostMarkedExpired(DEFAULT_ADDRESS, 1); + assertThat(host.isHealthy(), is(true)); + assertThat(host.canMakeNewConnections(), is(false)); + + host.closeAsync().toFuture().get(); + verify(mockHostObserver, times(1)).onExpiredHostRemoved(DEFAULT_ADDRESS, 1); + assertThat(host.isHealthy(), is(false)); + assertThat(host.canMakeNewConnections(), is(false)); } } diff --git a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/DefaultLoadBalancerTest.java b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/DefaultLoadBalancerTest.java index 4e80517642..267facd22a 100644 --- a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/DefaultLoadBalancerTest.java +++ b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/DefaultLoadBalancerTest.java @@ -149,8 +149,8 @@ public HostSelector rebuildWithHosts( } @Override - public boolean isUnHealthy() { - return false; + public boolean isHealthy() { + return true; } @Override diff --git a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/P2CSelectorTest.java b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/P2CSelectorTest.java index ab24f6b949..bdc47c38d1 100644 --- a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/P2CSelectorTest.java +++ b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/P2CSelectorTest.java @@ -16,19 +16,18 @@ package io.servicetalk.loadbalancer; import io.servicetalk.client.api.NoActiveHostException; -import io.servicetalk.concurrent.api.Single; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import java.util.ArrayList; import java.util.List; -import java.util.Random; import java.util.concurrent.ExecutionException; -import java.util.function.Predicate; import javax.annotation.Nullable; +import static io.servicetalk.loadbalancer.SelectorTestHelpers.PREDICATE; +import static io.servicetalk.loadbalancer.SelectorTestHelpers.connections; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.equalTo; @@ -36,110 +35,156 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; class P2CSelectorTest { - private static final Predicate PREDICATE = (ignored) -> true; - + private boolean failOpen; + private int maxEffort; + @Nullable private HostSelector selector; - void init(List> hosts) { - init(hosts, 5, null); + @BeforeEach + void setup() { + // set the default values before each test. + selector = null; + failOpen = false; + maxEffort = 5; } - void init(List> hosts, int maxEffort, @Nullable Random random) { - selector = new P2CSelector<>(hosts, "testResource", maxEffort, random); - } - - private Host mockHost(String addr, TestLoadBalancedConnection connection) { - Host host = mock(Host.class); - when(host.address()).thenReturn(addr); - when(host.isUnhealthy()).thenReturn(true); - when(host.isActiveAndHealthy()).thenReturn(true); - when(host.pickConnection(any(), any())).thenReturn(connection); - when(host.newConnection(any(), anyBoolean(), any())).thenReturn(Single.succeeded(connection)); - return host; - } - - private List> connections(String... addresses) { - final List> results = new ArrayList<>(addresses.length); - for (String addr : addresses) { - results.add(mockHost(addr, TestLoadBalancedConnection.mockConnection(addr))); - } - return results; + void init(List> hosts) { + selector = new P2CSelector<>(hosts, "testResource", maxEffort, failOpen, null); } - @ParameterizedTest(name = "{displayName} [{index}]: forceNewConnection={0}") - @ValueSource(booleans = {false, true}) - void singleHost(boolean forceNewConnection) throws Exception { + @Test + void singleHealthyHost() throws Exception { init(connections("addr-1")); TestLoadBalancedConnection connection = selector.selectConnection( - PREDICATE, null, forceNewConnection).toFuture().get(); + PREDICATE, null, false).toFuture().get(); assertThat(connection.address(), equalTo("addr-1")); } - @ParameterizedTest(name = "{displayName} [{index}]: forceNewConnection={0}") + @ParameterizedTest(name = "{displayName} [{index}]: failOpen={0}") @ValueSource(booleans = {false, true}) - void singleUnhealthyHostWithConnection(boolean forceNewConnection) throws Exception { + void singleUnhealthyHost(boolean failOpen) throws Exception { List> hosts = connections("addr-1"); - when(hosts.get(0).isActiveAndHealthy()).thenReturn(false); + when(hosts.get(0).isHealthy()).thenReturn(false); + this.failOpen = failOpen; init(hosts); - if (forceNewConnection) { + if (failOpen) { + TestLoadBalancedConnection connection = selector.selectConnection( + PREDICATE, null, false).toFuture().get(); + assertThat(connection.address(), equalTo("addr-1")); + } else { Exception e = assertThrows(ExecutionException.class, () -> selector.selectConnection( - PREDICATE, null, forceNewConnection).toFuture().get()); + PREDICATE, null, false).toFuture().get()); assertThat(e.getCause(), isA(NoActiveHostException.class)); - } else { + } + } + + @ParameterizedTest(name = "{displayName} [{index}]: failOpen={0}") + @ValueSource(booleans = {false, true}) + void singleInactiveAndUnhealthyHostWithConnection(boolean failOpen) throws Exception { + List> hosts = connections("addr-1"); + when(hosts.get(0).isHealthy()).thenReturn(false); + when(hosts.get(0).canMakeNewConnections()).thenReturn(false); + this.failOpen = failOpen; + init(hosts); + if (failOpen) { TestLoadBalancedConnection connection = selector.selectConnection( - PREDICATE, null, forceNewConnection).toFuture().get(); + PREDICATE, null, false).toFuture().get(); assertThat(connection.address(), equalTo("addr-1")); + } else { + Exception e = assertThrows(ExecutionException.class, () -> selector.selectConnection( + PREDICATE, null, false).toFuture().get()); + assertThat(e.getCause(), isA(NoActiveHostException.class)); } } - @ParameterizedTest(name = "{displayName} [{index}]: forceNewConnection={0}") + @ParameterizedTest(name = "{displayName} [{index}]: failOpen={0}") @ValueSource(booleans = {false, true}) - void singleUnhealthyHostWithoutConnection(boolean forceNewConnection) { + void singleInactiveAndUnhealthyHostWithoutConnection(boolean failOpen) throws Exception { List> hosts = connections("addr-1"); - when(hosts.get(0).isActiveAndHealthy()).thenReturn(false); - when(hosts.get(0).pickConnection(any(), any())).thenReturn(null); + when(hosts.get(0).isHealthy()).thenReturn(false); + when(hosts.get(0).canMakeNewConnections()).thenReturn(false); + when(hosts.get(0).pickConnection(PREDICATE, null)).thenReturn(null); + this.failOpen = failOpen; init(hosts); + // We should never get a connection because we don't have one and an inactive host cant make one. Exception e = assertThrows(ExecutionException.class, () -> selector.selectConnection( - PREDICATE, null, forceNewConnection).toFuture().get()); + PREDICATE, null, false).toFuture().get()); assertThat(e.getCause(), isA(NoActiveHostException.class)); } - @ParameterizedTest(name = "{displayName} [{index}]: forceNewConnection={0}") - @ValueSource(booleans = {false, true}) - void twoHealthyHosts(boolean forceNewConnection) throws Exception { + @Test + void twoHealthyActiveHosts() throws Exception { List> hosts = connections("addr-1", "addr-2"); init(hosts); TestLoadBalancedConnection connection = selector.selectConnection( - PREDICATE, null, forceNewConnection).toFuture().get(); + PREDICATE, null, true).toFuture().get(); assertThat(connection.address(), either(equalTo("addr-1")).or(equalTo("addr-2"))); } - @Test - void twoUnHealthyHostsWithConnections() throws Exception { + @ParameterizedTest(name = "{displayName} [{index}]: failOpen={0}") + @ValueSource(booleans = {false, true}) + void twoHealthyInactiveHostsWithConnections(boolean failOpen) throws Exception { List> hosts = connections("addr-1", "addr-2"); for (Host host : hosts) { - when(host.isActiveAndHealthy()).thenReturn(false); + when(host.canMakeNewConnections()).thenReturn(false); } + this.failOpen = failOpen; init(hosts); TestLoadBalancedConnection connection = selector.selectConnection( PREDICATE, null, false).toFuture().get(); assertThat(connection.address(), either(equalTo("addr-1")).or(equalTo("addr-2"))); } - @Test - void twoUnHealthyHostsWithoutConnections() { + @ParameterizedTest(name = "{displayName} [{index}]: failOpen={0}") + @ValueSource(booleans = {false, true}) + void twoHealthyInactiveHostsWithoutConnections(boolean failOpen) throws Exception { List> hosts = connections("addr-1", "addr-2"); for (Host host : hosts) { - when(host.isActiveAndHealthy()).thenReturn(false); - when(host.pickConnection(any(), any())).thenReturn(null); + when(host.canMakeNewConnections()).thenReturn(false); + when(host.pickConnection(PREDICATE, null)).thenReturn(null); } + this.failOpen = failOpen; + init(hosts); + Exception e = assertThrows(ExecutionException.class, () -> selector.selectConnection( + PREDICATE, null, false).toFuture().get()); + assertThat(e.getCause(), isA(NoActiveHostException.class)); + } + + @ParameterizedTest(name = "{displayName} [{index}]: failOpen={0}") + @ValueSource(booleans = {false, true}) + void twoUnHealthyActiveHosts(boolean failOpen) throws Exception { + List> hosts = connections("addr-1", "addr-2"); + for (Host host : hosts) { + when(host.isHealthy()).thenReturn(false); + } + this.failOpen = failOpen; + init(hosts); + if (failOpen) { + TestLoadBalancedConnection connection = selector.selectConnection( + PREDICATE, null, false).toFuture().get(); + assertThat(connection.address(), either(equalTo("addr-1")).or(equalTo("addr-2"))); + } else { + Exception e = assertThrows(ExecutionException.class, () -> selector.selectConnection( + PREDICATE, null, false).toFuture().get()); + assertThat(e.getCause(), isA(NoActiveHostException.class)); + } + } + + @ParameterizedTest(name = "{displayName} [{index}]: failOpen={0}") + @ValueSource(booleans = {false, true}) + void twoUnHealthyInactiveHosts(boolean failOpen) { + List> hosts = connections("addr-1", "addr-2"); + for (Host host : hosts) { + when(host.isHealthy()).thenReturn(false); + when(host.canMakeNewConnections()).thenReturn(false); + } + this.failOpen = failOpen; init(hosts); Exception e = assertThrows(ExecutionException.class, () -> selector.selectConnection( PREDICATE, null, false).toFuture().get()); @@ -166,7 +211,7 @@ void selectsExistingConnectionsFromNonPreferredHost() throws Exception { // we setup the first host to always be preferred by score, but it also doesn't have any connections // and is unhealthy. when(hosts.get(0).pickConnection(any(), any())).thenReturn(null); - when(hosts.get(0).isActiveAndHealthy()).thenReturn(false); + when(hosts.get(0).isHealthy()).thenReturn(false); when(hosts.get(0).score()).thenReturn(10); init(hosts); TestLoadBalancedConnection connection = selector.selectConnection( @@ -177,12 +222,12 @@ void selectsExistingConnectionsFromNonPreferredHost() throws Exception { } @Test - void biasesTowardsActiveAndHealthyHostWhenMakingConnections() throws Exception { + void biasesTowardsHealthyHostWhenMakingConnections() throws Exception { List> hosts = connections("addr-1", "addr-2"); - when(hosts.get(0).isActiveAndHealthy()).thenReturn(false); + when(hosts.get(0).isHealthy()).thenReturn(false); init(hosts); TestLoadBalancedConnection connection = selector.selectConnection( - PREDICATE, null, true).toFuture().get(); + PREDICATE, null, false).toFuture().get(); assertThat(connection.address(), equalTo("addr-2")); } @@ -197,4 +242,18 @@ void biasesTowardTheHighestWeightHost(boolean forceNewConnection) throws Excepti PREDICATE, null, forceNewConnection).toFuture().get(); assertThat(connection.address(), equalTo("addr-1")); } + + @ParameterizedTest(name = "{displayName} [{index}]: unhealthy={0}") + @ValueSource(booleans = {false, true}) + void singleInactiveHostFailOpen(boolean unhealthy) { + List> hosts = connections("addr-1"); + when(hosts.get(0).isHealthy()).thenReturn(unhealthy); + when(hosts.get(0).canMakeNewConnections()).thenReturn(false); + when(hosts.get(0).pickConnection(PREDICATE, null)).thenReturn(null); + failOpen = true; + init(hosts); + Exception e = assertThrows(ExecutionException.class, () -> selector.selectConnection( + PREDICATE, null, false).toFuture().get()); + assertThat(e.getCause(), isA(NoActiveHostException.class)); + } } diff --git a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinSelectorTest.java b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinSelectorTest.java new file mode 100644 index 0000000000..e9eab938a1 --- /dev/null +++ b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinSelectorTest.java @@ -0,0 +1,117 @@ +/* + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.loadbalancer; + +import io.servicetalk.client.api.NoActiveHostException; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import javax.annotation.Nullable; + +import static io.servicetalk.loadbalancer.SelectorTestHelpers.PREDICATE; +import static io.servicetalk.loadbalancer.SelectorTestHelpers.connections; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.isA; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.when; + +class RoundRobinSelectorTest { + + private boolean failOpen; + @Nullable + private HostSelector selector; + + @BeforeEach + void setup() { + // set the default values before each test. + selector = null; + failOpen = false; + } + + void init(List> hosts) { + selector = new RoundRobinSelector<>(hosts, "testResource", failOpen); + } + + @Test + void roundRobining() throws Exception { + List> hosts = connections("addr-1", "addr-2"); + init(hosts); + List addresses = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + TestLoadBalancedConnection connection = selector.selectConnection( + PREDICATE, null, true).toFuture().get(); + addresses.add(connection.address()); + } + assertThat(addresses, contains("addr-1", "addr-2", "addr-1", "addr-2", "addr-1")); + } + + @Test + void skipUnhealthyHosts() throws Exception { + List> hosts = connections("addr-1", "addr-2"); + when(hosts.get(0).isHealthy()).thenReturn(false); + init(hosts); + List addresses = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + TestLoadBalancedConnection connection = selector.selectConnection( + PREDICATE, null, true).toFuture().get(); + addresses.add(connection.address()); + } + assertThat(addresses, contains("addr-2", "addr-2", "addr-2", "addr-2", "addr-2")); + } + + @ParameterizedTest(name = "{displayName} [{index}]: failOpen={0}") + @ValueSource(booleans = {false, true}) + void noHealthyHosts(boolean failOpen) throws Exception { + List> hosts = connections("addr-1"); + when(hosts.get(0).isHealthy()).thenReturn(false); + this.failOpen = failOpen; + init(hosts); + if (failOpen) { + List addresses = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + TestLoadBalancedConnection connection = selector.selectConnection( + PREDICATE, null, true).toFuture().get(); + addresses.add(connection.address()); + } + assertThat(addresses, contains("addr-1", "addr-1", "addr-1")); + } else { + Exception e = assertThrows(ExecutionException.class, () -> selector.selectConnection( + PREDICATE, null, false).toFuture().get()); + assertThat(e.getCause(), isA(NoActiveHostException.class)); + } + } + + @ParameterizedTest(name = "{displayName} [{index}]: unhealthy={0} failOpen={1}") + @CsvSource({"true,true", "true,false", "false,true", "false,false"}) + void singleInactiveHostWithoutConnections(boolean unhealthy, boolean failOpen) { + List> hosts = connections("addr-1"); + when(hosts.get(0).canMakeNewConnections()).thenReturn(false); + when(hosts.get(0).pickConnection(PREDICATE, null)).thenReturn(null); + this.failOpen = failOpen; + init(hosts); + Exception e = assertThrows(ExecutionException.class, () -> selector.selectConnection( + PREDICATE, null, false).toFuture().get()); + assertThat(e.getCause(), isA(NoActiveHostException.class)); + } +} diff --git a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/SelectorTestHelpers.java b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/SelectorTestHelpers.java new file mode 100644 index 0000000000..51148cf1cc --- /dev/null +++ b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/SelectorTestHelpers.java @@ -0,0 +1,53 @@ +/* + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.loadbalancer; + +import io.servicetalk.concurrent.api.Single; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Predicate; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +final class SelectorTestHelpers { + + static final Predicate PREDICATE = (ignored) -> true; + + private SelectorTestHelpers() { + } + + static List> connections(String... addresses) { + final List> results = new ArrayList<>(addresses.length); + for (String addr : addresses) { + results.add(mockHost(addr, TestLoadBalancedConnection.mockConnection(addr))); + } + return results; + } + + private static Host mockHost(String addr, TestLoadBalancedConnection connection) { + Host host = mock(Host.class); + when(host.address()).thenReturn(addr); + when(host.isHealthy()).thenReturn(true); + when(host.canMakeNewConnections()).thenReturn(true); + when(host.pickConnection(any(), any())).thenReturn(connection); + when(host.newConnection(any(), anyBoolean(), any())).thenReturn(Single.succeeded(connection)); + return host; + } +}