Skip to content

Commit

Permalink
planner, expression: pushdown AggFuncMode to coprocessor (#31392) (#3…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored Jul 5, 2022
1 parent 3279c08 commit 2e04605
Show file tree
Hide file tree
Showing 7 changed files with 235 additions and 33 deletions.
43 changes: 41 additions & 2 deletions expression/aggregation/agg_to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,46 @@ func AggFuncToPBExpr(sc *stmtctx.StatementContext, client kv.Client, aggFunc *Ag
}
children = append(children, pbArg)
}
return &tipb.Expr{Tp: tp, Children: children, FieldType: expression.ToPBFieldType(aggFunc.RetTp), HasDistinct: aggFunc.HasDistinct}
return &tipb.Expr{Tp: tp, Children: children, FieldType: expression.ToPBFieldType(aggFunc.RetTp), HasDistinct: aggFunc.HasDistinct, AggFuncMode: AggFunctionModeToPB(aggFunc.Mode)}
}

// AggFunctionModeToPB converts aggregate function mode to PB.
func AggFunctionModeToPB(mode AggFunctionMode) (pbMode *tipb.AggFunctionMode) {
pbMode = new(tipb.AggFunctionMode)
switch mode {
case CompleteMode:
*pbMode = tipb.AggFunctionMode_CompleteMode
case FinalMode:
*pbMode = tipb.AggFunctionMode_FinalMode
case Partial1Mode:
*pbMode = tipb.AggFunctionMode_Partial1Mode
case Partial2Mode:
*pbMode = tipb.AggFunctionMode_Partial2Mode
case DedupMode:
*pbMode = tipb.AggFunctionMode_DedupMode
}
return pbMode
}

// PBAggFuncModeToAggFuncMode converts pb to aggregate function mode.
func PBAggFuncModeToAggFuncMode(pbMode *tipb.AggFunctionMode) (mode AggFunctionMode) {
// Default mode of the aggregate function is PartialMode.
mode = Partial1Mode
if pbMode != nil {
switch *pbMode {
case tipb.AggFunctionMode_CompleteMode:
mode = CompleteMode
case tipb.AggFunctionMode_FinalMode:
mode = FinalMode
case tipb.AggFunctionMode_Partial1Mode:
mode = Partial1Mode
case tipb.AggFunctionMode_Partial2Mode:
mode = Partial2Mode
case tipb.AggFunctionMode_DedupMode:
mode = DedupMode
}
}
return mode
}

// PBExprToAggFuncDesc converts pb to aggregate function.
Expand Down Expand Up @@ -127,7 +166,7 @@ func PBExprToAggFuncDesc(ctx sessionctx.Context, aggFunc *tipb.Expr, fieldTps []
base.WrapCastForAggArgs(ctx)
return &AggFuncDesc{
baseFuncDesc: base,
Mode: Partial1Mode,
Mode: PBAggFuncModeToAggFuncMode(aggFunc.AggFuncMode),
HasDistinct: false,
}, nil
}
14 changes: 7 additions & 7 deletions expression/aggregation/agg_to_pb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,13 @@ func (s *testEvaluatorSuite) TestAggFunc2Pb(c *C) {
}

jsons := []string{
`{"tp":3002,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":false}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":%v}`,
`{"tp":3001,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":false}],"sig":0,"field_type":{"tp":8,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":%v}`,
`{"tp":3003,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":false}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":%v}`,
"null",
`{"tp":3005,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":false}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":%v}`,
`{"tp":3004,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":false}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":%v}`,
`{"tp":3006,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":false}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":%v}`,
`{"tp":3002,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":false}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":%v,"aggFuncMode":0}`,
`{"tp":3001,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":false}],"sig":0,"field_type":{"tp":8,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":%v,"aggFuncMode":0}`,
`{"tp":3003,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":false}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":%v,"aggFuncMode":0}`,
`{"tp":3007,"val":"AAAAAAAABAA=","children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":false}],"sig":0,"field_type":{"tp":15,"flag":0,"flen":1,"decimal":-1,"collate":-46,"charset":"utf8mb4"},"has_distinct":%v,"aggFuncMode":0}`,
`{"tp":3005,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":false}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":%v,"aggFuncMode":0}`,
`{"tp":3004,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":false}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":%v,"aggFuncMode":0}`,
`{"tp":3006,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":false}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":%v,"aggFuncMode":0}`,
}
for i, funcName := range funcNames {
for _, hasDistinct := range []bool{true, false} {
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ require (
github.com/pingcap/parser v0.0.0-20210831085004-b5390aa83f65
github.com/pingcap/sysutil v0.0.0-20210730114356-fcd8a63f68c5
github.com/pingcap/tidb-tools v5.0.3+incompatible
github.com/pingcap/tipb v0.0.0-20211025074540-e1c7362eeeb4
github.com/pingcap/tipb v0.0.0-20220413031846-c26c0efa9127
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.9.1
Expand Down Expand Up @@ -92,7 +92,7 @@ require (
google.golang.org/grpc v1.29.1
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v2 v2.4.0
modernc.org/mathutil v1.2.2
modernc.org/mathutil v1.4.1
sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0
sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67
)
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -576,8 +576,8 @@ github.com/pingcap/tidb-dashboard v0.0.0-20210312062513-eef5d6404638/go.mod h1:O
github.com/pingcap/tidb-dashboard v0.0.0-20210716172320-2226872e3296/go.mod h1:OCXbZTBTIMRcIt0jFsuCakZP+goYRv6IjawKbwLS2TQ=
github.com/pingcap/tidb-tools v5.0.3+incompatible h1:vYMrW9ux+3HRMeRZ1fUOjy2nyiodtuVyAyK270EKBEs=
github.com/pingcap/tidb-tools v5.0.3+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tipb v0.0.0-20211025074540-e1c7362eeeb4 h1:9Ef4j3DLmUidURfob0tf94v+sqvozqdCTr7e5hi19qU=
github.com/pingcap/tipb v0.0.0-20211025074540-e1c7362eeeb4/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pingcap/tipb v0.0.0-20220413031846-c26c0efa9127 h1:Q5eNRRSOfZP00j7J46D+3fVCtHAXh4Z+Tmqu3ILwgjg=
github.com/pingcap/tipb v0.0.0-20220413031846-c26c0efa9127/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down Expand Up @@ -1159,8 +1159,8 @@ honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9
honnef.co/go/tools v0.2.0 h1:ws8AfbgTX3oIczLPNPCu5166oBg9ST2vNs0rcht+mDE=
honnef.co/go/tools v0.2.0/go.mod h1:lPVVZ2BS5TfnjLyizF7o7hv7j9/L+8cZY2hLyjP9cGY=
k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
modernc.org/mathutil v1.2.2 h1:+yFk8hBprV+4c0U9GjFtL+dV3N8hOJ8JCituQcMShFY=
modernc.org/mathutil v1.2.2/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
modernc.org/mathutil v1.4.1 h1:ij3fYGe8zBF4Vu+g0oT7mB06r8sqGWKuJu1yXeR4by8=
modernc.org/mathutil v1.4.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
moul.io/zapgorm2 v1.1.0/go.mod h1:emRfKjNqSzVj5lcgasBdovIXY1jSOwFz2GQZn1Rddks=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=
Expand Down
4 changes: 2 additions & 2 deletions planner/cascades/transformation_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,15 +438,15 @@ func (r *PushAggDownGather) OnTransform(old *memo.ExprIter) (newExprs []*memo.Gr
gbyItems := make([]expression.Expression, len(agg.GroupByItems))
copy(gbyItems, agg.GroupByItems)

partialPref, finalPref, funcMap := plannercore.BuildFinalModeAggregation(agg.SCtx(),
partialPref, finalPref, firstRowFuncMap := plannercore.BuildFinalModeAggregation(agg.SCtx(),
&plannercore.AggInfo{
AggFuncs: aggFuncs,
GroupByItems: gbyItems,
Schema: aggSchema,
}, true, false)
// Remove unnecessary FirstRow.
partialPref.AggFuncs =
plannercore.RemoveUnnecessaryFirstRow(agg.SCtx(), finalPref.AggFuncs, finalPref.GroupByItems, partialPref.AggFuncs, partialPref.GroupByItems, partialPref.Schema, funcMap)
plannercore.RemoveUnnecessaryFirstRow(agg.SCtx(), finalPref.GroupByItems, partialPref.AggFuncs, partialPref.GroupByItems, partialPref.Schema, firstRowFuncMap)

partialAgg := plannercore.LogicalAggregation{
AggFuncs: partialPref.AggFuncs,
Expand Down
155 changes: 155 additions & 0 deletions planner/core/plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,28 @@ import (
"bytes"
"fmt"
"strings"
"testing"

. "github.com/pingcap/check"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/planner/util"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/israce"
"github.com/pingcap/tidb/util/plancodec"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
"github.com/pingcap/tidb/util/testutil"
"github.com/stretchr/testify/require"
)

var _ = Suite(&testPlanNormalize{})
Expand Down Expand Up @@ -638,3 +647,149 @@ func (s *testPlanNormalize) TestIssue25729(c *C) {
tk.MustExec("insert into t1 values(\"a\", \"adwa\");")
tk.MustQuery("select * from t1 where concat(a, b) like \"aadwa\" and a = \"a\";").Check(testkit.Rows("a adwa"))
}

func TestBuildFinalModeAggregation(t *testing.T) {
aggSchemaBuilder := func(sctx sessionctx.Context, aggFuncs []*aggregation.AggFuncDesc) *expression.Schema {
schema := expression.NewSchema(make([]*expression.Column, 0, len(aggFuncs))...)
for _, agg := range aggFuncs {
newCol := &expression.Column{
UniqueID: sctx.GetSessionVars().AllocPlanColumnID(),
RetType: agg.RetTp,
}
schema.Append(newCol)
}
return schema
}
isFinalAggMode := func(mode aggregation.AggFunctionMode) bool {
return mode == aggregation.FinalMode || mode == aggregation.CompleteMode
}
checkResult := func(sctx sessionctx.Context, aggFuncs []*aggregation.AggFuncDesc, groubyItems []expression.Expression) {
for partialIsCop := 0; partialIsCop < 2; partialIsCop++ {
for isMPPTask := 0; isMPPTask < 2; isMPPTask++ {
partial, final, _ := core.BuildFinalModeAggregation(sctx, &core.AggInfo{
AggFuncs: aggFuncs,
GroupByItems: groubyItems,
Schema: aggSchemaBuilder(sctx, aggFuncs),
}, partialIsCop == 0, isMPPTask == 0)
if partial != nil {
for _, aggFunc := range partial.AggFuncs {
if partialIsCop == 0 {
require.True(t, !isFinalAggMode(aggFunc.Mode))
} else {
require.True(t, isFinalAggMode(aggFunc.Mode))
}
}
}
if final != nil {
for _, aggFunc := range final.AggFuncs {
require.True(t, isFinalAggMode(aggFunc.Mode))
}
}
}
}
}

ctx := core.MockContext()

aggCol := &expression.Column{
Index: 0,
RetType: types.NewFieldType(mysql.TypeLonglong),
}
gbyCol := &expression.Column{
Index: 1,
RetType: types.NewFieldType(mysql.TypeLonglong),
}
orderCol := &expression.Column{
Index: 2,
RetType: types.NewFieldType(mysql.TypeLonglong),
}

emptyGroupByItems := make([]expression.Expression, 0, 1)
groupByItems := make([]expression.Expression, 0, 1)
groupByItems = append(groupByItems, gbyCol)

orderByItems := make([]*util.ByItems, 0, 1)
orderByItems = append(orderByItems, &util.ByItems{
Expr: orderCol,
Desc: true,
})

aggFuncs := make([]*aggregation.AggFuncDesc, 0, 5)
desc, err := aggregation.NewAggFuncDesc(ctx, ast.AggFuncMax, []expression.Expression{aggCol}, false)
require.NoError(t, err)
aggFuncs = append(aggFuncs, desc)
desc, err = aggregation.NewAggFuncDesc(ctx, ast.AggFuncFirstRow, []expression.Expression{aggCol}, false)
require.NoError(t, err)
aggFuncs = append(aggFuncs, desc)
desc, err = aggregation.NewAggFuncDesc(ctx, ast.AggFuncCount, []expression.Expression{aggCol}, false)
require.NoError(t, err)
aggFuncs = append(aggFuncs, desc)
desc, err = aggregation.NewAggFuncDesc(ctx, ast.AggFuncSum, []expression.Expression{aggCol}, false)
require.NoError(t, err)
aggFuncs = append(aggFuncs, desc)
desc, err = aggregation.NewAggFuncDesc(ctx, ast.AggFuncAvg, []expression.Expression{aggCol}, false)
require.NoError(t, err)
aggFuncs = append(aggFuncs, desc)

aggFuncsWithDistinct := make([]*aggregation.AggFuncDesc, 0, 2)
desc, err = aggregation.NewAggFuncDesc(ctx, ast.AggFuncAvg, []expression.Expression{aggCol}, true)
require.NoError(t, err)
aggFuncsWithDistinct = append(aggFuncsWithDistinct, desc)
desc, err = aggregation.NewAggFuncDesc(ctx, ast.AggFuncCount, []expression.Expression{aggCol}, true)
require.NoError(t, err)
aggFuncsWithDistinct = append(aggFuncsWithDistinct, desc)

groupConcatAggFuncs := make([]*aggregation.AggFuncDesc, 0, 4)
groupConcatWithoutDistinctWithoutOrderBy, err := aggregation.NewAggFuncDesc(ctx, ast.AggFuncGroupConcat, []expression.Expression{aggCol, aggCol}, false)
require.NoError(t, err)
groupConcatAggFuncs = append(groupConcatAggFuncs, groupConcatWithoutDistinctWithoutOrderBy)
groupConcatWithoutDistinctWithOrderBy, err := aggregation.NewAggFuncDesc(ctx, ast.AggFuncGroupConcat, []expression.Expression{aggCol, aggCol}, false)
require.NoError(t, err)
groupConcatWithoutDistinctWithOrderBy.OrderByItems = orderByItems
groupConcatAggFuncs = append(groupConcatAggFuncs, groupConcatWithoutDistinctWithOrderBy)
groupConcatWithDistinctWithoutOrderBy, err := aggregation.NewAggFuncDesc(ctx, ast.AggFuncGroupConcat, []expression.Expression{aggCol, aggCol}, true)
require.NoError(t, err)
groupConcatAggFuncs = append(groupConcatAggFuncs, groupConcatWithDistinctWithoutOrderBy)
groupConcatWithDistinctWithOrderBy, err := aggregation.NewAggFuncDesc(ctx, ast.AggFuncGroupConcat, []expression.Expression{aggCol, aggCol}, true)
require.NoError(t, err)
groupConcatWithDistinctWithOrderBy.OrderByItems = orderByItems
groupConcatAggFuncs = append(groupConcatAggFuncs, groupConcatWithDistinctWithOrderBy)

// case 1 agg without distinct
checkResult(ctx, aggFuncs, emptyGroupByItems)
checkResult(ctx, aggFuncs, groupByItems)

// case 2 agg with distinct
checkResult(ctx, aggFuncsWithDistinct, emptyGroupByItems)
checkResult(ctx, aggFuncsWithDistinct, groupByItems)

// case 3 mixed with distinct and without distinct
mixedAggFuncs := make([]*aggregation.AggFuncDesc, 0, 10)
mixedAggFuncs = append(mixedAggFuncs, aggFuncs...)
mixedAggFuncs = append(mixedAggFuncs, aggFuncsWithDistinct...)
checkResult(ctx, mixedAggFuncs, emptyGroupByItems)
checkResult(ctx, mixedAggFuncs, groupByItems)

// case 4 group concat
for _, groupConcatAggFunc := range groupConcatAggFuncs {
checkResult(ctx, []*aggregation.AggFuncDesc{groupConcatAggFunc}, emptyGroupByItems)
checkResult(ctx, []*aggregation.AggFuncDesc{groupConcatAggFunc}, groupByItems)
}
checkResult(ctx, groupConcatAggFuncs, emptyGroupByItems)
checkResult(ctx, groupConcatAggFuncs, groupByItems)

// case 5 mixed group concat and other agg funcs
for _, groupConcatAggFunc := range groupConcatAggFuncs {
funcs := make([]*aggregation.AggFuncDesc, 0, 10)
funcs = append(funcs, groupConcatAggFunc)
funcs = append(funcs, aggFuncs...)
checkResult(ctx, funcs, emptyGroupByItems)
checkResult(ctx, funcs, groupByItems)
funcs = append(funcs, aggFuncsWithDistinct...)
checkResult(ctx, funcs, emptyGroupByItems)
checkResult(ctx, funcs, groupByItems)
}
mixedAggFuncs = append(mixedAggFuncs, groupConcatAggFuncs...)
checkResult(ctx, mixedAggFuncs, emptyGroupByItems)
checkResult(ctx, mixedAggFuncs, groupByItems)
}
Loading

0 comments on commit 2e04605

Please sign in to comment.