diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java index 8a0b31a5eeb9..a8a98e2b6bce 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java @@ -28,6 +28,8 @@ import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; @@ -219,4 +221,8 @@ default ResultSetMetaData getResultSetMetaData( default String extractTableName(TablePath tablePath) { return tablePath.getSchemaAndTableName(); } + + default Map defaultParameter() { + return new HashMap<>(); + } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java index c71dc3f76a15..e45fdbea514b 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java @@ -27,6 +27,8 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; @@ -84,4 +86,11 @@ public PreparedStatement creatPreparedStatement( public String extractTableName(TablePath tablePath) { return tablePath.getTableName(); } + + @Override + public Map defaultParameter() { + HashMap map = new HashMap<>(); + map.put("rewriteBatchedStatements", "true"); + return map; + } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java index c23619b5aade..2ba85d3b059a 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java @@ -108,6 +108,9 @@ public void prepare(Config pluginConfig) throws PrepareFailException { JdbcDialectLoader.load( jdbcSinkConfig.getJdbcConnectionConfig().getUrl(), jdbcSinkConfig.getJdbcConnectionConfig().getCompatibleMode()); + this.dialect + .defaultParameter() + .forEach(this.jdbcSinkConfig.getJdbcConnectionConfig().getInfo()::putIfAbsent); this.dataSaveMode = DataSaveMode.KEEP_SCHEMA_AND_DATA; } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java index 8209533f9d55..414d60795ddd 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java @@ -146,6 +146,8 @@ public TableSink createSink(TableFactoryContext context) { JdbcDialectLoader.load( sinkConfig.getJdbcConnectionConfig().getUrl(), sinkConfig.getJdbcConnectionConfig().getCompatibleMode()); + dialect.defaultParameter() + .forEach(sinkConfig.getJdbcConnectionConfig().getInfo()::putIfAbsent); CatalogTable finalCatalogTable = catalogTable; return () -> new JdbcSink( diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java index 1bf1b332fa98..71db271f96e0 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java @@ -97,13 +97,16 @@ public void prepare(Config pluginConfig) throws PrepareFailException { ReadonlyConfig config = ReadonlyConfig.fromConfig(pluginConfig); ConfigValidator.of(config).validate(new JdbcSourceFactory().optionRule()); this.jdbcSourceConfig = JdbcSourceConfig.of(config); - this.jdbcConnectionProvider = - new SimpleJdbcConnectionProvider(jdbcSourceConfig.getJdbcConnectionConfig()); - this.query = jdbcSourceConfig.getQuery(); this.jdbcDialect = JdbcDialectLoader.load( jdbcSourceConfig.getJdbcConnectionConfig().getUrl(), jdbcSourceConfig.getJdbcConnectionConfig().getCompatibleMode()); + this.jdbcDialect + .defaultParameter() + .forEach(this.jdbcSourceConfig.getJdbcConnectionConfig().getInfo()::putIfAbsent); + this.jdbcConnectionProvider = + new SimpleJdbcConnectionProvider(jdbcSourceConfig.getJdbcConnectionConfig()); + this.query = jdbcSourceConfig.getQuery(); try (Connection connection = jdbcConnectionProvider.getOrEstablishConnection()) { this.typeInfo = initTableField(connection); this.partitionParameter =