Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: Fix for dynamic partition pruning and union_scan #30734

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 6 additions & 29 deletions cmd/explaintest/r/generated_columns.result
Original file line number Diff line number Diff line change
Expand Up @@ -105,37 +105,14 @@ PARTITION p5 VALUES LESS THAN (6),
PARTITION max VALUES LESS THAN MAXVALUE);
EXPLAIN format = 'brief' SELECT * FROM sgc3 WHERE a <= 1;
id estRows task access object operator info
PartitionUnion 6646.67 root
├─TableReader 3323.33 root data:Selection
│ └─Selection 3323.33 cop[tikv] le(test.sgc3.a, 1)
│ └─TableFullScan 10000.00 cop[tikv] table:sgc3, partition:p0 keep order:false, stats:pseudo
└─TableReader 3323.33 root data:Selection
└─Selection 3323.33 cop[tikv] le(test.sgc3.a, 1)
└─TableFullScan 10000.00 cop[tikv] table:sgc3, partition:p1 keep order:false, stats:pseudo
TableReader 3323.33 root partition:p0,p1 data:Selection
└─Selection 3323.33 cop[tikv] le(test.sgc3.a, 1)
└─TableFullScan 10000.00 cop[tikv] table:sgc3 keep order:false, stats:pseudo
EXPLAIN format = 'brief' SELECT * FROM sgc3 WHERE a < 7;
id estRows task access object operator info
PartitionUnion 23263.33 root
├─TableReader 3323.33 root data:Selection
│ └─Selection 3323.33 cop[tikv] lt(test.sgc3.a, 7)
│ └─TableFullScan 10000.00 cop[tikv] table:sgc3, partition:p0 keep order:false, stats:pseudo
├─TableReader 3323.33 root data:Selection
│ └─Selection 3323.33 cop[tikv] lt(test.sgc3.a, 7)
│ └─TableFullScan 10000.00 cop[tikv] table:sgc3, partition:p1 keep order:false, stats:pseudo
├─TableReader 3323.33 root data:Selection
│ └─Selection 3323.33 cop[tikv] lt(test.sgc3.a, 7)
│ └─TableFullScan 10000.00 cop[tikv] table:sgc3, partition:p2 keep order:false, stats:pseudo
├─TableReader 3323.33 root data:Selection
│ └─Selection 3323.33 cop[tikv] lt(test.sgc3.a, 7)
│ └─TableFullScan 10000.00 cop[tikv] table:sgc3, partition:p3 keep order:false, stats:pseudo
├─TableReader 3323.33 root data:Selection
│ └─Selection 3323.33 cop[tikv] lt(test.sgc3.a, 7)
│ └─TableFullScan 10000.00 cop[tikv] table:sgc3, partition:p4 keep order:false, stats:pseudo
├─TableReader 3323.33 root data:Selection
│ └─Selection 3323.33 cop[tikv] lt(test.sgc3.a, 7)
│ └─TableFullScan 10000.00 cop[tikv] table:sgc3, partition:p5 keep order:false, stats:pseudo
└─TableReader 3323.33 root data:Selection
└─Selection 3323.33 cop[tikv] lt(test.sgc3.a, 7)
└─TableFullScan 10000.00 cop[tikv] table:sgc3, partition:max keep order:false, stats:pseudo
TableReader 3323.33 root partition:all data:Selection
└─Selection 3323.33 cop[tikv] lt(test.sgc3.a, 7)
└─TableFullScan 10000.00 cop[tikv] table:sgc3 keep order:false, stats:pseudo
DROP TABLE IF EXISTS t1;
CREATE TABLE t1(a INT, b INT AS (a+1) VIRTUAL, c INT AS (b+1) VIRTUAL, d INT AS (c+1) VIRTUAL, KEY(b), INDEX IDX(c, d));
INSERT INTO t1 (a) VALUES (0);
Expand Down
22 changes: 7 additions & 15 deletions cmd/explaintest/r/select.result
Original file line number Diff line number Diff line change
Expand Up @@ -359,25 +359,17 @@ insert into th values (0,0),(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8);
insert into th values (-1,-1),(-2,-2),(-3,-3),(-4,-4),(-5,-5),(-6,-6),(-7,-7),(-8,-8);
desc select * from th where a=-2;
id estRows task access object operator info
TableReader_9 10.00 root data:Selection_8
└─Selection_8 10.00 cop[tikv] eq(test.th.a, -2)
└─TableFullScan_7 10000.00 cop[tikv] table:th, partition:p2 keep order:false, stats:pseudo
TableReader_7 10.00 root partition:p2 data:Selection_6
└─Selection_6 10.00 cop[tikv] eq(test.th.a, -2)
└─TableFullScan_5 10000.00 cop[tikv] table:th keep order:false, stats:pseudo
desc select * from th;
id estRows task access object operator info
PartitionUnion_9 30000.00 root
├─TableReader_11 10000.00 root data:TableFullScan_10
│ └─TableFullScan_10 10000.00 cop[tikv] table:th, partition:p0 keep order:false, stats:pseudo
├─TableReader_13 10000.00 root data:TableFullScan_12
│ └─TableFullScan_12 10000.00 cop[tikv] table:th, partition:p1 keep order:false, stats:pseudo
└─TableReader_15 10000.00 root data:TableFullScan_14
└─TableFullScan_14 10000.00 cop[tikv] table:th, partition:p2 keep order:false, stats:pseudo
TableReader_5 10000.00 root partition:all data:TableFullScan_4
└─TableFullScan_4 10000.00 cop[tikv] table:th keep order:false, stats:pseudo
desc select * from th partition (p2,p1);
id estRows task access object operator info
PartitionUnion_8 20000.00 root
├─TableReader_10 10000.00 root data:TableFullScan_9
│ └─TableFullScan_9 10000.00 cop[tikv] table:th, partition:p1 keep order:false, stats:pseudo
└─TableReader_12 10000.00 root data:TableFullScan_11
└─TableFullScan_11 10000.00 cop[tikv] table:th, partition:p2 keep order:false, stats:pseudo
TableReader_5 10000.00 root partition:p1,p2 data:TableFullScan_4
└─TableFullScan_4 10000.00 cop[tikv] table:th keep order:false, stats:pseudo
drop table if exists t;
create table t(a int, b int);
explain format = 'brief' select a != any (select a from t t2) from t t1;
Expand Down
1 change: 1 addition & 0 deletions executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,7 @@ func (getter *PessimisticLockCacheGetter) Get(_ context.Context, key kv.Key) ([]
return nil, kv.ErrNotExist
}

// TODO: move to table/tables/partition.go and deduplicate?
func getPhysID(tblInfo *model.TableInfo, partitionExpr *tables.PartitionExpr, intVal int64) (int64, error) {
pi := tblInfo.GetPartitionInfo()
if pi == nil {
Expand Down
23 changes: 22 additions & 1 deletion executor/union_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,28 @@ func (us *UnionScanExec) getSnapshotRow(ctx context.Context) ([]types.Datum, err
if err != nil {
return nil, err
}
checkKey := tablecodec.EncodeRecordKey(us.table.RecordPrefix(), snapshotHandle)
// In partition prune mode == 'dynamic', this is wrong. It will use the tableID,
// not the physicalID that the memBufSnap used (since it will later be used for storage at commit)
// How can we get the physicalID from the read?!?
// Since it seems to be lost through the Next API call, I can only come up with it needs to be reconstructed
// from the handle -> partitionID -> physicalID :(
// But where?

var checkKey kv.Key
if pt, ok := us.table.(table.PartitionedTable); ok && us.ctx.GetSessionVars().UseDynamicPartitionPrune() {
var rowDatums []types.Datum
fts := us.belowHandleCols.GetFieldsTypes()
for i := 0; i < us.belowHandleCols.NumCols(); i++ {
rowDatums = append(rowDatums, row.GetDatum(i, fts[i]))
}
physTbl, err := pt.GetPartitionByRow(us.ctx, rowDatums)
if err != nil {
return nil, err
}
checkKey = tablecodec.EncodeRecordKey(physTbl.RecordPrefix(), snapshotHandle)
} else {
checkKey = tablecodec.EncodeRecordKey(us.table.RecordPrefix(), snapshotHandle)
}
if _, err := us.memBufSnap.Get(context.TODO(), checkKey); err == nil {
// If src handle appears in added rows, it means there is conflict and the transaction will fail to
// commit, but for simplicity, we don't handle it here.
Expand Down
11 changes: 11 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3345,13 +3345,24 @@ func (s *testIntegrationSuite) TestIssue26719(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec(`create table tx (a int) partition by range (a) (partition p0 values less than (10), partition p1 values less than (20))`)
defer tk.MustExec("drop table tx")
tk.MustExec(`insert into tx values (1)`)
tk.MustExec("set @@tidb_partition_prune_mode='dynamic'")

tk.MustExec(`begin`)
tk.MustExec(`delete from tx where a in (1)`)
tk.MustQuery(`select * from tx PARTITION(p0)`).Check(testkit.Rows())
tk.MustQuery(`explain format = 'brief' select * from tx PARTITION(p0)`).Check(testkit.Rows(
"Projection 10000.00 root test.tx.a",
"└─UnionScan 10000.00 root ",
" └─TableReader 10000.00 root partition:p0 data:TableFullScan",
" └─TableFullScan 10000.00 cop[tikv] table:tx keep order:false, stats:pseudo"))
tk.MustQuery(`select * from tx`).Check(testkit.Rows())
tk.MustQuery(`explain format = 'brief' select * from tx`).Check(testkit.Rows(
"Projection 10000.00 root test.tx.a",
"└─UnionScan 10000.00 root ",
" └─TableReader 10000.00 root partition:all data:TableFullScan",
" └─TableFullScan 10000.00 cop[tikv] table:tx keep order:false, stats:pseudo"))
tk.MustExec(`rollback`)
}

Expand Down
4 changes: 3 additions & 1 deletion planner/core/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func (p *PhysicalTableReader) GetTableScan() *PhysicalTableScan {
if !ok {
return nil
}
//TODO: Should glovalChildIndex be a bool or can this be out-of-bands, i.e. > 1 => negative index?
curPlan = join.children[1-join.globalChildIndex]
}
}
Expand Down Expand Up @@ -307,6 +308,7 @@ func (p *PhysicalIndexLookUpReader) Clone() (PhysicalPlan, error) {
if p.PushedLimit != nil {
cloned.PushedLimit = p.PushedLimit.Clone()
}
// TODO: Check if not PartitionInfo should be cloned as well?
return cloned, nil
}

Expand Down Expand Up @@ -477,7 +479,7 @@ type PhysicalTableScan struct {

IsGlobalRead bool

// The table scan may be a partition, rather than a real table.
// The table scan may be a single partition, rather than a real table.
// TODO: clean up this field. After we support dynamic partitioning, table scan
// works on the whole partition table, and `isPartition` is not used.
isPartition bool
Expand Down
1 change: 1 addition & 0 deletions planner/core/point_get_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -1526,6 +1526,7 @@ func buildHandleCols(ctx sessionctx.Context, tbl *model.TableInfo, schema *expre
return &IntHandleCols{col: handleCol}
}

// TODO: Where is this actually used? Does not seem to handle LIST/RANGE COLUMNS or actual expressions, just int columns?!
func getPartitionInfo(ctx sessionctx.Context, tbl *model.TableInfo, pairs []nameValuePair) (*model.PartitionDefinition, int, bool) {
partitionExpr := getPartitionExpr(ctx, tbl)
if partitionExpr == nil {
Expand Down
1 change: 1 addition & 0 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1891,6 +1891,7 @@ func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.Stm
case *plannercore.Update:
updateStmt := stmt.(*ast.UpdateStmt)
if pp, ok := x.SelectPlan.(*plannercore.PointGetPlan); ok {
// TODO: Should this not check dynamic pruning mode? Issue with PhysicalTableID?
if pp.PartitionInfo != nil {
continue
}
Expand Down
5 changes: 0 additions & 5 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1021,11 +1021,6 @@ func (s *SessionVars) CheckAndGetTxnScope() string {

// UseDynamicPartitionPrune indicates whether use new dynamic partition prune.
func (s *SessionVars) UseDynamicPartitionPrune() bool {
if s.InTxn() || !s.GetStatusFlag(mysql.ServerStatusAutocommit) {
// UnionScan cannot get partition table IDs in dynamic-mode, this is a quick-fix for issues/26719,
// please see it for more details.
return false
}
return PartitionPruneMode(s.PartitionPruneMode.Load()) == Dynamic
}

Expand Down