Skip to content

Commit

Permalink
ddl: fix duplicate elementID allocation to make sure gc work for part…
Browse files Browse the repository at this point in the history
…ition table (#33726)

close #33620
  • Loading branch information
wjhuang2016 authored Apr 11, 2022
1 parent 11681b4 commit 48efcf6
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 90 deletions.
12 changes: 0 additions & 12 deletions ddl/column_modify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,12 +497,10 @@ func TestCancelDropColumn(t *testing.T) {
originalHook := dom.DDL().GetHook()
dom.DDL().SetHook(hook)
for i := range testCases {
var c3IdxID int64
testCase = &testCases[i]
if testCase.needAddColumn {
tk.MustExec("alter table test_drop_column add column c3 int")
tk.MustExec("alter table test_drop_column add index idx_c3(c3)")
c3IdxID = external.GetIndexID(t, tk, "test", "test_drop_column", "idx_c3")
}

err := tk.ExecToErr("alter table test_drop_column drop column c3")
Expand Down Expand Up @@ -533,10 +531,6 @@ func TestCancelDropColumn(t *testing.T) {
require.Nil(t, col1)
require.NoError(t, err)
require.EqualError(t, checkErr, admin.ErrCannotCancelDDLJob.GenWithStackByArgs(jobID).Error())
if c3IdxID != 0 {
// Check index is deleted
checkDelRangeAdded(tk, jobID, c3IdxID)
}
}
}
dom.DDL().SetHook(originalHook)
Expand Down Expand Up @@ -601,12 +595,10 @@ func TestCancelDropColumns(t *testing.T) {
originalHook := dom.DDL().GetHook()
dom.DDL().SetHook(hook)
for i := range testCases {
var c3IdxID int64
testCase = &testCases[i]
if testCase.needAddColumn {
tk.MustExec("alter table test_drop_column add column c3 int, add column c4 int")
tk.MustExec("alter table test_drop_column add index idx_c3(c3)")
c3IdxID = external.GetIndexID(t, tk, "test", "test_drop_column", "idx_c3")
}
err := tk.ExecToErr("alter table test_drop_column drop column c3, drop column c4")
tbl := external.GetTableByName(t, tk, "test", "test_drop_column")
Expand Down Expand Up @@ -634,10 +626,6 @@ func TestCancelDropColumns(t *testing.T) {
require.Nil(t, idx3)
require.NoError(t, err)
require.EqualError(t, checkErr, admin.ErrCannotCancelDDLJob.GenWithStackByArgs(jobID).Error())
if c3IdxID != 0 {
// Check index is deleted
checkDelRangeAdded(tk, jobID, c3IdxID)
}
}
}
dom.DDL().SetHook(originalHook)
Expand Down
9 changes: 1 addition & 8 deletions ddl/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2754,10 +2754,6 @@ func testPartitionDropIndex(t *testing.T, store kv.Storage, lease time.Duration,
}
tk.MustExec(addIdxSQL)

indexID := external.GetIndexID(t, tk, "test", "partition_drop_idx", idxName)

jobIDExt, reset := setupJobIDExtCallback(tk.Session())
defer reset()
testutil.ExecMultiSQLInGoroutine(store, "test", []string{dropIdxSQL}, done)
ticker := time.NewTicker(lease / 2)
defer ticker.Stop()
Expand All @@ -2780,7 +2776,6 @@ LOOP:
num += step
}
}
checkDelRangeAdded(tk, jobIDExt.jobID, indexID)
tk.MustExec("drop table partition_drop_idx;")
}

Expand Down Expand Up @@ -2833,13 +2828,12 @@ func testPartitionCancelAddIndex(t *testing.T, store kv.Storage, d ddl.DDL, leas
}

var checkErr error
var c3IdxInfo *model.IndexInfo
hook := &ddl.TestDDLCallback{}
originBatchSize := tk.MustQuery("select @@global.tidb_ddl_reorg_batch_size")
// Set batch size to lower try to slow down add-index reorganization, This if for hook to cancel this ddl job.
tk.MustExec("set @@global.tidb_ddl_reorg_batch_size = 32")
defer tk.MustExec(fmt.Sprintf("set @@global.tidb_ddl_reorg_batch_size = %v", originBatchSize.Rows()[0][0]))
hook.OnJobUpdatedExported, c3IdxInfo, checkErr = backgroundExecOnJobUpdatedExportedT(t, tk, store, hook, idxName)
hook.OnJobUpdatedExported, _, checkErr = backgroundExecOnJobUpdatedExportedT(t, tk, store, hook, idxName)
originHook := d.GetHook()
defer d.SetHook(originHook)
jobIDExt := wrapJobIDExtCallback(hook)
Expand Down Expand Up @@ -2873,7 +2867,6 @@ LOOP:
times++
}
}
checkDelRangeAdded(tk, jobIDExt.jobID, c3IdxInfo.ID)
tk.MustExec("drop table t1")
}

Expand Down
61 changes: 36 additions & 25 deletions ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package ddl
import (
"context"
"encoding/hex"
"fmt"
"math"
"strings"
"sync"
Expand Down Expand Up @@ -236,14 +237,24 @@ func (dr *delRange) doTask(ctx sessionctx.Context, r util.DelRangeTask) error {
return nil
}

type elementIDAlloc struct {
id int64
}

func (ea *elementIDAlloc) alloc() int64 {
ea.id++
return ea.id
}

// insertJobIntoDeleteRangeTable parses the job into delete-range arguments,
// and inserts a new record into gc_delete_range table. The primary key is
// job ID, so we ignore key conflict error.
// (job ID, element ID), so we ignore key conflict error.
func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, job *model.Job) error {
now, err := getNowTSO(sctx)
if err != nil {
return errors.Trace(err)
}
var ea elementIDAlloc

s := sctx.(sqlexec.SQLExecutor)
switch job.Type {
Expand All @@ -257,7 +268,7 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
if batchEnd > i+batchInsertDeleteRangeSize {
batchEnd = i + batchInsertDeleteRangeSize
}
if err := doBatchInsert(ctx, s, job.ID, tableIDs[i:batchEnd], now); err != nil {
if err := doBatchInsert(ctx, s, job.ID, tableIDs[i:batchEnd], now, &ea); err != nil {
return errors.Trace(err)
}
}
Expand All @@ -274,15 +285,15 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
for _, pid := range physicalTableIDs {
startKey = tablecodec.EncodeTablePrefix(pid)
endKey := tablecodec.EncodeTablePrefix(pid + 1)
if err := doInsert(ctx, s, job.ID, pid, startKey, endKey, now); err != nil {
if err := doInsert(ctx, s, job.ID, ea.alloc(), startKey, endKey, now, fmt.Sprintf("partition ID is %d", pid)); err != nil {
return errors.Trace(err)
}
}
return nil
}
startKey = tablecodec.EncodeTablePrefix(tableID)
endKey := tablecodec.EncodeTablePrefix(tableID + 1)
return doInsert(ctx, s, job.ID, tableID, startKey, endKey, now)
return doInsert(ctx, s, job.ID, ea.alloc(), startKey, endKey, now, fmt.Sprintf("table ID is %d", tableID))
case model.ActionDropTablePartition, model.ActionTruncateTablePartition:
var physicalTableIDs []int64
if err := job.DecodeArgs(&physicalTableIDs); err != nil {
Expand All @@ -291,7 +302,7 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
for _, physicalTableID := range physicalTableIDs {
startKey := tablecodec.EncodeTablePrefix(physicalTableID)
endKey := tablecodec.EncodeTablePrefix(physicalTableID + 1)
if err := doInsert(ctx, s, job.ID, physicalTableID, startKey, endKey, now); err != nil {
if err := doInsert(ctx, s, job.ID, ea.alloc(), startKey, endKey, now, fmt.Sprintf("partition table ID is %d", physicalTableID)); err != nil {
return errors.Trace(err)
}
}
Expand All @@ -307,14 +318,14 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
for _, pid := range partitionIDs {
startKey := tablecodec.EncodeTableIndexPrefix(pid, indexID)
endKey := tablecodec.EncodeTableIndexPrefix(pid, indexID+1)
if err := doInsert(ctx, s, job.ID, indexID, startKey, endKey, now); err != nil {
if err := doInsert(ctx, s, job.ID, ea.alloc(), startKey, endKey, now, fmt.Sprintf("partition table ID is %d", pid)); err != nil {
return errors.Trace(err)
}
}
} else {
startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID)
endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1)
return doInsert(ctx, s, job.ID, indexID, startKey, endKey, now)
return doInsert(ctx, s, job.ID, ea.alloc(), startKey, endKey, now, fmt.Sprintf("table ID is %d", tableID))
}
case model.ActionDropIndex, model.ActionDropPrimaryKey:
tableID := job.TableID
Expand All @@ -328,14 +339,14 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
for _, pid := range partitionIDs {
startKey := tablecodec.EncodeTableIndexPrefix(pid, indexID)
endKey := tablecodec.EncodeTableIndexPrefix(pid, indexID+1)
if err := doInsert(ctx, s, job.ID, indexID, startKey, endKey, now); err != nil {
if err := doInsert(ctx, s, job.ID, ea.alloc(), startKey, endKey, now, fmt.Sprintf("partition table ID is %d", pid)); err != nil {
return errors.Trace(err)
}
}
} else {
startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID)
endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1)
return doInsert(ctx, s, job.ID, indexID, startKey, endKey, now)
return doInsert(ctx, s, job.ID, ea.alloc(), startKey, endKey, now, fmt.Sprintf("index ID is %d", indexID))
}
case model.ActionDropIndexes:
var indexIDs []int64
Expand All @@ -348,10 +359,10 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
return nil
}
if len(partitionIDs) == 0 {
return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now)
return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now, &ea)
}
for _, pID := range partitionIDs {
if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pID, indexIDs, now); err != nil {
if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pID, indexIDs, now, &ea); err != nil {
return errors.Trace(err)
}
}
Expand All @@ -365,12 +376,12 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
if len(indexIDs) > 0 {
if len(partitionIDs) > 0 {
for _, pid := range partitionIDs {
if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pid, indexIDs, now); err != nil {
if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pid, indexIDs, now, &ea); err != nil {
return errors.Trace(err)
}
}
} else {
return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now)
return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now, &ea)
}
}
case model.ActionDropColumns:
Expand All @@ -384,12 +395,12 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
if len(indexIDs) > 0 {
if len(partitionIDs) > 0 {
for _, pid := range partitionIDs {
if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pid, indexIDs, now); err != nil {
if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pid, indexIDs, now, &ea); err != nil {
return errors.Trace(err)
}
}
} else {
return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now)
return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now, &ea)
}
}
case model.ActionModifyColumn:
Expand All @@ -402,19 +413,19 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
return nil
}
if len(partitionIDs) == 0 {
return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now)
return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now, &ea)
}
for _, pid := range partitionIDs {
if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pid, indexIDs, now); err != nil {
if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pid, indexIDs, now, &ea); err != nil {
return errors.Trace(err)
}
}
}
return nil
}

func doBatchDeleteIndiceRange(ctx context.Context, s sqlexec.SQLExecutor, jobID, tableID int64, indexIDs []int64, ts uint64) error {
logutil.BgLogger().Info("[ddl] batch insert into delete-range indices", zap.Int64("jobID", jobID), zap.Int64s("elementIDs", indexIDs))
func doBatchDeleteIndiceRange(ctx context.Context, s sqlexec.SQLExecutor, jobID, tableID int64, indexIDs []int64, ts uint64, ea *elementIDAlloc) error {
logutil.BgLogger().Info("[ddl] batch insert into delete-range indices", zap.Int64("jobID", jobID), zap.Int64("tableID", tableID), zap.Int64s("indexIDs", indexIDs))
paramsList := make([]interface{}, 0, len(indexIDs)*5)
var buf strings.Builder
buf.WriteString(insertDeleteRangeSQLPrefix)
Expand All @@ -427,14 +438,14 @@ func doBatchDeleteIndiceRange(ctx context.Context, s sqlexec.SQLExecutor, jobID,
if i != len(indexIDs)-1 {
buf.WriteString(",")
}
paramsList = append(paramsList, jobID, indexID, startKeyEncoded, endKeyEncoded, ts)
paramsList = append(paramsList, jobID, ea.alloc(), startKeyEncoded, endKeyEncoded, ts)
}
_, err := s.ExecuteInternal(ctx, buf.String(), paramsList...)
return errors.Trace(err)
}

func doInsert(ctx context.Context, s sqlexec.SQLExecutor, jobID int64, elementID int64, startKey, endKey kv.Key, ts uint64) error {
logutil.BgLogger().Info("[ddl] insert into delete-range table", zap.Int64("jobID", jobID), zap.Int64("elementID", elementID))
func doInsert(ctx context.Context, s sqlexec.SQLExecutor, jobID, elementID int64, startKey, endKey kv.Key, ts uint64, comment string) error {
logutil.BgLogger().Info("[ddl] insert into delete-range table", zap.Int64("jobID", jobID), zap.Int64("elementID", elementID), zap.String("comment", comment))
startKeyEncoded := hex.EncodeToString(startKey)
endKeyEncoded := hex.EncodeToString(endKey)
// set session disk full opt
Expand All @@ -446,8 +457,8 @@ func doInsert(ctx context.Context, s sqlexec.SQLExecutor, jobID int64, elementID
return errors.Trace(err)
}

func doBatchInsert(ctx context.Context, s sqlexec.SQLExecutor, jobID int64, tableIDs []int64, ts uint64) error {
logutil.BgLogger().Info("[ddl] batch insert into delete-range table", zap.Int64("jobID", jobID), zap.Int64s("elementIDs", tableIDs))
func doBatchInsert(ctx context.Context, s sqlexec.SQLExecutor, jobID int64, tableIDs []int64, ts uint64, ea *elementIDAlloc) error {
logutil.BgLogger().Info("[ddl] batch insert into delete-range table", zap.Int64("jobID", jobID), zap.Int64s("tableIDs", tableIDs))
var buf strings.Builder
buf.WriteString(insertDeleteRangeSQLPrefix)
paramsList := make([]interface{}, 0, len(tableIDs)*5)
Expand All @@ -460,7 +471,7 @@ func doBatchInsert(ctx context.Context, s sqlexec.SQLExecutor, jobID int64, tabl
if i != len(tableIDs)-1 {
buf.WriteString(",")
}
paramsList = append(paramsList, jobID, tableID, startKeyEncoded, endKeyEncoded, ts)
paramsList = append(paramsList, jobID, ea.alloc(), startKeyEncoded, endKeyEncoded, ts)
}
// set session disk full opt
s.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
Expand Down
18 changes: 6 additions & 12 deletions ddl/index_modify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,6 @@ func testCancelAddIndex(t *testing.T, store kv.Storage, dom *domain.Domain, idxN
batchInsert(tk, "t1", i, i+defaultBatchSize)
}

var c3IdxInfo *model.IndexInfo
hook := &ddl.TestDDLCallback{Do: dom}
originBatchSize := tk.MustQuery("select @@global.tidb_ddl_reorg_batch_size")
// Set batch size to lower try to slow down add-index reorganization, This if for hook to cancel this ddl job.
Expand All @@ -725,7 +724,7 @@ func testCancelAddIndex(t *testing.T, store kv.Storage, dom *domain.Domain, idxN
// the hook.OnJobUpdatedExported is called when the job is updated, runReorgJob will wait ddl.ReorgWaitTimeout, then return the ddl.runDDLJob.
// After that ddl call d.hook.OnJobUpdated(job), so that we can canceled the job in this test case.
var checkErr error
hook.OnJobUpdatedExported, c3IdxInfo, checkErr = backgroundExecOnJobUpdatedExported(t, tk, store, hook, idxName)
hook.OnJobUpdatedExported, _, checkErr = backgroundExecOnJobUpdatedExported(t, tk, store, hook, idxName)
originalHook := d.GetHook()
jobIDExt := wrapJobIDExtCallback(hook)
d.SetHook(jobIDExt)
Expand Down Expand Up @@ -757,7 +756,6 @@ LOOP:
times++
}
}
checkDelRangeAdded(tk, jobIDExt.jobID, c3IdxInfo.ID)
d.SetHook(originalHook)
}

Expand Down Expand Up @@ -1059,8 +1057,6 @@ func testDropIndexes(t *testing.T, store kv.Storage, createSQL, dropIdxSQL strin
for _, idxName := range idxNames {
idxIDs = append(idxIDs, external.GetIndexID(t, tk, "test", "test_drop_indexes", idxName))
}
jobIDExt, reset := setupJobIDExtCallback(tk.Session())
defer reset()
testddlutil.SessionExecInGoroutine(store, "test", dropIdxSQL, done)

ticker := time.NewTicker(indexModifyLease / 2)
Expand All @@ -1084,9 +1080,6 @@ LOOP:
num += step
}
}
for _, idxID := range idxIDs {
checkDelRangeAdded(tk, jobIDExt.jobID, idxID)
}
}

func testDropIndexesIfExists(t *testing.T, store kv.Storage) {
Expand Down Expand Up @@ -1128,6 +1121,11 @@ func testDropIndexesFromPartitionedTable(t *testing.T, store kv.Storage) {
tk.MustExec("insert into test_drop_indexes_from_partitioned_table values (?, ?, ?)", i, i, i)
}
tk.MustExec("alter table test_drop_indexes_from_partitioned_table drop index i1, drop index if exists i2;")
tk.MustExec("alter table test_drop_indexes_from_partitioned_table add index i1(c1)")
tk.MustExec("alter table test_drop_indexes_from_partitioned_table drop index i1, drop index if exists i1;")
tk.MustExec("alter table test_drop_indexes_from_partitioned_table drop column c1, drop column c2;")
tk.MustExec("alter table test_drop_indexes_from_partitioned_table add column c1 int")
tk.MustExec("alter table test_drop_indexes_from_partitioned_table drop column c1, drop column if exists c1;")
}

func testCancelDropIndexes(t *testing.T, store kv.Storage, d ddl.DDL) {
Expand Down Expand Up @@ -1255,9 +1253,6 @@ func testDropIndex(t *testing.T, store kv.Storage, createSQL, dropIdxSQL, idxNam
for i := 0; i < num; i++ {
tk.MustExec("insert into test_drop_index values (?, ?, ?)", i, i, i)
}
indexID := external.GetIndexID(t, tk, "test", "test_drop_index", idxName)
jobIDExt, reset := setupJobIDExtCallback(tk.Session())
defer reset()
testddlutil.SessionExecInGoroutine(store, "test", dropIdxSQL, done)

ticker := time.NewTicker(indexModifyLease / 2)
Expand Down Expand Up @@ -1285,7 +1280,6 @@ LOOP:
rows := tk.MustQuery("explain select c1 from test_drop_index where c3 >= 0")
require.NotContains(t, fmt.Sprintf("%v", rows), idxName)

checkDelRangeAdded(tk, jobIDExt.jobID, indexID)
tk.MustExec("drop table test_drop_index")
}

Expand Down
Loading

0 comments on commit 48efcf6

Please sign in to comment.