diff --git a/cdc/owner.go b/cdc/owner.go
index df60bbf9e06..cefc534cf65 100644
--- a/cdc/owner.go
+++ b/cdc/owner.go
@@ -45,10 +45,13 @@ import (
 // Owner manages the cdc cluster
 type Owner struct {
-	done        chan struct{}
-	session     *concurrency.Session
-	changeFeeds map[model.ChangeFeedID]*changeFeed
-	failedFeeds map[model.ChangeFeedID]struct{}
+	done        chan struct{}
+	session     *concurrency.Session
+	changeFeeds map[model.ChangeFeedID]*changeFeed
+	// failInitFeeds records changefeeds that encountered an error during initialization
+	failInitFeeds map[model.ChangeFeedID]struct{}
+	// stoppedFeeds records changefeeds that encountered an error while running
+	stoppedFeeds map[model.ChangeFeedID]*model.ChangeFeedStatus

 	rebalanceTigger           map[model.ChangeFeedID]bool
 	rebalanceForAllChangefeed bool
 	manualScheduleCommand     map[model.ChangeFeedID][]*model.MoveTableJob
@@ -88,7 +91,8 @@ func NewOwner(pdClient pd.Client, credential *security.Credential, sess *concurr
 		pdClient:    pdClient,
 		credential:  credential,
 		changeFeeds: make(map[model.ChangeFeedID]*changeFeed),
-		failedFeeds: make(map[model.ChangeFeedID]struct{}),
+		failInitFeeds: make(map[model.ChangeFeedID]struct{}),
+		stoppedFeeds:  make(map[model.ChangeFeedID]*model.ChangeFeedStatus),
 		captures:    make(map[model.CaptureID]*model.CaptureInfo),
 		rebalanceTigger: make(map[model.ChangeFeedID]bool),
 		manualScheduleCommand: make(map[model.ChangeFeedID][]*model.MoveTableJob),
@@ -408,16 +412,16 @@ func (o *Owner) loadChangeFeeds(ctx context.Context) error {
 			return err
 		}
 		if cfInfo.State == model.StateFailed {
-			if _, ok := o.failedFeeds[changeFeedID]; ok {
+			if _, ok := o.failInitFeeds[changeFeedID]; ok {
 				continue
 			}
 			log.Warn("changefeed is not in normal state", zap.String("changefeed", changeFeedID))
-			o.failedFeeds[changeFeedID] = struct{}{}
+			o.failInitFeeds[changeFeedID] = struct{}{}
 			continue
 		}
-		if _, ok := o.failedFeeds[changeFeedID]; ok {
+		if _, ok := o.failInitFeeds[changeFeedID]; ok {
 			log.Info("changefeed recovered from failure", zap.String("changefeed", changeFeedID))
-			delete(o.failedFeeds, changeFeedID)
+			delete(o.failInitFeeds, changeFeedID)
 		}
 		needSave, canInit := cfInfo.CheckErrorHistory()
 		if needSave {
@@ -443,7 +447,15 @@ func (o *Owner) loadChangeFeeds(ctx context.Context) error {
 			return err
 		}
 		if status != nil && (status.AdminJobType == model.AdminStop || status.AdminJobType == model.AdminRemove) {
-			continue
+			switch status.AdminJobType {
+			case model.AdminStop:
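+				// Remember the stopped changefeed's status so that its checkpoint
+				// keeps holding back the service GC safepoint (see flushChangeFeedInfos).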
+				if _, ok := o.stoppedFeeds[changeFeedID]; !ok {
+					o.stoppedFeeds[changeFeedID] = status
+				}
+				continue
+			case model.AdminRemove:
+				continue
+			}
 		}

 		checkpointTs := cfInfo.GetCheckpointTs(status)
@@ -474,6 +486,7 @@ func (o *Owner) loadChangeFeeds(ctx context.Context) error {
 			return errors.Annotatef(err, "create change feed %s", changeFeedID)
 		}
 		o.changeFeeds[changeFeedID] = newCf
+		delete(o.stoppedFeeds, changeFeedID)
 	}
 	o.adminJobsLock.Lock()
 	for cfID, err := range errorFeeds {
@@ -521,6 +534,8 @@ func (o *Owner) balanceTables(ctx context.Context) error {
 }

 func (o *Owner) flushChangeFeedInfos(ctx context.Context) error {
+	// If there are no active changefeeds, skip both the changefeed status flush and
+	// the GC safepoint update; the GC safepoint keeps its old value.
 	if len(o.changeFeeds) == 0 {
 		return nil
 	}
@@ -536,6 +551,12 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error {
 	if err != nil {
 		return errors.Trace(err)
 	}
+
+	for _, status := range o.stoppedFeeds {
+		if status.CheckpointTs < minCheckpointTs {
+			minCheckpointTs = status.CheckpointTs
+		}
+	}
 	_, err = o.pdClient.UpdateServiceGCSafePoint(ctx, CDCServiceSafePointID, o.gcTTL, minCheckpointTs)
 	if err != nil {
 		log.Info("failed to update service safe point", zap.Error(err))
@@ -604,6 +625,12 @@ func (o *Owner) dispatchJob(ctx context.Context, job model.AdminJob) error {
 	log.Info("stop changefeed ddl handler", zap.String("changefeed id", job.CfID), util.ZapErrorFilter(err, context.Canceled))
 	err = cf.sink.Close()
 	log.Info("stop changefeed sink", zap.String("changefeed id", job.CfID), util.ZapErrorFilter(err, context.Canceled))
+	// Only the `AdminStop` command needs to update stoppedFeeds here.
+	// For `AdminResume`, the stopped feed is removed during changefeed initialization.
+	// For `AdminRemove`, stoppedFeeds is updated when the stopped changefeed is removed.
+	if job.Type == model.AdminStop {
+		o.stoppedFeeds[job.CfID] = cf.status
+	}
 	delete(o.changeFeeds, job.CfID)
 	return nil
 }
@@ -727,6 +754,7 @@ func (o *Owner) handleAdminJob(ctx context.Context) error {
 			if err != nil {
 				return errors.Trace(err)
 			}
+			delete(o.stoppedFeeds, job.CfID)
 		default:
 			return errors.Errorf("changefeed in abnormal state: %s, replication status: %+v", feedState, status)
 		}
diff --git a/tests/gc_safepoint/conf/diff_config.toml b/tests/gc_safepoint/conf/diff_config.toml
new file mode 100644
index 00000000000..5c842456257
--- /dev/null
+++ b/tests/gc_safepoint/conf/diff_config.toml
@@ -0,0 +1,27 @@
+# diff Configuration.
+
+log-level = "info"
+chunk-size = 10
+check-thread-count = 4
+sample-percent = 100
+use-rowid = false
+use-checksum = true
+fix-sql-file = "fix.sql"
+
+# tables to check.
+[[check-tables]]
+    schema = "gc_safepoint"
+    tables = ["simple"]
+
+[[source-db]]
+    host = "127.0.0.1"
+    port = 4000
+    user = "root"
+    password = ""
+    instance-id = "source-1"
+
+[target-db]
+    host = "127.0.0.1"
+    port = 3306
+    user = "root"
+    password = ""
diff --git a/tests/gc_safepoint/run.sh b/tests/gc_safepoint/run.sh
new file mode 100755
index 00000000000..374741165bc
--- /dev/null
+++ b/tests/gc_safepoint/run.sh
@@ -0,0 +1,114 @@
+#!/bin/bash
+
+set -e
+
+CUR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
+source $CUR/../_utils/test_prepare
+WORK_DIR=$OUT_DIR/$TEST_NAME
+CDC_BINARY=cdc.test
+SINK_TYPE=$1
+MAX_RETRIES=5
+
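+# get_safepoint reads the service GC safepoint that TiCDC registers in PD's etcd
+# (key /pd/<cluster-id>/gc/safe_point/service/ticdc) and prints its numeric value.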
+function get_safepoint() {
+    pd_addr=$1
+    pd_cluster_id=$2
+    safe_point=$(etcdctl get --endpoints=$pd_addr /pd/$pd_cluster_id/gc/safe_point/service/ticdc|grep -oE "SafePoint\":[0-9]+"|grep -oE "[0-9]+")
+    echo $safe_point
+}
+
+function check_safepoint_forward() {
+    pd_addr=$1
+    pd_cluster_id=$2
+    safe_point1=$(get_safepoint $pd_addr $pd_cluster_id)
+    sleep 1
+    safe_point2=$(get_safepoint $pd_addr $pd_cluster_id)
+    if [[ "$safe_point1" == "$safe_point2" ]]; then
+        echo "safepoint $safe_point1 does not advance"
+        exit 1
+    fi
+}
+
+function check_safepoint_equal() {
+    pd_addr=$1
+    pd_cluster_id=$2
+    safe_point1=$(get_safepoint $pd_addr $pd_cluster_id)
+    for i in $(seq 1 3); do
+        sleep 1
+        safe_point2=$(get_safepoint $pd_addr $pd_cluster_id)
+        if [[ "$safe_point1" != "$safe_point2" ]]; then
+            echo "safepoint unexpectedly advanced: $safe_point1 -> $safe_point2"
+            exit 1
+        fi
+    done
+}
+
+function check_changefeed_state() {
+    pd_addr=$1
+    changefeed_id=$2
+    expected=$3
+    state=$(cdc cli --pd=$pd_addr changefeed query -s -c $changefeed_id|jq -r ".state")
+    if [[ "$state" != "$expected" ]]; then
+        echo "unexpected state $state, expected $expected"
+        exit 1
+    fi
+}
+
+export -f get_safepoint
+export -f check_safepoint_forward
+export -f check_safepoint_equal
+export -f check_changefeed_state
+
+function run() {
+    rm -rf $WORK_DIR && mkdir -p $WORK_DIR
+    start_tidb_cluster --workdir $WORK_DIR
+    cd $WORK_DIR
+
+    pd_addr="http://$UP_PD_HOST:$UP_PD_PORT"
+    TOPIC_NAME="ticdc-gc-safepoint-$RANDOM"
+    case $SINK_TYPE in
+        kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4";;
+        *) SINK_URI="mysql://root@127.0.0.1:3306/?max-txn-row=1";;
+    esac
+    if [ "$SINK_TYPE" == "kafka" ]; then
+        run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4"
+    fi
+    run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr
+    changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}')
+
+    run_sql "CREATE DATABASE gc_safepoint;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+    run_sql "CREATE table gc_safepoint.simple(id int primary key auto_increment, val int);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+    run_sql "INSERT INTO gc_safepoint.simple VALUES (),();" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+    check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
+
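+    # the PD cluster id is part of the etcd key that stores the service safepoint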
+    pd_cluster_id=$(curl -s $pd_addr/pd/api/v1/cluster|grep -oE "id\":\s[0-9]+"|grep -oE "[0-9]+")
+    ensure $MAX_RETRIES check_safepoint_forward $pd_addr $pd_cluster_id
+
+    # after the changefeed is paused, the safe_point should not be updated
+    cdc cli changefeed pause --changefeed-id=$changefeed_id --pd=$pd_addr
+    ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id "stopped"
+    ensure $MAX_RETRIES check_safepoint_equal $pd_addr $pd_cluster_id
+
+    # resuming the changefeed should make the safe_point advance again
+    cdc cli changefeed resume --changefeed-id=$changefeed_id --pd=$pd_addr
+    ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id "normal"
+    ensure $MAX_RETRIES check_safepoint_forward $pd_addr $pd_cluster_id
+
+    cdc cli changefeed pause --changefeed-id=$changefeed_id --pd=$pd_addr
+    ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id "stopped"
+    # create another changefeed; because a paused changefeed still exists,
+    # the safe_point still does not advance
+    changefeed_id2=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}')
+    ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id2 "normal"
+    ensure $MAX_RETRIES check_safepoint_equal $pd_addr $pd_cluster_id
+
+    # after the paused changefeed is removed, the safe_point should advance again
+    cdc cli changefeed remove --changefeed-id=$changefeed_id --pd=$pd_addr
+    ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id "removed"
+    ensure $MAX_RETRIES check_safepoint_forward $pd_addr $pd_cluster_id
+
+    cleanup_process $CDC_BINARY
+}
+
+trap stop_tidb_cluster EXIT
+run $*
+echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"