Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: fix timer() ISE due to bad resource mgmt #4927

Merged
merged 1 commit into from
Dec 20, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,21 @@ public enum DisposableHelper implements Disposable {
DISPOSED
;

/**
* Checks if the given Disposable is the common {@link #DISPOSED} enum value.
* @param d the disposable to check
* @return true if d is {@link #DISPOSED}
*/
public static boolean isDisposed(Disposable d) {
return d == DISPOSED;
}

/**
* Atomically sets the field and disposes the old contents.
* @param field the target field
* @param d the new Disposable to set
* @return true if successful, false if the field contains the {@link #DISPOSED} instance.
*/
public static boolean set(AtomicReference<Disposable> field, Disposable d) {
for (;;) {
Disposable current = field.get();
Expand Down Expand Up @@ -144,6 +155,23 @@ public static void reportDisposableSet() {
RxJavaPlugins.onError(new IllegalStateException("Disposable already set!"));
}

/**
* Atomically tries to set the given Disposable on the field if it is null or disposes it if
* the field contains {@link #DISPOSED}.
* @param field the target field
* @param d the disposable to set
* @return true if successful, false otherwise
*/
public static boolean trySet(AtomicReference<Disposable> field, Disposable d) {
if (!field.compareAndSet(null, d)) {
if (field.get() == DISPOSED) {
d.dispose();
}
return false;
}
return true;
}

@Override
public void dispose() {
// deliberately no-op
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ public FlowableTimer(long delay, TimeUnit unit, Scheduler scheduler) {

@Override
public void subscribeActual(Subscriber<? super Long> s) {
IntervalOnceSubscriber ios = new IntervalOnceSubscriber(s);
TimerSubscriber ios = new TimerSubscriber(s);
s.onSubscribe(ios);

Disposable d = scheduler.scheduleDirect(ios, delay, unit);

ios.setResource(d);
}

static final class IntervalOnceSubscriber extends AtomicReference<Disposable>
static final class TimerSubscriber extends AtomicReference<Disposable>
implements Subscription, Runnable {

private static final long serialVersionUID = -2809475196591179431L;
Expand All @@ -53,7 +53,7 @@ static final class IntervalOnceSubscriber extends AtomicReference<Disposable>

volatile boolean requested;

IntervalOnceSubscriber(Subscriber<? super Long> actual) {
TimerSubscriber(Subscriber<? super Long> actual) {
this.actual = actual;
}

Expand All @@ -74,16 +74,17 @@ public void run() {
if (get() != DisposableHelper.DISPOSED) {
if (requested) {
actual.onNext(0L);
lazySet(EmptyDisposable.INSTANCE);
actual.onComplete();
} else {
lazySet(EmptyDisposable.INSTANCE);
actual.onError(new MissingBackpressureException("Can't deliver value due to lack of requests"));
}
lazySet(EmptyDisposable.INSTANCE);
}
}

public void setResource(Disposable d) {
DisposableHelper.setOnce(this, d);
DisposableHelper.trySet(this, d);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,22 @@ public ObservableTimer(long delay, TimeUnit unit, Scheduler scheduler) {

@Override
public void subscribeActual(Observer<? super Long> s) {
IntervalOnceObserver ios = new IntervalOnceObserver(s);
TimerObserver ios = new TimerObserver(s);
s.onSubscribe(ios);

Disposable d = scheduler.scheduleDirect(ios, delay, unit);

ios.setResource(d);
}

static final class IntervalOnceObserver extends AtomicReference<Disposable>
static final class TimerObserver extends AtomicReference<Disposable>
implements Disposable, Runnable {

private static final long serialVersionUID = -2809475196591179431L;

final Observer<? super Long> actual;

IntervalOnceObserver(Observer<? super Long> actual) {
TimerObserver(Observer<? super Long> actual) {
this.actual = actual;
}

Expand All @@ -65,13 +65,13 @@ public boolean isDisposed() {
public void run() {
if (!isDisposed()) {
actual.onNext(0L);
actual.onComplete();
lazySet(EmptyDisposable.INSTANCE);
actual.onComplete();
}
}

public void setResource(Disposable d) {
DisposableHelper.setOnce(this, d);
DisposableHelper.trySet(this, d);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@

package io.reactivex.internal.operators.flowable;

import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;

import java.util.List;
import java.util.concurrent.TimeUnit;

import org.junit.*;
Expand All @@ -25,6 +27,7 @@
import io.reactivex.*;
import io.reactivex.exceptions.*;
import io.reactivex.flowables.ConnectableFlowable;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.subscribers.*;

Expand Down Expand Up @@ -324,4 +327,18 @@ public void run() {
TestHelper.race(r1, r2);
}
}

@Test
public void timerDelayZero() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
for (int i = 0; i < 1000; i++) {
Flowable.timer(0, TimeUnit.MILLISECONDS).blockingFirst();
}

assertTrue(errors.toString(), errors.isEmpty());
} finally {
RxJavaPlugins.reset();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@

package io.reactivex.internal.operators.observable;

import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.*;

import java.util.List;
import java.util.concurrent.TimeUnit;

import org.junit.*;
Expand All @@ -24,6 +26,7 @@
import io.reactivex.exceptions.TestException;
import io.reactivex.observables.ConnectableObservable;
import io.reactivex.observers.*;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.TestScheduler;

public class ObservableTimerTest {
Expand Down Expand Up @@ -286,4 +289,18 @@ public void onComplete() {
public void disposed() {
TestHelper.checkDisposed(Observable.timer(1, TimeUnit.DAYS));
}

@Test
public void timerDelayZero() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
for (int i = 0; i < 1000; i++) {
Observable.timer(0, TimeUnit.MILLISECONDS).blockingFirst();
}

assertTrue(errors.toString(), errors.isEmpty());
} finally {
RxJavaPlugins.reset();
}
}
}