diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index 822affc6a6f38..b4c777a20c638 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -1469,19 +1469,6 @@ func (p *LogicalJoin) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]P return joins, true } -func getAllDataSourceTotalRowSize(plan LogicalPlan) float64 { - if ds, ok := plan.(*DataSource); ok { - rowCount := ds.statsInfo().Count() - rowSize := ds.TblColHists.GetTableAvgRowSize(ds.ctx, ds.schema.Columns, kv.StoreType(ds.preferStoreType), ds.handleCol != nil) - return float64(rowCount) * rowSize - } - ret := float64(0) - for _, child := range plan.Children() { - ret += getAllDataSourceTotalRowSize(child) - } - return ret -} - func (p *LogicalJoin) tryToGetBroadCastJoin(prop *property.PhysicalProperty) []PhysicalPlan { if !prop.IsEmpty() { return nil @@ -1494,6 +1481,15 @@ func (p *LogicalJoin) tryToGetBroadCastJoin(prop *property.PhysicalProperty) []P return nil } + if hasPrefer, idx := p.getPreferredBCJLocalIndex(); hasPrefer { + return p.tryToGetBroadCastJoinByPreferGlobalIdx(prop, 1-idx) + } + results := p.tryToGetBroadCastJoinByPreferGlobalIdx(prop, 0) + results = append(results, p.tryToGetBroadCastJoinByPreferGlobalIdx(prop, 1)...) + return results +} + +func (p *LogicalJoin) tryToGetBroadCastJoinByPreferGlobalIdx(prop *property.PhysicalProperty, preferredGlobalIndex int) []PhysicalPlan { lkeys, rkeys := p.GetJoinKeys() baseJoin := basePhysicalJoin{ JoinType: p.JoinType, @@ -1508,18 +1504,6 @@ func (p *LogicalJoin) tryToGetBroadCastJoin(prop *property.PhysicalProperty) []P if p.children[0].statsInfo().Count() > p.children[1].statsInfo().Count() { preferredBuildIndex = 1 } - preferredGlobalIndex := preferredBuildIndex - if prop.TaskTp != property.CopTiFlashGlobalReadTaskType { - if hasPrefer, idx := p.getPreferredBCJLocalIndex(); hasPrefer { - preferredGlobalIndex = 1 - idx - } else if getAllDataSourceTotalRowSize(p.children[preferredGlobalIndex]) > getAllDataSourceTotalRowSize(p.children[1-preferredGlobalIndex]) { - preferredGlobalIndex = 1 - preferredGlobalIndex - } - } - // todo: currently, build side is the one has less rowcount and global read side - // is the one has less datasource row size(which mean less remote read), need - // to use cbo to decide the build side and global read side if preferred build index - // is not equal to preferred global index baseJoin.InnerChildIdx = preferredBuildIndex childrenReqProps := make([]*property.PhysicalProperty, 2) childrenReqProps[preferredGlobalIndex] = &property.PhysicalProperty{TaskTp: property.CopTiFlashGlobalReadTaskType, ExpectedCnt: math.MaxFloat64} diff --git a/planner/core/task.go b/planner/core/task.go index 27b3b249d6380..6b86157fedc5c 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -552,10 +552,8 @@ func (p *PhysicalBroadCastJoin) GetCost(lCnt, rCnt float64) float64 { } numPairs := helper.estimate() probeCost := numPairs * sessVars.CopCPUFactor - // should divided by the cop concurrency, which is decide by TiFlash, but TiDB - // can not get the information from TiFlash, so just use `sessVars.HashJoinConcurrency` - // as a workaround - probeCost /= float64(sessVars.HashJoinConcurrency) + // should divided by the concurrency in tiflash, which should be the number of core in tiflash nodes. + probeCost /= float64(sessVars.CopTiFlashConcurrencyFactor) cpuCost += probeCost // todo since TiFlash join is significant faster than TiDB join, maybe diff --git a/planner/core/testdata/integration_serial_suite_out.json b/planner/core/testdata/integration_serial_suite_out.json index 864fae552070c..1878ec443b07e 100644 --- a/planner/core/testdata/integration_serial_suite_out.json +++ b/planner/core/testdata/integration_serial_suite_out.json @@ -26,33 +26,33 @@ { "SQL": "explain select /*+ tidb_bcj(fact_t,d1_t) */ count(*) from fact_t, d1_t where fact_t.d1_k = d1_t.d1_k", "Plan": [ - "StreamAgg_25 1.00 root funcs:count(Column#14)->Column#11", - "└─TableReader_26 1.00 root data:StreamAgg_13", + "StreamAgg_32 1.00 root funcs:count(Column#14)->Column#11", + "└─TableReader_33 1.00 root data:StreamAgg_13", " └─StreamAgg_13 1.00 cop[tiflash] funcs:count(1)->Column#14", - " └─TypeBroadcastJoin_24 8.00 cop[tiflash] ", - " ├─Selection_18(Build) 2.00 cop[tiflash] not(isnull(test.d1_t.d1_k))", - " │ └─TableFullScan_17 2.00 cop[tiflash] table:d1_t keep order:false, global read", - " └─Selection_16(Probe) 8.00 cop[tiflash] not(isnull(test.fact_t.d1_k))", - " └─TableFullScan_15 8.00 cop[tiflash] table:fact_t keep order:false" + " └─TypeBroadcastJoin_31 8.00 cop[tiflash] ", + " ├─Selection_23(Build) 2.00 cop[tiflash] not(isnull(test.d1_t.d1_k))", + " │ └─TableFullScan_22 2.00 cop[tiflash] table:d1_t keep order:false, global read", + " └─Selection_21(Probe) 8.00 cop[tiflash] not(isnull(test.fact_t.d1_k))", + " └─TableFullScan_20 8.00 cop[tiflash] table:fact_t keep order:false" ] }, { "SQL": "explain select /*+ tidb_bcj(fact_t,d1_t,d2_t,d3_t) */ count(*) from fact_t, d1_t, d2_t, d3_t where fact_t.d1_k = d1_t.d1_k and fact_t.d2_k = d2_t.d2_k and fact_t.d3_k = d3_t.d3_k", "Plan": [ - "StreamAgg_35 1.00 root funcs:count(Column#20)->Column#17", - "└─TableReader_36 1.00 root data:StreamAgg_17", + "StreamAgg_52 1.00 root funcs:count(Column#20)->Column#17", + "└─TableReader_53 1.00 root data:StreamAgg_17", " └─StreamAgg_17 1.00 cop[tiflash] funcs:count(1)->Column#20", - " └─TypeBroadcastJoin_34 8.00 cop[tiflash] ", - " ├─Selection_28(Build) 2.00 cop[tiflash] not(isnull(test.d3_t.d3_k))", - " │ └─TableFullScan_27 2.00 cop[tiflash] table:d3_t keep order:false, global read", - " └─TypeBroadcastJoin_19(Probe) 8.00 cop[tiflash] ", - " ├─Selection_26(Build) 2.00 cop[tiflash] not(isnull(test.d2_t.d2_k))", - " │ └─TableFullScan_25 2.00 cop[tiflash] table:d2_t keep order:false, global read", - " └─TypeBroadcastJoin_20(Probe) 8.00 cop[tiflash] ", - " ├─Selection_24(Build) 2.00 cop[tiflash] not(isnull(test.d1_t.d1_k))", - " │ └─TableFullScan_23 2.00 cop[tiflash] table:d1_t keep order:false, global read", - " └─Selection_22(Probe) 8.00 cop[tiflash] not(isnull(test.fact_t.d1_k)), not(isnull(test.fact_t.d2_k)), not(isnull(test.fact_t.d3_k))", - " └─TableFullScan_21 8.00 cop[tiflash] table:fact_t keep order:false" + " └─TypeBroadcastJoin_51 8.00 cop[tiflash] ", + " ├─Selection_43(Build) 2.00 cop[tiflash] not(isnull(test.d3_t.d3_k))", + " │ └─TableFullScan_42 2.00 cop[tiflash] table:d3_t keep order:false, global read", + " └─TypeBroadcastJoin_33(Probe) 8.00 cop[tiflash] ", + " ├─Selection_29(Build) 2.00 cop[tiflash] not(isnull(test.d2_t.d2_k))", + " │ └─TableFullScan_28 2.00 cop[tiflash] table:d2_t keep order:false, global read", + " └─TypeBroadcastJoin_37(Probe) 8.00 cop[tiflash] ", + " ├─Selection_27(Build) 2.00 cop[tiflash] not(isnull(test.d1_t.d1_k))", + " │ └─TableFullScan_26 2.00 cop[tiflash] table:d1_t keep order:false, global read", + " └─Selection_41(Probe) 8.00 cop[tiflash] not(isnull(test.fact_t.d1_k)), not(isnull(test.fact_t.d2_k)), not(isnull(test.fact_t.d3_k))", + " └─TableFullScan_40 8.00 cop[tiflash] table:fact_t keep order:false" ] }, { @@ -71,20 +71,20 @@ { "SQL": "explain select /*+ tidb_bcj(fact_t,d1_t,d2_t,d3_t), bcj_local(d2_t) */ count(*) from fact_t, d1_t, d2_t, d3_t where fact_t.d1_k = d1_t.d1_k and fact_t.d2_k = d2_t.d2_k and fact_t.d3_k = d3_t.d3_k", "Plan": [ - "StreamAgg_35 1.00 root funcs:count(Column#20)->Column#17", - "└─TableReader_36 1.00 root data:StreamAgg_17", + "StreamAgg_36 1.00 root funcs:count(Column#20)->Column#17", + "└─TableReader_37 1.00 root data:StreamAgg_17", " └─StreamAgg_17 1.00 cop[tiflash] funcs:count(1)->Column#20", - " └─TypeBroadcastJoin_34 8.00 cop[tiflash] ", - " ├─Selection_28(Build) 2.00 cop[tiflash] not(isnull(test.d3_t.d3_k))", - " │ └─TableFullScan_27 2.00 cop[tiflash] table:d3_t keep order:false, global read", + " └─TypeBroadcastJoin_35 8.00 cop[tiflash] ", + " ├─Selection_29(Build) 2.00 cop[tiflash] not(isnull(test.d3_t.d3_k))", + " │ └─TableFullScan_28 2.00 cop[tiflash] table:d3_t keep order:false, global read", " └─TypeBroadcastJoin_19(Probe) 8.00 cop[tiflash] ", - " ├─Selection_26(Build) 2.00 cop[tiflash] not(isnull(test.d2_t.d2_k))", - " │ └─TableFullScan_25 2.00 cop[tiflash] table:d2_t keep order:false", + " ├─Selection_27(Build) 2.00 cop[tiflash] not(isnull(test.d2_t.d2_k))", + " │ └─TableFullScan_26 2.00 cop[tiflash] table:d2_t keep order:false", " └─TypeBroadcastJoin_20(Probe) 8.00 cop[tiflash] ", - " ├─Selection_24(Build) 2.00 cop[tiflash] not(isnull(test.d1_t.d1_k))", - " │ └─TableFullScan_23 2.00 cop[tiflash] table:d1_t keep order:false, global read", - " └─Selection_22(Probe) 8.00 cop[tiflash] not(isnull(test.fact_t.d1_k)), not(isnull(test.fact_t.d2_k)), not(isnull(test.fact_t.d3_k))", - " └─TableFullScan_21 8.00 cop[tiflash] table:fact_t keep order:false, global read" + " ├─Selection_25(Build) 2.00 cop[tiflash] not(isnull(test.d1_t.d1_k))", + " │ └─TableFullScan_24 2.00 cop[tiflash] table:d1_t keep order:false, global read", + " └─Selection_23(Probe) 8.00 cop[tiflash] not(isnull(test.fact_t.d1_k)), not(isnull(test.fact_t.d2_k)), not(isnull(test.fact_t.d3_k))", + " └─TableFullScan_22 8.00 cop[tiflash] table:fact_t keep order:false, global read" ] } ] diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 187393d11e553..639c0efa08e98 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -407,6 +407,8 @@ type SessionVars struct { CPUFactor float64 // CopCPUFactor is the CPU cost of processing one expression for one row in coprocessor. CopCPUFactor float64 + // CopTiFlashConcurrencyFactor is the concurrency number of computation in tiflash coprocessor. + CopTiFlashConcurrencyFactor float64 // NetworkFactor is the network cost of transferring 1 byte data. NetworkFactor float64 // ScanFactor is the IO cost of scanning 1 byte data on TiKV and TiFlash. @@ -663,6 +665,7 @@ func NewSessionVars() *SessionVars { CorrelationExpFactor: DefOptCorrelationExpFactor, CPUFactor: DefOptCPUFactor, CopCPUFactor: DefOptCopCPUFactor, + CopTiFlashConcurrencyFactor: DefOptTiFlashConcurrencyFactor, NetworkFactor: DefOptNetworkFactor, ScanFactor: DefOptScanFactor, DescScanFactor: DefOptDescScanFactor, @@ -1077,6 +1080,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { s.CPUFactor = tidbOptFloat64(val, DefOptCPUFactor) case TiDBOptCopCPUFactor: s.CopCPUFactor = tidbOptFloat64(val, DefOptCopCPUFactor) + case TiDBOptTiFlashConcurrencyFactor: + s.CopTiFlashConcurrencyFactor = tidbOptFloat64(val, DefOptTiFlashConcurrencyFactor) case TiDBOptNetworkFactor: s.NetworkFactor = tidbOptFloat64(val, DefOptNetworkFactor) case TiDBOptScanFactor: diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index bf9bf549f84d1..2686e09b46503 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -623,6 +623,7 @@ var defaultSysVars = []*SysVar{ {ScopeGlobal | ScopeSession, TiDBOptCorrelationThreshold, strconv.FormatFloat(DefOptCorrelationThreshold, 'f', -1, 64)}, {ScopeGlobal | ScopeSession, TiDBOptCorrelationExpFactor, strconv.Itoa(DefOptCorrelationExpFactor)}, {ScopeGlobal | ScopeSession, TiDBOptCPUFactor, strconv.FormatFloat(DefOptCPUFactor, 'f', -1, 64)}, + {ScopeGlobal | ScopeSession, TiDBOptTiFlashConcurrencyFactor, strconv.FormatFloat(DefOptTiFlashConcurrencyFactor, 'f', -1, 64)}, {ScopeGlobal | ScopeSession, TiDBOptCopCPUFactor, strconv.FormatFloat(DefOptCopCPUFactor, 'f', -1, 64)}, {ScopeGlobal | ScopeSession, TiDBOptNetworkFactor, strconv.FormatFloat(DefOptNetworkFactor, 'f', -1, 64)}, {ScopeGlobal | ScopeSession, TiDBOptScanFactor, strconv.FormatFloat(DefOptScanFactor, 'f', -1, 64)}, diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 1b4a67f563b4c..6b401f2647f52 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -199,6 +199,8 @@ const ( TiDBOptCPUFactor = "tidb_opt_cpu_factor" // tidb_opt_copcpu_factor is the CPU cost of processing one expression for one row in coprocessor. TiDBOptCopCPUFactor = "tidb_opt_copcpu_factor" + // tidb_opt_tiflash_concurrency_factor is concurrency number of tiflash computation. + TiDBOptTiFlashConcurrencyFactor = "tidb_opt_tiflash_concurrency_factor" // tidb_opt_network_factor is the network cost of transferring 1 byte data. TiDBOptNetworkFactor = "tidb_opt_network_factor" // tidb_opt_scan_factor is the IO cost of scanning 1 byte data on TiKV. @@ -420,6 +422,7 @@ const ( DefOptCorrelationExpFactor = 1 DefOptCPUFactor = 3.0 DefOptCopCPUFactor = 3.0 + DefOptTiFlashConcurrencyFactor = 24.0 DefOptNetworkFactor = 1.0 DefOptScanFactor = 1.5 DefOptDescScanFactor = 3.0 diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index 582f60e4f07a0..fdde558b2eeff 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -546,6 +546,7 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string, scope Sc } return value, nil case TiDBOptCPUFactor, + TiDBOptTiFlashConcurrencyFactor, TiDBOptCopCPUFactor, TiDBOptNetworkFactor, TiDBOptScanFactor, diff --git a/sessionctx/variable/varsutil_test.go b/sessionctx/variable/varsutil_test.go index 145ff229890b4..71a628dc43f54 100644 --- a/sessionctx/variable/varsutil_test.go +++ b/sessionctx/variable/varsutil_test.go @@ -325,6 +325,14 @@ func (s *testVarsutilSuite) TestVarsutil(c *C) { c.Assert(val, Equals, "5.0") c.Assert(v.CopCPUFactor, Equals, 5.0) + c.Assert(v.CopTiFlashConcurrencyFactor, Equals, 24.0) + err = SetSessionSystemVar(v, TiDBOptTiFlashConcurrencyFactor, types.NewStringDatum("5.0")) + c.Assert(err, IsNil) + val, err = GetSessionSystemVar(v, TiDBOptTiFlashConcurrencyFactor) + c.Assert(err, IsNil) + c.Assert(val, Equals, "5.0") + c.Assert(v.CopCPUFactor, Equals, 5.0) + c.Assert(v.NetworkFactor, Equals, 1.0) err = SetSessionSystemVar(v, TiDBOptNetworkFactor, types.NewStringDatum("3.0")) c.Assert(err, IsNil) @@ -515,6 +523,7 @@ func (s *testVarsutilSuite) TestValidate(c *C) { {TiDBOptCorrelationThreshold, "-2", true}, {TiDBOptCPUFactor, "a", true}, {TiDBOptCPUFactor, "-2", true}, + {TiDBOptTiFlashConcurrencyFactor, "-2", true}, {TiDBOptCopCPUFactor, "a", true}, {TiDBOptCopCPUFactor, "-2", true}, {TiDBOptNetworkFactor, "a", true},