KAFKA-16507 Add KeyDeserializationException and ValueDeserializationException with record content (apache#15691)

Implements KIP-1036.

Add raw ConsumerRecord data to RecordDeserializationException to make DLQ implementation easier.

Reviewers: Kirk True <ktrue@confluent.io>, Andrew Schofield <aschofield@confluent.io>, Matthias J. Sax <matthias@confluent.io>
fred-ro authored May 28, 2024
1 parent 4d04eb8 commit 4eb60b5
Showing 4 changed files with 141 additions and 19 deletions.
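
Before the diffs, a note on intended use: KIP-1036 exists to make dead-letter-queue (DLQ) handling practical, since the consumer now surfaces the raw record alongside the failure. A minimal sketch of a poll loop built on the new accessors; the topic name, class name, and process() body are illustrative assumptions, not part of this commit:

import java.time.Duration;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.utils.Utils;

public class DlqPollLoop {

    private static final String DLQ_TOPIC = "orders-dlq"; // illustrative name

    static void pollWithDlq(Consumer<String, String> consumer, Producer<byte[], byte[]> dlqProducer) {
        while (true) {
            try {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(100))) {
                    process(record);
                }
            } catch (RecordDeserializationException e) {
                // The raw key, value, and headers carried by the exception let us
                // forward the poison record verbatim instead of dropping it.
                dlqProducer.send(new ProducerRecord<>(DLQ_TOPIC, null,
                        e.timestamp() >= 0 ? e.timestamp() : null,
                        Utils.toNullableArray(e.keyBuffer()),
                        Utils.toNullableArray(e.valueBuffer()),
                        e.headers()));
                // Skip past the bad record, as the exception message advises.
                consumer.seek(e.topicPartition(), e.offset() + 1);
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        // application logic
    }
}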
5 changes: 5 additions & 0 deletions checkstyle/import-control.xml
@@ -211,6 +211,11 @@
     <allow pkg="org.apache.kafka.common" />
   </subpackage>
 
+  <subpackage name="errors">
+    <allow class="org.apache.kafka.common.header.Headers" />
+    <allow class="org.apache.kafka.common.record.TimestampType" />
+  </subpackage>
+
 </subpackage>
 
 <subpackage name="clients">

47 changes: 31 additions & 16 deletions clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
@@ -22,6 +22,7 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.errors.RecordDeserializationException.DeserializationExceptionOrigin;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
@@ -311,25 +312,39 @@ <K, V> ConsumerRecord<K, V> parseRecord(Deserializers<K, V> deserializers,
                                             Optional<Integer> leaderEpoch,
                                             TimestampType timestampType,
                                             Record record) {
+        ByteBuffer keyBytes = record.key();
+        ByteBuffer valueBytes = record.value();
+        Headers headers = new RecordHeaders(record.headers());
+        K key;
+        V value;
         try {
-            long offset = record.offset();
-            long timestamp = record.timestamp();
-            Headers headers = new RecordHeaders(record.headers());
-            ByteBuffer keyBytes = record.key();
-            K key = keyBytes == null ? null : deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes);
-            ByteBuffer valueBytes = record.value();
-            V value = valueBytes == null ? null : deserializers.valueDeserializer.deserialize(partition.topic(), headers, valueBytes);
-            return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
-                    timestamp, timestampType,
-                    keyBytes == null ? ConsumerRecord.NULL_SIZE : keyBytes.remaining(),
-                    valueBytes == null ? ConsumerRecord.NULL_SIZE : valueBytes.remaining(),
-                    key, value, headers, leaderEpoch);
+            key = keyBytes == null ? null : deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes);
         } catch (RuntimeException e) {
-            log.error("Deserializers with error: {}", deserializers);
-            throw new RecordDeserializationException(partition, record.offset(),
-                    "Error deserializing key/value for partition " + partition +
-                    " at offset " + record.offset() + ". If needed, please seek past the record to continue consumption.", e);
+            log.error("Key Deserializers with error: {}", deserializers);
+            throw newRecordDeserializationException(DeserializationExceptionOrigin.KEY, partition, timestampType, record, e, headers);
         }
+        try {
+            value = valueBytes == null ? null : deserializers.valueDeserializer.deserialize(partition.topic(), headers, valueBytes);
+        } catch (RuntimeException e) {
+            log.error("Value Deserializers with error: {}", deserializers);
+            throw newRecordDeserializationException(DeserializationExceptionOrigin.VALUE, partition, timestampType, record, e, headers);
+        }
+        return new ConsumerRecord<>(partition.topic(), partition.partition(), record.offset(),
+                record.timestamp(), timestampType,
+                keyBytes == null ? ConsumerRecord.NULL_SIZE : keyBytes.remaining(),
+                valueBytes == null ? ConsumerRecord.NULL_SIZE : valueBytes.remaining(),
+                key, value, headers, leaderEpoch);
     }
 
+    private static RecordDeserializationException newRecordDeserializationException(DeserializationExceptionOrigin origin,
+                                                                                     TopicPartition partition,
+                                                                                     TimestampType timestampType,
+                                                                                     Record record,
+                                                                                     RuntimeException e,
+                                                                                     Headers headers) {
+        return new RecordDeserializationException(origin, partition, record.offset(), record.timestamp(), timestampType, record.key(), record.value(), headers,
+                "Error deserializing " + origin.name() + " for partition " + partition + " at offset " + record.offset()
+                        + ". If needed, please seek past the record to continue consumption.", e);
+    }
+
     private Optional<Integer> maybeLeaderEpoch(int leaderEpoch) {
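
The heart of the change above is splitting the old single try block in two, so the thrown exception can say whether the key or the value failed. Callers can branch on that tag; a small sketch (the class name and message strings are illustrative, and the code avoids a switch because origin() is null for exceptions built via the deprecated constructor):

import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.errors.RecordDeserializationException.DeserializationExceptionOrigin;

final class DeserializationErrors {

    static String describe(RecordDeserializationException e) {
        // origin() may be null when the deprecated constructor was used.
        DeserializationExceptionOrigin origin = e.origin();
        if (origin == DeserializationExceptionOrigin.KEY) {
            return "key failed to deserialize at " + e.topicPartition() + " offset " + e.offset();
        } else if (origin == DeserializationExceptionOrigin.VALUE) {
            return "value failed to deserialize at " + e.topicPartition() + " offset " + e.offset();
        }
        return "deserialization failed at " + e.topicPartition() + " offset " + e.offset();
    }
}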

76 changes: 74 additions & 2 deletions clients/src/main/java/org/apache/kafka/common/errors/RecordDeserializationException.java
@@ -16,22 +16,74 @@
  */
 package org.apache.kafka.common.errors;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.record.TimestampType;
+
+import java.nio.ByteBuffer;
 
 /**
  * This exception is raised for any error that occurs while deserializing records received by the consumer using
  * the configured {@link org.apache.kafka.common.serialization.Deserializer}.
  */
 public class RecordDeserializationException extends SerializationException {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
 
+    public enum DeserializationExceptionOrigin {
+        KEY,
+        VALUE
+    }
+
+    private final DeserializationExceptionOrigin origin;
     private final TopicPartition partition;
     private final long offset;
+    private final TimestampType timestampType;
+    private final long timestamp;
+    private final ByteBuffer keyBuffer;
+    private final ByteBuffer valueBuffer;
+    private final Headers headers;
 
-    public RecordDeserializationException(TopicPartition partition, long offset, String message, Throwable cause) {
+    @Deprecated
+    public RecordDeserializationException(TopicPartition partition,
+                                          long offset,
+                                          String message,
+                                          Throwable cause) {
         super(message, cause);
+        this.origin = null;
         this.partition = partition;
         this.offset = offset;
+        this.timestampType = TimestampType.NO_TIMESTAMP_TYPE;
+        this.timestamp = ConsumerRecord.NO_TIMESTAMP;
+        this.keyBuffer = null;
+        this.valueBuffer = null;
+        this.headers = null;
     }
 
+    public RecordDeserializationException(DeserializationExceptionOrigin origin,
+                                          TopicPartition partition,
+                                          long offset,
+                                          long timestamp,
+                                          TimestampType timestampType,
+                                          ByteBuffer keyBuffer,
+                                          ByteBuffer valueBuffer,
+                                          Headers headers,
+                                          String message,
+                                          Throwable cause) {
+        super(message, cause);
+        this.origin = origin;
+        this.offset = offset;
+        this.timestampType = timestampType;
+        this.timestamp = timestamp;
+        this.partition = partition;
+        this.keyBuffer = keyBuffer;
+        this.valueBuffer = valueBuffer;
+        this.headers = headers;
+    }
+
+    public DeserializationExceptionOrigin origin() {
+        return origin;
+    }
+
     public TopicPartition topicPartition() {
@@ -41,4 +93,24 @@ public TopicPartition topicPartition() {
     public long offset() {
         return offset;
     }
+
+    public TimestampType timestampType() {
+        return timestampType;
+    }
+
+    public long timestamp() {
+        return timestamp;
+    }
+
+    public ByteBuffer keyBuffer() {
+        return keyBuffer;
+    }
+
+    public ByteBuffer valueBuffer() {
+        return valueBuffer;
+    }
+
+    public Headers headers() {
+        return headers;
+    }
 }
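
One caveat on the new accessors: keyBuffer() and valueBuffer() return the ByteBuffer as-is, and null when the key or value was null, so callers wanting a byte[] should copy without disturbing the buffer's position. Utils.toNullableArray (used in the test below) serves that purpose; a standalone sketch of the same idea, with illustrative names:

import java.nio.ByteBuffer;

final class Buffers {

    // Copy a possibly-null buffer into a byte[] without moving its position.
    static byte[] toNullableBytes(ByteBuffer buffer) {
        if (buffer == null) {
            return null; // a null key/value stays null
        }
        ByteBuffer readOnly = buffer.asReadOnlyBuffer(); // independent position/limit
        byte[] bytes = new byte[readOnly.remaining()];
        readOnly.get(bytes);
        return bytes;
    }
}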

32 changes: 31 additions & 1 deletion clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
@@ -23,6 +23,8 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.message.FetchResponseData;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -41,14 +43,17 @@
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.junit.jupiter.api.Test;
 
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class CompletedFetchTest {
@@ -161,6 +166,10 @@ public void testCorruptedMessage() {
                  final UUIDSerializer serializer = new UUIDSerializer()) {
             builder.append(new SimpleRecord(serializer.serialize(TOPIC_NAME, UUID.randomUUID())));
             builder.append(0L, "key".getBytes(), "value".getBytes());
+            builder.append(new SimpleRecord(serializer.serialize(TOPIC_NAME, UUID.randomUUID())));
+            Headers headers = new RecordHeaders();
+            headers.add("hkey", "hvalue".getBytes());
+            builder.append(10L, serializer.serialize("key", UUID.randomUUID()), "otherValue".getBytes(), headers.toArray());
             Records records = builder.build();
 
             FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData()
@@ -176,8 +185,29 @@

             completedFetch.fetchRecords(fetchConfig, deserializers, 10);
 
-            assertThrows(RecordDeserializationException.class,
+            RecordDeserializationException thrown = assertThrows(RecordDeserializationException.class,
                     () -> completedFetch.fetchRecords(fetchConfig, deserializers, 10));
+            assertEquals(RecordDeserializationException.DeserializationExceptionOrigin.KEY, thrown.origin());
+            assertEquals(1, thrown.offset());
+            assertEquals(TOPIC_NAME, thrown.topicPartition().topic());
+            assertEquals(0, thrown.topicPartition().partition());
+            assertEquals(0, thrown.timestamp());
+            assertArrayEquals("key".getBytes(), Utils.toNullableArray(thrown.keyBuffer()));
+            assertArrayEquals("value".getBytes(), Utils.toNullableArray(thrown.valueBuffer()));
+            assertEquals(0, thrown.headers().toArray().length);
+
+            CompletedFetch completedFetch2 = newCompletedFetch(2, partitionData);
+            completedFetch2.fetchRecords(fetchConfig, deserializers, 10);
+            RecordDeserializationException valueThrown = assertThrows(RecordDeserializationException.class,
+                    () -> completedFetch2.fetchRecords(fetchConfig, deserializers, 10));
+            assertEquals(RecordDeserializationException.DeserializationExceptionOrigin.VALUE, valueThrown.origin());
+            assertEquals(3, valueThrown.offset());
+            assertEquals(TOPIC_NAME, valueThrown.topicPartition().topic());
+            assertEquals(0, valueThrown.topicPartition().partition());
+            assertEquals(10L, valueThrown.timestamp());
+            assertNotNull(valueThrown.keyBuffer());
+            assertArrayEquals("otherValue".getBytes(), Utils.toNullableArray(valueThrown.valueBuffer()));
+            assertEquals(headers, valueThrown.headers());
         }
     }
 }
