Skip to content

Commit

Permalink
[FLINK-35430][cdc-connector][kafka] Pass the time zone infor to JsonS…
Browse files Browse the repository at this point in the history
…erializationSchema

This closes  apache#3359.
  • Loading branch information
joyCurry30 authored and zhangchaoming.zcm committed Jan 3, 2025
1 parent 981967a commit 1aad51a
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.flink.formats.json.JsonFormatOptions;
import org.apache.flink.formats.json.JsonFormatOptionsUtil;

import java.time.ZoneId;

import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
import static org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL;

Expand All @@ -46,7 +48,7 @@ public class ChangeLogJsonFormatFactory {
* @return The configured instance of {@link SerializationSchema}.
*/
public static SerializationSchema<Event> createSerializationSchema(
ReadableConfig formatOptions, JsonSerializationType type) {
ReadableConfig formatOptions, JsonSerializationType type, ZoneId zoneId) {
TimestampFormat timestampFormat = JsonFormatOptionsUtil.getTimestampFormat(formatOptions);
JsonFormatOptions.MapNullKeyMode mapNullKeyMode =
JsonFormatOptionsUtil.getMapNullKeyMode(formatOptions);
Expand All @@ -62,6 +64,7 @@ public static SerializationSchema<Event> createSerializationSchema(
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
zoneId,
encodeDecimalAsPlainNumber);
}
case CANAL_JSON:
Expand All @@ -70,6 +73,7 @@ public static SerializationSchema<Event> createSerializationSchema(
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
zoneId,
encodeDecimalAsPlainNumber);
}
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,13 @@ public CanalJsonSerializationSchema(
TimestampFormat timestampFormat,
JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
String mapNullKeyLiteral,
ZoneId zoneId,
boolean encodeDecimalAsPlainNumber) {
this.timestampFormat = timestampFormat;
this.mapNullKeyMode = mapNullKeyMode;
this.mapNullKeyLiteral = mapNullKeyLiteral;
this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber;
this.zoneId = ZoneId.systemDefault();
this.zoneId = zoneId;
jsonSerializers = new HashMap<>();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,13 @@ public DebeziumJsonSerializationSchema(
TimestampFormat timestampFormat,
JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
String mapNullKeyLiteral,
ZoneId zoneId,
boolean encodeDecimalAsPlainNumber) {
this.timestampFormat = timestampFormat;
this.mapNullKeyMode = mapNullKeyMode;
this.mapNullKeyLiteral = mapNullKeyLiteral;
this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber;
this.zoneId = ZoneId.systemDefault();
this.zoneId = zoneId;
jsonSerializers = new HashMap<>();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public DataSink createDataSink(Context context) {
context.getFactoryConfiguration().get(KafkaDataSinkOptions.VALUE_FORMAT);
SerializationSchema<Event> valueSerialization =
ChangeLogJsonFormatFactory.createSerializationSchema(
configuration, jsonSerializationType);
configuration, jsonSerializationType, zoneId);
final Properties kafkaProperties = new Properties();
Map<String, String> allOptions = context.getFactoryConfiguration().toMap();
allOptions.keySet().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.time.ZoneId;

/** Tests for {@link CanalJsonSerializationSchema}. */
public class CanalJsonSerializationSchemaTest {

Expand All @@ -53,7 +55,9 @@ public void testSerialize() throws Exception {
.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
SerializationSchema<Event> serializationSchema =
ChangeLogJsonFormatFactory.createSerializationSchema(
new Configuration(), JsonSerializationType.CANAL_JSON);
new Configuration(),
JsonSerializationType.CANAL_JSON,
ZoneId.systemDefault());
serializationSchema.open(new MockInitializationContext());

// create table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.time.ZoneId;

/** Tests for {@link DebeziumJsonSerializationSchema}. */
public class DebeziumJsonSerializationSchemaTest {

Expand All @@ -53,7 +55,9 @@ public void testSerialize() throws Exception {
.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
SerializationSchema<Event> serializationSchema =
ChangeLogJsonFormatFactory.createSerializationSchema(
new Configuration(), JsonSerializationType.DEBEZIUM_JSON);
new Configuration(),
JsonSerializationType.DEBEZIUM_JSON,
ZoneId.systemDefault());
serializationSchema.open(new MockInitializationContext());
// create table
Schema schema =
Expand Down

0 comments on commit 1aad51a

Please sign in to comment.