Skip to content

Commit

Permalink
Publisher#flatMapMergeSingle mapped source duplicate termainl improve…
Browse files Browse the repository at this point in the history
…d visibility (#1516)

Motivation:
A mapped source sending multiple terminal signals is in violation of the
Reactive Streams specification [1]. However if there is a faulty source
this may happen, and Publisher#flatMapMergeSingle will fail on an
assertion due to the activeMappedSources being 0 in
decrementActiveMappedSources. The assertion doesn't provide any
additional context as to what went wrong, and the downstream source may
not be completed as a result. This can be difficult to debug the root
cause and understand where the duplicate terminal signal originates
from.

[1] https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.7

Modifications:
- Enhance Publisher#flatMapMergeSingle and Publisher#flatMapMerge to log
  a warning when a duplicate terminal signal is detected, and still
  allow for control flow to continue.

Result:
Publisher#flatMapMergeSingle and flatMapMerge still complete control
flow in the presense of RS violations and provide log statement to help
folks track down the cause of the duplicate terminal signal.
  • Loading branch information
Scottmitch authored Apr 26, 2021
1 parent 7cfa6b1 commit 362a697
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -606,12 +606,21 @@ public void onError(final Throwable t) {

@Override
public void onComplete() {
if (parent.removeSubscriber(this, pendingDemandUpdater.getAndSet(this, -1))) {
final int unusedDemand = pendingDemandUpdater.getAndSet(this, -1);
if (unusedDemand < 0) {
logDuplicateTerminal();
} else if (parent.removeSubscriber(this, unusedDemand)) {
parent.enqueueAndDrain(complete());
} else {
parent.tryEmitItem(MAPPED_SOURCE_COMPLETE, this);
}
}

private void logDuplicateTerminal() {
LOGGER.warn("Duplicate terminal on Subscriber {}", this,
new IllegalStateException("Duplicate terminal on Subscriber " + this + " forbidden see: " +
"https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.7"));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -405,10 +405,23 @@ public void onError(Throwable t) {
}

private boolean onSingleTerminated() {
assert singleCancellable != null;
if (singleCancellable == null) {
logDuplicateTerminal();
return false;
}
cancellableSet.remove(singleCancellable);
singleCancellable = null;
return decrementActiveMappedSources();
}

private void logDuplicateTerminal() {
LOGGER.warn("onSubscribe not called before terminal or duplicate terminal on Subscriber {}", this,
new IllegalStateException(
"onSubscribe not called before terminal or duplicate terminal on Subscriber " + this +
" forbidden see: " +
"https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.9" +
"https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.7"));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.PublisherSource.Processor;
import io.servicetalk.concurrent.PublisherSource.Subscriber;
import io.servicetalk.concurrent.internal.DeliberateException;
Expand Down Expand Up @@ -50,6 +51,7 @@
import static io.servicetalk.concurrent.api.SourceAdapters.fromSource;
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;
import static io.servicetalk.concurrent.internal.EmptySubscriptions.EMPTY_SUBSCRIPTION;
import static io.servicetalk.utils.internal.PlatformDependent.throwException;
import static java.lang.Math.min;
import static java.util.Arrays.asList;
Expand All @@ -72,6 +74,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

public class PublisherFlatMapMergeTest {
private static final long TERMINAL_POLL_MS = 10;
Expand Down Expand Up @@ -244,6 +247,23 @@ public void singleItemMappedErrorPostSourceComplete() {
assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION));
}

@Test
public void testDuplicateTerminal() {
PublisherSource<Integer> mappedPublisher = subscriber -> {
subscriber.onSubscribe(EMPTY_SUBSCRIPTION);
subscriber.onComplete();
// intentionally violate the RS spec to verify the operator's behavior.
// [1] https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.7
subscriber.onComplete();
};
@SuppressWarnings("unchecked")
Subscriber<Integer> mockSubscriber = mock(Subscriber.class);
toSource(publisher.flatMapMerge(i -> fromSource(mappedPublisher), 1)).subscribe(mockSubscriber);
publisher.onNext(1);
publisher.onComplete();
verify(mockSubscriber).onComplete();
}

@Test
public void cancelPropagatedBeforeErrorButOriginalErrorPreserved() {
CountDownLatch cancelledLatch = new CountDownLatch(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.servicetalk.concurrent.PublisherSource.Subscriber;
import io.servicetalk.concurrent.PublisherSource.Subscription;
import io.servicetalk.concurrent.SingleSource;
import io.servicetalk.concurrent.internal.DeliberateException;
import io.servicetalk.concurrent.test.internal.TestPublisherSubscriber;

Expand All @@ -37,10 +38,12 @@
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

import static io.servicetalk.concurrent.Cancellable.IGNORE_CANCEL;
import static io.servicetalk.concurrent.api.Executors.immediate;
import static io.servicetalk.concurrent.api.Publisher.fromIterable;
import static io.servicetalk.concurrent.api.Single.failed;
import static io.servicetalk.concurrent.api.Single.succeeded;
import static io.servicetalk.concurrent.api.SourceAdapters.fromSource;
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;
import static io.servicetalk.utils.internal.PlatformDependent.throwException;
Expand All @@ -61,6 +64,8 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

public class PublisherFlatMapSingleTest {
private final TestPublisherSubscriber<Integer> subscriber = new TestPublisherSubscriber<>();
Expand Down Expand Up @@ -186,6 +191,23 @@ public void testSingleErrorPostSourceComplete() {
assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION));
}

@Test
public void testDuplicateTerminal() {
SingleSource<Integer> single = subscriber -> {
subscriber.onSubscribe(IGNORE_CANCEL);
subscriber.onSuccess(2);
// intentionally violate the RS spec to verify the operator's behavior.
// [1] https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.7
subscriber.onSuccess(3);
};
@SuppressWarnings("unchecked")
Subscriber<Integer> mockSubscriber = mock(Subscriber.class);
toSource(source.flatMapMergeSingle(integer1 -> fromSource(single), 2)).subscribe(mockSubscriber);
source.onNext(1);
source.onComplete();
verify(mockSubscriber).onComplete();
}

@Test
public void cancelPropagatedBeforeErrorButOriginalErrorPreserved() {
CountDownLatch cancelledLatch = new CountDownLatch(1);
Expand Down

0 comments on commit 362a697

Please sign in to comment.