Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

loadbalancer: Simplify ConnectionFactory usage in DefaultHost #2796

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ public Single<C> newConnection(
}
return newCnx.closeAsync().<C>concat(
failed(Exceptions.StacklessConnectionRejectedException.newInstance(
"Failed to add newly created connection " + newCnx + " for " + toString(),
"Failed to add newly created connection " + newCnx + " for " + this,
RoundRobinLoadBalancer.class, "selectConnection0(...)")))
.shareContextOnSubscribe();
});
Expand Down Expand Up @@ -489,7 +489,7 @@ public void schedule(final Throwable originalCause) {
.apply(0, originalCause)
// Remove any state from async context
.beforeOnSubscribe(__ -> AsyncContext.clear())
.concat(connectionFactory.newConnection(address, null, null)
.concat(newConnection(cxn -> true, false, null)
// There is no risk for StackOverflowError because result of each connection
// attempt will be invoked on IoExecutor as a new task.
.retryWhen(retryWithConstantBackoffDeltaJitter(
Expand All @@ -502,19 +502,10 @@ public void schedule(final Throwable originalCause) {
healthCheckConfig.jitter,
healthCheckConfig.executor)))
.flatMapCompletable(newCnx -> {
if (addConnection(newCnx, this)) {
LOGGER.info("{}: health check passed for {}, marked this " +
"host as ACTIVE for the selection algorithm.",
lbDescription, DefaultHost.this);
return completed();
} else {
// This happens only if the host is closed, no need to mark as healthy.
assert connState.state == State.CLOSED;
LOGGER.debug("{}: health check passed for {}, but the " +
"host rejected a new connection {}. Closing it now.",
lbDescription, DefaultHost.this, newCnx);
return newCnx.closeAsync();
}
LOGGER.info("{}: health check passed for {}, marked this " +
"host as ACTIVE for the selection algorithm.",
lbDescription, DefaultHost.this);
return completed();
})
// Use onErrorComplete instead of whenOnError to avoid double logging of an error inside
// subscribe(): SimpleCompletableSubscriber.
Expand Down
Loading