[FLINK-37429][transform] Map each column name to a new name in generated expression #3939
Thanks for Shawn's quick fix! Just left some minor comments.
public TransformFilter(
        String expression,
        String scriptExpression,
        List<String> columnNames,
        Map<String, String> columnNameMap) {
IIUC, TransformFilter is static and will not be refreshed after schema changes, but columnNameMap might change when columns are altered. Will inserting columns in the middle affect the numbering? ProjectionColumn looks fine (since projections should be recreated every time the schema changes), but this still needs to be verified by an IT case.
Every TransformExpressionKey has its own columnNameMap. If the expression script is not changed, the columnNameMap will not change. I have added a test for the schema change scenario.
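To illustrate the concern and the reply, here is a minimal sketch comparing two hypothetical ways of numbering columns: from the full table schema, and from only the columns an expression references. Whether the PR derives the map exactly this way is an assumption; the class name, the `number` helper, the `field` prefix, and the sample column names are all made up for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class NumberingStabilitySketch {
    // Assigns "field0", "field1", ... in order of first appearance.
    // The "field" prefix is an assumed value, not taken from the PR.
    static Map<String, String> number(List<String> names) {
        Map<String, String> mapped = new LinkedHashMap<>();
        for (String name : names) {
            mapped.putIfAbsent(name, "field" + mapped.size());
        }
        return mapped;
    }

    public static void main(String[] args) {
        // Columns referenced by a hypothetical filter such as "id > 1 AND age < 30".
        List<String> referenced = Arrays.asList("id", "age");
        // Table schema before and after a column is inserted in the middle.
        List<String> schemaBefore = Arrays.asList("id", "name", "age");
        List<String> schemaAfter = Arrays.asList("id", "extra", "name", "age");

        // Numbering from the schema shifts when a column is inserted.
        System.out.println(number(schemaBefore).get("age")); // field2
        System.out.println(number(schemaAfter).get("age"));  // field3
        // Numbering from the expression's own columns stays the same.
        System.out.println(number(referenced).get("age"));   // field1
    }
}
```

Under this reading, a map tied to the expression is unaffected by schema changes as long as the expression itself is unchanged.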
public static Map<String, String> generateColumnNameMap(List<String> originalColumnNames) {
    int i = 0;
    Map<String, String> columnNameMap = new HashMap<>();
    for (String columnName : originalColumnNames) {
        if (!columnNameMap.containsKey(columnName)) {
            columnNameMap.put(columnName, MAPPED_COLUMN_NAME_PREFIX + i);
            i++;
        }
    }
    return columnNameMap;
}
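To make the numbering concrete, here is a self-contained sketch of the same loop with a small usage example. The prefix value `field` is an assumption based on the `field\d` pattern mentioned in the review; the actual value of MAPPED_COLUMN_NAME_PREFIX is not shown in this excerpt. The class name and the sample column names are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ColumnNameMapSketch {
    // Assumed value; the real constant is not shown in this excerpt.
    static final String MAPPED_COLUMN_NAME_PREFIX = "field";

    static Map<String, String> generateColumnNameMap(List<String> originalColumnNames) {
        int i = 0;
        Map<String, String> columnNameMap = new HashMap<>();
        for (String columnName : originalColumnNames) {
            // A repeated name keeps the index from its first occurrence.
            if (!columnNameMap.containsKey(columnName)) {
                columnNameMap.put(columnName, MAPPED_COLUMN_NAME_PREFIX + i);
                i++;
            }
        }
        return columnNameMap;
    }

    public static void main(String[] args) {
        Map<String, String> map =
                generateColumnNameMap(Arrays.asList("id", "invalid-name", "id", "class"));
        System.out.println(map.get("id"));           // field0
        System.out.println(map.get("invalid-name")); // field1
        System.out.println(map.get("class"));        // field2
    }
}
```

Names that are not valid Java identifiers (`invalid-name`) or that are keywords (`class`) all end up with a plain, compilable replacement.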
Minor concern: although using field\d as the column name is simple, it would produce cryptic error messages if Janino throws an exception about a generated expression.
Some options might be:
- Leave "legal" names as is without mangling them, and only map names that are not valid Java identifiers.
- Try to extract valid characters from the original column name and append them as a hint. For example, invalid-name could be mapped to column1_invalidname (recognizable, at least).
- Always provide the column name map information when exceptions are thrown.
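As a rough sketch of what the first two options could look like: the check below uses the JDK's `javax.lang.model.SourceVersion` to reject both non-identifiers and keywords, and the hint helper strips characters outside `[A-Za-z0-9_]`. The class and method names are hypothetical, and the allowed character set for the hint is an assumption; none of this is taken from the PR.

```java
import javax.lang.model.SourceVersion;

class ColumnNameManglingSketch {
    // Option 1 (sketch): a name can be kept as is only if it is a valid
    // identifier and not a keyword or literal such as "class" or "true".
    static boolean isUsableJavaIdentifier(String columnName) {
        return SourceVersion.isIdentifier(columnName) && !SourceVersion.isKeyword(columnName);
    }

    // Option 2 (sketch): keep the index for uniqueness and append the
    // identifier-safe characters of the original name as a hint.
    static String hintedName(int index, String columnName) {
        String hint = columnName.replaceAll("[^A-Za-z0-9_]", "");
        return "column" + index + "_" + hint;
    }

    public static void main(String[] args) {
        System.out.println(isUsableJavaIdentifier("id"));           // true
        System.out.println(isUsableJavaIdentifier("class"));        // false
        System.out.println(isUsableJavaIdentifier("invalid-name")); // false
        System.out.println(hintedName(1, "invalid-name"));          // column1_invalidname
        // Different originals can share the same hint; only the index differs.
        System.out.println(hintedName(0, "a-b"));                   // column0_ab
        System.out.println(hintedName(1, "a.b"));                   // column1_ab
    }
}
```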
I prefer option 3.
For option 1, it is not easy to judge whether a column name is legal, considering that every Java keyword is also illegal.
For option 2, a corner case is two columns named a-b and a.b, which we still could not tell apart after mapping.
I have added the column name map information to the exception log.
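A minimal sketch of option 3, assuming the failure is wrapped in a new exception whose message carries the map. The helper name, message format, and simulated cause are hypothetical; the actual wording and exception type used in the PR are not shown in this excerpt.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ExpressionErrorSketch {
    // Wraps a failure so the reader can translate mapped names such as
    // "field0" back to the original column names.
    static RuntimeException withColumnNameMap(
            String expression, Map<String, String> columnNameMap, Throwable cause) {
        String message = String.format(
                "Failed to evaluate expression '%s'. Column name map: %s",
                expression, columnNameMap);
        return new RuntimeException(message, cause);
    }

    public static void main(String[] args) {
        Map<String, String> columnNameMap = new LinkedHashMap<>();
        columnNameMap.put("invalid-name", "field0");
        columnNameMap.put("id", "field1");

        // Stand-in for an exception thrown while compiling or running the expression.
        Throwable simulated = new IllegalStateException("simulated error near field0");
        RuntimeException wrapped =
                withColumnNameMap("field0 > field1", columnNameMap, simulated);
        System.out.println(wrapped.getMessage());
    }
}
```

With the map in the message, an error that mentions `field0` can be traced back to `invalid-name` without consulting the source.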
...rc/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
Looks good! Covering this case in FlinkPipelineTransformITCase would be nice, but it's not a must for now.
...src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
Thanks @Shawn-Hx for the contribution, LGTM
...src/main/java/org/apache/flink/cdc/connectors/values/source/TimestampTypeMetadataColumn.java
…line-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/TimestampTypeMetadataColumn.java Co-authored-by: Leonard Xu <leonard@apache.org>
This closes FLINK-37429 and FLINK-37326.