Skip to content

Commit

Permalink
loadbalancer-experimental: split ErrorClass into connection and reque…
Browse files Browse the repository at this point in the history
…st types (#2940)

Motivation:

We have separate interfaces for the connection and request tracking
in large part because we can't install our request tracker until we
actually get a connection. However, they both use the same ErrorClass
types.

Modifications:

- Split the error class types into two parts: those for the connect path and
  those for the request path.
  • Loading branch information
bryce-anderson authored May 24, 2024
1 parent 32c91b5 commit 4eeffeb
Show file tree
Hide file tree
Showing 16 changed files with 112 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import io.servicetalk.http.api.HttpRequestMetaData;
import io.servicetalk.http.api.HttpResponseMetaData;
import io.servicetalk.http.netty.HttpLifecycleObserverRequesterFilter;
import io.servicetalk.loadbalancer.ErrorClass;
import io.servicetalk.loadbalancer.RequestTracker;
import io.servicetalk.transport.api.ConnectionInfo;
import io.servicetalk.transport.api.ExecutionStrategy;
Expand All @@ -49,21 +48,21 @@ final class GrpcRequestTracker {
private static final GrpcLifecycleObserver.GrpcRequestObserver NOOP_REQUEST_OBSERVER =
new NoopGrpcRequestObserver();

private static final Function<GrpcStatus, ErrorClass> PEER_RESPONSE_ERROR_CLASSIFIER = (status) -> {
private static final Function<GrpcStatus, RequestTracker.ErrorClass> PEER_RESPONSE_ERROR_CLASSIFIER = (status) -> {
// TODO: this needs to be gone over with more detail.
switch (status.code()) {
case OK:
return null;
case CANCELLED:
return ErrorClass.CANCELLED;
return RequestTracker.ErrorClass.CANCELLED;
default:
return ErrorClass.EXT_ORIGIN_REQUEST_FAILED;
return RequestTracker.ErrorClass.EXT_ORIGIN_REQUEST_FAILED;
}
};

// TODO: this needs to be gone over with more detail.
private static final Function<Throwable, ErrorClass> ERROR_CLASS_FUNCTION = (exn) ->
ErrorClass.EXT_ORIGIN_REQUEST_FAILED;
private static final Function<Throwable, RequestTracker.ErrorClass> ERROR_CLASS_FUNCTION = (exn) ->
RequestTracker.ErrorClass.EXT_ORIGIN_REQUEST_FAILED;

private GrpcRequestTracker() {
// no instances
Expand Down Expand Up @@ -183,7 +182,7 @@ public void onResponseError(Throwable cause) {
public void onResponseCancel() {
final long startTime = finish();
if (checkOnce(startTime)) {
tracker.onRequestError(startTime, ErrorClass.CANCELLED);
tracker.onRequestError(startTime, RequestTracker.ErrorClass.CANCELLED);
}
}

Expand All @@ -194,7 +193,7 @@ public void onExchangeFinally() {

@Override
public void onGrpcStatus(GrpcStatus status) {
ErrorClass error = PEER_RESPONSE_ERROR_CLASSIFIER.apply(status);
RequestTracker.ErrorClass error = PEER_RESPONSE_ERROR_CLASSIFIER.apply(status);
if (error != null) {
final long startTime = finish();
if (checkOnce(startTime)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import io.servicetalk.grpc.api.GrpcStatusCode;
import io.servicetalk.grpc.api.GrpcStatusException;
import io.servicetalk.http.api.FilterableStreamingHttpConnection;
import io.servicetalk.loadbalancer.ErrorClass;
import io.servicetalk.loadbalancer.RequestTracker;
import io.servicetalk.transport.api.HostAndPort;
import io.servicetalk.transport.api.ServerContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import io.servicetalk.http.api.HttpLifecycleObserver;
import io.servicetalk.http.api.HttpRequestMetaData;
import io.servicetalk.http.api.HttpResponseMetaData;
import io.servicetalk.loadbalancer.ErrorClass;
import io.servicetalk.loadbalancer.RequestTracker;
import io.servicetalk.transport.api.ConnectionInfo;
import io.servicetalk.transport.api.ExecutionStrategy;
Expand All @@ -35,27 +34,20 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.ConnectException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.Function;
import javax.annotation.Nullable;

import static io.servicetalk.http.api.HttpResponseStatus.StatusClass.SERVER_ERROR_5XX;
import static io.servicetalk.http.api.HttpResponseStatus.TOO_MANY_REQUESTS;
import static io.servicetalk.loadbalancer.ErrorClass.LOCAL_ORIGIN_CONNECT_FAILED;
import static io.servicetalk.loadbalancer.ErrorClass.LOCAL_ORIGIN_REQUEST_FAILED;
import static io.servicetalk.loadbalancer.RequestTracker.ErrorClass.EXT_ORIGIN_TIMEOUT;
import static io.servicetalk.loadbalancer.RequestTracker.ErrorClass.LOCAL_ORIGIN_REQUEST_FAILED;
import static io.servicetalk.loadbalancer.RequestTracker.REQUEST_TRACKER_KEY;

final class HttpRequestTracker {

private static final Logger LOGGER = LoggerFactory.getLogger(HttpRequestTracker.class);

private static final Function<Throwable, ErrorClass> ERROR_CLASSIFIER = t -> t instanceof ConnectException ?
LOCAL_ORIGIN_CONNECT_FAILED : LOCAL_ORIGIN_REQUEST_FAILED;
private static final Function<HttpResponseMetaData, ErrorClass> PEER_RESPONSE_ERROR_CLASSIFIER = resp ->
(resp.status().statusClass() == SERVER_ERROR_5XX || TOO_MANY_REQUESTS.equals(resp.status())) ?
ErrorClass.EXT_ORIGIN_REQUEST_FAILED : null;

private HttpRequestTracker() {
// no instances
}
Expand Down Expand Up @@ -161,7 +153,7 @@ public HttpLifecycleObserver.HttpRequestObserver onRequest(HttpRequestMetaData r

@Override
public HttpLifecycleObserver.HttpResponseObserver onResponse(HttpResponseMetaData responseMetaData) {
ErrorClass error = PEER_RESPONSE_ERROR_CLASSIFIER.apply(responseMetaData);
RequestTracker.ErrorClass error = classifyResponse(responseMetaData);
if (error != null) {
final long startTime = finish();
if (checkOnce(startTime)) {
Expand All @@ -182,15 +174,15 @@ public void onExchangeFinally() {
public void onResponseError(Throwable cause) {
final long startTime = finish();
if (checkOnce(startTime)) {
tracker.onRequestError(startTime, ERROR_CLASSIFIER.apply(cause));
tracker.onRequestError(startTime, classifyThrowable(cause));
}
}

@Override
public void onResponseCancel() {
final long startTime = finish();
if (checkOnce(startTime)) {
tracker.onRequestError(startTime, ErrorClass.CANCELLED);
tracker.onRequestError(startTime, RequestTracker.ErrorClass.CANCELLED);
}
}

Expand Down Expand Up @@ -223,4 +215,14 @@ private boolean checkOnce(long startTime) {
}
}
}

/**
 * Maps a received HTTP response onto the request error class it represents, if any.
 *
 * @param resp the metadata of the received response.
 * @return {@code EXT_ORIGIN_REQUEST_FAILED} when the status is in the 5xx class or equals
 * {@code TOO_MANY_REQUESTS}; {@code null} when the response is not considered an error.
 */
@Nullable
private static RequestTracker.ErrorClass classifyResponse(HttpResponseMetaData resp) {
    // Any server-side (5xx) failure counts against the remote peer.
    if (resp.status().statusClass() == SERVER_ERROR_5XX) {
        return RequestTracker.ErrorClass.EXT_ORIGIN_REQUEST_FAILED;
    }
    // TOO_MANY_REQUESTS is the only non-5xx status that is classified as a peer failure.
    if (TOO_MANY_REQUESTS.equals(resp.status())) {
        return RequestTracker.ErrorClass.EXT_ORIGIN_REQUEST_FAILED;
    }
    return null;
}

/**
 * Maps a failure cause onto a request error class.
 *
 * @param error the cause of the failure.
 * @return {@code EXT_ORIGIN_TIMEOUT} when {@code error} is a {@link TimeoutException},
 * {@code LOCAL_ORIGIN_REQUEST_FAILED} for every other cause.
 */
private static RequestTracker.ErrorClass classifyThrowable(Throwable error) {
    if (error instanceof TimeoutException) {
        return EXT_ORIGIN_TIMEOUT;
    }
    return LOCAL_ORIGIN_REQUEST_FAILED;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.servicetalk.http.api.HttpResponse;
import io.servicetalk.http.api.HttpService;
import io.servicetalk.http.api.SingleAddressHttpClientBuilder;
import io.servicetalk.loadbalancer.ErrorClass;
import io.servicetalk.loadbalancer.RequestTracker;
import io.servicetalk.transport.api.HostAndPort;
import io.servicetalk.transport.api.ServerContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,25 @@ interface ConnectTracker {
* Callback to notify the parent {@link OutlierDetector} that an attempt to connect to this host has failed.
* @param beforeConnectStart the time that the connection attempt was initiated.
* @param errorClass the class of error that caused the connection attempt to fail.
*/
void onConnectError(long beforeConnectStart);
void onConnectError(long beforeConnectStart, ErrorClass errorClass);

/**
 * Classes of errors that resulted in connect failure. One of these values accompanies every
 * {@code onConnectError} notification so the tracker can tell the failure modes apart.
 */
enum ErrorClass {
/**
 * Failure due to cancellation: the connect attempt was cancelled before it completed.
 */
CANCELLED,

/**
 * Failures related to locally enforced timeouts waiting to establish a usable session with the peer.
 */
CONNECT_TIMEOUT,

/**
 * Failures for reasons other than cancellation and timeout. This is the catch-all class for
 * connect errors that fit neither {@link #CANCELLED} nor {@link #CONNECT_TIMEOUT}.
 */
CONNECT_ERROR
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.servicetalk.loadbalancer;

import io.servicetalk.client.api.ConnectTimeoutException;
import io.servicetalk.client.api.ConnectionFactory;
import io.servicetalk.client.api.ConnectionLimitReachedException;
import io.servicetalk.client.api.DelegatingConnectionFactory;
Expand Down Expand Up @@ -48,6 +49,9 @@
import static io.servicetalk.concurrent.api.Single.failed;
import static io.servicetalk.concurrent.api.Single.succeeded;
import static io.servicetalk.concurrent.internal.FlowControlUtils.addWithOverflowProtection;
import static io.servicetalk.loadbalancer.ConnectTracker.ErrorClass.CANCELLED;
import static io.servicetalk.loadbalancer.ConnectTracker.ErrorClass.CONNECT_ERROR;
import static io.servicetalk.loadbalancer.ConnectTracker.ErrorClass.CONNECT_TIMEOUT;
import static io.servicetalk.loadbalancer.RequestTracker.REQUEST_TRACKER_KEY;
import static java.util.Collections.emptyList;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -626,13 +630,13 @@ public Single<C> newConnection(Addr addr, @Nullable ContextMap context, @Nullabl
return Single.defer(() -> {
final long connectStartTime = connectTracker.beforeConnectStart();
return delegate().newConnection(addr, context, observer)
.beforeFinally(new ConnectSignalConsumer<>(connectStartTime, connectTracker))
.beforeFinally(new ConnectSignalConsumer(connectStartTime, connectTracker))
.shareContextOnSubscribe();
});
}
}

private static class ConnectSignalConsumer<C extends LoadBalancedConnection> implements TerminalSignalConsumer {
private static class ConnectSignalConsumer implements TerminalSignalConsumer {

private final ConnectTracker connectTracker;
private final long connectStartTime;
Expand All @@ -650,16 +654,16 @@ public void onComplete() {
@Override
public void cancel() {
// Cancellation is reported under its own error class rather than being treated as a timeout or
// a generic failure; how much weight it carries is left to the ConnectTracker implementation.
doOnError();
doOnError(CANCELLED);
}

@Override
public void onError(Throwable t) {
doOnError();
doOnError(t instanceof ConnectTimeoutException ? CONNECT_TIMEOUT : CONNECT_ERROR);
}

private void doOnError() {
connectTracker.onConnectError(connectStartTime);
private void doOnError(ConnectTracker.ErrorClass errorClass) {
connectTracker.onConnectError(connectStartTime, errorClass);
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void onConnectSuccess(long beforeConnectStart) {
}

@Override
public void onConnectError(long beforeConnectStart) {
public void onConnectError(long beforeConnectStart, ConnectTracker.ErrorClass errorClass) {
// noop
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,38 @@ public interface RequestTracker {
* @param errorClass the class of error that triggered this method.
*/
void onRequestError(long beforeStartTimeNs, ErrorClass errorClass);

/**
 * The broad categories of failure that can be reported for a request.
 */
enum ErrorClass {
    /**
     * The request failed because of something that went wrong locally, such as a local exception.
     */
    LOCAL_ORIGIN_REQUEST_FAILED(true),

    /**
     * A locally enforced timeout elapsed while waiting for a response from the peer.
     */
    EXT_ORIGIN_TIMEOUT(false),

    /**
     * The remote peer returned a failure, for example a 5xx response.
     */
    EXT_ORIGIN_REQUEST_FAILED(false),

    /**
     * The request was cancelled.
     */
    CANCELLED(true);

    private final boolean local;

    ErrorClass(final boolean local) {
        this.local = local;
    }

    /**
     * Reports whether this class of failure is flagged as local.
     *
     * @return {@code true} for {@link #LOCAL_ORIGIN_REQUEST_FAILED} and {@link #CANCELLED},
     * {@code false} for {@link #EXT_ORIGIN_TIMEOUT} and {@link #EXT_ORIGIN_REQUEST_FAILED}.
     */
    public boolean isLocal() {
        return local;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,10 @@ public final void onRequestSuccess(final long beforeStartTimeNs) {
}

@Override
public final void onRequestError(final long beforeStartTimeNs, ErrorClass errorClass) {
public final void onRequestError(final long beforeStartTimeNs, RequestTracker.ErrorClass errorClass) {
super.onRequestError(beforeStartTimeNs, errorClass);
// For now, don't consider cancellation to be an error or a success.
if (errorClass != ErrorClass.CANCELLED) {
if (errorClass != RequestTracker.ErrorClass.CANCELLED) {
doOnError();
}
}
Expand All @@ -157,11 +157,15 @@ public long beforeConnectStart() {
}

@Override
public void onConnectError(long beforeConnectStart) {
public void onConnectError(long beforeConnectStart, ConnectTracker.ErrorClass errorClass) {
// This assumes that the connect request was intended to be used for a request dispatch which
// will have now failed. This is not strictly true: a connection can be acquired and simply not
// used, but in practice it's a very good assumption.
doOnError();

// For now, don't consider cancellation to be an error or a success.
if (errorClass != ConnectTracker.ErrorClass.CANCELLED) {
doOnError();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,6 @@ void connectFailuresAreForwardedToHealthIndicator() {
host.newConnection(cxn -> true, false, null).toFuture().get()).getCause();
assertEquals(DELIBERATE_EXCEPTION, underlying);
verify(healthIndicator, times(1)).beforeConnectStart();
verify(healthIndicator, times(1)).onConnectError(0L);
verify(healthIndicator, times(1)).onConnectError(0L, ConnectTracker.ErrorClass.CONNECT_ERROR);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public void onConnectSuccess(long beforeConnectStart) {
}

@Override
public void onConnectError(long beforeConnectStart) {
public void onConnectError(long beforeConnectStart, ConnectTracker.ErrorClass errorClass) {
}

@Override
Expand All @@ -252,7 +252,7 @@ public void onRequestSuccess(long beforeStartTime) {
}

@Override
public void onRequestError(long beforeStartTime, ErrorClass errorClass) {
public void onRequestError(long beforeStartTime, RequestTracker.ErrorClass errorClass) {
}
}

Expand Down
Loading

0 comments on commit 4eeffeb

Please sign in to comment.