From 48585a7823e2166bb7c570596ad71b5b6bcdac99 Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Mon, 19 Dec 2022 12:50:54 +0800 Subject: [PATCH] ddl: retry prepare RPC when meets region error (#39834) close pingcap/tidb#39836 --- ddl/BUILD.bazel | 1 + ddl/cluster.go | 28 +++++++++-- tests/realtikvtest/brietest/BUILD.bazel | 1 + tests/realtikvtest/brietest/flashback_test.go | 46 +++++++++++++++++++ 4 files changed, 73 insertions(+), 3 deletions(-) diff --git a/ddl/BUILD.bazel b/ddl/BUILD.bazel index 5d9d554f7b873..dc179250ad4bd 100644 --- a/ddl/BUILD.bazel +++ b/ddl/BUILD.bazel @@ -120,6 +120,7 @@ go_library( "@com_github_ngaut_pools//:pools", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", + "@com_github_pingcap_kvproto//pkg/errorpb", "@com_github_pingcap_kvproto//pkg/kvrpcpb", "@com_github_pingcap_kvproto//pkg/metapb", "@com_github_pingcap_log//:log", diff --git a/ddl/cluster.go b/ddl/cluster.go index 96a7cd8544abb..fbcfa9cd8a49f 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/errorpb" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/domain/infosync" @@ -324,15 +325,36 @@ func SendPrepareFlashbackToVersionRPC( if err != nil { return taskStat, err } + failpoint.Inject("mockPrepareMeetsEpochNotMatch", func(val failpoint.Value) { + if val.(bool) && bo.ErrorsNum() == 0 { + regionErr = &errorpb.Error{ + Message: "stale epoch", + EpochNotMatch: &errorpb.EpochNotMatch{}, + } + } + }) if regionErr != nil { - return taskStat, errors.Errorf(regionErr.String()) + err = bo.Backoff(tikv.BoRegionMiss(), errors.New(regionErr.String())) + if err != nil { + return taskStat, err + } + continue } if resp.Resp == nil { - return taskStat, errors.Errorf("prepare flashback missing resp body") + logutil.BgLogger().Warn("prepare flashback miss resp body", zap.Uint64("region_id", loc.Region.GetID())) + err = bo.Backoff(tikv.BoTiKVRPC(), errors.New("prepare flashback rpc miss resp body")) + if err != nil { + return taskStat, err + } + continue } prepareFlashbackToVersionResp := resp.Resp.(*kvrpcpb.PrepareFlashbackToVersionResponse) if err := prepareFlashbackToVersionResp.GetError(); err != "" { - return taskStat, errors.Errorf(err) + boErr := bo.Backoff(tikv.BoTiKVRPC(), errors.New(err)) + if boErr != nil { + return taskStat, boErr + } + continue } taskStat.CompletedRegions++ if isLast { diff --git a/tests/realtikvtest/brietest/BUILD.bazel b/tests/realtikvtest/brietest/BUILD.bazel index 49ea32406c7d6..62de71ea3b77d 100644 --- a/tests/realtikvtest/brietest/BUILD.bazel +++ b/tests/realtikvtest/brietest/BUILD.bazel @@ -14,6 +14,7 @@ go_test( deps = [ "//config", "//ddl/util", + "//parser/model", "//parser/mysql", "//sessionctx/binloginfo", "//store/mockstore/mockcopr", diff --git a/tests/realtikvtest/brietest/flashback_test.go b/tests/realtikvtest/brietest/flashback_test.go index 322359fff411a..470a62fb90d93 100644 --- a/tests/realtikvtest/brietest/flashback_test.go +++ b/tests/realtikvtest/brietest/flashback_test.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/failpoint" ddlutil "github.com/pingcap/tidb/ddl/util" + "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/tests/realtikvtest" "github.com/stretchr/testify/require" @@ -90,3 +91,48 @@ func TestFlashback(t *testing.T) { require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) } } + +func TestPrepareFlashbackFailed(t *testing.T) { + if *realtikvtest.WithRealTiKV { + store := realtikvtest.CreateMockStoreAndSetup(t) + + tk := testkit.NewTestKit(t, store) + + timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) + defer resetGC() + + tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int, index i(a))") + tk.MustExec("insert t values (1), (2), (3)") + + time.Sleep(1 * time.Second) + + ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) + require.NoError(t, err) + + injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second)) + require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", + fmt.Sprintf("return(%v)", injectSafeTS))) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + fmt.Sprintf("return(%v)", injectSafeTS))) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockPrepareMeetsEpochNotMatch", `return(true)`)) + + tk.MustExec("insert t values (4), (5), (6)") + tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts))) + + tk.MustExec("admin check table t") + require.Equal(t, tk.MustQuery("select max(a) from t").Rows()[0][0], "3") + require.Equal(t, tk.MustQuery("select max(a) from t use index(i)").Rows()[0][0], "3") + + jobMeta := tk.MustQuery("select job_meta from mysql.tidb_ddl_history order by job_id desc limit 1").Rows()[0][0].(string) + job := model.Job{} + require.NoError(t, job.Decode([]byte(jobMeta))) + require.Equal(t, job.ErrorCount, int64(0)) + + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) + require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockPrepareMeetsEpochNotMatch")) + } +}