Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: fix 'select for update' on partitioned table again #30732

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 36 additions & 12 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package executor
import (
"bytes"
"context"
"fmt"
"math"
"sort"
"strconv"
Expand Down Expand Up @@ -631,10 +632,9 @@ func (b *executorBuilder) buildSelectLock(v *plannercore.PhysicalLock) Executor
return src
}
e := &SelectLockExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID(), src),
Lock: v.Lock,
tblID2Handle: v.TblID2Handle,
partitionedTable: v.PartitionedTable,
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID(), src),
Lock: v.Lock,
tblID2Handle: v.TblID2Handle,
}

// filter out temporary tables because they do not store any record in tikv and should not write any lock
Expand All @@ -650,14 +650,38 @@ func (b *executorBuilder) buildSelectLock(v *plannercore.PhysicalLock) Executor
}
}

if len(e.partitionedTable) > 0 {
schema := v.Schema()
e.tblID2PIDColumnIndex = make(map[int64]int)
for i := 0; i < len(v.ExtraPIDInfo.Columns); i++ {
col := v.ExtraPIDInfo.Columns[i]
tblID := v.ExtraPIDInfo.TblIDs[i]
offset := schema.ColumnIndex(col)
e.tblID2PIDColumnIndex[tblID] = offset
if len(v.PartitionedTable) > 0 {
e.partitionedTable = make(map[int64]table.PartitionedTable)
e.tblID2PtColsOffsets = make(map[int64][]int)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need this? Is not the e.tblID2Handle enough?


for _, pt := range v.PartitionedTable {
tblInfo := pt.Meta()
e.partitionedTable[tblInfo.ID] = pt
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the tblInfo.ID the logical table or the physical table (partition) ID here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tblInfo.ID is the logical table ID here

dbInfo, ok := is.SchemaByTable(tblInfo)
if !ok {
b.err = errors.Trace(errors.Errorf("Cannot get schema info for table %s", tblInfo.Name.O))
return nil
}

cols := pt.VisibleCols()
offsets := make([]int, 0, len(cols))
for _, colInfo := range cols {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why create this double loop for each partition?
Are not all the partitions the same?
I wonder if we cannot use the HandleCols from tblID2Handle instead.
I think I managed something like that in my hack here.

colName := fmt.Sprintf("%s.%s.%s", dbInfo.Name.L, tblInfo.Name.L, colInfo.Name.L)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will there be some colume names not in this format like _tidb_rowid that it could not be found in the schema?


match := false
for i, col := range e.schema.Columns {
if col.OrigName == colName {
match = true
offsets = append(offsets, i)
break
}
}
if !match {
b.err = errors.Trace(errors.Errorf("Table %s column %s cannot find data with select result", tblInfo.Name.O, colInfo.Name.L))
return nil
}
}
e.tblID2PtColsOffsets[tblInfo.ID] = offsets
}
}
return e
Expand Down
62 changes: 49 additions & 13 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -893,14 +893,22 @@ type SelectLockExec struct {
tblID2Handle map[int64][]plannercore.HandleCols

// All the partition tables in the children of this executor.
partitionedTable []table.PartitionedTable

// When SelectLock work on the partition table, we need the partition ID
// instead of table ID to calculate the lock KV. In that case, partition ID is store as an
// extra column in the chunk row.
// tblID2PIDColumnIndex stores the column index in the chunk row. The children may be join
// of multiple tables, so the map struct is used.
tblID2PIDColumnIndex map[int64]int
partitionedTable map[int64]table.PartitionedTable

// When SelectLock work on the partition table, we need the partition ID instead
// of table ID to calculate the lock KV.
//
// To get the partition ID, we keep the partition columns not pruned and use those columns
// to locate the partition (by partition pruning again), and finally get the physicalID.
//
// tid => partition column offets
// partition column offsets + chunk row => partition columns
// table.GetPartitionByRow(partition columns) => partition
// pid = partition.GetPhysicalID()
//
// tblID2PColsOffsets stores tableID => offsets of the partition columns in the chunk row.
// The children executor may involve join of multiple tables, so the map struct is used.
tblID2PtColsOffsets map[int64][]int
}

// Open implements the Executor Open interface.
Expand All @@ -926,12 +934,20 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {

for id, cols := range e.tblID2Handle {
physicalID := id
if len(e.partitionedTable) > 0 {
// Replace the table ID with partition ID.
// The partition ID is returned as an extra column from the table reader.
if offset, ok := e.tblID2PIDColumnIndex[id]; ok {
physicalID = row.GetInt64(offset)

// If the table is a partitioned table, replace the table ID with partition ID.
if pt, ok := e.partitionedTable[id]; ok {
ptRowData, err := e.constructPartitionTableRow(row, id)
if err != nil {
return err
}

p, err := pt.GetPartitionByRow(e.ctx, ptRowData)
if err != nil {
return err
}

physicalID = p.GetPhysicalID()
}

for _, col := range cols {
Expand Down Expand Up @@ -967,6 +983,26 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
return doLockKeys(ctx, e.ctx, newLockCtx(e.ctx.GetSessionVars(), lockWaitTime), e.keys...)
}

func (e *SelectLockExec) constructPartitionTableRow(row chunk.Row, id int64) ([]types.Datum, error) {
rowDatums := row.GetDatumRow(e.base().retFieldTypes)
numDatums := len(rowDatums)
if len(e.schema.Columns) != numDatums {
return nil, errors.Trace(errors.Errorf("Columns length not match row fields length"))
}
proj, ok := e.tblID2PtColsOffsets[id]
if !ok {
return nil, errors.Trace(errors.Errorf("Cannot get column maps"))
}
ret := make([]types.Datum, 0, numDatums)
for _, idx := range proj {
if idx >= numDatums {
return nil, errors.Trace(errors.Errorf("Column maps index is overflow!"))
}
ret = append(ret, rowDatums[idx])
}
return ret, nil
}

func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64) *tikvstore.LockCtx {
lockCtx := tikvstore.NewLockCtx(seVars.TxnCtx.GetForUpdateTS(), lockWaitTime, seVars.StmtCtx.GetLockWaitStartTime())
lockCtx.Killed = &seVars.Killed
Expand Down
14 changes: 14 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9501,3 +9501,17 @@ func (s *testSerialSuite) TestIssue30289(c *C) {
err := tk.QueryToErr("select /*+ hash_join(t1) */ * from t t1 join t t2 on t1.a=t2.a")
c.Assert(err.Error(), Matches, "issue30289 build return error")
}

func (s *testSuiteP1) TestIssue30382(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("set @@session.tidb_enable_list_partition = ON;")
tk.MustExec("drop table if exists t1, t2;")
tk.MustExec("create table t1 (c_int int, c_str varchar(40), c_decimal decimal(12, 6), primary key (c_int) , key(c_str(2)) , key(c_decimal) ) partition by list (c_int) ( partition p0 values IN (1, 5, 9, 13, 17, 21, 25, 29, 33, 37), partition p1 values IN (2, 6, 10, 14, 18, 22, 26, 30, 34, 38), partition p2 values IN (3, 7, 11, 15, 19, 23, 27, 31, 35, 39), partition p3 values IN (4, 8, 12, 16, 20, 24, 28, 32, 36, 40)) ;")
tk.MustExec("create table t2 (c_int int, c_str varchar(40), c_decimal decimal(12, 6), primary key (c_int) , key(c_str) , key(c_decimal) ) partition by hash (c_int) partitions 4;")
tk.MustExec("insert into t1 values (6, 'musing mayer', 1.280), (7, 'wizardly heisenberg', 6.589), (8, 'optimistic swirles', 9.633), (9, 'hungry haslett', 2.659), (10, 'stupefied wiles', 2.336);")
tk.MustExec("insert into t2 select * from t1 ;")
tk.MustExec("begin;")
tk.MustQuery("select * from t1 where c_str <> any (select c_str from t2 where c_decimal < 5) for update;")
tk.MustExec("commit")
}
80 changes: 80 additions & 0 deletions executor/partition_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3120,3 +3120,83 @@ func (s *partitionTableSuite) TestIssue26251(c *C) {
c.Fail()
}
}

func (s *partitionTableSuite) TestLeftJoinForUpdate(c *C) {
tk1 := testkit.NewTestKit(c, s.store)
tk1.MustExec("use test")
tk2 := testkit.NewTestKit(c, s.store)
tk2.MustExec("use test")
tk3 := testkit.NewTestKit(c, s.store)
tk3.MustExec("use test")

tk1.MustExec("drop table if exists nt, pt")
tk1.MustExec("create table nt (id int, col varchar(32), primary key (id))")
tk1.MustExec("create table pt (id int, col varchar(32), primary key (id)) partition by hash(id) partitions 4")

resetData := func() {
tk1.MustExec("truncate table nt")
tk1.MustExec("truncate table pt")
tk1.MustExec("insert into nt values (1, 'hello')")
tk1.MustExec("insert into pt values (2, 'test')")
}

// ========================== First round of test ==================
// partition table left join normal table.
// =================================================================
resetData()
ch := make(chan int, 10)
tk1.MustExec("begin pessimistic")
// No union scan
tk1.MustQuery("select * from pt left join nt on pt.id = nt.id for update").Check(testkit.Rows("2 test <nil> <nil>"))
go func() {
// Check the key is locked.
tk2.MustExec("update pt set col = 'xxx' where id = 2")
ch <- 2
}()

// Union scan
tk1.MustExec("insert into pt values (1, 'world')")
tk1.MustQuery("select * from pt left join nt on pt.id = nt.id for update").Sort().Check(testkit.Rows("1 world 1 hello", "2 test <nil> <nil>"))
go func() {
// Check the key is locked.
tk3.MustExec("update nt set col = 'yyy' where id = 1")
ch <- 3
}()

// Give chance for the goroutines to run first.
time.Sleep(80 * time.Millisecond)
ch <- 1
tk1.MustExec("rollback")

checkOrder := func() {
c.Assert(<-ch, Equals, 1)
v1 := <-ch
v2 := <-ch
c.Assert((v1 == 2 && v2 == 3) || (v1 == 3 && v2 == 2), IsTrue)
}
checkOrder()

// ========================== Another round of test ==================
// normal table left join partition table.
// ===================================================================
resetData()
tk1.MustExec("begin pessimistic")
// No union scan
tk1.MustQuery("select * from nt left join pt on pt.id = nt.id for update").Check(testkit.Rows("1 hello <nil> <nil>"))

// Union scan
tk1.MustExec("insert into pt values (1, 'world')")
tk1.MustQuery("select * from nt left join pt on pt.id = nt.id for update").Check(testkit.Rows("1 hello 1 world"))
go func() {
tk2.MustExec("replace into pt values (1, 'aaa')")
ch <- 2
}()
go func() {
tk3.MustExec("update nt set col = 'bbb' where id = 1")
ch <- 3
}()
time.Sleep(80 * time.Millisecond)
ch <- 1
tk1.MustExec("rollback")
checkOrder()
}
38 changes: 38 additions & 0 deletions executor/union_scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"testing"

"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -416,3 +417,40 @@ func TestForApplyAndUnionScan(t *testing.T) {
tk.MustQuery("select c_int, c_str from t where (select count(*) from t1 where t1.c_int in (t.c_int, t.c_int + 2, t.c_int + 10)) > 2").Check(testkit.Rows())
tk.MustExec("rollback")
}

func TestIssue28073(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1, t2")
tk.MustExec("create table t1 (c_int int, c_str varchar(40), primary key (c_int, c_str) , key(c_int)) partition by hash (c_int) partitions 4")
tk.MustExec("create table t2 like t1")
tk.MustExec("insert into t1 values (1, 'flamboyant mcclintock')")
tk.MustExec("insert into t2 select * from t1")

tk.MustExec("begin")
tk.MustExec("insert into t2 (c_int, c_str) values (2, 'romantic grothendieck')")
tk.MustQuery("select * from t2 left join t1 on t1.c_int = t2.c_int for update").Sort().Check(
testkit.Rows(
"1 flamboyant mcclintock 1 flamboyant mcclintock",
"2 romantic grothendieck <nil> <nil>",
))
tk.MustExec("commit")

// Check no key is written to table ID 0
txn, err := store.Begin()
require.NoError(t, err)
start := tablecodec.EncodeTablePrefix(0)
end := tablecodec.EncodeTablePrefix(1)
iter, err := txn.Iter(start, end)
require.NoError(t, err)

exist := false
for iter.Valid() {
require.Nil(t, iter.Next())
exist = true
break
}
require.False(t, exist)
}
1 change: 0 additions & 1 deletion planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -2738,7 +2738,6 @@ func (p *LogicalLock) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]P
Lock: p.Lock,
TblID2Handle: p.tblID2Handle,
PartitionedTable: p.partitionedTable,
ExtraPIDInfo: p.extraPIDInfo,
}.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), childProp)
return []PhysicalPlan{lock}, true, nil
}
Expand Down
26 changes: 0 additions & 26 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3781,32 +3781,6 @@ func (ds *DataSource) newExtraHandleSchemaCol() *expression.Column {
}
}

// addExtraPIDColumn add an extra PID column for partition table.
// 'select ... for update' on a partition table need to know the partition ID
// to construct the lock key, so this column is added to the chunk row.
func (ds *DataSource) addExtraPIDColumn(info *extraPIDInfo) {
pidCol := &expression.Column{
RetType: types.NewFieldType(mysql.TypeLonglong),
UniqueID: ds.ctx.GetSessionVars().AllocPlanColumnID(),
ID: model.ExtraPidColID,
OrigName: fmt.Sprintf("%v.%v.%v", ds.DBName, ds.tableInfo.Name, model.ExtraPartitionIdName),
}

ds.Columns = append(ds.Columns, model.NewExtraPartitionIDColInfo())
schema := ds.Schema()
schema.Append(pidCol)
ds.names = append(ds.names, &types.FieldName{
DBName: ds.DBName,
TblName: ds.TableInfo().Name,
ColName: model.ExtraPartitionIdName,
OrigColName: model.ExtraPartitionIdName,
})
ds.TblCols = append(ds.TblCols, pidCol)

info.Columns = append(info.Columns, pidCol)
info.TblIDs = append(info.TblIDs, ds.TableInfo().ID)
}

var (
pseudoEstimationNotAvailable = metrics.PseudoEstimation.WithLabelValues("nodata")
pseudoEstimationOutdate = metrics.PseudoEstimation.WithLabelValues("outdate")
Expand Down
1 change: 0 additions & 1 deletion planner/core/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -960,7 +960,6 @@ type PhysicalLock struct {

TblID2Handle map[int64][]HandleCols
PartitionedTable []table.PartitionedTable
ExtraPIDInfo extraPIDInfo
}

// PhysicalLimit is the physical operator of Limit.
Expand Down
Loading