Skip to content

Commit

Permalink
FlowableScanSeed - prevent post-terminal events
Browse files Browse the repository at this point in the history
  • Loading branch information
davidmoten committed Dec 3, 2016
1 parent 846afd3 commit e55459b
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ static final class ScanSeedSubscriber<T, R> extends SinglePostCompleteSubscriber
private static final long serialVersionUID = -1776795561228106469L;

final BiFunction<R, ? super T, R> accumulator;

boolean done;

ScanSeedSubscriber(Subscriber<? super R> actual, BiFunction<R, ? super T, R> accumulator, R value) {
super(actual);
Expand All @@ -60,6 +62,10 @@ static final class ScanSeedSubscriber<T, R> extends SinglePostCompleteSubscriber

@Override
public void onNext(T t) {
if (done) {
return;
}

R v = value;

R u;
Expand All @@ -80,12 +86,20 @@ public void onNext(T t) {

@Override
public void onError(Throwable t) {
if (done) {
return;
}
done = true;
value = null;
actual.onError(t);
}

@Override
public void onComplete() {
if (done) {
return;
}
done = true;
complete(value);
}
}
Expand Down
85 changes: 85 additions & 0 deletions src/test/java/io/reactivex/flowable/FlowableScanTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,16 @@

package io.reactivex.flowable;

import static org.junit.Assert.assertEquals;

import java.util.HashMap;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Assert;
import org.junit.Test;

import io.reactivex.Flowable;
import io.reactivex.flowable.FlowableEventStream.Event;
import io.reactivex.functions.*;

Expand All @@ -41,4 +47,83 @@ public void accept(HashMap<String, String> v) {
}
});
}

@Test
public void testFlowableScanSeedDoesNotEmitErrorTwiceIfScanFunctionThrows() {
final RuntimeException e = new RuntimeException();
Burst.item(1).error(e).scan(0, new BiFunction<Integer, Integer, Integer>() {

@Override
public Integer apply(Integer n1, Integer n2) throws Exception {
throw e;
}})
.test()
.assertNoValues()
.assertError(e);
}

@Test
public void testFlowableScanSeedDoesNotEmitTerminalEventTwiceIfScanFunctionThrows() {
final RuntimeException e = new RuntimeException();
Burst.item(1).create().scan(0, new BiFunction<Integer, Integer, Integer>() {

@Override
public Integer apply(Integer n1, Integer n2) throws Exception {
throw e;
}})
.test()
.assertNoValues()
.assertError(e);
}

@Test
public void testFlowableScanSeedDoesNotProcessOnNextAfterTerminalEventIfScanFunctionThrows() {
final RuntimeException e = new RuntimeException();
final AtomicInteger count = new AtomicInteger();
Burst.items(1, 2).create().scan(0, new BiFunction<Integer, Integer, Integer>() {

@Override
public Integer apply(Integer n1, Integer n2) throws Exception {
count.incrementAndGet();
throw e;
}})
.test()
.assertNoValues()
.assertError(e);
assertEquals(1, count.get());
}

@Test
public void testFlowableScanSeedCompletesNormally() {
Flowable.just(1,2,3).scan(0, new BiFunction<Integer, Integer, Integer>() {

@Override
public Integer apply(Integer t1, Integer t2) throws Exception {
return t1 + t2;
}})
.test()
.assertValues(0, 1, 3, 6)
.assertComplete();
}

@Test
public void testFlowableScanSeedWhenScanSeedProviderThrows() {
final RuntimeException e = new RuntimeException();
Flowable.just(1,2,3).scanWith(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
throw e;
}
},
new BiFunction<Integer, Integer, Integer>() {

@Override
public Integer apply(Integer t1, Integer t2) throws Exception {
return t1 + t2;
}
})
.test()
.assertError(e)
.assertNoValues();
}
}

0 comments on commit e55459b

Please sign in to comment.