Skip to content

Commit

Permalink
Refactor for Performance
Browse files Browse the repository at this point in the history
- previous commit got non-blocking working but perf tests showed it slow
- this commit retains non-blocking but surpasses master branch performance

Master branch: 11,947,459 ops/sec
This commit: 16,151,174 ops/sec
  • Loading branch information
benjchristensen committed Dec 22, 2013
1 parent dba5de9 commit a144b0e
Show file tree
Hide file tree
Showing 6 changed files with 292 additions and 96 deletions.
17 changes: 9 additions & 8 deletions rxjava-core/src/main/java/rx/subjects/AsyncSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import rx.Notification;
import rx.Observer;
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
import rx.util.functions.Action1;

/**
Expand Down Expand Up @@ -63,20 +64,20 @@ public static <T> AsyncSubject<T> create() {
*
* This will always run, even if Subject is in terminal state.
*/
new Action1<Observer<? super T>>() {
new Action1<SubjectObserver<? super T>>() {

@Override
public void call(Observer<? super T> o) {
public void call(SubjectObserver<? super T> o) {
// nothing to do if not terminated
}
},
/**
* This function executes if the Subject is terminated.
*/
new Action1<Observer<? super T>>() {
new Action1<SubjectObserver<? super T>>() {

@Override
public void call(Observer<? super T> o) {
public void call(SubjectObserver<? super T> o) {
// we want the last value + completed so add this extra logic
// to send onCompleted if the last value is an onNext
emitValueToObserver(lastNotification.get(), o);
Expand Down Expand Up @@ -104,10 +105,10 @@ protected AsyncSubject(OnSubscribeFunc<T> onSubscribe, SubjectSubscriptionManage

@Override
public void onCompleted() {
subscriptionManager.terminate(new Action1<Collection<Observer<? super T>>>() {
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {

@Override
public void call(Collection<Observer<? super T>> observers) {
public void call(Collection<SubjectObserver<? super T>> observers) {
for (Observer<? super T> o : observers) {
emitValueToObserver(lastNotification.get(), o);
}
Expand All @@ -117,10 +118,10 @@ public void call(Collection<Observer<? super T>> observers) {

@Override
public void onError(final Throwable e) {
subscriptionManager.terminate(new Action1<Collection<Observer<? super T>>>() {
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {

@Override
public void call(Collection<Observer<? super T>> observers) {
public void call(Collection<SubjectObserver<? super T>> observers) {
lastNotification.set(new Notification<T>(e));
for (Observer<? super T> o : observers) {
emitValueToObserver(lastNotification.get(), o);
Expand Down
17 changes: 9 additions & 8 deletions rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import rx.Notification;
import rx.Observer;
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
import rx.util.functions.Action1;

/**
Expand Down Expand Up @@ -95,10 +96,10 @@ public static <T> BehaviorSubject<T> create(T defaultValue) {
*
* This will always run, even if Subject is in terminal state.
*/
new Action1<Observer<? super T>>() {
new Action1<SubjectObserver<? super T>>() {

@Override
public void call(Observer<? super T> o) {
public void call(SubjectObserver<? super T> o) {
/*
* When we subscribe we always emit the latest value to the observer.
*
Expand All @@ -113,10 +114,10 @@ public void call(Observer<? super T> o) {
/**
* This function executes if the Subject is terminated before subscription occurs.
*/
new Action1<Observer<? super T>>() {
new Action1<SubjectObserver<? super T>>() {

@Override
public void call(Observer<? super T> o) {
public void call(SubjectObserver<? super T> o) {
/*
* If we are already terminated, or termination happens while trying to subscribe
* this will be invoked and we emit whatever the last terminal value was.
Expand All @@ -139,10 +140,10 @@ protected BehaviorSubject(OnSubscribeFunc<T> onSubscribe, SubjectSubscriptionMan

@Override
public void onCompleted() {
subscriptionManager.terminate(new Action1<Collection<Observer<? super T>>>() {
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {

@Override
public void call(Collection<Observer<? super T>> observers) {
public void call(Collection<SubjectObserver<? super T>> observers) {
lastNotification.set(new Notification<T>());
for (Observer<? super T> o : observers) {
o.onCompleted();
Expand All @@ -153,10 +154,10 @@ public void call(Collection<Observer<? super T>> observers) {

@Override
public void onError(final Throwable e) {
subscriptionManager.terminate(new Action1<Collection<Observer<? super T>>>() {
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {

@Override
public void call(Collection<Observer<? super T>> observers) {
public void call(Collection<SubjectObserver<? super T>> observers) {
lastNotification.set(new Notification<T>(e));
for (Observer<? super T> o : observers) {
o.onError(e);
Expand Down
17 changes: 9 additions & 8 deletions rxjava-core/src/main/java/rx/subjects/PublishSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import rx.Notification;
import rx.Observer;
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
import rx.util.functions.Action1;

/**
Expand Down Expand Up @@ -58,20 +59,20 @@ public static <T> PublishSubject<T> create() {
*
* This will always run, even if Subject is in terminal state.
*/
new Action1<Observer<? super T>>() {
new Action1<SubjectObserver<? super T>>() {

@Override
public void call(Observer<? super T> o) {
public void call(SubjectObserver<? super T> o) {
// nothing onSubscribe unless in terminal state which is the next function
}
},
/**
* This function executes if the Subject is terminated before subscription occurs.
*/
new Action1<Observer<? super T>>() {
new Action1<SubjectObserver<? super T>>() {

@Override
public void call(Observer<? super T> o) {
public void call(SubjectObserver<? super T> o) {
/*
* If we are already terminated, or termination happens while trying to subscribe
* this will be invoked and we emit whatever the last terminal value was.
Expand All @@ -94,10 +95,10 @@ protected PublishSubject(OnSubscribeFunc<T> onSubscribe, SubjectSubscriptionMana

@Override
public void onCompleted() {
subscriptionManager.terminate(new Action1<Collection<Observer<? super T>>>() {
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {

@Override
public void call(Collection<Observer<? super T>> observers) {
public void call(Collection<SubjectObserver<? super T>> observers) {
lastNotification.set(new Notification<T>());
for (Observer<? super T> o : observers) {
o.onCompleted();
Expand All @@ -108,10 +109,10 @@ public void call(Collection<Observer<? super T>> observers) {

@Override
public void onError(final Throwable e) {
subscriptionManager.terminate(new Action1<Collection<Observer<? super T>>>() {
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {

@Override
public void call(Collection<Observer<? super T>> observers) {
public void call(Collection<SubjectObserver<? super T>> observers) {
lastNotification.set(new Notification<T>(e));
for (Observer<? super T> o : observers) {
o.onError(e);
Expand Down
Loading

0 comments on commit a144b0e

Please sign in to comment.