Skip to content

Commit

Permalink
fixed e2e and config DEBEZIUM_RECORD_INCLUDE_SCHEMA
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Jul 14, 2023
1 parent 3a2d969 commit cd93772
Show file tree
Hide file tree
Showing 12 changed files with 204 additions and 158 deletions.
1 change: 1 addition & 0 deletions docs/en/connector-v2/source/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ source {
- Add Kafka Source Connector

### Next Version

- [Improve] Support setting read starting offset or time at startup config ([3157](https://github.com/apache/incubator-seatunnel/pull/3157))
- [Improve] Support for dynamic discover topic & partition in streaming mode ([3125](https://github.com/apache/incubator-seatunnel/pull/3125))
- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
public class Config {

public static final String CONNECTOR_IDENTITY = "Kafka";
public static final String REPLICATION_FACTOR = "replication.factor";
public static final String DEBEZIUM_FORMAT = "debezium-json";
/** The default field delimiter is “,” */
public static final String DEFAULT_FIELD_DELIMITER = ",";

Expand Down Expand Up @@ -99,6 +97,12 @@ public class Config {
"Data format. The default format is json. Optional text format. The default field separator is \", \". "
+ "If you customize the delimiter, add the \"field_delimiter\" option.");

public static final Option<Boolean> DEBEZIUM_RECORD_INCLUDE_SCHEMA =
Options.key("debezium_record_include_schema")
.booleanType()
.defaultValue(true)
.withDescription("Does the debezium record carry a schema.");

public static final Option<String> FIELD_DELIMITER =
Options.key("field_delimiter")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ public enum MessageFormat {
JSON,
TEXT,
CANAL_JSON,
COMPATIBLE_DEBEZIUM_JSON,
DEBEZIUM_FORMAT

DEBEZIUM_JSON,
COMPATIBLE_DEBEZIUM_JSON
}
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,10 @@ private static SerializationSchema createSerializationSchema(
.build();
case CANAL_JSON:
return new CanalJsonSerializationSchema(rowType);
case DEBEZIUM_JSON:
return new DebeziumJsonSerializationSchema(rowType);
case COMPATIBLE_DEBEZIUM_JSON:
return new CompatibleDebeziumJsonSerializationSchema(rowType, isKey);
case DEBEZIUM_FORMAT:
return new DebeziumJsonSerializationSchema(rowType);
default:
throw new SeaTunnelJsonFormatException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + format);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.COMMIT_ON_CHECKPOINT;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONSUMER_GROUP;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEBEZIUM_RECORD_INCLUDE_SCHEMA;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FIELD_DELIMITER;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
Expand Down Expand Up @@ -268,7 +269,12 @@ private void setDeserialization(Config config) {
.build();
break;
case DEBEZIUM_JSON:
deserializationSchema = new DebeziumJsonDeserializationSchema(typeInfo, true);
boolean includeSchema = DEBEZIUM_RECORD_INCLUDE_SCHEMA.defaultValue();
if (config.hasPath(DEBEZIUM_RECORD_INCLUDE_SCHEMA.key())) {
includeSchema = config.getBoolean(DEBEZIUM_RECORD_INCLUDE_SCHEMA.key());
}
deserializationSchema =
new DebeziumJsonDeserializationSchema(typeInfo, true, includeSchema);
break;
default:
throw new SeaTunnelJsonFormatException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public OptionRule optionRule() {
Config.KAFKA_CONFIG,
Config.SCHEMA,
Config.FORMAT,
Config.DEBEZIUM_RECORD_INCLUDE_SCHEMA,
Config.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS)
.conditional(Config.START_MODE, StartMode.TIMESTAMP, Config.START_MODE_TIMESTAMP)
.conditional(
Expand Down
Loading

0 comments on commit cd93772

Please sign in to comment.