Skip to content

Commit

Permalink
Merge pull request #974 from benjchristensen/testing
Browse files Browse the repository at this point in the history
TestSubject, TestObserver and TestScheduler Improvements
  • Loading branch information
benjchristensen committed Mar 20, 2014
2 parents c573497 + 9785ada commit 3741e7b
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 5 deletions.
4 changes: 2 additions & 2 deletions rxjava-core/src/main/java/rx/observers/TestObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
*/
public class TestObserver<T> implements Observer<T> {


private final Observer<T> delegate;
private final ArrayList<T> onNextEvents = new ArrayList<T>();
private final ArrayList<Throwable> onErrorEvents = new ArrayList<Throwable>();
Expand Down Expand Up @@ -91,7 +90,8 @@ public void assertReceivedOnNext(List<T> items) {
throw new AssertionError("Value at index: " + i + " expected to be [null] but was: [" + onNextEvents.get(i) + "]");
}
} else if (!items.get(i).equals(onNextEvents.get(i))) {
throw new AssertionError("Value at index: " + i + " expected to be [" + items.get(i) + "] but was: [" + onNextEvents.get(i) + "]");
throw new AssertionError("Value at index: " + i + " expected to be [" + items.get(i) + "] (" + items.get(i).getClass().getSimpleName() + ") but was: [" + onNextEvents.get(i) + "] (" + onNextEvents.get(i).getClass().getSimpleName() + ")");

}
}

Expand Down
8 changes: 6 additions & 2 deletions rxjava-core/src/main/java/rx/schedulers/TestScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,17 +97,21 @@ private void triggerActions(long targetTimeInNanos) {
time = targetTimeInNanos;
}

public Inner createInnerScheduler() {
return new InnerTestScheduler();
}

@Override
public Subscription schedule(Action1<Inner> action, long delayTime, TimeUnit unit) {
InnerTestScheduler inner = new InnerTestScheduler();
Inner inner = createInnerScheduler();
final TimedAction timedAction = new TimedAction(inner, time + unit.toNanos(delayTime), action);
queue.add(timedAction);
return inner;
}

@Override
public Subscription schedule(Action1<Inner> action) {
InnerTestScheduler inner = new InnerTestScheduler();
Inner inner = createInnerScheduler();
final TimedAction timedAction = new TimedAction(inner, 0, action);
queue.add(timedAction);
return inner;
Expand Down
182 changes: 182 additions & 0 deletions rxjava-core/src/main/java/rx/subjects/TestSubject.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.subjects;

import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import rx.Notification;
import rx.Observer;
import rx.Scheduler;
import rx.Scheduler.Inner;
import rx.functions.Action1;
import rx.schedulers.TestScheduler;
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;

/**
* Subject that, once and {@link Observer} has subscribed, publishes all subsequent events to the subscriber.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/S.PublishSubject.png">
* <p>
* Example usage:
* <p>
* <pre> {@code
* PublishSubject<Object> subject = PublishSubject.create();
// observer1 will receive all onNext and onCompleted events
subject.subscribe(observer1);
subject.onNext("one");
subject.onNext("two");
// observer2 will only receive "three" and onCompleted
subject.subscribe(observer2);
subject.onNext("three");
subject.onCompleted();
} </pre>
*
* @param <T>
*/
public final class TestSubject<T> extends Subject<T, T> {

public static <T> TestSubject<T> create(TestScheduler scheduler) {
final SubjectSubscriptionManager<T> subscriptionManager = new SubjectSubscriptionManager<T>();
// set a default value so subscriptions will immediately receive this until a new notification is received
final AtomicReference<Notification<T>> lastNotification = new AtomicReference<Notification<T>>();

OnSubscribe<T> onSubscribe = subscriptionManager.getOnSubscribeFunc(
/**
* This function executes at beginning of subscription.
*
* This will always run, even if Subject is in terminal state.
*/
new Action1<SubjectObserver<? super T>>() {

@Override
public void call(SubjectObserver<? super T> o) {
// nothing onSubscribe unless in terminal state which is the next function
}
},
/**
* This function executes if the Subject is terminated before subscription occurs.
*/
new Action1<SubjectObserver<? super T>>() {

@Override
public void call(SubjectObserver<? super T> o) {
/*
* If we are already terminated, or termination happens while trying to subscribe
* this will be invoked and we emit whatever the last terminal value was.
*/
lastNotification.get().accept(o);
}
});

return new TestSubject<T>(onSubscribe, subscriptionManager, lastNotification, scheduler);
}

private final SubjectSubscriptionManager<T> subscriptionManager;
private final AtomicReference<Notification<T>> lastNotification;
private final Scheduler.Inner innerScheduler;

protected TestSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T> subscriptionManager, AtomicReference<Notification<T>> lastNotification, TestScheduler scheduler) {
super(onSubscribe);
this.subscriptionManager = subscriptionManager;
this.lastNotification = lastNotification;
this.innerScheduler = scheduler.createInnerScheduler();
}

@Override
public void onCompleted() {
onCompleted(innerScheduler.now());
}

private void _onCompleted() {
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {

@Override
public void call(Collection<SubjectObserver<? super T>> observers) {
lastNotification.set(Notification.<T> createOnCompleted());
for (Observer<? super T> o : observers) {
o.onCompleted();
}
}
});
}

public void onCompleted(long timeInMilliseconds) {
innerScheduler.schedule(new Action1<Inner>() {

@Override
public void call(Inner t1) {
_onCompleted();
}

}, timeInMilliseconds, TimeUnit.MILLISECONDS);
}

@Override
public void onError(final Throwable e) {
onError(e, innerScheduler.now());
}

private void _onError(final Throwable e) {
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {

@Override
public void call(Collection<SubjectObserver<? super T>> observers) {
lastNotification.set(Notification.<T> createOnError(e));
for (Observer<? super T> o : observers) {
o.onError(e);
}
}
});

}

public void onError(final Throwable e, long timeInMilliseconds) {
innerScheduler.schedule(new Action1<Inner>() {

@Override
public void call(Inner t1) {
_onError(e);
}

}, timeInMilliseconds, TimeUnit.MILLISECONDS);
}

@Override
public void onNext(T v) {
onNext(v, innerScheduler.now());
}

private void _onNext(T v) {
for (Observer<? super T> o : subscriptionManager.rawSnapshot()) {
o.onNext(v);
}
}

public void onNext(final T v, long timeInMilliseconds) {
innerScheduler.schedule(new Action1<Inner>() {

@Override
public void call(Inner t1) {
_onNext(v);
}

}, timeInMilliseconds, TimeUnit.MILLISECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void testAssertNotMatchValue() {
oi.subscribe(o);

thrown.expect(AssertionError.class);
thrown.expectMessage("Value at index: 1 expected to be [3] but was: [2]");
thrown.expectMessage("Value at index: 1 expected to be [3] (Integer) but was: [2] (Integer)");

o.assertReceivedOnNext(Arrays.asList(1, 3));
assertEquals(2, o.getOnNextEvents().size());
Expand Down

0 comments on commit 3741e7b

Please sign in to comment.