Skip to content

Commit

Permalink
[Feature][CDC] Support disable/enable exactly once for INITIAL (#4921)
Browse files Browse the repository at this point in the history
  • Loading branch information
hailin0 authored Jul 7, 2023
1 parent d3b59d8 commit 6d9a3e5
Show file tree
Hide file tree
Showing 33 changed files with 226 additions and 107 deletions.
5 changes: 5 additions & 0 deletions docs/en/connector-v2/source/MySQL-CDC.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ describes how to set up the MySQL CDC connector to run SQL queries against MySQL
| chunk-key.even-distribution.factor.lower-bound | Double | No | 0.05 |
| sample-sharding.threshold | int | No | 1000 |
| inverse-sampling.rate | int | No | 1000 |
| exactly_once | Boolean | No | true |
| debezium.* | config | No | - |
| format | Enum | No | DEFAULT |
| common-options | | no | - |
Expand Down Expand Up @@ -168,6 +169,10 @@ The max retry times that the connector should retry to build database server con

The connection pool size.

### exactly_once [Boolean]

Enable exactly once semantic.

### debezium [Config]

Pass-through Debezium's properties to Debezium Embedded Engine which is used to capture data changes from MySQL server.
Expand Down
5 changes: 5 additions & 0 deletions docs/en/connector-v2/source/SqlServer-CDC.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ describes how to setup the SqlServer CDC connector to run SQL queries against Sq
| chunk-key.even-distribution.factor.lower-bound | Double | No | 0.05 |
| sample-sharding.threshold | int | No | 1000 |
| inverse-sampling.rate | int | No | 1000 |
| exactly_once | Boolean | No | true |
| debezium.* | config | No | - |
| format | Enum | No | DEFAULT |
| common-options | | no | - |
Expand Down Expand Up @@ -157,6 +158,10 @@ The max retry times that the connector should retry to build database server con

The connection pool size.

### exactly_once [Boolean]

Enable exactly once semantic.

### debezium [Config]

Pass-through Debezium's properties to Debezium Embedded Engine which is used to capture data changes from SqlServer server.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public abstract class BaseSourceConfig implements SourceConfig {
@Getter protected final double distributionFactorLower;
@Getter protected final int sampleShardingThreshold;
@Getter protected final int inverseSamplingRate;
@Getter protected final boolean exactlyOnce;

// --------------------------------------------------------------------------------------------
// Debezium Configurations
Expand All @@ -53,6 +54,7 @@ public BaseSourceConfig(
double distributionFactorLower,
int sampleShardingThreshold,
int inverseSamplingRate,
boolean exactlyOnce,
Properties dbzProperties) {
this.startupConfig = startupConfig;
this.stopConfig = stopConfig;
Expand All @@ -61,6 +63,7 @@ public BaseSourceConfig(
this.distributionFactorLower = distributionFactorLower;
this.sampleShardingThreshold = sampleShardingThreshold;
this.inverseSamplingRate = inverseSamplingRate;
this.exactlyOnce = exactlyOnce;
this.dbzProperties = dbzProperties;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public JdbcSourceConfig(
String serverTimeZone,
long connectTimeoutMillis,
int connectMaxRetries,
int connectionPoolSize) {
int connectionPoolSize,
boolean exactlyOnce) {
super(
startupConfig,
stopConfig,
Expand All @@ -73,6 +74,7 @@ public JdbcSourceConfig(
distributionFactorLower,
sampleShardingThreshold,
inverseSamplingRate,
exactlyOnce,
dbzProperties);
this.driverClassName = driverClassName;
this.hostname = hostname;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;

import lombok.Setter;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
Expand Down Expand Up @@ -55,6 +57,7 @@ public abstract class JdbcSourceConfigFactory implements SourceConfig.Factory<Jd
protected long connectTimeoutMillis = JdbcSourceOptions.CONNECT_TIMEOUT_MS.defaultValue();
protected int connectMaxRetries = JdbcSourceOptions.CONNECT_MAX_RETRIES.defaultValue();
protected int connectionPoolSize = JdbcSourceOptions.CONNECTION_POOL_SIZE.defaultValue();
@Setter protected boolean exactlyOnce = JdbcSourceOptions.EXACTLY_ONCE.defaultValue();
protected Properties dbzProperties;

/** Integer port number of the database server. */
Expand Down Expand Up @@ -242,6 +245,7 @@ public JdbcSourceConfigFactory fromReadonlyConfig(ReadonlyConfig config) {
this.connectTimeoutMillis = config.get(JdbcSourceOptions.CONNECT_TIMEOUT_MS);
this.connectMaxRetries = config.get(JdbcSourceOptions.CONNECT_MAX_RETRIES);
this.connectionPoolSize = config.get(JdbcSourceOptions.CONNECTION_POOL_SIZE);
this.exactlyOnce = config.get(JdbcSourceOptions.EXACTLY_ONCE);
this.dbzProperties = new Properties();
config.getOptional(SourceOptions.DEBEZIUM_PROPERTIES)
.ifPresent(map -> dbzProperties.putAll(map));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public interface SourceConfig extends Serializable {

int getSplitSize();

boolean isExactlyOnce();

/** Factory for the {@code SourceConfig}. */
@FunctionalInterface
interface Factory<C extends SourceConfig> extends Serializable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ public class SourceOptions {
.withDescription(
"Data format. The default format is seatunnel row. Optional compatible with debezium-json format.");

public static final Option<Boolean> EXACTLY_ONCE =
Options.key("exactly_once")
.booleanType()
.defaultValue(true)
.withDescription("Enable exactly once semantic.");

public static OptionRule.Builder getBaseRule() {
return OptionRule.builder()
.optional(FORMAT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,7 @@ public void onCompletedSplits(List<SnapshotSplitWatermark> completedSplitWaterma
// do nothing
completedSplitWatermarks.forEach(
watermark ->
context.getSplitCompletedOffsets()
.put(watermark.getSplitId(), watermark.getHighWatermark()));
context.getSplitCompletedOffsets().put(watermark.getSplitId(), watermark));
}

@Override
Expand Down Expand Up @@ -195,21 +194,31 @@ public List<IncrementalSplit> createIncrementalSplits(boolean startWithSnapshotM

private IncrementalSplit createIncrementalSplit(
List<TableId> capturedTables, int index, boolean startWithSnapshotMinimumOffset) {
C sourceConfig = context.getSourceConfig();
final List<SnapshotSplit> assignedSnapshotSplit =
context.getAssignedSnapshotSplit().values().stream()
.filter(split -> capturedTables.contains(split.getTableId()))
.sorted(Comparator.comparing(SourceSplitBase::splitId))
.collect(Collectors.toList());

Map<String, Offset> splitCompletedOffsets = context.getSplitCompletedOffsets();
Map<String, SnapshotSplitWatermark> splitCompletedOffsets =
context.getSplitCompletedOffsets();
final List<CompletedSnapshotSplitInfo> completedSnapshotSplitInfos = new ArrayList<>();
Offset minOffset = null;
for (SnapshotSplit split : assignedSnapshotSplit) {
Offset changeLogOffset = splitCompletedOffsets.get(split.splitId());
SnapshotSplitWatermark splitWatermark = splitCompletedOffsets.get(split.splitId());
if (startWithSnapshotMinimumOffset) {
// find the min offset of change log
if (minOffset == null || changeLogOffset.isBefore(minOffset)) {
minOffset = changeLogOffset;
Offset splitOffset =
sourceConfig.isExactlyOnce()
? splitWatermark.getHighWatermark()
: splitWatermark.getLowWatermark();
if (minOffset == null || splitOffset.isBefore(minOffset)) {
minOffset = splitOffset;
LOG.debug(
"Find the min offset {} of change log in split {}",
splitOffset,
splitWatermark);
}
}
completedSnapshotSplitInfos.add(
Expand All @@ -219,21 +228,26 @@ private IncrementalSplit createIncrementalSplit(
split.getSplitKeyType(),
split.getSplitStart(),
split.getSplitEnd(),
changeLogOffset));
splitWatermark));
}
for (TableId tableId : capturedTables) {
Offset watermark = tableWatermarks.get(tableId);
if (minOffset == null || (watermark != null && watermark.isBefore(minOffset))) {
minOffset = watermark;
LOG.debug(
"Find the min offset {} of change log in table-watermarks {}",
watermark,
tableId);
}
}
C sourceConfig = context.getSourceConfig();
Offset incrementalSplitStartOffset =
minOffset != null
? minOffset
: sourceConfig.getStartupConfig().getStartupOffset(offsetFactory);
return new IncrementalSplit(
String.format(INCREMENTAL_SPLIT_ID, index),
capturedTables,
minOffset != null
? minOffset
: sourceConfig.getStartupConfig().getStartupOffset(offsetFactory),
incrementalSplitStartOffset,
sourceConfig.getStopConfig().getStopOffset(offsetFactory),
completedSnapshotSplitInfos);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.SnapshotPhaseState;
import org.apache.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;

Expand Down Expand Up @@ -50,7 +49,7 @@ public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssig
private final List<TableId> alreadyProcessedTables;
private final List<SnapshotSplit> remainingSplits;
private final Map<String, SnapshotSplit> assignedSplits;
private final Map<String, Offset> splitCompletedOffsets;
private final Map<String, SnapshotSplitWatermark> splitCompletedOffsets;
private boolean assignerCompleted;
private final int currentParallelism;
private final LinkedList<TableId> remainingTables;
Expand Down Expand Up @@ -107,7 +106,7 @@ private SnapshotSplitAssigner(
List<TableId> alreadyProcessedTables,
List<SnapshotSplit> remainingSplits,
Map<String, SnapshotSplit> assignedSplits,
Map<String, Offset> splitCompletedOffsets,
Map<String, SnapshotSplitWatermark> splitCompletedOffsets,
boolean assignerCompleted,
List<TableId> remainingTables,
boolean isTableIdCaseSensitive,
Expand Down Expand Up @@ -181,7 +180,7 @@ public boolean waitingForCompletedSplits() {
@Override
public void onCompletedSplits(List<SnapshotSplitWatermark> completedSplitWatermarks) {
completedSplitWatermarks.forEach(
m -> this.splitCompletedOffsets.put(m.getSplitId(), m.getHighWatermark()));
watermark -> this.splitCompletedOffsets.put(watermark.getSplitId(), watermark));
if (allSplitsCompleted()) {
// Skip the waiting checkpoint when current parallelism is 1 which means we do not need
// to care about the global output data order of snapshot splits and incremental split.
Expand Down Expand Up @@ -250,14 +249,6 @@ public boolean isCompleted() {
return assignerCompleted;
}

public Map<String, SnapshotSplit> getAssignedSplits() {
return assignedSplits;
}

public Map<String, Offset> getSplitCompletedOffsets() {
return splitCompletedOffsets;
}

// -------------------------------------------------------------------------------------------

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.PendingSplitsState;
import org.apache.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;

Expand Down Expand Up @@ -114,6 +113,6 @@ final class Context<C extends SourceConfig> {
private final Map<String, SnapshotSplit> assignedSnapshotSplit;

/** key: SnapshotSplit id */
private final Map<String, Offset> splitCompletedOffsets;
private final Map<String, SnapshotSplitWatermark> splitCompletedOffsets;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.seatunnel.connectors.cdc.base.source.enumerator.state;

import org.apache.seatunnel.connectors.cdc.base.source.enumerator.IncrementalSourceEnumerator;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
import org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;

Expand Down Expand Up @@ -58,7 +58,7 @@ public class SnapshotPhaseState implements PendingSplitsState {
* The offsets of completed (snapshot) splits that the {@link IncrementalSourceEnumerator} has
* received from {@link IncrementalSourceSplitReader}s.
*/
private final Map<String, Offset> splitCompletedOffsets;
private final Map<String, SnapshotSplitWatermark> splitCompletedOffsets;

/**
* Whether the snapshot split assigner is completed, which indicates there is no more splits and
Expand All @@ -76,7 +76,7 @@ public SnapshotPhaseState(
List<TableId> alreadyProcessedTables,
List<SnapshotSplit> remainingSplits,
Map<String, SnapshotSplit> assignedSplits,
Map<String, Offset> splitCompletedOffsets,
Map<String, SnapshotSplitWatermark> splitCompletedOffsets,
boolean isAssignerCompleted,
List<TableId> remainingTables,
boolean isTableIdCaseSensitive,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,6 @@
public class SnapshotSplitWatermark implements Serializable {
private static final long serialVersionUID = 1L;
private final String splitId;
private final Offset lowWatermark;
private final Offset highWatermark;
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,10 @@ private void reportFinishedSnapshotSplitsIfNeed() {

for (SnapshotSplit split : finishedUnackedSplits.values()) {
completedSnapshotSplitWatermarks.add(
new SnapshotSplitWatermark(split.splitId(), split.getHighWatermark()));
new SnapshotSplitWatermark(
split.splitId(),
split.getLowWatermark(),
split.getHighWatermark()));
}
CompletedSnapshotSplitsReportEvent reportEvent =
new CompletedSnapshotSplitsReportEvent();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.Map;

import static org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkEvent.isHighWatermarkEvent;
import static org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkEvent.isLowWatermarkEvent;
import static org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkEvent.isWatermarkEvent;
import static org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.getFetchTimestamp;
import static org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.getMessageTimestamp;
Expand Down Expand Up @@ -107,6 +108,9 @@ protected void processElement(
throws Exception {
if (isWatermarkEvent(element)) {
Offset watermark = getWatermark(element);
if (isLowWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
splitState.asSnapshotSplitState().setLowWatermark(watermark);
}
if (isHighWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
splitState.asSnapshotSplitState().setHighWatermark(watermark);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ interface Context {

Tables.TableFilter getTableFilter();

boolean isExactlyOnce();

Offset getStreamOffset(SourceRecord record);

boolean isDataChangeRecord(SourceRecord record);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,16 @@ public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {

if (highWatermark == null && isHighWatermarkEvent(record)) {
highWatermark = record;
// snapshot events capture end and begin to capture binlog events
reachChangeLogStart = true;
continue;
// snapshot events capture end
if (taskContext.isExactlyOnce()) {
// begin to capture binlog events
reachChangeLogStart = true;
continue;
} else {
// not support exactly-once, stop the loop
reachChangeLogEnd = true;
break;
}
}

if (reachChangeLogStart && isEndWatermarkEvent(record)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.relational.TableId;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
Expand All @@ -38,6 +39,8 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import static org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.getTableId;

/**
* Fetcher to fetch data from table split, the split is the incremental split {@link
* IncrementalSplit}.
Expand Down Expand Up @@ -146,9 +149,16 @@ public void close() {
private boolean shouldEmit(SourceRecord sourceRecord) {
if (taskContext.isDataChangeRecord(sourceRecord)) {
Offset position = taskContext.getStreamOffset(sourceRecord);
return position.isAfter(splitStartWatermark);
TableId tableId = getTableId(sourceRecord);
if (!taskContext.isExactlyOnce()) {
log.trace(
"The table {} is not support exactly-once, so ignore the watermark check",
tableId);
return position.isAfter(splitStartWatermark);
}
// TODO only the table who captured snapshot splits need to filter( Used to support
// Exactly-Once )
return position.isAfter(splitStartWatermark);
}
return true;
}
Expand Down
Loading

0 comments on commit 6d9a3e5

Please sign in to comment.