Skip to content

Commit

Permalink
[Feature][JDBC] Default parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
zhouyao committed Sep 11, 2023
1 parent 8d262a5 commit fcfbe70
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -219,4 +221,8 @@ default ResultSetMetaData getResultSetMetaData(
default String extractTableName(TablePath tablePath) {
return tablePath.getSchemaAndTableName();
}

default Map<String, String> defaultParameter() {
return new HashMap<>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -84,4 +86,11 @@ public PreparedStatement creatPreparedStatement(
public String extractTableName(TablePath tablePath) {
return tablePath.getTableName();
}

@Override
public Map<String, String> defaultParameter() {
HashMap<String, String> map = new HashMap<>();
map.put("rewriteBatchedStatements", "true");
return map;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
JdbcDialectLoader.load(
jdbcSinkConfig.getJdbcConnectionConfig().getUrl(),
jdbcSinkConfig.getJdbcConnectionConfig().getCompatibleMode());
this.dialect
.defaultParameter()
.forEach(this.jdbcSinkConfig.getJdbcConnectionConfig().getInfo()::putIfAbsent);
this.dataSaveMode = DataSaveMode.KEEP_SCHEMA_AND_DATA;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ public TableSink createSink(TableFactoryContext context) {
JdbcDialectLoader.load(
sinkConfig.getJdbcConnectionConfig().getUrl(),
sinkConfig.getJdbcConnectionConfig().getCompatibleMode());
dialect.defaultParameter()
.forEach(sinkConfig.getJdbcConnectionConfig().getInfo()::putIfAbsent);
CatalogTable finalCatalogTable = catalogTable;
return () ->
new JdbcSink(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,16 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
ReadonlyConfig config = ReadonlyConfig.fromConfig(pluginConfig);
ConfigValidator.of(config).validate(new JdbcSourceFactory().optionRule());
this.jdbcSourceConfig = JdbcSourceConfig.of(config);
this.jdbcConnectionProvider =
new SimpleJdbcConnectionProvider(jdbcSourceConfig.getJdbcConnectionConfig());
this.query = jdbcSourceConfig.getQuery();
this.jdbcDialect =
JdbcDialectLoader.load(
jdbcSourceConfig.getJdbcConnectionConfig().getUrl(),
jdbcSourceConfig.getJdbcConnectionConfig().getCompatibleMode());
this.jdbcDialect
.defaultParameter()
.forEach(this.jdbcSourceConfig.getJdbcConnectionConfig().getInfo()::putIfAbsent);
this.jdbcConnectionProvider =
new SimpleJdbcConnectionProvider(jdbcSourceConfig.getJdbcConnectionConfig());
this.query = jdbcSourceConfig.getQuery();
try (Connection connection = jdbcConnectionProvider.getOrEstablishConnection()) {
this.typeInfo = initTableField(connection);
this.partitionParameter =
Expand Down

0 comments on commit fcfbe70

Please sign in to comment.