Skip to content

Commit

Permalink
planner: fix a bug when pushing streamAgg down (#41056)
Browse files Browse the repository at this point in the history
close #40857
  • Loading branch information
Dousir9 authored Feb 12, 2023
1 parent 2baf8c2 commit 288c2dd
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 8 deletions.
10 changes: 5 additions & 5 deletions planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1495,16 +1495,16 @@ func (ds *DataSource) convertToIndexScan(prop *property.PhysicalProperty,
}
}
if candidate.isMatchProp {
if cop.tablePlan != nil && !ds.tableInfo.IsCommonHandle {
col, isNew := cop.tablePlan.(*PhysicalTableScan).appendExtraHandleCol(ds)
cop.extraHandleCol = col
cop.needExtraProj = cop.needExtraProj || isNew
}
cop.keepOrder = true
// IndexScan on partition table can't keep order.
if ds.tableInfo.GetPartitionInfo() != nil {
return invalidTask, nil
}
if cop.tablePlan != nil && !ds.tableInfo.IsCommonHandle {
col, isNew := cop.tablePlan.(*PhysicalTableScan).appendExtraHandleCol(ds)
cop.extraHandleCol = col
cop.needExtraProj = cop.needExtraProj || isNew
}
}
if cop.needExtraProj {
cop.originSchema = ds.schema
Expand Down
11 changes: 11 additions & 0 deletions planner/core/plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1027,6 +1027,17 @@ func TestIssue34863(t *testing.T) {
tk.MustQuery("select count(o.c_id) from c right join o on c.c_id=o.c_id;").Check(testkit.Rows("5"))
}

func TestIssue40857(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
tk.MustExec("drop table if exists t;")
tk.MustExec("CREATE TABLE t (c1 mediumint(9) DEFAULT '-4747160',c2 year(4) NOT NULL DEFAULT '2075',c3 double DEFAULT '1.1559030660251948',c4 enum('wbv4','eli','d8ym','m3gsx','lz7td','o','d1k7l','y1x','xcxq','bj','n7') DEFAULT 'xcxq',c5 int(11) DEFAULT '255080866',c6 tinyint(1) DEFAULT '1',PRIMARY KEY (c2),KEY `c4d86d54-091c-4307-957b-b164c9652b7f` (c6,c4) );")
tk.MustExec("insert into t values (-4747160, 2075, 722.5719203870632, 'xcxq', 1576824797, 1);")
tk.MustExec("select /*+ stream_agg() */ bit_or(t.c5) as r0 from t where t.c3 in (select c6 from t where not(t.c6 <> 1) and not(t.c3 in(9263.749352636818))) group by t.c1;")
require.Empty(t, tk.Session().LastMessage())
}

func TestCloneFineGrainedShuffleStreamCount(t *testing.T) {
window := &core.PhysicalWindow{}
newPlan, err := window.Clone()
Expand Down
6 changes: 3 additions & 3 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1784,9 +1784,9 @@ func (p *PhysicalStreamAgg) attach2Task(tasks ...task) task {
t := tasks[0].copy()
if cop, ok := t.(*copTask); ok {
// We should not push agg down across double read, since the data of second read is ordered by handle instead of index.
// The `extraHandleCol` is added if the double read needs to keep order. So we just use it to decided
// whether the following plan is double read with order reserved.
if cop.extraHandleCol != nil || len(cop.rootTaskConds) > 0 {
// We use (cop.indexPlan != nil && cop.tablePlan != nil && cop.keepOrder) to decided whether the following plan is double
// read with order reserved.
if (cop.indexPlan != nil && cop.tablePlan != nil && cop.keepOrder) || len(cop.rootTaskConds) > 0 {
t = cop.convertToRootTask(p.ctx)
attachPlan2Task(p, t)
} else {
Expand Down

0 comments on commit 288c2dd

Please sign in to comment.