Skip to content

Commit

Permalink
Merge pull request #1742 from benjchristensen/issue-1571
Browse files Browse the repository at this point in the history
EmptyObserver and TestObserver
  • Loading branch information
benjchristensen committed Oct 10, 2014
2 parents ea13546 + 35c4fa1 commit b2fe579
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
import rx.Observer;
import rx.Subscriber;
import rx.functions.Action0;
import rx.observers.EmptyObserver;
import rx.observers.Subscribers;
import rx.observers.Observers;
import rx.subjects.Subject;
import rx.subscriptions.Subscriptions;

Expand Down Expand Up @@ -51,9 +50,6 @@
*/
public class BufferUntilSubscriber<T> extends Subject<T, T> {

@SuppressWarnings("rawtypes")
private final static Observer EMPTY_OBSERVER = new EmptyObserver();

/**
* @warn create() undescribed
* @return
Expand Down Expand Up @@ -96,7 +92,7 @@ public void call(final Subscriber<? super T> s) {
s.add(Subscriptions.create(new Action0() {
@Override
public void call() {
state.observerRef = EMPTY_OBSERVER;
state.observerRef = Observers.empty();
}
}));
boolean win = false;
Expand Down
41 changes: 0 additions & 41 deletions src/main/java/rx/observers/EmptyObserver.java

This file was deleted.

8 changes: 3 additions & 5 deletions src/main/java/rx/observers/Observers.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@ public final void onNext(Object args) {

/**
* Returns an inert {@link Observer} that does nothing in response to the emissions or notifications from
* any {@code Observable} it subscribes to. This is different, however, from an {@link EmptyObserver}, in
* that it will throw an exception if its {@link Observer#onError onError} method is called (whereas
* {@code EmptyObserver} will swallow the error in such a case).
* any {@code Observable} it subscribes to but will throw an exception if its {@link Observer#onError onError} method is called.
*
* @return an inert {@code Observer}
*/
Expand All @@ -59,8 +57,8 @@ public static <T> Observer<T> empty() {

/**
* Creates an {@link Observer} that receives the emissions of any {@code Observable} it subscribes to via
* {@link Observer#onNext onNext} but ignores {@link Observer#onError onError} and
* {@link Observer#onCompleted onCompleted} notifications.
* {@link Observer#onNext onNext} but ignores {@link Observer#onCompleted onCompleted} notifications.
* It will throws an {@link OnErrorNotImplementedException} if {@link Observer#onError onError} is invoked.
*
* @param onNext
* a function that handles each item emitted by an {@code Observable}
Expand Down
22 changes: 21 additions & 1 deletion src/main/java/rx/observers/TestObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ public TestObserver(Observer<T> delegate) {
this.delegate = delegate;
}

@SuppressWarnings("unchecked")
public TestObserver() {
this.delegate = Observers.empty();
this.delegate = (Observer<T>) INERT;
}

@Override
Expand Down Expand Up @@ -153,4 +154,23 @@ public void assertTerminalEvent() {
}
}

// do nothing ... including swallowing errors
private static Observer<Object> INERT = new Observer<Object>() {

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Object t) {

}

};
}
5 changes: 5 additions & 0 deletions src/test/java/rx/observers/TestObserverTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,5 +119,10 @@ public void testWrappingMockWhenUnsubscribeInvolved() {
inOrder.verify(mockObserver, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
}

@Test
public void testErrorSwallowed() {
Observable.error(new RuntimeException()).subscribe(new TestObserver<Object>());
}

}

0 comments on commit b2fe579

Please sign in to comment.