Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl, domain: make schema correct after canceling jobs (#7997) #8057

Merged
merged 4 commits into from
Oct 31, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import (
type DDLForTest interface {
// SetHook installs h as the DDL callback hook.
SetHook(h Callback)
// GetHook returns the callback hook that is currently installed.
GetHook() Callback
// SetInterceptoror installs h as the DDL interceptor.
SetInterceptoror(h Interceptor)
}
Expand All @@ -51,6 +53,14 @@ func (d *ddl) SetHook(h Callback) {
d.mu.hook = h
}

// GetHook returns the callback hook currently stored on d.
// The read happens while holding d.mu, so it is consistent with SetHook.
// It satisfies the GetHook method of DDLForTest.
func (d *ddl) GetHook() Callback {
	d.mu.Lock()
	hook := d.mu.hook
	d.mu.Unlock()

	return hook
}

// SetInterceptoror implements DDL.SetInterceptoror interface.
func (d *ddl) SetInterceptoror(i Interceptor) {
d.mu.Lock()
Expand Down
25 changes: 14 additions & 11 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,18 +276,20 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerFinishDDLJob, job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
}()

switch job.Type {
case model.ActionAddIndex:
if job.State != model.JobStateRollbackDone {
break
if !job.IsCancelled() {
switch job.Type {
case model.ActionAddIndex:
if job.State != model.JobStateRollbackDone {
break
}
// After rolling back an AddIndex operation, we need to use delete-range to delete the half-done index data.
err = w.deleteRange(job)
case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex, model.ActionDropTablePartition:
err = w.deleteRange(job)
}
if err != nil {
return errors.Trace(err)
}
// After rolling back an AddIndex operation, we need to use delete-range to delete the half-done index data.
err = w.deleteRange(job)
case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex, model.ActionDropTablePartition:
err = w.deleteRange(job)
}
if err != nil {
return errors.Trace(err)
}

_, err = t.DeQueueDDLJob()
Expand Down Expand Up @@ -380,6 +382,7 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error {
// and retry later if the job is not cancelled.
schemaVer, runJobErr = w.runDDLJob(d, t, job)
if job.IsCancelled() {
txn.Reset()
err = w.finishDDLJob(t, job)
return errors.Trace(err)
}
Expand Down
116 changes: 116 additions & 0 deletions ddl/fail_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,131 @@ package ddl_test

import (
"fmt"
"time"

gofail "github.com/etcd-io/gofail/runtime"
. "github.com/pingcap/check"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
"golang.org/x/net/context"
)

// Hand the suite to Suite (from the dot-imported check package) so the check runner picks it up.
var _ = Suite(&testFailDBSuite{})

// testFailDBSuite holds the fixtures shared by the fail-point DDL tests.
// Every field is populated in SetUpSuite.
type testFailDBSuite struct {
// lease is the schema lease passed to session.SetSchemaLease.
lease time.Duration
// store is the mock TiKV storage the suite runs against.
store kv.Storage
// dom is the domain returned by session.BootstrapSession for store.
dom *domain.Domain
// se is the session the tests execute SQL through; tests may replace it.
se session.Session
// p is a parser created with parser.New.
p *parser.Parser
}

// SetUpSuite builds the fixtures shared by every test in the suite: a mock
// TiKV store, a domain bootstrapped on that store, a test session and a
// parser. It also sets the schema lease to 200ms and
// ddl.WaitTimeWhenErrorOccured to 1µs.
func (s *testFailDBSuite) SetUpSuite(c *C) {
	testleak.BeforeTest()

	const schemaLease = 200 * time.Millisecond
	s.lease = schemaLease
	ddl.WaitTimeWhenErrorOccured = time.Microsecond

	var err error

	// Storage first: the domain and session below are created on top of it.
	s.store, err = mockstore.NewMockTikvStore()
	c.Assert(err, IsNil)

	// The lease is set before bootstrapping the session/domain.
	session.SetSchemaLease(s.lease)
	s.dom, err = session.BootstrapSession(s.store)
	c.Assert(err, IsNil)

	s.se, err = session.CreateSession4Test(s.store)
	c.Assert(err, IsNil)

	s.p = parser.New()
}

// TearDownSuite releases the fixtures created by SetUpSuite and then runs the
// testleak after-test check.
func (s *testFailDBSuite) TearDownSuite(c *C) {
	// Best-effort cleanup: both the result set and the error are discarded.
	// NOTE(review): no test visible in this file creates test_db_state — confirm the name is intended.
	_, _ = s.se.Execute(context.Background(), "drop database if exists test_db_state")

	// Close in reverse order of creation: session, domain, then storage.
	s.se.Close()
	s.dom.Close()
	s.store.Close()

	checkLeak := testleak.AfterTest(c)
	checkLeak()
}

// TestHalfwayCancelOperations checks that both the data and the schema stay
// intact when a DDL job is cancelled halfway through its execution.
//
// Two operations are covered, each driven by a gofail failpoint that marks the
// job as cancelled and returns an error in the middle of the job:
//   - truncate table (failpoint truncateTableErr in onTruncateTable)
//   - rename table   (failpoint renameTableErr in onRenameTable)
//
// For each one the test verifies that the statement fails, that the original
// table still holds its row, and that the table is still resolvable after the
// info schema has been rebuilt from storage.
func (s *testFailDBSuite) TestHalfwayCancelOperations(c *C) {
	ctx := context.Background()

	// mustExec runs sql on the suite's current session and requires success.
	mustExec := func(sql string) {
		_, err := s.se.Execute(ctx, sql)
		c.Assert(err, IsNil, Commentf("sql: %s", sql))
	}

	// mustFail runs sql on the suite's current session and requires an error.
	mustFail := func(sql string) {
		_, err := s.se.Execute(ctx, sql)
		c.Assert(err, NotNil, Commentf("sql: %s", sql))
	}

	// checkSingleRow makes sure that the table's data has not been deleted:
	// `select count(*)` on it must return exactly one row holding the value 1.
	checkSingleRow := func(table string) {
		rs, err := s.se.Execute(ctx, "select count(*) from "+table)
		c.Assert(err, IsNil)
		chk := rs[0].NewChunk()
		err = rs[0].Next(ctx, chk)
		c.Assert(err, IsNil)
		c.Assert(chk.NumRows() == 0, IsFalse)
		row := chk.GetRow(0)
		c.Assert(row.Len(), Equals, 1)
		c.Assert(row.GetInt64(0), DeepEquals, int64(1))
		c.Assert(rs[0].Close(), IsNil)
	}

	// reloadSchema discards the domain's info schema handle, triggers the DDL
	// hook's OnChanged callback, and switches the suite to a fresh session
	// that uses cancel_job_db.
	reloadSchema := func() {
		s.dom.ResetHandle(s.store)
		err := s.dom.DDL().(ddl.DDLForTest).GetHook().OnChanged(nil)
		c.Assert(err, IsNil)
		s.se, err = session.CreateSession4Test(s.store)
		c.Assert(err, IsNil)
		mustExec("use cancel_job_db")
	}

	// test for truncating table
	// Enable's error is asserted so that a misspelled or missing failpoint
	// fails loudly instead of silently leaving the failpoint disarmed.
	c.Assert(gofail.Enable("github.com/pingcap/tidb/ddl/truncateTableErr", `return(true)`), IsNil)
	defer gofail.Disable("github.com/pingcap/tidb/ddl/truncateTableErr")

	mustExec("create database cancel_job_db")
	mustExec("use cancel_job_db")
	mustExec("create table t(a int)")
	mustExec("insert into t values(1)")
	mustFail("truncate table t")
	checkSingleRow("t")
	reloadSchema()
	// Test schema is correct.
	mustExec("select * from t")

	// test for renaming table
	// The failpoint name must match the `gofail: var renameTableErr` marker in
	// onRenameTable; any other name leaves the rename path without a failure.
	c.Assert(gofail.Enable("github.com/pingcap/tidb/ddl/renameTableErr", `return(true)`), IsNil)
	defer gofail.Disable("github.com/pingcap/tidb/ddl/renameTableErr")

	mustExec("create table tx(a int)")
	mustExec("insert into tx values(1)")
	mustFail("rename table tx to ty")
	checkSingleRow("tx")
	reloadSchema()
	// Test schema is correct.
	mustExec("select * from tx")

	// clean up
	mustExec("drop database cancel_job_db")
}

// TestInitializeOffsetAndState tests the case that the column's offset and state are not initialized in ddl_api.go when
// performing a 'modify column' operation.
func (s *testStateChangeSuite) TestInitializeOffsetAndState(c *C) {
Expand Down
10 changes: 10 additions & 0 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,11 @@ func onTruncateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ erro
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
// gofail: var truncateTableErr bool
// if truncateTableErr {
// job.State = model.JobStateCancelled
// return ver, errors.New("occur an error after dropping table.")
// }

// We use the new partition ID because all the old data is encoded with the old partition ID, it can not be accessed anymore.
var oldPartitionIDs []int64
Expand Down Expand Up @@ -338,6 +343,11 @@ func onRenameTable(t *meta.Meta, job *model.Job) (ver int64, _ error) {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
// gofail: var renameTableErr bool
// if renameTableErr {
// job.State = model.JobStateCancelled
// return ver, errors.New("occur an error after renaming table.")
// }
tblInfo.Name = tableName
err = t.CreateTable(newSchemaID, tblInfo)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,11 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio
}
}

// ResetHandle replaces the domain's infoschema handle with a brand-new one
// created for store. It is intended for tests only.
func (do *Domain) ResetHandle(store kv.Storage) {
	freshHandle := infoschema.NewHandle(store)
	do.infoHandle = freshHandle
}

// Init initializes a domain.
func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.Resource, error)) error {
if ebd, ok := do.store.(EtcdBackend); ok {
Expand Down