diff --git a/cdc/sink/producer/kafka/kafka.go b/cdc/sink/producer/kafka/kafka.go index 872b51a2276..7d435851979 100644 --- a/cdc/sink/producer/kafka/kafka.go +++ b/cdc/sink/producer/kafka/kafka.go @@ -388,7 +388,10 @@ func newSaramaConfig(ctx context.Context, c Config) (*sarama.Config, error) { return nil, errors.Trace(err) } config.Version = version - config.Metadata.Retry.Max = 20 + // See: https://kafka.apache.org/documentation/#replication + // When one of the brokers in a Kafka cluster is down, the partition leaders in this broker is broken, Kafka will election a new partition leader and replication logs, this process will last from a few seconds to a few minutes. Kafka cluster will not provide a writing service in this process. + // Time out in one minute(120 * 500ms). + config.Metadata.Retry.Max = 120 config.Metadata.Retry.Backoff = 500 * time.Millisecond config.Producer.Partitioner = sarama.NewManualPartitioner @@ -413,10 +416,12 @@ func newSaramaConfig(ctx context.Context, c Config) (*sarama.Config, error) { config.Producer.Compression = sarama.CompressionNone } - config.Producer.Retry.Max = 20 + // Time out in five minutes(600 * 500ms). + config.Producer.Retry.Max = 600 config.Producer.Retry.Backoff = 500 * time.Millisecond - config.Admin.Retry.Max = 10000 + // Time out in one minute(120 * 500ms). + config.Admin.Retry.Max = 120 config.Admin.Retry.Backoff = 500 * time.Millisecond config.Admin.Timeout = 20 * time.Second