Skip to content

Commit

Permalink
*: table partition dynamic prune mode with Physical Table ID from uni…
Browse files Browse the repository at this point in the history
…store (#31634)

close #29851
  • Loading branch information
mjonss authored Mar 3, 2022
1 parent 2878195 commit 493eb45
Show file tree
Hide file tree
Showing 31 changed files with 665 additions and 208 deletions.
28 changes: 27 additions & 1 deletion cmd/explaintest/r/generated_columns.result
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
set @@tidb_partition_prune_mode='dynamic';
DROP TABLE IF EXISTS person;
CREATE TABLE person (
id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
Expand Down Expand Up @@ -90,6 +89,8 @@ Projection 5.00 root test.sgc1.j1, test.sgc1.j2, test.sgc1.a, test.sgc1.b, test
└─TableReader(Probe) 5.00 root data:Selection
└─Selection 5.00 cop[tikv] not(isnull(test.sgc1.a))
└─TableFullScan 5.00 cop[tikv] table:sgc1 keep order:false
set @old_prune_mode = @@tidb_partition_prune_mode;
set @@tidb_partition_prune_mode='static';
DROP TABLE IF EXISTS sgc3;
CREATE TABLE sgc3 (
j JSON,
Expand Down Expand Up @@ -136,6 +137,31 @@ PartitionUnion 23263.33 root
└─TableReader 3323.33 root data:Selection
└─Selection 3323.33 cop[tikv] lt(test.sgc3.a, 7)
└─TableFullScan 10000.00 cop[tikv] table:sgc3, partition:max keep order:false, stats:pseudo
set @@tidb_partition_prune_mode='dynamic';
DROP TABLE sgc3;
CREATE TABLE sgc3 (
j JSON,
a INT AS (JSON_EXTRACT(j, "$.a")) STORED
)
PARTITION BY RANGE (a) (
PARTITION p0 VALUES LESS THAN (1),
PARTITION p1 VALUES LESS THAN (2),
PARTITION p2 VALUES LESS THAN (3),
PARTITION p3 VALUES LESS THAN (4),
PARTITION p4 VALUES LESS THAN (5),
PARTITION p5 VALUES LESS THAN (6),
PARTITION max VALUES LESS THAN MAXVALUE);
EXPLAIN format = 'brief' SELECT * FROM sgc3 WHERE a <= 1;
id estRows task access object operator info
TableReader 3323.33 root partition:p0,p1 data:Selection
└─Selection 3323.33 cop[tikv] le(test.sgc3.a, 1)
└─TableFullScan 10000.00 cop[tikv] table:sgc3 keep order:false, stats:pseudo
EXPLAIN format = 'brief' SELECT * FROM sgc3 WHERE a < 7;
id estRows task access object operator info
TableReader 3323.33 root partition:all data:Selection
└─Selection 3323.33 cop[tikv] lt(test.sgc3.a, 7)
└─TableFullScan 10000.00 cop[tikv] table:sgc3 keep order:false, stats:pseudo
set @@tidb_partition_prune_mode = @old_prune_mode;
DROP TABLE IF EXISTS t1;
CREATE TABLE t1(a INT, b INT AS (a+1) VIRTUAL, c INT AS (b+1) VIRTUAL, d INT AS (c+1) VIRTUAL, KEY(b), INDEX IDX(c, d));
INSERT INTO t1 (a) VALUES (0);
Expand Down
16 changes: 16 additions & 0 deletions cmd/explaintest/r/select.result
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ Projection_3 1.00 root sysdate()->Column#1, sleep(1)->Column#2, sysdate()->Colu
└─TableDual_4 1.00 root rows:1
drop table if exists th;
set @@session.tidb_enable_table_partition = '1';
set @@session.tidb_partition_prune_mode = 'static';
create table th (a int, b int) partition by hash(a) partitions 3;
insert into th values (0,0),(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8);
insert into th values (-1,-1),(-2,-2),(-3,-3),(-4,-4),(-5,-5),(-6,-6),(-7,-7),(-8,-8);
Expand All @@ -378,6 +379,21 @@ PartitionUnion_8 20000.00 root
│ └─TableFullScan_9 10000.00 cop[tikv] table:th, partition:p1 keep order:false, stats:pseudo
└─TableReader_12 10000.00 root data:TableFullScan_11
└─TableFullScan_11 10000.00 cop[tikv] table:th, partition:p2 keep order:false, stats:pseudo
set @@session.tidb_partition_prune_mode = 'dynamic';
desc select * from th where a=-2;
id estRows task access object operator info
TableReader_7 10.00 root partition:p2 data:Selection_6
└─Selection_6 10.00 cop[tikv] eq(test.th.a, -2)
└─TableFullScan_5 10000.00 cop[tikv] table:th keep order:false, stats:pseudo
desc select * from th;
id estRows task access object operator info
TableReader_5 10000.00 root partition:all data:TableFullScan_4
└─TableFullScan_4 10000.00 cop[tikv] table:th keep order:false, stats:pseudo
desc select * from th partition (p2,p1);
id estRows task access object operator info
TableReader_5 10000.00 root partition:p1,p2 data:TableFullScan_4
└─TableFullScan_4 10000.00 cop[tikv] table:th keep order:false, stats:pseudo
set @@session.tidb_partition_prune_mode = DEFAULT;
drop table if exists t;
create table t(a int, b int);
explain format = 'brief' select a != any (select a from t t2) from t t1;
Expand Down
23 changes: 22 additions & 1 deletion cmd/explaintest/t/generated_columns.test
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
-- Most of the cases are ported from other tests to make sure generated columns behave the same.

-- Stored generated columns as indices
set @@tidb_partition_prune_mode='dynamic';

DROP TABLE IF EXISTS person;
CREATE TABLE person (
Expand Down Expand Up @@ -74,6 +73,8 @@ EXPLAIN format = 'brief' SELECT * from sgc1 join sgc2 on sgc1.a=sgc2.a;

-- Stored generated columns as partition columns

set @old_prune_mode = @@tidb_partition_prune_mode;
set @@tidb_partition_prune_mode='static';
DROP TABLE IF EXISTS sgc3;
CREATE TABLE sgc3 (
j JSON,
Expand All @@ -91,6 +92,26 @@ PARTITION max VALUES LESS THAN MAXVALUE);
EXPLAIN format = 'brief' SELECT * FROM sgc3 WHERE a <= 1;
EXPLAIN format = 'brief' SELECT * FROM sgc3 WHERE a < 7;

set @@tidb_partition_prune_mode='dynamic';
DROP TABLE sgc3;
CREATE TABLE sgc3 (
j JSON,
a INT AS (JSON_EXTRACT(j, "$.a")) STORED
)
PARTITION BY RANGE (a) (
PARTITION p0 VALUES LESS THAN (1),
PARTITION p1 VALUES LESS THAN (2),
PARTITION p2 VALUES LESS THAN (3),
PARTITION p3 VALUES LESS THAN (4),
PARTITION p4 VALUES LESS THAN (5),
PARTITION p5 VALUES LESS THAN (6),
PARTITION max VALUES LESS THAN MAXVALUE);

EXPLAIN format = 'brief' SELECT * FROM sgc3 WHERE a <= 1;
EXPLAIN format = 'brief' SELECT * FROM sgc3 WHERE a < 7;

set @@tidb_partition_prune_mode = @old_prune_mode;

-- Virtual generated columns as indices

DROP TABLE IF EXISTS t1;
Expand Down
6 changes: 6 additions & 0 deletions cmd/explaintest/t/select.test
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,18 @@ desc select sysdate(), sleep(1), sysdate();
# test select partition table
drop table if exists th;
set @@session.tidb_enable_table_partition = '1';
set @@session.tidb_partition_prune_mode = 'static';
create table th (a int, b int) partition by hash(a) partitions 3;
insert into th values (0,0),(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8);
insert into th values (-1,-1),(-2,-2),(-3,-3),(-4,-4),(-5,-5),(-6,-6),(-7,-7),(-8,-8);
desc select * from th where a=-2;
desc select * from th;
desc select * from th partition (p2,p1);
set @@session.tidb_partition_prune_mode = 'dynamic';
desc select * from th where a=-2;
desc select * from th;
desc select * from th partition (p2,p1);
set @@session.tidb_partition_prune_mode = DEFAULT;

# test != any(subq) and = all(subq)
drop table if exists t;
Expand Down
1 change: 1 addition & 0 deletions executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,7 @@ func TestFailedAnalyzeRequest(t *testing.T) {
tk.MustExec("set @@tidb_analyze_version = 1")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/buildStatsFromResult", `return(true)`))
_, err := tk.Exec("analyze table t")
require.NotNil(t, err)
require.Equal(t, "mock buildStatsFromResult error", err.Error())
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/buildStatsFromResult"))
}
Expand Down
33 changes: 4 additions & 29 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -717,10 +717,10 @@ func (b *executorBuilder) buildSelectLock(v *plannercore.PhysicalLock) Executor
return src
}
e := &SelectLockExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID(), src),
Lock: v.Lock,
tblID2Handle: v.TblID2Handle,
partitionedTable: v.PartitionedTable,
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID(), src),
Lock: v.Lock,
tblID2Handle: v.TblID2Handle,
tblID2PhysTblIDCol: v.TblID2PhysTblIDCol,
}

// filter out temporary tables because they do not store any record in tikv and should not write any lock
Expand All @@ -736,16 +736,6 @@ func (b *executorBuilder) buildSelectLock(v *plannercore.PhysicalLock) Executor
}
}

if len(e.partitionedTable) > 0 {
schema := v.Schema()
e.tblID2PIDColumnIndex = make(map[int64]int)
for i := 0; i < len(v.ExtraPIDInfo.Columns); i++ {
col := v.ExtraPIDInfo.Columns[i]
tblID := v.ExtraPIDInfo.TblIDs[i]
offset := schema.ColumnIndex(col)
e.tblID2PIDColumnIndex[tblID] = offset
}
}
return e
}

Expand Down Expand Up @@ -3170,9 +3160,6 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
storeType: v.StoreType,
batchCop: v.BatchCop,
}
if tbl.Meta().Partition != nil {
e.extraPIDColumnIndex = extraPIDColumnIndex(v.Schema())
}
e.buildVirtualColumnInfo()
if containsLimit(dagReq.Executors) {
e.feedback = statistics.NewQueryFeedback(0, nil, 0, ts.Desc)
Expand Down Expand Up @@ -3203,15 +3190,6 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
return e, nil
}

// extraPIDColumnIndex scans schema.Columns front to back for the first column
// whose ID is model.ExtraPidColID and returns its position wrapped by
// newOffset. When no such column is present it returns the zero
// offsetOptional value.
func extraPIDColumnIndex(schema *expression.Schema) offsetOptional {
	cols := schema.Columns
	for pos := 0; pos < len(cols); pos++ {
		if cols[pos].ID != model.ExtraPidColID {
			continue
		}
		return newOffset(pos)
	}
	return 0
}

func (b *executorBuilder) buildMPPGather(v *plannercore.PhysicalTableReader) Executor {
startTs, err := b.getSnapshotTS()
if err != nil {
Expand Down Expand Up @@ -3618,9 +3596,6 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn
tblPlans: v.TablePlans,
PushedLimit: v.PushedLimit,
}
if ok, _ := ts.IsPartition(); ok {
e.extraPIDColumnIndex = extraPIDColumnIndex(v.Schema())
}

if containsLimit(indexReq.Executors) {
e.feedback = statistics.NewQueryFeedback(0, nil, 0, is.Desc)
Expand Down
2 changes: 1 addition & 1 deletion executor/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error {

datumRow := make([]types.Datum, 0, len(fields))
for i, field := range fields {
if columns[i].ID == model.ExtraPidColID {
if columns[i].ID == model.ExtraPidColID || columns[i].ID == model.ExtraPhysTblID {
continue
}

Expand Down
26 changes: 11 additions & 15 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,9 +386,6 @@ type IndexLookUpExecutor struct {

stats *IndexLookUpRunTimeStats

// extraPIDColumnIndex is used for partition reader to add an extra partition ID column, default -1
extraPIDColumnIndex offsetOptional

// cancelFunc is called when close the executor
cancelFunc context.CancelFunc

Expand Down Expand Up @@ -676,18 +673,17 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookup
table = task.partitionTable
}
tableReaderExec := &TableReaderExecutor{
baseExecutor: newBaseExecutor(e.ctx, e.schema, e.getTableRootPlanID()),
table: table,
dagPB: e.tableRequest,
startTS: e.startTS,
readReplicaScope: e.readReplicaScope,
isStaleness: e.isStaleness,
columns: e.columns,
streaming: e.tableStreaming,
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
corColInFilter: e.corColInTblSide,
plans: e.tblPlans,
extraPIDColumnIndex: e.extraPIDColumnIndex,
baseExecutor: newBaseExecutor(e.ctx, e.schema, e.getTableRootPlanID()),
table: table,
dagPB: e.tableRequest,
startTS: e.startTS,
readReplicaScope: e.readReplicaScope,
isStaleness: e.isStaleness,
columns: e.columns,
streaming: e.tableStreaming,
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
corColInFilter: e.corColInTblSide,
plans: e.tblPlans,
}
tableReaderExec.buildVirtualColumnInfo()
tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, tableReaderExec, task.handles, true)
Expand Down
72 changes: 42 additions & 30 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -930,21 +930,47 @@ type SelectLockExec struct {
Lock *ast.SelectLockInfo
keys []kv.Key

// The children may be a join of multiple tables, so we need a map.
tblID2Handle map[int64][]plannercore.HandleCols

// All the partition tables in the children of this executor.
partitionedTable []table.PartitionedTable
// When SelectLock works on a partitioned table, we need the partition ID
// (Physical Table ID) instead of the 'logical' table ID to calculate
// the lock KV. In that case, the Physical Table ID is extracted
// from the row key in the store and returned as an extra column in the chunk row.

// When SelectLock work on the partition table, we need the partition ID
// instead of table ID to calculate the lock KV. In that case, partition ID is store as an
// extra column in the chunk row.
// tblID2PIDColumnIndex stores the column index in the chunk row. The children may be join
// of multiple tables, so the map struct is used.
tblID2PIDColumnIndex map[int64]int
// tblID2PhysTblIDCol is used for partitioned tables.
// The child executor needs to return an extra column containing
// the Physical Table ID (i.e. which partition the row came from).
// Used during building.
tblID2PhysTblIDCol map[int64]*expression.Column

// Used during execution.
// Map from logical table ID to the column index where the physical table ID is stored.
// For dynamic prune mode, model.ExtraPhysTblID columns are requested from
// storage and used for the physical table ID.
// For static prune mode, model.ExtraPhysTblID is still sent to storage/Protobuf
// but could be filled in by the partition's TableReaderExecutor
// due to issues with chunk handling between the TableReaderExecutor and the
// SelectReader result.
tblID2PhysTblIDColIdx map[int64]int
}

// Open implements the Executor Open interface.
func (e *SelectLockExec) Open(ctx context.Context) error {
if len(e.tblID2PhysTblIDCol) > 0 {
e.tblID2PhysTblIDColIdx = make(map[int64]int)
cols := e.Schema().Columns
for i := len(cols) - 1; i >= 0; i-- {
if cols[i].ID == model.ExtraPhysTblID {
for tblID, col := range e.tblID2PhysTblIDCol {
if cols[i].UniqueID == col.UniqueID {
e.tblID2PhysTblIDColIdx[tblID] = i
break
}
}
}
}
}
return e.baseExecutor.Open(ctx)
}

Expand All @@ -963,23 +989,17 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
if req.NumRows() > 0 {
iter := chunk.NewIterator4Chunk(req)
for row := iter.Begin(); row != iter.End(); row = iter.Next() {

for id, cols := range e.tblID2Handle {
physicalID := id
if len(e.partitionedTable) > 0 {
// Replace the table ID with partition ID.
// The partition ID is returned as an extra column from the table reader.
if offset, ok := e.tblID2PIDColumnIndex[id]; ok {
physicalID = row.GetInt64(offset)
}
}

for tblID, cols := range e.tblID2Handle {
for _, col := range cols {
handle, err := col.BuildHandle(row)
if err != nil {
return err
}
e.keys = append(e.keys, tablecodec.EncodeRowKeyWithHandle(physicalID, handle))
physTblID := tblID
if physTblColIdx, ok := e.tblID2PhysTblIDColIdx[tblID]; ok {
physTblID = row.GetInt64(physTblColIdx)
}
e.keys = append(e.keys, tablecodec.EncodeRowKeyWithHandle(physTblID, handle))
}
}
}
Expand All @@ -992,16 +1012,8 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
lockWaitTime = int64(e.Lock.WaitSec) * 1000
}

if len(e.tblID2Handle) > 0 {
for id := range e.tblID2Handle {
e.updateDeltaForTableID(id)
}
}
if len(e.partitionedTable) > 0 {
for _, p := range e.partitionedTable {
pid := p.Meta().ID
e.updateDeltaForTableID(pid)
}
for id := range e.tblID2Handle {
e.updateDeltaForTableID(id)
}

return doLockKeys(ctx, e.ctx, newLockCtx(e.ctx.GetSessionVars(), lockWaitTime, len(e.keys)), e.keys...)
Expand Down
Loading

0 comments on commit 493eb45

Please sign in to comment.