diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 317124d0b8d19..e32606207082e 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -394,6 +394,13 @@ func openDuplicateDB(storeDir string) (*pebble.DB, error) { return pebble.Open(dbPath, opts) } +var ( + // RunInTest indicates whether the current process is running in test. + RunInTest bool + // LastAlloc is the last ID allocator. + LastAlloc manual.Allocator +) + // NewLocalBackend creates new connections to tikv. func NewLocalBackend( ctx context.Context, @@ -461,6 +468,11 @@ func NewLocalBackend( } else { writeLimiter = noopStoreWriteLimiter{} } + alloc := manual.Allocator{} + if RunInTest { + alloc.RefCnt = new(atomic.Int64) + LastAlloc = alloc + } local := &local{ engines: sync.Map{}, pdCtl: pdCtl, @@ -486,7 +498,7 @@ func NewLocalBackend( keyAdapter: keyAdapter, errorMgr: errorMgr, importClientFactory: importClientFactory, - bufferPool: membuf.NewPool(membuf.WithAllocator(manual.Allocator{})), + bufferPool: membuf.NewPool(membuf.WithAllocator(alloc)), writeLimiter: writeLimiter, logger: log.FromContext(ctx), encBuilder: NewEncodingBuilder(ctx), diff --git a/br/pkg/lightning/manual/BUILD.bazel b/br/pkg/lightning/manual/BUILD.bazel index 6d1fc18dd2495..d54902a23c066 100644 --- a/br/pkg/lightning/manual/BUILD.bazel +++ b/br/pkg/lightning/manual/BUILD.bazel @@ -10,4 +10,5 @@ go_library( cgo = True, importpath = "github.com/pingcap/tidb/br/pkg/lightning/manual", visibility = ["//visibility:public"], + deps = ["@org_uber_go_atomic//:atomic"], ) diff --git a/br/pkg/lightning/manual/allocator.go b/br/pkg/lightning/manual/allocator.go index 821eb750c5030..18aa8cc9353c4 100644 --- a/br/pkg/lightning/manual/allocator.go +++ b/br/pkg/lightning/manual/allocator.go @@ -14,8 +14,33 @@ package manual -type Allocator struct{} +import ( + "fmt" -func (Allocator) Alloc(n int) []byte { return New(n) } + "go.uber.org/atomic" +) -func (Allocator) Free(b []byte) { Free(b) } +type Allocator struct { + RefCnt *atomic.Int64 +} + +func (a Allocator) Alloc(n int) []byte { + if a.RefCnt != nil { + a.RefCnt.Add(1) + } + return New(n) +} + +func (a Allocator) Free(b []byte) { + if a.RefCnt != nil { + a.RefCnt.Add(-1) + } + Free(b) +} + +func (a Allocator) CheckRefCnt() error { + if a.RefCnt != nil && a.RefCnt.Load() != 0 { + return fmt.Errorf("memory leak detected, refCnt: %d", a.RefCnt.Load()) + } + return nil +} diff --git a/ddl/ingest/engine.go b/ddl/ingest/engine.go index d875d78e346d0..5bf32762dcc22 100644 --- a/ddl/ingest/engine.go +++ b/ddl/ingest/engine.go @@ -83,6 +83,11 @@ func (ei *engineInfo) Clean() { zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID)) } ei.openedEngine = nil + err = ei.closeWriters() + if err != nil { + logutil.BgLogger().Error(LitErrCloseWriterErr, zap.Error(err), + zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID)) + } // Here the local intermediate files will be removed. err = closedEngine.Cleanup(ei.ctx) if err != nil { @@ -102,8 +107,14 @@ func (ei *engineInfo) ImportAndClean() error { return errors.New(LitErrCloseEngineErr) } ei.openedEngine = nil + err := ei.closeWriters() + if err != nil { + logutil.BgLogger().Error(LitErrCloseWriterErr, zap.Error(err), + zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID)) + return err + } - err := ei.diskRoot.UpdateUsageAndQuota() + err = ei.diskRoot.UpdateUsageAndQuota() if err != nil { logutil.BgLogger().Error(LitErrUpdateDiskStats, zap.Error(err), zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID)) @@ -182,6 +193,22 @@ func (ei *engineInfo) newWriterContext(workerID int) (*WriterContext, error) { }, nil } +func (ei *engineInfo) closeWriters() error { + var firstErr error + for wid := range ei.writerCache.Keys() { + if w, ok := ei.writerCache.Load(wid); ok { + _, err := w.Close(ei.ctx) + if err != nil { + if firstErr == nil { + firstErr = err + } + } + } + ei.writerCache.Delete(wid) + } + return firstErr +} + // WriteRow Write one row into local writer buffer. func (wCtx *WriterContext) WriteRow(key, idxVal []byte) error { kvs := make([]common.KvPair, 1) diff --git a/ddl/ingest/message.go b/ddl/ingest/message.go index 1a68541d68f0c..ac2a213e81f9f 100644 --- a/ddl/ingest/message.go +++ b/ddl/ingest/message.go @@ -57,6 +57,7 @@ const ( LitInfoChgMemSetting string = "[ddl-ingest] change memory setting for lightning" LitInfoInitMemSetting string = "[ddl-ingest] initial memory setting for lightning" LitInfoUnsafeImport string = "[ddl-ingest] do a partial import data into the storage" + LitErrCloseWriterErr string = "[ddl-ingest] close writer error" ) func genBackendAllocMemFailedErr(memRoot MemRoot, jobID int64) error { diff --git a/tests/realtikvtest/addindextest/BUILD.bazel b/tests/realtikvtest/addindextest/BUILD.bazel index f6bb2f0844404..62032320d03de 100644 --- a/tests/realtikvtest/addindextest/BUILD.bazel +++ b/tests/realtikvtest/addindextest/BUILD.bazel @@ -33,6 +33,7 @@ go_test( ], embed = [":addindextest"], deps = [ + "//br/pkg/lightning/backend/local", "//config", "//ddl", "//ddl/ingest", diff --git a/tests/realtikvtest/addindextest/integration_test.go b/tests/realtikvtest/addindextest/integration_test.go index c29c6929fa8c3..35a636bec7be4 100644 --- a/tests/realtikvtest/addindextest/integration_test.go +++ b/tests/realtikvtest/addindextest/integration_test.go @@ -22,6 +22,7 @@ import ( "testing" "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/br/pkg/lightning/backend/local" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/ddl/ingest" "github.com/pingcap/tidb/ddl/testutil" @@ -38,6 +39,8 @@ func TestAddIndexIngestMemoryUsage(t *testing.T) { tk.MustExec("use addindexlit;") tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`) + local.RunInTest = true + tk.MustExec("create table t (a int, b int, c int);") var sb strings.Builder sb.WriteString("insert into t values ") @@ -55,6 +58,7 @@ func TestAddIndexIngestMemoryUsage(t *testing.T) { tk.MustExec("alter table t add unique index idx1(b);") tk.MustExec("admin check table t;") require.Equal(t, int64(0), ingest.LitMemRoot.CurrentUsage()) + require.NoError(t, local.LastAlloc.CheckRefCnt()) } func TestAddIndexIngestLimitOneBackend(t *testing.T) {