[FLINK-37429][transform] Map each column name to a new name in generated expression #3939

Merged: 5 commits, Mar 11, 2025

@Shawn-Hx Shawn-Hx commented Mar 6, 2025

This closes FLINK-37429 and FLINK-37326.

@yuxiqian yuxiqian left a comment

Thanks for Shawn's quick fix! Just left some minor comments.

Comment on lines +47 to +51
public TransformFilter(
        String expression,
        String scriptExpression,
        List<String> columnNames,
        Map<String, String> columnNameMap) {
@yuxiqian yuxiqian Mar 6, 2025

IIUC, TransformFilter is static and will not be refreshed after schema changes, but columnNameMap might change when columns are altered. Will inserting columns in the middle affect the numbering?

ProjectionColumn looks fine (since projections should be recreated every time the schema changes), but this still needs to be verified by an IT case.

Contributor Author (@Shawn-Hx)

Every TransformExpressionKey has its own columnNameMap. If the expression script is not changed, the columnNameMap will not change. I have added a test for the schema change scenario.
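
To make the numbering concern concrete, here is a minimal sketch that reuses the generateColumnNameMap logic from this diff. The prefix value "field" is an assumption based on the field\d naming mentioned in this thread, and the class name NumberingSketch is hypothetical. It only shows that regenerating the map from a column list with an inserted column shifts the later names, which is why a map has to stay paired with the script it was generated for.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NumberingSketch {
    // Assumed value; the thread only says mapped names look like field\d.
    static final String MAPPED_COLUMN_NAME_PREFIX = "field";

    // Same logic as generateColumnNameMap in the diff: names are numbered
    // in order of first occurrence.
    public static Map<String, String> generateColumnNameMap(List<String> originalColumnNames) {
        int i = 0;
        Map<String, String> columnNameMap = new HashMap<>();
        for (String columnName : originalColumnNames) {
            if (!columnNameMap.containsKey(columnName)) {
                columnNameMap.put(columnName, MAPPED_COLUMN_NAME_PREFIX + i);
                i++;
            }
        }
        return columnNameMap;
    }

    public static void main(String[] args) {
        // Hypothetical column lists before and after inserting "age" in the middle.
        Map<String, String> before = generateColumnNameMap(List.of("id", "name"));
        Map<String, String> after = generateColumnNameMap(List.of("id", "age", "name"));

        // The mapped name of "name" depends on its position in the input list.
        System.out.println("before: name -> " + before.get("name"));
        System.out.println("after:  name -> " + after.get("name"));
    }
}
```

A script compiled against the first map would refer to the wrong column if it were evaluated with the second one, so keeping one map per expression key sidesteps the problem.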

Comment on lines +658 to +668
public static Map<String, String> generateColumnNameMap(List<String> originalColumnNames) {
    int i = 0;
    Map<String, String> columnNameMap = new HashMap<>();
    for (String columnName : originalColumnNames) {
        if (!columnNameMap.containsKey(columnName)) {
            columnNameMap.put(columnName, MAPPED_COLUMN_NAME_PREFIX + i);
            i++;
        }
    }
    return columnNameMap;
}
@yuxiqian yuxiqian Mar 6, 2025

Minor concern: although using field\d as the column name is simple, it would produce cryptic error messages if Janino throws an exception about a generated expression.

Some options might be:

  1. We can leave "legal" names as is without mangling them, and only map names that are not valid Java identifiers.
  2. Try to extract valid characters from original column name, and append them as a hint. For example, invalid-name could be mapped to column1_invalidname (recognizable, at least).
  3. Always provide the column name map information when exceptions are thrown.
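
For reference, one possible way to approximate the legality check behind option 1 is the JDK's javax.lang.model.SourceVersion helper, which knows about keywords. This is only a sketch, not what the PR does: the class and method names are hypothetical, and the check follows the rules of the running JDK's latest source version, which may not match what Janino accepts or rule out clashes with other names in a generated expression.

```java
import javax.lang.model.SourceVersion;

public class IdentifierCheckSketch {
    // Hypothetical helper: true if the column name could be used unchanged
    // as a Java identifier. isIdentifier also accepts keywords, so keywords
    // (and the literals true, false, null) are excluded explicitly.
    public static boolean isLegalJavaName(String columnName) {
        return SourceVersion.isIdentifier(columnName)
                && !SourceVersion.isKeyword(columnName);
    }

    public static void main(String[] args) {
        for (String name : new String[] {"id", "class", "invalid-name", "a.b"}) {
            System.out.println(name + " -> " + isLegalJavaName(name));
        }
    }
}
```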

@Shawn-Hx Shawn-Hx Mar 10, 2025

I prefer option 3.
For option 1, it does not seem easy to judge whether a column name is legal, considering that every Java keyword is illegal.
For option 2, a corner case is two columns named a-b and a.b, which we still could not tell apart after mapping.
I have added the column name map information to the exception log.
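
A minimal sketch of the two points above, under stated assumptions: hintOf is a hypothetical implementation of the option 2 hint (keep only characters that are legal in a Java identifier), and withColumnNameMap is a hypothetical helper showing how option 3 could attach the map to an exception message. The message wording and the "field" names are illustrative, not the actual log output of this PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ExceptionContextSketch {
    // Option 2 hint (hypothetical): drop every character that cannot
    // appear in a Java identifier.
    public static String hintOf(String columnName) {
        StringBuilder hint = new StringBuilder();
        for (char c : columnName.toCharArray()) {
            if (Character.isJavaIdentifierPart(c)) {
                hint.append(c);
            }
        }
        return hint.toString();
    }

    // Option 3 (hypothetical helper): wrap a compile failure so the message
    // also carries the original-to-mapped column names.
    public static RuntimeException withColumnNameMap(
            String scriptExpression, Map<String, String> columnNameMap, Throwable cause) {
        return new RuntimeException(
                "Failed to compile expression: " + scriptExpression
                        + "\nColumn name map: " + columnNameMap,
                cause);
    }

    public static void main(String[] args) {
        // The corner case: both names reduce to the same hint.
        System.out.println(hintOf("a-b") + " vs " + hintOf("a.b"));

        // Illustrative map; LinkedHashMap keeps insertion order in the message.
        Map<String, String> columnNameMap = new LinkedHashMap<>();
        columnNameMap.put("a-b", "field0");
        columnNameMap.put("a.b", "field1");
        RuntimeException e = withColumnNameMap(
                "field0 + field1 > 0",
                columnNameMap,
                new IllegalStateException("simulated compiler error"));
        System.out.println(e.getMessage());
    }
}
```

With the map in the message, a reader can translate field0 and field1 back to the original column names even though the hints alone would collide.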

@yuxiqian yuxiqian left a comment

Looks good! Covering this case in FlinkPipelineTransformITCase would be nice, but it's not a must for now.

@leonardBang leonardBang left a comment

Thanks @Shawn-Hx for the contribution, LGTM

…line-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/TimestampTypeMetadataColumn.java

Co-authored-by: Leonard Xu <leonard@apache.org>
@leonardBang leonardBang merged commit f309d31 into apache:master Mar 11, 2025
26 checks passed
@Shawn-Hx Shawn-Hx deleted the FLINK-37429 branch March 12, 2025 07:18