diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisConfig.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisConfig.java index cc4805a821fd..5aa3ce13ff68 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisConfig.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisConfig.java @@ -20,7 +20,6 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.api.configuration.ReadonlyConfig; -import org.apache.seatunnel.api.sink.DataSaveMode; import lombok.Getter; import lombok.Setter; @@ -44,7 +43,6 @@ import static org.apache.seatunnel.connectors.doris.config.DorisOptions.FENODES; import static org.apache.seatunnel.connectors.doris.config.DorisOptions.PASSWORD; import static org.apache.seatunnel.connectors.doris.config.DorisOptions.QUERY_PORT; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.SAVE_MODE; import static org.apache.seatunnel.connectors.doris.config.DorisOptions.SAVE_MODE_CREATE_TEMPLATE; import static org.apache.seatunnel.connectors.doris.config.DorisOptions.SINK_BUFFER_COUNT; import static org.apache.seatunnel.connectors.doris.config.DorisOptions.SINK_BUFFER_SIZE; @@ -93,7 +91,6 @@ public class DorisConfig implements Serializable { private Properties streamLoadProps; // create table option - private DataSaveMode saveMode; private String createTableTemplate; public static DorisConfig of(Config pluginConfig) { @@ -134,7 +131,6 @@ public static DorisConfig of(ReadonlyConfig config) { dorisConfig.setEnableDelete(config.get(SINK_ENABLE_DELETE)); // create table option - dorisConfig.setSaveMode(config.get(SAVE_MODE)); dorisConfig.setCreateTableTemplate(config.get(SAVE_MODE_CREATE_TEMPLATE)); return dorisConfig; diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java index 3182beee2504..270cc03b2179 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java @@ -21,12 +21,8 @@ import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; -import org.apache.seatunnel.api.configuration.SingleChoiceOption; import org.apache.seatunnel.api.configuration.util.OptionRule; -import org.apache.seatunnel.api.sink.DataSaveMode; -import org.apache.seatunnel.api.sink.SupportDataSaveMode; -import java.util.Collections; import java.util.Map; public interface DorisOptions { @@ -213,16 +209,6 @@ public interface DorisOptions { .withDescription(""); // create table - - SingleChoiceOption SAVE_MODE = - Options.key(SupportDataSaveMode.SAVE_MODE_KEY) - .singleChoice( - DataSaveMode.class, - Collections.singletonList(DataSaveMode.KEEP_SCHEMA_AND_DATA)) - .noDefaultValue() - .withDescription( - "Table structure and data processing methods that already exist on the target end"); - Option SAVE_MODE_CREATE_TEMPLATE = Options.key("save_mode_create_template") .stringType() @@ -241,8 +227,5 @@ public interface DorisOptions { OptionRule.builder().required(FENODES, USERNAME, PASSWORD, TABLE_IDENTIFIER); OptionRule.Builder CATALOG_RULE = - OptionRule.builder() - .required(FENODES, QUERY_PORT, USERNAME, PASSWORD) - .optional(SAVE_MODE) - .conditional(SAVE_MODE, DataSaveMode.KEEP_SCHEMA_AND_DATA); + OptionRule.builder().required(FENODES, QUERY_PORT, USERNAME, PASSWORD); } diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java index 050541fc9267..92dade722fe0 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java @@ -24,21 +24,19 @@ import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.serialization.Serializer; -import org.apache.seatunnel.api.sink.DataSaveMode; +import org.apache.seatunnel.api.sink.SaveModeHandler; import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.sink.SinkAggregatedCommitter; import org.apache.seatunnel.api.sink.SinkCommitter; import org.apache.seatunnel.api.sink.SinkWriter; -import org.apache.seatunnel.api.sink.SupportDataSaveMode; +import org.apache.seatunnel.api.sink.SupportSaveMode; import org.apache.seatunnel.api.table.catalog.CatalogTable; -import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.config.CheckConfigUtil; import org.apache.seatunnel.common.config.CheckResult; import org.apache.seatunnel.common.constants.PluginType; -import org.apache.seatunnel.connectors.doris.catalog.DorisCatalog; import org.apache.seatunnel.connectors.doris.config.DorisConfig; import org.apache.seatunnel.connectors.doris.config.DorisOptions; import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException; @@ -59,7 +57,7 @@ @AutoService(SeaTunnelSink.class) public class DorisSink implements SeaTunnelSink, - SupportDataSaveMode { + SupportSaveMode { private DorisConfig dorisConfig; private SeaTunnelRowType seaTunnelRowType; @@ -67,15 +65,12 @@ public class DorisSink private CatalogTable catalogTable; - private DataSaveMode saveMode; - public DorisSink() {} public DorisSink(ReadonlyConfig config, CatalogTable catalogTable) { this.dorisConfig = DorisConfig.of(config); this.catalogTable = catalogTable; this.seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType(); - this.saveMode = dorisConfig.getSaveMode(); } @Override @@ -106,7 +101,6 @@ public void prepare(Config pluginConfig) throws PrepareFailException { + catalogTable.getTableId().getTableName(); dorisConfig.setTableIdentifier(tableIdentifier); } - this.saveMode = dorisConfig.getSaveMode(); } @Override @@ -170,28 +164,7 @@ public Optional> getAggregatedCommitInfoSerializer() } @Override - public DataSaveMode getUserConfigSaveMode() { - return saveMode; - } - - @Override - public void handleSaveMode(DataSaveMode userConfigSaveMode) { - if (catalogTable != null && DataSaveMode.KEEP_SCHEMA_AND_DATA.equals(userConfigSaveMode)) { - try (DorisCatalog dorisCatalog = - new DorisCatalog( - "Doris", - dorisConfig.getFrontends(), - dorisConfig.getQueryPort(), - dorisConfig.getUsername(), - dorisConfig.getPassword(), - dorisConfig)) { - dorisCatalog.open(); - TablePath tablePath = TablePath.of(dorisConfig.getTableIdentifier()); - if (!dorisCatalog.databaseExists(tablePath.getDatabaseName())) { - dorisCatalog.createDatabase(tablePath, true); - } - dorisCatalog.createTable(tablePath, catalogTable, true); - } - } + public Optional getSaveModeHandler() { + return Optional.empty(); } }