Skip to content

Commit

Permalink
GH-2671: Improve DLQ Exception Message Header
Browse files Browse the repository at this point in the history
Resolves #2671

With Spring Framework 6, `NestedRuntimeException.getMessage()` no longer
includes the messages for nested exceptions via the cause chain.

See spring-projects/spring-framework#25162

This means that the DLQ exception message header always contains the same
`Failed listener` message.

Find the root cause exception and include its message in the header.
Ignore any nested `TimestampedException` and `LEFE` between the top
`LEFE` and the root cause; these will still appear in the stack trace.
  • Loading branch information
garyrussell authored May 2, 2023
1 parent c900185 commit 6f58505
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -762,17 +762,18 @@ private void addExceptionInfoHeaders(Headers kafkaHeaders, Exception exception,
isKey ? names.exceptionInfo.keyExceptionFqcn : names.exceptionInfo.exceptionFqcn,
() -> exception.getClass().getName().getBytes(StandardCharsets.UTF_8),
HeaderNames.HeadersToAdd.EXCEPTION);
if (exception.getCause() != null) {
Exception cause = ErrorHandlingUtils.findRootCause(exception);
if (cause != null) {
appendOrReplace(kafkaHeaders,
names.exceptionInfo.exceptionCauseFqcn,
() -> exception.getCause().getClass().getName().getBytes(StandardCharsets.UTF_8),
() -> cause.getClass().getName().getBytes(StandardCharsets.UTF_8),
HeaderNames.HeadersToAdd.EX_CAUSE);
}
String message = exception.getMessage();
String message = buildMessage(exception, cause);
if (message != null) {
appendOrReplace(kafkaHeaders,
isKey ? names.exceptionInfo.keyExceptionMessage : names.exceptionInfo.exceptionMessage,
() -> exception.getMessage().getBytes(StandardCharsets.UTF_8),
() -> message.getBytes(StandardCharsets.UTF_8),
HeaderNames.HeadersToAdd.EX_MSG);
}
appendOrReplace(kafkaHeaders,
Expand All @@ -781,6 +782,26 @@ private void addExceptionInfoHeaders(Headers kafkaHeaders, Exception exception,
HeaderNames.HeadersToAdd.EX_STACKTRACE);
}

@Nullable
private String buildMessage(Exception exception, Throwable cause) {
String message = exception.getMessage();
if (!exception.equals(cause)) {
if (message != null) {
message = message + "; ";
}
String causeMsg = cause.getMessage();
if (causeMsg != null) {
if (message != null) {
message = message + causeMsg;
}
else {
message = causeMsg;
}
}
}
return message;
}

private void appendOrReplace(Headers headers, String header, Supplier<byte[]> valueSupplier,
HeaderNames.HeadersToAdd hta) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,4 +212,22 @@ public static Exception unwrapIfNeeded(Exception exception) {
return theEx;
}

/**
* Find the root cause, ignoring any {@link ListenerExecutionFailedException} and
* {@link TimestampedException}.
* @param exception the exception to examine.
* @return the root cause.
* @since 3.0.7
*/
public static Exception findRootCause(Exception exception) {
Exception realException = exception;
while ((realException instanceof ListenerExecutionFailedException
|| realException instanceof TimestampedException)
&& realException.getCause() instanceof Exception cause) {

realException = cause;
}
return realException;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,7 @@ public boolean recovered(ConsumerRecord<?, ?> record, Exception exception,
private FailedRecord getFailedRecordInstance(ConsumerRecord<?, ?> record, Exception exception,
Map<TopicPartition, FailedRecord> map, TopicPartition topicPartition) {

Exception realException = exception;
while ((realException instanceof ListenerExecutionFailedException
|| realException instanceof TimestampedException)
&& realException.getCause() instanceof Exception) {

realException = (Exception) realException.getCause();
}
Exception realException = ErrorHandlingUtils.findRootCause(exception);
FailedRecord failedRecord = map.get(topicPartition);
if (failedRecord == null || failedRecord.getOffset() != record.offset()
|| (this.resetStateOnExceptionChange
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ void headersNotStripped() {
headers = captor.getValue().headers();
assertThat(headers.lastHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER)).isNotNull();
assertThat(headers.lastHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER)).isNotNull();
assertThat(headers.lastHeader(KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE).value()).isEqualTo("testK".getBytes());
assertThat(new String(headers.lastHeader(KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE).value())).isEqualTo("testK");
assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE).value()).isEqualTo("testV".getBytes());
}

Expand Down Expand Up @@ -399,7 +399,8 @@ void appendOriginalHeaders() {
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setAppendOriginalHeaders(true);
recoverer.setStripPreviousExceptionHeaders(false);
recoverer.accept(record, new RuntimeException(new IllegalStateException()));
recoverer.accept(record, new ListenerExecutionFailedException("Listener failed",
new TimestampedException(new RuntimeException("ex1 msg", new IllegalStateException()))));
ArgumentCaptor<ProducerRecord> producerRecordCaptor = ArgumentCaptor.forClass(ProducerRecord.class);
then(template).should(times(1)).send(producerRecordCaptor.capture());
Headers headers = producerRecordCaptor.getValue().headers();
Expand All @@ -412,11 +413,15 @@ void appendOriginalHeaders() {
Header firstExceptionCauseType = headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN);
Header firstExceptionMessage = headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE);
Header firstExceptionStackTrace = headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_STACKTRACE);
assertThat(new String(firstExceptionMessage.value())).isEqualTo("Listener failed; ex1 msg");
assertThat(new String(firstExceptionType.value())).isEqualTo(ListenerExecutionFailedException.class.getName());
assertThat(new String(firstExceptionCauseType.value())).isEqualTo(RuntimeException.class.getName());

ConsumerRecord<String, String> anotherRecord = new ConsumerRecord<>("bar", 1, 12L, 4321L,
TimestampType.LOG_APPEND_TIME, 321, 321, "bar", null, new RecordHeaders(), Optional.empty());
headers.forEach(header -> anotherRecord.headers().add(header));
recoverer.accept(anotherRecord, new RuntimeException(new IllegalStateException()));
recoverer.accept(anotherRecord, new ListenerExecutionFailedException("Listener failed",
new TimestampedException(new RuntimeException("ex2 msg", new IllegalStateException()))));
ArgumentCaptor<ProducerRecord> anotherProducerRecordCaptor = ArgumentCaptor.forClass(ProducerRecord.class);
then(template).should(times(2)).send(anotherProducerRecordCaptor.capture());
Headers anotherHeaders = anotherProducerRecordCaptor.getAllValues().get(1).headers();
Expand All @@ -436,6 +441,8 @@ void appendOriginalHeaders() {
assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN))
.isNotSameAs(firstExceptionCauseType);
assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE)).isNotSameAs(firstExceptionMessage);
assertThat(new String(anotherHeaders.lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE).value()))
.isEqualTo("Listener failed; ex2 msg");
assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_EXCEPTION_STACKTRACE))
.isNotSameAs(firstExceptionStackTrace);
Iterator<Header> exceptionHeaders = anotherHeaders.headers(KafkaHeaders.DLT_EXCEPTION_FQCN).iterator();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2022 the original author or authors.
* Copyright 2017-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -751,8 +751,8 @@ public void accept(ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, Excepti
.contains("ListenerExecutionFailedException");
assertThat(new String(headers.get(KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN, byte[].class)))
.isEqualTo("java.lang.RuntimeException");
assertThat(headers.get(KafkaHeaders.DLT_EXCEPTION_MESSAGE, byte[].class))
.contains("Listener failed".getBytes());
assertThat(new String(headers.get(KafkaHeaders.DLT_EXCEPTION_MESSAGE, byte[].class)))
.contains("Listener failed; fail for max failures");
assertThat(headers.get(KafkaHeaders.DLT_EXCEPTION_STACKTRACE)).isNotNull();
assertThat(headers.get(KafkaHeaders.DLT_EXCEPTION_STACKTRACE, byte[].class))
.contains("fail for max failures".getBytes());
Expand Down

0 comments on commit 6f58505

Please sign in to comment.