Skip to content

Commit

Permalink
ddl: recycle add index cop chunks correctly (#43067)
Browse files Browse the repository at this point in the history
ref #39468
  • Loading branch information
tangenta authored Apr 19, 2023
1 parent 7dd8ef6 commit 3084ff5
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 10 deletions.
9 changes: 7 additions & 2 deletions ddl/backfilling_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package ddl

import (
"context"
"fmt"
"sync"
"time"

Expand All @@ -32,6 +33,7 @@ import (
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/intest"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
decoder "github.com/pingcap/tidb/util/rowDecoder"
Expand Down Expand Up @@ -343,6 +345,9 @@ func (b *ingestBackfillScheduler) close(force bool) {
}
}
close(b.resultCh)
if intest.InTest && len(b.copReqSenderPool.srcChkPool) != copReadChunkPoolSize() {
panic(fmt.Sprintf("unexpected chunk size %d", len(b.copReqSenderPool.srcChkPool)))
}
if !force {
jobID := b.reorgInfo.ID
indexID := b.reorgInfo.currElement.ID
Expand Down Expand Up @@ -450,13 +455,13 @@ func (w *addIndexIngestWorker) HandleTask(rs idxRecResult) {
defer util.Recover(metrics.LabelDDL, "ingestWorker.HandleTask", func() {
w.resultCh <- &backfillResult{taskID: rs.id, err: dbterror.ErrReorgPanic}
}, false)

defer w.copReqSenderPool.recycleChunk(rs.chunk)
result := &backfillResult{
taskID: rs.id,
err: rs.err,
}
if result.err != nil {
logutil.BgLogger().Error("[ddl-ingest] finish a cop-request task with error",
logutil.BgLogger().Error("[ddl-ingest] encounter error when handle index chunk",
zap.Int("id", rs.id), zap.Error(rs.err))
w.resultCh <- result
return
Expand Down
2 changes: 0 additions & 2 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1611,10 +1611,8 @@ func (w *addIndexIngestWorker) WriteLocal(rs *idxRecResult) (count int, nextKey
vars := w.sessCtx.GetSessionVars()
cnt, lastHandle, err := writeChunkToLocal(w.writer, w.index, copCtx, vars, rs.chunk)
if err != nil || cnt == 0 {
w.copReqSenderPool.recycleChunk(rs.chunk)
return 0, nil, err
}
w.copReqSenderPool.recycleChunk(rs.chunk)
w.metricCounter.Add(float64(cnt))
logSlowOperations(time.Since(oprStartTime), "writeChunkToLocal", 3000)
nextKey = tablecodec.EncodeRecordKey(w.tbl.RecordPrefix(), lastHandle)
Expand Down
6 changes: 5 additions & 1 deletion ddl/index_cop.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,11 @@ func (c *copReqSender) run() {
if p.checkpointMgr != nil {
p.checkpointMgr.UpdateTotal(task.id, srcChk.NumRows(), done)
}
p.chunkSender.AddTask(idxRecResult{id: task.id, chunk: srcChk, done: done})
idxRs := idxRecResult{id: task.id, chunk: srcChk, done: done}
failpoint.Inject("MockCopSenderError", func() {
idxRs.err = errors.New("mock cop error")
})
p.chunkSender.AddTask(idxRs)
}
terror.Call(rs.Close)
}
Expand Down
3 changes: 2 additions & 1 deletion ddl/ingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ go_test(
],
flaky = True,
race = "on",
shard_count = 6,
shard_count = 7,
deps = [
":ingest",
"//config",
Expand All @@ -68,6 +68,7 @@ go_test(
"//sessionctx",
"//testkit",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
],
)
29 changes: 25 additions & 4 deletions ddl/ingest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"strings"
"testing"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl/ingest"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
Expand Down Expand Up @@ -47,10 +48,7 @@ func injectMockBackendMgr(t *testing.T, store kv.Storage) (restore func()) {
func TestAddIndexIngestGeneratedColumns(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("drop database if exists addindexlit;")
tk.MustExec("create database addindexlit;")
tk.MustExec("use addindexlit;")
tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
tk.MustExec("use test;")
defer injectMockBackendMgr(t, store)()

assertLastNDDLUseIngest := func(n int) {
Expand Down Expand Up @@ -93,3 +91,26 @@ func TestAddIndexIngestGeneratedColumns(t *testing.T) {
tk.MustQuery("select * from t;").Check(testkit.Rows("1 1 1 2 1", "2 2 2 4 2", "3 3 3 6 3"))
assertLastNDDLUseIngest(4)
}

func TestIngestCopSenderErr(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
defer injectMockBackendMgr(t, store)()

tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 1;")
tk.MustExec("create table t (a int primary key, b int);")
for i := 0; i < 4; i++ {
tk.MustExec(fmt.Sprintf("insert into t values (%d, %d);", i*10000, i*10000))
}
tk.MustQuery("split table t between (0) and (50000) regions 5;").Check(testkit.Rows("4 1"))

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/MockCopSenderError", "return"))
tk.MustExec("alter table t add index idx(a);")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/MockCopSenderError"))
tk.MustExec("admin check table t;")
rows := tk.MustQuery("admin show ddl jobs 1;").Rows()
//nolint: forcetypeassert
jobTp := rows[0][3].(string)
require.True(t, strings.Contains(jobTp, "txn-merge"), jobTp)
}

0 comments on commit 3084ff5

Please sign in to comment.