Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint. #19972

Merged
merged 4 commits into from
Aug 12, 2022

Conversation

syhily
Copy link
Contributor

@syhily syhily commented Jun 15, 2022

What is the purpose of the change

Brief change log

This task modifies the flink-connector-pulsar module, adds some new mechanisms to set the initial consuming position.

  • Change StartCursor, add new useful methods, and deprecate the confused fromMessageTime() method.
  • Change StopCursor, and add new useful methods.
  • Introduce a new SplitAssigner for assigning the splits among the Pulsar readers. Make the partition assignment logic clear and testable.
  • Change the start position seeking mechanism from Pulsar consumer API to Pulsar admin API. Don't reset the start position when the topic has a subscription.

Verifying this change

This change is already covered by existing tests, such as:

  • PulsarSourceITCase
  • PulsarSourceEnumeratorTest
  • PulsarOrderedPartitionSplitReaderTest

We add new tests for new partition assign logic:

  • NormalSplitAssignerTest
  • SharedSplitAssignerTest

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduces a new feature? (yes)
  • If yes, how is the feature documented? (docs)

@flinkbot
Copy link
Collaborator

flinkbot commented Jun 15, 2022

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@syhily syhily changed the title [FLINK-27399][Connector/Pulsar] Modify start cursor and stop cursor, change initial position setting logic. [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint. Jun 15, 2022
@syhily syhily force-pushed the feature/new-seek-lifecycle branch 11 times, most recently from cbf393b to 1ae927b Compare July 8, 2022 18:34
Copy link
Contributor

@imaffe imaffe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reviewed until the NormlSplitAssigner. Will continue review tomorrow.

@MartijnVisser MartijnVisser requested a review from PatrickRen July 11, 2022 07:31
@MartijnVisser
Copy link
Contributor

@PatrickRen Can you also have a look at this PR? I've understood that this PR should help resolve this blocker test stability FLINK-26721

@syhily syhily force-pushed the feature/new-seek-lifecycle branch from 1ae927b to 12049bf Compare July 11, 2022 22:19
@MartijnVisser MartijnVisser requested a review from tisonkun July 12, 2022 13:31
@MartijnVisser
Copy link
Contributor

@tisonkun I've understood from @wuchong that you might also want to help/have a look at this PR, therefore I've tagged you.

@tisonkun
Copy link
Member

@MartijnVisser Thank you. I'll review the patch in this week. Actually I ever try to request myself as a reviewer but forget several times >_<

Copy link
Member

@tisonkun tisonkun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally looks good. The effective changes are:

  1. Changing SplitAssignmentState to SplitAssigner
  2. Create subscription via PulsarAdmin API instead of consumer API and seek.

These significant changes are reasonable from my perspective.

Comments inline.

@MartijnVisser
Copy link
Contributor

@syhily Can you resolve the latest review comments?

@syhily syhily force-pushed the feature/new-seek-lifecycle branch from 12049bf to 21eda6f Compare July 28, 2022 08:22
@syhily syhily requested review from imaffe and a49a and removed request for imaffe August 11, 2022 19:16
@syhily syhily force-pushed the feature/new-seek-lifecycle branch from 2baa322 to f807c4a Compare August 12, 2022 03:24
@tisonkun tisonkun requested review from tisonkun and removed request for a49a August 12, 2022 03:38
@syhily syhily force-pushed the feature/new-seek-lifecycle branch from f807c4a to 938be57 Compare August 12, 2022 05:18
@syhily syhily force-pushed the feature/new-seek-lifecycle branch from 938be57 to 7ea6b3c Compare August 12, 2022 07:31
@syhily
Copy link
Contributor Author

syhily commented Aug 12, 2022

@tisonkun Finally, the ci turns green.

@tisonkun tisonkun merged commit 18d21a0 into apache:master Aug 12, 2022
@syhily syhily deleted the feature/new-seek-lifecycle branch August 13, 2022 06:15
syhily added a commit to streamnative/flink that referenced this pull request Aug 13, 2022
…ting logic for better handle the checkpoint. (apache#19972)

* Change the initial start cursor and stop cursor to better handle the consuming behaviors.
* Create the initial subscription instead seek every time. This should fix the wrong position setting.
* Fix the wrong stop cursor, make sure it stops at the correct space
* Drop Consumer.seek() for apache/pulsar#16171

(cherry picked from commit 18d21a0)
syhily added a commit to streamnative/flink that referenced this pull request Aug 13, 2022
…ting logic for better handle the checkpoint. (apache#19972)

* Change the initial start cursor and stop cursor to better handle the consuming behaviors.
* Create the initial subscription instead seek every time. This should fix the wrong position setting.
* Fix the wrong stop cursor, make sure it stops at the correct space
* Drop Consumer.seek() for apache/pulsar#16171
syhily added a commit to streamnative/flink that referenced this pull request Aug 13, 2022
…ting logic for better handle the checkpoint. (apache#19972)

* Change the initial start cursor and stop cursor to better handle the consuming behaviors.
* Create the initial subscription instead seek every time. This should fix the wrong position setting.
* Fix the wrong stop cursor, make sure it stops at the correct space
* Drop Consumer.seek() for apache/pulsar#16171
syhily added a commit to streamnative/flink that referenced this pull request Aug 13, 2022
…ting logic for better handle the checkpoint. (apache#19972)

* Change the initial start cursor and stop cursor to better handle the consuming behaviors.
* Create the initial subscription instead seek every time. This should fix the wrong position setting.
* Fix the wrong stop cursor, make sure it stops at the correct space
* Drop Consumer.seek() for apache/pulsar#16171
syhily added a commit to streamnative/flink that referenced this pull request Aug 14, 2022
…ting logic for better handle the checkpoint. (apache#19972)

* Change the initial start cursor and stop cursor to better handle the consuming behaviors.
* Create the initial subscription instead seek every time. This should fix the wrong position setting.
* Fix the wrong stop cursor, make sure it stops at the correct space
* Drop Consumer.seek() for apache/pulsar#16171
tisonkun pushed a commit that referenced this pull request Aug 14, 2022
…ting logic for better handle the checkpoint. (#19972) (#20565)

* Change the initial start cursor and stop cursor to better handle the consuming behaviors.
* Create the initial subscription instead seek every time. This should fix the wrong position setting.
* Fix the wrong stop cursor, make sure it stops at the correct space
* Drop Consumer.seek() for apache/pulsar#16171
tisonkun pushed a commit that referenced this pull request Aug 14, 2022
…ting logic for better handle the checkpoint. (#19972) (#20564)

* Change the initial start cursor and stop cursor to better handle the consuming behaviors.
* Create the initial subscription instead seek every time. This should fix the wrong position setting.
* Fix the wrong stop cursor, make sure it stops at the correct space
* Drop Consumer.seek() for apache/pulsar#16171
huangxiaofeng10047 pushed a commit to huangxiaofeng10047/flink that referenced this pull request Nov 3, 2022
…ting logic for better handle the checkpoint. (apache#19972)

* Change the initial start cursor and stop cursor to better handle the consuming behaviors.
* Create the initial subscription instead seek every time. This should fix the wrong position setting.
* Fix the wrong stop cursor, make sure it stops at the correct space
* Drop Consumer.seek() for apache/pulsar#16171
Rodger-zk pushed a commit to Rodger-zk/Rodger-zk-bkbase-flink that referenced this pull request Dec 3, 2024
…ting logic for better handle the checkpoint. (apache#19972)

* Change the initial start cursor and stop cursor to better handle the consuming behaviors.
* Create the initial subscription instead seek every time. This should fix the wrong position setting.
* Fix the wrong stop cursor, make sure it stops at the correct space
* Drop Consumer.seek() for apache/pulsar#16171
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants