diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java index 1c37dfd773c..ef92d415262 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java @@ -17,6 +17,9 @@ package org.apache.seatunnel.connectors.seatunnel.kafka.sink; +import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS; +import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC; + import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.serialization.DefaultSerializer; import org.apache.seatunnel.api.serialization.Serializer; @@ -26,6 +29,9 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaAggregatedCommitInfo; import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo; import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState; @@ -50,6 +56,10 @@ public class KafkaSink implements SeaTunnelSink getSerializer(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) { - return new DefaultSeaTunnelRowSerializer(pluginConfig.getString("topics"), seaTunnelRowType); + return new DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC), seaTunnelRowType); } private KafkaSemantics getKafkaSemantics(Config pluginConfig) {