Skip to content

Commit

Permalink
sink: support to configure kafka-client-id in mq sink-uri (pingcap#706)
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei authored Jul 1, 2020
1 parent 5d8cb90 commit c42e70f
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 17 deletions.
2 changes: 2 additions & 0 deletions cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,8 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Fi
config.Compression = s
}

config.ClientID = sinkURI.Query().Get("kafka-client-id")

s = sinkURI.Query().Get("protocol")
if s != "" {
replicaConfig.Sink.Protocol = s
Expand Down
15 changes: 10 additions & 5 deletions cdc/sink/mqProducer/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type KafkaConfig struct {
Version string
MaxMessageBytes int
Compression string
ClientID string
}

// DefaultKafkaConfig is the default Kafka configuration
Expand Down Expand Up @@ -274,13 +275,17 @@ var (
commonInvalidChar *regexp.Regexp = regexp.MustCompile(`[\?:,"]`)
)

func kafkaClientID(role, captureAddr, changefeedID string) (string, error) {
clientID := fmt.Sprintf("TiCDC_sarama_producer_%s_%s_%s", role, captureAddr, changefeedID)
clientID = commonInvalidChar.ReplaceAllString(clientID, "_")
func kafkaClientID(role, captureAddr, changefeedID, configuredClientID string) (clientID string, err error) {
if configuredClientID != "" {
clientID = configuredClientID
} else {
clientID = fmt.Sprintf("TiCDC_sarama_producer_%s_%s_%s", role, captureAddr, changefeedID)
clientID = commonInvalidChar.ReplaceAllString(clientID, "_")
}
if !validClienID.MatchString(clientID) {
return "", errors.Errorf("invalid kafka client ID '%s'", clientID)
}
return clientID, nil
return
}

// NewSaramaConfig return the default config and set the according version and metrics
Expand All @@ -300,7 +305,7 @@ func newSaramaConfig(ctx context.Context, c KafkaConfig) (*sarama.Config, error)
captureAddr := util.CaptureAddrFromCtx(ctx)
changefeedID := util.ChangefeedIDFromCtx(ctx)

config.ClientID, err = kafkaClientID(role, captureAddr, changefeedID)
config.ClientID, err = kafkaClientID(role, captureAddr, changefeedID, c.ClientID)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
34 changes: 23 additions & 11 deletions cdc/sink/mqProducer/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,27 @@ var _ = check.Suite(&kafkaSuite{})
func Test(t *testing.T) { check.TestingT(t) }

func (s *kafkaSuite) TestClientID(c *check.C) {
_, err := kafkaClientID("owner", "domain:1234", "123-121-121-121")
c.Assert(err, check.IsNil)

_, err = kafkaClientID("owner", "127.0.0.1:1234", "123-121-121-121")
c.Assert(err, check.IsNil)

_, err = kafkaClientID("owner", "127.0.0.1:1234?:,\"", "123-121-121-121")
c.Assert(err, check.IsNil)

_, err = kafkaClientID("owner", "中文", "123-121-121-121")
c.Assert(err, check.NotNil)
testCases := []struct {
role string
addr string
changefeedID string
configuredID string
hasError bool
expected string
}{
{"owner", "domain:1234", "123-121-121-121", "", false, "TiCDC_sarama_producer_owner_domain_1234_123-121-121-121"},
{"owner", "127.0.0.1:1234", "123-121-121-121", "", false, "TiCDC_sarama_producer_owner_127.0.0.1_1234_123-121-121-121"},
{"owner", "127.0.0.1:1234?:,\"", "123-121-121-121", "", false, "TiCDC_sarama_producer_owner_127.0.0.1_1234_____123-121-121-121"},
{"owner", "中文", "123-121-121-121", "", true, ""},
{"owner", "127.0.0.1:1234", "123-121-121-121", "cdc-changefeed-1", false, "cdc-changefeed-1"},
}
for _, tc := range testCases {
id, err := kafkaClientID(tc.role, tc.addr, tc.changefeedID, tc.configuredID)
if tc.hasError {
c.Assert(err, check.NotNil)
} else {
c.Assert(err, check.IsNil)
c.Assert(id, check.Equals, tc.expected)
}
}
}
2 changes: 1 addition & 1 deletion tests/simple/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ function prepare() {

TOPIC_NAME="ticdc-simple-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4";;
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-client-id=cdc_test_simple";;
mysql) ;&
*) SINK_URI="mysql://root@127.0.0.1:3306/";;
esac
Expand Down

0 comments on commit c42e70f

Please sign in to comment.