sink(ticdc): Optimize the performance when getting the metadata of Kafka topics (#9060) (#9105)

close #8957, close #8959
ti-chi-bot authored Jun 8, 2023
1 parent f5ee171 commit 8c3a94b
Showing 11 changed files with 116 additions and 415 deletions.
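At a high level, this change replaces per-call metadata refreshes (the old tryRefreshMeta path, gated by a lastMetadataRefresh timestamp) with a single background goroutine driven by a 10-minute ticker, so hot paths like GetPartitionNum become plain cache reads. The following is a minimal sketch of that pattern, not the repository's code; all names are illustrative.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// refresher caches topic -> partition counts and refreshes them in the
// background instead of on every read, mirroring kafkaTopicManager.
type refresher struct {
	topics sync.Map // map[string]int32
	ticker *time.Ticker
	cancel context.CancelFunc
}

func newRefresher(ctx context.Context, interval time.Duration) *refresher {
	r := &refresher{ticker: time.NewTicker(interval)}
	ctx, r.cancel = context.WithCancel(ctx)
	go r.loop(ctx)
	return r
}

func (r *refresher) loop(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return // Close was called; the goroutine exits.
		case <-r.ticker.C:
			// A real implementation would fetch cluster metadata here,
			// logging and ignoring errors so that a transient failure
			// only delays the next cache update.
			r.topics.Store("example-topic", int32(3))
		}
	}
}

func (r *refresher) Close() {
	r.ticker.Stop()
	r.cancel()
}

func main() {
	r := newRefresher(context.Background(), 10*time.Minute)
	defer r.Close()
	if n, ok := r.topics.Load("example-topic"); ok {
		fmt.Println("partitions:", n)
	}
}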
3 changes: 3 additions & 0 deletions cdc/sink/ddlsink/mq/mq_ddl_sink.go
@@ -184,6 +184,9 @@ func (k *DDLSink) Close() {
if k.producer != nil {
k.producer.Close()
}
if k.topicManager != nil {
k.topicManager.Close()
}
if k.admin != nil {
k.admin.Close()
}
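Note the teardown order this hunk establishes: the producer is closed first, then the topic manager (which, per the changes below, signals its background metadata-refresh goroutine to stop), and only then the admin client that goroutine queries. A sketch of that nil-guarded ordering; the interface name is illustrative, and the real fields have concrete types:

type closer interface{ Close() }

func closeInOrder(producer, topicManager, admin closer) {
	if producer != nil {
		producer.Close()
	}
	// Stop the topic manager before the admin client so its background
	// goroutine is told to exit before the client it depends on goes away.
	if topicManager != nil {
		topicManager.Close()
	}
	if admin != nil {
		admin.Close()
	}
}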
156 changes: 81 additions & 75 deletions cdc/sink/dmlsink/mq/manager/kafka_manager.go
@@ -21,22 +21,36 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/sink/kafka"
"go.uber.org/atomic"
"go.uber.org/zap"
)

const (
// metaRefreshInterval is the interval of refreshing metadata.
// We can't get the metadata too frequently, because it may overload
// the Kafka cluster, especially when there are many topics in the
// cluster or many TiCDC changefeeds.
metaRefreshInterval = 10 * time.Minute
)

// kafkaTopicManager is a manager for kafka topics.
type kafkaTopicManager struct {
changefeedID model.ChangeFeedID

admin kafka.ClusterAdminClient

cfg *kafka.AutoCreateTopicConfig

topics sync.Map

lastMetadataRefresh atomic.Int64
metaRefreshTicker *time.Ticker

// cancel is used to cancel the background goroutine.
cancel context.CancelFunc
}

// NewKafkaTopicManager creates a new topic manager.
@@ -45,16 +59,17 @@ func NewKafkaTopicManager(
admin kafka.ClusterAdminClient,
cfg *kafka.AutoCreateTopicConfig,
) (*kafkaTopicManager, error) {
changefeedID := contextutil.ChangefeedIDFromCtx(ctx)
mgr := &kafkaTopicManager{
admin: admin,
cfg: cfg,
changefeedID: changefeedID,
admin: admin,
cfg: cfg,
metaRefreshTicker: time.NewTicker(metaRefreshInterval),
}

// do an initial metadata fetching using ListTopics
err := mgr.listTopics(ctx)
if err != nil {
return nil, err
}
ctx, mgr.cancel = context.WithCancel(ctx)
// Background refresh metadata.
go mgr.backgroundRefreshMeta(ctx)

return mgr, nil
}
@@ -65,15 +80,11 @@ func (m *kafkaTopicManager) GetPartitionNum(
ctx context.Context,
topic string,
) (int32, error) {
err := m.tryRefreshMeta(ctx)
if err != nil {
return 0, errors.Trace(err)
}

if partitions, ok := m.topics.Load(topic); ok {
return partitions.(int32), nil
}

// If the topic is not in the metadata, we try to create the topic.
partitionNum, err := m.CreateTopicAndWaitUntilVisible(ctx, topic)
if err != nil {
return 0, errors.Trace(err)
@@ -82,20 +93,32 @@ func (m *kafkaTopicManager) GetPartitionNum(
return partitionNum, nil
}

// tryRefreshMeta try to refresh the topics' information maintained by manager.
func (m *kafkaTopicManager) tryRefreshMeta(ctx context.Context) error {
if time.Since(time.Unix(m.lastMetadataRefresh.Load(), 0)) > time.Minute {
topics, err := m.admin.GetTopicsPartitions(ctx)
if err != nil {
return err
}
for topic, numPartitions := range topics {
m.tryUpdatePartitionsAndLogging(topic, numPartitions)
func (m *kafkaTopicManager) backgroundRefreshMeta(ctx context.Context) {
for {
select {
case <-ctx.Done():
log.Info("Background refresh Kafka metadata goroutine exit.",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
)
return
case <-m.metaRefreshTicker.C:
topicMetaList, err := m.getMetadataOfTopics(ctx)
// We ignore the error here, because it may be caused by a transient
// network problem, and we can try to get the metadata next time.
if err != nil {
log.Warn("Get metadata of topics failed",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Error(err))
}

for topic, detail := range topicMetaList {
m.tryUpdatePartitionsAndLogging(topic, detail.NumPartitions)
}

}
m.lastMetadataRefresh.Store(time.Now().Unix())
}

return nil
}

// tryUpdatePartitionsAndLogging try to update the partitions of the topic.
@@ -106,6 +129,8 @@ func (m *kafkaTopicManager) tryUpdatePartitionsAndLogging(topic string, partitio
m.topics.Store(topic, partitions)
log.Info(
"update topic partition number",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.String("topic", topic),
zap.Int32("oldPartitionNumber", oldPartitions.(int32)),
zap.Int32("newPartitionNumber", partitions),
@@ -115,6 +140,8 @@ func (m *kafkaTopicManager) tryUpdatePartitionsAndLogging(topic string, partitio
m.topics.Store(topic, partitions)
log.Info(
"store topic partition number",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.String("topic", topic),
zap.Int32("partitionNumber", partitions),
)
@@ -139,6 +166,8 @@ func (m *kafkaTopicManager) getMetadataOfTopics(
if err != nil {
log.Warn(
"Kafka admin client describe topics failed",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Error(err),
zap.Duration("duration", time.Since(start)),
)
@@ -147,6 +176,8 @@ func (m *kafkaTopicManager) getMetadataOfTopics(

log.Info(
"Kafka admin client describe topics success",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Duration("duration", time.Since(start)))

return topicMetaList, nil
@@ -167,12 +198,16 @@ func (m *kafkaTopicManager) waitUntilTopicVisible(
meta, err := m.admin.GetTopicsMeta(ctx, topics, false)
if err != nil {
log.Warn(" topic not found, retry it",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Error(err),
zap.Duration("duration", time.Since(start)),
)
return err
}
log.Info("topic found",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.String("topic", topicName),
zap.Int32("partitionNumber", meta[topicName].NumPartitions),
zap.Duration("duration", time.Since(start)))
@@ -185,75 +220,29 @@ func (m *kafkaTopicManager) waitUntilTopicVisible(
return err
}

// listTopics is used to do an initial metadata fetching.
func (m *kafkaTopicManager) listTopics(ctx context.Context) error {
start := time.Now()

topics, err := m.admin.GetTopicsPartitions(ctx)
if err != nil {
log.Error(
"Kafka admin client list topics failed",
zap.Error(err),
zap.Duration("duration", time.Since(start)),
)
return errors.Trace(err)
}
log.Info(
"Kafka admin client list topics success",
zap.Duration("duration", time.Since(start)),
)

// Now that we have access to the latest topics' information,
// we need to update it here immediately.
for topic, numPartitions := range topics {
m.tryUpdatePartitionsAndLogging(topic, numPartitions)
}
m.lastMetadataRefresh.Store(time.Now().Unix())

return nil
}

// createTopic creates a topic with the given name
// and returns the number of partitions.
func (m *kafkaTopicManager) createTopic(
ctx context.Context,
topicName string,
) (int32, error) {
topicMetaList, err := m.getMetadataOfTopics(ctx)
if err != nil {
return 0, errors.Trace(err)
}

// Now that we have access to the latest topics' information,
// we need to update it here immediately.
for topic, detail := range topicMetaList {
m.tryUpdatePartitionsAndLogging(topic, detail.NumPartitions)
}

detail, targetTopicFound := topicMetaList[topicName]
if targetTopicFound {
log.Info(
"topic already exists and the cached information has expired",
zap.String("topic", topicName),
)
return detail.NumPartitions, nil
}

if !m.cfg.AutoCreate {
return 0, cerror.ErrKafkaInvalidConfig.GenWithStack(
fmt.Sprintf("`auto-create-topic` is false, "+
"and %s not found", topicName))
}

start := time.Now()
err = m.admin.CreateTopic(ctx, &kafka.TopicDetail{
err := m.admin.CreateTopic(ctx, &kafka.TopicDetail{
Name: topicName,
NumPartitions: m.cfg.PartitionNum,
ReplicationFactor: m.cfg.ReplicationFactor,
}, false)
if err != nil {
log.Error(
"Kafka admin client create the topic failed",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.String("topic", topicName),
zap.Int32("partitionNumber", m.cfg.PartitionNum),
zap.Int16("replicationFactor", m.cfg.ReplicationFactor),
@@ -265,6 +254,8 @@ func (m *kafkaTopicManager) createTopic(

log.Info(
"Kafka admin client create the topic success",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.String("topic", topicName),
zap.Int32("partitionNumber", m.cfg.PartitionNum),
zap.Int16("replicationFactor", m.cfg.ReplicationFactor),
@@ -280,6 +271,16 @@ func (m *kafkaTopicManager) CreateTopicAndWaitUntilVisible(
ctx context.Context,
topicName string,
) (int32, error) {
// If the topic is not in the cache, we try to get the metadata of the topic.
topicDetails, err := m.admin.GetTopicsMeta(ctx, []string{topicName}, true)
if err != nil {
return 0, errors.Trace(err)
}
if detail, ok := topicDetails[topicName]; ok {
m.tryUpdatePartitionsAndLogging(topicName, detail.NumPartitions)
return detail.NumPartitions, nil
}

partitionNum, err := m.createTopic(ctx, topicName)
if err != nil {
return 0, errors.Trace(err)
@@ -292,3 +293,8 @@ func (m *kafkaTopicManager) CreateTopicAndWaitUntilVisible(

return partitionNum, nil
}

// Close exits the background goroutine.
func (m *kafkaTopicManager) Close() {
m.cancel()
}
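The net effect on the read path: GetPartitionNum is now a pure cache lookup in the common case, and a cache miss fetches metadata for the single requested topic (GetTopicsMeta with one topic name) instead of describing every topic in the cluster. A self-contained sketch of that lookup order; the fetch and create steps are stubs, and all names are illustrative:

package main

import (
	"context"
	"fmt"
	"sync"
)

type topicCache struct {
	topics sync.Map // map[string]int32, kept warm by a background refresh
}

// fetchOne stands in for a single-topic metadata call; it is a stub here.
func fetchOne(ctx context.Context, topic string) (partitions int32, found bool, err error) {
	return 0, false, nil // pretend the topic does not exist yet
}

// createTopic stands in for topic creation plus waiting until it is visible.
func createTopic(ctx context.Context, topic string) (int32, error) {
	return 3, nil // pretend the broker created 3 partitions
}

// getPartitionNum mirrors the lookup order the diff establishes:
// cache hit, then single-topic metadata fetch, then creation.
func (c *topicCache) getPartitionNum(ctx context.Context, topic string) (int32, error) {
	if n, ok := c.topics.Load(topic); ok {
		return n.(int32), nil // common case: no broker round trip
	}
	n, found, err := fetchOne(ctx, topic)
	if err != nil {
		return 0, err
	}
	if found {
		c.topics.Store(topic, n)
		return n, nil
	}
	n, err = createTopic(ctx, topic)
	if err != nil {
		return 0, err
	}
	c.topics.Store(topic, n)
	return n, nil
}

func main() {
	var c topicCache
	n, err := c.getPartitionNum(context.Background(), "example-topic")
	fmt.Println(n, err) // 3 <nil> with the stubs above
}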