Skip to content

Commit

Permalink
refine close
Browse files Browse the repository at this point in the history
Signed-off-by: guo-shaoge <shaoge1994@163.com>
  • Loading branch information
guo-shaoge committed Jun 20, 2023
1 parent d1f5c0a commit 2c13472
Showing 1 changed file with 17 additions and 24 deletions.
41 changes: 17 additions & 24 deletions executor/cte.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,42 +82,44 @@ func (e *CTEExec) Open(ctx context.Context) (err error) {
e.producer.resTbl.Lock()
defer e.producer.resTbl.Unlock()

// This check if we need to clear producer.resTbl because the outer row of Apply has changed.
if e.producer.checkAndUpdateCorColHashCode() {
e.producer.reset()
if err = e.producer.reopenTbls(); err != nil {
return err
}
}
if !e.producer.opened {
if err = e.producer.openProducer(ctx, e); err != nil {
return err
}
}
return nil
}

// Next implements the Executor interface.
func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
e.producer.resTbl.Lock()
defer e.producer.resTbl.Unlock()

if !e.producer.resTbl.Done() {
if e.producer.resTbl.Error() != nil {
return e.producer.resTbl.Error()
}

// Normally, openProducer()/produce()/closeProducer() will only be called once,
// unless CTE is the inner side of the Apply operator and there are cor columns for the Apply operator in the cte definition.
// For this situation, producer will open/produce/close once for each outer row of Apply.
if err = e.producer.openProducer(ctx, e); err != nil {
return err
}
if err = e.producer.produce(ctx, e); err != nil {
return err
}
if err = e.producer.closeProducer(); err != nil {
return err
}
}

return e.producer.getChunk(ctx, e, req)
}

// Close implements the Executor interface.
func (e *CTEExec) Close() (err error) {
e.producer.resTbl.Lock()
if !e.producer.closed {
err = e.producer.closeProducer()
}
e.producer.resTbl.Unlock()
if err != nil {
return err
}
return e.baseExecutor.Close()
}

Expand All @@ -128,10 +130,6 @@ func (e *CTEExec) reset() {
}

type cteProducer struct {
opened bool
produced bool
closed bool

ctx sessionctx.Context

seedExec Executor
Expand Down Expand Up @@ -164,6 +162,7 @@ type cteProducer struct {
corColHashCodes [][]byte
}

// NOTE: Make sure check p.resTbl.Done() and p.resTbl.Error() before open producer to avoid open twice.
func (p *cteProducer) openProducer(ctx context.Context, cteExec *CTEExec) (err error) {
if p.seedExec == nil {
return errors.New("seedExec for CTEExec is nil")
Expand Down Expand Up @@ -206,7 +205,6 @@ func (p *cteProducer) openProducer(ctx context.Context, cteExec *CTEExec) (err e
p.hCtx.keyColIdx[i] = i
}
}
p.opened = true
return nil
}

Expand All @@ -226,7 +224,6 @@ func (p *cteProducer) closeProducer() (err error) {
}
}
}
p.closed = true
return nil
}

Expand Down Expand Up @@ -468,10 +465,6 @@ func (p *cteProducer) setupTblsForNewIteration() (err error) {
func (p *cteProducer) reset() {
p.curIter = 0
p.hashTbl = nil

p.opened = false
p.produced = false
p.closed = false
}

func (p *cteProducer) reopenTbls() (err error) {
Expand Down

0 comments on commit 2c13472

Please sign in to comment.