-
Notifications
You must be signed in to change notification settings - Fork 284
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ticdc: support detecting the Kafka cluster version automatically #10854
Conversation
Hi @wk989898. Thanks for your PR. I'm waiting for a pingcap member to verify that this patch is reasonable to test. If it is, they should reply with Once the patch is verified, the new status will be reflected by the I understand the commands that are listed here. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. |
pkg/sink/kafka/sarama.go
Outdated
@@ -122,6 +117,28 @@ func NewSaramaConfig(ctx context.Context, o *Options) (*sarama.Config, error) { | |||
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) | |||
} | |||
|
|||
kafkaVersion, err := GetKafkaVersion(ctx, config, o) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
only detect the kafka cluster version when there is no version specified by the user
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
detect the kafka version is necessary if the user specified an error version
pkg/sink/kafka/sarama.go
Outdated
} | ||
if o.IsAssignedVersion { | ||
config.Version = version | ||
if version.String() != kafkaVersion.String() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
according to the api version mapping, if the kafka cluster version is greater than 2.8.0, version.String() != kafkaVersion.String()
is always true.
pkg/sink/kafka/sarama.go
Outdated
log.Warn("Kafka fail to open broker", zap.String("addr", addrs[i])) | ||
continue | ||
} | ||
defer func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do not use defer in for loop
pkg/sink/kafka/sarama.go
Outdated
return sarama.V2_0_0_0, err | ||
} | ||
|
||
func requiredVersion(version int16) sarama.KafkaVersion { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
requiredVersion
is not a good name here, this function is just a version mapping.
pkg/sink/kafka/sarama.go
Outdated
} | ||
if o.IsAssignedVersion { | ||
config.Version = version | ||
if !version.IsAtLeast(sarama.V2_8_0_0) && version.String() != kafkaVersion.String() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is better to use constants rather than literals.
pkg/sink/kafka/sarama.go
Outdated
return version, err | ||
} | ||
} | ||
return sarama.V2_0_0_0, err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is better to use constants rather than literals.
if err != nil { | ||
return nil, cerror.WrapError(cerror.ErrKafkaInvalidVersion, err) | ||
} | ||
if o.IsAssignedVersion { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
config.Version = kafkaVersion
can be set at the very first, else
is redundant.
var err error | ||
addrs := o.BrokerEndpoints | ||
if len(addrs) > 1 { | ||
// Shuffle the list of addresses to randomize the order in which |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why shuffle here ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Prevent latency from increasing when the first address is inaccessible.
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: 3AceShowHand, CharlesCheung96 The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
[LGTM Timeline notifier]Timeline:
|
Codecov ReportAttention: Patch coverage is
Additional details and impacted files
Flags with carried forward coverage won't be shown. Click here to find out more. @@ Coverage Diff @@
## master #10854 +/- ##
================================================
+ Coverage 57.3966% 57.4052% +0.0086%
================================================
Files 851 851
Lines 125469 125539 +70
================================================
+ Hits 72015 72066 +51
- Misses 48034 48056 +22
+ Partials 5420 5417 -3 |
/retest |
@wk989898: Cannot trigger testing until a trusted user reviews the PR and leaves an In response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. |
/retest |
In response to a cherrypick label: new pull request created to branch |
In response to a cherrypick label: new pull request created to branch |
What problem does this PR solve?
Issue Number: close #10852
What is changed and how it works?
Add Kafka version detect. Connect a broker and use
ApiVersionsRequest
to getApiKeys
andMaxVersion
, according toMaxVersion
infer the Kafka version.Test:
ticdc instance: v8.0.0-master-dirty
pd instance:v7.6.0
tikv instance:v7.6.0
tidb instance:v7.6.0
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
not sure.
Do you need to update user documentation, design documentation or monitoring documentation?
no.
Release note