owner: includes stopped changefeed when calculating gc safepoint #797

Merged · 6 commits · Aug 3, 2020
cdc/owner.go (74 changes: 55 additions & 19 deletions)
@@ -46,10 +46,13 @@ import (
 
 // Owner manages the cdc cluster
 type Owner struct {
     done chan struct{}
     session *concurrency.Session
     changeFeeds map[model.ChangeFeedID]*changeFeed
-    failedFeeds map[model.ChangeFeedID]struct{}
+    // failInitFeeds records changefeeds that hit an error during initialization
+    failInitFeeds map[model.ChangeFeedID]struct{}
+    // stoppedFeeds records changefeeds that encounter a running error
+    stoppedFeeds map[model.ChangeFeedID]*model.ChangeFeedStatus
     rebalanceTigger map[model.ChangeFeedID]bool
     rebalanceForAllChangefeed bool
     manualScheduleCommand map[model.ChangeFeedID][]*model.MoveTableJob
@@ -73,6 +76,8 @@ type Owner struct {
 
     // gcTTL is the TTL of the cdc gc safepoint.
     gcTTL int64
+    // whether gc safepoint is set in pd
+    gcSafePointSet bool
 }
 
 // CDCServiceSafePointID is the ID of CDC service in pd.UpdateServiceGCSafePoint.
@@ -89,7 +94,8 @@ func NewOwner(pdClient pd.Client, credential *security.Credential, sess *concurr
         pdClient: pdClient,
         credential: credential,
         changeFeeds: make(map[model.ChangeFeedID]*changeFeed),
-        failedFeeds: make(map[model.ChangeFeedID]struct{}),
+        failInitFeeds: make(map[model.ChangeFeedID]struct{}),
+        stoppedFeeds: make(map[model.ChangeFeedID]*model.ChangeFeedStatus),
         captures: make(map[model.CaptureID]*model.CaptureInfo),
         rebalanceTigger: make(map[model.ChangeFeedID]bool),
         manualScheduleCommand: make(map[model.ChangeFeedID][]*model.MoveTableJob),
@@ -413,16 +419,16 @@ func (o *Owner) loadChangeFeeds(ctx context.Context) error {
             return err
         }
         if cfInfo.State == model.StateFailed {
-            if _, ok := o.failedFeeds[changeFeedID]; ok {
+            if _, ok := o.failInitFeeds[changeFeedID]; ok {
                 continue
             }
             log.Warn("changefeed is not in normal state", zap.String("changefeed", changeFeedID))
-            o.failedFeeds[changeFeedID] = struct{}{}
+            o.failInitFeeds[changeFeedID] = struct{}{}
             continue
         }
-        if _, ok := o.failedFeeds[changeFeedID]; ok {
+        if _, ok := o.failInitFeeds[changeFeedID]; ok {
             log.Info("changefeed recovered from failure", zap.String("changefeed", changeFeedID))
-            delete(o.failedFeeds, changeFeedID)
+            delete(o.failInitFeeds, changeFeedID)
         }
         needSave, canInit := cfInfo.CheckErrorHistory()
         if needSave {
@@ -448,6 +454,11 @@ func (o *Owner) loadChangeFeeds(ctx context.Context) error {
             return err
         }
         if status != nil && status.AdminJobType.IsStopState() {
+            if status.AdminJobType == model.AdminStop {
+                if _, ok := o.stoppedFeeds[changeFeedID]; !ok {
+                    o.stoppedFeeds[changeFeedID] = status
+                }
+            }
             continue
         }
         checkpointTs := cfInfo.GetCheckpointTs(status)
@@ -479,6 +490,7 @@ func (o *Owner) loadChangeFeeds(ctx context.Context) error {
             return errors.Annotatef(err, "create change feed %s", changeFeedID)
         }
         o.changeFeeds[changeFeedID] = newCf
+        delete(o.stoppedFeeds, changeFeedID)
     }
     o.adminJobsLock.Lock()
     for cfID, err := range errorFeeds {
@@ -526,26 +538,43 @@ func (o *Owner) balanceTables(ctx context.Context) error {
 }
 
 func (o *Owner) flushChangeFeedInfos(ctx context.Context) error {
-    if len(o.changeFeeds) == 0 {
+    // no running or stopped changefeed, clear gc safepoint.
+    if len(o.changeFeeds) == 0 && len(o.stoppedFeeds) == 0 {
+        if o.gcSafePointSet {
+            _, err := o.pdClient.UpdateServiceGCSafePoint(ctx, CDCServiceSafePointID, 0, 0)
+            if err != nil {
+                return errors.Trace(err)
+            }
+            o.gcSafePointSet = false
+        }
         return nil
     }
-    snapshot := make(map[model.ChangeFeedID]*model.ChangeFeedStatus, len(o.changeFeeds))
 
     minCheckpointTs := uint64(math.MaxUint64)
-    for id, changefeed := range o.changeFeeds {
-        snapshot[id] = changefeed.status
-        if changefeed.status.CheckpointTs < minCheckpointTs {
-            minCheckpointTs = changefeed.status.CheckpointTs
-        }
-    }
-    err := o.cfRWriter.PutAllChangeFeedStatus(ctx, snapshot)
-    if err != nil {
-        return errors.Trace(err)
-    }
-    _, err = o.pdClient.UpdateServiceGCSafePoint(ctx, CDCServiceSafePointID, o.gcTTL, minCheckpointTs)
+    if len(o.changeFeeds) > 0 {
+        snapshot := make(map[model.ChangeFeedID]*model.ChangeFeedStatus, len(o.changeFeeds))
+        for id, changefeed := range o.changeFeeds {
+            snapshot[id] = changefeed.status
+            if changefeed.status.CheckpointTs < minCheckpointTs {
+                minCheckpointTs = changefeed.status.CheckpointTs
+            }
+        }
+        err := o.cfRWriter.PutAllChangeFeedStatus(ctx, snapshot)
+        if err != nil {
+            return errors.Trace(err)
+        }
+    }
+    for _, status := range o.stoppedFeeds {
+        if status.CheckpointTs < minCheckpointTs {
+            minCheckpointTs = status.CheckpointTs
+        }
+    }
+    _, err := o.pdClient.UpdateServiceGCSafePoint(ctx, CDCServiceSafePointID, o.gcTTL, minCheckpointTs)
     if err != nil {
         log.Info("failed to update service safe point", zap.Error(err))
         return errors.Trace(err)
     }
+    o.gcSafePointSet = true
     return nil
 }
 
@@ -609,6 +638,12 @@ func (o *Owner) dispatchJob(ctx context.Context, job model.AdminJob) error {
     log.Info("stop changefeed ddl handler", zap.String("changefeed id", job.CfID), util.ZapErrorFilter(err, context.Canceled))
     err = cf.sink.Close()
     log.Info("stop changefeed sink", zap.String("changefeed id", job.CfID), util.ZapErrorFilter(err, context.Canceled))
+    // Only the `AdminStop` command needs to update stoppedFeeds here.
+    // For `AdminResume`, the stopped feed is removed in the changefeed initialization phase.
+    // For `AdminRemove`, stoppedFeeds is updated when the stopped changefeed is removed.
+    if job.Type == model.AdminStop {
+        o.stoppedFeeds[job.CfID] = cf.status
+    }
     delete(o.changeFeeds, job.CfID)
     return nil
 }
@@ -755,6 +790,7 @@ func (o *Owner) handleAdminJob(ctx context.Context) error {
             if err != nil {
                 return errors.Trace(err)
             }
+            delete(o.stoppedFeeds, job.CfID)
         default:
             return errors.Errorf("changefeed in abnormal state: %s, replication status: %+v", feedState, status)
         }
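The core of this change is the safepoint calculation: the owner must hold the service GC safepoint at the minimum checkpoint across both running and stopped changefeeds, and may clear it only when no changefeed of either kind remains. Below is a minimal, self-contained sketch of that calculation; ChangeFeedStatus and the string-keyed maps are simplified stand-ins for the model package types, not the actual API:

package main

import (
	"fmt"
	"math"
)

// ChangeFeedStatus is a simplified stand-in for model.ChangeFeedStatus.
type ChangeFeedStatus struct {
	CheckpointTs uint64
}

// minServiceSafePoint mirrors the selection in flushChangeFeedInfos:
// the slowest changefeed, running or stopped, pins the service GC
// safepoint. The second return value reports whether any changefeed
// exists at all; when it is false, the owner clears the safepoint in
// PD instead of updating it.
func minServiceSafePoint(running, stopped map[string]*ChangeFeedStatus) (uint64, bool) {
	if len(running) == 0 && len(stopped) == 0 {
		return 0, false
	}
	minTs := uint64(math.MaxUint64)
	for _, s := range running {
		if s.CheckpointTs < minTs {
			minTs = s.CheckpointTs
		}
	}
	for _, s := range stopped {
		if s.CheckpointTs < minTs {
			minTs = s.CheckpointTs
		}
	}
	return minTs, true
}

func main() {
	running := map[string]*ChangeFeedStatus{"cf-running": {CheckpointTs: 420}}
	stopped := map[string]*ChangeFeedStatus{"cf-paused": {CheckpointTs: 100}}

	// The paused changefeed pins the safepoint at its checkpoint, so a
	// later resume cannot point at data that GC has already removed.
	ts, ok := minServiceSafePoint(running, stopped)
	fmt.Println(ts, ok) // 100 true
}

Before this patch only o.changeFeeds participated in the minimum, so pausing the last running changefeed let PD's GC advance past the paused feed's checkpoint, and resuming it later could reference data GC had already reclaimed.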
tests/gc_safepoint/conf/diff_config.toml (new file, 27 additions)
@@ -0,0 +1,27 @@
# diff Configuration.

log-level = "info"
chunk-size = 10
check-thread-count = 4
sample-percent = 100
use-rowid = false
use-checksum = true
fix-sql-file = "fix.sql"

# tables that need to be checked.
[[check-tables]]
schema = "gc_safepoint"
tables = ["simple"]

[[source-db]]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""
instance-id = "source-1"

[target-db]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
tests/gc_safepoint/run.sh (new file, 129 additions)
@@ -0,0 +1,129 @@
#!/bin/bash

set -e

CUR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1
MAX_RETRIES=5

function get_safepoint() {
    pd_addr=$1
    pd_cluster_id=$2
    safe_point=$(etcdctl get --endpoints=$pd_addr /pd/$pd_cluster_id/gc/safe_point/service/ticdc|grep -oE "SafePoint\":[0-9]+"|grep -oE "[0-9]+")
    echo $safe_point
}

function check_safepoint_cleared() {
    pd_addr=$1
    pd_cluster_id=$2
    query=$(etcdctl get --endpoints=$pd_addr /pd/$pd_cluster_id/gc/safe_point/service/ticdc)
    if [ ! -z "$query" ]; then
        echo "gc safepoint is not cleared: $query"
        exit 1
    fi
}

function check_safepoint_forward() {
    pd_addr=$1
    pd_cluster_id=$2
    safe_point1=$(get_safepoint $pd_addr $pd_cluster_id)
    sleep 1
    safe_point2=$(get_safepoint $pd_addr $pd_cluster_id)
    if [[ "$safe_point1" == "$safe_point2" ]]; then
        echo "safepoint $safe_point1 did not advance"
        exit 1
    fi
}

function check_safepoint_equal() {
    pd_addr=$1
    pd_cluster_id=$2
    safe_point1=$(get_safepoint $pd_addr $pd_cluster_id)
    for i in $(seq 1 3); do
        sleep 1
        safe_point2=$(get_safepoint $pd_addr $pd_cluster_id)
        if [[ "$safe_point1" != "$safe_point2" ]]; then
            echo "safepoint unexpectedly advanced: $safe_point1 -> $safe_point2"
            exit 1
        fi
    done
}

function check_changefeed_state() {
    pd_addr=$1
    changefeed_id=$2
    expected=$3
    state=$(cdc cli --pd=$pd_addr changefeed query -s -c $changefeed_id|jq -r ".state")
    if [[ "$state" != "$expected" ]]; then
        echo "unexpected state $state, expected $expected"
        exit 1
    fi
}

export -f get_safepoint
export -f check_safepoint_forward
export -f check_safepoint_cleared
export -f check_safepoint_equal
export -f check_changefeed_state

function run() {
    rm -rf $WORK_DIR && mkdir -p $WORK_DIR
    start_tidb_cluster --workdir $WORK_DIR
    cd $WORK_DIR

    pd_addr="http://$UP_PD_HOST:$UP_PD_PORT"
    TOPIC_NAME="ticdc-gc-safepoint-$RANDOM"
    case $SINK_TYPE in
        kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4";;
        *) SINK_URI="mysql://root@127.0.0.1:3306/?max-txn-row=1";;
    esac
    if [ "$SINK_TYPE" == "kafka" ]; then
        run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4"
    fi
    run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr
    changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}')

    run_sql "CREATE DATABASE gc_safepoint;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
    run_sql "CREATE table gc_safepoint.simple(id int primary key auto_increment, val int);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
    run_sql "INSERT INTO gc_safepoint.simple VALUES (),();" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
    check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

    # the PD cluster id is part of the gc safepoint key path in etcd
    pd_cluster_id=$(curl -s $pd_addr/pd/api/v1/cluster|grep -oE "id\":\s[0-9]+"|grep -oE "[0-9]+")
    ensure $MAX_RETRIES check_safepoint_forward $pd_addr $pd_cluster_id

    # after the changefeed is paused, the safe_point will not be updated
    cdc cli changefeed pause --changefeed-id=$changefeed_id --pd=$pd_addr
    ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id "stopped"
    ensure $MAX_RETRIES check_safepoint_equal $pd_addr $pd_cluster_id

    # resuming the changefeed lets the safe_point advance again
    cdc cli changefeed resume --changefeed-id=$changefeed_id --pd=$pd_addr
    ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id "normal"
    ensure $MAX_RETRIES check_safepoint_forward $pd_addr $pd_cluster_id

    cdc cli changefeed pause --changefeed-id=$changefeed_id --pd=$pd_addr
    ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id "stopped"
    # create another changefeed; because a paused changefeed still exists,
    # the safe_point still does not advance
    changefeed_id2=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}')
    ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id2 "normal"
    ensure $MAX_RETRIES check_safepoint_equal $pd_addr $pd_cluster_id

    # after the paused changefeed is removed, the safe_point advances again
    cdc cli changefeed remove --changefeed-id=$changefeed_id --pd=$pd_addr
    ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id "removed"
    ensure $MAX_RETRIES check_safepoint_forward $pd_addr $pd_cluster_id

    # once all changefeeds are removed, the safe_point is cleared
    cdc cli changefeed remove --changefeed-id=$changefeed_id2 --pd=$pd_addr
    ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id2 "removed"
    ensure $MAX_RETRIES check_safepoint_cleared $pd_addr $pd_cluster_id

    cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
run $*
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"