Skip to content

Commit

Permalink
kafka: change default metadataRefreshFrequency to 60s and use GroupSt…
Browse files Browse the repository at this point in the history
…rategies instead of deprecated Strategy config (#34)

sync from the internal merge request 1442
  • Loading branch information
liuzengh authored Dec 11, 2024
1 parent 8ccb67d commit e786899
Showing 1 changed file with 26 additions and 25 deletions.
51 changes: 26 additions & 25 deletions kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (uc *UserConfig) getServerConfig() *sarama.Config {
sc.Consumer.Fetch.Max = int32(uc.FetchMax)
sc.Consumer.Offsets.Initial = uc.Initial
sc.Consumer.Offsets.AutoCommit.Interval = 3 * time.Second // How often to submit consumption progress
sc.Consumer.Group.Rebalance.Strategy = uc.Strategy
sc.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{uc.Strategy}
sc.Consumer.Group.Rebalance.Timeout = uc.GroupRebalanceTimeout
sc.Consumer.Group.Rebalance.Retry.Max = uc.GroupRebalanceRetryMax
sc.Consumer.Group.Session.Timeout = uc.GroupSessionTimeout
Expand Down Expand Up @@ -136,30 +136,31 @@ func GetDefaultConfig() *UserConfig {
// The maximum waiting time for a single consumption pull request.
// The maximum wait time will only wait if there is no recent data.
// This value should be set larger to reduce the consumption of empty requests on the QPS of the server.
MaxWaitTime: time.Second,
RequiredAcks: sarama.WaitForAll,
ReturnSuccesses: true,
Timeout: time.Second, // Maximum request processing time on the server side
MaxMessageBytes: 131072, // CDMQ set up
FlushMessages: 0,
FlushMaxMessages: 0,
FlushBytes: 0,
FlushFrequency: 0,
BatchConsumeCount: 0,
BatchFlush: 2 * time.Second,
ScramClient: nil,
MaxRetry: 0, // Unlimited retries, compatible with historical situations
NetMaxOpenRequests: 5,
MaxProcessingTime: 100 * time.Millisecond,
NetDailTimeout: 30 * time.Second,
NetReadTimeout: 30 * time.Second,
NetWriteTimeout: 30 * time.Second,
GroupSessionTimeout: 10 * time.Second,
GroupRebalanceTimeout: 60 * time.Second,
GroupRebalanceRetryMax: 4,
MetadataRetryMax: 1,
MetadataRetryBackoff: 1000 * time.Millisecond,
MetadataRefreshFrequency: 600 * time.Second,
MaxWaitTime: time.Second,
RequiredAcks: sarama.WaitForAll,
ReturnSuccesses: true,
Timeout: time.Second, // Maximum request processing time on the server side
MaxMessageBytes: 131072, // CDMQ set up
FlushMessages: 0,
FlushMaxMessages: 0,
FlushBytes: 0,
FlushFrequency: 0,
BatchConsumeCount: 0,
BatchFlush: 2 * time.Second,
ScramClient: nil,
MaxRetry: 0, // Unlimited retries, compatible with historical situations
NetMaxOpenRequests: 5,
MaxProcessingTime: 100 * time.Millisecond,
NetDailTimeout: 30 * time.Second,
NetReadTimeout: 30 * time.Second,
NetWriteTimeout: 30 * time.Second,
GroupSessionTimeout: 10 * time.Second,
GroupRebalanceTimeout: 60 * time.Second,
GroupRebalanceRetryMax: 4,
MetadataRetryMax: 1,
MetadataRetryBackoff: 1000 * time.Millisecond,
// The default time for sarama is 10 minutes, which results in a slow detection of new partitions, so it is shortened to 2 minutes.
MetadataRefreshFrequency: 120 * time.Second,
MetadataFull: false, // disable pull all metadata
MetadataAllowAutoTopicCreation: true,
IsolationLevel: 0,
Expand Down

0 comments on commit e786899

Please sign in to comment.