diff --git a/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkTable.java b/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkTable.java index a689b6c2..27ae2f90 100644 --- a/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkTable.java +++ b/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkTable.java @@ -2,8 +2,8 @@ import com.starrocks.connector.flink.connection.StarRocksJdbcConnectionOptions; import com.starrocks.connector.flink.connection.StarRocksJdbcConnectionProvider; +import com.starrocks.connector.flink.table.StarRocksDataType; import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions; - import org.apache.flink.table.api.TableColumn; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.constraints.UniqueConstraint; @@ -55,6 +55,10 @@ public boolean isOpAutoProjectionInJson() { return version == null || version.length() > 0 && !version.trim().startsWith("1."); } + public Map getFieldMapping() { + return starRocksQueryVisitor.getFieldMapping(); + } + public void validateTableStructure(StarRocksSinkOptions sinkOptions, TableSchema flinkSchema) { if (flinkSchema == null) { return; diff --git a/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformer.java b/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformer.java index c2c070a7..8248daf8 100644 --- a/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformer.java +++ b/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformer.java @@ -14,8 +14,6 @@ package com.starrocks.connector.flink.row.sink; -import com.starrocks.connector.flink.table.StarRocksDataType; - import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.serializer.JSONSerializer; import com.alibaba.fastjson.serializer.ObjectSerializer; @@ -23,6 +21,7 @@ import com.alibaba.fastjson.serializer.SerializeWriter; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.starrocks.connector.flink.table.StarRocksDataType; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -140,6 +139,12 @@ private Object typeConvertion(LogicalType type, RowData record, int pos) { if ((starRocksDataType == StarRocksDataType.JSON || starRocksDataType == StarRocksDataType.UNKNOWN) && (sValue.charAt(0) == '{' || sValue.charAt(0) == '[')) { + // The json string need to be converted to a json object, and to the json string + // again via JSON.toJSONString in StarRocksJsonSerializer#serialize. Otherwise, + // the final json string in stream load will not be correct. For example, the received + // string is "{"a": 1, "b": 2}", and if input it to JSON.toJSONString directly, the + // result will be "{\"a\": 1, \"b\": 2}" which will not be recognized as a json in + // StarRocks return JSON.parse(sValue); } return sValue; diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java index a9019809..2ec28390 100644 --- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java +++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java @@ -64,7 +64,6 @@ public StarRocksDynamicSinkFunctionV2(StarRocksSinkOptions sinkOptions, StarRocksIRowTransformer rowTransformer) { this.sinkOptions = sinkOptions; this.rowTransformer = rowTransformer; - rowTransformer.setTableSchema(schema); StarRocksSinkTable sinkTable = StarRocksSinkTable.builder() .sinkOptions(sinkOptions) .build(); @@ -72,6 +71,8 @@ public StarRocksDynamicSinkFunctionV2(StarRocksSinkOptions sinkOptions, // StarRocksJsonSerializer depends on SinkOptions#supportUpsertDelete which is decided in // StarRocksSinkTable#validateTableStructure, so create serializer after validating table structure this.serializer = StarRocksSerializerFactory.createSerializer(sinkOptions, schema.getFieldNames()); + rowTransformer.setStarRocksColumns(sinkTable.getFieldMapping()); + rowTransformer.setTableSchema(schema); this.sinkManager = new StarRocksSinkManagerV2(sinkOptions.getProperties()); }