[FLINK-36086] Set isSnapshotCompleted only immediately after snapshot #3552
@GOODBOY008 Would you like to take a look at this PR when you have time?
@GOODBOY008 do you have an idea why the test introduced in #2176 passes even with the fix reverted? I'd like to make sure that I'm not breaking the original fix. I would also appreciate advice on testing the restart-mid-transaction scenario, if that's possible.
@morozov, I tried removing
This pull request has been automatically marked as stale because it has not had recent activity for 60 days. It will be closed in 30 days if no further activity occurs.
@leonardBang, @GOODBOY008, what are the next steps to get this merged?
Hey @GOODBOY008 Would you like to review this PR when you have time?
@leonardBang this PR is getting merge conflicts and may require extra care. Are you interested in getting this applied?
@morozov I will design and add unit tests to verify the effectiveness of the changes.
Fixed in #3873.
If the SQL Server source connector is restarted while handling a transaction that contains multiple updates, then upon restart it skips the unprocessed changes of that transaction and proceeds from the next transaction.
This is an analog of DBZ-1128 but reproducible only in Flink CDC.
This is a regression introduced in #2176.
Lower-level details
The isSnapshotCompleted offset context flag in Debezium tells the source connector to jump one transaction ahead at the beginning of the streaming phase. This is only necessary during the transition from the initial state snapshot to streaming. Without this flag set, the streaming change data source would stream the changes from the transaction that was already included in the snapshot. This is the issue that #2176 attempted to address.
The following line sets this flag unconditionally for the stream split:
flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerStreamFetchTask.java
Line 59 in c5396fb
It makes Debezium jump one transaction ahead not only after completing the initial state snapshot but every time streaming starts, including after a restart. As a result, it may skip the remaining changes of a transaction that wasn't fully captured prior to the restart.
The proposed solution is to set it only during the transition from the initial state snapshot to streaming.
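The effect of the flag can be illustrated with a small toy model. This is only a sketch and not Debezium or Flink CDC code: the class SkipAheadDemo, the record Change, and the method resume are hypothetical names, and the model assumes that the stored offset consists of a commit LSN plus the index of the last processed event within that transaction.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the skip-ahead behaviour; all names here are hypothetical. */
class SkipAheadDemo {

    /** One change event, identified by its transaction's commit LSN and its index within the transaction. */
    record Change(int commitLsn, int eventIndex) {}

    /**
     * Returns the changes that would be streamed when resuming from the stored offset.
     *
     * If snapshotCompleted is true, the whole transaction at offsetLsn is treated as
     * already captured and streaming starts at the next transaction.
     * Otherwise streaming resumes right after the last processed event of that transaction.
     */
    static List<Change> resume(List<Change> log, int offsetLsn,
                               int lastProcessedEventIndex, boolean snapshotCompleted) {
        List<Change> streamed = new ArrayList<>();
        for (Change c : log) {
            boolean inLaterTransaction = c.commitLsn() > offsetLsn;
            boolean restOfOffsetTransaction =
                    c.commitLsn() == offsetLsn && c.eventIndex() > lastProcessedEventIndex;
            if (inLaterTransaction || (!snapshotCompleted && restOfOffsetTransaction)) {
                streamed.add(c);
            }
        }
        return streamed;
    }

    public static void main(String[] args) {
        // Transaction 10 contains three updates, transaction 11 contains one.
        List<Change> log = List.of(
                new Change(10, 0), new Change(10, 1), new Change(10, 2), new Change(11, 0));

        // Restart after only event 0 of transaction 10 was processed.
        // Flag set unconditionally: the rest of transaction 10 is skipped.
        System.out.println(resume(log, 10, 0, true));

        // Flag not set on restart: the rest of transaction 10 is streamed first.
        System.out.println(resume(log, 10, 0, false));
    }
}
```

In this model, resuming mid-transaction with the flag set yields only the change from transaction 11, while resuming without it yields the two remaining changes of transaction 10 followed by transaction 11. Setting the flag is still the right choice directly after a snapshot, where transaction 10 would already be fully reflected in the snapshot data.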
Testing considerations
(see start(..., Predicate<SourceRecord> isStopRecord) in the Debezium testing framework). How could I implement a similar case in Flink CDC?