From 34ce8e34cc53e209c89a41bafb83c6a0b7cb29b6 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Sun, 5 Feb 2023 15:39:56 +0800 Subject: [PATCH] This is an automated cherry-pick of #41036 Signed-off-by: ti-chi-bot --- executor/index_merge_reader.go | 432 +++++++++++++++++++++++++++- executor/index_merge_reader_test.go | 277 ++++++++++++++++++ 2 files changed, 698 insertions(+), 11 deletions(-) diff --git a/executor/index_merge_reader.go b/executor/index_merge_reader.go index 939826edd7170..3e3371c17c96d 100644 --- a/executor/index_merge_reader.go +++ b/executor/index_merge_reader.go @@ -49,6 +49,14 @@ var ( _ Executor = &IndexMergeReaderExecutor{} ) +const ( + partialIndexWorkerType = "IndexMergePartialIndexWorker" + partialTableWorkerType = "IndexMergePartialTableWorker" + processWorkerType = "IndexMergeProcessWorker" + partTblIntersectionWorkerType = "IndexMergePartTblIntersectionWorker" + tableScanWorkerType = "IndexMergeTableScanWorker" +) + // IndexMergeReaderExecutor accesses a table with multiple index/table scan. // There are three types of workers: // 1. partialTableWorker/partialIndexWorker, which are used to fetch the handles @@ -87,10 +95,10 @@ type IndexMergeReaderExecutor struct { // All fields above are immutable. - tblWorkerWg sync.WaitGroup - idxWorkerWg sync.WaitGroup - processWokerWg sync.WaitGroup - finished chan struct{} + tblWorkerWg sync.WaitGroup + idxWorkerWg sync.WaitGroup + processWorkerWg sync.WaitGroup + finished chan struct{} workerStarted bool keyRanges [][]kv.KeyRange @@ -245,20 +253,34 @@ func (e *IndexMergeReaderExecutor) startIndexMergeProcessWorker(ctx context.Cont indexMerge: e, stats: e.stats, } - e.processWokerWg.Add(1) + e.processWorkerWg.Add(1) go func() { defer trace.StartRegion(ctx, "IndexMergeProcessWorker").End() util.WithRecovery( func() { idxMergeProcessWorker.fetchLoop(ctx, fetch, workCh, e.resultCh, e.finished) }, +<<<<<<< HEAD idxMergeProcessWorker.handleLoopFetcherPanic(ctx, e.resultCh), +======= + handleWorkerPanic(ctx, e.finished, e.resultCh, nil, processWorkerType), +>>>>>>> bc2c1b229df (executor: fix IndexMerge handle panic logic (#41036)) ) - e.processWokerWg.Done() + e.processWorkerWg.Done() }() } +<<<<<<< HEAD func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *lookupTableTask, workID int) error { +======= +func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *indexMergeTableTask, workID int) error { + failpoint.Inject("testIndexMergeResultChCloseEarly", func(_ failpoint.Value) { + // Wait for processWorker to close resultCh. + time.Sleep(2) + // Should use fetchCh instead of resultCh to send error. + syncErr(ctx, e.finished, fetchCh, errors.New("testIndexMergeResultChCloseEarly")) + }) +>>>>>>> bc2c1b229df (executor: fix IndexMerge handle panic logic (#41036)) if e.runtimeStats != nil { collExec := true e.dagPBs[workID].CollectExecutionSummaries = &collExec @@ -282,6 +304,17 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, defer e.idxWorkerWg.Done() util.WithRecovery( func() { + failpoint.Inject("testIndexMergePanicPartialIndexWorker", nil) + failpoint.Inject("mockSleepBeforeStartTableReader", func(_ failpoint.Value) { + select { + case <-ctx.Done(): + failpoint.Return() + case <-e.finished: + failpoint.Return() + case <-exitCh: + failpoint.Return() + } + }) worker := &partialIndexWorker{ stats: e.stats, idxID: e.getPartitalPlanID(workID), @@ -289,13 +322,19 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, batchSize: e.maxChunkSize, maxBatchSize: e.ctx.GetSessionVars().IndexLookupSize, maxChunkSize: e.maxChunkSize, + memTracker: e.memTracker, } if e.isCorColInPartialFilters[workID] { // We got correlated column, so need to refresh Selection operator. var err error +<<<<<<< HEAD if e.dagPBs[workID].Executors, _, err = constructDistExec(e.ctx, e.partialPlans[workID]); err != nil { worker.syncErr(e.resultCh, err) +======= + if e.dagPBs[workID].Executors, err = constructDistExec(e.ctx, e.partialPlans[workID]); err != nil { + syncErr(ctx, e.finished, fetchCh, err) +>>>>>>> bc2c1b229df (executor: fix IndexMerge handle panic logic (#41036)) return } } @@ -323,12 +362,20 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, // init kvReq and worker for this partition kvReq, err := builder.SetKeyRanges(keyRange).Build() if err != nil { +<<<<<<< HEAD worker.syncErr(e.resultCh, err) +======= + syncErr(ctx, e.finished, fetchCh, err) +>>>>>>> bc2c1b229df (executor: fix IndexMerge handle panic logic (#41036)) return } result, err := distsql.SelectWithRuntimeStats(ctx, e.ctx, kvReq, e.handleCols.GetFieldsTypes(), e.feedbacks[workID], getPhysicalPlanIDs(e.partialPlans[workID]), e.getPartitalPlanID(workID)) if err != nil { +<<<<<<< HEAD worker.syncErr(e.resultCh, err) +======= + syncErr(ctx, e.finished, fetchCh, err) +>>>>>>> bc2c1b229df (executor: fix IndexMerge handle panic logic (#41036)) return } worker.batchSize = e.maxChunkSize @@ -341,7 +388,11 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, // fetch all data from this partition ctx1, cancel := context.WithCancel(ctx) +<<<<<<< HEAD _, fetchErr := worker.fetchHandles(ctx1, result, exitCh, fetchCh, e.resultCh, e.finished, e.handleCols) +======= + _, fetchErr := worker.fetchHandles(ctx1, result, exitCh, fetchCh, e.finished, e.handleCols, parTblIdx) +>>>>>>> bc2c1b229df (executor: fix IndexMerge handle panic logic (#41036)) if fetchErr != nil { // this error is synced in fetchHandles(), don't sync it again e.feedbacks[workID].Invalidate() } @@ -355,7 +406,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, } } }, - e.handleHandlesFetcherPanic(ctx, e.resultCh, "partialIndexWorker"), + handleWorkerPanic(ctx, e.finished, fetchCh, nil, "partialIndexWorker"), ) }() @@ -379,6 +430,17 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, defer e.idxWorkerWg.Done() util.WithRecovery( func() { + failpoint.Inject("testIndexMergePanicPartialTableWorker", nil) + failpoint.Inject("mockSleepBeforeStartTableReader", func(_ failpoint.Value) { + select { + case <-ctx.Done(): + failpoint.Return() + case <-e.finished: + failpoint.Return() + case <-exitCh: + failpoint.Return() + } + }) var err error partialTableReader := &TableReaderExecutor{ baseExecutor: newBaseExecutor(e.ctx, ts.Schema(), e.getPartitalPlanID(workID)), @@ -399,11 +461,17 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, maxBatchSize: e.ctx.GetSessionVars().IndexLookupSize, maxChunkSize: e.maxChunkSize, tableReader: partialTableReader, + memTracker: e.memTracker, } if e.isCorColInPartialFilters[workID] { +<<<<<<< HEAD if e.dagPBs[workID].Executors, _, err = constructDistExec(e.ctx, e.partialPlans[workID]); err != nil { worker.syncErr(e.resultCh, err) +======= + if e.dagPBs[workID].Executors, err = constructDistExec(e.ctx, e.partialPlans[workID]); err != nil { + syncErr(ctx, e.finished, fetchCh, err) +>>>>>>> bc2c1b229df (executor: fix IndexMerge handle panic logic (#41036)) return } partialTableReader.dagPB = e.dagPBs[workID] @@ -421,7 +489,11 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, partialTableReader.table = tbl if err = partialTableReader.Open(ctx); err != nil { logutil.Logger(ctx).Error("open Select result failed:", zap.Error(err)) +<<<<<<< HEAD worker.syncErr(e.resultCh, err) +======= + syncErr(ctx, e.finished, fetchCh, err) +>>>>>>> bc2c1b229df (executor: fix IndexMerge handle panic logic (#41036)) break } worker.batchSize = e.maxChunkSize @@ -434,7 +506,11 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, // fetch all handles from this table ctx1, cancel := context.WithCancel(ctx) +<<<<<<< HEAD _, fetchErr := worker.fetchHandles(ctx1, exitCh, fetchCh, e.resultCh, e.finished, e.handleCols) +======= + _, fetchErr := worker.fetchHandles(ctx1, exitCh, fetchCh, e.finished, e.handleCols, parTblIdx) +>>>>>>> bc2c1b229df (executor: fix IndexMerge handle panic logic (#41036)) if fetchErr != nil { // this error is synced in fetchHandles, so don't sync it again e.feedbacks[workID].Invalidate() } @@ -450,7 +526,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, } } }, - e.handleHandlesFetcherPanic(ctx, e.resultCh, "partialTableWorker"), + handleWorkerPanic(ctx, e.finished, fetchCh, nil, "partialTableWorker"), ) }() return nil @@ -487,8 +563,10 @@ type partialTableWorker struct { maxChunkSize int tableReader Executor partition table.PhysicalTable // it indicates if this worker is accessing a particular partition table + memTracker *memory.Tracker } +<<<<<<< HEAD func (w *partialTableWorker) syncErr(resultCh chan<- *lookupTableTask, err error) { doneCh := make(chan error, 1) doneCh <- err @@ -500,6 +578,11 @@ func (w *partialTableWorker) syncErr(resultCh chan<- *lookupTableTask, err error func (w *partialTableWorker) fetchHandles(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *lookupTableTask, resultCh chan<- *lookupTableTask, finished <-chan struct{}, handleCols plannercore.HandleCols) (count int64, err error) { chk := chunk.NewChunkWithCapacity(retTypes(w.tableReader), w.maxChunkSize) +======= +func (w *partialTableWorker) fetchHandles(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *indexMergeTableTask, + finished <-chan struct{}, handleCols plannercore.HandleCols, parTblIdx int) (count int64, err error) { + chk := w.sc.GetSessionVars().GetNewChunkWithCapacity(retTypes(w.tableReader), w.maxChunkSize, w.maxChunkSize, w.tableReader.base().AllocPool) +>>>>>>> bc2c1b229df (executor: fix IndexMerge handle panic logic (#41036)) var basic *execdetails.BasicRuntimeStats if be := w.tableReader.base(); be != nil && be.runtimeStats != nil { basic = be.runtimeStats @@ -508,7 +591,11 @@ func (w *partialTableWorker) fetchHandles(ctx context.Context, exitCh <-chan str start := time.Now() handles, retChunk, err := w.extractTaskHandles(ctx, chk, handleCols) if err != nil { +<<<<<<< HEAD w.syncErr(resultCh, err) +======= + syncErr(ctx, finished, fetchCh, err) +>>>>>>> bc2c1b229df (executor: fix IndexMerge handle panic logic (#41036)) return count, err } if len(handles) == 0 { @@ -537,6 +624,8 @@ func (w *partialTableWorker) fetchHandles(ctx context.Context, exitCh <-chan str func (w *partialTableWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, handleCols plannercore.HandleCols) ( handles []kv.Handle, retChk *chunk.Chunk, err error) { handles = make([]kv.Handle, 0, w.batchSize) + var memUsage int64 + defer w.memTracker.Consume(-memUsage) for len(handles) < w.batchSize { chk.SetRequiredRows(w.batchSize-len(handles), w.maxChunkSize) err = errors.Trace(w.tableReader.Next(ctx, chk)) @@ -544,8 +633,14 @@ func (w *partialTableWorker) extractTaskHandles(ctx context.Context, chk *chunk. return handles, nil, err } if chk.NumRows() == 0 { + failpoint.Inject("testIndexMergeErrorPartialTableWorker", func(v failpoint.Value) { + failpoint.Return(handles, nil, errors.New(v.(string))) + }) return handles, retChk, nil } + memDelta := chk.MemoryUsage() + memUsage += memDelta + w.memTracker.Consume(memDelta) for i := 0; i < chk.NumRows(); i++ { handle, err := handleCols.BuildHandle(chk.GetRow(i)) if err != nil { @@ -590,8 +685,18 @@ func (e *IndexMergeReaderExecutor) startIndexMergeTableScanWorker(ctx context.Co defer trace.StartRegion(ctx, "IndexMergeTableScanWorker").End() var task *lookupTableTask util.WithRecovery( +<<<<<<< HEAD func() { task = worker.pickAndExecTask(ctx1) }, worker.handlePickAndExecTaskPanic(ctx1, task), +======= + // Note we use the address of `task` as the argument of both `pickAndExecTask` and `handleTableScanWorkerPanic` + // because `task` is expected to be assigned in `pickAndExecTask`, and this assignment should also be visible + // in `handleTableScanWorkerPanic` since it will get `doneCh` from `task`. Golang always pass argument by value, + // so if we don't use the address of `task` as the argument, the assignment to `task` in `pickAndExecTask` is + // not visible in `handleTableScanWorkerPanic` + func() { worker.pickAndExecTask(ctx1, &task) }, + worker.handleTableScanWorkerPanic(ctx1, e.finished, &task, tableScanWorkerType), +>>>>>>> bc2c1b229df (executor: fix IndexMerge handle panic logic (#41036)) ) cancel() e.tblWorkerWg.Done() @@ -676,18 +781,46 @@ func (e *IndexMergeReaderExecutor) getResultTask() (*lookupTableTask, error) { return e.resultCurr, nil } +<<<<<<< HEAD func (e *IndexMergeReaderExecutor) handleHandlesFetcherPanic(ctx context.Context, resultCh chan<- *lookupTableTask, worker string) func(r interface{}) { +======= +func handleWorkerPanic(ctx context.Context, finished <-chan struct{}, ch chan<- *indexMergeTableTask, extraNotifyCh chan bool, worker string) func(r interface{}) { +>>>>>>> bc2c1b229df (executor: fix IndexMerge handle panic logic (#41036)) return func(r interface{}) { + if worker == processWorkerType { + // There is only one processWorker, so it's safe to close here. + // No need to worry about "close on closed channel" error. + defer close(ch) + } if r == nil { return } - err4Panic := errors.Errorf("panic in IndexMergeReaderExecutor %s: %v", worker, r) + if extraNotifyCh != nil { + extraNotifyCh <- true + } + + err4Panic := errors.Errorf("%s: %v", worker, r) logutil.Logger(ctx).Error(err4Panic.Error()) doneCh := make(chan error, 1) doneCh <- err4Panic +<<<<<<< HEAD resultCh <- &lookupTableTask{ doneCh: doneCh, +======= + task := &indexMergeTableTask{ + lookupTableTask: lookupTableTask{ + doneCh: doneCh, + }, +>>>>>>> bc2c1b229df (executor: fix IndexMerge handle panic logic (#41036)) + } + select { + case <-ctx.Done(): + return + case <-finished: + return + case ch <- task: + return } } } @@ -698,9 +831,9 @@ func (e *IndexMergeReaderExecutor) Close() error { return nil } close(e.finished) - e.processWokerWg.Wait() e.tblWorkerWg.Wait() e.idxWorkerWg.Wait() + e.processWorkerWg.Wait() e.finished = nil e.workerStarted = false // TODO: how to store e.feedbacks @@ -712,19 +845,43 @@ type indexMergeProcessWorker struct { stats *IndexMergeRuntimeStat } +<<<<<<< HEAD func (w *indexMergeProcessWorker) fetchLoop(ctx context.Context, fetchCh <-chan *lookupTableTask, workCh chan<- *lookupTableTask, resultCh chan<- *lookupTableTask, finished <-chan struct{}) { defer func() { close(workCh) close(resultCh) }() +======= +func (w *indexMergeProcessWorker) fetchLoopUnion(ctx context.Context, fetchCh <-chan *indexMergeTableTask, + workCh chan<- *indexMergeTableTask, resultCh chan<- *indexMergeTableTask, finished <-chan struct{}) { + failpoint.Inject("testIndexMergeResultChCloseEarly", func(_ failpoint.Value) { + failpoint.Return() + }) + memTracker := memory.NewTracker(w.indexMerge.id, -1) + memTracker.AttachTo(w.indexMerge.memTracker) + defer memTracker.Detach() + defer close(workCh) + failpoint.Inject("testIndexMergePanicProcessWorkerUnion", nil) +>>>>>>> bc2c1b229df (executor: fix IndexMerge handle panic logic (#41036)) distinctHandles := make(map[int64]*kv.HandleMap) for task := range fetchCh { + select { + case err := <-task.doneCh: + // If got error from partialIndexWorker/partialTableWorker, stop processing. + if err != nil { + syncErr(ctx, finished, resultCh, err) + return + } + default: + } start := time.Now() handles := task.handles fhs := make([]kv.Handle, 0, 8) + memTracker.Consume(int64(cap(task.handles) * 8)) + var tblID int64 if w.indexMerge.partitionTableMode { tblID = getPhysicalTableID(task.partitionTable) @@ -765,6 +922,7 @@ func (w *indexMergeProcessWorker) fetchLoop(ctx context.Context, fetchCh <-chan } } +<<<<<<< HEAD func (w *indexMergeProcessWorker) handleLoopFetcherPanic(ctx context.Context, resultCh chan<- *lookupTableTask) func(r interface{}) { return func(r interface{}) { if r == nil { @@ -777,10 +935,196 @@ func (w *indexMergeProcessWorker) handleLoopFetcherPanic(ctx context.Context, re doneCh <- err4Panic resultCh <- &lookupTableTask{ doneCh: doneCh, +======= +type intersectionProcessWorker struct { + // key: parTblIdx, val: HandleMap + // Value of MemAwareHandleMap is *int to avoid extra Get(). + handleMapsPerWorker map[int]*kv.MemAwareHandleMap[*int] + workerID int + workerCh chan *indexMergeTableTask + indexMerge *IndexMergeReaderExecutor + memTracker *memory.Tracker + batchSize int + + // When rowDelta == memConsumeBatchSize, Consume(memUsage) + rowDelta int64 + mapUsageDelta int64 +} + +func (w *intersectionProcessWorker) consumeMemDelta() { + w.memTracker.Consume(w.mapUsageDelta + w.rowDelta*int64(unsafe.Sizeof(int(0)))) + w.mapUsageDelta = 0 + w.rowDelta = 0 +} + +func (w *intersectionProcessWorker) doIntersectionPerPartition(ctx context.Context, workCh chan<- *indexMergeTableTask, resultCh chan<- *indexMergeTableTask, finished <-chan struct{}) { + failpoint.Inject("testIndexMergePanicPartitionTableIntersectionWorker", nil) + defer w.memTracker.Detach() + + for task := range w.workerCh { + var ok bool + var hMap *kv.MemAwareHandleMap[*int] + if hMap, ok = w.handleMapsPerWorker[task.parTblIdx]; !ok { + hMap = kv.NewMemAwareHandleMap[*int]() + w.handleMapsPerWorker[task.parTblIdx] = hMap + } + var mapDelta int64 + var rowDelta int64 + for _, h := range task.handles { + // Use *int to avoid Get() again. + if cntPtr, ok := hMap.Get(h); ok { + (*cntPtr)++ + } else { + cnt := 1 + mapDelta += hMap.Set(h, &cnt) + int64(h.ExtraMemSize()) + rowDelta += 1 + } + } + + logutil.BgLogger().Debug("intersectionProcessWorker handle tasks", zap.Int("workerID", w.workerID), + zap.Int("task.handles", len(task.handles)), zap.Int64("rowDelta", rowDelta)) + + w.mapUsageDelta += mapDelta + w.rowDelta += rowDelta + if w.rowDelta >= int64(w.batchSize) { + w.consumeMemDelta() + } + failpoint.Inject("testIndexMergeIntersectionWorkerPanic", nil) + } + if w.rowDelta > 0 { + w.consumeMemDelta() + } + + // We assume the result of intersection is small, so no need to track memory. + intersectedMap := make(map[int][]kv.Handle, len(w.handleMapsPerWorker)) + for parTblIdx, hMap := range w.handleMapsPerWorker { + hMap.Range(func(h kv.Handle, val interface{}) bool { + if *(val.(*int)) == len(w.indexMerge.partialPlans) { + // Means all partial paths have this handle. + intersectedMap[parTblIdx] = append(intersectedMap[parTblIdx], h) + } + return true + }) + } + + tasks := make([]*indexMergeTableTask, 0, len(w.handleMapsPerWorker)) + for parTblIdx, intersected := range intersectedMap { + // Split intersected[parTblIdx] to avoid task is too large. + for len(intersected) > 0 { + length := w.batchSize + if length > len(intersected) { + length = len(intersected) + } + task := &indexMergeTableTask{ + lookupTableTask: lookupTableTask{ + handles: intersected[:length], + doneCh: make(chan error, 1), + }, + } + intersected = intersected[length:] + if w.indexMerge.partitionTableMode { + task.partitionTable = w.indexMerge.prunedPartitions[parTblIdx] + } + tasks = append(tasks, task) + logutil.BgLogger().Debug("intersectionProcessWorker build tasks", + zap.Int("parTblIdx", parTblIdx), zap.Int("task.handles", len(task.handles))) + } + } + for _, task := range tasks { + select { + case <-ctx.Done(): + return + case <-finished: + return + case workCh <- task: + resultCh <- task +>>>>>>> bc2c1b229df (executor: fix IndexMerge handle panic logic (#41036)) } } } +<<<<<<< HEAD +======= +// For each partition(dynamic mode), a map is used to do intersection. Key of the map is handle, and value is the number of times it occurs. +// If the value of handle equals the number of partial paths, it should be sent to final_table_scan_worker. +// To avoid too many goroutines, each intersectionProcessWorker can handle multiple partitions. +func (w *indexMergeProcessWorker) fetchLoopIntersection(ctx context.Context, fetchCh <-chan *indexMergeTableTask, + workCh chan<- *indexMergeTableTask, resultCh chan<- *indexMergeTableTask, finished <-chan struct{}) { + defer close(workCh) + + if w.stats != nil { + start := time.Now() + defer func() { + w.stats.IndexMergeProcess += time.Since(start) + }() + } + + failpoint.Inject("testIndexMergePanicProcessWorkerIntersection", nil) + + // One goroutine may handle one or multiple partitions. + // Max number of partition number is 8192, we use ExecutorConcurrency to avoid too many goroutines. + maxWorkerCnt := w.indexMerge.ctx.GetSessionVars().IndexMergeIntersectionConcurrency() + maxChannelSize := atomic.LoadInt32(&LookupTableTaskChannelSize) + batchSize := w.indexMerge.ctx.GetSessionVars().IndexLookupSize + + partCnt := 1 + if w.indexMerge.partitionTableMode { + partCnt = len(w.indexMerge.prunedPartitions) + } + workerCnt := mathutil.Min(partCnt, maxWorkerCnt) + failpoint.Inject("testIndexMergeIntersectionConcurrency", func(val failpoint.Value) { + con := val.(int) + if con != workerCnt { + panic(fmt.Sprintf("unexpected workerCnt, expect %d, got %d", con, workerCnt)) + } + }) + + workers := make([]*intersectionProcessWorker, 0, workerCnt) + wg := util.WaitGroupWrapper{} + errCh := make(chan bool, workerCnt) + for i := 0; i < workerCnt; i++ { + tracker := memory.NewTracker(w.indexMerge.id, -1) + tracker.AttachTo(w.indexMerge.memTracker) + worker := &intersectionProcessWorker{ + workerID: i, + handleMapsPerWorker: make(map[int]*kv.MemAwareHandleMap[*int]), + workerCh: make(chan *indexMergeTableTask, maxChannelSize), + indexMerge: w.indexMerge, + memTracker: tracker, + batchSize: batchSize, + } + wg.RunWithRecover(func() { + defer trace.StartRegion(ctx, "IndexMergeIntersectionProcessWorker").End() + worker.doIntersectionPerPartition(ctx, workCh, resultCh, finished) + }, handleWorkerPanic(ctx, finished, resultCh, errCh, partTblIntersectionWorkerType)) + workers = append(workers, worker) + } +loop: + for task := range fetchCh { + select { + case err := <-task.doneCh: + // If got error from partialIndexWorker/partialTableWorker, stop processing. + if err != nil { + syncErr(ctx, finished, resultCh, err) + break loop + } + default: + } + + select { + case workers[task.parTblIdx%workerCnt].workerCh <- task: + case <-errCh: + // If got error from intersectionProcessWorker, stop processing. + break loop + } + } + for _, processWorker := range workers { + close(processWorker.workerCh) + } + wg.Wait() +} + +>>>>>>> bc2c1b229df (executor: fix IndexMerge handle panic logic (#41036)) type partialIndexWorker struct { stats *IndexMergeRuntimeStat sc sessionctx.Context @@ -789,13 +1133,35 @@ type partialIndexWorker struct { maxBatchSize int maxChunkSize int partition table.PhysicalTable // it indicates if this worker is accessing a particular partition table + memTracker *memory.Tracker } +<<<<<<< HEAD func (w *partialIndexWorker) syncErr(resultCh chan<- *lookupTableTask, err error) { doneCh := make(chan error, 1) doneCh <- err resultCh <- &lookupTableTask{ doneCh: doneCh, +======= +func syncErr(ctx context.Context, finished <-chan struct{}, errCh chan<- *indexMergeTableTask, err error) { + logutil.BgLogger().Error("IndexMergeReaderExecutor.syncErr", zap.Error(err)) + doneCh := make(chan error, 1) + doneCh <- err + task := &indexMergeTableTask{ + lookupTableTask: lookupTableTask{ + doneCh: doneCh, + }, +>>>>>>> bc2c1b229df (executor: fix IndexMerge handle panic logic (#41036)) + } + + // ctx.Done and finished is to avoid write channel is stuck. + select { + case <-ctx.Done(): + return + case <-finished: + return + case errCh <- task: + return } } @@ -803,8 +1169,12 @@ func (w *partialIndexWorker) fetchHandles( ctx context.Context, result distsql.SelectResult, exitCh <-chan struct{}, +<<<<<<< HEAD fetchCh chan<- *lookupTableTask, resultCh chan<- *lookupTableTask, +======= + fetchCh chan<- *indexMergeTableTask, +>>>>>>> bc2c1b229df (executor: fix IndexMerge handle panic logic (#41036)) finished <-chan struct{}, handleCols plannercore.HandleCols) (count int64, err error) { chk := chunk.NewChunkWithCapacity(handleCols.GetFieldsTypes(), w.maxChunkSize) @@ -819,7 +1189,11 @@ func (w *partialIndexWorker) fetchHandles( start := time.Now() handles, retChunk, err := w.extractTaskHandles(ctx, chk, result, handleCols) if err != nil { +<<<<<<< HEAD w.syncErr(resultCh, err) +======= + syncErr(ctx, finished, fetchCh, err) +>>>>>>> bc2c1b229df (executor: fix IndexMerge handle panic logic (#41036)) return count, err } if len(handles) == 0 { @@ -848,6 +1222,8 @@ func (w *partialIndexWorker) fetchHandles( func (w *partialIndexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, idxResult distsql.SelectResult, handleCols plannercore.HandleCols) ( handles []kv.Handle, retChk *chunk.Chunk, err error) { handles = make([]kv.Handle, 0, w.batchSize) + var memUsage int64 + defer w.memTracker.Consume(-memUsage) for len(handles) < w.batchSize { chk.SetRequiredRows(w.batchSize-len(handles), w.maxChunkSize) err = errors.Trace(idxResult.Next(ctx, chk)) @@ -855,8 +1231,14 @@ func (w *partialIndexWorker) extractTaskHandles(ctx context.Context, chk *chunk. return handles, nil, err } if chk.NumRows() == 0 { + failpoint.Inject("testIndexMergeErrorPartialIndexWorker", func(v failpoint.Value) { + failpoint.Return(handles, nil, errors.New(v.(string))) + }) return handles, retChk, nil } + memDelta := chk.MemoryUsage() + memUsage += memDelta + w.memTracker.Consume(memDelta) for i := 0; i < chk.NumRows(); i++ { handle, err := handleCols.BuildHandleFromIndexRow(chk.GetRow(i)) if err != nil { @@ -907,6 +1289,17 @@ func (w *indexMergeTableScanWorker) pickAndExecTask(ctx context.Context) (task * case <-w.finished: return } + // Make sure panic failpoint is after fetch task from workCh. + // Otherwise cannot send error to task.doneCh. + failpoint.Inject("testIndexMergePanicTableScanWorker", nil) + failpoint.Inject("mockSleepBeforeStartTableReader", func(_ failpoint.Value) { + select { + case <-ctx.Done(): + failpoint.Return() + case <-w.finished: + failpoint.Return() + } + }) execStart := time.Now() err := w.executeTask(ctx, task) if w.stats != nil { @@ -918,15 +1311,32 @@ func (w *indexMergeTableScanWorker) pickAndExecTask(ctx context.Context) (task * } } +<<<<<<< HEAD func (w *indexMergeTableScanWorker) handlePickAndExecTaskPanic(ctx context.Context, task *lookupTableTask) func(r interface{}) { +======= +func (w *indexMergeTableScanWorker) handleTableScanWorkerPanic(ctx context.Context, finished <-chan struct{}, task **indexMergeTableTask, worker string) func(r interface{}) { +>>>>>>> bc2c1b229df (executor: fix IndexMerge handle panic logic (#41036)) return func(r interface{}) { if r == nil { return } - err4Panic := errors.Errorf("panic in IndexMergeReaderExecutor indexMergeTableWorker: %v", r) + err4Panic := errors.Errorf("%s: %v", worker, r) logutil.Logger(ctx).Error(err4Panic.Error()) +<<<<<<< HEAD task.doneCh <- err4Panic +======= + if *task != nil { + select { + case <-ctx.Done(): + return + case <-finished: + return + case (*task).doneCh <- err4Panic: + return + } + } +>>>>>>> bc2c1b229df (executor: fix IndexMerge handle panic logic (#41036)) } } diff --git a/executor/index_merge_reader_test.go b/executor/index_merge_reader_test.go index c09d5429dcb5a..715587370fce0 100644 --- a/executor/index_merge_reader_test.go +++ b/executor/index_merge_reader_test.go @@ -584,3 +584,280 @@ func TestPessimisticLockOnPartitionForIndexMerge(t *testing.T) { // TODO: add support for index merge reader in dynamic tidb_partition_prune_mode } +<<<<<<< HEAD +======= + +func TestIndexMergeIntersectionConcurrency(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3)) partition by hash(c1) partitions 10;") + tk.MustExec("insert into t1 values(1, 1, 3000), (2, 1, 1)") + tk.MustExec("analyze table t1;") + tk.MustExec("set tidb_partition_prune_mode = 'dynamic'") + res := tk.MustQuery("explain select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Rows() + require.Contains(t, res[1][0], "IndexMerge") + + // Default is tidb_executor_concurrency. + res = tk.MustQuery("select @@tidb_executor_concurrency;").Sort().Rows() + defExecCon := res[0][0].(string) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", fmt.Sprintf("return(%s)", defExecCon))) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency")) + }() + tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1")) + + tk.MustExec("set tidb_executor_concurrency = 10") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(10)")) + tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1")) + // workerCnt = min(part_num, concurrency) + tk.MustExec("set tidb_executor_concurrency = 20") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(10)")) + tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1")) + tk.MustExec("set tidb_executor_concurrency = 2") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(2)")) + tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1")) + + tk.MustExec("set tidb_index_merge_intersection_concurrency = 9") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(9)")) + tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1")) + tk.MustExec("set tidb_index_merge_intersection_concurrency = 21") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(10)")) + tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1")) + tk.MustExec("set tidb_index_merge_intersection_concurrency = 3") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(3)")) + tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1")) + + // Concurrency only works for dynamic pruning partition table, so real concurrency is 1. + tk.MustExec("set tidb_partition_prune_mode = 'static'") + tk.MustExec("set tidb_index_merge_intersection_concurrency = 9") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(1)")) + tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1")) + + // Concurrency only works for dynamic pruning partition table. so real concurrency is 1. + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3));") + tk.MustExec("insert into t1 values(1, 1, 3000), (2, 1, 1)") + tk.MustExec("set tidb_index_merge_intersection_concurrency = 9") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(1)")) + tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1")) +} + +func TestIntersectionWithDifferentConcurrency(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + var execCon []int + tblSchemas := []string{ + // partition table + "create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3)) partition by hash(c1) partitions 10;", + // non-partition table + "create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3));", + } + + for tblIdx, tblSchema := range tblSchemas { + if tblIdx == 0 { + // Test different intersectionProcessWorker with partition table(10 partitions). + execCon = []int{1, 3, 10, 11, 20} + } else { + // Default concurrency. + execCon = []int{5} + } + tk.MustExec("use test") + tk.MustExec("drop table if exists t1;") + tk.MustExec(tblSchema) + + const queryCnt int = 10 + const rowCnt int = 1000 + curRowCnt := 0 + insertStr := "insert into t1 values" + for i := 0; i < rowCnt; i++ { + if i != 0 { + insertStr += ", " + } + insertStr += fmt.Sprintf("(%d, %d, %d)", i, rand.Int(), rand.Int()) + curRowCnt++ + } + tk.MustExec(insertStr) + tk.MustExec("analyze table t1") + + for _, concurrency := range execCon { + tk.MustExec(fmt.Sprintf("set tidb_executor_concurrency = %d", concurrency)) + for i := 0; i < 2; i++ { + if i == 0 { + // Dynamic mode. + tk.MustExec("set tidb_partition_prune_mode = 'dynamic'") + res := tk.MustQuery("explain select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024") + require.Contains(t, res.Rows()[1][0], "IndexMerge") + } else { + tk.MustExec("set tidb_partition_prune_mode = 'static'") + res := tk.MustQuery("explain select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024") + if tblIdx == 0 { + // partition table + require.Contains(t, res.Rows()[1][0], "PartitionUnion") + require.Contains(t, res.Rows()[2][0], "IndexMerge") + } else { + require.Contains(t, res.Rows()[1][0], "IndexMerge") + } + } + for i := 0; i < queryCnt; i++ { + c3 := rand.Intn(1024) + res := tk.MustQuery(fmt.Sprintf("select /*+ no_index_merge() */ c1 from t1 where c2 < 1024 and c3 > %d", c3)).Sort().Rows() + tk.MustQuery(fmt.Sprintf("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > %d", c3)).Sort().Check(res) + } + + // In tranaction + for i := 0; i < queryCnt; i++ { + tk.MustExec("begin;") + r := rand.Intn(3) + if r == 0 { + tk.MustExec(fmt.Sprintf("update t1 set c3 = %d where c1 = %d", rand.Int(), rand.Intn(rowCnt))) + } else if r == 1 { + tk.MustExec(fmt.Sprintf("delete from t1 where c1 = %d", rand.Intn(rowCnt))) + } else if r == 2 { + tk.MustExec(fmt.Sprintf("insert into t1 values(%d, %d, %d)", curRowCnt, rand.Int(), rand.Int())) + curRowCnt++ + } + c3 := rand.Intn(1024) + res := tk.MustQuery(fmt.Sprintf("select /*+ no_index_merge() */ c1 from t1 where c2 < 1024 and c3 > %d", c3)).Sort().Rows() + tk.MustQuery(fmt.Sprintf("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > %d", c3)).Sort().Check(res) + tk.MustExec("commit;") + } + } + } + tk.MustExec("drop table t1") + } +} + +func TestIntersectionWorkerPanic(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3)) partition by hash(c1) partitions 10;") + tk.MustExec("insert into t1 values(1, 1, 3000), (2, 1, 1)") + tk.MustExec("analyze table t1;") + tk.MustExec("set tidb_partition_prune_mode = 'dynamic'") + res := tk.MustQuery("explain select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Rows() + require.Contains(t, res[1][0], "IndexMerge") + + // Test panic in intersection. + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionWorkerPanic", `panic("testIndexMergeIntersectionWorkerPanic")`)) + err := tk.QueryToErr("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024") + require.Contains(t, err.Error(), "testIndexMergeIntersectionWorkerPanic") + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionWorkerPanic")) +} + +func TestIntersectionMemQuota(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1(pk varchar(100) primary key, c1 int, c2 int, index idx1(c1), index idx2(c2))") + + insertStr := "insert into t1 values" + for i := 0; i < 20; i++ { + if i != 0 { + insertStr += ", " + } + insertStr += fmt.Sprintf("('%s', %d, %d)", testutil.RandStringRunes(100), 1, 1) + } + tk.MustExec(insertStr) + res := tk.MustQuery("explain select /*+ use_index_merge(t1, primary, idx1, idx2) */ c1 from t1 where c1 < 1024 and c2 < 1024").Rows() + require.Contains(t, res[1][0], "IndexMerge") + + tk.MustExec("set global tidb_mem_oom_action='CANCEL'") + defer tk.MustExec("set global tidb_mem_oom_action = DEFAULT") + tk.MustExec("set @@tidb_mem_quota_query = 4000") + err := tk.QueryToErr("select /*+ use_index_merge(t1, primary, idx1, idx2) */ c1 from t1 where c1 < 1024 and c2 < 1024") + require.Contains(t, err.Error(), "Out Of Memory Quota!") +} + +func TestIndexMergePanic(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3));") + tk.MustExec("insert into t1 values(1, 1, 1), (100, 100, 100)") + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeResultChCloseEarly", "return(true)")) + tk.MustExec("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c1 < 100 or c2 < 100") + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergeResultChCloseEarly")) + + tk.MustExec("use test") + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3)) partition by hash(c1) partitions 10;") + insertStr := "insert into t1 values(0, 0, 0)" + for i := 1; i < 1000; i++ { + insertStr += fmt.Sprintf(", (%d, %d, %d)", i, i, i) + } + tk.MustExec(insertStr) + tk.MustExec("analyze table t1;") + tk.MustExec("set tidb_partition_prune_mode = 'dynamic'") + + minV := 200 + maxV := 1000 + runSQL := func(fp string) { + var sql string + v1 := rand.Intn(maxV-minV) + minV + v2 := rand.Intn(maxV-minV) + minV + if !strings.Contains(fp, "Intersection") { + sql = fmt.Sprintf("select /*+ use_index_merge(t1) */ c1 from t1 where c1 < %d or c2 < %d;", v1, v2) + } else { + sql = fmt.Sprintf("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c3 < %d and c2 < %d", v1, v2) + } + res := tk.MustQuery("explain " + sql).Rows() + require.Contains(t, res[1][0], "IndexMerge") + err := tk.QueryToErr(sql) + require.Contains(t, err.Error(), fp) + } + + packagePath := "github.com/pingcap/tidb/executor/" + panicFPPaths := []string{ + packagePath + "testIndexMergePanicPartialIndexWorker", + packagePath + "testIndexMergePanicPartialTableWorker", + + packagePath + "testIndexMergePanicProcessWorkerUnion", + packagePath + "testIndexMergePanicProcessWorkerIntersection", + packagePath + "testIndexMergePanicPartitionTableIntersectionWorker", + + packagePath + "testIndexMergePanicTableScanWorker", + } + for _, fp := range panicFPPaths { + fmt.Println("handling failpoint: ", fp) + if !strings.Contains(fp, "testIndexMergePanicTableScanWorker") { + // When mockSleepBeforeStartTableReader is enabled, will not read real data. This is to avoid leaking goroutines in coprocessor. + // But should disable mockSleepBeforeStartTableReader for testIndexMergePanicTableScanWorker. + // Because finalTableScanWorker need task.doneCh to pass error, so need partialIndexWorker/partialTableWorker runs normally. + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/mockSleepBeforeStartTableReader", "return(1000)")) + } + for i := 0; i < 1000; i++ { + require.NoError(t, failpoint.Enable(fp, fmt.Sprintf(`panic("%s")`, fp))) + runSQL(fp) + require.NoError(t, failpoint.Disable(fp)) + } + if !strings.Contains(fp, "testIndexMergePanicTableScanWorker") { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/mockSleepBeforeStartTableReader")) + } + } + + errFPPaths := []string{ + packagePath + "testIndexMergeErrorPartialIndexWorker", + packagePath + "testIndexMergeErrorPartialTableWorker", + } + for _, fp := range errFPPaths { + fmt.Println("handling failpoint: ", fp) + require.NoError(t, failpoint.Enable(fp, fmt.Sprintf(`return("%s")`, fp))) + for i := 0; i < 100; i++ { + runSQL(fp) + } + require.NoError(t, failpoint.Disable(fp)) + } +} +>>>>>>> bc2c1b229df (executor: fix IndexMerge handle panic logic (#41036))