Skip to content

Commit

Permalink
traffic-resilience-http: Add dry-run mode for resilience filters (#3085)
Browse files Browse the repository at this point in the history
Motivation:

Sometimes people will want to test out the capacity limiters without
actually rejecting any traffic. This can help them tune their system
and generally just feel more confident.

Modifications:

Add dry-run mode.
  • Loading branch information
bryce-anderson authored Nov 7, 2024
1 parent 2e4a31d commit 8441a20
Show file tree
Hide file tree
Showing 5 changed files with 302 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ abstract class AbstractTrafficResilienceHttpFilter implements HttpExecutionStrat
private final Supplier<Function<HttpRequestMetaData, CircuitBreaker>> circuitBreakerPartitionsSupplier;

private final TrafficResiliencyObserver observer;
private final boolean dryRun;

AbstractTrafficResilienceHttpFilter(
final Supplier<Function<HttpRequestMetaData, CapacityLimiter>> capacityPartitionsSupplier,
Expand All @@ -99,7 +100,9 @@ abstract class AbstractTrafficResilienceHttpFilter implements HttpExecutionStrat
final Consumer<Ticket> onCancellationTicketTerminal,
final BiConsumer<Ticket, Throwable> onErrorTicketTerminal,
final Supplier<Function<HttpRequestMetaData, CircuitBreaker>> circuitBreakerPartitionsSupplier,
final TrafficResiliencyObserver observer) {
final TrafficResiliencyObserver observer,
final boolean dryRun
) {
this.capacityPartitionsSupplier = requireNonNull(capacityPartitionsSupplier, "capacityPartitionsSupplier");
this.rejectWhenNotMatchedCapacityPartition = rejectWhenNotMatchedCapacityPartition;
this.capacityRejectionPredicate = requireNonNull(capacityRejectionPredicate, "capacityRejectionPredicate");
Expand All @@ -112,6 +115,7 @@ abstract class AbstractTrafficResilienceHttpFilter implements HttpExecutionStrat
this.circuitBreakerPartitionsSupplier = requireNonNull(circuitBreakerPartitionsSupplier,
"circuitBreakerPartitionsSupplier");
this.observer = requireNonNull(observer, "observer");
this.dryRun = dryRun;
}

@Override
Expand All @@ -136,7 +140,7 @@ Function<HttpResponseMetaData, Duration> newDelayProvider() {
return __ -> Duration.ZERO;
}

Single<StreamingHttpResponse> applyCapacityControl(
final Single<StreamingHttpResponse> applyCapacityControl(
final Function<HttpRequestMetaData, CapacityLimiter> capacityPartitions,
final Function<HttpRequestMetaData, CircuitBreaker> circuitBreakerPartitions,
final Function<HttpRequestMetaData, Classification> classifier,
Expand All @@ -151,7 +155,7 @@ Single<StreamingHttpResponse> applyCapacityControl(
if (partition == null) {
observer.onRejectedUnmatchedPartition(request);
return rejectWhenNotMatchedCapacityPartition ?
handleLocalCapacityRejection(null, request, responseFactory)
doHandleLocalCapacityRejection(delegate, null, request, responseFactory)
.shareContextOnSubscribe() :
handlePassthrough(delegate, request)
.shareContextOnSubscribe();
Expand All @@ -166,22 +170,23 @@ Single<StreamingHttpResponse> applyCapacityControl(

if (ticket == null) {
observer.onRejectedLimit(request, partition.name(), meta, classification);
return handleLocalCapacityRejection(serverListenContext, request, responseFactory)
return doHandleLocalCapacityRejection(delegate, serverListenContext, request, responseFactory)
.shareContextOnSubscribe();
}
final CircuitBreaker breaker = circuitBreakerPartitions.apply(request);
if (breaker != null && !breaker.tryAcquirePermit()) {
observer.onRejectedOpenCircuit(request, breaker.name(), meta, classification);
// Ignore the acquired ticket if breaker was open.
ticket.ignored();
return handleLocalBreakerRejection(request, responseFactory, breaker).shareContextOnSubscribe();
return doHandleLocalBreakerRejection(delegate, request, responseFactory, breaker)
.shareContextOnSubscribe();
}

// Ticket lifetime must be completed at all points now, try/catch to ensure if anything throws (e.g.
// reactive flow isn't followed) we still complete ticket lifetime.
try {
final TicketObserver ticketObserver = observer.onAllowedThrough(request, ticket.state());
return handleAllow(delegate, delayProvider, request, wrapTicket(serverListenContext, ticket),
return doHandleAllow(delegate, delayProvider, request, wrapTicket(serverListenContext, ticket),
ticketObserver, breaker, startTime).shareContextOnSubscribe();
} catch (Throwable cause) {
onError(cause, breaker, startTime, ticket);
Expand All @@ -194,11 +199,29 @@ Ticket wrapTicket(@Nullable final ServerListenContext serverListenContext, final
return ticket;
}

/**
 * Decides what happens to a request that did not pass local capacity admission.
 * <p>
 * In dry-run mode the request is forwarded to {@code delegate} instead of being rejected; otherwise the
 * subclass-provided {@link #handleLocalCapacityRejection} produces the result.
 *
 * @param delegate the next step in the chain; only invoked in dry-run mode.
 * @param serverListenContext passed on to {@link #handleLocalCapacityRejection}; may be {@code null}.
 * @param request the request being processed.
 * @param responseFactory passed on to {@link #handleLocalCapacityRejection}; may be {@code null}.
 * @return the pass-through result in dry-run mode, otherwise the subclass rejection result.
 */
private Single<StreamingHttpResponse> doHandleLocalCapacityRejection(
        Function<StreamingHttpRequest, Single<StreamingHttpResponse>> delegate,
        @Nullable ServerListenContext serverListenContext,
        StreamingHttpRequest request,
        @Nullable StreamingHttpResponseFactory responseFactory) {
    if (dryRun) {
        // Dry-run: never reject, let the request continue down the chain.
        return handlePassthrough(delegate, request);
    }
    return handleLocalCapacityRejection(serverListenContext, request, responseFactory);
}

/**
 * Produces the result for a request that is rejected by local capacity control.
 * <p>
 * Reached only when dry-run mode is disabled. It is used both when the capacity limiter yields no ticket and
 * when no capacity partition matched and unmatched requests are configured to be rejected.
 *
 * @param serverListenContext context associated with the request; {@code null} when no partition matched.
 * @param request the request being rejected.
 * @param responseFactory factory available for building a response; may be {@code null}.
 * @return the {@link Single} returned to the caller in place of the delegate's result
 * (NOTE(review): presumably a rejection response or an error, as defined by each subclass - confirm).
 */
abstract Single<StreamingHttpResponse> handleLocalCapacityRejection(
@Nullable ServerListenContext serverListenContext,
StreamingHttpRequest request,
@Nullable StreamingHttpResponseFactory responseFactory);

/**
 * Decides what happens to a request for which the local circuit breaker refused a permit.
 * <p>
 * In dry-run mode the request is forwarded to {@code delegate} instead of being rejected; otherwise the
 * subclass-provided {@link #handleLocalBreakerRejection} produces the result.
 *
 * @param delegate the next step in the chain; only invoked in dry-run mode.
 * @param request the request being processed.
 * @param responseFactory passed on to {@link #handleLocalBreakerRejection}; may be {@code null}.
 * @param breaker the circuit breaker that refused the permit.
 * @return the pass-through result in dry-run mode, otherwise the subclass rejection result.
 */
private Single<StreamingHttpResponse> doHandleLocalBreakerRejection(
        Function<StreamingHttpRequest, Single<StreamingHttpResponse>> delegate,
        StreamingHttpRequest request,
        @Nullable StreamingHttpResponseFactory responseFactory,
        CircuitBreaker breaker) {
    if (dryRun) {
        // Dry-run: never reject, let the request continue down the chain.
        return handlePassthrough(delegate, request);
    }
    return handleLocalBreakerRejection(request, responseFactory, breaker);
}

abstract Single<StreamingHttpResponse> handleLocalBreakerRejection(
StreamingHttpRequest request,
@Nullable StreamingHttpResponseFactory responseFactory,
Expand All @@ -219,6 +242,16 @@ private static Single<StreamingHttpResponse> handlePassthrough(
return delegate.apply(request);
}

/**
 * Dispatches an admitted request to the regular or the dry-run "allow" path, depending on the dry-run flag.
 * All arguments are forwarded unchanged to the selected path.
 *
 * @param delegate the next step in the chain.
 * @param delayProvider forwarded to the selected path.
 * @param request the admitted request.
 * @param ticket the capacity ticket acquired for this request.
 * @param ticketObserver observer notified about the outcome of the ticket.
 * @param breaker the circuit breaker whose permit was acquired, or {@code null} if none applies.
 * @param startTimeNs start timestamp of the request in nanoseconds.
 * @return the result of the selected path.
 */
private Single<StreamingHttpResponse> doHandleAllow(
        final Function<StreamingHttpRequest, Single<StreamingHttpResponse>> delegate,
        final Function<HttpResponseMetaData, Duration> delayProvider, final StreamingHttpRequest request,
        final Ticket ticket, final TicketObserver ticketObserver, @Nullable final CircuitBreaker breaker,
        final long startTimeNs) {
    if (dryRun) {
        return dryRunHandleAllow(delegate, delayProvider, request, ticket, ticketObserver, breaker,
                startTimeNs);
    }
    return handleAllow(delegate, delayProvider, request, ticket, ticketObserver, breaker, startTimeNs);
}

private Single<StreamingHttpResponse> handleAllow(
final Function<StreamingHttpRequest, Single<StreamingHttpResponse>> delegate,
final Function<HttpResponseMetaData, Duration> delayProvider,
Expand Down Expand Up @@ -249,37 +282,101 @@ private Single<StreamingHttpResponse> handleAllow(
}
return Single.succeeded(resp).shareContextOnSubscribe();
})
.liftSync(new BeforeFinallyHttpOperator(new TerminalSignalConsumer() {
@Override
public void onComplete() {
try {
if (breaker != null) {
breaker.onSuccess(nanoTime() - startTimeNs, NANOSECONDS);
}
} finally {
onSuccessTicketTerminal.accept(ticket);
ticketObserver.onComplete();
}
}
.liftSync(new BeforeFinallyHttpOperator(
new SignalConsumer(this, ticket, ticketObserver, breaker, startTimeNs)));
}

@Override
public void onError(final Throwable throwable) {
AbstractTrafficResilienceHttpFilter.this.onError(throwable, breaker, startTimeNs, ticket);
ticketObserver.onError(throwable);
/**
 * Dry-run variant of the "allow" path: invokes {@code delegate} and always hands the response back to the
 * caller, even when the response matches one of the rejection predicates.
 * <p>
 * When a predicate matches, the corresponding rejection exception is reported to the {@link SignalConsumer}
 * via {@code onError} so the ticket (and breaker, if any) observe the outcome, but the response itself is
 * returned unchanged.
 *
 * @param delegate the next step in the chain.
 * @param delayProvider passed to {@code peerBreakerRejection} when the breaker rejection predicate matches.
 * @param request the admitted request.
 * @param ticket the capacity ticket acquired for this request.
 * @param ticketObserver observer notified about the outcome of the ticket.
 * @param breaker the circuit breaker whose permit was acquired, or {@code null} if none applies.
 * @param startTimeNs start timestamp of the request in nanoseconds.
 * @return the delegate's response, with terminal signals reported to the {@link SignalConsumer}.
 */
private Single<StreamingHttpResponse> dryRunHandleAllow(
final Function<StreamingHttpRequest, Single<StreamingHttpResponse>> delegate,
final Function<HttpResponseMetaData, Duration> delayProvider, final StreamingHttpRequest request,
final Ticket ticket,
final TicketObserver ticketObserver, @Nullable final CircuitBreaker breaker, final long startTimeNs) {
// A single consumer instance is shared by the `.map` stage and the BeforeFinallyHttpOperator below so that
// only the first terminal signal from either place takes effect.
SignalConsumer signalConsumer = new SignalConsumer(
this, ticket, ticketObserver, breaker, startTimeNs);
return delegate.apply(request)
// This logic has the same general structure as the `handleAllow` case, but there is a twist: we want
// to always return the successful single even when the response matches a rejection predicate. Because
// SignalConsumer uses a latch, it is safe to signal those conditions eagerly in the `.map` operation
// and let the request proceed as usual; any following signals (such as from body processing) will be
// ignored.
.map(resp -> {
if (breaker != null && breakerRejectionPredicate.test(resp)) {
Exception rejection = peerBreakerRejection(resp, breaker, delayProvider);
signalConsumer.onError(rejection);
} else if (capacityRejectionPredicate.test(resp)) {
final RuntimeException rejection = peerRejection(resp);
signalConsumer.onError(rejection);
}
// Dry-run: the response is returned regardless of the predicate outcome.
return resp;
})
.liftSync(new BeforeFinallyHttpOperator(signalConsumer));
}

@Override
public void cancel() {
try {
if (breaker != null) {
breaker.ignorePermit();
}
} finally {
onCancellationTicketTerminal.accept(ticket);
ticketObserver.onCancel();
}
}
}, true));
/**
 * {@link TerminalSignalConsumer} that completes the lifecycle of a {@link Ticket}, and of the optional
 * {@link CircuitBreaker} permit, at most once.
 * <p>
 * Whichever of {@link #onComplete()}, {@link #onError(Throwable)} or {@link #cancel()} is invoked first is
 * processed; every subsequent invocation is a no-op.
 */
private static final class SignalConsumer implements TerminalSignalConsumer {

    private static final AtomicIntegerFieldUpdater<SignalConsumer> SIGNALLED_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(SignalConsumer.class, "signalled");

    private final AbstractTrafficResilienceHttpFilter parent;
    private final Ticket ticket;
    private final TicketObserver ticketObserver;
    @Nullable
    private final CircuitBreaker breaker;
    private final long startTimeNs;

    // Updated only through SIGNALLED_UPDATER: 0 until the first terminal signal is claimed, 1 afterwards.
    private volatile int signalled;

    /**
     * Creates a new instance.
     *
     * @param parent the filter whose ticket terminal callbacks and error handling are used.
     * @param ticket the ticket to complete.
     * @param ticketObserver observer notified about the terminal signal.
     * @param breaker circuit breaker to notify, or {@code null} if none applies.
     * @param startTimeNs start timestamp of the request in nanoseconds.
     */
    SignalConsumer(final AbstractTrafficResilienceHttpFilter parent, final Ticket ticket,
                   final TicketObserver ticketObserver, @Nullable final CircuitBreaker breaker,
                   final long startTimeNs) {
        this.parent = parent;
        this.ticket = ticket;
        this.ticketObserver = ticketObserver;
        this.breaker = breaker;
        this.startTimeNs = startTimeNs;
    }

    @Override
    public void onComplete() {
        if (claimTerminalSignal()) {
            try {
                if (breaker != null) {
                    breaker.onSuccess(nanoTime() - startTimeNs, NANOSECONDS);
                }
            } finally {
                // The ticket must be completed even if the breaker callback throws.
                parent.onSuccessTicketTerminal.accept(ticket);
                ticketObserver.onComplete();
            }
        }
    }

    @Override
    public void onError(final Throwable throwable) {
        if (claimTerminalSignal()) {
            parent.onError(throwable, breaker, startTimeNs, ticket);
            ticketObserver.onError(throwable);
        }
    }

    @Override
    public void cancel() {
        if (claimTerminalSignal()) {
            try {
                if (breaker != null) {
                    breaker.ignorePermit();
                }
            } finally {
                // The ticket must be completed even if the breaker callback throws.
                parent.onCancellationTicketTerminal.accept(ticket);
                ticketObserver.onCancel();
            }
        }
    }

    /**
     * Atomically claims the right to process a terminal signal.
     *
     * @return {@code true} for the first caller only, {@code false} for all later callers.
     */
    private boolean claimTerminalSignal() {
        return SIGNALLED_UPDATER.compareAndSet(this, 0, 1);
    }
}

private void onError(final Throwable throwable, @Nullable final CircuitBreaker breaker,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,11 @@ private TrafficResilienceHttpClientFilter(final Supplier<Function<HttpRequestMet
@Nullable final Supplier<Function<HttpResponseMetaData, Duration>>
focreOpenCircuitOnPeerCircuitRejectionsDelayProvider,
@Nullable final Executor circuitBreakerResetExecutor,
final TrafficResiliencyObserver observer) {
final TrafficResiliencyObserver observer,
final boolean dryRun) {
super(capacityPartitionsSupplier, rejectWhenNotMatchedCapacityPartition, classifier,
clientPeerRejectionPolicy.predicate(), breakerRejectionPredicate, onCompletion, onCancellation,
onError, circuitBreakerPartitionsSupplier, observer);
onError, circuitBreakerPartitionsSupplier, observer, dryRun);
this.clientPeerRejectionPolicy = clientPeerRejectionPolicy;
this.forceOpenCircuitOnPeerCircuitRejections = forceOpenCircuitOnPeerCircuitRejections;
this.focreOpenCircuitOnPeerCircuitRejectionsDelayProvider =
Expand Down Expand Up @@ -256,6 +257,7 @@ public static final class Builder {
@Nullable
private Executor circuitBreakerResetExecutor;
private TrafficResiliencyObserver observer = NoOpTrafficResiliencyObserver.INSTANCE;
private boolean dryRun;

/**
* A {@link TrafficResilienceHttpClientFilter} with no partitioning schemes.
Expand Down Expand Up @@ -526,6 +528,18 @@ public Builder observer(final TrafficResiliencyObserver observer) {
return this;
}

/**
 * Use the resilience filter in dry-run mode.
 * <p>
 * In dry-run mode the capacity limiters and circuit breakers are still consulted and the configured
 * {@link TrafficResiliencyObserver} is still notified of the outcome, but requests that would have been
 * rejected are instead passed through to the underlying client. Dry-run mode is disabled unless this
 * method is called with {@code true}.
 *
 * @param dryRun whether to use the resilience filter in dry-run mode.
 * @return {@code this}
 */
public Builder dryRun(final boolean dryRun) {
this.dryRun = dryRun;
return this;
}

/**
* Invoke to build an instance of {@link TrafficResilienceHttpClientFilter} filter to be used inside the
* HttpClientBuilder.
Expand All @@ -539,7 +553,8 @@ public TrafficResilienceHttpClientFilter build() {
circuitBreakerPartitionsSupplier, classifier, clientPeerRejectionPolicy,
peerUnavailableRejectionPredicate, onCompletionTicketTerminal, onCancellationTicketTerminal,
onErrorTicketTerminal, forceOpenCircuitOnPeerCircuitRejections,
focreOpenCircuitOnPeerCircuitRejectionsDelayProvider, circuitBreakerResetExecutor, observer);
focreOpenCircuitOnPeerCircuitRejectionsDelayProvider, circuitBreakerResetExecutor, observer,
dryRun);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,10 @@ private TrafficResilienceHttpServiceFilter(final Supplier<Function<HttpRequestMe
final Supplier<Function<HttpRequestMetaData, CircuitBreaker>>
circuitBreakerPartitionsSupplier,
final ServiceRejectionPolicy onServiceRejectionPolicy,
final TrafficResiliencyObserver observer) {
final TrafficResiliencyObserver observer,
final boolean dryRun) {
super(capacityPartitionsSupplier, rejectNotMatched, classifier, __ -> false, __ -> false,
onCompletion, onCancellation, onError, circuitBreakerPartitionsSupplier, observer);
onCompletion, onCancellation, onError, circuitBreakerPartitionsSupplier, observer, dryRun);
this.serviceRejectionPolicy = onServiceRejectionPolicy;
}

Expand Down Expand Up @@ -191,6 +192,7 @@ public static final class Builder {
}
};
private TrafficResiliencyObserver observer = NoOpTrafficResiliencyObserver.INSTANCE;
private boolean dryRun;

/**
* A {@link TrafficResilienceHttpServiceFilter} with no partitioning schemes.
Expand Down Expand Up @@ -377,6 +379,18 @@ public Builder observer(final TrafficResiliencyObserver observer) {
return this;
}

/**
 * Use the resilience filter in dry-run mode.
 * <p>
 * In dry-run mode the capacity limiters and circuit breakers are still consulted and the configured
 * {@link TrafficResiliencyObserver} is still notified of the outcome, but requests that would have been
 * rejected are instead passed through to the underlying service. Dry-run mode is disabled unless this
 * method is called with {@code true}.
 *
 * @param dryRun whether to use the resilience filter in dry-run mode.
 * @return {@code this}
 */
public Builder dryRun(final boolean dryRun) {
this.dryRun = dryRun;
return this;
}

/**
* Invoke to build an instance of {@link TrafficResilienceHttpServiceFilter} filter to be used inside the
* {@link HttpServerBuilder}.
Expand All @@ -386,7 +400,8 @@ public Builder observer(final TrafficResiliencyObserver observer) {
public TrafficResilienceHttpServiceFilter build() {
return new TrafficResilienceHttpServiceFilter(capacityPartitionsSupplier, rejectNotMatched,
classifier, onCompletionTicketTerminal, onCancellationTicketTerminal,
onErrorTicketTerminal, circuitBreakerPartitionsSupplier, onServiceRejectionPolicy, observer);
onErrorTicketTerminal, circuitBreakerPartitionsSupplier, onServiceRejectionPolicy, observer,
dryRun);
}
}

Expand Down
Loading

0 comments on commit 8441a20

Please sign in to comment.