
planner: fix HashAgg cannot pushdown to tiflash_compute (#40828)
close #40717
guo-shaoge authored Jan 31, 2023
1 parent eb53aa8 commit 05edfd4
Showing 10 changed files with 204 additions and 50 deletions.
5 changes: 1 addition & 4 deletions executor/tiflashtest/tiflash_test.go
@@ -1270,7 +1270,7 @@ func TestDisaggregatedTiFlash(t *testing.T) {
 	tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")
 
 	err = tk.ExecToErr("select * from t;")
-	require.Contains(t, err.Error(), "Please check tiflash_compute node is available")
+	require.Contains(t, err.Error(), "tiflash_compute node is unavailable")
 
 	config.UpdateGlobal(func(conf *config.Config) {
 		conf.DisaggregatedTiFlash = false
@@ -1304,9 +1304,6 @@ func TestDisaggregatedTiFlashQuery(t *testing.T) {
 	require.NoError(t, err)
 	tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")
 
-	needCheckTiFlashComputeNode := "false"
-	failpoint.Enable("github.com/pingcap/tidb/planner/core/testDisaggregatedTiFlashQuery", fmt.Sprintf("return(%s)", needCheckTiFlashComputeNode))
-	defer failpoint.Disable("github.com/pingcap/tidb/planner/core/testDisaggregatedTiFlashQuery")
 	tk.MustExec("explain select max( tbl_1.col_1 ) as r0 , sum( tbl_1.col_1 ) as r1 , sum( tbl_1.col_8 ) as r2 from tbl_1 where tbl_1.col_8 != 68 or tbl_1.col_3 between null and 939 order by r0,r1,r2;")
 
 	tk.MustExec("set @@tidb_partition_prune_mode = 'static';")
1 change: 0 additions & 1 deletion planner/core/BUILD.bazel
@@ -107,7 +107,6 @@ go_library(
         "//sessiontxn/staleread",
         "//statistics",
         "//statistics/handle",
-        "//store/driver/backoff",
         "//table",
         "//table/tables",
         "//table/temptable",
13 changes: 9 additions & 4 deletions planner/core/find_best_task.go
@@ -2015,15 +2015,16 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid
 	}
 	// In disaggregated tiflash mode, only MPP is allowed, cop and batchCop is deprecated.
 	// So if prop.TaskTp is RootTaskType, have to use mppTask then convert to rootTask.
-	isDisaggregatedTiFlashPath := config.GetGlobalConfig().DisaggregatedTiFlash && ts.StoreType == kv.TiFlash
+	isDisaggregatedTiFlash := config.GetGlobalConfig().DisaggregatedTiFlash
+	isDisaggregatedTiFlashPath := isDisaggregatedTiFlash && ts.StoreType == kv.TiFlash
 	canMppConvertToRootForDisaggregatedTiFlash := isDisaggregatedTiFlashPath && prop.TaskTp == property.RootTaskType && ds.SCtx().GetSessionVars().IsMPPAllowed()
 	if prop.TaskTp == property.MppTaskType || canMppConvertToRootForDisaggregatedTiFlash {
 		if ts.KeepOrder {
 			return invalidTask, nil
 		}
-		if prop.MPPPartitionTp != property.AnyType || (ts.isPartition && !canMppConvertToRootForDisaggregatedTiFlash) {
+		if prop.MPPPartitionTp != property.AnyType || (ts.isPartition && !isDisaggregatedTiFlash) {
 			// If ts is a single partition, then this partition table is in static-only prune, then we should not choose mpp execution.
-			// But in disaggregated tiflash mode, we can only use mpp, so we add ExchangeSender and ExchangeReceiver above TableScan for static pruning partition table.
+			// But in disaggregated tiflash mode, we enable using mpp for static-pruning partition tables, because cop and batchCop are deprecated.
 			ds.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because table `" + ds.tableInfo.Name.O + "`is a partition table which is not supported when `@@tidb_partition_prune_mode=static`.")
 			return invalidTask, nil
 		}
@@ -2052,7 +2053,11 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid
 		// So have to return a rootTask, but prop requires mppTask, cannot meet this requirement.
 		task = invalidTask
 	} else if prop.TaskTp == property.RootTaskType {
-		// when got here, canMppConvertToRootForDisaggregatedTiFlash is true.
+		// When we get here, canMppConvertToRootForDisaggregatedTiFlash is true.
+		// This handles cases where an mppTask cannot be generated for some operator:
+		// e.g. when the build side of a HashJoin is a Projection that cannot be pushed
+		// down to TiFlash (TiFlash doesn't support some expressions in Projection), the
+		// HashJoin cannot be pushed down, but we still want the TableScan to run on TiFlash.
 		task = mppTask
 		task = task.convertToRootTask(ds.ctx)
 		if !task.invalid() {
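
The new comment in convertToTableScan above is the heart of the fix. As a rough illustration, here is a minimal, self-contained Go sketch of that decision flow; every name in it (task, mppTask, chooseScanTask, convertToRoot) is a hypothetical stand-in, not the planner's real API:

package main

import "fmt"

// Hypothetical stand-ins for the planner's task types.
type task interface{ invalid() bool }

type mppTask struct{}
type rootTask struct{}
type invalidTaskType struct{}

func (mppTask) invalid() bool         { return false }
func (rootTask) invalid() bool        { return false }
func (invalidTaskType) invalid() bool { return true }

// convertToRoot mimics placing a reader boundary on top of an MPP fragment
// so the rest of the plan can run in TiDB.
func convertToRoot(t mppTask) task { return rootTask{} }

// chooseScanTask sketches the control flow after this fix: in disaggregated
// mode only MPP is allowed, so a RootTaskType request is served by building
// an mppTask first and converting it. The TableScan therefore stays on
// tiflash_compute even when a parent operator (e.g. a HashJoin whose build
// side is a Projection TiFlash cannot execute) has to fall back to TiDB.
func chooseScanTask(wantRootTask, disaggregated, mppAllowed bool) task {
	canConvertToRoot := disaggregated && wantRootTask && mppAllowed
	if wantRootTask && !canConvertToRoot {
		return invalidTaskType{}
	}
	scan := mppTask{}
	if wantRootTask {
		return convertToRoot(scan)
	}
	return scan
}

func main() {
	fmt.Println(chooseScanTask(true, true, true).invalid()) // false: mpp converted to root
}

The point of the late conversion is that the scan is always planned as MPP in disaggregated mode, and the switch to a root task happens as high in the plan as possible, so an operator that cannot stay in MPP no longer forces the scan off tiflash_compute.
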
19 changes: 0 additions & 19 deletions planner/core/logical_plan_builder.go
@@ -27,7 +27,6 @@ import (
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
-	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/domain"
 	"github.com/pingcap/tidb/expression"
 	"github.com/pingcap/tidb/expression/aggregation"
@@ -50,7+49,6 @@ import (
 	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/statistics"
 	"github.com/pingcap/tidb/statistics/handle"
-	"github.com/pingcap/tidb/store/driver/backoff"
 	"github.com/pingcap/tidb/table"
 	"github.com/pingcap/tidb/table/tables"
 	"github.com/pingcap/tidb/table/temptable"
@@ -67,7 +65,6 @@ import (
 	"github.com/pingcap/tidb/util/plancodec"
 	"github.com/pingcap/tidb/util/set"
 	"github.com/pingcap/tidb/util/size"
-	"github.com/tikv/client-go/v2/tikv"
 )
 
 const (
@@ -692,13 +689,6 @@ func (ds *DataSource) setPreferredStoreType(hintInfo *tableHintInfo) {
 		ds.preferStoreType = 0
 		return
 	}
-	if config.GetGlobalConfig().DisaggregatedTiFlash && !isTiFlashComputeNodeAvailable(ds.ctx) {
-		// TiFlash is in disaggregated mode, need to make sure tiflash_compute node is available.
-		errMsg := "No available tiflash_compute node"
-		warning := ErrInternal.GenWithStack(errMsg)
-		ds.ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
-		return
-	}
 	for _, path := range ds.possibleAccessPaths {
 		if path.StoreType == kv.TiFlash {
 			ds.preferStoreType |= preferTiFlash
@@ -716,15 +706,6 @@ func (ds *DataSource) setPreferredStoreType(hintInfo *tableHintInfo) {
 	}
 }
 
-func isTiFlashComputeNodeAvailable(ctx sessionctx.Context) bool {
-	bo := backoff.NewBackofferWithVars(context.Background(), 5000, nil)
-	stores, err := ctx.GetStore().(tikv.Storage).GetRegionCache().GetTiFlashComputeStores(bo.TiKVBackoffer())
-	if err != nil || len(stores) == 0 {
-		return false
-	}
-	return true
-}
-
 func resetNotNullFlag(schema *expression.Schema, start, end int) {
 	for i := start; i < end; i++ {
 		col := *schema.Columns[i]
73 changes: 73 additions & 0 deletions planner/core/physical_plan_test.go
@@ -23,6 +23,7 @@ import (
 
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/metapb"
+	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/domain"
 	"github.com/pingcap/tidb/executor"
 	"github.com/pingcap/tidb/infoschema"
@@ -2626,3 +2627,75 @@ func TestCountStarForTiFlash(t *testing.T) {
 		tk.MustQuery("explain format = 'brief' " + ts).Check(testkit.Rows(output[i].Plan...))
 	}
 }
+
+func TestHashAggPushdownToTiFlashCompute(t *testing.T) {
+	var (
+		input  []string
+		output []struct {
+			SQL     string
+			Plan    []string
+			Warning []string
+		}
+	)
+	planSuiteData := core.GetPlanSuiteData()
+	planSuiteData.LoadTestCases(t, &input, &output)
+	store := testkit.CreateMockStore(t)
+	tk := testkit.NewTestKit(t, store)
+
+	tk.MustExec("use test")
+	tk.MustExec("drop table if exists tbl_15;")
+	tk.MustExec(`create table tbl_15 (col_89 text (473) collate utf8mb4_bin ,
+		col_90 timestamp default '1976-04-03' ,
+		col_91 tinyint unsigned not null ,
+		col_92 tinyint ,
+		col_93 double not null ,
+		col_94 datetime not null default '1970-06-08' ,
+		col_95 datetime default '2028-02-13' ,
+		col_96 int unsigned not null default 2532480521 ,
+		col_97 char (168) default '') partition by hash (col_91) partitions 4;`)
+
+	tk.MustExec("drop table if exists tbl_16;")
+	tk.MustExec(`create table tbl_16 (col_98 text (246) not null ,
+		col_99 decimal (30 ,19) ,
+		col_100 mediumint unsigned ,
+		col_101 text (410) collate utf8mb4_bin ,
+		col_102 date not null ,
+		col_103 timestamp not null default '2003-08-27' ,
+		col_104 text (391) not null ,
+		col_105 date default '2010-10-24' ,
+		col_106 text (9) not null,primary key (col_100, col_98(5), col_103),
+		unique key idx_23 (col_100, col_106 (3), col_101 (3))) partition by hash (col_100) partitions 2;`)
+
+	config.UpdateGlobal(func(conf *config.Config) {
+		conf.DisaggregatedTiFlash = true
+	})
+	defer config.UpdateGlobal(func(conf *config.Config) {
+		conf.DisaggregatedTiFlash = false
+	})
+
+	dom := domain.GetDomain(tk.Session())
+	is := dom.InfoSchema()
+	db, exists := is.SchemaByName(model.NewCIStr("test"))
+	require.True(t, exists)
+	for _, tblInfo := range db.Tables {
+		tableName := tblInfo.Name.L
+		if tableName == "tbl_15" || tableName == "tbl_16" {
+			tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
+				Count:     1,
+				Available: true,
+			}
+		}
+	}
+
+	tk.MustExec("set @@tidb_allow_mpp=1; set @@tidb_enforce_mpp=1;")
+	tk.MustExec("set @@tidb_partition_prune_mode = 'static';")
+	tk.MustExec("set @@tidb_isolation_read_engines = 'tiflash';")
+
+	for i, ts := range input {
+		testdata.OnRecord(func() {
+			output[i].SQL = ts
+			output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery("explain format = 'brief' " + ts).Rows())
+		})
+		tk.MustQuery("explain format = 'brief' " + ts).Check(testkit.Rows(output[i].Plan...))
+	}
+}
24 changes: 2 additions & 22 deletions planner/core/planbuilder.go
@@ -26,7 +26,6 @@ import (
 	"unsafe"
 
 	"github.com/pingcap/errors"
-	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tidb/bindinfo"
 	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/domain"
@@ -1453,8 +1452,6 @@ func filterPathByIsolationRead(ctx sessionctx.Context, paths []*util.AccessPath,
 	isolationReadEngines := ctx.GetSessionVars().GetIsolationReadEngines()
 	availableEngine := map[kv.StoreType]struct{}{}
 	var availableEngineStr string
-	var outputComputeNodeErrMsg bool
-	noTiFlashComputeNode := config.GetGlobalConfig().DisaggregatedTiFlash && !isTiFlashComputeNodeAvailable(ctx)
 	for i := len(paths) - 1; i >= 0; i-- {
 		// availableEngineStr is for warning message.
 		if _, ok := availableEngine[paths[i].StoreType]; !ok {
@@ -1464,20 +1461,7 @@
 			}
 			availableEngineStr += paths[i].StoreType.Name()
 		}
-		_, exists := isolationReadEngines[paths[i].StoreType]
-		// Prune this path if:
-		// 1. path.StoreType doesn't exists in isolationReadEngines or
-		// 2. TiFlash is disaggregated and the number of tiflash_compute node is zero.
-		shouldPruneTiFlashCompute := noTiFlashComputeNode && exists && paths[i].StoreType == kv.TiFlash
-		failpoint.Inject("testDisaggregatedTiFlashQuery", func(val failpoint.Value) {
-			// Ignore check if tiflash_compute node number.
-			// After we support disaggregated tiflash in test framework, can delete this failpoint.
-			shouldPruneTiFlashCompute = val.(bool)
-		})
-		if shouldPruneTiFlashCompute {
-			outputComputeNodeErrMsg = true
-		}
-		if (!exists && paths[i].StoreType != kv.TiDB) || shouldPruneTiFlashCompute {
+		if _, ok := isolationReadEngines[paths[i].StoreType]; !ok && paths[i].StoreType != kv.TiDB {
 			paths = append(paths[:i], paths[i+1:]...)
 		}
 	}
@@ -1486,11 +1470,7 @@
 	if len(paths) == 0 {
 		helpMsg := ""
 		if engineVals == "tiflash" {
-			if outputComputeNodeErrMsg {
-				helpMsg = ". Please check tiflash_compute node is available"
-			} else {
-				helpMsg = ". Please check tiflash replica or ensure the query is readonly"
-			}
+			helpMsg = ". Please check tiflash replica or ensure the query is readonly"
 		}
 		err = ErrInternal.GenWithStackByArgs(fmt.Sprintf("No access path for table '%s' is found with '%v' = '%v', valid values can be '%s'%s.", tblName.String(),
 			variable.TiDBIsolationReadEngines, engineVals, availableEngineStr, helpMsg))
6 changes: 6 additions & 0 deletions planner/core/rule_aggregation_push_down.go
@@ -405,6 +405,12 @@ func (a *aggregationPushDownSolver) pushAggCrossUnion(agg *LogicalAggregation, u
 		if err != nil {
 			return nil, err
 		}
+		// Align the mode of the newly generated firstRow with the other agg funcs.
+		if len(agg.AggFuncs) != 0 {
+			firstRow.Mode = agg.AggFuncs[0].Mode
+		} else {
+			firstRow.Mode = aggregation.Partial1Mode
+		}
 		newAgg.AggFuncs = append(newAgg.AggFuncs, firstRow)
 	}
 	tmpSchema := expression.NewSchema(newAgg.GetGroupByCols()...)
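
For context on why the mode matters: all aggregate functions inside one aggregation operator must agree on their mode, because the mode decides whether they emit partial states or final results. Below is a self-contained Go sketch of the rule this hunk enforces; aggFunctionMode and aggFuncDesc are simplified hypothetical stand-ins, not the real planner/aggregation types:

package main

import "fmt"

// Simplified stand-in for an aggregate function's execution mode.
type aggFunctionMode int

const (
	completeMode aggFunctionMode = iota
	partial1Mode
)

type aggFuncDesc struct {
	name string
	mode aggFunctionMode
}

// alignFirstRowMode restates the fix: a firstrow() synthesized for a
// group-by column inherits the mode of its sibling agg funcs instead of
// keeping a default that may disagree with them.
func alignFirstRowMode(siblings []aggFuncDesc, firstRow *aggFuncDesc) {
	if len(siblings) != 0 {
		firstRow.mode = siblings[0].mode
		return
	}
	firstRow.mode = partial1Mode
}

func main() {
	siblings := []aggFuncDesc{{name: "sum", mode: partial1Mode}}
	fr := aggFuncDesc{name: "firstrow", mode: completeMode}
	alignFirstRowMode(siblings, &fr)
	fmt.Println(fr.mode == partial1Mode) // true: the modes now agree
}
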
6 changes: 6 additions & 0 deletions planner/core/task.go
@@ -1163,6 +1163,12 @@ func (p *PhysicalUnionAll) attach2MppTasks(tasks ...task) task {
 func (p *PhysicalUnionAll) attach2Task(tasks ...task) task {
 	for _, t := range tasks {
 		if _, ok := t.(*mppTask); ok {
+			if p.TP() == plancodec.TypePartitionUnion {
+				// attach2MppTasks() would attach the PhysicalUnionAll to the mppTask directly,
+				// but PartitionUnion cannot be pushed down to TiFlash, so disable that explicitly here.
+				// For now, return invalidTask immediately; this could be refined by converting PartitionUnion's child tasks to rootTasks.
+				return invalidTask
+			}
 			return p.attach2MppTasks(tasks...)
 		}
 	}
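
A compact sketch of the new guard, with hypothetical names (attachUnion is not the real method; the actual logic lives in PhysicalUnionAll.attach2Task):

package main

import "fmt"

// attachUnion mirrors the shape of the guard above: an MPP child is only
// accepted when the union itself can run in TiFlash.
func attachUnion(isPartitionUnion, childIsMPP bool) string {
	if childIsMPP {
		if isPartitionUnion {
			// PartitionUnion cannot run inside TiFlash, so refuse the MPP
			// attachment rather than emit an unexecutable plan.
			return "invalidTask"
		}
		return "mppTask"
	}
	return "rootTask"
}

func main() {
	fmt.Println(attachUnion(true, true))  // invalidTask
	fmt.Println(attachUnion(false, true)) // mppTask
}
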
8 changes: 8 additions & 0 deletions planner/core/testdata/plan_suite_in.json
@@ -1186,5 +1186,13 @@
       "select a, count(*) from t group by a -- shouldn't be rewritten",
       "select sum(a) from t -- sum shouldn't be rewritten"
     ]
-  }
+  },
+  {
+    "name": "TestHashAggPushdownToTiFlashCompute",
+    "cases": [
+      "select /*+ agg_to_cop() hash_agg() */ avg( distinct tbl_15.col_96 ) as r0 , min( tbl_15.col_92 ) as r1 , sum( distinct tbl_15.col_91 ) as r2 , max( tbl_15.col_92 ) as r3 from tbl_15 where tbl_15.col_94 != '2033-01-09' and tbl_15.col_93 > 7623.679908049186 order by r0,r1,r2,r3 limit 79 ;",
+      "select /*+ agg_to_cop() hash_agg() */ count(1) from tbl_15 ;",
+      "select /*+ agg_to_cop() stream_agg() */ avg( tbl_16.col_100 ) as r0 from tbl_16 where tbl_16.col_100 in ( 10672141 ) or tbl_16.col_104 in ( 'yfEG1t!*b' ,'C1*bqx_qyO' ,'vQ^yUpKHr&j#~' ) group by tbl_16.col_100 order by r0 limit 20 ;"
+    ]
+  }
 ]