Skip to content

Commit

Permalink
Apply DLQ strategies on deserialization failure
Browse files Browse the repository at this point in the history
  • Loading branch information
ozangunalp committed Aug 8, 2023
1 parent 43d3c2d commit f077a45
Show file tree
Hide file tree
Showing 8 changed files with 257 additions and 7 deletions.
14 changes: 14 additions & 0 deletions documentation/src/main/docs/kafka/receiving-kafka-records.md
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,20 @@ produce a `null` value. To enable this behavior, set the
`mp.messaging.incoming.$channel.fail-on-deserialization-failure`
attribute to `false`.

If the `fail-on-deserialization-failure` attribute is set to `false` and
the `failure-strategy` attribute is `dead-letter-queue` the failed record
will be sent to the corresponding *dead letter queue* topic.
The forwarded record will have the original key and value,
and the following headers set:

- `deserialization-failure-reason`: The deserialization failure message
- `deserialization-failure-cause`: The deserialization failure cause if any
- `deserialization-failure-key`: Whether the deserialization failure happened on a key
- `deserialization-failure-topic`: The topic of the incoming message when a deserialization failure happen
- `deserialization-failure-deserializer`: The class name of the underlying deserializer
- `deserialization-failure-key-data`: If applicable the key data that was not able to be deserialized
- `deserialization-failure-value-data`: If applicable the value data that was not able to be deserialized

## Receiving Cloud Events

The Kafka connector supports [Cloud Events](https://cloudevents.io/).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,26 @@ public interface DeserializationFailureHandler<T> {
*/
String DESERIALIZATION_FAILURE_DATA = "deserialization-failure-data";

/**
* Header name passing the key data that was not able to be deserialized.
*/
String DESERIALIZATION_FAILURE_KEY_DATA = "deserialization-failure-key-data";

/**
* Header name passing the value data that was not able to be deserialized.
*/
String DESERIALIZATION_FAILURE_VALUE_DATA = "deserialization-failure-value-data";

/**
* Header name passing the class name of the underlying deserializer.
*/
String DESERIALIZATION_FAILURE_DESERIALIZER = "deserialization-failure-deserializer";

/**
* Header name passing the class name of the underlying deserializer.
*/
String DESERIALIZATION_FAILURE_DLQ = "deserialization-failure-dlq";

byte[] TRUE_VALUE = "true".getBytes(StandardCharsets.UTF_8);

/**
Expand Down Expand Up @@ -111,6 +126,7 @@ static Headers addFailureDetailsToHeaders(String deserializer, String topic, boo

if (headers != null) {
headers.add(DESERIALIZATION_FAILURE_DESERIALIZER, deserializer.getBytes(StandardCharsets.UTF_8));
headers.add(DESERIALIZATION_FAILURE_DLQ, TRUE_VALUE);
headers.add(DESERIALIZATION_FAILURE_TOPIC, topic.getBytes(StandardCharsets.UTF_8));

if (isKey) {
Expand All @@ -124,6 +140,12 @@ static Headers addFailureDetailsToHeaders(String deserializer, String topic, boo
headers.add(DESERIALIZATION_FAILURE_CAUSE, cause.getBytes(StandardCharsets.UTF_8));
}
if (data != null) {
if (isKey) {
headers.add(DESERIALIZATION_FAILURE_KEY_DATA, data);
} else {
headers.add(DESERIALIZATION_FAILURE_VALUE_DATA, data);
}
// Do not break retro-compatibility
headers.add(DESERIALIZATION_FAILURE_DATA, data);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.smallrye.reactive.messaging.kafka.fault;

import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.addFailureDetailsToHeaders;

import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
Expand Down Expand Up @@ -114,6 +116,9 @@ private T wrapDeserialize(Supplier<T> deserialize, String topic, Headers headers
}
throw new KafkaException(e);
}
// insert failure details to headers
addFailureDetailsToHeaders(delegate.getClass().getName(), topic, handleKeys, headers, data, e);
// fallback to null
return null;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.smallrye.reactive.messaging.kafka.fault;

import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_DLQ;
import static io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging.log;
import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG;
Expand Down Expand Up @@ -91,9 +92,10 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,
(String) deadQueueProducerConfig.get(KEY_SERIALIZER_CLASS_CONFIG),
(String) deadQueueProducerConfig.get(VALUE_SERIALIZER_CLASS_CONFIG));

var dlqSerializationHandler = new KafkaDeadLetterSerializationHandler<>();
// fire producer event (e.g. bind metrics)
ReactiveKafkaProducer<Object, Object> producer = new ReactiveKafkaProducer<>(deadQueueProducerConfig,
deadQueueTopic, 10000, false, null, null, null,
deadQueueTopic, 10000, false, null, dlqSerializationHandler, dlqSerializationHandler,
(p, c) -> kafkaCDIEvents.producer().fire(p));

return new KafkaDeadLetterQueue(config.getChannel(), deadQueueTopic, producer, reportFailure);
Expand Down Expand Up @@ -153,6 +155,8 @@ public <K, V> Uni<Void> handle(IncomingKafkaRecord<K, V> record, Throwable reaso
if (outgoing != null && outgoing.getHeaders() != null) {
outgoing.getHeaders().forEach(header -> dead.headers().add(header));
}
// remove DESERIALIZATION_FAILURE_DLQ header to prevent unconditional DQL in next consume
dead.headers().remove(DESERIALIZATION_FAILURE_DLQ);
log.messageNackedDeadLetter(channel, topic);
return producer.send(dead)
.onFailure().invoke(t -> reportFailure.accept((Throwable) t, true))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package io.smallrye.reactive.messaging.kafka.fault;

import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_DATA;
import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_DESERIALIZER;
import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_IS_KEY;
import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_KEY_DATA;
import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_VALUE_DATA;
import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.TRUE_VALUE;

import java.util.Arrays;
import java.util.Objects;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.SerializationFailureHandler;

public class KafkaDeadLetterSerializationHandler<T> implements SerializationFailureHandler<T> {

@Override
public byte[] decorateSerialization(Uni<byte[]> serialization, String topic, boolean isKey, String serializer, T data,
Headers headers) {
// deserializer failure
if (headers.lastHeader(DESERIALIZATION_FAILURE_DESERIALIZER) != null) {
// data exists
Header dataHeader = headers.lastHeader(DESERIALIZATION_FAILURE_DATA);
if (dataHeader != null) {
// if this is the key serialization we look at the _KEY_DATA header
if (isKey) {
Header isKeyHeader = headers.lastHeader(DESERIALIZATION_FAILURE_IS_KEY);
if (isKeyHeader != null && Arrays.equals(isKeyHeader.value(), TRUE_VALUE)) {
Header keyDataHeader = headers.lastHeader(DESERIALIZATION_FAILURE_KEY_DATA);
// fallback to data header
return Objects.requireNonNullElse(keyDataHeader, dataHeader).value();
}
// if this is the value serialization we look at the _VALUE_DATA header
} else {
Header valueDataHeader = headers.lastHeader(DESERIALIZATION_FAILURE_VALUE_DATA);
// fallback to data header
return Objects.requireNonNullElse(valueDataHeader, dataHeader).value();
}
}
}
// call serialization
return SerializationFailureHandler.super.decorateSerialization(serialization, topic, isKey, serializer, data, headers);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.smallrye.reactive.messaging.kafka.fault;

import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_DLQ;
import static io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging.log;
import static io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaConsumer.createDeserializationFailureHandler;
import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
Expand Down Expand Up @@ -118,9 +119,10 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,

log.delayedRetryTopic(config.getChannel(), retryTopics, maxRetries, retryTimeout, deadQueueTopic);

var dlqSerializationHandler = new KafkaDeadLetterSerializationHandler<>();
// fire producer event (e.g. bind metrics)
ReactiveKafkaProducer<Object, Object> producer = new ReactiveKafkaProducer<>(delayedRetryTopicProducerConfig,
retryTopics.get(0), 10000, false, null, null, null,
retryTopics.get(0), 10000, false, null, dlqSerializationHandler, dlqSerializationHandler,
(p, c) -> kafkaCDIEvents.producer().fire(p));

Map<String, Object> retryConsumerConfig = new HashMap<>(consumer.configuration());
Expand Down Expand Up @@ -269,6 +271,8 @@ public <K, V> Uni<Void> handle(IncomingKafkaRecord<K, V> record, Throwable reaso
if (outgoing != null && outgoing.getHeaders() != null) {
outgoing.getHeaders().forEach(header -> retry.headers().add(header));
}
// remove DESERIALIZATION_FAILURE_DLQ header to prevent unconditional DQL in next consume
retry.headers().remove(DESERIALIZATION_FAILURE_DLQ);
log.delayedRetryNack(channel, topic);
return producer.send(retry)
.onFailure().invoke(t -> reportFailure.accept((Throwable) t, true))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.smallrye.reactive.messaging.kafka.impl;

import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_DLQ;
import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_REASON;
import static io.smallrye.reactive.messaging.kafka.i18n.KafkaExceptions.ex;
import static io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging.log;
import static io.smallrye.reactive.messaging.kafka.impl.RebalanceListeners.findMatchingListener;
Expand All @@ -20,6 +22,8 @@
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.header.Header;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Multi;
Expand All @@ -34,6 +38,7 @@
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.commit.ContextHolder;
import io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler;
import io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue;
import io.smallrye.reactive.messaging.kafka.fault.KafkaDelayedRetryTopic;
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler;
import io.smallrye.reactive.messaging.kafka.health.KafkaSourceHealth;
Expand Down Expand Up @@ -161,6 +166,16 @@ public KafkaSource(Vertx vertx,
Multi<IncomingKafkaRecord<K, V>> incomingMulti = multi.onItem().transformToUni(rec -> {
IncomingKafkaRecord<K, V> record = new IncomingKafkaRecord<>(rec, channel, index, commitHandler,
failureHandler, isCloudEventEnabled, isTracingEnabled);
if ((failureHandler instanceof KafkaDeadLetterQueue)
&& rec.headers() != null
&& rec.headers().lastHeader(DESERIALIZATION_FAILURE_DLQ) != null) {
Header reasonMsgHeader = rec.headers().lastHeader(DESERIALIZATION_FAILURE_REASON);
String message = reasonMsgHeader != null ? new String(reasonMsgHeader.value()) : null;
RecordDeserializationException reason = new RecordDeserializationException(
TopicPartitions.getTopicPartition(record), record.getOffset(), message, null);
return failureHandler.handle(record, reason, record.getMetadata())
.onItem().transform(ignore -> null);
}
return commitHandler.received(record);
}).concatenate();

Expand Down
Loading

0 comments on commit f077a45

Please sign in to comment.