diff --git a/executor/builder.go b/executor/builder.go index eae7d7f0e435c..14b415cf5419c 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -105,6 +105,7 @@ type executorBuilder struct { type CTEStorages struct { ResTbl cteutil.Storage IterInTbl cteutil.Storage + Producer *cteProducer } func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo, snapshotTS uint64, isStaleness bool, replicaReadScope string) *executorBuilder { @@ -4664,33 +4665,39 @@ func (b *executorBuilder) buildTableSample(v *plannercore.PhysicalTableSample) * } func (b *executorBuilder) buildCTE(v *plannercore.PhysicalCTE) Executor { - // 1. Build seedPlan. if b.Ti != nil { b.Ti.UseNonRecursive = true } - seedExec := b.build(v.SeedPlan) - if b.err != nil { - return nil + if v.RecurPlan != nil && b.Ti != nil { + b.Ti.UseRecursive = true } - // 2. Build tables to store intermediate results. - chkSize := b.ctx.GetSessionVars().MaxChunkSize - tps := seedExec.base().retFieldTypes - // iterOutTbl will be constructed in CTEExec.Open(). - var resTbl cteutil.Storage - var iterInTbl cteutil.Storage - storageMap, ok := b.ctx.GetSessionVars().StmtCtx.CTEStorageMap.(map[int]*CTEStorages) if !ok { b.err = errors.New("type assertion for CTEStorageMap failed") return nil } + + chkSize := b.ctx.GetSessionVars().MaxChunkSize + // iterOutTbl will be constructed in CTEExec.Open(). + var resTbl cteutil.Storage + var iterInTbl cteutil.Storage + var producer *cteProducer storages, ok := storageMap[v.CTE.IDForStorage] if ok { // Storage already setup. resTbl = storages.ResTbl iterInTbl = storages.IterInTbl + producer = storages.Producer } else { + // Build seed part. + seedExec := b.build(v.SeedPlan) + if b.err != nil { + return nil + } + + // Setup storages. + tps := seedExec.base().retFieldTypes resTbl = cteutil.NewStorageRowContainer(tps, chkSize) if err := resTbl.OpenAndRef(); err != nil { b.err = err @@ -4702,38 +4709,39 @@ func (b *executorBuilder) buildCTE(v *plannercore.PhysicalCTE) Executor { return nil } storageMap[v.CTE.IDForStorage] = &CTEStorages{ResTbl: resTbl, IterInTbl: iterInTbl} - } - // 3. Build recursive part. - if v.RecurPlan != nil && b.Ti != nil { - b.Ti.UseRecursive = true - } - recursiveExec := b.build(v.RecurPlan) - if b.err != nil { - return nil - } + // Build recursive part. 
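The buildCTE rewrite above constructs the producer only on the first reference to a CTE and stashes it in CTEStorages, so every later PhysicalCTE with the same IDForStorage reuses the same instance. A minimal sketch of that build-or-reuse pattern, using invented stand-in types (producer, storages, buildOrReuse) rather than the real executor types:

package main

import "fmt"

type producer struct{ cteID int }

type storages struct {
	producer *producer
}

// buildOrReuse returns the shared producer for a CTE, creating it only the
// first time the CTE's storage ID is seen in the per-statement map.
func buildOrReuse(storageMap map[int]*storages, cteID int) *producer {
	if s, ok := storageMap[cteID]; ok {
		return s.producer // later references reuse the same producer
	}
	p := &producer{cteID: cteID}
	storageMap[cteID] = &storages{producer: p}
	return p
}

func main() {
	m := make(map[int]*storages)
	a := buildOrReuse(m, 1)
	b := buildOrReuse(m, 1)
	fmt.Println(a == b) // true: both CTE consumers share one producer
}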
+ recursiveExec := b.build(v.RecurPlan) + if b.err != nil { + return nil + } + var sel []int + if v.CTE.IsDistinct { + sel = make([]int, chkSize) + for i := 0; i < chkSize; i++ { + sel[i] = i + } + } - var sel []int - if v.CTE.IsDistinct { - sel = make([]int, chkSize) - for i := 0; i < chkSize; i++ { - sel[i] = i + producer = &cteProducer{ + ctx: b.ctx, + seedExec: seedExec, + recursiveExec: recursiveExec, + resTbl: resTbl, + iterInTbl: iterInTbl, + isDistinct: v.CTE.IsDistinct, + sel: sel, + hasLimit: v.CTE.HasLimit, + limitBeg: v.CTE.LimitBeg, + limitEnd: v.CTE.LimitEnd, + isInApply: v.CTE.IsInApply, } + storageMap[v.CTE.IDForStorage].Producer = producer } return &CTEExec{ - baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()), - seedExec: seedExec, - recursiveExec: recursiveExec, - resTbl: resTbl, - iterInTbl: iterInTbl, - chkIdx: 0, - isDistinct: v.CTE.IsDistinct, - sel: sel, - hasLimit: v.CTE.HasLimit, - limitBeg: v.CTE.LimitBeg, - limitEnd: v.CTE.LimitEnd, - isInApply: v.CTE.IsInApply, + baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()), + producer: producer, } } diff --git a/executor/cte.go b/executor/cte.go index 63b371fc43885..23aaca3e6235d 100644 --- a/executor/cte.go +++ b/executor/cte.go @@ -41,25 +41,94 @@ var _ Executor = &CTEExec{} // which will be the input for new iteration. // At the end of each iteration, data in `iterOutTbl` will also be added into `resTbl`. // `resTbl` stores data of all iteration. -// +----------+ -// write |iterOutTbl| -// CTEExec ------------------->| | -// | +----+-----+ -// ------------- | write -// | | v -// other op other op +----------+ -// (seed) (recursive) | resTbl | -// ^ | | -// | +----------+ -// CTETableReaderExec -// ^ -// | read +----------+ -// +---------------+iterInTbl | -// | | -// +----------+ +/* + +----------+ + write |iterOutTbl| + CTEExec ------------------->| | + | +----+-----+ + ------------- | write + | | v + other op other op +----------+ + (seed) (recursive) | resTbl | + ^ | | + | +----------+ + CTETableReaderExec + ^ + | read +----------+ + +---------------+iterInTbl | + | | + +----------+ +*/ type CTEExec struct { baseExecutor + chkIdx int + producer *cteProducer + + // limit in recursive CTE. + cursor uint64 + meetFirstBatch bool +} + +// Open implements the Executor interface. +func (e *CTEExec) Open(ctx context.Context) (err error) { + e.reset() + if err := e.baseExecutor.Open(ctx); err != nil { + return err + } + + e.producer.resTbl.Lock() + defer e.producer.resTbl.Unlock() + + if e.producer.isInApply { + e.producer.reset() + } + if !e.producer.opened { + if err = e.producer.openProducer(ctx, e); err != nil { + return err + } + } + return nil +} + +// Next implements the Executor interface. +func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { + e.producer.resTbl.Lock() + defer e.producer.resTbl.Unlock() + if !e.producer.resTbl.Done() { + if err = e.producer.produce(ctx, e); err != nil { + return err + } + } + return e.producer.getChunk(ctx, e, req) +} + +// Close implements the Executor interface. 
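With this change CTEExec delegates the heavy lifting to the shared cteProducer: the first Next call fills the result table under the resTbl lock and marks it done, while each consumer keeps its own chkIdx into the shared data. A rough, self-contained sketch of that produce-once/read-many shape (sharedResult and consumer are made-up names, a slice stands in for resTbl):

package main

import (
	"fmt"
	"sync"
)

type sharedResult struct {
	mu   sync.Mutex
	done bool
	rows []int
}

type consumer struct{ cursor int }

// next is what each consumer calls: the first caller fills the shared result
// under the lock, everyone else only advances its own read position.
func (c *consumer) next(res *sharedResult) (int, bool) {
	res.mu.Lock()
	defer res.mu.Unlock()
	if !res.done {
		res.rows = []int{1, 2, 3} // the "produce" step runs exactly once
		res.done = true
	}
	if c.cursor >= len(res.rows) {
		return 0, false
	}
	v := res.rows[c.cursor]
	c.cursor++ // per-consumer read position, like CTEExec.chkIdx
	return v, true
}

func main() {
	res := &sharedResult{}
	c1, c2 := &consumer{}, &consumer{}
	v1, _ := c1.next(res)
	v2, _ := c2.next(res)
	fmt.Println(v1, v2) // 1 1: both consumers read the same shared result
}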
+func (e *CTEExec) Close() (err error) { + e.producer.resTbl.Lock() + if !e.producer.closed { + err = e.producer.closeProducer() + } + e.producer.resTbl.Unlock() + if err != nil { + return err + } + return e.baseExecutor.Close() +} + +func (e *CTEExec) reset() { + e.chkIdx = 0 + e.cursor = 0 + e.meetFirstBatch = false +} + +type cteProducer struct { + opened bool + produced bool + closed bool + + ctx sessionctx.Context + seedExec Executor recursiveExec Executor @@ -71,9 +140,6 @@ type CTEExec struct { hashTbl baseHashTable - // Index of chunk to read from `resTbl`. - chkIdx int - // UNION ALL or UNION DISTINCT. isDistinct bool curIter int @@ -81,11 +147,9 @@ type CTEExec struct { sel []int // Limit related info. - hasLimit bool - limitBeg uint64 - limitEnd uint64 - cursor uint64 - meetFirstBatch bool + hasLimit bool + limitBeg uint64 + limitEnd uint64 memTracker *memory.Tracker diskTracker *disk.Tracker @@ -96,98 +160,80 @@ type CTEExec struct { isInApply bool } -// Open implements the Executor interface. -func (e *CTEExec) Open(ctx context.Context) (err error) { - e.reset() - if err := e.baseExecutor.Open(ctx); err != nil { - return err - } - - if e.seedExec == nil { +func (p *cteProducer) openProducer(ctx context.Context, cteExec *CTEExec) (err error) { + if p.seedExec == nil { return errors.New("seedExec for CTEExec is nil") } - if err = e.seedExec.Open(ctx); err != nil { + if err = p.seedExec.Open(ctx); err != nil { return err } - e.memTracker = memory.NewTracker(e.id, -1) - e.diskTracker = disk.NewTracker(e.id, -1) - e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) - e.diskTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.DiskTracker) + p.memTracker = memory.NewTracker(cteExec.id, -1) + p.diskTracker = disk.NewTracker(cteExec.id, -1) + p.memTracker.AttachTo(p.ctx.GetSessionVars().StmtCtx.MemTracker) + p.diskTracker.AttachTo(p.ctx.GetSessionVars().StmtCtx.DiskTracker) - if e.recursiveExec != nil { - if err = e.recursiveExec.Open(ctx); err != nil { + if p.recursiveExec != nil { + if err = p.recursiveExec.Open(ctx); err != nil { return err } // For non-recursive CTE, the result will be put into resTbl directly. // So no need to build iterOutTbl. // Construct iterOutTbl in Open() instead of buildCTE(), because its destruct is in Close(). - recursiveTypes := e.recursiveExec.base().retFieldTypes - e.iterOutTbl = cteutil.NewStorageRowContainer(recursiveTypes, e.maxChunkSize) - if err = e.iterOutTbl.OpenAndRef(); err != nil { + recursiveTypes := p.recursiveExec.base().retFieldTypes + p.iterOutTbl = cteutil.NewStorageRowContainer(recursiveTypes, cteExec.maxChunkSize) + if err = p.iterOutTbl.OpenAndRef(); err != nil { return err } } - if e.isDistinct { - e.hashTbl = newConcurrentMapHashTable() - e.hCtx = &hashContext{ - allTypes: e.base().retFieldTypes, + if p.isDistinct { + p.hashTbl = newConcurrentMapHashTable() + p.hCtx = &hashContext{ + allTypes: cteExec.base().retFieldTypes, } // We use all columns to compute hash. - e.hCtx.keyColIdx = make([]int, len(e.hCtx.allTypes)) - for i := range e.hCtx.keyColIdx { - e.hCtx.keyColIdx[i] = i + p.hCtx.keyColIdx = make([]int, len(p.hCtx.allTypes)) + for i := range p.hCtx.keyColIdx { + p.hCtx.keyColIdx[i] = i } } + p.opened = true return nil } -// Next implements the Executor interface. 
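openProducer also attaches the producer's memory and disk trackers to the statement-level trackers, so the CTE storage's memory and disk usage roll up into the statement totals. A hypothetical miniature of that parent/child tracker attachment (not the real memory package API):

package main

import "fmt"

type tracker struct {
	label    string
	consumed int64
	parent   *tracker
}

func (t *tracker) attachTo(parent *tracker) { t.parent = parent }

func (t *tracker) consume(n int64) {
	for cur := t; cur != nil; cur = cur.parent {
		cur.consumed += n // bytes counted by the child also show up in the parent
	}
}

func main() {
	stmtMem := &tracker{label: "statement"}
	cteMem := &tracker{label: "cte-storage"}
	cteMem.attachTo(stmtMem)
	cteMem.consume(4096)
	fmt.Println(stmtMem.consumed, cteMem.consumed) // 4096 4096
}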
-func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { - req.Reset() - e.resTbl.Lock() - defer e.resTbl.Unlock() - if !e.resTbl.Done() { - resAction := setupCTEStorageTracker(e.resTbl, e.ctx, e.memTracker, e.diskTracker) - iterInAction := setupCTEStorageTracker(e.iterInTbl, e.ctx, e.memTracker, e.diskTracker) - var iterOutAction *chunk.SpillDiskAction - if e.iterOutTbl != nil { - iterOutAction = setupCTEStorageTracker(e.iterOutTbl, e.ctx, e.memTracker, e.diskTracker) - } - - failpoint.Inject("testCTEStorageSpill", func(val failpoint.Value) { - if val.(bool) && config.GetGlobalConfig().OOMUseTmpStorage { - defer resAction.WaitForTest() - defer iterInAction.WaitForTest() - if iterOutAction != nil { - defer iterOutAction.WaitForTest() - } - } - }) - - if err = e.computeSeedPart(ctx); err != nil { - // Don't put it in defer. - // Because it should be called only when the filling process is not completed. - if err1 := e.reopenTbls(); err1 != nil { - return err1 - } +func (p *cteProducer) closeProducer() (err error) { + if err = p.seedExec.Close(); err != nil { + return err + } + if p.recursiveExec != nil { + if err = p.recursiveExec.Close(); err != nil { return err } - if err = e.computeRecursivePart(ctx); err != nil { - if err1 := e.reopenTbls(); err1 != nil { - return err1 + // `iterInTbl` and `resTbl` are shared by multiple operators, + // so will be closed when the SQL finishes. + if p.iterOutTbl != nil { + if err = p.iterOutTbl.DerefAndClose(); err != nil { + return err } + } + } + p.closed = true + if p.isInApply { + if err = p.reopenTbls(); err != nil { return err } - e.resTbl.SetDone() } + return nil +} - if e.hasLimit { - return e.nextChunkLimit(req) +func (p *cteProducer) getChunk(ctx context.Context, cteExec *CTEExec, req *chunk.Chunk) (err error) { + req.Reset() + if p.hasLimit { + return p.nextChunkLimit(cteExec, req) } - if e.chkIdx < e.resTbl.NumChunks() { - res, err := e.resTbl.GetChunk(e.chkIdx) + if cteExec.chkIdx < p.resTbl.NumChunks() { + res, err := p.resTbl.GetChunk(cteExec.chkIdx) if err != nil { return err } @@ -195,60 +241,110 @@ func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { // Also we ignore copying rows not selected, because some operators like Projection // doesn't support swap column if chunk.sel is no nil. req.SwapColumns(res.CopyConstructSel()) - e.chkIdx++ + cteExec.chkIdx++ } return nil } -// Close implements the Executor interface. -func (e *CTEExec) Close() (err error) { - e.reset() - if err = e.seedExec.Close(); err != nil { - return err - } - if e.recursiveExec != nil { - if err = e.recursiveExec.Close(); err != nil { - return err - } - // `iterInTbl` and `resTbl` are shared by multiple operators, - // so will be closed when the SQL finishes. 
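getChunk copies via CopyConstructSel before SwapColumns because the stored chunk may carry a selection vector that downstream operators such as Projection cannot handle; the copy materializes only the selected rows while the shared storage keeps the full chunk. A simplified illustration with an invented miniChunk type:

package main

import "fmt"

type miniChunk struct {
	rows []string
	sel  []int // indexes of rows that are logically present; nil means all
}

// copyConstructSel returns a chunk that physically contains only selected rows.
func copyConstructSel(c *miniChunk) *miniChunk {
	if c.sel == nil {
		return &miniChunk{rows: append([]string(nil), c.rows...)}
	}
	out := &miniChunk{rows: make([]string, 0, len(c.sel))}
	for _, idx := range c.sel {
		out.rows = append(out.rows, c.rows[idx])
	}
	return out
}

func main() {
	stored := &miniChunk{rows: []string{"a", "b", "c"}, sel: []int{0, 2}}
	fmt.Println(copyConstructSel(stored).rows) // [a c]; "b" was filtered out by dedup
}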
- if e.iterOutTbl != nil { - if err = e.iterOutTbl.DerefAndClose(); err != nil { +func (p *cteProducer) nextChunkLimit(cteExec *CTEExec, req *chunk.Chunk) error { + if !cteExec.meetFirstBatch { + for cteExec.chkIdx < p.resTbl.NumChunks() { + res, err := p.resTbl.GetChunk(cteExec.chkIdx) + if err != nil { return err } + cteExec.chkIdx++ + numRows := uint64(res.NumRows()) + if newCursor := cteExec.cursor + numRows; newCursor >= p.limitBeg { + cteExec.meetFirstBatch = true + begInChk, endInChk := p.limitBeg-cteExec.cursor, numRows + if newCursor > p.limitEnd { + endInChk = p.limitEnd - cteExec.cursor + } + cteExec.cursor += endInChk + if begInChk == endInChk { + break + } + tmpChk := res.CopyConstructSel() + req.Append(tmpChk, int(begInChk), int(endInChk)) + return nil + } + cteExec.cursor += numRows } } - if e.isInApply { - if err = e.reopenTbls(); err != nil { + if cteExec.chkIdx < p.resTbl.NumChunks() && cteExec.cursor < p.limitEnd { + res, err := p.resTbl.GetChunk(cteExec.chkIdx) + if err != nil { return err } + cteExec.chkIdx++ + numRows := uint64(res.NumRows()) + if cteExec.cursor+numRows > p.limitEnd { + numRows = p.limitEnd - cteExec.cursor + req.Append(res.CopyConstructSel(), 0, int(numRows)) + } else { + req.SwapColumns(res.CopyConstructSel()) + } + cteExec.cursor += numRows } + return nil +} - return e.baseExecutor.Close() +func (p *cteProducer) produce(ctx context.Context, cteExec *CTEExec) (err error) { + if p.resTbl.Error() != nil { + return p.resTbl.Error() + } + resAction := setupCTEStorageTracker(p.resTbl, cteExec.ctx, p.memTracker, p.diskTracker) + iterInAction := setupCTEStorageTracker(p.iterInTbl, cteExec.ctx, p.memTracker, p.diskTracker) + var iterOutAction *chunk.SpillDiskAction + if p.iterOutTbl != nil { + iterOutAction = setupCTEStorageTracker(p.iterOutTbl, cteExec.ctx, p.memTracker, p.diskTracker) + } + + failpoint.Inject("testCTEStorageSpill", func(val failpoint.Value) { + if val.(bool) && config.GetGlobalConfig().OOMUseTmpStorage { + defer resAction.WaitForTest() + defer iterInAction.WaitForTest() + if iterOutAction != nil { + defer iterOutAction.WaitForTest() + } + } + }) + + if err = p.computeSeedPart(ctx); err != nil { + p.resTbl.SetError(err) + return err + } + if err = p.computeRecursivePart(ctx); err != nil { + p.resTbl.SetError(err) + return err + } + p.resTbl.SetDone() + return nil } -func (e *CTEExec) computeSeedPart(ctx context.Context) (err error) { +func (p *cteProducer) computeSeedPart(ctx context.Context) (err error) { defer func() { if r := recover(); r != nil && err == nil { err = errors.Errorf("%v", r) } }() failpoint.Inject("testCTESeedPanic", nil) - e.curIter = 0 - e.iterInTbl.SetIter(e.curIter) + p.curIter = 0 + p.iterInTbl.SetIter(p.curIter) chks := make([]*chunk.Chunk, 0, 10) for { - if e.limitDone(e.iterInTbl) { + if p.limitDone(p.iterInTbl) { break } - chk := newFirstChunk(e.seedExec) - if err = Next(ctx, e.seedExec, chk); err != nil { + chk := newFirstChunk(p.seedExec) + if err = Next(ctx, p.seedExec, chk); err != nil { return } if chk.NumRows() == 0 { break } - if chk, err = e.tryDedupAndAdd(chk, e.iterInTbl, e.hashTbl); err != nil { + if chk, err = p.tryDedupAndAdd(chk, p.iterInTbl, p.hashTbl); err != nil { return } chks = append(chks, chk) @@ -256,66 +352,66 @@ func (e *CTEExec) computeSeedPart(ctx context.Context) (err error) { // Initial resTbl is empty, so no need to deduplicate chk using resTbl. // Just adding is ok. 
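nextChunkLimit walks the result chunks with a running cursor and slices out only the part of each chunk that falls inside the window [limitBeg, limitEnd). The arithmetic can be summarized by a small helper like the hypothetical chunkWindow below:

package main

import "fmt"

// chunkWindow returns which rows of a chunk fall inside [beg, end), given the
// number of rows already seen before this chunk (cursor) and the chunk size.
func chunkWindow(cursor, numRows, beg, end uint64) (lo, hi uint64, ok bool) {
	chunkEnd := cursor + numRows
	if chunkEnd <= beg || cursor >= end {
		return 0, 0, false // the whole chunk lies outside the window
	}
	lo, hi = 0, numRows
	if beg > cursor {
		lo = beg - cursor
	}
	if end < chunkEnd {
		hi = end - cursor
	}
	return lo, hi, true
}

func main() {
	// LIMIT 5 OFFSET 3 over chunks of 4 rows: window is [3, 8).
	fmt.Println(chunkWindow(0, 4, 3, 8)) // 3 4 true  -> last row of chunk 0
	fmt.Println(chunkWindow(4, 4, 3, 8)) // 0 4 true  -> all of chunk 1
	fmt.Println(chunkWindow(8, 4, 3, 8)) // 0 0 false -> past the window
}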
for _, chk := range chks { - if err = e.resTbl.Add(chk); err != nil { + if err = p.resTbl.Add(chk); err != nil { return } } - e.curIter++ - e.iterInTbl.SetIter(e.curIter) + p.curIter++ + p.iterInTbl.SetIter(p.curIter) return } -func (e *CTEExec) computeRecursivePart(ctx context.Context) (err error) { +func (p *cteProducer) computeRecursivePart(ctx context.Context) (err error) { defer func() { if r := recover(); r != nil && err == nil { err = errors.Errorf("%v", r) } }() failpoint.Inject("testCTERecursivePanic", nil) - if e.recursiveExec == nil || e.iterInTbl.NumChunks() == 0 { + if p.recursiveExec == nil || p.iterInTbl.NumChunks() == 0 { return } - if e.curIter > e.ctx.GetSessionVars().CTEMaxRecursionDepth { - return ErrCTEMaxRecursionDepth.GenWithStackByArgs(e.curIter) + if p.curIter > p.ctx.GetSessionVars().CTEMaxRecursionDepth { + return ErrCTEMaxRecursionDepth.GenWithStackByArgs(p.curIter) } - if e.limitDone(e.resTbl) { + if p.limitDone(p.resTbl) { return } for { - chk := newFirstChunk(e.recursiveExec) - if err = Next(ctx, e.recursiveExec, chk); err != nil { + chk := newFirstChunk(p.recursiveExec) + if err = Next(ctx, p.recursiveExec, chk); err != nil { return } if chk.NumRows() == 0 { - if err = e.setupTblsForNewIteration(); err != nil { + if err = p.setupTblsForNewIteration(); err != nil { return } - if e.limitDone(e.resTbl) { + if p.limitDone(p.resTbl) { break } - if e.iterInTbl.NumChunks() == 0 { + if p.iterInTbl.NumChunks() == 0 { break } // Next iteration begins. Need use iterOutTbl as input of next iteration. - e.curIter++ - e.iterInTbl.SetIter(e.curIter) - if e.curIter > e.ctx.GetSessionVars().CTEMaxRecursionDepth { - return ErrCTEMaxRecursionDepth.GenWithStackByArgs(e.curIter) + p.curIter++ + p.iterInTbl.SetIter(p.curIter) + if p.curIter > p.ctx.GetSessionVars().CTEMaxRecursionDepth { + return ErrCTEMaxRecursionDepth.GenWithStackByArgs(p.curIter) } // Make sure iterInTbl is setup before Close/Open, // because some executors will read iterInTbl in Open() (like IndexLookupJoin). - if err = e.recursiveExec.Close(); err != nil { + if err = p.recursiveExec.Close(); err != nil { return } - if err = e.recursiveExec.Open(ctx); err != nil { + if err = p.recursiveExec.Open(ctx); err != nil { return } } else { - if err = e.iterOutTbl.Add(chk); err != nil { + if err = p.iterOutTbl.Add(chk); err != nil { return } } @@ -323,67 +419,22 @@ func (e *CTEExec) computeRecursivePart(ctx context.Context) (err error) { return } -// Get next chunk from resTbl for limit. 
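computeSeedPart and computeRecursivePart together implement the usual seed-then-iterate loop: seed rows prime iterInTbl, each iteration's new rows become the next iteration's input, and the loop stops at a fixpoint or when curIter exceeds cte_max_recursion_depth. A compact, storage-free sketch of that loop (recursiveCTE and step are illustrative names; a map plays the role of the dedup hash table):

package main

import (
	"errors"
	"fmt"
)

func recursiveCTE(seed []int, step func(int) []int, maxDepth int) ([]int, error) {
	seen := map[int]bool{}
	var res, iterIn []int
	for _, v := range seed { // seed part fills the first iteration's input
		if !seen[v] {
			seen[v] = true
			res = append(res, v)
			iterIn = append(iterIn, v)
		}
	}
	for depth := 1; len(iterIn) > 0; depth++ {
		if depth > maxDepth {
			return nil, errors.New("recursion depth exceeded")
		}
		var iterOut []int
		for _, v := range iterIn { // recursive part consumes the previous iteration's rows
			for _, w := range step(v) {
				if !seen[w] { // UNION DISTINCT: only new rows feed the next round
					seen[w] = true
					res = append(res, w)
					iterOut = append(iterOut, w)
				}
			}
		}
		iterIn = iterOut // the swap done by setupTblsForNewIteration
	}
	return res, nil
}

func main() {
	// WITH RECURSIVE cte(n) AS (SELECT 1 UNION SELECT n+1 FROM cte WHERE n < 5) SELECT * FROM cte;
	rows, err := recursiveCTE([]int{1}, func(n int) []int {
		if n < 5 {
			return []int{n + 1}
		}
		return nil
	}, 1000)
	fmt.Println(rows, err) // [1 2 3 4 5] <nil>
}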
-func (e *CTEExec) nextChunkLimit(req *chunk.Chunk) error { - if !e.meetFirstBatch { - for e.chkIdx < e.resTbl.NumChunks() { - res, err := e.resTbl.GetChunk(e.chkIdx) - if err != nil { - return err - } - e.chkIdx++ - numRows := uint64(res.NumRows()) - if newCursor := e.cursor + numRows; newCursor >= e.limitBeg { - e.meetFirstBatch = true - begInChk, endInChk := e.limitBeg-e.cursor, numRows - if newCursor > e.limitEnd { - endInChk = e.limitEnd - e.cursor - } - e.cursor += endInChk - if begInChk == endInChk { - break - } - tmpChk := res.CopyConstructSel() - req.Append(tmpChk, int(begInChk), int(endInChk)) - return nil - } - e.cursor += numRows - } - } - if e.chkIdx < e.resTbl.NumChunks() && e.cursor < e.limitEnd { - res, err := e.resTbl.GetChunk(e.chkIdx) - if err != nil { - return err - } - e.chkIdx++ - numRows := uint64(res.NumRows()) - if e.cursor+numRows > e.limitEnd { - numRows = e.limitEnd - e.cursor - req.Append(res.CopyConstructSel(), 0, int(numRows)) - } else { - req.SwapColumns(res.CopyConstructSel()) - } - e.cursor += numRows - } - return nil -} - -func (e *CTEExec) setupTblsForNewIteration() (err error) { - num := e.iterOutTbl.NumChunks() +func (p *cteProducer) setupTblsForNewIteration() (err error) { + num := p.iterOutTbl.NumChunks() chks := make([]*chunk.Chunk, 0, num) // Setup resTbl's data. for i := 0; i < num; i++ { - chk, err := e.iterOutTbl.GetChunk(i) + chk, err := p.iterOutTbl.GetChunk(i) if err != nil { return err } // Data should be copied in UNION DISTINCT. // Because deduplicate() will change data in iterOutTbl, // which will cause panic when spilling data into disk concurrently. - if e.isDistinct { + if p.isDistinct { chk = chk.CopyConstruct() } - chk, err = e.tryDedupAndAdd(chk, e.resTbl, e.hashTbl) + chk, err = p.tryDedupAndAdd(chk, p.resTbl, p.hashTbl) if err != nil { return err } @@ -391,47 +442,48 @@ func (e *CTEExec) setupTblsForNewIteration() (err error) { } // Setup new iteration data in iterInTbl. - if err = e.iterInTbl.Reopen(); err != nil { + if err = p.iterInTbl.Reopen(); err != nil { return err } - if e.isDistinct { + if p.isDistinct { // Already deduplicated by resTbl, adding directly is ok. for _, chk := range chks { - if err = e.iterInTbl.Add(chk); err != nil { + if err = p.iterInTbl.Add(chk); err != nil { return err } } } else { - if err = e.iterInTbl.SwapData(e.iterOutTbl); err != nil { + if err = p.iterInTbl.SwapData(p.iterOutTbl); err != nil { return err } } // Clear data in iterOutTbl. - return e.iterOutTbl.Reopen() + return p.iterOutTbl.Reopen() } -func (e *CTEExec) reset() { - e.curIter = 0 - e.chkIdx = 0 - e.hashTbl = nil - e.cursor = 0 - e.meetFirstBatch = false +func (p *cteProducer) reset() { + p.curIter = 0 + p.hashTbl = nil + + p.opened = false + p.produced = false + p.closed = false } -func (e *CTEExec) reopenTbls() (err error) { - if e.isDistinct { - e.hashTbl = newConcurrentMapHashTable() +func (p *cteProducer) reopenTbls() (err error) { + if p.isDistinct { + p.hashTbl = newConcurrentMapHashTable() } - if err := e.resTbl.Reopen(); err != nil { + if err := p.resTbl.Reopen(); err != nil { return err } - return e.iterInTbl.Reopen() + return p.iterInTbl.Reopen() } // Check if tbl meets the requirement of limit. 
-func (e *CTEExec) limitDone(tbl cteutil.Storage) bool { - return e.hasLimit && uint64(tbl.NumRows()) >= e.limitEnd +func (p *cteProducer) limitDone(tbl cteutil.Storage) bool { + return p.hasLimit && uint64(tbl.NumRows()) >= p.limitEnd } func setupCTEStorageTracker(tbl cteutil.Storage, ctx sessionctx.Context, parentMemTracker *memory.Tracker, @@ -456,11 +508,11 @@ func setupCTEStorageTracker(tbl cteutil.Storage, ctx sessionctx.Context, parentM return actionSpill } -func (e *CTEExec) tryDedupAndAdd(chk *chunk.Chunk, +func (p *cteProducer) tryDedupAndAdd(chk *chunk.Chunk, storage cteutil.Storage, hashTbl baseHashTable) (res *chunk.Chunk, err error) { - if e.isDistinct { - if chk, err = e.deduplicate(chk, storage, hashTbl); err != nil { + if p.isDistinct { + if chk, err = p.deduplicate(chk, storage, hashTbl); err != nil { return nil, err } } @@ -469,12 +521,12 @@ func (e *CTEExec) tryDedupAndAdd(chk *chunk.Chunk, // Compute hash values in chk and put it in hCtx.hashVals. // Use the returned sel to choose the computed hash values. -func (e *CTEExec) computeChunkHash(chk *chunk.Chunk) (sel []int, err error) { +func (p *cteProducer) computeChunkHash(chk *chunk.Chunk) (sel []int, err error) { numRows := chk.NumRows() - e.hCtx.initHash(numRows) + p.hCtx.initHash(numRows) // Continue to reset to make sure all hasher is new. - for i := numRows; i < len(e.hCtx.hashVals); i++ { - e.hCtx.hashVals[i].Reset() + for i := numRows; i < len(p.hCtx.hashVals); i++ { + p.hCtx.hashVals[i].Reset() } sel = chk.Sel() var hashBitMap []bool @@ -486,12 +538,12 @@ func (e *CTEExec) computeChunkHash(chk *chunk.Chunk) (sel []int, err error) { } else { // All rows is selected, sel will be [0....numRows). // e.sel is setup when building executor. - sel = e.sel + sel = p.sel } for i := 0; i < chk.NumCols(); i++ { - if err = codec.HashChunkSelected(e.ctx.GetSessionVars().StmtCtx, e.hCtx.hashVals, - chk, e.hCtx.allTypes[i], i, e.hCtx.buf, e.hCtx.hasNull, + if err = codec.HashChunkSelected(p.ctx.GetSessionVars().StmtCtx, p.hCtx.hashVals, + chk, p.hCtx.allTypes[i], i, p.hCtx.buf, p.hCtx.hasNull, hashBitMap, false); err != nil { return nil, err } @@ -501,7 +553,7 @@ func (e *CTEExec) computeChunkHash(chk *chunk.Chunk) (sel []int, err error) { // Use hashTbl to deduplicate rows, and unique rows will be added to hashTbl. // Duplicated rows are only marked to be removed by sel in Chunk, instead of really deleted. -func (e *CTEExec) deduplicate(chk *chunk.Chunk, +func (p *cteProducer) deduplicate(chk *chunk.Chunk, storage cteutil.Storage, hashTbl baseHashTable) (chkNoDup *chunk.Chunk, err error) { numRows := chk.NumRows() @@ -511,7 +563,7 @@ func (e *CTEExec) deduplicate(chk *chunk.Chunk, // 1. Compute hash values for chunk. chkHashTbl := newConcurrentMapHashTable() - selOri, err := e.computeChunkHash(chk) + selOri, err := p.computeChunkHash(chk) if err != nil { return nil, err } @@ -520,10 +572,10 @@ func (e *CTEExec) deduplicate(chk *chunk.Chunk, // This sel is for filtering rows duplicated in cur chk. selChk := make([]int, 0, numRows) for i := 0; i < numRows; i++ { - key := e.hCtx.hashVals[selOri[i]].Sum64() + key := p.hCtx.hashVals[selOri[i]].Sum64() row := chk.GetRow(i) - hasDup, err := e.checkHasDup(key, row, chk, storage, chkHashTbl) + hasDup, err := p.checkHasDup(key, row, chk, storage, chkHashTbl) if err != nil { return nil, err } @@ -543,10 +595,10 @@ func (e *CTEExec) deduplicate(chk *chunk.Chunk, // This sel is for filtering rows duplicated in cteutil.Storage. 
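computeChunkHash hashes every column of each selected row, so the whole row acts as the UNION DISTINCT key; the prebuilt p.sel covers the all-rows-selected case. A rough analogue using hash/fnv in place of TiDB's hashers, with a simplified row layout:

package main

import (
	"fmt"
	"hash/fnv"
)

// rowHashes computes one hash per selected row, feeding every column into the
// hasher so the entire row participates in the key.
func rowHashes(rows [][]string, sel []int) map[int]uint64 {
	if sel == nil {
		sel = make([]int, len(rows))
		for i := range sel {
			sel[i] = i // all rows selected, like the prebuilt p.sel
		}
	}
	out := make(map[int]uint64, len(sel))
	for _, i := range sel {
		h := fnv.New64()
		for _, col := range rows[i] {
			h.Write([]byte(col))
			h.Write([]byte{0}) // column separator to avoid ambiguity
		}
		out[i] = h.Sum64()
	}
	return out
}

func main() {
	rows := [][]string{{"1", "a"}, {"1", "a"}, {"2", "b"}}
	fmt.Println(rowHashes(rows, nil)) // rows 0 and 1 hash to the same value
}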
selStorage := make([]int, 0, len(selChk)) for i := 0; i < len(selChk); i++ { - key := e.hCtx.hashVals[selChk[i]].Sum64() + key := p.hCtx.hashVals[selChk[i]].Sum64() row := chk.GetRow(i) - hasDup, err := e.checkHasDup(key, row, nil, storage, hashTbl) + hasDup, err := p.checkHasDup(key, row, nil, storage, hashTbl) if err != nil { return nil, err } @@ -567,7 +619,7 @@ func (e *CTEExec) deduplicate(chk *chunk.Chunk, // Use the row's probe key to check if it already exists in chk or storage. // We also need to compare the row's real encoding value to avoid hash collision. -func (e *CTEExec) checkHasDup(probeKey uint64, +func (p *cteProducer) checkHasDup(probeKey uint64, row chunk.Row, curChk *chunk.Chunk, storage cteutil.Storage, @@ -588,9 +640,9 @@ func (e *CTEExec) checkHasDup(probeKey uint64, if err != nil { return false, err } - isEqual, err := codec.EqualChunkRow(e.ctx.GetSessionVars().StmtCtx, - row, e.hCtx.allTypes, e.hCtx.keyColIdx, - matchedRow, e.hCtx.allTypes, e.hCtx.keyColIdx) + isEqual, err := codec.EqualChunkRow(p.ctx.GetSessionVars().StmtCtx, + row, p.hCtx.allTypes, p.hCtx.keyColIdx, + matchedRow, p.hCtx.allTypes, p.hCtx.keyColIdx) if err != nil { return false, err } diff --git a/executor/cte_test.go b/executor/cte_test.go index 925b24f31144c..5d9743526d85c 100644 --- a/executor/cte_test.go +++ b/executor/cte_test.go @@ -484,6 +484,23 @@ func TestCTEPanic(t *testing.T) { require.NoError(t, failpoint.Disable(fpPathPrefix+fp)) } +func TestCTEShareCorColumn(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + tk.MustExec("drop table if exists t1, t2;") + + tk.MustExec("create table t1(c1 int, c2 varchar(100));") + tk.MustExec("insert into t1 values(1, '2020-10-10');") + tk.MustExec("create table t2(c1 int, c2 date);") + tk.MustExec("insert into t2 values(1, '2020-10-10');") + for i := 0; i < 100; i++ { + tk.MustQuery("with cte1 as (select t1.c1, (select t2.c2 from t2 where t2.c2 = str_to_date(t1.c2, '%Y-%m-%d')) from t1 inner join t2 on t1.c1 = t2.c1) select /*+ hash_join_build(alias1) */ * from cte1 alias1 inner join cte1 alias2 on alias1.c1 = alias2.c1;").Check(testkit.Rows("1 2020-10-10 1 2020-10-10")) + tk.MustQuery("with cte1 as (select t1.c1, (select t2.c2 from t2 where t2.c2 = str_to_date(t1.c2, '%Y-%m-%d')) from t1 inner join t2 on t1.c1 = t2.c1) select /*+ hash_join_build(alias2) */ * from cte1 alias1 inner join cte1 alias2 on alias1.c1 = alias2.c1;").Check(testkit.Rows("1 2020-10-10 1 2020-10-10")) + } +} + func TestCTEDelSpillFile(t *testing.T) { oriGlobalConfig := config.GetGlobalConfig() config.UpdateGlobal(func(conf *config.Config) { @@ -497,6 +514,7 @@ func TestCTEDelSpillFile(t *testing.T) { tk := testkit.NewTestKit(t, store) tk.MustExec("use test;") tk.MustExec("drop table if exists t1, t2;") + tk.MustExec("create table t1(c1 int, c2 int);") tk.MustExec("create table t2(c1 int);") tk.MustExec("set @@cte_max_recursion_depth = 1000000;") diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index 31a97b484e92f..ae6d5ef8015e9 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -819,10 +819,10 @@ func (b *PlanBuilder) buildJoin(ctx context.Context, joinNode *ast.Join) (Logica // on the "USING" clause. // // According to the standard SQL, columns are ordered in the following way: -// 1. coalesced common columns of "leftPlan" and "rightPlan", in the order they -// appears in "leftPlan". -// 2. 
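checkHasDup guards against hash collisions: a matching probe key is not enough, the candidate row's actual values are compared before the row is treated as a duplicate. A toy version of that two-step check (dedupTable is invented; a map from key to row indices stands in for the hash table):

package main

import "fmt"

type dedupTable struct {
	rows    [][]string
	buckets map[uint64][]int
}

// insertIfNew reports whether the row was actually new for the given probe key.
func (d *dedupTable) insertIfNew(key uint64, row []string) bool {
	for _, idx := range d.buckets[key] { // hash matched: verify real values
		if equalRows(d.rows[idx], row) {
			return false // a true duplicate
		}
	}
	d.buckets[key] = append(d.buckets[key], len(d.rows))
	d.rows = append(d.rows, row)
	return true
}

func equalRows(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}

func main() {
	d := &dedupTable{buckets: map[uint64][]int{}}
	fmt.Println(d.insertIfNew(7, []string{"1", "a"})) // true
	fmt.Println(d.insertIfNew(7, []string{"2", "b"})) // true: same key, different row
	fmt.Println(d.insertIfNew(7, []string{"1", "a"})) // false: real duplicate
}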
the rest columns in "leftPlan", in the order they appears in "leftPlan". -// 3. the rest columns in "rightPlan", in the order they appears in "rightPlan". +// 1. coalesced common columns of "leftPlan" and "rightPlan", in the order they +// appears in "leftPlan". +// 2. the rest columns in "leftPlan", in the order they appears in "leftPlan". +// 3. the rest columns in "rightPlan", in the order they appears in "rightPlan". func (b *PlanBuilder) buildUsingClause(p *LogicalJoin, leftPlan, rightPlan LogicalPlan, join *ast.Join) error { filter := make(map[string]bool, len(join.Using)) for _, col := range join.Using { @@ -843,9 +843,10 @@ func (b *PlanBuilder) buildUsingClause(p *LogicalJoin, leftPlan, rightPlan Logic // buildNaturalJoin builds natural join output schema. It finds out all the common columns // then using the same mechanism as buildUsingClause to eliminate redundant columns and build join conditions. // According to standard SQL, producing this display order: -// All the common columns -// Every column in the first (left) table that is not a common column -// Every column in the second (right) table that is not a common column +// +// All the common columns +// Every column in the first (left) table that is not a common column +// Every column in the second (right) table that is not a common column func (b *PlanBuilder) buildNaturalJoin(p *LogicalJoin, leftPlan, rightPlan LogicalPlan, join *ast.Join) error { err := b.coalesceCommonColumns(p, leftPlan, rightPlan, join.Tp, nil) if err != nil { @@ -1668,7 +1669,9 @@ func (b *PlanBuilder) buildUnion(ctx context.Context, selects []LogicalPlan, aft // divideUnionSelectPlans resolves union's select stmts to logical plans. // and divide result plans into "union-distinct" and "union-all" parts. // divide rule ref: -// https://dev.mysql.com/doc/refman/5.7/en/union.html +// +// https://dev.mysql.com/doc/refman/5.7/en/union.html +// // "Mixed UNION types are treated such that a DISTINCT union overrides any ALL union to its left." func (b *PlanBuilder) divideUnionSelectPlans(ctx context.Context, selects []LogicalPlan, setOprTypes []*ast.SetOprType) (distinctSelects []LogicalPlan, allSelects []LogicalPlan, err error) { firstUnionAllIdx := 0 @@ -4483,12 +4486,12 @@ func (b *PlanBuilder) buildProjUponView(ctx context.Context, dbName model.CIStr, // every row from outerPlan and the whole innerPlan. func (b *PlanBuilder) buildApplyWithJoinType(outerPlan, innerPlan LogicalPlan, tp JoinType) LogicalPlan { b.optFlag = b.optFlag | flagPredicatePushDown | flagBuildKeyInfo | flagDecorrelate - setIsInApplyForCTE(innerPlan) ap := LogicalApply{LogicalJoin: LogicalJoin{JoinType: tp}}.Init(b.ctx, b.getSelectOffset()) ap.SetChildren(outerPlan, innerPlan) ap.names = make([]*types.FieldName, outerPlan.Schema().Len()+innerPlan.Schema().Len()) copy(ap.names, outerPlan.OutputNames()) ap.SetSchema(expression.MergeSchema(outerPlan.Schema(), innerPlan.Schema())) + setIsInApplyForCTE(innerPlan, ap.Schema()) // Note that, tp can only be LeftOuterJoin or InnerJoin, so we don't consider other outer joins. 
if tp == LeftOuterJoin { b.optFlag = b.optFlag | flagEliminateOuterJoin @@ -4509,27 +4512,29 @@ func (b *PlanBuilder) buildSemiApply(outerPlan, innerPlan LogicalPlan, condition return nil, err } - setIsInApplyForCTE(innerPlan) + setIsInApplyForCTE(innerPlan, join.Schema()) ap := &LogicalApply{LogicalJoin: *join} ap.tp = plancodec.TypeApply ap.self = ap return ap, nil } -// setIsInApplyForCTE indicates CTE is the in inner side of Apply, +// setIsInApplyForCTE indicates CTE is the in inner side of Apply and correlate. // the storage of cte needs to be reset for each outer row. // It's better to handle this in CTEExec.Close(), but cte storage is closed when SQL is finished. -func setIsInApplyForCTE(p LogicalPlan) { +func setIsInApplyForCTE(p LogicalPlan, apSchema *expression.Schema) { switch x := p.(type) { case *LogicalCTE: - x.cte.IsInApply = true - setIsInApplyForCTE(x.cte.seedPartLogicalPlan) + if len(extractCorColumnsBySchema4LogicalPlan(p, apSchema)) > 0 { + x.cte.IsInApply = true + } + setIsInApplyForCTE(x.cte.seedPartLogicalPlan, apSchema) if x.cte.recursivePartLogicalPlan != nil { - setIsInApplyForCTE(x.cte.recursivePartLogicalPlan) + setIsInApplyForCTE(x.cte.recursivePartLogicalPlan, apSchema) } default: for _, child := range p.Children() { - setIsInApplyForCTE(child) + setIsInApplyForCTE(child, apSchema) } } } diff --git a/util/cteutil/storage.go b/util/cteutil/storage.go index bb2cee08f24d4..dea6fd632e42b 100644 --- a/util/cteutil/storage.go +++ b/util/cteutil/storage.go @@ -30,12 +30,12 @@ var _ Storage = &StorageRC{} // // Common usage as follows: // -// storage.Lock() -// if !storage.Done() { -// fill all data into storage -// } -// storage.UnLock() -// read data from storage +// storage.Lock() +// if !storage.Done() { +// fill all data into storage +// } +// storage.UnLock() +// read data from storage type Storage interface { // If is first called, will open underlying storage. Otherwise will add ref count by one. OpenAndRef() error @@ -77,6 +77,10 @@ type Storage interface { Done() bool SetDone() + // Store error message, so we can return directly. + Error() error + SetError(err error) + // Readers use iter information to determine // whether they need to read data from the beginning. SetIter(iter int) @@ -89,16 +93,14 @@ type Storage interface { // StorageRC implements Storage interface using RowContainer. type StorageRC struct { - mu sync.Mutex - refCnt int + err error + rc *chunk.RowContainer tp []*types.FieldType + refCnt int chkSize int - - begCh chan struct{} - done bool - iter int - - rc *chunk.RowContainer + iter int + mu sync.Mutex + done bool } // NewStorageRowContainer create a new StorageRC. @@ -111,10 +113,9 @@ func (s *StorageRC) OpenAndRef() (err error) { if !s.valid() { s.rc = chunk.NewRowContainer(s.tp, s.chkSize) s.refCnt = 1 - s.begCh = make(chan struct{}) s.iter = 0 } else { - s.refCnt += 1 + s.refCnt++ } return nil } @@ -124,12 +125,13 @@ func (s *StorageRC) DerefAndClose() (err error) { if !s.valid() { return errors.New("Storage not opend yet") } - s.refCnt -= 1 + s.refCnt-- if s.refCnt < 0 { return errors.New("Storage ref count is less than zero") } else if s.refCnt == 0 { s.refCnt = -1 s.done = false + s.err = nil s.iter = 0 if err = s.rc.Close(); err != nil { return err @@ -158,8 +160,8 @@ func (s *StorageRC) Reopen() (err error) { return err } s.iter = 0 - s.begCh = make(chan struct{}) s.done = false + s.err = nil // Create a new RowContainer. // Because some meta infos in old RowContainer are not resetted. // Such as memTracker/actionSpill etc. 
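setIsInApplyForCTE now receives the Apply's schema and flags only CTEs that actually reference a correlated column, so uncorrelated CTEs keep their fill-once shared storage (the new TestCTEShareCorColumn covers a CTE of this shape). A simplified walk over an invented plan-node type, not the real LogicalPlan interface:

package main

import "fmt"

type planNode struct {
	isCTE    bool
	usedCols []string
	inApply  bool
	children []*planNode
}

// markCorrelatedCTEs flags a CTE only when it uses a column produced by the
// Apply's outer side; such a CTE must rebuild its storage per outer row.
func markCorrelatedCTEs(p *planNode, outerCols map[string]bool) {
	if p.isCTE {
		for _, c := range p.usedCols {
			if outerCols[c] {
				p.inApply = true
				break
			}
		}
	}
	for _, child := range p.children {
		markCorrelatedCTEs(child, outerCols)
	}
}

func main() {
	correlated := &planNode{isCTE: true, usedCols: []string{"t1.c2"}}
	independent := &planNode{isCTE: true, usedCols: []string{"t2.c1"}}
	root := &planNode{children: []*planNode{correlated, independent}}
	markCorrelatedCTEs(root, map[string]bool{"t1.c2": true})
	fmt.Println(correlated.inApply, independent.inApply) // true false
}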
So we just use a new one. @@ -224,6 +226,16 @@ func (s *StorageRC) SetDone() { s.done = true } +// Error impls Storage Error interface. +func (s *StorageRC) Error() error { + return s.err +} + +// SetError impls Storage SetError interface. +func (s *StorageRC) SetError(err error) { + s.err = err +} + // SetIter impls Storage SetIter interface. func (s *StorageRC) SetIter(iter int) { s.iter = iter
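Error/SetError let the first consumer that fails while filling the shared storage record the failure, so later consumers replay the same error instead of reading a half-filled table; Reopen and DerefAndClose clear it again. A hedged sketch of that error-caching idea with illustrative names (sharedStorage, produce), not the cteutil API:

package main

import (
	"errors"
	"fmt"
	"sync"
)

type sharedStorage struct {
	mu   sync.Mutex
	done bool
	err  error
	rows []int
}

// produce fills the storage at most once; a recorded error short-circuits
// every subsequent caller.
func produce(s *sharedStorage, fill func() ([]int, error)) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.err != nil {
		return s.err // a previous consumer already failed: replay its error
	}
	if s.done {
		return nil
	}
	rows, err := fill()
	if err != nil {
		s.err = err // remember the failure for every other consumer
		return err
	}
	s.rows, s.done = rows, true
	return nil
}

func main() {
	s := &sharedStorage{}
	boom := errors.New("seed part failed")
	fmt.Println(produce(s, func() ([]int, error) { return nil, boom }))      // seed part failed
	fmt.Println(produce(s, func() ([]int, error) { return []int{1}, nil })) // same error, fill not retried
}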