planner: not to generate partial agg when cop task has root condition… #15141

Merged · 7 commits · Mar 13, 2020

Changes from all commits
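The fix guards every pushdown decision in attach2Task (PhysicalLimit, PhysicalTopN, PhysicalStreamAgg and PhysicalHashAgg) with a check on cop.rootTaskConds: when the cop task still carries conditions that can only be evaluated in TiDB, the operator is no longer split into a partial part for the coprocessor; instead the cop task is finished first and the whole operator is attached on the root task. The sketch below restates that guard with simplified, hypothetical types; it is not the TiDB planner API.

package main

import "fmt"

// copTask is a simplified stand-in for the planner's cop task: rootTaskConds
// holds filters that could not be pushed to TiKV/TiFlash and therefore must
// run in TiDB, on top of whatever the cop task returns.
type copTask struct {
	rootTaskConds []string
}

// canGeneratePartialAgg mirrors the guard this PR adds: a partial aggregation
// (or a pushed-down Limit/TopN) is only valid when no condition remains to be
// evaluated above the cop task, otherwise the pushed-down operator would be
// computed below those remaining filters.
func canGeneratePartialAgg(cop *copTask) bool {
	return len(cop.rootTaskConds) == 0
}

func main() {
	noRootConds := &copTask{}
	withRootConds := &copTask{rootTaskConds: []string{"month(customer_first_date) <= 30"}}
	fmt.Println(canGeneratePartialAgg(noRootConds))   // true: safe to push a partial agg into the cop task
	fmt.Println(canGeneratePartialAgg(withRootConds)) // false: finish the cop task, attach the whole agg in TiDB
}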
35 changes: 35 additions & 0 deletions planner/core/integration_test.go
@@ -301,6 +301,41 @@ func (s *testIntegrationSuite) TestSelPushDownTiFlash(c *C) {
))
}

func (s *testIntegrationSuite) TestIssue15110(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists crm_rd_150m")
tk.MustExec(`CREATE TABLE crm_rd_150m (
product varchar(256) DEFAULT NULL,
uks varchar(16) DEFAULT NULL,
brand varchar(256) DEFAULT NULL,
cin varchar(16) DEFAULT NULL,
created_date timestamp NULL DEFAULT NULL,
quantity int(11) DEFAULT NULL,
amount decimal(11,0) DEFAULT NULL,
pl_date timestamp NULL DEFAULT NULL,
customer_first_date timestamp NULL DEFAULT NULL,
recent_date timestamp NULL DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;`)

// Create virtual tiflash replica info.
dom := domain.GetDomain(tk.Se)
is := dom.InfoSchema()
db, exists := is.SchemaByName(model.NewCIStr("test"))
c.Assert(exists, IsTrue)
for _, tblInfo := range db.Tables {
if tblInfo.Name.L == "crm_rd_150m" {
tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
Count: 1,
Available: true,
}
}
}

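// The filter built from the CASE expression is not pushed down to TiFlash and
// stays as a root task condition on top of the cop task; the EXPLAIN below
// should succeed without a partial aggregation being generated under that
// cop task.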
tk.MustExec("set @@session.tidb_isolation_read_engines = 'tiflash'")
tk.MustExec("explain SELECT count(*) FROM crm_rd_150m dataset_48 WHERE (CASE WHEN (month(dataset_48.customer_first_date)) <= 30 THEN '新客' ELSE NULL END) IS NOT NULL;")
}

func (s *testIntegrationSuite) TestReadFromStorageHint(c *C) {
tk := testkit.NewTestKit(c, s.store)

2 changes: 1 addition & 1 deletion planner/core/planbuilder.go
@@ -612,7 +612,7 @@ func (b *PlanBuilder) getPossibleAccessPaths(indexHints []*ast.IndexHint, tblInf

func (b *PlanBuilder) filterPathByIsolationRead(paths []*accessPath, dbName model.CIStr) ([]*accessPath, error) {
// TODO: filter paths with isolation read locations.
- if dbName.L == mysql.SystemDB {
+ if dbName.L == mysql.SystemDB || dbName.L == "information_schema" || dbName.L == "performance_schema" {
return paths, nil
}
cfgIsolationEngines := set.StringSet{}
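The planbuilder.go change widens the bypass for isolation-read filtering: besides the mysql system database, information_schema and performance_schema (which are served from TiDB memory tables rather than from TiKV or TiFlash) now also keep all of their access paths, so restricting the read engines to, say, TiFlash only cannot leave such queries without any usable path. A minimal sketch of that check, using a hypothetical helper name rather than the actual TiDB function:

package main

import "fmt"

// bypassIsolationRead is a hypothetical helper expressing the same check:
// access paths for these databases must not be pruned by the
// tidb_isolation_read_engines setting.
func bypassIsolationRead(dbNameLower string) bool {
	switch dbNameLower {
	case "mysql", "information_schema", "performance_schema":
		return true
	default:
		return false
	}
}

func main() {
	fmt.Println(bypassIsolationRead("performance_schema")) // true: keep all paths
	fmt.Println(bypassIsolationRead("test"))                // false: apply engine filtering
}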
53 changes: 29 additions & 24 deletions planner/core/task.go
@@ -522,7 +522,7 @@ func (p *PhysicalLimit) attach2Task(tasks ...task) task {
if cop, ok := t.(*copTask); ok {
// For double read which requires order being kept, the limit cannot be pushed down to the table side,
// because handles would be reordered before being sent to table scan.
- if !cop.keepOrder || !cop.indexPlanFinished || cop.indexPlan == nil {
+ if (!cop.keepOrder || !cop.indexPlanFinished || cop.indexPlan == nil) && len(cop.rootTaskConds) == 0 {
// When limit is pushed down, we should remove its offset.
newCount := p.Offset + p.Count
childProfile := cop.plan().statsInfo()
@@ -654,7 +654,7 @@ func (p *PhysicalTopN) getPushedDownTopN(childPlan PhysicalPlan) *PhysicalTopN {
func (p *PhysicalTopN) attach2Task(tasks ...task) task {
t := tasks[0].copy()
inputCount := t.count()
- if copTask, ok := t.(*copTask); ok && p.canPushDown() {
+ if copTask, ok := t.(*copTask); ok && p.canPushDown() && len(copTask.rootTaskConds) == 0 {
// If all columns in topN are from index plan, we push it to index plan, otherwise we finish the index plan and
// push it to table plan.
var pushedDownTopN *PhysicalTopN
@@ -824,7 +824,7 @@ func (p *PhysicalStreamAgg) attach2Task(tasks ...task) task {
// The `doubleReadNeedProj` is always set if the double read needs to keep order. So we just use it to decide
// whether the following plan is double read with order preserved.
copToFlash := isFlashCopTask(cop)
- if !cop.doubleReadNeedProj {
+ if !cop.doubleReadNeedProj && len(cop.rootTaskConds) == 0 {
partialAgg, finalAgg := p.newPartialAggregate(copToFlash)
if partialAgg != nil {
if cop.tablePlan != nil {
@@ -901,29 +901,34 @@ func (p *PhysicalHashAgg) attach2Task(tasks ...task) task {
t := tasks[0].copy()
inputRows := t.count()
if cop, ok := t.(*copTask); ok {
-     // copToFlash means whether the cop task is running on flash storage
-     copToFlash := isFlashCopTask(cop)
-     partialAgg, finalAgg := p.newPartialAggregate(copToFlash)
-     if partialAgg != nil {
-         if cop.tablePlan != nil {
-             cop.finishIndexPlan()
-             partialAgg.SetChildren(cop.tablePlan)
-             cop.tablePlan = partialAgg
-         } else {
-             partialAgg.SetChildren(cop.indexPlan)
-             cop.indexPlan = partialAgg
-         }
-         cop.addCost(p.GetCost(inputRows, false))
-     }
-     // In `newPartialAggregate`, we are using stats of final aggregation as stats
-     // of `partialAgg`, so the network cost of transferring result rows of `partialAgg`
-     // to TiDB is normally under-estimated for hash aggregation, since the group-by
-     // column may be independent of the column used for region distribution, so a closer
-     // estimation of network cost for hash aggregation may multiply the number of
-     // regions involved in the `partialAgg`, which is unknown however.
-     t = finishCopTask(p.ctx, cop)
-     inputRows = t.count()
-     attachPlan2Task(finalAgg, t)
+     if len(cop.rootTaskConds) == 0 {
+         // copToFlash means whether the cop task is running on flash storage
+         copToFlash := isFlashCopTask(cop)
+         partialAgg, finalAgg := p.newPartialAggregate(copToFlash)
+         if partialAgg != nil {
+             if cop.tablePlan != nil {
+                 cop.finishIndexPlan()
+                 partialAgg.SetChildren(cop.tablePlan)
+                 cop.tablePlan = partialAgg
+             } else {
+                 partialAgg.SetChildren(cop.indexPlan)
+                 cop.indexPlan = partialAgg
+             }
+             cop.addCost(p.GetCost(inputRows, false))
+         }
+         // In `newPartialAggregate`, we are using stats of final aggregation as stats
+         // of `partialAgg`, so the network cost of transferring result rows of `partialAgg`
+         // to TiDB is normally under-estimated for hash aggregation, since the group-by
+         // column may be independent of the column used for region distribution, so a closer
+         // estimation of network cost for hash aggregation may multiply the number of
+         // regions involved in the `partialAgg`, which is unknown however.
+         t = finishCopTask(p.ctx, cop)
+         inputRows = t.count()
+         attachPlan2Task(finalAgg, t)
+     } else {
+         t = finishCopTask(p.ctx, cop)
+         attachPlan2Task(p, t)
+     }
  } else {
      attachPlan2Task(p, t)
  }
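The comment retained in this hunk explains why the partial aggregation's cost is under-estimated: its row count reuses the final aggregation's estimate, yet every region touched by the cop task can return up to one row per group, so a closer network-cost estimate would multiply by the number of regions involved, which the planner does not know at this point. A tiny numeric illustration with made-up figures:

package main

import "fmt"

func main() {
	// Hypothetical figures: 1000 distinct group-by values, cop task spread over 50 regions.
	finalRows := 1000.0                   // final aggregation's row estimate, reused for partialAgg
	regions := 50.0                       // number of regions involved, unknown to the planner here
	underEstimated := finalRows           // rows currently charged for network transfer
	closerEstimate := finalRows * regions // worst case: every region returns one row per group
	fmt.Println(underEstimated, closerEstimate)
}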