Skip to content

Commit

Permalink
2.x: fix Single.timeout unnecessary dispose calls (#5586)
Browse files Browse the repository at this point in the history
* 2.x: fix Single.timeout unnesessary dispose calls

* Route to RxJavaPlugins.onError
  • Loading branch information
akarnokd authored Sep 10, 2017
1 parent c43229b commit 9b037c7
Show file tree
Hide file tree
Showing 2 changed files with 223 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
package io.reactivex.internal.operators.single;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.*;
import io.reactivex.disposables.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.plugins.RxJavaPlugins;

public final class SingleTimeout<T> extends Single<T> {

Expand All @@ -43,97 +45,118 @@ public SingleTimeout(SingleSource<T> source, long timeout, TimeUnit unit, Schedu
@Override
protected void subscribeActual(final SingleObserver<? super T> s) {

final CompositeDisposable set = new CompositeDisposable();
s.onSubscribe(set);
TimeoutMainObserver<T> parent = new TimeoutMainObserver<T>(s, other);
s.onSubscribe(parent);

final AtomicBoolean once = new AtomicBoolean();
DisposableHelper.replace(parent.task, scheduler.scheduleDirect(parent, timeout, unit));

Disposable timer = scheduler.scheduleDirect(new TimeoutDispose(once, set, s), timeout, unit);
source.subscribe(parent);
}

set.add(timer);
static final class TimeoutMainObserver<T> extends AtomicReference<Disposable>
implements SingleObserver<T>, Runnable, Disposable {

source.subscribe(new TimeoutObserver(once, set, s));
private static final long serialVersionUID = 37497744973048446L;

}
final SingleObserver<? super T> actual;

final class TimeoutDispose implements Runnable {
private final AtomicBoolean once;
final CompositeDisposable set;
final SingleObserver<? super T> s;
final AtomicReference<Disposable> task;

TimeoutDispose(AtomicBoolean once, CompositeDisposable set, SingleObserver<? super T> s) {
this.once = once;
this.set = set;
this.s = s;
}
final TimeoutFallbackObserver<T> fallback;

@Override
public void run() {
if (once.compareAndSet(false, true)) {
if (other != null) {
set.clear();
other.subscribe(new TimeoutObserver());
} else {
set.dispose();
s.onError(new TimeoutException());
}
}
}
SingleSource<? extends T> other;

final class TimeoutObserver implements SingleObserver<T> {
static final class TimeoutFallbackObserver<T> extends AtomicReference<Disposable>
implements SingleObserver<T> {

@Override
public void onError(Throwable e) {
set.dispose();
s.onError(e);
private static final long serialVersionUID = 2071387740092105509L;
final SingleObserver<? super T> actual;

TimeoutFallbackObserver(SingleObserver<? super T> actual) {
this.actual = actual;
}

@Override
public void onSubscribe(Disposable d) {
set.add(d);
DisposableHelper.setOnce(this, d);
}

@Override
public void onSuccess(T value) {
set.dispose();
s.onSuccess(value);
public void onSuccess(T t) {
actual.onSuccess(t);
}

@Override
public void onError(Throwable e) {
actual.onError(e);
}
}
}

final class TimeoutObserver implements SingleObserver<T> {
TimeoutMainObserver(SingleObserver<? super T> actual, SingleSource<? extends T> other) {
this.actual = actual;
this.other = other;
this.task = new AtomicReference<Disposable>();
if (other != null) {
this.fallback = new TimeoutFallbackObserver<T>(actual);
} else {
this.fallback = null;
}
}

private final AtomicBoolean once;
private final CompositeDisposable set;
private final SingleObserver<? super T> s;
@Override
public void run() {
Disposable d = get();
if (d != DisposableHelper.DISPOSED && compareAndSet(d, DisposableHelper.DISPOSED)) {
if (d != null) {
d.dispose();
}
SingleSource<? extends T> other = this.other;
if (other == null) {
actual.onError(new TimeoutException());
} else {
this.other = null;
other.subscribe(fallback);
}
}
}

TimeoutObserver(AtomicBoolean once, CompositeDisposable set, SingleObserver<? super T> s) {
this.once = once;
this.set = set;
this.s = s;
@Override
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this, d);
}

@Override
public void onError(Throwable e) {
if (once.compareAndSet(false, true)) {
set.dispose();
s.onError(e);
public void onSuccess(T t) {
Disposable d = get();
if (d != DisposableHelper.DISPOSED && compareAndSet(d, DisposableHelper.DISPOSED)) {
DisposableHelper.dispose(task);
actual.onSuccess(t);
}
}

@Override
public void onSubscribe(Disposable d) {
set.add(d);
public void onError(Throwable e) {
Disposable d = get();
if (d != DisposableHelper.DISPOSED && compareAndSet(d, DisposableHelper.DISPOSED)) {
DisposableHelper.dispose(task);
actual.onError(e);
} else {
RxJavaPlugins.onError(e);
}
}

@Override
public void onSuccess(T value) {
if (once.compareAndSet(false, true)) {
set.dispose();
s.onSuccess(value);
public void dispose() {
DisposableHelper.dispose(this);
DisposableHelper.dispose(task);
if (fallback != null) {
DisposableHelper.dispose(fallback);
}
}

@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,18 @@

import static org.junit.Assert.*;

import java.util.List;
import java.util.concurrent.TimeUnit;

import org.junit.Test;

import io.reactivex.Single;
import io.reactivex.*;
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.Action;
import io.reactivex.observers.TestObserver;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.*;

public class SingleTimeoutTest {

Expand Down Expand Up @@ -69,4 +72,141 @@ public void mainError() {
.awaitDone(5, TimeUnit.SECONDS)
.assertFailure(TestException.class);
}

@Test
public void disposeWhenFallback() {
TestScheduler sch = new TestScheduler();

SingleSubject<Integer> subj = SingleSubject.create();

subj.timeout(1, TimeUnit.SECONDS, sch, Single.just(1))
.test(true)
.assertEmpty();

assertFalse(subj.hasObservers());
}

@Test
public void isDisposed() {
TestHelper.checkDisposed(SingleSubject.create().timeout(1, TimeUnit.DAYS));
}

@Test
public void fallbackDispose() {
TestScheduler sch = new TestScheduler();

SingleSubject<Integer> subj = SingleSubject.create();

SingleSubject<Integer> fallback = SingleSubject.create();

TestObserver<Integer> to = subj.timeout(1, TimeUnit.SECONDS, sch, fallback)
.test();

assertFalse(fallback.hasObservers());

sch.advanceTimeBy(1, TimeUnit.SECONDS);

assertFalse(subj.hasObservers());
assertTrue(fallback.hasObservers());

to.cancel();

assertFalse(fallback.hasObservers());
}

@Test
public void normalSuccessDoesntDisposeMain() {
final int[] calls = { 0 };

Single.just(1)
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
calls[0]++;
}
})
.timeout(1, TimeUnit.DAYS)
.test()
.assertResult(1);

assertEquals(0, calls[0]);
}

@Test
public void successTimeoutRace() {
for (int i = 0; i < 1000; i++) {
final SingleSubject<Integer> subj = SingleSubject.create();
SingleSubject<Integer> fallback = SingleSubject.create();

final TestScheduler sch = new TestScheduler();

TestObserver<Integer> to = subj.timeout(1, TimeUnit.MILLISECONDS, sch, fallback).test();

Runnable r1 = new Runnable() {
@Override
public void run() {
subj.onSuccess(1);
}
};

Runnable r2 = new Runnable() {
@Override
public void run() {
sch.advanceTimeBy(1, TimeUnit.MILLISECONDS);
}
};

TestHelper.race(r1, r2);

if (!fallback.hasObservers()) {
to.assertResult(1);
} else {
to.assertEmpty();
}
}
}

@Test
public void errorTimeoutRace() {
final TestException ex = new TestException();
List<Throwable> errors = TestHelper.trackPluginErrors();
try {

for (int i = 0; i < 1000; i++) {
final SingleSubject<Integer> subj = SingleSubject.create();
SingleSubject<Integer> fallback = SingleSubject.create();

final TestScheduler sch = new TestScheduler();

TestObserver<Integer> to = subj.timeout(1, TimeUnit.MILLISECONDS, sch, fallback).test();

Runnable r1 = new Runnable() {
@Override
public void run() {
subj.onError(ex);
}
};

Runnable r2 = new Runnable() {
@Override
public void run() {
sch.advanceTimeBy(1, TimeUnit.MILLISECONDS);
}
};

TestHelper.race(r1, r2);

if (!fallback.hasObservers()) {
to.assertFailure(TestException.class);
} else {
to.assertEmpty();
}
if (!errors.isEmpty()) {
TestHelper.assertUndeliverable(errors, 0, TestException.class);
}
}
} finally {
RxJavaPlugins.reset();
}
}
}

0 comments on commit 9b037c7

Please sign in to comment.