Skip to content

Commit

Permalink
Merge pull request #889 from pingcap/zimuxia/bg-job-stats
Browse files Browse the repository at this point in the history
*: Support show current background job info
  • Loading branch information
zimulala committed Feb 19, 2016
2 parents 83955b9 + 011162b commit 871679b
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 31 deletions.
2 changes: 0 additions & 2 deletions ddl/bg_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ func (s *testDDLSuite) TestDropSchemaError(c *C) {
defer d.close()

job := &model.Job{
ID: 1,
SchemaID: 1,
Type: model.ActionDropSchema,
Args: []interface{}{&model.DBInfo{
Expand Down Expand Up @@ -69,7 +68,6 @@ func (s *testDDLSuite) TestDropTableError(c *C) {
testCreateSchema(c, mock.NewContext(), d, dbInfo)

job := &model.Job{
ID: 1,
SchemaID: dbInfo.ID,
Type: model.ActionDropTable,
Args: []interface{}{&model.TableInfo{
Expand Down
4 changes: 3 additions & 1 deletion ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (d *ddl) checkOwner(t *meta.Meta, flag JobType) (*model.Owner, error) {
case bgJobFlag:
owner, err = t.GetBgJobOwner()
default:
err = errors.Errorf("invalid ddl flag %v", flag)
err = errInvalidJobFlag
}
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -191,6 +191,8 @@ var ErrNotOwner = errors.New("DDL: not owner")
// ErrWorkerClosed means we have already closed the DDL worker.
var ErrWorkerClosed = errors.New("DDL: worker is closed")

var errInvalidJobFlag = errors.New("DDL: invalid job flag")

// JobType is job type, including ddl/background.
type JobType int

Expand Down
83 changes: 60 additions & 23 deletions ddl/stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
)

var (
ddlServerID = "ddl_server_id"
serverID = "server_id"
ddlSchemaVersion = "ddl_schema_version"
ddlOwnerID = "ddl_owner_id"
ddlOwnerLastUpdateTS = "ddl_owner_last_update_ts"
Expand All @@ -36,6 +36,20 @@ var (
ddlJobSnapshotVer = "ddl_job_snapshot_ver"
ddlJobReorgHandle = "ddl_job_reorg_handle"
ddlJobArgs = "ddl_job_args"
bgSchemaVersion = "bg_schema_version"
bgOwnerID = "bg_owner_id"
bgOwnerLastUpdateTS = "bg_owner_last_update_ts"
bgJobID = "bg_job_id"
bgJobAction = "bg_job_action"
bgJobLastUpdateTS = "bg_job_last_update_ts"
bgJobState = "bg_job_state"
bgJobError = "bg_job_error"
bgJobSchemaState = "bg_job_schema_state"
bgJobSchemaID = "bg_job_schema_id"
bgJobTableID = "bg_job_table_id"
bgJobSnapshotVer = "bg_job_snapshot_ver"
bgJobReorgHandle = "bg_job_reorg_handle"
bgJobArgs = "bg_job_args"
)

// GetScope gets the status variables scope.
Expand All @@ -46,40 +60,63 @@ func (d *ddl) GetScope(status string) variable.ScopeFlag {

// Stat returns the DDL statistics.
func (d *ddl) Stats() (map[string]interface{}, error) {
var info *inspectkv.DDLInfo
m := make(map[string]interface{})
m[serverID] = d.uuid
var ddlInfo, bgInfo *inspectkv.DDLInfo

err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
var err1 error
info, err1 = inspectkv.GetDDLInfo(txn)
ddlInfo, err1 = inspectkv.GetDDLInfo(txn)
if err1 != nil {
return errors.Trace(err1)
}
bgInfo, err1 = inspectkv.GetDDLBgInfo(txn)

return errors.Trace(err1)
})
if err != nil {
return nil, errors.Trace(err)
}

m := make(map[string]interface{})
m[ddlServerID] = d.uuid

m[ddlSchemaVersion] = info.SchemaVer

if info.Owner != nil {
m[ddlOwnerID] = info.Owner.OwnerID
m[ddlSchemaVersion] = ddlInfo.SchemaVer
if ddlInfo.Owner != nil {
m[ddlOwnerID] = ddlInfo.Owner.OwnerID
// LastUpdateTS uses nanosecond.
m[ddlOwnerLastUpdateTS] = info.Owner.LastUpdateTS / 1e9
m[ddlOwnerLastUpdateTS] = ddlInfo.Owner.LastUpdateTS / 1e9
}
if ddlInfo.Job != nil {
m[ddlJobID] = ddlInfo.Job.ID
m[ddlJobAction] = ddlInfo.Job.Type.String()
m[ddlJobLastUpdateTS] = ddlInfo.Job.LastUpdateTS / 1e9
m[ddlJobState] = ddlInfo.Job.State.String()
m[ddlJobError] = ddlInfo.Job.Error
m[ddlJobSchemaState] = ddlInfo.Job.SchemaState.String()
m[ddlJobSchemaID] = ddlInfo.Job.SchemaID
m[ddlJobTableID] = ddlInfo.Job.TableID
m[ddlJobSnapshotVer] = ddlInfo.Job.SnapshotVer
m[ddlJobReorgHandle] = ddlInfo.ReorgHandle
m[ddlJobArgs] = ddlInfo.Job.Args
}

if info.Job != nil {
m[ddlJobID] = info.Job.ID
m[ddlJobAction] = info.Job.Type.String()
m[ddlJobLastUpdateTS] = info.Job.LastUpdateTS / 1e9
m[ddlJobState] = info.Job.State.String()
m[ddlJobError] = info.Job.Error
m[ddlJobSchemaState] = info.Job.SchemaState.String()
m[ddlJobSchemaID] = info.Job.SchemaID
m[ddlJobTableID] = info.Job.TableID
m[ddlJobSnapshotVer] = info.Job.SnapshotVer
m[ddlJobReorgHandle] = info.ReorgHandle
m[ddlJobArgs] = info.Job.Args
// background DDL info
m[bgSchemaVersion] = bgInfo.SchemaVer
if bgInfo.Owner != nil {
m[bgOwnerID] = bgInfo.Owner.OwnerID
// LastUpdateTS uses nanosecond.
m[bgOwnerLastUpdateTS] = bgInfo.Owner.LastUpdateTS / 1e9
}
if bgInfo.Job != nil {
m[bgJobID] = bgInfo.Job.ID
m[bgJobAction] = bgInfo.Job.Type.String()
m[bgJobLastUpdateTS] = bgInfo.Job.LastUpdateTS / 1e9
m[bgJobState] = bgInfo.Job.State.String()
m[bgJobError] = bgInfo.Job.Error
m[bgJobSchemaState] = bgInfo.Job.SchemaState.String()
m[bgJobSchemaID] = bgInfo.Job.SchemaID
m[bgJobTableID] = bgInfo.Job.TableID
m[bgJobSnapshotVer] = bgInfo.Job.SnapshotVer
m[bgJobReorgHandle] = bgInfo.ReorgHandle
m[bgJobArgs] = bgInfo.Job.Args
}

return m, nil
Expand Down
12 changes: 8 additions & 4 deletions ddl/stat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ var _ = Suite(&testStatSuite{})
type testStatSuite struct {
}

func (s *testStatSuite) getSchemaVer(c *C, d *ddl) int64 {
func (s *testStatSuite) getDDLSchemaVer(c *C, d *ddl) int64 {
m, err := d.Stats()
c.Assert(err, IsNil)
v := m[ddlSchemaVersion]
Expand All @@ -45,14 +45,15 @@ func (s *testStatSuite) TestStat(c *C) {
time.Sleep(lease)

dbInfo := testSchemaInfo(c, d, "test")
testCreateSchema(c, mock.NewContext(), d, dbInfo)

m, err := d.Stats()
c.Assert(err, IsNil)
c.Assert(m[ddlOwnerID], Equals, d.uuid)

job := &model.Job{
SchemaID: dbInfo.ID,
Type: model.ActionCreateSchema,
Type: model.ActionDropSchema,
Args: []interface{}{dbInfo.Name},
}

Expand All @@ -65,16 +66,19 @@ func (s *testStatSuite) TestStat(c *C) {
ticker := time.NewTicker(d.lease * 1)
defer ticker.Stop()

ver := s.getSchemaVer(c, d)
ver := s.getDDLSchemaVer(c, d)
LOOP:
for {
select {
case <-ticker.C:
d.close()
c.Assert(s.getSchemaVer(c, d), GreaterEqual, ver)
c.Assert(s.getDDLSchemaVer(c, d), GreaterEqual, ver)
d.start()
case err := <-done:
c.Assert(err, IsNil)
m, err := d.Stats()
c.Assert(err, IsNil)
c.Assert(m[bgOwnerID], Equals, d.uuid)
break LOOP
}
}
Expand Down
24 changes: 23 additions & 1 deletion inspectkv/inspectkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
// DDLInfo is for DDL information.
type DDLInfo struct {
SchemaVer int64
ReorgHandle int64
ReorgHandle int64 // it's only used for DDL information.
Owner *model.Owner
Job *model.Job
}
Expand Down Expand Up @@ -65,6 +65,28 @@ func GetDDLInfo(txn kv.Transaction) (*DDLInfo, error) {
return info, nil
}

// GetDDLBgInfo returns DDL background information.
func GetDDLBgInfo(txn kv.Transaction) (*DDLInfo, error) {
var err error
info := &DDLInfo{}
t := meta.NewMeta(txn)

info.Owner, err = t.GetBgJobOwner()
if err != nil {
return nil, errors.Trace(err)
}
info.Job, err = t.GetBgJob(0)
if err != nil {
return nil, errors.Trace(err)
}
info.SchemaVer, err = t.GetSchemaVersion()
if err != nil {
return nil, errors.Trace(err)
}

return info, nil
}

func nextIndexVals(data []interface{}) []interface{} {
// Add 0x0 to the end of data.
return append(data, nil)
Expand Down
23 changes: 23 additions & 0 deletions inspectkv/inspectkv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,29 @@ func (s *testSuite) TestGetDDLInfo(c *C) {
c.Assert(err, IsNil)
}

func (s *testSuite) TestGetDDLBgInfo(c *C) {
txn, err := s.store.Begin()
c.Assert(err, IsNil)
t := meta.NewMeta(txn)

owner := &model.Owner{OwnerID: "owner"}
err = t.SetBgJobOwner(owner)
c.Assert(err, IsNil)
job := &model.Job{
SchemaID: 1,
Type: model.ActionDropTable,
}
err = t.EnQueueBgJob(job)
c.Assert(err, IsNil)
info, err := GetDDLBgInfo(txn)
c.Assert(err, IsNil)
c.Assert(info.Owner, DeepEquals, owner)
c.Assert(info.Job, DeepEquals, job)
c.Assert(info.ReorgHandle, Equals, int64(0))
err = txn.Commit()
c.Assert(err, IsNil)
}

func (s *testSuite) TestScan(c *C) {
alloc := autoid.NewAllocator(s.store, s.dbInfo.ID)
tb, err := tables.TableFromMeta(alloc, s.tbInfo)
Expand Down

0 comments on commit 871679b

Please sign in to comment.