Skip to content

Commit

Permalink
sink(ticdc): Optimize the performance when getting the metadata of Ka…
Browse files Browse the repository at this point in the history
…fka topics (#9060) (#9104)

close #8957, close #8959
  • Loading branch information
ti-chi-bot authored Jun 2, 2023
1 parent f827a89 commit 8a3bb9c
Show file tree
Hide file tree
Showing 12 changed files with 211 additions and 179 deletions.
198 changes: 103 additions & 95 deletions cdc/sink/mq/manager/kafka_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,59 +23,68 @@ import (
"github.com/Shopify/sarama"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/model"
kafkaconfig "github.com/pingcap/tiflow/cdc/sink/mq/producer/kafka"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/sink/kafka"
"go.uber.org/atomic"
"go.uber.org/zap"
)

const (
	// metaRefreshInterval is the interval of refreshing metadata.
	// We can't get the metadata too frequently, because it may cause
	// the kafka cluster to be overloaded. Especially when there are
	// many topics in the cluster or there are many TiCDC changefeeds.
	// 10 minutes is a trade-off between metadata freshness and broker load.
	metaRefreshInterval = 10 * time.Minute
)

// kafkaTopicManager is a manager for kafka topics.
type kafkaTopicManager struct {
client kafka.Client
admin kafka.ClusterAdminClient
changefeedID model.ChangeFeedID

admin kafka.ClusterAdminClient

cfg *kafkaconfig.AutoCreateTopicConfig

topics sync.Map

lastMetadataRefresh atomic.Int64
metaRefreshTicker *time.Ticker

// cancel is used to cancel the background goroutine.
cancel context.CancelFunc
}

// NewKafkaTopicManager creates a new topic manager.
func NewKafkaTopicManager(
client kafka.Client,
ctx context.Context,
admin kafka.ClusterAdminClient,
cfg *kafkaconfig.AutoCreateTopicConfig,
) (*kafkaTopicManager, error) {
changefeedID := contextutil.ChangefeedIDFromCtx(ctx)
mgr := &kafkaTopicManager{
client: client,
admin: admin,
cfg: cfg,
changefeedID: changefeedID,
admin: admin,
cfg: cfg,
metaRefreshTicker: time.NewTicker(metaRefreshInterval),
}

// do an initial metadata fetching using ListTopics
err := mgr.listTopics()
if err != nil {
return nil, err
}
ctx, mgr.cancel = context.WithCancel(ctx)
// Background refresh metadata.
go mgr.backgroundRefreshMeta(ctx)

return mgr, nil
}

// GetPartitionNum returns the number of partitions of the topic.
// It may also try to update the topics' information maintained by manager.
func (m *kafkaTopicManager) GetPartitionNum(topic string) (int32, error) {
err := m.tryRefreshMeta()
if err != nil {
return 0, errors.Trace(err)
}

if partitions, ok := m.topics.Load(topic); ok {
return partitions.(int32), nil
}

// If the topic is not in the metadata, we try to create the topic.
partitionNum, err := m.CreateTopicAndWaitUntilVisible(topic)
if err != nil {
return 0, errors.Trace(err)
Expand All @@ -84,25 +93,33 @@ func (m *kafkaTopicManager) GetPartitionNum(topic string) (int32, error) {
return partitionNum, nil
}

// tryRefreshMeta try to refresh the topics' information maintained by manager.
func (m *kafkaTopicManager) tryRefreshMeta() error {
if time.Since(time.Unix(m.lastMetadataRefresh.Load(), 0)) > time.Minute {
topics, err := m.client.Topics()
if err != nil {
return err
}

for _, topic := range topics {
partitions, err := m.client.Partitions(topic)
func (m *kafkaTopicManager) backgroundRefreshMeta(ctx context.Context) {
for {
select {
case <-ctx.Done():
log.Info("Background refresh Kafka metadata goroutine exit.",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
)
return
case <-m.metaRefreshTicker.C:
topicMetaList, err := m.getMetadataOfTopics()
// We ignore the error here, because the error may be caused by the
// network problem, and we can try to get the metadata next time.
if err != nil {
return err
log.Warn("Get metadata of topics failed",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Error(err))
}

for _, detail := range topicMetaList {
partitionNum := int32(len(detail.Partitions))
m.tryUpdatePartitionsAndLogging(detail.Name, partitionNum)
}
m.tryUpdatePartitionsAndLogging(topic, int32(len(partitions)))

}
m.lastMetadataRefresh.Store(time.Now().Unix())
}

return nil
}

// tryUpdatePartitionsAndLogging try to update the partitions of the topic.
Expand All @@ -113,6 +130,8 @@ func (m *kafkaTopicManager) tryUpdatePartitionsAndLogging(topic string, partitio
m.topics.Store(topic, partitions)
log.Info(
"update topic partition number",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.String("topic", topic),
zap.Int32("oldPartitionNumber", oldPartitions.(int32)),
zap.Int32("newPartitionNumber", partitions),
Expand All @@ -122,6 +141,8 @@ func (m *kafkaTopicManager) tryUpdatePartitionsAndLogging(topic string, partitio
m.topics.Store(topic, partitions)
log.Info(
"store topic partition number",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.String("topic", topic),
zap.Int32("partitionNumber", partitions),
)
Expand All @@ -143,6 +164,8 @@ func (m *kafkaTopicManager) getMetadataOfTopics() ([]*sarama.TopicMetadata, erro
if err != nil {
log.Warn(
"Kafka admin client describe topics failed",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Error(err),
zap.Duration("duration", time.Since(start)),
)
Expand All @@ -151,6 +174,8 @@ func (m *kafkaTopicManager) getMetadataOfTopics() ([]*sarama.TopicMetadata, erro

log.Info(
"Kafka admin client describe topics success",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Duration("duration", time.Since(start)))

return topicMetaList, nil
Expand All @@ -167,6 +192,8 @@ func (m *kafkaTopicManager) waitUntilTopicVisible(topicName string) error {
topicMetaList, err := m.admin.DescribeTopics([]string{topicName})
if err != nil {
log.Error("Kafka admin client describe topic failed",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.String("topic", topicName),
zap.Error(err),
zap.Duration("duration", time.Since(start)))
Expand All @@ -175,6 +202,8 @@ func (m *kafkaTopicManager) waitUntilTopicVisible(topicName string) error {

if len(topicMetaList) != 1 {
log.Error("topic metadata length is wrong.",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.String("topic", topicName),
zap.Int("expected", 1),
zap.Int("actual", len(topicMetaList)))
Expand All @@ -185,12 +214,16 @@ func (m *kafkaTopicManager) waitUntilTopicVisible(topicName string) error {
meta := topicMetaList[0]
if meta.Err != sarama.ErrNoError {
log.Error("topic metadata is fetched with error",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.String("topic", topicName),
zap.Error(meta.Err))
return meta.Err
}

log.Info("Kafka admin client describe topic success",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.String("topic", topicName),
zap.Int("partitionNumber", len(meta.Partitions)),
zap.Duration("duration", time.Since(start)))
Expand All @@ -204,85 +237,26 @@ func (m *kafkaTopicManager) waitUntilTopicVisible(topicName string) error {
return err
}

// listTopics is used to do an initial metadata fetching.
// It populates the partition-count cache via tryUpdatePartitionsAndLogging
// so the first GetPartitionNum call does not need to query Kafka again.
func (m *kafkaTopicManager) listTopics() error {
	start := time.Now()
	topics, err := m.admin.ListTopics()
	if err != nil {
		log.Error(
			"Kafka admin client list topics failed",
			// Include changefeed identity for consistency with the
			// other log calls in this file.
			zap.String("namespace", m.changefeedID.Namespace),
			zap.String("changefeed", m.changefeedID.ID),
			zap.Error(err),
			zap.Duration("duration", time.Since(start)),
		)
		return errors.Trace(err)
	}
	log.Info(
		"Kafka admin client list topics success",
		zap.String("namespace", m.changefeedID.Namespace),
		zap.String("changefeed", m.changefeedID.ID),
		zap.Duration("duration", time.Since(start)),
	)

	// Now that we have access to the latest topics' information,
	// we need to update it here immediately.
	for topic, detail := range topics {
		m.tryUpdatePartitionsAndLogging(topic, detail.NumPartitions)
	}
	m.lastMetadataRefresh.Store(time.Now().Unix())

	return nil
}

// createTopic creates a topic with the given name
// and returns the number of partitions.
func (m *kafkaTopicManager) createTopic(topicName string) (int32, error) {
topicMetaList, err := m.getMetadataOfTopics()
if err != nil {
return 0, errors.Trace(err)
}

// Now that we have access to the latest topics' information,
// we need to update it here immediately.
targetTopicFound := false
targetTopicPartitionNum := 0
for _, topic := range topicMetaList {
if topic.Err != sarama.ErrNoError {
log.Error("Kafka admin client fetch topic metadata failed.",
zap.String("topic", topic.Name),
zap.Error(topic.Err))
continue
}

if topic.Name == topicName {
targetTopicFound = true
targetTopicPartitionNum = len(topic.Partitions)
}
m.tryUpdatePartitionsAndLogging(topic.Name, int32(len(topic.Partitions)))
}
m.lastMetadataRefresh.Store(time.Now().Unix())

// Maybe our cache has expired information, so we just return it.
if targetTopicFound {
log.Info(
"topic already exists and the cached information has expired",
zap.String("topic", topicName),
)
return int32(targetTopicPartitionNum), nil
}

if !m.cfg.AutoCreate {
return 0, cerror.ErrKafkaInvalidConfig.GenWithStack(
fmt.Sprintf("`auto-create-topic` is false, "+
"and %s not found", topicName))
}

start := time.Now()
err = m.admin.CreateTopic(topicName, &sarama.TopicDetail{
err := m.admin.CreateTopic(topicName, &sarama.TopicDetail{
NumPartitions: m.cfg.PartitionNum,
ReplicationFactor: m.cfg.ReplicationFactor,
}, false)
// Ignore the already exists error because it's not harmful.
if err != nil && !strings.Contains(err.Error(), sarama.ErrTopicAlreadyExists.Error()) {
log.Error(
"Kafka admin client create the topic failed",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.String("topic", topicName),
zap.Int32("partitionNumber", m.cfg.PartitionNum),
zap.Int16("replicationFactor", m.cfg.ReplicationFactor),
Expand All @@ -294,6 +268,8 @@ func (m *kafkaTopicManager) createTopic(topicName string) (int32, error) {

log.Info(
"Kafka admin client create the topic success",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.String("topic", topicName),
zap.Int32("partitionNumber", m.cfg.PartitionNum),
zap.Int16("replicationFactor", m.cfg.ReplicationFactor),
Expand All @@ -306,6 +282,33 @@ func (m *kafkaTopicManager) createTopic(topicName string) (int32, error) {

// CreateTopicAndWaitUntilVisible wraps createTopic and waitUntilTopicVisible together.
func (m *kafkaTopicManager) CreateTopicAndWaitUntilVisible(topicName string) (int32, error) {
// If the topic is not in the cache, we try to get the metadata of the topic.
topicDetails, err := m.admin.DescribeTopics([]string{topicName})
if err != nil {
return 0, errors.Trace(err)
}
for _, detail := range topicDetails {
if detail.Err == sarama.ErrNoError {
if detail.Name == topicName {
log.Info("Kafka topic already exists",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.String("topic", topicName),
zap.Int32("partitionNumber", int32(len(detail.Partitions))))
partitionNum := int32(len(detail.Partitions))
m.tryUpdatePartitionsAndLogging(topicName, partitionNum)
return partitionNum, nil
}
} else if detail.Err != sarama.ErrUnknownTopicOrPartition {
log.Error("Kafka admin client describe topic failed",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.String("topic", topicName),
zap.Error(detail.Err))
return 0, errors.Trace(detail.Err)
}
}

partitionNum, err := m.createTopic(topicName)
if err != nil {
return 0, errors.Trace(err)
Expand All @@ -318,3 +321,8 @@ func (m *kafkaTopicManager) CreateTopicAndWaitUntilVisible(topicName string) (in

return partitionNum, nil
}

// Close exits the background refresh goroutine and releases the
// resources held by the manager.
func (m *kafkaTopicManager) Close() {
	// Stop the ticker so its underlying timer resources are released;
	// cancelling the context alone does not stop it.
	m.metaRefreshTicker.Stop()
	m.cancel()
}
Loading

0 comments on commit 8a3bb9c

Please sign in to comment.