From a3efafe2c2fc6ab9e69fac37afd8b84aa638386d Mon Sep 17 00:00:00 2001 From: EasonBall <592838129@qq.com> Date: Mon, 25 Sep 2023 19:06:27 +0800 Subject: [PATCH] disttask/ddl: change memory quota for writer (#47251) ref pingcap/tidb#45719 --- ddl/backfilling_operators.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/ddl/backfilling_operators.go b/ddl/backfilling_operators.go index 22c19ef46bba6..28d0c009a7ad9 100644 --- a/ddl/backfilling_operators.go +++ b/ddl/backfilling_operators.go @@ -45,6 +45,7 @@ import ( "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/memory" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -180,9 +181,14 @@ func NewWriteIndexToExternalStoragePipeline(ctx *OperatorCtx, store kv.Storage, shareMu = bcctx.GetLocalBackend().GetMutex() } + memTotal, err := memory.MemTotal() + if err != nil { + return nil, err + } + srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey) scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt) - writeOp := NewWriteExternalStoreOperator(ctx, copCtx, sessPool, jobID, subtaskID, tbl, index, extStore, srcChkPool, writerCnt, onClose, shareMu) + writeOp := NewWriteExternalStoreOperator(ctx, copCtx, sessPool, jobID, subtaskID, tbl, index, extStore, srcChkPool, writerCnt, onClose, shareMu, memTotal/2) sinkOp := newIndexWriteResultSink(ctx, nil, tbl, index, totalRowCount, metricCounter) operator.Compose[TableScanTask](srcOp, scanOp) @@ -449,7 +455,7 @@ type WriteExternalStoreOperator struct { // NewWriteExternalStoreOperator creates a new WriteExternalStoreOperator. func NewWriteExternalStoreOperator(ctx *OperatorCtx, copCtx *CopContext, sessPool opSessPool, jobID int64, subtaskID int64, tbl table.PhysicalTable, index table.Index, store storage.ExternalStorage, srcChunkPool chan *chunk.Chunk, - concurrency int, onClose external.OnCloseFunc, shareMu *sync.Mutex) *WriteExternalStoreOperator { + concurrency int, onClose external.OnCloseFunc, shareMu *sync.Mutex, memoryQuota uint64) *WriteExternalStoreOperator { pool := workerpool.NewWorkerPool( "WriteExternalStoreOperator", util.DDL, @@ -457,7 +463,9 @@ func NewWriteExternalStoreOperator(ctx *OperatorCtx, copCtx *CopContext, sessPoo func() workerpool.Worker[IndexRecordChunk, IndexWriteResult] { builder := external.NewWriterBuilder(). SetOnCloseFunc(onClose). - SetKeyDuplicationEncoding(index.Meta().Unique).SetMutex(shareMu) + SetKeyDuplicationEncoding(index.Meta().Unique). + SetMutex(shareMu). + SetMemorySizeLimit(memoryQuota) writerID := uuid.New().String() prefix := path.Join(strconv.Itoa(int(jobID)), strconv.Itoa(int(subtaskID))) writer := builder.Build(store, prefix, writerID)