From b181f9d24251e3f9a08f60b3ef31f1628e3308a4 Mon Sep 17 00:00:00 2001
From: ti-srebot <66930949+ti-srebot@users.noreply.github.com>
Date: Tue, 24 Nov 2020 20:13:53 +0800
Subject: [PATCH] Temporary fix for ErrSnapshotSchemaNotFound and
 ErrSchemaStorageGCed (#1069) (#1114)

Signed-off-by: ti-srebot
---
 cdc/changefeed.go                      |   6 +-
 cdc/owner.go                           |   5 +
 cdc/processor.go                       |  20 +-
 pkg/notify/notify_test.go              |   4 +-
 tests/move_table/conf/diff_config.toml |  27 +++
 tests/move_table/conf/workload         |  13 ++
 tests/move_table/main.go               | 277 +++++++++++++++++++++++++
 tests/move_table/run.sh                |  60 ++++++
 8 files changed, 408 insertions(+), 4 deletions(-)
 create mode 100644 tests/move_table/conf/diff_config.toml
 create mode 100644 tests/move_table/conf/workload
 create mode 100644 tests/move_table/main.go
 create mode 100644 tests/move_table/run.sh

diff --git a/cdc/changefeed.go b/cdc/changefeed.go
index 2eea17a3447..5a1b6e52dfe 100644
--- a/cdc/changefeed.go
+++ b/cdc/changefeed.go
@@ -525,13 +525,15 @@ func (c *changeFeed) handleMoveTableJobs(ctx context.Context, captures map[model
 		case model.MoveTableStatusDeleted:
 			// add table to target capture
 			status, exist := cloneStatus(job.To)
+			replicaInfo := job.TableReplicaInfo.Clone()
+			replicaInfo.StartTs = c.status.CheckpointTs
 			if !exist {
 				// the target capture is not exist, add table to orphanTables.
-				c.orphanTables[tableID] = job.TableReplicaInfo.StartTs
+				c.orphanTables[tableID] = replicaInfo.StartTs
 				log.Warn("the target capture is not exist, sent the table to orphanTables", zap.Reflect("job", job))
 				continue
 			}
-			status.AddTable(tableID, job.TableReplicaInfo, job.TableReplicaInfo.StartTs)
+			status.AddTable(tableID, replicaInfo, c.status.CheckpointTs)
 			job.Status = model.MoveTableStatusFinished
 			delete(c.moveTableJobs, tableID)
 			log.Info("handle the move job, add table to the target capture", zap.Reflect("job", job))
diff --git a/cdc/owner.go b/cdc/owner.go
index 01bc8dcfc4d..a4755395e6f 100644
--- a/cdc/owner.go
+++ b/cdc/owner.go
@@ -1222,6 +1222,11 @@ func (o *Owner) cleanUpStaleTasks(ctx context.Context, captures []*model.Capture
 		captureIDs[captureID] = struct{}{}
 	}
 
+	log.Debug("cleanUpStaleTasks",
+		zap.Reflect("statuses", statuses),
+		zap.Reflect("positions", positions),
+		zap.Reflect("workloads", workloads))
+
 	for captureID := range captureIDs {
 		if _, ok := active[captureID]; !ok {
 			status, ok1 := statuses[captureID]
diff --git a/cdc/processor.go b/cdc/processor.go
index 4d78b671f7f..4f42f71d18b 100644
--- a/cdc/processor.go
+++ b/cdc/processor.go
@@ -61,6 +61,8 @@ const (
 	defaultMemBufferCapacity int64 = 10 * 1024 * 1024 * 1024 // 10G
 
 	defaultSyncResolvedBatch = 1024
+
+	schemaStorageGCLag = time.Minute * 20
 )
 
 var (
@@ -86,6 +88,7 @@ type processor struct {
 	globalResolvedTs        uint64
 	localResolvedTs         uint64
 	checkpointTs            uint64
+	globalcheckpointTs      uint64
 	flushCheckpointInterval time.Duration
 
 	ddlPuller       puller.Puller
@@ -664,12 +667,17 @@ func (p *processor) globalStatusWorker(ctx context.Context) error {
 	defer globalResolvedTsNotifier.Close()
 
 	updateStatus := func(changefeedStatus *model.ChangeFeedStatus) {
+		atomic.StoreUint64(&p.globalcheckpointTs, changefeedStatus.CheckpointTs)
 		if lastResolvedTs == changefeedStatus.ResolvedTs &&
 			lastCheckPointTs == changefeedStatus.CheckpointTs {
 			return
 		}
 		if lastCheckPointTs < changefeedStatus.CheckpointTs {
-			p.schemaStorage.DoGC(changefeedStatus.CheckpointTs)
+			// Delay GC to accommodate pullers starting from a startTs that's too small
+			// TODO fix startTs problem and remove GC delay, or use other mechanism that prevents the problem deterministically
+			gcTime := oracle.GetTimeFromTS(changefeedStatus.CheckpointTs).Add(-schemaStorageGCLag)
+			gcTs := oracle.ComposeTS(gcTime.Unix(), 0)
+			p.schemaStorage.DoGC(gcTs)
 			lastCheckPointTs = changefeedStatus.CheckpointTs
 		}
 		if lastResolvedTs < changefeedStatus.ResolvedTs {
@@ -937,6 +945,16 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo
 			return
 		}
 	}
+
+	globalcheckpointTs := atomic.LoadUint64(&p.globalcheckpointTs)
+
+	if replicaInfo.StartTs < globalcheckpointTs {
+		log.Warn("addTable: startTs < checkpoint",
+			zap.Int64("tableID", tableID),
+			zap.Uint64("checkpoint", globalcheckpointTs),
+			zap.Uint64("startTs", replicaInfo.StartTs))
+	}
+
 	globalResolvedTs := atomic.LoadUint64(&p.sinkEmittedResolvedTs)
 	log.Debug("Add table", zap.Int64("tableID", tableID),
 		zap.String("name", tableName),
diff --git a/pkg/notify/notify_test.go b/pkg/notify/notify_test.go
index abcc7a9978e..cb737a4b9c4 100644
--- a/pkg/notify/notify_test.go
+++ b/pkg/notify/notify_test.go
@@ -36,11 +36,13 @@ func (s *notifySuite) TestNotifyHub(c *check.C) {
 	r1 := notifier.NewReceiver(-1)
 	r2 := notifier.NewReceiver(-1)
 	r3 := notifier.NewReceiver(-1)
+	finishedCh := make(chan struct{})
 	go func() {
 		for i := 0; i < 5; i++ {
 			time.Sleep(time.Second)
 			notifier.Notify()
 		}
+		close(finishedCh)
 	}()
 	<-r1.C
 	r1.Stop()
@@ -50,7 +52,6 @@ func (s *notifySuite) TestNotifyHub(c *check.C) {
 	r2.Stop()
 	r3.Stop()
 	c.Assert(len(notifier.receivers), check.Equals, 0)
-	time.Sleep(time.Second)
 	r4 := notifier.NewReceiver(-1)
 	<-r4.C
 	r4.Stop()
@@ -59,6 +60,7 @@ func (s *notifySuite) TestNotifyHub(c *check.C) {
 	r5 := notifier2.NewReceiver(10 * time.Millisecond)
 	<-r5.C
 	r5.Stop()
+	<-finishedCh // To make the leak checker happy
 }
 
 func (s *notifySuite) TestContinusStop(c *check.C) {
diff --git a/tests/move_table/conf/diff_config.toml b/tests/move_table/conf/diff_config.toml
new file mode 100644
index 00000000000..9c31c91b2fd
--- /dev/null
+++ b/tests/move_table/conf/diff_config.toml
@@ -0,0 +1,27 @@
+# diff Configuration.
+
+log-level = "info"
+chunk-size = 10
+check-thread-count = 4
+sample-percent = 100
+use-rowid = false
+use-checksum = true
+fix-sql-file = "fix.sql"
+
+# tables need to check.
+[[check-tables]]
+    schema = "move_table"
+    tables = ["~usertable.*"]
+
+[[source-db]]
+    host = "127.0.0.1"
+    port = 4000
+    user = "root"
+    password = ""
+    instance-id = "source-1"
+
+[target-db]
+    host = "127.0.0.1"
+    port = 3306
+    user = "root"
+    password = ""
diff --git a/tests/move_table/conf/workload b/tests/move_table/conf/workload
new file mode 100644
index 00000000000..5b9ca3189fc
--- /dev/null
+++ b/tests/move_table/conf/workload
@@ -0,0 +1,13 @@
+threadcount=10
+recordcount=60000
+operationcount=0
+workload=core
+
+readallfields=true
+
+readproportion=0
+updateproportion=0
+scanproportion=0
+insertproportion=0
+
+requestdistribution=uniform
diff --git a/tests/move_table/main.go b/tests/move_table/main.go
new file mode 100644
index 00000000000..4f255c891cc
--- /dev/null
+++ b/tests/move_table/main.go
@@ -0,0 +1,277 @@
+// Copyright 2020 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// This is a program that drives the CDC cluster to move a table
+package main
+
+import (
+	"bytes"
+	"context"
+	"flag"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"strings"
+	"time"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/log"
+	"github.com/pingcap/ticdc/cdc/kv"
+	"github.com/pingcap/ticdc/pkg/retry"
+	"go.etcd.io/etcd/clientv3"
+	"go.etcd.io/etcd/pkg/logutil"
+	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/backoff"
+)
+
+var pd = flag.String("pd", "http://127.0.0.1:2379", "PD address and port")
+var logLevel = flag.String("log-level", "debug", "Set log level of the logger")
+
+func main() {
+	flag.Parse()
+	if strings.ToLower(*logLevel) == "debug" {
+		log.SetLevel(zapcore.DebugLevel)
+	}
+
+	log.Info("table mover started")
+	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
+	defer cancel()
+
+	cluster, err := newCluster(ctx, *pd)
+	if err != nil {
+		log.Fatal("failed to create cluster info", zap.Error(err))
+	}
+
+	err = retry.Run(100*time.Millisecond, 20, func() error {
+		err := cluster.refreshInfo(ctx)
+		if err != nil {
+			log.Warn("error refreshing cluster info", zap.Error(err))
+		}
+
+		log.Info("task status", zap.Reflect("status", cluster.captures))
+
+		if len(cluster.captures) <= 1 {
+			return errors.New("too few captures")
+		}
+		return nil
+	})
+
+	if err != nil {
+		log.Fatal("Fail to get captures", zap.Error(err))
+	}
+
+	var sourceCapture string
+
+	for capture, tables := range cluster.captures {
+		if len(tables) == 0 {
+			continue
+		}
+		sourceCapture = capture
+		break
+	}
+
+	var targetCapture string
+
+	for candidateCapture := range cluster.captures {
+		if candidateCapture != sourceCapture {
+			targetCapture = candidateCapture
+		}
+	}
+
+	if targetCapture == "" {
+		log.Fatal("no target, unexpected")
+	}
+
+	// move all tables to another capture
+	for _, table := range cluster.captures[sourceCapture] {
+		err = moveTable(ctx, cluster.ownerAddr, table.Changefeed, targetCapture, table.ID)
+		if err != nil {
+			log.Warn("failed to move table", zap.Error(err))
+			continue
+		}
+
+		log.Info("moved table successful", zap.Int64("tableID", table.ID))
+	}
+
+	log.Info("all tables are moved", zap.String("sourceCapture", sourceCapture), zap.String("targetCapture", targetCapture))
+
+	for counter := 0; counter < 30; counter++ {
+		err := retry.Run(100*time.Millisecond, 5, func() error {
+			return cluster.refreshInfo(ctx)
+		})
+
+		if err != nil {
+			log.Warn("error refreshing cluster info", zap.Error(err))
+		}
+
+		tables, ok := cluster.captures[sourceCapture]
+		if !ok {
+			log.Warn("source capture is gone", zap.String("sourceCapture", sourceCapture))
+			break
+		}
+
+		if len(tables) == 0 {
+			log.Info("source capture is now empty", zap.String("sourceCapture", sourceCapture))
+			break
+		}
+
+		if counter != 30 {
+			log.Debug("source capture is not empty, will try again", zap.String("sourceCapture", sourceCapture))
+			time.Sleep(time.Second * 10)
+		}
+	}
+}
+
+type tableInfo struct {
+	ID         int64
+	Changefeed string
+}
+
+type cluster struct {
+	ownerAddr  string
+	captures   map[string][]*tableInfo
+	cdcEtcdCli kv.CDCEtcdClient
+}
+
+func newCluster(ctx context.Context, pd string) (*cluster, error) {
+	logConfig := logutil.DefaultZapLoggerConfig
+	logConfig.Level = zap.NewAtomicLevelAt(zapcore.ErrorLevel)
+
+	etcdCli, err := clientv3.New(clientv3.Config{
+		Endpoints:   []string{pd},
+		TLS:         nil,
+		Context:     ctx,
+		LogConfig:   &logConfig,
+		DialTimeout: 5 * time.Second,
+		DialOptions: []grpc.DialOption{
+			grpc.WithInsecure(),
+			grpc.WithBlock(),
+			grpc.WithConnectParams(grpc.ConnectParams{
+				Backoff: backoff.Config{
+					BaseDelay:  time.Second,
+					Multiplier: 1.1,
+					Jitter:     0.1,
+					MaxDelay:   3 * time.Second,
+				},
+				MinConnectTimeout: 3 * time.Second,
+			}),
+		},
+	})
+
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	ret := &cluster{
+		ownerAddr:  "",
+		captures:   nil,
+		cdcEtcdCli: kv.NewCDCEtcdClient(ctx, etcdCli),
+	}
+
+	log.Info("new cluster initialized")
+
+	return ret, nil
+}
+
+func (c *cluster) refreshInfo(ctx context.Context) error {
+	ownerID, err := c.cdcEtcdCli.GetOwnerID(ctx, kv.CaptureOwnerKey)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	log.Debug("retrieved owner ID", zap.String("ownerID", ownerID))
+
+	captureInfo, err := c.cdcEtcdCli.GetCaptureInfo(ctx, ownerID)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	log.Debug("retrieved owner addr", zap.String("ownerAddr", captureInfo.AdvertiseAddr))
+	c.ownerAddr = captureInfo.AdvertiseAddr
+
+	_, changefeeds, err := c.cdcEtcdCli.GetChangeFeeds(ctx)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	if len(changefeeds) == 0 {
+		return errors.New("No changefeed")
+	}
+
+	log.Debug("retrieved changefeeds", zap.Reflect("changefeeds", changefeeds))
+
+	var changefeed string
+	for k := range changefeeds {
+		changefeed = k
+		break
+	}
+
+	c.captures = make(map[string][]*tableInfo)
+	_, captures, err := c.cdcEtcdCli.GetCaptures(ctx)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	for _, capture := range captures {
+		c.captures[capture.ID] = make([]*tableInfo, 0)
+	}
+
+	allTasks, err := c.cdcEtcdCli.GetAllTaskStatus(ctx, changefeed)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	log.Debug("retrieved all tasks", zap.Reflect("tasks", allTasks))
+
+	for capture, taskInfo := range allTasks {
+		if _, ok := c.captures[capture]; !ok {
+			c.captures[capture] = make([]*tableInfo, 0, len(taskInfo.Tables))
+		}
+
+		for tableID := range taskInfo.Tables {
+			c.captures[capture] = append(c.captures[capture], &tableInfo{
+				ID:         tableID,
+				Changefeed: changefeed,
+			})
+		}
+	}
+
+	return nil
+}
+
+func moveTable(ctx context.Context, ownerAddr string, changefeed string, target string, tableID int64) error {
+	formStr := fmt.Sprintf("cf-id=%s&target-cp-id=%s&table-id=%d", changefeed, target, tableID)
+	log.Debug("preparing HTTP API call to owner", zap.String("formStr", formStr))
+	rd := bytes.NewReader([]byte(formStr))
+	req, err := http.NewRequestWithContext(ctx, "POST", "http://"+ownerAddr+"/capture/owner/move_table", rd)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
+	resp, err := http.DefaultClient.Do(req)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
+		body, err := ioutil.ReadAll(resp.Body)
+		if err != nil {
+			return errors.Trace(err)
+		}
+		log.Warn("http error", zap.ByteString("body", body))
+		return errors.New(resp.Status)
+	}
+
+	return nil
+}
diff --git a/tests/move_table/run.sh b/tests/move_table/run.sh
new file mode 100644
index 00000000000..c363916c9d0
--- /dev/null
+++ b/tests/move_table/run.sh
@@ -0,0 +1,60 @@
+#!/bin/bash
+
+set -e
+
+CUR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
+source $CUR/../_utils/test_prepare
+WORK_DIR=$OUT_DIR/$TEST_NAME
+CDC_BINARY=cdc.test
+SINK_TYPE=$1
+
+
+function run() {
+    rm -rf $WORK_DIR && mkdir -p $WORK_DIR
+
+    start_tidb_cluster --workdir $WORK_DIR
+
+    cd $WORK_DIR
+
+    start_ts=$(run_cdc_cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1)
+    run_sql "CREATE DATABASE move_table;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+    go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=move_table
+    run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "debug" --logsuffix "1" --addr 127.0.0.1:8300
+
+    TOPIC_NAME="ticdc-move-table-test-$RANDOM"
+    case $SINK_TYPE in
+        kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4";;
+        *) SINK_URI="mysql://root@127.0.0.1:3306/?max-txn-row=1";;
+    esac
+
+    run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
+    if [ "$SINK_TYPE" == "kafka" ]; then
+      run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4"
+    fi
+
+    run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "debug" --logsuffix "2" --addr 127.0.0.1:8301
+    run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "debug" --logsuffix "3" --addr 127.0.0.1:8302
+
+    # Add a check table to reduce check time, or if we check data with sync diff
+    # directly, there maybe a lot of diff data at first because of the incremental scan
+    run_sql "CREATE table move_table.check1(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+    check_table_exists "move_table.USERTABLE" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+
+    cd $CUR
+    GO111MODULE=on go run main.go 2>&1 | tee $WORK_DIR/tester.log &
+    cd $WORK_DIR
+
+    check_table_exists "move_table.check1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90
+    check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
+    run_sql "truncate table move_table.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+    check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
+    run_sql "CREATE table move_table.check2(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+    check_table_exists "move_table.check2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90
+    check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
+
+    cleanup_process $CDC_BINARY
+}
+
+trap stop_tidb_cluster EXIT
+run $*
+echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"