diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCache.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCache.java index 24cad0d2f7..ad4de6050f 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCache.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCache.java @@ -50,16 +50,24 @@ public FlowableCache(Flowable source, int capacityHint) { protected void subscribeActual(Subscriber t) { // we can connect first because we replay everything anyway ReplaySubscription rp = new ReplaySubscription(t, state); - state.addChild(rp); - t.onSubscribe(rp); + boolean doReplay = true; + if (state.addChild(rp)) { + if (rp.requested.get() == ReplaySubscription.CANCELLED) { + state.removeChild(rp); + doReplay = false; + } + } + // we ensure a single connection here to save an instance field of AtomicBoolean in state. if (!once.get() && once.compareAndSet(false, true)) { state.connect(); } - // no need to call rp.replay() here because the very first request will trigger it anyway + if (doReplay) { + rp.replay(); + } } /** @@ -122,14 +130,15 @@ static final class CacheState extends LinkedArrayList implements FlowableSubs /** * Adds a ReplaySubscription to the subscribers array atomically. * @param p the target ReplaySubscription wrapping a downstream Subscriber with state + * @return true if the ReplaySubscription was added or false if the cache is already terminated */ - public void addChild(ReplaySubscription p) { + public boolean addChild(ReplaySubscription p) { // guarding by connection to save on allocating another object // thus there are two distinct locks guarding the value-addition and child come-and-go for (;;) { ReplaySubscription[] a = subscribers.get(); if (a == TERMINATED) { - return; + return false; } int n = a.length; @SuppressWarnings("unchecked") @@ -137,7 +146,7 @@ public void addChild(ReplaySubscription p) { System.arraycopy(a, 0, b, 0, n); b[n] = p; if (subscribers.compareAndSet(a, b)) { - return; + return true; } } } @@ -240,12 +249,16 @@ static final class ReplaySubscription extends AtomicInteger implements Subscription { private static final long serialVersionUID = -2557562030197141021L; - private static final long CANCELLED = -1; + private static final long CANCELLED = Long.MIN_VALUE; /** The actual child subscriber. */ final Subscriber child; /** The cache state object. */ final CacheState state; + /** + * Number of items requested and also the cancelled indicator if + * it contains {@link #CANCELLED}. + */ final AtomicLong requested; /** @@ -263,6 +276,9 @@ static final class ReplaySubscription */ int index; + /** Number of items emitted so far. */ + long emitted; + ReplaySubscription(Subscriber child, CacheState state) { this.child = child; this.state = state; @@ -271,17 +287,8 @@ static final class ReplaySubscription @Override public void request(long n) { if (SubscriptionHelper.validate(n)) { - for (;;) { - long r = requested.get(); - if (r == CANCELLED) { - return; - } - long u = BackpressureHelper.addCap(r, n); - if (requested.compareAndSet(r, u)) { - replay(); - return; - } - } + BackpressureHelper.addCancel(requested, n); + replay(); } } @@ -303,12 +310,13 @@ public void replay() { int missed = 1; final Subscriber child = this.child; AtomicLong rq = requested; + long e = emitted; for (;;) { long r = rq.get(); - if (r < 0L) { + if (r == CANCELLED) { return; } @@ -326,9 +334,8 @@ public void replay() { final int n = b.length - 1; int j = index; int k = currentIndexInBuffer; - int valuesProduced = 0; - while (j < s && r > 0) { + while (j < s && e != r) { if (rq.get() == CANCELLED) { return; } @@ -344,15 +351,14 @@ public void replay() { k++; j++; - r--; - valuesProduced++; + e++; } if (rq.get() == CANCELLED) { return; } - if (r == 0) { + if (r == e) { Object o = b[k]; if (NotificationLite.isComplete(o)) { child.onComplete(); @@ -364,15 +370,12 @@ public void replay() { } } - if (valuesProduced != 0) { - BackpressureHelper.producedCancel(rq, valuesProduced); - } - index = j; currentIndexInBuffer = k; currentBuffer = b; } + emitted = e; missed = addAndGet(-missed); if (missed == 0) { break; diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterable.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterable.java index 48a9d75333..12b1095fa8 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterable.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterable.java @@ -411,11 +411,7 @@ public void clear() { @Override public boolean isEmpty() { - Iterator it = current; - if (it == null) { - return queue.isEmpty(); - } - return !it.hasNext(); + return current == null && queue.isEmpty(); } @Nullable diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java index 7574a93d4b..f32b58dfbc 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java @@ -1215,7 +1215,8 @@ public void subscribe(Subscriber child) { buf = bufferFactory.call(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); - throw ExceptionHelper.wrapOrThrow(ex); + EmptySubscription.error(ex, child); + return; } // create a new subscriber to source ReplaySubscriber u = new ReplaySubscriber(buf); diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservablePublish.java b/src/main/java/io/reactivex/internal/operators/observable/ObservablePublish.java index b8c5206c2b..d5fa94eb76 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservablePublish.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservablePublish.java @@ -148,13 +148,11 @@ static final class PublishObserver @SuppressWarnings("unchecked") @Override public void dispose() { - if (observers.get() != TERMINATED) { - InnerDisposable[] ps = observers.getAndSet(TERMINATED); - if (ps != TERMINATED) { - current.compareAndSet(PublishObserver.this, null); + InnerDisposable[] ps = observers.getAndSet(TERMINATED); + if (ps != TERMINATED) { + current.compareAndSet(PublishObserver.this, null); - DisposableHelper.dispose(s); - } + DisposableHelper.dispose(s); } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableCacheTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableCacheTest.java index 7cec7d4c33..50b9d69d05 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableCacheTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableCacheTest.java @@ -419,4 +419,124 @@ public void error() { .test(0L) .assertFailure(TestException.class); } + + @Test + public void cancelledUpFrontConnectAnyway() { + final AtomicInteger call = new AtomicInteger(); + Flowable.fromCallable(new Callable() { + @Override + public Object call() throws Exception { + return call.incrementAndGet(); + } + }) + .cache() + .test(1L, true) + .assertNoValues(); + + assertEquals(1, call.get()); + } + + @Test + public void cancelledUpFront() { + final AtomicInteger call = new AtomicInteger(); + Flowable f = Flowable.fromCallable(new Callable() { + @Override + public Object call() throws Exception { + return call.incrementAndGet(); + } + }).concatWith(Flowable.never()) + .cache(); + + f.test().assertValuesOnly(1); + + f.test(1L, true) + .assertEmpty(); + + assertEquals(1, call.get()); + } + + @Test + public void subscribeSubscribeRace() { + for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { + final Flowable cache = Flowable.range(1, 500).cache(); + + final TestSubscriber to1 = new TestSubscriber(); + final TestSubscriber to2 = new TestSubscriber(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + cache.subscribe(to1); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + cache.subscribe(to2); + } + }; + + TestHelper.race(r1, r2); + + to1 + .awaitDone(5, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueCount(500) + .assertComplete() + .assertNoErrors(); + + to2 + .awaitDone(5, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueCount(500) + .assertComplete() + .assertNoErrors(); + } + } + + @Test + public void subscribeCompleteRace() { + for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { + final PublishProcessor ps = PublishProcessor.create(); + + final Flowable cache = ps.cache(); + + cache.test(); + + final TestSubscriber to = new TestSubscriber(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + cache.subscribe(to); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ps.onComplete(); + } + }; + + TestHelper.race(r1, r2); + + to + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(); + } + } + + @Test + public void backpressure() { + Flowable.range(1, 5) + .cache() + .test(0) + .assertEmpty() + .requestMore(2) + .assertValuesOnly(1, 2) + .requestMore(3) + .assertResult(1, 2, 3, 4, 5); + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterableTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterableTest.java index ebfe899488..b4cb5b5f5f 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterableTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterableTest.java @@ -17,7 +17,7 @@ import java.util.*; import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.*; import org.junit.*; import org.reactivestreams.*; @@ -27,8 +27,10 @@ import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; import io.reactivex.internal.fuseable.QueueSubscription; +import io.reactivex.internal.operators.flowable.FlowableFlattenIterable.FlattenIterableSubscriber; import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.internal.util.ExceptionHelper; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; import io.reactivex.subscribers.*; @@ -957,4 +959,142 @@ public void remove() { assertEquals(1, counter.get()); } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Publisher>() { + @Override + public Publisher apply(Flowable f) + throws Exception { + return f.flatMapIterable(Functions.justFunction(Collections.emptyList())); + } + }); + } + + @Test + public void upstreamFusionRejected() { + TestSubscriber ts = new TestSubscriber(); + FlattenIterableSubscriber f = new FlattenIterableSubscriber(ts, + Functions.justFunction(Collections.emptyList()), 128); + + final AtomicLong requested = new AtomicLong(); + + f.onSubscribe(new QueueSubscription() { + + @Override + public int requestFusion(int mode) { + return 0; + } + + @Override + public boolean offer(Integer value) { + return false; + } + + @Override + public boolean offer(Integer v1, Integer v2) { + return false; + } + + @Override + public Integer poll() throws Exception { + return null; + } + + @Override + public boolean isEmpty() { + return false; + } + + @Override + public void clear() { + } + + @Override + public void request(long n) { + requested.set(n); + } + + @Override + public void cancel() { + } + }); + + assertEquals(128, requested.get()); + assertNotNull(f.queue); + + ts.assertEmpty(); + } + + @Test + public void onErrorLate() { + List errors = TestHelper.trackPluginErrors(); + try { + TestSubscriber ts = new TestSubscriber(); + FlattenIterableSubscriber f = new FlattenIterableSubscriber(ts, + Functions.justFunction(Collections.emptyList()), 128); + + f.onSubscribe(new BooleanSubscription()); + + f.onError(new TestException("first")); + + ts.assertFailureAndMessage(TestException.class, "first"); + + assertTrue(errors.isEmpty()); + + f.done = false; + f.onError(new TestException("second")); + + TestHelper.assertUndeliverable(errors, 0, TestException.class, "second"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void badRequest() { + TestHelper.assertBadRequestReported(Flowable.never().flatMapIterable(Functions.justFunction(Collections.emptyList()))); + } + + @Test + public void fusedCurrentIteratorEmpty() throws Exception { + TestSubscriber ts = new TestSubscriber(0); + FlattenIterableSubscriber f = new FlattenIterableSubscriber(ts, + Functions.justFunction(Arrays.asList(1, 2)), 128); + + f.onSubscribe(new BooleanSubscription()); + + f.onNext(1); + + assertFalse(f.isEmpty()); + + assertEquals(1, f.poll().intValue()); + + assertFalse(f.isEmpty()); + + assertEquals(2, f.poll().intValue()); + + assertTrue(f.isEmpty()); + } + + @Test + public void fusionRequestedState() throws Exception { + TestSubscriber ts = new TestSubscriber(0); + FlattenIterableSubscriber f = new FlattenIterableSubscriber(ts, + Functions.justFunction(Arrays.asList(1, 2)), 128); + + f.onSubscribe(new BooleanSubscription()); + + f.fusionMode = QueueSubscription.NONE; + + assertEquals(QueueSubscription.NONE, f.requestFusion(QueueSubscription.SYNC)); + + assertEquals(QueueSubscription.NONE, f.requestFusion(QueueSubscription.ASYNC)); + + f.fusionMode = QueueSubscription.SYNC; + + assertEquals(QueueSubscription.SYNC, f.requestFusion(QueueSubscription.SYNC)); + + assertEquals(QueueSubscription.NONE, f.requestFusion(QueueSubscription.ASYNC)); +} } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java index d081ea1099..72a65fe1d3 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java @@ -1943,4 +1943,40 @@ public void noHeadRetentionTime() { assertSame(o, buf.get()); } + + @Test(expected = TestException.class) + public void createBufferFactoryCrash() { + FlowableReplay.create(Flowable.just(1), new Callable>() { + @Override + public ReplayBuffer call() throws Exception { + throw new TestException(); + } + }) + .connect(); + } + + @Test + public void createBufferFactoryCrashOnSubscribe() { + FlowableReplay.create(Flowable.just(1), new Callable>() { + @Override + public ReplayBuffer call() throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void currentDisposedWhenConnecting() { + FlowableReplay fr = (FlowableReplay)FlowableReplay.create(Flowable.never(), 16); + fr.connect(); + + fr.current.get().dispose(); + assertTrue(fr.current.get().isDisposed()); + + fr.connect(); + + assertFalse(fr.current.get().isDisposed()); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableObserveOnTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableObserveOnTest.java index 21460f2b21..8b0dedc22b 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableObserveOnTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableObserveOnTest.java @@ -21,13 +21,13 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.*; -import io.reactivex.annotations.Nullable; import org.junit.Test; import org.mockito.InOrder; import io.reactivex.*; import io.reactivex.Observable; import io.reactivex.Observer; +import io.reactivex.annotations.Nullable; import io.reactivex.disposables.*; import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; @@ -719,4 +719,28 @@ public void clear() { .awaitDone(5, TimeUnit.SECONDS) .assertFailure(TestException.class); } + + @Test + public void outputFusedOneSignal() { + final BehaviorSubject bs = BehaviorSubject.createDefault(1); + + bs.observeOn(ImmediateThinScheduler.INSTANCE) + .concatMap(new Function>() { + @Override + public ObservableSource apply(Integer v) + throws Exception { + return Observable.just(v + 1); + } + }) + .subscribeWith(new TestObserver() { + @Override + public void onNext(Integer t) { + super.onNext(t); + if (t == 2) { + bs.onNext(2); + } + } + }) + .assertValuesOnly(2, 3); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservablePublishTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservablePublishTest.java index 9005371fe2..90a78db347 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservablePublishTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservablePublishTest.java @@ -738,4 +738,23 @@ public ObservableSource apply(Observable v) } ); } + + @Test + public void disposedUpfront() { + ConnectableObservable co = Observable.just(1) + .concatWith(Observable.never()) + .publish(); + + TestObserver to1 = co.test(); + + TestObserver to2 = co.test(true); + + co.connect(); + + to1.assertValuesOnly(1); + + to2.assertEmpty(); + + ((ObservablePublish)co).current.get().remove(null); + } } diff --git a/src/test/java/io/reactivex/observers/SerializedObserverTest.java b/src/test/java/io/reactivex/observers/SerializedObserverTest.java index 0fb3526df9..ee50af3730 100644 --- a/src/test/java/io/reactivex/observers/SerializedObserverTest.java +++ b/src/test/java/io/reactivex/observers/SerializedObserverTest.java @@ -1227,4 +1227,20 @@ public void run() { } } + + @Test + public void nullOnNext() { + + TestObserver ts = new TestObserver(); + + final SerializedObserver so = new SerializedObserver(ts); + + Disposable d = Disposables.empty(); + + so.onSubscribe(d); + + so.onNext(null); + + ts.assertFailureAndMessage(NullPointerException.class, "onNext called with null. Null values are generally not allowed in 2.x operators and sources."); + } } diff --git a/src/test/java/io/reactivex/subjects/SerializedSubjectTest.java b/src/test/java/io/reactivex/subjects/SerializedSubjectTest.java index 523e375ea0..b3d22d83c5 100644 --- a/src/test/java/io/reactivex/subjects/SerializedSubjectTest.java +++ b/src/test/java/io/reactivex/subjects/SerializedSubjectTest.java @@ -23,8 +23,10 @@ import io.reactivex.TestHelper; import io.reactivex.disposables.*; import io.reactivex.exceptions.TestException; +import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.observers.TestObserver; import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.subscribers.*; public class SerializedSubjectTest { @@ -672,4 +674,18 @@ public void run() { ts.assertEmpty(); } } + + @Test + public void nullOnNext() { + + TestSubscriber ts = new TestSubscriber(); + + final SerializedSubscriber so = new SerializedSubscriber(ts); + + so.onSubscribe(new BooleanSubscription()); + + so.onNext(null); + + ts.assertFailureAndMessage(NullPointerException.class, "onNext called with null. Null values are generally not allowed in 2.x operators and sources."); + } }