diff --git a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/DynamicMqttTopicSourceTest.java b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/DynamicMqttTopicSourceTest.java index 75956e432f..8d6164eae9 100644 --- a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/DynamicMqttTopicSourceTest.java +++ b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/DynamicMqttTopicSourceTest.java @@ -48,6 +48,10 @@ private void awaitAndVerify() { .pollInterval(Duration.ofSeconds(1)) .until(() -> connector.getReadiness().isOk()); + await() + .pollInterval(Duration.ofSeconds(1)) + .until(() -> connector.getLiveness().isOk()); + bean.publish(); await().until(() -> bean.messages().size() >= 3); @@ -155,23 +159,36 @@ public void publish() { emitter.send(MqttMessage .of("/app/hello/mqtt-" + LocalDate.now().toString() + "/greeting", "hello from dynamic topic 1", MqttQoS.EXACTLY_ONCE)); + nap(10); emitter.send(MqttMessage .of("/app/hello/mqtt-" + LocalDate.now().toString() + "/greeting", "hello from dynamic topic 2", MqttQoS.EXACTLY_ONCE)); + nap(10); emitter.send(MqttMessage .of("/app/hello/mqtt-" + LocalDate.now().toString() + "/greeting", "hello from dynamic topic 3", MqttQoS.EXACTLY_ONCE)); + nap(15); emitter.send(MqttMessage .of("$/app/hello/mqtt-" + LocalDate.now().toString() + "/greeting", "hello from dynamic topic 4", MqttQoS.EXACTLY_ONCE)); + nap(10); emitter.send(MqttMessage .of("$/app/hello/mqtt-" + LocalDate.now().toString() + "/greeting", "hello from dynamic topic 5", MqttQoS.EXACTLY_ONCE)); + nap(5); emitter.send(MqttMessage .of("$/app/hello/mqtt-" + LocalDate.now().toString() + "/greeting", "hello from dynamic topic 6", MqttQoS.EXACTLY_ONCE)); } + private void nap(int duration) { + try { + Thread.sleep(duration); + } catch (Exception ignored) { + // ignored. + } + } + @Incoming("in") public CompletionStage received(MqttMessage message) { messages.add(message);