From c42e70f9ca47642459e10a7d3aacca797de0eabb Mon Sep 17 00:00:00 2001 From: amyangfei Date: Wed, 1 Jul 2020 15:07:45 +0800 Subject: [PATCH] sink: support to configure kafka-client-id in mq sink-uri (#706) --- cdc/sink/mq.go | 2 ++ cdc/sink/mqProducer/kafka.go | 15 +++++++++----- cdc/sink/mqProducer/kafka_test.go | 34 +++++++++++++++++++++---------- tests/simple/run.sh | 2 +- 4 files changed, 36 insertions(+), 17 deletions(-) diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go index 32ad6d5a660..6832b3da950 100644 --- a/cdc/sink/mq.go +++ b/cdc/sink/mq.go @@ -326,6 +326,8 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Fi config.Compression = s } + config.ClientID = sinkURI.Query().Get("kafka-client-id") + s = sinkURI.Query().Get("protocol") if s != "" { replicaConfig.Sink.Protocol = s diff --git a/cdc/sink/mqProducer/kafka.go b/cdc/sink/mqProducer/kafka.go index 817613a02ac..4d478878714 100644 --- a/cdc/sink/mqProducer/kafka.go +++ b/cdc/sink/mqProducer/kafka.go @@ -37,6 +37,7 @@ type KafkaConfig struct { Version string MaxMessageBytes int Compression string + ClientID string } // DefaultKafkaConfig is the default Kafka configuration @@ -274,13 +275,17 @@ var ( commonInvalidChar *regexp.Regexp = regexp.MustCompile(`[\?:,"]`) ) -func kafkaClientID(role, captureAddr, changefeedID string) (string, error) { - clientID := fmt.Sprintf("TiCDC_sarama_producer_%s_%s_%s", role, captureAddr, changefeedID) - clientID = commonInvalidChar.ReplaceAllString(clientID, "_") +func kafkaClientID(role, captureAddr, changefeedID, configuredClientID string) (clientID string, err error) { + if configuredClientID != "" { + clientID = configuredClientID + } else { + clientID = fmt.Sprintf("TiCDC_sarama_producer_%s_%s_%s", role, captureAddr, changefeedID) + clientID = commonInvalidChar.ReplaceAllString(clientID, "_") + } if !validClienID.MatchString(clientID) { return "", errors.Errorf("invalid kafka client ID '%s'", clientID) } - return clientID, nil + return } // NewSaramaConfig return the default config and set the according version and metrics @@ -300,7 +305,7 @@ func newSaramaConfig(ctx context.Context, c KafkaConfig) (*sarama.Config, error) captureAddr := util.CaptureAddrFromCtx(ctx) changefeedID := util.ChangefeedIDFromCtx(ctx) - config.ClientID, err = kafkaClientID(role, captureAddr, changefeedID) + config.ClientID, err = kafkaClientID(role, captureAddr, changefeedID, c.ClientID) if err != nil { return nil, errors.Trace(err) } diff --git a/cdc/sink/mqProducer/kafka_test.go b/cdc/sink/mqProducer/kafka_test.go index 3420c5bb6e7..95ad938445b 100644 --- a/cdc/sink/mqProducer/kafka_test.go +++ b/cdc/sink/mqProducer/kafka_test.go @@ -26,15 +26,27 @@ var _ = check.Suite(&kafkaSuite{}) func Test(t *testing.T) { check.TestingT(t) } func (s *kafkaSuite) TestClientID(c *check.C) { - _, err := kafkaClientID("owner", "domain:1234", "123-121-121-121") - c.Assert(err, check.IsNil) - - _, err = kafkaClientID("owner", "127.0.0.1:1234", "123-121-121-121") - c.Assert(err, check.IsNil) - - _, err = kafkaClientID("owner", "127.0.0.1:1234?:,\"", "123-121-121-121") - c.Assert(err, check.IsNil) - - _, err = kafkaClientID("owner", "中文", "123-121-121-121") - c.Assert(err, check.NotNil) + testCases := []struct { + role string + addr string + changefeedID string + configuredID string + hasError bool + expected string + }{ + {"owner", "domain:1234", "123-121-121-121", "", false, "TiCDC_sarama_producer_owner_domain_1234_123-121-121-121"}, + {"owner", "127.0.0.1:1234", "123-121-121-121", "", false, "TiCDC_sarama_producer_owner_127.0.0.1_1234_123-121-121-121"}, + {"owner", "127.0.0.1:1234?:,\"", "123-121-121-121", "", false, "TiCDC_sarama_producer_owner_127.0.0.1_1234_____123-121-121-121"}, + {"owner", "中文", "123-121-121-121", "", true, ""}, + {"owner", "127.0.0.1:1234", "123-121-121-121", "cdc-changefeed-1", false, "cdc-changefeed-1"}, + } + for _, tc := range testCases { + id, err := kafkaClientID(tc.role, tc.addr, tc.changefeedID, tc.configuredID) + if tc.hasError { + c.Assert(err, check.NotNil) + } else { + c.Assert(err, check.IsNil) + c.Assert(id, check.Equals, tc.expected) + } + } } diff --git a/tests/simple/run.sh b/tests/simple/run.sh index b9459135a67..821cb0a26e1 100644 --- a/tests/simple/run.sh +++ b/tests/simple/run.sh @@ -25,7 +25,7 @@ function prepare() { TOPIC_NAME="ticdc-simple-test-$RANDOM" case $SINK_TYPE in - kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4";; + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-client-id=cdc_test_simple";; mysql) ;& *) SINK_URI="mysql://root@127.0.0.1:3306/";; esac