Skip to content

Commit

Permalink
[Bugfix] Can't load data to StarRocks json column (StarRocks#161)
Browse files Browse the repository at this point in the history
Cannot map a Flink String column (a JSON string) to a StarRocks JSON column. The reason is that StarRocksDynamicSinkFunctionV2 does not set StarRocksTableRowTransformer#columns, so StarRocksTableRowTransformer#typeConvertion will not check whether the received data is a JSON string and therefore cannot map it to StarRocks JSON.
  • Loading branch information
banmoy committed Dec 14, 2022
1 parent 164aa2b commit 99300ea
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import com.starrocks.connector.flink.connection.StarRocksJdbcConnectionOptions;
import com.starrocks.connector.flink.connection.StarRocksJdbcConnectionProvider;
import com.starrocks.connector.flink.table.StarRocksDataType;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;

import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.constraints.UniqueConstraint;
Expand Down Expand Up @@ -55,6 +55,10 @@ public boolean isOpAutoProjectionInJson() {
return version == null || version.length() > 0 && !version.trim().startsWith("1.");
}

/**
 * Returns the mapping from StarRocks column names to their StarRocks data types,
 * as obtained from the underlying query visitor.
 *
 * @return the column-name-to-data-type mapping reported by {@code starRocksQueryVisitor}
 */
public Map<String, StarRocksDataType> getFieldMapping() {
    final Map<String, StarRocksDataType> fieldTypes = starRocksQueryVisitor.getFieldMapping();
    return fieldTypes;
}

public void validateTableStructure(StarRocksSinkOptions sinkOptions, TableSchema flinkSchema) {
if (flinkSchema == null) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@

package com.starrocks.connector.flink.row.sink;

import com.starrocks.connector.flink.table.StarRocksDataType;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.JSONSerializer;
import com.alibaba.fastjson.serializer.ObjectSerializer;
import com.alibaba.fastjson.serializer.SerializeConfig;
import com.alibaba.fastjson.serializer.SerializeWriter;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.starrocks.connector.flink.table.StarRocksDataType;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
Expand Down Expand Up @@ -140,6 +139,12 @@ private Object typeConvertion(LogicalType type, RowData record, int pos) {
if ((starRocksDataType == StarRocksDataType.JSON ||
starRocksDataType == StarRocksDataType.UNKNOWN)
&& (sValue.charAt(0) == '{' || sValue.charAt(0) == '[')) {
// The json string needs to be converted to a json object first, and then back to a
// json string via JSON.toJSONString in StarRocksJsonSerializer#serialize. Otherwise,
// the final json string in stream load will not be correct. For example, if the received
// string is "{"a": 1, "b": 2}" and it is passed to JSON.toJSONString directly, the
// result will be "{\"a\": 1, \"b\": 2}", which will not be recognized as json by
// StarRocks
return JSON.parse(sValue);
}
return sValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,15 @@ public StarRocksDynamicSinkFunctionV2(StarRocksSinkOptions sinkOptions,
StarRocksIRowTransformer<T> rowTransformer) {
this.sinkOptions = sinkOptions;
this.rowTransformer = rowTransformer;
// NOTE(review): setTableSchema appears twice in this span (here and below the
// serializer creation). This looks like diff residue of a moved line — verify
// that the actual source contains only one call.
rowTransformer.setTableSchema(schema);
StarRocksSinkTable sinkTable = StarRocksSinkTable.builder()
.sinkOptions(sinkOptions)
.build();
// Validates the Flink schema against the StarRocks table before any serializer
// or transformer is configured from it.
sinkTable.validateTableStructure(sinkOptions, schema);
// StarRocksJsonSerializer depends on SinkOptions#supportUpsertDelete which is decided in
// StarRocksSinkTable#validateTableStructure, so create serializer after validating table structure
this.serializer = StarRocksSerializerFactory.createSerializer(sinkOptions, schema.getFieldNames());
// Provide the StarRocks column-name-to-type mapping so the row transformer can
// detect JSON-typed target columns (the fix this commit introduces).
rowTransformer.setStarRocksColumns(sinkTable.getFieldMapping());
rowTransformer.setTableSchema(schema);
this.sinkManager = new StarRocksSinkManagerV2(sinkOptions.getProperties());
}

Expand Down

0 comments on commit 99300ea

Please sign in to comment.