diff --git a/executor/executor_test.go b/executor/executor_test.go index 39e8bc3c022f9..fd583d7d23f8f 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -8815,6 +8815,45 @@ func (s *testSerialSuite) TestIssue28650(c *C) { } } +func (s *testSerialSuite) TestIndexJoin31494(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + + tk.MustExec("drop table if exists t1, t2;") + tk.MustExec("create table t1(a int(11) default null, b int(11) default null, key(b));") + insertStr := "insert into t1 values(1, 1)" + for i := 1; i < 32768; i++ { + insertStr += fmt.Sprintf(", (%d, %d)", i, i) + } + tk.MustExec(insertStr) + tk.MustExec("create table t2(a int(11) default null, b int(11) default null, c int(11) default null)") + insertStr = "insert into t2 values(1, 1, 1)" + for i := 1; i < 32768; i++ { + insertStr += fmt.Sprintf(", (%d, %d, %d)", i, i, i) + } + tk.MustExec(insertStr) + sm := &mockSessionManager1{ + PS: make([]*util.ProcessInfo, 0), + } + tk.Se.SetSessionManager(sm) + s.domain.ExpensiveQueryHandle().SetSessionManager(sm) + defer config.RestoreFunc()() + config.UpdateGlobal(func(conf *config.Config) { + conf.OOMAction = config.OOMActionCancel + }) + c.Assert(tk.Se.Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil), IsTrue) + tk.MustExec("set @@tidb_mem_quota_query=2097152;") + // This bug will be reproduced in 10 times. + for i := 0; i < 10; i++ { + err := tk.QueryToErr("select /*+ inl_join(t1) */ * from t1 right join t2 on t1.b=t2.b;") + c.Assert(err, NotNil) + c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*") + err = tk.QueryToErr("select /*+ inl_hash_join(t1) */ * from t1 right join t2 on t1.b=t2.b;") + c.Assert(err, NotNil) + c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*") + } +} + func (s *testSuite) TestDeleteWithMulTbl(c *C) { tk := testkit.NewTestKit(c, s.store) diff --git a/executor/index_lookup_hash_join.go b/executor/index_lookup_hash_join.go index 5dd6f960811f6..0332e3186a2d8 100644 --- a/executor/index_lookup_hash_join.go +++ b/executor/index_lookup_hash_join.go @@ -211,7 +211,6 @@ func (e *IndexNestedLoopHashJoin) finishJoinWorkers(r interface{}) { e.taskCh <- task } if e.cancelFunc != nil { - e.IndexLookUpJoin.ctxCancelReason.Store(err) e.cancelFunc() } } @@ -248,9 +247,6 @@ func (e *IndexNestedLoopHashJoin) Next(ctx context.Context, req *chunk.Chunk) er return result.err } case <-ctx.Done(): - if err := e.IndexLookUpJoin.ctxCancelReason.Load(); err != nil { - return err.(error) - } return ctx.Err() } req.SwapColumns(result.chk) @@ -280,9 +276,6 @@ func (e *IndexNestedLoopHashJoin) runInOrder(ctx context.Context, req *chunk.Chu return result.err } case <-ctx.Done(): - if err := e.IndexLookUpJoin.ctxCancelReason.Load(); err != nil { - return err.(error) - } return ctx.Err() } req.SwapColumns(result.chk) @@ -696,9 +689,6 @@ func (iw *indexHashJoinInnerWorker) doJoinUnordered(ctx context.Context, task *i select { case resultCh <- joinResult: case <-ctx.Done(): - if err := iw.lookup.ctxCancelReason.Load(); err != nil { - return err.(error) - } return ctx.Err() } joinResult, ok = iw.getNewJoinResult(ctx) @@ -849,9 +839,6 @@ func (iw *indexHashJoinInnerWorker) doJoinInOrder(ctx context.Context, task *ind select { case resultCh <- joinResult: case <-ctx.Done(): - if err := iw.lookup.ctxCancelReason.Load(); err != nil { - return err.(error) - } return ctx.Err() } joinResult, ok = iw.getNewJoinResult(ctx) diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index 440b223a9f770..c494da48d5d48 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -82,9 +82,8 @@ type IndexLookUpJoin struct { memTracker *memory.Tracker // track memory usage. - stats *indexLookUpJoinRuntimeStats - ctxCancelReason atomic.Value - finished *atomic.Value + stats *indexLookUpJoinRuntimeStats + finished *atomic.Value } type outerCtx struct { @@ -313,9 +312,6 @@ func (e *IndexLookUpJoin) getFinishedTask(ctx context.Context) (*lookUpJoinTask, select { case task = <-e.resultCh: case <-ctx.Done(): - if err := e.ctxCancelReason.Load(); err != nil { - return nil, err.(error) - } return nil, ctx.Err() } if task == nil { @@ -328,9 +324,6 @@ func (e *IndexLookUpJoin) getFinishedTask(ctx context.Context) (*lookUpJoinTask, return nil, err } case <-ctx.Done(): - if err := e.ctxCancelReason.Load(); err != nil { - return nil, err.(error) - } return nil, ctx.Err() } @@ -363,8 +356,6 @@ func (ow *outerWorker) run(ctx context.Context, wg *sync.WaitGroup) { err := errors.Errorf("%v", r) task.doneCh <- err ow.pushToChan(ctx, task, ow.resultCh) - ow.lookup.ctxCancelReason.Store(err) - ow.lookup.cancelFunc() } close(ow.resultCh) close(ow.innerCh) @@ -484,8 +475,6 @@ func (iw *innerWorker) run(ctx context.Context, wg *sync.WaitGroup) { err := errors.Errorf("%v", r) // "task != nil" is guaranteed when panic happened. task.doneCh <- err - iw.lookup.ctxCancelReason.Store(err) - iw.lookup.cancelFunc() } wg.Done() }() @@ -696,9 +685,6 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa for { select { case <-ctx.Done(): - if err := iw.lookup.ctxCancelReason.Load(); err != nil { - return err.(error) - } return ctx.Err() default: }