[FLINK-35984][transform] Fix: Job crashes when metadata column names present in transform rules #3528

Merged · 3 commits · Aug 12, 2024
Changes from 2 commits
@@ -152,22 +152,22 @@ private TransformExpressionKey generateTransformExpressionKey() {
}
}
}
-        if (scriptExpression.contains(TransformParser.DEFAULT_NAMESPACE_NAME)
-                && !argumentNames.contains(TransformParser.DEFAULT_NAMESPACE_NAME)) {
-            argumentNames.add(TransformParser.DEFAULT_NAMESPACE_NAME);
-            paramTypes.add(String.class);
-        }
-
-        if (scriptExpression.contains(TransformParser.DEFAULT_SCHEMA_NAME)
-                && !argumentNames.contains(TransformParser.DEFAULT_SCHEMA_NAME)) {
-            argumentNames.add(TransformParser.DEFAULT_SCHEMA_NAME);
-            paramTypes.add(String.class);
-        }
-
-        if (scriptExpression.contains(TransformParser.DEFAULT_TABLE_NAME)
-                && !argumentNames.contains(TransformParser.DEFAULT_TABLE_NAME)) {
-            argumentNames.add(TransformParser.DEFAULT_TABLE_NAME);
-            paramTypes.add(String.class);
+        for (String originalColumnName : originalColumnNames) {
+            switch (originalColumnName) {
+                case TransformParser.DEFAULT_NAMESPACE_NAME:
+                    argumentNames.add(TransformParser.DEFAULT_NAMESPACE_NAME);
+                    paramTypes.add(String.class);
+                    break;
+                case TransformParser.DEFAULT_SCHEMA_NAME:
+                    argumentNames.add(TransformParser.DEFAULT_SCHEMA_NAME);
+                    paramTypes.add(String.class);
+                    break;
+                case TransformParser.DEFAULT_TABLE_NAME:
+                    argumentNames.add(TransformParser.DEFAULT_TABLE_NAME);
+                    paramTypes.add(String.class);
+                    break;
+            }
+        }

        argumentNames.add(JaninoCompiler.DEFAULT_TIME_ZONE);
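The hunk above is the core of the fix: argument registration for metadata columns no longer scans the generated script expression for the tokens __namespace_name__, __schema_name__ and __table_name__, but walks the actual upstream column names instead. The standalone sketch below (class and method names are hypothetical, not taken from this PR) illustrates why the old substring check was fragile: a projection that only mentions a metadata token inside a string literal still matched, so an extra argument was registered and the argument names drifted out of sync with the values supplied at evaluation time.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Standalone sketch, hypothetical names: contrasts substring-based detection of
// metadata arguments with detection based on the actual projected column names.
public class MetadataArgumentSketch {

    static final String DEFAULT_TABLE_NAME = "__table_name__";

    // Old-style approach: scan the compiled expression text for the metadata token.
    // A quoted literal such as "__table_name__" also matches, so an argument is
    // registered even though no metadata column is referenced.
    static List<String> argumentsFromExpression(String scriptExpression) {
        List<String> argumentNames = new ArrayList<>();
        if (scriptExpression.contains(DEFAULT_TABLE_NAME)) {
            argumentNames.add(DEFAULT_TABLE_NAME);
        }
        return argumentNames;
    }

    // New-style approach: only register the argument when the metadata column is
    // actually part of the projected column list.
    static List<String> argumentsFromColumns(List<String> originalColumnNames) {
        List<String> argumentNames = new ArrayList<>();
        for (String columnName : originalColumnNames) {
            if (DEFAULT_TABLE_NAME.equals(columnName)) {
                argumentNames.add(DEFAULT_TABLE_NAME);
            }
        }
        return argumentNames;
    }

    public static void main(String[] args) {
        // A projection fragment that only contains the token inside a string literal.
        String expression = "\"__table_name__\" + \"suffix\"";
        System.out.println(argumentsFromExpression(expression));               // [__table_name__] -> spurious
        System.out.println(argumentsFromColumns(Arrays.asList("id", "name"))); // []               -> correct
    }
}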
@@ -109,7 +109,7 @@ private static RelNode sqlToRel(
            List<Column> columns,
            SqlNode sqlNode,
            List<UserDefinedFunctionDescriptor> udfDescriptors) {
-        List<Column> columnsWithMetadata = copyFillMetadataColumn(sqlNode.toString(), columns);
+        List<Column> columnsWithMetadata = copyFillMetadataColumn(columns);
        CalciteSchema rootSchema = CalciteSchema.createRootSchema(true);
        SchemaPlus schema = rootSchema.plus();
        Map<String, Object> operand = new HashMap<>();
@@ -498,29 +498,15 @@ private static SqlSelect parseProjectionExpression(String projection) {
        return parseSelect(statement.toString());
    }

-    private static List<Column> copyFillMetadataColumn(
-            String transformStatement, List<Column> columns) {
+    private static List<Column> copyFillMetadataColumn(List<Column> columns) {
+        // Add metadata columns for SqlValidator.validate
        List<Column> columnsWithMetadata = new ArrayList<>(columns);
-        if (transformStatement.contains(DEFAULT_NAMESPACE_NAME)
-                && !containsMetadataColumn(columnsWithMetadata, DEFAULT_NAMESPACE_NAME)) {
-            columnsWithMetadata.add(
-                    Column.physicalColumn(DEFAULT_NAMESPACE_NAME, DataTypes.STRING()));
-        }
-        if (transformStatement.contains(DEFAULT_SCHEMA_NAME)
-                && !containsMetadataColumn(columnsWithMetadata, DEFAULT_SCHEMA_NAME)) {
-            columnsWithMetadata.add(Column.physicalColumn(DEFAULT_SCHEMA_NAME, DataTypes.STRING()));
-        }
-        if (transformStatement.contains(DEFAULT_TABLE_NAME)
-                && !containsMetadataColumn(columnsWithMetadata, DEFAULT_TABLE_NAME)) {
-            columnsWithMetadata.add(Column.physicalColumn(DEFAULT_TABLE_NAME, DataTypes.STRING()));
-        }
+        columnsWithMetadata.add(Column.physicalColumn(DEFAULT_NAMESPACE_NAME, DataTypes.STRING()));
+        columnsWithMetadata.add(Column.physicalColumn(DEFAULT_SCHEMA_NAME, DataTypes.STRING()));
+        columnsWithMetadata.add(Column.physicalColumn(DEFAULT_TABLE_NAME, DataTypes.STRING()));
        return columnsWithMetadata;
    }

-    private static boolean containsMetadataColumn(List<Column> columns, String columnName) {
-        return columns.stream().anyMatch(column -> column.getName().equals(columnName));
-    }

    private static boolean isMetadataColumn(String columnName) {
        return DEFAULT_TABLE_NAME.equals(columnName)
                || DEFAULT_SCHEMA_NAME.equals(columnName)
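On the TransformParser side the same idea is applied to the schema used for validation: copyFillMetadataColumn no longer inspects the statement text, it simply appends the three metadata columns to a copy of the physical columns before handing them to the SQL validator. A minimal sketch of that behaviour, using a simplified column model (plain strings instead of the CDC Column class, hypothetical class name), is shown below.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Standalone sketch: the metadata columns are always appended to the copy of the
// physical columns used for validation, regardless of how the projection is written.
public class MetadataSchemaSketch {

    static final String DEFAULT_NAMESPACE_NAME = "__namespace_name__";
    static final String DEFAULT_SCHEMA_NAME = "__schema_name__";
    static final String DEFAULT_TABLE_NAME = "__table_name__";

    static List<String> copyFillMetadataColumn(List<String> physicalColumns) {
        List<String> columnsWithMetadata = new ArrayList<>(physicalColumns);
        columnsWithMetadata.add(DEFAULT_NAMESPACE_NAME);
        columnsWithMetadata.add(DEFAULT_SCHEMA_NAME);
        columnsWithMetadata.add(DEFAULT_TABLE_NAME);
        return columnsWithMetadata;
    }

    public static void main(String[] args) {
        // Prints: [id, name, age, __namespace_name__, __schema_name__, __table_name__]
        System.out.println(copyFillMetadataColumn(Arrays.asList("id", "name", "age")));
    }
}

Because these extra columns only serve the validation step, appending them unconditionally sidesteps the substring pitfall, and the containsMetadataColumn guard that protected the old checks is removed in the diff above.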
@@ -917,6 +917,57 @@ public void testMetadataAndCalculatedTransform() throws Exception {
                .destroyHarness();
    }

+    @Test
+    public void testMetadataTransformIncludeMetaColumnString() throws Exception {
+        TableId tableId = TableId.tableId("my_company", "my_branch", "schema_nullability");
+        UnifiedTransformTestCase.of(
+                        tableId,
+                        "id, name, age, id + age as computed, __namespace_name__ as metaColNameSpaceName, __schema_name__ as metaColSchemaName, __table_name__ as metaColNameTableName, "
+                                + "UPPER(__schema_name__) as metaColSchemaNameUpper, '__table_name__' as metaColStr1, '__namespace__name__schema__name__table__name__' as metaColStr2",
+                        "id > 100",
+                        Schema.newBuilder()
+                                .physicalColumn("id", DataTypes.INT())
+                                .physicalColumn("name", DataTypes.STRING().notNull())
+                                .physicalColumn("age", DataTypes.INT().notNull())
+                                .primaryKey("id")
+                                .build(),
+                        Schema.newBuilder()
+                                .physicalColumn("id", DataTypes.INT())
+                                .physicalColumn("name", DataTypes.STRING().notNull())
+                                .physicalColumn("age", DataTypes.INT().notNull())
+                                .primaryKey("id")
+                                .build(),
+                        Schema.newBuilder()
+                                .physicalColumn("id", DataTypes.INT())
+                                .physicalColumn("name", DataTypes.STRING().notNull())
+                                .physicalColumn("age", DataTypes.INT().notNull())
+                                .physicalColumn("computed", DataTypes.INT())
+                                .physicalColumn("metaColNameSpaceName", DataTypes.STRING())
+                                .physicalColumn("metaColSchemaName", DataTypes.STRING())
+                                .physicalColumn("metaColNameTableName", DataTypes.STRING())
+                                .physicalColumn("metaColSchemaNameUpper", DataTypes.STRING())
+                                .physicalColumn("metaColStr1", DataTypes.STRING())
+                                .physicalColumn("metaColStr2", DataTypes.STRING())
+                                .primaryKey("id")
+                                .build())
+                .initializeHarness()
+                .insertSource(1000, "Alice", 17)
+                .insertPreTransformed(1000, "Alice", 17)
+                .insertPostTransformed(
+                        1000,
+                        "Alice",
+                        17,
+                        1017,
+                        "my_company",
+                        "my_branch",
+                        "schema_nullability",
+                        "MY_BRANCH",
+                        "__table_name__",
+                        "__namespace__name__schema__name__table__name__")
+                .runTests()
+                .destroyHarness();
+    }

    @Test
    public void testTransformWithCast() throws Exception {
        TableId tableId = TableId.tableId("my_company", "my_branch", "transform_with_cast");