From c9d201993abdd7ec28a4f0a21f89594ce714a92e Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 10 Nov 2016 13:47:58 +0100 Subject: [PATCH] 2.x: add doFinally for handling post terminal or cancel cleanup --- src/main/java/io/reactivex/Flowable.java | 29 ++ .../operators/flowable/FlowableDoFinally.java | 262 +++++++++++ .../flowable/FlowableDoFinallyTest.java | 441 ++++++++++++++++++ 3 files changed, 732 insertions(+) create mode 100644 src/main/java/io/reactivex/internal/operators/flowable/FlowableDoFinally.java create mode 100644 src/test/java/io/reactivex/internal/operators/flowable/FlowableDoFinallyTest.java diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index ab817bf8a8..c892b467b5 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -7324,6 +7324,35 @@ public final Flowable distinctUntilChanged(BiPredicate return RxJavaPlugins.onAssembly(new FlowableDistinctUntilChanged(this, Functions.identity(), comparer)); } + /** + * Calls the specified action after this Flowable signals onError or onCompleted or gets cancelled by + * the downstream. + *

In case of a race between a terminal event and a cancellation, the provided {@code onFinally} action + * is executed at once per subscription. + *

Note that the {@code onFinally} action is shared between subscriptions and as such + * should be thread-safe. + *

+ *
Backpressure:
+ *
The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s backpressure + * behavior.
+ *
Scheduler:
+ *
{@code doFinally} does not operate by default on a particular {@link Scheduler}.
+ * Operator-fusion: + *
This operator supports normal and conditional Subscribers as well as boundary-limited + * synchronous or asynchronous queue-fusion.
+ *
+ * @param onFinally the action called when this Flowable terminates or gets cancelled + * @return the new Flowable instance + * @since 2.0.1 - experimental + */ + @BackpressureSupport(BackpressureKind.PASS_THROUGH) + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Flowable doFinally(Action onFinally) { + ObjectHelper.requireNonNull(onFinally, "onFinally is null"); + return RxJavaPlugins.onAssembly(new FlowableDoFinally(this, onFinally)); + } + /** * Registers an {@link Action} to be called when this Publisher invokes either * {@link Subscriber#onComplete onComplete} or {@link Subscriber#onError onError}. diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDoFinally.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDoFinally.java new file mode 100644 index 0000000000..9a55528a27 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDoFinally.java @@ -0,0 +1,262 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.flowable; + +import org.reactivestreams.*; + +import io.reactivex.annotations.Experimental; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Action; +import io.reactivex.internal.fuseable.*; +import io.reactivex.internal.subscriptions.*; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Execute an action after an onError, onComplete or a cancel event. + * + * @param the value type + * @since 2.0.1 - experimental + */ +@Experimental +public final class FlowableDoFinally extends AbstractFlowableWithUpstream { + + final Action onFinally; + + public FlowableDoFinally(Publisher source, Action onFinally) { + super(source); + this.onFinally = onFinally; + } + + @Override + protected void subscribeActual(Subscriber s) { + if (s instanceof ConditionalSubscriber) { + source.subscribe(new DoFinallyConditionalSubscriber((ConditionalSubscriber)s, onFinally)); + } else { + source.subscribe(new DoFinallySubscriber(s, onFinally)); + } + } + + static final class DoFinallySubscriber extends BasicIntQueueSubscription implements Subscriber { + + private static final long serialVersionUID = 4109457741734051389L; + + final Subscriber actual; + + final Action onFinally; + + Subscription s; + + QueueSubscription qs; + + boolean syncFused; + + DoFinallySubscriber(Subscriber actual, Action onFinally) { + this.actual = actual; + this.onFinally = onFinally; + } + + @SuppressWarnings("unchecked") + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(this.s, s)) { + this.s = s; + if (s instanceof QueueSubscription) { + this.qs = (QueueSubscription)s; + } + + actual.onSubscribe(this); + } + } + + @Override + public void onNext(T t) { + actual.onNext(t); + } + + @Override + public void onError(Throwable t) { + actual.onError(t); + runFinally(); + } + + @Override + public void onComplete() { + actual.onComplete(); + runFinally(); + } + + @Override + public void cancel() { + s.cancel(); + runFinally(); + } + + @Override + public void request(long n) { + s.request(n); + } + + @Override + public int requestFusion(int mode) { + QueueSubscription qs = this.qs; + if (qs != null && (mode & BOUNDARY) == 0) { + int m = qs.requestFusion(mode); + if (m != NONE) { + syncFused = m == SYNC; + } + return m; + } + return NONE; + } + + @Override + public void clear() { + qs.clear(); + } + + @Override + public boolean isEmpty() { + return qs.isEmpty(); + } + + @Override + public T poll() throws Exception { + T v = qs.poll(); + if (v == null && syncFused) { + runFinally(); + } + return v; + } + + void runFinally() { + if (compareAndSet(0, 1)) { + try { + onFinally.run(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + RxJavaPlugins.onError(ex); + } + } + } + } + + static final class DoFinallyConditionalSubscriber extends BasicIntQueueSubscription implements ConditionalSubscriber { + + private static final long serialVersionUID = 4109457741734051389L; + + final ConditionalSubscriber actual; + + final Action onFinally; + + Subscription s; + + QueueSubscription qs; + + boolean syncFused; + + DoFinallyConditionalSubscriber(ConditionalSubscriber actual, Action onFinally) { + this.actual = actual; + this.onFinally = onFinally; + } + + @SuppressWarnings("unchecked") + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(this.s, s)) { + this.s = s; + if (s instanceof QueueSubscription) { + this.qs = (QueueSubscription)s; + } + + actual.onSubscribe(this); + } + } + + @Override + public void onNext(T t) { + actual.onNext(t); + } + + @Override + public boolean tryOnNext(T t) { + return actual.tryOnNext(t); + } + + @Override + public void onError(Throwable t) { + actual.onError(t); + runFinally(); + } + + @Override + public void onComplete() { + actual.onComplete(); + runFinally(); + } + + @Override + public void cancel() { + s.cancel(); + runFinally(); + } + + @Override + public void request(long n) { + s.request(n); + } + + @Override + public int requestFusion(int mode) { + QueueSubscription qs = this.qs; + if (qs != null && (mode & BOUNDARY) == 0) { + int m = qs.requestFusion(mode); + if (m != NONE) { + syncFused = m == SYNC; + } + return m; + } + return NONE; + } + + @Override + public void clear() { + qs.clear(); + } + + @Override + public boolean isEmpty() { + return qs.isEmpty(); + } + + @Override + public T poll() throws Exception { + T v = qs.poll(); + if (v == null && syncFused) { + runFinally(); + } + return v; + } + + void runFinally() { + if (compareAndSet(0, 1)) { + try { + onFinally.run(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + RxJavaPlugins.onError(ex); + } + } + } + } +} diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableDoFinallyTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDoFinallyTest.java new file mode 100644 index 0000000000..cdc2187141 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDoFinallyTest.java @@ -0,0 +1,441 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.flowable; + +import static org.junit.Assert.*; + +import java.util.List; + +import org.junit.Test; +import org.reactivestreams.*; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.fuseable.QueueSubscription; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.processors.UnicastProcessor; +import io.reactivex.subscribers.*; + +public class FlowableDoFinallyTest implements Action { + + int calls; + + @Override + public void run() throws Exception { + calls++; + } + + @Test + public void normalJust() { + Flowable.just(1) + .doFinally(this) + .test() + .assertResult(1); + + assertEquals(1, calls); + } + + @Test + public void normalEmpty() { + Flowable.empty() + .doFinally(this) + .test() + .assertResult(); + + assertEquals(1, calls); + } + + @Test + public void normalError() { + Flowable.error(new TestException()) + .doFinally(this) + .test() + .assertFailure(TestException.class); + + assertEquals(1, calls); + } + + @Test + public void normalTake() { + Flowable.range(1, 10) + .doFinally(this) + .take(5) + .test() + .assertResult(1, 2, 3, 4, 5); + + assertEquals(1, calls); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Publisher>() { + @Override + public Publisher apply(Flowable f) throws Exception { + return f.doFinally(FlowableDoFinallyTest.this); + } + }); + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Publisher>() { + @Override + public Publisher apply(Flowable f) throws Exception { + return f.doFinally(FlowableDoFinallyTest.this).filter(Functions.alwaysTrue()); + } + }); + } + + @Test + public void syncFused() { + TestSubscriber ts = SubscriberFusion.newTest(QueueSubscription.SYNC); + + Flowable.range(1, 5) + .doFinally(this) + .subscribe(ts); + + SubscriberFusion.assertFusion(ts, QueueSubscription.SYNC) + .assertResult(1, 2, 3, 4, 5); + + assertEquals(1, calls); + } + + @Test + public void syncFusedBoundary() { + TestSubscriber ts = SubscriberFusion.newTest(QueueSubscription.SYNC | QueueSubscription.BOUNDARY); + + Flowable.range(1, 5) + .doFinally(this) + .subscribe(ts); + + SubscriberFusion.assertFusion(ts, QueueSubscription.NONE) + .assertResult(1, 2, 3, 4, 5); + + assertEquals(1, calls); + } + + @Test + public void asyncFused() { + TestSubscriber ts = SubscriberFusion.newTest(QueueSubscription.ASYNC); + + UnicastProcessor up = UnicastProcessor.create(); + TestHelper.emit(up, 1, 2, 3, 4, 5); + + up + .doFinally(this) + .subscribe(ts); + + SubscriberFusion.assertFusion(ts, QueueSubscription.ASYNC) + .assertResult(1, 2, 3, 4, 5); + + assertEquals(1, calls); + } + + @Test + public void asyncFusedBoundary() { + TestSubscriber ts = SubscriberFusion.newTest(QueueSubscription.ASYNC | QueueSubscription.BOUNDARY); + + UnicastProcessor up = UnicastProcessor.create(); + TestHelper.emit(up, 1, 2, 3, 4, 5); + + up + .doFinally(this) + .subscribe(ts); + + SubscriberFusion.assertFusion(ts, QueueSubscription.NONE) + .assertResult(1, 2, 3, 4, 5); + + assertEquals(1, calls); + } + + + @Test + public void normalJustConditional() { + Flowable.just(1) + .doFinally(this) + .filter(Functions.alwaysTrue()) + .test() + .assertResult(1); + + assertEquals(1, calls); + } + + @Test + public void normalEmptyConditional() { + Flowable.empty() + .doFinally(this) + .filter(Functions.alwaysTrue()) + .test() + .assertResult(); + + assertEquals(1, calls); + } + + @Test + public void normalErrorConditional() { + Flowable.error(new TestException()) + .doFinally(this) + .filter(Functions.alwaysTrue()) + .test() + .assertFailure(TestException.class); + + assertEquals(1, calls); + } + + @Test + public void normalTakeConditional() { + Flowable.range(1, 10) + .doFinally(this) + .filter(Functions.alwaysTrue()) + .take(5) + .test() + .assertResult(1, 2, 3, 4, 5); + + assertEquals(1, calls); + } + + @Test + public void syncFusedConditional() { + TestSubscriber ts = SubscriberFusion.newTest(QueueSubscription.SYNC); + + Flowable.range(1, 5) + .doFinally(this) + .filter(Functions.alwaysTrue()) + .subscribe(ts); + + SubscriberFusion.assertFusion(ts, QueueSubscription.SYNC) + .assertResult(1, 2, 3, 4, 5); + + assertEquals(1, calls); + } + + @Test + public void nonFused() { + TestSubscriber ts = SubscriberFusion.newTest(QueueSubscription.SYNC); + + Flowable.range(1, 5).hide() + .doFinally(this) + .subscribe(ts); + + SubscriberFusion.assertFusion(ts, QueueSubscription.NONE) + .assertResult(1, 2, 3, 4, 5); + + assertEquals(1, calls); + } + + @Test + public void nonFusedConditional() { + TestSubscriber ts = SubscriberFusion.newTest(QueueSubscription.SYNC); + + Flowable.range(1, 5).hide() + .doFinally(this) + .filter(Functions.alwaysTrue()) + .subscribe(ts); + + SubscriberFusion.assertFusion(ts, QueueSubscription.NONE) + .assertResult(1, 2, 3, 4, 5); + + assertEquals(1, calls); + } + + @Test + public void syncFusedBoundaryConditional() { + TestSubscriber ts = SubscriberFusion.newTest(QueueSubscription.SYNC | QueueSubscription.BOUNDARY); + + Flowable.range(1, 5) + .doFinally(this) + .filter(Functions.alwaysTrue()) + .subscribe(ts); + + SubscriberFusion.assertFusion(ts, QueueSubscription.NONE) + .assertResult(1, 2, 3, 4, 5); + + assertEquals(1, calls); + } + + @Test + public void asyncFusedConditional() { + TestSubscriber ts = SubscriberFusion.newTest(QueueSubscription.ASYNC); + + UnicastProcessor up = UnicastProcessor.create(); + TestHelper.emit(up, 1, 2, 3, 4, 5); + + up + .doFinally(this) + .filter(Functions.alwaysTrue()) + .subscribe(ts); + + SubscriberFusion.assertFusion(ts, QueueSubscription.ASYNC) + .assertResult(1, 2, 3, 4, 5); + + assertEquals(1, calls); + } + + @Test + public void asyncFusedBoundaryConditional() { + TestSubscriber ts = SubscriberFusion.newTest(QueueSubscription.ASYNC | QueueSubscription.BOUNDARY); + + UnicastProcessor up = UnicastProcessor.create(); + TestHelper.emit(up, 1, 2, 3, 4, 5); + + up + .doFinally(this) + .filter(Functions.alwaysTrue()) + .subscribe(ts); + + SubscriberFusion.assertFusion(ts, QueueSubscription.NONE) + .assertResult(1, 2, 3, 4, 5); + + assertEquals(1, calls); + } + + @Test(expected = NullPointerException.class) + public void nullAction() { + Flowable.just(1).doFinally(null); + } + + @Test + public void actionThrows() { + List errors = TestHelper.trackPluginErrors(); + try { + Flowable.just(1) + .doFinally(new Action() { + @Override + public void run() throws Exception { + throw new TestException(); + } + }) + .test() + .assertResult(1) + .cancel(); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void actionThrowsConditional() { + List errors = TestHelper.trackPluginErrors(); + try { + Flowable.just(1) + .doFinally(new Action() { + @Override + public void run() throws Exception { + throw new TestException(); + } + }) + .filter(Functions.alwaysTrue()) + .test() + .assertResult(1) + .cancel(); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void clearIsEmpty() { + Flowable.range(1, 5) + .doFinally(this) + .subscribe(new Subscriber() { + + @Override + public void onSubscribe(Subscription s) { + @SuppressWarnings("unchecked") + QueueSubscription qs = (QueueSubscription)s; + + qs.requestFusion(QueueSubscription.ANY); + + assertFalse(qs.isEmpty()); + + try { + assertEquals(1, qs.poll().intValue()); + } catch (Throwable ex) { + throw new RuntimeException(ex); + } + + assertFalse(qs.isEmpty()); + + qs.clear(); + + assertTrue(qs.isEmpty()); + + qs.cancel(); + } + + @Override + public void onNext(Integer t) { + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + } + }); + + assertEquals(1, calls); + } + + @Test + public void clearIsEmptyConditional() { + Flowable.range(1, 5) + .doFinally(this) + .filter(Functions.alwaysTrue()) + .subscribe(new Subscriber() { + + @Override + public void onSubscribe(Subscription s) { + @SuppressWarnings("unchecked") + QueueSubscription qs = (QueueSubscription)s; + + qs.requestFusion(QueueSubscription.ANY); + + assertFalse(qs.isEmpty()); + + try { + assertEquals(1, qs.poll().intValue()); + } catch (Throwable ex) { + throw new RuntimeException(ex); + } + + assertFalse(qs.isEmpty()); + + qs.clear(); + + assertTrue(qs.isEmpty()); + + qs.cancel(); + } + + @Override + public void onNext(Integer t) { + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + } + }); + + assertEquals(1, calls); + } +}