-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-36114] Make SchemaRegistryRequestHandler
thread safe by blocking more schema change events
#3563
Conversation
…king more schema change events
...-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
Outdated
Show resolved
Hide resolved
.../org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
Outdated
Show resolved
Hide resolved
.../org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
Outdated
Show resolved
Hide resolved
Thanks @yuxiqian for the contribution, I left some comments @loserwang1024 Would you like to take a look at this PR whrn you have time? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have left some comments.
...-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
Show resolved
Hide resolved
...-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
Outdated
Show resolved
Hide resolved
...-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
Outdated
Show resolved
Hide resolved
TableId tableId = event.tableId(); | ||
Optional<Schema> latestSchema = getLatestOriginalSchema(tableId); | ||
return Boolean.TRUE.equals( | ||
SchemaChangeEventVisitor.visit( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder we should do it in registry or sink? @leonardBang , WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @yuxiqian for the cooperation fix, LGTM, I believe your effort will make the schema evolution more stable.
…afe by blocking subsequent schemaChangeEvent This closes apache#3563. Co-authored-by: Hongshun Wang <loserwang1024@gmail.com> (cherry picked from commit ee843e2)
…afe by blocking subsequent schemaChangeEvent This closes apache#3563. Co-authored-by: Hongshun Wang <loserwang1024@gmail.com> (cherry picked from commit ee843e2)
…afe by blocking subsequent schemaChangeEvent This closes apache#3563. Co-authored-by: Hongshun Wang <loserwang1024@gmail.com>
…afe by blocking subsequent schemaChangeEvent This closes apache#3563. Co-authored-by: Hongshun Wang <loserwang1024@gmail.com>
This closes FLINK-36114.
Currently, SchemaRegistry asynchronously receives schema change requests from SchemaOperator, and results of multiple requests might got mixed up together, causing incorrect logic flow in multiple parallelism cases.
Changing SchemaRegistry's behavior to accept requests in serial should resolve this problem.
This PR also changes MySQL pipeline source behavior.
Previously, some subTask might not emit CreateTableEvents if there are no corresponding SnapshotSplit assigned to them. This might cause problems if they are assigned some StreamSplits later, since their downstream Transform nodes never received CreateTableEvent and doesn't know about the schema.
By forcing MySQL source emitting CreateTableEvents when transiting from Snapshot to BinLog stage should resolve this problem.
This PR also adjusts Pipeline E2e test case parallelism to 1 to 4 to verify this change.