kafka sink: increase the max retry of the producer of kafka (#1100) (#1118)

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
ti-srebot authored Nov 25, 2020
1 parent b181f9d commit e5d5d13
Showing 1 changed file with 8 additions and 3 deletions.
11 changes: 8 additions & 3 deletions cdc/sink/producer/kafka/kafka.go
@@ -388,7 +388,10 @@ func newSaramaConfig(ctx context.Context, c Config) (*sarama.Config, error) {
return nil, errors.Trace(err)
}
config.Version = version
-config.Metadata.Retry.Max = 20
+// See: https://kafka.apache.org/documentation/#replication
+// When a broker in the Kafka cluster goes down, the partitions whose leader was on that broker lose their leader; Kafka elects new partition leaders and replicates the logs, which can take from a few seconds to a few minutes, and the cluster does not accept writes for those partitions during that time.
+// Time out in one minute (120 * 500ms).
+config.Metadata.Retry.Max = 120
config.Metadata.Retry.Backoff = 500 * time.Millisecond

config.Producer.Partitioner = sarama.NewManualPartitioner
@@ -413,10 +416,12 @@ func newSaramaConfig(ctx context.Context, c Config) (*sarama.Config, error) {
config.Producer.Compression = sarama.CompressionNone
}

-config.Producer.Retry.Max = 20
+// Time out in five minutes (600 * 500ms).
+config.Producer.Retry.Max = 600
config.Producer.Retry.Backoff = 500 * time.Millisecond

-config.Admin.Retry.Max = 10000
+// Time out in one minute (120 * 500ms).
+config.Admin.Retry.Max = 120
config.Admin.Retry.Backoff = 500 * time.Millisecond
config.Admin.Timeout = 20 * time.Second

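For reference, a minimal, self-contained sketch (not the cdc code itself; it assumes the Shopify/sarama client that kafka.go uses) of the retry settings introduced by this commit. Sarama sleeps for Backoff between attempts, so the worst-case retry window is roughly Max * Backoff: 120 * 500ms ≈ 1 minute for metadata and admin requests, 600 * 500ms ≈ 5 minutes for the producer.

// retry_window_sketch.go: illustrative only, values copied from this commit.
package main

import (
	"fmt"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()

	// Metadata refreshes: ride out roughly one minute of leader election.
	config.Metadata.Retry.Max = 120
	config.Metadata.Retry.Backoff = 500 * time.Millisecond

	// Message sends: keep retrying for up to roughly five minutes.
	config.Producer.Retry.Max = 600
	config.Producer.Retry.Backoff = 500 * time.Millisecond

	// Admin requests: roughly one minute of retries, 20s per request.
	config.Admin.Retry.Max = 120
	config.Admin.Retry.Backoff = 500 * time.Millisecond
	config.Admin.Timeout = 20 * time.Second

	// Worst-case window for producer retries: Max * Backoff.
	window := time.Duration(config.Producer.Retry.Max) * config.Producer.Retry.Backoff
	fmt.Println("worst-case producer retry window:", window) // 5m0s
}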
