Skip to content

Commit

Permalink
2.x: Don't dispose the winner of {Single|Maybe|Completable}.amb() (#…
Browse files Browse the repository at this point in the history
…6375)

* 2.x: Don't dispose the winner of {Single|Maybe|Completable}.amb()

* Add null-source test to MaybeAmbTest
  • Loading branch information
akarnokd authored Jan 17, 2019
1 parent 5106a20 commit d40f923
Show file tree
Hide file tree
Showing 8 changed files with 462 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ public void subscribeActual(final CompletableObserver observer) {

final AtomicBoolean once = new AtomicBoolean();

CompletableObserver inner = new Amb(once, set, observer);

for (int i = 0; i < count; i++) {
CompletableSource c = sources[i];
if (set.isDisposed()) {
Expand All @@ -82,7 +80,7 @@ public void subscribeActual(final CompletableObserver observer) {
}

// no need to have separate subscribers because inner is stateless
c.subscribe(inner);
c.subscribe(new Amb(once, set, observer));
}

if (count == 0) {
Expand All @@ -91,9 +89,14 @@ public void subscribeActual(final CompletableObserver observer) {
}

static final class Amb implements CompletableObserver {
private final AtomicBoolean once;
private final CompositeDisposable set;
private final CompletableObserver downstream;

final AtomicBoolean once;

final CompositeDisposable set;

final CompletableObserver downstream;

Disposable upstream;

Amb(AtomicBoolean once, CompositeDisposable set, CompletableObserver observer) {
this.once = once;
Expand All @@ -104,6 +107,7 @@ static final class Amb implements CompletableObserver {
@Override
public void onComplete() {
if (once.compareAndSet(false, true)) {
set.delete(upstream);
set.dispose();
downstream.onComplete();
}
Expand All @@ -112,6 +116,7 @@ public void onComplete() {
@Override
public void onError(Throwable e) {
if (once.compareAndSet(false, true)) {
set.delete(upstream);
set.dispose();
downstream.onError(e);
} else {
Expand All @@ -121,8 +126,8 @@ public void onError(Throwable e) {

@Override
public void onSubscribe(Disposable d) {
upstream = d;
set.add(d);
}

}
}
56 changes: 28 additions & 28 deletions src/main/java/io/reactivex/internal/operators/maybe/MaybeAmb.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,64 +64,63 @@ protected void subscribeActual(MaybeObserver<? super T> observer) {
count = sources.length;
}

AmbMaybeObserver<T> parent = new AmbMaybeObserver<T>(observer);
observer.onSubscribe(parent);
CompositeDisposable set = new CompositeDisposable();
observer.onSubscribe(set);

AtomicBoolean winner = new AtomicBoolean();

for (int i = 0; i < count; i++) {
MaybeSource<? extends T> s = sources[i];
if (parent.isDisposed()) {
if (set.isDisposed()) {
return;
}

if (s == null) {
parent.onError(new NullPointerException("One of the MaybeSources is null"));
set.dispose();
NullPointerException ex = new NullPointerException("One of the MaybeSources is null");
if (winner.compareAndSet(false, true)) {
observer.onError(ex);
} else {
RxJavaPlugins.onError(ex);
}
return;
}

s.subscribe(parent);
s.subscribe(new AmbMaybeObserver<T>(observer, set, winner));
}

if (count == 0) {
observer.onComplete();
}

}

static final class AmbMaybeObserver<T>
extends AtomicBoolean
implements MaybeObserver<T>, Disposable {

private static final long serialVersionUID = -7044685185359438206L;
implements MaybeObserver<T> {

final MaybeObserver<? super T> downstream;

final CompositeDisposable set;
final AtomicBoolean winner;

AmbMaybeObserver(MaybeObserver<? super T> downstream) {
this.downstream = downstream;
this.set = new CompositeDisposable();
}
final CompositeDisposable set;

@Override
public void dispose() {
if (compareAndSet(false, true)) {
set.dispose();
}
}
Disposable upstream;

@Override
public boolean isDisposed() {
return get();
AmbMaybeObserver(MaybeObserver<? super T> downstream, CompositeDisposable set, AtomicBoolean winner) {
this.downstream = downstream;
this.set = set;
this.winner = winner;
}

@Override
public void onSubscribe(Disposable d) {
upstream = d;
set.add(d);
}

@Override
public void onSuccess(T value) {
if (compareAndSet(false, true)) {
if (winner.compareAndSet(false, true)) {
set.delete(upstream);
set.dispose();

downstream.onSuccess(value);
Expand All @@ -130,7 +129,8 @@ public void onSuccess(T value) {

@Override
public void onError(Throwable e) {
if (compareAndSet(false, true)) {
if (winner.compareAndSet(false, true)) {
set.delete(upstream);
set.dispose();

downstream.onError(e);
Expand All @@ -141,12 +141,12 @@ public void onError(Throwable e) {

@Override
public void onComplete() {
if (compareAndSet(false, true)) {
if (winner.compareAndSet(false, true)) {
set.delete(upstream);
set.dispose();

downstream.onComplete();
}
}

}
}
26 changes: 16 additions & 10 deletions src/main/java/io/reactivex/internal/operators/single/SingleAmb.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,61 +59,67 @@ protected void subscribeActual(final SingleObserver<? super T> observer) {
count = sources.length;
}

final AtomicBoolean winner = new AtomicBoolean();
final CompositeDisposable set = new CompositeDisposable();

AmbSingleObserver<T> shared = new AmbSingleObserver<T>(observer, set);
observer.onSubscribe(set);

for (int i = 0; i < count; i++) {
SingleSource<? extends T> s1 = sources[i];
if (shared.get()) {
if (set.isDisposed()) {
return;
}

if (s1 == null) {
set.dispose();
Throwable e = new NullPointerException("One of the sources is null");
if (shared.compareAndSet(false, true)) {
if (winner.compareAndSet(false, true)) {
observer.onError(e);
} else {
RxJavaPlugins.onError(e);
}
return;
}

s1.subscribe(shared);
s1.subscribe(new AmbSingleObserver<T>(observer, set, winner));
}
}

static final class AmbSingleObserver<T> extends AtomicBoolean implements SingleObserver<T> {

private static final long serialVersionUID = -1944085461036028108L;
static final class AmbSingleObserver<T> implements SingleObserver<T> {

final CompositeDisposable set;

final SingleObserver<? super T> downstream;

AmbSingleObserver(SingleObserver<? super T> observer, CompositeDisposable set) {
final AtomicBoolean winner;

Disposable upstream;

AmbSingleObserver(SingleObserver<? super T> observer, CompositeDisposable set, AtomicBoolean winner) {
this.downstream = observer;
this.set = set;
this.winner = winner;
}

@Override
public void onSubscribe(Disposable d) {
this.upstream = d;
set.add(d);
}

@Override
public void onSuccess(T value) {
if (compareAndSet(false, true)) {
if (winner.compareAndSet(false, true)) {
set.delete(upstream);
set.dispose();
downstream.onSuccess(value);
}
}

@Override
public void onError(Throwable e) {
if (compareAndSet(false, true)) {
if (winner.compareAndSet(false, true)) {
set.delete(upstream);
set.dispose();
downstream.onError(e);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,21 @@
import static org.junit.Assert.*;

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.Test;

import io.reactivex.*;
import io.reactivex.disposables.*;
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.*;
import io.reactivex.internal.functions.Functions;
import io.reactivex.internal.operators.completable.CompletableAmb.Amb;
import io.reactivex.observers.TestObserver;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.*;

public class CompletableAmbTest {
Expand Down Expand Up @@ -173,6 +177,7 @@ public void ambRace() {
CompositeDisposable cd = new CompositeDisposable();
AtomicBoolean once = new AtomicBoolean();
Amb a = new Amb(once, cd, to);
a.onSubscribe(Disposables.empty());

a.onComplete();
a.onComplete();
Expand Down Expand Up @@ -259,4 +264,54 @@ public void untilCompletableOtherError() {
to.assertFailure(TestException.class);
}

@Test
public void noWinnerErrorDispose() throws Exception {
final TestException ex = new TestException();
for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) {
final AtomicBoolean interrupted = new AtomicBoolean();
final CountDownLatch cdl = new CountDownLatch(1);

Completable.ambArray(
Completable.error(ex)
.subscribeOn(Schedulers.single())
.observeOn(Schedulers.computation()),
Completable.never()
)
.subscribe(Functions.EMPTY_ACTION, new Consumer<Throwable>() {
@Override
public void accept(Throwable e) throws Exception {
interrupted.set(Thread.currentThread().isInterrupted());
cdl.countDown();
}
});

assertTrue(cdl.await(500, TimeUnit.SECONDS));
assertFalse("Interrupted!", interrupted.get());
}
}

@Test
public void noWinnerCompleteDispose() throws Exception {
for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) {
final AtomicBoolean interrupted = new AtomicBoolean();
final CountDownLatch cdl = new CountDownLatch(1);

Completable.ambArray(
Completable.complete()
.subscribeOn(Schedulers.single())
.observeOn(Schedulers.computation()),
Completable.never()
)
.subscribe(new Action() {
@Override
public void run() throws Exception {
interrupted.set(Thread.currentThread().isInterrupted());
cdl.countDown();
}
});

assertTrue(cdl.await(500, TimeUnit.SECONDS));
assertFalse("Interrupted!", interrupted.get());
}
}
}
Loading

0 comments on commit d40f923

Please sign in to comment.