From b552fc4fd50c233cac04f44dcc85968888753bc1 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Thu, 30 Nov 2023 16:38:37 +0100 Subject: [PATCH] fix: rewrite of the concatenation operators This fixes race conditions in concatMap and stream concatenation operators. Refs: #1388 --- implementation/revapi.json | 9 +- .../mutiny/helpers/Subscriptions.java | 14 + .../operators/multi/MultiConcatMapOp.java | 366 ++++++++---------- .../mutiny/operators/multi/MultiConcatOp.java | 157 +------- .../multi/MultiConcatMapNoPrefetchTest.java | 25 +- 5 files changed, 217 insertions(+), 354 deletions(-) diff --git a/implementation/revapi.json b/implementation/revapi.json index d37de1ac5..2c593c3f8 100644 --- a/implementation/revapi.json +++ b/implementation/revapi.json @@ -51,7 +51,14 @@ "criticality" : "highlight", "minSeverity" : "POTENTIALLY_BREAKING", "minCriticality" : "documented", - "differences" : [ ] + "differences" : [ + { + "ignore": true, + "code": "java.class.removed", + "old": "class io.smallrye.mutiny.operators.multi.MultiConcatMapOp.ConcatMapMainSubscriber", + "justification": "Internal API refactoring" + } + ] } }, { "extension" : "revapi.reporter.json", diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/Subscriptions.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/Subscriptions.java index 44dcc1319..9787da081 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/helpers/Subscriptions.java +++ b/implementation/src/main/java/io/smallrye/mutiny/helpers/Subscriptions.java @@ -6,6 +6,7 @@ import java.util.concurrent.Flow.Subscription; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReference; import io.smallrye.mutiny.CompositeException; @@ -103,6 +104,19 @@ public static long add(AtomicLong requested, long requests) { } } + public static long add(AtomicLongFieldUpdater updater, T receiver, long requests) { + for (;;) { + long r = updater.get(receiver); + if (r == Long.MAX_VALUE) { + return Long.MAX_VALUE; + } + long u = add(r, requests); + if (updater.compareAndSet(receiver, r, u)) { + return r; + } + } + } + /** * Atomically subtract the given number (positive, not validated) from the target field unless it contains Long.MAX_VALUE. * diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiConcatMapOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiConcatMapOp.java index 28736c5c2..de24ea69a 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiConcatMapOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiConcatMapOp.java @@ -2,20 +2,17 @@ import java.util.concurrent.Flow.Publisher; import java.util.concurrent.Flow.Subscription; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Function; +import io.smallrye.mutiny.CompositeException; import io.smallrye.mutiny.Context; import io.smallrye.mutiny.Multi; -import io.smallrye.mutiny.helpers.ParameterValidation; import io.smallrye.mutiny.helpers.Subscriptions; import io.smallrye.mutiny.infrastructure.Infrastructure; import io.smallrye.mutiny.subscription.ContextSupport; import io.smallrye.mutiny.subscription.MultiSubscriber; -import io.smallrye.mutiny.subscription.SwitchableSubscriptionSubscriber; /** * ConcatMap operator without prefetching items from the upstream. @@ -25,7 +22,7 @@ *
  • The inner has no more outstanding requests.
  • *
  • The inner completed without emitting items or with outstanding requests.
  • * - * + *

    * This operator can collect failures and postpone them until termination. * * @param the upstream value type / input type @@ -50,265 +47,226 @@ public void subscribe(MultiSubscriber subscriber) { if (subscriber == null) { throw new NullPointerException("The subscriber must not be `null`"); } - ConcatMapMainSubscriber sub = new ConcatMapMainSubscriber<>(subscriber, - mapper, - postponeFailurePropagation); - - upstream.subscribe(Infrastructure.onMultiSubscription(upstream, sub)); + ConcatMapSubscriber concatMapSubscriber = new ConcatMapSubscriber<>(mapper, postponeFailurePropagation, + subscriber); + upstream.subscribe(Infrastructure.onMultiSubscription(upstream, concatMapSubscriber)); } - public static final class ConcatMapMainSubscriber implements MultiSubscriber, Subscription, ContextSupport { - - private static final int STATE_NEW = 0; // no request yet -- send first upstream request at this state - private static final int STATE_READY = 1; // first upstream request done, ready to receive items - private static final int STATE_EMITTING = 2; // received item from the upstream, subscribed to the inner - private static final int STATE_OUTER_TERMINATED = 3; // outer terminated, waiting for the inner to terminate - private static final int STATE_TERMINATED = 4; // inner and outer terminated - private static final int STATE_CANCELLED = 5; // cancelled - final AtomicInteger state = new AtomicInteger(STATE_NEW); - - final MultiSubscriber downstream; - final Function> mapper; - private final boolean delayError; - - final AtomicReference failures = new AtomicReference<>(); - - volatile Subscription upstream = null; - private static final AtomicReferenceFieldUpdater UPSTREAM_UPDATER = AtomicReferenceFieldUpdater - .newUpdater(ConcatMapMainSubscriber.class, Subscription.class, "upstream"); - - final ConcatMapInner inner; + static class ConcatMapSubscriber implements MultiSubscriber, Subscription, ContextSupport { - private final AtomicBoolean deferredUpstreamRequest = new AtomicBoolean(false); + private enum State { + INIT, + WAITING_NEXT_PUBLISHER, + WAITING_NEXT_SUBSCRIPTION, + EMITTING, + CANCELLED, + } - ConcatMapMainSubscriber( - MultiSubscriber downstream, - Function> mapper, - boolean delayError) { + private final Function> mapper; + private final boolean postponeFailurePropagation; + private final MultiSubscriber downstream; + private volatile long demand = 0L; + private volatile State state = State.INIT; + private volatile Subscription upstream; + private Subscription currentUpstream; + private boolean upstreamHasCompleted = false; + private Throwable failure; + + private static final AtomicReferenceFieldUpdater UPSTREAM_UPDATER = AtomicReferenceFieldUpdater + .newUpdater(ConcatMapSubscriber.class, Subscription.class, "upstream"); + private static final AtomicReferenceFieldUpdater STATE_UPDATER = AtomicReferenceFieldUpdater + .newUpdater(ConcatMapSubscriber.class, State.class, "state"); + private static final AtomicLongFieldUpdater DEMAND_UPDATER = AtomicLongFieldUpdater + .newUpdater(ConcatMapSubscriber.class, "demand"); + + ConcatMapSubscriber(Function> mapper, boolean postponeFailurePropagation, + MultiSubscriber downstream) { this.downstream = downstream; this.mapper = mapper; - this.delayError = delayError; - this.inner = new ConcatMapInner<>(this); + this.postponeFailurePropagation = postponeFailurePropagation; } @Override - public void request(long n) { - if (n > 0) { - if (state.compareAndSet(STATE_NEW, STATE_READY)) { - upstream.request(1); - } - if (deferredUpstreamRequest.compareAndSet(true, false)) { - upstream.request(1); - } - inner.request(n); - if (inner.requested() != 0L && deferredUpstreamRequest.compareAndSet(true, false)) { - upstream.request(1); - } + public Context context() { + if (downstream instanceof ContextSupport) { + return ((ContextSupport) downstream).context(); } else { - downstream.onFailure(Subscriptions.getInvalidRequestException()); + return Context.empty(); } } - @Override - public void cancel() { - while (true) { - int state = this.state.get(); - if (state == STATE_CANCELLED) { - return; - } - if (this.state.compareAndSet(state, STATE_CANCELLED)) { - if (state == STATE_OUTER_TERMINATED) { - inner.cancel(); - } else { - inner.cancel(); - upstream.cancel(); - } - return; - } - } - } + private final MultiSubscriber innerSubscriber = new InnerSubscriber(); @Override - public void onSubscribe(Subscription s) { - if (UPSTREAM_UPDATER.compareAndSet(this, null, s)) { + public void onSubscribe(Subscription subscription) { + if (UPSTREAM_UPDATER.compareAndSet(this, null, subscription)) { + upstream = subscription; downstream.onSubscribe(this); + } else { + subscription.cancel(); } } @Override public void onItem(I item) { - if (!state.compareAndSet(STATE_READY, STATE_EMITTING)) { + if (state == State.CANCELLED) { return; } - - try { - Publisher p = mapper.apply(item); - if (p == null) { - throw new NullPointerException(ParameterValidation.MAPPER_RETURNED_NULL); - } - - p.subscribe(inner); - } catch (Throwable e) { - if (postponeFailure(e, upstream)) { - innerComplete(0L); + if (STATE_UPDATER.compareAndSet(this, State.WAITING_NEXT_PUBLISHER, State.WAITING_NEXT_SUBSCRIPTION)) { + try { + Publisher publisher = mapper.apply(item); + if (publisher == null) { + throw new NullPointerException("The mapper produced a null publisher"); + } + publisher.subscribe(innerSubscriber); + } catch (Throwable err) { + upstream.cancel(); + onFailure(err); } } } @Override - public void onFailure(Throwable t) { - if (postponeFailure(t, inner)) { - onCompletion(); + public void onFailure(Throwable failure) { + if (STATE_UPDATER.getAndSet(this, State.CANCELLED) == State.CANCELLED) { + return; } + downstream.onFailure(addFailure(failure)); } - @Override - public void onCompletion() { - while (true) { - int state = this.state.get(); - if (state == STATE_NEW || state == STATE_READY) { - if (this.state.compareAndSet(state, STATE_TERMINATED)) { - terminateDownstream(); - return; - } - } else if (state == STATE_EMITTING) { - if (this.state.compareAndSet(state, STATE_OUTER_TERMINATED)) { - return; - } + private Throwable addFailure(Throwable failure) { + if (this.failure != null) { + if (this.failure instanceof CompositeException) { + this.failure = new CompositeException((CompositeException) this.failure, failure); } else { - return; + this.failure = new CompositeException(this.failure, failure); } + } else { + this.failure = failure; } + return this.failure; } - public synchronized void tryEmit(O value) { - switch (state.get()) { - case STATE_EMITTING: - case STATE_OUTER_TERMINATED: - downstream.onItem(value); - break; - default: - break; + @Override + public void onCompletion() { + if (state == State.CANCELLED) { + return; } - } - - public void innerComplete(long emitted) { - if (this.state.compareAndSet(STATE_EMITTING, STATE_READY)) { - // Inner completed but there are outstanding requests from inner, - // Or the inner completed without producing any items - // Request new item from upstream - if (inner.requested() != 0L || emitted == 0) { - upstream.request(1); + upstreamHasCompleted = true; + if (STATE_UPDATER.compareAndSet(this, State.WAITING_NEXT_PUBLISHER, State.CANCELLED) + || STATE_UPDATER.compareAndSet(this, State.INIT, State.CANCELLED)) { + if (failure == null) { + downstream.onCompletion(); } else { - deferredUpstreamRequest.set(true); + downstream.onFailure(failure); } - } else if (this.state.compareAndSet(STATE_OUTER_TERMINATED, STATE_TERMINATED)) { - terminateDownstream(); } } - public void innerFailure(Throwable e, long emitted) { - if (postponeFailure(e, upstream)) { - innerComplete(emitted); - } - } - - private boolean postponeFailure(Throwable e, Subscription subscription) { - if (e == null) { - return true; - } - - Subscriptions.addFailure(failures, e); - - if (delayError) { - return true; + @Override + public void request(long n) { + if (state == State.CANCELLED) { + return; } - - while (true) { - int state = this.state.get(); - if (state == STATE_CANCELLED || state == STATE_TERMINATED) { - return false; + if (n <= 0) { + cancel(); + downstream.onFailure(Subscriptions.getInvalidRequestException()); + } else { + Subscriptions.add(DEMAND_UPDATER, this, n); + if (STATE_UPDATER.compareAndSet(this, State.INIT, State.WAITING_NEXT_PUBLISHER)) { + upstream.request(1L); } else { - if (this.state.compareAndSet(state, STATE_TERMINATED)) { - subscription.cancel(); - synchronized (this) { - downstream.onFailure(failures.get()); - } - return false; + if (state == State.WAITING_NEXT_PUBLISHER) { + upstream.request(1L); + } else if (state == State.EMITTING) { + currentUpstream.request(n); } } } } - private void terminateDownstream() { - Throwable ex = failures.get(); - if (ex != null) { - downstream.onFailure(ex); + @Override + public void cancel() { + State previousState = STATE_UPDATER.getAndSet(this, State.CANCELLED); + if (previousState == State.CANCELLED) { return; } - downstream.onCompletion(); - } - - @Override - public Context context() { - if (downstream instanceof ContextSupport) { - return ((ContextSupport) downstream).context(); - } else { - return Context.empty(); + if (previousState == State.EMITTING) { + currentUpstream.cancel(); + upstream.cancel(); + } else if (upstream != null) { + upstream.cancel(); } } - } - - static final class ConcatMapInner extends SwitchableSubscriptionSubscriber { - private final ConcatMapMainSubscriber parent; + class InnerSubscriber implements MultiSubscriber, ContextSupport { - long emitted; - - /** - * Downstream passed as {@code null} to {@link SwitchableSubscriptionSubscriber} as accessors are not reachable. - * Effective downstream is {@code parent}. - * - * @param parent parent as downstream - */ - ConcatMapInner(ConcatMapMainSubscriber parent) { - super(null); - this.parent = parent; - } - - @Override - public void onItem(O item) { - emitted++; - parent.tryEmit(item); - } - - @Override - public void onFailure(Throwable failure) { - long p = emitted; - - if (p != 0L) { - emitted = 0L; - emitted(p); + @Override + public void onSubscribe(Subscription subscription) { + if (state == State.CANCELLED) { + return; + } + currentUpstream = subscription; + state = State.EMITTING; + long pending = demand; + if (pending > 0L) { + currentUpstream.request(pending); + } } - parent.innerFailure(failure, p); - } - - @Override - public void onCompletion() { - long p = emitted; + @Override + public void onItem(O item) { + if (state == State.CANCELLED) { + return; + } + DEMAND_UPDATER.decrementAndGet(ConcatMapSubscriber.this); + downstream.onItem(item); + } - if (p != 0L) { - emitted = 0L; - emitted(p); + @Override + public void onFailure(Throwable failure) { + if (state == State.CANCELLED) { + return; + } + state = State.WAITING_NEXT_PUBLISHER; + Throwable err = addFailure(failure); + if (postponeFailurePropagation) { + onCompletion(); + } else { + state = State.CANCELLED; + upstream.cancel(); + downstream.onFailure(err); + } } - parent.innerComplete(p); - } + @Override + public void onCompletion() { + if (state == State.CANCELLED) { + return; + } + if (!upstreamHasCompleted) { + state = State.WAITING_NEXT_PUBLISHER; + if (demand > 0L) { + upstream.request(1L); + } + } else { + state = State.CANCELLED; + if (failure != null) { + downstream.onFailure(failure); + } else { + downstream.onComplete(); + } + } + } - @Override - public Context context() { - return parent.context(); + @Override + public Context context() { + if (downstream instanceof ContextSupport) { + return ((ContextSupport) downstream).context(); + } else { + return Context.empty(); + } + } } } } diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiConcatOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiConcatOp.java index 20e1e0b75..f7bbe0a7b 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiConcatOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiConcatOp.java @@ -1,15 +1,13 @@ package io.smallrye.mutiny.operators.multi; import java.util.concurrent.Flow.Publisher; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; +import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.helpers.ParameterValidation; import io.smallrye.mutiny.helpers.Subscriptions; import io.smallrye.mutiny.infrastructure.Infrastructure; import io.smallrye.mutiny.operators.AbstractMulti; import io.smallrye.mutiny.subscription.MultiSubscriber; -import io.smallrye.mutiny.subscription.SwitchableSubscriptionSubscriber; /** * Concatenates a fixed set of Publishers. @@ -46,151 +44,16 @@ public void subscribe(MultiSubscriber actual) { } if (postponeFailurePropagation) { - ConcatArrayAndPostponeFailureSubscriber parent = new ConcatArrayAndPostponeFailureSubscriber<>(actual, - publishers); - actual.onSubscribe(parent); - - if (!parent.isCancelled()) { - parent.onCompletion(); - } + Multi.createFrom().items(publishers) + .onItem().transformToMulti(publisher -> publisher) + .collectFailures() + .concatenate() + .subscribe().withSubscriber(actual); } else { - ConcatArraySubscriber parent = new ConcatArraySubscriber<>(actual, publishers); - actual.onSubscribe(parent); - - if (!parent.isCancelled()) { - parent.onCompletion(); - } + Multi.createFrom().items(publishers) + .onItem().transformToMulti(publisher -> publisher) + .concatenate() + .subscribe().withSubscriber(actual); } } - - static final class ConcatArraySubscriber extends SwitchableSubscriptionSubscriber { - - private final Publisher[] upstreams; - - private int currentIndex; - private long emitted; - - private final AtomicInteger wip = new AtomicInteger(); - - ConcatArraySubscriber(MultiSubscriber actual, Publisher[] upstreams) { - super(actual); - this.upstreams = upstreams; - } - - @Override - public void onItem(T t) { - emitted++; - downstream.onItem(t); - } - - @Override - public void onCompletion() { - if (wip.getAndIncrement() == 0) { - Publisher[] a = upstreams; - do { - - if (isCancelled()) { - return; - } - - int i = currentIndex; - if (i == a.length) { - downstream.onCompletion(); - return; - } - - Publisher p = a[i]; - long c = emitted; - if (c != 0L) { - emitted = 0L; - emitted(c); - } - p.subscribe(Infrastructure.onMultiSubscription(p, this)); - - if (isCancelled()) { - return; - } - - currentIndex = ++i; - } while (wip.decrementAndGet() != 0); - } - - } - } - - static final class ConcatArrayAndPostponeFailureSubscriber extends SwitchableSubscriptionSubscriber { - - final Publisher[] upstreams; - - int index; - long produced; - - private final AtomicInteger wip = new AtomicInteger(); - private final AtomicReference failure = new AtomicReference<>(); - - ConcatArrayAndPostponeFailureSubscriber(MultiSubscriber actual, Publisher[] upstreams) { - super(actual); - this.upstreams = upstreams; - } - - @Override - public void onItem(T t) { - produced++; - downstream.onItem(t); - } - - @Override - public void onFailure(Throwable t) { - if (Subscriptions.addFailure(failure, t)) { - onCompletion(); - } - } - - @Override - public void onCompletion() { - if (wip.getAndIncrement() == 0) { - Publisher[] a = upstreams; - do { - - if (isCancelled()) { - return; - } - - int i = index; - if (i == a.length) { - Throwable last = Subscriptions.markFailureAsTerminated(failure); - if (last != null) { - downstream.onFailure(last); - } else { - downstream.onCompletion(); - } - return; - } - - Publisher p = a[i]; - - if (p == null) { - downstream.onFailure( - new NullPointerException("Source Publisher at currentIndex " + i + " is null")); - return; - } - - long c = produced; - if (c != 0L) { - produced = 0L; - emitted(c); - } - p.subscribe(Infrastructure.onMultiSubscription(p, this)); - - if (isCancelled()) { - return; - } - - index = ++i; - } while (wip.decrementAndGet() != 0); - } - - } - } - } diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/multi/MultiConcatMapNoPrefetchTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/MultiConcatMapNoPrefetchTest.java index 63304632e..18b6f74d0 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/multi/MultiConcatMapNoPrefetchTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/MultiConcatMapNoPrefetchTest.java @@ -14,7 +14,6 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import io.smallrye.mutiny.CompositeException; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.groups.MultiFlatten; @@ -36,6 +35,20 @@ void setUp() { }); } + @Test + void simpleConcatMap() { + AssertSubscriber sub = Multi.createFrom().range(1, 3) + .onItem().transformToMultiAndConcatenate(n -> Multi.createFrom().items(n * 10, n * 20)) + .subscribe().withSubscriber(AssertSubscriber.create()); + sub.request(1); + sub.assertItems(10); + sub.request(2); + sub.assertItems(10, 20, 20); + sub.request(Long.MAX_VALUE); + sub.assertItems(10, 20, 20, 40); + sub.assertCompleted(); + } + @ParameterizedTest @MethodSource("argsTransformToUni") void testTransformToUni(boolean prefetch, int[] upstreamRequests) { @@ -183,7 +196,7 @@ void testMapperReturningNullpostponeFailure() { .concatenate(); AssertSubscriber ts = new AssertSubscriber<>(5); result.subscribe(ts); - ts.assertHasNotReceivedAnyItem().assertFailedWith(CompositeException.class); + ts.assertHasNotReceivedAnyItem().assertFailedWith(NullPointerException.class); } @Test @@ -270,4 +283,12 @@ void testInnerCompleteSubscriptionAfterTermination() { ts.awaitCompletion(); } + @Test + void testUpfrontCompletion() { + AssertSubscriber sub = Multi.createFrom().empty() + .onItem().transformToMultiAndConcatenate(n -> Multi.createFrom().items(1, 2, 3)) + .subscribe().withSubscriber(AssertSubscriber.create()); + + sub.assertCompleted().assertHasNotReceivedAnyItem(); + } }