-
Notifications
You must be signed in to change notification settings - Fork 284
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ticdc: support detecting the Kafka cluster version automatically #10854
Changes from 3 commits
8f7849b
1b02cc0
b075719
57a178c
5cb0d36
c397631
fc56778
49f78fb
4c10b56
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,6 +16,7 @@ | |
import ( | ||
"context" | ||
"crypto/tls" | ||
"math/rand" | ||
"strings" | ||
"time" | ||
|
||
|
@@ -31,13 +32,7 @@ | |
func NewSaramaConfig(ctx context.Context, o *Options) (*sarama.Config, error) { | ||
config := sarama.NewConfig() | ||
config.ClientID = o.ClientID | ||
|
||
version, err := sarama.ParseKafkaVersion(o.Version) | ||
if err != nil { | ||
return nil, cerror.WrapError(cerror.ErrKafkaInvalidVersion, err) | ||
} | ||
config.Version = version | ||
|
||
var err error | ||
// Admin client would refresh metadata periodically, | ||
// if metadata cannot be refreshed easily, this would indicate the network condition between the | ||
// capture server and kafka broker is not good. | ||
|
@@ -122,6 +117,28 @@ | |
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) | ||
} | ||
|
||
kafkaVersion, err := GetKafkaVersion(ctx, config, o) | ||
if err != nil { | ||
log.Warn("Can't get Kafka version by broker. ticdc will use default version", | ||
zap.String("defaultVersion", kafkaVersion.String())) | ||
} | ||
|
||
version, err := sarama.ParseKafkaVersion(o.Version) | ||
if err != nil { | ||
return nil, cerror.WrapError(cerror.ErrKafkaInvalidVersion, err) | ||
} | ||
if o.IsAssignedVersion { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
config.Version = version | ||
if version.String() != kafkaVersion.String() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. according to the api version mapping, if the kafka cluster version is greater than 2.8.0, |
||
log.Warn("The Kafka version you assigned may not be correct. "+ | ||
"Please assign a version equal to or less than the specified version", | ||
zap.String("assignedVersion", version.String()), | ||
zap.String("desiredVersion", kafkaVersion.String())) | ||
} | ||
} else { | ||
config.Version = kafkaVersion | ||
} | ||
|
||
return config, nil | ||
} | ||
|
||
|
@@ -167,3 +184,66 @@ | |
|
||
return nil | ||
} | ||
|
||
func GetKafkaVersion(ctx context.Context, config *sarama.Config, o *Options) (sarama.KafkaVersion, error) { | ||
var err error | ||
addrs := o.BrokerEndpoints | ||
if len(addrs) > 1 { | ||
// Shuffle the list of addresses to randomize the order in which | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why shuffle here ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Prevent latency from increasing when the first address is inaccessible. |
||
// connections are attempted. This prevents routing all connections | ||
// to the first broker (which will usually succeed). | ||
rand.Shuffle(len(addrs), func(i, j int) { | ||
addrs[i], addrs[j] = addrs[j], addrs[i] | ||
}) | ||
} | ||
for i := range addrs { | ||
broker := sarama.NewBroker(addrs[i]) | ||
err = broker.Open(config) | ||
if err != nil { | ||
log.Warn("Kafka fail to open broker", zap.String("addr", addrs[i])) | ||
continue | ||
} | ||
defer func() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do not use defer in for loop |
||
broker.Close() | ||
}() | ||
apiResponse, err := broker.ApiVersions(&sarama.ApiVersionsRequest{}) | ||
if err != nil { | ||
log.Warn("Kafka fail to get ApiVersions", zap.String("addr", addrs[i])) | ||
continue | ||
} | ||
// ApiKey method | ||
// 0 Produce | ||
// 3 Metadata (default) | ||
version := apiResponse.ApiKeys[3].MaxVersion | ||
kafkaVersion := requiredVersion(version) | ||
return kafkaVersion, nil | ||
} | ||
return sarama.V2_0_0_0, err | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is better to use constants rather than literals. |
||
} | ||
|
||
func requiredVersion(version int16) sarama.KafkaVersion { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
switch version { | ||
case 10: | ||
return sarama.V2_8_0_0 | ||
case 9: | ||
return sarama.V2_4_0_0 | ||
case 8: | ||
return sarama.V2_3_0_0 | ||
case 7: | ||
return sarama.V2_1_0_0 | ||
case 6: | ||
return sarama.V2_0_0_0 | ||
case 5: | ||
return sarama.V1_0_0_0 | ||
case 3, 4: | ||
return sarama.V0_11_0_0 | ||
case 2: | ||
return sarama.V0_10_1_0 | ||
case 1: | ||
return sarama.V0_10_0_0 | ||
case 0: | ||
return sarama.V0_8_2_0 | ||
default: | ||
return sarama.V2_0_0_0 | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
only detect the kafka cluster version when there is no version specified by the user
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
detect the kafka version is necessary if the user specified an error version