From 6d9a3e5957c0e7aa280119e85828819635ba400d Mon Sep 17 00:00:00 2001 From: hailin0 Date: Fri, 7 Jul 2023 11:10:01 +0800 Subject: [PATCH] [Feature][CDC] Support disable/enable exactly once for INITIAL (#4921) --- docs/en/connector-v2/source/MySQL-CDC.md | 5 ++ docs/en/connector-v2/source/SqlServer-CDC.md | 5 ++ .../cdc/base/config/BaseSourceConfig.java | 3 + .../cdc/base/config/JdbcSourceConfig.java | 4 +- .../base/config/JdbcSourceConfigFactory.java | 4 ++ .../cdc/base/config/SourceConfig.java | 2 + .../cdc/base/option/SourceOptions.java | 6 ++ .../enumerator/IncrementalSplitAssigner.java | 36 +++++++---- .../enumerator/SnapshotSplitAssigner.java | 15 +---- .../base/source/enumerator/SplitAssigner.java | 3 +- .../enumerator/state/SnapshotPhaseState.java | 6 +- .../source/event/SnapshotSplitWatermark.java | 1 + .../reader/IncrementalSourceReader.java | 5 +- .../IncrementalSourceRecordEmitter.java | 4 ++ .../source/reader/external/FetchTask.java | 2 + .../IncrementalSourceScanFetcher.java | 13 +++- .../IncrementalSourceStreamFetcher.java | 12 +++- .../external/JdbcSourceFetchTaskContext.java | 5 ++ .../split/CompletedSnapshotSplitInfo.java | 15 +++-- .../cdc/base/source/split/SnapshotSplit.java | 14 ++++- .../split/state/SnapshotSplitState.java | 2 + .../cdc/mysql/config/MySqlSourceConfig.java | 6 +- .../config/MySqlSourceConfigFactory.java | 3 +- .../source/MySqlIncrementalSourceFactory.java | 4 ++ .../source/eumerator/MySqlChunkSplitter.java | 2 +- .../fetch/scan/MySqlSnapshotFetchTask.java | 52 ++++++++++------ .../scan/MySqlSnapshotSplitReadTask.java | 15 +++-- .../source/config/SqlServerSourceConfig.java | 6 +- .../config/SqlServerSourceConfigFactory.java | 3 +- .../SqlServerIncrementalSourceFactory.java | 4 ++ .../eumerator/SqlServerChunkSplitter.java | 2 +- .../scan/SqlServerSnapshotFetchTask.java | 59 +++++++++++-------- .../scan/SqlServerSnapshotSplitReadTask.java | 15 +++-- 33 files changed, 226 insertions(+), 107 deletions(-) diff --git a/docs/en/connector-v2/source/MySQL-CDC.md b/docs/en/connector-v2/source/MySQL-CDC.md index ff89fae574e..f26c1e60a01 100644 --- a/docs/en/connector-v2/source/MySQL-CDC.md +++ b/docs/en/connector-v2/source/MySQL-CDC.md @@ -45,6 +45,7 @@ describes how to set up the MySQL CDC connector to run SQL queries against MySQL | chunk-key.even-distribution.factor.lower-bound | Double | No | 0.05 | | sample-sharding.threshold | int | No | 1000 | | inverse-sampling.rate | int | No | 1000 | +| exactly_once | Boolean | No | true | | debezium.* | config | No | - | | format | Enum | No | DEFAULT | | common-options | | no | - | @@ -168,6 +169,10 @@ The max retry times that the connector should retry to build database server con The connection pool size. +### exactly_once [Boolean] + +Enable exactly once semantic. + ### debezium [Config] Pass-through Debezium's properties to Debezium Embedded Engine which is used to capture data changes from MySQL server. diff --git a/docs/en/connector-v2/source/SqlServer-CDC.md b/docs/en/connector-v2/source/SqlServer-CDC.md index 9b60ca1472d..87456a02c7f 100644 --- a/docs/en/connector-v2/source/SqlServer-CDC.md +++ b/docs/en/connector-v2/source/SqlServer-CDC.md @@ -44,6 +44,7 @@ describes how to setup the SqlServer CDC connector to run SQL queries against Sq | chunk-key.even-distribution.factor.lower-bound | Double | No | 0.05 | | sample-sharding.threshold | int | No | 1000 | | inverse-sampling.rate | int | No | 1000 | +| exactly_once | Boolean | No | true | | debezium.* | config | No | - | | format | Enum | No | DEFAULT | | common-options | | no | - | @@ -157,6 +158,10 @@ The max retry times that the connector should retry to build database server con The connection pool size. +### exactly_once [Boolean] + +Enable exactly once semantic. + ### debezium [Config] Pass-through Debezium's properties to Debezium Embedded Engine which is used to capture data changes from SqlServer server. diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/BaseSourceConfig.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/BaseSourceConfig.java index e2016f1fe0f..f4a82d2de12 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/BaseSourceConfig.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/BaseSourceConfig.java @@ -39,6 +39,7 @@ public abstract class BaseSourceConfig implements SourceConfig { @Getter protected final double distributionFactorLower; @Getter protected final int sampleShardingThreshold; @Getter protected final int inverseSamplingRate; + @Getter protected final boolean exactlyOnce; // -------------------------------------------------------------------------------------------- // Debezium Configurations @@ -53,6 +54,7 @@ public BaseSourceConfig( double distributionFactorLower, int sampleShardingThreshold, int inverseSamplingRate, + boolean exactlyOnce, Properties dbzProperties) { this.startupConfig = startupConfig; this.stopConfig = stopConfig; @@ -61,6 +63,7 @@ public BaseSourceConfig( this.distributionFactorLower = distributionFactorLower; this.sampleShardingThreshold = sampleShardingThreshold; this.inverseSamplingRate = inverseSamplingRate; + this.exactlyOnce = exactlyOnce; this.dbzProperties = dbzProperties; } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfig.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfig.java index aa9218c20e8..9d46ab3393b 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfig.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfig.java @@ -64,7 +64,8 @@ public JdbcSourceConfig( String serverTimeZone, long connectTimeoutMillis, int connectMaxRetries, - int connectionPoolSize) { + int connectionPoolSize, + boolean exactlyOnce) { super( startupConfig, stopConfig, @@ -73,6 +74,7 @@ public JdbcSourceConfig( distributionFactorLower, sampleShardingThreshold, inverseSamplingRate, + exactlyOnce, dbzProperties); this.driverClassName = driverClassName; this.hostname = hostname; diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java index c58e18489b1..ec1a6976d85 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java @@ -22,6 +22,8 @@ import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions; import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions; +import lombok.Setter; + import java.util.Arrays; import java.util.List; import java.util.Properties; @@ -55,6 +57,7 @@ public abstract class JdbcSourceConfigFactory implements SourceConfig.Factory dbzProperties.putAll(map)); diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/SourceConfig.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/SourceConfig.java index b37edf65b52..6a77bca3d59 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/SourceConfig.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/SourceConfig.java @@ -28,6 +28,8 @@ public interface SourceConfig extends Serializable { int getSplitSize(); + boolean isExactlyOnce(); + /** Factory for the {@code SourceConfig}. */ @FunctionalInterface interface Factory extends Serializable { diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java index b0f69c6169a..99932c21f93 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java @@ -102,6 +102,12 @@ public class SourceOptions { .withDescription( "Data format. The default format is seatunnel row. Optional compatible with debezium-json format."); + public static final Option EXACTLY_ONCE = + Options.key("exactly_once") + .booleanType() + .defaultValue(true) + .withDescription("Enable exactly once semantic."); + public static OptionRule.Builder getBaseRule() { return OptionRule.builder() .optional(FORMAT) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java index cf6caf87b61..d000d505363 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java @@ -124,8 +124,7 @@ public void onCompletedSplits(List completedSplitWaterma // do nothing completedSplitWatermarks.forEach( watermark -> - context.getSplitCompletedOffsets() - .put(watermark.getSplitId(), watermark.getHighWatermark())); + context.getSplitCompletedOffsets().put(watermark.getSplitId(), watermark)); } @Override @@ -195,21 +194,31 @@ public List createIncrementalSplits(boolean startWithSnapshotM private IncrementalSplit createIncrementalSplit( List capturedTables, int index, boolean startWithSnapshotMinimumOffset) { + C sourceConfig = context.getSourceConfig(); final List assignedSnapshotSplit = context.getAssignedSnapshotSplit().values().stream() .filter(split -> capturedTables.contains(split.getTableId())) .sorted(Comparator.comparing(SourceSplitBase::splitId)) .collect(Collectors.toList()); - Map splitCompletedOffsets = context.getSplitCompletedOffsets(); + Map splitCompletedOffsets = + context.getSplitCompletedOffsets(); final List completedSnapshotSplitInfos = new ArrayList<>(); Offset minOffset = null; for (SnapshotSplit split : assignedSnapshotSplit) { - Offset changeLogOffset = splitCompletedOffsets.get(split.splitId()); + SnapshotSplitWatermark splitWatermark = splitCompletedOffsets.get(split.splitId()); if (startWithSnapshotMinimumOffset) { // find the min offset of change log - if (minOffset == null || changeLogOffset.isBefore(minOffset)) { - minOffset = changeLogOffset; + Offset splitOffset = + sourceConfig.isExactlyOnce() + ? splitWatermark.getHighWatermark() + : splitWatermark.getLowWatermark(); + if (minOffset == null || splitOffset.isBefore(minOffset)) { + minOffset = splitOffset; + LOG.debug( + "Find the min offset {} of change log in split {}", + splitOffset, + splitWatermark); } } completedSnapshotSplitInfos.add( @@ -219,21 +228,26 @@ private IncrementalSplit createIncrementalSplit( split.getSplitKeyType(), split.getSplitStart(), split.getSplitEnd(), - changeLogOffset)); + splitWatermark)); } for (TableId tableId : capturedTables) { Offset watermark = tableWatermarks.get(tableId); if (minOffset == null || (watermark != null && watermark.isBefore(minOffset))) { minOffset = watermark; + LOG.debug( + "Find the min offset {} of change log in table-watermarks {}", + watermark, + tableId); } } - C sourceConfig = context.getSourceConfig(); + Offset incrementalSplitStartOffset = + minOffset != null + ? minOffset + : sourceConfig.getStartupConfig().getStartupOffset(offsetFactory); return new IncrementalSplit( String.format(INCREMENTAL_SPLIT_ID, index), capturedTables, - minOffset != null - ? minOffset - : sourceConfig.getStartupConfig().getStartupOffset(offsetFactory), + incrementalSplitStartOffset, sourceConfig.getStopConfig().getStopOffset(offsetFactory), completedSnapshotSplitInfos); } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java index b52ec67f9f0..ce2373bc622 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java @@ -22,7 +22,6 @@ import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter; import org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.SnapshotPhaseState; import org.apache.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark; -import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset; import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit; import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase; @@ -50,7 +49,7 @@ public class SnapshotSplitAssigner implements SplitAssig private final List alreadyProcessedTables; private final List remainingSplits; private final Map assignedSplits; - private final Map splitCompletedOffsets; + private final Map splitCompletedOffsets; private boolean assignerCompleted; private final int currentParallelism; private final LinkedList remainingTables; @@ -107,7 +106,7 @@ private SnapshotSplitAssigner( List alreadyProcessedTables, List remainingSplits, Map assignedSplits, - Map splitCompletedOffsets, + Map splitCompletedOffsets, boolean assignerCompleted, List remainingTables, boolean isTableIdCaseSensitive, @@ -181,7 +180,7 @@ public boolean waitingForCompletedSplits() { @Override public void onCompletedSplits(List completedSplitWatermarks) { completedSplitWatermarks.forEach( - m -> this.splitCompletedOffsets.put(m.getSplitId(), m.getHighWatermark())); + watermark -> this.splitCompletedOffsets.put(watermark.getSplitId(), watermark)); if (allSplitsCompleted()) { // Skip the waiting checkpoint when current parallelism is 1 which means we do not need // to care about the global output data order of snapshot splits and incremental split. @@ -250,14 +249,6 @@ public boolean isCompleted() { return assignerCompleted; } - public Map getAssignedSplits() { - return assignedSplits; - } - - public Map getSplitCompletedOffsets() { - return splitCompletedOffsets; - } - // ------------------------------------------------------------------------------------------- /** diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SplitAssigner.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SplitAssigner.java index 4734ee5c758..a3ecdd73a76 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SplitAssigner.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SplitAssigner.java @@ -21,7 +21,6 @@ import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig; import org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.PendingSplitsState; import org.apache.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark; -import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset; import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit; import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase; @@ -114,6 +113,6 @@ final class Context { private final Map assignedSnapshotSplit; /** key: SnapshotSplit id */ - private final Map splitCompletedOffsets; + private final Map splitCompletedOffsets; } } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/state/SnapshotPhaseState.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/state/SnapshotPhaseState.java index 8703ff85849..07e8ea79744 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/state/SnapshotPhaseState.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/state/SnapshotPhaseState.java @@ -18,7 +18,7 @@ package org.apache.seatunnel.connectors.cdc.base.source.enumerator.state; import org.apache.seatunnel.connectors.cdc.base.source.enumerator.IncrementalSourceEnumerator; -import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset; +import org.apache.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark; import org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader; import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit; @@ -58,7 +58,7 @@ public class SnapshotPhaseState implements PendingSplitsState { * The offsets of completed (snapshot) splits that the {@link IncrementalSourceEnumerator} has * received from {@link IncrementalSourceSplitReader}s. */ - private final Map splitCompletedOffsets; + private final Map splitCompletedOffsets; /** * Whether the snapshot split assigner is completed, which indicates there is no more splits and @@ -76,7 +76,7 @@ public SnapshotPhaseState( List alreadyProcessedTables, List remainingSplits, Map assignedSplits, - Map splitCompletedOffsets, + Map splitCompletedOffsets, boolean isAssignerCompleted, List remainingTables, boolean isTableIdCaseSensitive, diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/event/SnapshotSplitWatermark.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/event/SnapshotSplitWatermark.java index 43024926ee2..f47b6d4ce75 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/event/SnapshotSplitWatermark.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/event/SnapshotSplitWatermark.java @@ -27,5 +27,6 @@ public class SnapshotSplitWatermark implements Serializable { private static final long serialVersionUID = 1L; private final String splitId; + private final Offset lowWatermark; private final Offset highWatermark; } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java index 90d849b081f..b251759ff7c 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java @@ -140,7 +140,10 @@ private void reportFinishedSnapshotSplitsIfNeed() { for (SnapshotSplit split : finishedUnackedSplits.values()) { completedSnapshotSplitWatermarks.add( - new SnapshotSplitWatermark(split.splitId(), split.getHighWatermark())); + new SnapshotSplitWatermark( + split.splitId(), + split.getLowWatermark(), + split.getHighWatermark())); } CompletedSnapshotSplitsReportEvent reportEvent = new CompletedSnapshotSplitsReportEvent(); diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java index 8b51fa55bd6..2f8409b99a3 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java @@ -36,6 +36,7 @@ import java.util.Map; import static org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkEvent.isHighWatermarkEvent; +import static org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkEvent.isLowWatermarkEvent; import static org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkEvent.isWatermarkEvent; import static org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.getFetchTimestamp; import static org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.getMessageTimestamp; @@ -107,6 +108,9 @@ protected void processElement( throws Exception { if (isWatermarkEvent(element)) { Offset watermark = getWatermark(element); + if (isLowWatermarkEvent(element) && splitState.isSnapshotSplitState()) { + splitState.asSnapshotSplitState().setLowWatermark(watermark); + } if (isHighWatermarkEvent(element) && splitState.isSnapshotSplitState()) { splitState.asSnapshotSplitState().setHighWatermark(watermark); } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/FetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/FetchTask.java index 1cc61bd7734..6784b64868e 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/FetchTask.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/FetchTask.java @@ -57,6 +57,8 @@ interface Context { Tables.TableFilter getTableFilter(); + boolean isExactlyOnce(); + Offset getStreamOffset(SourceRecord record); boolean isDataChangeRecord(SourceRecord record); diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java index 56f59ba087a..fc97d7d2125 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java @@ -133,9 +133,16 @@ public Iterator pollSplitRecords() throws InterruptedException { if (highWatermark == null && isHighWatermarkEvent(record)) { highWatermark = record; - // snapshot events capture end and begin to capture binlog events - reachChangeLogStart = true; - continue; + // snapshot events capture end + if (taskContext.isExactlyOnce()) { + // begin to capture binlog events + reachChangeLogStart = true; + continue; + } else { + // not support exactly-once, stop the loop + reachChangeLogEnd = true; + break; + } } if (reachChangeLogStart && isEndWatermarkEvent(record)) { diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java index f3715e710a8..af576d7be11 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java @@ -28,6 +28,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.debezium.connector.base.ChangeEventQueue; import io.debezium.pipeline.DataChangeEvent; +import io.debezium.relational.TableId; import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; @@ -38,6 +39,8 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import static org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.getTableId; + /** * Fetcher to fetch data from table split, the split is the incremental split {@link * IncrementalSplit}. @@ -146,9 +149,16 @@ public void close() { private boolean shouldEmit(SourceRecord sourceRecord) { if (taskContext.isDataChangeRecord(sourceRecord)) { Offset position = taskContext.getStreamOffset(sourceRecord); - return position.isAfter(splitStartWatermark); + TableId tableId = getTableId(sourceRecord); + if (!taskContext.isExactlyOnce()) { + log.trace( + "The table {} is not support exactly-once, so ignore the watermark check", + tableId); + return position.isAfter(splitStartWatermark); + } // TODO only the table who captured snapshot splits need to filter( Used to support // Exactly-Once ) + return position.isAfter(splitStartWatermark); } return true; } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/JdbcSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/JdbcSourceFetchTaskContext.java index 52f915c04d7..70741aa6192 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/JdbcSourceFetchTaskContext.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/JdbcSourceFetchTaskContext.java @@ -151,6 +151,11 @@ public SourceConfig getSourceConfig() { return sourceConfig; } + @Override + public boolean isExactlyOnce() { + return sourceConfig.isExactlyOnce(); + } + public JdbcDataSourceDialect getDataSourceDialect() { return dataSourceDialect; } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/CompletedSnapshotSplitInfo.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/CompletedSnapshotSplitInfo.java index 39b78d03c56..b38fd4fb202 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/CompletedSnapshotSplitInfo.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/CompletedSnapshotSplitInfo.java @@ -18,7 +18,7 @@ package org.apache.seatunnel.connectors.cdc.base.source.split; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset; +import org.apache.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark; import io.debezium.relational.TableId; import lombok.Getter; @@ -32,7 +32,7 @@ public class CompletedSnapshotSplitInfo implements Serializable { private final SeaTunnelRowType splitKeyType; private final Object[] splitStart; private final Object[] splitEnd; - private final Offset watermark; + private final SnapshotSplitWatermark watermark; public CompletedSnapshotSplitInfo( String splitId, @@ -40,7 +40,7 @@ public CompletedSnapshotSplitInfo( SeaTunnelRowType splitKeyType, Object[] splitStart, Object[] splitEnd, - Offset watermark) { + SnapshotSplitWatermark watermark) { this.splitId = splitId; this.tableId = tableId; this.splitKeyType = splitKeyType; @@ -50,6 +50,13 @@ public CompletedSnapshotSplitInfo( } public SnapshotSplit asSnapshotSplit() { - return new SnapshotSplit(splitId, tableId, splitKeyType, splitStart, splitEnd, watermark); + return new SnapshotSplit( + splitId, + tableId, + splitKeyType, + splitStart, + splitEnd, + watermark.getLowWatermark(), + watermark.getHighWatermark()); } } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/SnapshotSplit.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/SnapshotSplit.java index 776ad68918f..95b354d2643 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/SnapshotSplit.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/SnapshotSplit.java @@ -31,20 +31,32 @@ public class SnapshotSplit extends SourceSplitBase { private final Object[] splitStart; private final Object[] splitEnd; + private final Offset lowWatermark; private final Offset highWatermark; + public SnapshotSplit( + String splitId, + TableId tableId, + SeaTunnelRowType splitKeyType, + Object[] splitStart, + Object[] splitEnd) { + this(splitId, tableId, splitKeyType, splitStart, splitEnd, null, null); + } + public SnapshotSplit( String splitId, TableId tableId, SeaTunnelRowType splitKeyType, Object[] splitStart, Object[] splitEnd, + Offset lowWatermark, Offset highWatermark) { super(splitId); this.tableId = tableId; this.splitKeyType = splitKeyType; this.splitStart = splitStart; this.splitEnd = splitEnd; + this.lowWatermark = lowWatermark; this.highWatermark = highWatermark; } @@ -54,6 +66,6 @@ public String splitId() { } public boolean isSnapshotReadFinished() { - return highWatermark != null; + return lowWatermark != null && highWatermark != null; } } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/state/SnapshotSplitState.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/state/SnapshotSplitState.java index fd10a587ea5..3298d5d6934 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/state/SnapshotSplitState.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/state/SnapshotSplitState.java @@ -28,6 +28,7 @@ @Setter public class SnapshotSplitState extends SourceSplitStateBase { + private Offset lowWatermark; private Offset highWatermark; public SnapshotSplitState(SnapshotSplit split) { @@ -43,6 +44,7 @@ public SnapshotSplit toSourceSplit() { snapshotSplit.getSplitKeyType(), snapshotSplit.getSplitStart(), snapshotSplit.getSplitEnd(), + getLowWatermark(), getHighWatermark()); } } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfig.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfig.java index 9b18f202eca..19d1124847b 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfig.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfig.java @@ -56,7 +56,8 @@ public MySqlSourceConfig( String serverTimeZone, long connectTimeoutMillis, int connectMaxRetries, - int connectionPoolSize) { + int connectionPoolSize, + boolean exactlyOnce) { super( startupConfig, stopConfig, @@ -78,7 +79,8 @@ public MySqlSourceConfig( serverTimeZone, connectTimeoutMillis, connectMaxRetries, - connectionPoolSize); + connectionPoolSize, + exactlyOnce); } @Override diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java index 02bb1343f70..ef697f2e19c 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java @@ -130,6 +130,7 @@ public MySqlSourceConfig create(int subtaskId) { serverTimeZone, connectTimeoutMillis, connectMaxRetries, - connectionPoolSize); + connectionPoolSize, + exactlyOnce); } } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java index 3be95901fa5..7e64ee81ef7 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java @@ -85,6 +85,10 @@ public OptionRule optionRule() { MySqlSourceOptions.STOP_MODE, StopMode.TIMESTAMP, SourceOptions.STOP_TIMESTAMP) + .conditional( + MySqlSourceOptions.STARTUP_MODE, + StartupMode.INITIAL, + SourceOptions.EXACTLY_ONCE) .build(); } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java index 60b02c27c7d..05935d1701d 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java @@ -344,7 +344,7 @@ private SnapshotSplit createSnapshotSplit( Object[] splitStart = chunkStart == null ? null : new Object[] {chunkStart}; Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd}; return new SnapshotSplit( - splitId(tableId, chunkId), tableId, splitKeyType, splitStart, splitEnd, null); + splitId(tableId, chunkId), tableId, splitKeyType, splitStart, splitEnd); } // ------------------------------------------------------------------------------------------ diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java index ac997f7ed44..b11061a1179 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java @@ -32,6 +32,7 @@ import io.debezium.heartbeat.Heartbeat; import io.debezium.pipeline.source.spi.ChangeEventSource; import io.debezium.pipeline.spi.SnapshotResult; +import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; import java.util.Collections; @@ -39,6 +40,7 @@ import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createMySqlConnection; +@Slf4j public class MySqlSnapshotFetchTask implements FetchTask { private final SnapshotSplit split; @@ -69,34 +71,48 @@ public void execute(FetchTask.Context context) throws Exception { SnapshotResult snapshotResult = snapshotSplitReadTask.execute( changeEventSourceContext, sourceFetchContext.getOffsetContext()); + if (!snapshotResult.isCompletedOrSkipped()) { + taskRunning = false; + throw new IllegalStateException( + String.format("Read snapshot for split %s fail", split)); + } - final IncrementalSplit backfillBinlogSplit = - createBackfillBinlogSplit(changeEventSourceContext); + boolean changed = + changeEventSourceContext + .getHighWatermark() + .isAfter(changeEventSourceContext.getLowWatermark()); + if (!context.isExactlyOnce()) { + taskRunning = false; + if (changed) { + log.debug("Skip merge changelog(exactly-once) for snapshot split {}", split); + } + return; + } + final IncrementalSplit backfillSplit = createBackfillBinlogSplit(changeEventSourceContext); // optimization that skip the binlog read when the low watermark equals high // watermark - final boolean binlogBackfillRequired = - backfillBinlogSplit.getStopOffset().isAfter(backfillBinlogSplit.getStartupOffset()); - if (!binlogBackfillRequired) { + if (!changed) { dispatchBinlogEndEvent( - backfillBinlogSplit, + backfillSplit, ((MySqlSourceFetchTaskContext) context).getOffsetContext().getPartition(), ((MySqlSourceFetchTaskContext) context).getDispatcher()); taskRunning = false; return; } - // execute binlog read task - if (snapshotResult.isCompletedOrSkipped()) { - final MySqlBinlogFetchTask.MySqlBinlogSplitReadTask backfillBinlogReadTask = - createBackfillBinlogReadTask(backfillBinlogSplit, sourceFetchContext); - backfillBinlogReadTask.execute( - new SnapshotBinlogSplitChangeEventSourceContext(), - sourceFetchContext.getOffsetContext()); - } else { - taskRunning = false; - throw new IllegalStateException( - String.format("Read snapshot for mysql split %s fail", split)); - } + + final MySqlBinlogFetchTask.MySqlBinlogSplitReadTask backfillReadTask = + createBackfillBinlogReadTask(backfillSplit, sourceFetchContext); + log.info( + "start execute backfillReadTask, start offset : {}, stop offset : {}", + backfillSplit.getStartupOffset(), + backfillSplit.getStopOffset()); + backfillReadTask.execute( + new SnapshotBinlogSplitChangeEventSourceContext(), + sourceFetchContext.getOffsetContext()); + log.info("backfillReadTask execute end"); + + taskRunning = false; } private IncrementalSplit createBackfillBinlogSplit( diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java index 82104aeb11f..c0879193c60 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java @@ -130,7 +130,7 @@ protected SnapshotResult doExecute( ctx.offset = offsetContext; final BinlogOffset lowWatermark = currentBinlogOffset(jdbcConnection); - LOG.debug( + LOG.info( "Snapshot step 1 - Determining low watermark {} for split {}", lowWatermark, snapshotSplit); @@ -138,11 +138,11 @@ protected SnapshotResult doExecute( dispatcher.dispatchWatermarkEvent( offsetContext.getPartition(), snapshotSplit, lowWatermark, WatermarkKind.LOW); - LOG.debug("Snapshot step 2 - Snapshotting data"); + LOG.info("Snapshot step 2 - Snapshotting data"); createDataEvents(ctx, snapshotSplit.getTableId()); final BinlogOffset highWatermark = currentBinlogOffset(jdbcConnection); - LOG.debug( + LOG.info( "Snapshot step 3 - Determining high watermark {} for split {}", highWatermark, snapshotSplit); @@ -184,8 +184,7 @@ private void createDataEventsForTable( throws InterruptedException { long exportStart = clock.currentTimeInMillis(); - LOG.debug( - "Exporting data from split '{}' of table {}", snapshotSplit.splitId(), table.id()); + LOG.info("Exporting data from split '{}' of table {}", snapshotSplit.splitId(), table.id()); final String selectSql = buildSplitScanQuery( @@ -193,7 +192,7 @@ private void createDataEventsForTable( snapshotSplit.getSplitKeyType(), snapshotSplit.getSplitStart() == null, snapshotSplit.getSplitEnd() == null); - LOG.debug( + LOG.info( "For split '{}' of table {} using select statement: '{}'", snapshotSplit.splitId(), table.id(), @@ -225,7 +224,7 @@ private void createDataEventsForTable( } if (logTimer.expired()) { long stop = clock.currentTimeInMillis(); - LOG.debug( + LOG.info( "Exported {} records for split '{}' after {}", rows, snapshotSplit.splitId(), @@ -238,7 +237,7 @@ private void createDataEventsForTable( getChangeRecordEmitter(snapshotContext, table.id(), row), snapshotReceiver); } - LOG.debug( + LOG.info( "Finished exporting {} records for split '{}', total duration '{}'", rows, snapshotSplit.splitId(), diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/config/SqlServerSourceConfig.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/config/SqlServerSourceConfig.java index 97f9bad2bd9..a47b491a83b 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/config/SqlServerSourceConfig.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/config/SqlServerSourceConfig.java @@ -56,7 +56,8 @@ public SqlServerSourceConfig( String serverTimeZone, long connectTimeoutMillis, int connectMaxRetries, - int connectionPoolSize) { + int connectionPoolSize, + boolean exactlyOnce) { super( startupConfig, stopConfig, @@ -78,7 +79,8 @@ public SqlServerSourceConfig( serverTimeZone, connectTimeoutMillis, connectMaxRetries, - connectionPoolSize); + connectionPoolSize, + exactlyOnce); } @Override diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/config/SqlServerSourceConfigFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/config/SqlServerSourceConfigFactory.java index 69acd3190af..2537cc18907 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/config/SqlServerSourceConfigFactory.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/config/SqlServerSourceConfigFactory.java @@ -98,6 +98,7 @@ public SqlServerSourceConfig create(int subtask) { serverTimeZone, connectTimeoutMillis, connectMaxRetries, - connectionPoolSize); + connectionPoolSize, + exactlyOnce); } } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java index a7dca4c3319..85810b825f3 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java @@ -83,6 +83,10 @@ public OptionRule optionRule() { SqlServerSourceOptions.STOP_MODE, StopMode.TIMESTAMP, SourceOptions.STOP_TIMESTAMP) + .conditional( + SqlServerSourceOptions.STARTUP_MODE, + StartupMode.INITIAL, + SourceOptions.EXACTLY_ONCE) .build(); } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java index 7d9342851ad..3de596fd7da 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java @@ -341,7 +341,7 @@ private SnapshotSplit createSnapshotSplit( Object[] splitStart = chunkStart == null ? null : new Object[] {chunkStart}; Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd}; return new SnapshotSplit( - splitId(tableId, chunkId), tableId, splitKeyType, splitStart, splitEnd, null); + splitId(tableId, chunkId), tableId, splitKeyType, splitStart, splitEnd); } // ------------------------------------------------------------------------------------------ diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotFetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotFetchTask.java index 82d43971e54..e2553a07242 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotFetchTask.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotFetchTask.java @@ -73,42 +73,49 @@ public void execute(FetchTask.Context context) throws Exception { SnapshotResult snapshotResult = snapshotSplitReadTask.execute( changeEventSourceContext, sourceFetchContext.getOffsetContext()); + if (!snapshotResult.isCompletedOrSkipped()) { + taskRunning = false; + throw new IllegalStateException( + String.format("Read snapshot for split %s fail", split)); + } - final IncrementalSplit backfillBinlogSplit = - createBackFillLsnSplit(changeEventSourceContext); + boolean changed = + changeEventSourceContext + .getHighWatermark() + .isAfter(changeEventSourceContext.getLowWatermark()); + if (!context.isExactlyOnce()) { + taskRunning = false; + if (changed) { + log.debug("Skip merge changelog(exactly-once) for snapshot split {}", split); + } + return; + } + + final IncrementalSplit backfillSplit = createBackFillLsnSplit(changeEventSourceContext); // optimization that skip the binlog read when the low watermark equals high // watermark - final boolean binlogBackfillRequired = - backfillBinlogSplit.getStopOffset().isAfter(backfillBinlogSplit.getStartupOffset()); - if (!binlogBackfillRequired) { + if (!changed) { dispatchLsnEndEvent( - backfillBinlogSplit, + backfillSplit, ((SqlServerSourceFetchTaskContext) context).getOffsetContext().getPartition(), ((SqlServerSourceFetchTaskContext) context).getDispatcher()); taskRunning = false; return; } + // execute stream read task - if (snapshotResult.isCompletedOrSkipped()) { - final SqlServerTransactionLogFetchTask.TransactionLogSplitReadTask - backfillBinlogReadTask = - createBackFillLsnSplitReadTask(backfillBinlogSplit, sourceFetchContext); - - SqlServerOffsetContext sqlServerOffsetContext = - new SqlServerOffsetContext.Loader(sourceFetchContext.getDbzConnectorConfig()) - .load(backfillBinlogSplit.getStartupOffset().getOffset()); - log.info( - "start execute backfillBinlogReadTask, start offset : {}, stop offset : {}", - backfillBinlogSplit.getStartupOffset(), - backfillBinlogSplit.getStopOffset()); - backfillBinlogReadTask.execute( - new SnapshotBinlogSplitChangeEventSourceContext(), sqlServerOffsetContext); - log.info("backfillBinlogReadTask execute end"); - } else { - taskRunning = false; - throw new IllegalStateException( - String.format("Read snapshot for SqlServer split %s fail", split)); - } + final SqlServerTransactionLogFetchTask.TransactionLogSplitReadTask backfillReadTask = + createBackFillLsnSplitReadTask(backfillSplit, sourceFetchContext); + SqlServerOffsetContext sqlServerOffsetContext = + new SqlServerOffsetContext.Loader(sourceFetchContext.getDbzConnectorConfig()) + .load(backfillSplit.getStartupOffset().getOffset()); + log.info( + "start execute backfillReadTask, start offset : {}, stop offset : {}", + backfillSplit.getStartupOffset(), + backfillSplit.getStopOffset()); + backfillReadTask.execute( + new SnapshotBinlogSplitChangeEventSourceContext(), sqlServerOffsetContext); + log.info("backfillReadTask execute end"); } private IncrementalSplit createBackFillLsnSplit( diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotSplitReadTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotSplitReadTask.java index d25934ff6a2..9857c18f067 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotSplitReadTask.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotSplitReadTask.java @@ -122,7 +122,7 @@ protected SnapshotResult doExecute( ctx.offset = offsetContext; final LsnOffset lowWatermark = SqlServerUtils.currentLsn(jdbcConnection); - log.debug( + log.info( "Snapshot step 1 - Determining low watermark {} for split {}", lowWatermark, snapshotSplit); @@ -130,11 +130,11 @@ protected SnapshotResult doExecute( dispatcher.dispatchWatermarkEvent( offsetContext.getPartition(), snapshotSplit, lowWatermark, WatermarkKind.LOW); - log.debug("Snapshot step 2 - Snapshotting data"); + log.info("Snapshot step 2 - Snapshotting data"); createDataEvents(ctx, snapshotSplit.getTableId()); final LsnOffset highWatermark = SqlServerUtils.currentLsn(jdbcConnection); - log.debug( + log.info( "Snapshot step 3 - Determining high watermark {} for split {}", highWatermark, snapshotSplit); @@ -174,8 +174,7 @@ private void createDataEventsForTable( throws InterruptedException { long exportStart = clock.currentTimeInMillis(); - log.debug( - "Exporting data from split '{}' of table {}", snapshotSplit.splitId(), table.id()); + log.info("Exporting data from split '{}' of table {}", snapshotSplit.splitId(), table.id()); final String selectSql = SqlServerUtils.buildSplitScanQuery( @@ -183,7 +182,7 @@ private void createDataEventsForTable( snapshotSplit.getSplitKeyType(), snapshotSplit.getSplitStart() == null, snapshotSplit.getSplitEnd() == null); - log.debug( + log.info( "For split '{}' of table {} using select statement: '{}'", snapshotSplit.splitId(), table.id(), @@ -214,7 +213,7 @@ private void createDataEventsForTable( } if (logTimer.expired()) { long stop = clock.currentTimeInMillis(); - log.debug( + log.info( "Exported {} records for split '{}' after {}", rows, snapshotSplit.splitId(), @@ -227,7 +226,7 @@ private void createDataEventsForTable( getChangeRecordEmitter(snapshotContext, table.id(), row), snapshotReceiver); } - log.debug( + log.info( "Finished exporting {} records for split '{}', total duration '{}'", rows, snapshotSplit.splitId(),