diff --git a/expression/aggregation/agg_to_pb.go b/expression/aggregation/agg_to_pb.go index d3c9ef9b7e3c2..1e31646f4c295 100644 --- a/expression/aggregation/agg_to_pb.go +++ b/expression/aggregation/agg_to_pb.go @@ -82,7 +82,46 @@ func AggFuncToPBExpr(sc *stmtctx.StatementContext, client kv.Client, aggFunc *Ag } children = append(children, pbArg) } - return &tipb.Expr{Tp: tp, Children: children, FieldType: expression.ToPBFieldType(aggFunc.RetTp), HasDistinct: aggFunc.HasDistinct} + return &tipb.Expr{Tp: tp, Children: children, FieldType: expression.ToPBFieldType(aggFunc.RetTp), HasDistinct: aggFunc.HasDistinct, AggFuncMode: AggFunctionModeToPB(aggFunc.Mode)} +} + +// AggFunctionModeToPB converts aggregate function mode to PB. +func AggFunctionModeToPB(mode AggFunctionMode) (pbMode *tipb.AggFunctionMode) { + pbMode = new(tipb.AggFunctionMode) + switch mode { + case CompleteMode: + *pbMode = tipb.AggFunctionMode_CompleteMode + case FinalMode: + *pbMode = tipb.AggFunctionMode_FinalMode + case Partial1Mode: + *pbMode = tipb.AggFunctionMode_Partial1Mode + case Partial2Mode: + *pbMode = tipb.AggFunctionMode_Partial2Mode + case DedupMode: + *pbMode = tipb.AggFunctionMode_DedupMode + } + return pbMode +} + +// PBAggFuncModeToAggFuncMode converts pb to aggregate function mode. +func PBAggFuncModeToAggFuncMode(pbMode *tipb.AggFunctionMode) (mode AggFunctionMode) { + // Default mode of the aggregate function is PartialMode. + mode = Partial1Mode + if pbMode != nil { + switch *pbMode { + case tipb.AggFunctionMode_CompleteMode: + mode = CompleteMode + case tipb.AggFunctionMode_FinalMode: + mode = FinalMode + case tipb.AggFunctionMode_Partial1Mode: + mode = Partial1Mode + case tipb.AggFunctionMode_Partial2Mode: + mode = Partial2Mode + case tipb.AggFunctionMode_DedupMode: + mode = DedupMode + } + } + return mode } // PBExprToAggFuncDesc converts pb to aggregate function. @@ -127,7 +166,7 @@ func PBExprToAggFuncDesc(ctx sessionctx.Context, aggFunc *tipb.Expr, fieldTps [] base.WrapCastForAggArgs(ctx) return &AggFuncDesc{ baseFuncDesc: base, - Mode: Partial1Mode, + Mode: PBAggFuncModeToAggFuncMode(aggFunc.AggFuncMode), HasDistinct: false, }, nil } diff --git a/expression/aggregation/agg_to_pb_test.go b/expression/aggregation/agg_to_pb_test.go index 5030959eed08b..5ee987f1e76f1 100644 --- a/expression/aggregation/agg_to_pb_test.go +++ b/expression/aggregation/agg_to_pb_test.go @@ -77,13 +77,13 @@ func (s *testEvaluatorSuite) TestAggFunc2Pb(c *C) { } jsons := []string{ - `{"tp":3002,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":false}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":%v}`, - `{"tp":3001,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":false}],"sig":0,"field_type":{"tp":8,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":%v}`, - `{"tp":3003,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":false}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":%v}`, - "null", - `{"tp":3005,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":false}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":%v}`, - `{"tp":3004,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":false}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":%v}`, - `{"tp":3006,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":false}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":%v}`, + `{"tp":3002,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":false}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":%v,"aggFuncMode":0}`, + `{"tp":3001,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":false}],"sig":0,"field_type":{"tp":8,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":%v,"aggFuncMode":0}`, + `{"tp":3003,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":false}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":%v,"aggFuncMode":0}`, + `{"tp":3007,"val":"AAAAAAAABAA=","children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":false}],"sig":0,"field_type":{"tp":15,"flag":0,"flen":1,"decimal":-1,"collate":-46,"charset":"utf8mb4"},"has_distinct":%v,"aggFuncMode":0}`, + `{"tp":3005,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":false}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":%v,"aggFuncMode":0}`, + `{"tp":3004,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":false}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":%v,"aggFuncMode":0}`, + `{"tp":3006,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":false}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":%v,"aggFuncMode":0}`, } for i, funcName := range funcNames { for _, hasDistinct := range []bool{true, false} { diff --git a/go.mod b/go.mod index ee7854505b141..c5b3b0de8e22e 100644 --- a/go.mod +++ b/go.mod @@ -54,7 +54,7 @@ require ( github.com/pingcap/parser v0.0.0-20210831085004-b5390aa83f65 github.com/pingcap/sysutil v0.0.0-20210730114356-fcd8a63f68c5 github.com/pingcap/tidb-tools v5.0.3+incompatible - github.com/pingcap/tipb v0.0.0-20211025074540-e1c7362eeeb4 + github.com/pingcap/tipb v0.0.0-20220413031846-c26c0efa9127 github.com/prometheus/client_golang v1.5.1 github.com/prometheus/client_model v0.2.0 github.com/prometheus/common v0.9.1 @@ -92,7 +92,7 @@ require ( google.golang.org/grpc v1.29.1 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v2 v2.4.0 - modernc.org/mathutil v1.2.2 + modernc.org/mathutil v1.4.1 sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0 sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67 ) diff --git a/go.sum b/go.sum index 51095390a11ee..02f705f0e05a3 100644 --- a/go.sum +++ b/go.sum @@ -576,8 +576,8 @@ github.com/pingcap/tidb-dashboard v0.0.0-20210312062513-eef5d6404638/go.mod h1:O github.com/pingcap/tidb-dashboard v0.0.0-20210716172320-2226872e3296/go.mod h1:OCXbZTBTIMRcIt0jFsuCakZP+goYRv6IjawKbwLS2TQ= github.com/pingcap/tidb-tools v5.0.3+incompatible h1:vYMrW9ux+3HRMeRZ1fUOjy2nyiodtuVyAyK270EKBEs= github.com/pingcap/tidb-tools v5.0.3+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= -github.com/pingcap/tipb v0.0.0-20211025074540-e1c7362eeeb4 h1:9Ef4j3DLmUidURfob0tf94v+sqvozqdCTr7e5hi19qU= -github.com/pingcap/tipb v0.0.0-20211025074540-e1c7362eeeb4/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= +github.com/pingcap/tipb v0.0.0-20220413031846-c26c0efa9127 h1:Q5eNRRSOfZP00j7J46D+3fVCtHAXh4Z+Tmqu3ILwgjg= +github.com/pingcap/tipb v0.0.0-20220413031846-c26c0efa9127/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -1159,8 +1159,8 @@ honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9 honnef.co/go/tools v0.2.0 h1:ws8AfbgTX3oIczLPNPCu5166oBg9ST2vNs0rcht+mDE= honnef.co/go/tools v0.2.0/go.mod h1:lPVVZ2BS5TfnjLyizF7o7hv7j9/L+8cZY2hLyjP9cGY= k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I= -modernc.org/mathutil v1.2.2 h1:+yFk8hBprV+4c0U9GjFtL+dV3N8hOJ8JCituQcMShFY= -modernc.org/mathutil v1.2.2/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= +modernc.org/mathutil v1.4.1 h1:ij3fYGe8zBF4Vu+g0oT7mB06r8sqGWKuJu1yXeR4by8= +modernc.org/mathutil v1.4.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= moul.io/zapgorm2 v1.1.0/go.mod h1:emRfKjNqSzVj5lcgasBdovIXY1jSOwFz2GQZn1Rddks= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= diff --git a/planner/cascades/transformation_rules.go b/planner/cascades/transformation_rules.go index 9961509299a52..9ddfb0077c8d1 100644 --- a/planner/cascades/transformation_rules.go +++ b/planner/cascades/transformation_rules.go @@ -438,7 +438,7 @@ func (r *PushAggDownGather) OnTransform(old *memo.ExprIter) (newExprs []*memo.Gr gbyItems := make([]expression.Expression, len(agg.GroupByItems)) copy(gbyItems, agg.GroupByItems) - partialPref, finalPref, funcMap := plannercore.BuildFinalModeAggregation(agg.SCtx(), + partialPref, finalPref, firstRowFuncMap := plannercore.BuildFinalModeAggregation(agg.SCtx(), &plannercore.AggInfo{ AggFuncs: aggFuncs, GroupByItems: gbyItems, @@ -446,7 +446,7 @@ func (r *PushAggDownGather) OnTransform(old *memo.ExprIter) (newExprs []*memo.Gr }, true, false) // Remove unnecessary FirstRow. partialPref.AggFuncs = - plannercore.RemoveUnnecessaryFirstRow(agg.SCtx(), finalPref.AggFuncs, finalPref.GroupByItems, partialPref.AggFuncs, partialPref.GroupByItems, partialPref.Schema, funcMap) + plannercore.RemoveUnnecessaryFirstRow(agg.SCtx(), finalPref.GroupByItems, partialPref.AggFuncs, partialPref.GroupByItems, partialPref.Schema, firstRowFuncMap) partialAgg := plannercore.LogicalAggregation{ AggFuncs: partialPref.AggFuncs, diff --git a/planner/core/plan_test.go b/planner/core/plan_test.go index ae1ae30abc84d..7e8d53d44fb9a 100644 --- a/planner/core/plan_test.go +++ b/planner/core/plan_test.go @@ -17,19 +17,28 @@ import ( "bytes" "fmt" "strings" + "testing" . "github.com/pingcap/check" + "github.com/pingcap/parser/ast" "github.com/pingcap/parser/model" + "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain" + "github.com/pingcap/tidb/expression" + "github.com/pingcap/tidb/expression/aggregation" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/planner/core" + "github.com/pingcap/tidb/planner/util" + "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/israce" "github.com/pingcap/tidb/util/plancodec" "github.com/pingcap/tidb/util/testkit" "github.com/pingcap/tidb/util/testleak" "github.com/pingcap/tidb/util/testutil" + "github.com/stretchr/testify/require" ) var _ = Suite(&testPlanNormalize{}) @@ -638,3 +647,149 @@ func (s *testPlanNormalize) TestIssue25729(c *C) { tk.MustExec("insert into t1 values(\"a\", \"adwa\");") tk.MustQuery("select * from t1 where concat(a, b) like \"aadwa\" and a = \"a\";").Check(testkit.Rows("a adwa")) } + +func TestBuildFinalModeAggregation(t *testing.T) { + aggSchemaBuilder := func(sctx sessionctx.Context, aggFuncs []*aggregation.AggFuncDesc) *expression.Schema { + schema := expression.NewSchema(make([]*expression.Column, 0, len(aggFuncs))...) + for _, agg := range aggFuncs { + newCol := &expression.Column{ + UniqueID: sctx.GetSessionVars().AllocPlanColumnID(), + RetType: agg.RetTp, + } + schema.Append(newCol) + } + return schema + } + isFinalAggMode := func(mode aggregation.AggFunctionMode) bool { + return mode == aggregation.FinalMode || mode == aggregation.CompleteMode + } + checkResult := func(sctx sessionctx.Context, aggFuncs []*aggregation.AggFuncDesc, groubyItems []expression.Expression) { + for partialIsCop := 0; partialIsCop < 2; partialIsCop++ { + for isMPPTask := 0; isMPPTask < 2; isMPPTask++ { + partial, final, _ := core.BuildFinalModeAggregation(sctx, &core.AggInfo{ + AggFuncs: aggFuncs, + GroupByItems: groubyItems, + Schema: aggSchemaBuilder(sctx, aggFuncs), + }, partialIsCop == 0, isMPPTask == 0) + if partial != nil { + for _, aggFunc := range partial.AggFuncs { + if partialIsCop == 0 { + require.True(t, !isFinalAggMode(aggFunc.Mode)) + } else { + require.True(t, isFinalAggMode(aggFunc.Mode)) + } + } + } + if final != nil { + for _, aggFunc := range final.AggFuncs { + require.True(t, isFinalAggMode(aggFunc.Mode)) + } + } + } + } + } + + ctx := core.MockContext() + + aggCol := &expression.Column{ + Index: 0, + RetType: types.NewFieldType(mysql.TypeLonglong), + } + gbyCol := &expression.Column{ + Index: 1, + RetType: types.NewFieldType(mysql.TypeLonglong), + } + orderCol := &expression.Column{ + Index: 2, + RetType: types.NewFieldType(mysql.TypeLonglong), + } + + emptyGroupByItems := make([]expression.Expression, 0, 1) + groupByItems := make([]expression.Expression, 0, 1) + groupByItems = append(groupByItems, gbyCol) + + orderByItems := make([]*util.ByItems, 0, 1) + orderByItems = append(orderByItems, &util.ByItems{ + Expr: orderCol, + Desc: true, + }) + + aggFuncs := make([]*aggregation.AggFuncDesc, 0, 5) + desc, err := aggregation.NewAggFuncDesc(ctx, ast.AggFuncMax, []expression.Expression{aggCol}, false) + require.NoError(t, err) + aggFuncs = append(aggFuncs, desc) + desc, err = aggregation.NewAggFuncDesc(ctx, ast.AggFuncFirstRow, []expression.Expression{aggCol}, false) + require.NoError(t, err) + aggFuncs = append(aggFuncs, desc) + desc, err = aggregation.NewAggFuncDesc(ctx, ast.AggFuncCount, []expression.Expression{aggCol}, false) + require.NoError(t, err) + aggFuncs = append(aggFuncs, desc) + desc, err = aggregation.NewAggFuncDesc(ctx, ast.AggFuncSum, []expression.Expression{aggCol}, false) + require.NoError(t, err) + aggFuncs = append(aggFuncs, desc) + desc, err = aggregation.NewAggFuncDesc(ctx, ast.AggFuncAvg, []expression.Expression{aggCol}, false) + require.NoError(t, err) + aggFuncs = append(aggFuncs, desc) + + aggFuncsWithDistinct := make([]*aggregation.AggFuncDesc, 0, 2) + desc, err = aggregation.NewAggFuncDesc(ctx, ast.AggFuncAvg, []expression.Expression{aggCol}, true) + require.NoError(t, err) + aggFuncsWithDistinct = append(aggFuncsWithDistinct, desc) + desc, err = aggregation.NewAggFuncDesc(ctx, ast.AggFuncCount, []expression.Expression{aggCol}, true) + require.NoError(t, err) + aggFuncsWithDistinct = append(aggFuncsWithDistinct, desc) + + groupConcatAggFuncs := make([]*aggregation.AggFuncDesc, 0, 4) + groupConcatWithoutDistinctWithoutOrderBy, err := aggregation.NewAggFuncDesc(ctx, ast.AggFuncGroupConcat, []expression.Expression{aggCol, aggCol}, false) + require.NoError(t, err) + groupConcatAggFuncs = append(groupConcatAggFuncs, groupConcatWithoutDistinctWithoutOrderBy) + groupConcatWithoutDistinctWithOrderBy, err := aggregation.NewAggFuncDesc(ctx, ast.AggFuncGroupConcat, []expression.Expression{aggCol, aggCol}, false) + require.NoError(t, err) + groupConcatWithoutDistinctWithOrderBy.OrderByItems = orderByItems + groupConcatAggFuncs = append(groupConcatAggFuncs, groupConcatWithoutDistinctWithOrderBy) + groupConcatWithDistinctWithoutOrderBy, err := aggregation.NewAggFuncDesc(ctx, ast.AggFuncGroupConcat, []expression.Expression{aggCol, aggCol}, true) + require.NoError(t, err) + groupConcatAggFuncs = append(groupConcatAggFuncs, groupConcatWithDistinctWithoutOrderBy) + groupConcatWithDistinctWithOrderBy, err := aggregation.NewAggFuncDesc(ctx, ast.AggFuncGroupConcat, []expression.Expression{aggCol, aggCol}, true) + require.NoError(t, err) + groupConcatWithDistinctWithOrderBy.OrderByItems = orderByItems + groupConcatAggFuncs = append(groupConcatAggFuncs, groupConcatWithDistinctWithOrderBy) + + // case 1 agg without distinct + checkResult(ctx, aggFuncs, emptyGroupByItems) + checkResult(ctx, aggFuncs, groupByItems) + + // case 2 agg with distinct + checkResult(ctx, aggFuncsWithDistinct, emptyGroupByItems) + checkResult(ctx, aggFuncsWithDistinct, groupByItems) + + // case 3 mixed with distinct and without distinct + mixedAggFuncs := make([]*aggregation.AggFuncDesc, 0, 10) + mixedAggFuncs = append(mixedAggFuncs, aggFuncs...) + mixedAggFuncs = append(mixedAggFuncs, aggFuncsWithDistinct...) + checkResult(ctx, mixedAggFuncs, emptyGroupByItems) + checkResult(ctx, mixedAggFuncs, groupByItems) + + // case 4 group concat + for _, groupConcatAggFunc := range groupConcatAggFuncs { + checkResult(ctx, []*aggregation.AggFuncDesc{groupConcatAggFunc}, emptyGroupByItems) + checkResult(ctx, []*aggregation.AggFuncDesc{groupConcatAggFunc}, groupByItems) + } + checkResult(ctx, groupConcatAggFuncs, emptyGroupByItems) + checkResult(ctx, groupConcatAggFuncs, groupByItems) + + // case 5 mixed group concat and other agg funcs + for _, groupConcatAggFunc := range groupConcatAggFuncs { + funcs := make([]*aggregation.AggFuncDesc, 0, 10) + funcs = append(funcs, groupConcatAggFunc) + funcs = append(funcs, aggFuncs...) + checkResult(ctx, funcs, emptyGroupByItems) + checkResult(ctx, funcs, groupByItems) + funcs = append(funcs, aggFuncsWithDistinct...) + checkResult(ctx, funcs, emptyGroupByItems) + checkResult(ctx, funcs, groupByItems) + } + mixedAggFuncs = append(mixedAggFuncs, groupConcatAggFuncs...) + checkResult(ctx, mixedAggFuncs, emptyGroupByItems) + checkResult(ctx, mixedAggFuncs, groupByItems) +} diff --git a/planner/core/task.go b/planner/core/task.go index 9afa73d108430..b507baaa2d323 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -1492,11 +1492,13 @@ type AggInfo struct { // BuildFinalModeAggregation splits either LogicalAggregation or PhysicalAggregation to finalAgg and partial1Agg, // returns the information of partial and final agg. -// partialIsCop means whether partial agg is a cop task. +// partialIsCop means whether partial agg is a cop task. When partialIsCop is false, +// we do not set the AggMode for partialAgg cause it may be split further when +// building the aggregate executor(e.g. buildHashAgg will split the AggDesc further for parallel executing). +// firstRowFuncMap is a map between partial first_row to final first_row, will be used in RemoveUnnecessaryFirstRow func BuildFinalModeAggregation( - sctx sessionctx.Context, original *AggInfo, partialIsCop bool, isMPPTask bool) (partial, final *AggInfo, funcMap map[*aggregation.AggFuncDesc]*aggregation.AggFuncDesc) { - - funcMap = make(map[*aggregation.AggFuncDesc]*aggregation.AggFuncDesc, len(original.AggFuncs)) + sctx sessionctx.Context, original *AggInfo, partialIsCop bool, isMPPTask bool) (partial, final *AggInfo, firstRowFuncMap map[*aggregation.AggFuncDesc]*aggregation.AggFuncDesc) { + firstRowFuncMap = make(map[*aggregation.AggFuncDesc]*aggregation.AggFuncDesc, len(original.AggFuncs)) partial = &AggInfo{ AggFuncs: make([]*aggregation.AggFuncDesc, 0, len(original.AggFuncs)), GroupByItems: original.GroupByItems, @@ -1634,16 +1636,19 @@ func BuildFinalModeAggregation( sumAgg.RetTp = partial.Schema.Columns[partialCursor-1].GetType() partial.AggFuncs = append(partial.AggFuncs, cntAgg, sumAgg) } else if aggFunc.Name == ast.AggFuncApproxCountDistinct { - approxCountDistinctAgg := *aggFunc - approxCountDistinctAgg.Name = ast.AggFuncApproxCountDistinct - approxCountDistinctAgg.RetTp = partial.Schema.Columns[partialCursor-1].GetType() - partial.AggFuncs = append(partial.AggFuncs, &approxCountDistinctAgg) + newAggFunc := aggFunc.Clone() + newAggFunc.Name = aggFunc.Name + newAggFunc.RetTp = partial.Schema.Columns[partialCursor-1].GetType() + partial.AggFuncs = append(partial.AggFuncs, newAggFunc) } else { - partial.AggFuncs = append(partial.AggFuncs, aggFunc) + partialFuncDesc := aggFunc.Clone() + partial.AggFuncs = append(partial.AggFuncs, partialFuncDesc) + if aggFunc.Name == ast.AggFuncFirstRow { + firstRowFuncMap[partialFuncDesc] = finalAggFunc + } } finalAggFunc.Mode = aggregation.FinalMode - funcMap[aggFunc] = finalAggFunc } finalAggFunc.Args = args @@ -1651,6 +1656,11 @@ func BuildFinalModeAggregation( final.AggFuncs[i] = finalAggFunc } partial.Schema.Append(partialGbySchema.Columns...) + if partialIsCop { + for _, f := range partial.AggFuncs { + f.Mode = aggregation.Partial1Mode + } + } return } @@ -1724,7 +1734,7 @@ func (p *basePhysicalAgg) newPartialAggregate(copTaskType kv.StoreType, isMPPTas if !CheckAggCanPushCop(p.ctx, p.AggFuncs, p.GroupByItems, copTaskType) { return nil, p.self } - partialPref, finalPref, funcMap := BuildFinalModeAggregation(p.ctx, &AggInfo{ + partialPref, finalPref, firstRowFuncMap := BuildFinalModeAggregation(p.ctx, &AggInfo{ AggFuncs: p.AggFuncs, GroupByItems: p.GroupByItems, Schema: p.Schema().Clone(), @@ -1734,8 +1744,7 @@ func (p *basePhysicalAgg) newPartialAggregate(copTaskType kv.StoreType, isMPPTas } // Remove unnecessary FirstRow. partialPref.AggFuncs = RemoveUnnecessaryFirstRow(p.ctx, - finalPref.AggFuncs, finalPref.GroupByItems, - partialPref.AggFuncs, partialPref.GroupByItems, partialPref.Schema, funcMap) + finalPref.GroupByItems, partialPref.AggFuncs, partialPref.GroupByItems, partialPref.Schema, firstRowFuncMap) if copTaskType == kv.TiDB { // For partial agg of TiDB cop task, since TiDB coprocessor reuse the TiDB executor, // and TiDB aggregation executor won't output the group by value, @@ -1792,12 +1801,11 @@ func genFirstRowAggForGroupBy(ctx sessionctx.Context, groupByItems []expression. // Can optimize the schema to [count(b), a] , and change the index to get value. func RemoveUnnecessaryFirstRow( sctx sessionctx.Context, - finalAggFuncs []*aggregation.AggFuncDesc, finalGbyItems []expression.Expression, partialAggFuncs []*aggregation.AggFuncDesc, partialGbyItems []expression.Expression, partialSchema *expression.Schema, - funcMap map[*aggregation.AggFuncDesc]*aggregation.AggFuncDesc) []*aggregation.AggFuncDesc { + firstRowFuncMap map[*aggregation.AggFuncDesc]*aggregation.AggFuncDesc) []*aggregation.AggFuncDesc { partialCursor := 0 newAggFuncs := make([]*aggregation.AggFuncDesc, 0, len(partialAggFuncs)) @@ -1817,7 +1825,7 @@ func RemoveUnnecessaryFirstRow( } if gbyExpr.Equal(sctx, aggFunc.Args[0]) { canOptimize = true - funcMap[aggFunc].Args[0] = finalGbyItems[j] + firstRowFuncMap[aggFunc].Args[0] = finalGbyItems[j] break } }