Skip to content

Commit

Permalink
2.x: Fix Flowable.singleOrError().toFlowable() not signalling NSEE (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Mar 9, 2018
1 parent 5e5d5a2 commit 7bdcb59
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

package io.reactivex.internal.operators.flowable;

import java.util.NoSuchElementException;

import org.reactivestreams.*;

import io.reactivex.*;
Expand All @@ -23,14 +25,17 @@ public final class FlowableSingle<T> extends AbstractFlowableWithUpstream<T, T>

final T defaultValue;

public FlowableSingle(Flowable<T> source, T defaultValue) {
final boolean failOnEmpty;

public FlowableSingle(Flowable<T> source, T defaultValue, boolean failOnEmpty) {
super(source);
this.defaultValue = defaultValue;
this.failOnEmpty = failOnEmpty;
}

@Override
protected void subscribeActual(Subscriber<? super T> s) {
source.subscribe(new SingleElementSubscriber<T>(s, defaultValue));
source.subscribe(new SingleElementSubscriber<T>(s, defaultValue, failOnEmpty));
}

static final class SingleElementSubscriber<T> extends DeferredScalarSubscription<T>
Expand All @@ -40,13 +45,16 @@ static final class SingleElementSubscriber<T> extends DeferredScalarSubscription

final T defaultValue;

final boolean failOnEmpty;

Subscription s;

boolean done;

SingleElementSubscriber(Subscriber<? super T> actual, T defaultValue) {
SingleElementSubscriber(Subscriber<? super T> actual, T defaultValue, boolean failOnEmpty) {
super(actual);
this.defaultValue = defaultValue;
this.failOnEmpty = failOnEmpty;
}

@Override
Expand Down Expand Up @@ -94,7 +102,11 @@ public void onComplete() {
v = defaultValue;
}
if (v == null) {
actual.onComplete();
if (failOnEmpty) {
actual.onError(new NoSuchElementException());
} else {
actual.onComplete();
}
} else {
complete(v);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ protected void subscribeActual(MaybeObserver<? super T> s) {

@Override
public Flowable<T> fuseToFlowable() {
return RxJavaPlugins.onAssembly(new FlowableSingle<T>(source, null));
return RxJavaPlugins.onAssembly(new FlowableSingle<T>(source, null, false));
}

static final class SingleElementSubscriber<T>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ protected void subscribeActual(SingleObserver<? super T> s) {

@Override
public Flowable<T> fuseToFlowable() {
return RxJavaPlugins.onAssembly(new FlowableSingle<T>(source, defaultValue));
return RxJavaPlugins.onAssembly(new FlowableSingle<T>(source, defaultValue, true));
}

static final class SingleElementSubscriber<T>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -796,4 +796,13 @@ public void cancelAsFlowable() {

assertFalse(pp.hasSubscribers());
}

@Test
public void singleOrError() {
Flowable.empty()
.singleOrError()
.toFlowable()
.test()
.assertFailure(NoSuchElementException.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -556,4 +556,13 @@ public MaybeSource<Object> apply(Observable<Object> o) throws Exception {
}
});
}

@Test
public void singleOrError() {
Observable.empty()
.singleOrError()
.toObservable()
.test()
.assertFailure(NoSuchElementException.class);
}
}

0 comments on commit 7bdcb59

Please sign in to comment.