owner: fix a bug where DDL executes in the wrong order when adding a table to a processor #450

Merged · 20 commits · Apr 10, 2020
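What the diff does, in brief: when the owner hands an orphan table to a processor, the table now enters waitingConfirmTables together with its startTs, and calcResolvedTs clamps the changefeed's resolved/checkpoint ts to the smallest such startTs instead of simply returning early; the processor, in turn, pulls its position back to startTs and flushes it before writing its task status. Below is a minimal, self-contained sketch of the clamping idea; clampTs and the waitingConfirm type are illustrative names for this sketch, not the PR's API.

```go
package main

import "fmt"

// waitingConfirm mirrors what the owner now records per pending table:
// which capture it was assigned to, and the ts the table starts from.
type waitingConfirm struct {
    captureID string
    startTs   uint64
}

// clampTs holds the changefeed's resolved/checkpoint ts at or below the
// startTs of every table still waiting for confirmation, so no DDL newer
// than a pending table's startTs can be executed too early.
func clampTs(minResolvedTs, minCheckpointTs uint64, waiting map[uint64]waitingConfirm) (uint64, uint64) {
    for _, w := range waiting {
        if minResolvedTs > w.startTs {
            minResolvedTs = w.startTs
        }
        if minCheckpointTs > w.startTs {
            minCheckpointTs = w.startTs
        }
    }
    return minResolvedTs, minCheckpointTs
}

func main() {
    waiting := map[uint64]waitingConfirm{
        47: {captureID: "capture-1", startTs: 100}, // table 47 not yet confirmed
    }
    r, c := clampTs(105, 103, waiting)
    fmt.Println(r, c) // 100 100: both held back by the pending table
}
```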
37 changes: 25 additions & 12 deletions cdc/changefeed.go
@@ -83,9 +83,12 @@ type changeFeed struct {
schemas map[uint64]tableIDMap
tables map[uint64]entry.TableName
orphanTables map[uint64]model.ProcessTableInfo
-    waitingConfirmTables map[uint64]string
-    toCleanTables        map[uint64]struct{}
-    infoWriter           *storage.OwnerTaskStatusEtcdWriter
+    waitingConfirmTables map[uint64]struct {
+        captureID string
+        startTs   uint64
+    }
+    toCleanTables map[uint64]struct{}
+    infoWriter    *storage.OwnerTaskStatusEtcdWriter
}

// String implements fmt.Stringer interface.
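An aside on the shape of this change: the patch repeats the anonymous struct type at each use site (the field above, the map literal in owner.go, and the assignment in banlanceOrphanTables); a named type would behave identically and avoid the repetition. A small runnable sketch of that variant; tableHold is an invented name that does not appear in the PR.

```go
package main

import "fmt"

// tableHold is a hypothetical named equivalent of the anonymous struct
// the patch repeats at each use site.
type tableHold struct {
    captureID string
    startTs   uint64
}

func main() {
    // waitingConfirmTables would then be declared as map[uint64]tableHold.
    waitingConfirmTables := map[uint64]tableHold{}
    waitingConfirmTables[47] = tableHold{captureID: "capture-1", startTs: 100}
    fmt.Println(waitingConfirmTables[47].startTs) // 100
}
```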
@@ -261,16 +264,16 @@ func (c *changeFeed) banlanceOrphanTables(ctx context.Context, captures map[stri
return nil
}

-    for tableID, captureID := range c.waitingConfirmTables {
-        lockStatus, err := c.infoWriter.CheckLock(ctx, c.id, captureID)
+    for tableID, waitingConfirm := range c.waitingConfirmTables {
+        lockStatus, err := c.infoWriter.CheckLock(ctx, c.id, waitingConfirm.captureID)
if err != nil {
return errors.Trace(err)
}
switch lockStatus {
case model.TableNoLock:
delete(c.waitingConfirmTables, tableID)
case model.TablePLock:
log.Debug("waiting the c-lock", zap.Uint64("tableID", tableID), zap.String("captureID", captureID))
log.Debug("waiting the c-lock", zap.Uint64("tableID", tableID), zap.Reflect("waitingConfirm", waitingConfirm))
case model.TablePLockCommited:
delete(c.waitingConfirmTables, tableID)
}
@@ -308,7 +311,10 @@ func (c *changeFeed) banlanceOrphanTables(ctx context.Context, captures map[stri
zap.Uint64("start ts", orphan.StartTs),
zap.String("capture", captureID))
delete(c.orphanTables, tableID)
-            c.waitingConfirmTables[tableID] = captureID
+            c.waitingConfirmTables[tableID] = struct {
+                captureID string
+                startTs   uint64
+            }{captureID: captureID, startTs: orphan.StartTs}
default:
c.restoreTableInfos(infoClone, captureID)
log.Error("fail to put sub changefeed info", zap.Error(err))
@@ -468,9 +474,6 @@ func (c *changeFeed) calcResolvedTs(ctx context.Context) error {
if len(c.orphanTables) > 0 {
return nil
}
-    if len(c.waitingConfirmTables) > 0 {
-        return nil
-    }

minResolvedTs := c.targetTs
minCheckpointTs := c.targetTs
@@ -490,6 +493,15 @@ func (c *changeFeed) calcResolvedTs(ctx context.Context) error {
minCheckpointTs = position.CheckPointTs
}
}
+    for _, waitingConfirm := range c.waitingConfirmTables {
+        if minResolvedTs > waitingConfirm.startTs {
+            minResolvedTs = waitingConfirm.startTs
+        }
+
+        if minCheckpointTs > waitingConfirm.startTs {
+            minCheckpointTs = waitingConfirm.startTs
+        }
+    }
}

// if minResolvedTs is greater than ddlResolvedTs,
@@ -521,15 +533,16 @@ func (c *changeFeed) calcResolvedTs(ctx context.Context) error {
minCheckpointTs = minResolvedTs
}

-    c.status.ResolvedTs = minResolvedTs
-    c.status.CheckpointTs = minCheckpointTs
Review comment (Contributor): these two lines conflict with L541 and L545; will c.sink.EmitCheckpointEvent never be called?

(In the final diff the two unconditional assignments are the removed lines above, so the guarded updates below are the only writers, and the checkpoint event is emitted whenever minCheckpointTs advances.)
+    var tsUpdated bool
+
+    if minResolvedTs > c.status.ResolvedTs {
+        c.status.ResolvedTs = minResolvedTs
+        tsUpdated = true
+    }
+
+    if minCheckpointTs > c.status.CheckpointTs {
+        c.status.CheckpointTs = minCheckpointTs
+        err := c.sink.EmitCheckpointEvent(ctx, minCheckpointTs)
+        if err != nil {
+            return errors.Trace(err)
21 changes: 12 additions & 9 deletions cdc/owner.go
@@ -209,15 +209,18 @@ func (o *Owner) newChangeFeed(
}()

cf := &changeFeed{
-        info:                 info,
-        id:                   id,
-        ddlHandler:           ddlHandler,
-        schema:               schemaStorage,
-        schemas:              schemas,
-        tables:               tables,
-        orphanTables:         orphanTables,
-        waitingConfirmTables: make(map[uint64]string),
-        toCleanTables:        make(map[uint64]struct{}),
+        info:         info,
+        id:           id,
+        ddlHandler:   ddlHandler,
+        schema:       schemaStorage,
+        schemas:      schemas,
+        tables:       tables,
+        orphanTables: orphanTables,
+        waitingConfirmTables: make(map[uint64]struct {
+            captureID string
+            startTs   uint64
+        }),
+        toCleanTables: make(map[uint64]struct{}),
status: &model.ChangeFeedStatus{
ResolvedTs: 0,
CheckpointTs: checkpointTs,
12 changes: 9 additions & 3 deletions cdc/processor.go
@@ -377,7 +377,10 @@ func (p *processor) updateInfo(ctx context.Context) error {
}
p.handleTables(ctx, oldStatus, p.status, p.position.CheckPointTs)
syncTableNumGauge.WithLabelValues(p.changefeedID, p.captureID).Set(float64(len(p.status.TableInfos)))

+    err = updatePosition()
+    if err != nil {
+        return errors.Trace(err)
+    }
err = retry.Run(500*time.Millisecond, 5, func() error {
err = p.tsRWriter.WriteInfoIntoStorage(ctx)
switch errors.Cause(err) {
@@ -393,7 +396,7 @@
if err != nil {
return errors.Trace(err)
}
-    return updatePosition()
+    return nil
}

func diffProcessTableInfos(oldInfo, newInfo []*model.ProcessTableInfo) (removed, added []*model.ProcessTableInfo) {
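The hunk above also reorders updateInfo: the processor now flushes its position (checkpoint/resolved ts) before it writes the task status, rather than after. That way, by the time the owner's lock check can observe the confirmation, the clamped position is already persisted. A hedged sketch of just the ordering; writePosition and writeTaskStatus are invented stand-ins for the processor's two etcd writes.

```go
package main

import "fmt"

// writePosition and writeTaskStatus are invented stand-ins; only their
// relative order matters for this sketch.
func writePosition() error {
    fmt.Println("1: position (checkpoint/resolved ts) persisted")
    return nil
}

func writeTaskStatus() error {
    fmt.Println("2: task status (table ownership) persisted")
    return nil
}

// updateInfo sketches the fixed ordering: position first, status second,
// so an observer never sees a confirmed table ahead of its position.
func updateInfo() error {
    if err := writePosition(); err != nil {
        return err
    }
    return writeTaskStatus()
}

func main() {
    if err := updateInfo(); err != nil {
        fmt.Println("updateInfo failed:", err)
    }
}
```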
@@ -603,7 +606,7 @@ func (p *processor) addTable(ctx context.Context, tableID int64, startTs uint64)
defer p.tablesMu.Unlock()
ctx = util.PutTableIDInCtx(ctx, tableID)

log.Debug("Add table", zap.Int64("tableID", tableID))
log.Debug("Add table", zap.Int64("tableID", tableID), zap.Uint64("startTs", startTs))
if _, ok := p.tables[tableID]; ok {
log.Warn("Ignore existing table", zap.Int64("ID", tableID))
}
@@ -681,6 +684,9 @@ func (p *processor) addTable(ctx context.Context, tableID int64, startTs uint64)
if p.position.CheckPointTs > startTs {
p.position.CheckPointTs = startTs
}
+    if p.position.ResolvedTs > startTs {
+        p.position.ResolvedTs = startTs
+    }
syncTableNumGauge.WithLabelValues(p.changefeedID, p.captureID).Inc()
p.collectMetrics(ctx, tableID)
}
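For symmetry with the CheckPointTs clamp just above it, the patch also pulls ResolvedTs back when a table with an older startTs is added. A tiny worked example of the clamp; the position struct here is assumed for illustration, not the real cdc model type.

```go
package main

import "fmt"

// position mirrors the two fields the patch adjusts; the real type lives
// in the cdc model package, this one is just for illustration.
type position struct {
    CheckPointTs uint64
    ResolvedTs   uint64
}

// onAddTable applies the clamp from the diff: a table whose startTs is
// older than the current position pulls both timestamps back, so events
// between startTs and the old position are not skipped for the new table.
func onAddTable(p *position, startTs uint64) {
    if p.CheckPointTs > startTs {
        p.CheckPointTs = startTs
    }
    if p.ResolvedTs > startTs {
        p.ResolvedTs = startTs
    }
}

func main() {
    p := &position{CheckPointTs: 105, ResolvedTs: 110}
    onAddTable(p, 100)
    fmt.Println(p.CheckPointTs, p.ResolvedTs) // 100 100
}
```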
2 changes: 1 addition & 1 deletion scripts/jenkins_ci/integration_test_common.groovy
@@ -1,4 +1,4 @@
-test_case_names = ["simple", "cdc", "multi_capture", "split_region", "row_format", "tiflash", "availability"]
+test_case_names = ["simple", "cdc", "multi_capture", "split_region", "row_format", "tiflash", "availability", "ddl_sequence"]

def prepare_binaries() {
stage('Prepare Binaries') {
27 changes: 27 additions & 0 deletions tests/ddl_sequence/conf/diff_config.toml
@@ -0,0 +1,27 @@
# diff Configuration.

log-level = "info"
chunk-size = 10
check-thread-count = 4
sample-percent = 100
use-rowid = false
use-checksum = true
fix-sql-file = "fix.sql"

# tables need to check.
[[check-tables]]
schema = "ddl_sequence"
tables = ["~.*"]

[[source-db]]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""
instance-id = "source-1"

[target-db]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
45 changes: 45 additions & 0 deletions tests/ddl_sequence/data/prepare.sql
@@ -0,0 +1,45 @@
drop database if exists `ddl_sequence`;
create database `ddl_sequence`;
use `ddl_sequence`;

CREATE TABLE many_cols1 (
id INT AUTO_INCREMENT PRIMARY KEY,
val INT DEFAULT 0,
col0 INT NOT NULL
);
ALTER TABLE many_cols1 DROP COLUMN col0;
INSERT INTO many_cols1 (val) VALUES (1);

CREATE TABLE many_cols2 (
id INT AUTO_INCREMENT PRIMARY KEY,
val INT DEFAULT 0,
col0 INT NOT NULL
);
ALTER TABLE many_cols2 DROP COLUMN col0;
INSERT INTO many_cols2 (val) VALUES (1);

CREATE TABLE many_cols3 (
id INT AUTO_INCREMENT PRIMARY KEY,
val INT DEFAULT 0,
col0 INT NOT NULL
);
ALTER TABLE many_cols3 DROP COLUMN col0;
INSERT INTO many_cols3 (val) VALUES (1);

CREATE TABLE many_cols4 (
id INT AUTO_INCREMENT PRIMARY KEY,
val INT DEFAULT 0,
col0 INT NOT NULL
);
ALTER TABLE many_cols4 DROP COLUMN col0;
INSERT INTO many_cols4 (val) VALUES (1);

CREATE TABLE many_cols5 (
id INT AUTO_INCREMENT PRIMARY KEY,
val INT DEFAULT 0,
col0 INT NOT NULL
);
ALTER TABLE many_cols5 DROP COLUMN col0;
INSERT INTO many_cols5 (val) VALUES (1);

CREATE TABLE finish_mark(a int primary key);
43 changes: 43 additions & 0 deletions tests/ddl_sequence/run.sh
@@ -0,0 +1,43 @@
#!/bin/bash

set -e

CUR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

function run() {
rm -rf $WORK_DIR && mkdir -p $WORK_DIR

start_tidb_cluster $WORK_DIR

cd $WORK_DIR

# record tso before we create tables to skip the system table DDLs
start_ts=$(cdc cli tso query --pd=http://$UP_PD_HOST:$UP_PD_PORT)

run_cdc_server $WORK_DIR $CDC_BINARY

TOPIC_NAME="ticdc-ddl-sequence-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4";;
mysql) ;&
*) SINK_URI="mysql://root@127.0.0.1:3306/";;
esac
cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4"
fi
run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
# sync_diff can't check a table that doesn't exist, so first make sure the expected tables have been created downstream
check_table_exists ddl_sequence.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
run $*
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"