Skip to content

Commit 3f868ef

Browse files
joyCurry30wuzhenhua01
authored andcommitted
[FLINK-35430][cdc-connector][kafka] Pass the time zone infor to JsonSerializationSchema
This closes apache#3359.
1 parent e3a4dc9 commit 3f868ef

File tree

6 files changed

+20
-6
lines changed

6 files changed

+20
-6
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import org.apache.flink.formats.json.JsonFormatOptions;
2828
import org.apache.flink.formats.json.JsonFormatOptionsUtil;
2929

30+
import java.time.ZoneId;
31+
3032
import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
3133
import static org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL;
3234

@@ -46,7 +48,7 @@ public class ChangeLogJsonFormatFactory {
4648
* @return The configured instance of {@link SerializationSchema}.
4749
*/
4850
public static SerializationSchema<Event> createSerializationSchema(
49-
ReadableConfig formatOptions, JsonSerializationType type) {
51+
ReadableConfig formatOptions, JsonSerializationType type, ZoneId zoneId) {
5052
TimestampFormat timestampFormat = JsonFormatOptionsUtil.getTimestampFormat(formatOptions);
5153
JsonFormatOptions.MapNullKeyMode mapNullKeyMode =
5254
JsonFormatOptionsUtil.getMapNullKeyMode(formatOptions);
@@ -62,6 +64,7 @@ public static SerializationSchema<Event> createSerializationSchema(
6264
timestampFormat,
6365
mapNullKeyMode,
6466
mapNullKeyLiteral,
67+
zoneId,
6568
encodeDecimalAsPlainNumber);
6669
}
6770
case CANAL_JSON:
@@ -70,6 +73,7 @@ public static SerializationSchema<Event> createSerializationSchema(
7073
timestampFormat,
7174
mapNullKeyMode,
7275
mapNullKeyLiteral,
76+
zoneId,
7377
encodeDecimalAsPlainNumber);
7478
}
7579
default:

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -81,12 +81,13 @@ public CanalJsonSerializationSchema(
8181
TimestampFormat timestampFormat,
8282
JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
8383
String mapNullKeyLiteral,
84+
ZoneId zoneId,
8485
boolean encodeDecimalAsPlainNumber) {
8586
this.timestampFormat = timestampFormat;
8687
this.mapNullKeyMode = mapNullKeyMode;
8788
this.mapNullKeyLiteral = mapNullKeyLiteral;
8889
this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber;
89-
this.zoneId = ZoneId.systemDefault();
90+
this.zoneId = zoneId;
9091
jsonSerializers = new HashMap<>();
9192
}
9293

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -80,12 +80,13 @@ public DebeziumJsonSerializationSchema(
8080
TimestampFormat timestampFormat,
8181
JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
8282
String mapNullKeyLiteral,
83+
ZoneId zoneId,
8384
boolean encodeDecimalAsPlainNumber) {
8485
this.timestampFormat = timestampFormat;
8586
this.mapNullKeyMode = mapNullKeyMode;
8687
this.mapNullKeyLiteral = mapNullKeyLiteral;
8788
this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber;
88-
this.zoneId = ZoneId.systemDefault();
89+
this.zoneId = zoneId;
8990
jsonSerializers = new HashMap<>();
9091
}
9192

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public DataSink createDataSink(Context context) {
6262
context.getFactoryConfiguration().get(KafkaDataSinkOptions.VALUE_FORMAT);
6363
SerializationSchema<Event> valueSerialization =
6464
ChangeLogJsonFormatFactory.createSerializationSchema(
65-
configuration, jsonSerializationType);
65+
configuration, jsonSerializationType, zoneId);
6666
final Properties kafkaProperties = new Properties();
6767
Map<String, String> allOptions = context.getFactoryConfiguration().toMap();
6868
allOptions.keySet().stream()

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@
4040
import org.junit.jupiter.api.Assertions;
4141
import org.junit.jupiter.api.Test;
4242

43+
import java.time.ZoneId;
44+
4345
/** Tests for {@link CanalJsonSerializationSchema}. */
4446
public class CanalJsonSerializationSchemaTest {
4547

@@ -53,7 +55,9 @@ public void testSerialize() throws Exception {
5355
.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
5456
SerializationSchema<Event> serializationSchema =
5557
ChangeLogJsonFormatFactory.createSerializationSchema(
56-
new Configuration(), JsonSerializationType.CANAL_JSON);
58+
new Configuration(),
59+
JsonSerializationType.CANAL_JSON,
60+
ZoneId.systemDefault());
5761
serializationSchema.open(new MockInitializationContext());
5862

5963
// create table

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@
4040
import org.junit.jupiter.api.Assertions;
4141
import org.junit.jupiter.api.Test;
4242

43+
import java.time.ZoneId;
44+
4345
/** Tests for {@link DebeziumJsonSerializationSchema}. */
4446
public class DebeziumJsonSerializationSchemaTest {
4547

@@ -53,7 +55,9 @@ public void testSerialize() throws Exception {
5355
.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
5456
SerializationSchema<Event> serializationSchema =
5557
ChangeLogJsonFormatFactory.createSerializationSchema(
56-
new Configuration(), JsonSerializationType.DEBEZIUM_JSON);
58+
new Configuration(),
59+
JsonSerializationType.DEBEZIUM_JSON,
60+
ZoneId.systemDefault());
5761
serializationSchema.open(new MockInitializationContext());
5862
// create table
5963
Schema schema =

0 commit comments

Comments
 (0)