Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-34634]Fix that Restarting the job will not read the changelog anymore if it stops before the synchronization of meta information is complete and some table is removed #3134

Merged
merged 5 commits into from
Apr 12, 2024

Conversation

loserwang1024
Copy link
Contributor

@loserwang1024 loserwang1024 commented Mar 13, 2024

What's the problem

Once, I removed a table from the option and then restarted the job from the savepoint, but the job couldn't read the binlog anymore. When I checked the logs, I found an Error level log stating: ' The enumerator received invalid request meta group id 6, the valid meta group id range is [0, 4].'

It appears that the Reader is requesting more splits than the Enumerator is aware of.
However, the code should indeed remove redundant split information from the Reader as seen in #2292. So why does this issue occur?

why occurs

image
Upon examining the code, I discovered the cause. If the job stops before completing all the split meta information and then restarts, this issue occurs. Suppose that the totalFinishedSplitSize of binlogSplit in the Reader is 6, and no meta information has been synchronized, leaving the finishedSnapshotSplitInfos of binlogSplit in the Reader empty. After restarting, the totalFinishedSplitSize of binlogSplit in the Reader equals (6 - (0 - 0)) which is still 6, but in the Enumerator, it is only 4(the removed table have two split). This could lead to an out-of-range request.
image

How to reproduce

  • Add Thread.sleep(1000L) in com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader#handleSourceEvents to postpone split meta infos synchronization.
public void handleSourceEvents(SourceEvent sourceEvent) {
else if (sourceEvent instanceof BinlogSplitMetaEvent) {
    LOG.debug(
            "Source reader {} receives binlog meta with group id {}.",
            subtaskId,
            ((BinlogSplitMetaEvent) sourceEvent).getMetaGroupId());
    try {
        Thread.sleep(1000L);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    fillMetadataForBinlogSplit((BinlogSplitMetaEvent) sourceEvent);
} 
  • Add Thread.sleep(500L) in com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#testRemoveTablesOneByOne to trigger savepoint before meta infos synchronization finishes.
// step 2: execute insert and trigger savepoint with all tables added
{
    // ..ingore 

    waitForSinkSize("sink", fetchedDataList.size());
    Thread.sleep(500L);
    assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
    finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
    jobClient.cancel().get();
}

// test removing table one by one, note that there should be at least one table remaining
for (int round = 0; round < captureAddressTables.length - 1; round++) {
...
}
  • Add chunk-meta.group.size  =2 in com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#getCreateTableStatement

Finally, run test(com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#testJobManagerFailoverForRemoveTable), the error log will occur.

@loserwang1024
Copy link
Contributor Author

@ruanhang1993 , @leonardBang , @PatrickRen , CC

…angelog anymore if it stops before the synchronization of meta information is complete and some table is removed
… changelog anymore if it stops before the synchronization of meta information is complete and some table is removed
@Jiabao-Sun Jiabao-Sun self-requested a review March 29, 2024 01:14
@Jiabao-Sun
Copy link
Contributor

Thanks @loserwang1024 for investigating this issue.

IMHO, I think this fix is a bit complicated.
Can we simply consider that in the previous PR, the split that has not yet completed the meta info synchronization should not be deleted?

@ruanhang1993 what do you think ?

@loserwang1024
Copy link
Contributor Author

loserwang1024 commented Apr 1, 2024

the split that has not yet completed the meta info synchronization should not be deleted?

@Jiabao-Sun What if the snapshot splits are not completed, and then removed from tableList? The meta info won't be removed later , and also won't be finished , thus the job won't come into stream phase .

@Jiabao-Sun
Copy link
Contributor

@loserwang1024 Please correct me if I'm wrong.
In PR-2292, only unnecessary finished snapshot splits were removed from BinlogSplit, which means the checkpoint is already in the stream phase. As for the checkpoint in the snapshot phase, it should not be affected by PR-2292.

@loserwang1024
Copy link
Contributor Author

loserwang1024 commented Apr 1, 2024

@Jiabao-Sun For example: table A, B, C are finished, and each table includes two snapshot infos.

  1. Before stop the job, totalFinishedSplitSize of enumerator and reader is both 6.
  2. The enumerator send 4 snapshot infos(A+B) to reader.
  3. Stop the job, remove table C and then restart reader.
  4. Enumerator start and remove table C's split which is not included anymore . Thus, totalFinishedSplitSize of enumerator is 4 now.
image 5. `the split that has not yet completed the meta info synchronization should not be deleted`, thus, totalFinishedSplitSize of reader is still 6.

The totalFinishedSplitSize of reader and enumerator is different.

Copy link
Contributor

@Jiabao-Sun Jiabao-Sun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for clarifying, it makes sense to me.

final int expectedMetaGroupId =
ChunkUtils.getNextMetaGroupId(
binlogSplit.getFinishedSnapshotSplitInfos().size(),
sourceConfig.getSplitMetaGroupSize());
if (receivedMetaGroupId == expectedMetaGroupId) {
if (receivedTotalFinishedSplitSize < binlogSplit.getTotalFinishedSplitSize()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add some logs here.

@PatrickRen PatrickRen merged commit 48ca862 into apache:master Apr 12, 2024
13 checks passed
wuzhenhua01 pushed a commit to wuzhenhua01/flink-cdc-connectors that referenced this pull request Aug 4, 2024
…ead the changelog anymore if it stops before the synchronization of meta information is complete and some table is removed (apache#3134)
ChaomingZhangCN pushed a commit to ChaomingZhangCN/flink-cdc that referenced this pull request Jan 13, 2025
…ead the changelog anymore if it stops before the synchronization of meta information is complete and some table is removed (apache#3134)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants