Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for Semaphore vs Thread Isolation Bug #238

Merged
merged 2 commits into from
Apr 1, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ protected TryableSemaphore getFallbackSemaphore() {
TryableSemaphore _s = fallbackSemaphorePerCircuit.get(commandKey.name());
if (_s == null) {
// we didn't find one cache so setup
fallbackSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphore(properties.fallbackIsolationSemaphoreMaxConcurrentRequests()));
fallbackSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.fallbackIsolationSemaphoreMaxConcurrentRequests()));
// assign whatever got set (this or another thread)
return fallbackSemaphorePerCircuit.get(commandKey.name());
} else {
Expand All @@ -612,18 +612,23 @@ protected TryableSemaphore getFallbackSemaphore() {
* @return TryableSemaphore
*/
protected TryableSemaphore getExecutionSemaphore() {
if (executionSemaphoreOverride == null) {
TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name());
if (_s == null) {
// we didn't find one cache so setup
executionSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphore(properties.executionIsolationSemaphoreMaxConcurrentRequests()));
// assign whatever got set (this or another thread)
return executionSemaphorePerCircuit.get(commandKey.name());
if (properties.executionIsolationStrategy().get().equals(ExecutionIsolationStrategy.SEMAPHORE)) {
if (executionSemaphoreOverride == null) {
TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name());
if (_s == null) {
// we didn't find one cache so setup
executionSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.executionIsolationSemaphoreMaxConcurrentRequests()));
// assign whatever got set (this or another thread)
return executionSemaphorePerCircuit.get(commandKey.name());
} else {
return _s;
}
} else {
return _s;
return executionSemaphoreOverride;
}
} else {
return executionSemaphoreOverride;
// return NoOp implementation since we're not using SEMAPHORE isolation
return TryableSemaphoreNoOp.DEFAULT;
}
}

Expand Down Expand Up @@ -859,14 +864,60 @@ protected RuntimeException decomposeException(Exception e) {
* Using AtomicInteger increment/decrement instead of java.util.concurrent.Semaphore since we don't need blocking and need a custom implementation to get the dynamic permit count and since
* AtomicInteger achieves the same behavior and performance without the more complex implementation of the actual Semaphore class using AbstractQueueSynchronizer.
*/
/* package */static class TryableSemaphore {
/* package */static class TryableSemaphoreActual implements TryableSemaphore {
protected final HystrixProperty<Integer> numberOfPermits;
private final AtomicInteger count = new AtomicInteger(0);

public TryableSemaphore(HystrixProperty<Integer> numberOfPermits) {
public TryableSemaphoreActual(HystrixProperty<Integer> numberOfPermits) {
this.numberOfPermits = numberOfPermits;
}

@Override
public boolean tryAcquire() {
int currentCount = count.incrementAndGet();
if (currentCount > numberOfPermits.get()) {
count.decrementAndGet();
return false;
} else {
return true;
}
}

@Override
public void release() {
count.decrementAndGet();
}

@Override
public int getNumberOfPermitsUsed() {
return count.get();
}

}

/* package */static class TryableSemaphoreNoOp implements TryableSemaphore {

public static final TryableSemaphore DEFAULT = new TryableSemaphoreNoOp();

@Override
public boolean tryAcquire() {
return true;
}

@Override
public void release() {

}

@Override
public int getNumberOfPermitsUsed() {
return 0;
}

}

/* package */static interface TryableSemaphore {

/**
* Use like this:
* <p>
Expand All @@ -883,15 +934,7 @@ public TryableSemaphore(HystrixProperty<Integer> numberOfPermits) {
*
* @return boolean
*/
public boolean tryAcquire() {
int currentCount = count.incrementAndGet();
if (currentCount > numberOfPermits.get()) {
count.decrementAndGet();
return false;
} else {
return true;
}
}
public abstract boolean tryAcquire();

/**
* ONLY call release if tryAcquire returned true.
Expand All @@ -907,13 +950,9 @@ public boolean tryAcquire() {
* }
* </pre>
*/
public void release() {
count.decrementAndGet();
}
public abstract void release();

public int getNumberOfPermitsUsed() {
return count.get();
}
public abstract int getNumberOfPermitsUsed();

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,11 +239,7 @@ protected Observable<R> getFallback() {
/**
* A lazy {@link Observable} that will execute the command when subscribed to.
* <p>
* <b>Callback Scheduling</b>
* <p>
* <ul>
* <li>When using {@link ExecutionIsolationStrategy#THREAD} this defaults to using {@link Schedulers#threadPoolForComputation()} for callbacks.</li>
* <li>When using {@link ExecutionIsolationStrategy#SEMAPHORE} this defaults to using {@link Schedulers#immediate()} for callbacks.</li>
* <b>This defaults to using {@link Schedulers#immediate()} for callbacks.</li>
* </ul>
* <p>
* See https://github.com/Netflix/RxJava/wiki for more information.
Expand All @@ -263,7 +259,7 @@ protected Observable<R> getFallback() {
* if invoked more than once
*/
public Observable<R> toObservable() {
return toObservable(Schedulers.computation());
return toObservable(Schedulers.immediate());
}

protected ObservableCommand<R> toObservable(final Scheduler observeOn, boolean performAsyncTimeout) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package com.netflix.hystrix;

import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.List;
Expand Down Expand Up @@ -31,6 +37,7 @@
import com.netflix.hystrix.HystrixCircuitBreakerTest.TestCircuitBreaker;
import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy;
import com.netflix.hystrix.HystrixExecutableBase.TryableSemaphore;
import com.netflix.hystrix.HystrixExecutableBase.TryableSemaphoreActual;
import com.netflix.hystrix.exception.HystrixBadRequestException;
import com.netflix.hystrix.exception.HystrixRuntimeException;
import com.netflix.hystrix.exception.HystrixRuntimeException.FailureType;
Expand Down Expand Up @@ -1908,7 +1915,7 @@ public void testExecutionSemaphoreWithQueue() {
final AtomicBoolean exceptionReceived = new AtomicBoolean();

final TryableSemaphore semaphore =
new TryableSemaphore(HystrixProperty.Factory.asProperty(1));
new TryableSemaphoreActual(HystrixProperty.Factory.asProperty(1));

Runnable r = new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() {

Expand Down Expand Up @@ -1980,7 +1987,7 @@ public void testExecutionSemaphoreWithExecution() {
final AtomicBoolean exceptionReceived = new AtomicBoolean();

final TryableSemaphore semaphore =
new TryableSemaphore(HystrixProperty.Factory.asProperty(1));
new TryableSemaphoreActual(HystrixProperty.Factory.asProperty(1));

Runnable r = new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() {

Expand Down Expand Up @@ -2109,8 +2116,8 @@ public void testSemaphorePermitsInUse() {
final TestCircuitBreaker circuitBreaker = new TestCircuitBreaker();

// this semaphore will be shared across multiple command instances
final TryableSemaphore sharedSemaphore =
new TryableSemaphore(HystrixProperty.Factory.asProperty(3));
final TryableSemaphoreActual sharedSemaphore =
new TryableSemaphoreActual(HystrixProperty.Factory.asProperty(3));

// used to wait until all commands have started
final CountDownLatch startLatch = new CountDownLatch(sharedSemaphore.numberOfPermits.get() + 1);
Expand Down Expand Up @@ -2138,8 +2145,8 @@ public void run() {
}

// creates thread using isolated semaphore
final TryableSemaphore isolatedSemaphore =
new TryableSemaphore(HystrixProperty.Factory.asProperty(1));
final TryableSemaphoreActual isolatedSemaphore =
new TryableSemaphoreActual(HystrixProperty.Factory.asProperty(1));

final CountDownLatch isolatedLatch = new CountDownLatch(1);

Expand Down Expand Up @@ -4112,8 +4119,8 @@ public void testExecutionHookSuccessfulCommandWithSemaphoreIsolation() {
@Test
public void testExecutionHookFailureWithSemaphoreIsolation() {
/* test with execute() */
final TryableSemaphore semaphore =
new TryableSemaphore(HystrixProperty.Factory.asProperty(0));
final TryableSemaphoreActual semaphore =
new TryableSemaphoreActual(HystrixProperty.Factory.asProperty(0));

TestSemaphoreCommand command = new TestSemaphoreCommand(new TestCircuitBreaker(), semaphore, 200);
try {
Expand Down
Loading