[FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint. #19972
cbf393b to 1ae927b
I reviewed up to NormalSplitAssigner. Will continue the review tomorrow.
...rc/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
@PatrickRen Could you also take a look at this PR? As I understand it, this PR should help resolve the blocker test-stability issue FLINK-26721.
1ae927b to 12049bf
@MartijnVisser Thank you. I'll review the patch this week. Actually, I tried to add myself as a reviewer before but forgot several times >_<
Generally looks good. The effective changes are:
- Changing SplitAssignmentState to SplitAssigner.
- Creating the subscription via the PulsarAdmin API instead of the consumer API and seek.
These significant changes are reasonable from my perspective.
Comments inline.
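The second change can be illustrated with a toy model. This is only a sketch: it does not use the Pulsar client or the PulsarAdmin API, and `ToyBroker`, `createSubscriptionIfAbsent`, `acknowledge`, `seek`, and `position` are hypothetical names. It assumes the cursor is stored per subscription, and shows why creating the subscription once at an initial position leaves an already-advanced cursor untouched, while seeking on every start discards the progress.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy in-memory model with hypothetical names; not the Pulsar client or admin API. */
final class ToyBroker {

    // Cursor position per subscription name (a plain long stands in for a message id).
    private final Map<String, Long> cursors = new HashMap<>();

    /** Creates the subscription at the initial position only if it does not exist yet. */
    void createSubscriptionIfAbsent(String subscription, long initialPosition) {
        cursors.putIfAbsent(subscription, initialPosition);
    }

    /** Records consumption progress up to the given position. */
    void acknowledge(String subscription, long position) {
        cursors.put(subscription, position);
    }

    /** Unconditionally moves the cursor, discarding any recorded progress. */
    void seek(String subscription, long position) {
        cursors.put(subscription, position);
    }

    /** Returns the current cursor position of an existing subscription. */
    long position(String subscription) {
        Long current = cursors.get(subscription);
        if (current == null) {
            throw new IllegalStateException("Unknown subscription: " + subscription);
        }
        return current;
    }
}
```

In this model, calling `createSubscriptionIfAbsent` a second time after progress has been acknowledged is a no-op, whereas calling `seek` with the initial position on every start moves the cursor back.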
...src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/MessageIdUtils.java
...org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java
...a/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java
...org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/LatestMessageStopCursor.java
...sar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java
...a/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java
.../java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerFactory.java
...java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerTestBase.java
...src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java
...rc/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
@syhily Can you resolve the latest review comments?
...n/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssigner.java
...n/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NormalSplitAssigner.java
...n/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssigner.java
...n/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NormalSplitAssigner.java
12049bf to 21eda6f
2baa322 to f807c4a
...org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java
f807c4a to 938be57
…top cursor for better handle the consuming behaviors.
…ad seek every time. This should fix the wrong position setting.
…it stops at correct space
938be57 to 7ea6b3c
@tisonkun Finally, the CI has turned green.
…ting logic for better handle the checkpoint. (apache#19972)
* Change the initial start cursor and stop cursor to better handle the consuming behaviors.
* Create the initial subscription instead seek every time. This should fix the wrong position setting.
* Fix the wrong stop cursor, make sure it stops at the correct space
* Drop Consumer.seek() for apache/pulsar#16171
(cherry picked from commit 18d21a0)
…ting logic for better handle the checkpoint. (apache#19972)
* Change the initial start cursor and stop cursor to better handle the consuming behaviors.
* Create the initial subscription instead seek every time. This should fix the wrong position setting.
* Fix the wrong stop cursor, make sure it stops at the correct space
* Drop Consumer.seek() for apache/pulsar#16171
…ting logic for better handle the checkpoint. (#19972) (#20565)
* Change the initial start cursor and stop cursor to better handle the consuming behaviors.
* Create the initial subscription instead seek every time. This should fix the wrong position setting.
* Fix the wrong stop cursor, make sure it stops at the correct space
* Drop Consumer.seek() for apache/pulsar#16171
…ting logic for better handle the checkpoint. (#19972) (#20564)
* Change the initial start cursor and stop cursor to better handle the consuming behaviors.
* Create the initial subscription instead seek every time. This should fix the wrong position setting.
* Fix the wrong stop cursor, make sure it stops at the correct space
* Drop Consumer.seek() for apache/pulsar#16171
What is the purpose of the change
This task modifies the flink-connector-pulsar module and adds new mechanisms to set the initial consuming position.
Brief change log
- StartCursor: add new useful methods, and deprecate the confusing fromMessageTime() method.
- StopCursor: add new useful methods.
- SplitAssigner: for assigning the splits among the Pulsar readers. Make the partition assignment logic clear and testable.
Verifying this change
This change is already covered by existing tests, such as:
PulsarSourceITCase
PulsarSourceEnumeratorTest
PulsarOrderedPartitionSplitReaderTest
We added new tests for the new partition assignment logic:
NormalSplitAssignerTest
SharedSplitAssignerTest
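As a rough illustration of what a clear and testable partition assignment can look like, the sketch below separates the assignment into pure functions. It is loosely modelled on the two assigner names in this PR, but `SplitAssignmentSketch`, `assignOneOwner`, and `assignToAll` are hypothetical; the actual behavior of NormalSplitAssigner and SharedSplitAssigner is not shown on this page and may differ. Partitions are plain strings and readers are numbered from 0.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified assignment helpers; not the connector's SplitAssigner API. */
final class SplitAssignmentSketch {

    private SplitAssignmentSketch() {}

    /** Each partition gets exactly one owner, chosen by a stable hash of its name. */
    static Map<Integer, List<String>> assignOneOwner(List<String> partitions, int parallelism) {
        Map<Integer, List<String>> result = emptyAssignment(parallelism);
        for (String partition : partitions) {
            // floorMod keeps the index non-negative even for negative hash codes.
            int owner = Math.floorMod(partition.hashCode(), parallelism);
            result.get(owner).add(partition);
        }
        return result;
    }

    /** Every reader receives every partition. */
    static Map<Integer, List<String>> assignToAll(List<String> partitions, int parallelism) {
        Map<Integer, List<String>> result = emptyAssignment(parallelism);
        for (List<String> splits : result.values()) {
            splits.addAll(partitions);
        }
        return result;
    }

    /** Builds a map with one empty split list per reader index. */
    private static Map<Integer, List<String>> emptyAssignment(int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        Map<Integer, List<String>> result = new LinkedHashMap<>();
        for (int reader = 0; reader < parallelism; reader++) {
            result.put(reader, new ArrayList<>());
        }
        return result;
    }
}
```

Because both functions depend only on their arguments, a unit test can check properties such as "every partition has exactly one owner" without a running Pulsar cluster.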
Does this pull request potentially affect one of the following parts:
- @Public(Evolving): (yes)
Documentation