diff --git a/ddl/ddl.go b/ddl/ddl.go
index 9fd6cb7215e68..9a85b58c84dc5 100644
--- a/ddl/ddl.go
+++ b/ddl/ddl.go
@@ -32,6 +32,7 @@ import (
 	"github.com/pingcap/parser/mysql"
 	"github.com/pingcap/parser/terror"
 	pumpcli "github.com/pingcap/tidb-tools/tidb-binlog/pump_client"
+	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/ddl/util"
 	"github.com/pingcap/tidb/infoschema"
 	"github.com/pingcap/tidb/kv"
@@ -42,6 +43,7 @@ import (
 	"github.com/pingcap/tidb/sessionctx/binloginfo"
 	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/table"
+	goutil "github.com/pingcap/tidb/util"
 	tidbutil "github.com/pingcap/tidb/util"
 	"github.com/pingcap/tidb/util/logutil"
 	"go.etcd.io/etcd/clientv3"
@@ -312,6 +314,7 @@ type ddlCtx struct {
 	lease        time.Duration        // lease is schema lease.
 	binlogCli    *pumpcli.PumpsClient // binlogCli is used for Binlog.
 	infoHandle   *infoschema.Handle
+	tableLockCkr util.DeadTableLockChecker

 	// hook may be modified.
 	mu struct {
@@ -375,6 +378,7 @@ func newDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage,
 	ctx, cancelFunc := context.WithCancel(ctx)
 	var manager owner.Manager
 	var syncer util.SchemaSyncer
+	var deadLockCkr util.DeadTableLockChecker
 	if etcdCli == nil {
 		// The etcdCli is nil if the store is localstore which is only used for testing.
 		// So we use mockOwnerManager and MockSchemaSyncer.
@@ -383,6 +387,7 @@ func newDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage,
 	} else {
 		manager = owner.NewOwnerManager(etcdCli, ddlPrompt, id, DDLOwnerKey, cancelFunc)
 		syncer = util.NewSchemaSyncer(etcdCli, id, manager)
+		deadLockCkr = util.NewDeadTableLockChecker(etcdCli)
 	}

 	ddlCtx := &ddlCtx{
@@ -394,6 +399,7 @@ func newDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage,
 		schemaSyncer: syncer,
 		binlogCli:    binloginfo.GetPumpsClient(),
 		infoHandle:   infoHandle,
+		tableLockCkr: deadLockCkr,
 	}
 	ddlCtx.mu.hook = hook
 	ddlCtx.mu.interceptor = &BaseInterceptor{}
@@ -491,6 +497,11 @@ func (d *ddl) start(ctx context.Context, ctxPool *pools.ResourcePool) {
 			}
 		})
-		metrics.DDLCounter.WithLabelValues(fmt.Sprintf("%s", metrics.StartCleanWork)).Inc()
+		if config.TableLockEnabled() {
+			d.wg.Add(1)
+			go d.startCleanDeadTableLock()
+		}
+		metrics.DDLCounter.WithLabelValues(metrics.StartCleanWork).Inc()
 	}
 }
@@ -681,6 +692,40 @@ func (d *ddl) GetHook() Callback {
 	return d.mu.hook
 }

+func (d *ddl) startCleanDeadTableLock() {
+	// Recover must be deferred directly: if recover() were invoked from a
+	// nested closure, it would be a no-op and the panic would not be caught.
+	defer d.wg.Done()
+	defer goutil.Recover(metrics.LabelDDL, "startCleanDeadTableLock", nil, false)
+
+	ticker := time.NewTicker(time.Second * 10)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ticker.C:
+			if !d.ownerManager.IsOwner() {
+				continue
+			}
+			if d.infoHandle == nil || !d.infoHandle.IsValid() {
+				continue
+			}
+			deadLockTables, err := d.tableLockCkr.GetDeadLockedTables(d.quitCh, d.infoHandle.Get().AllSchemas())
+			if err != nil {
+				logutil.Logger(ddlLogCtx).Info("[ddl] get dead table lock failed.", zap.Error(err))
+				continue
+			}
+			for se, tables := range deadLockTables {
+				err := d.CleanDeadTableLock(tables, se)
+				if err != nil {
+					logutil.Logger(ddlLogCtx).Info("[ddl] clean dead table lock failed.", zap.Error(err))
+				}
+			}
+		case <-d.quitCh:
+			return
+		}
+	}
+}
+
 // DDL error codes.
 const (
 	codeInvalidWorker terror.ErrCode = 1
diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go
index c818c2705b859..bb00c7cea977b 100644
--- a/ddl/ddl_api.go
+++ b/ddl/ddl_api.go
@@ -3756,6 +3756,33 @@ func (d *ddl) UnlockTables(ctx sessionctx.Context, unlockTables []model.TableLoc
 	return errors.Trace(err)
 }

+// CleanDeadTableLock is used to clean the dead table locks.
+func (d *ddl) CleanDeadTableLock(unlockTables []model.TableLockTpInfo, se model.SessionInfo) error {
+	if len(unlockTables) == 0 {
+		return nil
+	}
+	arg := &lockTablesArg{
+		UnlockTables: unlockTables,
+		SessionInfo:  se,
+	}
+	job := &model.Job{
+		SchemaID:   unlockTables[0].SchemaID,
+		TableID:    unlockTables[0].TableID,
+		Type:       model.ActionUnlockTable,
+		BinlogInfo: &model.HistoryInfo{},
+		Args:       []interface{}{arg},
+	}
+
+	ctx, err := d.sessPool.get()
+	if err != nil {
+		return err
+	}
+	defer d.sessPool.put(ctx)
+	err = d.doDDLJob(ctx, job)
+	err = d.callHookOnChanged(err)
+	return errors.Trace(err)
+}
+
 func throwErrIfInMemOrSysDB(ctx sessionctx.Context, dbLowerName string) error {
 	if util.IsMemOrSysDB(dbLowerName) {
 		if ctx.GetSessionVars().User != nil {
diff --git a/ddl/util/dead_table_lock_checker.go b/ddl/util/dead_table_lock_checker.go
new file mode 100644
index 0000000000000..9984a234eb469
--- /dev/null
+++ b/ddl/util/dead_table_lock_checker.go
@@ -0,0 +1,106 @@
+// Copyright 2020 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package util
+
+import (
+	"context"
+	"strings"
+	"time"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/parser/model"
+	"github.com/pingcap/tidb/util/logutil"
+	"go.etcd.io/etcd/clientv3"
+	"go.uber.org/zap"
+)
+
+const (
+	defaultRetryCnt      = 5
+	defaultRetryInterval = time.Millisecond * 200
+	defaultTimeout       = time.Second
+)
+
+// DeadTableLockChecker is used to check dead table locks.
+// If a tidb-server panics or is killed by others, the table locks held by the killed tidb-server may not be released.
+type DeadTableLockChecker struct {
+	etcdCli *clientv3.Client
+}
+
+// NewDeadTableLockChecker creates a new DeadTableLockChecker.
+func NewDeadTableLockChecker(etcdCli *clientv3.Client) DeadTableLockChecker {
+	return DeadTableLockChecker{
+		etcdCli: etcdCli,
+	}
+}
+
+func (d *DeadTableLockChecker) getAliveServers(ctx context.Context) (map[string]struct{}, error) {
+	var err error
+	var resp *clientv3.GetResponse
+	allInfos := make(map[string]struct{})
+	for i := 0; i < defaultRetryCnt; i++ {
+		select {
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		default:
+		}
+		childCtx, cancel := context.WithTimeout(ctx, defaultTimeout)
+		resp, err = d.etcdCli.Get(childCtx, DDLAllSchemaVersions, clientv3.WithPrefix())
+		cancel()
+		if err != nil {
+			logutil.Logger(ddlLogCtx).Info("[ddl] clean dead table lock get alive servers failed.", zap.Error(err))
+			time.Sleep(defaultRetryInterval)
+			continue
+		}
+		for _, kv := range resp.Kvs {
+			serverID := strings.TrimPrefix(string(kv.Key), DDLAllSchemaVersions+"/")
+			allInfos[serverID] = struct{}{}
+		}
+		return allInfos, nil
+	}
+	return nil, errors.Trace(err)
+}
+
+// GetDeadLockedTables gets the dead-locked tables.
+func (d *DeadTableLockChecker) GetDeadLockedTables(quitCh chan struct{}, schemas []*model.DBInfo) (map[model.SessionInfo][]model.TableLockTpInfo, error) {
+	if d.etcdCli == nil {
+		return nil, nil
+	}
+	aliveServers, err := d.getAliveServers(context.Background())
+	if err != nil {
+		return nil, err
+	}
+	deadLockTables := make(map[model.SessionInfo][]model.TableLockTpInfo)
+	for _, schema := range schemas {
+		select {
+		case <-quitCh:
+			return nil, nil
+		default:
+		}
+		for _, tbl := range schema.Tables {
+			if tbl.Lock == nil {
+				continue
+			}
+			for _, se := range tbl.Lock.Sessions {
+				if _, ok := aliveServers[se.ServerID]; !ok {
+					deadLockTables[se] = append(deadLockTables[se], model.TableLockTpInfo{
+						SchemaID: schema.ID,
+						TableID:  tbl.ID,
+						Tp:       tbl.Lock.Tp,
+					})
+				}
+			}
+		}
+	}
+	return deadLockTables, nil
+}
diff --git a/util/misc.go b/util/misc.go
index 6af5c4b9cf65f..f79ddc06e9fac 100644
--- a/util/misc.go
+++ b/util/misc.go
@@ -19,8 +19,10 @@ import (
 	"crypto/x509"
 	"crypto/x509/pkix"
 	"fmt"
+	"github.com/pingcap/tidb/metrics"
 	"io/ioutil"
 	"net/http"
+	"os"
 	"runtime"
 	"strconv"
 	"strings"
@@ -92,6 +94,34 @@ func WithRecovery(exec func(), recoverFn func(r interface{})) {
 	exec()
 }

+// Recover includes operations such as recovering, clearing, and printing information.
+// It dumps the current goroutine stack into the log if it catches a panic; it must be deferred directly (`defer Recover(...)`).
+// metricsLabel: the label of the PanicCounter metric.
+// funcInfo: some information about the panicking function.
+// recoverFn: a handler called after recovering and before dumping the stack; passing `nil` means no-op.
+// quit: if true, the current program exits after recovery.
+func Recover(metricsLabel, funcInfo string, recoverFn func(), quit bool) {
+	r := recover()
+	if r == nil {
+		return
+	}
+
+	if recoverFn != nil {
+		recoverFn()
+	}
+	logutil.Logger(context.Background()).Error("panic in the recoverable goroutine",
+		zap.String("label", metricsLabel),
+		zap.String("funcInfo", funcInfo),
+		zap.Reflect("r", r),
+		zap.String("stack", string(GetStack())))
+	metrics.PanicCounter.WithLabelValues(metricsLabel).Inc()
+	if quit {
+		// Wait for metrics to be pushed.
+		time.Sleep(time.Second * 15)
+		os.Exit(1)
+	}
+}
+
 // CompatibleParseGCTime parses a string with `GCTimeFormat` and returns a time.Time. If `value` can't be parsed as that
 // format, truncate to last space and try again. This function is only useful when loading times that saved by
 // gc_worker. We have changed the format that gc_worker saves time (removed the last field), but when loading times it
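A note for reviewers on the Recover helper above: Go's recover() only stops a panic when it is called directly by the deferred function itself, which is why startCleanDeadTableLock defers goutil.Recover directly instead of calling it from inside a deferred closure. A minimal, self-contained sketch of the difference (plain Go, no TiDB dependencies; good, bad, and recoverDirectly are illustrative names, not part of this patch):

package main

import "fmt"

// recoverDirectly catches a panic only when it is itself the deferred
// function, because then the recover() inside it is called directly by
// a deferred function.
func recoverDirectly() {
	if r := recover(); r != nil {
		fmt.Println("recovered:", r)
	}
}

func good() {
	defer recoverDirectly() // panic is caught
	panic("boom")
}

func bad() {
	// Here the deferred function is the closure, so the recover() inside
	// recoverDirectly is NOT called directly by a deferred function: it
	// returns nil and the panic keeps unwinding.
	defer func() {
		recoverDirectly()
	}()
	panic("boom")
}

func main() {
	good()
	fmt.Println("good returned normally")
	bad() // crashes the program
}

Running this prints "recovered: boom" and "good returned normally", then crashes inside bad() — exactly the failure mode that deferring Recover directly avoids.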
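The detection in GetDeadLockedTables reduces to a set difference: every live tidb-server keeps a key under DDLAllSchemaVersions in etcd, and a lock-holding session is considered dead when its ServerID has no such key. A self-contained sketch of that reduction over mocked data (sessionInfo, tableLockInfo, table, and schema are simplified stand-ins for the model package, not TiDB's actual definitions):

package main

import "fmt"

// Simplified stand-ins for model.SessionInfo and model.TableLockTpInfo.
type sessionInfo struct {
	ServerID  string
	SessionID uint64
}

type tableLockInfo struct {
	SchemaID, TableID int64
}

type table struct {
	ID       int64
	Sessions []sessionInfo // sessions holding a lock on this table
}

type schema struct {
	ID     int64
	Tables []table
}

// deadLockedTables returns, per dead session, the locks it still holds.
// A session is dead when its ServerID is absent from the alive set
// (in TiDB the alive set comes from etcd keys under DDLAllSchemaVersions).
func deadLockedTables(alive map[string]struct{}, schemas []schema) map[sessionInfo][]tableLockInfo {
	dead := make(map[sessionInfo][]tableLockInfo)
	for _, db := range schemas {
		for _, tbl := range db.Tables {
			for _, se := range tbl.Sessions {
				if _, ok := alive[se.ServerID]; !ok {
					dead[se] = append(dead[se], tableLockInfo{SchemaID: db.ID, TableID: tbl.ID})
				}
			}
		}
	}
	return dead
}

func main() {
	alive := map[string]struct{}{"server-1": {}}
	schemas := []schema{{
		ID: 1,
		Tables: []table{
			{ID: 10, Sessions: []sessionInfo{{ServerID: "server-1", SessionID: 7}}}, // alive, kept
			{ID: 11, Sessions: []sessionInfo{{ServerID: "server-2", SessionID: 9}}}, // dead, cleaned
		},
	}}
	fmt.Println(deadLockedTables(alive, schemas))
}

Each entry of the resulting map is then handed to CleanDeadTableLock, which releases all locks of one dead session with a single ActionUnlockTable job.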