From 4970a67034fb64d51e1718d0da1bc30cf91cc3a8 Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Thu, 15 Sep 2022 18:11:00 +0800 Subject: [PATCH 01/29] *: add startTS, commitTS and close auto analyze for flashback --- ddl/cluster.go | 169 ++++++++++-------- ddl/cluster_test.go | 34 +++- ddl/ddl_api.go | 5 +- go.mod | 2 +- go.sum | 4 +- meta/meta.go | 54 ++++++ sessionctx/context.go | 13 +- tests/realtikvtest/brietest/flashback_test.go | 1 - 8 files changed, 194 insertions(+), 88 deletions(-) diff --git a/ddl/cluster.go b/ddl/cluster.go index 8483ca433d7ff..dddd2d1e45822 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -37,7 +37,6 @@ import ( "github.com/pingcap/tidb/util/filter" "github.com/pingcap/tidb/util/gcutil" "github.com/pingcap/tidb/util/logutil" - tikverr "github.com/tikv/client-go/v2/error" tikvstore "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" @@ -56,11 +55,14 @@ var pdScheduleKey = []string{ } const ( - flashbackMaxBackoff = 300000 // 300s - flashbackTimeout = 30 * time.Second // 30s - - readOnlyArgsOffset = 2 - gcEnabledArgsOffset = 3 + flashbackMaxBackoff = 1800000 // 1800s + flashbackTimeout = 3 * time.Minute // 3min + + readOnlyArgsOffset = 2 + gcEnabledArgsOffset = 3 + autoAnalyzeOffset = 4 + maxAutoAnalyzeTimeOffset = 5 + minSafeTimeOffset = 6 ) func closePDSchedule() error { @@ -129,11 +131,35 @@ func getTiDBSuperReadOnly(sess sessionctx.Context) (string, error) { return val, nil } +func setTiDBEnableAutoAnalyze(sess sessionctx.Context, value string) error { + return sess.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiDBEnableAutoAnalyze, value) +} + +func getTiDBMaxAutoAnalyzeTime(sess sessionctx.Context) (string, error) { + val, err := sess.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBMaxAutoAnalyzeTime) + if err != nil { + return "", errors.Trace(err) + } + return val, nil +} + +func setTiDBMaxAutoAnalyzeTime(sess sessionctx.Context, value string) error { + return sess.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiDBMaxAutoAnalyzeTime, value) +} + +func getTiDBEnableAutoAnalyze(sess sessionctx.Context) (string, error) { + val, err := sess.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBEnableAutoAnalyze) + if err != nil { + return "", errors.Trace(err) + } + return val, nil +} + func checkAndSetFlashbackClusterInfo(sess sessionctx.Context, d *ddlCtx, t *meta.Meta, job *model.Job, flashbackTS uint64) (err error) { if err = ValidateFlashbackTS(d.ctx, sess, flashbackTS); err != nil { return err } - if err = CheckFlashbackHistoryTSRange(t, flashbackTS); err != nil { + if err = meta.CheckFlashbackHistoryTSRange(t, flashbackTS); err != nil { return err } @@ -146,6 +172,12 @@ func checkAndSetFlashbackClusterInfo(sess sessionctx.Context, d *ddlCtx, t *meta if err = setTiDBSuperReadOnly(sess, variable.On); err != nil { return err } + if err = setTiDBEnableAutoAnalyze(sess, variable.Off); err != nil { + return err + } + if err = setTiDBMaxAutoAnalyzeTime(sess, "1"); err != nil { + return err + } nowSchemaVersion, err := t.GetSchemaVersion() if err != nil { @@ -158,7 +190,7 @@ func checkAndSetFlashbackClusterInfo(sess sessionctx.Context, d *ddlCtx, t *meta } // If flashbackSchemaVersion not same as nowSchemaVersion, we've done ddl during [flashbackTs, now). 
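// The helpers added by this patch (setTiDBEnableAutoAnalyze, getTiDBMaxAutoAnalyzeTime
// and friends) all share one get/set pattern on the session's GlobalVarsAccessor.
// A minimal sketch of that pattern, using only calls that already appear in this diff;
// toggleGlobalVar is an illustrative name, not part of the patch:
package example

import (
	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/sessionctx"
)

// toggleGlobalVar records the current value of a global system variable and
// overwrites it, returning the old value so a caller can restore it later,
// which is what the flashback job does for tidb_enable_auto_analyze.
func toggleGlobalVar(sess sessionctx.Context, name, newValue string) (old string, err error) {
	accessor := sess.GetSessionVars().GlobalVarsAccessor
	old, err = accessor.GetGlobalSysVar(name)
	if err != nil {
		return "", errors.Trace(err)
	}
	return old, accessor.SetGlobalSysVar(name, newValue)
}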
if flashbackSchemaVersion != nowSchemaVersion { - return errors.Errorf("schema version not same, have done ddl during [flashbackTS, now)") + return errors.Errorf("Had ddl history during [%s, now), can't do flashback", oracle.GetTimeFromTS(flashbackTS)) } jobs, err := GetAllDDLJobs(sess, t) @@ -270,6 +302,7 @@ func sendFlashbackToVersionRPC( ) (rangetask.TaskStat, error) { startKey, rangeEndKey := r.StartKey, r.EndKey var taskStat rangetask.TaskStat + bo := tikv.NewBackoffer(ctx, flashbackMaxBackoff) for { select { case <-ctx.Done(): @@ -281,7 +314,6 @@ func sendFlashbackToVersionRPC( break } - bo := tikv.NewBackoffer(ctx, flashbackMaxBackoff) loc, err := s.GetRegionCache().LocateKey(bo, startKey) if err != nil { return taskStat, err @@ -294,10 +326,22 @@ func sendFlashbackToVersionRPC( endKey = rangeEndKey } + // Because flashback has no guarantee of transactional consistency, + // we could use different startTS and commitTS for each region. + startTS, err := s.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + if err != nil { + return taskStat, err + } + commitTS, err := s.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + if err != nil { + return taskStat, err + } req := tikvrpc.NewRequest(tikvrpc.CmdFlashbackToVersion, &kvrpcpb.FlashbackToVersionRequest{ Version: version, StartKey: startKey, EndKey: endKey, + StartTs: startTS, + CommitTs: commitTS, }) resp, err := s.SendReq(bo, req, loc.Region, flashbackTimeout) @@ -316,16 +360,19 @@ func sendFlashbackToVersionRPC( continue } if resp.Resp == nil { - return taskStat, errors.WithStack(tikverr.ErrBodyMissing) + logutil.BgLogger().Warn("flashback missing resp body") + continue } flashbackToVersionResp := resp.Resp.(*kvrpcpb.FlashbackToVersionResponse) if err := flashbackToVersionResp.GetError(); err != "" { - return taskStat, errors.Errorf("unexpected flashback to version err: %v", err) + logutil.BgLogger().Warn("flashback rpc meets error", zap.String("err", err)) + continue } taskStat.CompletedRegions++ if isLast { break } + bo = tikv.NewBackoffer(ctx, flashbackMaxBackoff) startKey = endKey } return taskStat, nil @@ -342,7 +389,11 @@ func flashbackToVersion( d.store.(tikv.Storage), int(variable.GetDDLFlashbackConcurrency()), func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) { - return sendFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), version, r) + stats, err := sendFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), version, r) + logutil.BgLogger().Info("flashback cluster stats", + zap.Int("complete region", stats.CompletedRegions), + zap.Error(err)) + return stats, err }, ).RunOnRange(ctx, startKey, endKey) } @@ -364,11 +415,11 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve return ver, errors.Errorf("Not support flashback cluster in non-TiKV env") } - var flashbackTS uint64 + var flashbackTS, minSafeTime uint64 var pdScheduleValue map[string]interface{} - var readOnlyValue string + var readOnlyValue, autoAnalyzeValue, maxAutoAnalyzeTimeValue string var gcEnabledValue bool - if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &readOnlyValue, &gcEnabledValue); err != nil { + if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &readOnlyValue, &gcEnabledValue, &autoAnalyzeValue, &maxAutoAnalyzeTimeValue, &minSafeTime); err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } @@ -381,7 +432,7 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve defer 
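// checkAndSetFlashbackClusterInfo rejects the job when the schema version at
// flashbackTS differs from the current one, because data written under the old
// schema would no longer match today's table definitions. A sketch of reading
// the schema version as of a timestamp; it assumes meta.NewSnapshotMeta and
// kv.NewVersion, which TiDB provides outside this diff, so treat it as an
// illustration rather than the exact code path used here:
package example

import (
	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/meta"
)

func schemaVersionAt(store kv.Storage, ts uint64) (int64, error) {
	// A snapshot meta reads the meta keys (including the schema version) as
	// they were at the given timestamp, without opening a writable transaction.
	snapMeta := meta.NewSnapshotMeta(store.GetSnapshot(kv.NewVersion(ts)))
	return snapMeta.GetSchemaVersion()
}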
w.sessPool.put(sess) switch job.SchemaState { - // Stage 1, check and set FlashbackClusterJobID, and save the PD schedule. + // Stage 1, check and set FlashbackClusterJobID, and update job args. case model.StateNone: flashbackJobID, err := t.GetFlashbackClusterJobID() if err != nil { @@ -412,6 +463,23 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve return ver, errors.Trace(err) } job.Args[gcEnabledArgsOffset] = &gcEnableValue + autoAnalyzeValue, err = getTiDBEnableAutoAnalyze(sess) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + job.Args[autoAnalyzeOffset] = &autoAnalyzeValue + maxAutoAnalyzeTimeValue, err = getTiDBMaxAutoAnalyzeTime(sess) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + job.Args[maxAutoAnalyzeTimeOffset] = &maxAutoAnalyzeTimeValue + minSafeTime = d.store.GetMinSafeTS(oracle.GlobalTxnScope) + if minSafeTime == 0 { + minSafeTime = job.StartTS + } + job.Args[minSafeTimeOffset] = &minSafeTime } else { job.State = model.JobStateCancelled return ver, errors.Errorf("Other flashback job(ID: %d) is running", job.ID) @@ -446,7 +514,7 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve } for _, ranges := range keyRanges { - if err = flashbackToVersion(context.Background(), d, flashbackTS, ranges.StartKey, ranges.EndKey); err != nil { + if err = flashbackToVersion(d.ctx, d, flashbackTS, ranges.StartKey, ranges.EndKey); err != nil { logutil.BgLogger().Warn("[ddl] Get error when do flashback", zap.Error(err)) return ver, err } @@ -461,13 +529,13 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve } func finishFlashbackCluster(w *worker, job *model.Job) error { - var flashbackTS uint64 + var flashbackTS, minSafeTS uint64 var pdScheduleValue map[string]interface{} - var readOnlyValue string + var readOnlyValue, autoAnalyzeValue, maxAutoAnalyzeTime string var gcEnabled bool var jobID int64 - if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &readOnlyValue, &gcEnabled); err != nil { + if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &readOnlyValue, &gcEnabled, &autoAnalyzeValue, &maxAutoAnalyzeTime, &minSafeTS); err != nil { return errors.Trace(err) } sess, err := w.sessPool.get() @@ -489,6 +557,12 @@ func finishFlashbackCluster(w *worker, job *model.Job) error { if err = setTiDBSuperReadOnly(sess, readOnlyValue); err != nil { return err } + if err = setTiDBEnableAutoAnalyze(sess, autoAnalyzeValue); err != nil { + return err + } + if err = setTiDBMaxAutoAnalyzeTime(sess, maxAutoAnalyzeTime); err != nil { + return err + } if gcEnabled { if err = gcutil.EnableGC(sess); err != nil { return err @@ -499,7 +573,7 @@ func finishFlashbackCluster(w *worker, job *model.Job) error { if err != nil { return err } - if err = UpdateFlashbackHistoryTSRanges(t, flashbackTS, t.StartTS, gcSafePoint); err != nil { + if err = meta.UpdateFlashbackHistoryTSRanges(t, minSafeTS, t.StartTS, gcSafePoint); err != nil { return err } } @@ -515,56 +589,3 @@ func finishFlashbackCluster(w *worker, job *model.Job) error { return nil } - -// CheckFlashbackHistoryTSRange checks flashbackTS overlapped with history time ranges or not. 
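// The readOnlyArgsOffset/gcEnabledArgsOffset constants only stay meaningful as
// long as three places keep the same order: the Args slice built by
// d.FlashbackCluster in ddl_api.go, the job.DecodeArgs call in onFlashbackCluster,
// and the job.Args[...] writes in stage one. A self-contained sketch of that
// positional contract; encoding/json stands in for job.DecodeArgs, which is
// also JSON-based, and the values are arbitrary illustrations:
package main

import (
	"encoding/json"
	"fmt"
)

const (
	readOnlyArgsOffset  = 2 // position of tidb_super_read_only in the args array
	gcEnabledArgsOffset = 3 // position of tidb_gc_enable
)

func main() {
	// Encode side: what FlashbackCluster puts into job.Args.
	raw, _ := json.Marshal([]interface{}{uint64(42), map[string]interface{}{}, "ON", true})

	// Decode side: positional, so it must mirror the order above.
	var rawArgs []json.RawMessage
	_ = json.Unmarshal(raw, &rawArgs)

	var flashbackTS uint64
	var readOnly string
	var gcEnabled bool
	_ = json.Unmarshal(rawArgs[0], &flashbackTS)
	_ = json.Unmarshal(rawArgs[readOnlyArgsOffset], &readOnly)
	_ = json.Unmarshal(rawArgs[gcEnabledArgsOffset], &gcEnabled)
	fmt.Println(flashbackTS, readOnly, gcEnabled) // 42 ON true
}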
-func CheckFlashbackHistoryTSRange(m *meta.Meta, flashbackTS uint64) error { - tsRanges, err := m.GetFlashbackHistoryTSRange() - if err != nil { - return err - } - for _, tsRange := range tsRanges { - if tsRange.StartTS <= flashbackTS && flashbackTS <= tsRange.EndTS { - return errors.Errorf("FlashbackTs overlapped, old range: [%s, %s], flashbackTS: %s", - oracle.GetTimeFromTS(tsRange.StartTS), oracle.GetTimeFromTS(tsRange.EndTS), oracle.GetTimeFromTS(flashbackTS)) - } - } - return nil -} - -// UpdateFlashbackHistoryTSRanges insert [startTS, endTS] into FlashbackHistoryTSRange. -func UpdateFlashbackHistoryTSRanges(m *meta.Meta, startTS uint64, endTS uint64, gcSafePoint uint64) error { - tsRanges, err := m.GetFlashbackHistoryTSRange() - if err != nil { - return err - } - if len(tsRanges) != 0 && tsRanges[len(tsRanges)-1].EndTS >= endTS { - // It's impossible, endTS should always greater than all TS in history TS ranges. - return errors.Errorf("Maybe TSO fallback, last flashback endTS: %d, now: %d", tsRanges[len(tsRanges)-1].EndTS, endTS) - } - - newTsRange := make([]meta.TSRange, 0, len(tsRanges)) - - for _, tsRange := range tsRanges { - if tsRange.EndTS < gcSafePoint { - continue - } - if startTS > tsRange.EndTS { - // tsRange.StartTS < tsRange.EndTS < startTS. - // We should keep tsRange in slices. - newTsRange = append(newTsRange, tsRange) - } else if startTS < tsRange.StartTS { - // startTS < tsRange.StartTS < tsRange.EndTS. - // The remained ts ranges are useless, [startTS, endTS] will cover them, so break. - break - } else { - // tsRange.StartTS < startTS < tsRange.EndTS. - // It's impossible reach here, we checked it before start flashback cluster. - return errors.Errorf("It's an unreachable branch, flashbackTS (%d) in old ts range: [%d, %d]", - startTS, tsRange.StartTS, tsRange.EndTS) - } - } - - // Store the new tsRange. - newTsRange = append(newTsRange, meta.TSRange{StartTS: startTS, EndTS: endTS}) - return m.SetFlashbackHistoryTSRange(newTsRange) -} diff --git a/ddl/cluster_test.go b/ddl/cluster_test.go index 924e7e0e9fd31..818f4c919e04f 100644 --- a/ddl/cluster_test.go +++ b/ddl/cluster_test.go @@ -193,6 +193,12 @@ func TestGlobalVariablesOnFlashback(t *testing.T) { rs, err = tk.Exec("show variables like 'tidb_gc_enable'") assert.NoError(t, err) assert.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.Off) + rs, err = tk.Exec("show variables like 'tidb_enable_auto_analyze'") + assert.NoError(t, err) + assert.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.Off) + rs, err = tk.Exec("show variables like 'tidb_max_auto_analyze_time'") + assert.NoError(t, err) + assert.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], "1") } } dom.DDL().SetHook(hook) @@ -281,16 +287,30 @@ func TestFlashbackTimeRange(t *testing.T) { flashbackTime := oracle.GetTimeFromTS(m.StartTS).Add(-10 * time.Minute) // No flashback history, shouldn't return err. - require.NoError(t, ddl.CheckFlashbackHistoryTSRange(m, oracle.GoTimeToTS(flashbackTime))) + require.NoError(t, meta.CheckFlashbackHistoryTSRange(m, oracle.GoTimeToTS(flashbackTime))) // Insert a time range to flashback history ts ranges. 
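// Patch 01 moves CheckFlashbackHistoryTSRange and UpdateFlashbackHistoryTSRanges
// from the ddl package into meta; the logic itself is simple interval bookkeeping:
// a new flashback may not start inside a recorded [StartTS, EndTS] window, and
// windows below the GC safe point are dropped when a new one is appended. A
// self-contained, simplified sketch of that logic with plain uint64 ranges and
// no meta storage, for illustration only:
package main

import "fmt"

type tsRange struct{ start, end uint64 }

// overlaps reports whether ts falls inside any recorded range, mirroring the
// StartTS <= ts <= EndTS check.
func overlaps(ranges []tsRange, ts uint64) bool {
	for _, r := range ranges {
		if r.start <= ts && ts <= r.end {
			return true
		}
	}
	return false
}

// appendRange drops ranges already below the GC safe point or covered by the
// new window, then appends the new window at the tail.
func appendRange(ranges []tsRange, start, end, gcSafePoint uint64) []tsRange {
	kept := make([]tsRange, 0, len(ranges)+1)
	for _, r := range ranges {
		if r.end < gcSafePoint || r.end >= start {
			continue
		}
		kept = append(kept, r)
	}
	return append(kept, tsRange{start, end})
}

func main() {
	h := []tsRange{{100, 200}}
	fmt.Println(overlaps(h, 150)) // true: 150 sits inside [100, 200]
	h = appendRange(h, 300, 400, 0)
	fmt.Println(h) // [{100 200} {300 400}]
}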
- require.NoError(t, ddl.UpdateFlashbackHistoryTSRanges(m, oracle.GoTimeToTS(flashbackTime), m.StartTS, 0)) + require.NoError(t, meta.UpdateFlashbackHistoryTSRanges(m, oracle.GoTimeToTS(flashbackTime), m.StartTS, 0)) historyTS, err := m.GetFlashbackHistoryTSRange() require.NoError(t, err) require.Len(t, historyTS, 1) require.NoError(t, txn.Commit(context.Background())) + // check tidb_snapshot and stale read timestamp + tk := testkit.NewTestKit(t, store) + timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) + defer resetGC() + tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) + sql := fmt.Sprintf("set tidb_snapshot='%v'", flashbackTime.Add(5*time.Minute).Format("2006-01-02 15:04:05")) + tk.MustGetErrMsg(sql, fmt.Sprintf("Can't get snapshot value from flashback time range [%s, %s]", + flashbackTime, oracle.GetTimeFromTS(m.StartTS))) + tk.MustExec("use test") + tk.MustExec("create table t(a int)") + sql = fmt.Sprintf("select * from t as of timestamp '%v'", flashbackTime.Add(5*time.Minute).Format("2006-01-02 15:04:05")) + tk.MustGetErrMsg(sql, fmt.Sprintf("Can't get snapshot value from flashback time range [%s, %s]", + flashbackTime, oracle.GetTimeFromTS(m.StartTS))) + se, err = session.CreateSession4Test(store) require.NoError(t, err) txn, err = se.GetStore().Begin() @@ -299,24 +319,24 @@ func TestFlashbackTimeRange(t *testing.T) { m = meta.NewMeta(txn) require.NoError(t, err) // Flashback history time range is [m.StartTS - 10min, m.StartTS] - require.Error(t, ddl.CheckFlashbackHistoryTSRange(m, oracle.GoTimeToTS(flashbackTime.Add(5*time.Minute)))) + require.Error(t, meta.CheckFlashbackHistoryTSRange(m, oracle.GoTimeToTS(flashbackTime.Add(5*time.Minute)))) // Check add insert a new time range - require.NoError(t, ddl.CheckFlashbackHistoryTSRange(m, oracle.GoTimeToTS(flashbackTime.Add(-5*time.Minute)))) - require.NoError(t, ddl.UpdateFlashbackHistoryTSRanges(m, oracle.GoTimeToTS(flashbackTime.Add(-5*time.Minute)), m.StartTS, 0)) + require.NoError(t, meta.CheckFlashbackHistoryTSRange(m, oracle.GoTimeToTS(flashbackTime.Add(-5*time.Minute)))) + require.NoError(t, meta.UpdateFlashbackHistoryTSRanges(m, oracle.GoTimeToTS(flashbackTime.Add(-5*time.Minute)), m.StartTS, 0)) historyTS, err = m.GetFlashbackHistoryTSRange() require.NoError(t, err) // history time range still equals to 1, because overlapped require.Len(t, historyTS, 1) - require.NoError(t, ddl.UpdateFlashbackHistoryTSRanges(m, oracle.GoTimeToTS(flashbackTime.Add(15*time.Minute)), oracle.GoTimeToTS(flashbackTime.Add(20*time.Minute)), 0)) + require.NoError(t, meta.UpdateFlashbackHistoryTSRanges(m, oracle.GoTimeToTS(flashbackTime.Add(15*time.Minute)), oracle.GoTimeToTS(flashbackTime.Add(20*time.Minute)), 0)) historyTS, err = m.GetFlashbackHistoryTSRange() require.NoError(t, err) require.Len(t, historyTS, 2) // GCSafePoint updated will clean some history TS ranges - require.NoError(t, ddl.UpdateFlashbackHistoryTSRanges(m, + require.NoError(t, meta.UpdateFlashbackHistoryTSRanges(m, oracle.GoTimeToTS(flashbackTime.Add(25*time.Minute)), oracle.GoTimeToTS(flashbackTime.Add(30*time.Minute)), oracle.GoTimeToTS(flashbackTime.Add(22*time.Minute)))) diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 4dc457b539867..11f78b9c6ccf9 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -2651,7 +2651,10 @@ func (d *ddl) FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) error flashbackTS, map[string]interface{}{}, variable.On, /* tidb_super_read_only */ - true /* tidb_gc_enable */}, + true, /* tidb_gc_enable */ + variable.On, /* 
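// The flashback tests convert between wall-clock time and TSO timestamps with
// oracle.GoTimeToTS and oracle.GetTimeFromTS. A TSO packs a physical millisecond
// clock and a logical counter into one uint64 (physical in the high bits), so the
// round trip below only loses sub-millisecond precision. Self-contained sketch
// using the same client-go package these files already import:
package main

import (
	"fmt"
	"time"

	"github.com/tikv/client-go/v2/oracle"
)

func main() {
	now := time.Now()
	ts := oracle.GoTimeToTS(now)     // wall clock -> TSO
	back := oracle.GetTimeFromTS(ts) // TSO -> wall clock, millisecond precision
	fmt.Println(ts, back, oracle.ExtractPhysical(ts))
}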
tidb_enable_auto_analyze */ + "1", /* tidb_max_auto_analyze_time */ + flashbackTS /* min safe time */}, } err := d.DoDDLJob(ctx, job) err = d.callHookOnChanged(job, err) diff --git a/go.mod b/go.mod index 88b19ed5e8771..60ba47616aa94 100644 --- a/go.mod +++ b/go.mod @@ -65,7 +65,7 @@ require ( github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3 github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 - github.com/pingcap/kvproto v0.0.0-20220913025519-586cff113d10 + github.com/pingcap/kvproto v0.0.0-20220913050750-f6d05706948a github.com/pingcap/log v1.1.0 github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4 github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e diff --git a/go.sum b/go.sum index 7cbe99ed6235e..67ef5b9c9428f 100644 --- a/go.sum +++ b/go.sum @@ -752,8 +752,8 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20220510035547-0e2f26c0a46a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= -github.com/pingcap/kvproto v0.0.0-20220913025519-586cff113d10 h1:/92S0s/TCoCmK2vv6WbkXNeqtLn90sHRJ5Vlx1Sigas= -github.com/pingcap/kvproto v0.0.0-20220913025519-586cff113d10/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= +github.com/pingcap/kvproto v0.0.0-20220913050750-f6d05706948a h1:LCtkOPEzjWk86NclzxdZ42nNtbhuIN1p6cpd/FYUqkU= +github.com/pingcap/kvproto v0.0.0-20220913050750-f6d05706948a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= diff --git a/meta/meta.go b/meta/meta.go index d0884f722e70e..33ed115d1757a 100644 --- a/meta/meta.go +++ b/meta/meta.go @@ -35,6 +35,7 @@ import ( "github.com/pingcap/tidb/structure" "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/logutil" + "github.com/tikv/client-go/v2/oracle" "go.uber.org/zap" ) @@ -655,6 +656,59 @@ func (m *Meta) GetFlashbackHistoryTSRange() (timeRange []TSRange, err error) { return timeRange, nil } +// CheckFlashbackHistoryTSRange checks flashbackTS overlapped with history time ranges or not. +func CheckFlashbackHistoryTSRange(m *Meta, targetTS uint64) error { + tsRanges, err := m.GetFlashbackHistoryTSRange() + if err != nil { + return err + } + for _, tsRange := range tsRanges { + if tsRange.StartTS <= targetTS && targetTS <= tsRange.EndTS { + return errors.Errorf("Can't get snapshot value from flashback time range [%s, %s]", + oracle.GetTimeFromTS(tsRange.StartTS), oracle.GetTimeFromTS(tsRange.EndTS)) + } + } + return nil +} + +// UpdateFlashbackHistoryTSRanges insert [startTS, endTS] into FlashbackHistoryTSRange. +func UpdateFlashbackHistoryTSRanges(m *Meta, startTS uint64, endTS uint64, gcSafePoint uint64) error { + tsRanges, err := m.GetFlashbackHistoryTSRange() + if err != nil { + return err + } + if len(tsRanges) != 0 && tsRanges[len(tsRanges)-1].EndTS >= endTS { + // It's impossible, endTS should always greater than all TS in history TS ranges. 
+ return errors.Errorf("Maybe TSO fallback, last flashback endTS: %d, now: %d", tsRanges[len(tsRanges)-1].EndTS, endTS) + } + + newTsRange := make([]TSRange, 0, len(tsRanges)) + + for _, tsRange := range tsRanges { + if tsRange.EndTS < gcSafePoint { + continue + } + if startTS > tsRange.EndTS { + // tsRange.StartTS < tsRange.EndTS < startTS. + // We should keep tsRange in slices. + newTsRange = append(newTsRange, tsRange) + } else if startTS < tsRange.StartTS { + // startTS < tsRange.StartTS < tsRange.EndTS. + // The remained ts ranges are useless, [startTS, endTS] will cover them, so break. + break + } else { + // tsRange.StartTS < startTS < tsRange.EndTS. + // It's impossible reach here, we checked it before start flashback cluster. + return errors.Errorf("It's an unreachable branch, flashbackTS (%d) in old ts range: [%d, %d]", + startTS, tsRange.StartTS, tsRange.EndTS) + } + } + + // Store the new tsRange. + newTsRange = append(newTsRange, TSRange{StartTS: startTS, EndTS: endTS}) + return m.SetFlashbackHistoryTSRange(newTsRange) +} + // SetConcurrentDDL set the concurrent DDL flag. func (m *Meta) SetConcurrentDDL(b bool) error { var data []byte diff --git a/sessionctx/context.go b/sessionctx/context.go index 7be92f102e56c..7a64830cd892a 100644 --- a/sessionctx/context.go +++ b/sessionctx/context.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx/sessionstates" @@ -225,7 +226,11 @@ func ValidateSnapshotReadTS(ctx context.Context, sctx Context, readTS uint64) er return errors.Errorf("cannot set read timestamp to a future time") } } - return nil + txn, err := sctx.Txn(true) + if err != nil { + return err + } + return meta.CheckFlashbackHistoryTSRange(meta.NewMeta(txn), readTS) } // How far future from now ValidateStaleReadTS allows at most @@ -246,7 +251,11 @@ func ValidateStaleReadTS(ctx context.Context, sctx Context, readTS uint64) error if oracle.GetTimeFromTS(readTS).After(oracle.GetTimeFromTS(currentTS).Add(allowedTimeFromNow)) { return errors.Errorf("cannot set read timestamp to a future time") } - return nil + txn, err := sctx.Txn(true) + if err != nil { + return err + } + return meta.CheckFlashbackHistoryTSRange(meta.NewMeta(txn), readTS) } // SysProcTracker is used to track background sys processes diff --git a/tests/realtikvtest/brietest/flashback_test.go b/tests/realtikvtest/brietest/flashback_test.go index 1f490c30e0a2f..9c15845a108ad 100644 --- a/tests/realtikvtest/brietest/flashback_test.go +++ b/tests/realtikvtest/brietest/flashback_test.go @@ -54,7 +54,6 @@ func MockGC(tk *testkit.TestKit) (string, string, string, func()) { } func TestFlashback(t *testing.T) { - t.Skip("skip this test because TestFlashback isn't ready.") if *realtikvtest.WithRealTiKV { store := realtikvtest.CreateMockStoreAndSetup(t) From e9e588fcd73570976df112ae41141dec1da76a97 Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Tue, 20 Sep 2022 19:22:59 +0800 Subject: [PATCH 02/29] improve error msg --- ddl/cluster_test.go | 4 ++-- meta/meta.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ddl/cluster_test.go b/ddl/cluster_test.go index 818f4c919e04f..5e0dd16a837e5 100644 --- a/ddl/cluster_test.go +++ b/ddl/cluster_test.go @@ -303,12 +303,12 @@ func TestFlashbackTimeRange(t *testing.T) { defer resetGC() tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) sql := 
fmt.Sprintf("set tidb_snapshot='%v'", flashbackTime.Add(5*time.Minute).Format("2006-01-02 15:04:05")) - tk.MustGetErrMsg(sql, fmt.Sprintf("Can't get snapshot value from flashback time range [%s, %s]", + tk.MustGetErrMsg(sql, fmt.Sprintf("can't set timestamp to history flashback time range [%s, %s]", flashbackTime, oracle.GetTimeFromTS(m.StartTS))) tk.MustExec("use test") tk.MustExec("create table t(a int)") sql = fmt.Sprintf("select * from t as of timestamp '%v'", flashbackTime.Add(5*time.Minute).Format("2006-01-02 15:04:05")) - tk.MustGetErrMsg(sql, fmt.Sprintf("Can't get snapshot value from flashback time range [%s, %s]", + tk.MustGetErrMsg(sql, fmt.Sprintf("can't set timestamp to history flashback time range [%s, %s]", flashbackTime, oracle.GetTimeFromTS(m.StartTS))) se, err = session.CreateSession4Test(store) diff --git a/meta/meta.go b/meta/meta.go index 33ed115d1757a..1ea5f856bae77 100644 --- a/meta/meta.go +++ b/meta/meta.go @@ -664,7 +664,7 @@ func CheckFlashbackHistoryTSRange(m *Meta, targetTS uint64) error { } for _, tsRange := range tsRanges { if tsRange.StartTS <= targetTS && targetTS <= tsRange.EndTS { - return errors.Errorf("Can't get snapshot value from flashback time range [%s, %s]", + return errors.Errorf("can't set timestamp to history flashback time range [%s, %s]", oracle.GetTimeFromTS(tsRange.StartTS), oracle.GetTimeFromTS(tsRange.EndTS)) } } From 4e7960da1e524bc3685bc54eea103d23a57ccc44 Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Tue, 20 Sep 2022 19:35:09 +0800 Subject: [PATCH 03/29] update bazel --- DEPS.bzl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/DEPS.bzl b/DEPS.bzl index f1e62f4b48de3..17febd1ce90fd 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -2803,8 +2803,8 @@ def go_deps(): name = "com_github_pingcap_kvproto", build_file_proto_mode = "disable_global", importpath = "github.com/pingcap/kvproto", - sum = "h1:/92S0s/TCoCmK2vv6WbkXNeqtLn90sHRJ5Vlx1Sigas=", - version = "v0.0.0-20220913025519-586cff113d10", + sum = "h1:LCtkOPEzjWk86NclzxdZ42nNtbhuIN1p6cpd/FYUqkU=", + version = "v0.0.0-20220913050750-f6d05706948a", ) go_repository( name = "com_github_pingcap_log", From e6b48adacee61e085ccaebeecd3cca8713b82969 Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Wed, 21 Sep 2022 17:12:55 +0800 Subject: [PATCH 04/29] fix teset --- sessionctx/context.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sessionctx/context.go b/sessionctx/context.go index 7a64830cd892a..c84dd68f16d78 100644 --- a/sessionctx/context.go +++ b/sessionctx/context.go @@ -226,7 +226,7 @@ func ValidateSnapshotReadTS(ctx context.Context, sctx Context, readTS uint64) er return errors.Errorf("cannot set read timestamp to a future time") } } - txn, err := sctx.Txn(true) + txn, err := sctx.GetStore().Begin() if err != nil { return err } @@ -251,7 +251,7 @@ func ValidateStaleReadTS(ctx context.Context, sctx Context, readTS uint64) error if oracle.GetTimeFromTS(readTS).After(oracle.GetTimeFromTS(currentTS).Add(allowedTimeFromNow)) { return errors.Errorf("cannot set read timestamp to a future time") } - txn, err := sctx.Txn(true) + txn, err := sctx.GetStore().Begin() if err != nil { return err } From 115adb5259fdb35eb94d15e3dffddc468d313de1 Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Thu, 22 Sep 2022 15:51:13 +0800 Subject: [PATCH 05/29] delete useless code --- ddl/cluster.go | 86 +++++++--------------------- ddl/cluster_test.go | 72 ------------------------ ddl/ddl_api.go | 4 +- meta/meta.go | 126 +++++++----------------------------------- 
sessionctx/context.go | 13 +---- 5 files changed, 44 insertions(+), 257 deletions(-) diff --git a/ddl/cluster.go b/ddl/cluster.go index dddd2d1e45822..b690e2c7aa792 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -58,11 +58,9 @@ const ( flashbackMaxBackoff = 1800000 // 1800s flashbackTimeout = 3 * time.Minute // 3min - readOnlyArgsOffset = 2 - gcEnabledArgsOffset = 3 - autoAnalyzeOffset = 4 - maxAutoAnalyzeTimeOffset = 5 - minSafeTimeOffset = 6 + gcEnabledArgsOffset = 2 + autoAnalyzeOffset = 3 + maxAutoAnalyzeTimeOffset = 4 ) func closePDSchedule() error { @@ -119,18 +117,6 @@ func ValidateFlashbackTS(ctx context.Context, sctx sessionctx.Context, flashBack return gcutil.ValidateSnapshotWithGCSafePoint(flashBackTS, gcSafePoint) } -func setTiDBSuperReadOnly(sess sessionctx.Context, value string) error { - return sess.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiDBSuperReadOnly, value) -} - -func getTiDBSuperReadOnly(sess sessionctx.Context) (string, error) { - val, err := sess.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBSuperReadOnly) - if err != nil { - return "", errors.Trace(err) - } - return val, nil -} - func setTiDBEnableAutoAnalyze(sess sessionctx.Context, value string) error { return sess.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiDBEnableAutoAnalyze, value) } @@ -159,9 +145,6 @@ func checkAndSetFlashbackClusterInfo(sess sessionctx.Context, d *ddlCtx, t *meta if err = ValidateFlashbackTS(d.ctx, sess, flashbackTS); err != nil { return err } - if err = meta.CheckFlashbackHistoryTSRange(t, flashbackTS); err != nil { - return err - } if err = gcutil.DisableGC(sess); err != nil { return err @@ -169,9 +152,6 @@ func checkAndSetFlashbackClusterInfo(sess sessionctx.Context, d *ddlCtx, t *meta if err = closePDSchedule(); err != nil { return err } - if err = setTiDBSuperReadOnly(sess, variable.On); err != nil { - return err - } if err = setTiDBEnableAutoAnalyze(sess, variable.Off); err != nil { return err } @@ -298,6 +278,7 @@ func sendFlashbackToVersionRPC( ctx context.Context, s tikv.Storage, version uint64, + startTS, commitTS uint64, r tikvstore.KeyRange, ) (rangetask.TaskStat, error) { startKey, rangeEndKey := r.StartKey, r.EndKey @@ -326,16 +307,6 @@ func sendFlashbackToVersionRPC( endKey = rangeEndKey } - // Because flashback has no guarantee of transactional consistency, - // we could use different startTS and commitTS for each region. 
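// This commit stops allocating a fresh startTS/commitTS inside the per-region loop
// and instead threads a single pair through every FlashbackToVersion request
// (onFlashbackCluster later reuses the meta StartTS and only allocates commitTS).
// A minimal sketch of allocating such a pair once from PD's TSO, using the same
// oracle calls that appear in this file; allocFlashbackTSPair is an illustrative name:
package example

import (
	"context"

	"github.com/tikv/client-go/v2/oracle"
	"github.com/tikv/client-go/v2/tikv"
)

// allocFlashbackTSPair returns startTS < commitTS, since PD hands out strictly
// increasing timestamps.
func allocFlashbackTSPair(ctx context.Context, s tikv.Storage) (startTS, commitTS uint64, err error) {
	opt := &oracle.Option{TxnScope: oracle.GlobalTxnScope}
	if startTS, err = s.GetOracle().GetTimestamp(ctx, opt); err != nil {
		return 0, 0, err
	}
	if commitTS, err = s.GetOracle().GetTimestamp(ctx, opt); err != nil {
		return 0, 0, err
	}
	return startTS, commitTS, nil
}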
- startTS, err := s.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) - if err != nil { - return taskStat, err - } - commitTS, err := s.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) - if err != nil { - return taskStat, err - } req := tikvrpc.NewRequest(tikvrpc.CmdFlashbackToVersion, &kvrpcpb.FlashbackToVersionRequest{ Version: version, StartKey: startKey, @@ -382,6 +353,7 @@ func flashbackToVersion( ctx context.Context, d *ddlCtx, version uint64, + startTS, commitTS uint64, startKey []byte, endKey []byte, ) (err error) { return rangetask.NewRangeTaskRunner( @@ -389,7 +361,7 @@ func flashbackToVersion( d.store.(tikv.Storage), int(variable.GetDDLFlashbackConcurrency()), func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) { - stats, err := sendFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), version, r) + stats, err := sendFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), version, startTS, commitTS, r) logutil.BgLogger().Info("flashback cluster stats", zap.Int("complete region", stats.CompletedRegions), zap.Error(err)) @@ -451,12 +423,6 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve job.State = model.JobStateCancelled return ver, errors.Trace(err) } - readOnlyValue, err = getTiDBSuperReadOnly(sess) - if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) - } - job.Args[readOnlyArgsOffset] = &readOnlyValue gcEnableValue, err := gcutil.CheckGCEnable(sess) if err != nil { job.State = model.JobStateCancelled @@ -475,11 +441,6 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve return ver, errors.Trace(err) } job.Args[maxAutoAnalyzeTimeOffset] = &maxAutoAnalyzeTimeValue - minSafeTime = d.store.GetMinSafeTS(oracle.GlobalTxnScope) - if minSafeTime == 0 { - minSafeTime = job.StartTS - } - job.Args[minSafeTimeOffset] = &minSafeTime } else { job.State = model.JobStateCancelled return ver, errors.Errorf("Other flashback job(ID: %d) is running", job.ID) @@ -492,12 +453,15 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve job.State = model.JobStateCancelled return ver, errors.Trace(err) } - // A hack way to make global variables are synchronized to all TiDB. - // TiKV will block read/write requests during flashback cluster. - // So it's not very dangerous when sync failed. - time.Sleep(1 * time.Second) + + _, err := GetFlashbackKeyRanges(sess, tablecodec.EncodeTablePrefix(0)) + if err != nil { + return ver, errors.Trace(err) + } + + // TODO, lock all regions, update schema diff, close all txns. job.SchemaState = model.StateWriteReorganization - return ver, nil + return updateSchemaVersion(d, t, job) // Stage 3, get key ranges. case model.StateWriteReorganization: // TODO: Support flashback in unistore. 
@@ -513,8 +477,12 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve return ver, errors.Trace(err) } + commitTS, err := d.store.GetOracle().GetTimestamp(d.ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + if err != nil { + return ver, errors.Trace(err) + } for _, ranges := range keyRanges { - if err = flashbackToVersion(d.ctx, d, flashbackTS, ranges.StartKey, ranges.EndKey); err != nil { + if err = flashbackToVersion(d.ctx, d, flashbackTS, t.StartTS, commitTS, ranges.StartKey, ranges.EndKey); err != nil { logutil.BgLogger().Warn("[ddl] Get error when do flashback", zap.Error(err)) return ver, err } @@ -529,13 +497,13 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve } func finishFlashbackCluster(w *worker, job *model.Job) error { - var flashbackTS, minSafeTS uint64 + var flashbackTS uint64 var pdScheduleValue map[string]interface{} var readOnlyValue, autoAnalyzeValue, maxAutoAnalyzeTime string var gcEnabled bool var jobID int64 - if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &readOnlyValue, &gcEnabled, &autoAnalyzeValue, &maxAutoAnalyzeTime, &minSafeTS); err != nil { + if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &readOnlyValue, &gcEnabled, &autoAnalyzeValue, &maxAutoAnalyzeTime); err != nil { return errors.Trace(err) } sess, err := w.sessPool.get() @@ -554,9 +522,6 @@ func finishFlashbackCluster(w *worker, job *model.Job) error { if err = recoverPDSchedule(pdScheduleValue); err != nil { return err } - if err = setTiDBSuperReadOnly(sess, readOnlyValue); err != nil { - return err - } if err = setTiDBEnableAutoAnalyze(sess, autoAnalyzeValue); err != nil { return err } @@ -568,15 +533,6 @@ func finishFlashbackCluster(w *worker, job *model.Job) error { return err } } - if job.IsDone() || job.IsSynced() { - gcSafePoint, err := gcutil.GetGCSafePoint(sess) - if err != nil { - return err - } - if err = meta.UpdateFlashbackHistoryTSRanges(t, minSafeTS, t.StartTS, gcSafePoint); err != nil { - return err - } - } if err = t.SetFlashbackClusterJobID(0); err != nil { return err } diff --git a/ddl/cluster_test.go b/ddl/cluster_test.go index 5e0dd16a837e5..b36ec41337aef 100644 --- a/ddl/cluster_test.go +++ b/ddl/cluster_test.go @@ -24,7 +24,6 @@ import ( "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/domain/infosync" "github.com/pingcap/tidb/errno" - "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/sessionctx/variable" @@ -274,74 +273,3 @@ func TestCancelFlashbackCluster(t *testing.T) { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) } - -func TestFlashbackTimeRange(t *testing.T) { - store := testkit.CreateMockStore(t) - - se, err := session.CreateSession4Test(store) - require.NoError(t, err) - txn, err := se.GetStore().Begin() - require.NoError(t, err) - - m := meta.NewMeta(txn) - flashbackTime := oracle.GetTimeFromTS(m.StartTS).Add(-10 * time.Minute) - - // No flashback history, shouldn't return err. - require.NoError(t, meta.CheckFlashbackHistoryTSRange(m, oracle.GoTimeToTS(flashbackTime))) - - // Insert a time range to flashback history ts ranges. 
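// flashbackToVersion and prepareFlashbackToVersion both wrap their RPC senders in
// client-go's rangetask runner, which splits [startKey, endKey) along region
// boundaries, runs the handler with the configured DDL flashback concurrency, and
// aggregates the returned TaskStat. A minimal sketch of the same wiring with a stub
// handler; the runner name and handler body are illustrative, not code from this patch:
package example

import (
	"context"

	tikvstore "github.com/tikv/client-go/v2/kv"
	"github.com/tikv/client-go/v2/tikv"
	"github.com/tikv/client-go/v2/txnkv/rangetask"
)

func runOverRange(ctx context.Context, s tikv.Storage, startKey, endKey []byte, concurrency int) error {
	handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
		// Real handlers iterate the regions inside r and send one RPC per region;
		// the returned CompletedRegions feeds the progress log in flashbackToVersion.
		return rangetask.TaskStat{CompletedRegions: 1}, nil
	}
	return rangetask.NewRangeTaskRunner("example-flashback-runner", s, concurrency, handler).
		RunOnRange(ctx, startKey, endKey)
}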
- require.NoError(t, meta.UpdateFlashbackHistoryTSRanges(m, oracle.GoTimeToTS(flashbackTime), m.StartTS, 0)) - - historyTS, err := m.GetFlashbackHistoryTSRange() - require.NoError(t, err) - require.Len(t, historyTS, 1) - require.NoError(t, txn.Commit(context.Background())) - - // check tidb_snapshot and stale read timestamp - tk := testkit.NewTestKit(t, store) - timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) - defer resetGC() - tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) - sql := fmt.Sprintf("set tidb_snapshot='%v'", flashbackTime.Add(5*time.Minute).Format("2006-01-02 15:04:05")) - tk.MustGetErrMsg(sql, fmt.Sprintf("can't set timestamp to history flashback time range [%s, %s]", - flashbackTime, oracle.GetTimeFromTS(m.StartTS))) - tk.MustExec("use test") - tk.MustExec("create table t(a int)") - sql = fmt.Sprintf("select * from t as of timestamp '%v'", flashbackTime.Add(5*time.Minute).Format("2006-01-02 15:04:05")) - tk.MustGetErrMsg(sql, fmt.Sprintf("can't set timestamp to history flashback time range [%s, %s]", - flashbackTime, oracle.GetTimeFromTS(m.StartTS))) - - se, err = session.CreateSession4Test(store) - require.NoError(t, err) - txn, err = se.GetStore().Begin() - require.NoError(t, err) - - m = meta.NewMeta(txn) - require.NoError(t, err) - // Flashback history time range is [m.StartTS - 10min, m.StartTS] - require.Error(t, meta.CheckFlashbackHistoryTSRange(m, oracle.GoTimeToTS(flashbackTime.Add(5*time.Minute)))) - - // Check add insert a new time range - require.NoError(t, meta.CheckFlashbackHistoryTSRange(m, oracle.GoTimeToTS(flashbackTime.Add(-5*time.Minute)))) - require.NoError(t, meta.UpdateFlashbackHistoryTSRanges(m, oracle.GoTimeToTS(flashbackTime.Add(-5*time.Minute)), m.StartTS, 0)) - - historyTS, err = m.GetFlashbackHistoryTSRange() - require.NoError(t, err) - // history time range still equals to 1, because overlapped - require.Len(t, historyTS, 1) - - require.NoError(t, meta.UpdateFlashbackHistoryTSRanges(m, oracle.GoTimeToTS(flashbackTime.Add(15*time.Minute)), oracle.GoTimeToTS(flashbackTime.Add(20*time.Minute)), 0)) - historyTS, err = m.GetFlashbackHistoryTSRange() - require.NoError(t, err) - require.Len(t, historyTS, 2) - - // GCSafePoint updated will clean some history TS ranges - require.NoError(t, meta.UpdateFlashbackHistoryTSRanges(m, - oracle.GoTimeToTS(flashbackTime.Add(25*time.Minute)), - oracle.GoTimeToTS(flashbackTime.Add(30*time.Minute)), - oracle.GoTimeToTS(flashbackTime.Add(22*time.Minute)))) - historyTS, err = m.GetFlashbackHistoryTSRange() - require.NoError(t, err) - require.Len(t, historyTS, 1) - require.NoError(t, txn.Commit(context.Background())) -} diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 8f86fde970df0..9c15ef31101e1 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -2656,11 +2656,9 @@ func (d *ddl) FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) error Args: []interface{}{ flashbackTS, map[string]interface{}{}, - variable.On, /* tidb_super_read_only */ true, /* tidb_gc_enable */ variable.On, /* tidb_enable_auto_analyze */ - "1", /* tidb_max_auto_analyze_time */ - flashbackTS /* min safe time */}, + "1" /* tidb_max_auto_analyze_time */}, } err := d.DoDDLJob(ctx, job) err = d.callHookOnChanged(job, err) diff --git a/meta/meta.go b/meta/meta.go index 1ea5f856bae77..f8537b5dd9ab6 100644 --- a/meta/meta.go +++ b/meta/meta.go @@ -35,7 +35,6 @@ import ( "github.com/pingcap/tidb/structure" "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/logutil" - "github.com/tikv/client-go/v2/oracle" 
"go.uber.org/zap" ) @@ -60,27 +59,26 @@ var ( // var ( - mMetaPrefix = []byte("m") - mNextGlobalIDKey = []byte("NextGlobalID") - mSchemaVersionKey = []byte("SchemaVersionKey") - mDBs = []byte("DBs") - mDBPrefix = "DB" - mTablePrefix = "Table" - mSequencePrefix = "SID" - mSeqCyclePrefix = "SequenceCycle" - mTableIDPrefix = "TID" - mIncIDPrefix = "IID" - mRandomIDPrefix = "TARID" - mBootstrapKey = []byte("BootstrapKey") - mSchemaDiffPrefix = "Diff" - mPolicies = []byte("Policies") - mPolicyPrefix = "Policy" - mPolicyGlobalID = []byte("PolicyGlobalID") - mPolicyMagicByte = CurrentMagicByteVer - mDDLTableVersion = []byte("DDLTableVersion") - mConcurrentDDL = []byte("concurrentDDL") - mInFlashbackCluster = []byte("InFlashbackCluster") - mFlashbackHistoryTSRange = []byte("FlashbackHistoryTSRange") + mMetaPrefix = []byte("m") + mNextGlobalIDKey = []byte("NextGlobalID") + mSchemaVersionKey = []byte("SchemaVersionKey") + mDBs = []byte("DBs") + mDBPrefix = "DB" + mTablePrefix = "Table" + mSequencePrefix = "SID" + mSeqCyclePrefix = "SequenceCycle" + mTableIDPrefix = "TID" + mIncIDPrefix = "IID" + mRandomIDPrefix = "TARID" + mBootstrapKey = []byte("BootstrapKey") + mSchemaDiffPrefix = "Diff" + mPolicies = []byte("Policies") + mPolicyPrefix = "Policy" + mPolicyGlobalID = []byte("PolicyGlobalID") + mPolicyMagicByte = CurrentMagicByteVer + mDDLTableVersion = []byte("DDLTableVersion") + mConcurrentDDL = []byte("concurrentDDL") + mInFlashbackCluster = []byte("InFlashbackCluster") ) const ( @@ -625,90 +623,6 @@ func (m *Meta) GetFlashbackClusterJobID() (int64, error) { return int64(binary.BigEndian.Uint64(val)), nil } -// TSRange store a range time -type TSRange struct { - StartTS uint64 - EndTS uint64 -} - -// SetFlashbackHistoryTSRange store flashback time range to TiKV -func (m *Meta) SetFlashbackHistoryTSRange(timeRange []TSRange) error { - timeRangeByte, err := json.Marshal(timeRange) - if err != nil { - return err - } - return errors.Trace(m.txn.Set(mFlashbackHistoryTSRange, timeRangeByte)) -} - -// GetFlashbackHistoryTSRange get flashback time range from TiKV -func (m *Meta) GetFlashbackHistoryTSRange() (timeRange []TSRange, err error) { - timeRangeByte, err := m.txn.Get(mFlashbackHistoryTSRange) - if err != nil { - return nil, err - } - if len(timeRangeByte) == 0 { - return []TSRange{}, nil - } - err = json.Unmarshal(timeRangeByte, &timeRange) - if err != nil { - return nil, err - } - return timeRange, nil -} - -// CheckFlashbackHistoryTSRange checks flashbackTS overlapped with history time ranges or not. -func CheckFlashbackHistoryTSRange(m *Meta, targetTS uint64) error { - tsRanges, err := m.GetFlashbackHistoryTSRange() - if err != nil { - return err - } - for _, tsRange := range tsRanges { - if tsRange.StartTS <= targetTS && targetTS <= tsRange.EndTS { - return errors.Errorf("can't set timestamp to history flashback time range [%s, %s]", - oracle.GetTimeFromTS(tsRange.StartTS), oracle.GetTimeFromTS(tsRange.EndTS)) - } - } - return nil -} - -// UpdateFlashbackHistoryTSRanges insert [startTS, endTS] into FlashbackHistoryTSRange. -func UpdateFlashbackHistoryTSRanges(m *Meta, startTS uint64, endTS uint64, gcSafePoint uint64) error { - tsRanges, err := m.GetFlashbackHistoryTSRange() - if err != nil { - return err - } - if len(tsRanges) != 0 && tsRanges[len(tsRanges)-1].EndTS >= endTS { - // It's impossible, endTS should always greater than all TS in history TS ranges. 
- return errors.Errorf("Maybe TSO fallback, last flashback endTS: %d, now: %d", tsRanges[len(tsRanges)-1].EndTS, endTS) - } - - newTsRange := make([]TSRange, 0, len(tsRanges)) - - for _, tsRange := range tsRanges { - if tsRange.EndTS < gcSafePoint { - continue - } - if startTS > tsRange.EndTS { - // tsRange.StartTS < tsRange.EndTS < startTS. - // We should keep tsRange in slices. - newTsRange = append(newTsRange, tsRange) - } else if startTS < tsRange.StartTS { - // startTS < tsRange.StartTS < tsRange.EndTS. - // The remained ts ranges are useless, [startTS, endTS] will cover them, so break. - break - } else { - // tsRange.StartTS < startTS < tsRange.EndTS. - // It's impossible reach here, we checked it before start flashback cluster. - return errors.Errorf("It's an unreachable branch, flashbackTS (%d) in old ts range: [%d, %d]", - startTS, tsRange.StartTS, tsRange.EndTS) - } - } - - // Store the new tsRange. - newTsRange = append(newTsRange, TSRange{StartTS: startTS, EndTS: endTS}) - return m.SetFlashbackHistoryTSRange(newTsRange) -} - // SetConcurrentDDL set the concurrent DDL flag. func (m *Meta) SetConcurrentDDL(b bool) error { var data []byte diff --git a/sessionctx/context.go b/sessionctx/context.go index c84dd68f16d78..7be92f102e56c 100644 --- a/sessionctx/context.go +++ b/sessionctx/context.go @@ -22,7 +22,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx/sessionstates" @@ -226,11 +225,7 @@ func ValidateSnapshotReadTS(ctx context.Context, sctx Context, readTS uint64) er return errors.Errorf("cannot set read timestamp to a future time") } } - txn, err := sctx.GetStore().Begin() - if err != nil { - return err - } - return meta.CheckFlashbackHistoryTSRange(meta.NewMeta(txn), readTS) + return nil } // How far future from now ValidateStaleReadTS allows at most @@ -251,11 +246,7 @@ func ValidateStaleReadTS(ctx context.Context, sctx Context, readTS uint64) error if oracle.GetTimeFromTS(readTS).After(oracle.GetTimeFromTS(currentTS).Add(allowedTimeFromNow)) { return errors.Errorf("cannot set read timestamp to a future time") } - txn, err := sctx.GetStore().Begin() - if err != nil { - return err - } - return meta.CheckFlashbackHistoryTSRange(meta.NewMeta(txn), readTS) + return nil } // SysProcTracker is used to track background sys processes From fdc66539d29c8b711944b2c947f305a9859556da Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Mon, 26 Sep 2022 14:31:35 +0800 Subject: [PATCH 06/29] kill non-flashback cluster query --- ddl/cluster.go | 44 +++++++++++++++++++++++--------------- domain/schema_validator.go | 3 +++ infoschema/builder.go | 2 ++ server/server.go | 22 +++++++++++++++++++ util/processinfo.go | 1 + 5 files changed, 55 insertions(+), 17 deletions(-) diff --git a/ddl/cluster.go b/ddl/cluster.go index b690e2c7aa792..940ff7b092922 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -58,6 +58,7 @@ const ( flashbackMaxBackoff = 1800000 // 1800s flashbackTimeout = 3 * time.Minute // 3min + pdScheduleArgsOffset = 1 gcEnabledArgsOffset = 2 autoAnalyzeOffset = 3 maxAutoAnalyzeTimeOffset = 4 @@ -80,7 +81,7 @@ func savePDSchedule(job *model.Job) error { for _, key := range pdScheduleKey { saveValue[key] = retValue[key] } - job.Args[1] = &saveValue + job.Args[pdScheduleArgsOffset] = &saveValue return nil } @@ -168,9 +169,16 @@ func checkAndSetFlashbackClusterInfo(sess sessionctx.Context, d 
*ddlCtx, t *meta if err != nil { return errors.Trace(err) } + // If flashbackSchemaVersion not same as nowSchemaVersion, we've done ddl during [flashbackTs, now). - if flashbackSchemaVersion != nowSchemaVersion { - return errors.Errorf("Had ddl history during [%s, now), can't do flashback", oracle.GetTimeFromTS(flashbackTS)) + for i := flashbackSchemaVersion + 1; i <= nowSchemaVersion; i++ { + diff, err := t.GetSchemaDiff(i) + if err != nil { + return errors.Trace(err) + } + if diff != nil && diff.Type != model.ActionFlashbackCluster { + return errors.Errorf("Had ddl history during [%s, now), can't do flashback", oracle.GetTimeFromTS(flashbackTS)) + } } jobs, err := GetAllDDLJobs(sess, t) @@ -370,10 +378,11 @@ func flashbackToVersion( ).RunOnRange(ctx, startKey, endKey) } -// A Flashback has 3 different stages. +// A Flashback has 4 different stages. // 1. before lock flashbackClusterJobID, check clusterJobID and lock it. // 2. before flashback start, check timestamp, disable GC and close PD schedule. -// 3. before flashback done, get key ranges, send flashback RPC. +// 3. phase 1, get key ranges, lock all regions. +// 4. phase 2, send flashback RPC, do flashback jobs. func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) { inFlashbackTest := false failpoint.Inject("mockFlashbackTest", func(val failpoint.Value) { @@ -387,11 +396,11 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve return ver, errors.Errorf("Not support flashback cluster in non-TiKV env") } - var flashbackTS, minSafeTime uint64 + var flashbackTS uint64 var pdScheduleValue map[string]interface{} - var readOnlyValue, autoAnalyzeValue, maxAutoAnalyzeTimeValue string + var autoAnalyzeValue, maxAutoAnalyzeTimeValue string var gcEnabledValue bool - if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &readOnlyValue, &gcEnabledValue, &autoAnalyzeValue, &maxAutoAnalyzeTimeValue, &minSafeTime); err != nil { + if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabledValue, &autoAnalyzeValue, &maxAutoAnalyzeTimeValue); err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } @@ -445,24 +454,26 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve job.State = model.JobStateCancelled return ver, errors.Errorf("Other flashback job(ID: %d) is running", job.ID) } - job.SchemaState = model.StateWriteOnly + job.SchemaState = model.StateDeleteOnly return ver, nil // Stage 2, check flashbackTS, close GC and PD schedule. - case model.StateWriteOnly: + case model.StateDeleteOnly: if err = checkAndSetFlashbackClusterInfo(sess, d, t, job, flashbackTS); err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } - + job.SchemaState = model.StateWriteOnly + return ver, nil + // Stage 3, get key ranges and get locks. + case model.StateWriteOnly: _, err := GetFlashbackKeyRanges(sess, tablecodec.EncodeTablePrefix(0)) if err != nil { return ver, errors.Trace(err) } - - // TODO, lock all regions, update schema diff, close all txns. + // TODO, lock all regions. job.SchemaState = model.StateWriteReorganization return updateSchemaVersion(d, t, job) - // Stage 3, get key ranges. + // Stage 4, get key ranges and send flashback RPC. case model.StateWriteReorganization: // TODO: Support flashback in unistore. 
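// The loop added here walks every schema version in (flashbackVersion, nowVersion]
// and inspects its SchemaDiff: a diff written by any other DDL means the data at
// flashbackTS no longer matches the current schema, so the job is rejected, while
// diffs written by flashback itself are ignored. A minimal sketch of that walk,
// using only the meta and model APIs referenced in this hunk; hadForeignDDL is an
// illustrative name:
package example

import (
	"github.com/pingcap/tidb/meta"
	"github.com/pingcap/tidb/parser/model"
)

// hadForeignDDL reports whether any schema diff in (fromVer, toVer] was produced
// by a DDL other than FLASHBACK CLUSTER.
func hadForeignDDL(t *meta.Meta, fromVer, toVer int64) (bool, error) {
	for ver := fromVer + 1; ver <= toVer; ver++ {
		diff, err := t.GetSchemaDiff(ver)
		if err != nil {
			return false, err
		}
		// A nil diff can appear for skipped versions; only a diff from another
		// DDL type blocks the flashback.
		if diff != nil && diff.Type != model.ActionFlashbackCluster {
			return true, nil
		}
	}
	return false, nil
}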
if inFlashbackTest { @@ -471,7 +482,6 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve job.SchemaState = model.StatePublic return ver, nil } - keyRanges, err := GetFlashbackKeyRanges(sess, tablecodec.EncodeTablePrefix(0)) if err != nil { return ver, errors.Trace(err) @@ -499,11 +509,11 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve func finishFlashbackCluster(w *worker, job *model.Job) error { var flashbackTS uint64 var pdScheduleValue map[string]interface{} - var readOnlyValue, autoAnalyzeValue, maxAutoAnalyzeTime string + var autoAnalyzeValue, maxAutoAnalyzeTime string var gcEnabled bool var jobID int64 - if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &readOnlyValue, &gcEnabled, &autoAnalyzeValue, &maxAutoAnalyzeTime); err != nil { + if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabled, &autoAnalyzeValue, &maxAutoAnalyzeTime); err != nil { return errors.Trace(err) } sess, err := w.sessPool.get() diff --git a/domain/schema_validator.go b/domain/schema_validator.go index 511553feafad8..f6f4f265068ae 100644 --- a/domain/schema_validator.go +++ b/domain/schema_validator.go @@ -153,6 +153,9 @@ func (s *schemaValidator) Update(leaseGrantTS uint64, oldVer, currVer int64, cha if ac == 1< Date: Mon, 26 Sep 2022 16:11:42 +0800 Subject: [PATCH 07/29] update test --- ddl/cluster_test.go | 19 ++++--------------- domain/schema_validator.go | 2 +- parser/model/ddl.go | 3 ++- server/server.go | 3 ++- testkit/mocksessionmanager.go | 19 +++++++++++++++++++ util/processinfo.go | 3 ++- 6 files changed, 30 insertions(+), 19 deletions(-) diff --git a/ddl/cluster_test.go b/ddl/cluster_test.go index b36ec41337aef..1898c00ef67d3 100644 --- a/ddl/cluster_test.go +++ b/ddl/cluster_test.go @@ -186,10 +186,7 @@ func TestGlobalVariablesOnFlashback(t *testing.T) { hook.OnJobRunBeforeExported = func(job *model.Job) { assert.Equal(t, model.ActionFlashbackCluster, job.Type) if job.SchemaState == model.StateWriteReorganization { - rs, err := tk.Exec("show variables like 'tidb_super_read_only'") - assert.NoError(t, err) - assert.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.On) - rs, err = tk.Exec("show variables like 'tidb_gc_enable'") + rs, err := tk.Exec("show variables like 'tidb_gc_enable'") assert.NoError(t, err) assert.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.Off) rs, err = tk.Exec("show variables like 'tidb_enable_auto_analyze'") @@ -201,28 +198,20 @@ func TestGlobalVariablesOnFlashback(t *testing.T) { } } dom.DDL().SetHook(hook) - // first try with `tidb_gc_enable` = on and `tidb_super_read_only` = off + // first try with `tidb_gc_enable` = on tk.MustExec("set global tidb_gc_enable = on") - tk.MustExec("set global tidb_super_read_only = off") tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts))) - rs, err := tk.Exec("show variables like 'tidb_super_read_only'") - require.NoError(t, err) - require.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.Off) - rs, err = tk.Exec("show variables like 'tidb_gc_enable'") + rs, err := tk.Exec("show variables like 'tidb_gc_enable'") require.NoError(t, err) require.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.On) - // second try with `tidb_gc_enable` = off and `tidb_super_read_only` = on + // second try with `tidb_gc_enable` = off tk.MustExec("set global tidb_gc_enable = off") - tk.MustExec("set global tidb_super_read_only = on") ts, err = 
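// This commit also makes other TiDB instances react to a running flashback: the
// schema validator's related-change bitmask carries model.ActionFlashbackCluster,
// and the server kills ordinary queries when it sees that bit (the server-side
// hook itself is truncated in this diff). The sketch below only illustrates the
// bit test; relatedActions and needKillQueries are hypothetical names, not code
// from this patch:
package example

import "github.com/pingcap/tidb/parser/model"

// needKillQueries reports whether a schema-change notification carries the
// flashback-cluster action bit.
func needKillQueries(relatedActions uint64) bool {
	return relatedActions&(1<<uint64(model.ActionFlashbackCluster)) != 0
}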
tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) require.NoError(t, err) tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts))) - rs, err = tk.Exec("show variables like 'tidb_super_read_only'") - require.NoError(t, err) - require.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.On) rs, err = tk.Exec("show variables like 'tidb_gc_enable'") require.NoError(t, err) require.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.Off) diff --git a/domain/schema_validator.go b/domain/schema_validator.go index f6f4f265068ae..6f028fdca2e70 100644 --- a/domain/schema_validator.go +++ b/domain/schema_validator.go @@ -154,7 +154,7 @@ func (s *schemaValidator) Update(leaseGrantTS uint64, oldVer, currVer int64, cha s.do.Store().GetMemCache().Delete(tblIDs[idx]) } if ac == 1< Date: Mon, 26 Sep 2022 16:13:40 +0800 Subject: [PATCH 08/29] fix ut --- ddl/cluster_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ddl/cluster_test.go b/ddl/cluster_test.go index 1898c00ef67d3..66b5632352aa3 100644 --- a/ddl/cluster_test.go +++ b/ddl/cluster_test.go @@ -240,9 +240,9 @@ func TestCancelFlashbackCluster(t *testing.T) { defer resetGC() tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) - // Try canceled on StateWriteOnly, cancel success + // Try canceled on StateDeleteOnly, cancel success hook := newCancelJobHook(t, store, dom, func(job *model.Job) bool { - return job.SchemaState == model.StateWriteOnly + return job.SchemaState == model.StateDeleteOnly }) dom.DDL().SetHook(hook) tk.MustGetErrCode(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts)), errno.ErrCancelledDDLJob) From 12fa7a0cd3a7e0ef0c19cdf01e47911b0ecf426e Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Mon, 26 Sep 2022 17:14:34 +0800 Subject: [PATCH 09/29] fix test --- executor/recover_test.go | 25 +------------------------ 1 file changed, 1 insertion(+), 24 deletions(-) diff --git a/executor/recover_test.go b/executor/recover_test.go index f5328eb6b1527..51bc75601c197 100644 --- a/executor/recover_test.go +++ b/executor/recover_test.go @@ -333,36 +333,13 @@ func TestRecoverClusterMeetError(t *testing.T) { // Flashback failed because of ddl history. 
tk.MustExec("use test;") tk.MustExec("create table t(a int);") - tk.MustContainErrMsg(fmt.Sprintf("flashback cluster to timestamp '%s'", flashbackTs), "schema version not same, have done ddl during [flashbackTS, now)") + tk.MustMatchErrMsg(fmt.Sprintf("flashback cluster to timestamp '%s'", flashbackTs), "Had ddl history during \\[.*, now\\), can't do flashback") require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest")) } -func TestRecoverClusterWithTiFlash(t *testing.T) { - store := testkit.CreateMockStore(t, withMockTiFlash(1)) - tk := testkit.NewTestKit(t, store) - - injectSafeTS := oracle.GoTimeToTS(time.Now().Add(-10 * time.Second)) - require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", - fmt.Sprintf("return(%v)", injectSafeTS))) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", - fmt.Sprintf("return(%v)", injectSafeTS))) - - timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) - defer resetGC() - - // Set GC safe point - tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) - - tk.MustContainErrMsg(fmt.Sprintf("flashback cluster to timestamp '%s'", time.Now().Add(0-30*time.Second)), - "not support flash back cluster with TiFlash stores") - - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) - require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) -} - func TestFlashbackWithSafeTs(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) From 521a80b1f8c2fb0b2935cebca78595157888c60a Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Tue, 27 Sep 2022 11:10:39 +0800 Subject: [PATCH 10/29] support tiflash --- executor/ddl.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/executor/ddl.go b/executor/ddl.go index 1fd2b20eb70a1..6a2ff64ae0e37 100644 --- a/executor/ddl.go +++ b/executor/ddl.go @@ -529,14 +529,6 @@ func (e *DDLExec) executeFlashBackCluster(ctx context.Context, s *ast.FlashBackC return core.ErrSpecificAccessDenied.GenWithStackByArgs("SUPER") } - tiFlashInfo, err := getTiFlashStores(e.ctx) - if err != nil { - return err - } - if len(tiFlashInfo) != 0 { - return errors.Errorf("not support flash back cluster with TiFlash stores") - } - flashbackTS, err := staleread.CalculateAsOfTsExpr(e.ctx, s.FlashbackTS) if err != nil { return err From b15b8c3b5cd5621329c1357be2571296bdcd4cb3 Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Tue, 27 Sep 2022 11:43:40 +0800 Subject: [PATCH 11/29] delete useless param --- executor/ddl.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/executor/ddl.go b/executor/ddl.go index 6a2ff64ae0e37..ccd16649be758 100644 --- a/executor/ddl.go +++ b/executor/ddl.go @@ -172,7 +172,7 @@ func (e *DDLExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { case *ast.FlashBackTableStmt: err = e.executeFlashbackTable(x) case *ast.FlashBackClusterStmt: - err = e.executeFlashBackCluster(ctx, x) + err = e.executeFlashBackCluster(x) case *ast.RenameTableStmt: err = e.executeRenameTable(x) case *ast.TruncateTableStmt: @@ -523,7 +523,7 @@ func (e *DDLExec) getRecoverTableByTableName(tableName *ast.TableName) (*model.J return jobInfo, tableInfo, nil } -func (e *DDLExec) executeFlashBackCluster(ctx context.Context, s *ast.FlashBackClusterStmt) error { +func (e *DDLExec) executeFlashBackCluster(s *ast.FlashBackClusterStmt) error { checker 
:= privilege.GetPrivilegeManager(e.ctx) if !checker.RequestVerification(e.ctx.GetSessionVars().ActiveRoles, "", "", "", mysql.SuperPriv) { return core.ErrSpecificAccessDenied.GenWithStackByArgs("SUPER") From 135e67c03ef4b1ba8f898e98024f18acabc12bf0 Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Wed, 28 Sep 2022 14:26:34 +0800 Subject: [PATCH 12/29] update kvproto and client-go --- ddl/cluster.go | 95 +++++++++++++++++++++++++++++++++++++++++++++++++- go.mod | 4 +-- go.sum | 8 ++--- 3 files changed, 100 insertions(+), 7 deletions(-) diff --git a/ddl/cluster.go b/ddl/cluster.go index 940ff7b092922..fb9f11ff7d8f2 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -282,6 +282,93 @@ func GetFlashbackKeyRanges(sess sessionctx.Context, startKey kv.Key) ([]kv.KeyRa return keyRanges, nil } +func sendPrepareFlashbackToVersionRPC( + ctx context.Context, + s tikv.Storage, + r tikvstore.KeyRange, +) (rangetask.TaskStat, error) { + startKey, rangeEndKey := r.StartKey, r.EndKey + var taskStat rangetask.TaskStat + bo := tikv.NewBackoffer(ctx, flashbackMaxBackoff) + for { + select { + case <-ctx.Done(): + return taskStat, errors.WithStack(ctx.Err()) + default: + } + + if len(rangeEndKey) > 0 && bytes.Compare(startKey, rangeEndKey) >= 0 { + break + } + + loc, err := s.GetRegionCache().LocateKey(bo, startKey) + if err != nil { + return taskStat, err + } + + endKey := loc.EndKey + isLast := len(endKey) == 0 || (len(rangeEndKey) > 0 && bytes.Compare(endKey, rangeEndKey) >= 0) + // If it is the last region + if isLast { + endKey = rangeEndKey + } + + req := tikvrpc.NewRequest(tikvrpc.CmdPrepareFlashbackToVersion, &kvrpcpb.PrepareFlashbackToVersionRequest{ + StartKey: startKey, + EndKey: endKey, + }) + + resp, err := s.SendReq(bo, req, loc.Region, flashbackTimeout) + if err != nil { + return taskStat, err + } + regionErr, err := resp.GetRegionError() + if err != nil { + return taskStat, err + } + if regionErr != nil { + err = bo.Backoff(tikv.BoRegionMiss(), errors.New(regionErr.String())) + if err != nil { + return taskStat, err + } + continue + } + if resp.Resp == nil { + return taskStat, errors.Errorf("prepare flashback missing resp body") + } + prepareFlashbackToVersionResp := resp.Resp.(*kvrpcpb.PrepareFlashbackToVersionResponse) + if err := prepareFlashbackToVersionResp.GetError(); err != "" { + return taskStat, errors.Errorf(err) + } + taskStat.CompletedRegions++ + if isLast { + break + } + bo = tikv.NewBackoffer(ctx, flashbackMaxBackoff) + startKey = endKey + } + return taskStat, nil +} + +func prepareFlashbackToVersion( + ctx context.Context, + d *ddlCtx, + startKey []byte, endKey []byte, +) (err error) { + return rangetask.NewRangeTaskRunner( + "flashback-to-version-runner", + d.store.(tikv.Storage), + int(variable.GetDDLFlashbackConcurrency()), + func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) { + stats, err := sendPrepareFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), r) + logutil.BgLogger().Info("prepare flashback to version stats", + zap.Int("complete region", stats.CompletedRegions), + zap.Error(err)) + return stats, err + }, + ).RunOnRange(ctx, startKey, endKey) +} + func sendFlashbackToVersionRPC( ctx context.Context, s tikv.Storage, @@ -466,10 +553,16 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve return ver, nil // Stage 3, get key ranges and get locks. 
case model.StateWriteOnly: - _, err := GetFlashbackKeyRanges(sess, tablecodec.EncodeTablePrefix(0)) + keyRanges, err := GetFlashbackKeyRanges(sess, tablecodec.EncodeTablePrefix(0)) if err != nil { return ver, errors.Trace(err) } + for _, ranges := range keyRanges { + if err = prepareFlashbackToVersion(d.ctx, d, ranges.StartKey, ranges.EndKey); err != nil { + logutil.BgLogger().Warn("[ddl] Get error when do flashback", zap.Error(err)) + return ver, err + } + } // TODO, lock all regions. job.SchemaState = model.StateWriteReorganization return updateSchemaVersion(d, t, job) diff --git a/go.mod b/go.mod index fea930c747662..f79a093db42fd 100644 --- a/go.mod +++ b/go.mod @@ -66,7 +66,7 @@ require ( github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3 github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 - github.com/pingcap/kvproto v0.0.0-20220913050750-f6d05706948a + github.com/pingcap/kvproto v0.0.0-20220929075948-06e08d5ed64c github.com/pingcap/log v1.1.0 github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4 github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e @@ -84,7 +84,7 @@ require ( github.com/stretchr/testify v1.8.0 github.com/tdakkota/asciicheck v0.1.1 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 - github.com/tikv/client-go/v2 v2.0.1-0.20220923061703-33efe476e022 + github.com/tikv/client-go/v2 v2.0.1-0.20220929111318-51f3bd3944c2 github.com/tikv/pd/client v0.0.0-20220725055910-7187a7ab72db github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 github.com/twmb/murmur3 v1.1.3 diff --git a/go.sum b/go.sum index 04713ecdd16ee..a31472ac5c3d9 100644 --- a/go.sum +++ b/go.sum @@ -752,8 +752,8 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20220510035547-0e2f26c0a46a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= -github.com/pingcap/kvproto v0.0.0-20220913050750-f6d05706948a h1:LCtkOPEzjWk86NclzxdZ42nNtbhuIN1p6cpd/FYUqkU= -github.com/pingcap/kvproto v0.0.0-20220913050750-f6d05706948a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= +github.com/pingcap/kvproto v0.0.0-20220929075948-06e08d5ed64c h1:ceg4xjEEXNgPsScTQ5dtidiltLF4h17Y/jUqfyLAy9E= +github.com/pingcap/kvproto v0.0.0-20220929075948-06e08d5ed64c/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= @@ -901,8 +901,8 @@ github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3 h1:f+jULpR github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3/go.mod h1:ON8b8w4BN/kE1EOhwT0o+d62W65a6aPw1nouo9LMgyY= github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ= github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU= -github.com/tikv/client-go/v2 v2.0.1-0.20220923061703-33efe476e022 
h1:TxDSQAmtGdE34BvOaYF35mRrAXePeZEq8quvuAwrKsI= -github.com/tikv/client-go/v2 v2.0.1-0.20220923061703-33efe476e022/go.mod h1:6pedLz7wiINLHXwCT1+yMZmzuG42+ubtBkkfcwoukIo= +github.com/tikv/client-go/v2 v2.0.1-0.20220929111318-51f3bd3944c2 h1:A05tkatkgjqcTq94YteKkXFNPIELaGwAwjEzXsx9t6g= +github.com/tikv/client-go/v2 v2.0.1-0.20220929111318-51f3bd3944c2/go.mod h1:gdXot2ofS2EOGtrXQ2qyESonQX/gFmgtfBCqCOSWg9E= github.com/tikv/pd/client v0.0.0-20220725055910-7187a7ab72db h1:r1eMh9Rny3hfWuBuxOnbsCRrR4FhthiNxLQ5rAUtaww= github.com/tikv/pd/client v0.0.0-20220725055910-7187a7ab72db/go.mod h1:ew8kS0yIcEaSetuuywkTLIUBR+sz3J5XvAYRae11qwc= github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro= From 135fe2226542e0a9f5317d87d1ba3ec8e23b175a Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Sat, 8 Oct 2022 13:17:11 +0800 Subject: [PATCH 13/29] update DEPS --- DEPS.bzl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/DEPS.bzl b/DEPS.bzl index bf51d87eb3f25..4cf18a1a498bd 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -2818,8 +2818,8 @@ def go_deps(): name = "com_github_pingcap_kvproto", build_file_proto_mode = "disable_global", importpath = "github.com/pingcap/kvproto", - sum = "h1:LCtkOPEzjWk86NclzxdZ42nNtbhuIN1p6cpd/FYUqkU=", - version = "v0.0.0-20220913050750-f6d05706948a", + sum = "h1:ceg4xjEEXNgPsScTQ5dtidiltLF4h17Y/jUqfyLAy9E=", + version = "v0.0.0-20220929075948-06e08d5ed64c", ) go_repository( name = "com_github_pingcap_log", @@ -3422,8 +3422,8 @@ def go_deps(): name = "com_github_tikv_client_go_v2", build_file_proto_mode = "disable_global", importpath = "github.com/tikv/client-go/v2", - sum = "h1:GJNu6XDT8W2Oahh+w/fhb37PNhFr4EZjdehIklZnhU4=", - version = "v2.0.1-0.20220921101651-ce9203ef66e9", + sum = "h1:A05tkatkgjqcTq94YteKkXFNPIELaGwAwjEzXsx9t6g=", + version = "v2.0.1-0.20220929111318-51f3bd3944c2", ) go_repository( name = "com_github_tikv_pd_client", From d88ec2a9f326fe36bcd854f47f0afa21ead288ab Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Sat, 8 Oct 2022 13:28:59 +0800 Subject: [PATCH 14/29] fix test --- ddl/cluster.go | 5 +++++ ddl/cluster_test.go | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/ddl/cluster.go b/ddl/cluster.go index e234eabe54f41..54888d3c684e2 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -524,6 +524,11 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve return ver, nil // Stage 3, get key ranges and get locks. case model.StateWriteOnly: + // TODO: Support flashback in unistore. 
+ if inFlashbackTest { + job.SchemaState = model.StateWriteReorganization + return updateSchemaVersion(d, t, job) + } keyRanges, err := GetFlashbackKeyRanges(sess, tablecodec.EncodeTablePrefix(0)) if err != nil { return ver, errors.Trace(err) diff --git a/ddl/cluster_test.go b/ddl/cluster_test.go index 32123d84ac6cb..965cda0fa8dfc 100644 --- a/ddl/cluster_test.go +++ b/ddl/cluster_test.go @@ -185,7 +185,7 @@ func TestAddDDLDuringFlashback(t *testing.T) { hook := &ddl.TestDDLCallback{Do: dom} hook.OnJobRunBeforeExported = func(job *model.Job) { assert.Equal(t, model.ActionFlashbackCluster, job.Type) - if job.SchemaState == model.StateWriteReorganization { + if job.SchemaState == model.StateWriteOnly { _, err := tk.Exec("alter table t add column b int") assert.ErrorContains(t, err, "Can't add ddl job, have flashback cluster job") } From 6b19aaeb373e33780e1931e9d748af58899d8ee0 Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Sun, 9 Oct 2022 17:01:38 +0800 Subject: [PATCH 15/29] fix auto analyze --- ddl/cluster.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/ddl/cluster.go b/ddl/cluster.go index 54888d3c684e2..84ce870130a82 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -512,6 +512,18 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve return ver, errors.Trace(err) } job.Args[gcEnabledArgsOffset] = &gcEnableValue + autoAnalyzeValue, err = getTiDBEnableAutoAnalyze(sess) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + job.Args[autoAnalyzeOffset] = &autoAnalyzeValue + maxAutoAnalyzeTimeValue, err = getTiDBMaxAutoAnalyzeTime(sess) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + job.Args[maxAutoAnalyzeTimeOffset] = &maxAutoAnalyzeTimeValue job.SchemaState = model.StateDeleteOnly return ver, nil // Stage 2, check flashbackTS, close GC and PD schedule. 
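The auto-analyze handling introduced in PATCH 15 above follows the same save/disable/restore pattern the job already uses for the PD schedule map and tidb_gc_enable: the current global value is captured into job.Args before the flashback work starts, the variable is forced off for the duration of the job, and finishFlashbackCluster later decodes the saved value out of the persisted job and writes it back. What follows is only a minimal sketch of that round trip, reusing the helpers and argument offsets defined in ddl/cluster.go; it is not standalone code and is not part of any patch in this series.

    // Save: read the live value and stash it in the job arguments; job.Args is
    // persisted with the DDL job, so the value survives a TiDB restart mid-job.
    autoAnalyzeValue, err := getTiDBEnableAutoAnalyze(sess)
    if err != nil {
        job.State = model.JobStateCancelled
        return ver, errors.Trace(err)
    }
    job.Args[autoAnalyzeOffset] = &autoAnalyzeValue

    // Disable: auto analyze stays off while the flashback job runs.
    if err = setTiDBEnableAutoAnalyze(sess, variable.Off); err != nil {
        return err
    }

    // Restore: when the job finishes, finishFlashbackCluster decodes the saved
    // value from job.Args and writes it back as the global setting.
    return setTiDBEnableAutoAnalyze(sess, autoAnalyzeValue)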
From b790fc5c3f6c8c423c889994bf3b0f4185a5cb68 Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Sun, 9 Oct 2022 18:14:04 +0800 Subject: [PATCH 16/29] add logs for working stats --- ddl/cluster.go | 50 +++++++++++++++++++++++++++++--------------------- ddl/ddl_api.go | 3 ++- 2 files changed, 31 insertions(+), 22 deletions(-) diff --git a/ddl/cluster.go b/ddl/cluster.go index 84ce870130a82..5b1b308cf1b2a 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -42,6 +42,7 @@ import ( "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/txnkv/rangetask" + "go.uber.org/atomic" "go.uber.org/zap" "golang.org/x/exp/slices" ) @@ -62,6 +63,7 @@ const ( gcEnabledArgsOffset = 2 autoAnalyzeOffset = 3 maxAutoAnalyzeTimeOffset = 4 + totalLockedRegionsOffset = 5 ) func closePDSchedule() error { @@ -353,19 +355,14 @@ func sendPrepareFlashbackToVersionRPC( func prepareFlashbackToVersion( ctx context.Context, d *ddlCtx, + handler rangetask.TaskHandler, startKey []byte, endKey []byte, ) (err error) { return rangetask.NewRangeTaskRunner( "flashback-to-version-runner", d.store.(tikv.Storage), int(variable.GetDDLFlashbackConcurrency()), - func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) { - stats, err := sendPrepareFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), r) - logutil.BgLogger().Info("prepare flashback to version stats", - zap.Int("complete region", stats.CompletedRegions), - zap.Error(err)) - return stats, err - }, + handler, ).RunOnRange(ctx, startKey, endKey) } @@ -447,21 +444,14 @@ func sendFlashbackToVersionRPC( func flashbackToVersion( ctx context.Context, d *ddlCtx, - version uint64, - startTS, commitTS uint64, + handler rangetask.TaskHandler, startKey []byte, endKey []byte, ) (err error) { return rangetask.NewRangeTaskRunner( "flashback-to-version-runner", d.store.(tikv.Storage), int(variable.GetDDLFlashbackConcurrency()), - func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) { - stats, err := sendFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), version, startTS, commitTS, r) - logutil.BgLogger().Info("flashback cluster stats", - zap.Int("complete region", stats.CompletedRegions), - zap.Error(err)) - return stats, err - }, + handler, ).RunOnRange(ctx, startKey, endKey) } @@ -483,15 +473,18 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve return ver, errors.Errorf("Not support flashback cluster in non-TiKV env") } - var flashbackTS uint64 + var flashbackTS, lockedRegions uint64 var pdScheduleValue map[string]interface{} var autoAnalyzeValue, maxAutoAnalyzeTimeValue string var gcEnabledValue bool - if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabledValue, &autoAnalyzeValue, &maxAutoAnalyzeTimeValue); err != nil { + if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabledValue, &autoAnalyzeValue, &maxAutoAnalyzeTimeValue, &lockedRegions); err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } + var totalRegions, completedRegions atomic.Uint64 + totalRegions.Store(lockedRegions) + sess, err := w.sessPool.get() if err != nil { job.State = model.JobStateCancelled @@ -541,17 +534,23 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve job.SchemaState = model.StateWriteReorganization return updateSchemaVersion(d, t, job) } + totalRegions.Store(0) keyRanges, err := GetFlashbackKeyRanges(sess, tablecodec.EncodeTablePrefix(0)) if err != nil { return ver, errors.Trace(err) } for _, ranges := 
range keyRanges { - if err = prepareFlashbackToVersion(d.ctx, d, ranges.StartKey, ranges.EndKey); err != nil { + if err = prepareFlashbackToVersion(d.ctx, d, + func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) { + stats, err := sendPrepareFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), r) + totalRegions.Add(uint64(stats.CompletedRegions)) + return stats, err + }, ranges.StartKey, ranges.EndKey); err != nil { logutil.BgLogger().Warn("[ddl] Get error when do flashback", zap.Error(err)) return ver, err } } - // TODO, lock all regions. + job.Args[totalLockedRegionsOffset] = totalRegions.Load() job.SchemaState = model.StateWriteReorganization return updateSchemaVersion(d, t, job) // Stage 4, get key ranges and send flashback RPC. @@ -573,7 +572,16 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve return ver, errors.Trace(err) } for _, ranges := range keyRanges { - if err = flashbackToVersion(d.ctx, d, flashbackTS, t.StartTS, commitTS, ranges.StartKey, ranges.EndKey); err != nil { + if err = flashbackToVersion(d.ctx, d, + func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) { + stats, err := sendFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), flashbackTS, t.StartTS, commitTS, r) + completedRegions.Add(uint64(stats.CompletedRegions)) + logutil.BgLogger().Info("flashback cluster stats", + zap.Uint64("complete regions", completedRegions.Load()), + zap.Uint64("total regions", totalRegions.Load()), + zap.Error(err)) + return stats, err + }, ranges.StartKey, ranges.EndKey); err != nil { logutil.BgLogger().Warn("[ddl] Get error when do flashback", zap.Error(err)) return ver, err } diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 42dc56cd1909c..be543dd2bcc47 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -2658,7 +2658,8 @@ func (d *ddl) FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) error map[string]interface{}{}, true, /* tidb_gc_enable */ variable.On, /* tidb_enable_auto_analyze */ - "1" /* tidb_max_auto_analyze_time */}, + "1", /* tidb_max_auto_analyze_time */ + 0 /* totalRegions */}, } err := d.DoDDLJob(ctx, job) err = d.callHookOnChanged(job, err) From 610f63188a14f78e5f23703f653879f475a99ac8 Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Wed, 12 Oct 2022 14:03:18 +0800 Subject: [PATCH 17/29] update --- ddl/cluster.go | 19 ++++++++++++------- ddl/ddl_api.go | 3 ++- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/ddl/cluster.go b/ddl/cluster.go index 5b1b308cf1b2a..db83593388018 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -64,6 +64,7 @@ const ( autoAnalyzeOffset = 3 maxAutoAnalyzeTimeOffset = 4 totalLockedRegionsOffset = 5 + commitTSOffset = 6 ) func closePDSchedule() error { @@ -473,11 +474,11 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve return ver, errors.Errorf("Not support flashback cluster in non-TiKV env") } - var flashbackTS, lockedRegions uint64 + var flashbackTS, lockedRegions, commitTS uint64 var pdScheduleValue map[string]interface{} var autoAnalyzeValue, maxAutoAnalyzeTimeValue string var gcEnabledValue bool - if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabledValue, &autoAnalyzeValue, &maxAutoAnalyzeTimeValue, &lockedRegions); err != nil { + if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabledValue, &autoAnalyzeValue, &maxAutoAnalyzeTimeValue, &lockedRegions, &commitTS); err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } @@ -551,6 +552,13 @@ func (w *worker) 
onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve } } job.Args[totalLockedRegionsOffset] = totalRegions.Load() + + // We should get commitTS here to avoid lost commitTS when TiDB crashed during send flashback RPC. + commitTS, err = d.store.GetOracle().GetTimestamp(d.ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + if err != nil { + return ver, errors.Trace(err) + } + job.Args[commitTSOffset] = commitTS job.SchemaState = model.StateWriteReorganization return updateSchemaVersion(d, t, job) // Stage 4, get key ranges and send flashback RPC. @@ -567,14 +575,11 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve return ver, errors.Trace(err) } - commitTS, err := d.store.GetOracle().GetTimestamp(d.ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) - if err != nil { - return ver, errors.Trace(err) - } for _, ranges := range keyRanges { if err = flashbackToVersion(d.ctx, d, func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) { - stats, err := sendFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), flashbackTS, t.StartTS, commitTS, r) + // use commitTS - 1 as startTS, make sure it less than commitTS. + stats, err := sendFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), flashbackTS, commitTS-1, commitTS, r) completedRegions.Add(uint64(stats.CompletedRegions)) logutil.BgLogger().Info("flashback cluster stats", zap.Uint64("complete regions", completedRegions.Load()), diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index be543dd2bcc47..67eb830572382 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -2659,7 +2659,8 @@ func (d *ddl) FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) error true, /* tidb_gc_enable */ variable.On, /* tidb_enable_auto_analyze */ "1", /* tidb_max_auto_analyze_time */ - 0 /* totalRegions */}, + 0, /* totalRegions */ + 0 /* newCommitTS */}, } err := d.DoDDLJob(ctx, job) err = d.callHookOnChanged(job, err) From 5e9db08577ba04f54f0eed1e947244ab7a19d401 Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Wed, 12 Oct 2022 15:52:33 +0800 Subject: [PATCH 18/29] improve code --- ddl/cluster.go | 33 +++++++++++++++------------------ ddl/cluster_test.go | 34 +++------------------------------- 2 files changed, 18 insertions(+), 49 deletions(-) diff --git a/ddl/cluster.go b/ddl/cluster.go index db83593388018..0645f1243fef7 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -221,7 +221,7 @@ func addToSlice(schema string, tableName string, tableID int64, flashbackIDs []f // GetFlashbackKeyRanges make keyRanges efficiently for flashback cluster when many tables in cluster, // The time complexity is O(nlogn). -func GetFlashbackKeyRanges(sess sessionctx.Context, startKey kv.Key) ([]kv.KeyRange, error) { +func GetFlashbackKeyRanges(sess sessionctx.Context) ([]kv.KeyRange, error) { schemas := sess.GetDomainInfoSchema().(infoschema.InfoSchema).AllSchemas() // The semantic of keyRanges(output). 
@@ -268,20 +268,6 @@ func GetFlashbackKeyRanges(sess sessionctx.Context, startKey kv.Key) ([]kv.KeyRa }) } - for i, ranges := range keyRanges { - // startKey smaller than ranges.StartKey, ranges begin with [ranges.StartKey, ranges.EndKey) - if ranges.StartKey.Cmp(startKey) > 0 { - keyRanges = keyRanges[i:] - break - } - // startKey in [ranges.StartKey, ranges.EndKey), ranges begin with [startKey, ranges.EndKey) - if ranges.StartKey.Cmp(startKey) <= 0 && ranges.EndKey.Cmp(startKey) > 0 { - keyRanges = keyRanges[i:] - keyRanges[0].StartKey = startKey - break - } - } - return keyRanges, nil } @@ -456,6 +442,15 @@ func flashbackToVersion( ).RunOnRange(ctx, startKey, endKey) } +func splitTablesRegions(d *ddlCtx, keyRanges []kv.KeyRange) { + if s, ok := d.store.(kv.SplittableStore); ok { + for _, keys := range keyRanges { + splitRecordRegion(d.ctx, s, tablecodec.DecodeTableID(keys.StartKey), false) + splitRecordRegion(d.ctx, s, tablecodec.DecodeTableID(keys.EndKey), false) + } + } +} + // A Flashback has 4 different stages. // 1. before lock flashbackClusterJobID, check clusterJobID and lock it. // 2. before flashback start, check timestamp, disable GC and close PD schedule. @@ -535,11 +530,13 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve job.SchemaState = model.StateWriteReorganization return updateSchemaVersion(d, t, job) } - totalRegions.Store(0) - keyRanges, err := GetFlashbackKeyRanges(sess, tablecodec.EncodeTablePrefix(0)) + keyRanges, err := GetFlashbackKeyRanges(sess) if err != nil { return ver, errors.Trace(err) } + // Split region by keyRanges, make sure no unrelated key ranges be locked. + splitTablesRegions(d, keyRanges) + totalRegions.Store(0) for _, ranges := range keyRanges { if err = prepareFlashbackToVersion(d.ctx, d, func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) { @@ -570,7 +567,7 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve job.SchemaState = model.StatePublic return ver, nil } - keyRanges, err := GetFlashbackKeyRanges(sess, tablecodec.EncodeTablePrefix(0)) + keyRanges, err := GetFlashbackKeyRanges(sess) if err != nil { return ver, errors.Trace(err) } diff --git a/ddl/cluster_test.go b/ddl/cluster_test.go index 965cda0fa8dfc..4ee5951125038 100644 --- a/ddl/cluster_test.go +++ b/ddl/cluster_test.go @@ -27,7 +27,6 @@ import ( "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/sessionctx/variable" - "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/util/dbterror" "github.com/stretchr/testify/assert" @@ -41,7 +40,7 @@ func TestGetFlashbackKeyRanges(t *testing.T) { se, err := session.CreateSession4Test(store) require.NoError(t, err) - kvRanges, err := ddl.GetFlashbackKeyRanges(se, tablecodec.EncodeTablePrefix(0)) + kvRanges, err := ddl.GetFlashbackKeyRanges(se) require.NoError(t, err) // The results are 6 key ranges // 0: (stats_meta,stats_histograms,stats_buckets) @@ -51,26 +50,6 @@ func TestGetFlashbackKeyRanges(t *testing.T) { // 4: (stats_fm_sketch) // 5: (stats_history, stats_meta_history) require.Len(t, kvRanges, 6) - // tableID for mysql.stats_meta is 20 - require.Equal(t, kvRanges[0].StartKey, tablecodec.EncodeTablePrefix(20)) - // tableID for mysql.stats_feedback is 30 - require.Equal(t, kvRanges[1].StartKey, tablecodec.EncodeTablePrefix(30)) - // tableID for mysql.stats_meta_history is 62 - require.Equal(t, kvRanges[5].EndKey, tablecodec.EncodeTablePrefix(62+1)) - - // The 
original table ID for range is [60, 63) - // startKey is 61, so return [61, 63) - kvRanges, err = ddl.GetFlashbackKeyRanges(se, tablecodec.EncodeTablePrefix(61)) - require.NoError(t, err) - require.Len(t, kvRanges, 1) - require.Equal(t, kvRanges[0].StartKey, tablecodec.EncodeTablePrefix(61)) - - // The original ranges are [48, 49), [60, 63) - // startKey is 59, so return [60, 63) - kvRanges, err = ddl.GetFlashbackKeyRanges(se, tablecodec.EncodeTablePrefix(59)) - require.NoError(t, err) - require.Len(t, kvRanges, 1) - require.Equal(t, kvRanges[0].StartKey, tablecodec.EncodeTablePrefix(60)) tk.MustExec("use test") tk.MustExec("CREATE TABLE employees (" + @@ -82,13 +61,6 @@ func TestGetFlashbackKeyRanges(t *testing.T) { " PARTITION p2 VALUES LESS THAN (16)," + " PARTITION p3 VALUES LESS THAN (21)" + ");") - kvRanges, err = ddl.GetFlashbackKeyRanges(se, tablecodec.EncodeTablePrefix(63)) - require.NoError(t, err) - // start from table ID is 63, so only 1 kv range. - require.Len(t, kvRanges, 1) - // 1 tableID and 4 partitions. - require.Equal(t, tablecodec.DecodeTableID(kvRanges[0].EndKey)-tablecodec.DecodeTableID(kvRanges[0].StartKey), int64(5)) - tk.MustExec("truncate table mysql.analyze_jobs") // truncate all `stats_` tables, make table ID consecutive. @@ -101,12 +73,12 @@ func TestGetFlashbackKeyRanges(t *testing.T) { tk.MustExec("truncate table mysql.stats_fm_sketch") tk.MustExec("truncate table mysql.stats_history") tk.MustExec("truncate table mysql.stats_meta_history") - kvRanges, err = ddl.GetFlashbackKeyRanges(se, tablecodec.EncodeTablePrefix(0)) + kvRanges, err = ddl.GetFlashbackKeyRanges(se) require.NoError(t, err) require.Len(t, kvRanges, 2) tk.MustExec("truncate table test.employees") - kvRanges, err = ddl.GetFlashbackKeyRanges(se, tablecodec.EncodeTablePrefix(0)) + kvRanges, err = ddl.GetFlashbackKeyRanges(se) require.NoError(t, err) require.Len(t, kvRanges, 1) } From 6c21fd2f2f3f2076d71f47e59c59a9f89fb084da Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Tue, 18 Oct 2022 14:12:16 +0800 Subject: [PATCH 19/29] delete max auto analyze time --- ddl/cluster.go | 22 ++++++---------------- ddl/ddl_api.go | 1 - 2 files changed, 6 insertions(+), 17 deletions(-) diff --git a/ddl/cluster.go b/ddl/cluster.go index 0645f1243fef7..44d4fccf147f5 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -62,9 +62,8 @@ const ( pdScheduleArgsOffset = 1 gcEnabledArgsOffset = 2 autoAnalyzeOffset = 3 - maxAutoAnalyzeTimeOffset = 4 - totalLockedRegionsOffset = 5 - commitTSOffset = 6 + totalLockedRegionsOffset = 4 + commitTSOffset = 5 ) func closePDSchedule() error { @@ -471,9 +470,9 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve var flashbackTS, lockedRegions, commitTS uint64 var pdScheduleValue map[string]interface{} - var autoAnalyzeValue, maxAutoAnalyzeTimeValue string + var autoAnalyzeValue string var gcEnabledValue bool - if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabledValue, &autoAnalyzeValue, &maxAutoAnalyzeTimeValue, &lockedRegions, &commitTS); err != nil { + if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabledValue, &autoAnalyzeValue, &lockedRegions, &commitTS); err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } @@ -507,12 +506,6 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve return ver, errors.Trace(err) } job.Args[autoAnalyzeOffset] = &autoAnalyzeValue - maxAutoAnalyzeTimeValue, err = getTiDBMaxAutoAnalyzeTime(sess) - if err != nil { - job.State = 
model.JobStateCancelled - return ver, errors.Trace(err) - } - job.Args[maxAutoAnalyzeTimeOffset] = &maxAutoAnalyzeTimeValue job.SchemaState = model.StateDeleteOnly return ver, nil // Stage 2, check flashbackTS, close GC and PD schedule. @@ -605,10 +598,10 @@ func finishFlashbackCluster(w *worker, job *model.Job) error { var flashbackTS uint64 var pdScheduleValue map[string]interface{} - var autoAnalyzeValue, maxAutoAnalyzeTime string + var autoAnalyzeValue string var gcEnabled bool - if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabled, &autoAnalyzeValue, &maxAutoAnalyzeTime); err != nil { + if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabled, &autoAnalyzeValue); err != nil { return errors.Trace(err) } sess, err := w.sessPool.get() @@ -629,9 +622,6 @@ func finishFlashbackCluster(w *worker, job *model.Job) error { if err = setTiDBEnableAutoAnalyze(sess, autoAnalyzeValue); err != nil { return err } - if err = setTiDBMaxAutoAnalyzeTime(sess, maxAutoAnalyzeTime); err != nil { - return err - } if gcEnabled { if err = gcutil.EnableGC(sess); err != nil { return err diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 67eb830572382..451032a3baf9e 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -2658,7 +2658,6 @@ func (d *ddl) FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) error map[string]interface{}{}, true, /* tidb_gc_enable */ variable.On, /* tidb_enable_auto_analyze */ - "1", /* tidb_max_auto_analyze_time */ 0, /* totalRegions */ 0 /* newCommitTS */}, } From 9a4b4f1d397be5aa4b9a637f09ff1b68f1dd7695 Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Fri, 21 Oct 2022 10:03:27 +0800 Subject: [PATCH 20/29] update --- ddl/cluster.go | 109 ++++++++++++++++++++++++++----------------------- go.mod | 6 +-- go.sum | 11 ++--- 3 files changed, 67 insertions(+), 59 deletions(-) diff --git a/ddl/cluster.go b/ddl/cluster.go index 44d4fccf147f5..7f5b37f9eb972 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -17,6 +17,8 @@ package ddl import ( "bytes" "context" + "encoding/hex" + "fmt" "strings" "time" @@ -124,18 +126,6 @@ func setTiDBEnableAutoAnalyze(sess sessionctx.Context, value string) error { return sess.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiDBEnableAutoAnalyze, value) } -func getTiDBMaxAutoAnalyzeTime(sess sessionctx.Context) (string, error) { - val, err := sess.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBMaxAutoAnalyzeTime) - if err != nil { - return "", errors.Trace(err) - } - return val, nil -} - -func setTiDBMaxAutoAnalyzeTime(sess sessionctx.Context, value string) error { - return sess.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiDBMaxAutoAnalyzeTime, value) -} - func getTiDBEnableAutoAnalyze(sess sessionctx.Context) (string, error) { val, err := sess.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBEnableAutoAnalyze) if err != nil { @@ -158,9 +148,6 @@ func checkAndSetFlashbackClusterInfo(sess sessionctx.Context, d *ddlCtx, t *meta if err = setTiDBEnableAutoAnalyze(sess, variable.Off); err != nil { return err } - if err = setTiDBMaxAutoAnalyzeTime(sess, "1"); err != nil { - return err - } nowSchemaVersion, err := t.GetSchemaVersion() if err != nil { @@ -301,6 +288,9 @@ func sendPrepareFlashbackToVersionRPC( endKey = rangeEndKey } + logutil.BgLogger().Info("send prepare flashback request", zap.Uint64("region_id", loc.Region.GetID()), + zap.String("start_key", hex.EncodeToString(startKey)), zap.String("end_key", hex.EncodeToString(endKey))) + req := 
tikvrpc.NewRequest(tikvrpc.CmdPrepareFlashbackToVersion, &kvrpcpb.PrepareFlashbackToVersionRequest{ StartKey: startKey, EndKey: endKey, @@ -309,24 +299,21 @@ func sendPrepareFlashbackToVersionRPC( resp, err := s.SendReq(bo, req, loc.Region, flashbackTimeout) if err != nil { return taskStat, err - } - regionErr, err := resp.GetRegionError() - if err != nil { - return taskStat, err - } - if regionErr != nil { - err = bo.Backoff(tikv.BoRegionMiss(), errors.New(regionErr.String())) + } else { + regionErr, err := resp.GetRegionError() if err != nil { return taskStat, err } - continue - } - if resp.Resp == nil { - return taskStat, errors.Errorf("prepare flashback missing resp body") - } - prepareFlashbackToVersionResp := resp.Resp.(*kvrpcpb.PrepareFlashbackToVersionResponse) - if err := prepareFlashbackToVersionResp.GetError(); err != "" { - return taskStat, errors.Errorf(err) + if regionErr != nil { + return taskStat, errors.Errorf(regionErr.String()) + } + if resp.Resp == nil { + return taskStat, errors.Errorf("prepare flashback missing resp body") + } + prepareFlashbackToVersionResp := resp.Resp.(*kvrpcpb.PrepareFlashbackToVersionResponse) + if err := prepareFlashbackToVersionResp.GetError(); err != "" { + return taskStat, errors.Errorf(err) + } } taskStat.CompletedRegions++ if isLast { @@ -385,6 +372,9 @@ func sendFlashbackToVersionRPC( endKey = rangeEndKey } + logutil.BgLogger().Info("send flashback request", zap.Uint64("region_id", loc.Region.GetID()), + zap.String("start_key", hex.EncodeToString(startKey)), zap.String("end_key", hex.EncodeToString(endKey))) + req := tikvrpc.NewRequest(tikvrpc.CmdFlashbackToVersion, &kvrpcpb.FlashbackToVersionRequest{ Version: version, StartKey: startKey, @@ -395,27 +385,38 @@ func sendFlashbackToVersionRPC( resp, err := s.SendReq(bo, req, loc.Region, flashbackTimeout) if err != nil { - return taskStat, err - } - regionErr, err := resp.GetRegionError() - if err != nil { - return taskStat, err - } - if regionErr != nil { - err = bo.Backoff(tikv.BoRegionMiss(), errors.New(regionErr.String())) + logutil.BgLogger().Warn("send request meets error", zap.Uint64("region_id", loc.Region.GetID()), zap.Error(err)) + if err.Error() != fmt.Sprintf("region %d is not prepared for the flashback", loc.Region.GetID()) { + return taskStat, err + } + } else { + regionErr, err := resp.GetRegionError() if err != nil { return taskStat, err } - continue - } - if resp.Resp == nil { - logutil.BgLogger().Warn("flashback missing resp body") - continue - } - flashbackToVersionResp := resp.Resp.(*kvrpcpb.FlashbackToVersionResponse) - if err := flashbackToVersionResp.GetError(); err != "" { - logutil.BgLogger().Warn("flashback rpc meets error", zap.String("err", err)) - continue + if regionErr != nil { + err = bo.Backoff(tikv.BoRegionMiss(), errors.New(regionErr.String())) + if err != nil { + return taskStat, err + } + continue + } + if resp.Resp == nil { + logutil.BgLogger().Warn("flashback miss resp body", zap.Uint64("region_id", loc.Region.GetID())) + err = bo.Backoff(tikv.BoTiKVRPC(), errors.New("flashback rpc miss resp body")) + if err != nil { + return taskStat, err + } + continue + } + flashbackToVersionResp := resp.Resp.(*kvrpcpb.FlashbackToVersionResponse) + if respErr := flashbackToVersionResp.GetError(); respErr != "" { + boErr := bo.Backoff(tikv.BoTiKVRPC(), errors.New(respErr)) + if boErr != nil { + return taskStat, boErr + } + continue + } } taskStat.CompletedRegions++ if isLast { @@ -443,9 +444,15 @@ func flashbackToVersion( func splitTablesRegions(d *ddlCtx, keyRanges 
[]kv.KeyRange) { if s, ok := d.store.(kv.SplittableStore); ok { + var tableID int64 for _, keys := range keyRanges { - splitRecordRegion(d.ctx, s, tablecodec.DecodeTableID(keys.StartKey), false) - splitRecordRegion(d.ctx, s, tablecodec.DecodeTableID(keys.EndKey), false) + for { + // tableID is useless when scatter == false + _, err := s.SplitRegions(d.ctx, [][]byte{keys.StartKey, keys.EndKey}, false, &tableID) + if err == nil { + break + } + } } } } @@ -578,7 +585,7 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve return stats, err }, ranges.StartKey, ranges.EndKey); err != nil { logutil.BgLogger().Warn("[ddl] Get error when do flashback", zap.Error(err)) - return ver, err + return ver, errors.Trace(err) } } diff --git a/go.mod b/go.mod index 8bb1bb8a92695..2a13df05cf60f 100644 --- a/go.mod +++ b/go.mod @@ -67,7 +67,7 @@ require ( github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3 github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 - github.com/pingcap/kvproto v0.0.0-20220929075948-06e08d5ed64c + github.com/pingcap/kvproto v0.0.0-20221014081430-26e28e6a281a github.com/pingcap/log v1.1.0 github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4 github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e @@ -85,7 +85,7 @@ require ( github.com/stretchr/testify v1.8.0 github.com/tdakkota/asciicheck v0.1.1 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 - github.com/tikv/client-go/v2 v2.0.1-0.20220929111318-51f3bd3944c2 + github.com/tikv/client-go/v2 v2.0.1-0.20221017092635-91be9c6ce6c0 github.com/tikv/pd/client v0.0.0-20220725055910-7187a7ab72db github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 github.com/twmb/murmur3 v1.1.3 @@ -203,7 +203,7 @@ require ( github.com/rivo/uniseg v0.3.4 // indirect github.com/rogpeppe/go-internal v1.6.1 // indirect github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 // indirect - github.com/shurcooL/vfsgen v0.0.0-20180711163814-62bca832be04 // indirect + github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd // indirect github.com/sirupsen/logrus v1.9.0 // indirect github.com/stathat/consistent v1.0.0 // indirect github.com/tklauser/go-sysconf v0.3.10 // indirect diff --git a/go.sum b/go.sum index c5284c8f4465f..63cf1603eedca 100644 --- a/go.sum +++ b/go.sum @@ -755,8 +755,8 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20220510035547-0e2f26c0a46a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= -github.com/pingcap/kvproto v0.0.0-20220929075948-06e08d5ed64c h1:ceg4xjEEXNgPsScTQ5dtidiltLF4h17Y/jUqfyLAy9E= -github.com/pingcap/kvproto v0.0.0-20220929075948-06e08d5ed64c/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= +github.com/pingcap/kvproto v0.0.0-20221014081430-26e28e6a281a h1:McYxPhA8SHqfUtLfQHHN0fQl4dy93IkhlX4Pp2MKIFA= +github.com/pingcap/kvproto v0.0.0-20221014081430-26e28e6a281a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= 
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= @@ -851,8 +851,9 @@ github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749/go.mod h1:ZY1cvUeJ github.com/shurcooL/httpgzip v0.0.0-20190720172056-320755c1c1b0 h1:mj/nMDAwTBiaCqMEs4cYCqF7pO6Np7vhy1D1wcQGz+E= github.com/shurcooL/httpgzip v0.0.0-20190720172056-320755c1c1b0/go.mod h1:919LwcH0M7/W4fcZ0/jy0qGght1GIhqyS/EgWGH2j5Q= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= -github.com/shurcooL/vfsgen v0.0.0-20180711163814-62bca832be04 h1:y0cMJ0qjii33BnD6tMGcF/+gHYsoKQ6tbwQpy233OII= github.com/shurcooL/vfsgen v0.0.0-20180711163814-62bca832be04/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw= +github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd h1:ug7PpSOB5RBPK1Kg6qskGBoP3Vnj/aNYFTznWvlkGo0= +github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= @@ -904,8 +905,8 @@ github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3 h1:f+jULpR github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3/go.mod h1:ON8b8w4BN/kE1EOhwT0o+d62W65a6aPw1nouo9LMgyY= github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ= github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU= -github.com/tikv/client-go/v2 v2.0.1-0.20220929111318-51f3bd3944c2 h1:A05tkatkgjqcTq94YteKkXFNPIELaGwAwjEzXsx9t6g= -github.com/tikv/client-go/v2 v2.0.1-0.20220929111318-51f3bd3944c2/go.mod h1:gdXot2ofS2EOGtrXQ2qyESonQX/gFmgtfBCqCOSWg9E= +github.com/tikv/client-go/v2 v2.0.1-0.20221017092635-91be9c6ce6c0 h1:5KLqhDGLc/mtemdS/odfOP717rn8ttsTj3jzZ8TZn9A= +github.com/tikv/client-go/v2 v2.0.1-0.20221017092635-91be9c6ce6c0/go.mod h1:9hmGJFrWdehClHg0lv2cYgzvCUEhwLZkH67/PHl75tg= github.com/tikv/pd/client v0.0.0-20220725055910-7187a7ab72db h1:r1eMh9Rny3hfWuBuxOnbsCRrR4FhthiNxLQ5rAUtaww= github.com/tikv/pd/client v0.0.0-20220725055910-7187a7ab72db/go.mod h1:ew8kS0yIcEaSetuuywkTLIUBR+sz3J5XvAYRae11qwc= github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro= From 3bd34e8ab14ea91b3f0f0f764e0f8a641fa4895a Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Mon, 24 Oct 2022 16:39:55 +0800 Subject: [PATCH 21/29] update DEPS --- DEPS.bzl | 12 ++++++------ ddl/cluster.go | 7 +------ executor/ddl.go | 2 +- server/server.go | 2 +- 4 files changed, 9 insertions(+), 14 deletions(-) diff --git a/DEPS.bzl b/DEPS.bzl index bb71d9f2418b6..9c40f8949ba41 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -2818,8 +2818,8 @@ def go_deps(): name = "com_github_pingcap_kvproto", build_file_proto_mode = "disable_global", importpath = "github.com/pingcap/kvproto", - sum = "h1:ceg4xjEEXNgPsScTQ5dtidiltLF4h17Y/jUqfyLAy9E=", - version = "v0.0.0-20220929075948-06e08d5ed64c", + sum = "h1:McYxPhA8SHqfUtLfQHHN0fQl4dy93IkhlX4Pp2MKIFA=", + version = "v0.0.0-20221014081430-26e28e6a281a", ) go_repository( name = "com_github_pingcap_log", @@ -3205,8 +3205,8 @@ def go_deps(): name = "com_github_shurcool_vfsgen", build_file_proto_mode = "disable_global", importpath = "github.com/shurcooL/vfsgen", - 
sum = "h1:y0cMJ0qjii33BnD6tMGcF/+gHYsoKQ6tbwQpy233OII=", - version = "v0.0.0-20180711163814-62bca832be04", + sum = "h1:ug7PpSOB5RBPK1Kg6qskGBoP3Vnj/aNYFTznWvlkGo0=", + version = "v0.0.0-20181202132449-6a9ea43bcacd", ) go_repository( name = "com_github_sirupsen_logrus", @@ -3422,8 +3422,8 @@ def go_deps(): name = "com_github_tikv_client_go_v2", build_file_proto_mode = "disable_global", importpath = "github.com/tikv/client-go/v2", - sum = "h1:/13jzD/AR7v3dCLweFQ2JG8bihh3HLVIci2tbOHHGW0=", - version = "v2.0.1-0.20221012074856-6def8d7b90c4", + sum = "h1:5KLqhDGLc/mtemdS/odfOP717rn8ttsTj3jzZ8TZn9A=", + version = "v2.0.1-0.20221017092635-91be9c6ce6c0", ) go_repository( name = "com_github_tikv_pd_client", diff --git a/ddl/cluster.go b/ddl/cluster.go index 7f5b37f9eb972..62b719130fd72 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -123,7 +123,7 @@ func ValidateFlashbackTS(ctx context.Context, sctx sessionctx.Context, flashBack } func setTiDBEnableAutoAnalyze(sess sessionctx.Context, value string) error { - return sess.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiDBEnableAutoAnalyze, value) + return sess.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(context.Background(), variable.TiDBEnableAutoAnalyze, value) } func getTiDBEnableAutoAnalyze(sess sessionctx.Context) (string, error) { @@ -629,11 +629,6 @@ func finishFlashbackCluster(w *worker, job *model.Job) error { if err = setTiDBEnableAutoAnalyze(sess, autoAnalyzeValue); err != nil { return err } - if gcEnabled { - if err = gcutil.EnableGC(sess); err != nil { - return err - } - } return nil }) if err != nil { diff --git a/executor/ddl.go b/executor/ddl.go index b0939056bafe1..85918bb7d4f54 100644 --- a/executor/ddl.go +++ b/executor/ddl.go @@ -530,7 +530,7 @@ func (e *DDLExec) getRecoverTableByTableName(tableName *ast.TableName) (*model.J return jobInfo, tableInfo, nil } -func (e *DDLExec) executeFlashBackCluster(s *ast.FlashBackClusterStmt) error { +func (e *DDLExec) executeFlashBackCluster(s *ast.FlashBackToTimestampStmt) error { flashbackTS, err := staleread.CalculateAsOfTsExpr(e.ctx, s.FlashbackTS) if err != nil { return err diff --git a/server/server.go b/server/server.go index e2eb5557fa8e9..136f2f76b782a 100644 --- a/server/server.go +++ b/server/server.go @@ -897,7 +897,7 @@ func (s *Server) KillNonFlashbackClusterConn() { s.Kill(client.connectionID, false) continue } - _, ok = ddl.Statement.(*ast.FlashBackClusterStmt) + _, ok = ddl.Statement.(*ast.FlashBackToTimestampStmt) if !ok { s.Kill(client.connectionID, false) continue From 76d32648cb00ca11b887595f19b424c2e74675d7 Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Tue, 25 Oct 2022 13:52:54 +0800 Subject: [PATCH 22/29] update lint --- ddl/cluster.go | 36 ++++++++++++++++-------------------- 1 file changed, 16 insertions(+), 20 deletions(-) diff --git a/ddl/cluster.go b/ddl/cluster.go index 62b719130fd72..c659d7f2b9c80 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -299,21 +299,20 @@ func sendPrepareFlashbackToVersionRPC( resp, err := s.SendReq(bo, req, loc.Region, flashbackTimeout) if err != nil { return taskStat, err - } else { - regionErr, err := resp.GetRegionError() - if err != nil { - return taskStat, err - } - if regionErr != nil { - return taskStat, errors.Errorf(regionErr.String()) - } - if resp.Resp == nil { - return taskStat, errors.Errorf("prepare flashback missing resp body") - } - prepareFlashbackToVersionResp := resp.Resp.(*kvrpcpb.PrepareFlashbackToVersionResponse) - if err := prepareFlashbackToVersionResp.GetError(); err != "" { - 
return taskStat, errors.Errorf(err) - } + } + regionErr, err := resp.GetRegionError() + if err != nil { + return taskStat, err + } + if regionErr != nil { + return taskStat, errors.Errorf(regionErr.String()) + } + if resp.Resp == nil { + return taskStat, errors.Errorf("prepare flashback missing resp body") + } + prepareFlashbackToVersionResp := resp.Resp.(*kvrpcpb.PrepareFlashbackToVersionResponse) + if err := prepareFlashbackToVersionResp.GetError(); err != "" { + return taskStat, errors.Errorf(err) } taskStat.CompletedRegions++ if isLast { @@ -372,7 +371,7 @@ func sendFlashbackToVersionRPC( endKey = rangeEndKey } - logutil.BgLogger().Info("send flashback request", zap.Uint64("region_id", loc.Region.GetID()), + logutil.BgLogger().Debug("send flashback request", zap.Uint64("region_id", loc.Region.GetID()), zap.String("start_key", hex.EncodeToString(startKey)), zap.String("end_key", hex.EncodeToString(endKey))) req := tikvrpc.NewRequest(tikvrpc.CmdFlashbackToVersion, &kvrpcpb.FlashbackToVersionRequest{ @@ -626,10 +625,7 @@ func finishFlashbackCluster(w *worker, job *model.Job) error { return err } } - if err = setTiDBEnableAutoAnalyze(sess, autoAnalyzeValue); err != nil { - return err - } - return nil + return setTiDBEnableAutoAnalyze(sess, autoAnalyzeValue) }) if err != nil { return err From bae87ce5d78796a8144467631b4ea2fca0fece15 Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Tue, 25 Oct 2022 14:40:38 +0800 Subject: [PATCH 23/29] fix test --- ddl/cluster.go | 7 +++---- testkit/mocksessionmanager.go | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/ddl/cluster.go b/ddl/cluster.go index c659d7f2b9c80..d6742967960f7 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -441,13 +441,12 @@ func flashbackToVersion( ).RunOnRange(ctx, startKey, endKey) } -func splitTablesRegions(d *ddlCtx, keyRanges []kv.KeyRange) { +func splitRegionsByKeyRanges(d *ddlCtx, keyRanges []kv.KeyRange) { if s, ok := d.store.(kv.SplittableStore); ok { - var tableID int64 for _, keys := range keyRanges { for { // tableID is useless when scatter == false - _, err := s.SplitRegions(d.ctx, [][]byte{keys.StartKey, keys.EndKey}, false, &tableID) + _, err := s.SplitRegions(d.ctx, [][]byte{keys.StartKey, keys.EndKey}, false, nil) if err == nil { break } @@ -534,7 +533,7 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve return ver, errors.Trace(err) } // Split region by keyRanges, make sure no unrelated key ranges be locked. 
- splitTablesRegions(d, keyRanges) + splitRegionsByKeyRanges(d, keyRanges) totalRegions.Store(0) for _, ranges := range keyRanges { if err = prepareFlashbackToVersion(d.ctx, d, diff --git a/testkit/mocksessionmanager.go b/testkit/mocksessionmanager.go index 5b7a79177fd58..c22af798f7b9a 100644 --- a/testkit/mocksessionmanager.go +++ b/testkit/mocksessionmanager.go @@ -120,7 +120,7 @@ func (msm *MockSessionManager) KillNonFlashbackClusterConn() { msm.Kill(se.GetSessionVars().ConnectionID, false) continue } - _, ok = ddl.Statement.(*ast.FlashBackClusterStmt) + _, ok = ddl.Statement.(*ast.FlashBackToTimestampStmt) if !ok { msm.Kill(se.GetSessionVars().ConnectionID, false) continue From 4c361f1dfdb7d77d9025c4a28ce37469b65b7789 Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Tue, 25 Oct 2022 15:22:01 +0800 Subject: [PATCH 24/29] fix test --- ddl/cluster_test.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/ddl/cluster_test.go b/ddl/cluster_test.go index 4ee5951125038..b32fcd709e924 100644 --- a/ddl/cluster_test.go +++ b/ddl/cluster_test.go @@ -202,9 +202,6 @@ func TestGlobalVariablesOnFlashback(t *testing.T) { rs, err = tk.Exec("show variables like 'tidb_enable_auto_analyze'") assert.NoError(t, err) assert.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.Off) - rs, err = tk.Exec("show variables like 'tidb_max_auto_analyze_time'") - assert.NoError(t, err) - assert.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], "1") } } dom.DDL().SetHook(hook) From a2879e0e75033059551f12c5ee405ea496d99b5f Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Tue, 25 Oct 2022 15:26:07 +0800 Subject: [PATCH 25/29] fix test --- util/logutil/hex_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/util/logutil/hex_test.go b/util/logutil/hex_test.go index 9351b0216cf52..fac76d0406ad9 100644 --- a/util/logutil/hex_test.go +++ b/util/logutil/hex_test.go @@ -32,7 +32,7 @@ func TestHex(t *testing.T) { region.StartKey = []byte{'t', 200, '\\', 000, 000, 000, '\\', 000, 000, 000, 37, '-', 000, 000, 000, 000, 000, 000, 000, 37} region.EndKey = []byte("3asg3asd") - expected := "{Id:6662 StartKey:74c85c0000005c000000252d0000000000000025 EndKey:3361736733617364 RegionEpoch: Peers:[] EncryptionMeta:}" + expected := "{Id:6662 StartKey:74c85c0000005c000000252d0000000000000025 EndKey:3361736733617364 RegionEpoch: Peers:[] EncryptionMeta: IsInFlashback:false}" require.Equal(t, expected, logutil.Hex(®ion).String()) } From baee75d000c2a267263bcfe4b97c0d4a57f6ea15 Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Tue, 25 Oct 2022 16:21:50 +0800 Subject: [PATCH 26/29] save params --- ddl/cluster.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ddl/cluster.go b/ddl/cluster.go index d6742967960f7..5ab7a323d01c8 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -601,12 +601,12 @@ func finishFlashbackCluster(w *worker, job *model.Job) error { return nil } - var flashbackTS uint64 + var flashbackTS, lockedRegions, commitTS uint64 var pdScheduleValue map[string]interface{} var autoAnalyzeValue string var gcEnabled bool - if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabled, &autoAnalyzeValue); err != nil { + if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabled, &autoAnalyzeValue, &lockedRegions, &commitTS); err != nil { return errors.Trace(err) } sess, err := w.sessPool.get() From c36f7a933251605c8ae7ae0339adeb5812291476 Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Wed, 26 Oct 2022 10:16:19 +0800 Subject: [PATCH 27/29] Update ddl/cluster.go 
Co-authored-by: bb7133 --- ddl/cluster.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddl/cluster.go b/ddl/cluster.go index 5ab7a323d01c8..5c453016e1c59 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -166,7 +166,7 @@ func checkAndSetFlashbackClusterInfo(sess sessionctx.Context, d *ddlCtx, t *meta return errors.Trace(err) } if diff != nil && diff.Type != model.ActionFlashbackCluster { - return errors.Errorf("Had ddl history during [%s, now), can't do flashback", oracle.GetTimeFromTS(flashbackTS)) + return errors.Errorf("Detected schema change due to another DDL job during [%s, now), can't do flashback", oracle.GetTimeFromTS(flashbackTS)) } } From 06bdda7229d77a63e031aca043ef3d867b193b02 Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Wed, 26 Oct 2022 10:33:24 +0800 Subject: [PATCH 28/29] update test --- executor/recover_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/recover_test.go b/executor/recover_test.go index 1692e5eab6b9e..26ed1a765f58f 100644 --- a/executor/recover_test.go +++ b/executor/recover_test.go @@ -334,7 +334,7 @@ func TestRecoverClusterMeetError(t *testing.T) { // Flashback failed because of ddl history. tk.MustExec("use test;") tk.MustExec("create table t(a int);") - tk.MustMatchErrMsg(fmt.Sprintf("flashback cluster to timestamp '%s'", flashbackTs), "Had ddl history during \\[.*, now\\), can't do flashback") + tk.MustMatchErrMsg(fmt.Sprintf("flashback cluster to timestamp '%s'", flashbackTs), "Detected schema change due to another DDL job during \\[.*, now\\), can't do flashback") require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) From 45054eb069ceb0705a458aa7f5d2f3c3cd75d7b1 Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Wed, 26 Oct 2022 16:08:45 +0800 Subject: [PATCH 29/29] follow comments --- ddl/cluster.go | 36 +++++++++++------------------------- 1 file changed, 11 insertions(+), 25 deletions(-) diff --git a/ddl/cluster.go b/ddl/cluster.go index 5c453016e1c59..e88ea7b5f524a 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -283,12 +283,12 @@ func sendPrepareFlashbackToVersionRPC( endKey := loc.EndKey isLast := len(endKey) == 0 || (len(rangeEndKey) > 0 && bytes.Compare(endKey, rangeEndKey) >= 0) - // If it is the last region + // If it is the last region. 
if isLast { endKey = rangeEndKey } - logutil.BgLogger().Info("send prepare flashback request", zap.Uint64("region_id", loc.Region.GetID()), + logutil.BgLogger().Info("[ddl] send prepare flashback request", zap.Uint64("region_id", loc.Region.GetID()), zap.String("start_key", hex.EncodeToString(startKey)), zap.String("end_key", hex.EncodeToString(endKey))) req := tikvrpc.NewRequest(tikvrpc.CmdPrepareFlashbackToVersion, &kvrpcpb.PrepareFlashbackToVersionRequest{ @@ -324,20 +324,6 @@ func sendPrepareFlashbackToVersionRPC( return taskStat, nil } -func prepareFlashbackToVersion( - ctx context.Context, - d *ddlCtx, - handler rangetask.TaskHandler, - startKey []byte, endKey []byte, -) (err error) { - return rangetask.NewRangeTaskRunner( - "flashback-to-version-runner", - d.store.(tikv.Storage), - int(variable.GetDDLFlashbackConcurrency()), - handler, - ).RunOnRange(ctx, startKey, endKey) -} - func sendFlashbackToVersionRPC( ctx context.Context, s tikv.Storage, @@ -366,12 +352,12 @@ func sendFlashbackToVersionRPC( endKey := loc.EndKey isLast := len(endKey) == 0 || (len(rangeEndKey) > 0 && bytes.Compare(endKey, rangeEndKey) >= 0) - // If it is the last region + // If it is the last region. if isLast { endKey = rangeEndKey } - logutil.BgLogger().Debug("send flashback request", zap.Uint64("region_id", loc.Region.GetID()), + logutil.BgLogger().Info("[ddl] send flashback request", zap.Uint64("region_id", loc.Region.GetID()), zap.String("start_key", hex.EncodeToString(startKey)), zap.String("end_key", hex.EncodeToString(endKey))) req := tikvrpc.NewRequest(tikvrpc.CmdFlashbackToVersion, &kvrpcpb.FlashbackToVersionRequest{ @@ -535,13 +521,13 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve // Split region by keyRanges, make sure no unrelated key ranges be locked. splitRegionsByKeyRanges(d, keyRanges) totalRegions.Store(0) - for _, ranges := range keyRanges { - if err = prepareFlashbackToVersion(d.ctx, d, + for _, r := range keyRanges { + if err = flashbackToVersion(d.ctx, d, func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) { stats, err := sendPrepareFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), r) totalRegions.Add(uint64(stats.CompletedRegions)) return stats, err - }, ranges.StartKey, ranges.EndKey); err != nil { + }, r.StartKey, r.EndKey); err != nil { logutil.BgLogger().Warn("[ddl] Get error when do flashback", zap.Error(err)) return ver, err } @@ -570,18 +556,18 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve return ver, errors.Trace(err) } - for _, ranges := range keyRanges { + for _, r := range keyRanges { if err = flashbackToVersion(d.ctx, d, func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) { - // use commitTS - 1 as startTS, make sure it less than commitTS. + // Use commitTS - 1 as startTS, make sure it less than commitTS. stats, err := sendFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), flashbackTS, commitTS-1, commitTS, r) completedRegions.Add(uint64(stats.CompletedRegions)) - logutil.BgLogger().Info("flashback cluster stats", + logutil.BgLogger().Info("[ddl] flashback cluster stats", zap.Uint64("complete regions", completedRegions.Load()), zap.Uint64("total regions", totalRegions.Load()), zap.Error(err)) return stats, err - }, ranges.StartKey, ranges.EndKey); err != nil { + }, r.StartKey, r.EndKey); err != nil { logutil.BgLogger().Warn("[ddl] Get error when do flashback", zap.Error(err)) return ver, errors.Trace(err) }