Skip to content

Commit

Permalink
Handle reentry for n-arity Publisher.from (#1626)
Browse files Browse the repository at this point in the history
Motivation:
FromPublisher(s) with n-arity have no safety net for re-entrance. This can result in emitting duplicate items.

Modifications:
Handle re-entry.

Result:
No duplicates.
  • Loading branch information
tkountis authored and bondolo committed Jun 28, 2021
1 parent 07b451c commit fa5f888
Show file tree
Hide file tree
Showing 5 changed files with 213 additions and 165 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,47 +17,63 @@

import javax.annotation.Nullable;

import static io.servicetalk.concurrent.internal.FlowControlUtils.addWithOverflowProtection;
import static io.servicetalk.concurrent.internal.SubscriberUtils.handleExceptionFromOnSubscribe;
import static io.servicetalk.concurrent.internal.SubscriberUtils.isRequestNValid;
import static io.servicetalk.concurrent.internal.SubscriberUtils.newExceptionForInvalidRequestN;
import static java.lang.Math.min;

final class FromNPublisher<T> extends AbstractSynchronousPublisher<T> {

private static final Object UNUSED_REF = new Object();

final class From2Publisher<T> extends AbstractSynchronousPublisher<T> {
@Nullable
private final T v1;
@Nullable
private final T v2;
@Nullable
private final T v3;

From2Publisher(@Nullable T v1, @Nullable T v2) {
@SuppressWarnings("unchecked")
FromNPublisher(@Nullable T v1, @Nullable T v2) {
this.v1 = (T) UNUSED_REF;
this.v2 = v1;
this.v3 = v2;
}

FromNPublisher(@Nullable T v1, @Nullable T v2, @Nullable T v3) {
this.v1 = v1;
this.v2 = v2;
this.v3 = v3;
}

@Override
void doSubscribe(final Subscriber<? super T> subscriber) {
try {
subscriber.onSubscribe(new TwoValueSubscription(subscriber));
subscriber.onSubscribe(new NValueSubscription(subscriber));
} catch (Throwable cause) {
handleExceptionFromOnSubscribe(subscriber, cause);
}
}

private final class TwoValueSubscription implements Subscription {
private static final byte INIT = 0;
private static final byte DELIVERED_V1 = 1;
private static final byte CANCELLED = 2;
private final class NValueSubscription implements Subscription {
private static final byte TERMINATED = 3;
private byte requested;
private byte state;
private final Subscriber<? super T> subscriber;

private TwoValueSubscription(final Subscriber<? super T> subscriber) {
private NValueSubscription(final Subscriber<? super T> subscriber) {
this.subscriber = subscriber;
if (v1 == UNUSED_REF) {
// 3-value version - simulate 1 emitted item, start counting from 1.
requested = 1;
state++;
}
}

@Override
public void cancel() {
if (state != TERMINATED) {
state = CANCELLED;
}
state = TERMINATED;
}

@Override
Expand All @@ -70,33 +86,32 @@ public void request(final long n) {
subscriber.onError(newExceptionForInvalidRequestN(n));
return;
}
if (state == INIT) {
state = DELIVERED_V1;
try {
subscriber.onNext(v1);
} catch (Throwable cause) {
state = TERMINATED;
subscriber.onError(cause);
return;
}
// We could check CANCELLED here and return, but it isn't required.
if (n > 1) {
deliverV2();
if (requested == 3) {
return;
}
requested = (byte) min(3, addWithOverflowProtection(requested, n));
boolean successful = true;
while (successful && state < requested) {
if (state == 0) {
successful = deliver(v1);
} else if (state == 1) {
successful = deliver(v2);
} else if (state == 2 && deliver(v3)) {
subscriber.onComplete();
}
} else if (state == DELIVERED_V1) {
deliverV2();
}
}

private void deliverV2() {
state = TERMINATED;
private boolean deliver(@Nullable T value) {
++state;
try {
subscriber.onNext(v2);
subscriber.onNext(value);
return true;
} catch (Throwable cause) {
state = TERMINATED;
subscriber.onError(cause);
return;
return false;
}
subscriber.onComplete();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3387,7 +3387,7 @@ public static <T> Publisher<T> from(@Nullable T value) {
* @see <a href="http://reactivex.io/documentation/operators/just.html">ReactiveX just operator.</a>
*/
public static <T> Publisher<T> from(@Nullable T v1, @Nullable T v2) {
return new From2Publisher<>(v1, v2);
return new FromNPublisher<>(v1, v2);
}

/**
Expand All @@ -3405,7 +3405,7 @@ public static <T> Publisher<T> from(@Nullable T v1, @Nullable T v2) {
* @see <a href="http://reactivex.io/documentation/operators/just.html">ReactiveX just operator.</a>
*/
public static <T> Publisher<T> from(@Nullable T v1, @Nullable T v2, @Nullable T v3) {
return new From3Publisher<>(v1, v2, v3);
return new FromNPublisher<>(v1, v2, v3);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@
import org.mockito.stubbing.Answer;

import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.Publisher.from;
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;
import static io.servicetalk.concurrent.internal.SubscriberUtils.newExceptionForInvalidRequestN;
import static java.lang.Long.MAX_VALUE;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
Expand Down Expand Up @@ -116,6 +118,52 @@ void invalidRequestNNeg() {
invalidRequestN(-1);
}

@Test
void reentry() {
reentry(1, 1);
}

@Test
void reentryMaxFirst() {
reentry(MAX_VALUE, 1);
}

@Test
void reentryMaxLater() {
reentry(1, MAX_VALUE);
}

void reentry(final long firstDemand, final long nextDemand) {
final int[] emitted = {0};
final boolean[] completed = {false};
final Subscription[] subscription = new Subscription[1];
toSource(fromPublisher()).subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(final Subscription subscription1) {
subscription[0] = subscription1;
subscription1.request(firstDemand);
}

@Override
public void onNext(@Nullable final Integer integer) {
emitted[0]++;
subscription[0].request(nextDemand);
}

@Override
public void onError(final Throwable t) {
}

@Override
public void onComplete() {
completed[0] = true;
}
});

assertThat(emitted[0], is(2));
assertThat(completed[0], is(true));
}

private void invalidRequestN(long n) {
toSource(fromPublisher()).subscribe(subscriber);
subscriber.awaitSubscription().request(n);
Expand Down
Loading

0 comments on commit fa5f888

Please sign in to comment.