[FLINK-35256][cdc][runtime] Fix transform node does not respect type nullability (apache#3272)
yuxiqian authored and wuzhenhua01 committed Aug 4, 2024
1 parent c2eeb30 commit fb58018
Showing 2 changed files with 66 additions and 4 deletions.
@@ -19,6 +19,7 @@
 
 import org.apache.flink.api.common.io.ParseException;
 import org.apache.flink.cdc.common.schema.Column;
+import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypes;
 import org.apache.flink.cdc.common.utils.StringUtils;
 import org.apache.flink.cdc.runtime.operators.transform.ProjectionColumn;
@@ -154,6 +155,12 @@ public static List<ProjectionColumn> generateProjectionColumns(
                        .collect(
                                Collectors.toMap(
                                        RelDataTypeField::getName, RelDataTypeField::getType));
+
+        Map<String, Boolean> isNotNullMap =
+                columns.stream()
+                        .collect(
+                                Collectors.toMap(
+                                        Column::getName, column -> !column.getType().isNullable()));
         List<ProjectionColumn> projectionColumns = new ArrayList<>();
         for (SqlNode sqlNode : sqlSelect.getSelectList()) {
             if (sqlNode instanceof SqlBasicCall) {
@@ -205,21 +212,27 @@ public static List<ProjectionColumn> generateProjectionColumns(
             } else if (sqlNode instanceof SqlIdentifier) {
                 SqlIdentifier sqlIdentifier = (SqlIdentifier) sqlNode;
                 String columnName = sqlIdentifier.names.get(sqlIdentifier.names.size() - 1);
+                DataType columnType =
+                        DataTypeConverter.convertCalciteRelDataTypeToDataType(
+                                relDataTypeMap.get(columnName));
                 if (isMetadataColumn(columnName)) {
                     projectionColumns.add(
                             ProjectionColumn.of(
                                     columnName,
-                                    DataTypeConverter.convertCalciteRelDataTypeToDataType(
-                                            relDataTypeMap.get(columnName)),
+                                    // Metadata columns should never be null
+                                    columnType.notNull(),
                                     columnName,
                                     columnName,
                                     Arrays.asList(columnName)));
                 } else {
+                    // Calcite translated column type doesn't keep nullability.
+                    // Appending it manually to circumvent this problem.
                     projectionColumns.add(
                             ProjectionColumn.of(
                                     columnName,
-                                    DataTypeConverter.convertCalciteRelDataTypeToDataType(
-                                            relDataTypeMap.get(columnName))));
+                                    isNotNullMap.get(columnName)
+                                            ? columnType.notNull()
+                                            : columnType.nullable()));
                 }
             } else {
                 throw new ParseException("Unrecognized projection: " + sqlNode.toString());
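The gist of the change above: types converted from Calcite's RelDataType lose the source column's nullability, so generateProjectionColumns now records each original column's nullability in isNotNullMap and re-applies notNull() or nullable() to the converted DataType, while metadata columns are always forced to notNull(). A minimal sketch of that re-application step, pulled out into a hypothetical standalone helper (alignNullability is not part of the patch; it only uses Column and DataType methods that appear in the diff):

    import org.apache.flink.cdc.common.schema.Column;
    import org.apache.flink.cdc.common.types.DataType;

    public class NullabilityFixup {
        // Hypothetical helper mirroring the patch: re-apply the source column's
        // nullability to a type whose nullability was lost during conversion.
        public static DataType alignNullability(Column sourceColumn, DataType convertedType) {
            boolean isNotNull = !sourceColumn.getType().isNullable();
            return isNotNull ? convertedType.notNull() : convertedType.nullable();
        }
    }

The test file below then checks that a projected CreateTableEvent keeps the id column as STRING NOT NULL while the derived uid and uname columns remain nullable.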
@@ -74,6 +74,26 @@ public class TransformSchemaOperatorTest {
                     .options(ImmutableMap.of("key1", "value1", "key2", "value2"))
                     .build();
 
+    private static final Schema NULLABILITY_SCHEMA =
+            Schema.newBuilder()
+                    .physicalColumn("id", DataTypes.STRING().notNull())
+                    .physicalColumn("name", DataTypes.STRING())
+                    .primaryKey("id")
+                    .partitionKey("id")
+                    .options(ImmutableMap.of("key1", "value1", "key2", "value2"))
+                    .build();
+
+    private static final Schema EXPECTED_NULLABILITY_SCHEMA =
+            Schema.newBuilder()
+                    .physicalColumn("id", DataTypes.STRING().notNull())
+                    .physicalColumn("uid", DataTypes.STRING())
+                    .physicalColumn("name", DataTypes.STRING())
+                    .physicalColumn("uname", DataTypes.STRING())
+                    .primaryKey("id")
+                    .partitionKey("id")
+                    .options(ImmutableMap.of("key1", "value1", "key2", "value2"))
+                    .build();
+
     @Test
     void testEventTransform() throws Exception {
         TransformSchemaOperator transform =
@@ -176,4 +196,33 @@ void testEventTransform() throws Exception {
                         transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
                 .isEqualTo(new StreamRecord<>(updateEventExpect));
     }
+
+    @Test
+    public void testNullabilityColumn() throws Exception {
+        TransformSchemaOperator transform =
+                TransformSchemaOperator.newBuilder()
+                        .addTransform(
+                                CUSTOMERS_TABLEID.identifier(),
+                                "id, upper(id) uid, name, upper(name) uname",
+                                "id",
+                                "id",
+                                "key1=value1,key2=value2")
+                        .build();
+        EventOperatorTestHarness<TransformSchemaOperator, Event>
+                transformFunctionEventEventOperatorTestHarness =
+                        new EventOperatorTestHarness<>(transform, 1);
+        // Initialization
+        transformFunctionEventEventOperatorTestHarness.open();
+        // Create table
+        CreateTableEvent createTableEvent =
+                new CreateTableEvent(CUSTOMERS_TABLEID, NULLABILITY_SCHEMA);
+        transform.processElement(new StreamRecord<>(createTableEvent));
+
+        Assertions.assertThat(
+                        transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(
+                        new StreamRecord<>(
+                                new CreateTableEvent(
+                                        CUSTOMERS_TABLEID, EXPECTED_NULLABILITY_SCHEMA)));
+    }
 }
