Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: FlowableScan - prevent multiple terminal emissions #4901

Merged
merged 1 commit into from
Dec 3, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.BiFunction;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.plugins.RxJavaPlugins;

public final class FlowableScan<T> extends AbstractFlowableWithUpstream<T, T> {
final BiFunction<T, T, T> accumulator;
Expand All @@ -39,6 +40,8 @@ static final class ScanSubscriber<T> implements Subscriber<T>, Subscription {
Subscription s;

T value;

boolean done;

ScanSubscriber(Subscriber<? super T> actual, BiFunction<T, T, T> accumulator) {
this.actual = actual;
Expand All @@ -55,6 +58,9 @@ public void onSubscribe(Subscription s) {

@Override
public void onNext(T t) {
if (done) {
return;
}
final Subscriber<? super T> a = actual;
T v = value;
if (v == null) {
Expand All @@ -68,7 +74,7 @@ public void onNext(T t) {
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
s.cancel();
a.onError(e);
onError(e);
return;
}

Expand All @@ -79,11 +85,20 @@ public void onNext(T t) {

@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
done = true;
actual.onError(t);
}

@Override
public void onComplete() {
if (done) {
return;
}
done = true;
actual.onComplete();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.subscribers.SinglePostCompleteSubscriber;
import io.reactivex.internal.subscriptions.EmptySubscription;
import io.reactivex.plugins.RxJavaPlugins;

public final class FlowableScanSeed<T, R> extends AbstractFlowableWithUpstream<T, R> {
final BiFunction<R, ? super T, R> accumulator;
Expand Down Expand Up @@ -87,6 +88,7 @@ public void onNext(T t) {
@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
done = true;
Expand Down
152 changes: 116 additions & 36 deletions src/test/java/io/reactivex/flowable/FlowableScanTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,25 @@

import static org.junit.Assert.assertEquals;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.Assert;
import org.junit.Test;

import io.reactivex.Flowable;
import io.reactivex.flowable.FlowableEventStream.Event;
import io.reactivex.functions.*;
import io.reactivex.plugins.RxJavaPlugins;

public class FlowableScanTests {


@Test
public void testUnsubscribeScan() {

Expand All @@ -49,81 +55,155 @@ public void accept(HashMap<String, String> v) {
}

@Test
public void testFlowableScanSeedDoesNotEmitErrorTwiceIfScanFunctionThrows() {
public void testScanWithSeedDoesNotEmitErrorTwiceIfScanFunctionThrows() {
final List<Throwable> list = new CopyOnWriteArrayList<Throwable>();
Consumer<Throwable> errorConsumer = new Consumer<Throwable>() {
@Override
public void accept(Throwable t) throws Exception {
list.add(t);
}};
try {
RxJavaPlugins.setErrorHandler(errorConsumer);
final RuntimeException e = new RuntimeException();
final RuntimeException e2 = new RuntimeException();
Burst.items(1).error(e2)
.scan(0, throwingBiFunction(e))
.test()
.assertNoValues()
.assertError(e);
assertEquals(Arrays.asList(e2), list);
} finally {
RxJavaPlugins.reset();
}
}

@Test
public void testScanWithSeedDoesNotEmitTerminalEventTwiceIfScanFunctionThrows() {
final RuntimeException e = new RuntimeException();
Burst.item(1).error(e).scan(0, new BiFunction<Integer, Integer, Integer>() {
Burst.item(1).create()
.scan(0, throwingBiFunction(e))
.test()
.assertNoValues()
.assertError(e);
}

@Test
public void testScanWithSeedDoesNotProcessOnNextAfterTerminalEventIfScanFunctionThrows() {
final RuntimeException e = new RuntimeException();
final AtomicInteger count = new AtomicInteger();
Burst.items(1, 2).create().scan(0, new BiFunction<Integer, Integer, Integer>() {

@Override
public Integer apply(Integer n1, Integer n2) throws Exception {
count.incrementAndGet();
throw e;
}})
.test()
.assertNoValues()
.assertError(e);
assertEquals(1, count.get());
}

@Test
public void testScanWithSeedCompletesNormally() {
Flowable.just(1,2,3).scan(0, SUM)
.test()
.assertValues(0, 1, 3, 6)
.assertComplete();
}

@Test
public void testFlowableScanSeedDoesNotEmitTerminalEventTwiceIfScanFunctionThrows() {
public void testScanWithSeedWhenScanSeedProviderThrows() {
final RuntimeException e = new RuntimeException();
Burst.item(1).create().scan(0, new BiFunction<Integer, Integer, Integer>() {
Flowable.just(1,2,3).scanWith(throwingCallable(e),
SUM)
.test()
.assertError(e)
.assertNoValues();
}

@Test
public void testScanNoSeed() {
Flowable.just(1, 2, 3)
.scan(SUM)
.test()
.assertValues(1, 3, 6)
.assertComplete();
}

@Test
public void testScanNoSeedDoesNotEmitErrorTwiceIfScanFunctionThrows() {
final List<Throwable> list = new CopyOnWriteArrayList<Throwable>();
Consumer<Throwable> errorConsumer = new Consumer<Throwable>() {
@Override
public Integer apply(Integer n1, Integer n2) throws Exception {
throw e;
}})
public void accept(Throwable t) throws Exception {
list.add(t);
}};
try {
RxJavaPlugins.setErrorHandler(errorConsumer);
final RuntimeException e = new RuntimeException();
final RuntimeException e2 = new RuntimeException();
Burst.items(1, 2).error(e2)
.scan(throwingBiFunction(e))
.test()
.assertValue(1)
.assertError(e);
assertEquals(Arrays.asList(e2), list);
} finally {
RxJavaPlugins.reset();
}
}

@Test
public void testScanNoSeedDoesNotEmitTerminalEventTwiceIfScanFunctionThrows() {
final RuntimeException e = new RuntimeException();
Burst.items(1, 2).create()
.scan(throwingBiFunction(e))
.test()
.assertNoValues()
.assertValue(1)
.assertError(e);
}

@Test
public void testFlowableScanSeedDoesNotProcessOnNextAfterTerminalEventIfScanFunctionThrows() {
public void testScanNoSeedDoesNotProcessOnNextAfterTerminalEventIfScanFunctionThrows() {
final RuntimeException e = new RuntimeException();
final AtomicInteger count = new AtomicInteger();
Burst.items(1, 2).create().scan(0, new BiFunction<Integer, Integer, Integer>() {
Burst.items(1, 2, 3).create().scan(new BiFunction<Integer, Integer, Integer>() {

@Override
public Integer apply(Integer n1, Integer n2) throws Exception {
count.incrementAndGet();
throw e;
}})
.test()
.assertNoValues()
.assertValue(1)
.assertError(e);
assertEquals(1, count.get());
}

@Test
public void testFlowableScanSeedCompletesNormally() {
Flowable.just(1,2,3).scan(0, new BiFunction<Integer, Integer, Integer>() {

private static BiFunction<Integer,Integer, Integer> throwingBiFunction(final RuntimeException e) {
return new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer t1, Integer t2) throws Exception {
return t1 + t2;
}})
.test()
.assertValues(0, 1, 3, 6)
.assertComplete();
public Integer apply(Integer n1, Integer n2) throws Exception {
throw e;
}
};
}

private static final BiFunction<Integer, Integer, Integer> SUM = new BiFunction<Integer, Integer, Integer>() {

@Override
public Integer apply(Integer t1, Integer t2) throws Exception {
return t1 + t2;
}
};

@Test
public void testFlowableScanSeedWhenScanSeedProviderThrows() {
final RuntimeException e = new RuntimeException();
Flowable.just(1,2,3).scanWith(new Callable<Integer>() {
private static Callable<Integer> throwingCallable(final RuntimeException e) {
return new Callable<Integer>() {
@Override
public Integer call() throws Exception {
throw e;
}
},
new BiFunction<Integer, Integer, Integer>() {

@Override
public Integer apply(Integer t1, Integer t2) throws Exception {
return t1 + t2;
}
})
.test()
.assertError(e)
.assertNoValues();
};
}
}