From aa26c12da99ec88d0cbea86f0777e60616ed5a3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?= Date: Wed, 2 Oct 2024 15:41:13 +0200 Subject: [PATCH] Fix MonoPublishOnTest flakiness (#3898) The implementation relied on a specific sequencing of events, which in case of slower hardware or random events would occasionally fail the assertions. This change introduces firm ordering and fixes an incorrect assertion, which assumed the `Mono` terminates without error, while in fact the `RejectedExecutionException` is propagated to the `AssertSubscriber`. In effect, the tests are no longer flaky. --- .../core/publisher/MonoPublishOnTest.java | 57 +++++++++++-------- 1 file changed, 32 insertions(+), 25 deletions(-) diff --git a/reactor-core/src/test/java/reactor/core/publisher/MonoPublishOnTest.java b/reactor-core/src/test/java/reactor/core/publisher/MonoPublishOnTest.java index abd2567b04..18732e0f24 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/MonoPublishOnTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/MonoPublishOnTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2015-2024 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -55,13 +55,12 @@ public void rejectedExecutionExceptionOnDataSignalExecutor() final AtomicReference dataInOnOperatorError = new AtomicReference<>(); try { - - CountDownLatch hookLatch = new CountDownLatch(1); + CountDownLatch finallyLatch = new CountDownLatch(1); + CountDownLatch inOnNextLatch = new CountDownLatch(1); Hooks.onOperatorError((t, d) -> { throwableInOnOperatorError.set(t); dataInOnOperatorError.set(d); - hookLatch.countDown(); return t; }); @@ -73,22 +72,25 @@ public void rejectedExecutionExceptionOnDataSignalExecutor() .publishOn(fromExecutorService(executor)) .doOnNext(s -> { try { + inOnNextLatch.countDown(); latch.await(); } catch (InterruptedException e) { } }) .publishOn(fromExecutor(executor)) + .doFinally(s -> finallyLatch.countDown()) .subscribe(assertSubscriber); + inOnNextLatch.await(); executor.shutdownNow(); + finallyLatch.await(); + assertSubscriber.assertNoValues() - .assertNoError() + .assertError(RejectedExecutionException.class) .assertNotComplete(); - hookLatch.await(); - assertThat(throwableInOnOperatorError.get()).isInstanceOf(RejectedExecutionException.class); assertThat(data).isSameAs(dataInOnOperatorError.get()); } @@ -109,13 +111,12 @@ public void rejectedExecutionExceptionOnErrorSignalExecutor() final AtomicReference dataInOnOperatorError = new AtomicReference<>(); try { - - CountDownLatch hookLatch = new CountDownLatch(2); + CountDownLatch finallyLatch = new CountDownLatch(1); + CountDownLatch inOnNextLatch = new CountDownLatch(1); Hooks.onOperatorError((t, d) -> { throwableInOnOperatorError.set(t); dataInOnOperatorError.set(d); - hookLatch.countDown(); return t; }); @@ -127,6 +128,7 @@ public void rejectedExecutionExceptionOnErrorSignalExecutor() .publishOn(fromExecutorService(executor)) .doOnNext(s -> { try { + inOnNextLatch.countDown(); latch.await(); } catch (InterruptedException e) { @@ -134,16 +136,17 @@ public void rejectedExecutionExceptionOnErrorSignalExecutor() } }) .publishOn(fromExecutor(executor)) + .doFinally(s -> finallyLatch.countDown()) .subscribe(assertSubscriber); + inOnNextLatch.await(); executor.shutdownNow(); + finallyLatch.await(); assertSubscriber.assertNoValues() - .assertNoError() + .assertError(RejectedExecutionException.class) .assertNotComplete(); - hookLatch.await(); - assertThat(throwableInOnOperatorError.get()).isInstanceOf(RejectedExecutionException.class); assertThat(exception).isSameAs(throwableInOnOperatorError.get() .getSuppressed()[0]); @@ -164,13 +167,12 @@ public void rejectedExecutionExceptionOnDataSignalExecutorService() final AtomicReference dataInOnOperatorError = new AtomicReference<>(); try { - - CountDownLatch hookLatch = new CountDownLatch(1); + CountDownLatch finallyLatch = new CountDownLatch(1); + CountDownLatch inOnNextLatch = new CountDownLatch(1); Hooks.onOperatorError((t, d) -> { throwableInOnOperatorError.set(t); dataInOnOperatorError.set(d); - hookLatch.countDown(); return t; }); @@ -182,22 +184,25 @@ public void rejectedExecutionExceptionOnDataSignalExecutorService() .publishOn(fromExecutorService(executor)) .doOnNext(s -> { try { + inOnNextLatch.countDown(); latch.await(); } catch (InterruptedException e) { } }) .publishOn(fromExecutorService(executor)) + .doFinally(s -> finallyLatch.countDown()) .subscribe(assertSubscriber); + inOnNextLatch.await(); + executor.shutdownNow(); + finallyLatch.await(); assertSubscriber.assertNoValues() - .assertNoError() + .assertError(RejectedExecutionException.class) .assertNotComplete(); - hookLatch.await(); - assertThat(throwableInOnOperatorError.get()).isInstanceOf(RejectedExecutionException.class); assertThat(data).isSameAs(dataInOnOperatorError.get()); } @@ -218,13 +223,12 @@ public void rejectedExecutionExceptionOnErrorSignalExecutorService() final AtomicReference dataInOnOperatorError = new AtomicReference<>(); try { - - CountDownLatch hookLatch = new CountDownLatch(2); + CountDownLatch finallyLatch = new CountDownLatch(1); + CountDownLatch inOnNextLatch = new CountDownLatch(1); Hooks.onOperatorError((t, d) -> { throwableInOnOperatorError.set(t); dataInOnOperatorError.set(d); - hookLatch.countDown(); return t; }); @@ -236,6 +240,7 @@ public void rejectedExecutionExceptionOnErrorSignalExecutorService() .publishOn(fromExecutorService(executor)) .doOnNext(s -> { try { + inOnNextLatch.countDown(); latch.await(); } catch (InterruptedException e) { @@ -243,16 +248,18 @@ public void rejectedExecutionExceptionOnErrorSignalExecutorService() } }) .publishOn(fromExecutorService(executor)) + .doFinally(s -> finallyLatch.countDown()) .subscribe(assertSubscriber); + inOnNextLatch.await(); + executor.shutdownNow(); + finallyLatch.await(); assertSubscriber.assertNoValues() - .assertNoError() + .assertError(RejectedExecutionException.class) .assertNotComplete(); - hookLatch.await(); - assertThat(throwableInOnOperatorError.get()).isInstanceOf(RejectedExecutionException.class); assertThat(exception).isSameAs(throwableInOnOperatorError.get() .getSuppressed()[0]);