Skip to content

Commit

Permalink
Publisher#scanWith(ScanWithMapper) incorrect/missing terminal after c…
Browse files Browse the repository at this point in the history
…ancel (#1458)

Motivation:
Publisher#scanWith(ScanWithMapper) may not deliver a terminal event
after cancel is invoked. The operator may also deliver an internally
generated error due to state management incorrectly indicated a terminal
event has already been seen (e.g. duplicate terminal with violates RS
spec) or invalid requestN demand was previously seen.

Modifications:
- cancel should not update state to TERMINATED

Result:
Publisher#scanWith(ScanWithMapper) will deliver terminal events after
cancel and won't deliver internal errors at incorrect times.
  • Loading branch information
Scottmitch authored Mar 25, 2021
1 parent 92b55e3 commit 8f3db89
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,31 +35,7 @@ final class ScanWithPublisher<T, R> extends AbstractNoHandleSubscribePublisher<R

ScanWithPublisher(Publisher<T> original, Supplier<R> initial, BiFunction<R, ? super T, R> accumulator,
Executor executor) {
this(original, () -> new ScanWithMapper<T, R>() {
@Nullable
private R state = initial.get();

@Override
public R mapOnNext(@Nullable final T next) {
state = accumulator.apply(state, next);
return state;
}

@Override
public R mapOnError(final Throwable cause) {
throw newMapTerminalUnsupported();
}

@Override
public R mapOnComplete() {
throw newMapTerminalUnsupported();
}

@Override
public boolean mapTerminal() {
return false;
}
}, executor);
this(original, new SupplierScanWithMapper<>(initial, accumulator), executor);
}

ScanWithPublisher(Publisher<T> original, Supplier<? extends ScanWithMapper<? super T, ? extends R>> mapperSupplier,
Expand Down Expand Up @@ -133,7 +109,6 @@ public void request(final long n) {

@Override
public void cancel() {
demand = TERMINATED;
subscription.cancel();
}

Expand Down Expand Up @@ -245,7 +220,46 @@ private void deliverOnComplete(Subscriber<? super R> subscriber) {
}
}

private static IllegalStateException newMapTerminalUnsupported() {
throw new IllegalStateException("mapTerminal returns false, this method should never be invoked!");
private static final class SupplierScanWithMapper<T, R> implements Supplier<ScanWithMapper<T, R>> {
private final BiFunction<R, ? super T, R> accumulator;
private final Supplier<R> initial;

SupplierScanWithMapper(Supplier<R> initial, BiFunction<R, ? super T, R> accumulator) {
this.initial = requireNonNull(initial);
this.accumulator = requireNonNull(accumulator);
}

@Override
public ScanWithMapper<T, R> get() {
return new ScanWithMapper<T, R>() {
@Nullable
private R state = initial.get();

@Override
public R mapOnNext(@Nullable final T next) {
state = accumulator.apply(state, next);
return state;
}

@Override
public R mapOnError(final Throwable cause) {
throw newMapTerminalUnsupported();
}

@Override
public R mapOnComplete() {
throw newMapTerminalUnsupported();
}

@Override
public boolean mapTerminal() {
return false;
}
};
}

private static IllegalStateException newMapTerminalUnsupported() {
throw new IllegalStateException("mapTerminal returns false, this method should never be invoked!");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
import io.servicetalk.concurrent.test.internal.TestPublisherSubscriber;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.stream.Stream;
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.Processors.newPublisherProcessor;
Expand Down Expand Up @@ -322,6 +326,70 @@ public void invalidDemandWithOnNextAllowsError() throws InterruptedException {
assertThat(subscriber.awaitOnError(), instanceOf(IllegalArgumentException.class));
}

@ParameterizedTest
@MethodSource("cancelStillAllowsMapsParams")
public void cancelStillAllowsMaps(boolean onError, boolean cancelBefore) {
TestPublisher<Integer> publisher = new TestPublisher<>();
TestPublisherSubscriber<Integer> subscriber = new TestPublisherSubscriber<>();
toSource(publisher.scanWith(() -> new ScanWithMapper<Integer, Integer>() {
private int sum;
@Nullable
@Override
public Integer mapOnNext(@Nullable final Integer next) {
if (next != null) {
sum += next;
}
return next;
}

@Override
public Integer mapOnError(final Throwable cause) {
return sum;
}

@Override
public Integer mapOnComplete() {
return sum;
}

@Override
public boolean mapTerminal() {
return true;
}
})).subscribe(subscriber);
Subscription s = subscriber.awaitSubscription();

if (cancelBefore) {
s.request(4);
s.cancel();
} else {
s.request(3);
}

publisher.onNext(1, 2, 3);

if (!cancelBefore) {
s.cancel();
s.request(1);
}
if (onError) {
publisher.onError(DELIBERATE_EXCEPTION);
} else {
publisher.onComplete();
}

assertThat(subscriber.takeOnNext(4), contains(1, 2, 3, 6));
subscriber.awaitOnComplete();
}

private static Stream<Arguments> cancelStillAllowsMapsParams() {
return Stream.of(
Arguments.of(true, true),
Arguments.of(true, false),
Arguments.of(false, true),
Arguments.of(false, false));
}

private static ScanWithMapper<Integer, Integer> noopMapper() {
return new ScanWithMapper<Integer, Integer>() {
@Nullable
Expand Down

0 comments on commit 8f3db89

Please sign in to comment.