Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink(ticdc): Optimize the performance when getting the metadata of Kafka topics #9146

Merged
merged 8 commits into from
Jun 13, 2023

Conversation

Rustin170506
Copy link
Member

@Rustin170506 Rustin170506 commented Jun 7, 2023

This is an automated cherry-pick of #9060 and #8893 and #8938

What problem does this PR solve?

Issue Number: close #8959 close #8957 #8892

What is changed and how it works?

  • Get the metadata by needs instead of getting all topic's metadata.
  • Support OAuth.
  • Support Kop.

Check List

Tests

  • Unit test
  • Manual test

Questions

Will it cause performance regression or break compatibility?

No

Do you need to update user documentation, design documentation or monitoring documentation?

No

Release note

Optimize the performance when TiCDC gets the metadata of Kafka topics

…fka topics

Signed-off-by: hi-rustin <rustin.liu@gmail.com>
@ti-chi-bot ti-chi-bot bot added do-not-merge/cherry-pick-not-approved release-note Denotes a PR that will be considered when it comes time to generate release notes. labels Jun 7, 2023
@ti-chi-bot ti-chi-bot bot requested review from okJiang and sdojjy June 7, 2023 07:37
@ti-chi-bot ti-chi-bot bot added the size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. label Jun 7, 2023
Signed-off-by: hi-rustin <rustin.liu@gmail.com>
@Rustin170506 Rustin170506 changed the title pkg/sink(ticdc): add GetTopicConfig support sink(ticdc): Optimize the performance when getting the metadata of Kafka topics Jun 7, 2023
@Rustin170506 Rustin170506 requested review from nongfushanquan and removed request for okJiang June 7, 2023 08:36
Signed-off-by: hi-rustin <rustin.liu@gmail.com>
Signed-off-by: hi-rustin <rustin.liu@gmail.com>
Signed-off-by: hi-rustin <rustin.liu@gmail.com>
@ti-chi-bot ti-chi-bot bot added the approved label Jun 8, 2023
… (pingcap#9095)

close pingcap#8865

Signed-off-by: hi-rustin <rustin.liu@gmail.com>

Fix

Signed-off-by: hi-rustin <rustin.liu@gmail.com>

Fix tidy

Signed-off-by: hi-rustin <rustin.liu@gmail.com>

Update docs

Signed-off-by: hi-rustin <rustin.liu@gmail.com>
@Rustin170506
Copy link
Member Author

Rustin170506 commented Jun 8, 2023

Tested locally:

  1. create the pulsar and KOP sever with docker
  2. start tidb cluster
  3. start ticdc server
  4. create tenant and namespace
bin/pulsar-admin --auth-plugin org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2 --auth-params '{"privateKey":"/conf/client_credentials.json", "issuerUrl":"https://dev-x.us.auth0.com", "audience":"pulsar"}' tenants create ticdc
bin/pulsar-admin --auth-plugin org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2 --auth-params '{"privateKey":"/conf/client_credentials.json", "issuerUrl":"https://dev-x.us.auth0.com", "audience":"pulsar"}'  tenants list
bin/pulsar-admin --auth-plugin org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2 --auth-params '{"privateKey":"/conf/client_credentials.json", "issuerUrl":"https://dev-x.us.auth0.com", "audience":"pulsar"}'  namespaces create ticdc/test
bin/pulsar-admin --auth-plugin org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2 --auth-params '{"privateKey":"/conf/client_credentials.json", "issuerUrl":"https://dev-x.us.auth0.com", "audience":"pulsar"}'  namespaces list ticdc
  1. create a changefeed with config:
./cdc cli changefeed create --sink-uri="kafka://10.x.27:19092/ticdc/test/a?max-message-bytes=1048576&protoco
l=open-protocol&kafka-version=2.0.0&replication-factor=1" --config=changefeed.toml
[2023/06/08 16:33:29.934 +08:00] [WARN] [kafka.go:580] ["Kafka config item not found"] [configName=min.insync.replicas]
[2023/06/08 16:33:29.948 +08:00] [WARN] [kafka.go:494] ["partition-num is not set, use the default partition count"] [topic=ticdc/test/a] [partitions=3]
[2023/06/08 16:33:30.298 +08:00] [WARN] [mq.go:202] ["resolved ts buffer is closed"] [namespace=] [changefeed=] [role=cdc-client]
[2023/06/08 16:33:30.299 +08:00] [WARN] [mq_flush_worker.go:117] ["MQ sink flush worker channel closed"]
Create changefeed successfully!
ID: 7d1a813f-c0f8-4866-8672-5e20cee471c6
Info: {"upstream-id":0,"sink-uri":"kafka://10.x27:19092/ticdc/test/a?max-message-bytes=1048576\u0026protocol=open-protocol\u0026kafka-version=2.0.0\u0026replication-factor=1","opts":{},"create-time":"2023-06-08T16:33:27.673989417+08:00","start-ts":442030675094667266,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"open-protocol","dispatchers":null,"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"kafka-config":{"sasl-mechanism":"OAUTHBEARER","sasl-oauth-client-id":"x","sasl-oauth-client-secret":"x==","sasl-oauth-token-url":"https://dev-x.us.auth0.com/oauth/token","sasl-oauth-grant-type":"client_credentials","sasl-oauth-audience":"pulsar"}},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""}},"state":"normal","error":null,"sync-point-enabled":false,"sync-point-interval":600000000000,"creator-version":"v6.1.0-master"}
  1. check the topic created: bin/pulsar-admin --auth-plugin org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2 --auth-params '{"privateKey":"/conf/client_credentials.json", "issuerUrl":"https://dev-x.us.auth0.com", "audience":"pulsar"}' topics list-partitioned-topics ticdc/test
  2. try to create table and write some data, consume the topic:
bin/pulsar-client --auth-plugin org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2 --auth-params '{"privateKey":"/conf/client_credentials.json", "issuerUrl":"https://dev-x.us.auth0.com", "audience":"pulsar"}'  consume -s sub ticdc/test/a  -n 0

----- got message -----
key:[AAAAAAAAAAEAAAAAAAAAH3sidHMiOjQ0MjAzMDg4NTA1ODkwNDA2NSwidCI6M30=], properties:[], content:
----- got message -----
key:[AAAAAAAAAAEAAAAAAAAAR3sidHMiOjQ0MjAzMDg4NTM3MzQ3Njg2Niwic2NtIjoidGVzdCIsInRibCI6InRwX2ludF9sejQiLCJyaWQiOjMsInQiOjF9], properties:[], content:?{"u":{"c_bigint":{"t":8,"f":65,"v":8},"c_int":{"t":3,"f":65,"v":4},"c_mediumint":{"t":9,"f":65,"v":3},"c_smallint":{"t":2,"f":65,"v":2},"c_tinyint":{"t":1,"f":65,"v":1},"id":{"t":3,"h":true,"f":11,"v":3}}}

Test it with auto-create=false:
exist:

bin git:(pr/hi-rustin/9146) ./cdc cli changefeed create --sink-uri="kafka://10.x.27:19092/ticdc/test/a?max-message-bytes=1048576&protocol=open-protocol&kafka-version=2.0.0&replication-factor=1&auto-create-topic=false" --config=cha
ngefeed.toml
[2023/06/08 16:48:40.121 +08:00] [WARN] [kafka.go:449] ["topic's `max.message.bytes` less than the `max-message-bytes`,use topic's `max.message.bytes` to initialize the Kafka producer"] [max.message.bytes=1000012] [max-message-bytes=1048576]
Create changefeed successfully!
ID: e17c39ff-85b4-469f-9f87-e069ff45a9dd
Info: {"upstream-id":0,"sink-uri":"kafka://10x.27:19092/ticdc/test/a?max-message-bytes=1048576\u0026protocol=open-protocol\u0026kafka-version=2.0.0\u0026replication-factor=1\u0026auto-create-topic=false","opts":{},"create-time":"2023-06-08T16:48:39.01068468+08:00","start-ts":442030913999863809,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"open-protocol","dispatchers":null,"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"kafka-config":{"sasl-mechanism":"OAUTHBEARER","sasl-oauth-client-id":"x","sasl-oauth-client-secret":"x==","sasl-oauth-token-url":"https://dev-x.us.auth0.com/oauth/token","sasl-oauth-grant-type":"client_credentials","sasl-oauth-audience":"pulsar"}},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""}},"state":"normal","error":null,"sync-point-enabled":false,"sync-point-interval":600000000000,"creator-version":"v6.1.0-master"}

non-exist:

./cdc cli changefeed create --sink-uri="kafka://1x27:19092/ticdc/test/b?max-message-bytes=1048576&protoco
l=open-protocol&kafka-version=2.0.0&replication-factor=1&auto-create-topic=false" --config=changefeed.toml
[2023/06/08 16:50:15.507 +08:00] [WARN] [kafka.go:580] ["Kafka config item not found"] [configName=min.insync.replicas]
[2023/06/08 16:50:15.512 +08:00] [WARN] [kafka.go:494] ["partition-num is not set, use the default partition count"] [topic=ticdc/test/b] [partitions=3]
Error: [CDC:ErrKafkaCreateTopic]kafka create topic failed: [CDC:ErrKafkaInvalidConfig]`auto-create-topic` is false, and ticdc/test/b not found
Usage:
  cdc cli changefeed create [flags]

Flags:
  -c, --changefeed-id string              Replication task (changefeed) ID
      --config string                     Path of the configuration file
      --cyclic-filter-replica-ids uints   (Experimental) Cyclic replication filter replica ID of changefeed (default [])
      --cyclic-replica-id uint            (Experimental) Cyclic replication replica ID of changefeed
      --cyclic-sync-ddl                   (Experimental) Cyclic replication sync DDL of changefeed (default true)
      --disable-gc-check                  Disable GC safe point check
  -h, --help                              help for create
      --no-confirm                        Don't ask user whether to ignore ineligible table
      --opts key=value                    Extra options, in the key=value format
      --schema-registry string            Avro Schema Registry URI
      --sink-uri string                   sink uri
      --sort-engine string                sort engine used for data sort (default "unified")
      --start-ts uint                     Start ts of changefeed
      --sync-interval duration            (Experimental) Set the interval for syncpoint in replication(default 10min) (default 10m0s)
      --sync-point                        (Experimental) Set and Record syncpoint in replication(default off)
      --target-ts uint                    Target ts of changefeed
      --tz string                         timezone used when checking sink uri (changefeed timezone is determined by cdc server) (default "SYSTEM")

Global Flags:
      --ca string          CA certificate path for TLS connection
      --cert string        Certificate path for TLS connection
  -i, --interact           Run cdc cli with readline
      --key string         Private key path for TLS connection
      --log-level string   log level (etc: debug|info|warn|error) (default "warn")
      --pd string          PD address, use ',' to separate multiple PDs (default "http://127.0.0.1:2379")

[CDC:ErrKafkaCreateTopic]kafka create topic failed: [CDC:ErrKafkaInvalidConfig]`auto-create-topic` is false, and ticdc/test/b not found

@ti-chi-bot ti-chi-bot bot added size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. and removed size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. labels Jun 8, 2023
@VelocityLight VelocityLight added the cherry-pick-approved Cherry pick PR approved by release team. label Jun 9, 2023
@pingcap pingcap deleted a comment from ti-chi-bot bot Jun 12, 2023
@ti-chi-bot ti-chi-bot bot added needs-1-more-lgtm Indicates a PR needs 1 more LGTM. and removed approved labels Jun 12, 2023
@ti-chi-bot ti-chi-bot added the cherry-pick-approved Cherry pick PR approved by release team. label Jun 12, 2023
@ti-chi-bot
Copy link
Contributor

ti-chi-bot bot commented Jun 12, 2023

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: asddongmen, nongfushanquan

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot ti-chi-bot bot added lgtm approved and removed needs-1-more-lgtm Indicates a PR needs 1 more LGTM. labels Jun 12, 2023
@ti-chi-bot
Copy link
Contributor

ti-chi-bot bot commented Jun 12, 2023

[LGTM Timeline notifier]

Timeline:

  • 2023-06-12 09:38:13.948991358 +0000 UTC m=+145243.403752653: ☑️ agreed by nongfushanquan.
  • 2023-06-12 09:55:25.78394434 +0000 UTC m=+146275.238705658: ☑️ agreed by asddongmen.

@Rustin170506
Copy link
Member Author

/run-dm-integration-test

2 similar comments
@Rustin170506
Copy link
Member Author

/run-dm-integration-test

@Rustin170506
Copy link
Member Author

/run-dm-integration-test

@ti-chi-bot ti-chi-bot bot added the needs-rebase Indicates a PR cannot be merged because it has merge conflicts with HEAD. label Jun 12, 2023
@ti-chi-bot ti-chi-bot bot removed the needs-rebase Indicates a PR cannot be merged because it has merge conflicts with HEAD. label Jun 13, 2023
@Rustin170506
Copy link
Member Author

/run-all-tests

@nongfushanquan
Copy link
Contributor

/retest

@Rustin170506
Copy link
Member Author

/run-dm-integration-test

/run-integration-test

@Rustin170506
Copy link
Member Author

/run-all-tests

@Rustin170506
Copy link
Member Author

/test verify

@Rustin170506
Copy link
Member Author

/test verify

/run-dm-integration-test

@ti-chi-bot
Copy link
Contributor

ti-chi-bot bot commented Jun 13, 2023

@hi-rustin: The specified target(s) for /test were not found.
The following commands are available to trigger required jobs:

  • /debug cdc-integration-kafka-test
  • /debug cdc-integration-mysql-test
  • /debug dm-compatibility-test
  • /debug dm-integration-test
  • /test verify

Use /test all to run the following jobs that were automatically triggered:

  • pingcap/tiflow/release-6.1/ghpr_verify

In response to this:

/test verify

/run-dm-integration-test

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

@Rustin170506
Copy link
Member Author

/test verify

@Rustin170506
Copy link
Member Author

/run-dm-integration-test

@ti-chi-bot ti-chi-bot bot merged commit e94472f into pingcap:release-6.1 Jun 13, 2023
@ti-chi-bot ti-chi-bot removed the cherry-pick-approved Cherry pick PR approved by release team. label Jul 12, 2023
@ti-chi-bot
Copy link
Contributor

ti-chi-bot bot commented Jul 12, 2023

This cherry pick PR is for a release branch and has not yet been approved by release team.
Adding the do-not-merge/cherry-pick-not-approved label.

To merge this cherry pick, it must first be approved by the collaborators.

AFTER it has been approved by collaborators, please ping the release team in a comment to request a cherry pick review.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
approved do-not-merge/cherry-pick-not-approved lgtm release-note Denotes a PR that will be considered when it comes time to generate release notes. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants