Skip to content

Commit

Permalink
planner: support lock/unlock stats of partition tables (#46768)
Browse files Browse the repository at this point in the history
ref #46351
  • Loading branch information
Rustin170506 authored Sep 15, 2023
1 parent beb2a36 commit 8b15111
Show file tree
Hide file tree
Showing 17 changed files with 6,857 additions and 6,126 deletions.
80 changes: 56 additions & 24 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,46 +173,80 @@ func filterAndCollectTasks(tasks []*analyzeTask, statsHandle *handle.Handle, inf
filteredTasks []*analyzeTask
skippedTables []string
needAnalyzeTableCnt uint
tids = make([]int64, 0, len(tasks))
// tidMap is used to deduplicate table IDs.
// In stats v1, analyze for each index is a single task, and they have the same table id.
tidMap = make(map[int64]struct{}, len(tasks))
tidAndPidsMap = make(map[int64]struct{}, len(tasks))
)

// Check the locked tables in one transaction.
for _, task := range tasks {
tableID := getTableIDFromTask(task)
tids = append(tids, tableID)
}
lockedTables, err := statsHandle.GetLockedTables(tids...)
lockedTableAndPartitionIDs, err := getLockedTableAndPartitionIDs(statsHandle, tasks)
if err != nil {
return nil, 0, nil, err
}

for _, task := range tasks {
// Check if the table or partition is locked.
tableID := getTableIDFromTask(task)
_, isLocked := lockedTables[tableID]
_, isLocked := lockedTableAndPartitionIDs[tableID.TableID]
// If the whole table is not locked, we should check whether the partition is locked.
if !isLocked && tableID.IsPartitionTable() {
_, isLocked = lockedTableAndPartitionIDs[tableID.PartitionID]
}

// Only analyze the table that is not locked.
if !isLocked {
filteredTasks = append(filteredTasks, task)
}
if _, ok := tidMap[tableID]; !ok {

// Get the physical table ID.
physicalTableID := tableID.TableID
if tableID.IsPartitionTable() {
physicalTableID = tableID.PartitionID
}
if _, ok := tidAndPidsMap[physicalTableID]; !ok {
if isLocked {
tbl, ok := infoSchema.TableByID(tableID)
if !ok {
logutil.BgLogger().Warn("Unknown table ID in analyze task", zap.Int64("tid", tableID))
if tableID.IsPartitionTable() {
tbl, _, def := infoSchema.FindTableByPartitionID(tableID.PartitionID)
if def == nil {
logutil.BgLogger().Warn("Unknown partition ID in analyze task", zap.Int64("pid", tableID.PartitionID))
} else {
schema, _ := infoSchema.SchemaByTable(tbl.Meta())
skippedTables = append(skippedTables, fmt.Sprintf("%s.%s partition (%s)", schema.Name, tbl.Meta().Name.O, def.Name.O))
}
} else {
skippedTables = append(skippedTables, tbl.Meta().Name.L)
tbl, ok := infoSchema.TableByID(physicalTableID)
if !ok {
logutil.BgLogger().Warn("Unknown table ID in analyze task", zap.Int64("tid", physicalTableID))
} else {
schema, _ := infoSchema.SchemaByTable(tbl.Meta())
skippedTables = append(skippedTables, fmt.Sprintf("%s.%s", schema.Name, tbl.Meta().Name.O))
}
}
} else {
needAnalyzeTableCnt++
}
tidMap[tableID] = struct{}{}
tidAndPidsMap[physicalTableID] = struct{}{}
}
}

return filteredTasks, needAnalyzeTableCnt, skippedTables, nil
}

// getLockedTableAndPartitionIDs queries the locked tables and partitions.
func getLockedTableAndPartitionIDs(statsHandle *handle.Handle, tasks []*analyzeTask) (map[int64]struct{}, error) {
tidAndPids := make([]int64, 0, len(tasks))
// Check the locked tables in one transaction.
// We need to check all tables and its partitions.
// Because if the whole table is locked, we should skip all partitions.
for _, task := range tasks {
tableID := getTableIDFromTask(task)
tidAndPids = append(tidAndPids, tableID.TableID)
if tableID.IsPartitionTable() {
tidAndPids = append(tidAndPids, tableID.PartitionID)
}
}
return statsHandle.GetLockedTables(tidAndPids...)
}

// warnLockedTableMsg warns the locked table IDs.
func warnLockedTableMsg(sessionVars *variable.SessionVars, needAnalyzeTableCnt uint, skippedTables []string) {
if len(skippedTables) > 0 {
Expand All @@ -230,23 +264,21 @@ func warnLockedTableMsg(sessionVars *variable.SessionVars, needAnalyzeTableCnt u
}
}

func getTableIDFromTask(task *analyzeTask) int64 {
var tableID statistics.AnalyzeTableID

func getTableIDFromTask(task *analyzeTask) statistics.AnalyzeTableID {
switch task.taskType {
case colTask:
tableID = task.colExec.tableID
return task.colExec.tableID
case idxTask:
tableID = task.idxExec.tableID
return task.idxExec.tableID
case fastTask:
tableID = task.fastExec.tableID
return task.fastExec.tableID
case pkIncrementalTask:
tableID = task.colIncrementalExec.tableID
return task.colIncrementalExec.tableID
case idxIncrementalTask:
tableID = task.idxIncrementalExec.tableID
return task.idxIncrementalExec.tableID
}

return tableID.TableID
panic("unreachable")
}

func (e *AnalyzeExec) saveV2AnalyzeOpts() error {
Expand Down
2 changes: 2 additions & 0 deletions executor/lockstats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ go_library(
"//executor/internal/exec",
"//infoschema",
"//parser/ast",
"//parser/model",
"//table/tables",
"//util/chunk",
"@com_github_pingcap_errors//:errors",
],
Expand Down
69 changes: 59 additions & 10 deletions executor/lockstats/lock_stats_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"github.com/pingcap/tidb/executor/internal/exec"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/util/chunk"
)

Expand All @@ -30,6 +32,9 @@ var _ exec.Executor = &LockExec{}
// LockExec represents a lock statistic executor.
type LockExec struct {
exec.BaseExecutor
// Tables is the list of tables to be locked.
// It might contain partition names if we are locking partitions.
// When locking partitions, Tables will only contain one table name.
Tables []*ast.TableName
}

Expand All @@ -45,22 +50,42 @@ func (e *LockExec) Next(_ context.Context, _ *chunk.Chunk) error {
}
is := do.InfoSchema()

tids, pids, err := populateTableAndPartitionIDs(e.Tables, is)
if err != nil {
return err
}
if e.onlyLockPartitions() {
tableName := e.Tables[0]
tid, pidNames, err := populatePartitionIDAndNames(tableName, tableName.PartitionNames, is)
if err != nil {
return err
}

msg, err := h.AddLockedTables(tids, pids, e.Tables)
if err != nil {
return err
}
if msg != "" {
e.Ctx().GetSessionVars().StmtCtx.AppendWarning(errors.New(msg))
msg, err := h.LockPartitions(tid, tableName, pidNames)
if err != nil {
return err
}
if msg != "" {
e.Ctx().GetSessionVars().StmtCtx.AppendWarning(errors.New(msg))
}
} else {
tids, pids, err := populateTableAndPartitionIDs(e.Tables, is)
if err != nil {
return err
}

msg, err := h.LockTables(tids, pids, e.Tables)
if err != nil {
return err
}
if msg != "" {
e.Ctx().GetSessionVars().StmtCtx.AppendWarning(errors.New(msg))
}
}

return nil
}

func (e *LockExec) onlyLockPartitions() bool {
return len(e.Tables) == 1 && len(e.Tables[0].PartitionNames) > 0
}

// Close implements the Executor Close interface.
func (*LockExec) Close() error {
return nil
Expand All @@ -71,6 +96,30 @@ func (*LockExec) Open(context.Context) error {
return nil
}

// populatePartitionIDAndNames returns the table ID and partition IDs for the given table name and partition names.
func populatePartitionIDAndNames(tableName *ast.TableName, partitionNames []model.CIStr, is infoschema.InfoSchema) (int64, map[int64]string, error) {
tbl, err := is.TableByName(tableName.Schema, tableName.Name)
if err != nil {
return 0, nil, err
}

pi := tbl.Meta().GetPartitionInfo()
if pi == nil {
return 0, nil, errors.Errorf("table %s is not a partition table", tableName.Name)
}

pidNames := make(map[int64]string, len(partitionNames))
for _, partitionName := range partitionNames {
pid, err := tables.FindPartitionByName(tbl.Meta(), partitionName.L)
if err != nil {
return 0, nil, err
}
pidNames[pid] = partitionName.L
}

return tbl.Meta().ID, pidNames, nil
}

// populateTableAndPartitionIDs returns table IDs and partition IDs for the given table names.
func populateTableAndPartitionIDs(tables []*ast.TableName, is infoschema.InfoSchema) ([]int64, []int64, error) {
tids := make([]int64, 0, len(tables))
Expand Down
43 changes: 32 additions & 11 deletions executor/lockstats/unlock_stats_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ var _ exec.Executor = &UnlockExec{}
// UnlockExec represents a unlock statistic executor.
type UnlockExec struct {
exec.BaseExecutor
// Tables is the list of tables to be unlocked.
// It might contain partition names if we are unlocking partitions.
// When unlocking partitions, Tables will only contain one table name.
Tables []*ast.TableName
}

Expand All @@ -44,22 +47,40 @@ func (e *UnlockExec) Next(context.Context, *chunk.Chunk) error {
}
is := do.InfoSchema()

tids, pids, err := populateTableAndPartitionIDs(e.Tables, is)
if err != nil {
return err
}

msg, err := h.RemoveLockedTables(tids, pids, e.Tables)
if err != nil {
return err
}
if msg != "" {
e.Ctx().GetSessionVars().StmtCtx.AppendWarning(errors.New(msg))
if e.onlyUnlockPartitions() {
tableName := e.Tables[0]
tid, pidNames, err := populatePartitionIDAndNames(tableName, tableName.PartitionNames, is)
if err != nil {
return err
}
msg, err := h.RemoveLockedPartitions(tid, tableName, pidNames)
if err != nil {
return err
}
if msg != "" {
e.Ctx().GetSessionVars().StmtCtx.AppendWarning(errors.New(msg))
}
} else {
tids, pids, err := populateTableAndPartitionIDs(e.Tables, is)
if err != nil {
return err
}
msg, err := h.RemoveLockedTables(tids, pids, e.Tables)
if err != nil {
return err
}
if msg != "" {
e.Ctx().GetSessionVars().StmtCtx.AppendWarning(errors.New(msg))
}
}

return nil
}

func (e *UnlockExec) onlyUnlockPartitions() bool {
return len(e.Tables) == 1 && len(e.Tables[0].PartitionNames) > 0
}

// Close implements the Executor Close interface.
func (*UnlockExec) Close() error {
return nil
Expand Down
Loading

0 comments on commit 8b15111

Please sign in to comment.