Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[minor][cdc-connector][sqlserver] Fix some words error #3421

Merged
merged 1 commit into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ protected void executeDataSnapshot(Context context) throws Exception {
sourceFetchContext.getDispatcher(),
sourceFetchContext.getSnapshotReceiver(),
snapshotSplit);
SqlserverSnapshotSplitChangeEventSourceContext changeEventSourceContext =
new SqlserverSnapshotSplitChangeEventSourceContext();
SqlServerSnapshotSplitChangeEventSourceContext changeEventSourceContext =
new SqlServerSnapshotSplitChangeEventSourceContext();
SnapshotResult<SqlServerOffsetContext> snapshotResult =
snapshotSplitReadTask.execute(
changeEventSourceContext,
Expand All @@ -106,7 +106,7 @@ protected void executeBackfillTask(Context context, StreamSplit backfillStreamSp
final SqlServerStreamFetchTask.StreamSplitReadTask backfillBinlogReadTask =
createBackFillLsnSplitReadTask(backfillStreamSplit, sourceFetchContext);
backfillBinlogReadTask.execute(
new SqlserverSnapshotSplitChangeEventSourceContext(),
new SqlServerSnapshotSplitChangeEventSourceContext(),
sourceFetchContext.getPartition(),
streamOffsetContext);
}
Expand Down Expand Up @@ -188,7 +188,7 @@ public SnapshotResult<SqlServerOffsetContext> execute(
SqlServerOffsetContext previousOffset)
throws InterruptedException {
SnapshottingTask snapshottingTask = getSnapshottingTask(partition, previousOffset);
final SqlSeverSnapshotContext ctx;
final SqlServerSnapshotContext ctx;
try {
ctx = prepare(partition);
} catch (Exception e) {
Expand All @@ -212,7 +212,7 @@ protected SnapshotResult<SqlServerOffsetContext> doExecute(
SnapshotContext<SqlServerPartition, SqlServerOffsetContext> snapshotContext,
SnapshottingTask snapshottingTask)
throws Exception {
final SqlSeverSnapshotContext ctx = (SqlSeverSnapshotContext) snapshotContext;
final SqlServerSnapshotContext ctx = (SqlServerSnapshotContext) snapshotContext;
ctx.offset = offsetContext;

LOG.info("Snapshot step 2 - Snapshotting data");
Expand All @@ -228,11 +228,11 @@ protected SnapshottingTask getSnapshottingTask(
}

@Override
protected SqlSeverSnapshotContext prepare(SqlServerPartition partition) throws Exception {
return new SqlSeverSnapshotContext(partition);
protected SqlServerSnapshotContext prepare(SqlServerPartition partition) throws Exception {
return new SqlServerSnapshotContext(partition);
}

private void createDataEvents(SqlSeverSnapshotContext snapshotContext, TableId tableId)
private void createDataEvents(SqlServerSnapshotContext snapshotContext, TableId tableId)
throws Exception {
LOG.debug("Snapshotting table {}", tableId);
createDataEventsForTable(
Expand All @@ -242,7 +242,7 @@ private void createDataEvents(SqlSeverSnapshotContext snapshotContext, TableId t

/** Dispatches the data change events for the records of a single table. */
private void createDataEventsForTable(
SqlSeverSnapshotContext snapshotContext,
SqlServerSnapshotContext snapshotContext,
EventDispatcher.SnapshotReceiver<SqlServerPartition> snapshotReceiver,
Table table)
throws InterruptedException {
Expand Down Expand Up @@ -313,7 +313,7 @@ private void createDataEventsForTable(
}

protected ChangeRecordEmitter<SqlServerPartition> getChangeRecordEmitter(
SqlSeverSnapshotContext snapshotContext, TableId tableId, Object[] row) {
SqlServerSnapshotContext snapshotContext, TableId tableId, Object[] row) {
snapshotContext.offset.event(tableId, clock.currentTime());
return new SnapshotChangeRecordEmitter<>(
snapshotContext.partition, snapshotContext.offset, row, clock);
Expand All @@ -323,11 +323,11 @@ private Threads.Timer getTableScanLogTimer() {
return Threads.timer(clock, LOG_INTERVAL);
}

private static class SqlSeverSnapshotContext
private static class SqlServerSnapshotContext
extends RelationalSnapshotChangeEventSource.RelationalSnapshotContext<
SqlServerPartition, SqlServerOffsetContext> {

public SqlSeverSnapshotContext(SqlServerPartition partition) throws SQLException {
public SqlServerSnapshotContext(SqlServerPartition partition) throws SQLException {
super(partition, "");
}
}
Expand All @@ -337,7 +337,7 @@ public SqlSeverSnapshotContext(SqlServerPartition partition) throws SQLException
* The {@link ChangeEventSource.ChangeEventSourceContext} implementation for bounded stream task
* of a snapshot split task.
*/
public class SqlserverSnapshotSplitChangeEventSourceContext
public class SqlServerSnapshotSplitChangeEventSourceContext
implements ChangeEventSource.ChangeEventSourceContext {

public void finished() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
import org.apache.flink.cdc.connectors.sqlserver.source.offset.LsnOffset;
import org.apache.flink.cdc.connectors.sqlserver.source.reader.fetch.SqlServerScanFetchTask.SqlServerSnapshotSplitChangeEventSourceContext;

import io.debezium.DebeziumException;
import io.debezium.connector.sqlserver.Lsn;
Expand Down Expand Up @@ -142,9 +143,7 @@ public void afterHandleLsn(SqlServerPartition partition, Lsn toLsn) {
new DebeziumException("Error processing binlog signal event", e));
}
// tell fetcher the streaming task finished
((SqlServerScanFetchTask.SqlserverSnapshotSplitChangeEventSourceContext)
context)
.finished();
((SqlServerSnapshotSplitChangeEventSourceContext) context).finished();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class SqlServerConnectorITCase extends SqlServerTestBase {

@ClassRule public static LegacyRowResource usesLegacyRows = LegacyRowResource.INSTANCE;

// enable the parallelismSnapshot (i.e: The new source OracleParallelSource)
// enable the parallelismSnapshot (i.e: The new source JdbcIncrementalSource)
private final boolean parallelismSnapshot;

public SqlServerConnectorITCase(boolean parallelismSnapshot) {
Expand Down
Loading