[FLINK-36803][cdc-connector][base & mysql] Use the same format `tableId:chunkId` for splitId in SnapshotSplit (#3763)
ruanhang1993 authored Dec 2, 2024
1 parent 3fd4cb6 commit 23e8149
Showing 15 changed files with 109 additions and 59 deletions.
@@ -417,17 +417,7 @@ private SnapshotSplit createSnapshotSplit(
Map<TableId, TableChanges.TableChange> schema = new HashMap<>();
schema.put(tableId, dialect.queryTableSchema(jdbc, tableId));
return new SnapshotSplit(
-tableId,
-splitId(tableId, chunkId),
-splitKeyType,
-splitStart,
-splitEnd,
-null,
-schema);
-}
-
-private String splitId(TableId tableId, int chunkId) {
-return tableId.toString() + ":" + chunkId;
+tableId, chunkId, splitKeyType, splitStart, splitEnd, null, schema);
}

private void maySleep(int count, TableId tableId) {
@@ -17,6 +17,7 @@

package org.apache.flink.cdc.connectors.base.source.meta.split;

+import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.table.types.logical.RowType;

@@ -44,6 +45,35 @@ public class SnapshotSplit extends SourceSplitBase {

@Nullable transient byte[] serializedFormCache;

+/**
+ * Creates a SnapshotSplit, generating the splitId from the given tableId and chunkId.
+ *
+ * @see #generateSplitId(TableId, int)
+ */
+public SnapshotSplit(
+TableId tableId,
+int chunkId,
+RowType splitKeyType,
+Object[] splitStart,
+Object[] splitEnd,
+Offset highWatermark,
+Map<TableId, TableChange> tableSchemas) {
+super(generateSplitId(tableId, chunkId));
+this.tableId = tableId;
+this.splitKeyType = splitKeyType;
+this.splitStart = splitStart;
+this.splitEnd = splitEnd;
+this.highWatermark = highWatermark;
+this.tableSchemas = tableSchemas;
+}
+
+/**
+ * This constructor should not be used directly; prefer the constructor above. If this
+ * constructor must be invoked, use the same splitId format as {@link
+ * #generateSplitId(TableId, int)}; otherwise the parsing methods will fail. See {@link
+ * #extractTableId(String)} and {@link #extractChunkId(String)}.
+ */
+@Internal
public SnapshotSplit(
TableId tableId,
String splitId,
@@ -95,6 +125,18 @@ public final SchemalessSnapshotSplit toSchemalessSnapshotSplit() {
tableId, splitId, splitKeyType, splitStart, splitEnd, highWatermark);
}

+public static String generateSplitId(TableId tableId, int chunkId) {
+return tableId.toString() + ":" + chunkId;
+}
+
+public static TableId extractTableId(String splitId) {
+return TableId.parse(splitId.substring(0, splitId.lastIndexOf(":")));
+}
+
+public static int extractChunkId(String splitId) {
+return Integer.parseInt(splitId.substring(splitId.lastIndexOf(":") + 1));
+}
+
@Override
public boolean equals(Object o) {
if (this == o) {
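Taken together, the new static helpers make a splitId round-trippable. A minimal sketch of the intended round trip, assuming only the helpers added above (the demo class, table name, and chunk id are illustrative):

    import io.debezium.relational.TableId;
    import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;

    class SplitIdRoundTrip {
        public static void main(String[] args) {
            TableId tableId = TableId.parse("db.tbl");
            // generateSplitId joins the table identifier and chunk id with ':'.
            String splitId = SnapshotSplit.generateSplitId(tableId, 7); // "db.tbl:7"
            // The parsers split on the last ':', so the numeric chunk id is
            // recovered even if the table identifier itself contains a colon.
            System.out.println(SnapshotSplit.extractTableId(splitId)); // db.tbl
            System.out.println(SnapshotSplit.extractChunkId(splitId)); // 7
        }
    }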
@@ -42,6 +42,8 @@
import java.util.HashMap;
import java.util.Map;

+import static org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit.generateSplitId;
+
/** Tests for {@link PendingSplitsStateSerializer}. */
public class PendingSplitsStateSerializerTest {

@@ -152,7 +154,7 @@ public Offset createNoStoppingOffset() {
private SchemalessSnapshotSplit constuctSchemalessSnapshotSplit() {
return new SchemalessSnapshotSplit(
tableId,
"test",
generateSplitId(tableId, 0),
new RowType(
Collections.singletonList(new RowType.RowField("id", new BigIntType()))),
null,
@@ -88,7 +88,7 @@ public void testStreamSplitBackwardCompatibility() throws IOException {
private SnapshotSplit constuctSnapshotSplit() {
return new SnapshotSplit(
new TableId("cata`log\"", "s\"che`ma", "ta\"ble.1`"),
"test",
0,
new RowType(
Collections.singletonList(new RowType.RowField("id", new BigIntType()))),
null,
@@ -129,7 +129,7 @@ public Collection<SnapshotSplit> split(SplitContext splitContext) {
SnapshotSplit firstSplit =
new SnapshotSplit(
collectionId,
-splitId(collectionId, 0),
+0,
rowType,
ChunkUtils.minLowerBoundOfId(),
ChunkUtils.boundOfId(lowerBoundOfBucket(chunks.get(0))),
@@ -142,7 +142,7 @@ public Collection<SnapshotSplit> split(SplitContext splitContext) {
snapshotSplits.add(
new SnapshotSplit(
collectionId,
-splitId(collectionId, i + 1),
+i + 1,
rowType,
ChunkUtils.boundOfId(lowerBoundOfBucket(bucket)),
ChunkUtils.boundOfId(upperBoundOfBucket(bucket)),
@@ -153,7 +153,7 @@
SnapshotSplit lastSplit =
new SnapshotSplit(
collectionId,
-splitId(collectionId, chunks.size() + 1),
+chunks.size() + 1,
rowType,
ChunkUtils.boundOfId(upperBoundOfBucket(chunks.get(chunks.size() - 1))),
ChunkUtils.maxUpperBoundOfId(),
@@ -116,7 +116,7 @@ && isNotShardedByHash(collectionMetadata))) {
snapshotSplits.add(
new SnapshotSplit(
collectionId,
-splitId(collectionId, i),
+i,
rowType,
new Object[] {splitKeys, chunk.getDocument(MIN_FIELD)},
new Object[] {splitKeys, chunk.getDocument(MAX_FIELD)},
@@ -54,7 +54,7 @@ public Collection<SnapshotSplit> split(SplitContext splitContext) {
SnapshotSplit snapshotSplit =
new SnapshotSplit(
collectionId,
-splitId(collectionId, 0),
+0,
shardKeysToRowType(singleton(ID_FIELD)),
ChunkUtils.minLowerBoundOfId(),
ChunkUtils.maxUpperBoundOfId(),
@@ -22,7 +22,6 @@
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.logical.RowType;

-import io.debezium.relational.TableId;
import org.bson.BsonDocument;

import java.util.Collection;
@@ -36,10 +35,6 @@ public interface SplitStrategy {

Collection<SnapshotSplit> split(SplitContext splitContext);

-default String splitId(TableId collectionId, int chunkId) {
-return collectionId.identifier() + ":" + chunkId;
-}
-
default RowType shardKeysToRowType(BsonDocument shardKeys) {
return shardKeysToRowType(shardKeys.keySet());
}
@@ -113,7 +113,7 @@ public Collection<SnapshotSplit> split(SplitContext splitContext) {
snapshotSplits.add(
new SnapshotSplit(
collectionId,
-splitId(collectionId, i),
+i,
rowType,
ChunkUtils.boundOfId(lowerValue),
ChunkUtils.boundOfId(splitKeyValue),
@@ -125,7 +125,7 @@
SnapshotSplit lastSplit =
new SnapshotSplit(
collectionId,
-splitId(collectionId, splitKeys.size()),
+splitKeys.size(),
rowType,
ChunkUtils.boundOfId(lowerValue),
ChunkUtils.maxUpperBoundOfId(),
@@ -369,13 +369,7 @@ private MySqlSnapshotSplit createSnapshotSplit(
Map<TableId, TableChange> schema = new HashMap<>();
schema.put(tableId, mySqlSchema.getTableSchema(partition, jdbc, tableId));
return new MySqlSnapshotSplit(
-tableId,
-splitId(tableId, chunkId),
-splitKeyType,
-splitStart,
-splitEnd,
-null,
-schema);
+tableId, chunkId, splitKeyType, splitStart, splitEnd, null, schema);
}

// ------------------------------------------------------------------------------------------
@@ -455,10 +449,6 @@ private double calculateDistributionFactor(
return distributionFactor;
}

-private static String splitId(TableId tableId, int chunkId) {
-return tableId.toString() + ":" + chunkId;
-}
-
private static void maySleep(int count, TableId tableId) {
// every 10 queries to sleep 0.1s
if (count % 10 == 0) {
@@ -17,6 +17,7 @@

package org.apache.flink.cdc.connectors.mysql.source.split;

+import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import org.apache.flink.table.types.logical.RowType;

@@ -44,6 +45,35 @@ public class MySqlSnapshotSplit extends MySqlSplit {

@Nullable transient byte[] serializedFormCache;

+/**
+ * Creates a MySqlSnapshotSplit, generating the splitId from the given tableId and chunkId.
+ *
+ * @see #generateSplitId(TableId, int)
+ */
+public MySqlSnapshotSplit(
+TableId tableId,
+int chunkId,
+RowType splitKeyType,
+Object[] splitStart,
+Object[] splitEnd,
+BinlogOffset highWatermark,
+Map<TableId, TableChange> tableSchemas) {
+super(generateSplitId(tableId, chunkId));
+this.tableId = tableId;
+this.splitKeyType = splitKeyType;
+this.splitStart = splitStart;
+this.splitEnd = splitEnd;
+this.highWatermark = highWatermark;
+this.tableSchemas = tableSchemas;
+}
+
+/**
+ * This constructor should not be used directly; prefer the constructor above. If this
+ * constructor must be invoked, use the same splitId format as {@link
+ * #generateSplitId(TableId, int)}; otherwise the parsing methods will fail. See {@link
+ * #extractTableId(String)} and {@link #extractChunkId(String)}.
+ */
+@Internal
public MySqlSnapshotSplit(
TableId tableId,
String splitId,
@@ -95,6 +125,18 @@ public final MySqlSchemalessSnapshotSplit toSchemalessSnapshotSplit() {
tableId, splitId, splitKeyType, splitStart, splitEnd, highWatermark);
}

+public static String generateSplitId(TableId tableId, int chunkId) {
+return tableId.toString() + ":" + chunkId;
+}
+
+public static TableId extractTableId(String splitId) {
+return TableId.parse(splitId.substring(0, splitId.lastIndexOf(":")));
+}
+
+public static int extractChunkId(String splitId) {
+return Integer.parseInt(splitId.substring(splitId.lastIndexOf(":") + 1));
+}
+
@Override
public boolean equals(Object o) {
if (this == o) {
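The warning on the @Internal constructor is concrete: a splitId that does not follow the tableId:chunkId format breaks the static parsers. A minimal sketch of the failure mode, assuming the helpers above; the dash-style id mirrors the old test fixtures replaced further down:

    import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;

    class BadSplitIdDemo {
        public static void main(String[] args) {
            String badSplitId = "test_db.test_table-1"; // old '-' separator, no ':'
            try {
                // lastIndexOf(":") returns -1, so the whole id is handed to
                // Integer.parseInt, which throws.
                MySqlSnapshotSplit.extractChunkId(badSplitId);
            } catch (NumberFormatException e) {
                System.out.println("not a tableId:chunkId splitId: " + e.getMessage());
            }
        }
    }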
@@ -41,6 +41,7 @@
import java.util.List;
import java.util.Map;

+import static org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit.generateSplitId;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;

@@ -190,7 +191,7 @@ private static MySqlSchemalessSnapshotSplit getTestSchemalessSnapshotSplit(
TableId tableId, int splitNo) {
return new MySqlSchemalessSnapshotSplit(
tableId,
-tableId.toString() + "-" + splitNo,
+generateSplitId(tableId, splitNo),
new RowType(
Collections.singletonList(new RowType.RowField("id", new BigIntType()))),
new Object[] {100L + splitNo * 1000L},
@@ -149,23 +149,11 @@ public void testRemoveTableUsingStateFromSnapshotPhase() throws Exception {
snapshotSplits =
Collections.singletonList(
new MySqlSnapshotSplit(
-tableId0,
-tableId0 + ":0",
-splitType,
-null,
-null,
-null,
-tableSchemas));
+tableId0, 0, splitType, null, null, null, tableSchemas));
toRemoveSplits =
Collections.singletonList(
new MySqlSnapshotSplit(
-tableId1,
-tableId1 + ":0",
-splitType,
-null,
-null,
-null,
-tableSchemas));
+tableId1, 0, splitType, null, null, null, tableSchemas));
}

// Step 1: start source reader and assign snapshot splits
@@ -254,23 +242,23 @@ public void testFinishedUnackedSplitsUsingStateFromSnapshotPhase() throws Exception {
Arrays.asList(
new MySqlSnapshotSplit(
tableId,
-tableId + ":0",
+0,
splitType,
null,
new Integer[] {200},
null,
tableSchemas),
new MySqlSnapshotSplit(
tableId,
-tableId + ":1",
+1,
splitType,
new Integer[] {200},
new Integer[] {1500},
null,
tableSchemas),
new MySqlSnapshotSplit(
tableId,
-tableId + ":2",
+2,
splitType,
new Integer[] {1500},
null,
@@ -46,7 +46,7 @@ public void testSnapshotSplit() throws Exception {
final MySqlSplit split =
new MySqlSnapshotSplit(
TableId.parse("test_db.test_table"),
"test_db.test_table-1",
1,
new RowType(
Collections.singletonList(
new RowType.RowField("id", new BigIntType()))),
@@ -122,7 +122,7 @@ public void testRepeatedSerializationCache() throws Exception {
final MySqlSplit split =
new MySqlSnapshotSplit(
TableId.parse("test_db.test_table"),
"test_db.test_table-0",
0,
new RowType(
Collections.singletonList(
new RowType.RowField("id", new BigIntType()))),
@@ -41,7 +41,7 @@ public void testFromToSplit() {
final MySqlSnapshotSplit split =
new MySqlSnapshotSplit(
TableId.parse("test_db.test_table"),
"test_db.test_table-1",
1,
new RowType(
Collections.singletonList(
new RowType.RowField("id", new BigIntType()))),
@@ -58,7 +58,7 @@ public void testRecordSnapshotSplitState() {
final MySqlSnapshotSplit split =
new MySqlSnapshotSplit(
TableId.parse("test_db.test_table"),
"test_db.test_table-1",
1,
new RowType(
Collections.singletonList(
new RowType.RowField("id", new BigIntType()))),
@@ -73,7 +73,7 @@
final MySqlSnapshotSplit expected =
new MySqlSnapshotSplit(
TableId.parse("test_db.test_table"),
"test_db.test_table-1",
1,
new RowType(
Collections.singletonList(
new RowType.RowField("id", new BigIntType()))),
