Skip to content

Commit

Permalink
More control on event publishing
Browse files Browse the repository at this point in the history
  • Loading branch information
larousso committed Nov 22, 2023
1 parent 1b485b3 commit 3c90408
Showing 1 changed file with 3 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ public <TxCtx> void start(EventStore<TxCtx, E, Meta, Context> eventStore, Concur
LOGGER.error("Error republishing events for topic %s retrying for the %s time".formatted(topic, ctx.totalRetries()), ctx.failure());
})
)
.switchIfEmpty(Flux.just(Tuple.empty()));
.collectList()
.map(__ -> Tuple.empty())
.switchIfEmpty(Mono.just(Tuple.empty()));
})
.concatMap(__ ->
this.eventsSource.transform(publishToKafka(
Expand Down

0 comments on commit 3c90408

Please sign in to comment.