Skip to content

Commit

Permalink
ddl: fix inaccurate row_count for admin show ddl jobs (#36716)
Browse files Browse the repository at this point in the history
close #25968
  • Loading branch information
Defined2014 authored Aug 1, 2022
1 parent fc217d4 commit 93a31f0
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 7 deletions.
13 changes: 10 additions & 3 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,18 @@ type reorgBackfillTask struct {
physicalTableID int64
startKey kv.Key
endKey kv.Key
endInclude bool
}

func (r *reorgBackfillTask) String() string {
physicalID := strconv.FormatInt(r.physicalTableID, 10)
startKey := tryDecodeToHandleString(r.startKey)
endKey := tryDecodeToHandleString(r.endKey)
return "physicalTableID_" + physicalID + "_" + "[" + startKey + "," + endKey + "]"
rangeStr := "physicalTableID_" + physicalID + "_" + "[" + startKey + "," + endKey
if r.endInclude {
return rangeStr + "]"
}
return rangeStr + ")"
}

func logSlowOperations(elapsed time.Duration, slowMsg string, threshold uint32) {
Expand Down Expand Up @@ -472,7 +477,7 @@ func (w *worker) sendRangeTaskToWorkers(t table.Table, workers []*backfillWorker
physicalTableID := reorgInfo.PhysicalTableID

// Build reorg tasks.
for _, keyRange := range kvRanges {
for i, keyRange := range kvRanges {
endKey := keyRange.EndKey
endK, err := getRangeEndKey(reorgInfo.d.jobContext(reorgInfo.Job), workers[0].sessCtx.GetStore(), workers[0].priority, t, keyRange.StartKey, endKey)
if err != nil {
Expand All @@ -486,7 +491,9 @@ func (w *worker) sendRangeTaskToWorkers(t table.Table, workers []*backfillWorker
task := &reorgBackfillTask{
physicalTableID: physicalTableID,
startKey: keyRange.StartKey,
endKey: endKey}
endKey: endKey,
// If the boundaries overlap, we should ignore the preceding endKey.
endInclude: endK.Cmp(keyRange.EndKey) != 0 || i == len(kvRanges)-1}
batchTasks = append(batchTasks, task)

if len(batchTasks) >= len(workers) {
Expand Down
7 changes: 5 additions & 2 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -1150,7 +1150,11 @@ func (w *updateColumnWorker) fetchRowColVals(txn kv.Transaction, taskRange reorg
logSlowOperations(oprEndTime.Sub(oprStartTime), "iterateSnapshotRows in updateColumnWorker fetchRowColVals", 0)
oprStartTime = oprEndTime

taskDone = recordKey.Cmp(taskRange.endKey) > 0
if taskRange.endInclude {
taskDone = recordKey.Cmp(taskRange.endKey) > 0
} else {
taskDone = recordKey.Cmp(taskRange.endKey) >= 0
}

if taskDone || len(w.rowRecords) >= w.batchCnt {
return false, nil
Expand All @@ -1161,7 +1165,6 @@ func (w *updateColumnWorker) fetchRowColVals(txn kv.Transaction, taskRange reorg
}
lastAccessedHandle = recordKey
if recordKey.Cmp(taskRange.endKey) == 0 {
// If taskRange.endIncluded == false, we will not reach here when handle == taskRange.endHandle.
taskDone = true
return false, nil
}
Expand Down
7 changes: 5 additions & 2 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1123,7 +1123,11 @@ func (w *baseIndexWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgBac
logSlowOperations(oprEndTime.Sub(oprStartTime), "iterateSnapshotRows in baseIndexWorker fetchRowColVals", 0)
oprStartTime = oprEndTime

taskDone = recordKey.Cmp(taskRange.endKey) > 0
if taskRange.endInclude {
taskDone = recordKey.Cmp(taskRange.endKey) > 0
} else {
taskDone = recordKey.Cmp(taskRange.endKey) >= 0
}

if taskDone || len(w.idxRecords) >= w.batchCnt {
return false, nil
Expand All @@ -1146,7 +1150,6 @@ func (w *baseIndexWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgBac
w.cleanRowMap()

if recordKey.Cmp(taskRange.endKey) == 0 {
// If taskRange.endIncluded == false, we will not reach here when handle == taskRange.endHandle
taskDone = true
return false, nil
}
Expand Down
19 changes: 19 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4454,6 +4454,25 @@ func TestAdminShowDDLJobs(t *testing.T) {
require.Equal(t, t2.In(time.UTC), tt.In(time.UTC))
}

func TestAdminShowDDLJobsRowCount(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)

// Test for issue: https://github.com/pingcap/tidb/issues/25968
tk.MustExec("use test")
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (id bigint key,b int);")
tk.MustExec("split table t by (10),(20),(30);")
tk.MustExec("insert into t values (0,0),(10,10),(20,20),(30,30);")
tk.MustExec("alter table t add index idx1(b);")
require.Equal(t, "4", tk.MustQuery("admin show ddl jobs 1").Rows()[0][7])

tk.MustExec("insert into t values (1,0),(2,10),(3,20),(4,30);")
tk.MustExec("alter table t add index idx2(b);")
require.Equal(t, "8", tk.MustQuery("admin show ddl jobs 1").Rows()[0][7])
}

func TestAdminShowDDLJobsInfo(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
Expand Down

0 comments on commit 93a31f0

Please sign in to comment.