Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for idempotent producer fatal errors, triggered after a possibly persisted message state #4438

Merged
merged 9 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,23 @@ librdkafka v2.2.1 is a maintenance release:

* Added Topic id to the metadata response which is part of the [KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers)
* Fixed ListConsumerGroupOffsets not fetching offsets for all the topics in a group with Apache Kafka version below 2.4.0.
* Fix for idempotent producer fatal errors, triggered after a possibly persisted message state (#4438).


## Fixes

### Idempotent producer fixes

* After a possibly persisted error, such as a disconnection or a timeout, next expected sequence
was increased, leading to a fatal error if the message wasn't persisted and
emasab marked this conversation as resolved.
Show resolved Hide resolved
the second one in queue failed with an `OUT_OF_ORDER_SEQUENCE_NUMBER`.
The error could contain the message "sequence desynchronization" with
just one possibly persisted error or "rewound sequence number" in case of
multiple errored messages.
Solved by treating the possible persisted message as _not_ persisted,
and expecting a `DUPLICATE_SEQUENCE_NUMBER` error in case it was or
`NO_ERROR` in case it wasn't, in both cases the message will be considered
delivered.



Expand Down
20 changes: 9 additions & 11 deletions src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -3343,17 +3343,15 @@ static int rd_kafka_handle_Produce_error(rd_kafka_broker_t *rkb,
* which should not be treated as a fatal error
* since this request and sub-sequent requests
* will be retried and thus return to order.
* Unless the error was a timeout, or similar,
* in which case the request might have made it
* and the messages are considered possibly persisted:
* in this case we allow the next in-flight response
* to be successful, in which case we mark
* this request's messages as succesfully delivered. */
if (perr->status &
RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED)
perr->update_next_ack = rd_true;
else
perr->update_next_ack = rd_false;
* In case the message is possibly persisted
* we still treat it as not persisted,
* expecting DUPLICATE_SEQUENCE_NUMBER
* in case it was persisted or NO_ERROR in case
* it wasn't. Increasing next expected sequence
* prematurely can lead to fatal errors in case
* it wasn't persisted and the second one to retry
* fails with OUT_OF_ORDER_SEQUENCE_NUMBER. */
perr->update_next_ack = rd_false;
emasab marked this conversation as resolved.
Show resolved Hide resolved
perr->update_next_err = rd_true;

/* Drain outstanding requests so that retries
Expand Down
339 changes: 339 additions & 0 deletions tests/0143-idempotence_mock.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,339 @@
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2023, Confluent Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#include "test.h"

#include "../src/rdkafka_proto.h"

#include <stdarg.h>


/**
* @name Idempotent producer tests using the mock cluster
*
*/


static int allowed_error;

/**
* @brief Decide what error_cb's will cause the test to fail.
*/
static int
error_is_fatal_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, const char *reason) {
if (err == allowed_error ||
/* If transport errors are allowed then it is likely
* that we'll also see ALL_BROKERS_DOWN. */
(allowed_error == RD_KAFKA_RESP_ERR__TRANSPORT &&
err == RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN)) {
TEST_SAY("Ignoring allowed error: %s: %s\n",
rd_kafka_err2name(err), reason);
return 0;
}
return 1;
}


static rd_kafka_resp_err_t (*on_response_received_cb)(rd_kafka_t *rk,
int sockfd,
const char *brokername,
int32_t brokerid,
int16_t ApiKey,
int16_t ApiVersion,
int32_t CorrId,
size_t size,
int64_t rtt,
rd_kafka_resp_err_t err,
void *ic_opaque);

/**
* @brief Simple on_response_received interceptor that simply calls the
* sub-test's on_response_received_cb function, if set.
*/
static rd_kafka_resp_err_t
on_response_received_trampoline(rd_kafka_t *rk,
int sockfd,
const char *brokername,
int32_t brokerid,
int16_t ApiKey,
int16_t ApiVersion,
int32_t CorrId,
size_t size,
int64_t rtt,
rd_kafka_resp_err_t err,
void *ic_opaque) {
TEST_ASSERT(on_response_received_cb != NULL, "");
return on_response_received_cb(rk, sockfd, brokername, brokerid, ApiKey,
ApiVersion, CorrId, size, rtt, err,
ic_opaque);
}


/**
* @brief on_new interceptor to add an on_response_received interceptor.
*/
static rd_kafka_resp_err_t on_new_producer(rd_kafka_t *rk,
const rd_kafka_conf_t *conf,
void *ic_opaque,
char *errstr,
size_t errstr_size) {
rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;

if (on_response_received_cb)
err = rd_kafka_interceptor_add_on_response_received(
rk, "on_response_received", on_response_received_trampoline,
ic_opaque);

return err;
}


/**
* @brief Create an idempotent producer and a mock cluster.
*
* The var-arg list is a NULL-terminated list of
* (const char *key, const char *value) config properties.
*
* Special keys:
* "on_response_received", "" - enable the on_response_received_cb
* interceptor,
* which must be assigned prior to
* calling create_tnx_producer().
*/
static RD_SENTINEL rd_kafka_t *
create_idempo_producer(rd_kafka_mock_cluster_t **mclusterp,
int broker_cnt,
...) {
rd_kafka_conf_t *conf;
rd_kafka_t *rk;
char numstr[8];
va_list ap;
const char *key;
rd_bool_t add_interceptors = rd_false;

rd_snprintf(numstr, sizeof(numstr), "%d", broker_cnt);

test_conf_init(&conf, NULL, 60);

test_conf_set(conf, "enable.idempotence", "true");
/* When mock brokers are set to down state they're still binding
* the port, just not listening to it, which makes connection attempts
* stall until socket.connection.setup.timeout.ms expires.
* To speed up detection of brokers being down we reduce this timeout
* to just a couple of seconds. */
test_conf_set(conf, "socket.connection.setup.timeout.ms", "5000");
/* Speed up reconnects */
test_conf_set(conf, "reconnect.backoff.max.ms", "2000");
test_conf_set(conf, "test.mock.num.brokers", numstr);
rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);

test_curr->ignore_dr_err = rd_false;

va_start(ap, broker_cnt);
while ((key = va_arg(ap, const char *))) {
if (!strcmp(key, "on_response_received")) {
add_interceptors = rd_true;
(void)va_arg(ap, const char *);
} else {
test_conf_set(conf, key, va_arg(ap, const char *));
}
}
va_end(ap);

/* Add an on_.. interceptors */
if (add_interceptors)
rd_kafka_conf_interceptor_add_on_new(conf, "on_new_producer",
on_new_producer, NULL);

rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

if (mclusterp) {
*mclusterp = rd_kafka_handle_mock_cluster(rk);
TEST_ASSERT(*mclusterp, "failed to create mock cluster");

/* Create some of the common consumer "input" topics
* that we must be able to commit to with
* send_offsets_to_transaction().
* The number depicts the number of partitions in the topic. */
TEST_CALL_ERR__(
rd_kafka_mock_topic_create(*mclusterp, "srctopic4", 4, 1));
TEST_CALL_ERR__(rd_kafka_mock_topic_create(
*mclusterp, "srctopic64", 64, 1));
}

return rk;
}

/**
* @brief A possibly persisted error should treat the message as not persisted,
* avoid increasing next expected sequence an causing a possible fatal
* error.
* n = 1 triggered the "sequence desynchronization" fatal
* error, n > 1 triggered the "rewound sequence number" fatal error.
* See #3584.
*
* @param n Number of messages (1 to 5) to send before disconnection. These
* will fail with a possibly persisted error,
* rest will be sent before reconnecting.
*
*/
static void
do_test_idempo_possibly_persisted_not_causing_fatal_error(size_t n) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
size_t i;
int remains = 0;

SUB_TEST_QUICK();

rk = create_idempo_producer(&mcluster, 1, "batch.num.messages", "1",
"linger.ms", "0", NULL);
test_curr->ignore_dr_err = rd_true;
test_curr->is_fatal_cb = error_is_fatal_cb;
/* Only allow an error from the disconnection below. */
allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;

/* Produce 5 messages without error first, msgids 1->5. */
test_produce_msgs2(rk, "mytopic", 0, 0, 0, 5, NULL, 64);
rd_kafka_flush(rk, -1);

/* First sequence is for the immediately produced reply,
* never delivered because of the disconnection. */
emasab marked this conversation as resolved.
Show resolved Hide resolved
for (i = 0; i < n; i++) {
rd_kafka_mock_broker_push_request_error_rtts(
mcluster, 1, RD_KAFKAP_Produce, 1,
RD_KAFKA_RESP_ERR_NO_ERROR, 750);
}
/* After disconnection: first message fails with NOT_ENOUGH_REPLICAS,
* rest with OUT_OF_ORDER_SEQUENCE_NUMBER. */
for (i = 0; i < 5; i++) {
if (i == 0) {
rd_kafka_mock_broker_push_request_error_rtts(
mcluster, 1, RD_KAFKAP_Produce, 1,
RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS, 750);
} else {
rd_kafka_mock_broker_push_request_error_rtts(
mcluster, 1, RD_KAFKAP_Produce, 1,
RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER, 1);
}
}

/* Produce n messages that will be retried, msgids 6->(6+n-1). */
test_produce_msgs2_nowait(rk, "mytopic", 0, 0, 0, n, NULL, 64,
&remains);

/* Wait that messages are sent, then set it down and up again.
* Causing a "possibly persisted" error that increased next_ack,
* with subsequent fatal error when retrying the same sequences. */
emasab marked this conversation as resolved.
Show resolved Hide resolved
rd_usleep(250000, 0);
rd_kafka_mock_broker_set_down(mcluster, 1);
rd_usleep(250000, 0);

/* Produce rest of (5 - n) messages that will enqueued
* after retried ones, msgids (6+n)->10. */
if (n < 5)
test_produce_msgs2_nowait(rk, "mytopic", 0, 0, 0, 5 - n, NULL,
64, &remains);

rd_kafka_mock_broker_set_up(mcluster, 1);

/* All done, producer recovers without fatal errors. */
rd_kafka_flush(rk, -1);
rd_kafka_destroy(rk);

allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;

SUB_TEST_PASS();
}

/**
* @brief After a possibly persisted error that caused a retry, messages
* can fail with DUPLICATE_SEQUENCE_NUMBER or succeed and in both
* cases they'll be considered as persisted.
*/
static void
do_test_idempo_duplicate_sequence_number_after_possibly_persisted(void) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
int remains = 0;

SUB_TEST_QUICK();

rk = create_idempo_producer(&mcluster, 1, "batch.num.messages", "1",
"linger.ms", "0", NULL);
test_curr->ignore_dr_err = rd_true;
test_curr->is_fatal_cb = error_is_fatal_cb;
/* Only allow an error from the disconnection below. */
allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;

/* Produce 5 messages without error first, msgids 1-5. */
test_produce_msgs2(rk, "mytopic", 0, 0, 0, 5, NULL, 64);


/* Make sure first response comes after disconnection. */
rd_kafka_mock_broker_push_request_error_rtts(
mcluster, 1, RD_KAFKAP_Produce, 5,
RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER, 500,
RD_KAFKA_RESP_ERR_NO_ERROR, 0, RD_KAFKA_RESP_ERR_NO_ERROR, 0,
RD_KAFKA_RESP_ERR_NO_ERROR, 0, RD_KAFKA_RESP_ERR_NO_ERROR, 0);

test_produce_msgs2_nowait(rk, "mytopic", 0, 0, 0, 5, NULL, 64,
&remains);

/* Let the message fail because of _TRANSPORT (possibly persisted). */
rd_kafka_mock_broker_set_down(mcluster, 1);

rd_usleep(250000, 0);

/* When retrying the first DUPLICATE_SEQUENCE_NUMBER is treated
* as NO_ERROR. */
rd_kafka_mock_broker_set_up(mcluster, 1);

/* All done. */
rd_kafka_flush(rk, -1);
rd_kafka_destroy(rk);

allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;

SUB_TEST_PASS();
}

int main_0143_idempotence_mock(int argc, char **argv) {
if (test_needs_auth()) {
TEST_SKIP("Mock cluster does not support SSL/SASL\n");
return 0;
}

int i;
for (i = 1; i <= 5; i++)
do_test_idempo_possibly_persisted_not_causing_fatal_error(i);

do_test_idempo_duplicate_sequence_number_after_possibly_persisted();

return 0;
}
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ set(
0139-offset_validation_mock.c
0140-commit_metadata.cpp
0142-reauthentication.c
0143-idempotence_mock.c
8000-idle.cpp
8001-fetch_from_follower_mock_manual.c
test.c
Expand Down
Loading