Skip to content

Commit

Permalink
*: support capture evolve plan tasks (pingcap#13199)
Browse files Browse the repository at this point in the history
  • Loading branch information
alivxxx authored and XiaTianliang committed Dec 21, 2019
1 parent 03c0919 commit 517c321
Show file tree
Hide file tree
Showing 11 changed files with 214 additions and 65 deletions.
16 changes: 12 additions & 4 deletions bindinfo/bind.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,30 @@ type hintProcessor struct {
*HintsSet
// bindHint2Ast indicates the behavior of the processor, `true` for bind hint to ast, `false` for extract hint from ast.
bindHint2Ast bool
tableCounter int64
indexCounter int64
tableCounter int
indexCounter int
}

func (hp *hintProcessor) Enter(in ast.Node) (ast.Node, bool) {
switch v := in.(type) {
case *ast.SelectStmt:
if hp.bindHint2Ast {
v.TableHints = hp.tableHints[hp.tableCounter]
if hp.tableCounter < len(hp.tableHints) {
v.TableHints = hp.tableHints[hp.tableCounter]
} else {
v.TableHints = nil
}
hp.tableCounter++
} else {
hp.tableHints = append(hp.tableHints, v.TableHints)
}
case *ast.TableName:
if hp.bindHint2Ast {
v.IndexHints = hp.indexHints[hp.indexCounter]
if hp.indexCounter < len(hp.indexHints) {
v.IndexHints = hp.indexHints[hp.indexCounter]
} else {
v.IndexHints = nil
}
hp.indexCounter++
} else {
hp.indexHints = append(hp.indexHints, v.IndexHints)
Expand Down
22 changes: 21 additions & 1 deletion bindinfo/bind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ func (s *testSuite) TestUseMultiplyBindings(c *C) {
tk.MustExec("analyze table t")
tk.MustExec("create binding for select * from t where a >= 1 and b >= 1 and c = 0 using select * from t use index(idx_a) where a >= 1 and b >= 1 and c = 0")
tk.MustExec("create binding for select * from t where a >= 1 and b >= 1 and c = 0 using select * from t use index(idx_b) where a >= 1 and b >= 1 and c = 0")
// It cannot choose `idx_c` although it has lowest cost.
// It cannot choose table path although it has lowest cost.
tk.MustQuery("select * from t where a >= 4 and b >= 1 and c = 0")
c.Assert(tk.Se.GetSessionVars().StmtCtx.IndexNames[0], Equals, "t:idx_a")
tk.MustQuery("select * from t where a >= 1 and b >= 4 and c = 0")
Expand Down Expand Up @@ -504,3 +504,23 @@ func (s *testSuite) TestDropSingleBindings(c *C) {
rows = tk.MustQuery("show global bindings").Rows()
c.Assert(len(rows), Equals, 0)
}

func (s *testSuite) TestAddEvolveTasks(c *C) {
tk := testkit.NewTestKit(c, s.store)
s.cleanBindingEnv(tk)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int, c int, index idx_a(a), index idx_b(b), index idx_c(c))")
tk.MustExec("insert into t values (1,1,1), (2,2,2), (3,3,3), (4,4,4), (5,5,5)")
tk.MustExec("analyze table t")
tk.MustExec("create global binding for select * from t where a >= 1 and b >= 1 and c = 0 using select * from t use index(idx_a) where a >= 1 and b >= 1 and c = 0")
tk.MustExec("set @@tidb_evolve_plan_baselines=1")
// It cannot choose table path although it has lowest cost.
tk.MustQuery("select * from t where a >= 4 and b >= 1 and c = 0")
c.Assert(tk.Se.GetSessionVars().StmtCtx.IndexNames[0], Equals, "t:idx_a")
domain.GetDomain(tk.Se).BindHandle().SaveEvolveTasksToStore()
rows := tk.MustQuery("show global bindings").Rows()
c.Assert(len(rows), Equals, 2)
c.Assert(rows[1][1], Equals, "SELECT /*+ USE_INDEX(@`sel_1` `test`.`t` )*/ * FROM `test`.`t` WHERE `a`>=4 AND `b`>=1 AND `c`=0")
c.Assert(rows[1][3], Equals, "pending verify")
}
11 changes: 7 additions & 4 deletions bindinfo/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ const (
deleted = "deleted"
// Invalid is the bind info's invalid status.
Invalid = "invalid"
// PendingVerify means the bind info needs to be verified.
PendingVerify = "pending verify"
)

// Binding stores the basic bind hint info.
Expand All @@ -47,6 +49,7 @@ type Binding struct {
// Hint is the parsed hints, it is used to bind hints to stmt node.
Hint *HintsSet
// id is the string form of all hints. It is used to uniquely identify different hints.
// It would be non-empty only when the status is `Using` or `PendingVerify`.
id string
}

Expand All @@ -71,10 +74,10 @@ func (br *BindRecord) HasUsingBinding() bool {
return false
}

// FindUsingBinding find bindings with status `Using` in BindRecord.
func (br *BindRecord) FindUsingBinding(hint string) *Binding {
// FindBinding find bindings in BindRecord.
func (br *BindRecord) FindBinding(hint string) *Binding {
for _, binding := range br.Bindings {
if binding.Status == Using && binding.id == hint {
if binding.id == hint {
return &binding
}
}
Expand All @@ -84,7 +87,7 @@ func (br *BindRecord) FindUsingBinding(hint string) *Binding {
func (br *BindRecord) prepareHints(sctx sessionctx.Context, is infoschema.InfoSchema) error {
p := parser.New()
for i, bind := range br.Bindings {
if bind.Hint != nil {
if bind.Hint != nil || bind.id != "" {
continue
}
stmtNode, err := p.ParseOneStmt(bind.BindSQL, bind.Charset, bind.Collation)
Expand Down
150 changes: 100 additions & 50 deletions bindinfo/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ type BindHandle struct {

// invalidBindRecordMap indicates the invalid bind records found during querying.
// A record will be deleted from this map, after 2 bind-lease, after it is dropped from the kv.
invalidBindRecordMap struct {
sync.Mutex
atomic.Value
}
invalidBindRecordMap tmpBindRecordMap

// pendingVerifyBindRecordMap indicates the pending verify bind records that found during query.
pendingVerifyBindRecordMap tmpBindRecordMap

lastUpdateTime types.Time

Expand All @@ -83,9 +83,9 @@ type BindHandle struct {
// Lease influences the duration of loading bind info and handling invalid bind.
var Lease = 3 * time.Second

type invalidBindRecordMap struct {
bindRecord *BindRecord
droppedTime time.Time
type bindRecordUpdate struct {
bindRecord *BindRecord
updateTime time.Time
}

// NewBindHandle creates a new BindHandle.
Expand All @@ -95,7 +95,18 @@ func NewBindHandle(ctx sessionctx.Context) *BindHandle {
handle.bindInfo.Value.Store(make(cache, 32))
handle.bindInfo.parser = parser.New()
handle.parser4Baseline = parser.New()
handle.invalidBindRecordMap.Value.Store(make(map[string]*invalidBindRecordMap))
handle.invalidBindRecordMap.Value.Store(make(map[string]*bindRecordUpdate))
handle.invalidBindRecordMap.flushFunc = func(record *BindRecord) error {
// We do not need the first two parameters because they are only use to generate hint,
// and we already have the hint.
return handle.DropBindRecord(nil, nil, record)
}
handle.pendingVerifyBindRecordMap.Value.Store(make(map[string]*bindRecordUpdate))
handle.pendingVerifyBindRecordMap.flushFunc = func(record *BindRecord) error {
// We do not need the first two parameters because they are only use to generate hint,
// and we already have the hint.
return handle.AddBindRecord(nil, nil, record)
}
return handle
}

Expand Down Expand Up @@ -150,6 +161,21 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, is infoschema.InfoSc
if err != nil {
return err
}

br := h.GetBindRecord(parser.DigestHash(record.OriginalSQL), record.OriginalSQL, record.Db)
var duplicateBinding string
if br != nil {
binding := br.FindBinding(record.Bindings[0].id)
if binding != nil {
// There is already a binding with status `Using` or `PendingVerify`, we could directly cancel the job.
if record.Bindings[0].Status == PendingVerify {
return nil
}
// Otherwise, we need to remove it before insert.
duplicateBinding = binding.BindSQL
}
}

exec, _ := h.sctx.Context.(sqlexec.SQLExecutor)
h.sctx.Lock()
_, err = exec.Execute(context.TODO(), "BEGIN")
Expand Down Expand Up @@ -179,18 +205,10 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, is infoschema.InfoSc
h.bindInfo.Unlock()
}()

oldBindRecord := h.GetBindRecord(parser.DigestHash(record.OriginalSQL), record.OriginalSQL, record.Db)
if oldBindRecord != nil {
for _, newBinding := range record.Bindings {
binding := oldBindRecord.FindUsingBinding(newBinding.id)
if binding == nil {
continue
}
// Remove duplicates before insert.
_, err = exec.Execute(context.TODO(), h.deleteBindInfoSQL(record.OriginalSQL, record.Db, binding.BindSQL))
if err != nil {
return err
}
if duplicateBinding != "" {
_, err = exec.Execute(context.TODO(), h.deleteBindInfoSQL(record.OriginalSQL, record.Db, duplicateBinding))
if err != nil {
return err
}
}

Expand All @@ -205,7 +223,6 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, is infoschema.InfoSc
Fsp: 3,
}
record.Bindings[i].UpdateTime = record.Bindings[0].CreateTime
record.Bindings[i].Status = Using

// insert the BindRecord to the storage.
_, err = exec.Execute(context.TODO(), h.insertBindInfoSQL(record.OriginalSQL, record.Db, record.Bindings[i]))
Expand Down Expand Up @@ -266,7 +283,7 @@ func (h *BindHandle) DropBindRecord(sctx sessionctx.Context, is infoschema.InfoS
if oldBindRecord == nil {
continue
}
binding := oldBindRecord.FindUsingBinding(record.Bindings[i].id)
binding := oldBindRecord.FindBinding(record.Bindings[i].id)
if binding != nil {
bindingSQLs = append(bindingSQLs, binding.BindSQL)
}
Expand All @@ -276,44 +293,60 @@ func (h *BindHandle) DropBindRecord(sctx sessionctx.Context, is infoschema.InfoS
return err
}

// DropInvalidBindRecord execute the drop bindRecord task.
func (h *BindHandle) DropInvalidBindRecord() {
invalidBindRecordMap := copyInvalidBindRecordMap(h.invalidBindRecordMap.Load().(map[string]*invalidBindRecordMap))
for key, invalidBindRecord := range invalidBindRecordMap {
if invalidBindRecord.droppedTime.IsZero() {
err := h.DropBindRecord(nil, nil, invalidBindRecord.bindRecord)
// tmpBindRecordMap is used to temporarily save bind record changes.
// Those changes will be flushed into store periodically.
type tmpBindRecordMap struct {
sync.Mutex
atomic.Value
flushFunc func(record *BindRecord) error
}

func (tmpMap *tmpBindRecordMap) flushToStore() {
newMap := copyBindRecordUpdateMap(tmpMap.Load().(map[string]*bindRecordUpdate))
for key, bindRecord := range newMap {
if bindRecord.updateTime.IsZero() {
err := tmpMap.flushFunc(bindRecord.bindRecord)
if err != nil {
logutil.BgLogger().Error("DropInvalidBindRecord failed", zap.Error(err))
logutil.BgLogger().Error("flush bind record failed", zap.Error(err))
}
invalidBindRecord.droppedTime = time.Now()
bindRecord.updateTime = time.Now()
continue
}

if time.Since(invalidBindRecord.droppedTime) > 6*time.Second {
delete(invalidBindRecordMap, key)
updateMetrics(metrics.ScopeGlobal, invalidBindRecord.bindRecord, nil, false)
if time.Since(bindRecord.updateTime) > 6*time.Second {
delete(newMap, key)
updateMetrics(metrics.ScopeGlobal, bindRecord.bindRecord, nil, false)
}
}
h.invalidBindRecordMap.Store(invalidBindRecordMap)
tmpMap.Store(newMap)
}

// AddDropInvalidBindTask add bindRecord to invalidBindRecordMap when the bindRecord need to be deleted.
func (h *BindHandle) AddDropInvalidBindTask(invalidBindRecord *BindRecord) {
key := invalidBindRecord.OriginalSQL + ":" + invalidBindRecord.Db
if _, ok := h.invalidBindRecordMap.Value.Load().(map[string]*invalidBindRecordMap)[key]; ok {
func (tmpMap *tmpBindRecordMap) saveToCache(bindRecord *BindRecord) {
key := bindRecord.OriginalSQL + ":" + bindRecord.Db + ":" + bindRecord.Bindings[0].id
if _, ok := tmpMap.Load().(map[string]*bindRecordUpdate)[key]; ok {
return
}
h.invalidBindRecordMap.Lock()
defer h.invalidBindRecordMap.Unlock()
if _, ok := h.invalidBindRecordMap.Value.Load().(map[string]*invalidBindRecordMap)[key]; ok {
tmpMap.Lock()
defer tmpMap.Unlock()
if _, ok := tmpMap.Load().(map[string]*bindRecordUpdate)[key]; ok {
return
}
newMap := copyInvalidBindRecordMap(h.invalidBindRecordMap.Value.Load().(map[string]*invalidBindRecordMap))
newMap[key] = &invalidBindRecordMap{
bindRecord: invalidBindRecord,
newMap := copyBindRecordUpdateMap(tmpMap.Load().(map[string]*bindRecordUpdate))
newMap[key] = &bindRecordUpdate{
bindRecord: bindRecord,
}
h.invalidBindRecordMap.Store(newMap)
updateMetrics(metrics.ScopeGlobal, nil, invalidBindRecord, false)
tmpMap.Store(newMap)
updateMetrics(metrics.ScopeGlobal, nil, bindRecord, false)
}

// DropInvalidBindRecord execute the drop bindRecord task.
func (h *BindHandle) DropInvalidBindRecord() {
h.invalidBindRecordMap.flushToStore()
}

// AddDropInvalidBindTask add bindRecord to invalidBindRecordMap when the bindRecord need to be deleted.
func (h *BindHandle) AddDropInvalidBindTask(invalidBindRecord *BindRecord) {
h.invalidBindRecordMap.saveToCache(invalidBindRecord)
}

// Size return the size of bind info cache.
Expand Down Expand Up @@ -360,6 +393,7 @@ func (h *BindHandle) newBindRecord(row chunk.Row) (string, *BindRecord, error) {
if err != nil {
return "", nil, err
}
h.sctx.GetSessionVars().StmtCtx.TimeZone = h.sctx.GetSessionVars().TimeZone
h.sctx.GetSessionVars().CurrentDB = bindRecord.Db
err = bindRecord.prepareHints(h.sctx.Context, h.sctx.GetSessionVars().TxnCtx.InfoSchema.(infoschema.InfoSchema))
return hash, bindRecord, err
Expand Down Expand Up @@ -431,8 +465,8 @@ func (c cache) copy() cache {
return newCache
}

func copyInvalidBindRecordMap(oldMap map[string]*invalidBindRecordMap) map[string]*invalidBindRecordMap {
newMap := make(map[string]*invalidBindRecordMap, len(oldMap))
func copyBindRecordUpdateMap(oldMap map[string]*bindRecordUpdate) map[string]*bindRecordUpdate {
newMap := make(map[string]*bindRecordUpdate, len(oldMap))
for k, v := range oldMap {
newMap[k] = v
}
Expand All @@ -453,7 +487,7 @@ func (c cache) getBindRecord(hash, normdOrigSQL, db string) *BindRecord {

func (h *BindHandle) deleteBindInfoSQL(normdOrigSQL, db, bindSQL string) string {
return fmt.Sprintf(
`DELETE FROM mysql.bind_info WHERE original_sql=%s AND default_db=%s AND bind_sql = %s`,
`DELETE FROM mysql.bind_info WHERE original_sql=%s AND default_db=%s AND bind_sql=%s`,
expression.Quote(normdOrigSQL),
expression.Quote(db),
expression.Quote(bindSQL),
Expand Down Expand Up @@ -536,9 +570,25 @@ func (h *BindHandle) CaptureBaselines() {
}
}

// AddEvolvePlanTask adds the evolve plan task into memory cache. It would be flushed to store periodically.
func (h *BindHandle) AddEvolvePlanTask(originalSQL, DB string, binding Binding, planHint string) {
binding.id = planHint
br := &BindRecord{
OriginalSQL: originalSQL,
Db: DB,
Bindings: []Binding{binding},
}
h.pendingVerifyBindRecordMap.saveToCache(br)
}

// SaveEvolveTasksToStore saves the evolve task into store.
func (h *BindHandle) SaveEvolveTasksToStore() {
h.pendingVerifyBindRecordMap.flushToStore()
}

// Clear resets the bind handle. It is used for test.
func (h *BindHandle) Clear() {
h.bindInfo.Store(make(cache))
h.invalidBindRecordMap.Store(make(map[string]*invalidBindRecordMap))
h.invalidBindRecordMap.Store(make(map[string]*bindRecordUpdate))
h.lastUpdateTime = types.ZeroTimestamp
}
1 change: 1 addition & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,7 @@ func (do *Domain) globalBindHandleWorkerLoop() {
continue
}
do.bindHandle.CaptureBaselines()
do.bindHandle.SaveEvolveTasksToStore()
}
}
}()
Expand Down
Loading

0 comments on commit 517c321

Please sign in to comment.