Skip to content

Commit

Permalink
planner: new cost formula for IndexJoin (#35671)
Browse files Browse the repository at this point in the history
ref #35240
  • Loading branch information
qw4990 authored Jun 24, 2022
1 parent 3c83cd3 commit 8d589d2
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 21 deletions.
78 changes: 60 additions & 18 deletions planner/core/plan_cost.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/planner/property"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/util/paging"
Expand Down Expand Up @@ -223,11 +224,8 @@ func (p *PhysicalIndexLookUpReader) GetPlanCost(taskType property.TaskType, cost
// table-side seek cost
p.planCost += estimateNetSeekCost(p.tablePlan)

if p.ctx.GetSessionVars().CostModelVersion == modelVer2 {
// accumulate the real double-read cost: numDoubleReadTasks * seekFactor
numDoubleReadTasks := p.estNumDoubleReadTasks(costFlag)
p.planCost += numDoubleReadTasks * p.ctx.GetSessionVars().GetSeekFactor(ts.Table)
}
// double read cost
p.planCost += p.estDoubleReadCost(ts.Table, costFlag)

// consider concurrency
p.planCost /= float64(p.ctx.GetSessionVars().DistSQLScanConcurrency())
Expand All @@ -238,14 +236,21 @@ func (p *PhysicalIndexLookUpReader) GetPlanCost(taskType property.TaskType, cost
return p.planCost, nil
}

func (p *PhysicalIndexLookUpReader) estNumDoubleReadTasks(costFlag uint64) float64 {
doubleReadRows := p.indexPlan.StatsCount()
func (p *PhysicalIndexLookUpReader) estDoubleReadCost(tbl *model.TableInfo, costFlag uint64) float64 {
if p.ctx.GetSessionVars().CostModelVersion == modelVer1 {
// only consider double-read cost on modelVer2
return 0
}
// estimate the double-read cost: (numDoubleReadTasks * seekFactor) / concurrency
doubleReadRows := getCardinality(p.indexPlan, costFlag)
batchSize := float64(p.ctx.GetSessionVars().IndexLookupSize)
concurrency := math.Max(1.0, float64(p.ctx.GetSessionVars().IndexLookupConcurrency()))
seekFactor := p.ctx.GetSessionVars().GetSeekFactor(tbl)
// distRatio indicates how many requests corresponding to a batch, current value is from experiments.
// TODO: estimate it by using index correlation or make it configurable.
distRatio := 40.0
numDoubleReadTasks := (doubleReadRows / batchSize) * distRatio
return numDoubleReadTasks // use Float64 instead of Int like `Ceil(...)` to make the cost continuous
numDoubleReadTasks := (doubleReadRows / batchSize) * distRatio // use Float64 instead of Int like `Ceil(...)` to make the cost continuous.
return (numDoubleReadTasks * seekFactor) / concurrency
}

// GetPlanCost calculates the cost of the plan if it has not been calculated yet and returns the cost.
Expand Down Expand Up @@ -453,7 +458,7 @@ func (p *PhysicalIndexScan) GetPlanCost(taskType property.TaskType, costFlag uin
}

// GetCost computes the cost of index join operator and its children.
func (p *PhysicalIndexJoin) GetCost(outerCnt, innerCnt float64, outerCost, innerCost float64) float64 {
func (p *PhysicalIndexJoin) GetCost(outerCnt, innerCnt, outerCost, innerCost float64, costFlag uint64) float64 {
var cpuCost float64
sessVars := p.ctx.GetSessionVars()
// Add the cost of evaluating outer filter, since inner filter of index join
Expand Down Expand Up @@ -490,6 +495,9 @@ func (p *PhysicalIndexJoin) GetCost(outerCnt, innerCnt float64, outerCost, inner
numPairs = 0
}
}
if hasCostFlag(costFlag, CostFlagUseTrueCardinality) {
numPairs = getOperatorActRows(p)
}
probeCost := numPairs * sessVars.GetCPUFactor()
// Cost of additional concurrent goroutines.
cpuCost += probeCost + (innerConcurrency+1.0)*sessVars.GetConcurrencyFactor()
Expand All @@ -498,7 +506,23 @@ func (p *PhysicalIndexJoin) GetCost(outerCnt, innerCnt float64, outerCost, inner
memoryCost := innerConcurrency * (batchSize * distinctFactor) * innerCnt * sessVars.GetMemoryFactor()
// Cost of inner child plan, i.e, mainly I/O and network cost.
innerPlanCost := outerCnt * innerCost
return outerCost + innerPlanCost + cpuCost + memoryCost
return outerCost + innerPlanCost + cpuCost + memoryCost + p.estDoubleReadCost(outerCnt)
}

func (p *PhysicalIndexJoin) estDoubleReadCost(doubleReadRows float64) float64 {
if p.ctx.GetSessionVars().CostModelVersion == modelVer1 {
// only consider double-read cost on modelVer2
return 0
}
// estimate the double read cost for IndexJoin: (double-read-tasks * seek-factor) / concurrency
seekFactor := p.ctx.GetSessionVars().GetSeekFactor(nil)
batchSize := math.Max(1.0, float64(p.ctx.GetSessionVars().IndexJoinBatchSize))
concurrency := math.Max(1.0, float64(p.ctx.GetSessionVars().IndexLookupJoinConcurrency()))
// distRatio indicates how many requests corresponding to a batch, current value is from experiments.
// TODO: estimate it by using index correlation or make it configurable.
distRatio := 40.0
numDoubleReadTasks := (doubleReadRows / batchSize) * distRatio
return (numDoubleReadTasks * seekFactor) / concurrency
}

// GetPlanCost calculates the cost of the plan if it has not been calculated yet and returns the cost.
Expand All @@ -517,13 +541,17 @@ func (p *PhysicalIndexJoin) GetPlanCost(taskType property.TaskType, costFlag uin
}
outerCnt := getCardinality(outerChild, costFlag)
innerCnt := getCardinality(innerChild, costFlag)
p.planCost = p.GetCost(outerCnt, innerCnt, outerCost, innerCost)
if hasCostFlag(costFlag, CostFlagUseTrueCardinality) && outerCnt > 0 {
innerCnt /= outerCnt // corresponding to one outer row when calculating IndexJoin costs
innerCost /= outerCnt
}
p.planCost = p.GetCost(outerCnt, innerCnt, outerCost, innerCost, costFlag)
p.planCostInit = true
return p.planCost, nil
}

// GetCost computes the cost of index merge join operator and its children.
func (p *PhysicalIndexHashJoin) GetCost(outerCnt, innerCnt, outerCost, innerCost float64) float64 {
func (p *PhysicalIndexHashJoin) GetCost(outerCnt, innerCnt, outerCost, innerCost float64, costFlag uint64) float64 {
var cpuCost float64
sessVars := p.ctx.GetSessionVars()
// Add the cost of evaluating outer filter, since inner filter of index join
Expand Down Expand Up @@ -561,6 +589,9 @@ func (p *PhysicalIndexHashJoin) GetCost(outerCnt, innerCnt, outerCost, innerCost
numPairs = 0
}
}
if hasCostFlag(costFlag, CostFlagUseTrueCardinality) {
numPairs = getOperatorActRows(p)
}
// Inner workers do hash join in parallel, but they can only save ONE outer
// batch results. So as the number of outer batch exceeds inner concurrency,
// it would fall back to linear execution. In a word, the hash join only runs
Expand All @@ -579,7 +610,7 @@ func (p *PhysicalIndexHashJoin) GetCost(outerCnt, innerCnt, outerCost, innerCost
memoryCost := concurrency * (batchSize * distinctFactor) * innerCnt * sessVars.GetMemoryFactor()
// Cost of inner child plan, i.e, mainly I/O and network cost.
innerPlanCost := outerCnt * innerCost
return outerCost + innerPlanCost + cpuCost + memoryCost
return outerCost + innerPlanCost + cpuCost + memoryCost + p.estDoubleReadCost(outerCnt)
}

// GetPlanCost calculates the cost of the plan if it has not been calculated yet and returns the cost.
Expand All @@ -598,13 +629,17 @@ func (p *PhysicalIndexHashJoin) GetPlanCost(taskType property.TaskType, costFlag
}
outerCnt := getCardinality(outerChild, costFlag)
innerCnt := getCardinality(innerChild, costFlag)
p.planCost = p.GetCost(outerCnt, innerCnt, outerCost, innerCost)
if hasCostFlag(costFlag, CostFlagUseTrueCardinality) && outerCnt > 0 {
innerCnt /= outerCnt // corresponding to one outer row when calculating IndexJoin costs
innerCost /= outerCnt
}
p.planCost = p.GetCost(outerCnt, innerCnt, outerCost, innerCost, costFlag)
p.planCostInit = true
return p.planCost, nil
}

// GetCost computes the cost of index merge join operator and its children.
func (p *PhysicalIndexMergeJoin) GetCost(outerCnt, innerCnt, outerCost, innerCost float64) float64 {
func (p *PhysicalIndexMergeJoin) GetCost(outerCnt, innerCnt, outerCost, innerCost float64, costFlag uint64) float64 {
var cpuCost float64
sessVars := p.ctx.GetSessionVars()
// Add the cost of evaluating outer filter, since inner filter of index join
Expand Down Expand Up @@ -644,6 +679,9 @@ func (p *PhysicalIndexMergeJoin) GetCost(outerCnt, innerCnt, outerCost, innerCos
numPairs = 0
}
}
if hasCostFlag(costFlag, CostFlagUseTrueCardinality) {
numPairs = getOperatorActRows(p)
}
avgProbeCnt := numPairs / outerCnt
var probeCost float64
// Inner workers do merge join in parallel, but they can only save ONE outer batch
Expand All @@ -662,7 +700,7 @@ func (p *PhysicalIndexMergeJoin) GetCost(outerCnt, innerCnt, outerCost, innerCos
memoryCost := innerConcurrency * (batchSize * avgProbeCnt) * sessVars.GetMemoryFactor()

innerPlanCost := outerCnt * innerCost
return outerCost + innerPlanCost + cpuCost + memoryCost
return outerCost + innerPlanCost + cpuCost + memoryCost + p.estDoubleReadCost(outerCnt)
}

// GetPlanCost calculates the cost of the plan if it has not been calculated yet and returns the cost.
Expand All @@ -681,7 +719,11 @@ func (p *PhysicalIndexMergeJoin) GetPlanCost(taskType property.TaskType, costFla
}
outerCnt := getCardinality(outerChild, costFlag)
innerCnt := getCardinality(innerChild, costFlag)
p.planCost = p.GetCost(outerCnt, innerCnt, outerCost, innerCost)
if hasCostFlag(costFlag, CostFlagUseTrueCardinality) && outerCnt > 0 {
innerCnt /= outerCnt // corresponding to one outer row when calculating IndexJoin costs
innerCost /= outerCnt
}
p.planCost = p.GetCost(outerCnt, innerCnt, outerCost, innerCost, costFlag)
p.planCostInit = true
return p.planCost, nil
}
Expand Down
6 changes: 3 additions & 3 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func (p *PhysicalIndexMergeJoin) attach2Task(tasks ...task) task {
}
t := &rootTask{
p: p,
cst: p.GetCost(outerTask.count(), innerTask.count(), outerTask.cost(), innerTask.cost()),
cst: p.GetCost(outerTask.count(), innerTask.count(), outerTask.cost(), innerTask.cost(), 0),
}
p.cost = t.cost()
return t
Expand All @@ -282,7 +282,7 @@ func (p *PhysicalIndexHashJoin) attach2Task(tasks ...task) task {
}
t := &rootTask{
p: p,
cst: p.GetCost(outerTask.count(), innerTask.count(), outerTask.cost(), innerTask.cost()),
cst: p.GetCost(outerTask.count(), innerTask.count(), outerTask.cost(), innerTask.cost(), 0),
}
p.cost = t.cost()
return t
Expand All @@ -298,7 +298,7 @@ func (p *PhysicalIndexJoin) attach2Task(tasks ...task) task {
}
t := &rootTask{
p: p,
cst: p.GetCost(outerTask.count(), innerTask.count(), outerTask.cost(), innerTask.cost()),
cst: p.GetCost(outerTask.count(), innerTask.count(), outerTask.cost(), innerTask.cost(), 0),
}
p.cost = t.cost()
return t
Expand Down

0 comments on commit 8d589d2

Please sign in to comment.