diff --git a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java index efb4500f..6334fdf5 100644 --- a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java +++ b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java @@ -99,7 +99,9 @@ public void start(EventStore eventStore, Concur LOGGER.error("Error republishing events for topic %s retrying for the %s time".formatted(topic, ctx.totalRetries()), ctx.failure()); }) ) - .switchIfEmpty(Flux.just(Tuple.empty())); + .collectList() + .map(__ -> Tuple.empty()) + .switchIfEmpty(Mono.just(Tuple.empty())); }) .concatMap(__ -> this.eventsSource.transform(publishToKafka(