Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka failure strategy dead-letter-topic can't find deserializer class #34931

Closed
Dieken opened this issue Jul 22, 2023 · 18 comments · Fixed by #36347
Closed

kafka failure strategy dead-letter-topic can't find deserializer class #34931

Dieken opened this issue Jul 22, 2023 · 18 comments · Fixed by #36347
Assignees
Labels
area/kafka kind/bug Something isn't working
Milestone

Comments

@Dieken
Copy link
Contributor

Dieken commented Jul 22, 2023

Describe the bug

Enable mp.messaging.incoming.words-in.failure-strategy=dead-letter-queue then Quarkus application will fail to start:

2023-07-22 19:49:49,127 ERROR [io.sma.rea.mes.provider] (Quarkus Main Thread) SRMSG00230: Unable to create the publisher or subscriber during initialization: java.lang.IllegalArgumentException: SRMSG18010: Unable to create an instance of `org.acme.MQMessage_Serializer_5c899fb72f68994441842e7374516dfc6eaa09e7`
        at io.smallrye.reactive.messaging.kafka.fault.SerializerWrapper.createDelegateSerializer(SerializerWrapper.java:35)
        at io.smallrye.reactive.messaging.kafka.fault.SerializerWrapper.<init>(SerializerWrapper.java:25)
        at io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer.<init>(ReactiveKafkaProducer.java:103)
        at io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue$Factory.create(KafkaDeadLetterQueue.java:95)
        at io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue_Factory_ClientProxy.create(Unknown Source)
        at io.smallrye.reactive.messaging.kafka.impl.KafkaSource.createFailureHandler(KafkaSource.java:309)
        at io.smallrye.reactive.messaging.kafka.impl.KafkaSource.<init>(KafkaSource.java:119)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector.getPublisher(KafkaConnector.java:211)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass.getPublisher$$superforward(Unknown Source)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass$$function$$9.apply(Unknown Source)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:73)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext$NextAroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:97)
        at io.quarkus.smallrye.reactivemessaging.runtime.devmode.DevModeSupportConnectorFactoryInterceptor.intercept(DevModeSupportConnectorFactoryInterceptor.java:53)
        at io.quarkus.smallrye.reactivemessaging.runtime.devmode.DevModeSupportConnectorFactoryInterceptor_Bean.intercept(Unknown Source)
        at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:42)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:70)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:62)
        at io.quarkus.smallrye.reactivemessaging.runtime.DuplicatedContextConnectorFactoryInterceptor.intercept(DuplicatedContextConnectorFactoryInterceptor.java:32)
        at io.quarkus.smallrye.reactivemessaging.runtime.DuplicatedContextConnectorFactoryInterceptor_Bean.intercept(Unknown Source)
        at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:42)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:30)
        at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:27)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass.getPublisher(Unknown Source)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_ClientProxy.getPublisher(Unknown Source)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.createPublisher(ConfiguredChannelFactory.java:172)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.register(ConfiguredChannelFactory.java:134)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.initialize(ConfiguredChannelFactory.java:106)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory_ClientProxy.initialize(Unknown Source)
        at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
        at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1845)
        at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
        at io.smallrye.reactive.messaging.providers.extension.MediatorManager.start(MediatorManager.java:212)
        at io.smallrye.reactive.messaging.providers.extension.MediatorManager_ClientProxy.start(Unknown Source)
        at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle.onApplicationStart(SmallRyeReactiveMessagingLifecycle.java:52)
        at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle_Observer_onApplicationStart_68e7b57eb97cb75d597c5b816682366e888d0d9b.notify(Unknown Source)
        at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:346)
        at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:328)
        at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:82)
        at io.quarkus.arc.runtime.ArcRecorder.fireLifecycleEvent(ArcRecorder.java:155)
        at io.quarkus.arc.runtime.ArcRecorder.handleLifecycleEvents(ArcRecorder.java:106)
        at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy_0(Unknown Source)
        at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy(Unknown Source)
        at io.quarkus.runner.ApplicationImpl.doStart(Unknown Source)
        at io.quarkus.runtime.Application.start(Application.java:101)
        at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:111)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:71)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:44)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:124)
        at io.quarkus.runner.GeneratedMain.main(Unknown Source)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:568)
        at io.quarkus.runner.bootstrap.StartupActionImpl$1.run(StartupActionImpl.java:104)
        at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.ClassNotFoundException: org.acme.MQMessage_Serializer_5c899fb72f68994441842e7374516dfc6eaa09e7
        at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
        at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520)
        at io.quarkus.bootstrap.classloading.QuarkusClassLoader.loadClass(QuarkusClassLoader.java:516)
        at io.quarkus.bootstrap.classloading.QuarkusClassLoader.loadClass(QuarkusClassLoader.java:466)
        at io.quarkus.bootstrap.classloading.QuarkusClassLoader.loadClass(QuarkusClassLoader.java:516)
        at io.quarkus.bootstrap.classloading.QuarkusClassLoader.loadClass(QuarkusClassLoader.java:466)
        at java.base/java.lang.Class.forName0(Native Method)
        at java.base/java.lang.Class.forName(Class.java:467)
        at org.apache.kafka.common.utils.Utils.loadClass(Utils.java:422)
        at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:411)
        at io.smallrye.reactive.messaging.kafka.fault.SerializerWrapper.createDelegateSerializer(SerializerWrapper.java:33)
        ... 54 more


2023-07-22 19:49:49,140 INFO  [io.qua.dep.dev.IsolatedDevModeMain] (main) Attempting to start live reload endpoint to recover from previous Quarkus startup failure
2023-07-22 19:49:49,140 ERROR [io.qua.run.Application] (Quarkus Main Thread) Failed to start application (with profile [dev]): java.lang.RuntimeException: Failed to start quarkus
        at io.quarkus.runner.ApplicationImpl.doStart(Unknown Source)
        at io.quarkus.runtime.Application.start(Application.java:101)
        at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:111)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:71)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:44)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:124)
        at io.quarkus.runner.GeneratedMain.main(Unknown Source)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:568)
        at io.quarkus.runner.bootstrap.StartupActionImpl$1.run(StartupActionImpl.java:104)
        at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: jakarta.enterprise.inject.spi.DeploymentException: java.lang.IllegalArgumentException: SRMSG18010: Unable to create an instance of `org.acme.MQMessage_Serializer_5c899fb72f68994441842e7374516dfc6eaa09e7`
        at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle.onApplicationStart(SmallRyeReactiveMessagingLifecycle.java:57)
        at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle_Observer_onApplicationStart_68e7b57eb97cb75d597c5b816682366e888d0d9b.notify(Unknown Source)
        at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:346)
        at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:328)
        at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:82)
        at io.quarkus.arc.runtime.ArcRecorder.fireLifecycleEvent(ArcRecorder.java:155)
        at io.quarkus.arc.runtime.ArcRecorder.handleLifecycleEvents(ArcRecorder.java:106)
        at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy_0(Unknown Source)
        at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy(Unknown Source)
        ... 13 more
Caused by: java.lang.IllegalArgumentException: SRMSG18010: Unable to create an instance of `org.acme.MQMessage_Serializer_5c899fb72f68994441842e7374516dfc6eaa09e7`
        at io.smallrye.reactive.messaging.kafka.fault.SerializerWrapper.createDelegateSerializer(SerializerWrapper.java:35)
        at io.smallrye.reactive.messaging.kafka.fault.SerializerWrapper.<init>(SerializerWrapper.java:25)
        at io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer.<init>(ReactiveKafkaProducer.java:103)
        at io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue$Factory.create(KafkaDeadLetterQueue.java:95)
        at io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue_Factory_ClientProxy.create(Unknown Source)
        at io.smallrye.reactive.messaging.kafka.impl.KafkaSource.createFailureHandler(KafkaSource.java:309)
        at io.smallrye.reactive.messaging.kafka.impl.KafkaSource.<init>(KafkaSource.java:119)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector.getPublisher(KafkaConnector.java:211)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass.getPublisher$$superforward(Unknown Source)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass$$function$$9.apply(Unknown Source)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:73)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext$NextAroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:97)
        at io.quarkus.smallrye.reactivemessaging.runtime.devmode.DevModeSupportConnectorFactoryInterceptor.intercept(DevModeSupportConnectorFactoryInterceptor.java:53)
        at io.quarkus.smallrye.reactivemessaging.runtime.devmode.DevModeSupportConnectorFactoryInterceptor_Bean.intercept(Unknown Source)
        at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:42)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:70)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:62)
        at io.quarkus.smallrye.reactivemessaging.runtime.DuplicatedContextConnectorFactoryInterceptor.intercept(DuplicatedContextConnectorFactoryInterceptor.java:32)
        at io.quarkus.smallrye.reactivemessaging.runtime.DuplicatedContextConnectorFactoryInterceptor_Bean.intercept(Unknown Source)
        at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:42)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:30)
        at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:27)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass.getPublisher(Unknown Source)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_ClientProxy.getPublisher(Unknown Source)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.createPublisher(ConfiguredChannelFactory.java:172)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.register(ConfiguredChannelFactory.java:134)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.initialize(ConfiguredChannelFactory.java:106)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory_ClientProxy.initialize(Unknown Source)
        at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
        at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1845)
        at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
        at io.smallrye.reactive.messaging.providers.extension.MediatorManager.start(MediatorManager.java:212)
        at io.smallrye.reactive.messaging.providers.extension.MediatorManager_ClientProxy.start(Unknown Source)
        at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle.onApplicationStart(SmallRyeReactiveMessagingLifecycle.java:52)
        ... 21 more
Caused by: java.lang.ClassNotFoundException: org.acme.MQMessage_Serializer_5c899fb72f68994441842e7374516dfc6eaa09e7
        at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
        at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520)
        at io.quarkus.bootstrap.classloading.QuarkusClassLoader.loadClass(QuarkusClassLoader.java:516)
        at io.quarkus.bootstrap.classloading.QuarkusClassLoader.loadClass(QuarkusClassLoader.java:466)
        at io.quarkus.bootstrap.classloading.QuarkusClassLoader.loadClass(QuarkusClassLoader.java:516)
        at io.quarkus.bootstrap.classloading.QuarkusClassLoader.loadClass(QuarkusClassLoader.java:466)
        at java.base/java.lang.Class.forName0(Native Method)
        at java.base/java.lang.Class.forName(Class.java:467)
        at org.apache.kafka.common.utils.Utils.loadClass(Utils.java:422)
        at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:411)
        at io.smallrye.reactive.messaging.kafka.fault.SerializerWrapper.createDelegateSerializer(SerializerWrapper.java:33)
        ... 54 more

Expected behavior

It should just work.

Actual behavior

It throws exception and fails to start.

How to Reproduce?

code-with-quarkus.tar.gz

Run quarkus dev, it will throw exception above.

Output of uname -a or ver

Darwin xxx 22.4.0 Darwin Kernel Version 22.4.0: Mon Mar 6 21:00:17 PST 2023; root:xnu-8796.101.5~3/RELEASE_X86_64 x86_64

Output of java -version

OpenJDK 64-Bit Server VM Temurin-17+35 (build 17+35, mixed mode, sharing)

GraalVM version (if different from Java)

No response

Quarkus version or git rev

3.2.1.Final

Build tool (ie. output of mvnw --version or gradlew --version)

Apache Maven 3.8.8 (4c87b05d9aedce574290d1acc98575ed5eb6cd39)

Additional information

No response

@Dieken Dieken added the kind/bug Something isn't working label Jul 22, 2023
@quarkus-bot
Copy link

quarkus-bot bot commented Jul 22, 2023

/cc @alesj (kafka), @cescoffier (kafka), @ozangunalp (kafka)

@ozangunalp
Copy link
Contributor

Dead letter queue strategy tries to find a serializer given the name of the incoming channel deserializer. In your case it seems like an auto generated deserializer. You can look at the dead letter queue config to configure the serializer: https://smallrye.io/smallrye-reactive-messaging/4.8.0/kafka/receiving-kafka-records/#dead-letter-queue

@Dieken
Copy link
Contributor Author

Dieken commented Jul 22, 2023

The auto generation of serializer and deserializer for nomal topic just works, so I feel it should just work too for dead letter topic.

@cescoffier
Copy link
Member

I agree @Dieken. The discovery should work in this case. I think this should be considered as a bug.

For now, you can work around it by configuring the dead letter queue serializer.

@Dieken
Copy link
Contributor Author

Dieken commented Jul 27, 2023

@ozangunalp got this exception after set mp.messaging.incoming.messages-in.dead-letter-queue.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer

2023-07-27 08:27:00,238 ERROR [org.apa.kaf.cli.pro.int.Sender] (kafka-producer-network-thread | kafka-dead-letter-topic-producer-kafka-consumer-messages-in-1) [Producer clientId=kafka-dead-letter-topic-producer-kafka-consumer-messages-in-1] Uncaught error in kafka producer I/O thread: : java.lang.NullPointerException: Cannot invoke "String.getBytes(java.nio.charset.Charset)" because "this.key" is null
        at org.apache.kafka.common.message.FindCoordinatorRequestData.addSize(FindCoordinatorRequestData.java:235)
        at org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218)
        at org.apache.kafka.common.protocol.SendBuilder.buildRequestSend(SendBuilder.java:187)
        at org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:101)
        at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:524)
        at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:500)
        at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:460)
        at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:478)
        at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
        at java.base/java.lang.Thread.run(Thread.java:833)


2023-07-27 08:27:00,238 ERROR [org.apa.kaf.cli.pro.int.Sender] (kafka-producer-network-thread | kafka-dead-letter-topic-producer-kafka-consumer-messages-in-0) [Producer clientId=kafka-dead-letter-topic-producer-kafka-consumer-messages-in-0] Uncaught error in kafka producer I/O thread: : java.lang.NullPointerException: Cannot invoke "String.getBytes(java.nio.charset.Charset)" because "this.key" is null
        at org.apache.kafka.common.message.FindCoordinatorRequestData.addSize(FindCoordinatorRequestData.java:235)
        at org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218)
        at org.apache.kafka.common.protocol.SendBuilder.buildRequestSend(SendBuilder.java:187)
        at org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:101)
        at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:524)
        at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:500)
        at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:460)
        at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:478)
        at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
        at java.base/java.lang.Thread.run(Thread.java:833)


2023-07-27 08:27:00,238 ERROR [org.apa.kaf.cli.pro.int.Sender] (kafka-producer-network-thread | kafka-dead-letter-topic-producer-kafka-consumer-messages-in-3) [Producer clientId=kafka-dead-letter-topic-producer-kafka-consumer-messages-in-3] Uncaught error in kafka producer I/O thread: : java.lang.NullPointerException: Cannot invoke "String.getBytes(java.nio.charset.Charset)" because "this.key" is null
        at org.apache.kafka.common.message.FindCoordinatorRequestData.addSize(FindCoordinatorRequestData.java:235)
        at org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218)
        at org.apache.kafka.common.protocol.SendBuilder.buildRequestSend(SendBuilder.java:187)
        at org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:101)
        at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:524)
        at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:500)
        at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:460)
        at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:478)
        at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
        at java.base/java.lang.Thread.run(Thread.java:833)

Related config:

mp.messaging.outgoing.messages-out.connector=smallrye-kafka
mp.messaging.outgoing.messages-out.topic=messages
mp.messaging.outgoing.messages-out.transactional.id=messages-${quarkus.uuid}

mp.messaging.incoming.messages-in.auto.offset.reset=latest
mp.messaging.incoming.messages-in.connector=smallrye-kafka
mp.messaging.incoming.messages-in.topic=messages
mp.messaging.incoming.messages-in.isolation.level=read_committed
mp.messaging.incoming.messages-in.partitions=4
mp.messaging.incoming.messages-in.failure-strategy=dead-letter-queue
mp.messaging.incoming.messages-in.dead-letter-queue.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer

kafka.group.id=engine
kafka.partitioner.class=my.KafkaConsistentHashPartitioner

KafkaConsistentHashPartitioner.java:

import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class KafkaConsistentHashPartitioner implements Partitioner {

    private HashFunction hasher = Hashing.sipHash24();

    @Override
    public int partition(java.lang.String topic, Object key, byte[] keyBytes,
            Object value, byte[] valueBytes, Cluster cluster) {
        int partitions = cluster.partitionCountForTopic(topic);
        return Hashing.consistentHash(hasher.hashBytes(keyBytes), partitions);
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> map) {
    }
}

Environment:

  1. Quarkus 3.2.2.Final
  2. JDK 17
  3. macOS

@Dieken
Copy link
Contributor Author

Dieken commented Jul 27, 2023

I agree @Dieken. The discovery should work in this case. I think this should be considered as a bug.

For now, you can work around it by configuring the dead letter queue serializer.

Yes, it would be better if it just works without explicit configuration of dead letter queue serializer.

@ozangunalp
Copy link
Contributor

I've never seen this exception before. It may be related to your authentication method to the Kafka broker.

@Dieken
Copy link
Contributor Author

Dieken commented Jul 27, 2023

I use Kafka dev container. The exception can be easily recurred, just update the application.properties in the code-with-quarkus.tar.gz attachment above:

mp.messaging.outgoing.words-out.topic=words
#mp.messaging.outgoing.words-out.transactional.id=words-${quarkus.uuid}

mp.messaging.incoming.words-in.topic=words
mp.messaging.incoming.words-in.auto.offset.reset=earliest
mp.messaging.incoming.words-in.failure-strategy=dead-letter-queue
mp.messaging.incoming.words-in.dead-letter-queue.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer
mp.messaging.incoming.words-in.isolation.level=read_committed
mp.messaging.incoming.words-in.partitions=4

kafka.group.id=engine

The NPE is caused by partitions=4.

@Dieken
Copy link
Contributor Author

Dieken commented Jul 27, 2023

Maybe it's an issue of Kafka client, it reports warnings or errors if the topic doesn't exist. After the topic is created, the warnings or errors disappear.

With Confluent-7.0.4:

2023-07-27 20:52:21,833 WARN  [org.apa.kaf.cli.NetworkClient] (smallrye-kafka-consumer-thread-0) [Consumer clientId=kafka-consumer-words-in-0, groupId=engine] Error while fetching metadata with correlation id 2 : {words=UNKNOWN_TOPIC_OR_PARTITION}
2023-07-27 20:52:21,833 WARN  [org.apa.kaf.cli.NetworkClient] (smallrye-kafka-consumer-thread-1) [Consumer clientId=kafka-consumer-words-in-1, groupId=engine] Error while fetching metadata with correlation id 2 : {words=UNKNOWN_TOPIC_OR_PARTITION}
2023-07-27 20:52:21,833 WARN  [org.apa.kaf.cli.NetworkClient] (smallrye-kafka-consumer-thread-3) [Consumer clientId=kafka-consumer-words-in-3, groupId=engine] Error while fetching metadata with correlation id 2 : {words=UNKNOWN_TOPIC_OR_PARTITION}
2023-07-27 20:52:21,833 WARN  [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | kafka-producer-words-out) [Producer clientId=kafka-producer-words-out] Error while fetching metadata with correlation id 1 : {words=UNKNOWN_TOPIC_OR_PARTITION}
2023-07-27 20:52:21,837 WARN  [org.apa.kaf.cli.NetworkClient] (smallrye-kafka-consumer-thread-2) [Consumer clientId=kafka-consumer-words-in-2, groupId=engine] Error while fetching metadata with correlation id 2 : {words=LEADER_NOT_AVAILABLE}

With Redpanda-v22.3.4:

2023-07-27 20:51:14,167 ERROR [org.apa.kaf.cli.pro.int.Sender] (kafka-producer-network-thread | kafka-dead-letter-topic-producer-kafka-consumer-words-in-2) [Producer clientId=kafka-dead-letter-topic-producer-kafka-consumer-words-in-2] Uncaught error in kafka producer I/O thread:  [Error Occurred After Shutdown]: java.lang.NullPointerException: Cannot invoke "String.getBytes(java.nio.charset.Charset)" because "this.key" is null
        at org.apache.kafka.common.message.FindCoordinatorRequestData.addSize(FindCoordinatorRequestData.java:235)
        at org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218)
        at org.apache.kafka.common.protocol.SendBuilder.buildRequestSend(SendBuilder.java:187)
        at org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:101)
        at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:524)
        at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:500)
        at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:460)
        at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:478)
        at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
        at java.base/java.lang.Thread.run(Thread.java:833)

2023-07-27 20:51:14,168 ERROR [org.apa.kaf.cli.pro.int.Sender] (kafka-producer-network-thread | kafka-dead-letter-topic-producer-kafka-consumer-words-in-3) [Producer clientId=kafka-dead-letter-topic-producer-kafka-consumer-words-in-3] Uncaught error in kafka producer I/O thread:  [Error Occurred After Shutdown]: java.lang.NullPointerException: Cannot invoke "String.getBytes(java.nio.charset.Charset)" because "this.key" is null
        at org.apache.kafka.common.message.FindCoordinatorRequestData.addSize(FindCoordinatorRequestData.java:235)
        at org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218)
        at org.apache.kafka.common.protocol.SendBuilder.buildRequestSend(SendBuilder.java:187)
        at org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:101)
        at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:524)
        at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:500)
        at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:460)
        at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:478)
        at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
        at java.base/java.lang.Thread.run(Thread.java:833)

2023-07-27 20:51:14,167 ERROR [org.apa.kaf.cli.pro.int.Sender] (kafka-producer-network-thread | kafka-dead-letter-topic-producer-kafka-consumer-words-in-1) [Producer clientId=kafka-dead-letter-topic-producer-kafka-consumer-words-in-1] Uncaught error in kafka producer I/O thread:  [Error Occurred After Shutdown]: java.lang.NullPointerException: Cannot invoke "String.getBytes(java.nio.charset.Charset)" because "this.key" is null
        at org.apache.kafka.common.message.FindCoordinatorRequestData.addSize(FindCoordinatorRequestData.java:235)
        at org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218)
        at org.apache.kafka.common.protocol.SendBuilder.buildRequestSend(SendBuilder.java:187)
        at org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:101)
        at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:524)
        at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:500)
        at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:460)
        at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:478)
        at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
        at java.base/java.lang.Thread.run(Thread.java:833)

2023-07-27 20:51:14,597 INFO  [io.sma.rea.mes.kafka] (smallrye-kafka-consumer-thread-1) SRMSG18224: Executing consumer revoked re-balance listener for group 'engine'
2023-07-27 20:51:14,596 INFO  [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-1) SRMSG18256: Initialize record store for topic-partition 'words-0' at position -1.

@ozangunalp
Copy link
Contributor

Idempotent producer support may be different in Redpanda than in Kafka. For the dev service have you tried creating topics before hand using https://quarkus.io/guides/kafka#configuring-kafka-topics ?

@Dieken
Copy link
Contributor Author

Dieken commented Jul 31, 2023

Idempotent producer support may be different in Redpanda than in Kafka. For the dev service have you tried creating topics before hand using https://quarkus.io/guides/kafka#configuring-kafka-topics ?

quarkus.kafka.devservices.topic-partitions.messages=4

Tried this configuration, it doesn't help.

@Dieken
Copy link
Contributor Author

Dieken commented Jul 31, 2023

I agree @Dieken. The discovery should work in this case. I think this should be considered as a bug.

For now, you can work around it by configuring the dead letter queue serializer.

@ozangunalp Although mp.messaging.incoming.words-in.dead-letter-queue.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer fixed the original ClassNotFoundException, ignoring the harmless NPE error above, it would be better if Quarkus just works without this explicit configuration.

@cescoffier
Copy link
Member

I think that's the main issue: the DLQ serializer should be discovered automatically.
The idempotent provider issue is a difference between redpanda and confluent kafka - not something we can fix here (you can use kafka native as dev service it should behave like the confluent one)

@orlandokj
Copy link

orlandokj commented Aug 24, 2023

I had the same problem, looking at the quarkus logs on initialization I found this:
2023-08-24 08:25:03,263 INFO [io.qua.sma.dep.processor] (build-49) Generating Jackson deserializer for type ...
It doesn't generate the serializer (at least doesn't log it) maybe because it is not needed for an incoming channel.
Maybe a solution is to generate a serializer when there is a dead letter queue configured.

@cescoffier
Copy link
Member

Yes, that's one of the thing that need to be added. When using a DLQ you need both sides

@Dieken
Copy link
Contributor Author

Dieken commented Oct 7, 2023

I agree @Dieken. The discovery should work in this case. I think this should be considered as a bug.
For now, you can work around it by configuring the dead letter queue serializer.

@ozangunalp Although mp.messaging.incoming.words-in.dead-letter-queue.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer fixed the original ClassNotFoundException, ignoring the harmless NPE error above, it would be better if Quarkus just works without this explicit configuration.

@ozangunalp @cescoffier Could Quarkus just work without this explicit configuration of mp.messaging.incoming.xxx.dead-letter-queue.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer?

@cescoffier
Copy link
Member

That's the idea, but it's not implemented yet.

@ozangunalp
Copy link
Contributor

@Dieken thanks for bumping this, I'd forgotten about it. #36347 should resolve this.

@quarkus-bot quarkus-bot bot added this to the 3.5 - main milestone Oct 10, 2023
holly-cummins pushed a commit to holly-cummins/quarkus that referenced this issue Feb 8, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/kafka kind/bug Something isn't working
Projects
None yet
Development

Successfully merging a pull request may close this issue.

4 participants