Skip to content

Commit

Permalink
SpliceFlatStreamToMetaSingle: propagate cancel when races with data (
Browse files Browse the repository at this point in the history
…#3036)

Motivation:

Cancellation of the single may race with delivering the first element. If `cancel` wins, we already deliver `CancellationException` if someone subscribes to the payload publisher. However, if `cancel` arrives after  `dataSubscriber.onSuccess(...)` and before someone subscribes to the payload publisher, it's a strong indicator that there was a race and `dataSubscriber` is not interested in it anymore. Therefore, we should pessimistically assume that nobody will ever subscribe to the payload publisher, cancel upstream subscription, and deliver `CancellationException` is someone still subscribes.

Modifications:
- Cancel upstream subscription for both `null` and `PENDING` states if received data cancel;
- Upwrap code around `parent.packer.apply` to assert that this path happens only if `CANCELED`;
- Deliver `CancellationException` is someone still subscribes to the payload publisher after `CANCELED`;
- Fix another bug: do not deliver `dataSubscriber.onError` if `metaSeenInOnNext == true` because it's already terminated (`CANCELED` does not play any role here);
- Log more appropriate message if upstream delivers a terminal event after `payloadSubscriber` is already terminated (`EMPTY_COMPLETED_DELIVERED`);
- Remove duplication around "Duplicate Subscribers are not allowed";

Result:

Network resources receive `cancel` signal and clean up connection state.
  • Loading branch information
idelpivnitskiy authored Aug 14, 2024
1 parent 2316f7c commit f345761
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 50 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2018, 2022 Apple Inc. and the ServiceTalk project authors
* Copyright © 2018-2019, 2021-2024 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,6 +32,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiFunction;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -71,12 +72,12 @@ public PublisherSource.Subscriber<Object> apply(Subscriber<? super Data> subscri
return new SplicingSubscriber<>(this, subscriber);
}

/* Visible for testing */
static final class SplicingSubscriber<Data, MetaData, Payload> implements PublisherSource.Subscriber<Object> {
private static final class SplicingSubscriber<Data, MetaData, Payload>
implements PublisherSource.Subscriber<Object> {

@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<SplicingSubscriber, Object>
maybePayloadSubUpdater = AtomicReferenceFieldUpdater.newUpdater(SplicingSubscriber.class,
Object.class, "maybePayloadSub");
private static final AtomicReferenceFieldUpdater<SplicingSubscriber, Object> maybePayloadSubUpdater =
AtomicReferenceFieldUpdater.newUpdater(SplicingSubscriber.class, Object.class, "maybePayloadSub");

private static final String CANCELED = "CANCELED";
private static final String PENDING = "PENDING";
Expand Down Expand Up @@ -155,7 +156,9 @@ private SplicingSubscriber(SpliceFlatStreamToMetaSingle<Data, MetaData, Payload>
* Publisher}&lt;{@link Payload}&gt;
*/
private void cancelData(Subscription subscription) {
if (maybePayloadSubUpdater.compareAndSet(this, null, CANCELED)) {
final Object current = maybePayloadSubUpdater.getAndUpdate(this,
curr -> curr == null || curr == PENDING ? CANCELED : curr);
if (current == null || current == PENDING) {
subscription.cancel();
}
}
Expand Down Expand Up @@ -195,12 +198,22 @@ public void onNext(@Nullable Object obj) {
metaSeenInOnNext = true;
final Data data;
try {
data = parent.packer.apply(meta, maybePayloadSubUpdater.compareAndSet(this, null, PENDING) ?
newPayloadPublisher() : Publisher.failed(StacklessCancellationException.newInstance(
"Canceled prematurely from Data", SplicingSubscriber.class, "cancelData(..)")));
final Publisher<Payload> payload;
if (maybePayloadSubUpdater.compareAndSet(this, null, PENDING)) {
payload = newPayloadPublisher();
} else {
final Object maybePayloadSub = this.maybePayloadSub;
assert maybePayloadSub == CANCELED : "Expected CANCELED but got: " + maybePayloadSub;
boolean cas = maybePayloadSubUpdater.compareAndSet(this, CANCELED, EMPTY_COMPLETED_DELIVERED);
assert cas : "Could not transition from CANCELED to EMPTY_COMPLETED_DELIVERED";
payload = Publisher.failed(StacklessCancellationException.newInstance(
"Canceled prematurely from SplicingSubscriber.cancelData(..), current state: " +
maybePayloadSub, getClass(), "onNext(...)"));
}
data = parent.packer.apply(meta, payload);
assert data != null : "Packer function must return non-null Data";
} catch (Throwable t) {
assert rawSubscription != null;
assert rawSubscription != null : "Expected rawSubscription but got null";
// We know that there is nothing else that can happen on this stream as we are not sending the
// data to the dataSubscriber.
rawSubscription.cancel();
Expand All @@ -225,7 +238,7 @@ protected void handleSubscribe(PublisherSource.Subscriber<? super Payload> newSu
// Subscriber which is not allowed by the Reactive Streams specification.
newSubscriber.onSubscribe(delayedSubscription);
if (maybePayloadSubUpdater.compareAndSet(SplicingSubscriber.this, PENDING, newSubscriber)) {
assert rawSubscription != null;
assert rawSubscription != null : "Expected rawSubscription but got null";
delayedSubscription.delayedSubscription(rawSubscription);
} else {
// Entering this branch means either a duplicate subscriber or a stream that completed or failed
Expand All @@ -240,8 +253,15 @@ protected void handleSubscribe(PublisherSource.Subscriber<? super Payload> newSu
newSubscriber.onComplete();
} else if (maybeSubscriber instanceof Throwable && maybePayloadSubUpdater
.compareAndSet(SplicingSubscriber.this, maybeSubscriber, EMPTY_COMPLETED_DELIVERED)) {
// Premature error or cancel
// Premature error
newSubscriber.onError((Throwable) maybeSubscriber);
} else if (maybeSubscriber == CANCELED && maybePayloadSubUpdater
.compareAndSet(SplicingSubscriber.this, maybeSubscriber, EMPTY_COMPLETED_DELIVERED)) {
// Premature cancel, capture the full caller stack-trace to understand which code path
// subscribes to the payload after cancellation.
newSubscriber.onError(new CancellationException(
"Canceled prematurely from SplicingSubscriber.cancelData(..), current state: " +
maybeSubscriber));
} else {
// Existing subscriber or terminal event consumed by other subscriber (COMPLETED_DELIVERED)
newSubscriber.onError(new DuplicateSubscribeException(maybeSubscriber, newSubscriber,
Expand All @@ -259,17 +279,18 @@ public void onError(Throwable t) {
payloadSubscriber.onError(t);
} else {
final Object maybeSubscriber = maybePayloadSubUpdater.getAndSet(this, t);
if (maybeSubscriber == CANCELED || !metaSeenInOnNext) {
if (!metaSeenInOnNext) {
ensureResultSubscriberOnSubscribe();
dataSubscriber.onError(t);
} else if (maybeSubscriber instanceof PublisherSource.Subscriber) {
if (maybePayloadSubUpdater.compareAndSet(this, t, EMPTY_COMPLETED_DELIVERED)) {
((PublisherSource.Subscriber<Payload>) maybeSubscriber).onError(t);
} else {
((PublisherSource.Subscriber<Payload>) maybeSubscriber).onError(new IllegalStateException(
"Duplicate Subscribers are not allowed. Existing: " + maybeSubscriber +
", failed the race with a duplicate, but neither has seen onNext()"));
terminateWithIllegalStateException((PublisherSource.Subscriber<Payload>) maybeSubscriber);
}
} else if (maybeSubscriber == EMPTY_COMPLETED_DELIVERED) {
LOGGER.debug("Discarding a terminal error from upstream because the payload publisher was " +
"already terminated", t);
} else {
LOGGER.debug("Terminal error queued for delayed delivery to the payload publisher. " +
"If the payload is not subscribed, this event will not be delivered.", t);
Expand All @@ -289,26 +310,32 @@ public void onComplete() {
EMPTY_COMPLETED_DELIVERED)) {
((PublisherSource.Subscriber<Payload>) maybeSubscriber).onComplete();
} else {
((PublisherSource.Subscriber<Payload>) maybeSubscriber).onError(new IllegalStateException(
"Duplicate Subscribers are not allowed. Existing: " + maybeSubscriber +
", failed the race with a duplicate, but neither has seen onNext()"));
terminateWithIllegalStateException((PublisherSource.Subscriber<Payload>) maybeSubscriber);
}
} else if (!metaSeenInOnNext) {
ensureResultSubscriberOnSubscribe();
dataSubscriber.onError(new IllegalStateException(
"Stream unexpectedly completed without emitting any items"));
} else if (maybeSubscriber == EMPTY_COMPLETED_DELIVERED) {
LOGGER.debug("Discarding a terminal complete from upstream because the payload publisher was " +
"already terminated");
}
}
}

private void ensureResultSubscriberOnSubscribe() {
assert !metaSeenInOnNext;
assert !metaSeenInOnNext : "Already seen meta-data";
if (!onSubscribeSent) {
onSubscribeSent = true;
// Since we are going to deliver data or a terminal signal right after this,
// there is no need for this to be cancellable.
dataSubscriber.onSubscribe(IGNORE_CANCEL);
}
}

private void terminateWithIllegalStateException(PublisherSource.Subscriber<Payload> subscriber) {
subscriber.onError(new IllegalStateException("Duplicate Subscribers are not allowed. Existing: " +
subscriber + ", failed the race with a duplicate, but neither has seen onNext()"));
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2018-2019, 2021 Apple Inc. and the ServiceTalk project authors
* Copyright © 2018-2022, 2024 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,8 @@
import io.servicetalk.concurrent.test.internal.TestSingleSubscriber;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.concurrent.CancellationException;

Expand Down Expand Up @@ -60,7 +62,7 @@ void streamWithHeaderAndPayloadShouldProduceDataWithEmbeddedPayload() {
upstream.onNext(metaData);
Data data = dataSubscriber.awaitOnSuccess();
assertThat(data, is(notNullValue()));
assertThat(data.meta(), equalTo(data.meta()));
assertThat(data.meta(), equalTo(metaData.meta()));
toSource(data.getPayload()).subscribe(payloadSubscriber);
payloadSubscriber.awaitSubscription().request(2);
upstream.onNext(one, last);
Expand All @@ -77,7 +79,7 @@ void streamWithHeaderAndEmptyPayloadShouldCompleteOnPublisherOnSubscribe()
upstream.onNext(metaData);
Data data = dataSubscriber.awaitOnSuccess();
assertThat(data, is(notNullValue()));
assertThat(data.meta(), equalTo(data.meta()));
assertThat(data.meta(), equalTo(metaData.meta()));
upstream.onComplete();
assertThat(data.getPayload().toFuture().get(), empty());
}
Expand All @@ -90,34 +92,49 @@ void emptyStreamShouldCompleteDataWithError() {
assertThat(dataSubscriber.awaitOnError(), instanceOf(IllegalStateException.class));
}

@Test
void cancelDataRacingWithDataShouldCompleteAndFailPublisherOnSubscribe() {
@ParameterizedTest(name = "{displayName} [{index}]: terminateUpstreamWithError={0}")
@ValueSource(booleans = {false, true})
void cancelDataRacingWithDataShouldCompleteAndFailPublisherOnSubscribe(boolean terminateUpstreamWithError) {
Single<Data> op = upstream.liftSyncToSingle(new SpliceFlatStreamToMetaSingle<>(Data::new));
toSource(op).subscribe(dataSubscriber);
upstream.onSubscribe(subscription);
dataSubscriber.awaitSubscription().cancel();
assertTrue(subscription.isCancelled());
upstream.onNext(metaData);
Data data = dataSubscriber.awaitOnSuccess();
assertThat(data, is(notNullValue()));
assertThat(data.meta(), equalTo(data.meta()));
assertThat(data.meta(), equalTo(metaData.meta()));
toSource(data.getPayload()).subscribe(payloadSubscriber);
assertThat(payloadSubscriber.awaitOnError(), instanceOf(CancellationException.class));
assertPayloadSubscriberReceivesCancellationException(terminateUpstreamWithError);
}

@Test
void cancelDataAfterDataCompleteShouldIgnoreCancelAndDeliverPublisherOnComplete() {
@ParameterizedTest(name = "{displayName} [{index}]: terminateUpstreamWithError={0}")
@ValueSource(booleans = {false, true})
void cancelDataAfterDataCompleteShouldCancelUpstreamAndFailPublisherOnSubscribe(
boolean terminateUpstreamWithError) {
Single<Data> op = upstream.liftSyncToSingle(new SpliceFlatStreamToMetaSingle<>(Data::new));
toSource(op).subscribe(dataSubscriber);
upstream.onSubscribe(subscription);
upstream.onNext(metaData);
Data data = dataSubscriber.awaitOnSuccess();
assertThat(data, is(notNullValue()));
assertThat(data.meta(), equalTo(data.meta()));
assertThat(data.meta(), equalTo(metaData.meta()));
assertFalse(subscription.isCancelled());
dataSubscriber.awaitSubscription().cancel();
assertTrue(subscription.isCancelled());
toSource(data.getPayload()).subscribe(payloadSubscriber);
payloadSubscriber.awaitSubscription().request(3);
upstream.onNext(one, two, last);
upstream.onComplete();
assertThat(payloadSubscriber.takeOnNext(3), contains(one, two, last));
payloadSubscriber.awaitOnComplete();
assertPayloadSubscriberReceivesCancellationException(terminateUpstreamWithError);
}

private void assertPayloadSubscriberReceivesCancellationException(boolean terminateUpstreamWithError) {
assertThat(payloadSubscriber.awaitOnError(), instanceOf(CancellationException.class));
// Verify payloadSubscriber does not receive a terminal signal two times. If received, TestPublisherSubscriber
// will throw IllegalStateException: Subscriber has already terminated.
if (terminateUpstreamWithError) {
upstream.onError(DELIBERATE_EXCEPTION);
} else {
upstream.onComplete();
}
}

@Test
Expand All @@ -131,37 +148,51 @@ void cancelDataBeforeDataCompleteShouldDeliverError() {
assertThat(dataSubscriber.awaitOnError(), is(DELIBERATE_EXCEPTION));
}

@Test
void streamErrorAfterPublisherSubscribeShouldDeliverError() {
@ParameterizedTest(name = "{displayName} [{index}]: withPayload={0}")
@ValueSource(booleans = {false, true})
void streamErrorAfterPublisherSubscribeShouldDeliverError(boolean withPayload) {
Single<Data> op = upstream.liftSyncToSingle(new SpliceFlatStreamToMetaSingle<>(Data::new));
toSource(op).subscribe(dataSubscriber);
upstream.onSubscribe(subscription);
upstream.onNext(metaData);
Data data = dataSubscriber.awaitOnSuccess();
assertThat(data, is(notNullValue()));
assertThat(data.meta(), equalTo(data.meta()));
assertThat(data.meta(), equalTo(metaData.meta()));
toSource(data.getPayload()).subscribe(payloadSubscriber);
payloadSubscriber.awaitSubscription().request(1);
upstream.onNext(one);
if (withPayload) {
upstream.onNext(one);
}
assertFalse(subscription.isCancelled());
upstream.onError(DELIBERATE_EXCEPTION);
assertThat(payloadSubscriber.takeOnNext(), is(one));
if (withPayload) {
assertThat(payloadSubscriber.takeOnNext(), is(one));
} else {
assertThat(payloadSubscriber.pollAllOnNext(), is(empty()));
}
assertThat(payloadSubscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION));
}

@Test
void streamCompleteAfterPublisherSubscribeShouldDeliverComplete() {
@ParameterizedTest(name = "{displayName} [{index}]: withPayload={0}")
@ValueSource(booleans = {false, true})
void streamCompleteAfterPublisherSubscribeShouldDeliverComplete(boolean withPayload) {
Single<Data> op = upstream.liftSyncToSingle(new SpliceFlatStreamToMetaSingle<>(Data::new));
toSource(op).subscribe(dataSubscriber);
upstream.onNext(metaData);
Data data = dataSubscriber.awaitOnSuccess();
assertThat(data, is(notNullValue()));
assertThat(data.meta(), equalTo(data.meta()));
assertThat(data.meta(), equalTo(metaData.meta()));
toSource(data.getPayload()).subscribe(payloadSubscriber);
payloadSubscriber.awaitSubscription().request(3);
upstream.onNext(one, two, last);
if (withPayload) {
upstream.onNext(one, two, last);
}
upstream.onComplete();
assertThat(payloadSubscriber.takeOnNext(3), contains(one, two, last));
if (withPayload) {
assertThat(payloadSubscriber.takeOnNext(3), contains(one, two, last));
} else {
assertThat(payloadSubscriber.pollAllOnNext(), is(empty()));
}
payloadSubscriber.awaitOnComplete();
}

Expand All @@ -172,7 +203,7 @@ void streamCompleteBeforePublisherSubscribeShouldDeliverCompleteOnSubscribe() {
upstream.onNext(metaData);
Data data = dataSubscriber.awaitOnSuccess();
assertThat(data, is(notNullValue()));
assertThat(data.meta(), equalTo(data.meta()));
assertThat(data.meta(), equalTo(metaData.meta()));
upstream.onComplete();
toSource(data.getPayload()).subscribe(payloadSubscriber);
payloadSubscriber.awaitOnComplete();
Expand All @@ -186,7 +217,7 @@ void streamErrorBeforePublisherSubscribeShouldDeliverErrorOnSubscribe() {
upstream.onNext(metaData);
Data data = dataSubscriber.awaitOnSuccess();
assertThat(data, is(notNullValue()));
assertThat(data.meta(), equalTo(data.meta()));
assertThat(data.meta(), equalTo(metaData.meta()));
assertFalse(subscription.isCancelled());
upstream.onError(DELIBERATE_EXCEPTION);
toSource(data.getPayload()).subscribe(payloadSubscriber);
Expand All @@ -200,7 +231,7 @@ void publisherSubscribeTwiceShouldFailSecondSubscriber() {
upstream.onNext(metaData);
Data data = dataSubscriber.awaitOnSuccess();
assertThat(data, is(notNullValue()));
assertThat(data.meta(), equalTo(data.meta()));
assertThat(data.meta(), equalTo(metaData.meta()));
toSource(data.getPayload()).subscribe(payloadSubscriber);
payloadSubscriber.awaitSubscription().request(3);
upstream.onNext(one, two, last);
Expand All @@ -218,7 +249,7 @@ void publisherSubscribeAgainAfterCompletingInitialSubscriberShouldFailSecondSubs
upstream.onNext(metaData);
Data data = dataSubscriber.awaitOnSuccess();
assertThat(data, is(notNullValue()));
assertThat(data.meta(), equalTo(data.meta()));
assertThat(data.meta(), equalTo(metaData.meta()));
toSource(data.getPayload()).subscribe(payloadSubscriber);
payloadSubscriber.awaitSubscription().request(3);
upstream.onNext(one, two, last);
Expand Down

0 comments on commit f345761

Please sign in to comment.