Skip to content

Commit

Permalink
ttl: disable ttl job when recover/flashback table/database/cluster (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwangchao authored Jan 3, 2023
1 parent 1bf230a commit be8caa6
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 7 deletions.
38 changes: 34 additions & 4 deletions ddl/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ const (
totalLockedRegionsOffset
startTSOffset
commitTSOffset
ttlJobEnableOffSet
)

func closePDSchedule() error {
Expand Down Expand Up @@ -124,6 +125,18 @@ func ValidateFlashbackTS(ctx context.Context, sctx sessionctx.Context, flashBack
return gcutil.ValidateSnapshotWithGCSafePoint(flashBackTS, gcSafePoint)
}

// getTiDBTTLJobEnable looks up the global system variable named by
// variable.TiDBTTLJobEnable through the session's GlobalVarsAccessor.
// It returns the stored value; if the lookup fails it returns an empty
// string together with the traced error.
func getTiDBTTLJobEnable(sess sessionctx.Context) (string, error) {
	globalVars := sess.GetSessionVars().GlobalVarsAccessor
	current, err := globalVars.GetGlobalSysVar(variable.TiDBTTLJobEnable)
	if err != nil {
		return "", errors.Trace(err)
	}
	return current, nil
}

// setTiDBTTLJobEnable writes value to the global system variable named by
// variable.TiDBTTLJobEnable via the session's GlobalVarsAccessor and returns
// whatever error that call reports.
func setTiDBTTLJobEnable(ctx context.Context, sess sessionctx.Context, value string) error {
	globalVars := sess.GetSessionVars().GlobalVarsAccessor
	return globalVars.SetGlobalSysVar(ctx, variable.TiDBTTLJobEnable, value)
}

// setTiDBEnableAutoAnalyze writes value to the global system variable named by
// variable.TiDBEnableAutoAnalyze via the session's GlobalVarsAccessor and
// returns whatever error that call reports.
func setTiDBEnableAutoAnalyze(ctx context.Context, sess sessionctx.Context, value string) error {
	globalVars := sess.GetSessionVars().GlobalVarsAccessor
	return globalVars.SetGlobalSysVar(ctx, variable.TiDBEnableAutoAnalyze, value)
}
Expand Down Expand Up @@ -176,6 +189,9 @@ func checkAndSetFlashbackClusterInfo(sess sessionctx.Context, d *ddlCtx, t *meta
if err = setTiDBSuperReadOnly(d.ctx, sess, variable.On); err != nil {
return err
}
if err = setTiDBTTLJobEnable(d.ctx, sess, variable.Off); err != nil {
return err
}

nowSchemaVersion, err := t.GetSchemaVersion()
if err != nil {
Expand Down Expand Up @@ -553,9 +569,9 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve

var flashbackTS, lockedRegions, startTS, commitTS uint64
var pdScheduleValue map[string]interface{}
var autoAnalyzeValue, readOnlyValue string
var autoAnalyzeValue, readOnlyValue, ttlJobEnableValue string
var gcEnabledValue bool
if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabledValue, &autoAnalyzeValue, &readOnlyValue, &lockedRegions, &startTS, &commitTS); err != nil {
if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabledValue, &autoAnalyzeValue, &readOnlyValue, &lockedRegions, &startTS, &commitTS, &ttlJobEnableValue); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
Expand Down Expand Up @@ -595,6 +611,12 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
return ver, errors.Trace(err)
}
job.Args[readOnlyOffset] = &readOnlyValue
ttlJobEnableValue, err = getTiDBTTLJobEnable(sess)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
job.Args[ttlJobEnableOffSet] = &ttlJobEnableValue
job.SchemaState = model.StateDeleteOnly
return ver, nil
// Stage 2, check flashbackTS, close GC and PD schedule.
Expand Down Expand Up @@ -694,10 +716,10 @@ func finishFlashbackCluster(w *worker, job *model.Job) error {

var flashbackTS, lockedRegions, startTS, commitTS uint64
var pdScheduleValue map[string]interface{}
var autoAnalyzeValue, readOnlyValue string
var autoAnalyzeValue, readOnlyValue, ttlJobEnableValue string
var gcEnabled bool

if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabled, &autoAnalyzeValue, &readOnlyValue, &lockedRegions, &startTS, &commitTS); err != nil {
if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabled, &autoAnalyzeValue, &readOnlyValue, &lockedRegions, &startTS, &commitTS, &ttlJobEnableValue); err != nil {
return errors.Trace(err)
}
sess, err := w.sessPool.get()
Expand All @@ -718,6 +740,14 @@ func finishFlashbackCluster(w *worker, job *model.Job) error {
if err = setTiDBSuperReadOnly(w.ctx, sess, readOnlyValue); err != nil {
return err
}

if job.IsCancelled() {
// only restore `tidb_ttl_job_enable` when flashback failed
if err = setTiDBTTLJobEnable(w.ctx, sess, ttlJobEnableValue); err != nil {
return err
}
}

return setTiDBEnableAutoAnalyze(w.ctx, sess, autoAnalyzeValue)
})
if err != nil {
Expand Down
24 changes: 22 additions & 2 deletions ddl/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,16 @@ func TestGlobalVariablesOnFlashback(t *testing.T) {
rs, err = tk.Exec("show variables like 'tidb_super_read_only'")
assert.NoError(t, err)
assert.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.On)
rs, err = tk.Exec("show variables like 'tidb_ttl_job_enable'")
assert.NoError(t, err)
assert.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.Off)
}
}
dom.DDL().SetHook(hook)
// first try with `tidb_gc_enable` = on and `tidb_super_read_only` = off
// first try with `tidb_gc_enable` = on and `tidb_super_read_only` = off and `tidb_ttl_job_enable` = on
tk.MustExec("set global tidb_gc_enable = on")
tk.MustExec("set global tidb_super_read_only = off")
tk.MustExec("set global tidb_ttl_job_enable = on")

tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts)))

Expand All @@ -224,10 +228,14 @@ func TestGlobalVariablesOnFlashback(t *testing.T) {
rs, err = tk.Exec("show variables like 'tidb_gc_enable'")
require.NoError(t, err)
require.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.On)
rs, err = tk.Exec("show variables like 'tidb_ttl_job_enable'")
require.NoError(t, err)
require.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.Off)

// second try with `tidb_gc_enable` = off and `tidb_super_read_only` = on
// second try with `tidb_gc_enable` = off and `tidb_super_read_only` = on and `tidb_ttl_job_enable` = off
tk.MustExec("set global tidb_gc_enable = off")
tk.MustExec("set global tidb_super_read_only = on")
tk.MustExec("set global tidb_ttl_job_enable = off")

ts, err = tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
require.NoError(t, err)
Expand All @@ -238,6 +246,9 @@ func TestGlobalVariablesOnFlashback(t *testing.T) {
rs, err = tk.Exec("show variables like 'tidb_gc_enable'")
require.NoError(t, err)
require.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.Off)
rs, err = tk.Exec("show variables like 'tidb_ttl_job_enable'")
assert.NoError(t, err)
assert.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.Off)

dom.DDL().SetHook(originHook)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest"))
Expand Down Expand Up @@ -268,9 +279,14 @@ func TestCancelFlashbackCluster(t *testing.T) {
return job.SchemaState == model.StateDeleteOnly
})
dom.DDL().SetHook(hook)
tk.MustExec("set global tidb_ttl_job_enable = on")
tk.MustGetErrCode(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts)), errno.ErrCancelledDDLJob)
hook.MustCancelDone(t)

rs, err := tk.Exec("show variables like 'tidb_ttl_job_enable'")
assert.NoError(t, err)
assert.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.On)

// Try canceled on StateWriteReorganization, cancel failed
hook = newCancelJobHook(t, store, dom, func(job *model.Job) bool {
return job.SchemaState == model.StateWriteReorganization
Expand All @@ -279,6 +295,10 @@ func TestCancelFlashbackCluster(t *testing.T) {
tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts)))
hook.MustCancelFailed(t)

rs, err = tk.Exec("show variables like 'tidb_ttl_job_enable'")
assert.NoError(t, err)
assert.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.Off)

dom.DDL().SetHook(originHook)

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest"))
Expand Down
3 changes: 2 additions & 1 deletion ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2750,7 +2750,8 @@ func (d *ddl) FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) error
variable.Off, /* tidb_super_read_only */
0, /* totalRegions */
0, /* startTS */
0 /* commitTS */},
0, /* commitTS */
variable.On /* tidb_ttl_job_enable */},
}
err = d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
Expand Down
4 changes: 4 additions & 0 deletions ddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,10 @@ func (w *worker) onRecoverSchema(d *ddlCtx, t *meta.Meta, job *model.Job) (ver i
return ver, errors.Trace(err)
}
for _, recoverInfo := range recoverSchemaInfo.RecoverTabsInfo {
if recoverInfo.TableInfo.TTLInfo != nil {
// force disable TTL job schedule for recovered table
recoverInfo.TableInfo.TTLInfo.Enable = false
}
ver, err = w.recoverTable(t, job, recoverInfo)
if err != nil {
return ver, errors.Trace(err)
Expand Down
64 changes: 64 additions & 0 deletions ddl/serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,70 @@ func TestCancelAddIndexPanic(t *testing.T) {
require.Truef(t, strings.HasPrefix(errMsg, "[ddl:8214]Cancelled DDL job"), "%v", errMsg)
}

// TestRecoverTableWithTTL checks that tables created with a TTL option report
// TTL_ENABLE='OFF' in `show create table` after being brought back with
// RECOVER TABLE, RECOVER TABLE BY JOB, FLASHBACK TABLE, or FLASHBACK DATABASE.
func TestRecoverTableWithTTL(t *testing.T) {
	store, _ := createMockStoreAndDomain(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("create database if not exists test_recover")
	tk.MustExec("use test_recover")
	// Put the emulator GC switch back to whatever it was before this test ran.
	defer func(originGC bool) {
		if originGC {
			util.EmulatorGCEnable()
		} else {
			util.EmulatorGCDisable()
		}
	}(util.IsEmulatorGCEnable())

	// disable emulator GC.
	// Otherwise emulator GC will delete table record as soon as possible after execute drop table ddl.
	util.EmulatorGCDisable()
	gcTimeFormat := "20060102-15:04:05 -0700 MST"
	// Set the recorded GC safe point to one hour ago so the drops below happen after it.
	safePointSQL := `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%[1]s', '')
ON DUPLICATE KEY
UPDATE variable_value = '%[1]s'`
	tk.MustExec(fmt.Sprintf(safePointSQL, time.Now().Add(-time.Hour).Format(gcTimeFormat)))
	// getDDLJobID scans `admin show ddl jobs` and returns column 0 of the first
	// row whose column 2 equals table and column 3 equals tp; the test fails
	// immediately when no row matches.
	getDDLJobID := func(table, tp string) int64 {
		rs, err := tk.Exec("admin show ddl jobs")
		require.NoError(t, err)
		rows, err := session.GetRows4Test(context.Background(), tk.Session(), rs)
		require.NoError(t, err)
		for _, row := range rows {
			if row.GetString(2) == table && row.GetString(3) == tp {
				return row.GetInt64(0)
			}
		}
		// FailNowf expects a plain failure message first, then the format
		// string and its arguments; passing the format string as the failure
		// message would leave the %s verbs unexpanded.
		require.FailNowf(t, "can't find ddl job", "can't find %s table of %s", tp, table)
		return -1
	}

	// recover table
	tk.MustExec("create table t_recover1 (t timestamp) TTL=`t`+INTERVAL 1 DAY")
	tk.MustExec("drop table t_recover1")
	tk.MustExec("recover table t_recover1")
	tk.MustQuery("show create table t_recover1").Check(testkit.Rows("t_recover1 CREATE TABLE `t_recover1` (\n `t` timestamp NULL DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![ttl] TTL=`t` + INTERVAL 1 DAY */ /*T![ttl] TTL_ENABLE='OFF' */"))

	// recover table with job id
	tk.MustExec("create table t_recover2 (t timestamp) TTL=`t`+INTERVAL 1 DAY")
	tk.MustExec("drop table t_recover2")
	jobID := getDDLJobID("t_recover2", "drop table")
	tk.MustExec(fmt.Sprintf("recover table BY JOB %d", jobID))
	tk.MustQuery("show create table t_recover2").Check(testkit.Rows("t_recover2 CREATE TABLE `t_recover2` (\n `t` timestamp NULL DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![ttl] TTL=`t` + INTERVAL 1 DAY */ /*T![ttl] TTL_ENABLE='OFF' */"))

	// flashback table
	tk.MustExec("create table t_recover3 (t timestamp) TTL=`t`+INTERVAL 1 DAY")
	tk.MustExec("drop table t_recover3")
	tk.MustExec("flashback table t_recover3")
	tk.MustQuery("show create table t_recover3").Check(testkit.Rows("t_recover3 CREATE TABLE `t_recover3` (\n `t` timestamp NULL DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![ttl] TTL=`t` + INTERVAL 1 DAY */ /*T![ttl] TTL_ENABLE='OFF' */"))

	// flashback database
	tk.MustExec("create database if not exists test_recover2")
	tk.MustExec("create table test_recover2.t1 (t timestamp) TTL=`t`+INTERVAL 1 DAY")
	tk.MustExec("create table test_recover2.t2 (t timestamp) TTL=`t`+INTERVAL 1 DAY")
	tk.MustExec("drop database test_recover2")
	tk.MustExec("flashback database test_recover2")
	tk.MustQuery("show create table test_recover2.t1").Check(testkit.Rows("t1 CREATE TABLE `t1` (\n `t` timestamp NULL DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![ttl] TTL=`t` + INTERVAL 1 DAY */ /*T![ttl] TTL_ENABLE='OFF' */"))
	tk.MustQuery("show create table test_recover2.t2").Check(testkit.Rows("t2 CREATE TABLE `t2` (\n `t` timestamp NULL DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![ttl] TTL=`t` + INTERVAL 1 DAY */ /*T![ttl] TTL_ENABLE='OFF' */"))
}

func TestRecoverTableByJobID(t *testing.T) {
store, _ := createMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
Expand Down
5 changes: 5 additions & 0 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,11 @@ func (w *worker) onRecoverTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in

schemaID := recoverInfo.SchemaID
tblInfo := recoverInfo.TableInfo
if tblInfo.TTLInfo != nil {
// force disable TTL job schedule for recovered table
tblInfo.TTLInfo.Enable = false
}

// check GC and safe point
gcEnable, err := checkGCEnable(w)
if err != nil {
Expand Down

0 comments on commit be8caa6

Please sign in to comment.