Skip to content

Commit

Permalink
check gc safe point
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 committed Dec 15, 2018
1 parent 6991fa1 commit 72b921d
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 13 deletions.
35 changes: 34 additions & 1 deletion ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
"github.com/pingcap/tidb/table"
Expand Down Expand Up @@ -1863,13 +1864,24 @@ func (s *testDBSuite) TestRestoreTable(c *C) {
tk.MustExec("use test_restore")
tk.MustExec("create table t_recover (a int);")
defer ddl.SetEmulatorGCEnable(ddl.GetEmulatorGCStatus())

// disable emulator GC.
// Otherwise emulator GC will delete table record as soon as possible after execute drop table ddl.
originGC := ddl.GetEmulatorGCStatus()
defer ddl.SetEmulatorGCEnable(originGC)
ddl.SetEmulatorGCEnable(false)
gcTimeFormat := "20060102-15:04:05 -0700 MST"

timeBeforeDrop := time.Now().Add(0 - time.Duration(48*60*60*time.Second)).Format(gcTimeFormat)
timeAfterDrop := time.Now().Add(time.Duration(48 * 60 * 60 * time.Second)).Format(gcTimeFormat)

safePointSql := `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%[1]s', '')
ON DUPLICATE KEY
UPDATE variable_value = '%[1]s'`

tk.MustExec("insert into t_recover values (1),(2),(3)")
tk.MustExec("drop table t_recover")

rs, err := tk.Exec("admin show ddl jobs")
c.Assert(err, IsNil)
rows, err := session.GetRows4Test(context.Background(), tk.Se, rs)
Expand All @@ -1878,9 +1890,30 @@ func (s *testDBSuite) TestRestoreTable(c *C) {
c.Assert(row.GetString(1), Equals, "test_restore")
c.Assert(row.GetString(3), Equals, "drop table")
jobID := row.GetInt64(0)
// enable GC first.

// if gc enable is not exists in mysql.tidb
_, err = tk.Exec(fmt.Sprintf("admin restore table by job %d", jobID))
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "can not get 'tikv_gc_enable'")

err = admin.EnableGCAfterRecover(tk.Se)
c.Assert(err, IsNil)

// if gc safe point is not exists in mysql.tidb
_, err = tk.Exec(fmt.Sprintf("admin restore table by job %d", jobID))
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "can not get 'tikv_gc_safe_point'")

// recover job is before gc safe point
tk.MustExec(fmt.Sprintf(safePointSql, timeAfterDrop))
_, err = tk.Exec(fmt.Sprintf("admin restore table by job %d", jobID))
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, variable.ErrSnapshotTooOld.GenWithStackByArgs(timeAfterDrop).Error())

// recover job after gc safe point
tk.MustExec(fmt.Sprintf(safePointSql, timeBeforeDrop))
tk.MustExec(fmt.Sprintf("admin restore table by job %d", jobID))

// check recover table meta and data record.
tk.MustQuery("select * from t_recover;").Check(testkit.Rows("1", "2", "3"))
// check recover table autoID.
Expand Down
8 changes: 4 additions & 4 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)

func (w *worker) onRestoreTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
// check gc enable status again.
gcEnable, isNull, err := checkGCEnable(w)
gcEnable, err := checkGCEnable(w)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
if isNull || gcEnable {
if gcEnable {
job.State = model.JobStateCancelled
return ver, errors.Errorf("can not found gc enable variable in mysql.tidb")
}
Expand Down Expand Up @@ -195,10 +195,10 @@ func enableGC(w *worker) error {
return admin.EnableGCAfterRecover(ctx)
}

func checkGCEnable(w *worker) (enable bool, isNull bool, err error) {
func checkGCEnable(w *worker) (enable bool, err error) {
ctx, err := w.sessPool.get()
if err != nil {
return false, false, errors.Trace(err)
return false, errors.Trace(err)
}
defer w.sessPool.put(ctx)

Expand Down
11 changes: 7 additions & 4 deletions executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,13 +178,10 @@ func (e *RestoreTableExec) Open(ctx context.Context) error {

// Next implements the Executor Open interface.
func (e *RestoreTableExec) Next(ctx context.Context, chk *chunk.Chunk) error {
enableGCAfterRecover, isNull, err := admin.CheckGCEnableStatus(e.ctx)
enableGCAfterRecover, err := admin.CheckGCEnableStatus(e.ctx)
if err != nil {
return err
}
if isNull {
return errors.Errorf("can not found gc enable variable in mysql.tidb")
}
if enableGCAfterRecover {
err = admin.DisableGCForRecover(e.ctx)
if err != nil {
Expand All @@ -210,6 +207,12 @@ func (e *RestoreTableExec) Next(ctx context.Context, chk *chunk.Chunk) error {
return errors.Errorf("Job %v doesn't drop any table", job.ID)
}

// check gc safe point
err = validateSnapshot(e.ctx, job.StartTS)
if err != nil {
return errors.Trace(err)
}

dom := domain.GetDomain(e.ctx)
// Get the snapshot infoSchema before drop table.
snapInfo, err := dom.GetSnapshotInfoSchema(job.StartTS)
Expand Down
1 change: 1 addition & 0 deletions executor/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ func validateSnapshot(ctx sessionctx.Context, snapshotTS uint64) error {
return errors.Trace(err)
}
safePointTS := variable.GoTimeToTS(safePointTime)
fmt.Printf("\n\nvalid safe point: %v\nformat: %v\nsafeTS: %v\nsnapTs: %v-------------\n", safePointTime, safePointTime.Format(gcTimeFormat), safePointTS, snapshotTS)
if safePointTS > snapshotTS {
return variable.ErrSnapshotTooOld.GenWithStackByArgs(safePointString)
}
Expand Down
8 changes: 4 additions & 4 deletions util/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,16 +712,16 @@ func iterRecords(sessCtx sessionctx.Context, retriever kv.Retriever, t table.Tab
}

// CheckGCEnableStatus is
func CheckGCEnableStatus(ctx sessionctx.Context) (enable bool, isNull bool, err error) {
func CheckGCEnableStatus(ctx sessionctx.Context) (enable bool, err error) {
sql := fmt.Sprintf(`SELECT HIGH_PRIORITY (variable_value) FROM mysql.tidb WHERE variable_name='%s' FOR UPDATE`, "tikv_gc_enable")
rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(ctx, sql)
if err != nil {
return false, false, errors.Trace(err)
return false, errors.Trace(err)
}
if len(rows) != 1 {
return false, true, nil
return false, errors.New("can not get 'tikv_gc_enable'")
}
return rows[0].GetString(0) == "true", false, nil
return rows[0].GetString(0) == "true", nil
}

// DisableGCForRecover is
Expand Down

0 comments on commit 72b921d

Please sign in to comment.