Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
ywqzzy committed Sep 25, 2023
1 parent 5f8be45 commit 71f0b49
Showing 1 changed file with 11 additions and 3 deletions.
14 changes: 11 additions & 3 deletions ddl/backfilling_operators.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -180,9 +181,14 @@ func NewWriteIndexToExternalStoragePipeline(ctx *OperatorCtx, store kv.Storage,
shareMu = bcctx.GetLocalBackend().GetMutex()
}

memTotal, err := memory.MemTotal()
if err != nil {
return nil, err
}

srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey)
scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt)
writeOp := NewWriteExternalStoreOperator(ctx, copCtx, sessPool, jobID, subtaskID, tbl, index, extStore, srcChkPool, writerCnt, onClose, shareMu)
writeOp := NewWriteExternalStoreOperator(ctx, copCtx, sessPool, jobID, subtaskID, tbl, index, extStore, srcChkPool, writerCnt, onClose, shareMu, memTotal/2)
sinkOp := newIndexWriteResultSink(ctx, nil, tbl, index, totalRowCount, metricCounter)

operator.Compose[TableScanTask](srcOp, scanOp)
Expand Down Expand Up @@ -449,15 +455,17 @@ type WriteExternalStoreOperator struct {
// NewWriteExternalStoreOperator creates a new WriteExternalStoreOperator.
func NewWriteExternalStoreOperator(ctx *OperatorCtx, copCtx *CopContext, sessPool opSessPool, jobID int64,
subtaskID int64, tbl table.PhysicalTable, index table.Index, store storage.ExternalStorage, srcChunkPool chan *chunk.Chunk,
concurrency int, onClose external.OnCloseFunc, shareMu *sync.Mutex) *WriteExternalStoreOperator {
concurrency int, onClose external.OnCloseFunc, shareMu *sync.Mutex, memoryQuota uint64) *WriteExternalStoreOperator {
pool := workerpool.NewWorkerPool(
"WriteExternalStoreOperator",
util.DDL,
concurrency,
func() workerpool.Worker[IndexRecordChunk, IndexWriteResult] {
builder := external.NewWriterBuilder().
SetOnCloseFunc(onClose).
SetKeyDuplicationEncoding(index.Meta().Unique).SetMutex(shareMu)
SetKeyDuplicationEncoding(index.Meta().Unique).
SetMutex(shareMu).
SetMemorySizeLimit(memoryQuota)
writerID := uuid.New().String()
prefix := path.Join(strconv.Itoa(int(jobID)), strconv.Itoa(int(subtaskID)))
writer := builder.Build(store, prefix, writerID)
Expand Down

0 comments on commit 71f0b49

Please sign in to comment.