diff --git a/src/main/java/io/reactivex/exceptions/CompositeException.java b/src/main/java/io/reactivex/exceptions/CompositeException.java index 0b18f8ce3f..748b964cf7 100644 --- a/src/main/java/io/reactivex/exceptions/CompositeException.java +++ b/src/main/java/io/reactivex/exceptions/CompositeException.java @@ -278,7 +278,7 @@ public int size() { * @param e the {@link Throwable} {@code e}. * @return The root cause of {@code e}. If {@code e.getCause()} returns {@code null} or {@code e}, just return {@code e} itself. */ - private Throwable getRootCause(Throwable e) { + /*private */Throwable getRootCause(Throwable e) { Throwable root = e.getCause(); if (root == null || cause == root) { return e; diff --git a/src/main/java/io/reactivex/internal/functions/ObjectHelper.java b/src/main/java/io/reactivex/internal/functions/ObjectHelper.java index b9ce56560e..f574f7f477 100644 --- a/src/main/java/io/reactivex/internal/functions/ObjectHelper.java +++ b/src/main/java/io/reactivex/internal/functions/ObjectHelper.java @@ -71,7 +71,7 @@ public static int compare(int v1, int v2) { } /** - * Compares two integer values similar to Long.compare. + * Compares two long values similar to Long.compare. * @param v1 the first value * @param v2 the second value * @return the comparison result diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundarySupplier.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundarySupplier.java index af98d86cfe..a4cdcb8ced 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundarySupplier.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundarySupplier.java @@ -196,24 +196,20 @@ void next() { BufferBoundarySubscriber bs = new BufferBoundarySubscriber(this); - Disposable o = other.get(); - - if (!other.compareAndSet(o, bs)) { - return; - } - - U b; - synchronized (this) { - b = buffer; - if (b == null) { - return; + if (DisposableHelper.replace(other, bs)) { + U b; + synchronized (this) { + b = buffer; + if (b == null) { + return; + } + buffer = next; } - buffer = next; - } - boundary.subscribe(bs); + boundary.subscribe(bs); - fastPathEmitMax(b, false, this); + fastPathEmitMax(b, false, this); + } } @Override diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java index 2597a6d140..8c220a506a 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java @@ -177,8 +177,8 @@ public void request(long n) { @Override public void cancel() { + cancelled = true; s.cancel(); - DisposableHelper.dispose(timer); } @@ -199,14 +199,10 @@ public void run() { synchronized (this) { current = buffer; - if (current != null) { - buffer = next; + if (current == null) { + return; } - } - - if (current == null) { - DisposableHelper.dispose(timer); - return; + buffer = next; } fastPathEmitMax(current, false, this); @@ -324,9 +320,10 @@ public void request(long n) { @Override public void cancel() { - clear(); + cancelled = true; s.cancel(); w.dispose(); + clear(); } void clear() { diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDebounceTimed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDebounceTimed.java index 3f667dcb09..aea5af63e7 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDebounceTimed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDebounceTimed.java @@ -58,7 +58,7 @@ static final class DebounceTimedSubscriber extends AtomicLong Subscription s; - final SequentialDisposable timer = new SequentialDisposable(); + Disposable timer; volatile long index; @@ -88,17 +88,15 @@ public void onNext(T t) { long idx = index + 1; index = idx; - Disposable d = timer.get(); + Disposable d = timer; if (d != null) { d.dispose(); } DebounceEmitter de = new DebounceEmitter(t, idx, this); - if (timer.replace(de)) { - d = worker.schedule(de, timeout, unit); - - de.setResource(d); - } + timer = de; + d = worker.schedule(de, timeout, unit); + de.setResource(d); } @Override @@ -108,6 +106,10 @@ public void onError(Throwable t) { return; } done = true; + Disposable d = timer; + if (d != null) { + d.dispose(); + } actual.onError(t); worker.dispose(); } @@ -119,17 +121,18 @@ public void onComplete() { } done = true; - Disposable d = timer.get(); - if (!DisposableHelper.isDisposed(d)) { - @SuppressWarnings("unchecked") - DebounceEmitter de = (DebounceEmitter)d; - if (de != null) { - de.emit(); - } - DisposableHelper.dispose(timer); - actual.onComplete(); - worker.dispose(); + Disposable d = timer; + if (d != null) { + d.dispose(); + } + @SuppressWarnings("unchecked") + DebounceEmitter de = (DebounceEmitter)d; + if (de != null) { + de.emit(); } + + actual.onComplete(); + worker.dispose(); } @Override diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableBuffer.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableBuffer.java index a75c0ce07d..9471c0b24a 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableBuffer.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableBuffer.java @@ -127,11 +127,13 @@ public void onError(Throwable t) { @Override public void onComplete() { U b = buffer; - buffer = null; - if (b != null && !b.isEmpty()) { - actual.onNext(b); + if (b != null) { + buffer = null; + if (!b.isEmpty()) { + actual.onNext(b); + } + actual.onComplete(); } - actual.onComplete(); } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundarySupplier.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundarySupplier.java index ce97d264fc..15f0dd0a1c 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundarySupplier.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundarySupplier.java @@ -190,24 +190,20 @@ void next() { BufferBoundaryObserver bs = new BufferBoundaryObserver(this); - Disposable o = other.get(); - - if (!other.compareAndSet(o, bs)) { - return; - } - - U b; - synchronized (this) { - b = buffer; - if (b == null) { - return; + if (DisposableHelper.replace(other, bs)) { + U b; + synchronized (this) { + b = buffer; + if (b == null) { + return; + } + buffer = next; } - buffer = next; - } - boundary.subscribe(bs); + boundary.subscribe(bs); - fastPathEmit(b, false, this); + fastPathEmit(b, false, this); + } } @Override diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableDebounceTimed.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableDebounceTimed.java index 7cdd59e3bb..dcc56288a7 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableDebounceTimed.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableDebounceTimed.java @@ -51,7 +51,7 @@ static final class DebounceTimedObserver Disposable s; - final AtomicReference timer = new AtomicReference(); + Disposable timer; volatile long index; @@ -80,18 +80,15 @@ public void onNext(T t) { long idx = index + 1; index = idx; - Disposable d = timer.get(); + Disposable d = timer; if (d != null) { d.dispose(); } DebounceEmitter de = new DebounceEmitter(t, idx, this); - if (timer.compareAndSet(d, de)) { - d = worker.schedule(de, timeout, unit); - - de.setResource(d); - } - + timer = de; + d = worker.schedule(de, timeout, unit); + de.setResource(d); } @Override @@ -100,6 +97,10 @@ public void onError(Throwable t) { RxJavaPlugins.onError(t); return; } + Disposable d = timer; + if (d != null) { + d.dispose(); + } done = true; actual.onError(t); worker.dispose(); @@ -112,16 +113,17 @@ public void onComplete() { } done = true; - Disposable d = timer.get(); - if (d != DisposableHelper.DISPOSED) { - @SuppressWarnings("unchecked") - DebounceEmitter de = (DebounceEmitter)d; - if (de != null) { - de.run(); - } - actual.onComplete(); - worker.dispose(); + Disposable d = timer; + if (d != null) { + d.dispose(); } + @SuppressWarnings("unchecked") + DebounceEmitter de = (DebounceEmitter)d; + if (de != null) { + de.run(); + } + actual.onComplete(); + worker.dispose(); } @Override diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleTakeUntil.java b/src/main/java/io/reactivex/internal/operators/single/SingleTakeUntil.java index 6d1183e80c..4eaa6f7620 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleTakeUntil.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleTakeUntil.java @@ -85,12 +85,9 @@ public void onSubscribe(Disposable d) { public void onSuccess(T value) { other.dispose(); - Disposable a = get(); + Disposable a = getAndSet(DisposableHelper.DISPOSED); if (a != DisposableHelper.DISPOSED) { - a = getAndSet(DisposableHelper.DISPOSED); - if (a != DisposableHelper.DISPOSED) { - actual.onSuccess(value); - } + actual.onSuccess(value); } } diff --git a/src/main/java/io/reactivex/internal/util/MergerBiFunction.java b/src/main/java/io/reactivex/internal/util/MergerBiFunction.java index e3ad1d1c60..ba13a4b7a6 100644 --- a/src/main/java/io/reactivex/internal/util/MergerBiFunction.java +++ b/src/main/java/io/reactivex/internal/util/MergerBiFunction.java @@ -58,8 +58,7 @@ public List apply(List a, List b) throws Exception { while (at.hasNext()) { both.add(at.next()); } - } else - if (s2 != null) { + } else { both.add(s2); while (bt.hasNext()) { both.add(bt.next()); diff --git a/src/test/java/io/reactivex/exceptions/CompositeExceptionTest.java b/src/test/java/io/reactivex/exceptions/CompositeExceptionTest.java index ec969189a1..4b9f96ba8a 100644 --- a/src/test/java/io/reactivex/exceptions/CompositeExceptionTest.java +++ b/src/test/java/io/reactivex/exceptions/CompositeExceptionTest.java @@ -349,6 +349,22 @@ public void badException() { assertSame(e, new CompositeException(e).getCause().getCause()); assertSame(e, new CompositeException(new RuntimeException(e)).getCause().getCause().getCause()); } + + @Test + public void rootCauseEval() { + final TestException ex0 = new TestException(); + Throwable throwable = new Throwable() { + + private static final long serialVersionUID = 3597694032723032281L; + + @Override + public synchronized Throwable getCause() { + return ex0; + } + }; + CompositeException ex = new CompositeException(throwable); + assertSame(ex, ex.getRootCause(ex)); + } } final class BadException extends Throwable { diff --git a/src/test/java/io/reactivex/internal/functions/ObjectHelperTest.java b/src/test/java/io/reactivex/internal/functions/ObjectHelperTest.java index aefbfea49c..ce5a2b340d 100644 --- a/src/test/java/io/reactivex/internal/functions/ObjectHelperTest.java +++ b/src/test/java/io/reactivex/internal/functions/ObjectHelperTest.java @@ -58,4 +58,11 @@ public void compare() { assertEquals(0, ObjectHelper.compare(0, 0)); assertEquals(1, ObjectHelper.compare(2, 0)); } + + @Test + public void compareLong() { + assertEquals(-1, ObjectHelper.compare(0L, 2L)); + assertEquals(0, ObjectHelper.compare(0L, 0L)); + assertEquals(1, ObjectHelper.compare(2L, 0L)); + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java index 3a32a81c38..c8c75031bc 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java @@ -30,7 +30,10 @@ import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.*; import io.reactivex.functions.*; +import io.reactivex.internal.disposables.DisposableHelper; import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.operators.flowable.FlowableBufferBoundarySupplier.BufferBoundarySupplierSubscriber; +import io.reactivex.internal.operators.flowable.FlowableBufferTimed.*; import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.*; @@ -2557,4 +2560,203 @@ protected void subscribeActual(Subscriber s) { RxJavaPlugins.reset(); } } + + @Test + public void bufferBoundarySupplierDisposed() { + TestSubscriber> ts = new TestSubscriber>(); + BufferBoundarySupplierSubscriber, Integer> sub = + new BufferBoundarySupplierSubscriber, Integer>( + ts, Functions.justCallable((List)new ArrayList()), + Functions.justCallable(Flowable.never()) + ); + + BooleanSubscription bs = new BooleanSubscription(); + + sub.onSubscribe(bs); + + assertFalse(sub.isDisposed()); + + sub.dispose(); + + assertTrue(sub.isDisposed()); + + sub.next(); + + assertSame(DisposableHelper.DISPOSED, sub.other.get()); + + sub.cancel(); + sub.cancel(); + + assertTrue(bs.isCancelled()); + } + + @Test + public void bufferBoundarySupplierBufferAlreadyCleared() { + TestSubscriber> ts = new TestSubscriber>(); + BufferBoundarySupplierSubscriber, Integer> sub = + new BufferBoundarySupplierSubscriber, Integer>( + ts, Functions.justCallable((List)new ArrayList()), + Functions.justCallable(Flowable.never()) + ); + + BooleanSubscription bs = new BooleanSubscription(); + + sub.onSubscribe(bs); + + sub.buffer = null; + + sub.next(); + + sub.onNext(1); + + sub.onComplete(); + } + + @Test + public void timedDoubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Publisher>>() { + @Override + public Publisher> apply(Flowable f) + throws Exception { + return f.buffer(1, TimeUnit.SECONDS); + } + }); + } + + @Test + public void timedCancelledUpfront() { + TestScheduler sch = new TestScheduler(); + + TestSubscriber> ts = Flowable.never() + .buffer(1, TimeUnit.MILLISECONDS, sch) + .test(1L, true); + + sch.advanceTimeBy(1, TimeUnit.MILLISECONDS); + + ts.assertEmpty(); + } + + @Test + public void timedInternalState() { + TestScheduler sch = new TestScheduler(); + + TestSubscriber> ts = new TestSubscriber>(); + + BufferExactUnboundedSubscriber> sub = new BufferExactUnboundedSubscriber>( + ts, Functions.justCallable((List)new ArrayList()), 1, TimeUnit.SECONDS, sch); + + sub.onSubscribe(new BooleanSubscription()); + + assertFalse(sub.isDisposed()); + + sub.onError(new TestException()); + sub.onNext(1); + sub.onComplete(); + + sub.run(); + + sub.dispose(); + + assertTrue(sub.isDisposed()); + + sub.buffer = new ArrayList(); + sub.enter(); + sub.onComplete(); + } + + @Test + public void timedSkipDoubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Publisher>>() { + @Override + public Publisher> apply(Flowable f) + throws Exception { + return f.buffer(2, 1, TimeUnit.SECONDS); + } + }); + } + + @Test + public void timedSizedDoubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Publisher>>() { + @Override + public Publisher> apply(Flowable f) + throws Exception { + return f.buffer(2, TimeUnit.SECONDS, 10); + } + }); + } + + @Test + public void timedSkipInternalState() { + TestScheduler sch = new TestScheduler(); + + TestSubscriber> ts = new TestSubscriber>(); + + BufferSkipBoundedSubscriber> sub = new BufferSkipBoundedSubscriber>( + ts, Functions.justCallable((List)new ArrayList()), 1, 1, TimeUnit.SECONDS, sch.createWorker()); + + sub.onSubscribe(new BooleanSubscription()); + + sub.enter(); + sub.onComplete(); + + sub.cancel(); + + sub.run(); + } + + @Test + public void timedSkipCancelWhenSecondBuffer() { + TestScheduler sch = new TestScheduler(); + + final TestSubscriber> ts = new TestSubscriber>(); + + BufferSkipBoundedSubscriber> sub = new BufferSkipBoundedSubscriber>( + ts, new Callable>() { + int calls; + @Override + public List call() throws Exception { + if (++calls == 2) { + ts.cancel(); + } + return new ArrayList(); + } + }, 1, 1, TimeUnit.SECONDS, sch.createWorker()); + + sub.onSubscribe(new BooleanSubscription()); + + sub.run(); + + assertTrue(ts.isCancelled()); + } + + @Test + public void timedSizeBufferAlreadyCleared() { + TestScheduler sch = new TestScheduler(); + + TestSubscriber> ts = new TestSubscriber>(); + + BufferExactBoundedSubscriber> sub = + new BufferExactBoundedSubscriber>( + ts, Functions.justCallable((List)new ArrayList()), + 1, TimeUnit.SECONDS, 1, false, sch.createWorker()) + ; + + BooleanSubscription bs = new BooleanSubscription(); + + sub.onSubscribe(bs); + + sub.producerIndex++; + + sub.run(); + + assertFalse(sub.isDisposed()); + + sub.enter(); + sub.onComplete(); + + assertTrue(sub.isDisposed()); + + sub.run(); + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableDebounceTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDebounceTest.java index fbd14694f0..c152929e1a 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableDebounceTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDebounceTest.java @@ -30,6 +30,7 @@ import io.reactivex.exceptions.*; import io.reactivex.functions.Function; import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.operators.flowable.FlowableDebounceTimed.*; import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.*; @@ -487,4 +488,61 @@ protected void subscribeActual(Subscriber subscriber) { public void badRequestReported() { TestHelper.assertBadRequestReported(Flowable.never().debounce(Functions.justFunction(Flowable.never()))); } + + @Test + public void timedDoubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Publisher>() { + @Override + public Publisher apply(Flowable f) + throws Exception { + return f.debounce(1, TimeUnit.SECONDS); + } + }); + } + + @Test + public void timedDisposedIgnoredBySource() { + final TestSubscriber ts = new TestSubscriber(); + + new Flowable() { + @Override + protected void subscribeActual( + org.reactivestreams.Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + ts.cancel(); + s.onNext(1); + s.onComplete(); + } + } + .debounce(1, TimeUnit.SECONDS) + .subscribe(ts); + } + + @Test + public void timedBadRequest() { + TestHelper.assertBadRequestReported(Flowable.never().debounce(1, TimeUnit.SECONDS)); + } + + @Test + public void timedLateEmit() { + TestSubscriber ts = new TestSubscriber(); + DebounceTimedSubscriber sub = new DebounceTimedSubscriber( + ts, 1, TimeUnit.SECONDS, new TestScheduler().createWorker()); + + sub.onSubscribe(new BooleanSubscription()); + + DebounceEmitter de = new DebounceEmitter(1, 50, sub); + de.emit(); + de.emit(); + + ts.assertEmpty(); + } + + @Test + public void timedError() { + Flowable.error(new TestException()) + .debounce(1, TimeUnit.SECONDS) + .test() + .assertFailure(TestException.class); + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableUsingTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableUsingTest.java index 4638086065..e836796924 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableUsingTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableUsingTest.java @@ -29,6 +29,7 @@ import io.reactivex.exceptions.*; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.subscribers.TestSubscriber; @@ -637,4 +638,45 @@ public void sourceSupplierReturnsNull() { .assertFailureAndMessage(NullPointerException.class, "The sourceSupplier returned a null Publisher") ; } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Flowable>() { + @Override + public Flowable apply(Flowable o) + throws Exception { + return Flowable.using(Functions.justCallable(1), Functions.justFunction(o), Functions.emptyConsumer()); + } + }); + } + + @Test + public void eagerDisposedOnComplete() { + final TestSubscriber ts = new TestSubscriber(); + + Flowable.using(Functions.justCallable(1), Functions.justFunction(new Flowable() { + @Override + protected void subscribeActual(Subscriber observer) { + observer.onSubscribe(new BooleanSubscription()); + ts.cancel(); + observer.onComplete(); + } + }), Functions.emptyConsumer(), true) + .subscribe(ts); + } + + @Test + public void eagerDisposedOnError() { + final TestSubscriber ts = new TestSubscriber(); + + Flowable.using(Functions.justCallable(1), Functions.justFunction(new Flowable() { + @Override + protected void subscribeActual(Subscriber observer) { + observer.onSubscribe(new BooleanSubscription()); + ts.cancel(); + observer.onError(new TestException()); + } + }), Functions.emptyConsumer(), true) + .subscribe(ts); + } } diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeFlatMapBiSelectorTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFlatMapBiSelectorTest.java index a618ddc94a..f817bfa839 100644 --- a/src/test/java/io/reactivex/internal/operators/maybe/MaybeFlatMapBiSelectorTest.java +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFlatMapBiSelectorTest.java @@ -20,6 +20,7 @@ import io.reactivex.*; import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; +import io.reactivex.observers.TestObserver; import io.reactivex.processors.PublishProcessor; public class MaybeFlatMapBiSelectorTest { @@ -210,4 +211,25 @@ public Object apply(Integer a, Integer b) throws Exception { .test() .assertFailure(NullPointerException.class); } + + @Test + public void mapperCancels() { + final TestObserver to = new TestObserver(); + + Maybe.just(1) + .flatMap(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + to.cancel(); + return Maybe.just(2); + } + }, new BiFunction() { + @Override + public Integer apply(Integer a, Integer b) throws Exception { + throw new IllegalStateException(); + } + }) + .subscribeWith(to) + .assertEmpty(); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java index 1e7407c121..3a02b31423 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java @@ -31,7 +31,11 @@ import io.reactivex.disposables.*; import io.reactivex.exceptions.*; import io.reactivex.functions.*; +import io.reactivex.internal.disposables.DisposableHelper; import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.operators.observable.ObservableBuffer.BufferExactObserver; +import io.reactivex.internal.operators.observable.ObservableBufferBoundarySupplier.BufferBoundarySupplierObserver; +import io.reactivex.internal.operators.observable.ObservableBufferTimed.*; import io.reactivex.observers.*; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.*; @@ -1884,4 +1888,242 @@ protected void subscribeActual(Observer s) { RxJavaPlugins.reset(); } } + + @Test + public void bufferBoundarySupplierDisposed() { + TestObserver> to = new TestObserver>(); + BufferBoundarySupplierObserver, Integer> sub = + new BufferBoundarySupplierObserver, Integer>( + to, Functions.justCallable((List)new ArrayList()), + Functions.justCallable(Observable.never()) + ); + + Disposable bs = Disposables.empty(); + + sub.onSubscribe(bs); + + assertFalse(sub.isDisposed()); + + sub.dispose(); + + assertTrue(sub.isDisposed()); + + sub.next(); + + assertSame(DisposableHelper.DISPOSED, sub.other.get()); + + sub.dispose(); + sub.dispose(); + + assertTrue(bs.isDisposed()); + } + + @Test + public void bufferBoundarySupplierBufferAlreadyCleared() { + TestObserver> to = new TestObserver>(); + BufferBoundarySupplierObserver, Integer> sub = + new BufferBoundarySupplierObserver, Integer>( + to, Functions.justCallable((List)new ArrayList()), + Functions.justCallable(Observable.never()) + ); + + Disposable bs = Disposables.empty(); + + sub.onSubscribe(bs); + + sub.buffer = null; + + sub.next(); + + sub.onNext(1); + + sub.onComplete(); + } + + @Test + public void timedDoubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeObservable(new Function, Observable>>() { + @Override + public Observable> apply(Observable f) + throws Exception { + return f.buffer(1, TimeUnit.SECONDS); + } + }); + } + + @Test + public void timedCancelledUpfront() { + TestScheduler sch = new TestScheduler(); + + TestObserver> to = Observable.never() + .buffer(1, TimeUnit.MILLISECONDS, sch) + .test(true); + + sch.advanceTimeBy(1, TimeUnit.MILLISECONDS); + + to.assertEmpty(); + } + + @Test + public void timedInternalState() { + TestScheduler sch = new TestScheduler(); + + TestObserver> to = new TestObserver>(); + + BufferExactUnboundedObserver> sub = new BufferExactUnboundedObserver>( + to, Functions.justCallable((List)new ArrayList()), 1, TimeUnit.SECONDS, sch); + + sub.onSubscribe(Disposables.empty()); + + assertFalse(sub.isDisposed()); + + sub.onError(new TestException()); + sub.onNext(1); + sub.onComplete(); + + sub.run(); + + sub.dispose(); + + assertTrue(sub.isDisposed()); + + sub.buffer = new ArrayList(); + sub.enter(); + sub.onComplete(); + } + + @Test + public void timedSkipDoubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeObservable(new Function, Observable>>() { + @Override + public Observable> apply(Observable f) + throws Exception { + return f.buffer(2, 1, TimeUnit.SECONDS); + } + }); + } + + @Test + public void timedSizedDoubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeObservable(new Function, Observable>>() { + @Override + public Observable> apply(Observable f) + throws Exception { + return f.buffer(2, TimeUnit.SECONDS, 10); + } + }); + } + + @Test + public void timedSkipInternalState() { + TestScheduler sch = new TestScheduler(); + + TestObserver> to = new TestObserver>(); + + BufferSkipBoundedObserver> sub = new BufferSkipBoundedObserver>( + to, Functions.justCallable((List)new ArrayList()), 1, 1, TimeUnit.SECONDS, sch.createWorker()); + + sub.onSubscribe(Disposables.empty()); + + sub.enter(); + sub.onComplete(); + + sub.dispose(); + + sub.run(); + } + + @Test + public void timedSkipCancelWhenSecondBuffer() { + TestScheduler sch = new TestScheduler(); + + final TestObserver> to = new TestObserver>(); + + BufferSkipBoundedObserver> sub = new BufferSkipBoundedObserver>( + to, new Callable>() { + int calls; + @Override + public List call() throws Exception { + if (++calls == 2) { + to.cancel(); + } + return new ArrayList(); + } + }, 1, 1, TimeUnit.SECONDS, sch.createWorker()); + + sub.onSubscribe(Disposables.empty()); + + sub.run(); + + assertTrue(to.isCancelled()); + } + + @Test + public void timedSizeBufferAlreadyCleared() { + TestScheduler sch = new TestScheduler(); + + TestObserver> to = new TestObserver>(); + + BufferExactBoundedObserver> sub = + new BufferExactBoundedObserver>( + to, Functions.justCallable((List)new ArrayList()), + 1, TimeUnit.SECONDS, 1, false, sch.createWorker()) + ; + + Disposable bs = Disposables.empty(); + + sub.onSubscribe(bs); + + sub.producerIndex++; + + sub.run(); + + assertFalse(sub.isDisposed()); + + sub.enter(); + sub.onComplete(); + + sub.dispose(); + + assertTrue(sub.isDisposed()); + + sub.run(); + + sub.onNext(1); + } + + @Test + public void bufferExactDoubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeObservable(new Function, ObservableSource>>() { + @Override + public ObservableSource> apply(Observable o) + throws Exception { + return o.buffer(1); + } + }); + } + + @Test + public void bufferExactState() { + TestObserver> to = new TestObserver>(); + + BufferExactObserver> sub = new BufferExactObserver>( + to, 1, Functions.justCallable((List)new ArrayList()) + ); + + sub.onComplete(); + sub.onNext(1); + sub.onComplete(); + } + + @Test + public void bufferSkipDoubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeObservable(new Function, ObservableSource>>() { + @Override + public ObservableSource> apply(Observable o) + throws Exception { + return o.buffer(1, 2); + } + }); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableDebounceTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableDebounceTest.java index 95cd9a1163..26ce6caa9c 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableDebounceTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableDebounceTest.java @@ -24,12 +24,14 @@ import org.junit.*; import org.mockito.InOrder; +import org.reactivestreams.Publisher; import io.reactivex.*; import io.reactivex.disposables.*; import io.reactivex.exceptions.TestException; import io.reactivex.functions.Function; import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.operators.observable.ObservableDebounceTimed.*; import io.reactivex.observers.TestObserver; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.TestScheduler; @@ -450,4 +452,56 @@ protected void subscribeActual(Observer observer) { to .assertResult(2); } + + @Test + public void timedDoubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Publisher>() { + @Override + public Publisher apply(Flowable f) + throws Exception { + return f.debounce(1, TimeUnit.SECONDS); + } + }); + } + + @Test + public void timedDisposedIgnoredBySource() { + final TestObserver to = new TestObserver(); + + new Observable() { + @Override + protected void subscribeActual( + Observer s) { + s.onSubscribe(Disposables.empty()); + to.cancel(); + s.onNext(1); + s.onComplete(); + } + } + .debounce(1, TimeUnit.SECONDS) + .subscribe(to); + } + + @Test + public void timedLateEmit() { + TestObserver to = new TestObserver(); + DebounceTimedObserver sub = new DebounceTimedObserver( + to, 1, TimeUnit.SECONDS, new TestScheduler().createWorker()); + + sub.onSubscribe(Disposables.empty()); + + DebounceEmitter de = new DebounceEmitter(1, 50, sub); + de.run(); + de.run(); + + to.assertEmpty(); + } + + @Test + public void timedError() { + Observable.error(new TestException()) + .debounce(1, TimeUnit.SECONDS) + .test() + .assertFailure(TestException.class); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableUsingTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableUsingTest.java index 2bf61565d9..4276e3bc95 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableUsingTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableUsingTest.java @@ -567,4 +567,45 @@ public void sourceSupplierReturnsNull() { .assertFailureAndMessage(NullPointerException.class, "The sourceSupplier returned a null ObservableSource") ; } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeObservable(new Function, ObservableSource>() { + @Override + public ObservableSource apply(Observable o) + throws Exception { + return Observable.using(Functions.justCallable(1), Functions.justFunction(o), Functions.emptyConsumer()); + } + }); + } + + @Test + public void eagerDisposedOnComplete() { + final TestObserver to = new TestObserver(); + + Observable.using(Functions.justCallable(1), Functions.justFunction(new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + to.cancel(); + observer.onComplete(); + } + }), Functions.emptyConsumer(), true) + .subscribe(to); + } + + @Test + public void eagerDisposedOnError() { + final TestObserver to = new TestObserver(); + + Observable.using(Functions.justCallable(1), Functions.justFunction(new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + to.cancel(); + observer.onError(new TestException()); + } + }), Functions.emptyConsumer(), true) + .subscribe(to); + } } diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleFlatMapTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleFlatMapTest.java index 4dfb835102..c2b1ecea91 100644 --- a/src/test/java/io/reactivex/internal/operators/single/SingleFlatMapTest.java +++ b/src/test/java/io/reactivex/internal/operators/single/SingleFlatMapTest.java @@ -223,4 +223,21 @@ public SingleSource apply(Integer v) throws Exception { .test() .assertFailure(TestException.class); } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeSingle(new Function, SingleSource>() { + @Override + public SingleSource apply(Single s) + throws Exception { + return s.flatMap(new Function>() { + @Override + public SingleSource apply(Object v) + throws Exception { + return Single.just(v); + } + }); + } + }); + } } diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleTakeUntilTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleTakeUntilTest.java index 07dd073f35..42633a9fcc 100644 --- a/src/test/java/io/reactivex/internal/operators/single/SingleTakeUntilTest.java +++ b/src/test/java/io/reactivex/internal/operators/single/SingleTakeUntilTest.java @@ -19,9 +19,11 @@ import java.util.concurrent.CancellationException; import org.junit.Test; +import org.reactivestreams.Subscriber; import io.reactivex.*; import io.reactivex.exceptions.TestException; +import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.observers.TestObserver; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; @@ -274,4 +276,19 @@ public void otherSignalsAndCompletes() { RxJavaPlugins.reset(); } } + + @Test + public void flowableCancelDelayed() { + Single.never() + .takeUntil(new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onNext(1); + s.onNext(2); + } + }) + .test() + .assertFailure(CancellationException.class); + } } diff --git a/src/test/java/io/reactivex/internal/util/MergerBiFunctionTest.java b/src/test/java/io/reactivex/internal/util/MergerBiFunctionTest.java new file mode 100644 index 0000000000..db84d4cf1a --- /dev/null +++ b/src/test/java/io/reactivex/internal/util/MergerBiFunctionTest.java @@ -0,0 +1,88 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.util; + +import static org.junit.Assert.assertEquals; + +import java.util.*; + +import org.junit.Test; + +public class MergerBiFunctionTest { + + @Test + public void firstEmpty() throws Exception { + MergerBiFunction merger = new MergerBiFunction(new Comparator() { + @Override + public int compare(Integer o1, Integer o2) { + return o1.compareTo(o2); + } + }); + List list = merger.apply(Collections.emptyList(), Arrays.asList(3, 5)); + + assertEquals(Arrays.asList(3, 5), list); + } + + @Test + public void bothEmpty() throws Exception { + MergerBiFunction merger = new MergerBiFunction(new Comparator() { + @Override + public int compare(Integer o1, Integer o2) { + return o1.compareTo(o2); + } + }); + List list = merger.apply(Collections.emptyList(), Collections.emptyList()); + + assertEquals(Collections.emptyList(), list); + } + + @Test + public void secondEmpty() throws Exception { + MergerBiFunction merger = new MergerBiFunction(new Comparator() { + @Override + public int compare(Integer o1, Integer o2) { + return o1.compareTo(o2); + } + }); + List list = merger.apply(Arrays.asList(2, 4), Collections.emptyList()); + + assertEquals(Arrays.asList(2, 4), list); + } + + @Test + public void sameSize() throws Exception { + MergerBiFunction merger = new MergerBiFunction(new Comparator() { + @Override + public int compare(Integer o1, Integer o2) { + return o1.compareTo(o2); + } + }); + List list = merger.apply(Arrays.asList(2, 4), Arrays.asList(3, 5)); + + assertEquals(Arrays.asList(2, 3, 4, 5), list); + } + + @Test + public void sameSizeReverse() throws Exception { + MergerBiFunction merger = new MergerBiFunction(new Comparator() { + @Override + public int compare(Integer o1, Integer o2) { + return o1.compareTo(o2); + } + }); + List list = merger.apply(Arrays.asList(3, 5), Arrays.asList(2, 4)); + + assertEquals(Arrays.asList(2, 3, 4, 5), list); + } +} diff --git a/src/test/java/io/reactivex/processors/ReplayProcessorTest.java b/src/test/java/io/reactivex/processors/ReplayProcessorTest.java index a80d1d0b83..20e6b9a8df 100644 --- a/src/test/java/io/reactivex/processors/ReplayProcessorTest.java +++ b/src/test/java/io/reactivex/processors/ReplayProcessorTest.java @@ -1678,4 +1678,9 @@ public void noHeadRetentionTime() { assertSame(o, buf.head); } + + @Test + public void invalidRequest() { + TestHelper.assertBadRequestReported(ReplayProcessor.create()); + } } diff --git a/src/test/java/io/reactivex/subscribers/TestSubscriberTest.java b/src/test/java/io/reactivex/subscribers/TestSubscriberTest.java index ba04b04762..5e4082c399 100644 --- a/src/test/java/io/reactivex/subscribers/TestSubscriberTest.java +++ b/src/test/java/io/reactivex/subscribers/TestSubscriberTest.java @@ -2059,4 +2059,10 @@ public void assertValuesOnlyThrowsWhenErrored() { // expected } } + + @Test(timeout = 1000) + public void awaitCount0() { + TestSubscriber ts = TestSubscriber.create(); + ts.awaitCount(0, TestWaitStrategy.SLEEP_1MS, 0); + } }