diff --git a/src/main/java/io/reactivex/internal/disposables/DisposableHelper.java b/src/main/java/io/reactivex/internal/disposables/DisposableHelper.java index fa870957c0..564129a3d7 100644 --- a/src/main/java/io/reactivex/internal/disposables/DisposableHelper.java +++ b/src/main/java/io/reactivex/internal/disposables/DisposableHelper.java @@ -29,10 +29,21 @@ public enum DisposableHelper implements Disposable { DISPOSED ; + /** + * Checks if the given Disposable is the common {@link #DISPOSED} enum value. + * @param d the disposable to check + * @return true if d is {@link #DISPOSED} + */ public static boolean isDisposed(Disposable d) { return d == DISPOSED; } + /** + * Atomically sets the field and disposes the old contents. + * @param field the target field + * @param d the new Disposable to set + * @return true if successful, false if the field contains the {@link #DISPOSED} instance. + */ public static boolean set(AtomicReference field, Disposable d) { for (;;) { Disposable current = field.get(); @@ -144,6 +155,23 @@ public static void reportDisposableSet() { RxJavaPlugins.onError(new IllegalStateException("Disposable already set!")); } + /** + * Atomically tries to set the given Disposable on the field if it is null or disposes it if + * the field contains {@link #DISPOSED}. + * @param field the target field + * @param d the disposable to set + * @return true if successful, false otherwise + */ + public static boolean trySet(AtomicReference field, Disposable d) { + if (!field.compareAndSet(null, d)) { + if (field.get() == DISPOSED) { + d.dispose(); + } + return false; + } + return true; + } + @Override public void dispose() { // deliberately no-op diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimer.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimer.java index e9cbc34922..cb7ac3e39f 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimer.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimer.java @@ -36,7 +36,7 @@ public FlowableTimer(long delay, TimeUnit unit, Scheduler scheduler) { @Override public void subscribeActual(Subscriber s) { - IntervalOnceSubscriber ios = new IntervalOnceSubscriber(s); + TimerSubscriber ios = new TimerSubscriber(s); s.onSubscribe(ios); Disposable d = scheduler.scheduleDirect(ios, delay, unit); @@ -44,7 +44,7 @@ public void subscribeActual(Subscriber s) { ios.setResource(d); } - static final class IntervalOnceSubscriber extends AtomicReference + static final class TimerSubscriber extends AtomicReference implements Subscription, Runnable { private static final long serialVersionUID = -2809475196591179431L; @@ -53,7 +53,7 @@ static final class IntervalOnceSubscriber extends AtomicReference volatile boolean requested; - IntervalOnceSubscriber(Subscriber actual) { + TimerSubscriber(Subscriber actual) { this.actual = actual; } @@ -74,16 +74,17 @@ public void run() { if (get() != DisposableHelper.DISPOSED) { if (requested) { actual.onNext(0L); + lazySet(EmptyDisposable.INSTANCE); actual.onComplete(); } else { + lazySet(EmptyDisposable.INSTANCE); actual.onError(new MissingBackpressureException("Can't deliver value due to lack of requests")); } - lazySet(EmptyDisposable.INSTANCE); } } public void setResource(Disposable d) { - DisposableHelper.setOnce(this, d); + DisposableHelper.trySet(this, d); } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableTimer.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableTimer.java index 3c15072190..242e77be8b 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableTimer.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableTimer.java @@ -32,7 +32,7 @@ public ObservableTimer(long delay, TimeUnit unit, Scheduler scheduler) { @Override public void subscribeActual(Observer s) { - IntervalOnceObserver ios = new IntervalOnceObserver(s); + TimerObserver ios = new TimerObserver(s); s.onSubscribe(ios); Disposable d = scheduler.scheduleDirect(ios, delay, unit); @@ -40,14 +40,14 @@ public void subscribeActual(Observer s) { ios.setResource(d); } - static final class IntervalOnceObserver extends AtomicReference + static final class TimerObserver extends AtomicReference implements Disposable, Runnable { private static final long serialVersionUID = -2809475196591179431L; final Observer actual; - IntervalOnceObserver(Observer actual) { + TimerObserver(Observer actual) { this.actual = actual; } @@ -65,13 +65,13 @@ public boolean isDisposed() { public void run() { if (!isDisposed()) { actual.onNext(0L); - actual.onComplete(); lazySet(EmptyDisposable.INSTANCE); + actual.onComplete(); } } public void setResource(Disposable d) { - DisposableHelper.setOnce(this, d); + DisposableHelper.trySet(this, d); } } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableTimerTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableTimerTest.java index a28744c19c..c289b01293 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableTimerTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableTimerTest.java @@ -13,9 +13,11 @@ package io.reactivex.internal.operators.flowable; +import static org.junit.Assert.*; import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; +import java.util.List; import java.util.concurrent.TimeUnit; import org.junit.*; @@ -25,6 +27,7 @@ import io.reactivex.*; import io.reactivex.exceptions.*; import io.reactivex.flowables.ConnectableFlowable; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.TestScheduler; import io.reactivex.subscribers.*; @@ -324,4 +327,18 @@ public void run() { TestHelper.race(r1, r2); } } + + @Test + public void timerDelayZero() { + List errors = TestHelper.trackPluginErrors(); + try { + for (int i = 0; i < 1000; i++) { + Flowable.timer(0, TimeUnit.MILLISECONDS).blockingFirst(); + } + + assertTrue(errors.toString(), errors.isEmpty()); + } finally { + RxJavaPlugins.reset(); + } + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableTimerTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableTimerTest.java index f307057cc2..5080d89f58 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableTimerTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableTimerTest.java @@ -13,8 +13,10 @@ package io.reactivex.internal.operators.observable; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.*; +import java.util.List; import java.util.concurrent.TimeUnit; import org.junit.*; @@ -24,6 +26,7 @@ import io.reactivex.exceptions.TestException; import io.reactivex.observables.ConnectableObservable; import io.reactivex.observers.*; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.TestScheduler; public class ObservableTimerTest { @@ -286,4 +289,18 @@ public void onComplete() { public void disposed() { TestHelper.checkDisposed(Observable.timer(1, TimeUnit.DAYS)); } + + @Test + public void timerDelayZero() { + List errors = TestHelper.trackPluginErrors(); + try { + for (int i = 0; i < 1000; i++) { + Observable.timer(0, TimeUnit.MILLISECONDS).blockingFirst(); + } + + assertTrue(errors.toString(), errors.isEmpty()); + } finally { + RxJavaPlugins.reset(); + } + } } \ No newline at end of file