Skip to content

Commit

Permalink
Merge pull request ReactiveX#474 from benjchristensen/issue-423-reduc…
Browse files Browse the repository at this point in the history
…e-empty-observable

BugFix: Reduce an empty observable
  • Loading branch information
benjchristensen committed Nov 7, 2013
2 parents 682590c + 2b17aba commit 736d658
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 1 deletion.
9 changes: 8 additions & 1 deletion rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -3520,11 +3520,18 @@ public Observable<T> onErrorReturn(Func1<Throwable, ? extends T> resumeFunction)
* Observable, whose result will be used in the next accumulator call
* @return an Observable that emits a single item that is the result of accumulating the
* output from the source Observable
* @throws IllegalArgumentException
* if Observable sequence is empty.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229154(v%3Dvs.103).aspx">MSDN: Observable.Aggregate</a>
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
*/
public Observable<T> reduce(Func2<T, T, T> accumulator) {
return create(OperationScan.scan(this, accumulator)).takeLast(1);
/*
* Discussion and confirmation of implementation at https://github.com/Netflix/RxJava/issues/423#issuecomment-27642532
*
* It should use last() not takeLast(1) since it needs to emit an error if the sequence is empty.
*/
return create(OperationScan.scan(this, accumulator)).last();
}

/**
Expand Down
26 changes: 26 additions & 0 deletions rxjava-core/src/test/java/rx/ObservableTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import rx.observables.ConnectableObservable;
import rx.subscriptions.BooleanSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func1;
import rx.util.functions.Func2;
Expand Down Expand Up @@ -210,6 +211,31 @@ public Integer call(Integer t1, Integer t2) {
verify(w).onNext(10);
}


/**
* A reduce should fail with an IllegalArgumentException if done on an empty Observable.
*/
@Test(expected = IllegalArgumentException.class)
public void testReduceWithEmptyObservable() {
Observable<Integer> observable = Observable.range(1, 0);
observable.reduce(new Func2<Integer, Integer, Integer>() {

@Override
public Integer call(Integer t1, Integer t2) {
return t1 + t2;
}

}).toBlockingObservable().forEach(new Action1<Integer>() {

@Override
public void call(Integer t1) {
// do nothing ... we expect an exception instead
}
});

fail("Expected an exception to be thrown");
}

@Test
public void testReduceWithInitialValue() {
Observable<Integer> observable = Observable.from(1, 2, 3, 4);
Expand Down

0 comments on commit 736d658

Please sign in to comment.