From 2af6e08e66f7827315d118ef0fce2564c9161994 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 24 Jan 2020 20:28:54 +0100 Subject: [PATCH 1/2] 3.x: Add onErrorComplete to F/O/S --- .../io/reactivex/rxjava3/core/Flowable.java | 52 ++++++- .../java/io/reactivex/rxjava3/core/Maybe.java | 4 + .../io/reactivex/rxjava3/core/Observable.java | 41 +++++ .../io/reactivex/rxjava3/core/Single.java | 43 ++++++ .../flowable/FlowableOnErrorComplete.java | 106 +++++++++++++ .../operators/maybe/MaybeOnErrorComplete.java | 7 +- .../observable/ObservableOnErrorComplete.java | 105 +++++++++++++ .../single/SingleOnErrorComplete.java | 43 ++++++ .../flowable/FlowableOnErrorCompleteTest.java | 143 ++++++++++++++++++ .../ObservableOnErrorCompleteTest.java | 134 ++++++++++++++++ .../single/SingleOnErrorCompleteTest.java | 113 ++++++++++++++ 11 files changed, 787 insertions(+), 4 deletions(-) create mode 100644 src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnErrorComplete.java create mode 100644 src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableOnErrorComplete.java create mode 100644 src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleOnErrorComplete.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnErrorCompleteTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableOnErrorCompleteTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleOnErrorCompleteTest.java diff --git a/src/main/java/io/reactivex/rxjava3/core/Flowable.java b/src/main/java/io/reactivex/rxjava3/core/Flowable.java index 0106f42ce9..cdc7907894 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Flowable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Flowable.java @@ -19,6 +19,7 @@ import org.reactivestreams.*; import io.reactivex.rxjava3.annotations.*; +import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.exceptions.*; import io.reactivex.rxjava3.flowables.*; @@ -28,7 +29,7 @@ import io.reactivex.rxjava3.internal.jdk8.*; import io.reactivex.rxjava3.internal.operators.flowable.*; import io.reactivex.rxjava3.internal.operators.mixed.*; -import io.reactivex.rxjava3.internal.operators.observable.ObservableFromPublisher; +import io.reactivex.rxjava3.internal.operators.observable.*; import io.reactivex.rxjava3.internal.schedulers.ImmediateThinScheduler; import io.reactivex.rxjava3.internal.subscribers.*; import io.reactivex.rxjava3.internal.util.*; @@ -12308,6 +12309,55 @@ public final Flowable onBackpressureLatest() { return RxJavaPlugins.onAssembly(new FlowableOnBackpressureLatest<>(this)); } + /** + * Returns a {@code Flowable} instance that if the current {@code Flowable} emits an error, it will emit an {@code onComplete} + * and swallow the throwable. + *

+ * + *

+ *
Backpressure:
+ *
The operator doesn't interfere with backpressure which is determined by the current {@code Flowable}'s backpressure + * behavior.
+ *
Scheduler:
+ *
{@code onErrorComplete} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @return the new {@code Flowable} instance + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @BackpressureSupport(BackpressureKind.PASS_THROUGH) + @NonNull + public final Flowable onErrorComplete() { + return onErrorComplete(Functions.alwaysTrue()); + } + + /** + * Returns a {@code Flowable} instance that if the current {@code Flowable} emits an error and the predicate returns + * {@code true}, it will emit an {@code onComplete} and swallow the throwable. + *

+ * + *

+ *
Backpressure:
+ *
The operator doesn't interfere with backpressure which is determined by the current {@code Flowable}'s backpressure + * behavior.
+ *
Scheduler:
+ *
{@code onErrorComplete} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param predicate the predicate to call when an {@link Throwable} is emitted which should return {@code true} + * if the {@code Throwable} should be swallowed and replaced with an {@code onComplete}. + * @return the new {@code Flowable} instance + * @throws NullPointerException if {@code predicate} is {@code null} + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.PASS_THROUGH) + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public final Flowable onErrorComplete(@NonNull Predicate predicate) { + Objects.requireNonNull(predicate, "predicate is null"); + + return RxJavaPlugins.onAssembly(new FlowableOnErrorComplete<>(this, predicate)); + } + /** * Resumes the flow with a {@link Publisher} returned for the failure {@link Throwable} of the current {@code Flowable} by a * function instead of signaling the error via {@code onError}. diff --git a/src/main/java/io/reactivex/rxjava3/core/Maybe.java b/src/main/java/io/reactivex/rxjava3/core/Maybe.java index 375a439e65..1d385ce148 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Maybe.java +++ b/src/main/java/io/reactivex/rxjava3/core/Maybe.java @@ -4042,6 +4042,8 @@ public final Single toSingle() { /** * Returns a {@code Maybe} instance that if this {@code Maybe} emits an error, it will emit an {@code onComplete} * and swallow the throwable. + *

+ * *

*
Scheduler:
*
{@code onErrorComplete} does not operate by default on a particular {@link Scheduler}.
@@ -4058,6 +4060,8 @@ public final Maybe onErrorComplete() { /** * Returns a {@code Maybe} instance that if this {@code Maybe} emits an error and the predicate returns * {@code true}, it will emit an {@code onComplete} and swallow the throwable. + *

+ * *

*
Scheduler:
*
{@code onErrorComplete} does not operate by default on a particular {@link Scheduler}.
diff --git a/src/main/java/io/reactivex/rxjava3/core/Observable.java b/src/main/java/io/reactivex/rxjava3/core/Observable.java index 57456a9545..81fa93a580 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Observable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Observable.java @@ -10346,6 +10346,47 @@ public final Observable ofType(@NonNull Class clazz) { return filter(Functions.isInstanceOf(clazz)).cast(clazz); } + /** + * Returns an {@code Observable} instance that if the current {@code Observable} emits an error, it will emit an {@code onComplete} + * and swallow the throwable. + *

+ * + *

+ *
Scheduler:
+ *
{@code onErrorComplete} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @return the new {@code Observable} instance + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public final Observable onErrorComplete() { + return onErrorComplete(Functions.alwaysTrue()); + } + + /** + * Returns an {@code Observable} instance that if the current {@code Observable} emits an error and the predicate returns + * {@code true}, it will emit an {@code onComplete} and swallow the throwable. + *

+ * + *

+ *
Scheduler:
+ *
{@code onErrorComplete} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param predicate the predicate to call when an {@link Throwable} is emitted which should return {@code true} + * if the {@code Throwable} should be swallowed and replaced with an {@code onComplete}. + * @return the new {@code Observable} instance + * @throws NullPointerException if {@code predicate} is {@code null} + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + public final Observable onErrorComplete(@NonNull Predicate predicate) { + Objects.requireNonNull(predicate, "predicate is null"); + + return RxJavaPlugins.onAssembly(new ObservableOnErrorComplete<>(this, predicate)); + } + /** * Resumes the flow with an {@link ObservableSource} returned for the failure {@link Throwable} of the current {@code Observable} by a * function instead of signaling the error via {@code onError}. diff --git a/src/main/java/io/reactivex/rxjava3/core/Single.java b/src/main/java/io/reactivex/rxjava3/core/Single.java index 216d3deb87..f6e41cee5c 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Single.java +++ b/src/main/java/io/reactivex/rxjava3/core/Single.java @@ -3439,6 +3439,49 @@ public final Single onErrorResumeWith(@NonNull SingleSource fall return onErrorResumeNext(Functions.justFunction(fallback)); } + /** + * Returns a {@link Maybe} instance that if the current {@code Single} emits an error, it will emit an {@code onComplete} + * and swallow the throwable. + *

+ * + *

+ *
Scheduler:
+ *
{@code onErrorComplete} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @return the new {@code Maybe} instance + * @since 3.0.0 + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public final Maybe onErrorComplete() { + return onErrorComplete(Functions.alwaysTrue()); + } + + /** + * Returns a {@link Maybe} instance that if this {@code Single} emits an error and the predicate returns + * {@code true}, it will emit an {@code onComplete} and swallow the throwable. + *

+ * + *

+ *
Scheduler:
+ *
{@code onErrorComplete} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param predicate the predicate to call when an {@link Throwable} is emitted which should return {@code true} + * if the {@code Throwable} should be swallowed and replaced with an {@code onComplete}. + * @return the new {@code Maybe} instance + * @throws NullPointerException if {@code predicate} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + public final Maybe onErrorComplete(@NonNull Predicate predicate) { + Objects.requireNonNull(predicate, "predicate is null"); + + return RxJavaPlugins.onAssembly(new SingleOnErrorComplete<>(this, predicate)); + } + /** * Resumes the flow with a {@link SingleSource} returned for the failure {@link Throwable} of the current {@code Single} by a * function instead of signaling the error via {@code onError}. diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnErrorComplete.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnErrorComplete.java new file mode 100644 index 0000000000..d9aac04d31 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnErrorComplete.java @@ -0,0 +1,106 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.flowable; + +import org.reactivestreams.*; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.*; +import io.reactivex.rxjava3.functions.Predicate; +import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper; + +/** + * Emits an onComplete if the source emits an onError and the predicate returns true for + * that Throwable. + * + * @param the value type + */ +public final class FlowableOnErrorComplete extends AbstractFlowableWithUpstream { + + final Predicate predicate; + + public FlowableOnErrorComplete(Flowable source, + Predicate predicate) { + super(source); + this.predicate = predicate; + } + + @Override + protected void subscribeActual(Subscriber observer) { + source.subscribe(new OnErrorCompleteSubscriber<>(observer, predicate)); + } + + public static final class OnErrorCompleteSubscriber + implements FlowableSubscriber, Subscription { + + final Subscriber downstream; + + final Predicate predicate; + + Subscription upstream; + + public OnErrorCompleteSubscriber(Subscriber actual, Predicate predicate) { + this.downstream = actual; + this.predicate = predicate; + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(this.upstream, s)) { + this.upstream = s; + + downstream.onSubscribe(this); + } + } + + @Override + public void onNext(T value) { + downstream.onNext(value); + } + + @Override + public void onError(Throwable e) { + boolean b; + + try { + b = predicate.test(e); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + downstream.onError(new CompositeException(e, ex)); + return; + } + + if (b) { + downstream.onComplete(); + } else { + downstream.onError(e); + } + } + + @Override + public void onComplete() { + downstream.onComplete(); + } + + @Override + public void cancel() { + upstream.cancel(); + } + + @Override + public void request(long n) { + upstream.request(n); + } + } +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeOnErrorComplete.java b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeOnErrorComplete.java index 8792a93cc5..277e207eaf 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeOnErrorComplete.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeOnErrorComplete.java @@ -37,10 +37,11 @@ public MaybeOnErrorComplete(MaybeSource source, @Override protected void subscribeActual(MaybeObserver observer) { - source.subscribe(new OnErrorCompleteMaybeObserver<>(observer, predicate)); + source.subscribe(new OnErrorCompleteMultiObserver<>(observer, predicate)); } - static final class OnErrorCompleteMaybeObserver implements MaybeObserver, Disposable { + public static final class OnErrorCompleteMultiObserver + implements MaybeObserver, SingleObserver, Disposable { final MaybeObserver downstream; @@ -48,7 +49,7 @@ static final class OnErrorCompleteMaybeObserver implements MaybeObserver, Disposable upstream; - OnErrorCompleteMaybeObserver(MaybeObserver actual, Predicate predicate) { + public OnErrorCompleteMultiObserver(MaybeObserver actual, Predicate predicate) { this.downstream = actual; this.predicate = predicate; } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableOnErrorComplete.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableOnErrorComplete.java new file mode 100644 index 0000000000..fc4d69c9b5 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableOnErrorComplete.java @@ -0,0 +1,105 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.observable; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.*; +import io.reactivex.rxjava3.functions.Predicate; +import io.reactivex.rxjava3.internal.disposables.DisposableHelper; + +/** + * Emits an onComplete if the source emits an onError and the predicate returns true for + * that Throwable. + * + * @param the value type + */ +public final class ObservableOnErrorComplete extends AbstractObservableWithUpstream { + + final Predicate predicate; + + public ObservableOnErrorComplete(ObservableSource source, + Predicate predicate) { + super(source); + this.predicate = predicate; + } + + @Override + protected void subscribeActual(Observer observer) { + source.subscribe(new OnErrorCompleteObserver<>(observer, predicate)); + } + + public static final class OnErrorCompleteObserver + implements Observer, Disposable { + + final Observer downstream; + + final Predicate predicate; + + Disposable upstream; + + public OnErrorCompleteObserver(Observer actual, Predicate predicate) { + this.downstream = actual; + this.predicate = predicate; + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(this.upstream, d)) { + this.upstream = d; + + downstream.onSubscribe(this); + } + } + + @Override + public void onNext(T value) { + downstream.onNext(value); + } + + @Override + public void onError(Throwable e) { + boolean b; + + try { + b = predicate.test(e); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + downstream.onError(new CompositeException(e, ex)); + return; + } + + if (b) { + downstream.onComplete(); + } else { + downstream.onError(e); + } + } + + @Override + public void onComplete() { + downstream.onComplete(); + } + + @Override + public void dispose() { + upstream.dispose(); + } + + @Override + public boolean isDisposed() { + return upstream.isDisposed(); + } + } +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleOnErrorComplete.java b/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleOnErrorComplete.java new file mode 100644 index 0000000000..7be9bd817c --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleOnErrorComplete.java @@ -0,0 +1,43 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.single; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.functions.Predicate; +import io.reactivex.rxjava3.internal.operators.maybe.MaybeOnErrorComplete; + +/** + * Emits an onComplete if the source emits an onError and the predicate returns true for + * that Throwable. + * + * @param the value type + * @since 3.0.0 + */ +public final class SingleOnErrorComplete extends Maybe { + + final Single source; + + final Predicate predicate; + + public SingleOnErrorComplete(Single source, + Predicate predicate) { + this.source = source; + this.predicate = predicate; + } + + @Override + protected void subscribeActual(MaybeObserver observer) { + source.subscribe(new MaybeOnErrorComplete.OnErrorCompleteMultiObserver(observer, predicate)); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnErrorCompleteTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnErrorCompleteTest.java new file mode 100644 index 0000000000..c1c89cbe27 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnErrorCompleteTest.java @@ -0,0 +1,143 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.flowable; + +import static org.junit.Assert.*; + +import java.io.IOException; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.exceptions.*; +import io.reactivex.rxjava3.processors.PublishProcessor; +import io.reactivex.rxjava3.subscribers.TestSubscriber; +import io.reactivex.rxjava3.testsupport.*; + +public class FlowableOnErrorCompleteTest { + + @Test + public void normal() { + Flowable.range(1, 10) + .onErrorComplete() + .test() + .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + @Test + public void normalBackpressured() { + Flowable.range(1, 10) + .onErrorComplete() + .test(0) + .assertEmpty() + .requestMore(3) + .assertValuesOnly(1, 2, 3) + .requestMore(3) + .assertValuesOnly(1, 2, 3, 4, 5, 6) + .requestMore(4) + .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + @Test + public void empty() { + Flowable.empty() + .onErrorComplete() + .test() + .assertResult(); + } + + @Test + public void error() throws Throwable { + TestHelper.withErrorTracking(errors -> { + Flowable.error(new TestException()) + .onErrorComplete() + .test() + .assertResult(); + + assertTrue("" + errors, errors.isEmpty()); + }); + } + + @Test + public void errorMatches() throws Throwable { + TestHelper.withErrorTracking(errors -> { + Flowable.error(new TestException()) + .onErrorComplete(error -> error instanceof TestException) + .test() + .assertResult(); + + assertTrue("" + errors, errors.isEmpty()); + }); + } + + @Test + public void errorNotMatches() throws Throwable { + TestHelper.withErrorTracking(errors -> { + Flowable.error(new IOException()) + .onErrorComplete(error -> error instanceof TestException) + .test() + .assertFailure(IOException.class); + + assertTrue("" + errors, errors.isEmpty()); + }); + } + + @Test + public void errorPredicateCrash() throws Throwable { + TestHelper.withErrorTracking(errors -> { + TestSubscriberEx ts = Flowable.error(new IOException()) + .onErrorComplete(error -> { throw new TestException(); }) + .subscribeWith(new TestSubscriberEx<>()) + .assertFailure(CompositeException.class); + + TestHelper.assertError(ts, 0, IOException.class); + TestHelper.assertError(ts, 1, TestException.class); + + assertTrue("" + errors, errors.isEmpty()); + }); + } + + @Test + public void itemsThenError() throws Throwable { + TestHelper.withErrorTracking(errors -> { + Flowable.range(1, 5) + .map(v -> 4 / (3 - v)) + .onErrorComplete() + .test() + .assertResult(2, 4); + + assertTrue("" + errors, errors.isEmpty()); + }); + } + + @Test + public void cancel() { + PublishProcessor pp = PublishProcessor.create(); + + TestSubscriber ts = pp + .onErrorComplete() + .test(); + + assertTrue("No subscribers?!", pp.hasSubscribers()); + + ts.cancel(); + + assertFalse("Still subscribers?!", pp.hasSubscribers()); + } + + @Test + public void onSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowable(f -> f.onErrorComplete()); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableOnErrorCompleteTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableOnErrorCompleteTest.java new file mode 100644 index 0000000000..4c37745834 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableOnErrorCompleteTest.java @@ -0,0 +1,134 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.observable; + +import static org.junit.Assert.*; + +import java.io.IOException; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.exceptions.*; +import io.reactivex.rxjava3.observers.TestObserver; +import io.reactivex.rxjava3.subjects.PublishSubject; +import io.reactivex.rxjava3.testsupport.*; + +public class ObservableOnErrorCompleteTest { + + @Test + public void normal() { + Observable.range(1, 10) + .onErrorComplete() + .test() + .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + @Test + public void empty() { + Observable.empty() + .onErrorComplete() + .test() + .assertResult(); + } + + @Test + public void error() throws Throwable { + TestHelper.withErrorTracking(errors -> { + Observable.error(new TestException()) + .onErrorComplete() + .test() + .assertResult(); + + assertTrue("" + errors, errors.isEmpty()); + }); + } + + @Test + public void errorMatches() throws Throwable { + TestHelper.withErrorTracking(errors -> { + Observable.error(new TestException()) + .onErrorComplete(error -> error instanceof TestException) + .test() + .assertResult(); + + assertTrue("" + errors, errors.isEmpty()); + }); + } + + @Test + public void errorNotMatches() throws Throwable { + TestHelper.withErrorTracking(errors -> { + Observable.error(new IOException()) + .onErrorComplete(error -> error instanceof TestException) + .test() + .assertFailure(IOException.class); + + assertTrue("" + errors, errors.isEmpty()); + }); + } + + @Test + public void errorPredicateCrash() throws Throwable { + TestHelper.withErrorTracking(errors -> { + TestObserverEx to = Observable.error(new IOException()) + .onErrorComplete(error -> { throw new TestException(); }) + .subscribeWith(new TestObserverEx<>()) + .assertFailure(CompositeException.class); + + TestHelper.assertError(to, 0, IOException.class); + TestHelper.assertError(to, 1, TestException.class); + + assertTrue("" + errors, errors.isEmpty()); + }); + } + + @Test + public void itemsThenError() throws Throwable { + TestHelper.withErrorTracking(errors -> { + Observable.range(1, 5) + .map(v -> 4 / (3 - v)) + .onErrorComplete() + .test() + .assertResult(2, 4); + + assertTrue("" + errors, errors.isEmpty()); + }); + } + + @Test + public void dispose() { + PublishSubject ps = PublishSubject.create(); + + TestObserver to = ps + .onErrorComplete() + .test(); + + assertTrue("No subscribers?!", ps.hasObservers()); + + to.dispose(); + + assertFalse("Still subscribers?!", ps.hasObservers()); + } + + @Test + public void onSubscribe() { + TestHelper.checkDoubleOnSubscribeObservable(f -> f.onErrorComplete()); + } + + @Test + public void isDisposed() { + TestHelper.checkDisposed(PublishSubject.create().onErrorComplete()); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleOnErrorCompleteTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleOnErrorCompleteTest.java new file mode 100644 index 0000000000..189759d6c6 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleOnErrorCompleteTest.java @@ -0,0 +1,113 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.single; + +import static org.junit.Assert.*; + +import java.io.IOException; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.exceptions.*; +import io.reactivex.rxjava3.observers.TestObserver; +import io.reactivex.rxjava3.subjects.*; +import io.reactivex.rxjava3.testsupport.*; + +public class SingleOnErrorCompleteTest { + + @Test + public void normal() { + Single.just(1) + .onErrorComplete() + .test() + .assertResult(1); + } + + @Test + public void error() throws Throwable { + TestHelper.withErrorTracking(errors -> { + Single.error(new TestException()) + .onErrorComplete() + .test() + .assertResult(); + + assertTrue("" + errors, errors.isEmpty()); + }); + } + + @Test + public void errorMatches() throws Throwable { + TestHelper.withErrorTracking(errors -> { + Single.error(new TestException()) + .onErrorComplete(error -> error instanceof TestException) + .test() + .assertResult(); + + assertTrue("" + errors, errors.isEmpty()); + }); + } + + @Test + public void errorNotMatches() throws Throwable { + TestHelper.withErrorTracking(errors -> { + Single.error(new IOException()) + .onErrorComplete(error -> error instanceof TestException) + .test() + .assertFailure(IOException.class); + + assertTrue("" + errors, errors.isEmpty()); + }); + } + + @Test + public void errorPredicateCrash() throws Throwable { + TestHelper.withErrorTracking(errors -> { + TestObserverEx to = Single.error(new IOException()) + .onErrorComplete(error -> { throw new TestException(); }) + .subscribeWith(new TestObserverEx<>()) + .assertFailure(CompositeException.class); + + TestHelper.assertError(to, 0, IOException.class); + TestHelper.assertError(to, 1, TestException.class); + + assertTrue("" + errors, errors.isEmpty()); + }); + } + + @Test + public void dispose() { + SingleSubject ss = SingleSubject.create(); + + TestObserver to = ss + .onErrorComplete() + .test(); + + assertTrue("No subscribers?!", ss.hasObservers()); + + to.dispose(); + + assertFalse("Still subscribers?!", ss.hasObservers()); + } + + @Test + public void onSubscribe() { + TestHelper.checkDoubleOnSubscribeSingleToMaybe(f -> f.onErrorComplete()); + } + + @Test + public void isDisposed() { + TestHelper.checkDisposed(SingleSubject.create().onErrorComplete()); + } +} From 58dbf8a882a873bcf7c94eee6101b803e27fa5c7 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 24 Jan 2020 21:09:05 +0100 Subject: [PATCH 2/2] Add version tags. --- src/main/java/io/reactivex/rxjava3/core/Flowable.java | 2 ++ src/main/java/io/reactivex/rxjava3/core/Observable.java | 2 ++ .../internal/operators/flowable/FlowableOnErrorComplete.java | 1 + .../operators/observable/ObservableOnErrorComplete.java | 1 + 4 files changed, 6 insertions(+) diff --git a/src/main/java/io/reactivex/rxjava3/core/Flowable.java b/src/main/java/io/reactivex/rxjava3/core/Flowable.java index cdc7907894..a627eb3474 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Flowable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Flowable.java @@ -12322,6 +12322,7 @@ public final Flowable onBackpressureLatest() { *
{@code onErrorComplete} does not operate by default on a particular {@link Scheduler}.
* * @return the new {@code Flowable} instance + * @since 3.0.0 */ @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) @@ -12347,6 +12348,7 @@ public final Flowable onErrorComplete() { * if the {@code Throwable} should be swallowed and replaced with an {@code onComplete}. * @return the new {@code Flowable} instance * @throws NullPointerException if {@code predicate} is {@code null} + * @since 3.0.0 */ @CheckReturnValue @BackpressureSupport(BackpressureKind.PASS_THROUGH) diff --git a/src/main/java/io/reactivex/rxjava3/core/Observable.java b/src/main/java/io/reactivex/rxjava3/core/Observable.java index 81fa93a580..f599ce0eeb 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Observable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Observable.java @@ -10356,6 +10356,7 @@ public final Observable ofType(@NonNull Class clazz) { *
{@code onErrorComplete} does not operate by default on a particular {@link Scheduler}.
* * @return the new {@code Observable} instance + * @since 3.0.0 */ @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) @@ -10377,6 +10378,7 @@ public final Observable onErrorComplete() { * if the {@code Throwable} should be swallowed and replaced with an {@code onComplete}. * @return the new {@code Observable} instance * @throws NullPointerException if {@code predicate} is {@code null} + * @since 3.0.0 */ @CheckReturnValue @NonNull diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnErrorComplete.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnErrorComplete.java index d9aac04d31..c477917f35 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnErrorComplete.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnErrorComplete.java @@ -25,6 +25,7 @@ * that Throwable. * * @param the value type + * @since 3.0.0 */ public final class FlowableOnErrorComplete extends AbstractFlowableWithUpstream { diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableOnErrorComplete.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableOnErrorComplete.java index fc4d69c9b5..6854a5ce6d 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableOnErrorComplete.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableOnErrorComplete.java @@ -24,6 +24,7 @@ * that Throwable. * * @param the value type + * @since 3.0.0 */ public final class ObservableOnErrorComplete extends AbstractObservableWithUpstream {