From 14bd70bd9f4e1a57aa62dfabea7bb55a788527a9 Mon Sep 17 00:00:00 2001 From: Feng Liyuan Date: Wed, 16 Oct 2019 17:43:58 +0800 Subject: [PATCH 1/3] Fix forever hanging when HashAgg is called by apply --- executor/aggregate.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/executor/aggregate.go b/executor/aggregate.go index 57aa431f718a6..05fa5ff7f8275 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -161,6 +161,7 @@ type HashAggExec struct { isChildReturnEmpty bool childResult *chunk.Chunk + executed bool } // HashAggInput indicates the input of hash agg exec. @@ -602,10 +603,14 @@ func (e *HashAggExec) parallelExec(ctx context.Context, chk *chunk.Chunk) error e.prepared = true } + if e.executed { + return nil + } for !chk.IsFull() { e.finalInputCh <- chk result, ok := <-e.finalOutputCh if !ok { // all finalWorkers exited + e.executed = true if chk.NumRows() > 0 { // but there are some data left return nil } From d6b1443f86c48b31986311b1e05381f50af2fbe2 Mon Sep 17 00:00:00 2001 From: Feng Liyuan Date: Wed, 16 Oct 2019 18:31:54 +0800 Subject: [PATCH 2/3] fix test --- executor/aggregate.go | 1 + 1 file changed, 1 insertion(+) diff --git a/executor/aggregate.go b/executor/aggregate.go index 05fa5ff7f8275..81dadbe022c27 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -208,6 +208,7 @@ func (e *HashAggExec) Close() error { } close(e.finalOutputCh) } + e.executed = false close(e.finishCh) for _, ch := range e.partialOutputChs { for range ch { From 6a5681f7172a8684011aa59a63776d0f5d8da141 Mon Sep 17 00:00:00 2001 From: Feng Liyuan Date: Wed, 16 Oct 2019 18:33:21 +0800 Subject: [PATCH 3/3] fix test --- executor/aggregate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/aggregate.go b/executor/aggregate.go index 81dadbe022c27..ff3a6f7aa72b3 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -208,7 +208,6 @@ func (e *HashAggExec) Close() error { } close(e.finalOutputCh) } - e.executed = false close(e.finishCh) for _, ch := range e.partialOutputChs { for range ch { @@ -216,6 +215,7 @@ func (e *HashAggExec) Close() error { } for range e.finalOutputCh { } + e.executed = false return e.baseExecutor.Close() }