Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added unit test for fallback rejection #687

Merged
merged 1 commit into from
Feb 19, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 36 additions & 42 deletions hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -675,51 +675,12 @@ public void call() {
* <p>
* If something in the <code>getFallback()</code> implementation is latent (such as a network call) then the semaphore will cause us to start rejecting requests rather than allowing potentially
* all threads to pile up and block.
*
*
* @return K
* @throws UnsupportedOperationException
* if getFallback() not implemented
* @throws HystrixException
* if getFallback() fails (throws an Exception) or is rejected by the semaphore
*/
private Observable<R> getFallbackWithProtection() {
final TryableSemaphore fallbackSemaphore = getFallbackSemaphore();

// acquire a permit
if (fallbackSemaphore.tryAcquire()) {
executionHook.onFallbackStart(this);
final AbstractCommand<R> _cmd = this;

Observable<R> fallback;
try {
fallback = getFallbackObservable();
} catch (Throwable t) {
// getFallback() is user provided and can throw so we catch it and turn it into Observable.error
fallback = Observable.error(t);
}

return fallback
.lift(new FallbackHookApplication(_cmd))
.lift(new DeprecatedOnFallbackHookApplication(_cmd))
.doOnTerminate(new Action0() {

@Override
public void call() {
fallbackSemaphore.release();
}

});
} else {
metrics.markFallbackRejection();

logger.debug("HystrixCommand Fallback Rejection."); // debug only since we're throwing the exception and someone higher will do something with it
// if we couldn't acquire a permit, we "fail fast" by throwing an exception
return Observable.error(new HystrixRuntimeException(FailureType.REJECTED_SEMAPHORE_FALLBACK, this.getClass(), getLogMessagePrefix() + " fallback execution rejected.", null, null));
}
}

/**
* @throws HystrixRuntimeException
* if getFallback() fails (throws an Exception) or is rejected by the semaphore
*/
private Observable<R> getFallbackOrThrowException(final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
Expand All @@ -731,7 +692,40 @@ private Observable<R> getFallbackOrThrowException(final HystrixEventType eventTy
executionResult = executionResult.addEvents(eventType);
final AbstractCommand<R> _cmd = this;

return getFallbackWithProtection().doOnNext(new Action1<R>() {
final TryableSemaphore fallbackSemaphore = getFallbackSemaphore();

Observable<R> fallbackExecutionChain;

// acquire a permit
if (fallbackSemaphore.tryAcquire()) {
executionHook.onFallbackStart(this);

try {
fallbackExecutionChain = getFallbackObservable();
} catch (Throwable t) {
// getFallback() is user provided and can throw so we catch it and turn it into Observable.error
fallbackExecutionChain = Observable.error(t);
}

fallbackExecutionChain = fallbackExecutionChain
.lift(new FallbackHookApplication(_cmd))
.lift(new DeprecatedOnFallbackHookApplication(_cmd))
.doOnTerminate(new Action0() {

@Override
public void call() {
fallbackSemaphore.release();
}
});
} else {
metrics.markFallbackRejection();

logger.debug("HystrixCommand Fallback Rejection."); // debug only since we're throwing the exception and someone higher will do something with it
// if we couldn't acquire a permit, we "fail fast" by throwing an exception
return Observable.error(new HystrixRuntimeException(FailureType.REJECTED_SEMAPHORE_FALLBACK, this.getClass(), getLogMessagePrefix() + " fallback execution rejected.", null, null));
}

return fallbackExecutionChain.doOnNext(new Action1<R>() {
@Override
public void call(R r) {
if (shouldOutputOnNextEvents()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1009,13 +1009,13 @@ public void call(C command) {
*/
@Test
public void testExecutionHookResponseFromCache() {
getCommand(ExecutionIsolationStrategy.THREAD, ExecutionResult.SUCCESS, 0, FallbackResult.UNIMPLEMENTED, new HystrixCircuitBreakerTest.TestCircuitBreaker(), null, 100, CacheEnabled.YES, 42, 10).observe();
getCommand(ExecutionIsolationStrategy.THREAD, ExecutionResult.SUCCESS, 0, FallbackResult.UNIMPLEMENTED, 0, new HystrixCircuitBreakerTest.TestCircuitBreaker(), null, 100, CacheEnabled.YES, 42, 10, 10).observe();

assertHooksOnSuccess(
new Func0<C>() {
@Override
public C call() {
return getCommand(ExecutionIsolationStrategy.THREAD, ExecutionResult.SUCCESS, 0, FallbackResult.UNIMPLEMENTED, new HystrixCircuitBreakerTest.TestCircuitBreaker(), null, 100, CacheEnabled.YES, 42, 10);
return getCommand(ExecutionIsolationStrategy.THREAD, ExecutionResult.SUCCESS, 0, FallbackResult.UNIMPLEMENTED, 0, new HystrixCircuitBreakerTest.TestCircuitBreaker(), null, 100, CacheEnabled.YES, 42, 10, 10);
}
},
new Action1<C>() {
Expand Down Expand Up @@ -1452,50 +1452,61 @@ C getCommand(ExecutionIsolationStrategy isolationStrategy, ExecutionResult execu
}

C getCommand(ExecutionIsolationStrategy isolationStrategy, ExecutionResult executionResult, int executionLatency, FallbackResult fallbackResult) {
return getCommand(isolationStrategy, executionResult, executionLatency, fallbackResult, new HystrixCircuitBreakerTest.TestCircuitBreaker(), null, (executionLatency * 2) + 100, CacheEnabled.NO, "foo", 10);
return getCommand(isolationStrategy, executionResult, executionLatency, fallbackResult, 0, new HystrixCircuitBreakerTest.TestCircuitBreaker(), null, (executionLatency * 2) + 100, CacheEnabled.NO, "foo", 10, 10);
}

C getCommand(ExecutionIsolationStrategy isolationStrategy, ExecutionResult executionResult, int executionLatency, FallbackResult fallbackResult, int timeout) {
return getCommand(isolationStrategy, executionResult, executionLatency, fallbackResult, new HystrixCircuitBreakerTest.TestCircuitBreaker(), null, timeout, CacheEnabled.NO, "foo", 10);
return getCommand(isolationStrategy, executionResult, executionLatency, fallbackResult, 0, new HystrixCircuitBreakerTest.TestCircuitBreaker(), null, timeout, CacheEnabled.NO, "foo", 10, 10);
}

C getCommand(ExecutionIsolationStrategy isolationStrategy, ExecutionResult executionResult, int executionLatency, FallbackResult fallbackResult, HystrixCircuitBreakerTest.TestCircuitBreaker circuitBreaker, HystrixThreadPool threadPool, int timeout, CacheEnabled cacheEnabled, Object value, int semaphoreCount) {
return getCommand(isolationStrategy, executionResult, executionLatency, fallbackResult, circuitBreaker, threadPool, timeout, cacheEnabled, value, semaphoreCount, false);
C getCommand(ExecutionIsolationStrategy isolationStrategy, ExecutionResult executionResult, int executionLatency, FallbackResult fallbackResult, int fallbackLatency, HystrixCircuitBreakerTest.TestCircuitBreaker circuitBreaker, HystrixThreadPool threadPool, int timeout, CacheEnabled cacheEnabled, Object value, int executionSemaphoreCount, int fallbackSemaphoreCount) {
return getCommand(isolationStrategy, executionResult, executionLatency, fallbackResult, fallbackLatency, circuitBreaker, threadPool, timeout, cacheEnabled, value, executionSemaphoreCount, fallbackSemaphoreCount, false);
}

C getCommand(ExecutionIsolationStrategy isolationStrategy, ExecutionResult executionResult, int executionLatency, FallbackResult fallbackResult, HystrixCircuitBreakerTest.TestCircuitBreaker circuitBreaker, HystrixThreadPool threadPool, int timeout, CacheEnabled cacheEnabled, Object value, int semaphoreCount, boolean circuitBreakerDisabled) {
AbstractCommand.TryableSemaphoreActual semaphore = new AbstractCommand.TryableSemaphoreActual(HystrixProperty.Factory.asProperty(semaphoreCount));
return getCommand(isolationStrategy, executionResult, executionLatency, fallbackResult, circuitBreaker, threadPool, timeout, cacheEnabled, value, semaphore, circuitBreakerDisabled);
C getCommand(ExecutionIsolationStrategy isolationStrategy, ExecutionResult executionResult, int executionLatency, FallbackResult fallbackResult, int fallbackLatency, HystrixCircuitBreakerTest.TestCircuitBreaker circuitBreaker, HystrixThreadPool threadPool, int timeout, CacheEnabled cacheEnabled, Object value, int executionSemaphoreCount, int fallbackSemaphoreCount, boolean circuitBreakerDisabled) {
AbstractCommand.TryableSemaphoreActual executionSemaphore = new AbstractCommand.TryableSemaphoreActual(HystrixProperty.Factory.asProperty(executionSemaphoreCount));
AbstractCommand.TryableSemaphoreActual fallbackSemaphore = new AbstractCommand.TryableSemaphoreActual(HystrixProperty.Factory.asProperty(fallbackSemaphoreCount));
return getCommand(isolationStrategy, executionResult, executionLatency, fallbackResult, fallbackLatency, circuitBreaker, threadPool, timeout, cacheEnabled, value, executionSemaphore, fallbackSemaphore, circuitBreakerDisabled);
}

abstract C getCommand(ExecutionIsolationStrategy isolationStrategy, ExecutionResult executionResult, int executionLatency, FallbackResult fallbackResult, HystrixCircuitBreakerTest.TestCircuitBreaker circuitBreaker, HystrixThreadPool threadPool, int timeout, CacheEnabled cacheEnabled, Object value, AbstractCommand.TryableSemaphore semaphore, boolean circuitBreakerDisabled);
C getCommand(ExecutionIsolationStrategy isolationStrategy, ExecutionResult executionResult, int executionLatency, FallbackResult fallbackResult, int fallbackLatency, HystrixCircuitBreakerTest.TestCircuitBreaker circuitBreaker, HystrixThreadPool threadPool, int timeout, CacheEnabled cacheEnabled, Object value, int executionSemaphoreCount, AbstractCommand.TryableSemaphore fallbackSemaphore, boolean circuitBreakerDisabled) {
AbstractCommand.TryableSemaphoreActual executionSemaphore = new AbstractCommand.TryableSemaphoreActual(HystrixProperty.Factory.asProperty(executionSemaphoreCount));
return getCommand(isolationStrategy, executionResult, executionLatency, fallbackResult, fallbackLatency, circuitBreaker, threadPool, timeout, cacheEnabled, value, executionSemaphore, fallbackSemaphore, circuitBreakerDisabled);
}

abstract C getCommand(ExecutionIsolationStrategy isolationStrategy, ExecutionResult executionResult, int executionLatency, FallbackResult fallbackResult, int fallbackLatency, HystrixCircuitBreakerTest.TestCircuitBreaker circuitBreaker, HystrixThreadPool threadPool, int timeout, CacheEnabled cacheEnabled, Object value, AbstractCommand.TryableSemaphore executionSemaphore, AbstractCommand.TryableSemaphore fallbackSemaphore, boolean circuitBreakerDisabled);

C getLatentCommand(ExecutionIsolationStrategy isolationStrategy, ExecutionResult executionResult, int executionLatency, FallbackResult fallbackResult, int timeout) {
return getCommand(isolationStrategy, executionResult, executionLatency, fallbackResult, new HystrixCircuitBreakerTest.TestCircuitBreaker(), null, timeout, CacheEnabled.NO, "foo", 10);
return getCommand(isolationStrategy, executionResult, executionLatency, fallbackResult, 0, new HystrixCircuitBreakerTest.TestCircuitBreaker(), null, timeout, CacheEnabled.NO, "foo", 10, 10);
}

C getLatentCommand(ExecutionIsolationStrategy isolationStrategy, ExecutionResult executionResult, int executionLatency, FallbackResult fallbackResult, HystrixCircuitBreakerTest.TestCircuitBreaker circuitBreaker, HystrixThreadPool threadPool, int timeout) {
return getCommand(isolationStrategy, executionResult, executionLatency, fallbackResult, circuitBreaker, threadPool, timeout, CacheEnabled.NO, "foo", 10);
return getCommand(isolationStrategy, executionResult, executionLatency, fallbackResult, 0, circuitBreaker, threadPool, timeout, CacheEnabled.NO, "foo", 10, 10);
}

C getLatentCommand(ExecutionIsolationStrategy isolationStrategy, ExecutionResult executionResult, int executionLatency, FallbackResult fallbackResult, AbstractCommand.TryableSemaphore executionSemaphore) {
AbstractCommand.TryableSemaphoreActual fallbackSemaphore = new AbstractCommand.TryableSemaphoreActual(HystrixProperty.Factory.asProperty(10));
return getCommand(isolationStrategy, executionResult, executionLatency, fallbackResult, 0, new HystrixCircuitBreakerTest.TestCircuitBreaker(), null, (executionLatency * 2) + 100, CacheEnabled.NO, "foo", executionSemaphore, fallbackSemaphore, false);
}

C getLatentCommand(ExecutionIsolationStrategy isolationStrategy, ExecutionResult executionResult, int executionLatency, FallbackResult fallbackResult, AbstractCommand.TryableSemaphore semaphore) {
return getCommand(isolationStrategy, executionResult, executionLatency, fallbackResult, new HystrixCircuitBreakerTest.TestCircuitBreaker(), null, (executionLatency * 2) + 100, CacheEnabled.NO, "foo", semaphore, false);
C getFallbackLatentCommand(ExecutionIsolationStrategy isolationStrategy, FallbackResult fallbackResult, int fallbackLatency, HystrixCircuitBreakerTest.TestCircuitBreaker circuitBreaker, AbstractCommand.TryableSemaphore fallbackSemaphore) {
return getCommand(isolationStrategy, ExecutionResult.FAILURE, 0, fallbackResult, fallbackLatency, circuitBreaker, null, 100, CacheEnabled.NO, "foo", 10, fallbackSemaphore, false);
}

C getCircuitOpenCommand(ExecutionIsolationStrategy isolationStrategy, FallbackResult fallbackResult) {
HystrixCircuitBreakerTest.TestCircuitBreaker openCircuit = new HystrixCircuitBreakerTest.TestCircuitBreaker().setForceShortCircuit(true);
return getCommand(isolationStrategy, ExecutionResult.SUCCESS, 0, fallbackResult, openCircuit, null, 100, CacheEnabled.NO, "foo", 10, false);
return getCommand(isolationStrategy, ExecutionResult.SUCCESS, 0, fallbackResult, 0, openCircuit, null, 100, CacheEnabled.NO, "foo", 10, 10, false);
}

C getSharedCircuitBreakerCommand(ExecutionIsolationStrategy isolationStrategy, HystrixCircuitBreakerTest.TestCircuitBreaker circuitBreaker) {
return getCommand(isolationStrategy, ExecutionResult.FAILURE, 0, FallbackResult.SUCCESS, circuitBreaker, null, 100, CacheEnabled.NO, "foo", 10);
return getCommand(isolationStrategy, ExecutionResult.FAILURE, 0, FallbackResult.SUCCESS, 0, circuitBreaker, null, 100, CacheEnabled.NO, "foo", 10, 10);
}

C getSharedCircuitBreakerCommand(ExecutionIsolationStrategy isolationStrategy, FallbackResult fallbackResult, HystrixCircuitBreakerTest.TestCircuitBreaker circuitBreaker) {
return getCommand(isolationStrategy, ExecutionResult.FAILURE, 0, fallbackResult, circuitBreaker, null, 100, CacheEnabled.NO, "foo", 10);
return getCommand(isolationStrategy, ExecutionResult.FAILURE, 0, fallbackResult, 0, circuitBreaker, null, 100, CacheEnabled.NO, "foo", 10, 10);
}

C getCircuitBreakerDisabledCommand(ExecutionIsolationStrategy isolationStrategy, ExecutionResult executionResult) {
return getCommand(isolationStrategy, executionResult, 0, FallbackResult.UNIMPLEMENTED, new HystrixCircuitBreakerTest.TestCircuitBreaker(), null, 100, CacheEnabled.NO, "foo", 10, true);
return getCommand(isolationStrategy, executionResult, 0, FallbackResult.UNIMPLEMENTED, 0, new HystrixCircuitBreakerTest.TestCircuitBreaker(), null, 100, CacheEnabled.NO, "foo", 10, 10, true);
}
}
Loading