From e5d5d1377b01c0b9b165bb2fde9ec83023c55007 Mon Sep 17 00:00:00 2001
From: ti-srebot <66930949+ti-srebot@users.noreply.github.com>
Date: Wed, 25 Nov 2020 15:59:29 +0800
Subject: [PATCH] kafka sink: increase the max retry of the producer of kafka
 (#1100) (#1118)

Signed-off-by: ti-srebot
---
 cdc/sink/producer/kafka/kafka.go | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/cdc/sink/producer/kafka/kafka.go b/cdc/sink/producer/kafka/kafka.go
index 872b51a2276..7d435851979 100644
--- a/cdc/sink/producer/kafka/kafka.go
+++ b/cdc/sink/producer/kafka/kafka.go
@@ -388,7 +388,10 @@ func newSaramaConfig(ctx context.Context, c Config) (*sarama.Config, error) {
 		return nil, errors.Trace(err)
 	}
 	config.Version = version
-	config.Metadata.Retry.Max = 20
+	// See: https://kafka.apache.org/documentation/#replication
+	// When a broker in a Kafka cluster goes down, the partitions it leads lose their leader. Kafka elects new partition leaders and replicates their logs, which can take anywhere from a few seconds to a few minutes, and the cluster does not serve writes to those partitions during that time.
+	// Time out in one minute (120 * 500ms).
+	config.Metadata.Retry.Max = 120
 	config.Metadata.Retry.Backoff = 500 * time.Millisecond
 
 	config.Producer.Partitioner = sarama.NewManualPartitioner
@@ -413,10 +416,12 @@ func newSaramaConfig(ctx context.Context, c Config) (*sarama.Config, error) {
 		config.Producer.Compression = sarama.CompressionNone
 	}
 
-	config.Producer.Retry.Max = 20
+	// Time out in five minutes (600 * 500ms).
+	config.Producer.Retry.Max = 600
 	config.Producer.Retry.Backoff = 500 * time.Millisecond
 
-	config.Admin.Retry.Max = 10000
+	// Time out in one minute (120 * 500ms).
+	config.Admin.Retry.Max = 120
 	config.Admin.Retry.Backoff = 500 * time.Millisecond
 	config.Admin.Timeout = 20 * time.Second
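
Note (not part of the patch): a minimal, self-contained Go sketch of the arithmetic behind the new values. Sarama sleeps Retry.Backoff between attempts, so the worst-case time spent retrying is roughly Retry.Max * Retry.Backoff, which is where the one-minute and five-minute figures in the comments come from. The snippet only mirrors the values set in newSaramaConfig above; the package layout and printed message are illustrative, not TiCDC code.

	package main

	import (
		"fmt"
		"time"

		"github.com/Shopify/sarama"
	)

	func main() {
		config := sarama.NewConfig()

		// Same knobs as the patch: worst-case wait ~= Max * Backoff
		// (ignoring the time each attempt itself takes).
		config.Metadata.Retry.Max = 120 // 120 * 500ms = 1 minute
		config.Metadata.Retry.Backoff = 500 * time.Millisecond

		config.Producer.Retry.Max = 600 // 600 * 500ms = 5 minutes
		config.Producer.Retry.Backoff = 500 * time.Millisecond

		config.Admin.Retry.Max = 120 // 120 * 500ms = 1 minute
		config.Admin.Retry.Backoff = 500 * time.Millisecond

		// Total backoff the producer will tolerate before giving up.
		worst := time.Duration(config.Producer.Retry.Max) * config.Producer.Retry.Backoff
		fmt.Printf("producer gives up after roughly %v of backoff\n", worst) // 5m0s
	}

The trade-off encoded here: a larger Retry.Max keeps the changefeed alive across a leader election (seconds to minutes), while the fixed 500ms Backoff bounds how long a truly dead cluster can stall a send.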