From f87d854725ddf9dcdb553f0f0b231dff87f26aed Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Tue, 11 Jun 2024 18:29:46 +0200 Subject: [PATCH 1/2] Fix for an idempotent producer error, with a message batch not reconstructed identically when retried Issues: #4736 Fix for an idempotent producer error, with a message batch not reconstructed identically when retried. This caused the error message "Local: Inconsistent state: Unable to reconstruct MessageSet". It happened on large batches. Solved by using the same backoff baseline for all messages in the batch. Happens since 2.2.0. --- CHANGELOG.md | 11 +++++++++++ src/rdkafka_partition.c | 12 ++++++++++-- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2de9f83148..ee207a7a20 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,8 @@ librdkafka v2.5.0 is a feature release. * Fix segfault when using long client id because of erased segment when using flexver. (#4689) +* Fix for an idempotent producer error, with a message batch not reconstructed + identically when retried (#) ## Enhancements @@ -21,6 +23,15 @@ librdkafka v2.5.0 is a feature release. Happens since 1.x when a portion of the buffer (segment) is erased for flexver or compression. More likely to happen since 2.1.0, because of the upgrades to flexver, with certain string sizes like a long client id (#4689). +### Idempotent producer fixes + + * Issues: #4736 + Fix for an idempotent producer error, with a message batch not reconstructed + identically when retried. Caused the error message "Local: Inconsistent state: Unable to reconstruct MessageSet". + Happening on large batches. Solved by using the same backoff baseline for all messages + in the batch. + Happens since 2.2.0 (#). 
+ # librdkafka v2.4.0 diff --git a/src/rdkafka_partition.c b/src/rdkafka_partition.c index 49e6f76e6f..451d06eb08 100644 --- a/src/rdkafka_partition.c +++ b/src/rdkafka_partition.c @@ -896,6 +896,7 @@ int rd_kafka_retry_msgq(rd_kafka_msgq_t *destq, int retry_max_ms) { rd_kafka_msgq_t retryable = RD_KAFKA_MSGQ_INITIALIZER(retryable); rd_kafka_msg_t *rkm, *tmp; + rd_ts_t now; int64_t jitter = rd_jitter(100 - RD_KAFKA_RETRY_JITTER_PERCENT, 100 + RD_KAFKA_RETRY_JITTER_PERCENT); /* Scan through messages to see which ones are eligible for retry, @@ -903,7 +904,14 @@ int rd_kafka_retry_msgq(rd_kafka_msgq_t *destq, * set backoff time for first message and optionally * increase retry count for each message. * Sorted insert is not necessary since the original order - * srcq order is maintained. */ + * srcq order is maintained. + * + * Start timestamp for calculating backoff is common, + * to avoid that messages from the same batch + * have different backoff, as they need to be retried + * by reconstructing the same batch, when idempotency is + * enabled. */ + now = rd_clock(); TAILQ_FOREACH_SAFE(rkm, &srcq->rkmq_msgs, rkm_link, tmp) { if (rkm->rkm_u.producer.retries + incr_retry > max_retries) continue; @@ -927,7 +935,7 @@ int rd_kafka_retry_msgq(rd_kafka_msgq_t *destq, backoff = jitter * backoff * 10; if (backoff > retry_max_ms * 1000) backoff = retry_max_ms * 1000; - backoff = rd_clock() + backoff; + backoff = now + backoff; } rkm->rkm_u.producer.ts_backoff = backoff; From ea49df183fa22449c1e731942ffd31d6c5378a72 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Wed, 12 Jun 2024 10:16:51 +0200 Subject: [PATCH 2/2] Address comments --- CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ee207a7a20..e4dbc55350 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,7 @@ librdkafka v2.5.0 is a feature release. * Fix segfault when using long client id because of erased segment when using flexver. 
(#4689) * Fix for an idempotent producer error, with a message batch not reconstructed - identically when retried (#) + identically when retried (#4750) ## Enhancements @@ -30,7 +30,7 @@ librdkafka v2.5.0 is a feature release. identically when retried. Caused the error message "Local: Inconsistent state: Unable to reconstruct MessageSet". Happening on large batches. Solved by using the same backoff baseline for all messages in the batch. - Happens since 2.2.0 (#). + Happens since 2.2.0 (#4750).