From 5c1b3b82e22ff1bd78470b559af7cc9358c9ffdf Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Thu, 1 Jun 2023 16:45:20 +0800 Subject: [PATCH 1/9] Resolve conflicts Signed-off-by: hi-rustin --- cdc/sink/mq/manager/kafka_manager.go | 168 ++++++++++------------ cdc/sink/mq/manager/kafka_manager_test.go | 65 +++------ cdc/sink/mq/manager/manager.go | 2 + cdc/sink/mq/mq.go | 2 +- cdc/sink/mq/producer/kafka/kafka.go | 60 ++++++-- cdc/sinkv2/ddlsink/mq/kafka_ddl_sink.go | 2 +- cdc/sinkv2/eventsink/mq/kafka_dml_sink.go | 2 +- cdc/sinkv2/util/helper.go | 6 +- pkg/sink/kafka/cluster_admin_client.go | 2 - 9 files changed, 147 insertions(+), 162 deletions(-) diff --git a/cdc/sink/mq/manager/kafka_manager.go b/cdc/sink/mq/manager/kafka_manager.go index 8a09b2fd1d0..f5e0827a73a 100644 --- a/cdc/sink/mq/manager/kafka_manager.go +++ b/cdc/sink/mq/manager/kafka_manager.go @@ -23,43 +23,56 @@ import ( "github.com/Shopify/sarama" "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/contextutil" + "github.com/pingcap/tiflow/cdc/model" kafkaconfig "github.com/pingcap/tiflow/cdc/sink/mq/producer/kafka" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/retry" "github.com/pingcap/tiflow/pkg/sink/kafka" - "go.uber.org/atomic" "go.uber.org/zap" ) +const ( + // metaRefreshInterval is the interval of refreshing metadata. + // We can't get the metadata too frequently, because it may cause + // the kafka cluster to be overloaded. Especially when there are + // many topics in the cluster or there are many TiCDC changefeeds. + metaRefreshInterval = 10 * time.Minute +) + // kafkaTopicManager is a manager for kafka topics. type kafkaTopicManager struct { - client kafka.Client - admin kafka.ClusterAdminClient + changefeedID model.ChangeFeedID + + admin kafka.ClusterAdminClient cfg *kafkaconfig.AutoCreateTopicConfig topics sync.Map - lastMetadataRefresh atomic.Int64 + metaRefreshTicker *time.Ticker + + // cancel is used to cancel the background goroutine. + cancel context.CancelFunc } // NewKafkaTopicManager creates a new topic manager. func NewKafkaTopicManager( - client kafka.Client, + ctx context.Context, admin kafka.ClusterAdminClient, cfg *kafkaconfig.AutoCreateTopicConfig, ) (*kafkaTopicManager, error) { + changefeedID := contextutil.ChangefeedIDFromCtx(ctx) mgr := &kafkaTopicManager{ - client: client, - admin: admin, - cfg: cfg, + changefeedID: changefeedID, + admin: admin, + cfg: cfg, + metaRefreshTicker: time.NewTicker(metaRefreshInterval), } - // do an initial metadata fetching using ListTopics - err := mgr.listTopics() - if err != nil { - return nil, err - } + ctx, mgr.cancel = context.WithCancel(ctx) + // Background refresh metadata. + go mgr.backgroundRefreshMeta(ctx) return mgr, nil } @@ -67,15 +80,11 @@ func NewKafkaTopicManager( // GetPartitionNum returns the number of partitions of the topic. // It may also try to update the topics' information maintained by manager. func (m *kafkaTopicManager) GetPartitionNum(topic string) (int32, error) { - err := m.tryRefreshMeta() - if err != nil { - return 0, errors.Trace(err) - } - if partitions, ok := m.topics.Load(topic); ok { return partitions.(int32), nil } + // If the topic is not in the metadata, we try to create the topic. 
partitionNum, err := m.CreateTopicAndWaitUntilVisible(topic) if err != nil { return 0, errors.Trace(err) @@ -84,25 +93,33 @@ func (m *kafkaTopicManager) GetPartitionNum(topic string) (int32, error) { return partitionNum, nil } -// tryRefreshMeta try to refresh the topics' information maintained by manager. -func (m *kafkaTopicManager) tryRefreshMeta() error { - if time.Since(time.Unix(m.lastMetadataRefresh.Load(), 0)) > time.Minute { - topics, err := m.client.Topics() - if err != nil { - return err - } - - for _, topic := range topics { - partitions, err := m.client.Partitions(topic) +func (m *kafkaTopicManager) backgroundRefreshMeta(ctx context.Context) { + for { + select { + case <-ctx.Done(): + log.Info("Background refresh Kafka metadata goroutine exit.", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), + ) + return + case <-m.metaRefreshTicker.C: + topicMetaList, err := m.getMetadataOfTopics() + // We ignore the error here, because the error may be caused by the + // network problem, and we can try to get the metadata next time. if err != nil { - return err + log.Warn("Get metadata of topics failed", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), + zap.Error(err)) + } + + for _, detail := range topicMetaList { + partitionNum := int32(len(detail.Partitions)) + m.tryUpdatePartitionsAndLogging(detail.Name, partitionNum) } - m.tryUpdatePartitionsAndLogging(topic, int32(len(partitions))) + } - m.lastMetadataRefresh.Store(time.Now().Unix()) } - - return nil } // tryUpdatePartitionsAndLogging try to update the partitions of the topic. @@ -204,70 +221,9 @@ func (m *kafkaTopicManager) waitUntilTopicVisible(topicName string) error { return err } -// listTopics is used to do an initial metadata fetching. -func (m *kafkaTopicManager) listTopics() error { - start := time.Now() - topics, err := m.admin.ListTopics() - if err != nil { - log.Error( - "Kafka admin client list topics failed", - zap.Error(err), - zap.Duration("duration", time.Since(start)), - ) - return errors.Trace(err) - } - log.Info( - "Kafka admin client list topics success", - zap.Duration("duration", time.Since(start)), - ) - - // Now that we have access to the latest topics' information, - // we need to update it here immediately. - for topic, detail := range topics { - m.tryUpdatePartitionsAndLogging(topic, detail.NumPartitions) - } - m.lastMetadataRefresh.Store(time.Now().Unix()) - - return nil -} - // createTopic creates a topic with the given name // and returns the number of partitions. func (m *kafkaTopicManager) createTopic(topicName string) (int32, error) { - topicMetaList, err := m.getMetadataOfTopics() - if err != nil { - return 0, errors.Trace(err) - } - - // Now that we have access to the latest topics' information, - // we need to update it here immediately. - targetTopicFound := false - targetTopicPartitionNum := 0 - for _, topic := range topicMetaList { - if topic.Err != sarama.ErrNoError { - log.Error("Kafka admin client fetch topic metadata failed.", - zap.String("topic", topic.Name), - zap.Error(topic.Err)) - continue - } - - if topic.Name == topicName { - targetTopicFound = true - targetTopicPartitionNum = len(topic.Partitions) - } - m.tryUpdatePartitionsAndLogging(topic.Name, int32(len(topic.Partitions))) - } - m.lastMetadataRefresh.Store(time.Now().Unix()) - - // Maybe our cache has expired information, so we just return it. 
- if targetTopicFound { - log.Info( - "topic already exists and the cached information has expired", - zap.String("topic", topicName), - ) - return int32(targetTopicPartitionNum), nil - } - if !m.cfg.AutoCreate { return 0, cerror.ErrKafkaInvalidConfig.GenWithStack( fmt.Sprintf("`auto-create-topic` is false, "+ @@ -275,7 +231,7 @@ func (m *kafkaTopicManager) createTopic(topicName string) (int32, error) { } start := time.Now() - err = m.admin.CreateTopic(topicName, &sarama.TopicDetail{ + err := m.admin.CreateTopic(topicName, &sarama.TopicDetail{ NumPartitions: m.cfg.PartitionNum, ReplicationFactor: m.cfg.ReplicationFactor, }, false) @@ -306,6 +262,23 @@ func (m *kafkaTopicManager) createTopic(topicName string) (int32, error) { // CreateTopicAndWaitUntilVisible wraps createTopic and waitUntilTopicVisible together. func (m *kafkaTopicManager) CreateTopicAndWaitUntilVisible(topicName string) (int32, error) { + // If the topic is not in the cache, we try to get the metadata of the topic. + topicDetails, err := m.admin.DescribeTopics([]string{topicName}) + if err != nil { + return 0, errors.Trace(err) + } + for _, detail := range topicDetails { + if detail.Err == sarama.ErrNoError { + if detail.Name == topicName { + partitionNum := int32(len(detail.Partitions)) + m.tryUpdatePartitionsAndLogging(topicName, partitionNum) + return partitionNum, nil + } + } else if detail.Err != sarama.ErrUnknownTopicOrPartition { + return 0, errors.Trace(err) + } + } + partitionNum, err := m.createTopic(topicName) if err != nil { return 0, errors.Trace(err) @@ -318,3 +291,8 @@ func (m *kafkaTopicManager) CreateTopicAndWaitUntilVisible(topicName string) (in return partitionNum, nil } + +// Close exits the background goroutine. +func (m *kafkaTopicManager) Close() { + m.cancel() +} diff --git a/cdc/sink/mq/manager/kafka_manager_test.go b/cdc/sink/mq/manager/kafka_manager_test.go index 70884e21c32..ee267ed7075 100644 --- a/cdc/sink/mq/manager/kafka_manager_test.go +++ b/cdc/sink/mq/manager/kafka_manager_test.go @@ -14,8 +14,8 @@ package manager import ( + "context" "testing" - "time" kafkaconfig "github.com/pingcap/tiflow/cdc/sink/mq/producer/kafka" "github.com/pingcap/tiflow/pkg/sink/kafka" @@ -25,7 +25,6 @@ import ( func TestPartitions(t *testing.T) { t.Parallel() - client := kafka.NewClientMockImpl() adminClient := kafka.NewClusterAdminClientMockImpl() defer func(adminClient *kafka.ClusterAdminClientMockImpl) { _ = adminClient.Close() @@ -36,54 +35,19 @@ func TestPartitions(t *testing.T) { ReplicationFactor: 1, } - manager, err := NewKafkaTopicManager(client, adminClient, cfg) + manager, err := NewKafkaTopicManager(context.TODO(), adminClient, cfg) require.Nil(t, err) + defer manager.Close() partitionsNum, err := manager.GetPartitionNum( kafka.DefaultMockTopicName) require.Nil(t, err) require.Equal(t, int32(3), partitionsNum) } -func TestTryRefreshMeta(t *testing.T) { - t.Parallel() - - client := kafka.NewClientMockImpl() - adminClient := kafka.NewClusterAdminClientMockImpl() - defer func(adminClient *kafka.ClusterAdminClientMockImpl) { - _ = adminClient.Close() - }(adminClient) - cfg := &kafkaconfig.AutoCreateTopicConfig{ - AutoCreate: true, - PartitionNum: 2, - ReplicationFactor: 1, - } - - manager, err := NewKafkaTopicManager(client, adminClient, cfg) - require.Nil(t, err) - partitionsNum, err := manager.GetPartitionNum( - kafka.DefaultMockTopicName) - require.Nil(t, err) - require.Equal(t, int32(3), partitionsNum) - - // Mock create a topic. 
- client.AddTopic("test", 4) - manager.lastMetadataRefresh.Store(time.Now().Add(-2 * time.Minute).Unix()) - partitionsNum, err = manager.GetPartitionNum("test") - require.Nil(t, err) - require.Equal(t, int32(4), partitionsNum) - - // Mock delete a topic. - // NOTICE: we do not refresh metadata for the deleted topic. - client.DeleteTopic("test") - partitionsNum, err = manager.GetPartitionNum("test") - require.Nil(t, err) - require.Equal(t, int32(4), partitionsNum) -} - func TestCreateTopic(t *testing.T) { t.Parallel() - client := kafka.NewClientMockImpl() + ctx := context.Background() adminClient := kafka.NewClusterAdminClientMockImpl() defer func(adminClient *kafka.ClusterAdminClientMockImpl) { _ = adminClient.Close() @@ -95,13 +59,14 @@ func TestCreateTopic(t *testing.T) { ReplicationFactor: 1, } - manager, err := NewKafkaTopicManager(client, adminClient, cfg) + manager, err := NewKafkaTopicManager(ctx, adminClient, cfg) require.Nil(t, err) - partitionNum, err := manager.createTopic(kafka.DefaultMockTopicName) + defer manager.Close() + partitionNum, err := manager.CreateTopicAndWaitUntilVisible(kafka.DefaultMockTopicName) require.Nil(t, err) require.Equal(t, int32(3), partitionNum) - partitionNum, err = manager.createTopic("new-topic") + partitionNum, err = manager.CreateTopicAndWaitUntilVisible("new-topic") require.Nil(t, err) require.Equal(t, int32(2), partitionNum) partitionsNum, err := manager.GetPartitionNum("new-topic") @@ -110,9 +75,10 @@ func TestCreateTopic(t *testing.T) { // Try to create a topic without auto create. cfg.AutoCreate = false - manager, err = NewKafkaTopicManager(client, adminClient, cfg) + manager, err = NewKafkaTopicManager(ctx, adminClient, cfg) require.Nil(t, err) - _, err = manager.createTopic("new-topic2") + defer manager.Close() + _, err = manager.CreateTopicAndWaitUntilVisible("new-topic2") require.Regexp( t, "`auto-create-topic` is false, and new-topic2 not found", @@ -126,9 +92,10 @@ func TestCreateTopic(t *testing.T) { PartitionNum: 2, ReplicationFactor: 4, } - manager, err = NewKafkaTopicManager(client, adminClient, cfg) + manager, err = NewKafkaTopicManager(ctx, adminClient, cfg) require.Nil(t, err) - _, err = manager.createTopic("new-topic-failed") + defer manager.Close() + _, err = manager.CreateTopicAndWaitUntilVisible("new-topic-failed") require.Regexp( t, "new sarama producer: kafka server: Replication-factor is invalid", @@ -139,7 +106,6 @@ func TestCreateTopic(t *testing.T) { func TestCreateTopicWithDelay(t *testing.T) { t.Parallel() - client := kafka.NewClientMockImpl() adminClient := kafka.NewClusterAdminClientMockImpl() defer func(adminClient *kafka.ClusterAdminClientMockImpl) { _ = adminClient.Close() @@ -150,8 +116,9 @@ func TestCreateTopicWithDelay(t *testing.T) { ReplicationFactor: 1, } - manager, err := NewKafkaTopicManager(client, adminClient, cfg) + manager, err := NewKafkaTopicManager(context.TODO(), adminClient, cfg) require.Nil(t, err) + defer manager.Close() partitionNum, err := manager.createTopic("new_topic") require.Nil(t, err) err = adminClient.SetRemainingFetchesUntilTopicVisible("new_topic", 3) diff --git a/cdc/sink/mq/manager/manager.go b/cdc/sink/mq/manager/manager.go index 5c20a672465..f039aa3f476 100644 --- a/cdc/sink/mq/manager/manager.go +++ b/cdc/sink/mq/manager/manager.go @@ -21,4 +21,6 @@ type TopicManager interface { GetPartitionNum(topic string) (int32, error) // CreateTopicAndWaitUntilVisible creates the topic and wait for the topic completion. 
CreateTopicAndWaitUntilVisible(topicName string) (int32, error) + // Close closes the topic manager. + Close() } diff --git a/cdc/sink/mq/mq.go b/cdc/sink/mq/mq.go index a093cc650d1..ba55d9d3eb3 100644 --- a/cdc/sink/mq/mq.go +++ b/cdc/sink/mq/mq.go @@ -432,7 +432,7 @@ func NewKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, } topicManager, err := manager.NewKafkaTopicManager( - client, + ctx, adminClient, baseConfig.DeriveTopicConfig(), ) diff --git a/cdc/sink/mq/producer/kafka/kafka.go b/cdc/sink/mq/producer/kafka/kafka.go index 866d7d84172..1b7fd267c15 100644 --- a/cdc/sink/mq/producer/kafka/kafka.go +++ b/cdc/sink/mq/producer/kafka/kafka.go @@ -398,7 +398,7 @@ func kafkaClientID(role, captureAddr string, func AdjustConfig( admin kafka.ClusterAdminClient, config *Config, saramaConfig *sarama.Config, topic string, ) error { - topics, err := admin.ListTopics() + topics, err := admin.DescribeTopics([]string{topic}) if err != nil { return errors.Trace(err) } @@ -408,11 +408,19 @@ func AdjustConfig( return errors.Trace(err) } - info, exists := topics[topic] + var info *sarama.TopicMetadata + var exists bool + for _, t := range topics { + if t.Name == topic { + info = t + exists = true + break + } + } // once we have found the topic, no matter `auto-create-topic`, make sure user input parameters are valid. if exists { // make sure that producer's `MaxMessageBytes` smaller than topic's `max.message.bytes` - topicMaxMessageBytesStr, err := getTopicConfig(admin, info, kafka.TopicMaxMessageBytesConfigName, + topicMaxMessageBytesStr, err := getTopicConfig(admin, info.Name, kafka.TopicMaxMessageBytesConfigName, kafka.BrokerMessageMaxBytesConfigName) if err != nil { return errors.Trace(err) @@ -436,7 +444,7 @@ func AdjustConfig( zap.String("topic", topic), zap.Any("detail", info)) } - if err := config.setPartitionNum(info.NumPartitions); err != nil { + if err := config.setPartitionNum(int32(len(info.Partitions))); err != nil { return errors.Trace(err) } @@ -476,12 +484,20 @@ func AdjustConfig( func validateMinInsyncReplicas( admin kafka.ClusterAdminClient, - topics map[string]sarama.TopicDetail, topic string, replicationFactor int, + topics []*sarama.TopicMetadata, topic string, replicationFactor int, ) error { minInsyncReplicasConfigGetter := func() (string, bool, error) { - info, exists := topics[topic] + var info *sarama.TopicMetadata + var exists bool + for _, t := range topics { + if t.Name == topic { + info = t + exists = true + break + } + } if exists { - minInsyncReplicasStr, err := getTopicConfig(admin, info, + minInsyncReplicasStr, err := getTopicConfig(admin, info.Name, kafka.MinInsyncReplicasConfigName, kafka.MinInsyncReplicasConfigName) if err != nil { @@ -565,10 +581,34 @@ func getBrokerConfig(admin kafka.ClusterAdminClient, brokerConfigName string) (s // getTopicConfig gets topic config by name. // If the topic does not have this configuration, we will try to get it from the broker's configuration. // NOTICE: The configuration names of topic and broker may be different for the same configuration. 
-func getTopicConfig(admin kafka.ClusterAdminClient, detail sarama.TopicDetail, topicConfigName string, brokerConfigName string) (string, error) { - if a, ok := detail.ConfigEntries[topicConfigName]; ok { - return *a, nil +func getTopicConfig(admin kafka.ClusterAdminClient, topicName string, topicConfigName string, brokerConfigName string) (string, error) { + var configEntries []sarama.ConfigEntry + configEntries, err := admin.DescribeConfig(sarama.ConfigResource{ + Type: sarama.TopicResource, + Name: topicName, + ConfigNames: []string{topicConfigName}, + }) + if err != nil { + log.Warn("Get topic config failed", + zap.String("topicName", topicName), + zap.String("configName", topicConfigName), + zap.Error(err)) } + // For compatibility with KOP, we checked all return values. + // 1. Kafka only returns requested configs. + // 2. Kop returns all configs. + for _, entry := range configEntries { + if entry.Name == topicConfigName { + log.Info("Kafka config item found", + zap.String("topicName", topicName), + zap.String("configName", topicConfigName), + zap.String("configValue", entry.Value)) + return entry.Value, nil + } + } + log.Warn("Kafka config item not found", + zap.String("topicName", topicName), + zap.String("configName", topicConfigName)) return getBrokerConfig(admin, brokerConfigName) } diff --git a/cdc/sinkv2/ddlsink/mq/kafka_ddl_sink.go b/cdc/sinkv2/ddlsink/mq/kafka_ddl_sink.go index bd58836366e..2603965effc 100644 --- a/cdc/sinkv2/ddlsink/mq/kafka_ddl_sink.go +++ b/cdc/sinkv2/ddlsink/mq/kafka_ddl_sink.go @@ -99,9 +99,9 @@ func NewKafkaDDLSink( }() topicManager, err := util.GetTopicManagerAndTryCreateTopic( + ctx, topic, baseConfig.DeriveTopicConfig(), - client, adminClient, ) if err != nil { diff --git a/cdc/sinkv2/eventsink/mq/kafka_dml_sink.go b/cdc/sinkv2/eventsink/mq/kafka_dml_sink.go index 1d6b01c6634..998ef8b3318 100644 --- a/cdc/sinkv2/eventsink/mq/kafka_dml_sink.go +++ b/cdc/sinkv2/eventsink/mq/kafka_dml_sink.go @@ -97,9 +97,9 @@ func NewKafkaDMLSink( }() topicManager, err := util.GetTopicManagerAndTryCreateTopic( + ctx, topic, baseConfig.DeriveTopicConfig(), - client, adminClient, ) if err != nil { diff --git a/cdc/sinkv2/util/helper.go b/cdc/sinkv2/util/helper.go index e041aaee665..161e37326b1 100644 --- a/cdc/sinkv2/util/helper.go +++ b/cdc/sinkv2/util/helper.go @@ -14,10 +14,10 @@ package util import ( + "context" "net/url" "strings" - "github.com/Shopify/sarama" "github.com/pingcap/tiflow/cdc/sink/codec/common" "github.com/pingcap/tiflow/cdc/sink/mq/manager" "github.com/pingcap/tiflow/cdc/sink/mq/producer/kafka" @@ -89,13 +89,13 @@ func GetEncoderConfig( // GetTopicManagerAndTryCreateTopic returns the topic manager and try to create the topic. func GetTopicManagerAndTryCreateTopic( + ctx context.Context, topic string, topicCfg *kafka.AutoCreateTopicConfig, - client sarama.Client, adminClient pkafka.ClusterAdminClient, ) (manager.TopicManager, error) { topicManager, err := manager.NewKafkaTopicManager( - client, + ctx, adminClient, topicCfg, ) diff --git a/pkg/sink/kafka/cluster_admin_client.go b/pkg/sink/kafka/cluster_admin_client.go index bcb13276777..fac44e1e4cd 100644 --- a/pkg/sink/kafka/cluster_admin_client.go +++ b/pkg/sink/kafka/cluster_admin_client.go @@ -20,8 +20,6 @@ import ( // ClusterAdminClient is the administrative client for Kafka, which supports managing and inspecting topics, // brokers, configurations and ACLs. type ClusterAdminClient interface { - // ListTopics list the topics available in the cluster with the default options. 
- ListTopics() (map[string]sarama.TopicDetail, error) // DescribeCluster gets information about the nodes in the cluster DescribeCluster() (brokers []*sarama.Broker, controllerID int32, err error) // DescribeConfig gets the configuration for the specified resources. From 9ffb54cb1dbe64ce88192db1eecb8e5ad7e5fcff Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Thu, 1 Jun 2023 17:17:54 +0800 Subject: [PATCH 2/9] Fix Signed-off-by: hi-rustin --- cdc/sink/mq/producer/kafka/kafka.go | 29 +++++++++++------------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/cdc/sink/mq/producer/kafka/kafka.go b/cdc/sink/mq/producer/kafka/kafka.go index 1b7fd267c15..cfba06c3f38 100644 --- a/cdc/sink/mq/producer/kafka/kafka.go +++ b/cdc/sink/mq/producer/kafka/kafka.go @@ -402,21 +402,23 @@ func AdjustConfig( if err != nil { return errors.Trace(err) } - - err = validateMinInsyncReplicas(admin, topics, topic, int(config.ReplicationFactor)) - if err != nil { - return errors.Trace(err) - } - var info *sarama.TopicMetadata var exists bool for _, t := range topics { + if t.Err != sarama.ErrNoError { + return errors.Trace(t.Err) + } if t.Name == topic { info = t exists = true break } } + + err = validateMinInsyncReplicas(admin, topic, exists, int(config.ReplicationFactor)) + if err != nil { + return errors.Trace(err) + } // once we have found the topic, no matter `auto-create-topic`, make sure user input parameters are valid. if exists { // make sure that producer's `MaxMessageBytes` smaller than topic's `max.message.bytes` @@ -484,20 +486,11 @@ func AdjustConfig( func validateMinInsyncReplicas( admin kafka.ClusterAdminClient, - topics []*sarama.TopicMetadata, topic string, replicationFactor int, + topic string, topicExists bool, replicationFactor int, ) error { minInsyncReplicasConfigGetter := func() (string, bool, error) { - var info *sarama.TopicMetadata - var exists bool - for _, t := range topics { - if t.Name == topic { - info = t - exists = true - break - } - } - if exists { - minInsyncReplicasStr, err := getTopicConfig(admin, info.Name, + if topicExists { + minInsyncReplicasStr, err := getTopicConfig(admin, topic, kafka.MinInsyncReplicasConfigName, kafka.MinInsyncReplicasConfigName) if err != nil { From 028c7a1c20742f1d6568a8cae43c98c654bd3628 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Thu, 1 Jun 2023 17:33:20 +0800 Subject: [PATCH 3/9] Close Signed-off-by: hi-rustin --- cdc/sink/mq/mq.go | 1 + cdc/sinkv2/ddlsink/mq/mq_ddl_sink.go | 1 + cdc/sinkv2/eventsink/mq/mq_dml_sink.go | 4 ++++ 3 files changed, 6 insertions(+) diff --git a/cdc/sink/mq/mq.go b/cdc/sink/mq/mq.go index ba55d9d3eb3..b2ee18037ce 100644 --- a/cdc/sink/mq/mq.go +++ b/cdc/sink/mq/mq.go @@ -327,6 +327,7 @@ func (k *mqSink) Close(_ context.Context) error { // NOTICE: We must close the resolved buffer before closing the flush worker. // Otherwise, bgFlushTs method will panic. k.flushWorker.close() + k.topicManager.Close() // We need to close it asynchronously. // Otherwise, we might get stuck with it in an unhealthy state of kafka. 
go k.mqProducer.Close() diff --git a/cdc/sinkv2/ddlsink/mq/mq_ddl_sink.go b/cdc/sinkv2/ddlsink/mq/mq_ddl_sink.go index 6abdeadda8c..e1097f9f7b5 100644 --- a/cdc/sinkv2/ddlsink/mq/mq_ddl_sink.go +++ b/cdc/sinkv2/ddlsink/mq/mq_ddl_sink.go @@ -172,6 +172,7 @@ func (k *ddlSink) WriteCheckpointTs(ctx context.Context, } func (k *ddlSink) Close() error { + k.topicManager.Close() k.producer.Close() return nil } diff --git a/cdc/sinkv2/eventsink/mq/mq_dml_sink.go b/cdc/sinkv2/eventsink/mq/mq_dml_sink.go index 04260782672..b66ddd5d28a 100644 --- a/cdc/sinkv2/eventsink/mq/mq_dml_sink.go +++ b/cdc/sinkv2/eventsink/mq/mq_dml_sink.go @@ -157,6 +157,10 @@ func (s *dmlSink) Close() { s.cancel() } s.wg.Wait() + + s.alive.RLock() + s.alive.topicManager.Close() + s.alive.RUnlock() } // Dead checks whether it's dead or not. From 3790ff8e509c5d5aeb5837f22961f1d1a4b31c63 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Thu, 1 Jun 2023 17:50:24 +0800 Subject: [PATCH 4/9] Fix tests Signed-off-by: hi-rustin --- cdc/sink/mq/producer/kafka/kafka.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/cdc/sink/mq/producer/kafka/kafka.go b/cdc/sink/mq/producer/kafka/kafka.go index cfba06c3f38..8fc5ffe9a69 100644 --- a/cdc/sink/mq/producer/kafka/kafka.go +++ b/cdc/sink/mq/producer/kafka/kafka.go @@ -405,14 +405,15 @@ func AdjustConfig( var info *sarama.TopicMetadata var exists bool for _, t := range topics { - if t.Err != sarama.ErrNoError { + if t.Err == sarama.ErrNoError { + if t.Name == topic { + info = t + exists = true + break + } + } else if t.Err != sarama.ErrUnknownTopicOrPartition { return errors.Trace(t.Err) } - if t.Name == topic { - info = t - exists = true - break - } } err = validateMinInsyncReplicas(admin, topic, exists, int(config.ReplicationFactor)) From 197c563e6a00cee06eaaa5fdf14b1901e0ec97bc Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Fri, 2 Jun 2023 14:38:15 +0800 Subject: [PATCH 5/9] Fix test Signed-off-by: hi-rustin --- .../kafka/cluster_admin_client_mock_impl.go | 34 ++++++++++++++++--- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/pkg/sink/kafka/cluster_admin_client_mock_impl.go b/pkg/sink/kafka/cluster_admin_client_mock_impl.go index e1ebdd0887a..5a5735630e8 100644 --- a/pkg/sink/kafka/cluster_admin_client_mock_impl.go +++ b/pkg/sink/kafka/cluster_admin_client_mock_impl.go @@ -61,6 +61,7 @@ type ClusterAdminClientMockImpl struct { topics map[string]*topicDetail // Cluster controller ID. controllerID int32 + topicConfigs map[string]map[string]string brokerConfigs []sarama.ConfigEntry } @@ -89,10 +90,16 @@ func NewClusterAdminClientMockImpl() *ClusterAdminClientMockImpl { }, } + topicConfigs := make(map[string]map[string]string) + topicConfigs[DefaultMockTopicName] = make(map[string]string) + topicConfigs[DefaultMockTopicName][TopicMaxMessageBytesConfigName] = TopicMaxMessageBytes + topicConfigs[DefaultMockTopicName][MinInsyncReplicasConfigName] = MinInSyncReplicas + return &ClusterAdminClientMockImpl{ topics: topics, controllerID: defaultMockControllerID, brokerConfigs: brokerConfigs, + topicConfigs: topicConfigs, } } @@ -113,10 +120,27 @@ func (c *ClusterAdminClientMockImpl) DescribeCluster() (brokers []*sarama.Broker // DescribeConfig return brokerConfigs directly. 
func (c *ClusterAdminClientMockImpl) DescribeConfig(resource sarama.ConfigResource) ([]sarama.ConfigEntry, error) { var result []sarama.ConfigEntry - for _, name := range resource.ConfigNames { - for _, config := range c.brokerConfigs { - if name == config.Name { - result = append(result, config) + if resource.Type == sarama.TopicResource { + if _, ok := c.topics[resource.Name]; !ok { + return nil, fmt.Errorf("topic %s not found", resource.Name) + } + for _, name := range resource.ConfigNames { + for n, config := range c.topicConfigs[resource.Name] { + if name == n { + result = append(result, sarama.ConfigEntry{ + Name: n, + Value: config, + }) + } + } + } + + } else if resource.Type == sarama.BrokerResource { + for _, name := range resource.ConfigNames { + for _, config := range c.brokerConfigs { + if name == config.Name { + result = append(result, config) + } } } } @@ -207,7 +231,7 @@ func (c *ClusterAdminClientMockImpl) Close() error { // SetMinInsyncReplicas sets the MinInsyncReplicas for broker and default topic. func (c *ClusterAdminClientMockImpl) SetMinInsyncReplicas(minInsyncReplicas string) { - c.topics[DefaultMockTopicName].ConfigEntries[MinInsyncReplicasConfigName] = &minInsyncReplicas + c.topicConfigs[DefaultMockTopicName][MinInsyncReplicasConfigName] = minInsyncReplicas for i, config := range c.brokerConfigs { if config.Name == MinInsyncReplicasConfigName { From 32d33f5b201ee710007b89cd5bb6e96db43f236b Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Fri, 2 Jun 2023 15:21:17 +0800 Subject: [PATCH 6/9] Add comment Signed-off-by: hi-rustin --- cdc/sink/mq/manager/kafka_manager.go | 32 +++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/cdc/sink/mq/manager/kafka_manager.go b/cdc/sink/mq/manager/kafka_manager.go index f5e0827a73a..beb3ba60254 100644 --- a/cdc/sink/mq/manager/kafka_manager.go +++ b/cdc/sink/mq/manager/kafka_manager.go @@ -130,6 +130,8 @@ func (m *kafkaTopicManager) tryUpdatePartitionsAndLogging(topic string, partitio m.topics.Store(topic, partitions) log.Info( "update topic partition number", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), zap.String("topic", topic), zap.Int32("oldPartitionNumber", oldPartitions.(int32)), zap.Int32("newPartitionNumber", partitions), @@ -139,6 +141,8 @@ func (m *kafkaTopicManager) tryUpdatePartitionsAndLogging(topic string, partitio m.topics.Store(topic, partitions) log.Info( "store topic partition number", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), zap.String("topic", topic), zap.Int32("partitionNumber", partitions), ) @@ -160,6 +164,8 @@ func (m *kafkaTopicManager) getMetadataOfTopics() ([]*sarama.TopicMetadata, erro if err != nil { log.Warn( "Kafka admin client describe topics failed", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), zap.Error(err), zap.Duration("duration", time.Since(start)), ) @@ -168,6 +174,8 @@ func (m *kafkaTopicManager) getMetadataOfTopics() ([]*sarama.TopicMetadata, erro log.Info( "Kafka admin client describe topics success", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), zap.Duration("duration", time.Since(start))) return topicMetaList, nil @@ -184,6 +192,8 @@ func (m *kafkaTopicManager) waitUntilTopicVisible(topicName string) error { topicMetaList, err := m.admin.DescribeTopics([]string{topicName}) if err != nil { log.Error("Kafka admin client describe topic 
failed", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), zap.String("topic", topicName), zap.Error(err), zap.Duration("duration", time.Since(start))) @@ -192,6 +202,8 @@ func (m *kafkaTopicManager) waitUntilTopicVisible(topicName string) error { if len(topicMetaList) != 1 { log.Error("topic metadata length is wrong.", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), zap.String("topic", topicName), zap.Int("expected", 1), zap.Int("actual", len(topicMetaList))) @@ -202,12 +214,16 @@ func (m *kafkaTopicManager) waitUntilTopicVisible(topicName string) error { meta := topicMetaList[0] if meta.Err != sarama.ErrNoError { log.Error("topic metadata is fetched with error", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), zap.String("topic", topicName), zap.Error(meta.Err)) return meta.Err } log.Info("Kafka admin client describe topic success", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), zap.String("topic", topicName), zap.Int("partitionNumber", len(meta.Partitions)), zap.Duration("duration", time.Since(start))) @@ -239,6 +255,8 @@ func (m *kafkaTopicManager) createTopic(topicName string) (int32, error) { if err != nil && !strings.Contains(err.Error(), sarama.ErrTopicAlreadyExists.Error()) { log.Error( "Kafka admin client create the topic failed", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), zap.String("topic", topicName), zap.Int32("partitionNumber", m.cfg.PartitionNum), zap.Int16("replicationFactor", m.cfg.ReplicationFactor), @@ -250,6 +268,8 @@ func (m *kafkaTopicManager) createTopic(topicName string) (int32, error) { log.Info( "Kafka admin client create the topic success", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), zap.String("topic", topicName), zap.Int32("partitionNumber", m.cfg.PartitionNum), zap.Int16("replicationFactor", m.cfg.ReplicationFactor), @@ -270,11 +290,21 @@ func (m *kafkaTopicManager) CreateTopicAndWaitUntilVisible(topicName string) (in for _, detail := range topicDetails { if detail.Err == sarama.ErrNoError { if detail.Name == topicName { + log.Info("Kafka topic already exists", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), + zap.String("topic", topicName), + zap.Int32("partitionNumber", int32(len(detail.Partitions)))) partitionNum := int32(len(detail.Partitions)) m.tryUpdatePartitionsAndLogging(topicName, partitionNum) return partitionNum, nil } - } else if detail.Err != sarama.ErrUnknownTopicOrPartition { + } else { + log.Error("Kafka admin client describe topic failed", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), + zap.String("topic", topicName), + zap.Error(err)) return 0, errors.Trace(err) } } From d767aeb58e3fd0411b64e11e023a5e1502d4c9be Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Fri, 2 Jun 2023 15:23:38 +0800 Subject: [PATCH 7/9] Add log Signed-off-by: hi-rustin --- cdc/sink/mq/producer/kafka/kafka.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cdc/sink/mq/producer/kafka/kafka.go b/cdc/sink/mq/producer/kafka/kafka.go index 8fc5ffe9a69..41410e09b1a 100644 --- a/cdc/sink/mq/producer/kafka/kafka.go +++ b/cdc/sink/mq/producer/kafka/kafka.go @@ -409,9 +409,11 @@ func AdjustConfig( if t.Name == topic { info = t exists = true + log.Info("topic 
already exists", zap.String("topic", topic)) break } } else if t.Err != sarama.ErrUnknownTopicOrPartition { + log.Warn("failed to describe topic", zap.String("topic", topic), zap.Error(t.Err)) return errors.Trace(t.Err) } } From ff6fa2dea00d274adeef7de52b90385ba9f39071 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Fri, 2 Jun 2023 15:33:49 +0800 Subject: [PATCH 8/9] Remove useless code Signed-off-by: hi-rustin --- pkg/sink/kafka/cluster_admin_client_mock_impl.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/pkg/sink/kafka/cluster_admin_client_mock_impl.go b/pkg/sink/kafka/cluster_admin_client_mock_impl.go index 5a5735630e8..b80ee60cfd6 100644 --- a/pkg/sink/kafka/cluster_admin_client_mock_impl.go +++ b/pkg/sink/kafka/cluster_admin_client_mock_impl.go @@ -103,15 +103,6 @@ func NewClusterAdminClientMockImpl() *ClusterAdminClientMockImpl { } } -// ListTopics returns all topics directly. -func (c *ClusterAdminClientMockImpl) ListTopics() (map[string]sarama.TopicDetail, error) { - topicsDetailsMap := make(map[string]sarama.TopicDetail) - for topic, detail := range c.topics { - topicsDetailsMap[topic] = detail.TopicDetail - } - return topicsDetailsMap, nil -} - // DescribeCluster returns the controller ID. func (c *ClusterAdminClientMockImpl) DescribeCluster() (brokers []*sarama.Broker, controllerID int32, err error) { return nil, c.controllerID, nil From 1854e38c4054af86bc01451fdf6f8fd40aada919 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Fri, 2 Jun 2023 15:54:21 +0800 Subject: [PATCH 9/9] Fix bug Signed-off-by: hi-rustin --- cdc/sink/mq/manager/kafka_manager.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cdc/sink/mq/manager/kafka_manager.go b/cdc/sink/mq/manager/kafka_manager.go index beb3ba60254..0b19f011461 100644 --- a/cdc/sink/mq/manager/kafka_manager.go +++ b/cdc/sink/mq/manager/kafka_manager.go @@ -299,13 +299,13 @@ func (m *kafkaTopicManager) CreateTopicAndWaitUntilVisible(topicName string) (in m.tryUpdatePartitionsAndLogging(topicName, partitionNum) return partitionNum, nil } - } else { + } else if detail.Err != sarama.ErrUnknownTopicOrPartition { log.Error("Kafka admin client describe topic failed", zap.String("namespace", m.changefeedID.Namespace), zap.String("changefeed", m.changefeedID.ID), zap.String("topic", topicName), - zap.Error(err)) - return 0, errors.Trace(err) + zap.Error(detail.Err)) + return 0, errors.Trace(detail.Err) } }