Skip to content

Commit

Permalink
Merge pull request pingcap#13 from hanfei1991/hanfei/join-merge-plan
Browse files Browse the repository at this point in the history
refine planner
  • Loading branch information
windtalker authored May 13, 2020
2 parents 694fa00 + 1ce94ee commit a469308
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 60 deletions.
34 changes: 9 additions & 25 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1469,19 +1469,6 @@ func (p *LogicalJoin) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]P
return joins, true
}

// getAllDataSourceTotalRowSize walks the logical plan tree rooted at plan and
// returns the summed size estimate of every DataSource found in it. For a
// DataSource the estimate is its stats row count multiplied by the table's
// average row size; for any other plan node it is the sum of the estimates of
// its children (0 when it has none).
func getAllDataSourceTotalRowSize(plan LogicalPlan) float64 {
	source, isDataSource := plan.(*DataSource)
	if !isDataSource {
		// Not a data source itself: accumulate the estimates of each child subtree in order.
		var total float64
		children := plan.Children()
		for i := range children {
			total += getAllDataSourceTotalRowSize(children[i])
		}
		return total
	}
	count := source.statsInfo().Count()
	// The last argument reports whether the data source has a handle column.
	avgRowSize := source.TblColHists.GetTableAvgRowSize(
		source.ctx,
		source.schema.Columns,
		kv.StoreType(source.preferStoreType),
		source.handleCol != nil,
	)
	return float64(count) * avgRowSize
}

func (p *LogicalJoin) tryToGetBroadCastJoin(prop *property.PhysicalProperty) []PhysicalPlan {
if !prop.IsEmpty() {
return nil
Expand All @@ -1494,6 +1481,15 @@ func (p *LogicalJoin) tryToGetBroadCastJoin(prop *property.PhysicalProperty) []P
return nil
}

if hasPrefer, idx := p.getPreferredBCJLocalIndex(); hasPrefer {
return p.tryToGetBroadCastJoinByPreferGlobalIdx(prop, 1-idx)
}
results := p.tryToGetBroadCastJoinByPreferGlobalIdx(prop, 0)
results = append(results, p.tryToGetBroadCastJoinByPreferGlobalIdx(prop, 1)...)
return results
}

func (p *LogicalJoin) tryToGetBroadCastJoinByPreferGlobalIdx(prop *property.PhysicalProperty, preferredGlobalIndex int) []PhysicalPlan {
lkeys, rkeys := p.GetJoinKeys()
baseJoin := basePhysicalJoin{
JoinType: p.JoinType,
Expand All @@ -1508,18 +1504,6 @@ func (p *LogicalJoin) tryToGetBroadCastJoin(prop *property.PhysicalProperty) []P
if p.children[0].statsInfo().Count() > p.children[1].statsInfo().Count() {
preferredBuildIndex = 1
}
preferredGlobalIndex := preferredBuildIndex
if prop.TaskTp != property.CopTiFlashGlobalReadTaskType {
if hasPrefer, idx := p.getPreferredBCJLocalIndex(); hasPrefer {
preferredGlobalIndex = 1 - idx
} else if getAllDataSourceTotalRowSize(p.children[preferredGlobalIndex]) > getAllDataSourceTotalRowSize(p.children[1-preferredGlobalIndex]) {
preferredGlobalIndex = 1 - preferredGlobalIndex
}
}
// todo: currently, the build side is the one that has the smaller row count, and the
// global read side is the one that has the smaller datasource row size (which means
// less remote read); we need to use cbo to decide the build side and global read side
// if the preferred build index is not equal to the preferred global index
baseJoin.InnerChildIdx = preferredBuildIndex
childrenReqProps := make([]*property.PhysicalProperty, 2)
childrenReqProps[preferredGlobalIndex] = &property.PhysicalProperty{TaskTp: property.CopTiFlashGlobalReadTaskType, ExpectedCnt: math.MaxFloat64}
Expand Down
6 changes: 2 additions & 4 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,10 +552,8 @@ func (p *PhysicalBroadCastJoin) GetCost(lCnt, rCnt float64) float64 {
}
numPairs := helper.estimate()
probeCost := numPairs * sessVars.CopCPUFactor
// should be divided by the cop concurrency, which is decided by TiFlash, but TiDB
// cannot get the information from TiFlash, so just use `sessVars.HashJoinConcurrency`
// as a workaround
probeCost /= float64(sessVars.HashJoinConcurrency)
// should be divided by the concurrency in TiFlash, which should be the number of cores in the TiFlash nodes.
probeCost /= float64(sessVars.CopTiFlashConcurrencyFactor)
cpuCost += probeCost

// todo since TiFlash join is significant faster than TiDB join, maybe
Expand Down
62 changes: 31 additions & 31 deletions planner/core/testdata/integration_serial_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,33 +26,33 @@
{
"SQL": "explain select /*+ tidb_bcj(fact_t,d1_t) */ count(*) from fact_t, d1_t where fact_t.d1_k = d1_t.d1_k",
"Plan": [
"StreamAgg_25 1.00 root funcs:count(Column#14)->Column#11",
"└─TableReader_26 1.00 root data:StreamAgg_13",
"StreamAgg_32 1.00 root funcs:count(Column#14)->Column#11",
"└─TableReader_33 1.00 root data:StreamAgg_13",
" └─StreamAgg_13 1.00 cop[tiflash] funcs:count(1)->Column#14",
" └─TypeBroadcastJoin_24 8.00 cop[tiflash] ",
" ├─Selection_18(Build) 2.00 cop[tiflash] not(isnull(test.d1_t.d1_k))",
" │ └─TableFullScan_17 2.00 cop[tiflash] table:d1_t keep order:false, global read",
" └─Selection_16(Probe) 8.00 cop[tiflash] not(isnull(test.fact_t.d1_k))",
" └─TableFullScan_15 8.00 cop[tiflash] table:fact_t keep order:false"
" └─TypeBroadcastJoin_31 8.00 cop[tiflash] ",
" ├─Selection_23(Build) 2.00 cop[tiflash] not(isnull(test.d1_t.d1_k))",
" │ └─TableFullScan_22 2.00 cop[tiflash] table:d1_t keep order:false, global read",
" └─Selection_21(Probe) 8.00 cop[tiflash] not(isnull(test.fact_t.d1_k))",
" └─TableFullScan_20 8.00 cop[tiflash] table:fact_t keep order:false"
]
},
{
"SQL": "explain select /*+ tidb_bcj(fact_t,d1_t,d2_t,d3_t) */ count(*) from fact_t, d1_t, d2_t, d3_t where fact_t.d1_k = d1_t.d1_k and fact_t.d2_k = d2_t.d2_k and fact_t.d3_k = d3_t.d3_k",
"Plan": [
"StreamAgg_35 1.00 root funcs:count(Column#20)->Column#17",
"└─TableReader_36 1.00 root data:StreamAgg_17",
"StreamAgg_52 1.00 root funcs:count(Column#20)->Column#17",
"└─TableReader_53 1.00 root data:StreamAgg_17",
" └─StreamAgg_17 1.00 cop[tiflash] funcs:count(1)->Column#20",
" └─TypeBroadcastJoin_34 8.00 cop[tiflash] ",
" ├─Selection_28(Build) 2.00 cop[tiflash] not(isnull(test.d3_t.d3_k))",
" │ └─TableFullScan_27 2.00 cop[tiflash] table:d3_t keep order:false, global read",
" └─TypeBroadcastJoin_19(Probe) 8.00 cop[tiflash] ",
" ├─Selection_26(Build) 2.00 cop[tiflash] not(isnull(test.d2_t.d2_k))",
" │ └─TableFullScan_25 2.00 cop[tiflash] table:d2_t keep order:false, global read",
" └─TypeBroadcastJoin_20(Probe) 8.00 cop[tiflash] ",
" ├─Selection_24(Build) 2.00 cop[tiflash] not(isnull(test.d1_t.d1_k))",
" │ └─TableFullScan_23 2.00 cop[tiflash] table:d1_t keep order:false, global read",
" └─Selection_22(Probe) 8.00 cop[tiflash] not(isnull(test.fact_t.d1_k)), not(isnull(test.fact_t.d2_k)), not(isnull(test.fact_t.d3_k))",
" └─TableFullScan_21 8.00 cop[tiflash] table:fact_t keep order:false"
" └─TypeBroadcastJoin_51 8.00 cop[tiflash] ",
" ├─Selection_43(Build) 2.00 cop[tiflash] not(isnull(test.d3_t.d3_k))",
" │ └─TableFullScan_42 2.00 cop[tiflash] table:d3_t keep order:false, global read",
" └─TypeBroadcastJoin_33(Probe) 8.00 cop[tiflash] ",
" ├─Selection_29(Build) 2.00 cop[tiflash] not(isnull(test.d2_t.d2_k))",
" │ └─TableFullScan_28 2.00 cop[tiflash] table:d2_t keep order:false, global read",
" └─TypeBroadcastJoin_37(Probe) 8.00 cop[tiflash] ",
" ├─Selection_27(Build) 2.00 cop[tiflash] not(isnull(test.d1_t.d1_k))",
" │ └─TableFullScan_26 2.00 cop[tiflash] table:d1_t keep order:false, global read",
" └─Selection_41(Probe) 8.00 cop[tiflash] not(isnull(test.fact_t.d1_k)), not(isnull(test.fact_t.d2_k)), not(isnull(test.fact_t.d3_k))",
" └─TableFullScan_40 8.00 cop[tiflash] table:fact_t keep order:false"
]
},
{
Expand All @@ -71,20 +71,20 @@
{
"SQL": "explain select /*+ tidb_bcj(fact_t,d1_t,d2_t,d3_t), bcj_local(d2_t) */ count(*) from fact_t, d1_t, d2_t, d3_t where fact_t.d1_k = d1_t.d1_k and fact_t.d2_k = d2_t.d2_k and fact_t.d3_k = d3_t.d3_k",
"Plan": [
"StreamAgg_35 1.00 root funcs:count(Column#20)->Column#17",
"└─TableReader_36 1.00 root data:StreamAgg_17",
"StreamAgg_36 1.00 root funcs:count(Column#20)->Column#17",
"└─TableReader_37 1.00 root data:StreamAgg_17",
" └─StreamAgg_17 1.00 cop[tiflash] funcs:count(1)->Column#20",
" └─TypeBroadcastJoin_34 8.00 cop[tiflash] ",
" ├─Selection_28(Build) 2.00 cop[tiflash] not(isnull(test.d3_t.d3_k))",
" │ └─TableFullScan_27 2.00 cop[tiflash] table:d3_t keep order:false, global read",
" └─TypeBroadcastJoin_35 8.00 cop[tiflash] ",
" ├─Selection_29(Build) 2.00 cop[tiflash] not(isnull(test.d3_t.d3_k))",
" │ └─TableFullScan_28 2.00 cop[tiflash] table:d3_t keep order:false, global read",
" └─TypeBroadcastJoin_19(Probe) 8.00 cop[tiflash] ",
" ├─Selection_26(Build) 2.00 cop[tiflash] not(isnull(test.d2_t.d2_k))",
" │ └─TableFullScan_25 2.00 cop[tiflash] table:d2_t keep order:false",
" ├─Selection_27(Build) 2.00 cop[tiflash] not(isnull(test.d2_t.d2_k))",
" │ └─TableFullScan_26 2.00 cop[tiflash] table:d2_t keep order:false",
" └─TypeBroadcastJoin_20(Probe) 8.00 cop[tiflash] ",
" ├─Selection_24(Build) 2.00 cop[tiflash] not(isnull(test.d1_t.d1_k))",
" │ └─TableFullScan_23 2.00 cop[tiflash] table:d1_t keep order:false, global read",
" └─Selection_22(Probe) 8.00 cop[tiflash] not(isnull(test.fact_t.d1_k)), not(isnull(test.fact_t.d2_k)), not(isnull(test.fact_t.d3_k))",
" └─TableFullScan_21 8.00 cop[tiflash] table:fact_t keep order:false, global read"
" ├─Selection_25(Build) 2.00 cop[tiflash] not(isnull(test.d1_t.d1_k))",
" │ └─TableFullScan_24 2.00 cop[tiflash] table:d1_t keep order:false, global read",
" └─Selection_23(Probe) 8.00 cop[tiflash] not(isnull(test.fact_t.d1_k)), not(isnull(test.fact_t.d2_k)), not(isnull(test.fact_t.d3_k))",
" └─TableFullScan_22 8.00 cop[tiflash] table:fact_t keep order:false, global read"
]
}
]
Expand Down
5 changes: 5 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,8 @@ type SessionVars struct {
CPUFactor float64
// CopCPUFactor is the CPU cost of processing one expression for one row in coprocessor.
CopCPUFactor float64
// CopTiFlashConcurrencyFactor is the concurrency number of computation in tiflash coprocessor.
CopTiFlashConcurrencyFactor float64
// NetworkFactor is the network cost of transferring 1 byte data.
NetworkFactor float64
// ScanFactor is the IO cost of scanning 1 byte data on TiKV and TiFlash.
Expand Down Expand Up @@ -663,6 +665,7 @@ func NewSessionVars() *SessionVars {
CorrelationExpFactor: DefOptCorrelationExpFactor,
CPUFactor: DefOptCPUFactor,
CopCPUFactor: DefOptCopCPUFactor,
CopTiFlashConcurrencyFactor: DefOptTiFlashConcurrencyFactor,
NetworkFactor: DefOptNetworkFactor,
ScanFactor: DefOptScanFactor,
DescScanFactor: DefOptDescScanFactor,
Expand Down Expand Up @@ -1077,6 +1080,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
s.CPUFactor = tidbOptFloat64(val, DefOptCPUFactor)
case TiDBOptCopCPUFactor:
s.CopCPUFactor = tidbOptFloat64(val, DefOptCopCPUFactor)
case TiDBOptTiFlashConcurrencyFactor:
s.CopTiFlashConcurrencyFactor = tidbOptFloat64(val, DefOptTiFlashConcurrencyFactor)
case TiDBOptNetworkFactor:
s.NetworkFactor = tidbOptFloat64(val, DefOptNetworkFactor)
case TiDBOptScanFactor:
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,7 @@ var defaultSysVars = []*SysVar{
{ScopeGlobal | ScopeSession, TiDBOptCorrelationThreshold, strconv.FormatFloat(DefOptCorrelationThreshold, 'f', -1, 64)},
{ScopeGlobal | ScopeSession, TiDBOptCorrelationExpFactor, strconv.Itoa(DefOptCorrelationExpFactor)},
{ScopeGlobal | ScopeSession, TiDBOptCPUFactor, strconv.FormatFloat(DefOptCPUFactor, 'f', -1, 64)},
{ScopeGlobal | ScopeSession, TiDBOptTiFlashConcurrencyFactor, strconv.FormatFloat(DefOptTiFlashConcurrencyFactor, 'f', -1, 64)},
{ScopeGlobal | ScopeSession, TiDBOptCopCPUFactor, strconv.FormatFloat(DefOptCopCPUFactor, 'f', -1, 64)},
{ScopeGlobal | ScopeSession, TiDBOptNetworkFactor, strconv.FormatFloat(DefOptNetworkFactor, 'f', -1, 64)},
{ScopeGlobal | ScopeSession, TiDBOptScanFactor, strconv.FormatFloat(DefOptScanFactor, 'f', -1, 64)},
Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ const (
TiDBOptCPUFactor = "tidb_opt_cpu_factor"
// tidb_opt_copcpu_factor is the CPU cost of processing one expression for one row in coprocessor.
TiDBOptCopCPUFactor = "tidb_opt_copcpu_factor"
// tidb_opt_tiflash_concurrency_factor is the concurrency number of TiFlash computation.
TiDBOptTiFlashConcurrencyFactor = "tidb_opt_tiflash_concurrency_factor"
// tidb_opt_network_factor is the network cost of transferring 1 byte data.
TiDBOptNetworkFactor = "tidb_opt_network_factor"
// tidb_opt_scan_factor is the IO cost of scanning 1 byte data on TiKV.
Expand Down Expand Up @@ -420,6 +422,7 @@ const (
DefOptCorrelationExpFactor = 1
DefOptCPUFactor = 3.0
DefOptCopCPUFactor = 3.0
DefOptTiFlashConcurrencyFactor = 24.0
DefOptNetworkFactor = 1.0
DefOptScanFactor = 1.5
DefOptDescScanFactor = 3.0
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,7 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string, scope Sc
}
return value, nil
case TiDBOptCPUFactor,
TiDBOptTiFlashConcurrencyFactor,
TiDBOptCopCPUFactor,
TiDBOptNetworkFactor,
TiDBOptScanFactor,
Expand Down
9 changes: 9 additions & 0 deletions sessionctx/variable/varsutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,14 @@ func (s *testVarsutilSuite) TestVarsutil(c *C) {
c.Assert(val, Equals, "5.0")
c.Assert(v.CopCPUFactor, Equals, 5.0)

// CopTiFlashConcurrencyFactor starts at its default (24.0) and must follow the
// tidb_opt_tiflash_concurrency_factor session variable once it is set.
c.Assert(v.CopTiFlashConcurrencyFactor, Equals, 24.0)
err = SetSessionSystemVar(v, TiDBOptTiFlashConcurrencyFactor, types.NewStringDatum("5.0"))
c.Assert(err, IsNil)
val, err = GetSessionSystemVar(v, TiDBOptTiFlashConcurrencyFactor)
c.Assert(err, IsNil)
c.Assert(val, Equals, "5.0")
// Check the field this variable actually controls; asserting CopCPUFactor here
// would pass vacuously because the preceding stanza already set it to 5.0.
c.Assert(v.CopTiFlashConcurrencyFactor, Equals, 5.0)

c.Assert(v.NetworkFactor, Equals, 1.0)
err = SetSessionSystemVar(v, TiDBOptNetworkFactor, types.NewStringDatum("3.0"))
c.Assert(err, IsNil)
Expand Down Expand Up @@ -515,6 +523,7 @@ func (s *testVarsutilSuite) TestValidate(c *C) {
{TiDBOptCorrelationThreshold, "-2", true},
{TiDBOptCPUFactor, "a", true},
{TiDBOptCPUFactor, "-2", true},
{TiDBOptTiFlashConcurrencyFactor, "-2", true},
{TiDBOptCopCPUFactor, "a", true},
{TiDBOptCopCPUFactor, "-2", true},
{TiDBOptNetworkFactor, "a", true},
Expand Down

0 comments on commit a469308

Please sign in to comment.