Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-36154][formats] Support deserialize json ignore key case #25255

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_IGNORE_NULL_FIELDS;
import static org.apache.flink.formats.json.JsonFormatOptions.FAIL_ON_MISSING_FIELD;
import static org.apache.flink.formats.json.JsonFormatOptions.IGNORE_KEY_CASE;
import static org.apache.flink.formats.json.JsonFormatOptions.IGNORE_PARSE_ERRORS;
import static org.apache.flink.formats.json.JsonFormatOptions.MAP_NULL_KEY_LITERAL;
import static org.apache.flink.formats.json.JsonFormatOptions.MAP_NULL_KEY_MODE;
Expand All @@ -71,6 +72,7 @@ public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
final boolean failOnMissingField = formatOptions.get(FAIL_ON_MISSING_FIELD);
final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
final boolean jsonParserEnabled = formatOptions.get(DECODE_JSON_PARSER_ENABLED);
final boolean ignoreKeyCase = formatOptions.get(IGNORE_KEY_CASE);
TimestampFormat timestampOption = JsonFormatOptionsUtil.getTimestampFormat(formatOptions);

return new ProjectableDecodingFormat<DeserializationSchema<RowData>>() {
Expand All @@ -90,6 +92,7 @@ public DeserializationSchema<RowData> createRuntimeDecoder(
rowDataTypeInfo,
failOnMissingField,
ignoreParseErrors,
ignoreKeyCase,
timestampOption,
toProjectedNames(
(RowType) physicalDataType.getLogicalType(), projections));
Expand Down Expand Up @@ -202,6 +205,7 @@ public Set<ConfigOption<?>> forwardOptions() {
options.add(MAP_NULL_KEY_LITERAL);
options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
options.add(ENCODE_IGNORE_NULL_FIELDS);
options.add(IGNORE_KEY_CASE);
return options;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ public class JsonFormatOptions {
.withDescription(
"Optional flag to specify whether to use the Jackson JsonParser to decode json with better performance, true by default.");

/**
 * Boolean format option registered under the key {@code ignore-key-case}; defaults to
 * {@code false}. Per its description, it selects whether the case of field keys is ignored
 * when decoding.
 */
public static final ConfigOption<Boolean> IGNORE_KEY_CASE =
ConfigOptions.key("ignore-key-case")
.booleanType()
.defaultValue(false)
.withDescription(
"Optional flag to specify whether to ignore key case of field when decoding, false by default.");

// --------------------------------------------------------------------------------------------
// Enums
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,33 @@ public JsonParserRowDataDeserializationSchema(
TypeInformation<RowData> resultTypeInfo,
boolean failOnMissingField,
boolean ignoreParseErrors,
boolean ignoreKeyCase,
TimestampFormat timestampFormat) {
this(rowType, resultTypeInfo, failOnMissingField, ignoreParseErrors, timestampFormat, null);
this(
rowType,
resultTypeInfo,
failOnMissingField,
ignoreParseErrors,
ignoreKeyCase,
timestampFormat,
null);
}

public JsonParserRowDataDeserializationSchema(
RowType rowType,
TypeInformation<RowData> resultTypeInfo,
boolean failOnMissingField,
boolean ignoreParseErrors,
boolean ignoreKeyCase,
TimestampFormat timestampFormat,
@Nullable String[][] projectedFields) {
super(rowType, resultTypeInfo, failOnMissingField, ignoreParseErrors, timestampFormat);
this.runtimeConverter =
new JsonParserToRowDataConverters(
failOnMissingField, ignoreParseErrors, timestampFormat)
failOnMissingField,
ignoreParseErrors,
ignoreKeyCase,
timestampFormat)
.createConverter(projectedFields, checkNotNull(rowType));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,20 @@ public class JsonParserToRowDataConverters implements Serializable {
/** Timestamp format specification which is used to parse timestamp. */
private final TimestampFormat timestampFormat;

/**
* Flag indicating whether to ignore letter case when matching the field keys of the JSON
* string against the field names of the table row schema.
*/
private final boolean ignoreKeyCase;

/**
 * Creates a converter factory that remembers the given decoding settings.
 *
 * @param failOnMissingField value stored in the {@code failOnMissingField} field
 * @param ignoreParseErrors value stored in the {@code ignoreParseErrors} field
 * @param ignoreKeyCase whether letter case is ignored when matching JSON keys to row fields
 * @param timestampFormat timestamp format specification used to parse timestamps
 */
public JsonParserToRowDataConverters(
        boolean failOnMissingField,
        boolean ignoreParseErrors,
        boolean ignoreKeyCase,
        TimestampFormat timestampFormat) {
    // Plain field captures, listed in parameter order; none depends on another.
    this.failOnMissingField = failOnMissingField;
    this.ignoreParseErrors = ignoreParseErrors;
    this.ignoreKeyCase = ignoreKeyCase;
    this.timestampFormat = timestampFormat;
}

/**
Expand Down Expand Up @@ -391,7 +398,7 @@ public JsonParserToRowDataConverter createRowConverter(RowType rowType) {

Map<String, Integer> nameIdxMap = new HashMap<>();
for (int i = 0; i < rowType.getFieldCount(); i++) {
nameIdxMap.put(fieldNames[i], i);
nameIdxMap.put(ignoreKeyCase ? fieldNames[i].toLowerCase() : fieldNames[i], i);
}

return jp -> {
Expand All @@ -407,7 +414,7 @@ public JsonParserToRowDataConverter createRowConverter(RowType rowType) {
skipToNextField(jp);
continue;
}
String fieldName = jp.getText();
String fieldName = ignoreKeyCase ? jp.getText().toLowerCase() : jp.getText();
jp.nextToken();
Integer idx = nameIdxMap.get(fieldName);
if (idx != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ private void testSchemaDeserializationSchema(Map<String, String> options) {
InternalTypeInfo.of(PHYSICAL_TYPE),
false,
true,
false,
TimestampFormat.ISO_8601);

DeserializationSchema<RowData> actualDeser =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void testParsePartialJson() throws Exception {

DeserializationSchema<RowData> deserializationSchema =
new JsonParserRowDataDeserializationSchema(
schema, resultTypeInfo, false, false, TimestampFormat.ISO_8601);
schema, resultTypeInfo, false, false, false, TimestampFormat.ISO_8601);
open(deserializationSchema);

Row expected = new Row(5);
Expand Down Expand Up @@ -168,6 +168,7 @@ private void innerTestProjected(byte[] serializedJson, GenericRowData expected)
resultTypeInfo,
false,
false,
false,
TimestampFormat.ISO_8601,
new String[][] {
new String[] {"f1"},
Expand Down Expand Up @@ -261,6 +262,7 @@ private void innerTestProjectNestedField(byte[] serializedJson, GenericRowData e
resultTypeInfo,
false,
false,
false,
TimestampFormat.ISO_8601,
new String[][] {
new String[] {"f1"},
Expand Down Expand Up @@ -328,6 +330,7 @@ private void innerTestProjectBothRowAndNestedField(
resultTypeInfo,
false,
false,
false,
TimestampFormat.ISO_8601,
new String[][] {
new String[] {"f1"},
Expand All @@ -343,4 +346,35 @@ private void innerTestProjectBothRowAndNestedField(
RowData rowData = deserializationSchema.deserialize(serializedJson);
assertThat(rowData).isEqualTo(expected);
}

/**
 * Checks that, with {@code ignoreKeyCase} enabled, JSON keys are matched to schema fields
 * regardless of letter case, both at the top level and inside a nested row.
 */
@Test
public void testIgnoreKeyCase() throws Exception {
    // Schema whose field names differ in case from the JSON keys built below.
    DataType dataType =
            ROW(
                    FIELD("a", STRING()),
                    FIELD("b", ROW(FIELD("x", STRING()), FIELD("Y", STRING()))));
    RowType schema = (RowType) dataType.getLogicalType();
    TypeInformation<RowData> resultTypeInfo = InternalTypeInfo.of(schema);

    // Input document: {"A": "a1", "B": {"X": "x123", "y": "y123"}}
    ObjectMapper objectMapper = new ObjectMapper();
    ObjectNode root = objectMapper.createObjectNode();
    root.put("A", "a1");
    ObjectNode nested = root.putObject("B");
    nested.put("X", "x123");
    nested.put("y", "y123");
    byte[] serializedJson = objectMapper.writeValueAsBytes(root);

    // Expected external row, built from the inside out.
    Row expectedNested = new Row(2);
    expectedNested.setField(0, "x123");
    expectedNested.setField(1, "y123");
    Row expected = new Row(2);
    expected.setField(0, "a1");
    expected.setField(1, expectedNested);

    // The fifth constructor argument (true) turns on case-insensitive key matching.
    DeserializationSchema<RowData> deserializationSchema =
            new JsonParserRowDataDeserializationSchema(
                    schema, resultTypeInfo, false, false, true, TimestampFormat.ISO_8601);
    open(deserializationSchema);

    RowData decoded = deserializationSchema.deserialize(serializedJson);
    Row actual = convertToExternal(decoded, dataType);
    assertThat(actual).isEqualTo(expected);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,7 @@ private DeserializationSchema<RowData> createDeserializationSchema(
InternalTypeInfo.of(rowType),
failOnMissingField,
ignoreParseErrors,
false,
timestampFormat);
} else {
return new JsonRowDataDeserializationSchema(
Expand Down