diff --git a/reactor-core/src/jcstress/java/reactor/core/publisher/FluxCreateStressTest.java b/reactor-core/src/jcstress/java/reactor/core/publisher/FluxCreateStressTest.java new file mode 100644 index 0000000000..edec0f5183 --- /dev/null +++ b/reactor-core/src/jcstress/java/reactor/core/publisher/FluxCreateStressTest.java @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2023 VMware Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package reactor.core.publisher; + +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.function.LongConsumer; + +import org.openjdk.jcstress.annotations.Actor; +import org.openjdk.jcstress.annotations.Arbiter; +import org.openjdk.jcstress.annotations.JCStressTest; +import org.openjdk.jcstress.annotations.Outcome; +import org.openjdk.jcstress.annotations.State; +import org.openjdk.jcstress.infra.results.J_Result; +import org.reactivestreams.Subscription; + +import static org.openjdk.jcstress.annotations.Expect.ACCEPTABLE; + +public abstract class FluxCreateStressTest { + + @JCStressTest + @Outcome(id = {"4"}, expect = ACCEPTABLE, desc = "demand delivered") + @State + public static class RequestAndOnRequestStressTest implements LongConsumer { + + FluxSink sink; + + Subscription s; + + volatile long observedDemand; + static final AtomicLongFieldUpdater OBSERVED_DEMAND + = AtomicLongFieldUpdater.newUpdater(RequestAndOnRequestStressTest.class, "observedDemand"); + + { + Flux.create(sink -> this.sink = sink) + .subscribe(new BaseSubscriber() { + @Override + protected void hookOnSubscribe(Subscription subscription) { + RequestAndOnRequestStressTest.this.s = subscription; + } + }); + } + + @Override + public void accept(long value) { + Operators.addCap(OBSERVED_DEMAND, this, value); + } + + @Actor + public void request() { + s.request(1); + s.request(1); + s.request(1); + s.request(1); + } + + @Actor + public void setOnRequestConsumer() { + sink.onRequest(this); + } + + + @Arbiter + public void artiber(J_Result r) { + r.r1 = observedDemand; + } + } +} diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxCreate.java b/reactor-core/src/main/java/reactor/core/publisher/FluxCreate.java index d3fb256492..27564cfc37 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxCreate.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxCreate.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -268,7 +268,7 @@ void drainLoop() { @Override public FluxSink onRequest(LongConsumer consumer) { - sink.onRequest(consumer, consumer, sink.requested); + sink.onPushPullRequest(consumer); return this; } @@ -409,6 +409,8 @@ static abstract class BaseSink extends AtomicBoolean static final Disposable TERMINATED = OperatorDisposables.DISPOSED; static final Disposable CANCELLED = Disposables.disposed(); + static final LongConsumer NOOP_CONSUMER = n -> {}; + final CoreSubscriber actual; final Context ctx; @@ -434,6 +436,7 @@ static abstract class BaseSink extends AtomicBoolean BaseSink(CoreSubscriber actual) { this.actual = actual; this.ctx = actual.currentContext(); + REQUESTED.lazySet(this, Long.MIN_VALUE); } @Override @@ -500,7 +503,7 @@ void disposeResource(boolean isCancel) { @Override public long requestedFromDownstream() { - return requested; + return requested & Long.MAX_VALUE; } void onCancel() { @@ -519,12 +522,15 @@ final boolean isTerminated() { @Override public final void request(long n) { if (Operators.validate(n)) { - Operators.addCap(REQUESTED, this, n); + long s = addCap(this, n); - LongConsumer consumer = requestConsumer; - if (n > 0 && consumer != null && !isCancelled()) { - consumer.accept(n); + if (hasRequestConsumer(s)) { + LongConsumer consumer = requestConsumer; + if (!isCancelled()) { + consumer.accept(n); + } } + onRequestedFromDownstream(); } } @@ -541,20 +547,29 @@ public CoreSubscriber actual() { @Override public FluxSink onRequest(LongConsumer consumer) { Objects.requireNonNull(consumer, "onRequest"); - onRequest(consumer, n -> { - }, Long.MAX_VALUE); + onPushRequest(consumer); return this; } - protected void onRequest(LongConsumer initialRequestConsumer, - LongConsumer requestConsumer, - long value) { + protected void onPushRequest(LongConsumer initialRequestConsumer) { + if (!REQUEST_CONSUMER.compareAndSet(this, null, NOOP_CONSUMER)) { + throw new IllegalStateException( + "A consumer has already been assigned to consume requests"); + } + + // do not change real flag since real consumer is technically absent + initialRequestConsumer.accept(Long.MAX_VALUE); + } + + protected void onPushPullRequest(LongConsumer requestConsumer) { if (!REQUEST_CONSUMER.compareAndSet(this, null, requestConsumer)) { throw new IllegalStateException( "A consumer has already been assigned to consume requests"); } - else if (value > 0) { - initialRequestConsumer.accept(value); + + long initialRequest = markRequestConsumerSet(this); + if (initialRequest > 0) { + requestConsumer.accept(initialRequest); } } @@ -607,7 +622,7 @@ else if (c instanceof SinkDisposable) { public Object scanUnsafe(Attr key) { if (key == Attr.TERMINATED) return disposable == TERMINATED; if (key == Attr.CANCELLED) return disposable == CANCELLED; - if (key == Attr.REQUESTED_FROM_DOWNSTREAM) return requested; + if (key == Attr.REQUESTED_FROM_DOWNSTREAM) return requestedFromDownstream(); if (key == Attr.RUN_STYLE) return Attr.RunStyle.ASYNC; return InnerProducer.super.scanUnsafe(key); @@ -617,6 +632,54 @@ public Object scanUnsafe(Attr key) { public String toString() { return "FluxSink"; } + + static void produced(BaseSink instance, long toSub) { + long s, r, u; + do { + s = instance.requested; + r = s & Long.MAX_VALUE; + if (r == 0 || r == Long.MAX_VALUE) { + return; + } + u = Operators.subOrZero(r, toSub); + } while (!REQUESTED.compareAndSet(instance, s, u | (s & Long.MIN_VALUE))); + } + + + static long addCap(BaseSink instance, long toAdd) { + long r, u, s; + for (;;) { + s = instance.requested; + r = s & Long.MAX_VALUE; + if (r == Long.MAX_VALUE) { + return Long.MAX_VALUE; + } + u = Operators.addCap(r, toAdd); + if (REQUESTED.compareAndSet(instance, s, u | (s & Long.MIN_VALUE))) { + return s; + } + } + } + + static long markRequestConsumerSet(BaseSink instance) { + long u, s; + for (;;) { + s = instance.requested; + + if (hasRequestConsumer(s)) { + return s; + } + + u = s & Long.MAX_VALUE; + if (REQUESTED.compareAndSet(instance, s, u)) { + return u; + } + } + } + + static boolean hasRequestConsumer(long requestedState) { + return (requestedState & Long.MIN_VALUE) == 0; + } } static final class IgnoreSink extends BaseSink { @@ -639,8 +702,9 @@ public FluxSink next(T t) { actual.onNext(t); for (; ; ) { - long r = requested; - if (r == 0L || REQUESTED.compareAndSet(this, r, r - 1)) { + long s = requested; + long r = s & Long.MAX_VALUE; + if (r == 0L || REQUESTED.compareAndSet(this, s, (r - 1) | (s & Long.MIN_VALUE))) { return this; } } @@ -665,9 +729,9 @@ public final FluxSink next(T t) { return this; } - if (requested != 0) { + if (requestedFromDownstream() != 0) { actual.onNext(t); - Operators.produced(REQUESTED, this, 1); + produced(this, 1); } else { onOverflow(); @@ -776,7 +840,7 @@ void drain() { final Queue q = queue; for (; ; ) { - long r = requested; + long r = requestedFromDownstream(); long e = 0L; while (e != r) { @@ -844,7 +908,7 @@ void drain() { } if (e != 0) { - Operators.produced(REQUESTED, this, e); + produced(this, e); } if (WIP.decrementAndGet(this) == 0) { @@ -936,7 +1000,7 @@ void drain() { final AtomicReference q = queue; for (; ; ) { - long r = requested; + long r = requestedFromDownstream(); long e = 0L; while (e != r) { @@ -1006,7 +1070,7 @@ void drain() { } if (e != 0) { - Operators.produced(REQUESTED, this, e); + produced(this, e); } if (WIP.decrementAndGet(this) == 0) { diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxSink.java b/reactor-core/src/main/java/reactor/core/publisher/FluxSink.java index c4b6bf5835..6615b15bca 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxSink.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxSink.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -108,6 +108,11 @@ default ContextView contextView() { * or {@link Flux#create(java.util.function.Consumer, FluxSink.OverflowStrategy)}, * the consumer * is invoked for every request to enable a hybrid backpressure-enabled push/pull model. + *

+ * Note: in case of multiple {@link Subscription#request} happening + * concurrently to this method, the first consumer invocation may process + * accumulated demand instead of being called multiple times. + *

* When bridging with asynchronous listener-based APIs, the {@code onRequest} callback * may be used to request more data from source if required and to manage backpressure * by delivering data to sink only when requests are pending. diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxCreateTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxCreateTest.java index 6fe1621d41..ddceb93d8e 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxCreateTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxCreateTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015-2022 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2015-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +18,10 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -27,6 +30,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.IntStream; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.provider.EnumSource; import org.reactivestreams.Subscriber; @@ -53,6 +57,36 @@ class FluxCreateTest { + @Test + //https://github.com/reactor/reactor-core/issues/1949 + void ensuresConcurrentRequestAndSettingOnRequestAlwaysDeliversDemand() throws ExecutionException, InterruptedException { + AtomicReference sub = new AtomicReference<>(); + ExecutorService executor = Executors.newSingleThreadExecutor(); + int i = 0; + int attempts = 100; + for (AtomicBoolean requested = new AtomicBoolean(true); requested.getAndSet(false) && i < attempts; ++i) { + CountDownLatch latch = new CountDownLatch(1); + FutureTask task = new FutureTask<>(() -> sub.get().request(1), null); + Flux.create(sink -> sink.onRequest(__ -> { + requested.set(true); + // onRequest can be delivered asynchronously after request(n) has + // returned, so latch coordinates successful request->onRequest + // completion. + latch.countDown(); + })) + .subscribe(new BaseSubscriber() { + @Override + protected void hookOnSubscribe(Subscription subscription) { + sub.set(subscription); + executor.execute(task); + } + }); + latch.await(100, TimeUnit.MILLISECONDS); + } + executor.shutdown(); + Assertions.assertThat(i).as("Failed after %d attempts", i).isEqualTo(attempts); + } + @Test void normalBuffered() { AssertSubscriber ts = AssertSubscriber.create();