Skip to content

Commit

Permalink
planner: fine-grained collation control for mpp plan (#23584)
Browse files: browse the repository at this point in the history
  • Loading branch information
windtalker authored Mar 26, 2021
1 parent b3b1177 commit fe250b9
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 18 deletions.
3 changes: 1 addition & 2 deletions executor/mpp_gather.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,13 @@ import (
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/collate"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
)

func useMPPExecution(ctx sessionctx.Context, tr *plannercore.PhysicalTableReader) bool {
if !ctx.GetSessionVars().AllowMPPExecution || collate.NewCollationEnabled() {
if !ctx.GetSessionVars().AllowMPPExecution {
return false
}
_, ok := tr.GetTablePlan().(*plannercore.PhysicalExchangeSender)
Expand Down
4 changes: 2 additions & 2 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1666,7 +1666,7 @@ func (p *LogicalJoin) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]P
}
joins := make([]PhysicalPlan, 0, 8)
canPushToTiFlash := p.canPushToCop(kv.TiFlash)
if p.ctx.GetSessionVars().AllowMPPExecution && !collate.NewCollationEnabled() && canPushToTiFlash {
if p.ctx.GetSessionVars().AllowMPPExecution && canPushToTiFlash {
if p.shouldUseMPPBCJ() {
mppJoins := p.tryToGetMppHashJoin(prop, true)
if (p.preferJoinType & preferBCJoin) > 0 {
Expand Down Expand Up @@ -2356,7 +2356,7 @@ func (la *LogicalAggregation) getHashAggs(prop *property.PhysicalProperty) []Phy
taskTypes = append(taskTypes, property.CopTiFlashLocalReadTaskType)
}
canPushDownToTiFlash := la.canPushToCop(kv.TiFlash)
canPushDownToMPP := la.ctx.GetSessionVars().AllowMPPExecution && !collate.NewCollationEnabled() && la.checkCanPushDownToMPP() && canPushDownToTiFlash
canPushDownToMPP := la.ctx.GetSessionVars().AllowMPPExecution && la.checkCanPushDownToMPP() && canPushDownToTiFlash
if la.HasDistinct() {
// TODO: remove after the cost estimation of distinct pushdown is implemented.
if !la.ctx.GetSessionVars().AllowDistinctAggPushDown {
Expand Down
51 changes: 48 additions & 3 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,13 +626,58 @@ func (s *testIntegrationSerialSuite) TestJoinNotSupportedByTiFlash(c *C) {
}
}

func (s *testIntegrationSerialSuite) TestMPPNotSupportedInNewCollation(c *C) {
func (s *testIntegrationSerialSuite) TestMPPWithHashExchangeUnderNewCollation(c *C) {
defer collate.SetNewCollationEnabledForTest(false)
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists table_1")
tk.MustExec("create table table_1(id int not null, value int)")
tk.MustExec("insert into table_1 values(1,1),(2,2)")
tk.MustExec("create table table_1(id int not null, value char(10))")
tk.MustExec("insert into table_1 values(1,'1'),(2,'2')")
tk.MustExec("analyze table table_1")

// Create virtual tiflash replica info.
dom := domain.GetDomain(tk.Se)
is := dom.InfoSchema()
db, exists := is.SchemaByName(model.NewCIStr("test"))
c.Assert(exists, IsTrue)
for _, tblInfo := range db.Tables {
if tblInfo.Name.L == "table_1" {
tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
Count: 1,
Available: true,
}
}
}

collate.SetNewCollationEnabledForTest(true)
tk.MustExec("set @@session.tidb_isolation_read_engines = 'tiflash'")
tk.MustExec("set @@session.tidb_allow_mpp = 1")
tk.MustExec("set @@session.tidb_opt_broadcast_join = 0")
tk.MustExec("set @@session.tidb_broadcast_join_threshold_count = 0")
tk.MustExec("set @@session.tidb_broadcast_join_threshold_size = 0")
var input []string
var output []struct {
SQL string
Plan []string
}
s.testData.GetTestCases(c, &input, &output)
for i, tt := range input {
s.testData.OnRecord(func() {
output[i].SQL = tt
output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery(tt).Rows())
})
res := tk.MustQuery(tt)
res.Check(testkit.Rows(output[i].Plan...))
}
}

func (s *testIntegrationSerialSuite) TestMPPWithBroadcastExchangeUnderNewCollation(c *C) {
defer collate.SetNewCollationEnabledForTest(false)
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists table_1")
tk.MustExec("create table table_1(id int not null, value char(10))")
tk.MustExec("insert into table_1 values(1,'1'),(2,'2')")
tk.MustExec("analyze table table_1")

// Create virtual tiflash replica info.
Expand Down
11 changes: 11 additions & 0 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/collate"
"github.com/pingcap/tidb/util/plancodec"
"github.com/pingcap/tipb/go-tipb"
)
Expand Down Expand Up @@ -1770,6 +1771,9 @@ func (p *PhysicalHashAgg) attach2TaskForMpp(tasks ...task) task {
}
prop := &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, PartitionTp: property.HashType, PartitionCols: partitionCols}
newMpp := mpp.enforceExchangerImpl(prop)
if newMpp.invalid() {
return newMpp
}
attachPlan2Task(finalAgg, newMpp)
if proj != nil {
attachPlan2Task(proj, newMpp)
Expand Down Expand Up @@ -1971,6 +1975,13 @@ func (t *mppTask) enforceExchanger(prop *property.PhysicalProperty) *mppTask {
}

func (t *mppTask) enforceExchangerImpl(prop *property.PhysicalProperty) *mppTask {
if collate.NewCollationEnabled() && prop.PartitionTp == property.HashType {
for _, col := range prop.PartitionCols {
if types.IsString(col.RetType.Tp) {
return &mppTask{cst: math.MaxFloat64}
}
}
}
ctx := t.p.SCtx()
sender := PhysicalExchangeSender{
ExchangeType: tipb.ExchangeType(prop.PartitionTp),
Expand Down
13 changes: 11 additions & 2 deletions planner/core/testdata/integration_serial_suite_in.json
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,19 @@
]
},
{
"name": "TestMPPNotSupportedInNewCollation",
"name": "TestMPPWithHashExchangeUnderNewCollation",
"cases": [
"explain format = 'brief' select * from table_1 a, table_1 b where a.id = b.id",
"explain format = 'brief' select /*+ agg_to_cop() */ count(*), id from table_1 group by id"
"explain format = 'brief' select /*+ agg_to_cop() */ count(*), id from table_1 group by id",
"explain format = 'brief' select * from table_1 a, table_1 b where a.value = b.value",
"explain format = 'brief' select /*+ agg_to_cop() */ count(*), value from table_1 group by value"
]
},
{
"name": "TestMPPWithBroadcastExchangeUnderNewCollation",
"cases": [
"explain format = 'brief' select /*+ broadcast_join(a,b) */ * from table_1 a, table_1 b where a.id = b.id",
"explain format = 'brief' select /*+ broadcast_join(a,b) */ * from table_1 a, table_1 b where a.value = b.value"
]
},
{
Expand Down
80 changes: 71 additions & 9 deletions planner/core/testdata/integration_serial_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -825,24 +825,86 @@
]
},
{
"Name": "TestMPPNotSupportedInNewCollation",
"Name": "TestMPPWithHashExchangeUnderNewCollation",
"Cases": [
{
"SQL": "explain format = 'brief' select * from table_1 a, table_1 b where a.id = b.id",
"Plan": [
"HashJoin 2.00 root inner join, equal:[eq(test.table_1.id, test.table_1.id)]",
"├─TableReader(Build) 2.00 root data:TableFullScan",
"│ └─TableFullScan 2.00 cop[tiflash] table:b keep order:false",
"└─TableReader(Probe) 2.00 root data:TableFullScan",
" └─TableFullScan 2.00 cop[tiflash] table:a keep order:false"
"TableReader 2.00 root data:ExchangeSender",
"└─ExchangeSender 2.00 cop[tiflash] ExchangeType: PassThrough",
" └─HashJoin 2.00 cop[tiflash] inner join, equal:[eq(test.table_1.id, test.table_1.id)]",
" ├─ExchangeReceiver(Build) 2.00 cop[tiflash] ",
" │ └─ExchangeSender 2.00 cop[tiflash] ExchangeType: HashPartition, Hash Cols: test.table_1.id",
" │ └─TableFullScan 2.00 cop[tiflash] table:a keep order:false",
" └─ExchangeReceiver(Probe) 2.00 cop[tiflash] ",
" └─ExchangeSender 2.00 cop[tiflash] ExchangeType: HashPartition, Hash Cols: test.table_1.id",
" └─TableFullScan 2.00 cop[tiflash] table:b keep order:false"
]
},
{
"SQL": "explain format = 'brief' select /*+ agg_to_cop() */ count(*), id from table_1 group by id",
"Plan": [
"HashAgg 2.00 root group by:test.table_1.id, funcs:count(1)->Column#4, funcs:firstrow(test.table_1.id)->test.table_1.id",
"└─TableReader 2.00 root data:TableFullScan",
" └─TableFullScan 2.00 cop[tiflash] table:table_1 keep order:false"
"TableReader 2.00 root data:ExchangeSender",
"└─ExchangeSender 2.00 batchCop[tiflash] ExchangeType: PassThrough",
" └─Projection 2.00 batchCop[tiflash] Column#4, test.table_1.id",
" └─HashAgg 2.00 batchCop[tiflash] group by:test.table_1.id, funcs:sum(Column#7)->Column#4, funcs:firstrow(test.table_1.id)->test.table_1.id",
" └─ExchangeReceiver 2.00 batchCop[tiflash] ",
" └─ExchangeSender 2.00 batchCop[tiflash] ExchangeType: HashPartition, Hash Cols: test.table_1.id",
" └─HashAgg 2.00 batchCop[tiflash] group by:test.table_1.id, funcs:count(1)->Column#7",
" └─TableFullScan 2.00 batchCop[tiflash] table:table_1 keep order:false"
]
},
{
"SQL": "explain format = 'brief' select * from table_1 a, table_1 b where a.value = b.value",
"Plan": [
"HashJoin 2.00 root inner join, equal:[eq(test.table_1.value, test.table_1.value)]",
"├─TableReader(Build) 2.00 root data:Selection",
"│ └─Selection 2.00 cop[tiflash] not(isnull(test.table_1.value))",
"│ └─TableFullScan 2.00 cop[tiflash] table:b keep order:false",
"└─TableReader(Probe) 2.00 root data:Selection",
" └─Selection 2.00 cop[tiflash] not(isnull(test.table_1.value))",
" └─TableFullScan 2.00 cop[tiflash] table:a keep order:false"
]
},
{
"SQL": "explain format = 'brief' select /*+ agg_to_cop() */ count(*), value from table_1 group by value",
"Plan": [
"HashAgg 2.00 root group by:test.table_1.value, funcs:count(Column#9)->Column#4, funcs:firstrow(test.table_1.value)->test.table_1.value",
"└─TableReader 2.00 root data:ExchangeSender",
" └─ExchangeSender 2.00 batchCop[tiflash] ExchangeType: PassThrough",
" └─HashAgg 2.00 batchCop[tiflash] group by:test.table_1.value, funcs:count(1)->Column#9",
" └─TableFullScan 2.00 batchCop[tiflash] table:table_1 keep order:false"
]
}
]
},
{
"Name": "TestMPPWithBroadcastExchangeUnderNewCollation",
"Cases": [
{
"SQL": "explain format = 'brief' select /*+ broadcast_join(a,b) */ * from table_1 a, table_1 b where a.id = b.id",
"Plan": [
"TableReader 2.00 root data:ExchangeSender",
"└─ExchangeSender 2.00 cop[tiflash] ExchangeType: PassThrough",
" └─HashJoin 2.00 cop[tiflash] inner join, equal:[eq(test.table_1.id, test.table_1.id)]",
" ├─ExchangeReceiver(Build) 2.00 cop[tiflash] ",
" │ └─ExchangeSender 2.00 cop[tiflash] ExchangeType: Broadcast",
" │ └─TableFullScan 2.00 cop[tiflash] table:a keep order:false",
" └─TableFullScan(Probe) 2.00 cop[tiflash] table:b keep order:false"
]
},
{
"SQL": "explain format = 'brief' select /*+ broadcast_join(a,b) */ * from table_1 a, table_1 b where a.value = b.value",
"Plan": [
"TableReader 2.00 root data:ExchangeSender",
"└─ExchangeSender 2.00 cop[tiflash] ExchangeType: PassThrough",
" └─HashJoin 2.00 cop[tiflash] inner join, equal:[eq(test.table_1.value, test.table_1.value)]",
" ├─ExchangeReceiver(Build) 2.00 cop[tiflash] ",
" │ └─ExchangeSender 2.00 cop[tiflash] ExchangeType: Broadcast",
" │ └─Selection 2.00 cop[tiflash] not(isnull(test.table_1.value))",
" │ └─TableFullScan 2.00 cop[tiflash] table:a keep order:false",
" └─Selection(Probe) 2.00 cop[tiflash] not(isnull(test.table_1.value))",
" └─TableFullScan 2.00 cop[tiflash] table:b keep order:false"
]
}
]
Expand Down

0 comments on commit fe250b9

Please sign in to comment.