From 20d3d60350b8b66a8c489b0081cfb2eb8cc792f9 Mon Sep 17 00:00:00 2001 From: ti-srebot <66930949+ti-srebot@users.noreply.github.com> Date: Mon, 20 Dec 2021 11:35:46 +0800 Subject: [PATCH] executor: HashJoinExec checks the buildError even if the probeSide is empty (#30471) (#30794) close pingcap/tidb#30289 --- executor/executor_test.go | 14 ++++++++++++++ executor/join.go | 13 ++++++++++++- executor/shuffle.go | 28 +++++++++++++++++++++------- 3 files changed, 47 insertions(+), 8 deletions(-) diff --git a/executor/executor_test.go b/executor/executor_test.go index 27e12ee99f717..c760d5950c416 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -8279,3 +8279,17 @@ func (s *testSuite) TestIssue26532(c *C) { tk.MustQuery("select greatest(\"2020-01-01 01:01:01\" ,\"2019-01-01 01:01:01\" )union select null;").Sort().Check(testkit.Rows("2020-01-01 01:01:01", "")) tk.MustQuery("select least(\"2020-01-01 01:01:01\" , \"2019-01-01 01:01:01\" )union select null;").Sort().Check(testkit.Rows("2019-01-01 01:01:01", "")) } + +func (s *testSerialSuite) TestIssue30289(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + fpName := "github.com/pingcap/tidb/executor/issue30289" + c.Assert(failpoint.Enable(fpName, `return(true)`), IsNil) + defer func() { + c.Assert(failpoint.Disable(fpName), IsNil) + }() + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int)") + err := tk.QueryToErr("select /*+ hash_join(t1) */ * from t t1 join t t2 on t1.a=t2.a") + c.Assert(err.Error(), Matches, "issue30289 build return error") +} diff --git a/executor/join.go b/executor/join.go index 472df41ded441..9ce1625ff635a 100644 --- a/executor/join.go +++ b/executor/join.go @@ -216,9 +216,13 @@ func (e *HashJoinExec) fetchProbeSideChunks(ctx context.Context) { return } if !hasWaitedForBuild { + failpoint.Inject("issue30289", func(val failpoint.Value) { + if val.(bool) { + probeSideResult.Reset() + } + }) if probeSideResult.NumRows() == 0 && !e.useOuterToBuild { e.finished.Store(true) - return } emptyBuild, buildErr := e.wait4BuildSide() if buildErr != nil { @@ -260,6 +264,13 @@ func (e *HashJoinExec) wait4BuildSide() (emptyBuild bool, err error) { func (e *HashJoinExec) fetchBuildSideRows(ctx context.Context, chkCh chan<- *chunk.Chunk, doneCh <-chan struct{}) { defer close(chkCh) var err error + failpoint.Inject("issue30289", func(val failpoint.Value) { + if val.(bool) { + err = errors.Errorf("issue30289 build return error") + e.buildFinished <- errors.Trace(err) + return + } + }) for { if e.finished.Load().(bool) { return diff --git a/executor/shuffle.go b/executor/shuffle.go index 9ad4ff522e4cd..cf2a3a225cd28 100644 --- a/executor/shuffle.go +++ b/executor/shuffle.go @@ -141,17 +141,29 @@ func (e *ShuffleExec) Close() error { if !e.prepared { for _, w := range e.workers { for _, r := range w.receivers { - close(r.inputHolderCh) - close(r.inputCh) + if r.inputHolderCh != nil { + close(r.inputHolderCh) + } + if r.inputCh != nil { + close(r.inputCh) + } } - close(w.outputHolderCh) + if w.outputHolderCh != nil { + close(w.outputHolderCh) + } + } + if e.outputCh != nil { + close(e.outputCh) } - close(e.outputCh) } - close(e.finishCh) + if e.finishCh != nil { + close(e.finishCh) + } for _, w := range e.workers { for _, r := range w.receivers { - for range r.inputCh { + if r.inputCh != nil { + for range r.inputCh { + } } } // close child executor of each worker @@ -159,7 +171,9 @@ func (e *ShuffleExec) Close() error { firstErr = err } } - for range e.outputCh { // workers exit before `e.outputCh` is closed. + if e.outputCh != nil { + for range e.outputCh { // workers exit before `e.outputCh` is closed. + } } e.executed = false