Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Version 1.3 - RxJava Observable Integration #151

Merged
merged 23 commits into from
Jul 26, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
0fef200
Eliminate duplicate logic between execute and queue
benjchristensen May 5, 2013
c51bd7e
Support Asynchronous Callbacks with RxJava Integration
benjchristensen May 9, 2013
e7568b6
Upgrade to RxJava 0.8.4
benjchristensen May 13, 2013
6649237
Ordering of log now records in order of execution, not completion
benjchristensen May 13, 2013
aa3701b
Fix non-deterministic test
benjchristensen May 13, 2013
bb5e84e
Upgrade to RxJava 0.9.0
benjchristensen May 17, 2013
3df7d89
Merge branch 'observe-rxjava' of https://github.com/benjchristensen/H…
neerajrj Jun 11, 2013
52104a8
Fix timeout issue when start time has not been set by the winning thr…
neerajrj Jun 11, 2013
e8f1e16
Fix bugs related to toObservable behavior with RequestCache disabled.
benjchristensen Jun 29, 2013
dccd322
add a clarifying comment
benjchristensen Jul 4, 2013
32bd26e
Upgrade HystrixCollapser to use RxJava and execute reactively
benjchristensen Jul 4, 2013
05cb229
Javadocs for observe and toObservable
benjchristensen Jul 4, 2013
34c3e5c
Concurrency fixes for race condition occurring when RequestCollapser …
benjchristensen Jul 5, 2013
0b47055
Merge remote-tracking branch 'upstream/master' into version_1_3-rxjava
benjchristensen Jul 5, 2013
a1732c2
Refactor HystrixCollapser into multiple implementation classes to sim…
benjchristensen Jul 23, 2013
7b103b8
Fix concurrency bug in RequestBatch for HystrixCollapser
benjchristensen Jul 23, 2013
5c6599e
Reduce logging
benjchristensen Jul 24, 2013
490c10e
Assert that cancelling/unsubscribing a single collapsed request doesn…
benjchristensen Jul 24, 2013
da69311
Remove non-applicable unit tests
benjchristensen Jul 24, 2013
1e7f5e0
Merge remote-tracking branch 'upstream/master' into version_1_3-rxjava
benjchristensen Jul 24, 2013
d244689
IllegalStateException on duplicate invocation.
benjchristensen Jul 24, 2013
45a6420
Improve exception javadoc for observer/toObservable methods.
benjchristensen Jul 24, 2013
2ae4493
HystrixExecutable and Javadocs for observe()
benjchristensen Jul 26, 2013
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=1.2.19-SNAPSHOT
version=1.3.0-SNAPSHOT
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=http\://services.gradle.org/distributions/gradle-1.1-bin.zip
distributionUrl=http\://services.gradle.org/distributions/gradle-1.3-bin.zip
1 change: 1 addition & 0 deletions hystrix-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ apply plugin: 'idea'

dependencies {
compile 'com.netflix.archaius:archaius-core:0.4.1'
compile 'com.netflix.rxjava:rxjava-core:0.9.0'
compile 'org.slf4j:slf4j-api:1.7.0'
compile 'com.google.code.findbugs:jsr305:2.0.0'
provided 'junit:junit-dep:4.10'
Expand Down
1,078 changes: 211 additions & 867 deletions hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java

Large diffs are not rendered by default.

2,008 changes: 1,255 additions & 753 deletions hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommand.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

import java.util.concurrent.Future;

import rx.Observable;
import rx.concurrency.Schedulers;

import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy;
import com.netflix.hystrix.exception.HystrixBadRequestException;
import com.netflix.hystrix.exception.HystrixRuntimeException;

Expand Down Expand Up @@ -56,4 +60,35 @@ public interface HystrixExecutable<R> {
*/
public Future<R> queue();

/**
* Used for asynchronous execution of command with a callback by subscribing to the {@link Observable}.
* <p>
* This eagerly starts execution of the command the same as {@link #queue()} and {@link #execute()}.
* A lazy {@link Observable} can be obtained from {@link HystrixCommand#toObservable()} or {@link HystrixCollapser#toObservable()}.
* <p>
* <b>Callback Scheduling</b>
* <p>
* <ul>
* <li>When using {@link ExecutionIsolationStrategy#THREAD} this defaults to using {@link Schedulers#threadPoolForComputation()} for callbacks.</li>
* <li>When using {@link ExecutionIsolationStrategy#SEMAPHORE} this defaults to using {@link Schedulers#immediate()} for callbacks.</li>
* </ul>
* Use {@link HystrixCommand#toObservable(rx.Scheduler)} or {@link HystrixCollapser#toObservable(rx.Scheduler)} to schedule the callback differently.
* <p>
* See https://github.com/Netflix/RxJava/wiki for more information.
*
* @return {@code Observable<R>} that executes and calls back with the result of {@link #run()} execution or a fallback from {@link #getFallback()} if the command fails for any reason.
* @throws HystrixRuntimeException
* if a fallback does not exist
* <p>
* <ul>
* <li>via {@code Observer#onError} if a failure occurs</li>
* <li>or immediately if the command can not be queued (such as short-circuited, thread-pool/semaphore rejected)</li>
* </ul>
* @throws HystrixBadRequestException
* via {@code Observer#onError} if invalid arguments or state were used representing a user failure, not a system failure
* @throws IllegalStateException
* if invoked more than once
*/
public Observable<R> observe();

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@
import static org.junit.Assert.*;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func1;

import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy;
import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategyDefault;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
Expand Down Expand Up @@ -54,15 +56,15 @@ public class HystrixRequestCache {
* <p>
* Key => CommandPrefix + CacheKey : Future<?> from queue()
*/
private static final HystrixRequestVariableHolder<ConcurrentHashMap<ValueCacheKey, Future<?>>> requestVariableForCache = new HystrixRequestVariableHolder<ConcurrentHashMap<ValueCacheKey, Future<?>>>(new HystrixRequestVariableLifecycle<ConcurrentHashMap<ValueCacheKey, Future<?>>>() {
private static final HystrixRequestVariableHolder<ConcurrentHashMap<ValueCacheKey, Observable<?>>> requestVariableForCache = new HystrixRequestVariableHolder<ConcurrentHashMap<ValueCacheKey, Observable<?>>>(new HystrixRequestVariableLifecycle<ConcurrentHashMap<ValueCacheKey, Observable<?>>>() {

@Override
public ConcurrentHashMap<ValueCacheKey, Future<?>> initialValue() {
return new ConcurrentHashMap<ValueCacheKey, Future<?>>();
public ConcurrentHashMap<ValueCacheKey, Observable<?>> initialValue() {
return new ConcurrentHashMap<ValueCacheKey, Observable<?>>();
}

@Override
public void shutdown(ConcurrentHashMap<ValueCacheKey, Future<?>> value) {
public void shutdown(ConcurrentHashMap<ValueCacheKey, Observable<?>> value) {
// nothing to shutdown
};

Expand Down Expand Up @@ -104,11 +106,11 @@ private static HystrixRequestCache getInstance(RequestCacheKey rcKey, HystrixCon
*/
// suppressing warnings because we are using a raw Future since it's in a heterogeneous ConcurrentHashMap cache
@SuppressWarnings({ "unchecked" })
public <T> Future<T> get(String cacheKey) {
/* package */<T> Observable<T> get(String cacheKey) {
ValueCacheKey key = getRequestCacheKey(cacheKey);
if (key != null) {
/* look for the stored value */
return (Future<T>) requestVariableForCache.get(concurrencyStrategy).get(key);
return (Observable<T>) requestVariableForCache.get(concurrencyStrategy).get(key);
}
return null;
}
Expand All @@ -127,11 +129,11 @@ public <T> Future<T> get(String cacheKey) {
*/
// suppressing warnings because we are using a raw Future since it's in a heterogeneous ConcurrentHashMap cache
@SuppressWarnings({ "unchecked" })
public <T> Future<T> putIfAbsent(String cacheKey, Future<T> f) {
/* package */<T> Observable<T> putIfAbsent(String cacheKey, Observable<T> f) {
ValueCacheKey key = getRequestCacheKey(cacheKey);
if (key != null) {
/* look for the stored value */
Future<T> alreadySet = (Future<T>) requestVariableForCache.get(concurrencyStrategy).putIfAbsent(key, f);
Observable<T> alreadySet = (Observable<T>) requestVariableForCache.get(concurrencyStrategy).putIfAbsent(key, f);
if (alreadySet != null) {
// someone beat us so we didn't cache this
return alreadySet;
Expand Down Expand Up @@ -282,17 +284,17 @@ public void testCache() {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
HystrixRequestCache cache1 = HystrixRequestCache.getInstance(HystrixCommandKey.Factory.asKey("command1"), strategy);
cache1.putIfAbsent("valueA", new TestFuture("a1"));
cache1.putIfAbsent("valueA", new TestFuture("a2"));
cache1.putIfAbsent("valueB", new TestFuture("b1"));
cache1.putIfAbsent("valueA", new TestObservable("a1"));
cache1.putIfAbsent("valueA", new TestObservable("a2"));
cache1.putIfAbsent("valueB", new TestObservable("b1"));

HystrixRequestCache cache2 = HystrixRequestCache.getInstance(HystrixCommandKey.Factory.asKey("command2"), strategy);
cache2.putIfAbsent("valueA", new TestFuture("a3"));
cache2.putIfAbsent("valueA", new TestObservable("a3"));

assertEquals("a1", cache1.get("valueA").get());
assertEquals("b1", cache1.get("valueB").get());
assertEquals("a1", cache1.get("valueA").toBlockingObservable().last());
assertEquals("b1", cache1.get("valueB").toBlockingObservable().last());

assertEquals("a3", cache2.get("valueA").get());
assertEquals("a3", cache2.get("valueA").toBlockingObservable().last());
assertNull(cache2.get("valueB"));
} catch (Exception e) {
fail("Exception: " + e.getMessage());
Expand All @@ -318,8 +320,8 @@ public void testClearCache() {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
HystrixRequestCache cache1 = HystrixRequestCache.getInstance(HystrixCommandKey.Factory.asKey("command1"), strategy);
cache1.putIfAbsent("valueA", new TestFuture("a1"));
assertEquals("a1", cache1.get("valueA").get());
cache1.putIfAbsent("valueA", new TestObservable("a1"));
assertEquals("a1", cache1.get("valueA").toBlockingObservable().last());
cache1.clear("valueA");
assertNull(cache1.get("valueA"));
} catch (Exception e) {
Expand All @@ -330,39 +332,19 @@ public void testClearCache() {
}
}

private static class TestFuture implements Future<String> {

private final String value;

public TestFuture(String value) {
this.value = value;
}
private static class TestObservable extends Observable<String> {
public TestObservable(final String value) {
super(new Func1<Observer<String>, Subscription>() {

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public Subscription call(Observer<String> observer) {
observer.onNext(value);
observer.onCompleted();
return Subscriptions.empty();
}

@Override
public boolean isCancelled() {
return false;
});
}

@Override
public boolean isDone() {
return false;
}

@Override
public String get() throws InterruptedException, ExecutionException {
return value;
}

@Override
public String get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return value;
}

}

}
Expand Down
Loading