Skip to content

Commit

Permalink
Merge pull request #112 from benjchristensen/issue-80
Browse files Browse the repository at this point in the history
More work on HystrixCollapser response not received bug
  • Loading branch information
benjchristensen committed Feb 23, 2013
2 parents 91244ed + 6df70a1 commit 9999764
Showing 1 changed file with 129 additions and 20 deletions.
149 changes: 129 additions & 20 deletions hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
Expand Down Expand Up @@ -341,6 +342,15 @@ public Future<ResponseType> queue() {
}
Future<ResponseType> response = collapser.submitRequest(getRequestArgument());
if (properties.requestCachingEnabled().get()) {
/*
* A race can occur here with multiple threads queuing but only one will be cached.
* This means we can have some duplication of requests in a thread-race but we're okay
* with having some inefficiency in duplicate requests in the same batch
* and then subsequent requests will retrieve a previously cached Future.
*
* If this is an issue we can make a lazy-future that gets set in the cache
* then only the winning 'put' will be invoked to actually call 'submitRequest'
*/
requestCache.putIfAbsent(getCacheKey(), response);
}
return response;
Expand Down Expand Up @@ -676,9 +686,7 @@ private class BatchFutureWrapper implements Future<BatchReturnType> {
private final Future<BatchReturnType> actualFuture;
private final HystrixCollapser<BatchReturnType, ResponseType, RequestArgumentType> command;
private final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests;
private Lock mapResponseToRequestsLock = new ReentrantLock();
@GuardedBy("mapResponseToRequestsLock")
private volatile boolean mapResponseToRequestsPerformed = false;
private AtomicBoolean mapResponseWork = new AtomicBoolean(false);

private BatchFutureWrapper(Future<BatchReturnType> actualFuture, HystrixCollapser<BatchReturnType, ResponseType, RequestArgumentType> command, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
this.actualFuture = actualFuture;
Expand All @@ -687,6 +695,20 @@ private BatchFutureWrapper(Future<BatchReturnType> actualFuture, HystrixCollapse
}

public boolean cancel(boolean mayInterruptIfRunning) {
logger.warn("Cancelling BatchFuture so setting Exception on all collapsed requests.");

RuntimeException e = new RuntimeException("BatchFuture cancelled.");

for (CollapsedRequest<ResponseType, RequestArgumentType> request : requests) {
try {
request.setException(e);
} catch (IllegalStateException e2) {
// if we have partial responses set in mapResponseToRequests
// then we may get IllegalStateException as we loop over them
// so we'll log but continue to the rest
logger.warn("Partial success of 'mapResponseToRequests' resulted in IllegalStateException while setting Exception during BatchFuture cancellation.. Continuing ... ", e2);
}
}
return actualFuture.cancel(mayInterruptIfRunning);
}

Expand All @@ -699,26 +721,41 @@ public boolean isDone() {
}

public BatchReturnType get() throws InterruptedException, ExecutionException {
/* make one of the calling thread to this work using tryLock which allows 1 thread in and all the rest will proceed to actualFuture.get() */
if (!mapResponseToRequestsPerformed && mapResponseToRequestsLock.tryLock()) {
/* only one thread should do this and all the rest will proceed to actualFuture.get() */
if (mapResponseWork.compareAndSet(false, true)) {
try {
if (!mapResponseToRequestsPerformed) {
/* we only want one thread to execute the above code */
command.mapResponseToRequests(actualFuture.get(), requests);
} catch (Exception e) {
logger.error("Exception mapping responses to requests.", e);
// if a failure occurs we want to pass that exception to all of the Futures that we've returned
for (CollapsedRequest<ResponseType, RequestArgumentType> request : requests) {
try {
/* we only want one thread to execute the above code */
command.mapResponseToRequests(actualFuture.get(), requests);
} catch (Exception e) {
logger.error("Exception mapping responses to requests.", e);
// if a failure occurs we want to pass that exception to all of the Futures that we've returned
for (CollapsedRequest<ResponseType, RequestArgumentType> request : requests) {
request.setException(e);
}
request.setException(e);
} catch (IllegalStateException e2) {
// if we have partial responses set in mapResponseToRequests
// then we may get IllegalStateException as we loop over them
// so we'll log but continue to the rest
logger.warn("Partial success of 'mapResponseToRequests' resulted in IllegalStateException while setting Exception. Continuing ... ", e2);
}
mapResponseToRequestsPerformed = true;
}
} finally {
mapResponseToRequestsLock.unlock();
}

// check that all requests had setResponse or setException invoked in case 'mapResponseToRequests' was implemented poorly
for (CollapsedRequest<ResponseType, RequestArgumentType> request : requests) {
try {
if (((CollapsedRequestFutureImpl<ResponseType, RequestArgumentType>) request).responseReference.get() == null) {
request.setException(new NullPointerException("No response set."));
}
} catch (IllegalStateException e2) {
logger.warn("Partial success of 'mapResponseToRequests' resulted in IllegalStateException while setting 'No response set' Exception. Continuing ... ", e2);
}
}
}

// TODO this is a thread-race and will release BEFORE we call request.setException/request.setResponse
// but shouldn't matter since 'responseReceived' will make the thread wait in CollapsedRequestFutureImpl.
// Does this need to block here until the code above is completed?
return actualFuture.get();
}

Expand Down Expand Up @@ -901,8 +938,12 @@ public T get(long timeout, TimeUnit unit) throws InterruptedException, Execution
responseReceived.await(timeout, unit);

if (responseReference.get() == null) {
logger.error("TimedOut waiting on responseReceived: " + responseReceived.getCount() + " batchReceived: " + batchReceived.getCount() + " batchFuture: " + batchFuture);
throw new ExecutionException("No response or exception set before returning from Future.get", new NullPointerException());
if(batchFuture == null) {
logger.error("TimedOut waiting on responseReference: " + responseReceived.getCount() + " batchReceived: " + batchReceived.getCount() + " batchFuture: " + batchFuture + " batchFuture.isDone: NULL batchFuture.isCancelled: NULL argument: " + argument);
} else {
logger.error("TimedOut waiting on responseReference: " + responseReceived.getCount() + " batchReceived: " + batchReceived.getCount() + " batchFuture: " + batchFuture + " batchFuture.isDone: " + batchFuture.isDone() + " batchFuture.isCancelled: " + batchFuture.isCancelled() + " argument: " + argument);
}
throw new ExecutionException("No response or exception set before returning from Future.get", new NullPointerException("ResponseReference is NULL"));
} else {
// we got past here so let's return the response now
if (responseReference.get().getException() != null) {
Expand Down Expand Up @@ -985,7 +1026,7 @@ protected String getCacheKey() {
/**
* Clears all state. If new requests come in instances will be recreated and metrics started from scratch.
*/
/* package */ static void reset() {
/* package */static void reset() {
defaultNameCache.clear();
globalScopedCollapsers.clear();
requestScopedCollapsers.clear();
Expand Down Expand Up @@ -1762,6 +1803,36 @@ public void testRequestCacheWithTimeout() {
assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size());
}

/**
* Test how the collapser behaves when the circuit is short-circuited
*/
@Test
public void testRequestWithCommandShortCircuited() throws Exception {
TestCollapserTimer timer = new TestCollapserTimer();
Future<String> response1 = new TestRequestCollapserWithShortCircuitedCommand(timer, counter, "1").queue();
Future<String> response2 = new TestRequestCollapserWithShortCircuitedCommand(timer, counter, "2").queue();
timer.incrementTime(10); // let time pass that equals the default delay/period

try {
response1.get();
fail("we should have received an exception");
} catch (ExecutionException e) {
// e.printStackTrace();
// what we expect
}
try {
response2.get();
fail("we should have received an exception");
} catch (ExecutionException e) {
// e.printStackTrace();
// what we expect
}

assertEquals(0, counter.get());
// it will execute once (short-circuited)
assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size());
}

private static class TestRequestCollapser extends HystrixCollapser<List<String>, String, String> {

private final AtomicInteger count;
Expand Down Expand Up @@ -1900,6 +1971,23 @@ public HystrixCommand<List<String>> createCommand(Collection<com.netflix.hystrix

}

/**
* Throw an exception when creating a command.
*/
private static class TestRequestCollapserWithShortCircuitedCommand extends TestRequestCollapser {

public TestRequestCollapserWithShortCircuitedCommand(TestCollapserTimer timer, AtomicInteger counter, String value) {
super(timer, counter, value);
}

@Override
public HystrixCommand<List<String>> createCommand(Collection<com.netflix.hystrix.HystrixCollapser.CollapsedRequest<String, String>> requests) {
// args don't matter as it's short-circuited
return new ShortCircuitedCommand();
}

}

/**
* Throw an exception when mapToResponse is invoked
*/
Expand Down Expand Up @@ -1974,6 +2062,27 @@ public String getCacheKey() {
}
}

private static class ShortCircuitedCommand extends HystrixCommand<List<String>> {

protected ShortCircuitedCommand() {
super(HystrixCommand.Setter.withGroupKey(
HystrixCommandGroupKey.Factory.asKey("shortCircuitedCommand"))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter
.getUnitTestPropertiesSetter()
.withCircuitBreakerForceOpen(true)));
}

@Override
protected List<String> run() throws Exception {
System.out.println("*** execution (this shouldn't happen)");
// this won't ever get called as we're forcing short-circuiting
ArrayList<String> values = new ArrayList<String>();
values.add("hello");
return values;
}

}

private static class TestCollapserTimer implements CollapserTimer {

private final ConcurrentLinkedQueue<ATask> tasks = new ConcurrentLinkedQueue<ATask>();
Expand Down

0 comments on commit 9999764

Please sign in to comment.