From c971661b3a73e1e47416079a691d31e0acfb57f1 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Mon, 19 Jun 2023 15:17:12 +0800 Subject: [PATCH] This is an automated cherry-pick of #44643 Signed-off-by: ti-chi-bot --- executor/builder.go | 82 +++++---- executor/cte.go | 420 +++++++++++++++++++++++++++++++++---------- executor/cte_test.go | 54 ++++++ 3 files changed, 424 insertions(+), 132 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 414dafa1eed1e..a9dddb610cad4 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -120,6 +120,7 @@ type executorBuilder struct { type CTEStorages struct { ResTbl cteutil.Storage IterInTbl cteutil.Storage + Producer *cteProducer } func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo, replicaReadScope string) *executorBuilder { @@ -4858,33 +4859,39 @@ func (b *executorBuilder) buildTableSample(v *plannercore.PhysicalTableSample) * } func (b *executorBuilder) buildCTE(v *plannercore.PhysicalCTE) Executor { - // 1. Build seedPlan. if b.Ti != nil { b.Ti.UseNonRecursive = true } - seedExec := b.build(v.SeedPlan) - if b.err != nil { - return nil + if v.RecurPlan != nil && b.Ti != nil { + b.Ti.UseRecursive = true } - // 2. Build tables to store intermediate results. - chkSize := b.ctx.GetSessionVars().MaxChunkSize - tps := seedExec.base().retFieldTypes - // iterOutTbl will be constructed in CTEExec.Open(). - var resTbl cteutil.Storage - var iterInTbl cteutil.Storage - storageMap, ok := b.ctx.GetSessionVars().StmtCtx.CTEStorageMap.(map[int]*CTEStorages) if !ok { b.err = errors.New("type assertion for CTEStorageMap failed") return nil } + + chkSize := b.ctx.GetSessionVars().MaxChunkSize + // iterOutTbl will be constructed in CTEExec.Open(). + var resTbl cteutil.Storage + var iterInTbl cteutil.Storage + var producer *cteProducer storages, ok := storageMap[v.CTE.IDForStorage] if ok { // Storage already setup. resTbl = storages.ResTbl iterInTbl = storages.IterInTbl + producer = storages.Producer } else { + // Build seed part. + seedExec := b.build(v.SeedPlan) + if b.err != nil { + return nil + } + + // Setup storages. + tps := seedExec.base().retFieldTypes resTbl = cteutil.NewStorageRowContainer(tps, chkSize) if err := resTbl.OpenAndRef(); err != nil { b.err = err @@ -4896,38 +4903,39 @@ func (b *executorBuilder) buildCTE(v *plannercore.PhysicalCTE) Executor { return nil } storageMap[v.CTE.IDForStorage] = &CTEStorages{ResTbl: resTbl, IterInTbl: iterInTbl} - } - // 3. Build recursive part. - if v.RecurPlan != nil && b.Ti != nil { - b.Ti.UseRecursive = true - } - recursiveExec := b.build(v.RecurPlan) - if b.err != nil { - return nil - } + // Build recursive part. + recursiveExec := b.build(v.RecurPlan) + if b.err != nil { + return nil + } + var sel []int + if v.CTE.IsDistinct { + sel = make([]int, chkSize) + for i := 0; i < chkSize; i++ { + sel[i] = i + } + } - var sel []int - if v.CTE.IsDistinct { - sel = make([]int, chkSize) - for i := 0; i < chkSize; i++ { - sel[i] = i + producer = &cteProducer{ + ctx: b.ctx, + seedExec: seedExec, + recursiveExec: recursiveExec, + resTbl: resTbl, + iterInTbl: iterInTbl, + isDistinct: v.CTE.IsDistinct, + sel: sel, + hasLimit: v.CTE.HasLimit, + limitBeg: v.CTE.LimitBeg, + limitEnd: v.CTE.LimitEnd, + isInApply: v.CTE.IsInApply, } + storageMap[v.CTE.IDForStorage].Producer = producer } return &CTEExec{ - baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()), - seedExec: seedExec, - recursiveExec: recursiveExec, - resTbl: resTbl, - iterInTbl: iterInTbl, - chkIdx: 0, - isDistinct: v.CTE.IsDistinct, - sel: sel, - hasLimit: v.CTE.HasLimit, - limitBeg: v.CTE.LimitBeg, - limitEnd: v.CTE.LimitEnd, - isInApply: v.CTE.IsInApply, + baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()), + producer: producer, } } diff --git a/executor/cte.go b/executor/cte.go index 7e98064b1d8bd..7d472ec7b5f7d 100644 --- a/executor/cte.go +++ b/executor/cte.go @@ -63,6 +63,73 @@ var _ Executor = &CTEExec{} type CTEExec struct { baseExecutor + chkIdx int + producer *cteProducer + + // limit in recursive CTE. + cursor uint64 + meetFirstBatch bool +} + +// Open implements the Executor interface. +func (e *CTEExec) Open(ctx context.Context) (err error) { + e.reset() + if err := e.baseExecutor.Open(ctx); err != nil { + return err + } + + e.producer.resTbl.Lock() + defer e.producer.resTbl.Unlock() + + if e.producer.isInApply { + e.producer.reset() + } + if !e.producer.opened { + if err = e.producer.openProducer(ctx, e); err != nil { + return err + } + } + return nil +} + +// Next implements the Executor interface. +func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { + e.producer.resTbl.Lock() + defer e.producer.resTbl.Unlock() + if !e.producer.resTbl.Done() { + if err = e.producer.produce(ctx, e); err != nil { + return err + } + } + return e.producer.getChunk(ctx, e, req) +} + +// Close implements the Executor interface. +func (e *CTEExec) Close() (err error) { + e.producer.resTbl.Lock() + if !e.producer.closed { + err = e.producer.closeProducer() + } + e.producer.resTbl.Unlock() + if err != nil { + return err + } + return e.baseExecutor.Close() +} + +func (e *CTEExec) reset() { + e.chkIdx = 0 + e.cursor = 0 + e.meetFirstBatch = false +} + +type cteProducer struct { + opened bool + produced bool + closed bool + + ctx sessionctx.Context + seedExec Executor recursiveExec Executor @@ -74,9 +141,6 @@ type CTEExec struct { hashTbl baseHashTable - // Index of chunk to read from `resTbl`. - chkIdx int - // UNION ALL or UNION DISTINCT. isDistinct bool curIter int @@ -84,11 +148,9 @@ type CTEExec struct { sel []int // Limit related info. - hasLimit bool - limitBeg uint64 - limitEnd uint64 - cursor uint64 - meetFirstBatch bool + hasLimit bool + limitBeg uint64 + limitEnd uint64 memTracker *memory.Tracker diskTracker *disk.Tracker @@ -99,56 +161,87 @@ type CTEExec struct { isInApply bool } -// Open implements the Executor interface. -func (e *CTEExec) Open(ctx context.Context) (err error) { - e.reset() - if err := e.baseExecutor.Open(ctx); err != nil { - return err - } - - if e.seedExec == nil { +func (p *cteProducer) openProducer(ctx context.Context, cteExec *CTEExec) (err error) { + if p.seedExec == nil { return errors.New("seedExec for CTEExec is nil") } - if err = e.seedExec.Open(ctx); err != nil { + if err = p.seedExec.Open(ctx); err != nil { return err } +<<<<<<< HEAD e.memTracker = memory.NewTracker(e.id, -1) e.diskTracker = disk.NewTracker(e.id, -1) e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) e.diskTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.DiskTracker) +======= + if p.memTracker != nil { + p.memTracker.Reset() + } else { + p.memTracker = memory.NewTracker(cteExec.id, -1) + } + p.diskTracker = disk.NewTracker(cteExec.id, -1) + p.memTracker.AttachTo(p.ctx.GetSessionVars().StmtCtx.MemTracker) + p.diskTracker.AttachTo(p.ctx.GetSessionVars().StmtCtx.DiskTracker) +>>>>>>> cfef1b05ccd (executor: add CTEProducer that shared by all CTEExec (#44643)) - if e.recursiveExec != nil { - if err = e.recursiveExec.Open(ctx); err != nil { + if p.recursiveExec != nil { + if err = p.recursiveExec.Open(ctx); err != nil { return err } // For non-recursive CTE, the result will be put into resTbl directly. // So no need to build iterOutTbl. // Construct iterOutTbl in Open() instead of buildCTE(), because its destruct is in Close(). - recursiveTypes := e.recursiveExec.base().retFieldTypes - e.iterOutTbl = cteutil.NewStorageRowContainer(recursiveTypes, e.maxChunkSize) - if err = e.iterOutTbl.OpenAndRef(); err != nil { + recursiveTypes := p.recursiveExec.base().retFieldTypes + p.iterOutTbl = cteutil.NewStorageRowContainer(recursiveTypes, cteExec.maxChunkSize) + if err = p.iterOutTbl.OpenAndRef(); err != nil { return err } } - if e.isDistinct { - e.hashTbl = newConcurrentMapHashTable() - e.hCtx = &hashContext{ - allTypes: e.base().retFieldTypes, + if p.isDistinct { + p.hashTbl = newConcurrentMapHashTable() + p.hCtx = &hashContext{ + allTypes: cteExec.base().retFieldTypes, } // We use all columns to compute hash. - e.hCtx.keyColIdx = make([]int, len(e.hCtx.allTypes)) - for i := range e.hCtx.keyColIdx { - e.hCtx.keyColIdx[i] = i + p.hCtx.keyColIdx = make([]int, len(p.hCtx.allTypes)) + for i := range p.hCtx.keyColIdx { + p.hCtx.keyColIdx[i] = i } } + p.opened = true return nil } -// Next implements the Executor interface. -func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { +func (p *cteProducer) closeProducer() (err error) { + if err = p.seedExec.Close(); err != nil { + return err + } + if p.recursiveExec != nil { + if err = p.recursiveExec.Close(); err != nil { + return err + } + // `iterInTbl` and `resTbl` are shared by multiple operators, + // so will be closed when the SQL finishes. + if p.iterOutTbl != nil { + if err = p.iterOutTbl.DerefAndClose(); err != nil { + return err + } + } + } + p.closed = true + if p.isInApply { + if err = p.reopenTbls(); err != nil { + return err + } + } + return nil +} + +func (p *cteProducer) getChunk(ctx context.Context, cteExec *CTEExec, req *chunk.Chunk) (err error) { req.Reset() +<<<<<<< HEAD e.resTbl.Lock() defer e.resTbl.Unlock() if !e.resTbl.Done() { @@ -181,13 +274,13 @@ func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { return err } e.resTbl.SetDone() +======= + if p.hasLimit { + return p.nextChunkLimit(cteExec, req) +>>>>>>> cfef1b05ccd (executor: add CTEProducer that shared by all CTEExec (#44643)) } - - if e.hasLimit { - return e.nextChunkLimit(req) - } - if e.chkIdx < e.resTbl.NumChunks() { - res, err := e.resTbl.GetChunk(e.chkIdx) + if cteExec.chkIdx < p.resTbl.NumChunks() { + res, err := p.resTbl.GetChunk(cteExec.chkIdx) if err != nil { return err } @@ -195,11 +288,12 @@ func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { // Also we ignore copying rows not selected, because some operators like Projection // doesn't support swap column if chunk.sel is no nil. req.SwapColumns(res.CopyConstructSel()) - e.chkIdx++ + cteExec.chkIdx++ } return nil } +<<<<<<< HEAD // Close implements the Executor interface. func (e *CTEExec) Close() (err error) { e.reset() @@ -316,18 +410,24 @@ func (e *CTEExec) nextChunkLimit(req *chunk.Chunk) error { if !e.meetFirstBatch { for e.chkIdx < e.resTbl.NumChunks() { res, err := e.resTbl.GetChunk(e.chkIdx) +======= +func (p *cteProducer) nextChunkLimit(cteExec *CTEExec, req *chunk.Chunk) error { + if !cteExec.meetFirstBatch { + for cteExec.chkIdx < p.resTbl.NumChunks() { + res, err := p.resTbl.GetChunk(cteExec.chkIdx) +>>>>>>> cfef1b05ccd (executor: add CTEProducer that shared by all CTEExec (#44643)) if err != nil { return err } - e.chkIdx++ + cteExec.chkIdx++ numRows := uint64(res.NumRows()) - if newCursor := e.cursor + numRows; newCursor >= e.limitBeg { - e.meetFirstBatch = true - begInChk, endInChk := e.limitBeg-e.cursor, numRows - if newCursor > e.limitEnd { - endInChk = e.limitEnd - e.cursor + if newCursor := cteExec.cursor + numRows; newCursor >= p.limitBeg { + cteExec.meetFirstBatch = true + begInChk, endInChk := p.limitBeg-cteExec.cursor, numRows + if newCursor > p.limitEnd { + endInChk = p.limitEnd - cteExec.cursor } - e.cursor += endInChk + cteExec.cursor += endInChk if begInChk == endInChk { break } @@ -335,43 +435,172 @@ func (e *CTEExec) nextChunkLimit(req *chunk.Chunk) error { req.Append(tmpChk, int(begInChk), int(endInChk)) return nil } - e.cursor += numRows + cteExec.cursor += numRows } } - if e.chkIdx < e.resTbl.NumChunks() && e.cursor < e.limitEnd { - res, err := e.resTbl.GetChunk(e.chkIdx) + if cteExec.chkIdx < p.resTbl.NumChunks() && cteExec.cursor < p.limitEnd { + res, err := p.resTbl.GetChunk(cteExec.chkIdx) if err != nil { return err } - e.chkIdx++ + cteExec.chkIdx++ numRows := uint64(res.NumRows()) - if e.cursor+numRows > e.limitEnd { - numRows = e.limitEnd - e.cursor + if cteExec.cursor+numRows > p.limitEnd { + numRows = p.limitEnd - cteExec.cursor req.Append(res.CopyConstructSel(), 0, int(numRows)) } else { req.SwapColumns(res.CopyConstructSel()) } - e.cursor += numRows + cteExec.cursor += numRows + } + return nil +} + +func (p *cteProducer) produce(ctx context.Context, cteExec *CTEExec) (err error) { + if p.resTbl.Error() != nil { + return p.resTbl.Error() + } + resAction := setupCTEStorageTracker(p.resTbl, cteExec.ctx, p.memTracker, p.diskTracker) + iterInAction := setupCTEStorageTracker(p.iterInTbl, cteExec.ctx, p.memTracker, p.diskTracker) + var iterOutAction *chunk.SpillDiskAction + if p.iterOutTbl != nil { + iterOutAction = setupCTEStorageTracker(p.iterOutTbl, cteExec.ctx, p.memTracker, p.diskTracker) + } + + failpoint.Inject("testCTEStorageSpill", func(val failpoint.Value) { + if val.(bool) && variable.EnableTmpStorageOnOOM.Load() { + defer resAction.WaitForTest() + defer iterInAction.WaitForTest() + if iterOutAction != nil { + defer iterOutAction.WaitForTest() + } + } + }) + + if err = p.computeSeedPart(ctx); err != nil { + p.resTbl.SetError(err) + return err + } + if err = p.computeRecursivePart(ctx); err != nil { + p.resTbl.SetError(err) + return err } + p.resTbl.SetDone() return nil } -func (e *CTEExec) setupTblsForNewIteration() (err error) { - num := e.iterOutTbl.NumChunks() +func (p *cteProducer) computeSeedPart(ctx context.Context) (err error) { + defer func() { + if r := recover(); r != nil && err == nil { + err = errors.Errorf("%v", r) + } + }() + failpoint.Inject("testCTESeedPanic", nil) + p.curIter = 0 + p.iterInTbl.SetIter(p.curIter) + chks := make([]*chunk.Chunk, 0, 10) + for { + if p.limitDone(p.iterInTbl) { + break + } + chk := tryNewCacheChunk(p.seedExec) + if err = Next(ctx, p.seedExec, chk); err != nil { + return + } + if chk.NumRows() == 0 { + break + } + if chk, err = p.tryDedupAndAdd(chk, p.iterInTbl, p.hashTbl); err != nil { + return + } + chks = append(chks, chk) + } + // Initial resTbl is empty, so no need to deduplicate chk using resTbl. + // Just adding is ok. + for _, chk := range chks { + if err = p.resTbl.Add(chk); err != nil { + return + } + } + p.curIter++ + p.iterInTbl.SetIter(p.curIter) + + return +} + +func (p *cteProducer) computeRecursivePart(ctx context.Context) (err error) { + defer func() { + if r := recover(); r != nil && err == nil { + err = errors.Errorf("%v", r) + } + }() + failpoint.Inject("testCTERecursivePanic", nil) + if p.recursiveExec == nil || p.iterInTbl.NumChunks() == 0 { + return + } + + if p.curIter > p.ctx.GetSessionVars().CTEMaxRecursionDepth { + return exeerrors.ErrCTEMaxRecursionDepth.GenWithStackByArgs(p.curIter) + } + + if p.limitDone(p.resTbl) { + return + } + + for { + chk := tryNewCacheChunk(p.recursiveExec) + if err = Next(ctx, p.recursiveExec, chk); err != nil { + return + } + if chk.NumRows() == 0 { + if err = p.setupTblsForNewIteration(); err != nil { + return + } + if p.limitDone(p.resTbl) { + break + } + if p.iterInTbl.NumChunks() == 0 { + break + } + // Next iteration begins. Need use iterOutTbl as input of next iteration. + p.curIter++ + p.iterInTbl.SetIter(p.curIter) + if p.curIter > p.ctx.GetSessionVars().CTEMaxRecursionDepth { + return exeerrors.ErrCTEMaxRecursionDepth.GenWithStackByArgs(p.curIter) + } + // Make sure iterInTbl is setup before Close/Open, + // because some executors will read iterInTbl in Open() (like IndexLookupJoin). + if err = p.recursiveExec.Close(); err != nil { + return + } + if err = p.recursiveExec.Open(ctx); err != nil { + return + } + } else { + if err = p.iterOutTbl.Add(chk); err != nil { + return + } + } + } + return +} + +func (p *cteProducer) setupTblsForNewIteration() (err error) { + num := p.iterOutTbl.NumChunks() chks := make([]*chunk.Chunk, 0, num) // Setup resTbl's data. for i := 0; i < num; i++ { - chk, err := e.iterOutTbl.GetChunk(i) + chk, err := p.iterOutTbl.GetChunk(i) if err != nil { return err } // Data should be copied in UNION DISTINCT. // Because deduplicate() will change data in iterOutTbl, // which will cause panic when spilling data into disk concurrently. - if e.isDistinct { + if p.isDistinct { chk = chk.CopyConstruct() } - chk, err = e.tryDedupAndAdd(chk, e.resTbl, e.hashTbl) + chk, err = p.tryDedupAndAdd(chk, p.resTbl, p.hashTbl) if err != nil { return err } @@ -379,47 +608,48 @@ func (e *CTEExec) setupTblsForNewIteration() (err error) { } // Setup new iteration data in iterInTbl. - if err = e.iterInTbl.Reopen(); err != nil { + if err = p.iterInTbl.Reopen(); err != nil { return err } - if e.isDistinct { + if p.isDistinct { // Already deduplicated by resTbl, adding directly is ok. for _, chk := range chks { - if err = e.iterInTbl.Add(chk); err != nil { + if err = p.iterInTbl.Add(chk); err != nil { return err } } } else { - if err = e.iterInTbl.SwapData(e.iterOutTbl); err != nil { + if err = p.iterInTbl.SwapData(p.iterOutTbl); err != nil { return err } } // Clear data in iterOutTbl. - return e.iterOutTbl.Reopen() + return p.iterOutTbl.Reopen() } -func (e *CTEExec) reset() { - e.curIter = 0 - e.chkIdx = 0 - e.hashTbl = nil - e.cursor = 0 - e.meetFirstBatch = false +func (p *cteProducer) reset() { + p.curIter = 0 + p.hashTbl = nil + + p.opened = false + p.produced = false + p.closed = false } -func (e *CTEExec) reopenTbls() (err error) { - if e.isDistinct { - e.hashTbl = newConcurrentMapHashTable() +func (p *cteProducer) reopenTbls() (err error) { + if p.isDistinct { + p.hashTbl = newConcurrentMapHashTable() } - if err := e.resTbl.Reopen(); err != nil { + if err := p.resTbl.Reopen(); err != nil { return err } - return e.iterInTbl.Reopen() + return p.iterInTbl.Reopen() } // Check if tbl meets the requirement of limit. -func (e *CTEExec) limitDone(tbl cteutil.Storage) bool { - return e.hasLimit && uint64(tbl.NumRows()) >= e.limitEnd +func (p *cteProducer) limitDone(tbl cteutil.Storage) bool { + return p.hasLimit && uint64(tbl.NumRows()) >= p.limitEnd } func setupCTEStorageTracker(tbl cteutil.Storage, ctx sessionctx.Context, parentMemTracker *memory.Tracker, @@ -444,11 +674,11 @@ func setupCTEStorageTracker(tbl cteutil.Storage, ctx sessionctx.Context, parentM return actionSpill } -func (e *CTEExec) tryDedupAndAdd(chk *chunk.Chunk, +func (p *cteProducer) tryDedupAndAdd(chk *chunk.Chunk, storage cteutil.Storage, hashTbl baseHashTable) (res *chunk.Chunk, err error) { - if e.isDistinct { - if chk, err = e.deduplicate(chk, storage, hashTbl); err != nil { + if p.isDistinct { + if chk, err = p.deduplicate(chk, storage, hashTbl); err != nil { return nil, err } } @@ -457,12 +687,12 @@ func (e *CTEExec) tryDedupAndAdd(chk *chunk.Chunk, // Compute hash values in chk and put it in hCtx.hashVals. // Use the returned sel to choose the computed hash values. -func (e *CTEExec) computeChunkHash(chk *chunk.Chunk) (sel []int, err error) { +func (p *cteProducer) computeChunkHash(chk *chunk.Chunk) (sel []int, err error) { numRows := chk.NumRows() - e.hCtx.initHash(numRows) + p.hCtx.initHash(numRows) // Continue to reset to make sure all hasher is new. - for i := numRows; i < len(e.hCtx.hashVals); i++ { - e.hCtx.hashVals[i].Reset() + for i := numRows; i < len(p.hCtx.hashVals); i++ { + p.hCtx.hashVals[i].Reset() } sel = chk.Sel() var hashBitMap []bool @@ -474,12 +704,12 @@ func (e *CTEExec) computeChunkHash(chk *chunk.Chunk) (sel []int, err error) { } else { // All rows is selected, sel will be [0....numRows). // e.sel is setup when building executor. - sel = e.sel + sel = p.sel } for i := 0; i < chk.NumCols(); i++ { - if err = codec.HashChunkSelected(e.ctx.GetSessionVars().StmtCtx, e.hCtx.hashVals, - chk, e.hCtx.allTypes[i], i, e.hCtx.buf, e.hCtx.hasNull, + if err = codec.HashChunkSelected(p.ctx.GetSessionVars().StmtCtx, p.hCtx.hashVals, + chk, p.hCtx.allTypes[i], i, p.hCtx.buf, p.hCtx.hasNull, hashBitMap, false); err != nil { return nil, err } @@ -489,7 +719,7 @@ func (e *CTEExec) computeChunkHash(chk *chunk.Chunk) (sel []int, err error) { // Use hashTbl to deduplicate rows, and unique rows will be added to hashTbl. // Duplicated rows are only marked to be removed by sel in Chunk, instead of really deleted. -func (e *CTEExec) deduplicate(chk *chunk.Chunk, +func (p *cteProducer) deduplicate(chk *chunk.Chunk, storage cteutil.Storage, hashTbl baseHashTable) (chkNoDup *chunk.Chunk, err error) { numRows := chk.NumRows() @@ -499,7 +729,7 @@ func (e *CTEExec) deduplicate(chk *chunk.Chunk, // 1. Compute hash values for chunk. chkHashTbl := newConcurrentMapHashTable() - selOri, err := e.computeChunkHash(chk) + selOri, err := p.computeChunkHash(chk) if err != nil { return nil, err } @@ -508,10 +738,10 @@ func (e *CTEExec) deduplicate(chk *chunk.Chunk, // This sel is for filtering rows duplicated in cur chk. selChk := make([]int, 0, numRows) for i := 0; i < numRows; i++ { - key := e.hCtx.hashVals[selOri[i]].Sum64() + key := p.hCtx.hashVals[selOri[i]].Sum64() row := chk.GetRow(i) - hasDup, err := e.checkHasDup(key, row, chk, storage, chkHashTbl) + hasDup, err := p.checkHasDup(key, row, chk, storage, chkHashTbl) if err != nil { return nil, err } @@ -531,10 +761,10 @@ func (e *CTEExec) deduplicate(chk *chunk.Chunk, // This sel is for filtering rows duplicated in cteutil.Storage. selStorage := make([]int, 0, len(selChk)) for i := 0; i < len(selChk); i++ { - key := e.hCtx.hashVals[selChk[i]].Sum64() + key := p.hCtx.hashVals[selChk[i]].Sum64() row := chk.GetRow(i) - hasDup, err := e.checkHasDup(key, row, nil, storage, hashTbl) + hasDup, err := p.checkHasDup(key, row, nil, storage, hashTbl) if err != nil { return nil, err } @@ -555,7 +785,7 @@ func (e *CTEExec) deduplicate(chk *chunk.Chunk, // Use the row's probe key to check if it already exists in chk or storage. // We also need to compare the row's real encoding value to avoid hash collision. -func (e *CTEExec) checkHasDup(probeKey uint64, +func (p *cteProducer) checkHasDup(probeKey uint64, row chunk.Row, curChk *chunk.Chunk, storage cteutil.Storage, @@ -576,9 +806,9 @@ func (e *CTEExec) checkHasDup(probeKey uint64, if err != nil { return false, err } - isEqual, err := codec.EqualChunkRow(e.ctx.GetSessionVars().StmtCtx, - row, e.hCtx.allTypes, e.hCtx.keyColIdx, - matchedRow, e.hCtx.allTypes, e.hCtx.keyColIdx) + isEqual, err := codec.EqualChunkRow(p.ctx.GetSessionVars().StmtCtx, + row, p.hCtx.allTypes, p.hCtx.keyColIdx, + matchedRow, p.hCtx.allTypes, p.hCtx.keyColIdx) if err != nil { return false, err } diff --git a/executor/cte_test.go b/executor/cte_test.go index 5f68f140fed5e..5cdb5aab43f3d 100644 --- a/executor/cte_test.go +++ b/executor/cte_test.go @@ -460,3 +460,57 @@ func TestCTEsInView(t *testing.T) { tk.MustExec("use test1;") tk.MustQuery("select * from test.v;").Check(testkit.Rows("1")) } +<<<<<<< HEAD +======= + +func TestCTEPanic(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + tk.MustExec("create table t1(c1 int)") + tk.MustExec("insert into t1 values(1), (2), (3)") + + fpPathPrefix := "github.com/pingcap/tidb/executor/" + fp := "testCTESeedPanic" + require.NoError(t, failpoint.Enable(fpPathPrefix+fp, fmt.Sprintf(`panic("%s")`, fp))) + err := tk.QueryToErr("with recursive cte1 as (select c1 from t1 union all select c1 + 1 from cte1 where c1 < 5) select t_alias_1.c1 from cte1 as t_alias_1 inner join cte1 as t_alias_2 on t_alias_1.c1 = t_alias_2.c1 order by c1") + require.Contains(t, err.Error(), fp) + require.NoError(t, failpoint.Disable(fpPathPrefix+fp)) + + fp = "testCTERecursivePanic" + require.NoError(t, failpoint.Enable(fpPathPrefix+fp, fmt.Sprintf(`panic("%s")`, fp))) + err = tk.QueryToErr("with recursive cte1 as (select c1 from t1 union all select c1 + 1 from cte1 where c1 < 5) select t_alias_1.c1 from cte1 as t_alias_1 inner join cte1 as t_alias_2 on t_alias_1.c1 = t_alias_2.c1 order by c1") + require.Contains(t, err.Error(), fp) + require.NoError(t, failpoint.Disable(fpPathPrefix+fp)) +} + +func TestCTEDelSpillFile(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + tk.MustExec("drop table if exists t1, t2;") + tk.MustExec("create table t1(c1 int, c2 int);") + tk.MustExec("create table t2(c1 int);") + tk.MustExec("set @@cte_max_recursion_depth = 1000000;") + tk.MustExec("set global tidb_mem_oom_action = 'log';") + tk.MustExec("set @@tidb_mem_quota_query = 100;") + tk.MustExec("insert into t2 values(1);") + tk.MustExec("insert into t1 (c1, c2) with recursive cte1 as (select c1 from t2 union select cte1.c1 + 1 from cte1 where cte1.c1 < 100000) select cte1.c1, cte1.c1+1 from cte1;") + require.Nil(t, tk.Session().GetSessionVars().StmtCtx.CTEStorageMap) +} + +func TestCTEShareCorColumn(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + tk.MustExec("drop table if exists t1, t2;") + tk.MustExec("create table t1(c1 int, c2 varchar(100));") + tk.MustExec("insert into t1 values(1, '2020-10-10');") + tk.MustExec("create table t2(c1 int, c2 date);") + tk.MustExec("insert into t2 values(1, '2020-10-10');") + for i := 0; i < 100; i++ { + tk.MustQuery("with cte1 as (select t1.c1, (select t2.c2 from t2 where t2.c2 = str_to_date(t1.c2, '%Y-%m-%d')) from t1 inner join t2 on t1.c1 = t2.c1) select /*+ hash_join_build(alias1) */ * from cte1 alias1 inner join cte1 alias2 on alias1.c1 = alias2.c1;").Check(testkit.Rows("1 2020-10-10 1 2020-10-10")) + tk.MustQuery("with cte1 as (select t1.c1, (select t2.c2 from t2 where t2.c2 = str_to_date(t1.c2, '%Y-%m-%d')) from t1 inner join t2 on t1.c1 = t2.c1) select /*+ hash_join_build(alias2) */ * from cte1 alias1 inner join cte1 alias2 on alias1.c1 = alias2.c1;").Check(testkit.Rows("1 2020-10-10 1 2020-10-10")) + } +} +>>>>>>> cfef1b05ccd (executor: add CTEProducer that shared by all CTEExec (#44643))