owner: includes stopped changefeed when calculating gc safepoint
amyangfei committed Jul 27, 2020
1 parent f79d3d6 commit 2466328
Showing 3 changed files with 179 additions and 10 deletions.
48 changes: 38 additions & 10 deletions cdc/owner.go
@@ -45,10 +45,13 @@ import (

// Owner manages the cdc cluster
type Owner struct {
- done chan struct{}
- session *concurrency.Session
- changeFeeds map[model.ChangeFeedID]*changeFeed
- failedFeeds map[model.ChangeFeedID]struct{}
+ done chan struct{}
+ session *concurrency.Session
+ changeFeeds map[model.ChangeFeedID]*changeFeed
+ // failInitFeeds records changefeeds that encountered an error during initialization
+ failInitFeeds map[model.ChangeFeedID]struct{}
+ // stoppedFeeds records changefeeds that encountered an error while running
+ stoppedFeeds map[model.ChangeFeedID]*model.ChangeFeedStatus
rebalanceTigger map[model.ChangeFeedID]bool
rebalanceForAllChangefeed bool
manualScheduleCommand map[model.ChangeFeedID][]*model.MoveTableJob
@@ -88,7 +91,8 @@ func NewOwner(pdClient pd.Client, credential *security.Credential, sess *concurr
pdClient: pdClient,
credential: credential,
changeFeeds: make(map[model.ChangeFeedID]*changeFeed),
- failedFeeds: make(map[model.ChangeFeedID]struct{}),
+ failInitFeeds: make(map[model.ChangeFeedID]struct{}),
+ stoppedFeeds: make(map[model.ChangeFeedID]*model.ChangeFeedStatus),
captures: make(map[model.CaptureID]*model.CaptureInfo),
rebalanceTigger: make(map[model.ChangeFeedID]bool),
manualScheduleCommand: make(map[model.ChangeFeedID][]*model.MoveTableJob),
@@ -408,16 +412,16 @@ func (o *Owner) loadChangeFeeds(ctx context.Context) error {
return err
}
if cfInfo.State == model.StateFailed {
- if _, ok := o.failedFeeds[changeFeedID]; ok {
+ if _, ok := o.failInitFeeds[changeFeedID]; ok {
continue
}
log.Warn("changefeed is not in normal state", zap.String("changefeed", changeFeedID))
- o.failedFeeds[changeFeedID] = struct{}{}
+ o.failInitFeeds[changeFeedID] = struct{}{}
continue
}
- if _, ok := o.failedFeeds[changeFeedID]; ok {
+ if _, ok := o.failInitFeeds[changeFeedID]; ok {
log.Info("changefeed recovered from failure", zap.String("changefeed", changeFeedID))
- delete(o.failedFeeds, changeFeedID)
+ delete(o.failInitFeeds, changeFeedID)
}
needSave, canInit := cfInfo.CheckErrorHistory()
if needSave {
@@ -443,7 +447,15 @@ func (o *Owner) loadChangeFeeds(ctx context.Context) error {
return err
}
if status != nil && (status.AdminJobType == model.AdminStop || status.AdminJobType == model.AdminRemove) {
- continue
+ switch status.AdminJobType {
+ case model.AdminStop:
+ if _, ok := o.stoppedFeeds[changeFeedID]; !ok {
+ o.stoppedFeeds[changeFeedID] = status
+ }
+ continue
+ case model.AdminRemove:
+ continue
+ }
}
checkpointTs := cfInfo.GetCheckpointTs(status)

@@ -474,6 +486,7 @@ func (o *Owner) loadChangeFeeds(ctx context.Context) error {
return errors.Annotatef(err, "create change feed %s", changeFeedID)
}
o.changeFeeds[changeFeedID] = newCf
+ delete(o.stoppedFeeds, changeFeedID)
}
o.adminJobsLock.Lock()
for cfID, err := range errorFeeds {
@@ -521,6 +534,8 @@ func (o *Owner) balanceTables(ctx context.Context) error {
}

func (o *Owner) flushChangeFeedInfos(ctx context.Context) error {
+ // If there are no active changefeeds, skip both the changefeed status flush
+ // and the GC safepoint update; the GC safepoint keeps its old value.
if len(o.changeFeeds) == 0 {
return nil
}
@@ -536,6 +551,12 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}

+ for _, status := range o.stoppedFeeds {
+ if status.CheckpointTs < minCheckpointTs {
+ minCheckpointTs = status.CheckpointTs
+ }
+ }
_, err = o.pdClient.UpdateServiceGCSafePoint(ctx, CDCServiceSafePointID, o.gcTTL, minCheckpointTs)
if err != nil {
log.Info("failed to update service safe point", zap.Error(err))
@@ -604,6 +625,12 @@ func (o *Owner) dispatchJob(ctx context.Context, job model.AdminJob) error {
log.Info("stop changefeed ddl handler", zap.String("changefeed id", job.CfID), util.ZapErrorFilter(err, context.Canceled))
err = cf.sink.Close()
log.Info("stop changefeed sink", zap.String("changefeed id", job.CfID), util.ZapErrorFilter(err, context.Canceled))
+ // Only stoppedFeeds affected by an `AdminStop` command need handling here.
+ // For `AdminResume`, the stopped feed is removed during changefeed initialization.
+ // For `AdminRemove`, stoppedFeeds is updated when the stopped changefeed is removed.
+ if job.Type == model.AdminStop {
+ o.stoppedFeeds[job.CfID] = cf.status
+ }
delete(o.changeFeeds, job.CfID)
return nil
}
@@ -727,6 +754,7 @@ func (o *Owner) handleAdminJob(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}
+ delete(o.stoppedFeeds, job.CfID)
default:
return errors.Errorf("changefeed in abnormal state: %s, replication status: %+v", feedState, status)
}
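To make the new rule easy to see outside the diff context, here is a minimal standalone Go sketch: the service GC safepoint reported to PD must never advance past the checkpoint of any changefeed, including paused ones. The names changeFeedStatus and minServiceSafePoint are simplified, hypothetical stand-ins, not the actual TiCDC code.

package main

import (
	"fmt"
	"math"
)

// changeFeedStatus is a simplified stand-in for model.ChangeFeedStatus.
type changeFeedStatus struct {
	checkpointTs uint64
}

// minServiceSafePoint returns the smallest checkpoint across both active and
// stopped changefeeds; this is the value the owner reports to PD via
// UpdateServiceGCSafePoint. The boolean is false when there are no active
// changefeeds, mirroring the early return in flushChangeFeedInfos, in which
// case PD keeps the previously reported safepoint.
func minServiceSafePoint(active, stopped map[string]*changeFeedStatus) (uint64, bool) {
	if len(active) == 0 {
		return 0, false
	}
	minTs := uint64(math.MaxUint64)
	for _, s := range active {
		if s.checkpointTs < minTs {
			minTs = s.checkpointTs
		}
	}
	// The fix: stopped (paused) changefeeds also hold the safepoint back, so
	// their data is still available when they are resumed.
	for _, s := range stopped {
		if s.checkpointTs < minTs {
			minTs = s.checkpointTs
		}
	}
	return minTs, true
}

func main() {
	active := map[string]*changeFeedStatus{"cf-active": {checkpointTs: 420}}
	stopped := map[string]*changeFeedStatus{"cf-paused": {checkpointTs: 400}}
	if ts, ok := minServiceSafePoint(active, stopped); ok {
		fmt.Println("service gc safepoint:", ts) // prints 400: the paused feed holds it back
	}
}

Before this commit the stopped feeds were not consulted, so PD could advance the GC safepoint past a paused changefeed's checkpoint and garbage-collect data the changefeed still needed on resume.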
27 changes: 27 additions & 0 deletions tests/gc_safepoint/conf/diff_config.toml
@@ -0,0 +1,27 @@
# diff Configuration.

log-level = "info"
chunk-size = 10
check-thread-count = 4
sample-percent = 100
use-rowid = false
use-checksum = true
fix-sql-file = "fix.sql"

# tables to check.
[[check-tables]]
schema = "gc_safepoint"
tables = ["simple"]

[[source-db]]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""
instance-id = "source-1"

[target-db]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
114 changes: 114 additions & 0 deletions tests/gc_safepoint/run.sh
@@ -0,0 +1,114 @@
#!/bin/bash

set -e

CUR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1
MAX_RETRIES=5

function get_safepoint() {
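# Read the TiCDC service GC safepoint that PD stores in etcd under
# /pd/<cluster-id>/gc/safe_point/service/ticdc and print its numeric value.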
pd_addr=$1
pd_cluster_id=$2
safe_point=$(etcdctl get --endpoints=$pd_addr /pd/$pd_cluster_id/gc/safe_point/service/ticdc|grep -oE "SafePoint\":[0-9]+"|grep -oE "[0-9]+")
echo $safe_point
}

function check_safepoint_forward() {
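# Assert that the safepoint advances: sample it twice, one second apart,
# and fail if both samples are identical.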
pd_addr=$1
pd_cluster_id=$2
safe_point1=$(get_safepoint $pd_addr $pd_cluster_id)
sleep 1
safe_point2=$(get_safepoint $pd_addr $pd_cluster_id)
if [[ "$safe_point1" == "$safe_point2" ]]; then
echo "safepoint $safe_point1 is not forward"
exit 1
fi
}

function check_safepoint_equal() {
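# Assert that the safepoint holds still: sample it once per second for
# three seconds and fail if it ever changes.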
pd_addr=$1
pd_cluster_id=$2
safe_point1=$(get_safepoint $pd_addr $pd_cluster_id)
for i in $(seq 1 3); do
sleep 1
safe_point2=$(get_safepoint $pd_addr $pd_cluster_id)
if [[ "$safe_point1" != "$safe_point2" ]]; then
echo "safepoint is unexpected forward: $safe_point1 -> $safe_point2"
exit 1
fi
done
}

function check_changefeed_state() {
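# Assert that `cdc cli changefeed query` reports the expected state for the
# given changefeed.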
pd_addr=$1
changefeed_id=$2
expected=$3
state=$(cdc cli --pd=$pd_addr changefeed query -s -c $changefeed_id|jq -r ".state")
if [[ "$state" != "$expected" ]];then
echo "unexpected state $state, expected $expected"
exit 1
fi
}

export -f get_safepoint
export -f check_safepoint_forward
export -f check_safepoint_equal
export -f check_changefeed_state

function run() {
rm -rf $WORK_DIR && mkdir -p $WORK_DIR
start_tidb_cluster --workdir $WORK_DIR
cd $WORK_DIR

pd_addr="http://$UP_PD_HOST:$UP_PD_PORT"
TOPIC_NAME="ticdc-gc-safepoint-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4";;
*) SINK_URI="mysql://root@127.0.0.1:3306/?max-txn-row=1";;
esac
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4"
fi
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr
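# Capture the changefeed id that `cdc cli changefeed create` prints on success.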
changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}')

run_sql "CREATE DATABASE gc_safepoint;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "CREATE table gc_safepoint.simple(id int primary key auto_increment, val int);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "INSERT INTO gc_safepoint.simple VALUES (),();" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

pd_cluster_id=$(curl -s $pd_addr/pd/api/v1/cluster|grep -oE "id\":\s[0-9]+"|grep -oE "[0-9]+")
ensure $MAX_RETRIES check_safepoint_forward $pd_addr $pd_cluster_id

# after the changefeed is paused, the safe_point will not be updated
cdc cli changefeed pause --changefeed-id=$changefeed_id --pd=$pd_addr
ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id "stopped"
ensure $MAX_RETRIES check_safepoint_equal $pd_addr $pd_cluster_id

# resuming the changefeed allows the safe_point to advance again
cdc cli changefeed resume --changefeed-id=$changefeed_id --pd=$pd_addr
ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id "normal"
ensure $MAX_RETRIES check_safepoint_forward $pd_addr $pd_cluster_id

cdc cli changefeed pause --changefeed-id=$changefeed_id --pd=$pd_addr
ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id "stopped"
# create another changefeed; because a paused changefeed still exists,
# the safe_point still does not advance
changefeed_id2=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}')
ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id2 "normal"
ensure $MAX_RETRIES check_safepoint_equal $pd_addr $pd_cluster_id

# after the paused changefeed is removed, the safe_point advances again
cdc cli changefeed remove --changefeed-id=$changefeed_id --pd=$pd_addr
ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id "removed"
ensure $MAX_RETRIES check_safepoint_forward $pd_addr $pd_cluster_id

cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
run $*
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
