diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java index 825c9550e3ff4..acb89e62ae24e 100644 --- a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java @@ -91,6 +91,16 @@ static boolean hasStateStoreConfig(String stateStoreName, Config config) { return stateStores.contains(stateStoreName); } + static boolean hasDLQConfig(String channelName, Config config) { + String propertyKey = getChannelPropertyKey(channelName, "failure-strategy", true); + Optional channelFailureStrategy = config.getOptionalValue(propertyKey, String.class); + Optional failureStrategy = channelFailureStrategy.or(() -> getConnectorProperty("failure-strategy", config)); + + return failureStrategy.isPresent() + && (failureStrategy.get().equals("dead-letter-queue") + || failureStrategy.get().equals("delayed-retry-topic")); + } + private static Optional getConnectorProperty(String keySuffix, Config config) { return config.getOptionalValue("mp.messaging.connector." + KafkaConnector.CONNECTOR_NAME + "." + keySuffix, String.class); @@ -207,8 +217,8 @@ void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery, BuildProducer config, BuildProducer generatedClass, BuildProducer reflection) { - Map alreadyGeneratedSerializers = new HashMap<>(); - Map alreadyGeneratedDeserializers = new HashMap<>(); + Map alreadyGeneratedSerializers = new HashMap<>(); + Map alreadyGeneratedDeserializers = new HashMap<>(); for (AnnotationInstance annotation : discovery.findRepeatableAnnotationsOnMethods(DotNames.INCOMING)) { String channelName = annotation.value().asString(); if (!discovery.isKafkaConnector(channelsManagedByConnectors, true, channelName)) { @@ -220,7 +230,7 @@ void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery, Type incomingType = getIncomingTypeFromMethod(method); processIncomingType(discovery, config, incomingType, channelName, generatedClass, reflection, - alreadyGeneratedDeserializers); + alreadyGeneratedDeserializers, alreadyGeneratedSerializers); } for (AnnotationInstance annotation : discovery.findRepeatableAnnotationsOnMethods(DotNames.OUTGOING)) { @@ -257,7 +267,7 @@ void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery, Type incomingType = getIncomingTypeFromChannelInjectionPoint(injectionPointType); processIncomingType(discovery, config, incomingType, channelName, generatedClass, reflection, - alreadyGeneratedDeserializers); + alreadyGeneratedDeserializers, alreadyGeneratedSerializers); processKafkaTransactions(discovery, config, channelName, injectionPointType); @@ -293,11 +303,12 @@ private void processKafkaTransactions(DefaultSerdeDiscoveryState discovery, private void processIncomingType(DefaultSerdeDiscoveryState discovery, BuildProducer config, Type incomingType, String channelName, BuildProducer generatedClass, BuildProducer reflection, - Map alreadyGeneratedDeserializers) { + Map alreadyGeneratedDeserializers, Map alreadyGeneratedSerializers) { extractKeyValueType(incomingType, (key, value, isBatchType) -> { - Result keyDeserializer = deserializerFor(discovery, key, generatedClass, reflection, alreadyGeneratedDeserializers); - Result valueDeserializer = deserializerFor(discovery, value, generatedClass, reflection, - alreadyGeneratedDeserializers); + Result keyDeserializer = deserializerFor(discovery, key, true, channelName, generatedClass, reflection, + alreadyGeneratedDeserializers, alreadyGeneratedSerializers); + Result valueDeserializer = deserializerFor(discovery, value, false, channelName, generatedClass, reflection, + alreadyGeneratedDeserializers, alreadyGeneratedSerializers); produceRuntimeConfigurationDefaultBuildItem(discovery, config, getChannelPropertyKey(channelName, "key.deserializer", true), keyDeserializer); @@ -494,7 +505,7 @@ private Type getOutgoingTypeFromChannelInjectionPoint(Type injectionPointType) { private void processOutgoingType(DefaultSerdeDiscoveryState discovery, Type outgoingType, BiConsumer serializerAcceptor, BuildProducer generatedClass, - BuildProducer reflection, Map alreadyGeneratedSerializer) { + BuildProducer reflection, Map alreadyGeneratedSerializer) { extractKeyValueType(outgoingType, (key, value, isBatch) -> { Result keySerializer = serializerFor(discovery, key, generatedClass, reflection, alreadyGeneratedSerializer); @@ -766,10 +777,14 @@ private static boolean isRawMessage(Type type) { ); // @formatter:on - private Result deserializerFor(DefaultSerdeDiscoveryState discovery, Type type, + private Result deserializerFor(DefaultSerdeDiscoveryState discovery, + Type type, + boolean key, + String channelName, BuildProducer generatedClass, BuildProducer reflection, - Map alreadyGeneratedSerializers) { + Map alreadyGeneratedDeserializers, + Map alreadyGeneratedSerializers) { Result result = serializerDeserializerFor(discovery, type, false); if (result != null && !result.exists) { // avoid returning Result.nonexistent() to callers, they expect a non-null Result to always be known @@ -779,16 +794,26 @@ private Result deserializerFor(DefaultSerdeDiscoveryState discovery, Type type, // also, only generate the serializer/deserializer for classes and only generate once if (result == null && type != null && generatedClass != null && type.kind() == Type.Kind.CLASS) { // Check if already generated - String clazz = alreadyGeneratedSerializers.get(type.toString()); - if (clazz == null) { - clazz = JacksonSerdeGenerator.generateDeserializer(generatedClass, type); + result = alreadyGeneratedDeserializers.get(type.toString()); + if (result == null) { + String clazz = JacksonSerdeGenerator.generateDeserializer(generatedClass, type); LOGGER.infof("Generating Jackson deserializer for type %s", type.name().toString()); // Deserializers are access by reflection. reflection.produce( ReflectiveClassBuildItem.builder(clazz).methods().build()); - alreadyGeneratedSerializers.put(type.toString(), clazz); + alreadyGeneratedDeserializers.put(type.toString(), result); + // if the channel has a DLQ config generate a serializer as well + if (hasDLQConfig(channelName, discovery.getConfig())) { + Result serializer = serializerFor(discovery, type, generatedClass, reflection, alreadyGeneratedSerializers); + if (serializer != null) { + result = Result.of(clazz) + .with(key, "dead-letter-queue.key.serializer", serializer.value) + .with(!key, "dead-letter-queue.value.serializer", serializer.value); + } + } else { + result = Result.of(clazz); + } } - result = Result.of(clazz); } return result; } @@ -796,7 +821,7 @@ private Result deserializerFor(DefaultSerdeDiscoveryState discovery, Type type, private Result serializerFor(DefaultSerdeDiscoveryState discovery, Type type, BuildProducer generatedClass, BuildProducer reflection, - Map alreadyGeneratedSerializers) { + Map alreadyGeneratedSerializers) { Result result = serializerDeserializerFor(discovery, type, true); if (result != null && !result.exists) { // avoid returning Result.nonexistent() to callers, they expect a non-null Result to always be known @@ -806,16 +831,16 @@ private Result serializerFor(DefaultSerdeDiscoveryState discovery, Type type, // also, only generate the serializer/deserializer for classes and only generate once if (result == null && type != null && generatedClass != null && type.kind() == Type.Kind.CLASS) { // Check if already generated - String clazz = alreadyGeneratedSerializers.get(type.toString()); - if (clazz == null) { - clazz = JacksonSerdeGenerator.generateSerializer(generatedClass, type); + result = alreadyGeneratedSerializers.get(type.toString()); + if (result == null) { + String clazz = JacksonSerdeGenerator.generateSerializer(generatedClass, type); LOGGER.infof("Generating Jackson serializer for type %s", type.name().toString()); // Serializers are access by reflection. reflection.produce( ReflectiveClassBuildItem.builder(clazz).methods().build()); - alreadyGeneratedSerializers.put(type.toString(), clazz); + result = Result.of(clazz); + alreadyGeneratedSerializers.put(type.toString(), result); } - result = Result.of(clazz); } return result; diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest.java index c1ca6b2e4eb93..ce11568c4e445 100644 --- a/extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest.java +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest.java @@ -11,6 +11,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletionStage; +import java.util.function.Function; import jakarta.inject.Inject; @@ -22,6 +23,7 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; +import org.assertj.core.api.Assert; import org.assertj.core.groups.Tuple; import org.eclipse.microprofile.config.Config; import org.eclipse.microprofile.config.spi.ConfigProviderResolver; @@ -40,7 +42,9 @@ import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; +import io.quarkus.deployment.builditem.GeneratedClassBuildItem; import io.quarkus.deployment.builditem.RunTimeConfigurationDefaultBuildItem; +import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; import io.quarkus.kafka.client.serialization.JsonbSerializer; import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer; import io.quarkus.smallrye.reactivemessaging.deployment.items.ConnectorManagedChannelBuildItem; @@ -63,7 +67,16 @@ private static void doTest(Tuple[] expectations, Class... classesToIndex) { } private static void doTest(Config customConfig, Tuple[] expectations, Class... classesToIndex) { + doTest(customConfig, expectations, Collections.emptyList(), Collections.emptyList(), classesToIndex); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + private static void doTest(Config customConfig, Tuple[] expectations, + List> generatedNames, + List> reflectiveNames, Class... classesToIndex) { List configs = new ArrayList<>(); + List generated = new ArrayList<>(); + List reflective = new ArrayList<>(); List> classes = new ArrayList<>(Arrays.asList(classesToIndex)); classes.add(Incoming.class); @@ -81,11 +94,35 @@ boolean isKafkaConnector(List list, boolean in }; try { new SmallRyeReactiveMessagingKafkaProcessor().discoverDefaultSerdeConfig(discovery, Collections.emptyList(), - configs::add, null, null); + configs::add, + (generatedNames == null) ? null : generated::add, + (reflectiveNames == null) ? null : reflective::add); assertThat(configs) .extracting(RunTimeConfigurationDefaultBuildItem::getKey, RunTimeConfigurationDefaultBuildItem::getValue) - .containsExactlyInAnyOrder(expectations); + .allSatisfy(tuple -> { + Object[] e = tuple.toArray(); + String key = (String) e[0]; + String value = (String) e[1]; + assertThat(Arrays.stream(expectations).filter(t -> key.equals(t.toArray()[0]))) + .hasSize(1) + .satisfiesOnlyOnce(t -> { + Object o = t.toArray()[1]; + if (o instanceof String) { + assertThat(value).isEqualTo((String) o); + } else { + ((Function) o).apply(value); + } + }); + }); + + assertThat(generated) + .extracting(GeneratedClassBuildItem::getName) + .allSatisfy(s -> assertThat(generatedNames).satisfiesOnlyOnce(c -> c.apply(s))); + + assertThat(reflective) + .flatExtracting(ReflectiveClassBuildItem::getClassNames) + .allSatisfy(s -> assertThat(reflectiveNames).satisfiesOnlyOnce(c -> c.apply(s))); } finally { // must not leak the Config instance associated to the system classloader if (customConfig == null) { @@ -94,6 +131,14 @@ boolean isKafkaConnector(List list, boolean in } } + Function assertMatches(String regex) { + return s -> assertThat(s).matches(regex); + } + + Function assertStartsWith(String starts) { + return s -> assertThat(s).startsWith(starts); + } + private static IndexView index(List> classes) { Indexer indexer = new Indexer(); for (Class clazz : classes) { @@ -2570,11 +2615,14 @@ public void genericSerdeImplementationAutoDetect() { Tuple[] expectations1 = { tuple("mp.messaging.outgoing.channel1.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"), - tuple("mp.messaging.incoming.channel2.key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer"), + tuple("mp.messaging.incoming.channel3.value.deserializer", assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Deserializer_")), tuple("mp.messaging.incoming.channel2.value.deserializer", "io.quarkus.kafka.client.serialization.JsonObjectDeserializer"), }; + var generated1 = List.of(assertStartsWith("io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest$CustomDto_Deserializer_")); + var reflective1 = List.of(assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Deserializer_")); + Tuple[] expectations2 = { tuple("mp.messaging.outgoing.channel1.value.serializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$MySerializer"), @@ -2602,7 +2650,7 @@ public void genericSerdeImplementationAutoDetect() { }; // @formatter:on - doTest(expectations1, CustomSerdeImplementation.class, CustomDto.class); + doTest(null, expectations1, generated1, reflective1, CustomSerdeImplementation.class, CustomDto.class); doTest(expectations2, CustomSerdeImplementation.class, CustomDto.class, MySerializer.class, @@ -2795,5 +2843,51 @@ void method1(KafkaRecord msg) { } + @Test + void deadLetterQueue() { + Tuple[] expectations = { + tuple("mp.messaging.incoming.channel1.value.deserializer", + assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Deserializer_")), + tuple("mp.messaging.incoming.channel1.dead-letter-queue.value.serializer", + assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Serializer_")), + tuple("mp.messaging.incoming.channel2.key.deserializer", + assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Deserializer_")), + tuple("mp.messaging.incoming.channel2.value.deserializer", + assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Deserializer_")), + tuple("mp.messaging.incoming.channel2.dead-letter-queue.key.serializer", + assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Serializer_")), + tuple("mp.messaging.incoming.channel2.dead-letter-queue.value.serializer", + assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Serializer_")), + }; + var generated = List.of( + assertStartsWith("io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest$CustomDto_Deserializer_"), + assertStartsWith("io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest$CustomDto_Serializer_") + ); + var reflective = List.of( + assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Deserializer_"), + assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Serializer_") + ); + doTest(new SmallRyeConfigBuilder() + .withSources(new MapBackedConfigSource("test", Map.of( + "mp.messaging.incoming.channel1.failure-strategy", "dead-letter-queue", + "mp.messaging.incoming.channel2.failure-strategy", "delayed-retry-topic")) { + }) + .build(), expectations, generated, reflective, DeadLetterQueue.class); + } + + private static class DeadLetterQueue { + + @Incoming("channel1") + void method1(CustomDto msg) { + + } + + @Incoming("channel2") + void method2(Record msg) { + + } + + } + }