
Commit de19f7b

[FLINK-35256][runtime] Fix transform node does not respect type nullability (#3277)
1 parent 65a6880 commit de19f7b

File tree

2 files changed: +66 −4 lines changed


flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java

+17 −4

@@ -19,6 +19,7 @@

 import org.apache.flink.api.common.io.ParseException;
 import org.apache.flink.cdc.common.schema.Column;
+import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypes;
 import org.apache.flink.cdc.common.utils.StringUtils;
 import org.apache.flink.cdc.runtime.operators.transform.ProjectionColumn;

@@ -154,6 +155,12 @@ public static List<ProjectionColumn> generateProjectionColumns(
                         .collect(
                                 Collectors.toMap(
                                         RelDataTypeField::getName, RelDataTypeField::getType));
+
+        Map<String, Boolean> isNotNullMap =
+                columns.stream()
+                        .collect(
+                                Collectors.toMap(
+                                        Column::getName, column -> !column.getType().isNullable()));
         List<ProjectionColumn> projectionColumns = new ArrayList<>();
         for (SqlNode sqlNode : sqlSelect.getSelectList()) {
             if (sqlNode instanceof SqlBasicCall) {

@@ -205,21 +212,27 @@ public static List<ProjectionColumn> generateProjectionColumns(
             } else if (sqlNode instanceof SqlIdentifier) {
                 SqlIdentifier sqlIdentifier = (SqlIdentifier) sqlNode;
                 String columnName = sqlIdentifier.names.get(sqlIdentifier.names.size() - 1);
+                DataType columnType =
+                        DataTypeConverter.convertCalciteRelDataTypeToDataType(
+                                relDataTypeMap.get(columnName));
                 if (isMetadataColumn(columnName)) {
                     projectionColumns.add(
                             ProjectionColumn.of(
                                     columnName,
-                                    DataTypeConverter.convertCalciteRelDataTypeToDataType(
-                                            relDataTypeMap.get(columnName)),
+                                    // Metadata columns should never be null
+                                    columnType.notNull(),
                                     columnName,
                                     columnName,
                                     Arrays.asList(columnName)));
                 } else {
+                    // Calcite translated column type doesn't keep nullability.
+                    // Appending it manually to circumvent this problem.
                     projectionColumns.add(
                             ProjectionColumn.of(
                                     columnName,
-                                    DataTypeConverter.convertCalciteRelDataTypeToDataType(
-                                            relDataTypeMap.get(columnName))));
+                                    isNotNullMap.get(columnName)
+                                            ? columnType.notNull()
+                                            : columnType.nullable()));
                 }
             } else {
                 throw new ParseException("Unrecognized projection: " + sqlNode.toString());
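The key idea above: DataTypeConverter.convertCalciteRelDataTypeToDataType loses the source column's NOT NULL flag, so the patch records each column's nullability up front (isNotNullMap) and re-applies it to the converted type. Below is a minimal, self-contained sketch of that re-application step, using only the DataType/DataTypes API already shown in this diff; the class and method names (NullabilityRestoreSketch, restoreNullability) are illustrative and not part of the commit.

import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;

public class NullabilityRestoreSketch {

    // Re-decorate a Calcite-derived type with the nullability recorded for the
    // original column, mirroring the isNotNullMap lookup in the patch above.
    static DataType restoreNullability(DataType calciteDerivedType, boolean originalIsNotNull) {
        return originalIsNotNull ? calciteDerivedType.notNull() : calciteDerivedType.nullable();
    }

    public static void main(String[] args) {
        // The conversion from RelDataType drops NOT NULL, so the derived type is nullable here.
        DataType derived = DataTypes.STRING();
        System.out.println(restoreNullability(derived, true));  // non-nullable STRING
        System.out.println(restoreNullability(derived, false)); // nullable STRING
    }
}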

flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformSchemaOperatorTest.java

+49

@@ -74,6 +74,26 @@ public class TransformSchemaOperatorTest {
                     .options(ImmutableMap.of("key1", "value1", "key2", "value2"))
                     .build();

+    private static final Schema NULLABILITY_SCHEMA =
+            Schema.newBuilder()
+                    .physicalColumn("id", DataTypes.STRING().notNull())
+                    .physicalColumn("name", DataTypes.STRING())
+                    .primaryKey("id")
+                    .partitionKey("id")
+                    .options(ImmutableMap.of("key1", "value1", "key2", "value2"))
+                    .build();
+
+    private static final Schema EXPECTED_NULLABILITY_SCHEMA =
+            Schema.newBuilder()
+                    .physicalColumn("id", DataTypes.STRING().notNull())
+                    .physicalColumn("uid", DataTypes.STRING())
+                    .physicalColumn("name", DataTypes.STRING())
+                    .physicalColumn("uname", DataTypes.STRING())
+                    .primaryKey("id")
+                    .partitionKey("id")
+                    .options(ImmutableMap.of("key1", "value1", "key2", "value2"))
+                    .build();
+
     @Test
     void testEventTransform() throws Exception {
         TransformSchemaOperator transform =

@@ -176,4 +196,33 @@ void testEventTransform() throws Exception {
                         transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
                 .isEqualTo(new StreamRecord<>(updateEventExpect));
     }
+
+    @Test
+    public void testNullabilityColumn() throws Exception {
+        TransformSchemaOperator transform =
+                TransformSchemaOperator.newBuilder()
+                        .addTransform(
+                                CUSTOMERS_TABLEID.identifier(),
+                                "id, upper(id) uid, name, upper(name) uname",
+                                "id",
+                                "id",
+                                "key1=value1,key2=value2")
+                        .build();
+        EventOperatorTestHarness<TransformSchemaOperator, Event>
+                transformFunctionEventEventOperatorTestHarness =
+                        new EventOperatorTestHarness<>(transform, 1);
+        // Initialization
+        transformFunctionEventEventOperatorTestHarness.open();
+        // Create table
+        CreateTableEvent createTableEvent =
+                new CreateTableEvent(CUSTOMERS_TABLEID, NULLABILITY_SCHEMA);
+        transform.processElement(new StreamRecord<>(createTableEvent));
+
+        Assertions.assertThat(
+                        transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(
+                        new StreamRecord<>(
+                                new CreateTableEvent(
+                                        CUSTOMERS_TABLEID, EXPECTED_NULLABILITY_SCHEMA)));
+    }
 }
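Put plainly, the new test expects the forwarded id column to stay NOT NULL, while name and the computed uid/uname columns remain nullable. Below is a small sketch of how one could spot-check that expectation outside the test harness, reusing the builder calls from the test; it assumes Schema#getColumns() is available for iteration, and the class name is illustrative only.

import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;

public class ExpectedNullabilitySketch {

    public static void main(String[] args) {
        // Same shape as EXPECTED_NULLABILITY_SCHEMA in the test above
        // (options and partition key omitted for brevity).
        Schema expected =
                Schema.newBuilder()
                        .physicalColumn("id", DataTypes.STRING().notNull())
                        .physicalColumn("uid", DataTypes.STRING())
                        .physicalColumn("name", DataTypes.STRING())
                        .physicalColumn("uname", DataTypes.STRING())
                        .primaryKey("id")
                        .build();

        // Print per-column nullability: only "id" should report nullable=false.
        // Note: getColumns() is assumed to exist on Schema for this sketch.
        for (Column column : expected.getColumns()) {
            System.out.println(column.getName() + " nullable=" + column.getType().isNullable());
        }
    }
}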
