Skip to content

Commit

Permalink
Kafka serde discovery generates serializer for dead letter queue fail…
Browse files Browse the repository at this point in the history
…ure strategy

Resolves quarkusio#34931
  • Loading branch information
ozangunalp authored and holly-cummins committed Feb 8, 2024
1 parent 2d60252 commit 42b35fc
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,16 @@ static boolean hasStateStoreConfig(String stateStoreName, Config config) {
return stateStores.contains(stateStoreName);
}

static boolean hasDLQConfig(String channelName, Config config) {
String propertyKey = getChannelPropertyKey(channelName, "failure-strategy", true);
Optional<String> channelFailureStrategy = config.getOptionalValue(propertyKey, String.class);
Optional<String> failureStrategy = channelFailureStrategy.or(() -> getConnectorProperty("failure-strategy", config));

return failureStrategy.isPresent()
&& (failureStrategy.get().equals("dead-letter-queue")
|| failureStrategy.get().equals("delayed-retry-topic"));
}

private static Optional<String> getConnectorProperty(String keySuffix, Config config) {
return config.getOptionalValue("mp.messaging.connector." + KafkaConnector.CONNECTOR_NAME + "." + keySuffix,
String.class);
Expand Down Expand Up @@ -207,8 +217,8 @@ void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery,
BuildProducer<RunTimeConfigurationDefaultBuildItem> config,
BuildProducer<GeneratedClassBuildItem> generatedClass,
BuildProducer<ReflectiveClassBuildItem> reflection) {
Map<String, String> alreadyGeneratedSerializers = new HashMap<>();
Map<String, String> alreadyGeneratedDeserializers = new HashMap<>();
Map<String, Result> alreadyGeneratedSerializers = new HashMap<>();
Map<String, Result> alreadyGeneratedDeserializers = new HashMap<>();
for (AnnotationInstance annotation : discovery.findRepeatableAnnotationsOnMethods(DotNames.INCOMING)) {
String channelName = annotation.value().asString();
if (!discovery.isKafkaConnector(channelsManagedByConnectors, true, channelName)) {
Expand All @@ -220,7 +230,7 @@ void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery,
Type incomingType = getIncomingTypeFromMethod(method);

processIncomingType(discovery, config, incomingType, channelName, generatedClass, reflection,
alreadyGeneratedDeserializers);
alreadyGeneratedDeserializers, alreadyGeneratedSerializers);
}

for (AnnotationInstance annotation : discovery.findRepeatableAnnotationsOnMethods(DotNames.OUTGOING)) {
Expand Down Expand Up @@ -257,7 +267,7 @@ void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery,
Type incomingType = getIncomingTypeFromChannelInjectionPoint(injectionPointType);

processIncomingType(discovery, config, incomingType, channelName, generatedClass, reflection,
alreadyGeneratedDeserializers);
alreadyGeneratedDeserializers, alreadyGeneratedSerializers);

processKafkaTransactions(discovery, config, channelName, injectionPointType);

Expand Down Expand Up @@ -293,11 +303,12 @@ private void processKafkaTransactions(DefaultSerdeDiscoveryState discovery,
private void processIncomingType(DefaultSerdeDiscoveryState discovery,
BuildProducer<RunTimeConfigurationDefaultBuildItem> config, Type incomingType, String channelName,
BuildProducer<GeneratedClassBuildItem> generatedClass, BuildProducer<ReflectiveClassBuildItem> reflection,
Map<String, String> alreadyGeneratedDeserializers) {
Map<String, Result> alreadyGeneratedDeserializers, Map<String, Result> alreadyGeneratedSerializers) {
extractKeyValueType(incomingType, (key, value, isBatchType) -> {
Result keyDeserializer = deserializerFor(discovery, key, generatedClass, reflection, alreadyGeneratedDeserializers);
Result valueDeserializer = deserializerFor(discovery, value, generatedClass, reflection,
alreadyGeneratedDeserializers);
Result keyDeserializer = deserializerFor(discovery, key, true, channelName, generatedClass, reflection,
alreadyGeneratedDeserializers, alreadyGeneratedSerializers);
Result valueDeserializer = deserializerFor(discovery, value, false, channelName, generatedClass, reflection,
alreadyGeneratedDeserializers, alreadyGeneratedSerializers);

produceRuntimeConfigurationDefaultBuildItem(discovery, config,
getChannelPropertyKey(channelName, "key.deserializer", true), keyDeserializer);
Expand Down Expand Up @@ -494,7 +505,7 @@ private Type getOutgoingTypeFromChannelInjectionPoint(Type injectionPointType) {

private void processOutgoingType(DefaultSerdeDiscoveryState discovery, Type outgoingType,
BiConsumer<Result, Result> serializerAcceptor, BuildProducer<GeneratedClassBuildItem> generatedClass,
BuildProducer<ReflectiveClassBuildItem> reflection, Map<String, String> alreadyGeneratedSerializer) {
BuildProducer<ReflectiveClassBuildItem> reflection, Map<String, Result> alreadyGeneratedSerializer) {
extractKeyValueType(outgoingType, (key, value, isBatch) -> {
Result keySerializer = serializerFor(discovery, key, generatedClass, reflection,
alreadyGeneratedSerializer);
Expand Down Expand Up @@ -766,10 +777,14 @@ private static boolean isRawMessage(Type type) {
);
// @formatter:on

private Result deserializerFor(DefaultSerdeDiscoveryState discovery, Type type,
private Result deserializerFor(DefaultSerdeDiscoveryState discovery,
Type type,
boolean key,
String channelName,
BuildProducer<GeneratedClassBuildItem> generatedClass,
BuildProducer<ReflectiveClassBuildItem> reflection,
Map<String, String> alreadyGeneratedSerializers) {
Map<String, Result> alreadyGeneratedDeserializers,
Map<String, Result> alreadyGeneratedSerializers) {
Result result = serializerDeserializerFor(discovery, type, false);
if (result != null && !result.exists) {
// avoid returning Result.nonexistent() to callers, they expect a non-null Result to always be known
Expand All @@ -779,24 +794,34 @@ private Result deserializerFor(DefaultSerdeDiscoveryState discovery, Type type,
// also, only generate the serializer/deserializer for classes and only generate once
if (result == null && type != null && generatedClass != null && type.kind() == Type.Kind.CLASS) {
// Check if already generated
String clazz = alreadyGeneratedSerializers.get(type.toString());
if (clazz == null) {
clazz = JacksonSerdeGenerator.generateDeserializer(generatedClass, type);
result = alreadyGeneratedDeserializers.get(type.toString());
if (result == null) {
String clazz = JacksonSerdeGenerator.generateDeserializer(generatedClass, type);
LOGGER.infof("Generating Jackson deserializer for type %s", type.name().toString());
// Deserializers are access by reflection.
reflection.produce(
ReflectiveClassBuildItem.builder(clazz).methods().build());
alreadyGeneratedSerializers.put(type.toString(), clazz);
alreadyGeneratedDeserializers.put(type.toString(), result);
// if the channel has a DLQ config generate a serializer as well
if (hasDLQConfig(channelName, discovery.getConfig())) {
Result serializer = serializerFor(discovery, type, generatedClass, reflection, alreadyGeneratedSerializers);
if (serializer != null) {
result = Result.of(clazz)
.with(key, "dead-letter-queue.key.serializer", serializer.value)
.with(!key, "dead-letter-queue.value.serializer", serializer.value);
}
} else {
result = Result.of(clazz);
}
}
result = Result.of(clazz);
}
return result;
}

private Result serializerFor(DefaultSerdeDiscoveryState discovery, Type type,
BuildProducer<GeneratedClassBuildItem> generatedClass,
BuildProducer<ReflectiveClassBuildItem> reflection,
Map<String, String> alreadyGeneratedSerializers) {
Map<String, Result> alreadyGeneratedSerializers) {
Result result = serializerDeserializerFor(discovery, type, true);
if (result != null && !result.exists) {
// avoid returning Result.nonexistent() to callers, they expect a non-null Result to always be known
Expand All @@ -806,16 +831,16 @@ private Result serializerFor(DefaultSerdeDiscoveryState discovery, Type type,
// also, only generate the serializer/deserializer for classes and only generate once
if (result == null && type != null && generatedClass != null && type.kind() == Type.Kind.CLASS) {
// Check if already generated
String clazz = alreadyGeneratedSerializers.get(type.toString());
if (clazz == null) {
clazz = JacksonSerdeGenerator.generateSerializer(generatedClass, type);
result = alreadyGeneratedSerializers.get(type.toString());
if (result == null) {
String clazz = JacksonSerdeGenerator.generateSerializer(generatedClass, type);
LOGGER.infof("Generating Jackson serializer for type %s", type.name().toString());
// Serializers are access by reflection.
reflection.produce(
ReflectiveClassBuildItem.builder(clazz).methods().build());
alreadyGeneratedSerializers.put(type.toString(), clazz);
result = Result.of(clazz);
alreadyGeneratedSerializers.put(type.toString(), result);
}
result = Result.of(clazz);
}

return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

import jakarta.inject.Inject;

Expand All @@ -22,6 +23,7 @@
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.assertj.core.api.Assert;
import org.assertj.core.groups.Tuple;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.spi.ConfigProviderResolver;
Expand All @@ -40,7 +42,9 @@
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

import io.quarkus.deployment.builditem.GeneratedClassBuildItem;
import io.quarkus.deployment.builditem.RunTimeConfigurationDefaultBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
import io.quarkus.kafka.client.serialization.JsonbSerializer;
import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;
import io.quarkus.smallrye.reactivemessaging.deployment.items.ConnectorManagedChannelBuildItem;
Expand All @@ -63,7 +67,16 @@ private static void doTest(Tuple[] expectations, Class<?>... classesToIndex) {
}

private static void doTest(Config customConfig, Tuple[] expectations, Class<?>... classesToIndex) {
doTest(customConfig, expectations, Collections.emptyList(), Collections.emptyList(), classesToIndex);
}

@SuppressWarnings({ "unchecked", "rawtypes" })
private static void doTest(Config customConfig, Tuple[] expectations,
List<Function<String, Assert>> generatedNames,
List<Function<String, Assert>> reflectiveNames, Class<?>... classesToIndex) {
List<RunTimeConfigurationDefaultBuildItem> configs = new ArrayList<>();
List<GeneratedClassBuildItem> generated = new ArrayList<>();
List<ReflectiveClassBuildItem> reflective = new ArrayList<>();

List<Class<?>> classes = new ArrayList<>(Arrays.asList(classesToIndex));
classes.add(Incoming.class);
Expand All @@ -81,11 +94,35 @@ boolean isKafkaConnector(List<ConnectorManagedChannelBuildItem> list, boolean in
};
try {
new SmallRyeReactiveMessagingKafkaProcessor().discoverDefaultSerdeConfig(discovery, Collections.emptyList(),
configs::add, null, null);
configs::add,
(generatedNames == null) ? null : generated::add,
(reflectiveNames == null) ? null : reflective::add);

assertThat(configs)
.extracting(RunTimeConfigurationDefaultBuildItem::getKey, RunTimeConfigurationDefaultBuildItem::getValue)
.containsExactlyInAnyOrder(expectations);
.allSatisfy(tuple -> {
Object[] e = tuple.toArray();
String key = (String) e[0];
String value = (String) e[1];
assertThat(Arrays.stream(expectations).filter(t -> key.equals(t.toArray()[0])))
.hasSize(1)
.satisfiesOnlyOnce(t -> {
Object o = t.toArray()[1];
if (o instanceof String) {
assertThat(value).isEqualTo((String) o);
} else {
((Function<String, Assert>) o).apply(value);
}
});
});

assertThat(generated)
.extracting(GeneratedClassBuildItem::getName)
.allSatisfy(s -> assertThat(generatedNames).satisfiesOnlyOnce(c -> c.apply(s)));

assertThat(reflective)
.flatExtracting(ReflectiveClassBuildItem::getClassNames)
.allSatisfy(s -> assertThat(reflectiveNames).satisfiesOnlyOnce(c -> c.apply(s)));
} finally {
// must not leak the Config instance associated to the system classloader
if (customConfig == null) {
Expand All @@ -94,6 +131,14 @@ boolean isKafkaConnector(List<ConnectorManagedChannelBuildItem> list, boolean in
}
}

Function<String, Assert> assertMatches(String regex) {
return s -> assertThat(s).matches(regex);
}

Function<String, Assert> assertStartsWith(String starts) {
return s -> assertThat(s).startsWith(starts);
}

private static IndexView index(List<Class<?>> classes) {
Indexer indexer = new Indexer();
for (Class<?> clazz : classes) {
Expand Down Expand Up @@ -2570,11 +2615,14 @@ public void genericSerdeImplementationAutoDetect() {

Tuple[] expectations1 = {
tuple("mp.messaging.outgoing.channel1.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"),

tuple("mp.messaging.incoming.channel2.key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer"),
tuple("mp.messaging.incoming.channel3.value.deserializer", assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Deserializer_")),
tuple("mp.messaging.incoming.channel2.value.deserializer", "io.quarkus.kafka.client.serialization.JsonObjectDeserializer"),
};

var generated1 = List.of(assertStartsWith("io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest$CustomDto_Deserializer_"));
var reflective1 = List.of(assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Deserializer_"));

Tuple[] expectations2 = {
tuple("mp.messaging.outgoing.channel1.value.serializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$MySerializer"),

Expand Down Expand Up @@ -2602,7 +2650,7 @@ public void genericSerdeImplementationAutoDetect() {
};
// @formatter:on

doTest(expectations1, CustomSerdeImplementation.class, CustomDto.class);
doTest(null, expectations1, generated1, reflective1, CustomSerdeImplementation.class, CustomDto.class);

doTest(expectations2, CustomSerdeImplementation.class, CustomDto.class,
MySerializer.class,
Expand Down Expand Up @@ -2795,5 +2843,51 @@ void method1(KafkaRecord<Integer, String> msg) {

}

@Test
void deadLetterQueue() {
Tuple[] expectations = {
tuple("mp.messaging.incoming.channel1.value.deserializer",
assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Deserializer_")),
tuple("mp.messaging.incoming.channel1.dead-letter-queue.value.serializer",
assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Serializer_")),
tuple("mp.messaging.incoming.channel2.key.deserializer",
assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Deserializer_")),
tuple("mp.messaging.incoming.channel2.value.deserializer",
assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Deserializer_")),
tuple("mp.messaging.incoming.channel2.dead-letter-queue.key.serializer",
assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Serializer_")),
tuple("mp.messaging.incoming.channel2.dead-letter-queue.value.serializer",
assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Serializer_")),
};
var generated = List.of(
assertStartsWith("io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest$CustomDto_Deserializer_"),
assertStartsWith("io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest$CustomDto_Serializer_")
);
var reflective = List.of(
assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Deserializer_"),
assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Serializer_")
);
doTest(new SmallRyeConfigBuilder()
.withSources(new MapBackedConfigSource("test", Map.of(
"mp.messaging.incoming.channel1.failure-strategy", "dead-letter-queue",
"mp.messaging.incoming.channel2.failure-strategy", "delayed-retry-topic")) {
})
.build(), expectations, generated, reflective, DeadLetterQueue.class);
}

private static class DeadLetterQueue {

@Incoming("channel1")
void method1(CustomDto msg) {

}

@Incoming("channel2")
void method2(Record<CustomDto, CustomDto> msg) {

}

}


}

0 comments on commit 42b35fc

Please sign in to comment.