From 809c48c43b649e68cf99d2e96b5ec45f10c26f0e Mon Sep 17 00:00:00 2001 From: Paulo Casaes Date: Fri, 1 Mar 2024 11:15:41 -0800 Subject: [PATCH] Do not fail on resolve kafka streams topics when topics check disabled Resolves https://github.com/quarkusio/quarkus/issues/39120 --- .../kafka/streams/runtime/KafkaStreamsProducer.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsProducer.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsProducer.java index f03ddabeb9bda..604017f831591 100644 --- a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsProducer.java +++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsProducer.java @@ -109,18 +109,22 @@ public KafkaStreamsProducer(KafkaStreamsSupport kafkaStreamsSupport, KafkaStream this.executorService = executorService; this.topicsTimeout = runtimeConfig.topicsTimeout; - this.trimmedTopics = runtimeConfig.getTrimmedTopics(); + this.trimmedTopics = isTopicsCheckEnabled() ? runtimeConfig.getTrimmedTopics() : Collections.emptyList(); this.streamsConfig = new StreamsConfig(kafkaStreamsProperties); this.kafkaStreams = initializeKafkaStreams(streamsConfig, topology.get(), kafkaClientSupplier, stateListener, globalStateRestoreListener, uncaughtExceptionHandlerListener); this.kafkaStreamsTopologyManager = new KafkaStreamsTopologyManager(kafkaAdminClient); } + private boolean isTopicsCheckEnabled() { + return topicsTimeout.compareTo(Duration.ZERO) > 0; + } + public void onStartup(@Observes StartupEvent event, Event kafkaStreamsEvent) { if (kafkaStreams != null) { kafkaStreamsEvent.fire(kafkaStreams); executorService.execute(() -> { - if (topicsTimeout.compareTo(Duration.ZERO) > 0) { + if (isTopicsCheckEnabled()) { try { waitForTopicsToBeCreated(kafkaAdminClient, trimmedTopics, topicsTimeout); } catch (InterruptedException e) {