From b204fc8c05fe4ac62c9c1631bbaaf978c350a653 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 9 May 2018 10:40:48 +0200 Subject: [PATCH 1/3] 2.x: Add MulticastProcessor --- .../processors/MulticastProcessor.java | 640 ++++++++++++++++++ .../processors/MulticastProcessorTest.java | 526 ++++++++++++++ 2 files changed, 1166 insertions(+) create mode 100644 src/main/java/io/reactivex/processors/MulticastProcessor.java create mode 100644 src/test/java/io/reactivex/processors/MulticastProcessorTest.java diff --git a/src/main/java/io/reactivex/processors/MulticastProcessor.java b/src/main/java/io/reactivex/processors/MulticastProcessor.java new file mode 100644 index 0000000000..6b4da79df9 --- /dev/null +++ b/src/main/java/io/reactivex/processors/MulticastProcessor.java @@ -0,0 +1,640 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.processors; + +import java.util.concurrent.atomic.*; + +import org.reactivestreams.*; + +import io.reactivex.annotations.*; +import io.reactivex.exceptions.*; +import io.reactivex.internal.fuseable.*; +import io.reactivex.internal.queue.*; +import io.reactivex.internal.subscriptions.*; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * A {@link FlowableProcessor} implementation that coordinates downstream requests through + * a front-buffer and stable-prefetching, optionally canceling the upstream if all + * subscribers have cancelled. + *

+ * + *

+ * This processor does not have a public constructor by design; a new empty instance of this + * {@code MulticastProcessor} can be created via the following {@code create} methods that + * allow configuring it: + *

+ *

+ * When the reference counting behavior is enabled, the {@code MulticastProcessor} cancels its + * upstream when all {@link Subscriber}s have cancelled. Late {@code Subscriber}s will then be + * immediately completed. + *

+ * Because of {@code MulticastProcessor} implements the {@link Subscriber} interface, calling + * {@code onSubscribe} is mandatory (Rule 2.12). + * If {@code MulticastProcessor} shoud run standalone, i.e., without subscribing the {@code MulticastProcessor} to another {@link Publisher}, + * use {@link #start()} or {@link #startUnbounded()} methods to initialize the internal buffer. + * Failing to do so will lead to {@link NullPointerException} at runtime. + *

+ * Use {@link #offer(Object)} to try and offer/emit items but don't fail if the + * internal buffer is full. + *

+ * A {@code MulticastProcessor} is a {@link Processor} type in the Reactive Streams specification, + * {@code null}s are not allowed (Rule 2.13) as + * parameters to {@link #onSubscribe(Subscription)}, {@link #offer(Object)}, {@link #onNext(Object)} and {@link #onError(Throwable)}. + * Such calls will result in a {@link NullPointerException} being thrown and the processor's state is not changed. + *

+ * Since a {@code MulticastProcessor} is a {@link io.reactivex.Flowable}, it supports backpressure. + * The backpressure from the currently subscribed {@link Subscriber}s are coordinated by emitting upstream + * items only if all of those {@code Subscriber}s have requested at least one item. This behavior + * is also called lockstep-mode because even if some {@code Subscriber}s can take any number + * of items, other {@code Subscriber}s requesting less or infrequently will slow down the overall + * throughput of the flow. + *

+ * Calling {@link #onNext(Object)}, {@link #offer(Object)}, {@link #onError(Throwable)} and {@link #onComplete()} + * is required to be serialized (called from the same thread or called non-overlappingly from different threads + * through external means of serialization). The {@link #toSerialized()} method available to all {@link FlowableProcessor}s + * provides such serialization and also protects against reentrance (i.e., when a downstream {@code Subscriber} + * consuming this processor also wants to call {@link #onNext(Object)} on this processor recursively). + *

+ * This {@code MulticastProcessor} supports the standard state-peeking methods {@link #hasComplete()}, {@link #hasThrowable()}, + * {@link #getThrowable()} and {@link #hasSubscribers()}. This processor doesn't allow peeking into its buffer. + *

+ * When this {@code MulticastProcessor} is terminated via {@link #onError(Throwable)} or {@link #onComplete()}, + * all previously signaled but not yet consumed items will be still available to {@code Subscriber}s and the respective + * terminal even is only emitted when all previous items have been successfully delivered to {@code Subscriber}s. + * If there are no {@code Subscriber}s, the remaining items will be buffered indefinitely. + *

+ * The {@code MulticastProcessor} does not support clearing its cached events (to appear empty again). + *

+ *
Backpressure:
+ *
The backpressure from the currently subscribed {@code Subscriber}s are coordinated by emitting upstream + * items only if all of those {@code Subscriber}s have requested at least one item. This behavior + * is also called lockstep-mode because even if some {@code Subscriber}s can take any number + * of items, other {@code Subscriber}s requesting less or infrequently will slow down the overall + * throughput of the flow.
+ *
Scheduler:
+ *
{@code MulticastProcessor} does not operate by default on a particular {@link io.reactivex.Scheduler} and + * the {@code Subscriber}s get notified on an arbitrary thread in a serialized fashion.
+ *
+ *

+ * Example: + *


+    MulticastProcessor<Integer> mp = Flowable.range(1, 10)
+    .subscribeWith(MulticastProcessor.create());
+
+    mp.test().assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+
+    // --------------------
+
+    MulticastProcessor<Integer> mp2 = MulticastProcessor.create(4);
+    mp2.start();
+
+    assertTrue(mp2.offer(1));
+    assertTrue(mp2.offer(2));
+    assertTrue(mp2.offer(3));
+    assertTrue(mp2.offer(4));
+
+    assertFalse(mp2.offer(5));
+
+    mp2.onComplete();
+
+    mp2.test().assertResult(1, 2, 3, 4);
+ * 
+ * @param the input and output value type + * @since 2.1.14 - experimental + */ +@Experimental +@BackpressureSupport(BackpressureKind.FULL) +@SchedulerSupport(SchedulerSupport.NONE) +public final class MulticastProcessor extends FlowableProcessor { + + final AtomicInteger wip; + + final AtomicReference upstream; + + final AtomicReference[]> subscribers; + + final AtomicBoolean once; + + final int bufferSize; + + final int limit; + + final boolean refcount; + + volatile SimpleQueue queue; + + volatile boolean done; + volatile Throwable error; + + int consumed; + + int fusionMode; + + @SuppressWarnings("rawtypes") + static final MulticastSubscription[] EMPTY = new MulticastSubscription[0]; + + @SuppressWarnings("rawtypes") + static final MulticastSubscription[] TERMINATED = new MulticastSubscription[0]; + + /** + * Constructs a fresh instance with the default Flowable.bufferSize() prefetch + * amount and no refCount-behavior. + * @param the input and output value type + * @return the new MulticastProcessor instance + */ + public static MulticastProcessor create() { + return new MulticastProcessor(bufferSize(), false); + } + + + /** + * Constructs a fresh instance with the default Flowable.bufferSize() prefetch + * amount and no refCount-behavior. + * @param the input and output value type + * @param refCount if true and if all Subscribers have canceled, the upstream + * is cancelled + * @return the new MulticastProcessor instance + */ + public static MulticastProcessor create(boolean refCount) { + return new MulticastProcessor(bufferSize(), refCount); + } + + /** + * Constructs a fresh instance with the given prefetch amount and no refCount behavior. + * @param bufferSize the prefetch amount + * @param the input and output value type + * @return the new MulticastProcessor instance + */ + public static MulticastProcessor create(int bufferSize) { + return new MulticastProcessor(bufferSize, false); + } + + /** + * Constructs a fresh instance with the given prefetch amount and the optional + * refCount-behavior. + * @param bufferSize the prefetch amount + * @param refCount if true and if all Subscribers have canceled, the upstream + * is cancelled + * @param the input and output value type + * @return the new MulticastProcessor instance + */ + public static MulticastProcessor create(int bufferSize, boolean refCount) { + return new MulticastProcessor(bufferSize, refCount); + } + + /** + * Constructs a fresh instance with the given prefetch amount and the optional + * refCount-behavior. + * @param bufferSize the prefetch amount + * @param refCount if true and if all Subscribers have canceled, the upstream + * is cancelled + */ + @SuppressWarnings("unchecked") + MulticastProcessor(int bufferSize, boolean refCount) { + this.bufferSize = bufferSize; + this.limit = bufferSize - (bufferSize >> 2); + this.wip = new AtomicInteger(); + this.subscribers = new AtomicReference[]>(EMPTY); + this.upstream = new AtomicReference(); + this.refcount = refCount; + this.once = new AtomicBoolean(); + } + + /** + * Initializes this Processor by setting an upstream Subscription that + * ignores request amounts, uses a fixed buffer + * and allows using the onXXX and offer methods + * afterwards. + */ + public void start() { + if (SubscriptionHelper.setOnce(upstream, EmptySubscription.INSTANCE)) { + queue = new SpscArrayQueue(bufferSize); + } + } + + /** + * Initializes this Processor by setting an upstream Subscription that + * ignores request amounts, uses an unbounded buffer + * and allows using the onXXX and offer methods + * afterwards. + */ + public void startUnbounded() { + if (SubscriptionHelper.setOnce(upstream, EmptySubscription.INSTANCE)) { + queue = new SpscLinkedArrayQueue(bufferSize); + } + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.setOnce(upstream, s)) { + if (s instanceof QueueSubscription) { + @SuppressWarnings("unchecked") + QueueSubscription qs = (QueueSubscription)s; + + int m = qs.requestFusion(QueueSubscription.ANY); + if (m == QueueSubscription.SYNC) { + fusionMode = m; + queue = qs; + done = true; + drain(); + return; + } + if (m == QueueSubscription.ASYNC) { + fusionMode = m; + queue = qs; + + s.request(bufferSize); + return; + } + } + + queue = new SpscArrayQueue(bufferSize); + + s.request(bufferSize); + } + } + + @Override + public void onNext(T t) { + if (once.get()) { + return; + } + if (fusionMode == QueueSubscription.NONE) { + if (t == null) { + throw new NullPointerException("t is null"); + } + if (!queue.offer(t)) { + SubscriptionHelper.cancel(upstream); + onError(new MissingBackpressureException()); + return; + } + } + drain(); + } + + /** + * Tries to offer an item into the internal queue and returns false + * if the queue is full. + * @param t the item to offer, not null + * @return true if successful, false if the queue is full + */ + public boolean offer(T t) { + if (once.get()) { + return false; + } + if (t == null) { + throw new NullPointerException("t is null"); + } + if (fusionMode == QueueSubscription.NONE) { + if (queue.offer(t)) { + drain(); + return true; + } + } + return false; + } + + @Override + public void onError(Throwable t) { + if (t == null) { + throw new NullPointerException("t is null"); + } + if (once.compareAndSet(false, true)) { + error = t; + done = true; + drain(); + } else { + RxJavaPlugins.onError(t); + } + } + + @Override + public void onComplete() { + if (once.compareAndSet(false, true)) { + done = true; + drain(); + } + } + + @Override + public boolean hasSubscribers() { + return subscribers.get().length != 0; + } + + @Override + public boolean hasThrowable() { + return once.get() && error != null; + } + + @Override + public boolean hasComplete() { + return once.get() && error == null; + } + + @Override + public Throwable getThrowable() { + return once.get() ? error : null; + } + + @Override + protected void subscribeActual(Subscriber s) { + MulticastSubscription ms = new MulticastSubscription(s, this); + s.onSubscribe(ms); + if (add(ms)) { + if (ms.get() == Long.MIN_VALUE) { + remove(ms); + } else { + drain(); + } + } else { + if (once.get() || !refcount) { + Throwable ex = error; + if (ex != null) { + s.onError(ex); + return; + } + } + s.onComplete(); + } + } + + boolean add(MulticastSubscription inner) { + for (;;) { + MulticastSubscription[] a = subscribers.get(); + if (a == TERMINATED) { + return false; + } + int n = a.length; + @SuppressWarnings("unchecked") + MulticastSubscription[] b = new MulticastSubscription[n + 1]; + System.arraycopy(a, 0, b, 0, n); + b[n] = inner; + if (subscribers.compareAndSet(a, b)) { + return true; + } + } + } + + @SuppressWarnings("unchecked") + void remove(MulticastSubscription inner) { + for (;;) { + MulticastSubscription[] a = subscribers.get(); + int n = a.length; + if (n == 0) { + return; + } + + int j = -1; + for (int i = 0; i < n; i++) { + if (a[i] == inner) { + j = i; + break; + } + } + + if (j < 0) { + break; + } + + if (n == 1) { + if (refcount) { + if (subscribers.compareAndSet(a, TERMINATED)) { + SubscriptionHelper.cancel(upstream); + once.set(true); + break; + } + } else { + if (subscribers.compareAndSet(a, EMPTY)) { + break; + } + } + } else { + MulticastSubscription[] b = new MulticastSubscription[n - 1]; + System.arraycopy(a, 0, b, 0, j); + System.arraycopy(a, j + 1, b, j, n - j - 1); + if (subscribers.compareAndSet(a, b)) { + break; + } + } + } + } + + @SuppressWarnings("unchecked") + void drain() { + if (wip.getAndIncrement() != 0) { + return; + } + + int missed = 1; + AtomicReference[]> subs = subscribers; + int c = consumed; + int lim = limit; + SimpleQueue q = queue; + int fm = fusionMode; + + outer: + for (;;) { + + MulticastSubscription[] as = subs.get(); + int n = as.length; + + if (n != 0) { + long r = -1L; + + for (MulticastSubscription a : as) { + long ra = a.get(); + if (ra >= 0L) { + if (r == -1L) { + r = ra - a.emitted; + } else { + r = Math.min(r, ra - a.emitted); + } + } + } + + while (r > 0L) { + MulticastSubscription[] bs = subs.get(); + + if (bs == TERMINATED) { + q.clear(); + return; + } + + if (as != bs) { + continue outer; + } + + boolean d = done; + + T v; + + try { + v = q != null ? q.poll() : null; + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + SubscriptionHelper.cancel(upstream); + d = true; + v = null; + error = ex; + done = true; + } + boolean empty = v == null; + + if (d && empty) { + Throwable ex = error; + if (ex != null) { + for (MulticastSubscription inner : subs.getAndSet(TERMINATED)) { + inner.onError(ex); + } + } else { + for (MulticastSubscription inner : subs.getAndSet(TERMINATED)) { + inner.onComplete(); + } + } + return; + } + + if (empty) { + break; + } + + for (MulticastSubscription inner : as) { + inner.onNext(v); + } + + r--; + + if (fm != QueueSubscription.SYNC) { + if (++c == lim) { + c = 0; + upstream.get().request(lim); + } + } + } + + if (r == 0) { + MulticastSubscription[] bs = subs.get(); + + if (bs == TERMINATED) { + q.clear(); + return; + } + + if (as != bs) { + continue outer; + } + + if (done && q.isEmpty()) { + Throwable ex = error; + if (ex != null) { + for (MulticastSubscription inner : subs.getAndSet(TERMINATED)) { + inner.onError(ex); + } + } else { + for (MulticastSubscription inner : subs.getAndSet(TERMINATED)) { + inner.onComplete(); + } + } + return; + } + } + } + + int w = wip.get(); + if (w == missed) { + consumed = c; + missed = wip.addAndGet(-missed); + if (missed == 0) { + break; + } + } else { + missed = w; + } + } + } + + static final class MulticastSubscription extends AtomicLong implements Subscription { + + private static final long serialVersionUID = -363282618957264509L; + + final Subscriber actual; + + final MulticastProcessor parent; + + long emitted; + + MulticastSubscription(Subscriber actual, MulticastProcessor parent) { + this.actual = actual; + this.parent = parent; + } + + @Override + public void request(long n) { + if (SubscriptionHelper.validate(n)) { + for (;;) { + long r = get(); + if (r == Long.MIN_VALUE || r == Long.MAX_VALUE) { + break; + } + long u = r + n; + if (u < 0L) { + u = Long.MAX_VALUE; + } + if (compareAndSet(r, u)) { + parent.drain(); + break; + } + } + } + } + + @Override + public void cancel() { + if (getAndSet(Long.MIN_VALUE) != Long.MIN_VALUE) { + parent.remove(this); + } + } + + void onNext(T t) { + if (get() != Long.MIN_VALUE) { + emitted++; + actual.onNext(t); + } + } + + void onError(Throwable t) { + if (get() != Long.MIN_VALUE) { + actual.onError(t); + } + } + + void onComplete() { + if (get() != Long.MIN_VALUE) { + actual.onComplete(); + } + } + } +} diff --git a/src/test/java/io/reactivex/processors/MulticastProcessorTest.java b/src/test/java/io/reactivex/processors/MulticastProcessorTest.java new file mode 100644 index 0000000000..4d8ecf01dd --- /dev/null +++ b/src/test/java/io/reactivex/processors/MulticastProcessorTest.java @@ -0,0 +1,526 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.processors; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.List; + +import org.junit.Test; +import org.reactivestreams.Subscription; + +import io.reactivex.*; +import io.reactivex.exceptions.*; +import io.reactivex.functions.Function; +import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.processors.UnicastProcessor; +import io.reactivex.subscribers.TestSubscriber; + +public class MulticastProcessorTest { + + @Test + public void complete() { + MulticastProcessor mp = MulticastProcessor.create(); + mp.start(); + + assertFalse(mp.hasSubscribers()); + assertFalse(mp.hasComplete()); + assertFalse(mp.hasThrowable()); + assertNull(mp.getThrowable()); + + TestSubscriber ts = mp.test(); + + assertTrue(mp.hasSubscribers()); + assertFalse(mp.hasComplete()); + assertFalse(mp.hasThrowable()); + assertNull(mp.getThrowable()); + + mp.onNext(1); + mp.onComplete(); + + ts.assertResult(1); + + assertFalse(mp.hasSubscribers()); + assertTrue(mp.hasComplete()); + assertFalse(mp.hasThrowable()); + assertNull(mp.getThrowable()); + + mp.test().assertResult(); + } + + + @Test + public void error() { + MulticastProcessor mp = MulticastProcessor.create(); + mp.start(); + + assertFalse(mp.hasSubscribers()); + assertFalse(mp.hasComplete()); + assertFalse(mp.hasThrowable()); + assertNull(mp.getThrowable()); + + TestSubscriber ts = mp.test(); + + assertTrue(mp.hasSubscribers()); + assertFalse(mp.hasComplete()); + assertFalse(mp.hasThrowable()); + assertNull(mp.getThrowable()); + + mp.onNext(1); + mp.onError(new IOException()); + + ts.assertFailure(IOException.class, 1); + + assertFalse(mp.hasSubscribers()); + assertFalse(mp.hasComplete()); + assertTrue(mp.hasThrowable()); + assertNotNull(mp.getThrowable()); + assertTrue("" + mp.getThrowable(), mp.getThrowable() instanceof IOException); + + mp.test().assertFailure(IOException.class); + } + + @Test + public void overflow() { + MulticastProcessor mp = MulticastProcessor.create(1); + mp.start(); + + TestSubscriber ts = mp.test(0); + + assertTrue(mp.offer(1)); + assertFalse(mp.offer(2)); + + mp.onNext(3); + + ts.assertEmpty(); + + ts.request(1); + + ts.assertFailure(MissingBackpressureException.class, 1); + + mp.test().assertFailure(MissingBackpressureException.class); + } + + @Test + public void backpressure() { + MulticastProcessor mp = MulticastProcessor.create(16, false); + mp.start(); + + for (int i = 0; i < 10; i++) { + mp.onNext(i); + } + mp.onComplete(); + + mp.test(0) + .assertEmpty() + .requestMore(1) + .assertValues(0) + .assertNotComplete() + .requestMore(2) + .assertValues(0, 1, 2) + .assertNotComplete() + .requestMore(3) + .assertValues(0, 1, 2, 3, 4, 5) + .assertNotComplete() + .requestMore(4) + .assertResult(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + } + + @Test + public void refCounted() { + MulticastProcessor mp = MulticastProcessor.create(true); + BooleanSubscription bs = new BooleanSubscription(); + + mp.onSubscribe(bs); + + assertFalse(bs.isCancelled()); + + mp.test().cancel(); + + assertTrue(bs.isCancelled()); + + assertFalse(mp.hasSubscribers()); + assertTrue(mp.hasComplete()); + assertFalse(mp.hasThrowable()); + assertNull(mp.getThrowable()); + + mp.test().assertResult(); + } + + @Test + public void refCounted2() { + MulticastProcessor mp = MulticastProcessor.create(16, true); + BooleanSubscription bs = new BooleanSubscription(); + + mp.onSubscribe(bs); + + assertFalse(bs.isCancelled()); + + mp.test(1, true); + + assertTrue(bs.isCancelled()); + + assertFalse(mp.hasSubscribers()); + assertTrue(mp.hasComplete()); + assertFalse(mp.hasThrowable()); + assertNull(mp.getThrowable()); + + mp.test().assertResult(); + } + + @Test + public void longRunning() { + MulticastProcessor mp = MulticastProcessor.create(16); + Flowable.range(1, 1000).subscribe(mp); + + mp.test().assertValueCount(1000).assertNoErrors().assertComplete(); + } + + + @Test + public void oneByOne() { + MulticastProcessor mp = MulticastProcessor.create(16); + Flowable.range(1, 1000).subscribe(mp); + + mp + .rebatchRequests(1) + .test() + .assertValueCount(1000) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void take() { + MulticastProcessor mp = MulticastProcessor.create(16); + Flowable.range(1, 1000).subscribe(mp); + + mp.take(10).test().assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + @Test + public void takeRefCount() { + MulticastProcessor mp = MulticastProcessor.create(16, true); + Flowable.range(1, 1000).subscribe(mp); + + mp.take(10).test().assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + @Test + public void takeRefCountExact() { + MulticastProcessor mp = MulticastProcessor.create(16, true); + Flowable.range(1, 10).subscribe(mp); + + mp + .rebatchRequests(10) + .take(10) + .test().assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + @Test + public void crossCancel() { + + final TestSubscriber ts1 = new TestSubscriber(); + + TestSubscriber ts2 = new TestSubscriber() { + @Override + public void onNext(Integer t) { + super.onNext(t); + ts1.cancel(); + ts1.onComplete(); + } + }; + + MulticastProcessor mp = MulticastProcessor.create(false); + + mp.subscribe(ts2); + mp.subscribe(ts1); + + mp.start(); + + mp.onNext(1); + mp.onComplete(); + + ts1.assertResult(); + ts2.assertResult(1); + } + + @Test + public void crossCancelError() { + + final TestSubscriber ts1 = new TestSubscriber(); + + TestSubscriber ts2 = new TestSubscriber() { + @Override + public void onError(Throwable t) { + super.onError(t); + ts1.cancel(); + ts1.onComplete(); + } + }; + + MulticastProcessor mp = MulticastProcessor.create(false); + + mp.subscribe(ts2); + mp.subscribe(ts1); + + mp.start(); + + mp.onNext(1); + mp.onError(new IOException()); + + ts1.assertResult(1); + ts2.assertFailure(IOException.class, 1); + } + + @Test + public void crossCancelComplete() { + + final TestSubscriber ts1 = new TestSubscriber(); + + TestSubscriber ts2 = new TestSubscriber() { + @Override + public void onComplete() { + super.onComplete(); + ts1.cancel(); + ts1.onNext(2); + ts1.onComplete(); + } + }; + + MulticastProcessor mp = MulticastProcessor.create(false); + + mp.subscribe(ts2); + mp.subscribe(ts1); + + mp.start(); + + mp.onNext(1); + mp.onComplete(); + + ts1.assertResult(1, 2); + ts2.assertResult(1); + } + + @Test + public void crossCancel1() { + + final TestSubscriber ts1 = new TestSubscriber(1); + + TestSubscriber ts2 = new TestSubscriber(1) { + @Override + public void onNext(Integer t) { + super.onNext(t); + ts1.cancel(); + ts1.onComplete(); + } + }; + + MulticastProcessor mp = MulticastProcessor.create(false); + + mp.subscribe(ts2); + mp.subscribe(ts1); + + mp.start(); + + mp.onNext(1); + mp.onComplete(); + + ts1.assertResult(); + ts2.assertResult(1); + } + + @Test + public void requestCancel() { + List errors = TestHelper.trackPluginErrors(); + try { + MulticastProcessor mp = MulticastProcessor.create(false); + + mp.subscribe(new FlowableSubscriber() { + + @Override + public void onNext(Integer t) { + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + } + + @Override + public void onSubscribe(Subscription t) { + t.request(-1); + t.request(1); + t.request(Long.MAX_VALUE); + t.request(Long.MAX_VALUE); + t.cancel(); + t.cancel(); + t.request(2); + } + }); + + TestHelper.assertError(errors, 0, IllegalArgumentException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void unbounded() { + MulticastProcessor mp = MulticastProcessor.create(4, false); + mp.startUnbounded(); + + for (int i = 0; i < 10; i++) { + assertTrue(mp.offer(i)); + } + mp.onComplete(); + + mp.test().assertResult(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + } + + @Test + public void multiStart() { + List errors = TestHelper.trackPluginErrors(); + try { + MulticastProcessor mp = MulticastProcessor.create(4, false); + + mp.start(); + mp.start(); + mp.startUnbounded(); + BooleanSubscription bs = new BooleanSubscription(); + mp.onSubscribe(bs); + + assertTrue(bs.isCancelled()); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + TestHelper.assertError(errors, 1, ProtocolViolationException.class); + TestHelper.assertError(errors, 2, ProtocolViolationException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test(expected = NullPointerException.class) + public void onNextNull() { + MulticastProcessor mp = MulticastProcessor.create(4, false); + mp.start(); + mp.onNext(null); + } + + + @Test(expected = NullPointerException.class) + public void onOfferNull() { + MulticastProcessor mp = MulticastProcessor.create(4, false); + mp.start(); + mp.offer(null); + } + + @Test(expected = NullPointerException.class) + public void onErrorNull() { + MulticastProcessor mp = MulticastProcessor.create(4, false); + mp.start(); + mp.onError(null); + } + + @Test + public void afterTerminated() { + List errors = TestHelper.trackPluginErrors(); + try { + MulticastProcessor mp = MulticastProcessor.create(); + mp.start(); + mp.onComplete(); + mp.onComplete(); + mp.onError(new IOException()); + mp.onNext(1); + mp.offer(1); + + mp.test().assertResult(); + + TestHelper.assertUndeliverable(errors, 0, IOException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void asyncFused() { + UnicastProcessor up = UnicastProcessor.create(); + MulticastProcessor mp = MulticastProcessor.create(4); + + up.subscribe(mp); + + TestSubscriber ts = mp.test(); + + for (int i = 0; i < 10; i++) { + up.onNext(i); + } + + assertFalse(mp.offer(10)); + + up.onComplete(); + + ts.assertResult(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + } + + @Test + public void fusionCrash() { + MulticastProcessor mp = Flowable.range(1, 5) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + throw new IOException(); + } + }) + .subscribeWith(MulticastProcessor.create()); + + mp.test().assertFailure(IOException.class); + } + + @Test + public void lockstep() { + MulticastProcessor mp = MulticastProcessor.create(); + + TestSubscriber ts1 = mp.test(); + mp.start(); + + mp.onNext(1); + mp.onNext(2); + + ts1.assertValues(1, 2); + + TestSubscriber ts2 = mp.test(0); + + ts2.assertEmpty(); + + mp.onNext(3); + + ts1.assertValues(1, 2); + ts2.assertEmpty(); + + mp.onComplete(); + + ts1.assertValues(1, 2); + ts2.assertEmpty(); + + ts2.request(1); + + ts1.assertResult(1, 2, 3); + ts2.assertResult(3); + } +} From 65cc3ec4282e1469e7d2a3c6af58467f0f893d7f Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 9 May 2018 12:21:43 +0200 Subject: [PATCH 2/3] Improve coverage, fix NPE before upstream arrives --- .../processors/MulticastProcessor.java | 171 ++++++----- .../processors/MulticastProcessorTest.java | 269 +++++++++++++++++- 2 files changed, 352 insertions(+), 88 deletions(-) diff --git a/src/main/java/io/reactivex/processors/MulticastProcessor.java b/src/main/java/io/reactivex/processors/MulticastProcessor.java index 6b4da79df9..6edc2d5890 100644 --- a/src/main/java/io/reactivex/processors/MulticastProcessor.java +++ b/src/main/java/io/reactivex/processors/MulticastProcessor.java @@ -452,126 +452,123 @@ void drain() { AtomicReference[]> subs = subscribers; int c = consumed; int lim = limit; - SimpleQueue q = queue; int fm = fusionMode; outer: for (;;) { - MulticastSubscription[] as = subs.get(); - int n = as.length; + SimpleQueue q = queue; - if (n != 0) { - long r = -1L; + if (q != null) { + MulticastSubscription[] as = subs.get(); + int n = as.length; - for (MulticastSubscription a : as) { - long ra = a.get(); - if (ra >= 0L) { - if (r == -1L) { - r = ra - a.emitted; - } else { - r = Math.min(r, ra - a.emitted); + if (n != 0) { + long r = -1L; + + for (MulticastSubscription a : as) { + long ra = a.get(); + if (ra >= 0L) { + if (r == -1L) { + r = ra - a.emitted; + } else { + r = Math.min(r, ra - a.emitted); + } } } - } - while (r > 0L) { - MulticastSubscription[] bs = subs.get(); + while (r > 0L) { + MulticastSubscription[] bs = subs.get(); - if (bs == TERMINATED) { - q.clear(); - return; - } + if (bs == TERMINATED) { + q.clear(); + return; + } - if (as != bs) { - continue outer; - } + if (as != bs) { + continue outer; + } - boolean d = done; + boolean d = done; - T v; + T v; - try { - v = q != null ? q.poll() : null; - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - SubscriptionHelper.cancel(upstream); - d = true; - v = null; - error = ex; - done = true; - } - boolean empty = v == null; - - if (d && empty) { - Throwable ex = error; - if (ex != null) { - for (MulticastSubscription inner : subs.getAndSet(TERMINATED)) { - inner.onError(ex); - } - } else { - for (MulticastSubscription inner : subs.getAndSet(TERMINATED)) { - inner.onComplete(); + try { + v = q.poll(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + SubscriptionHelper.cancel(upstream); + d = true; + v = null; + error = ex; + done = true; + } + boolean empty = v == null; + + if (d && empty) { + Throwable ex = error; + if (ex != null) { + for (MulticastSubscription inner : subs.getAndSet(TERMINATED)) { + inner.onError(ex); + } + } else { + for (MulticastSubscription inner : subs.getAndSet(TERMINATED)) { + inner.onComplete(); + } } + return; } - return; - } - if (empty) { - break; - } + if (empty) { + break; + } - for (MulticastSubscription inner : as) { - inner.onNext(v); - } + for (MulticastSubscription inner : as) { + inner.onNext(v); + } - r--; + r--; - if (fm != QueueSubscription.SYNC) { - if (++c == lim) { - c = 0; - upstream.get().request(lim); + if (fm != QueueSubscription.SYNC) { + if (++c == lim) { + c = 0; + upstream.get().request(lim); + } } } - } - if (r == 0) { - MulticastSubscription[] bs = subs.get(); + if (r == 0) { + MulticastSubscription[] bs = subs.get(); - if (bs == TERMINATED) { - q.clear(); - return; - } + if (bs == TERMINATED) { + q.clear(); + return; + } - if (as != bs) { - continue outer; - } + if (as != bs) { + continue outer; + } - if (done && q.isEmpty()) { - Throwable ex = error; - if (ex != null) { - for (MulticastSubscription inner : subs.getAndSet(TERMINATED)) { - inner.onError(ex); - } - } else { - for (MulticastSubscription inner : subs.getAndSet(TERMINATED)) { - inner.onComplete(); + if (done && q.isEmpty()) { + Throwable ex = error; + if (ex != null) { + for (MulticastSubscription inner : subs.getAndSet(TERMINATED)) { + inner.onError(ex); + } + } else { + for (MulticastSubscription inner : subs.getAndSet(TERMINATED)) { + inner.onComplete(); + } } + return; } - return; } } } - int w = wip.get(); - if (w == missed) { - consumed = c; - missed = wip.addAndGet(-missed); - if (missed == 0) { - break; - } - } else { - missed = w; + missed = wip.addAndGet(-missed); + if (missed == 0) { + break; } } } diff --git a/src/test/java/io/reactivex/processors/MulticastProcessorTest.java b/src/test/java/io/reactivex/processors/MulticastProcessorTest.java index 4d8ecf01dd..75c8f63c00 100644 --- a/src/test/java/io/reactivex/processors/MulticastProcessorTest.java +++ b/src/test/java/io/reactivex/processors/MulticastProcessorTest.java @@ -17,6 +17,7 @@ import java.io.IOException; import java.util.List; +import java.util.concurrent.TimeUnit; import org.junit.Test; import org.reactivestreams.Subscription; @@ -26,7 +27,6 @@ import io.reactivex.functions.Function; import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.plugins.RxJavaPlugins; -import io.reactivex.processors.UnicastProcessor; import io.reactivex.subscribers.TestSubscriber; public class MulticastProcessorTest { @@ -523,4 +523,271 @@ public void lockstep() { ts1.assertResult(1, 2, 3); ts2.assertResult(3); } + + @Test + public void rejectedFusion() { + + MulticastProcessor mp = MulticastProcessor.create(); + + TestHelper.rejectFlowableFusion() + .subscribe(mp); + + mp.test().assertEmpty(); + } + + @Test + public void addRemoveRaceNoRefCount() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final MulticastProcessor mp = MulticastProcessor.create(); + + final TestSubscriber ts = mp.test(); + final TestSubscriber ts2 = new TestSubscriber(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + mp.subscribe(ts2); + } + }; + + TestHelper.race(r1, r2); + + assertTrue(mp.hasSubscribers()); + } + } + + @Test + public void addRemoveRaceNoRefCountNonEmpty() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final MulticastProcessor mp = MulticastProcessor.create(); + + mp.test(); + final TestSubscriber ts = mp.test(); + final TestSubscriber ts2 = new TestSubscriber(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + mp.subscribe(ts2); + } + }; + + TestHelper.race(r1, r2); + + assertTrue(mp.hasSubscribers()); + } + } + + @Test + public void addRemoveRaceWitRefCount() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final MulticastProcessor mp = MulticastProcessor.create(true); + + final TestSubscriber ts = mp.test(); + final TestSubscriber ts2 = new TestSubscriber(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + mp.subscribe(ts2); + } + }; + + TestHelper.race(r1, r2); + } + } + + @Test + public void cancelUpfront() { + MulticastProcessor mp = MulticastProcessor.create(); + + mp.test(0, true).assertEmpty(); + + assertFalse(mp.hasSubscribers()); + } + + + @Test + public void cancelUpfrontOtherConsumersPresent() { + MulticastProcessor mp = MulticastProcessor.create(); + + mp.test(); + + mp.test(0, true).assertEmpty(); + + assertTrue(mp.hasSubscribers()); + } + + @Test + public void consumerRequestRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final MulticastProcessor mp = MulticastProcessor.create(true); + mp.startUnbounded(); + mp.onNext(1); + mp.onNext(2); + + final TestSubscriber ts = mp.test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ts.request(1); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ts.request(1); + } + }; + + TestHelper.race(r1, r2); + + ts.assertValuesOnly(1, 2); + } + } + + @Test + public void consumerUpstreamRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final MulticastProcessor mp = MulticastProcessor.create(true); + + final Flowable source = Flowable.range(1, 5); + + final TestSubscriber ts = mp.test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ts.request(5); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + source.subscribe(mp); + } + }; + + TestHelper.race(r1, r2); + + ts + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1, 2, 3, 4, 5); + } + } + + @Test + public void emitCancelRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final MulticastProcessor mp = MulticastProcessor.create(true); + mp.startUnbounded(); + + final TestSubscriber ts = mp.test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + mp.onNext(1); + } + }; + + TestHelper.race(r1, r2); + } + } + + @Test + public void cancelCancelDrain() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final MulticastProcessor mp = MulticastProcessor.create(true); + + final TestSubscriber ts1 = mp.test(); + final TestSubscriber ts2 = mp.test(); + + mp.test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ts1.cancel(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ts2.cancel(); + } + }; + + TestHelper.race(r1, r2); + } + } + + @Test + public void requestCancelRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final MulticastProcessor mp = MulticastProcessor.create(true); + + final TestSubscriber ts1 = mp.test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ts1.cancel(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ts1.request(1); + } + }; + + TestHelper.race(r1, r2); + } + } + + @Test + public void noUpstream() { + MulticastProcessor mp = MulticastProcessor.create(); + + TestSubscriber ts = mp.test(0); + + ts.request(1); + + assertTrue(mp.hasSubscribers()); + } + } From 8f484aad8548fcf2269b83c74ff73036c47dcfc1 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 17 May 2018 13:25:59 +0200 Subject: [PATCH 3/3] Address review feedback. --- .../processors/MulticastProcessor.java | 29 ++++++++++--------- .../processors/MulticastProcessorTest.java | 10 ++----- 2 files changed, 19 insertions(+), 20 deletions(-) diff --git a/src/main/java/io/reactivex/processors/MulticastProcessor.java b/src/main/java/io/reactivex/processors/MulticastProcessor.java index 6edc2d5890..fdc1652dc2 100644 --- a/src/main/java/io/reactivex/processors/MulticastProcessor.java +++ b/src/main/java/io/reactivex/processors/MulticastProcessor.java @@ -19,6 +19,7 @@ import io.reactivex.annotations.*; import io.reactivex.exceptions.*; +import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.fuseable.*; import io.reactivex.internal.queue.*; import io.reactivex.internal.subscriptions.*; @@ -51,11 +52,11 @@ * upstream when all {@link Subscriber}s have cancelled. Late {@code Subscriber}s will then be * immediately completed. *

- * Because of {@code MulticastProcessor} implements the {@link Subscriber} interface, calling + * Because {@code MulticastProcessor} implements the {@link Subscriber} interface, calling * {@code onSubscribe} is mandatory (Rule 2.12). - * If {@code MulticastProcessor} shoud run standalone, i.e., without subscribing the {@code MulticastProcessor} to another {@link Publisher}, + * If {@code MulticastProcessor} should run standalone, i.e., without subscribing the {@code MulticastProcessor} to another {@link Publisher}, * use {@link #start()} or {@link #startUnbounded()} methods to initialize the internal buffer. - * Failing to do so will lead to {@link NullPointerException} at runtime. + * Failing to do so will lead to a {@link NullPointerException} at runtime. *

* Use {@link #offer(Object)} to try and offer/emit items but don't fail if the * internal buffer is full. @@ -165,11 +166,12 @@ public final class MulticastProcessor extends FlowableProcessor { * @param the input and output value type * @return the new MulticastProcessor instance */ + @CheckReturnValue + @NonNull public static MulticastProcessor create() { return new MulticastProcessor(bufferSize(), false); } - /** * Constructs a fresh instance with the default Flowable.bufferSize() prefetch * amount and no refCount-behavior. @@ -178,6 +180,8 @@ public static MulticastProcessor create() { * is cancelled * @return the new MulticastProcessor instance */ + @CheckReturnValue + @NonNull public static MulticastProcessor create(boolean refCount) { return new MulticastProcessor(bufferSize(), refCount); } @@ -188,6 +192,8 @@ public static MulticastProcessor create(boolean refCount) { * @param the input and output value type * @return the new MulticastProcessor instance */ + @CheckReturnValue + @NonNull public static MulticastProcessor create(int bufferSize) { return new MulticastProcessor(bufferSize, false); } @@ -201,6 +207,8 @@ public static MulticastProcessor create(int bufferSize) { * @param the input and output value type * @return the new MulticastProcessor instance */ + @CheckReturnValue + @NonNull public static MulticastProcessor create(int bufferSize, boolean refCount) { return new MulticastProcessor(bufferSize, refCount); } @@ -214,6 +222,7 @@ public static MulticastProcessor create(int bufferSize, boolean refCount) */ @SuppressWarnings("unchecked") MulticastProcessor(int bufferSize, boolean refCount) { + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); this.bufferSize = bufferSize; this.limit = bufferSize - (bufferSize >> 2); this.wip = new AtomicInteger(); @@ -283,9 +292,7 @@ public void onNext(T t) { return; } if (fusionMode == QueueSubscription.NONE) { - if (t == null) { - throw new NullPointerException("t is null"); - } + ObjectHelper.requireNonNull(t, "onNext called with null. Null values are generally not allowed in 2.x operators and sources."); if (!queue.offer(t)) { SubscriptionHelper.cancel(upstream); onError(new MissingBackpressureException()); @@ -305,9 +312,7 @@ public boolean offer(T t) { if (once.get()) { return false; } - if (t == null) { - throw new NullPointerException("t is null"); - } + ObjectHelper.requireNonNull(t, "offer called with null. Null values are generally not allowed in 2.x operators and sources."); if (fusionMode == QueueSubscription.NONE) { if (queue.offer(t)) { drain(); @@ -319,9 +324,7 @@ public boolean offer(T t) { @Override public void onError(Throwable t) { - if (t == null) { - throw new NullPointerException("t is null"); - } + ObjectHelper.requireNonNull(t, "onError called with null. Null values are generally not allowed in 2.x operators and sources."); if (once.compareAndSet(false, true)) { error = t; done = true; diff --git a/src/test/java/io/reactivex/processors/MulticastProcessorTest.java b/src/test/java/io/reactivex/processors/MulticastProcessorTest.java index 75c8f63c00..c85665eacd 100644 --- a/src/test/java/io/reactivex/processors/MulticastProcessorTest.java +++ b/src/test/java/io/reactivex/processors/MulticastProcessorTest.java @@ -61,7 +61,6 @@ public void complete() { mp.test().assertResult(); } - @Test public void error() { MulticastProcessor mp = MulticastProcessor.create(); @@ -127,14 +126,11 @@ public void backpressure() { mp.test(0) .assertEmpty() .requestMore(1) - .assertValues(0) - .assertNotComplete() + .assertValuesOnly(0) .requestMore(2) - .assertValues(0, 1, 2) - .assertNotComplete() + .assertValuesOnly(0, 1, 2) .requestMore(3) - .assertValues(0, 1, 2, 3, 4, 5) - .assertNotComplete() + .assertValuesOnly(0, 1, 2, 3, 4, 5) .requestMore(4) .assertResult(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); }