-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-34634]Fix that Restarting the job will not read the changelog anymore if it stops before the synchronization of meta information is complete and some table is removed #3134
Conversation
@ruanhang1993 , @leonardBang , @PatrickRen , CC |
…ta group calculation
…angelog anymore if it stops before the synchronization of meta information is complete and some table is removed
… changelog anymore if it stops before the synchronization of meta information is complete and some table is removed
bd55643
to
32bc3d7
Compare
Thanks @loserwang1024 for investigating this issue. IMHO, I think this fix is a bit complicated. @ruanhang1993 what do you think ? |
@Jiabao-Sun What if the snapshot splits are not completed, and then removed from tableList? The meta info won't be removed later , and also won't be finished , thus the job won't come into stream phase . |
@loserwang1024 Please correct me if I'm wrong. |
@Jiabao-Sun For example: table A, B, C are finished, and each table includes two snapshot infos.
![]() The totalFinishedSplitSize of reader and enumerator is different. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for clarifying, it makes sense to me.
...main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java
Show resolved
Hide resolved
final int expectedMetaGroupId = | ||
ChunkUtils.getNextMetaGroupId( | ||
binlogSplit.getFinishedSnapshotSplitInfos().size(), | ||
sourceConfig.getSplitMetaGroupSize()); | ||
if (receivedMetaGroupId == expectedMetaGroupId) { | ||
if (receivedTotalFinishedSplitSize < binlogSplit.getTotalFinishedSplitSize()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add some logs here.
...java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java
Show resolved
Hide resolved
.../main/java/org/apache/flink/cdc/connectors/base/source/meta/events/StreamSplitMetaEvent.java
Outdated
Show resolved
Hide resolved
.../src/main/java/org/apache/flink/cdc/connectors/mysql/source/events/BinlogSplitMetaEvent.java
Outdated
Show resolved
Hide resolved
…ead the changelog anymore if it stops before the synchronization of meta information is complete and some table is removed (apache#3134)
…ead the changelog anymore if it stops before the synchronization of meta information is complete and some table is removed (apache#3134)
What's the problem
Once, I removed a table from the option and then restarted the job from the savepoint, but the job couldn't read the binlog anymore. When I checked the logs, I found an Error level log stating: ' The enumerator received invalid request meta group id 6, the valid meta group id range is [0, 4].'
It appears that the Reader is requesting more splits than the Enumerator is aware of.
However, the code should indeed remove redundant split information from the Reader as seen in #2292. So why does this issue occur?
why occurs
Upon examining the code, I discovered the cause. If the job stops before completing all the split meta information and then restarts, this issue occurs. Suppose that the totalFinishedSplitSize of binlogSplit in the Reader is 6, and no meta information has been synchronized, leaving the finishedSnapshotSplitInfos of binlogSplit in the Reader empty. After restarting, the totalFinishedSplitSize of binlogSplit in the Reader equals (6 - (0 - 0)) which is still 6, but in the Enumerator, it is only 4(the removed table have two split). This could lead to an out-of-range request.
How to reproduce
Finally, run test(com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#testJobManagerFailoverForRemoveTable), the error log will occur.