Skip to content

Commit

Permalink
Merge pull request #1337 from zsxwing/fix-toFuture
Browse files Browse the repository at this point in the history
Make Future receive NoSuchElementException when the BlockingObservable is empty
  • Loading branch information
benjchristensen committed Jun 12, 2014
2 parents f7de6e9 + 8d06589 commit d56959d
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public static <T> Future<T> toFuture(Observable<? extends T> that) {
final AtomicReference<T> value = new AtomicReference<T>();
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();

final Subscription s = that.subscribe(new Subscriber<T>() {
final Subscription s = that.single().subscribe(new Subscriber<T>() {

@Override
public void onCompleted() {
Expand All @@ -67,11 +67,8 @@ public void onError(Throwable e) {

@Override
public void onNext(T v) {
if (!value.compareAndSet(null, v)) {
// this means we received more than one value and must fail as a Future can handle only a single value
error.compareAndSet(null, new IllegalStateException("Observable.toFuture() only supports sequences with a single value. Use .toList().toFuture() if multiple values are expected."));
finished.countDown();
}
// "single" guarantees there is only one "onNext"
value.set(v);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public void onNext(T value) {
if (isNonEmpty) {
hasTooManyElements = true;
subscriber.onError(new IllegalArgumentException("Sequence contains too many elements"));
unsubscribe();
} else {
this.value = value;
isNonEmpty = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,9 +405,11 @@ public T singleOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
/**
* Returns a {@link Future} representing the single value emitted by this {@code BlockingObservable}.
* <p>
* {@code toFuture} throws an exception if the {@code BlockingObservable} emits more than one item. If the
* {@code BlockingObservable} may emit more than item, use
* {@link Observable#toList toList()}{@code .toFuture()}.
* If {@link BlockingObservable} emits more than one item, {@link java.util.concurrent.Future} will receive an
* {@link java.lang.IllegalArgumentException}. If {@link BlockingObservable} is empty, {@link java.util.concurrent.Future}
* will receive an {@link java.util.NoSuchElementException}.
* <p>
* If the {@code BlockingObservable} may emit more than one item, use {@code Observable.toList().toBlocking().toFuture()}.
* <p>
* <img width="640" height="395" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/B.toFuture.png">
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static rx.internal.operators.BlockingOperatorToFuture.toFuture;

import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -51,12 +52,17 @@ public void testToFutureList() throws InterruptedException, ExecutionException {
assertEquals("three", f.get().get(2));
}

@Test(expected = ExecutionException.class)
public void testExceptionWithMoreThanOneElement() throws InterruptedException, ExecutionException {
@Test(expected = IllegalArgumentException.class)
public void testExceptionWithMoreThanOneElement() throws Throwable {
Observable<String> obs = Observable.from("one", "two");
Future<String> f = toFuture(obs);
assertEquals("one", f.get());
// we expect an exception since there are more than 1 element
try {
// we expect an exception since there are more than 1 element
f.get();
}
catch(ExecutionException e) {
throw e.getCause();
}
}

@Test
Expand Down Expand Up @@ -106,4 +112,23 @@ public void call(Subscriber<? super T> unused) {
// do nothing
}
}

@Test(expected = NoSuchElementException.class)
public void testGetWithEmptyObservable() throws Throwable {
Observable<String> obs = Observable.empty();
Future<String> f = obs.toBlocking().toFuture();
try {
f.get();
}
catch(ExecutionException e) {
throw e.getCause();
}
}

@Test
public void testGetWithASingleNullItem() throws Exception {
Observable<String> obs = Observable.from((String)null);
Future<String> f = obs.toBlocking().toFuture();
assertEquals(null, f.get());
}
}

0 comments on commit d56959d

Please sign in to comment.