Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ticdc: support detecting the Kafka cluster version automatically #10854

Merged
merged 9 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/sink/kafka/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ type Options struct {
// User should make sure that `replication-factor` not greater than the number of kafka brokers.
ReplicationFactor int16
Version string
IsAssignedVersion bool
MaxMessageBytes int
Compression string
ClientID string
Expand Down Expand Up @@ -246,6 +247,7 @@ func (o *Options) Apply(changefeedID model.ChangeFeedID,

if urlParameter.KafkaVersion != nil {
o.Version = *urlParameter.KafkaVersion
o.IsAssignedVersion = true
}

if urlParameter.MaxMessageBytes != nil {
Expand Down
94 changes: 87 additions & 7 deletions pkg/sink/kafka/sarama.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import (
"context"
"crypto/tls"
"math/rand"
"strings"
"time"

Expand All @@ -31,13 +32,7 @@
func NewSaramaConfig(ctx context.Context, o *Options) (*sarama.Config, error) {
config := sarama.NewConfig()
config.ClientID = o.ClientID

version, err := sarama.ParseKafkaVersion(o.Version)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaInvalidVersion, err)
}
config.Version = version

var err error
// Admin client would refresh metadata periodically,
// if metadata cannot be refreshed easily, this would indicate the network condition between the
// capture server and kafka broker is not good.
Expand Down Expand Up @@ -122,6 +117,28 @@
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}

kafkaVersion, err := GetKafkaVersion(ctx, config, o)
Copy link
Member

@sdojjy sdojjy Mar 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only detect the kafka cluster version when there is no version specified by the user

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

detect the kafka version is necessary if the user specified an error version

if err != nil {
log.Warn("Can't get Kafka version by broker. ticdc will use default version",
zap.String("defaultVersion", kafkaVersion.String()))
}

version, err := sarama.ParseKafkaVersion(o.Version)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaInvalidVersion, err)
}
if o.IsAssignedVersion {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

config.Version = kafkaVersion can be set at the very first, else is redundant.

config.Version = version
if version.String() != kafkaVersion.String() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

according to the api version mapping, if the kafka cluster version is greater than 2.8.0, version.String() != kafkaVersion.String() is always true.

log.Warn("The Kafka version you assigned may not be correct. "+
"Please assign a version equal to or less than the specified version",
zap.String("assignedVersion", version.String()),
zap.String("desiredVersion", kafkaVersion.String()))
}
} else {
config.Version = kafkaVersion
}

return config, nil
}

Expand Down Expand Up @@ -167,3 +184,66 @@

return nil
}

func GetKafkaVersion(ctx context.Context, config *sarama.Config, o *Options) (sarama.KafkaVersion, error) {

Check failure on line 188 in pkg/sink/kafka/sarama.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

exported: exported function GetKafkaVersion should have comment or be unexported (revive)
var err error
addrs := o.BrokerEndpoints
if len(addrs) > 1 {
// Shuffle the list of addresses to randomize the order in which
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why shuffle here ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prevent latency from increasing when the first address is inaccessible.

// connections are attempted. This prevents routing all connections
// to the first broker (which will usually succeed).
rand.Shuffle(len(addrs), func(i, j int) {
addrs[i], addrs[j] = addrs[j], addrs[i]
})
}
for i := range addrs {
broker := sarama.NewBroker(addrs[i])
err = broker.Open(config)
if err != nil {
log.Warn("Kafka fail to open broker", zap.String("addr", addrs[i]))
continue
}
defer func() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do not use defer in for loop

broker.Close()
}()
apiResponse, err := broker.ApiVersions(&sarama.ApiVersionsRequest{})
if err != nil {
log.Warn("Kafka fail to get ApiVersions", zap.String("addr", addrs[i]))
continue
}
// ApiKey method
// 0 Produce
// 3 Metadata (default)
version := apiResponse.ApiKeys[3].MaxVersion
kafkaVersion := requiredVersion(version)
return kafkaVersion, nil
}
return sarama.V2_0_0_0, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is better to use constants rather than literals.

}

func requiredVersion(version int16) sarama.KafkaVersion {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

requiredVersion is not a good name here, this function is just a version mapping.

switch version {
case 10:
return sarama.V2_8_0_0
case 9:
return sarama.V2_4_0_0
case 8:
return sarama.V2_3_0_0
case 7:
return sarama.V2_1_0_0
case 6:
return sarama.V2_0_0_0
case 5:
return sarama.V1_0_0_0
case 3, 4:
return sarama.V0_11_0_0
case 2:
return sarama.V0_10_1_0
case 1:
return sarama.V0_10_0_0
case 0:
return sarama.V0_8_2_0
default:
return sarama.V2_0_0_0
}
}
Loading