From 0c8ee766516e694010f72eb38888f3b91f49e5f5 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Sun, 6 Aug 2023 15:32:26 +0200 Subject: [PATCH] Apply DLQ strategies on deserialization failure --- .../docs/kafka/receiving-kafka-records.md | 14 ++ .../kafka/DeserializationFailureHandler.java | 22 +++ .../kafka/fault/DeserializerWrapper.java | 5 + .../kafka/fault/KafkaDeadLetterQueue.java | 6 +- .../KafkaDeadLetterSerializationHandler.java | 48 ++++++ .../kafka/fault/KafkaDelayedRetryTopic.java | 6 +- .../messaging/kafka/impl/KafkaSource.java | 15 ++ .../kafka/fault/KafkaFailureHandlerTest.java | 148 +++++++++++++++++- 8 files changed, 257 insertions(+), 7 deletions(-) create mode 100644 smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterSerializationHandler.java diff --git a/documentation/src/main/docs/kafka/receiving-kafka-records.md b/documentation/src/main/docs/kafka/receiving-kafka-records.md index 9987a2d06e..dc3c007868 100644 --- a/documentation/src/main/docs/kafka/receiving-kafka-records.md +++ b/documentation/src/main/docs/kafka/receiving-kafka-records.md @@ -449,6 +449,20 @@ produce a `null` value. To enable this behavior, set the `mp.messaging.incoming.$channel.fail-on-deserialization-failure` attribute to `false`. +If the `fail-on-deserialization-failure` attribute is set to `false` and +the `failure-strategy` attribute is `dead-letter-queue` the failed record +will be sent to the corresponding *dead letter queue* topic. +The forwarded record will have the original key and value, +and the following headers set: + +- `deserialization-failure-reason`: The deserialization failure message +- `deserialization-failure-cause`: The deserialization failure cause if any +- `deserialization-failure-key`: Whether the deserialization failure happened on a key +- `deserialization-failure-topic`: The topic of the incoming message when a deserialization failure happen +- `deserialization-failure-deserializer`: The class name of the underlying deserializer +- `deserialization-failure-key-data`: If applicable the key data that was not able to be deserialized +- `deserialization-failure-value-data`: If applicable the value data that was not able to be deserialized + ## Receiving Cloud Events The Kafka connector supports [Cloud Events](https://cloudevents.io/). diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/DeserializationFailureHandler.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/DeserializationFailureHandler.java index 05997989c1..c529bae0c4 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/DeserializationFailureHandler.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/DeserializationFailureHandler.java @@ -49,11 +49,26 @@ public interface DeserializationFailureHandler { */ String DESERIALIZATION_FAILURE_DATA = "deserialization-failure-data"; + /** + * Header name passing the key data that was not able to be deserialized. + */ + String DESERIALIZATION_FAILURE_KEY_DATA = "deserialization-failure-key-data"; + + /** + * Header name passing the value data that was not able to be deserialized. + */ + String DESERIALIZATION_FAILURE_VALUE_DATA = "deserialization-failure-value-data"; + /** * Header name passing the class name of the underlying deserializer. */ String DESERIALIZATION_FAILURE_DESERIALIZER = "deserialization-failure-deserializer"; + /** + * Header name passing the class name of the underlying deserializer. + */ + String DESERIALIZATION_FAILURE_DLQ = "deserialization-failure-dlq"; + byte[] TRUE_VALUE = "true".getBytes(StandardCharsets.UTF_8); /** @@ -111,6 +126,7 @@ static Headers addFailureDetailsToHeaders(String deserializer, String topic, boo if (headers != null) { headers.add(DESERIALIZATION_FAILURE_DESERIALIZER, deserializer.getBytes(StandardCharsets.UTF_8)); + headers.add(DESERIALIZATION_FAILURE_DLQ, TRUE_VALUE); headers.add(DESERIALIZATION_FAILURE_TOPIC, topic.getBytes(StandardCharsets.UTF_8)); if (isKey) { @@ -124,6 +140,12 @@ static Headers addFailureDetailsToHeaders(String deserializer, String topic, boo headers.add(DESERIALIZATION_FAILURE_CAUSE, cause.getBytes(StandardCharsets.UTF_8)); } if (data != null) { + if (isKey) { + headers.add(DESERIALIZATION_FAILURE_KEY_DATA, data); + } else { + headers.add(DESERIALIZATION_FAILURE_VALUE_DATA, data); + } + // Do not break retro-compatibility headers.add(DESERIALIZATION_FAILURE_DATA, data); } } diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/DeserializerWrapper.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/DeserializerWrapper.java index e56943cd23..f08065ebb5 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/DeserializerWrapper.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/DeserializerWrapper.java @@ -1,5 +1,7 @@ package io.smallrye.reactive.messaging.kafka.fault; +import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.addFailureDetailsToHeaders; + import java.util.Map; import java.util.function.BiConsumer; import java.util.function.Supplier; @@ -114,6 +116,9 @@ private T wrapDeserialize(Supplier deserialize, String topic, Headers headers } throw new KafkaException(e); } + // insert failure details to headers + addFailureDetailsToHeaders(delegate.getClass().getName(), topic, handleKeys, headers, data, e); + // fallback to null return null; } } diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterQueue.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterQueue.java index 06809c70fb..b4c3802a2e 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterQueue.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterQueue.java @@ -1,5 +1,6 @@ package io.smallrye.reactive.messaging.kafka.fault; +import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_DLQ; import static io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging.log; import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG; @@ -91,9 +92,10 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config, (String) deadQueueProducerConfig.get(KEY_SERIALIZER_CLASS_CONFIG), (String) deadQueueProducerConfig.get(VALUE_SERIALIZER_CLASS_CONFIG)); + var dlqSerializationHandler = new KafkaDeadLetterSerializationHandler<>(); // fire producer event (e.g. bind metrics) ReactiveKafkaProducer producer = new ReactiveKafkaProducer<>(deadQueueProducerConfig, - deadQueueTopic, 10000, false, null, null, null, + deadQueueTopic, 10000, false, null, dlqSerializationHandler, dlqSerializationHandler, (p, c) -> kafkaCDIEvents.producer().fire(p)); return new KafkaDeadLetterQueue(config.getChannel(), deadQueueTopic, producer, reportFailure); @@ -153,6 +155,8 @@ public Uni handle(IncomingKafkaRecord record, Throwable reaso if (outgoing != null && outgoing.getHeaders() != null) { outgoing.getHeaders().forEach(header -> dead.headers().add(header)); } + // remove DESERIALIZATION_FAILURE_DLQ header to prevent unconditional DQL in next consume + dead.headers().remove(DESERIALIZATION_FAILURE_DLQ); log.messageNackedDeadLetter(channel, topic); return producer.send(dead) .onFailure().invoke(t -> reportFailure.accept((Throwable) t, true)) diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterSerializationHandler.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterSerializationHandler.java new file mode 100644 index 0000000000..27fbf2b799 --- /dev/null +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterSerializationHandler.java @@ -0,0 +1,48 @@ +package io.smallrye.reactive.messaging.kafka.fault; + +import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_DATA; +import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_DESERIALIZER; +import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_IS_KEY; +import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_KEY_DATA; +import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_VALUE_DATA; +import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.TRUE_VALUE; + +import java.util.Arrays; +import java.util.Objects; + +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; + +import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.kafka.SerializationFailureHandler; + +public class KafkaDeadLetterSerializationHandler implements SerializationFailureHandler { + + @Override + public byte[] decorateSerialization(Uni serialization, String topic, boolean isKey, String serializer, T data, + Headers headers) { + // deserializer failure + if (headers.lastHeader(DESERIALIZATION_FAILURE_DESERIALIZER) != null) { + // data exists + Header dataHeader = headers.lastHeader(DESERIALIZATION_FAILURE_DATA); + if (dataHeader != null) { + // if this is the key serialization we look at the _KEY_DATA header + if (isKey) { + Header isKeyHeader = headers.lastHeader(DESERIALIZATION_FAILURE_IS_KEY); + if (isKeyHeader != null && Arrays.equals(isKeyHeader.value(), TRUE_VALUE)) { + Header keyDataHeader = headers.lastHeader(DESERIALIZATION_FAILURE_KEY_DATA); + // fallback to data header + return Objects.requireNonNullElse(keyDataHeader, dataHeader).value(); + } + // if this is the value serialization we look at the _VALUE_DATA header + } else { + Header valueDataHeader = headers.lastHeader(DESERIALIZATION_FAILURE_VALUE_DATA); + // fallback to data header + return Objects.requireNonNullElse(valueDataHeader, dataHeader).value(); + } + } + } + // call serialization + return SerializationFailureHandler.super.decorateSerialization(serialization, topic, isKey, serializer, data, headers); + } +} diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDelayedRetryTopic.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDelayedRetryTopic.java index a5c6f25df2..4873b27a3c 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDelayedRetryTopic.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDelayedRetryTopic.java @@ -1,5 +1,6 @@ package io.smallrye.reactive.messaging.kafka.fault; +import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_DLQ; import static io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging.log; import static io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaConsumer.createDeserializationFailureHandler; import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG; @@ -118,9 +119,10 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config, log.delayedRetryTopic(config.getChannel(), retryTopics, maxRetries, retryTimeout, deadQueueTopic); + var dlqSerializationHandler = new KafkaDeadLetterSerializationHandler<>(); // fire producer event (e.g. bind metrics) ReactiveKafkaProducer producer = new ReactiveKafkaProducer<>(delayedRetryTopicProducerConfig, - retryTopics.get(0), 10000, false, null, null, null, + retryTopics.get(0), 10000, false, null, dlqSerializationHandler, dlqSerializationHandler, (p, c) -> kafkaCDIEvents.producer().fire(p)); Map retryConsumerConfig = new HashMap<>(consumer.configuration()); @@ -269,6 +271,8 @@ public Uni handle(IncomingKafkaRecord record, Throwable reaso if (outgoing != null && outgoing.getHeaders() != null) { outgoing.getHeaders().forEach(header -> retry.headers().add(header)); } + // remove DESERIALIZATION_FAILURE_DLQ header to prevent unconditional DQL in next consume + retry.headers().remove(DESERIALIZATION_FAILURE_DLQ); log.delayedRetryNack(channel, topic); return producer.send(retry) .onFailure().invoke(t -> reportFailure.accept((Throwable) t, true)) diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java index 04bd6d974c..5d9bfc19c0 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java @@ -1,5 +1,7 @@ package io.smallrye.reactive.messaging.kafka.impl; +import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_DLQ; +import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_REASON; import static io.smallrye.reactive.messaging.kafka.i18n.KafkaExceptions.ex; import static io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging.log; import static io.smallrye.reactive.messaging.kafka.impl.RebalanceListeners.findMatchingListener; @@ -20,6 +22,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.RebalanceInProgressException; +import org.apache.kafka.common.errors.RecordDeserializationException; +import org.apache.kafka.common.header.Header; import io.smallrye.common.annotation.Identifier; import io.smallrye.mutiny.Multi; @@ -34,6 +38,7 @@ import io.smallrye.reactive.messaging.kafka.KafkaRecord; import io.smallrye.reactive.messaging.kafka.commit.ContextHolder; import io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler; +import io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue; import io.smallrye.reactive.messaging.kafka.fault.KafkaDelayedRetryTopic; import io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler; import io.smallrye.reactive.messaging.kafka.health.KafkaSourceHealth; @@ -161,6 +166,16 @@ public KafkaSource(Vertx vertx, Multi> incomingMulti = multi.onItem().transformToUni(rec -> { IncomingKafkaRecord record = new IncomingKafkaRecord<>(rec, channel, index, commitHandler, failureHandler, isCloudEventEnabled, isTracingEnabled); + if ((failureHandler instanceof KafkaDeadLetterQueue) + && rec.headers() != null + && rec.headers().lastHeader(DESERIALIZATION_FAILURE_DLQ) != null) { + Header reasonMsgHeader = rec.headers().lastHeader(DESERIALIZATION_FAILURE_REASON); + String message = reasonMsgHeader != null ? new String(reasonMsgHeader.value()) : null; + RecordDeserializationException reason = new RecordDeserializationException( + TopicPartitions.getTopicPartition(record), record.getOffset(), message, null); + return failureHandler.handle(record, reason, record.getMetadata()) + .onItem().transform(ignore -> null); + } return commitHandler.received(record); }).concatenate(); diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/fault/KafkaFailureHandlerTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/fault/KafkaFailureHandlerTest.java index bd8445b020..3d1bb0b009 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/fault/KafkaFailureHandlerTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/fault/KafkaFailureHandlerTest.java @@ -1,5 +1,12 @@ package io.smallrye.reactive.messaging.kafka.fault; +import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_DATA; +import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_DESERIALIZER; +import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_DLQ; +import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_KEY_DATA; +import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_REASON; +import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_TOPIC; +import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_VALUE_DATA; import static io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue.DEAD_LETTER_CAUSE; import static io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue.DEAD_LETTER_CAUSE_CLASS_NAME; import static io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue.DEAD_LETTER_EXCEPTION_CLASS_NAME; @@ -31,6 +38,7 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.eclipse.microprofile.reactive.messaging.Incoming; @@ -120,10 +128,10 @@ public void testDeadLetterQueueStrategyWithDefaultTopic() { ConsumerTask records = companion.consumeIntegers().fromTopics("dead-letter-topic-kafka", 3); - MyReceiverBean bean = runApplication(getDeadLetterQueueConfig(), MyReceiverBean.class); + MyReceiverBean bean = runApplication(getDeadLetterQueueConfig(topic), MyReceiverBean.class); await().until(this::isReady); - companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>("dead-letter-default", i), 10); + companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(topic, i), 10); await().atMost(2, TimeUnit.MINUTES).until(() -> bean.list().size() >= 10); assertThat(bean.list()).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); @@ -138,7 +146,7 @@ public void testDeadLetterQueueStrategyWithDefaultTopic() { assertThat(r.headers().lastHeader(DEAD_LETTER_CAUSE)).isNull(); assertThat(r.headers().lastHeader(DEAD_LETTER_CAUSE_CLASS_NAME)).isNull(); assertThat(new String(r.headers().lastHeader(DEAD_LETTER_PARTITION).value())).isEqualTo("0"); - assertThat(new String(r.headers().lastHeader(DEAD_LETTER_TOPIC).value())).isEqualTo("dead-letter-default"); + assertThat(new String(r.headers().lastHeader(DEAD_LETTER_TOPIC).value())).isEqualTo(topic); assertThat(new String(r.headers().lastHeader(DEAD_LETTER_OFFSET).value())).isNotNull().isIn("3", "6", "9"); }); @@ -148,6 +156,136 @@ public void testDeadLetterQueueStrategyWithDefaultTopic() { assertThat(bean.producers()).isEqualTo(1); } + @Test + public void testDeadLetterQueueStrategyWithDeserializationError() { + String dlqTopic = topic + "-dlq"; + + ConsumerTask records = companion.consumeStrings().fromTopics(dlqTopic, 10); + + MyReceiverBean bean = runApplication(getDeadLetterQueueConfig(topic) + .with("dead-letter-queue.topic", dlqTopic) + .with("fail-on-deserialization-failure", false), MyReceiverBean.class); + await().until(this::isReady); + + companion.produceStrings().usingGenerator(i -> new ProducerRecord<>(topic, "boom-" + i), 10); + + String expectedReason = "Size of data received by IntegerDeserializer is not 4"; + await().atMost(2, TimeUnit.MINUTES).until(() -> records.getRecords().size() == 10); + assertThat(records.getRecords()).allSatisfy(r -> { + assertThat(r.topic()).isEqualTo(dlqTopic); + assertThat(r.value()).startsWith("boom-"); + assertThat(new String(r.headers().lastHeader(DEAD_LETTER_EXCEPTION_CLASS_NAME).value())) + .isEqualTo(RecordDeserializationException.class.getName()); + assertThat(new String(r.headers().lastHeader(DEAD_LETTER_REASON).value())).isEqualTo(expectedReason); + assertThat(r.headers().lastHeader(DEAD_LETTER_CAUSE)).isNull(); + assertThat(r.headers().lastHeader(DEAD_LETTER_CAUSE_CLASS_NAME)).isNull(); + assertThat(new String(r.headers().lastHeader(DEAD_LETTER_PARTITION).value())).isEqualTo("0"); + assertThat(new String(r.headers().lastHeader(DEAD_LETTER_TOPIC).value())).isEqualTo(topic); + assertThat(new String(r.headers().lastHeader(DEAD_LETTER_OFFSET).value())).isNotNull(); + assertThat(new String(r.headers().lastHeader(DESERIALIZATION_FAILURE_REASON).value())).isEqualTo(expectedReason); + assertThat(new String(r.headers().lastHeader(DESERIALIZATION_FAILURE_DATA).value())).startsWith("boom-"); + assertThat(new String(r.headers().lastHeader(DESERIALIZATION_FAILURE_DESERIALIZER).value())) + .isEqualTo(IntegerDeserializer.class.getName()); + assertThat(new String(r.headers().lastHeader(DESERIALIZATION_FAILURE_TOPIC).value())).isEqualTo(topic); + }); + + assertThat(bean.list()).isEmpty(); + + assertThat(isAlive()).isTrue(); + + assertThat(bean.consumers()).isEqualTo(1); + assertThat(bean.producers()).isEqualTo(1); + } + + @Test + public void testDeadLetterQueueStrategyWithDeserializationErrorAndFailureHandler() { + String dlqTopic = topic + "-dlq"; + + ConsumerTask records = companion.consumeStrings().fromTopics(dlqTopic, 10); + + MyReceiverBean bean = runApplication(getDeadLetterQueueConfig(topic) + .with("dead-letter-queue.topic", dlqTopic) + .with("fail-on-deserialization-failure", false), MyReceiverBean.class); + await().until(this::isReady); + + companion.produceStrings().usingGenerator(i -> new ProducerRecord<>(topic, "boom-" + i), 10); + + String expectedReason = "Size of data received by IntegerDeserializer is not 4"; + await().atMost(2, TimeUnit.MINUTES).until(() -> records.getRecords().size() == 10); + assertThat(records.getRecords()).allSatisfy(r -> { + assertThat(r.topic()).isEqualTo(dlqTopic); + assertThat(r.value()).startsWith("boom-"); + assertThat(new String(r.headers().lastHeader(DEAD_LETTER_EXCEPTION_CLASS_NAME).value())) + .isEqualTo(RecordDeserializationException.class.getName()); + assertThat(new String(r.headers().lastHeader(DEAD_LETTER_REASON).value())).isEqualTo(expectedReason); + assertThat(r.headers().lastHeader(DEAD_LETTER_CAUSE)).isNull(); + assertThat(r.headers().lastHeader(DEAD_LETTER_CAUSE_CLASS_NAME)).isNull(); + assertThat(new String(r.headers().lastHeader(DEAD_LETTER_PARTITION).value())).isEqualTo("0"); + assertThat(new String(r.headers().lastHeader(DEAD_LETTER_TOPIC).value())).isEqualTo(topic); + assertThat(new String(r.headers().lastHeader(DEAD_LETTER_OFFSET).value())).isNotNull(); + assertThat(new String(r.headers().lastHeader(DESERIALIZATION_FAILURE_REASON).value())).isEqualTo(expectedReason); + assertThat(new String(r.headers().lastHeader(DESERIALIZATION_FAILURE_DATA).value())).startsWith("boom-"); + assertThat(new String(r.headers().lastHeader(DESERIALIZATION_FAILURE_DESERIALIZER).value())) + .isEqualTo(IntegerDeserializer.class.getName()); + assertThat(new String(r.headers().lastHeader(DESERIALIZATION_FAILURE_TOPIC).value())).isEqualTo(topic); + assertThat(r.headers().lastHeader(DESERIALIZATION_FAILURE_DLQ)).isNull(); + }); + + assertThat(bean.list()).isEmpty(); + + assertThat(isAlive()).isTrue(); + + assertThat(bean.consumers()).isEqualTo(1); + assertThat(bean.producers()).isEqualTo(1); + } + + @Test + public void testDeadLetterQueueStrategyWithKeyDeserializationError() { + String dlqTopic = topic + "-dlq"; + + ConsumerTask records = companion.consume(Integer.class, String.class) + .fromTopics(dlqTopic, 10); + + MyReceiverBean bean = runApplication(getDeadLetterQueueConfig(topic) + .with("dead-letter-queue.topic", dlqTopic) + .with("fail-on-deserialization-failure", false) + .with("key.deserializer.encoding", "unknown-encoding"), MyReceiverBean.class); + await().until(this::isReady); + + companion.produce(Integer.class, String.class) + .usingGenerator(i -> new ProducerRecord<>(topic, i, "boom-" + i), 10); + + String expectedReason = "Size of data received by IntegerDeserializer is not 4"; + await().atMost(2, TimeUnit.MINUTES).until(() -> records.getRecords().size() == 10); + assertThat(records.getRecords()).allSatisfy(r -> { + assertThat(r.topic()).isEqualTo(dlqTopic); + assertThat(r.key()).isIn(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + assertThat(r.value()).startsWith("boom-"); + assertThat(new String(r.headers().lastHeader(DEAD_LETTER_EXCEPTION_CLASS_NAME).value())) + .isEqualTo(RecordDeserializationException.class.getName()); + assertThat(new String(r.headers().lastHeader(DEAD_LETTER_REASON).value())).isEqualTo(expectedReason); + assertThat(r.headers().lastHeader(DEAD_LETTER_CAUSE)).isNull(); + assertThat(r.headers().lastHeader(DEAD_LETTER_CAUSE_CLASS_NAME)).isNull(); + assertThat(new String(r.headers().lastHeader(DEAD_LETTER_PARTITION).value())).isEqualTo("0"); + assertThat(new String(r.headers().lastHeader(DEAD_LETTER_TOPIC).value())).isEqualTo(topic); + assertThat(new String(r.headers().lastHeader(DEAD_LETTER_OFFSET).value())).isNotNull(); + assertThat(new String(r.headers().lastHeader(DESERIALIZATION_FAILURE_REASON).value())).isEqualTo(expectedReason); + assertThat(new String(r.headers().lastHeader(DESERIALIZATION_FAILURE_KEY_DATA).value())).isNotNull(); + assertThat(new String(r.headers().lastHeader(DESERIALIZATION_FAILURE_VALUE_DATA).value())).startsWith("boom-"); + assertThat(new String(r.headers().lastHeader(DESERIALIZATION_FAILURE_DESERIALIZER).value())) + .isEqualTo(IntegerDeserializer.class.getName()); + assertThat(new String(r.headers().lastHeader(DESERIALIZATION_FAILURE_TOPIC).value())).isEqualTo(topic); + assertThat(r.headers().lastHeader(DESERIALIZATION_FAILURE_DLQ)).isNull(); + }); + + assertThat(bean.list()).isEmpty(); + + assertThat(isAlive()).isTrue(); + + assertThat(bean.consumers()).isEqualTo(1); + assertThat(bean.producers()).isEqualTo(1); + } + @Test public void testDeadLetterQueueStrategyWithMessageLessThrowable() { String dqTopic = topic + "-dead-letter-topic"; @@ -487,9 +625,9 @@ private KafkaMapBasedConfig getIgnoreConfig(String topic) { return config; } - private KafkaMapBasedConfig getDeadLetterQueueConfig() { + private KafkaMapBasedConfig getDeadLetterQueueConfig(String topic) { KafkaMapBasedConfig config = kafkaConfig("mp.messaging.incoming.kafka"); - config.put("topic", "dead-letter-default"); + config.put("topic", topic); config.put("group.id", UUID.randomUUID().toString()); config.put("value.deserializer", IntegerDeserializer.class.getName()); config.put("enable.auto.commit", "false");