diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java index f3b5368e9b6..71de5d176e3 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java @@ -22,6 +22,9 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.split.JdbcNumericBetweenParametersProvider; import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSourceState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; @@ -29,20 +32,34 @@ import java.util.stream.Collectors; public class JdbcSourceSplitEnumerator implements SourceSplitEnumerator { - - SourceSplitEnumerator.Context enumeratorContext; - List allSplit = new ArrayList<>(); - JdbcSourceOptions jdbcSourceOptions; - PartitionParameter partitionParameter; + private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceSplitEnumerator.class); + private final SourceSplitEnumerator.Context enumeratorContext; + private List allSplit = new ArrayList<>(); + private JdbcSourceOptions jdbcSourceOptions; + private final PartitionParameter partitionParameter; + private final int parallelism; public JdbcSourceSplitEnumerator(SourceSplitEnumerator.Context enumeratorContext, JdbcSourceOptions jdbcSourceOptions, PartitionParameter partitionParameter) { this.enumeratorContext = enumeratorContext; this.jdbcSourceOptions = jdbcSourceOptions; this.partitionParameter = partitionParameter; + this.parallelism = enumeratorContext.currentParallelism(); } @Override public void open() { + LOG.info("Starting to calculate splits."); + if (null != partitionParameter) { + JdbcNumericBetweenParametersProvider jdbcNumericBetweenParametersProvider = + new JdbcNumericBetweenParametersProvider(partitionParameter.minValue, partitionParameter.maxValue).ofBatchNum(parallelism); + Serializable[][] parameterValues = jdbcNumericBetweenParametersProvider.getParameterValues(); + for (int i = 0; i < parameterValues.length; i++) { + allSplit.add(new JdbcSourceSplit(parameterValues[i], i)); + } + } else { + allSplit.add(new JdbcSourceSplit(null, 0)); + } + LOG.info("Calculated splits successfully, the size of splits is {}.", allSplit.size()); } @Override @@ -70,20 +87,10 @@ public void handleSplitRequest(int subtaskId) { @Override public void registerReader(int subtaskId) { - int parallelism = enumeratorContext.currentParallelism(); - if (allSplit.isEmpty()) { - if (null != partitionParameter) { - JdbcNumericBetweenParametersProvider jdbcNumericBetweenParametersProvider = new JdbcNumericBetweenParametersProvider(partitionParameter.minValue, partitionParameter.maxValue).ofBatchNum(parallelism); - Serializable[][] parameterValues = jdbcNumericBetweenParametersProvider.getParameterValues(); - for (int i = 0; i < parameterValues.length; i++) { - allSplit.add(new JdbcSourceSplit(parameterValues[i], i)); - } - } else { - allSplit.add(new JdbcSourceSplit(null, 0)); - } - } // Filter the split that the current task needs to run - List splits = allSplit.stream().filter(p -> p.splitId % parallelism == subtaskId).collect(Collectors.toList()); + List splits = allSplit.stream() + .filter(p -> p.splitId % parallelism == subtaskId) + .collect(Collectors.toList()); enumeratorContext.assignSplit(subtaskId, splits); enumeratorContext.signalNoMoreSplits(subtaskId); }