Skip to content

Commit

Permalink
2.x: Fix flatMap inner fused poll crash not cancelling the upstream (#…
Browse files Browse the repository at this point in the history
…5792)

* 2.x: Fix flatMap inner fused poll crash not cancelling the upstream

* Verify Observable.flatMapIterable
  • Loading branch information
akarnokd authored Jan 5, 2018
1 parent 4fd16ee commit d91ee5a
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ static final class MergeSubscriber<T, U> extends AtomicInteger implements Flowab

final AtomicLong requested = new AtomicLong();

Subscription s;
Subscription upstream;

long uniqueId;
long lastId;
Expand All @@ -107,8 +107,8 @@ static final class MergeSubscriber<T, U> extends AtomicInteger implements Flowab

@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.s, s)) {
this.s = s;
if (SubscriptionHelper.validate(this.upstream, s)) {
this.upstream = s;
actual.onSubscribe(this);
if (!cancelled) {
if (maxConcurrency == Integer.MAX_VALUE) {
Expand All @@ -132,7 +132,7 @@ public void onNext(T t) {
p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null Publisher");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
s.cancel();
upstream.cancel();
onError(e);
return;
}
Expand All @@ -154,7 +154,7 @@ public void onNext(T t) {
if (maxConcurrency != Integer.MAX_VALUE && !cancelled
&& ++scalarEmitted == scalarLimit) {
scalarEmitted = 0;
s.request(scalarLimit);
upstream.request(scalarLimit);
}
}
} else {
Expand Down Expand Up @@ -238,7 +238,7 @@ void tryEmitScalar(U value) {
if (maxConcurrency != Integer.MAX_VALUE && !cancelled
&& ++scalarEmitted == scalarLimit) {
scalarEmitted = 0;
s.request(scalarLimit);
upstream.request(scalarLimit);
}
} else {
if (q == null) {
Expand Down Expand Up @@ -350,7 +350,7 @@ public void request(long n) {
public void cancel() {
if (!cancelled) {
cancelled = true;
s.cancel();
upstream.cancel();
disposeAll();
if (getAndIncrement() == 0) {
SimpleQueue<U> q = queue;
Expand Down Expand Up @@ -482,6 +482,9 @@ void drainLoop() {
Exceptions.throwIfFatal(ex);
is.dispose();
errs.addThrowable(ex);
if (!delayErrors) {
upstream.cancel();
}
if (checkTerminate()) {
return;
}
Expand Down Expand Up @@ -539,7 +542,7 @@ void drainLoop() {
}

if (replenishMain != 0L && !cancelled) {
s.request(replenishMain);
upstream.request(replenishMain);
}
if (innerCompleted) {
continue;
Expand Down Expand Up @@ -594,7 +597,7 @@ void innerError(InnerSubscriber<T, U> inner, Throwable t) {
if (errs.addThrowable(t)) {
inner.done = true;
if (!delayErrors) {
s.cancel();
upstream.cancel();
for (InnerSubscriber<?, ?> a : subscribers.getAndSet(CANCELLED)) {
a.dispose();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1033,4 +1033,47 @@ public Object apply(Integer v, Object w) throws Exception {
.test()
.assertFailureAndMessage(NullPointerException.class, "The mapper returned a null Publisher");
}

@Test
public void failingFusedInnerCancelsSource() {
final AtomicInteger counter = new AtomicInteger();
Flowable.range(1, 5)
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer v) throws Exception {
counter.getAndIncrement();
}
})
.flatMap(new Function<Integer, Publisher<Integer>>() {
@Override
public Publisher<Integer> apply(Integer v)
throws Exception {
return Flowable.<Integer>fromIterable(new Iterable<Integer>() {
@Override
public Iterator<Integer> iterator() {
return new Iterator<Integer>() {
@Override
public boolean hasNext() {
return true;
}

@Override
public Integer next() {
throw new TestException();
}

@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
});
}
})
.test()
.assertFailure(TestException.class);

assertEquals(1, counter.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -914,4 +914,47 @@ public void multiShareHidden() {
.assertResult(600L);
}
}

@Test
public void failingInnerCancelsSource() {
final AtomicInteger counter = new AtomicInteger();
Flowable.range(1, 5)
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer v) throws Exception {
counter.getAndIncrement();
}
})
.flatMapIterable(new Function<Integer, Iterable<Integer>>() {
@Override
public Iterable<Integer> apply(Integer v)
throws Exception {
return new Iterable<Integer>() {
@Override
public Iterator<Integer> iterator() {
return new Iterator<Integer>() {
@Override
public boolean hasNext() {
return true;
}

@Override
public Integer next() {
throw new TestException();
}

@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
}
})
.test()
.assertFailure(TestException.class);

assertEquals(1, counter.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -894,4 +894,48 @@ public Object apply(Integer v, Object w) throws Exception {
.test()
.assertFailureAndMessage(NullPointerException.class, "The mapper returned a null ObservableSource");
}


@Test
public void failingFusedInnerCancelsSource() {
final AtomicInteger counter = new AtomicInteger();
Observable.range(1, 5)
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer v) throws Exception {
counter.getAndIncrement();
}
})
.flatMap(new Function<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> apply(Integer v)
throws Exception {
return Observable.<Integer>fromIterable(new Iterable<Integer>() {
@Override
public Iterator<Integer> iterator() {
return new Iterator<Integer>() {
@Override
public boolean hasNext() {
return true;
}

@Override
public Integer next() {
throw new TestException();
}

@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
});
}
})
.test()
.assertFailure(TestException.class);

assertEquals(1, counter.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,17 @@

package io.reactivex.internal.operators.observable;

import java.util.Arrays;
import static org.junit.Assert.assertEquals;

import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;

import io.reactivex.*;
import io.reactivex.functions.Function;
import io.reactivex.Observable;
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.*;
import io.reactivex.subjects.PublishSubject;

public class ObservableFlattenIterableTest {
Expand Down Expand Up @@ -47,4 +52,47 @@ public Iterable<Integer> apply(Object v) throws Exception {
}
}, false, 1, 1, 10, 20);
}

@Test
public void failingInnerCancelsSource() {
final AtomicInteger counter = new AtomicInteger();
Observable.range(1, 5)
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer v) throws Exception {
counter.getAndIncrement();
}
})
.flatMapIterable(new Function<Integer, Iterable<Integer>>() {
@Override
public Iterable<Integer> apply(Integer v)
throws Exception {
return new Iterable<Integer>() {
@Override
public Iterator<Integer> iterator() {
return new Iterator<Integer>() {
@Override
public boolean hasNext() {
return true;
}

@Override
public Integer next() {
throw new TestException();
}

@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
}
})
.test()
.assertFailure(TestException.class);

assertEquals(1, counter.get());
}
}

0 comments on commit d91ee5a

Please sign in to comment.