Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink(ticdc): Optimize the performance when getting the metadata of Kafka topics #9146

Merged
merged 8 commits into from
Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 104 additions & 95 deletions cdc/sink/mq/manager/kafka_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,59 +23,68 @@ import (
"github.com/Shopify/sarama"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/model"
kafkaconfig "github.com/pingcap/tiflow/cdc/sink/mq/producer/kafka"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/kafka"
"github.com/pingcap/tiflow/pkg/retry"
"go.uber.org/atomic"
"go.uber.org/zap"
)

const (
// metaRefreshInterval is the interval of refreshing metadata.
// We can't get the metadata too frequently, because it may cause
// the kafka cluster to be overloaded. Especially when there are
// many topics in the cluster or there are many TiCDC changefeeds.
metaRefreshInterval = 10 * time.Minute
)

// kafkaTopicManager is a manager for kafka topics.
type kafkaTopicManager struct {
client kafka.Client
admin kafka.ClusterAdminClient
changefeedID model.ChangeFeedID

admin kafka.ClusterAdminClient

cfg *kafkaconfig.AutoCreateTopicConfig

topics sync.Map

lastMetadataRefresh atomic.Int64
metaRefreshTicker *time.Ticker

// cancel is used to cancel the background goroutine.
cancel context.CancelFunc
}

// NewKafkaTopicManager creates a new topic manager.
func NewKafkaTopicManager(
client kafka.Client,
ctx context.Context,
admin kafka.ClusterAdminClient,
cfg *kafkaconfig.AutoCreateTopicConfig,
) (*kafkaTopicManager, error) {
changefeedID := contextutil.ChangefeedIDFromCtx(ctx)
mgr := &kafkaTopicManager{
client: client,
admin: admin,
cfg: cfg,
changefeedID: changefeedID,
admin: admin,
cfg: cfg,
metaRefreshTicker: time.NewTicker(metaRefreshInterval),
}

// do an initial metadata fetching using ListTopics
err := mgr.listTopics()
if err != nil {
return nil, err
}
ctx, mgr.cancel = context.WithCancel(ctx)
// Background refresh metadata.
go mgr.backgroundRefreshMeta(ctx)

return mgr, nil
}

// GetPartitionNum returns the number of partitions of the topic.
// It may also try to update the topics' information maintained by manager.
func (m *kafkaTopicManager) GetPartitionNum(topic string) (int32, error) {
err := m.tryRefreshMeta()
if err != nil {
return 0, errors.Trace(err)
}

if partitions, ok := m.topics.Load(topic); ok {
return partitions.(int32), nil
}

// If the topic is not in the metadata, we try to create the topic.
partitionNum, err := m.CreateTopicAndWaitUntilVisible(topic)
if err != nil {
return 0, errors.Trace(err)
Expand All @@ -84,25 +93,33 @@ func (m *kafkaTopicManager) GetPartitionNum(topic string) (int32, error) {
return partitionNum, nil
}

// tryRefreshMeta try to refresh the topics' information maintained by manager.
func (m *kafkaTopicManager) tryRefreshMeta() error {
if time.Since(time.Unix(m.lastMetadataRefresh.Load(), 0)) > time.Minute {
topics, err := m.client.Topics()
if err != nil {
return err
}

for _, topic := range topics {
partitions, err := m.client.Partitions(topic)
func (m *kafkaTopicManager) backgroundRefreshMeta(ctx context.Context) {
for {
select {
case <-ctx.Done():
log.Info("Background refresh Kafka metadata goroutine exit.",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
)
return
case <-m.metaRefreshTicker.C:
topicMetaList, err := m.getMetadataOfTopics()
// We ignore the error here, because the error may be caused by the
// network problem, and we can try to get the metadata next time.
if err != nil {
return err
log.Warn("Get metadata of topics failed",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Error(err))
}
m.tryUpdatePartitionsAndLogging(topic, int32(len(partitions)))

for _, detail := range topicMetaList {
partitionNum := int32(len(detail.Partitions))
m.tryUpdatePartitionsAndLogging(detail.Name, partitionNum)
}

}
m.lastMetadataRefresh.Store(time.Now().Unix())
}

return nil
}

// tryUpdatePartitionsAndLogging try to update the partitions of the topic.
Expand All @@ -113,6 +130,8 @@ func (m *kafkaTopicManager) tryUpdatePartitionsAndLogging(topic string, partitio
m.topics.Store(topic, partitions)
log.Info(
"update topic partition number",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.String("topic", topic),
zap.Int32("oldPartitionNumber", oldPartitions.(int32)),
zap.Int32("newPartitionNumber", partitions),
Expand All @@ -122,6 +141,8 @@ func (m *kafkaTopicManager) tryUpdatePartitionsAndLogging(topic string, partitio
m.topics.Store(topic, partitions)
log.Info(
"store topic partition number",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.String("topic", topic),
zap.Int32("partitionNumber", partitions),
)
Expand All @@ -143,13 +164,18 @@ func (m *kafkaTopicManager) getMetadataOfTopics() ([]*sarama.TopicMetadata, erro
if err != nil {
log.Warn(
"Kafka admin client describe topics failed",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Error(err),
zap.Duration("duration", time.Since(start)),
)
return nil, err
}

log.Info(
"Kafka admin client describe topics success",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Duration("duration", time.Since(start)))

return topicMetaList, nil
Expand All @@ -166,6 +192,8 @@ func (m *kafkaTopicManager) waitUntilTopicVisible(topicName string) error {
topicMetaList, err := m.admin.DescribeTopics([]string{topicName})
if err != nil {
log.Error("Kafka admin client describe topic failed",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.String("topic", topicName),
zap.Error(err),
zap.Duration("duration", time.Since(start)))
Expand All @@ -174,6 +202,8 @@ func (m *kafkaTopicManager) waitUntilTopicVisible(topicName string) error {

if len(topicMetaList) != 1 {
log.Error("topic metadata length is wrong.",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.String("topic", topicName),
zap.Int("expected", 1),
zap.Int("actual", len(topicMetaList)))
Expand All @@ -184,12 +214,16 @@ func (m *kafkaTopicManager) waitUntilTopicVisible(topicName string) error {
meta := topicMetaList[0]
if meta.Err != sarama.ErrNoError {
log.Error("topic metadata is fetched with error",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.String("topic", topicName),
zap.Error(meta.Err))
return meta.Err
}

log.Info("Kafka admin client describe topic success",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.String("topic", topicName),
zap.Int("partitionNumber", len(meta.Partitions)),
zap.Duration("duration", time.Since(start)))
Expand All @@ -203,85 +237,26 @@ func (m *kafkaTopicManager) waitUntilTopicVisible(topicName string) error {
return err
}

// listTopics is used to do an initial metadata fetching.
// It queries the Kafka cluster admin for every topic's detail, seeds the
// in-memory topic->partition-count cache, and records the refresh timestamp
// so that subsequent refreshes can be rate-limited.
func (m *kafkaTopicManager) listTopics() error {
// Record the start time so the logs below can report how long ListTopics took.
start := time.Now()
topics, err := m.admin.ListTopics()
if err != nil {
log.Error(
"Kafka admin client list topics failed",
zap.Error(err),
zap.Duration("duration", time.Since(start)),
)
return errors.Trace(err)
}
log.Info(
"Kafka admin client list topics success",
zap.Duration("duration", time.Since(start)),
)

// Now that we have access to the latest topics' information,
// we need to update it here immediately.
for topic, detail := range topics {
m.tryUpdatePartitionsAndLogging(topic, detail.NumPartitions)
}
// Remember when this refresh happened (seconds granularity) so callers can
// decide whether the cached metadata is still fresh enough to reuse.
m.lastMetadataRefresh.Store(time.Now().Unix())

return nil
}

// createTopic creates a topic with the given name
// and returns the number of partitions.
func (m *kafkaTopicManager) createTopic(topicName string) (int32, error) {
topicMetaList, err := m.getMetadataOfTopics()
if err != nil {
return 0, errors.Trace(err)
}

// Now that we have access to the latest topics' information,
// we need to update it here immediately.
targetTopicFound := false
targetTopicPartitionNum := 0
for _, topic := range topicMetaList {
if topic.Err != sarama.ErrNoError {
log.Error("Kafka admin client fetch topic metadata failed.",
zap.String("topic", topic.Name),
zap.Error(topic.Err))
continue
}

if topic.Name == topicName {
targetTopicFound = true
targetTopicPartitionNum = len(topic.Partitions)
}
m.tryUpdatePartitionsAndLogging(topic.Name, int32(len(topic.Partitions)))
}
m.lastMetadataRefresh.Store(time.Now().Unix())

// Maybe our cache has expired information, so we just return it.
if targetTopicFound {
log.Info(
"topic already exists and the cached information has expired",
zap.String("topic", topicName),
)
return int32(targetTopicPartitionNum), nil
}

if !m.cfg.AutoCreate {
return 0, cerror.ErrKafkaInvalidConfig.GenWithStack(
fmt.Sprintf("`auto-create-topic` is false, "+
"and %s not found", topicName))
}

start := time.Now()
err = m.admin.CreateTopic(topicName, &sarama.TopicDetail{
err := m.admin.CreateTopic(topicName, &sarama.TopicDetail{
NumPartitions: m.cfg.PartitionNum,
ReplicationFactor: m.cfg.ReplicationFactor,
}, false)
// Ignore the already exists error because it's not harmful.
if err != nil && !strings.Contains(err.Error(), sarama.ErrTopicAlreadyExists.Error()) {
log.Error(
"Kafka admin client create the topic failed",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.String("topic", topicName),
zap.Int32("partitionNumber", m.cfg.PartitionNum),
zap.Int16("replicationFactor", m.cfg.ReplicationFactor),
Expand All @@ -293,6 +268,8 @@ func (m *kafkaTopicManager) createTopic(topicName string) (int32, error) {

log.Info(
"Kafka admin client create the topic success",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.String("topic", topicName),
zap.Int32("partitionNumber", m.cfg.PartitionNum),
zap.Int16("replicationFactor", m.cfg.ReplicationFactor),
Expand All @@ -305,6 +282,33 @@ func (m *kafkaTopicManager) createTopic(topicName string) (int32, error) {

// CreateTopicAndWaitUntilVisible wraps createTopic and waitUntilTopicVisible together.
func (m *kafkaTopicManager) CreateTopicAndWaitUntilVisible(topicName string) (int32, error) {
// If the topic is not in the cache, we try to get the metadata of the topic.
topicDetails, err := m.admin.DescribeTopics([]string{topicName})
if err != nil {
return 0, errors.Trace(err)
}
for _, detail := range topicDetails {
if detail.Err == sarama.ErrNoError {
if detail.Name == topicName {
log.Info("Kafka topic already exists",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.String("topic", topicName),
zap.Int32("partitionNumber", int32(len(detail.Partitions))))
partitionNum := int32(len(detail.Partitions))
m.tryUpdatePartitionsAndLogging(topicName, partitionNum)
return partitionNum, nil
}
} else if detail.Err != sarama.ErrUnknownTopicOrPartition {
log.Error("Kafka admin client describe topic failed",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.String("topic", topicName),
zap.Error(detail.Err))
return 0, errors.Trace(detail.Err)
}
}

partitionNum, err := m.createTopic(topicName)
if err != nil {
return 0, errors.Trace(err)
Expand All @@ -317,3 +321,8 @@ func (m *kafkaTopicManager) CreateTopicAndWaitUntilVisible(topicName string) (in

return partitionNum, nil
}

// Close releases the resources held by the topic manager: it stops the
// metadata-refresh ticker and cancels the background refresh goroutine.
// It is safe to call Close more than once.
func (m *kafkaTopicManager) Close() {
	// Stop the ticker so its underlying runtime timer is released promptly;
	// without this the timer stays armed even after the goroutine exits.
	m.metaRefreshTicker.Stop()
	m.cancel()
}
Loading