From 90a739821cfdd3cbc4dc5447da866a317d7cfd8b Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 13 Dec 2022 20:10:51 +0800 Subject: [PATCH 1/2] ddl: close lightning writers after the import is complete (#39879) close pingcap/tidb#39800 --- br/pkg/lightning/backend/local/local.go | 14 ++++++++- br/pkg/lightning/manual/BUILD.bazel | 1 + br/pkg/lightning/manual/allocator.go | 31 +++++++++++++++++-- ddl/ingest/engine.go | 29 ++++++++++++++++- ddl/ingest/message.go | 1 + tests/realtikvtest/addindextest/BUILD.bazel | 1 + .../addindextest/integration_test.go | 4 +++ 7 files changed, 76 insertions(+), 5 deletions(-) diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 317124d0b8d19..e32606207082e 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -394,6 +394,13 @@ func openDuplicateDB(storeDir string) (*pebble.DB, error) { return pebble.Open(dbPath, opts) } +var ( + // RunInTest indicates whether the current process is running in test. + RunInTest bool + // LastAlloc is the last ID allocator. + LastAlloc manual.Allocator +) + // NewLocalBackend creates new connections to tikv. func NewLocalBackend( ctx context.Context, @@ -461,6 +468,11 @@ func NewLocalBackend( } else { writeLimiter = noopStoreWriteLimiter{} } + alloc := manual.Allocator{} + if RunInTest { + alloc.RefCnt = new(atomic.Int64) + LastAlloc = alloc + } local := &local{ engines: sync.Map{}, pdCtl: pdCtl, @@ -486,7 +498,7 @@ func NewLocalBackend( keyAdapter: keyAdapter, errorMgr: errorMgr, importClientFactory: importClientFactory, - bufferPool: membuf.NewPool(membuf.WithAllocator(manual.Allocator{})), + bufferPool: membuf.NewPool(membuf.WithAllocator(alloc)), writeLimiter: writeLimiter, logger: log.FromContext(ctx), encBuilder: NewEncodingBuilder(ctx), diff --git a/br/pkg/lightning/manual/BUILD.bazel b/br/pkg/lightning/manual/BUILD.bazel index 6d1fc18dd2495..d54902a23c066 100644 --- a/br/pkg/lightning/manual/BUILD.bazel +++ b/br/pkg/lightning/manual/BUILD.bazel @@ -10,4 +10,5 @@ go_library( cgo = True, importpath = "github.com/pingcap/tidb/br/pkg/lightning/manual", visibility = ["//visibility:public"], + deps = ["@org_uber_go_atomic//:atomic"], ) diff --git a/br/pkg/lightning/manual/allocator.go b/br/pkg/lightning/manual/allocator.go index 821eb750c5030..18aa8cc9353c4 100644 --- a/br/pkg/lightning/manual/allocator.go +++ b/br/pkg/lightning/manual/allocator.go @@ -14,8 +14,33 @@ package manual -type Allocator struct{} +import ( + "fmt" -func (Allocator) Alloc(n int) []byte { return New(n) } + "go.uber.org/atomic" +) -func (Allocator) Free(b []byte) { Free(b) } +type Allocator struct { + RefCnt *atomic.Int64 +} + +func (a Allocator) Alloc(n int) []byte { + if a.RefCnt != nil { + a.RefCnt.Add(1) + } + return New(n) +} + +func (a Allocator) Free(b []byte) { + if a.RefCnt != nil { + a.RefCnt.Add(-1) + } + Free(b) +} + +func (a Allocator) CheckRefCnt() error { + if a.RefCnt != nil && a.RefCnt.Load() != 0 { + return fmt.Errorf("memory leak detected, refCnt: %d", a.RefCnt.Load()) + } + return nil +} diff --git a/ddl/ingest/engine.go b/ddl/ingest/engine.go index 8392674c1eae6..c7ed29a71d017 100644 --- a/ddl/ingest/engine.go +++ b/ddl/ingest/engine.go @@ -82,6 +82,11 @@ func (ei *engineInfo) Clean() { zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID)) } ei.openedEngine = nil + err = ei.closeWriters() + if err != nil { + logutil.BgLogger().Error(LitErrCloseWriterErr, zap.Error(err), + zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID)) + } // Here the local intermediate files will be removed. err = closedEngine.Cleanup(ei.ctx) if err != nil { @@ -101,8 +106,14 @@ func (ei *engineInfo) ImportAndClean() error { return err1 } ei.openedEngine = nil + err := ei.closeWriters() + if err != nil { + logutil.BgLogger().Error(LitErrCloseWriterErr, zap.Error(err), + zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID)) + return err + } - err := ei.diskRoot.UpdateUsageAndQuota() + err = ei.diskRoot.UpdateUsageAndQuota() if err != nil { logutil.BgLogger().Error(LitErrUpdateDiskStats, zap.Error(err), zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID)) @@ -181,6 +192,22 @@ func (ei *engineInfo) newWriterContext(workerID int) (*WriterContext, error) { }, nil } +func (ei *engineInfo) closeWriters() error { + var firstErr error + for wid := range ei.writerCache.Keys() { + if w, ok := ei.writerCache.Load(wid); ok { + _, err := w.Close(ei.ctx) + if err != nil { + if firstErr == nil { + firstErr = err + } + } + } + ei.writerCache.Delete(wid) + } + return firstErr +} + // WriteRow Write one row into local writer buffer. func (wCtx *WriterContext) WriteRow(key, idxVal []byte) error { kvs := make([]common.KvPair, 1) diff --git a/ddl/ingest/message.go b/ddl/ingest/message.go index 0828d68796ba4..4996aab49a415 100644 --- a/ddl/ingest/message.go +++ b/ddl/ingest/message.go @@ -54,6 +54,7 @@ const ( LitInfoChgMemSetting string = "[ddl-ingest] change memory setting for ingest" LitInfoInitMemSetting string = "[ddl-ingest] initial memory setting for ingest" LitInfoUnsafeImport string = "[ddl-ingest] do a partial import data into the storage" + LitErrCloseWriterErr string = "[ddl-ingest] close writer error" ) func genBackendAllocMemFailedErr(memRoot MemRoot, jobID int64) error { diff --git a/tests/realtikvtest/addindextest/BUILD.bazel b/tests/realtikvtest/addindextest/BUILD.bazel index 1ca10f9db34f2..a2e9c9906380b 100644 --- a/tests/realtikvtest/addindextest/BUILD.bazel +++ b/tests/realtikvtest/addindextest/BUILD.bazel @@ -33,6 +33,7 @@ go_test( ], embed = [":addindextest"], deps = [ + "//br/pkg/lightning/backend/local", "//config", "//ddl", "//ddl/ingest", diff --git a/tests/realtikvtest/addindextest/integration_test.go b/tests/realtikvtest/addindextest/integration_test.go index 5567113696810..352dc83a1d1a2 100644 --- a/tests/realtikvtest/addindextest/integration_test.go +++ b/tests/realtikvtest/addindextest/integration_test.go @@ -22,6 +22,7 @@ import ( "testing" "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/br/pkg/lightning/backend/local" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/ddl/ingest" "github.com/pingcap/tidb/ddl/testutil" @@ -44,6 +45,8 @@ func TestAddIndexIngestMemoryUsage(t *testing.T) { tk.MustExec("use addindexlit;") tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`) + local.RunInTest = true + tk.MustExec("create table t (a int, b int, c int);") var sb strings.Builder sb.WriteString("insert into t values ") @@ -61,6 +64,7 @@ func TestAddIndexIngestMemoryUsage(t *testing.T) { tk.MustExec("alter table t add unique index idx1(b);") tk.MustExec("admin check table t;") require.Equal(t, int64(0), ingest.LitMemRoot.CurrentUsage()) + require.NoError(t, local.LastAlloc.CheckRefCnt()) } func TestAddIndexIngestLimitOneBackend(t *testing.T) { From d0d6955a77ba60e9ee339e34112202080e2c0c66 Mon Sep 17 00:00:00 2001 From: you06 Date: Tue, 13 Dec 2022 20:52:52 +0800 Subject: [PATCH 2/2] store/copr: handle region error from client (#39838) ref pingcap/tidb#39361 --- store/copr/coprocessor.go | 189 +++++++++++++++++++++++++++++-------- store/copr/region_cache.go | 4 +- 2 files changed, 154 insertions(+), 39 deletions(-) diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index a4cea914c3aa9..390c5ffe8e63a 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -140,7 +140,7 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars // disable batch copr for follower read req.StoreBatchSize = 0 } - // disable paging for batch copr + // disable batch copr when paging is enabled. if req.Paging.Enable { req.StoreBatchSize = 0 } @@ -315,13 +315,13 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv chanSize = 18 } - tasks := make([]*copTask, 0, len(locs)) - origRangeIdx := 0 - taskID := uint64(0) - var store2Idx map[uint64]int + var builder taskBuilder if req.StoreBatchSize > 0 { - store2Idx = make(map[uint64]int, 16) + builder = newBatchTaskBuilder(bo, req, cache) + } else { + builder = newLegacyTaskBuilder(len(locs)) } + origRangeIdx := 0 for _, loc := range locs { // TiKV will return gRPC error if the message is too large. So we need to limit the length of the ranges slice // to make sure the message can be sent successfully. @@ -357,7 +357,6 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv } } task := &copTask{ - taskID: taskID, region: loc.Location.Region, bucketsVer: loc.getBucketVersion(), ranges: loc.Ranges.Slice(i, nextI), @@ -370,50 +369,138 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv requestSource: req.RequestSource, RowCountHint: hint, } - if req.StoreBatchSize > 0 { - batchedTask, err := cache.BuildBatchTask(bo, task, req.ReplicaRead) - if err != nil { - return nil, err - } - if idx, ok := store2Idx[batchedTask.storeID]; !ok || len(tasks[idx].batchTaskList) >= req.StoreBatchSize { - tasks = append(tasks, batchedTask.task) - store2Idx[batchedTask.storeID] = len(tasks) - 1 - } else { - if tasks[idx].batchTaskList == nil { - tasks[idx].batchTaskList = make(map[uint64]*batchedCopTask, req.StoreBatchSize) - // disable paging for batched task. - tasks[idx].paging = false - tasks[idx].pagingSize = 0 - } - if task.RowCountHint > 0 { - tasks[idx].RowCountHint += task.RowCountHint - } - tasks[idx].batchTaskList[taskID] = batchedTask - } - } else { - tasks = append(tasks, task) + if err = builder.handle(task); err != nil { + return nil, err } i = nextI if req.Paging.Enable { pagingSize = paging.GrowPagingSize(pagingSize, req.Paging.MaxPagingSize) } - taskID++ } } if req.Desc { - reverseTasks(tasks) + builder.reverse() } + tasks := builder.build() if elapsed := time.Since(start); elapsed > time.Millisecond*500 { logutil.BgLogger().Warn("buildCopTasks takes too much time", zap.Duration("elapsed", elapsed), zap.Int("range len", rangesLen), zap.Int("task len", len(tasks))) } - metrics.TxnRegionsNumHistogramWithCoprocessor.Observe(float64(len(tasks))) + metrics.TxnRegionsNumHistogramWithCoprocessor.Observe(float64(builder.regionNum())) return tasks, nil } +type taskBuilder interface { + handle(*copTask) error + reverse() + build() []*copTask + regionNum() int +} + +type legacyTaskBuilder struct { + tasks []*copTask +} + +func newLegacyTaskBuilder(hint int) *legacyTaskBuilder { + return &legacyTaskBuilder{ + tasks: make([]*copTask, 0, hint), + } +} + +func (b *legacyTaskBuilder) handle(task *copTask) error { + b.tasks = append(b.tasks, task) + return nil +} + +func (b *legacyTaskBuilder) regionNum() int { + return len(b.tasks) +} + +func (b *legacyTaskBuilder) reverse() { + reverseTasks(b.tasks) +} + +func (b *legacyTaskBuilder) build() []*copTask { + return b.tasks +} + +type batchStoreTaskBuilder struct { + bo *Backoffer + req *kv.Request + cache *RegionCache + taskID uint64 + limit int + store2Idx map[uint64]int + tasks []*copTask +} + +func newBatchTaskBuilder(bo *Backoffer, req *kv.Request, cache *RegionCache) *batchStoreTaskBuilder { + return &batchStoreTaskBuilder{ + bo: bo, + req: req, + cache: cache, + taskID: 0, + limit: req.StoreBatchSize, + store2Idx: make(map[uint64]int, 16), + tasks: make([]*copTask, 0, 16), + } +} + +func (b *batchStoreTaskBuilder) handle(task *copTask) (err error) { + b.taskID++ + task.taskID = b.taskID + handled := false + defer func() { + if !handled && err == nil { + // fallback to non-batch way. It's mainly caused by region miss. + b.tasks = append(b.tasks, task) + } + }() + if b.limit <= 0 { + return nil + } + batchedTask, err := b.cache.BuildBatchTask(b.bo, task, b.req.ReplicaRead) + if err != nil { + return err + } + if batchedTask == nil { + return nil + } + if idx, ok := b.store2Idx[batchedTask.storeID]; !ok || len(b.tasks[idx].batchTaskList) >= b.limit { + b.tasks = append(b.tasks, batchedTask.task) + b.store2Idx[batchedTask.storeID] = len(b.tasks) - 1 + } else { + if b.tasks[idx].batchTaskList == nil { + b.tasks[idx].batchTaskList = make(map[uint64]*batchedCopTask, b.limit) + // disable paging for batched task. + b.tasks[idx].paging = false + b.tasks[idx].pagingSize = 0 + } + if task.RowCountHint > 0 { + b.tasks[idx].RowCountHint += task.RowCountHint + } + b.tasks[idx].batchTaskList[task.taskID] = batchedTask + } + handled = true + return nil +} + +func (b *batchStoreTaskBuilder) regionNum() int { + // we allocate b.taskID for each region task, so the final b.taskID is equal to the related region number. + return int(b.taskID) +} + +func (b *batchStoreTaskBuilder) reverse() { + reverseTasks(b.tasks) +} + +func (b *batchStoreTaskBuilder) build() []*copTask { + return b.tasks +} + func buildTiDBMemCopTasks(ranges *KeyRanges, req *kv.Request) ([]*copTask, error) { servers, err := infosync.GetAllServerInfo(context.Background()) if err != nil { @@ -1138,13 +1225,13 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R if err != nil { return remains, err } - return worker.handleBatchRemainsOnErr(bo, remains, resp.pbResp.BatchResponses, task, ch) + return worker.handleBatchRemainsOnErr(bo, remains, resp.pbResp.GetBatchResponses(), task, ch) } if lockErr := resp.pbResp.GetLocked(); lockErr != nil { if err := worker.handleLockErr(bo, lockErr, task); err != nil { return nil, err } - return worker.handleBatchRemainsOnErr(bo, []*copTask{task}, resp.pbResp.BatchResponses, task, ch) + return worker.handleBatchRemainsOnErr(bo, []*copTask{task}, resp.pbResp.GetBatchResponses(), task, ch) } if otherErr := resp.pbResp.GetOtherError(); otherErr != "" { err := errors.Errorf("other error: %s", otherErr) @@ -1250,16 +1337,26 @@ func (worker *copIteratorWorker) handleBatchRemainsOnErr(bo *Backoffer, remains } // handle the batched cop response. +// tasks will be changed, so the input tasks should not be used after calling this function. func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, batchResps []*coprocessor.StoreBatchTaskResponse, tasks map[uint64]*batchedCopTask, ch chan<- *copResponse) ([]*copTask, error) { if len(tasks) == 0 { return nil, nil } var remainTasks []*copTask + appendRemainTasks := func(tasks ...*copTask) { + if remainTasks == nil { + // allocate size fo remain length + remainTasks = make([]*copTask, 0, len(tasks)) + } + remainTasks = append(remainTasks, tasks...) + } for _, batchResp := range batchResps { - batchedTask, ok := tasks[batchResp.GetTaskId()] + taskID := batchResp.GetTaskId() + batchedTask, ok := tasks[taskID] if !ok { return nil, errors.Errorf("task id %d not found", batchResp.GetTaskId()) } + delete(tasks, taskID) resp := &copResponse{ pbResp: &coprocessor.Response{ Data: batchResp.Data, @@ -1276,7 +1373,7 @@ func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, batchResp if err != nil { return nil, err } - remainTasks = append(remainTasks, remains...) + appendRemainTasks(remains...) continue } //TODO: handle locks in batch @@ -1284,7 +1381,7 @@ func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, batchResp if err := worker.handleLockErr(bo, resp.pbResp.GetLocked(), task); err != nil { return nil, err } - remainTasks = append(remainTasks, task) + appendRemainTasks(task) continue } if otherErr := batchResp.GetOtherError(); otherErr != "" { @@ -1312,6 +1409,24 @@ func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, batchResp // TODO: check OOM worker.sendToRespCh(resp, ch, false) } + for _, t := range tasks { + task := t.task + // when the error is generated by client, response is empty, skip warning for this case. + if len(batchResps) != 0 { + firstRangeStartKey := task.ranges.At(0).StartKey + lastRangeEndKey := task.ranges.At(task.ranges.Len() - 1).EndKey + logutil.Logger(bo.GetCtx()).Error("response of batched task missing", + zap.Uint64("id", task.taskID), + zap.Uint64("txnStartTS", worker.req.StartTs), + zap.Uint64("regionID", task.region.GetID()), + zap.Uint64("bucketsVer", task.bucketsVer), + zap.Int("rangeNums", task.ranges.Len()), + zap.ByteString("firstRangeStartKey", firstRangeStartKey), + zap.ByteString("lastRangeEndKey", lastRangeEndKey), + zap.String("storeAddr", task.storeAddr)) + } + appendRemainTasks(t.task) + } return remainTasks, nil } diff --git a/store/copr/region_cache.go b/store/copr/region_cache.go index a3fd20e036d43..97c3d705c223b 100644 --- a/store/copr/region_cache.go +++ b/store/copr/region_cache.go @@ -18,7 +18,6 @@ import ( "bytes" "strconv" - "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/coprocessor" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" @@ -210,8 +209,9 @@ func (c *RegionCache) BuildBatchTask(bo *Backoffer, task *copTask, replicaRead k if err != nil { return nil, err } + // fallback to non-batch path if rpcContext == nil { - return nil, errors.Errorf("region %s missing", task.region.String()) + return nil, nil } return &batchedCopTask{ task: task,