Fix for idempotent producer fatal errors, triggered after a possibly persisted message state #4438

Merged (9 commits), Sep 29, 2023
45 changes: 30 additions & 15 deletions CHANGELOG.md
@@ -13,6 +13,9 @@ librdkafka v2.3.0 is a feature release:
* [KIP-430](https://cwiki.apache.org/confluence/display/KAFKA/KIP-430+-+Return+Authorized+Operations+in+Describe+Responses):
Return authorized operations in Describe Responses.
(#4240, @jainruchir).
* [KIP-580](https://cwiki.apache.org/confluence/display/KAFKA/KIP-580%3A+Exponential+Backoff+for+Kafka+Clients): Added Exponential Backoff mechanism for
retriable requests with `retry.backoff.ms` as minimum backoff and `retry.backoff.max.ms` as the
maximum backoff, with 20% jitter (#4422).
* Fixed ListConsumerGroupOffsets not fetching offsets for all the topics in a group with Apache Kafka version below 2.4.0.
* Add missing destroy that leads to leaking partition structure memory when there
are partition leader changes and a stale leader epoch is received (#4429).
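The KIP-580 behavior described in the entry above can be sketched in C. This is an illustrative model only, not librdkafka's actual implementation; the names `backoff_ms` and `with_jitter` are hypothetical, and it assumes the jitter is applied on top of the capped exponential value:

```c
#include <stdint.h>
#include <stdlib.h>

/* Exponential backoff starting at base_ms (retry.backoff.ms), doubling
 * per retry attempt, capped at max_ms (retry.backoff.max.ms).
 * attempt is the 0-based retry count. */
static int64_t backoff_ms(int64_t base_ms, int64_t max_ms, int attempt) {
        int64_t b = base_ms;
        while (attempt-- > 0) {
                b *= 2;
                if (b >= max_ms)
                        return max_ms; /* cap before it can grow further */
        }
        return b < max_ms ? b : max_ms;
}

/* Apply up to +/-20% jitter; assumes the caller seeded rand(). */
static int64_t with_jitter(int64_t ms) {
        double f = 0.8 + 0.4 * ((double)rand() / RAND_MAX);
        return (int64_t)((double)ms * f);
}
```

With the defaults (`retry.backoff.ms=100`, `retry.backoff.max.ms=1000`), the pre-jitter backoff sequence is 100, 200, 400, 800, 1000, 1000, ...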
@@ -30,6 +33,20 @@ librdkafka v2.3.0 is a feature release:
don't cause an offset reset (#4447).
* Fix to ensure max.poll.interval.ms is reset when rd_kafka_poll is called with
consume_cb (#4431).
* Fix for idempotent producer fatal errors, triggered after a possibly persisted message state (#4438).


## Upgrade considerations

* `retry.backoff.ms`:
If it is set greater than `retry.backoff.max.ms`, which has a default value of 1000 ms, then it assumes the value of `retry.backoff.max.ms`.
To change this behaviour, make sure that `retry.backoff.ms` is always less than `retry.backoff.max.ms`.
If the two are equal, the backoff will be linear instead of exponential.

* `topic.metadata.refresh.fast.interval.ms`:
If it is set greater than `retry.backoff.max.ms`, which has a default value of 1000 ms, then it assumes the value of `retry.backoff.max.ms`.
To change this behaviour, make sure that `topic.metadata.refresh.fast.interval.ms` is always less than `retry.backoff.max.ms`.
If the two are equal, the backoff will be linear instead of exponential.
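The capping rule above can be sketched as follows; `effective_backoff_ms` is a hypothetical helper for illustration, not part of the librdkafka API:

```c
/* A configured minimum backoff that exceeds the configured maximum is
 * clamped down to the maximum, which makes the backoff constant rather
 * than exponential. */
static int effective_backoff_ms(int configured_min_ms, int configured_max_ms) {
        return configured_min_ms > configured_max_ms ? configured_max_ms
                                                     : configured_min_ms;
}
```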


## Fixes
@@ -40,6 +57,18 @@ librdkafka v2.3.0 is a feature release:
rack information on 32bit architectures.
Solved by aligning all allocations to the maximum allowed word size (#4449).

### Idempotent producer fixes

* After a possibly persisted error, such as a disconnection or a timeout,
the next expected sequence used to be increased, leading to a fatal error
if the message wasn't persisted and the second message in queue failed
with an `OUT_OF_ORDER_SEQUENCE_NUMBER`.
The error could contain the message "sequence desynchronization" with
just one possibly persisted error, or "rewound sequence number" in case of
multiple errored messages.
Solved by treating the possibly persisted message as _not_ persisted,
and expecting a `DUPLICATE_SEQUENCE_NUMBER` error in case it was
persisted, or `NO_ERROR` in case it wasn't; in both cases the message
will be considered delivered (#4438).
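The disambiguation step of the fix can be sketched in C. This is a hypothetical model of the decision logic, not librdkafka code; the enum and function names are invented for illustration:

```c
#include <stdbool.h>

/* Broker error codes relevant to retrying a possibly persisted message. */
enum broker_err {
        ERR_NO_ERROR,                     /* first attempt was not persisted */
        ERR_DUPLICATE_SEQUENCE_NUMBER,    /* first attempt was persisted */
        ERR_OUT_OF_ORDER_SEQUENCE_NUMBER  /* sequence desynchronization */
};

/* Because the possibly persisted message is retried as not persisted,
 * the broker's response to the retry tells us the outcome: either
 * response below means the message made it exactly once, so it can be
 * marked as delivered. */
static bool retry_outcome_delivered(enum broker_err err) {
        switch (err) {
        case ERR_NO_ERROR:
        case ERR_DUPLICATE_SEQUENCE_NUMBER:
                return true;
        default:
                return false;
        }
}
```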

### Consumer fixes

@@ -65,18 +94,6 @@ librdkafka v2.3.0 is a feature release:
Solved by moving the `max.poll.interval.ms` check into `rd_kafka_q_serve` (#4431).



# librdkafka v2.2.0

@@ -106,9 +123,7 @@ librdkafka v2.2.0 is a feature release:
* [KIP-339](https://cwiki.apache.org/confluence/display/KAFKA/KIP-339%3A+Create+a+new+IncrementalAlterConfigs+API):
IncrementalAlterConfigs API (started by @PrasanthV454, #4110).
* [KIP-554](https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API): Add Broker-side SCRAM Config API (#4241).


## Enhancements

19 changes: 7 additions & 12 deletions src/rdkafka_request.c
@@ -3499,17 +3499,12 @@ static int rd_kafka_handle_Produce_error(rd_kafka_broker_t *rkb,
      * which should not be treated as a fatal error
      * since this request and sub-sequent requests
      * will be retried and thus return to order.
-     * Unless the error was a timeout, or similar,
-     * in which case the request might have made it
-     * and the messages are considered possibly persisted:
-     * in this case we allow the next in-flight response
-     * to be successful, in which case we mark
-     * this request's messages as succesfully delivered. */
-    if (perr->status &
-        RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED)
-            perr->update_next_ack = rd_true;
-    else
-            perr->update_next_ack = rd_false;
+     * In case the message is possibly persisted
+     * we still treat it as not persisted,
+     * expecting DUPLICATE_SEQUENCE_NUMBER
+     * in case it was persisted or NO_ERROR in case
+     * it wasn't. */
+    perr->update_next_ack = rd_false;
     perr->update_next_err = rd_true;

     /* Drain outstanding requests so that retries
@@ -3790,7 +3785,7 @@ static void rd_kafka_msgbatch_handle_Produce_result(
     .err             = err,
     .incr_retry      = 1,
     .status          = status,
-    .update_next_ack = rd_true,
+    .update_next_ack = rd_false,
     .update_next_err = rd_true,
     .last_seq        = (batch->first_seq +
                         rd_kafka_msgq_len(&batch->msgq) - 1)};