Skip to content

Commit

Permalink
SingleConcatWithPublisher and invalid request-n
Browse files Browse the repository at this point in the history
__Motivation__

If invalid calls to request-n are made for `Single.concat(Publisher)` before the `Single` result is propagated, we first emit the result and then propagate the invalid demand.
For cases, when the concatenated `Publisher` completes inline with `subscribe()` (eg: `Publisher.empty()`), this means we will never deliver an error for the invalid demand.

__Modification__

Deliver error when invalid demand is made in the above case instead of `Single`'s result and do not subscribe to the concatenated `Publisher`

__Result__

Invalid demand always terminates the `Subscriber` with an error.
  • Loading branch information
Nitesh Kant committed May 6, 2020
1 parent e57a01c commit 7f720fb
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.internal.SubscriberUtils.isRequestNValid;
import static io.servicetalk.concurrent.internal.SubscriberUtils.newExceptionForInvalidRequestN;
import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater;

final class SingleConcatWithPublisher<T> extends AbstractNoHandleSubscribePublisher<T> {
Expand All @@ -46,6 +48,7 @@ private static final class ConcatSubscriber<T> extends CancellableThenSubscripti
private static final Object INITIAL = new Object();
private static final Object REQUESTED = new Object();
private static final Object CANCELLED = new Object();
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<ConcatSubscriber, Object> mayBeResultUpdater =
newUpdater(ConcatSubscriber.class, Object.class, "mayBeResult");

Expand Down Expand Up @@ -119,9 +122,14 @@ public void request(long n) {
break;
} else if (mayBeResultUpdater.compareAndSet(this, oldVal, REQUESTED)) {
if (oldVal != INITIAL) {
@SuppressWarnings("unchecked")
final T tVal = (T) oldVal;
emitSingleSuccessToTarget(tVal);
if (!isRequestNValid(n)) {
target.onError(newExceptionForInvalidRequestN(n));
return;
} else {
@SuppressWarnings("unchecked")
final T tVal = (T) oldVal;
emitSingleSuccessToTarget(tVal);
}
}
// forward any invalid requestN on to the super class so it can propagate an error if necessary.
if (n != 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.servicetalk.concurrent.api.single;

import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.TestCancellable;
import io.servicetalk.concurrent.api.TestPublisher;
import io.servicetalk.concurrent.api.TestPublisherSubscriber;
Expand All @@ -28,17 +29,17 @@
import org.junit.Test;
import org.junit.rules.Timeout;

import static io.servicetalk.concurrent.api.Publisher.empty;
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

public class SingleConcatWithPublisherTest {
@Rule
Expand Down Expand Up @@ -101,6 +102,17 @@ private void invalidRequestBeforeNextSubscribe(long invalidN) {
assertThat("Unexpected requestN amount", subscription.requested(), is(invalidN));
}

@Test
public void invalidRequestNWithInlineSourceCompletion() {
TestPublisherSubscriber<Integer> subscriber = new TestPublisherSubscriber<>();
toSource(Single.succeeded(1).concat(empty())).subscribe(subscriber);
assertThat("Unexpected terminal.", subscriber.subscriptionReceived(), is(true));
subscriber.request(-1);
TerminalNotification term = subscriber.takeTerminal();
assertThat("Unexpected terminal.", term, is(notNullValue()));
assertThat("Unexpected terminal.", term.cause(), instanceOf(IllegalArgumentException.class));
}

@Test
public void invalidRequestAfterNextSubscribe() {
triggerNextSubscribe();
Expand Down Expand Up @@ -140,6 +152,7 @@ private void invalidThenValidRequest(long invalidN) {
@Test
public void request0PropagatedAfterSuccess() {
source.onSuccess(1);
subscriber.request(1); // get the success from the Single
subscriber.request(0);
next.onSubscribe(subscription);
assertThat("Invalid request-n propagated " + subscription, subscription.requestedEquals(0),
Expand All @@ -150,33 +163,33 @@ public void request0PropagatedAfterSuccess() {
public void sourceError() {
source.onError(DELIBERATE_EXCEPTION);
assertThat("Unexpected subscriber termination.", subscriber.takeError(), sameInstance(DELIBERATE_EXCEPTION));
assertFalse("Next source subscribed unexpectedly.", next.isSubscribed());
assertThat("Next source subscribed unexpectedly.", next.isSubscribed(), is(false));
}

@Test
public void cancelSource() {
assertThat("Subscriber terminated unexpectedly.", subscriber.isTerminated(), is(false));
subscriber.cancel();
assertTrue("Original single not cancelled.", cancellable.isCancelled());
assertFalse("Next source subscribed unexpectedly.", next.isSubscribed());
assertThat("Original single not cancelled.", cancellable.isCancelled(), is(true));
assertThat("Next source subscribed unexpectedly.", next.isSubscribed(), is(false));
}

@Test
public void cancelSourcePostRequest() {
assertThat("Subscriber terminated unexpectedly.", subscriber.isTerminated(), is(false));
subscriber.request(1);
subscriber.cancel();
assertTrue("Original single not cancelled.", cancellable.isCancelled());
assertFalse("Next source subscribed unexpectedly.", next.isSubscribed());
assertThat("Original single not cancelled.", cancellable.isCancelled(), is(true));
assertThat("Next source subscribed unexpectedly.", next.isSubscribed(), is(false));
}

@Test
public void cancelNext() {
triggerNextSubscribe();
assertThat("Subscriber terminated unexpectedly.", subscriber.isTerminated(), is(false));
subscriber.cancel();
assertFalse("Original single cancelled unexpectedly.", cancellable.isCancelled());
assertTrue("Next source not cancelled.", subscription.isCancelled());
assertThat("Original single cancelled unexpectedly.", cancellable.isCancelled(), is(false));
assertThat("Next source not cancelled.", subscription.isCancelled(), is(true));
}

@Test
Expand Down

0 comments on commit 7f720fb

Please sign in to comment.