diff --git a/reactor-core/src/blockHoundTest/java/reactor/core/scheduler/ReactorBlockHoundIntegrationTest.java b/reactor-core/src/blockHoundTest/java/reactor/core/scheduler/ReactorBlockHoundIntegrationTest.java index a580307548..ca0b5b1847 100644 --- a/reactor-core/src/blockHoundTest/java/reactor/core/scheduler/ReactorBlockHoundIntegrationTest.java +++ b/reactor-core/src/blockHoundTest/java/reactor/core/scheduler/ReactorBlockHoundIntegrationTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2019-2022 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/reactor-core/src/main/java/reactor/core/publisher/Flux.java b/reactor-core/src/main/java/reactor/core/publisher/Flux.java index 19521444fb..8b5079d76d 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Flux.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Flux.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -4809,9 +4809,6 @@ public final Flux doFirst(Runnable onFirst) { */ public final Flux doFinally(Consumer onFinally) { Objects.requireNonNull(onFinally, "onFinally"); - if (this instanceof Fuseable) { - return onAssembly(new FluxDoFinallyFuseable<>(this, onFinally)); - } return onAssembly(new FluxDoFinally<>(this, onFinally)); } diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxDoFinally.java b/reactor-core/src/main/java/reactor/core/publisher/FluxDoFinally.java index ccb6a5302f..be56e85da1 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxDoFinally.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxDoFinally.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -45,17 +45,7 @@ final class FluxDoFinally extends InternalFluxOperator { final Consumer onFinally; @SuppressWarnings("unchecked") - static CoreSubscriber createSubscriber( - CoreSubscriber s, Consumer onFinally, - boolean fuseable) { - - if (fuseable) { - if(s instanceof ConditionalSubscriber) { - return new DoFinallyFuseableConditionalSubscriber<>( - (ConditionalSubscriber) s, onFinally); - } - return new DoFinallyFuseableSubscriber<>(s, onFinally); - } + static CoreSubscriber createSubscriber(CoreSubscriber s, Consumer onFinally) { if (s instanceof ConditionalSubscriber) { return new DoFinallyConditionalSubscriber<>((ConditionalSubscriber) s, onFinally); @@ -70,7 +60,7 @@ static CoreSubscriber createSubscriber( @Override public CoreSubscriber subscribeOrReturn(CoreSubscriber actual) { - return createSubscriber(actual, onFinally, false); + return createSubscriber(actual, onFinally); } @Override @@ -87,15 +77,12 @@ static class DoFinallySubscriber implements InnerOperator { volatile int once; + @SuppressWarnings("rawtypes") static final AtomicIntegerFieldUpdater ONCE = AtomicIntegerFieldUpdater.newUpdater(DoFinallySubscriber.class, "once"); - QueueSubscription qs; - Subscription s; - boolean syncFused; - DoFinallySubscriber(CoreSubscriber actual, Consumer onFinally) { this.actual = actual; this.onFinally = onFinally; @@ -112,14 +99,10 @@ public Object scanUnsafe(Attr key) { return InnerOperator.super.scanUnsafe(key); } - @SuppressWarnings("unchecked") @Override public void onSubscribe(Subscription s) { if (Operators.validate(this.s, s)) { this.s = s; - if (s instanceof QueueSubscription) { - this.qs = (QueueSubscription)s; - } actual.onSubscribe(this); } @@ -175,57 +158,6 @@ public CoreSubscriber actual() { } - static class DoFinallyFuseableSubscriber extends DoFinallySubscriber - implements Fuseable, QueueSubscription { - - DoFinallyFuseableSubscriber(CoreSubscriber actual, Consumer onFinally) { - super(actual, onFinally); - } - - @Override - public int requestFusion(int mode) { - QueueSubscription qs = this.qs; - if (qs != null && (mode & Fuseable.THREAD_BARRIER) == 0) { - int m = qs.requestFusion(mode); - if (m != Fuseable.NONE) { - syncFused = m == Fuseable.SYNC; - } - return m; - } - return Fuseable.NONE; - } - - @Override - public void clear() { - if (qs != null) { - qs.clear(); - } - } - - @Override - public boolean isEmpty() { - return qs == null || qs.isEmpty(); - } - - @Override - @Nullable - public T poll() { - if (qs == null) { - return null; - } - T v = qs.poll(); - if (v == null && syncFused) { - runFinally(SignalType.ON_COMPLETE); - } - return v; - } - - @Override - public int size() { - return qs == null ? 0 : qs.size(); - } - } - static final class DoFinallyConditionalSubscriber extends DoFinallySubscriber implements ConditionalSubscriber { @@ -240,19 +172,4 @@ public boolean tryOnNext(T t) { return ((ConditionalSubscriber)actual).tryOnNext(t); } } - - static final class DoFinallyFuseableConditionalSubscriber extends DoFinallyFuseableSubscriber - implements ConditionalSubscriber { - - DoFinallyFuseableConditionalSubscriber(ConditionalSubscriber actual, - Consumer onFinally) { - super(actual, onFinally); - } - - @Override - @SuppressWarnings("unchecked") - public boolean tryOnNext(T t) { - return ((ConditionalSubscriber)actual).tryOnNext(t); - } - } } diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxDoFinallyFuseable.java b/reactor-core/src/main/java/reactor/core/publisher/FluxDoFinallyFuseable.java deleted file mode 100644 index c9a948d110..0000000000 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxDoFinallyFuseable.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package reactor.core.publisher; - -import java.util.function.Consumer; - -import reactor.core.CoreSubscriber; -import reactor.core.Fuseable; - -/** - * Hook with fusion into the lifecycle events and signals of a {@link Flux} - * and execute a provided callback after any of onComplete, onError and cancel events. - * The hook is executed only once and receives the event type that triggered - * it ({@link SignalType#ON_COMPLETE}, {@link SignalType#ON_ERROR} or - * {@link SignalType#CANCEL}). - *

- * Note that any exception thrown by the hook are caught and bubbled up - * using {@link Operators#onErrorDropped(Throwable, reactor.util.context.Context)}. - * - * @param the value type - * @author Simon Baslé - */ -final class FluxDoFinallyFuseable extends InternalFluxOperator implements Fuseable { - - final Consumer onFinally; - - FluxDoFinallyFuseable(Flux source, Consumer onFinally) { - super(source); - this.onFinally = onFinally; - } - - @Override - public CoreSubscriber subscribeOrReturn(CoreSubscriber actual) { - return FluxDoFinally.createSubscriber(actual, onFinally, true); - } - - @Override - public Object scanUnsafe(Attr key) { - if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC; - return super.scanUnsafe(key); - } -} diff --git a/reactor-core/src/main/java/reactor/core/publisher/Mono.java b/reactor-core/src/main/java/reactor/core/publisher/Mono.java index 3916ed6a51..c665a457a5 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Mono.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Mono.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -2573,9 +2573,6 @@ public final Mono doFirst(Runnable onFirst) { */ public final Mono doFinally(Consumer onFinally) { Objects.requireNonNull(onFinally, "onFinally"); - if (this instanceof Fuseable) { - return onAssembly(new MonoDoFinallyFuseable<>(this, onFinally)); - } return onAssembly(new MonoDoFinally<>(this, onFinally)); } diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoDoFinally.java b/reactor-core/src/main/java/reactor/core/publisher/MonoDoFinally.java index 0d677a0efe..6026565a60 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoDoFinally.java +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoDoFinally.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -44,7 +44,7 @@ final class MonoDoFinally extends InternalMonoOperator { @Override public CoreSubscriber subscribeOrReturn(CoreSubscriber actual) { - return FluxDoFinally.createSubscriber(actual, onFinally, false); + return FluxDoFinally.createSubscriber(actual, onFinally); } @Override diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoDoFinallyFuseable.java b/reactor-core/src/main/java/reactor/core/publisher/MonoDoFinallyFuseable.java deleted file mode 100644 index e19f01e353..0000000000 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoDoFinallyFuseable.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package reactor.core.publisher; - -import java.util.function.Consumer; - -import reactor.core.CoreSubscriber; -import reactor.core.Fuseable; - -/** - * Hook with fusion into the lifecycle events and signals of a {@link Mono} - * and execute a provided callback after any of onComplete, onError and cancel events. - * The hook is executed only once and receives the event type that triggered - * it ({@link SignalType#ON_COMPLETE}, {@link SignalType#ON_ERROR} or - * {@link SignalType#CANCEL}). - *

- * Note that any exception thrown by the hook are caught and bubbled up - * using {@link Operators#onErrorDropped(Throwable, reactor.util.context.Context)}. - * - * @param the value type - * @author Simon Baslé - */ -final class MonoDoFinallyFuseable extends InternalMonoOperator implements Fuseable { - - final Consumer onFinally; - - MonoDoFinallyFuseable(Mono source, Consumer onFinally) { - super(source); - this.onFinally = onFinally; - } - - @Override - public CoreSubscriber subscribeOrReturn(CoreSubscriber actual) { - return FluxDoFinally.createSubscriber(actual, onFinally, true); - } - - @Override - public Object scanUnsafe(Attr key) { - if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC; - return super.scanUnsafe(key); - } -} diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoPeekTerminal.java b/reactor-core/src/main/java/reactor/core/publisher/MonoPeekTerminal.java index 068b6cdac3..62672821d6 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoPeekTerminal.java +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoPeekTerminal.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -321,7 +321,22 @@ public CoreSubscriber actual() { public T poll() { assert queueSubscription != null; boolean d = done; - T v = queueSubscription.poll(); + T v; + try { + v = queueSubscription.poll(); + } + catch (Throwable pe) { + if (parent.onErrorCall != null) { + try { + parent.onErrorCall.accept(pe); + } + catch (Throwable t) { + t = Operators.onOperatorError(null, pe, t, actual.currentContext()); + throw Exceptions.propagate(t); + } + } + throw pe; + } if (!valued && (v != null || d || sourceMode == SYNC)) { valued = true; //TODO include onEmptyCall here as well? @@ -334,16 +349,8 @@ public T poll() { actual.currentContext())); } } - if (parent.onAfterTerminateCall != null) { - try { - parent.onAfterTerminateCall.accept(v, null); - } - catch (Throwable t) { - Operators.onErrorDropped(Operators.onOperatorError(t, - actual.currentContext()), - actual.currentContext()); - } - } + //if parent.onAfterTerminateCall is set, fusion MUST be negotiated to NONE + //because there's no way to correctly support onAfterError in the poll() scenario } return v; } @@ -362,7 +369,13 @@ public void clear() { @Override public int requestFusion(int requestedMode) { int m; - if (queueSubscription == null) { //source wasn't actually Fuseable + if (queueSubscription == null || parent.onAfterTerminateCall != null) { + /* + Two cases where the configuration doesn't allow fusion: + - source wasn't actually Fuseable + - onAfterTerminateCall is set (which cannot be correctly implemented in the case + qs.poll() throws) + */ m = NONE; } else if ((requestedMode & THREAD_BARRIER) != 0) { diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxDoFinallyTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxDoFinallyTest.java index 844176ab38..bd63d6e5b2 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxDoFinallyTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxDoFinallyTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,12 +30,12 @@ import reactor.core.CoreSubscriber; import reactor.core.Exceptions; import reactor.core.Scannable; -import reactor.test.util.LoggerUtils; import reactor.test.StepVerifier; +import reactor.test.util.LoggerUtils; import reactor.test.util.TestLogger; -import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static reactor.core.Fuseable.*; import static reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST; @@ -116,70 +116,6 @@ public void normalTake() { assertThat(signalType).isEqualTo(SignalType.ON_COMPLETE); } - @Test - public void syncFused() { - StepVerifier.create(Flux.range(1, 5).doFinally(this)) - .expectFusion(SYNC) - .expectNext(1, 2, 3, 4, 5) - .expectComplete() - .verify(); - - assertThat(calls).isEqualTo(1); - assertThat(signalType).isEqualTo(SignalType.ON_COMPLETE); - } - - @Test - public void syncFusedThreadBarrier() { - StepVerifier.create(Flux.range(1, 5).doFinally(this)) - .expectFusion(SYNC | THREAD_BARRIER , NONE) - .expectNext(1, 2, 3, 4, 5) - .expectComplete() - .verify(); - - assertThat(calls).isEqualTo(1); - assertThat(signalType).isEqualTo(SignalType.ON_COMPLETE); - } - - @Test - public void asyncFused() { - Sinks.Many up = Sinks.many().unicast().onBackpressureBuffer(); - up.emitNext(1, FAIL_FAST); - up.emitNext(2, FAIL_FAST); - up.emitNext(3, FAIL_FAST); - up.emitNext(4, FAIL_FAST); - up.emitNext(5, FAIL_FAST); - up.emitComplete(FAIL_FAST); - - StepVerifier.create(up.asFlux().doFinally(this)) - .expectFusion(ASYNC) - .expectNext(1, 2, 3, 4, 5) - .expectComplete() - .verify(); - - assertThat(calls).isEqualTo(1); - assertThat(signalType).isEqualTo(SignalType.ON_COMPLETE); - } - - @Test - public void asyncFusedThreadBarrier() { - Sinks.Many up = Sinks.many().unicast().onBackpressureBuffer(); - up.emitNext(1, FAIL_FAST); - up.emitNext(2, FAIL_FAST); - up.emitNext(3, FAIL_FAST); - up.emitNext(4, FAIL_FAST); - up.emitNext(5, FAIL_FAST); - up.emitComplete(FAIL_FAST); - - StepVerifier.create(up.asFlux().doFinally(this)) - .expectFusion(ASYNC | THREAD_BARRIER, NONE) - .expectNext(1, 2, 3, 4, 5) - .expectComplete() - .verify(); - - assertThat(calls).isEqualTo(1); - assertThat(signalType).isEqualTo(SignalType.ON_COMPLETE); - } - @Test public void normalJustConditional() { StepVerifier.create(Flux.just(1) @@ -254,76 +190,6 @@ public void normalTakeConditional() { assertThat(signalType).isEqualTo(SignalType.ON_COMPLETE); } - @Test - public void syncFusedConditional() { - StepVerifier.create(Flux.range(1, 5) - .doFinally(this) - .filter(i -> true)) - .expectFusion(SYNC) - .expectNext(1, 2, 3, 4, 5) - .expectComplete() - .verify(); - - assertThat(calls).isEqualTo(1); - assertThat(signalType).isEqualTo(SignalType.ON_COMPLETE); - } - - @Test - public void syncFusedThreadBarrierConditional() { - StepVerifier.create(Flux.range(1, 5) - .doFinally(this) - .filter(i -> true)) - .expectFusion(SYNC | THREAD_BARRIER, NONE) - .expectNext(1, 2, 3, 4, 5) - .expectComplete() - .verify(); - - assertThat(calls).isEqualTo(1); - assertThat(signalType).isEqualTo(SignalType.ON_COMPLETE); - } - - @Test - public void asyncFusedConditional() { - Sinks.Many up = Sinks.many().unicast().onBackpressureBuffer(); - up.emitNext(1, FAIL_FAST); - up.emitNext(2, FAIL_FAST); - up.emitNext(3, FAIL_FAST); - up.emitNext(4, FAIL_FAST); - up.emitNext(5, FAIL_FAST); - up.emitComplete(FAIL_FAST); - - StepVerifier.create(up.asFlux().doFinally(this) - .filter(i -> true)) - .expectFusion(ASYNC) - .expectNext(1, 2, 3, 4, 5) - .expectComplete() - .verify(); - - assertThat(calls).isEqualTo(1); - assertThat(signalType).isEqualTo(SignalType.ON_COMPLETE); - } - - @Test - public void asyncFusedThreadBarrierConditional() { - Sinks.Many up = Sinks.many().unicast().onBackpressureBuffer(); - up.emitNext(1, FAIL_FAST); - up.emitNext(2, FAIL_FAST); - up.emitNext(3, FAIL_FAST); - up.emitNext(4, FAIL_FAST); - up.emitNext(5, FAIL_FAST); - up.emitComplete(FAIL_FAST); - - StepVerifier.create(up.asFlux().doFinally(this) - .filter(i -> true)) - .expectFusion(ASYNC | THREAD_BARRIER, NONE) - .expectNext(1, 2, 3, 4, 5) - .expectComplete() - .verify(); - - assertThat(calls).isEqualTo(1); - assertThat(signalType).isEqualTo(SignalType.ON_COMPLETE); - } - @Test public void nullCallback() { assertThatExceptionOfType(NullPointerException.class).isThrownBy(() -> { @@ -385,16 +251,7 @@ public void severalInARowExecutedInReverseOrder() { @Test public void scanOperator(){ Flux parent = Flux.just(1); - FluxDoFinally test = new FluxDoFinally<>(parent, v -> {}); - - Assertions.assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(parent); - Assertions.assertThat(test.scan(Scannable.Attr.RUN_STYLE)).isSameAs(Scannable.Attr.RunStyle.SYNC); - } - - @Test - public void scanFuseableOperator(){ - Flux parent = Flux.just(1); - FluxDoFinallyFuseable test = new FluxDoFinallyFuseable<>(parent, s -> {}); + FluxDoFinally test = new FluxDoFinally<>(parent, v -> {}); Assertions.assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(parent); Assertions.assertThat(test.scan(Scannable.Attr.RUN_STYLE)).isSameAs(Scannable.Attr.RunStyle.SYNC); @@ -523,4 +380,23 @@ public void gh951_withoutDoOnError() { private Boolean throwError(Boolean x) { throw new IllegalStateException("boom"); } + + + // loosely related to https://github.com/reactor/reactor-core/issues/3044 + @Test + void noFusionAndCorrectOrderOfOnErrorThenFinallySignals() { + List signals = new ArrayList<>(); + StepVerifier.create(Flux.just("a", "b", "c") + .collectList() + .map(l -> 100 / (l.size() - 3)) + .doFinally(sig -> signals.add("doFinally(" + sig.toString() + ")")) + .doOnError(e -> signals.add("doOnError")) //ensuring that the finally is invoked after propagation of onError + ) + .expectNoFusionSupport() + .expectErrorSatisfies(e -> assertThat(e).isInstanceOf(ArithmeticException.class) + .hasMessage("/ by zero")) + .verify(); + + assertThat(signals).containsExactly("doOnError", "doFinally(onError)"); + } } diff --git a/reactor-core/src/test/java/reactor/core/publisher/FuseableBestPracticesArchTest.java b/reactor-core/src/test/java/reactor/core/publisher/FuseableBestPracticesArchTest.java new file mode 100644 index 0000000000..e66f9b2b1f --- /dev/null +++ b/reactor-core/src/test/java/reactor/core/publisher/FuseableBestPracticesArchTest.java @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2019-2022 VMware Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package reactor.core.publisher; + +import com.tngtech.archunit.core.domain.JavaClass; +import com.tngtech.archunit.core.domain.JavaClasses; +import com.tngtech.archunit.core.domain.JavaModifier; +import com.tngtech.archunit.core.importer.ClassFileImporter; +import com.tngtech.archunit.core.importer.ImportOption; +import com.tngtech.archunit.lang.ArchCondition; +import com.tngtech.archunit.lang.ConditionEvents; +import com.tngtech.archunit.lang.SimpleConditionEvent; +import org.junit.jupiter.api.Test; + +import reactor.core.CoreSubscriber; +import reactor.core.Fuseable; + +import static com.tngtech.archunit.lang.syntax.ArchRuleDefinition.classes; +import static org.assertj.core.api.Assertions.assertThat; + +class FuseableBestPracticesArchTest { + + static JavaClasses OPERATOR_CLASSES = new ClassFileImporter() + .withImportOption(ImportOption.Predefined.DO_NOT_INCLUDE_TESTS) + .withImportOption(ImportOption.Predefined.DO_NOT_INCLUDE_JARS) + .importPackages("reactor.core.publisher"); + + @Test + void smokeTestWhereClassesLoaded() { + assertThat(OPERATOR_CLASSES).isNotEmpty(); + } + + @Test + void coreFuseableSubscribersShouldNotExtendNonFuseableOnNext() { + classes() + .that().implement(CoreSubscriber.class) + .and().doNotHaveModifier(JavaModifier.ABSTRACT) + .and().areAssignableTo(Fuseable.QueueSubscription.class) + .should(new ArchCondition("have onNext defined in a Fuseable-compatible way") { + @Override + public void check(JavaClass item, ConditionEvents events) { + boolean overridesMethod = item + .getAllMethods() + .stream() + .filter(it -> "onNext".equals(it.getName())) + .anyMatch(it -> it.getOwner().isAssignableTo(Fuseable.QueueSubscription.class)); + + if (!overridesMethod) { + events.add(SimpleConditionEvent.violated( + item, + item.getFullName() + item.getSourceCodeLocation() + ": onNext(T) is not overridden from a QueueSubscription implementation" + )); + } + } + }) + .check(OPERATOR_CLASSES); + } +} diff --git a/reactor-core/src/test/java/reactor/core/publisher/MonoDoFinallyTest.java b/reactor-core/src/test/java/reactor/core/publisher/MonoDoFinallyTest.java index ed2406df02..a6b1c89204 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/MonoDoFinallyTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/MonoDoFinallyTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -122,31 +122,6 @@ public void normalJustConditional() { assertThat(signalType).isEqualTo(SignalType.ON_COMPLETE); } - @Test - public void syncFused() { - StepVerifier.create(Mono.just(1).doFinally(this)) - .expectFusion(SYNC) - .expectNext(1) - .expectComplete() - .verify(); - - assertThat(calls).isEqualTo(1); - assertThat(signalType).isEqualTo(SignalType.ON_COMPLETE); - } - - @Test - public void syncFusedConditional() { - StepVerifier.create(Mono.just(1).doFinally(this).filter(i -> true)) - .expectFusion(SYNC) - .expectNext(1) - .expectComplete() - .verify(); - - assertThat(calls).isEqualTo(1); - assertThat(signalType).isEqualTo(SignalType.ON_COMPLETE); - } - - @Test public void nullCallback() { assertThatExceptionOfType(NullPointerException.class).isThrownBy(() -> { @@ -211,11 +186,4 @@ public void scanOperator(){ Assertions.assertThat(test.scan(Scannable.Attr.RUN_STYLE)).isSameAs(Scannable.Attr.RunStyle.SYNC); } - - @Test - public void scanFuseableOperator(){ - MonoDoFinallyFuseable test = new MonoDoFinallyFuseable<>(Mono.just("foo"), this); - - assertThat(test.scan(Scannable.Attr.RUN_STYLE)).isSameAs(Scannable.Attr.RunStyle.SYNC); - } } diff --git a/reactor-core/src/test/java/reactor/core/publisher/MonoPeekAfterTest.java b/reactor-core/src/test/java/reactor/core/publisher/MonoPeekAfterTest.java index 8ba9dc80ca..8166af5e56 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/MonoPeekAfterTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/MonoPeekAfterTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -307,7 +307,7 @@ public void onAfterSuccessOrErrorNormalConditional() { } @Test - public void onAfterSuccessOrErrorFusion() { + public void onAfterSuccessOrErrorFuseableNegotiatedNone() { LongAdder invoked = new LongAdder(); AtomicBoolean completedEmpty = new AtomicBoolean(); AtomicReference error = new AtomicReference<>(); @@ -323,7 +323,7 @@ public void onAfterSuccessOrErrorFusion() { }); StepVerifier.create(mono.log()) - .expectFusion() + .expectFusion(Fuseable.ANY, Fuseable.NONE) .expectNext(55) .expectComplete() .verify(); @@ -334,7 +334,7 @@ public void onAfterSuccessOrErrorFusion() { } @Test - public void onAfterSuccessOrErrorFusionConditional() { + public void onAfterSuccessOrErrorFuseableConditionalNegotiatedNone() { LongAdder invoked = new LongAdder(); AtomicBoolean completedEmpty = new AtomicBoolean(); AtomicReference error = new AtomicReference<>(); @@ -351,7 +351,7 @@ public void onAfterSuccessOrErrorFusionConditional() { }); StepVerifier.create(mono) - .expectFusion() + .expectFusion(Fuseable.ANY, Fuseable.NONE) .expectNext(55) .expectComplete() .verify(); @@ -382,7 +382,7 @@ public void onAfterTerminateNormalConditional() { } @Test - public void onAfterTerminateFusion() { + void onAfterTerminateFuseableNegotiatedNone() { LongAdder invoked = new LongAdder(); Mono mono = Flux @@ -391,16 +391,16 @@ public void onAfterTerminateFusion() { .doAfterTerminate(invoked::increment); StepVerifier.create(mono.log()) - .expectFusion() - .expectNext(55) - .expectComplete() - .verify(); + .expectFusion(Fuseable.ANY, Fuseable.NONE) + .expectNext(55) + .expectComplete() + .verify(); assertThat(invoked.intValue()).isEqualTo(1); } @Test - public void onAfterTerminateFusionConditional() { + public void onAfterTerminateFuseableConditionalNegotiatedNone() { LongAdder invoked = new LongAdder(); Mono mono = Flux @@ -410,10 +410,10 @@ public void onAfterTerminateFusionConditional() { .doAfterTerminate(invoked::increment); StepVerifier.create(mono) - .expectFusion() - .expectNext(55) - .expectComplete() - .verify(); + .expectFusion(Fuseable.ANY, Fuseable.NONE) + .expectNext(55) + .expectComplete() + .verify(); assertThat(invoked.intValue()).isEqualTo(1); } @@ -689,7 +689,7 @@ public void testCallbacksNoFusion() { errorInvocation.set(t); }); - StepVerifier.create(mono) + StepVerifier.create(mono.log()) .expectFusion(Fuseable.NONE) .expectNext(55) .expectComplete() @@ -701,7 +701,7 @@ public void testCallbacksNoFusion() { } @Test - public void testCallbacksFusionSync() { + void testCallbacksWithAfterTerminateNegotiatesFusionNone() { AtomicReference successInvocation = new AtomicReference<>(); AtomicReference afterTerminateInvocation = new AtomicReference<>(); AtomicReference errorInvocation = new AtomicReference<>(); @@ -717,7 +717,7 @@ public void testCallbacksFusionSync() { }); StepVerifier.create(mono) - .expectFusion(Fuseable.SYNC, Fuseable.SYNC) //TODO in 3.0.3 this doesn't work + .expectFusion(Fuseable.SYNC, Fuseable.NONE) .expectNext(55) .expectComplete() .verify(); @@ -728,10 +728,31 @@ public void testCallbacksFusionSync() { } @Test - public void testCallbacksFusionAsync() { + void testCallbacksFusionSync() { + AtomicReference successInvocation = new AtomicReference<>(); + AtomicReference errorInvocation = new AtomicReference<>(); + + Mono source = Mono.fromDirect(Flux.range(55, 1)); + + Mono mono = new MonoPeekTerminal<>(source, + successInvocation::set, + errorInvocation::set, + null); //afterTerminate forces the negotiation of fusion mode NONE + + StepVerifier.create(mono) + .expectFusion(Fuseable.SYNC) + .expectNext(55) + .expectComplete() + .verify(); + + assertThat((Object) successInvocation.get()).isEqualTo(55); + assertThat(errorInvocation).hasValue(null); + } + + @Test + void testCallbacksFusionAsync() { AtomicReference successInvocation = new AtomicReference<>(); AtomicReference errorInvocation = new AtomicReference<>(); - AtomicReference afterTerminateInvocation = new AtomicReference<>(); Mono source = Flux .range(1, 10) @@ -740,10 +761,7 @@ public void testCallbacksFusionAsync() { Mono mono = new MonoPeekTerminal<>(source, successInvocation::set, errorInvocation::set, - (v, t) -> { - afterTerminateInvocation.set(v); - errorInvocation.set(t); - }); + null); //afterTerminate forces the negotiation of fusion mode NONE StepVerifier.create(mono) .expectFusion(Fuseable.ASYNC) @@ -753,7 +771,6 @@ public void testCallbacksFusionAsync() { assertThat((Object) successInvocation.get()).isEqualTo(55); assertThat(errorInvocation).hasValue(null); - assertThat((Object) afterTerminateInvocation.get()).isEqualTo(55); } @Test diff --git a/reactor-test/src/main/java/reactor/test/DefaultStepVerifierBuilder.java b/reactor-test/src/main/java/reactor/test/DefaultStepVerifierBuilder.java index cad0e129aa..59defd30bf 100644 --- a/reactor-test/src/main/java/reactor/test/DefaultStepVerifierBuilder.java +++ b/reactor-test/src/main/java/reactor/test/DefaultStepVerifierBuilder.java @@ -1211,8 +1211,8 @@ void drainAsyncLoop(){ } catch (Throwable e) { Exceptions.throwIfFatal(e); - cancel(); onError(Exceptions.unwrap(e)); + cancel(); return; } if (currentCollector != null) {