
Commit

[sqlserver] Fix old change data that will be captured when the latest mode starts
fuyun2024 committed Jun 3, 2023
1 parent 8cef4af commit 4866875
Showing 4 changed files with 113 additions and 4 deletions.
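In summary: LsnFactory and the offset utilities (getLsnPosition, currentLsn) now build LsnOffset from both the change LSN and the commit LSN instead of the change LSN alone, and the starting offset context loaded for a stream split is marked with preSnapshotCompletion(). SqlServerConnectorITCase is parameterized over scan.incremental.snapshot.enabled and gains a testStartupFromLatestOffset case verifying that change records written before a 'latest-offset' start are not emitted.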
@@ -27,7 +27,9 @@
public class LsnFactory extends OffsetFactory {
@Override
public Offset newOffset(Map<String, String> offset) {
return new LsnOffset(Lsn.valueOf(offset.get(SourceInfo.CHANGE_LSN_KEY)));
Lsn changeLsn = Lsn.valueOf(offset.get(SourceInfo.CHANGE_LSN_KEY));
Lsn commitLsn = Lsn.valueOf(offset.get(SourceInfo.COMMIT_LSN_KEY));
return new LsnOffset(changeLsn, commitLsn, null);
}

@Override
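For context, the call sites above now pass three arguments to LsnOffset. The sketch below shows one plausible shape of that constructor; the class body, the event-serial-number argument, and the key used to store it are assumptions for illustration, not the connector's actual source (the two SourceInfo keys are the ones already used in the diff).

import java.util.HashMap;
import java.util.Map;

import io.debezium.connector.sqlserver.Lsn;
import io.debezium.connector.sqlserver.SourceInfo;

/** Illustrative sketch only: one assumed shape of the three-argument LsnOffset. */
public class LsnOffsetSketch {

    private final Map<String, String> offset = new HashMap<>();

    public LsnOffsetSketch(Lsn changeLsn, Lsn commitLsn, Long eventSerialNo) {
        // Keep both LSNs so a restored offset can position the stream reader
        // on the commit LSN as well as the change LSN.
        if (changeLsn != null && changeLsn.isAvailable()) {
            offset.put(SourceInfo.CHANGE_LSN_KEY, changeLsn.toString());
        }
        if (commitLsn != null && commitLsn.isAvailable()) {
            offset.put(SourceInfo.COMMIT_LSN_KEY, commitLsn.toString());
        }
        if (eventSerialNo != null) {
            // Assumed key name; the real connector may use a different constant.
            offset.put("event_serial_no", eventSerialNo.toString());
        }
    }

    public Map<String, String> getOffset() {
        return offset;
    }
}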
@@ -163,6 +163,7 @@ private SqlServerOffsetContext loadStartingOffsetState(
: sourceSplitBase.asStreamSplit().getStartingOffset();

SqlServerOffsetContext sqlServerOffsetContext = loader.load(offset.getOffset());
sqlServerOffsetContext.preSnapshotCompletion();
return sqlServerOffsetContext;
}

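Why the added preSnapshotCompletion() call matters (a hedged reading based on the general Debezium offset-context contract, not this repository's code): it marks the freshly loaded offset as having already finished the snapshot phase, so the stream fetcher resumes it as a pure streaming position rather than as a snapshot in progress that would re-read already-committed change data. A simplified illustration of that contract:

// Simplified illustration of the assumed contract; not the actual
// SqlServerOffsetContext source.
class OffsetContextSketch {
    private boolean snapshotCompleted;

    /** Called once the snapshot phase is (or should be considered) complete. */
    void preSnapshotCompletion() {
        snapshotCompleted = true;
    }

    /** Streaming code can check this to decide whether a snapshot is still running. */
    boolean isSnapshotRunning() {
        return !snapshotCompleted;
    }
}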
@@ -192,14 +192,16 @@ public static LsnOffset getLsnPosition(Map<String, ?> offset) {
offsetStrMap.put(
entry.getKey(), entry.getValue() == null ? null : entry.getValue().toString());
}
return new LsnOffset(Lsn.valueOf(offsetStrMap.get(SourceInfo.CHANGE_LSN_KEY)));
Lsn changeLsn = Lsn.valueOf(offsetStrMap.get(SourceInfo.CHANGE_LSN_KEY));
Lsn commitLsn = Lsn.valueOf(offsetStrMap.get(SourceInfo.COMMIT_LSN_KEY));
return new LsnOffset(changeLsn, commitLsn, null);
}

/** Fetch current largest log sequence number (LSN) of the database. */
public static LsnOffset currentLsn(SqlServerConnection connection) {
try {
Lsn maxLsn = connection.getMaxLsn();
return new LsnOffset(maxLsn);
return new LsnOffset(maxLsn, maxLsn, null);
} catch (SQLException e) {
throw new FlinkRuntimeException(e.getMessage(), e);
}
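Taken together with the LsnFactory change, every LsnOffset now carries both keys: getLsnPosition restores them from a persisted offset map, and currentLsn uses the database's max LSN for both, which is presumably the position a 'latest-offset' start is seeded with so that change-table entries committed before it are skipped.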
@@ -27,6 +27,8 @@
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.sql.Connection;
import java.sql.SQLException;
@@ -36,12 +38,14 @@
import java.util.List;
import java.util.concurrent.ExecutionException;

import static org.apache.flink.api.common.JobStatus.RUNNING;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.testcontainers.containers.MSSQLServerContainer.MS_SQL_SERVER_PORT;

/** Integration tests for SqlServer Table source. */
@RunWith(Parameterized.class)
public class SqlServerConnectorITCase extends SqlServerTestBase {

private final StreamExecutionEnvironment env =
@@ -52,10 +56,28 @@ public class SqlServerConnectorITCase extends SqlServerTestBase {

@ClassRule public static LegacyRowResource usesLegacyRows = LegacyRowResource.INSTANCE;

// Whether to enable parallelismSnapshot (i.e. run with the new incremental snapshot source)
private final boolean parallelismSnapshot;

public SqlServerConnectorITCase(boolean parallelismSnapshot) {
this.parallelismSnapshot = parallelismSnapshot;
}

@Parameterized.Parameters(name = "parallelismSnapshot: {0}")
public static Object[] parameters() {
return new Object[][] {new Object[] {false}, new Object[] {true}};
}

@Before
public void before() {
TestValuesTableFactory.clearAllData();
env.setParallelism(1);

if (parallelismSnapshot) {
env.setParallelism(4);
env.enableCheckpointing(200);
} else {
env.setParallelism(1);
}
}

@Test
@@ -75,13 +97,15 @@ public void testConsumingAllEvents()
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s'"
+ ")",
MSSQL_SERVER_CONTAINER.getHost(),
MSSQL_SERVER_CONTAINER.getMappedPort(MS_SQL_SERVER_PORT),
MSSQL_SERVER_CONTAINER.getUsername(),
MSSQL_SERVER_CONTAINER.getPassword(),
parallelismSnapshot,
"inventory",
"dbo.products");
String sinkDDL =
@@ -160,6 +184,82 @@ public void testConsumingAllEvents()
result.getJobClient().get().cancel().get();
}

@Test
public void testStartupFromLatestOffset() throws Exception {
initializeSqlServerTable("inventory");

Connection connection = getJdbcConnection();
Statement statement = connection.createStatement();

// The following two change records will be discarded in the 'latest-offset' mode
statement.execute(
"INSERT INTO inventory.dbo.products (name,description,weight) VALUES ('jacket','water resistent white wind breaker',0.2);"); // 110
statement.execute(
"INSERT INTO inventory.dbo.products (name,description,weight) VALUES ('scooter','Big 2-wheel scooter ',5.18);");
Thread.sleep(5000L);

String sourceDDL =
String.format(
"CREATE TABLE debezium_source ("
+ " id INT NOT NULL,"
+ " name STRING,"
+ " description STRING,"
+ " weight DECIMAL(10,3)"
+ ") WITH ("
+ " 'connector' = 'sqlserver-cdc',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'scan.startup.mode' = 'latest-offset'"
+ ")",
MSSQL_SERVER_CONTAINER.getHost(),
MSSQL_SERVER_CONTAINER.getMappedPort(MS_SQL_SERVER_PORT),
MSSQL_SERVER_CONTAINER.getUsername(),
MSSQL_SERVER_CONTAINER.getPassword(),
parallelismSnapshot,
"inventory",
"dbo.products");
String sinkDDL =
"CREATE TABLE sink "
+ " WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false'"
+ ") LIKE debezium_source (EXCLUDING OPTIONS)";
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);

// async submit job
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");

// wait for the source to start up; we don't have a better way to wait for it, so use sleep for now
do {
Thread.sleep(5000L);
} while (result.getJobClient().get().getJobStatus().get() != RUNNING);
Thread.sleep(30000L);

statement.execute(
"INSERT INTO inventory.dbo.products (name,description,weight) VALUES ('hammer','18oz carpenters hammer',1.2);");
statement.execute(
"INSERT INTO inventory.dbo.products (name,description,weight) VALUES ('scooter','Big 3-wheel scooter',5.20);");

waitForSinkSize("sink", 2);

String[] expected =
new String[] {
"112,hammer,18oz carpenters hammer,1.200",
"113,scooter,Big 3-wheel scooter,5.200"
};

List<String> actual = TestValuesTableFactory.getResults("sink");
assertThat(actual, containsInAnyOrder(expected));

result.getJobClient().get().cancel().get();
}

@Test
public void testAllTypes() throws Throwable {
initializeSqlServerTable("column_type_test");
@@ -199,13 +299,15 @@ public void testAllTypes() throws Throwable {
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s'"
+ ")",
MSSQL_SERVER_CONTAINER.getHost(),
MSSQL_SERVER_CONTAINER.getMappedPort(MS_SQL_SERVER_PORT),
MSSQL_SERVER_CONTAINER.getUsername(),
MSSQL_SERVER_CONTAINER.getPassword(),
parallelismSnapshot,
"column_type_test",
"dbo.full_types");
String sinkDDL =
@@ -288,13 +390,15 @@ public void testMetadataColumns() throws Throwable {
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s'"
+ ")",
MSSQL_SERVER_CONTAINER.getHost(),
MSSQL_SERVER_CONTAINER.getMappedPort(MS_SQL_SERVER_PORT),
MSSQL_SERVER_CONTAINER.getUsername(),
MSSQL_SERVER_CONTAINER.getPassword(),
parallelismSnapshot,
"inventory",
"dbo.products");

