diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/conf/JdbcConf.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/conf/JdbcConf.java index dfe13a856d..c721d95698 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/conf/JdbcConf.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/conf/JdbcConf.java @@ -55,6 +55,8 @@ public class JdbcConf extends ChunJunCommonConf implements Serializable { protected String orderByColumn; protected String querySql; protected String splitPk; + protected String splitPkStart; + protected String splitPkEnd; protected String splitStrategy; protected int fetchSize = 0; protected int queryTimeOut = 0; @@ -106,6 +108,10 @@ public class JdbcConf extends ChunJunCommonConf implements Serializable { /** upsert 写数据库时,是否null覆盖原来的值 */ protected boolean allReplace = false; + protected boolean isAutoCommit = false; + + private boolean defineColumnTypeForStatement = false; + public Boolean getInitReporter() { return initReporter; } @@ -137,6 +143,22 @@ public String getTable() { return connection.get(0).getTable().get(0); } + public String getSplitPkStart() { + return splitPkStart; + } + + public void setSplitPkStart(String splitPkStart) { + this.splitPkStart = splitPkStart; + } + + public String getSplitPkEnd() { + return splitPkEnd; + } + + public void setSplitPkEnd(String splitPkEnd) { + this.splitPkEnd = splitPkEnd; + } + public void setTable(String table) { connection.get(0).getTable().set(0, table); } @@ -419,6 +441,14 @@ public void setAllReplace(boolean allReplace) { this.allReplace = allReplace; } + public boolean isAutoCommit() { + return isAutoCommit; + } + + public boolean isDefineColumnTypeForStatement() { + return defineColumnTypeForStatement; + } + public String getSplitStrategy() { return splitStrategy; } @@ -472,6 +502,12 @@ public String toString() { + ", splitPk='" + splitPk + '\'' + + ", splitPkStart='" + + splitPkStart + + '\'' + + ", splitPkEnd='" + + splitPkEnd + + '\'' + ", splitStrategy='" + splitStrategy + '\'' @@ -485,9 +521,13 @@ public String toString() { + increment + ", polling=" + polling + + ", pollingFromMax=" + + pollingFromMax + ", increColumn='" + increColumn + '\'' + + ", isOrderBy=" + + isOrderBy + ", increColumnIndex=" + increColumnIndex + ", increColumnType='" @@ -508,6 +548,8 @@ public String toString() { + restoreColumnIndex + ", useMaxFunc=" + useMaxFunc + + ", initReporter=" + + initReporter + ", mode='" + mode + '\'' @@ -521,6 +563,10 @@ public String toString() { + updateKey + ", allReplace=" + allReplace + + ", isAutoCommit=" + + isAutoCommit + + ", defineColumnTypeForStatement=" + + defineColumnTypeForStatement + '}'; } } diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcInputFormat.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcInputFormat.java index 63f48f3ea6..fb804b95b8 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcInputFormat.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcInputFormat.java @@ -393,25 +393,32 @@ private Pair getSplitRangeFromDb() { try { long startTime = System.currentTimeMillis(); - String querySplitRangeSql = SqlUtil.buildQuerySplitRangeSql(jdbcConf, jdbcDialect); - LOG.info(String.format("Query SplitRange sql is '%s'", querySplitRangeSql)); - - conn = getConnection(); - st = conn.createStatement(resultSetType, resultSetConcurrency); - st.setQueryTimeout(jdbcConf.getQueryTimeOut()); - rs = st.executeQuery(querySplitRangeSql); - if (rs.next()) { + if (jdbcConf.getSplitPkStart() != null && jdbcConf.getSplitPkEnd() != null) { splitPkRange = Pair.of( - String.valueOf(rs.getObject("min_value")), - String.valueOf(rs.getObject("max_value"))); - } + String.valueOf(jdbcConf.getSplitPkStart()), + String.valueOf(jdbcConf.getSplitPkEnd())); - LOG.info( - String.format( - "Takes [%s] milliseconds to get the SplitRange value [%s]", - System.currentTimeMillis() - startTime, splitPkRange)); + } else { + String querySplitRangeSql = SqlUtil.buildQuerySplitRangeSql(jdbcConf, jdbcDialect); + LOG.info(String.format("Query SplitRange sql is '%s'", querySplitRangeSql)); + + conn = getConnection(); + st = conn.createStatement(resultSetType, resultSetConcurrency); + st.setQueryTimeout(jdbcConf.getQueryTimeOut()); + rs = st.executeQuery(querySplitRangeSql); + if (rs.next()) { + splitPkRange = + Pair.of( + String.valueOf(rs.getObject("min_value")), + String.valueOf(rs.getObject("max_value"))); + } + LOG.info( + String.format( + "Takes [%s] milliseconds to get the SplitRange value [%s]", + System.currentTimeMillis() - startTime, splitPkRange)); + } return splitPkRange; } catch (Throwable e) { throw new ChunJunRuntimeException( @@ -707,6 +714,10 @@ protected void executeQuery(String startLocation) throws SQLException { } } else { statement = dbConn.createStatement(resultSetType, resultSetConcurrency); + if (jdbcConf.isDefineColumnTypeForStatement() + && StringUtils.isBlank(jdbcConf.getCustomSql())) { + defineColumnType(statement); + } statement.setFetchSize(jdbcConf.getFetchSize()); statement.setQueryTimeout(jdbcConf.getQueryTimeOut()); resultSet = statement.executeQuery(jdbcConf.getQuerySql()); @@ -714,6 +725,8 @@ protected void executeQuery(String startLocation) throws SQLException { } } + protected void defineColumnType(Statement statement) throws SQLException {} + /** init prepareStatement */ public void initPrepareStatement(String querySql) throws SQLException { ps = dbConn.prepareStatement(querySql, resultSetType, resultSetConcurrency); @@ -799,7 +812,8 @@ protected Connection getConnection() throws SQLException { /** 使用自定义的指标输出器把增量指标打到普罗米修斯 */ @Override protected boolean useCustomReporter() { - return jdbcConf.isIncrement() && jdbcConf.getInitReporter(); + // 配置了 reporter 就可以输入指标到外部系统, 如果不是增量, 增量指标也不会被输出 + return jdbcConf.getInitReporter(); } /** 为了保证增量数据的准确性,指标输出失败时使任务失败 */ diff --git "a/docs_zh/ChunJun\350\277\236\346\216\245\345\231\250/oracle/oracle-source.md" "b/docs_zh/ChunJun\350\277\236\346\216\245\345\231\250/oracle/oracle-source.md" index 735b793294..b19b0c6cc6 100644 --- "a/docs_zh/ChunJun\350\277\236\346\216\245\345\231\250/oracle/oracle-source.md" +++ "b/docs_zh/ChunJun\350\277\236\346\216\245\345\231\250/oracle/oracle-source.md" @@ -110,6 +110,32 @@ Oracle 9 及以上 - 默认值:无
+- **splitPkStart** + + - 描述: 当指定了 `splitPk` 之后,可以手动指定 `splitPk` 的上/下界,之后可以直接根据这两个值计算每个并行度的数据量。 + - 注意: + - 此参数生效的前提是设置了 `splitPk`,并且需要同时设置 **splitPkEnd** 参数。 + - 推荐splitPk使用表主键,因为表主键通常情况下比较均匀,因此切分出来的分片也不容易出现数据热点。 + - 目前splitPk仅支持整形数据切分,不支持浮点、字符串、日期等其他类型。如果用户指定其他非支持类型,ChunJun将报错。 + - 如果channel大于1但是没有配置此参数,任务将置为失败。 + - 必选:否 + - 参数类型:String + - 默认值:无 +
+ +- **splitPkEnd** + + - 描述: 当指定了 `splitPk` 之后,可以手动指定 `splitPk` 的上/下界,之后可以直接根据这两个值计算每个并行度的数据量。 + - 注意: + - 此参数生效的前提是设置了 `splitPk`,并且需要同时设置 **splitPkStart** 参数。 + - 推荐splitPk使用表主键,因为表主键通常情况下比较均匀,因此切分出来的分片也不容易出现数据热点。 + - 目前splitPk仅支持整形数据切分,不支持浮点、字符串、日期等其他类型。如果用户指定其他非支持类型,ChunJun将报错。 + - 如果channel大于1但是没有配置此参数,任务将置为失败。 + - 必选:否 + - 参数类型:String + - 默认值:无 +
+ - **queryTimeOut** - 描述:查询超时时间,单位秒。