diff --git a/cdc/sink/mq/manager/kafka_manager.go b/cdc/sink/mq/manager/kafka_manager.go index 138796833b1..f57b010029b 100644 --- a/cdc/sink/mq/manager/kafka_manager.go +++ b/cdc/sink/mq/manager/kafka_manager.go @@ -23,43 +23,56 @@ import ( "github.com/Shopify/sarama" "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/contextutil" + "github.com/pingcap/tiflow/cdc/model" kafkaconfig "github.com/pingcap/tiflow/cdc/sink/mq/producer/kafka" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/kafka" "github.com/pingcap/tiflow/pkg/retry" - "go.uber.org/atomic" "go.uber.org/zap" ) +const ( + // metaRefreshInterval is the interval of refreshing metadata. + // We can't get the metadata too frequently, because it may cause + // the kafka cluster to be overloaded. Especially when there are + // many topics in the cluster or there are many TiCDC changefeeds. + metaRefreshInterval = 10 * time.Minute +) + // kafkaTopicManager is a manager for kafka topics. type kafkaTopicManager struct { - client kafka.Client - admin kafka.ClusterAdminClient + changefeedID model.ChangeFeedID + + admin kafka.ClusterAdminClient cfg *kafkaconfig.AutoCreateTopicConfig topics sync.Map - lastMetadataRefresh atomic.Int64 + metaRefreshTicker *time.Ticker + + // cancel is used to cancel the background goroutine. + cancel context.CancelFunc } // NewKafkaTopicManager creates a new topic manager. func NewKafkaTopicManager( - client kafka.Client, + ctx context.Context, admin kafka.ClusterAdminClient, cfg *kafkaconfig.AutoCreateTopicConfig, ) (*kafkaTopicManager, error) { + changefeedID := contextutil.ChangefeedIDFromCtx(ctx) mgr := &kafkaTopicManager{ - client: client, - admin: admin, - cfg: cfg, + changefeedID: changefeedID, + admin: admin, + cfg: cfg, + metaRefreshTicker: time.NewTicker(metaRefreshInterval), } - // do an initial metadata fetching using ListTopics - err := mgr.listTopics() - if err != nil { - return nil, err - } + ctx, mgr.cancel = context.WithCancel(ctx) + // Background refresh metadata. + go mgr.backgroundRefreshMeta(ctx) return mgr, nil } @@ -67,15 +80,11 @@ func NewKafkaTopicManager( // GetPartitionNum returns the number of partitions of the topic. // It may also try to update the topics' information maintained by manager. func (m *kafkaTopicManager) GetPartitionNum(topic string) (int32, error) { - err := m.tryRefreshMeta() - if err != nil { - return 0, errors.Trace(err) - } - if partitions, ok := m.topics.Load(topic); ok { return partitions.(int32), nil } + // If the topic is not in the metadata, we try to create the topic. partitionNum, err := m.CreateTopicAndWaitUntilVisible(topic) if err != nil { return 0, errors.Trace(err) @@ -84,25 +93,33 @@ func (m *kafkaTopicManager) GetPartitionNum(topic string) (int32, error) { return partitionNum, nil } -// tryRefreshMeta try to refresh the topics' information maintained by manager. 
-func (m *kafkaTopicManager) tryRefreshMeta() error { - if time.Since(time.Unix(m.lastMetadataRefresh.Load(), 0)) > time.Minute { - topics, err := m.client.Topics() - if err != nil { - return err - } - - for _, topic := range topics { - partitions, err := m.client.Partitions(topic) +func (m *kafkaTopicManager) backgroundRefreshMeta(ctx context.Context) { + for { + select { + case <-ctx.Done(): + log.Info("Background refresh Kafka metadata goroutine exit.", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), + ) + return + case <-m.metaRefreshTicker.C: + topicMetaList, err := m.getMetadataOfTopics() + // We ignore the error here, because the error may be caused by the + // network problem, and we can try to get the metadata next time. if err != nil { - return err + log.Warn("Get metadata of topics failed", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), + zap.Error(err)) } - m.tryUpdatePartitionsAndLogging(topic, int32(len(partitions))) + + for _, detail := range topicMetaList { + partitionNum := int32(len(detail.Partitions)) + m.tryUpdatePartitionsAndLogging(detail.Name, partitionNum) + } + } - m.lastMetadataRefresh.Store(time.Now().Unix()) } - - return nil } // tryUpdatePartitionsAndLogging try to update the partitions of the topic. @@ -113,6 +130,8 @@ func (m *kafkaTopicManager) tryUpdatePartitionsAndLogging(topic string, partitio m.topics.Store(topic, partitions) log.Info( "update topic partition number", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), zap.String("topic", topic), zap.Int32("oldPartitionNumber", oldPartitions.(int32)), zap.Int32("newPartitionNumber", partitions), @@ -122,6 +141,8 @@ func (m *kafkaTopicManager) tryUpdatePartitionsAndLogging(topic string, partitio m.topics.Store(topic, partitions) log.Info( "store topic partition number", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), zap.String("topic", topic), zap.Int32("partitionNumber", partitions), ) @@ -143,13 +164,18 @@ func (m *kafkaTopicManager) getMetadataOfTopics() ([]*sarama.TopicMetadata, erro if err != nil { log.Warn( "Kafka admin client describe topics failed", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), zap.Error(err), zap.Duration("duration", time.Since(start)), ) return nil, err } + log.Info( "Kafka admin client describe topics success", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), zap.Duration("duration", time.Since(start))) return topicMetaList, nil @@ -166,6 +192,8 @@ func (m *kafkaTopicManager) waitUntilTopicVisible(topicName string) error { topicMetaList, err := m.admin.DescribeTopics([]string{topicName}) if err != nil { log.Error("Kafka admin client describe topic failed", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), zap.String("topic", topicName), zap.Error(err), zap.Duration("duration", time.Since(start))) @@ -174,6 +202,8 @@ func (m *kafkaTopicManager) waitUntilTopicVisible(topicName string) error { if len(topicMetaList) != 1 { log.Error("topic metadata length is wrong.", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), zap.String("topic", topicName), zap.Int("expected", 1), zap.Int("actual", len(topicMetaList))) @@ -184,12 +214,16 @@ func (m *kafkaTopicManager) 
waitUntilTopicVisible(topicName string) error { meta := topicMetaList[0] if meta.Err != sarama.ErrNoError { log.Error("topic metadata is fetched with error", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), zap.String("topic", topicName), zap.Error(meta.Err)) return meta.Err } log.Info("Kafka admin client describe topic success", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), zap.String("topic", topicName), zap.Int("partitionNumber", len(meta.Partitions)), zap.Duration("duration", time.Since(start))) @@ -203,70 +237,9 @@ func (m *kafkaTopicManager) waitUntilTopicVisible(topicName string) error { return err } -// listTopics is used to do an initial metadata fetching. -func (m *kafkaTopicManager) listTopics() error { - start := time.Now() - topics, err := m.admin.ListTopics() - if err != nil { - log.Error( - "Kafka admin client list topics failed", - zap.Error(err), - zap.Duration("duration", time.Since(start)), - ) - return errors.Trace(err) - } - log.Info( - "Kafka admin client list topics success", - zap.Duration("duration", time.Since(start)), - ) - - // Now that we have access to the latest topics' information, - // we need to update it here immediately. - for topic, detail := range topics { - m.tryUpdatePartitionsAndLogging(topic, detail.NumPartitions) - } - m.lastMetadataRefresh.Store(time.Now().Unix()) - - return nil -} - // createTopic creates a topic with the given name // and returns the number of partitions. func (m *kafkaTopicManager) createTopic(topicName string) (int32, error) { - topicMetaList, err := m.getMetadataOfTopics() - if err != nil { - return 0, errors.Trace(err) - } - - // Now that we have access to the latest topics' information, - // we need to update it here immediately. - targetTopicFound := false - targetTopicPartitionNum := 0 - for _, topic := range topicMetaList { - if topic.Err != sarama.ErrNoError { - log.Error("Kafka admin client fetch topic metadata failed.", - zap.String("topic", topic.Name), - zap.Error(topic.Err)) - continue - } - - if topic.Name == topicName { - targetTopicFound = true - targetTopicPartitionNum = len(topic.Partitions) - } - m.tryUpdatePartitionsAndLogging(topic.Name, int32(len(topic.Partitions))) - } - m.lastMetadataRefresh.Store(time.Now().Unix()) - - // Maybe our cache has expired information, so we just return it. 
- if targetTopicFound { - log.Info( - "topic already exists and the cached information has expired", - zap.String("topic", topicName), - ) - return int32(targetTopicPartitionNum), nil - } - if !m.cfg.AutoCreate { return 0, cerror.ErrKafkaInvalidConfig.GenWithStack( fmt.Sprintf("`auto-create-topic` is false, "+ @@ -274,7 +247,7 @@ func (m *kafkaTopicManager) createTopic(topicName string) (int32, error) { } start := time.Now() - err = m.admin.CreateTopic(topicName, &sarama.TopicDetail{ + err := m.admin.CreateTopic(topicName, &sarama.TopicDetail{ NumPartitions: m.cfg.PartitionNum, ReplicationFactor: m.cfg.ReplicationFactor, }, false) @@ -282,6 +255,8 @@ func (m *kafkaTopicManager) createTopic(topicName string) (int32, error) { if err != nil && !strings.Contains(err.Error(), sarama.ErrTopicAlreadyExists.Error()) { log.Error( "Kafka admin client create the topic failed", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), zap.String("topic", topicName), zap.Int32("partitionNumber", m.cfg.PartitionNum), zap.Int16("replicationFactor", m.cfg.ReplicationFactor), @@ -293,6 +268,8 @@ func (m *kafkaTopicManager) createTopic(topicName string) (int32, error) { log.Info( "Kafka admin client create the topic success", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), zap.String("topic", topicName), zap.Int32("partitionNumber", m.cfg.PartitionNum), zap.Int16("replicationFactor", m.cfg.ReplicationFactor), @@ -305,6 +282,33 @@ func (m *kafkaTopicManager) createTopic(topicName string) (int32, error) { // CreateTopicAndWaitUntilVisible wraps createTopic and waitUntilTopicVisible together. func (m *kafkaTopicManager) CreateTopicAndWaitUntilVisible(topicName string) (int32, error) { + // If the topic is not in the cache, we try to get the metadata of the topic. + topicDetails, err := m.admin.DescribeTopics([]string{topicName}) + if err != nil { + return 0, errors.Trace(err) + } + for _, detail := range topicDetails { + if detail.Err == sarama.ErrNoError { + if detail.Name == topicName { + log.Info("Kafka topic already exists", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), + zap.String("topic", topicName), + zap.Int32("partitionNumber", int32(len(detail.Partitions)))) + partitionNum := int32(len(detail.Partitions)) + m.tryUpdatePartitionsAndLogging(topicName, partitionNum) + return partitionNum, nil + } + } else if detail.Err != sarama.ErrUnknownTopicOrPartition { + log.Error("Kafka admin client describe topic failed", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), + zap.String("topic", topicName), + zap.Error(detail.Err)) + return 0, errors.Trace(detail.Err) + } + } + partitionNum, err := m.createTopic(topicName) if err != nil { return 0, errors.Trace(err) @@ -317,3 +321,8 @@ func (m *kafkaTopicManager) CreateTopicAndWaitUntilVisible(topicName string) (in return partitionNum, nil } + +// Close exits the background goroutine. 
+func (m *kafkaTopicManager) Close() { + m.cancel() +} diff --git a/cdc/sink/mq/manager/kafka_manager_test.go b/cdc/sink/mq/manager/kafka_manager_test.go index 92d8af53c4e..55296bc1e38 100644 --- a/cdc/sink/mq/manager/kafka_manager_test.go +++ b/cdc/sink/mq/manager/kafka_manager_test.go @@ -14,20 +14,19 @@ package manager import ( + "context" "testing" - "time" kafkaconfig "github.com/pingcap/tiflow/cdc/sink/mq/producer/kafka" - kafkamock "github.com/pingcap/tiflow/pkg/kafka" + "github.com/pingcap/tiflow/pkg/kafka" "github.com/stretchr/testify/require" ) func TestPartitions(t *testing.T) { t.Parallel() - client := kafkamock.NewClientMockImpl() - adminClient := kafkamock.NewClusterAdminClientMockImpl() - defer func(adminClient *kafkamock.ClusterAdminClientMockImpl) { + adminClient := kafka.NewClusterAdminClientMockImpl() + defer func(adminClient *kafka.ClusterAdminClientMockImpl) { _ = adminClient.Close() }(adminClient) cfg := &kafkaconfig.AutoCreateTopicConfig{ @@ -36,56 +35,21 @@ func TestPartitions(t *testing.T) { ReplicationFactor: 1, } - manager, err := NewKafkaTopicManager(client, adminClient, cfg) + manager, err := NewKafkaTopicManager(context.TODO(), adminClient, cfg) require.Nil(t, err) + defer manager.Close() partitionsNum, err := manager.GetPartitionNum( - kafkamock.DefaultMockTopicName) + kafka.DefaultMockTopicName) require.Nil(t, err) require.Equal(t, int32(3), partitionsNum) } -func TestTryRefreshMeta(t *testing.T) { - t.Parallel() - - client := kafkamock.NewClientMockImpl() - adminClient := kafkamock.NewClusterAdminClientMockImpl() - defer func(adminClient *kafkamock.ClusterAdminClientMockImpl) { - _ = adminClient.Close() - }(adminClient) - cfg := &kafkaconfig.AutoCreateTopicConfig{ - AutoCreate: true, - PartitionNum: 2, - ReplicationFactor: 1, - } - - manager, err := NewKafkaTopicManager(client, adminClient, cfg) - require.Nil(t, err) - partitionsNum, err := manager.GetPartitionNum( - kafkamock.DefaultMockTopicName) - require.Nil(t, err) - require.Equal(t, int32(3), partitionsNum) - - // Mock create a topic. - client.AddTopic("test", 4) - manager.lastMetadataRefresh.Store(time.Now().Add(-2 * time.Minute).Unix()) - partitionsNum, err = manager.GetPartitionNum("test") - require.Nil(t, err) - require.Equal(t, int32(4), partitionsNum) - - // Mock delete a topic. - // NOTICE: we do not refresh metadata for the deleted topic. 
- client.DeleteTopic("test") - partitionsNum, err = manager.GetPartitionNum("test") - require.Nil(t, err) - require.Equal(t, int32(4), partitionsNum) -} - func TestCreateTopic(t *testing.T) { t.Parallel() - client := kafkamock.NewClientMockImpl() - adminClient := kafkamock.NewClusterAdminClientMockImpl() - defer func(adminClient *kafkamock.ClusterAdminClientMockImpl) { + ctx := context.Background() + adminClient := kafka.NewClusterAdminClientMockImpl() + defer func(adminClient *kafka.ClusterAdminClientMockImpl) { _ = adminClient.Close() }(adminClient) @@ -95,13 +59,14 @@ func TestCreateTopic(t *testing.T) { ReplicationFactor: 1, } - manager, err := NewKafkaTopicManager(client, adminClient, cfg) + manager, err := NewKafkaTopicManager(ctx, adminClient, cfg) require.Nil(t, err) - partitionNum, err := manager.createTopic(kafkamock.DefaultMockTopicName) + defer manager.Close() + partitionNum, err := manager.CreateTopicAndWaitUntilVisible(kafka.DefaultMockTopicName) require.Nil(t, err) require.Equal(t, int32(3), partitionNum) - partitionNum, err = manager.createTopic("new-topic") + partitionNum, err = manager.CreateTopicAndWaitUntilVisible("new-topic") require.Nil(t, err) require.Equal(t, int32(2), partitionNum) partitionsNum, err := manager.GetPartitionNum("new-topic") @@ -110,9 +75,10 @@ func TestCreateTopic(t *testing.T) { // Try to create a topic without auto create. cfg.AutoCreate = false - manager, err = NewKafkaTopicManager(client, adminClient, cfg) + manager, err = NewKafkaTopicManager(ctx, adminClient, cfg) require.Nil(t, err) - _, err = manager.createTopic("new-topic2") + defer manager.Close() + _, err = manager.CreateTopicAndWaitUntilVisible("new-topic2") require.Regexp( t, "`auto-create-topic` is false, and new-topic2 not found", @@ -126,9 +92,10 @@ func TestCreateTopic(t *testing.T) { PartitionNum: 2, ReplicationFactor: 4, } - manager, err = NewKafkaTopicManager(client, adminClient, cfg) + manager, err = NewKafkaTopicManager(ctx, adminClient, cfg) require.Nil(t, err) - _, err = manager.createTopic("new-topic-failed") + defer manager.Close() + _, err = manager.CreateTopicAndWaitUntilVisible("new-topic-failed") require.Regexp( t, "new sarama producer: kafka server: Replication-factor is invalid", @@ -139,9 +106,8 @@ func TestCreateTopic(t *testing.T) { func TestCreateTopicWithDelay(t *testing.T) { t.Parallel() - client := kafkamock.NewClientMockImpl() - adminClient := kafkamock.NewClusterAdminClientMockImpl() - defer func(adminClient *kafkamock.ClusterAdminClientMockImpl) { + adminClient := kafka.NewClusterAdminClientMockImpl() + defer func(adminClient *kafka.ClusterAdminClientMockImpl) { _ = adminClient.Close() }(adminClient) cfg := &kafkaconfig.AutoCreateTopicConfig{ @@ -150,8 +116,9 @@ func TestCreateTopicWithDelay(t *testing.T) { ReplicationFactor: 1, } - manager, err := NewKafkaTopicManager(client, adminClient, cfg) + manager, err := NewKafkaTopicManager(context.TODO(), adminClient, cfg) require.Nil(t, err) + defer manager.Close() partitionNum, err := manager.createTopic("new_topic") require.Nil(t, err) err = adminClient.SetRemainingFetchesUntilTopicVisible("new_topic", 3) diff --git a/cdc/sink/mq/manager/manager.go b/cdc/sink/mq/manager/manager.go index 5c20a672465..f039aa3f476 100644 --- a/cdc/sink/mq/manager/manager.go +++ b/cdc/sink/mq/manager/manager.go @@ -21,4 +21,6 @@ type TopicManager interface { GetPartitionNum(topic string) (int32, error) // CreateTopicAndWaitUntilVisible creates the topic and wait for the topic completion. 
 	CreateTopicAndWaitUntilVisible(topicName string) (int32, error)
+	// Close closes the topic manager.
+	Close()
 }
diff --git a/cdc/sink/mq/manager/pulsar_manager.go b/cdc/sink/mq/manager/pulsar_manager.go
index f01526bf65b..21134f8922f 100644
--- a/cdc/sink/mq/manager/pulsar_manager.go
+++ b/cdc/sink/mq/manager/pulsar_manager.go
@@ -38,3 +38,7 @@ func (m *pulsarTopicManager) GetPartitionNum(_ string) (int32, error) {
 func (m *pulsarTopicManager) CreateTopicAndWaitUntilVisible(_ string) (int32, error) {
 	return m.partitionNum, nil
 }
+
+// Close does nothing.
+func (m *pulsarTopicManager) Close() {
+}
diff --git a/cdc/sink/mq/mq.go b/cdc/sink/mq/mq.go
index aac0ecd6b08..b58f2233545 100644
--- a/cdc/sink/mq/mq.go
+++ b/cdc/sink/mq/mq.go
@@ -351,6 +351,7 @@ func (k *mqSink) Close(_ context.Context) error {
 	// NOTICE: We must close the resolved buffer before closing the flush worker.
 	// Otherwise, bgFlushTs method will panic.
 	k.flushWorker.close()
+	k.topicManager.Close()
 	// We need to close it asynchronously.
 	// Otherwise, we might get stuck with it in an unhealthy state of kafka.
 	go k.mqProducer.Close()
@@ -407,7 +408,7 @@ func NewKafkaSaramaSink(ctx context.Context, sinkURI *url.URL,
 	}
 
 	baseConfig := kafka.NewConfig()
-	if err := baseConfig.Apply(sinkURI); err != nil {
+	if err := baseConfig.Apply(sinkURI, replicaConfig); err != nil {
 		return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
 	}
 
@@ -456,7 +457,7 @@ func NewKafkaSaramaSink(ctx context.Context, sinkURI *url.URL,
 	}
 
 	topicManager, err := manager.NewKafkaTopicManager(
-		client,
+		ctx,
 		adminClient,
 		baseConfig.DeriveTopicConfig(),
 	)
diff --git a/cdc/sink/mq/producer/kafka/config.go b/cdc/sink/mq/producer/kafka/config.go
index 6a0c2dcc6a3..423866f07d1 100644
--- a/cdc/sink/mq/producer/kafka/config.go
+++ b/cdc/sink/mq/producer/kafka/config.go
@@ -16,6 +16,7 @@ package kafka
 import (
 	"context"
 	"crypto/tls"
+	"encoding/base64"
 	"net/url"
 	"strconv"
 	"strings"
@@ -101,8 +102,8 @@ func (c *Config) setPartitionNum(realPartitionCount int32) error {
 	return nil
 }
 
-// Apply the sinkURI to update Config
-func (c *Config) Apply(sinkURI *url.URL) error {
+// Apply the sinkURI and replicaConfig to update the Config.
+func (c *Config) Apply(sinkURI *url.URL, replicaConfig *config.ReplicaConfig) error { c.BrokerEndpoints = strings.Split(sinkURI.Host, ",") params := sinkURI.Query() s := params.Get("partition-num") @@ -183,7 +184,7 @@ func (c *Config) Apply(sinkURI *url.URL) error { c.ReadTimeout = a } - err := c.applySASL(params) + err := c.applySASL(params, replicaConfig) if err != nil { return err } @@ -245,7 +246,7 @@ func (c *Config) applyTLS(params url.Values) error { return nil } -func (c *Config) applySASL(params url.Values) error { +func (c *Config) applySASL(params url.Values, replicaConfig *config.ReplicaConfig) error { s := params.Get("sasl-user") if s != "" { c.SASL.SASLUser = s @@ -263,6 +264,12 @@ func (c *Config) applySASL(params url.Values) error { return cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) } c.SASL.SASLMechanism = mechanism + } else if replicaConfig != nil && replicaConfig.Sink != nil && replicaConfig.Sink.KafkaConfig != nil && replicaConfig.Sink.KafkaConfig.SASLMechanism != nil { + mechanism, err := security.SASLMechanismFromString(*replicaConfig.Sink.KafkaConfig.SASLMechanism) + if err != nil { + return cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) + } + c.SASL.SASLMechanism = mechanism } s = params.Get("sasl-gssapi-auth-type") @@ -313,6 +320,67 @@ func (c *Config) applySASL(params url.Values) error { c.SASL.GSSAPI.DisablePAFXFAST = disablePAFXFAST } + if replicaConfig.Sink != nil && replicaConfig.Sink.KafkaConfig != nil { + if replicaConfig.Sink.KafkaConfig.SASLOAuthClientID != nil { + clientID := *replicaConfig.Sink.KafkaConfig.SASLOAuthClientID + if clientID == "" { + return cerror.ErrKafkaInvalidConfig.GenWithStack("OAuth2 client ID cannot be empty") + } + c.SASL.OAuth2.ClientID = clientID + } + + if replicaConfig.Sink.KafkaConfig.SASLOAuthClientSecret != nil { + clientSecret := *replicaConfig.Sink.KafkaConfig.SASLOAuthClientSecret + if clientSecret == "" { + return cerror.ErrKafkaInvalidConfig.GenWithStack( + "OAuth2 client secret cannot be empty") + } + + // BASE64 decode the client secret + decodedClientSecret, err := base64.StdEncoding.DecodeString(clientSecret) + if err != nil { + log.Error("OAuth2 client secret is not base64 encoded", zap.Error(err)) + return cerror.ErrKafkaInvalidConfig.GenWithStack( + "OAuth2 client secret is not base64 encoded") + } + c.SASL.OAuth2.ClientSecret = string(decodedClientSecret) + } + + if replicaConfig.Sink.KafkaConfig.SASLOAuthTokenURL != nil { + tokenURL := *replicaConfig.Sink.KafkaConfig.SASLOAuthTokenURL + if tokenURL == "" { + return cerror.ErrKafkaInvalidConfig.GenWithStack( + "OAuth2 token URL cannot be empty") + } + c.SASL.OAuth2.TokenURL = tokenURL + } + + if c.SASL.OAuth2.IsEnable() { + if c.SASL.SASLMechanism != security.OAuthMechanism { + return cerror.ErrKafkaInvalidConfig.GenWithStack( + "OAuth2 is only supported with SASL mechanism type OAUTHBEARER, but got %s", + c.SASL.SASLMechanism) + } + + if err := c.SASL.OAuth2.Validate(); err != nil { + return cerror.ErrKafkaInvalidConfig.Wrap(err) + } + c.SASL.OAuth2.SetDefault() + } + + if replicaConfig.Sink.KafkaConfig.SASLOAuthScopes != nil { + c.SASL.OAuth2.Scopes = replicaConfig.Sink.KafkaConfig.SASLOAuthScopes + } + + if replicaConfig.Sink.KafkaConfig.SASLOAuthGrantType != nil { + c.SASL.OAuth2.GrantType = *replicaConfig.Sink.KafkaConfig.SASLOAuthGrantType + } + + if replicaConfig.Sink.KafkaConfig.SASLOAuthAudience != nil { + c.SASL.OAuth2.Audience = *replicaConfig.Sink.KafkaConfig.SASLOAuthAudience + } + } + return nil } @@ -432,12 +500,14 @@ func 
NewSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) { } } - completeSaramaSASLConfig(config, c) + if err := completeSaramaSASLConfig(ctx, config, c); err != nil { + return nil, errors.Trace(err) + } return config, err } -func completeSaramaSASLConfig(config *sarama.Config, c *Config) { +func completeSaramaSASLConfig(ctx context.Context, config *sarama.Config, c *Config) error { if c.SASL != nil && c.SASL.SASLMechanism != "" { config.Net.SASL.Enable = true config.Net.SASL.Mechanism = sarama.SASLMechanism(c.SASL.SASLMechanism) @@ -467,6 +537,14 @@ func completeSaramaSASLConfig(config *sarama.Config, c *Config) { case security.KeyTabAuth: config.Net.SASL.GSSAPI.KeyTabPath = c.SASL.GSSAPI.KeyTabPath } + case sarama.SASLTypeOAuth: + p, err := newTokenProvider(ctx, c) + if err != nil { + return errors.Trace(err) + } + config.Net.SASL.TokenProvider = p } + } + return nil } diff --git a/cdc/sink/mq/producer/kafka/config_test.go b/cdc/sink/mq/producer/kafka/config_test.go index b725c46f61b..967a8ea2f5b 100644 --- a/cdc/sink/mq/producer/kafka/config_test.go +++ b/cdc/sink/mq/producer/kafka/config_test.go @@ -106,7 +106,7 @@ func TestConfigTimeouts(t *testing.T) { sinkURI, err := url.Parse(uri) require.Nil(t, err) - err = cfg.Apply(sinkURI) + err = cfg.Apply(sinkURI, config.GetDefaultReplicaConfig()) require.Nil(t, err) require.Equal(t, 5*time.Second, cfg.DialTimeout) @@ -121,6 +121,8 @@ func TestConfigTimeouts(t *testing.T) { } func TestCompleteConfigByOpts(t *testing.T) { + replicaCfg := config.GetDefaultReplicaConfig() + cfg := NewConfig() // Normal config. @@ -132,7 +134,7 @@ func TestCompleteConfigByOpts(t *testing.T) { sinkURI, err := url.Parse(uri) require.Nil(t, err) - err = cfg.Apply(sinkURI) + err = cfg.Apply(sinkURI, replicaCfg) require.Nil(t, err) require.Equal(t, int32(1), cfg.PartitionNum) require.Equal(t, int16(3), cfg.ReplicationFactor) @@ -144,7 +146,7 @@ func TestCompleteConfigByOpts(t *testing.T) { sinkURI, err = url.Parse(uri) require.Nil(t, err) cfg = NewConfig() - err = cfg.Apply(sinkURI) + err = cfg.Apply(sinkURI, replicaCfg) require.Nil(t, err) require.Len(t, cfg.BrokerEndpoints, 3) @@ -153,7 +155,7 @@ func TestCompleteConfigByOpts(t *testing.T) { sinkURI, err = url.Parse(uri) require.Nil(t, err) cfg = NewConfig() - err = cfg.Apply(sinkURI) + err = cfg.Apply(sinkURI, replicaCfg) require.Regexp(t, ".*invalid syntax.*", errors.Cause(err)) // Illegal max-message-bytes. @@ -161,7 +163,7 @@ func TestCompleteConfigByOpts(t *testing.T) { sinkURI, err = url.Parse(uri) require.Nil(t, err) cfg = NewConfig() - err = cfg.Apply(sinkURI) + err = cfg.Apply(sinkURI, replicaCfg) require.Regexp(t, ".*invalid syntax.*", errors.Cause(err)) // Illegal partition-num. @@ -169,7 +171,7 @@ func TestCompleteConfigByOpts(t *testing.T) { sinkURI, err = url.Parse(uri) require.Nil(t, err) cfg = NewConfig() - err = cfg.Apply(sinkURI) + err = cfg.Apply(sinkURI, replicaCfg) require.Regexp(t, ".*invalid syntax.*", errors.Cause(err)) // Out of range partition-num. 
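The test churn above is mechanical, but it tracks the caller-visible API change in this patch: `Config.Apply` now takes the replica config alongside the sink URI, and `NewSaramaConfig` takes a context so the OAUTHBEARER token provider can be built with it. A minimal sketch of the new call sequence, mirroring `NewKafkaSaramaSink` in `cdc/sink/mq/mq.go` (the helper name and the URI value are illustrative, not part of the patch):

```go
package example

import (
	"context"
	"net/url"

	"github.com/Shopify/sarama"
	"github.com/pingcap/tiflow/cdc/sink/mq/producer/kafka"
	"github.com/pingcap/tiflow/pkg/config"
	cerror "github.com/pingcap/tiflow/pkg/errors"
)

// buildSaramaConfig is a hypothetical helper showing the new call sequence:
// Apply now merges settings from both the sink URI and the replica config
// (e.g. the [sink.kafka-config] OAuth2 fields), and NewSaramaConfig threads
// a context down to the OAuth2 token source.
func buildSaramaConfig(
	ctx context.Context, uri string, replicaConfig *config.ReplicaConfig,
) (*sarama.Config, error) {
	sinkURI, err := url.Parse(uri)
	if err != nil {
		return nil, err
	}
	baseConfig := kafka.NewConfig()
	if err := baseConfig.Apply(sinkURI, replicaConfig); err != nil {
		return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
	}
	return kafka.NewSaramaConfig(ctx, baseConfig)
}
```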
@@ -177,7 +179,7 @@ func TestCompleteConfigByOpts(t *testing.T) { sinkURI, err = url.Parse(uri) require.Nil(t, err) cfg = NewConfig() - err = cfg.Apply(sinkURI) + err = cfg.Apply(sinkURI, replicaCfg) require.Regexp(t, ".*invalid partition num.*", errors.Cause(err)) } @@ -390,7 +392,7 @@ func TestConfigurationCombinations(t *testing.T) { require.Nil(t, err) baseConfig := NewConfig() - err = baseConfig.Apply(sinkURI) + err = baseConfig.Apply(sinkURI, config.GetDefaultReplicaConfig()) require.Nil(t, err) saramaConfig, err := NewSaramaConfig(context.Background(), baseConfig) @@ -428,26 +430,30 @@ func TestApplySASL(t *testing.T) { t.Parallel() tests := []struct { - name string - URI string - exceptErr string + name string + URI string + replicaConfig func() *config.ReplicaConfig + exceptErr string }{ { - name: "no params", - URI: "kafka://127.0.0.1:9092/abc", - exceptErr: "", + name: "no params", + URI: "kafka://127.0.0.1:9092/abc", + replicaConfig: config.GetDefaultReplicaConfig, + exceptErr: "", }, { name: "valid PLAIN SASL", URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" + "&sasl-user=user&sasl-password=password&sasl-mechanism=plain", - exceptErr: "", + replicaConfig: config.GetDefaultReplicaConfig, + exceptErr: "", }, { name: "valid SCRAM SASL", URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" + "&sasl-user=user&sasl-password=password&sasl-mechanism=SCRAM-SHA-512", - exceptErr: "", + replicaConfig: config.GetDefaultReplicaConfig, + exceptErr: "", }, { name: "valid GSSAPI user auth SASL", @@ -457,7 +463,8 @@ func TestApplySASL(t *testing.T) { "&sasl-gssapi-service-name=a&sasl-gssapi-user=user" + "&sasl-gssapi-password=pwd" + "&sasl-gssapi-realm=realm&sasl-gssapi-disable-pafxfast=false", - exceptErr: "", + replicaConfig: config.GetDefaultReplicaConfig, + exceptErr: "", }, { name: "valid GSSAPI keytab auth SASL", @@ -467,19 +474,136 @@ func TestApplySASL(t *testing.T) { "&sasl-gssapi-service-name=a&sasl-gssapi-user=user" + "&sasl-gssapi-keytab-path=/root/keytab" + "&sasl-gssapi-realm=realm&sasl-gssapi-disable-pafxfast=false", - exceptErr: "", + replicaConfig: config.GetDefaultReplicaConfig, + exceptErr: "", }, { name: "invalid mechanism", URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" + "&sasl-mechanism=a", - exceptErr: "unknown a SASL mechanism", + replicaConfig: config.GetDefaultReplicaConfig, + exceptErr: "unknown a SASL mechanism", }, { name: "invalid GSSAPI auth type", URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" + "&sasl-mechanism=gssapi&sasl-gssapi-auth-type=keyta1b", - exceptErr: "unknown keyta1b auth type", + replicaConfig: config.GetDefaultReplicaConfig, + exceptErr: "unknown keyta1b auth type", + }, + { + name: "valid OAUTHBEARER SASL", + URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0" + + "&partition-num=0&sasl-mechanism=OAUTHBEARER", + replicaConfig: func() *config.ReplicaConfig { + cfg := config.GetDefaultReplicaConfig() + oauthMechanism := string(security.OAuthMechanism) + clientID := "client_id" + clientSecret := "Y2xpZW50X3NlY3JldA==" // base64(client_secret) + tokenURL := "127.0.0.1:9093/token" + cfg.Sink.KafkaConfig = &config.KafkaConfig{ + SASLMechanism: &oauthMechanism, + SASLOAuthClientID: &clientID, + SASLOAuthClientSecret: &clientSecret, + SASLOAuthTokenURL: &tokenURL, + } + return cfg + }, + exceptErr: "", + }, + { + name: "invalid OAUTHBEARER SASL: missing client id", + URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0" + + 
"&partition-num=0&sasl-mechanism=OAUTHBEARER", + replicaConfig: func() *config.ReplicaConfig { + cfg := config.GetDefaultReplicaConfig() + oauthMechanism := string(security.OAuthMechanism) + clientSecret := "Y2xpZW50X3NlY3JldA==" // base64(client_secret) + tokenURL := "127.0.0.1:9093/token" + cfg.Sink.KafkaConfig = &config.KafkaConfig{ + SASLMechanism: &oauthMechanism, + SASLOAuthClientSecret: &clientSecret, + SASLOAuthTokenURL: &tokenURL, + } + return cfg + }, + exceptErr: "OAuth2 client id is empty", + }, + { + name: "invalid OAUTHBEARER SASL: missing client secret", + URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0" + + "&partition-num=0&sasl-mechanism=OAUTHBEARER", + replicaConfig: func() *config.ReplicaConfig { + cfg := config.GetDefaultReplicaConfig() + oauthMechanism := string(security.OAuthMechanism) + clientID := "client_id" + tokenURL := "127.0.0.1:9093/token" + cfg.Sink.KafkaConfig = &config.KafkaConfig{ + SASLMechanism: &oauthMechanism, + SASLOAuthClientID: &clientID, + SASLOAuthTokenURL: &tokenURL, + } + return cfg + }, + exceptErr: "OAuth2 client secret is empty", + }, + { + name: "invalid OAUTHBEARER SASL: missing token url", + URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0" + + "&partition-num=0&sasl-mechanism=OAUTHBEARER", + replicaConfig: func() *config.ReplicaConfig { + cfg := config.GetDefaultReplicaConfig() + oauthMechanism := string(security.OAuthMechanism) + clientID := "client_id" + clientSecret := "Y2xpZW50X3NlY3JldA==" // base64(client_secret) + cfg.Sink.KafkaConfig = &config.KafkaConfig{ + SASLMechanism: &oauthMechanism, + SASLOAuthClientID: &clientID, + SASLOAuthClientSecret: &clientSecret, + } + return cfg + }, + exceptErr: "OAuth2 token url is empty", + }, + { + name: "invalid OAUTHBEARER SASL: non base64 client secret", + URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0" + + "&partition-num=0&sasl-mechanism=OAUTHBEARER", + replicaConfig: func() *config.ReplicaConfig { + cfg := config.GetDefaultReplicaConfig() + oauthMechanism := string(security.OAuthMechanism) + clientID := "client_id" + clientSecret := "client_secret" + tokenURL := "127.0.0.1:9093/token" + cfg.Sink.KafkaConfig = &config.KafkaConfig{ + SASLMechanism: &oauthMechanism, + SASLOAuthClientID: &clientID, + SASLOAuthClientSecret: &clientSecret, + SASLOAuthTokenURL: &tokenURL, + } + return cfg + }, + exceptErr: "OAuth2 client secret is not base64 encoded", + }, + { + name: "invalid OAUTHBEARER SASL: wrong mechanism", + URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0" + + "&partition-num=0&sasl-mechanism=GSSAPI", + replicaConfig: func() *config.ReplicaConfig { + cfg := config.GetDefaultReplicaConfig() + oauthMechanism := string(security.OAuthMechanism) + clientID := "client_id" + clientSecret := "Y2xpZW50X3NlY3JldA==" // base64(client_secret) + tokenURL := "127.0.0.1:9093/token" + cfg.Sink.KafkaConfig = &config.KafkaConfig{ + SASLMechanism: &oauthMechanism, + SASLOAuthClientID: &clientID, + SASLOAuthClientSecret: &clientSecret, + SASLOAuthTokenURL: &tokenURL, + } + return cfg + }, + exceptErr: "OAuth2 is only supported with SASL mechanism type OAUTHBEARER", }, } @@ -490,9 +614,11 @@ func TestApplySASL(t *testing.T) { sinkURI, err := url.Parse(test.URI) require.Nil(t, err) if test.exceptErr == "" { - require.Nil(t, cfg.applySASL(sinkURI.Query())) + require.Nil(t, cfg.applySASL( + sinkURI.Query(), test.replicaConfig())) } else { - require.Regexp(t, test.exceptErr, cfg.applySASL(sinkURI.Query()).Error()) + require.Regexp(t, test.exceptErr, cfg.applySASL( + sinkURI.Query(), 
test.replicaConfig()).Error()) } }) } @@ -567,6 +693,7 @@ func TestApplyTLS(t *testing.T) { func TestCompleteSaramaSASLConfig(t *testing.T) { t.Parallel() + ctx := context.Background() // Test that SASL is turned on correctly. cfg := NewConfig() cfg.SASL = &security.SASL{ @@ -576,10 +703,10 @@ func TestCompleteSaramaSASLConfig(t *testing.T) { GSSAPI: security.GSSAPI{}, } saramaConfig := sarama.NewConfig() - completeSaramaSASLConfig(saramaConfig, cfg) + completeSaramaSASLConfig(ctx, saramaConfig, cfg) require.False(t, saramaConfig.Net.SASL.Enable) cfg.SASL.SASLMechanism = "plain" - completeSaramaSASLConfig(saramaConfig, cfg) + completeSaramaSASLConfig(ctx, saramaConfig, cfg) require.True(t, saramaConfig.Net.SASL.Enable) // Test that the SCRAMClientGeneratorFunc is set up correctly. cfg = NewConfig() @@ -590,9 +717,9 @@ func TestCompleteSaramaSASLConfig(t *testing.T) { GSSAPI: security.GSSAPI{}, } saramaConfig = sarama.NewConfig() - completeSaramaSASLConfig(saramaConfig, cfg) + completeSaramaSASLConfig(ctx, saramaConfig, cfg) require.Nil(t, saramaConfig.Net.SASL.SCRAMClientGeneratorFunc) cfg.SASL.SASLMechanism = "SCRAM-SHA-512" - completeSaramaSASLConfig(saramaConfig, cfg) + completeSaramaSASLConfig(ctx, saramaConfig, cfg) require.NotNil(t, saramaConfig.Net.SASL.SCRAMClientGeneratorFunc) } diff --git a/cdc/sink/mq/producer/kafka/kafka.go b/cdc/sink/mq/producer/kafka/kafka.go index e41d982739c..b7effca49b4 100644 --- a/cdc/sink/mq/producer/kafka/kafka.go +++ b/cdc/sink/mq/producer/kafka/kafka.go @@ -406,21 +406,36 @@ func kafkaClientID(role, captureAddr string, func AdjustConfig( admin kafka.ClusterAdminClient, config *Config, saramaConfig *sarama.Config, topic string, ) error { - topics, err := admin.ListTopics() + topics, err := admin.DescribeTopics([]string{topic}) if err != nil { return errors.Trace(err) } + var info *sarama.TopicMetadata + var exists bool + for _, t := range topics { + if t.Err == sarama.ErrNoError { + if t.Name == topic { + info = t + exists = true + log.Info("topic already exists", zap.String("topic", topic)) + break + } + } else if t.Err != sarama.ErrUnknownTopicOrPartition { + log.Warn("failed to describe topic", zap.String("topic", topic), zap.Error(t.Err)) + return errors.Trace(t.Err) + } + } - err = validateMinInsyncReplicas(admin, topics, topic, int(config.ReplicationFactor)) + err = validateMinInsyncReplicas(admin, topic, exists, int(config.ReplicationFactor)) if err != nil { return errors.Trace(err) } - info, exists := topics[topic] // once we have found the topic, no matter `auto-create-topic`, make sure user input parameters are valid. 
if exists { // make sure that producer's `MaxMessageBytes` smaller than topic's `max.message.bytes` - topicMaxMessageBytesStr, err := getTopicConfig(admin, info, kafka.TopicMaxMessageBytesConfigName, + topicMaxMessageBytesStr, err := getTopicConfig(admin, + info.Name, kafka.TopicMaxMessageBytesConfigName, kafka.BrokerMessageMaxBytesConfigName) if err != nil { return errors.Trace(err) @@ -444,7 +459,7 @@ func AdjustConfig( zap.String("topic", topic), zap.Any("detail", info)) } - if err := config.setPartitionNum(info.NumPartitions); err != nil { + if err := config.setPartitionNum(int32(len(info.Partitions))); err != nil { return errors.Trace(err) } @@ -484,12 +499,11 @@ func AdjustConfig( func validateMinInsyncReplicas( admin kafka.ClusterAdminClient, - topics map[string]sarama.TopicDetail, topic string, replicationFactor int, + topic string, topicExists bool, replicationFactor int, ) error { minInsyncReplicasConfigGetter := func() (string, bool, error) { - info, exists := topics[topic] - if exists { - minInsyncReplicasStr, err := getTopicConfig(admin, info, + if topicExists { + minInsyncReplicasStr, err := getTopicConfig(admin, topic, kafka.MinInsyncReplicasConfigName, kafka.MinInsyncReplicasConfigName) if err != nil { @@ -555,22 +569,53 @@ func getBrokerConfig(admin kafka.ClusterAdminClient, brokerConfigName string) (s return "", err } - if len(configEntries) == 0 || configEntries[0].Name != brokerConfigName { - log.Warn("Kafka config item not found", zap.String("configName", brokerConfigName)) - return "", cerror.ErrKafkaBrokerConfigNotFound.GenWithStack( - "cannot find the `%s` from the broker's configuration", brokerConfigName) + // For compatibility with KOP, we checked all return values. + // 1. Kafka only returns requested configs. + // 2. Kop returns all configs. + for _, entry := range configEntries { + if entry.Name == brokerConfigName { + return entry.Value, nil + } } - - return configEntries[0].Value, nil + log.Warn("Kafka config item not found", + zap.String("configName", brokerConfigName)) + return "", cerror.ErrKafkaBrokerConfigNotFound.GenWithStack( + "cannot find the `%s` from the broker's configuration", brokerConfigName) } // getTopicConfig gets topic config by name. // If the topic does not have this configuration, we will try to get it from the broker's configuration. // NOTICE: The configuration names of topic and broker may be different for the same configuration. -func getTopicConfig(admin kafka.ClusterAdminClient, detail sarama.TopicDetail, topicConfigName string, brokerConfigName string) (string, error) { - if a, ok := detail.ConfigEntries[topicConfigName]; ok { - return *a, nil +func getTopicConfig(admin kafka.ClusterAdminClient, + topicName string, topicConfigName string, brokerConfigName string, +) (string, error) { + var configEntries []sarama.ConfigEntry + configEntries, err := admin.DescribeConfig(sarama.ConfigResource{ + Type: sarama.TopicResource, + Name: topicName, + ConfigNames: []string{topicConfigName}, + }) + if err != nil { + log.Warn("Get topic config failed", + zap.String("topicName", topicName), + zap.String("configName", topicConfigName), + zap.Error(err)) + } + // For compatibility with KOP, we checked all return values. + // 1. Kafka only returns requested configs. + // 2. Kop returns all configs. 
+	for _, entry := range configEntries {
+		if entry.Name == topicConfigName {
+			log.Info("Kafka config item found",
+				zap.String("topicName", topicName),
+				zap.String("configName", topicConfigName),
+				zap.String("configValue", entry.Value))
+			return entry.Value, nil
+		}
 	}
+	log.Warn("Kafka config item not found",
+		zap.String("topicName", topicName),
+		zap.String("configName", topicConfigName))
 	return getBrokerConfig(admin, brokerConfigName)
 }
diff --git a/cdc/sink/mq/producer/kafka/oauth2_token_provider.go b/cdc/sink/mq/producer/kafka/oauth2_token_provider.go
new file mode 100644
index 00000000000..268d33bc3ba
--- /dev/null
+++ b/cdc/sink/mq/producer/kafka/oauth2_token_provider.go
@@ -0,0 +1,92 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kafka
+
+import (
+	"context"
+	"net/url"
+
+	"github.com/Shopify/sarama"
+	"github.com/pingcap/errors"
+	"golang.org/x/oauth2"
+	"golang.org/x/oauth2/clientcredentials"
+)
+
+// tokenProvider is a user-defined callback for generating
+// access tokens for SASL/OAUTHBEARER auth.
+type tokenProvider struct {
+	tokenSource oauth2.TokenSource
+}
+
+var _ sarama.AccessTokenProvider = (*tokenProvider)(nil)
+
+// Token implements the sarama.AccessTokenProvider interface.
+// Token returns an access token. The implementation should ensure token
+// reuse so that multiple calls at connect time do not create multiple
+// tokens. The implementation should also periodically refresh the token in
+// order to guarantee that each call returns an unexpired token. This
+// method should not block indefinitely--a timeout error should be returned
+// after a short period of inactivity so that the broker connection logic
+// can log debugging information and retry.
+func (t *tokenProvider) Token() (*sarama.AccessToken, error) {
+	token, err := t.tokenSource.Token()
+	if err != nil {
+		// Errors will result in Sarama retrying the broker connection and logging
+		// the transient error, with a Broker connection error surfacing after retry
+		// attempts have been exhausted.
+		return nil, err
+	}
+
+	return &sarama.AccessToken{Token: token.AccessToken}, nil
+}
+
+func newTokenProvider(ctx context.Context,
+	kafkaConfig *Config,
+) (sarama.AccessTokenProvider, error) {
+	// grant_type is by default going to be set to 'client_credentials' by the
+	// clientcredentials library as defined by the spec, however non-compliant
+	// auth server implementations may want a custom type.
+	var endpointParams url.Values
+	if kafkaConfig.SASL.OAuth2.GrantType != "" {
+		if endpointParams == nil {
+			endpointParams = url.Values{}
+		}
+		endpointParams.Set("grant_type", kafkaConfig.SASL.OAuth2.GrantType)
+	}
+
+	// audience is an optional parameter that can be used to specify the
+	// intended audience of the token.
+ if kafkaConfig.SASL.OAuth2.Audience != "" { + if endpointParams == nil { + endpointParams = url.Values{} + } + endpointParams.Set("audience", kafkaConfig.SASL.OAuth2.Audience) + } + + tokenURL, err := url.Parse(kafkaConfig.SASL.OAuth2.TokenURL) + if err != nil { + return nil, errors.Trace(err) + } + + cfg := clientcredentials.Config{ + ClientID: kafkaConfig.SASL.OAuth2.ClientID, + ClientSecret: kafkaConfig.SASL.OAuth2.ClientSecret, + TokenURL: tokenURL.String(), + EndpointParams: endpointParams, + Scopes: kafkaConfig.SASL.OAuth2.Scopes, + } + return &tokenProvider{ + tokenSource: cfg.TokenSource(ctx), + }, nil +} diff --git a/cdc/sink/mq/producer/kafka/oauth2_token_provider_test.go b/cdc/sink/mq/producer/kafka/oauth2_token_provider_test.go new file mode 100644 index 00000000000..f986c22ba07 --- /dev/null +++ b/cdc/sink/mq/producer/kafka/oauth2_token_provider_test.go @@ -0,0 +1,74 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "context" + "testing" + + "github.com/pingcap/tiflow/pkg/security" + "github.com/stretchr/testify/require" +) + +func TestNewTokenProvider(t *testing.T) { + t.Parallel() + + for _, test := range []struct { + name string + config *Config + expectedErr string + }{ + { + name: "valid", + config: &Config{ + SASL: &security.SASL{ + OAuth2: security.OAuth2{ + ClientID: "client-id", + ClientSecret: "client-secret", + TokenURL: "http://localhost:8080/oauth2/token", + Scopes: []string{"scope1", "scope2"}, + GrantType: "client_credentials", + }, + }, + }, + }, + { + name: "invalid token URL", + config: &Config{ + SASL: &security.SASL{ + OAuth2: security.OAuth2{ + ClientID: "client-id", + ClientSecret: "client-secret", + TokenURL: "http://test.com/Segment%%2815197306101420000%29", + Scopes: []string{"scope1", "scope2"}, + GrantType: "client_credentials", + }, + }, + }, + expectedErr: "invalid URL escape", + }, + } { + ts := test + t.Run(ts.name, func(t *testing.T) { + t.Parallel() + _, err := newTokenProvider(context.TODO(), ts.config) + if ts.expectedErr == "" { + require.NoError(t, err) + } else { + require.Error(t, err) + require.Contains(t, err.Error(), ts.expectedErr) + } + }) + } +} diff --git a/docs/swagger/docs.go b/docs/swagger/docs.go index 8093ce171f1..add1b698ff1 100644 --- a/docs/swagger/docs.go +++ b/docs/swagger/docs.go @@ -741,6 +741,35 @@ var doc = `{ } } }, + "config.KafkaConfig": { + "type": "object", + "properties": { + "sasl-mechanism": { + "type": "string" + }, + "sasl-oauth-audience": { + "type": "string" + }, + "sasl-oauth-client-id": { + "type": "string" + }, + "sasl-oauth-client-secret": { + "type": "string" + }, + "sasl-oauth-grant-type": { + "type": "string" + }, + "sasl-oauth-scopes": { + "type": "array", + "items": { + "type": "string" + } + }, + "sasl-oauth-token-url": { + "type": "string" + } + } + }, "config.SinkConfig": { "type": "object", "properties": { @@ -759,6 +788,9 @@ var doc = `{ "encoder-concurrency": { "type": "integer" }, + "kafka-config": { + "$ref": "#/definitions/config.KafkaConfig" + }, "protocol": { "type": "string" }, diff 
--git a/docs/swagger/swagger.json b/docs/swagger/swagger.json index bbf8545643b..6643d9597db 100644 --- a/docs/swagger/swagger.json +++ b/docs/swagger/swagger.json @@ -722,6 +722,35 @@ } } }, + "config.KafkaConfig": { + "type": "object", + "properties": { + "sasl-mechanism": { + "type": "string" + }, + "sasl-oauth-audience": { + "type": "string" + }, + "sasl-oauth-client-id": { + "type": "string" + }, + "sasl-oauth-client-secret": { + "type": "string" + }, + "sasl-oauth-grant-type": { + "type": "string" + }, + "sasl-oauth-scopes": { + "type": "array", + "items": { + "type": "string" + } + }, + "sasl-oauth-token-url": { + "type": "string" + } + } + }, "config.SinkConfig": { "type": "object", "properties": { @@ -740,6 +769,9 @@ "encoder-concurrency": { "type": "integer" }, + "kafka-config": { + "$ref": "#/definitions/config.KafkaConfig" + }, "protocol": { "type": "string" }, diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index 9dd4b9a4ebb..75425ae16af 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -27,6 +27,25 @@ definitions: topic: type: string type: object + config.KafkaConfig: + properties: + sasl-mechanism: + type: string + sasl-oauth-audience: + type: string + sasl-oauth-client-id: + type: string + sasl-oauth-client-secret: + type: string + sasl-oauth-grant-type: + type: string + sasl-oauth-scopes: + items: + type: string + type: array + sasl-oauth-token-url: + type: string + type: object config.SinkConfig: properties: column-selectors: @@ -39,6 +58,8 @@ definitions: type: array encoder-concurrency: type: integer + kafka-config: + $ref: '#/definitions/config.KafkaConfig' protocol: type: string schema-registry: diff --git a/go.mod b/go.mod index d2cd42873ae..fef5c38743c 100644 --- a/go.mod +++ b/go.mod @@ -88,12 +88,13 @@ require ( go.uber.org/goleak v1.1.12 go.uber.org/multierr v1.8.0 go.uber.org/zap v1.21.0 - golang.org/x/net v0.0.0-20221014081412-f15817d10f9b + golang.org/x/net v0.2.0 + golang.org/x/oauth2 v0.2.0 golang.org/x/sync v0.1.0 - golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab + golang.org/x/sys v0.2.0 golang.org/x/text v0.4.0 golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 - golang.org/x/tools v0.1.12 // indirect + golang.org/x/tools v0.2.0 // indirect google.golang.org/genproto v0.0.0-20221201164419-0e50fba7f41c google.golang.org/grpc v1.50.1 gopkg.in/yaml.v2 v2.4.0 @@ -265,10 +266,9 @@ require ( go.opentelemetry.io/otel/sdk/metric v0.20.0 // indirect go.opentelemetry.io/otel/trace v0.20.0 // indirect go.opentelemetry.io/proto/otlp v0.7.0 // indirect - golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect - golang.org/x/exp v0.0.0-20220516143420-24438e51023a // indirect - golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 // indirect - golang.org/x/term v0.0.0-20220411215600-e5f449aeb171 // indirect + golang.org/x/crypto v0.1.0 // indirect + golang.org/x/exp v0.0.0-20220426173459-3bcf042a4bf5 // indirect + golang.org/x/term v0.2.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect google.golang.org/api v0.103.0 // indirect google.golang.org/appengine v1.6.7 // indirect diff --git a/go.sum b/go.sum index b09506e50bf..8e497d6baed 100644 --- a/go.sum +++ b/go.sum @@ -1390,8 +1390,9 @@ golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod 
h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa h1:zuSxTR4o9y82ebqCUJYNGJbGPo6sKVl54f/TVDObg1c= golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.1.0 h1:MDRAIl0xIo9Io2xV565hzXHw3zVseKrJKodhohM5CjU= +golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20181106170214-d68db9428509/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -1407,9 +1408,8 @@ golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= golang.org/x/exp v0.0.0-20200513190911-00229845015e/go.mod h1:4M0jN8W1tt0AVLNr8HDosyJCDCDuyL9N9+3m7wDWgKw= +golang.org/x/exp v0.0.0-20220426173459-3bcf042a4bf5 h1:rxKZ2gOnYxjfmakvUUqh9Gyb6KXfrj7JWTxORTYqb0E= golang.org/x/exp v0.0.0-20220426173459-3bcf042a4bf5/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE= -golang.org/x/exp v0.0.0-20220516143420-24438e51023a h1:tiLLxEjKNE6Hrah/Dp/cyHvsyjDLcMFSocOHO5XDmOM= -golang.org/x/exp v0.0.0-20220516143420-24438e51023a/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE= golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= @@ -1441,7 +1441,7 @@ golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.5.1/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= golang.org/x/mod v0.6.0-dev.0.20211013180041-c96bc1413d57/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY= golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY= -golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 h1:6zppjxzCulZykYSLyVDYbneBfbaBIQPYMevg0bEwv2s= +golang.org/x/mod v0.6.0 h1:b9gGHsz9/HhJ3HF5DHQytPpuwocVTChQJK3AvoLRD5I= golang.org/x/net v0.0.0-20180530234432-1e491301e022/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1509,8 +1509,8 @@ golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY5RMuhpJ3cNnI0XpyFJD1iQRSM= golang.org/x/net v0.0.0-20220809184613-07c6da5e1ced/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= -golang.org/x/net v0.0.0-20221014081412-f15817d10f9b h1:tvrvnPFcdzp294diPnrdZZZ8XUt2Tyj7svb7X52iDuU= -golang.org/x/net v0.0.0-20221014081412-f15817d10f9b/go.mod 
h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= +golang.org/x/net v0.2.0 h1:sZfSu1wtKLGlWI4ZZayP0ck9Y73K1ynO6gqzTdBVdPU= +golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -1527,8 +1527,8 @@ golang.org/x/oauth2 v0.0.0-20210628180205-a41e5a781914/go.mod h1:KelEdhl1UZF7XfJ golang.org/x/oauth2 v0.0.0-20210805134026-6f1e6394065a/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= -golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 h1:nt+Q6cXKz4MosCSpnbMtqiQ8Oz0pxTef2B4Vca2lvfk= -golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783/go.mod h1:h4gKUeWbJ4rQPri7E0u6Gs4e9Ri2zaLxzw5DI5XGrYg= +golang.org/x/oauth2 v0.2.0 h1:GtQkldQ9m7yvzCL1V+LrYow3Khe0eJH0w7RbX/VbaIU= +golang.org/x/oauth2 v0.2.0/go.mod h1:Cwn6afJ8jrQwYMxQDTpISoXmXW9I6qF6vDeuuoX3Ibs= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -1645,13 +1645,14 @@ golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab h1:2QkjZIsXupsJbJIdSjjUOgWK3aEtzyuh2mPt3l/CkeU= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.2.0 h1:ljd4t30dBnAvMZaQCevtY0xLLD0A+bRZXbgLMLU1F/A= +golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.0.0-20220411215600-e5f449aeb171 h1:EH1Deb8WZJ0xc0WK//leUHXcX9aLE5SymusoTmMZye8= -golang.org/x/term v0.0.0-20220411215600-e5f449aeb171/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.2.0 h1:z85xZCsEl7bi/KwbNADeBYoOP0++7W1ipu+aGnpwzRM= +golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -1756,8 +1757,8 @@ golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= 
 golang.org/x/tools v0.1.8-0.20211029000441-d6a9af8af023/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU=
 golang.org/x/tools v0.1.8/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU=
 golang.org/x/tools v0.1.10/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E=
-golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU=
-golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
+golang.org/x/tools v0.2.0 h1:G6AHpWxTMGY1KyEYoAQ5WTtIekUUvDNjan3ugu60JvE=
+golang.org/x/tools v0.2.0/go.mod h1:y4OqIKeOV/fWJetJ8bXPU1sEVniLMIyDAZWeHdV+NTA=
 golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
diff --git a/pkg/config/sink.go b/pkg/config/sink.go
index 7275077049d..25af977ffb8 100644
--- a/pkg/config/sink.go
+++ b/pkg/config/sink.go
@@ -88,6 +88,19 @@ type SinkConfig struct {
 	ColumnSelectors    []*ColumnSelector `toml:"column-selectors" json:"column-selectors"`
 	SchemaRegistry     string            `toml:"schema-registry" json:"schema-registry"`
 	EncoderConcurrency int               `toml:"encoder-concurrency" json:"encoder-concurrency"`
+
+	KafkaConfig *KafkaConfig `toml:"kafka-config" json:"kafka-config,omitempty"`
+}
+
+// KafkaConfig represents a Kafka sink configuration.
+type KafkaConfig struct {
+	SASLMechanism         *string  `toml:"sasl-mechanism" json:"sasl-mechanism,omitempty"`
+	SASLOAuthClientID     *string  `toml:"sasl-oauth-client-id" json:"sasl-oauth-client-id,omitempty"`
+	SASLOAuthClientSecret *string  `toml:"sasl-oauth-client-secret" json:"sasl-oauth-client-secret,omitempty"`
+	SASLOAuthTokenURL     *string  `toml:"sasl-oauth-token-url" json:"sasl-oauth-token-url,omitempty"`
+	SASLOAuthScopes       []string `toml:"sasl-oauth-scopes" json:"sasl-oauth-scopes,omitempty"`
+	SASLOAuthGrantType    *string  `toml:"sasl-oauth-grant-type" json:"sasl-oauth-grant-type,omitempty"`
+	SASLOAuthAudience     *string  `toml:"sasl-oauth-audience" json:"sasl-oauth-audience,omitempty"`
 }
 
 // DispatchRule represents partition rule for a table.
diff --git a/pkg/kafka/cluster_admin_client.go b/pkg/kafka/cluster_admin_client.go
index bcb13276777..fac44e1e4cd 100644
--- a/pkg/kafka/cluster_admin_client.go
+++ b/pkg/kafka/cluster_admin_client.go
@@ -20,8 +20,6 @@ import (
 // ClusterAdminClient is the administrative client for Kafka, which supports managing and inspecting topics,
 // brokers, configurations and ACLs.
 type ClusterAdminClient interface {
-	// ListTopics list the topics available in the cluster with the default options.
-	ListTopics() (map[string]sarama.TopicDetail, error)
 	// DescribeCluster gets information about the nodes in the cluster
 	DescribeCluster() (brokers []*sarama.Broker, controllerID int32, err error)
 	// DescribeConfig gets the configuration for the specified resources.
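A note on the new KafkaConfig fields: they are pointer-typed so that a key absent
from the TOML file can be told apart from one explicitly set to an empty value.
The following is a minimal sketch of how these fields might be copied into the
security.SASL settings introduced later in this patch; the helper applyOAuth2 and
the deref closure are illustrative only and are not part of this change:

package example

import (
	"github.com/pingcap/tiflow/pkg/config"
	"github.com/pingcap/tiflow/pkg/security"
)

// applyOAuth2 is a hypothetical helper (not part of this patch) showing how
// the pointer-typed KafkaConfig fields could be mapped onto security.SASL.
func applyOAuth2(cfg *config.KafkaConfig, sasl *security.SASL) {
	// deref treats a nil pointer as "key not set in the TOML file".
	deref := func(s *string) string {
		if s == nil {
			return ""
		}
		return *s
	}
	sasl.OAuth2.SetDefault() // GrantType defaults to "client_credentials"
	sasl.OAuth2.ClientID = deref(cfg.SASLOAuthClientID)
	sasl.OAuth2.ClientSecret = deref(cfg.SASLOAuthClientSecret)
	sasl.OAuth2.TokenURL = deref(cfg.SASLOAuthTokenURL)
	sasl.OAuth2.Scopes = cfg.SASLOAuthScopes
	if cfg.SASLOAuthGrantType != nil {
		sasl.OAuth2.GrantType = *cfg.SASLOAuthGrantType
	}
	sasl.OAuth2.Audience = deref(cfg.SASLOAuthAudience)
}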
diff --git a/pkg/kafka/cluster_admin_client_mock_impl.go b/pkg/kafka/cluster_admin_client_mock_impl.go
index e1ebdd0887a..b80ee60cfd6 100644
--- a/pkg/kafka/cluster_admin_client_mock_impl.go
+++ b/pkg/kafka/cluster_admin_client_mock_impl.go
@@ -61,6 +61,7 @@ type ClusterAdminClientMockImpl struct {
 	topics map[string]*topicDetail
 	// Cluster controller ID.
 	controllerID int32
+	topicConfigs map[string]map[string]string
 	brokerConfigs []sarama.ConfigEntry
 }
 
@@ -89,22 +90,19 @@ func NewClusterAdminClientMockImpl() *ClusterAdminClientMockImpl {
 		},
 	}
 
+	topicConfigs := make(map[string]map[string]string)
+	topicConfigs[DefaultMockTopicName] = make(map[string]string)
+	topicConfigs[DefaultMockTopicName][TopicMaxMessageBytesConfigName] = TopicMaxMessageBytes
+	topicConfigs[DefaultMockTopicName][MinInsyncReplicasConfigName] = MinInSyncReplicas
+
 	return &ClusterAdminClientMockImpl{
 		topics:        topics,
 		controllerID:  defaultMockControllerID,
 		brokerConfigs: brokerConfigs,
+		topicConfigs:  topicConfigs,
 	}
 }
 
-// ListTopics returns all topics directly.
-func (c *ClusterAdminClientMockImpl) ListTopics() (map[string]sarama.TopicDetail, error) {
-	topicsDetailsMap := make(map[string]sarama.TopicDetail)
-	for topic, detail := range c.topics {
-		topicsDetailsMap[topic] = detail.TopicDetail
-	}
-	return topicsDetailsMap, nil
-}
-
 // DescribeCluster returns the controller ID.
 func (c *ClusterAdminClientMockImpl) DescribeCluster() (brokers []*sarama.Broker, controllerID int32, err error) {
 	return nil, c.controllerID, nil
@@ -113,10 +111,27 @@ func (c *ClusterAdminClientMockImpl) DescribeCluster() (brokers []*sarama.Broker
 // DescribeConfig return brokerConfigs directly.
 func (c *ClusterAdminClientMockImpl) DescribeConfig(resource sarama.ConfigResource) ([]sarama.ConfigEntry, error) {
 	var result []sarama.ConfigEntry
-	for _, name := range resource.ConfigNames {
-		for _, config := range c.brokerConfigs {
-			if name == config.Name {
-				result = append(result, config)
+	if resource.Type == sarama.TopicResource {
+		if _, ok := c.topics[resource.Name]; !ok {
+			return nil, fmt.Errorf("topic %s not found", resource.Name)
+		}
+		for _, name := range resource.ConfigNames {
+			for n, config := range c.topicConfigs[resource.Name] {
+				if name == n {
+					result = append(result, sarama.ConfigEntry{
+						Name:  n,
+						Value: config,
+					})
+				}
+			}
+		}
+
+	} else if resource.Type == sarama.BrokerResource {
+		for _, name := range resource.ConfigNames {
+			for _, config := range c.brokerConfigs {
+				if name == config.Name {
+					result = append(result, config)
+				}
 			}
 		}
 	}
@@ -207,7 +222,7 @@ func (c *ClusterAdminClientMockImpl) Close() error {
 
 // SetMinInsyncReplicas sets the MinInsyncReplicas for broker and default topic.
 func (c *ClusterAdminClientMockImpl) SetMinInsyncReplicas(minInsyncReplicas string) {
-	c.topics[DefaultMockTopicName].ConfigEntries[MinInsyncReplicasConfigName] = &minInsyncReplicas
+	c.topicConfigs[DefaultMockTopicName][MinInsyncReplicasConfigName] = minInsyncReplicas
 
 	for i, config := range c.brokerConfigs {
 		if config.Name == MinInsyncReplicasConfigName {
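With the mock now dispatching on the resource type, a test can exercise both
branches of DescribeConfig. A brief sketch of such a test (the test function
itself is illustrative and not part of this patch):

package kafka_test

import (
	"testing"

	"github.com/Shopify/sarama"
	"github.com/pingcap/tiflow/pkg/kafka"
	"github.com/stretchr/testify/require"
)

// TestDescribeTopicConfigSketch is a hypothetical test showing the new
// TopicResource branch of the mock's DescribeConfig.
func TestDescribeTopicConfigSketch(t *testing.T) {
	admin := kafka.NewClusterAdminClientMockImpl()

	// Known topic: the seeded min.insync.replicas entry is returned.
	entries, err := admin.DescribeConfig(sarama.ConfigResource{
		Type:        sarama.TopicResource,
		Name:        kafka.DefaultMockTopicName,
		ConfigNames: []string{kafka.MinInsyncReplicasConfigName},
	})
	require.NoError(t, err)
	require.Len(t, entries, 1)

	// Unknown topic: DescribeConfig now fails instead of returning nothing.
	_, err = admin.DescribeConfig(sarama.ConfigResource{
		Type: sarama.TopicResource,
		Name: "no-such-topic",
	})
	require.Error(t, err)
}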
diff --git a/pkg/security/sasl.go b/pkg/security/sasl.go
index 8077589d547..3be79701b54 100644
--- a/pkg/security/sasl.go
+++ b/pkg/security/sasl.go
@@ -35,6 +35,8 @@ const (
 	SCRAM512Mechanism SASLMechanism = sarama.SASLTypeSCRAMSHA512
 	// GSSAPIMechanism means the SASL mechanism is GSSAPI.
 	GSSAPIMechanism SASLMechanism = sarama.SASLTypeGSSAPI
+	// OAuthMechanism means the SASL mechanism is OAuth2.
+	OAuthMechanism SASLMechanism = sarama.SASLTypeOAuth
 )
 
 // SASLMechanismFromString converts the string to SASL mechanism.
@@ -48,6 +50,8 @@ func SASLMechanismFromString(s string) (SASLMechanism, error) {
 		return SCRAM512Mechanism, nil
 	case "gssapi":
 		return GSSAPIMechanism, nil
+	case "oauthbearer":
+		return OAuthMechanism, nil
 	default:
 		return UnknownMechanism, errors.Errorf("unknown %s SASL mechanism", s)
 	}
@@ -55,10 +59,47 @@
 
 // SASL holds necessary path parameter to support sasl-scram
 type SASL struct {
-	SASLUser      string        `toml:"sasl-user" json:"sasl-user"`
-	SASLPassword  string        `toml:"sasl-password" json:"sasl-password"`
-	SASLMechanism SASLMechanism `toml:"sasl-mechanism" json:"sasl-mechanism"`
-	GSSAPI        GSSAPI        `toml:"sasl-gssapi" json:"sasl-gssapi"`
+	SASLUser      string
+	SASLPassword  string
+	SASLMechanism SASLMechanism
+	GSSAPI        GSSAPI
+	OAuth2        OAuth2
+}
+
+// OAuth2 holds the necessary parameters to support SASL OAuth2.
+type OAuth2 struct {
+	ClientID     string
+	ClientSecret string
+	TokenURL     string
+	Scopes       []string
+	GrantType    string
+	Audience     string
+}
+
+// Validate checks that the required OAuth2 parameters are set.
+// ClientID, ClientSecret, and TokenURL are required; the rest are optional.
+func (o *OAuth2) Validate() error {
+	if len(o.ClientID) == 0 {
+		return errors.New("OAuth2 client id is empty")
+	}
+	if len(o.ClientSecret) == 0 {
+		return errors.New("OAuth2 client secret is empty")
+	}
+	if len(o.TokenURL) == 0 {
+		return errors.New("OAuth2 token url is empty")
+	}
+	return nil
+}
+
+// SetDefault sets the default values of OAuth2.
+func (o *OAuth2) SetDefault() {
+	o.GrantType = "client_credentials"
+}
+
+// IsEnable reports whether OAuth2 is enabled: it is considered enabled
+// if any of ClientID, ClientSecret, or TokenURL is non-empty.
+func (o *OAuth2) IsEnable() bool {
+	return len(o.ClientID) > 0 || len(o.ClientSecret) > 0 || len(o.TokenURL) > 0
 }
 
 // GSSAPIAuthType defines the type of GSSAPI authentication.
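Taken together, the intended call order for the new OAuth2 struct is: SetDefault
first, then the user-supplied values, then Validate, but only when IsEnable
reports that the user actually configured OAuth2. A minimal sketch of that flow
(the buildOAuth2 wrapper is illustrative, not part of this patch):

package example

import "github.com/pingcap/tiflow/pkg/security"

// buildOAuth2 is a hypothetical wrapper showing the intended use of
// SetDefault, IsEnable, and Validate on security.OAuth2.
func buildOAuth2(clientID, clientSecret, tokenURL string) (*security.OAuth2, error) {
	o := &security.OAuth2{}
	o.SetDefault() // GrantType becomes "client_credentials"

	o.ClientID = clientID
	o.ClientSecret = clientSecret
	o.TokenURL = tokenURL

	// An all-empty struct simply means OAuth2 is disabled; only validate
	// when at least one of the identifying fields has been set.
	if o.IsEnable() {
		if err := o.Validate(); err != nil {
			return nil, err
		}
	}
	return o, nil
}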