From af8abc7fd5a8608e2446ccc828c79d444638e9a7 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Wed, 12 Jan 2022 17:17:42 +0800 Subject: [PATCH 1/4] cherry pick #31563 to release-5.2 Signed-off-by: ti-srebot --- executor/executor_test.go | 262 +++++++++++++++++++++++++++++ executor/index_lookup_hash_join.go | 13 -- executor/index_lookup_join.go | 18 +- 3 files changed, 264 insertions(+), 29 deletions(-) diff --git a/executor/executor_test.go b/executor/executor_test.go index 1edabe0d76b3f..ff210eea3997c 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -9066,3 +9066,265 @@ func (s *testSerialSuite) TestIssue28650(c *C) { }() } } +<<<<<<< HEAD +======= + +func (s *testSerialSuite) TestIssue30289(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + fpName := "github.com/pingcap/tidb/executor/issue30289" + c.Assert(failpoint.Enable(fpName, `return(true)`), IsNil) + defer func() { + c.Assert(failpoint.Disable(fpName), IsNil) + }() + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int)") + err := tk.QueryToErr("select /*+ hash_join(t1) */ * from t t1 join t t2 on t1.a=t2.a") + c.Assert(err.Error(), Matches, "issue30289 build return error") +} + +func (s *testSerialSuite) TestIssue29498(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("DROP TABLE IF EXISTS t1;") + tk.MustExec("CREATE TABLE t1 (t3 TIME(3), d DATE, t TIME);") + tk.MustExec("INSERT INTO t1 VALUES ('00:00:00.567', '2002-01-01', '00:00:02');") + + res := tk.MustQuery("SELECT CONCAT(IFNULL(t3, d)) AS col1 FROM t1;") + row := res.Rows()[0][0].(string) + c.Assert(len(row), Equals, mysql.MaxDatetimeWidthNoFsp+3+1) + c.Assert(row[len(row)-12:], Equals, "00:00:00.567") + + res = tk.MustQuery("SELECT IFNULL(t3, d) AS col1 FROM t1;") + row = res.Rows()[0][0].(string) + c.Assert(len(row), Equals, mysql.MaxDatetimeWidthNoFsp+3+1) + c.Assert(row[len(row)-12:], Equals, "00:00:00.567") + + res = tk.MustQuery("SELECT CONCAT(IFNULL(t, d)) AS col1 FROM t1;") + row = res.Rows()[0][0].(string) + c.Assert(len(row), Equals, mysql.MaxDatetimeWidthNoFsp) + c.Assert(row[len(row)-8:], Equals, "00:00:02") + + res = tk.MustQuery("SELECT IFNULL(t, d) AS col1 FROM t1;") + row = res.Rows()[0][0].(string) + c.Assert(len(row), Equals, mysql.MaxDatetimeWidthNoFsp) + c.Assert(row[len(row)-8:], Equals, "00:00:02") + + res = tk.MustQuery("SELECT CONCAT(xx) FROM (SELECT t3 AS xx FROM t1 UNION SELECT d FROM t1) x ORDER BY -xx LIMIT 1;") + row = res.Rows()[0][0].(string) + c.Assert(len(row), Equals, mysql.MaxDatetimeWidthNoFsp+3+1) + c.Assert(row[len(row)-12:], Equals, "00:00:00.567") + + res = tk.MustQuery("SELECT CONCAT(CASE WHEN d IS NOT NULL THEN t3 ELSE d END) AS col1 FROM t1;") + row = res.Rows()[0][0].(string) + c.Assert(len(row), Equals, mysql.MaxDatetimeWidthNoFsp+3+1) + c.Assert(row[len(row)-12:], Equals, "00:00:00.567") +} + +// Test invoke Close without invoking Open before for each operators. +func (s *testSerialSuite) TestUnreasonablyClose(c *C) { + defer testleak.AfterTest(c)() + + is := infoschema.MockInfoSchema([]*model.TableInfo{plannercore.MockSignedTable(), plannercore.MockUnsignedTable()}) + se, err := session.CreateSession4Test(s.store) + c.Assert(err, IsNil) + _, err = se.Execute(context.Background(), "use test") + c.Assert(err, IsNil) + // To enable the shuffleExec operator. + _, err = se.Execute(context.Background(), "set @@tidb_merge_join_concurrency=4") + c.Assert(err, IsNil) + + var opsNeedsCovered = []plannercore.PhysicalPlan{ + &plannercore.PhysicalHashJoin{}, + &plannercore.PhysicalMergeJoin{}, + &plannercore.PhysicalIndexJoin{}, + &plannercore.PhysicalIndexHashJoin{}, + &plannercore.PhysicalTableReader{}, + &plannercore.PhysicalIndexReader{}, + &plannercore.PhysicalIndexLookUpReader{}, + &plannercore.PhysicalIndexMergeReader{}, + &plannercore.PhysicalApply{}, + &plannercore.PhysicalHashAgg{}, + &plannercore.PhysicalStreamAgg{}, + &plannercore.PhysicalLimit{}, + &plannercore.PhysicalSort{}, + &plannercore.PhysicalTopN{}, + &plannercore.PhysicalCTE{}, + &plannercore.PhysicalCTETable{}, + &plannercore.PhysicalMaxOneRow{}, + &plannercore.PhysicalProjection{}, + &plannercore.PhysicalSelection{}, + &plannercore.PhysicalTableDual{}, + &plannercore.PhysicalWindow{}, + &plannercore.PhysicalShuffle{}, + &plannercore.PhysicalUnionAll{}, + } + executorBuilder := executor.NewMockExecutorBuilderForTest(se, is, nil, math.MaxUint64, false, "global") + + var opsNeedsCoveredMask uint64 = 1< t1.a) AS a from t as t1) t", + "select /*+ hash_agg() */ count(f) from t group by a", + "select /*+ stream_agg() */ count(f) from t group by a", + "select * from t order by a, f", + "select * from t order by a, f limit 1", + "select * from t limit 1", + "select (select t1.a from t t1 where t1.a > t2.a) as a from t t2;", + "select a + 1 from t", + "select count(*) a from t having a > 1", + "select * from t where a = 1.1", + "with recursive cte1(c1) as (select 1 union select c1 + 1 from cte1 limit 5 offset 0) select * from cte1", + "select /*+use_index_merge(t, c_d_e, f)*/ * from t where c < 1 or f > 2", + "select sum(f) over (partition by f) from t", + "select /*+ merge_join(t1)*/ * from t t1 join t t2 on t1.d = t2.d", + "select a from t union all select a from t", + } { + comment := Commentf("case:%v sql:%s", i, tc) + c.Assert(err, IsNil, comment) + stmt, err := s.ParseOneStmt(tc, "", "") + c.Assert(err, IsNil, comment) + + err = se.NewTxn(context.Background()) + c.Assert(err, IsNil, comment) + p, _, err := planner.Optimize(context.TODO(), se, stmt, is) + c.Assert(err, IsNil, comment) + // This for loop level traverses the plan tree to get which operators are covered. + for child := []plannercore.PhysicalPlan{p.(plannercore.PhysicalPlan)}; len(child) != 0; { + newChild := make([]plannercore.PhysicalPlan, 0, len(child)) + for _, ch := range child { + found := false + for k, t := range opsNeedsCovered { + if reflect.TypeOf(t) == reflect.TypeOf(ch) { + opsAlreadyCoveredMask |= 1 << k + found = true + break + } + } + c.Assert(found, IsTrue, Commentf("case: %v sql: %s operator %v is not registered in opsNeedsCoveredMask", i, tc, reflect.TypeOf(ch))) + switch x := ch.(type) { + case *plannercore.PhysicalCTE: + newChild = append(newChild, x.RecurPlan) + newChild = append(newChild, x.SeedPlan) + continue + case *plannercore.PhysicalShuffle: + newChild = append(newChild, x.DataSources...) + newChild = append(newChild, x.Tails...) + continue + } + newChild = append(newChild, ch.Children()...) + } + child = newChild + } + + e := executorBuilder.Build(p) + + func() { + defer func() { + r := recover() + buf := make([]byte, 4096) + stackSize := runtime.Stack(buf, false) + buf = buf[:stackSize] + c.Assert(r, IsNil, Commentf("case: %v\n sql: %s\n error stack: %v", i, tc, string(buf))) + }() + c.Assert(e.Close(), IsNil, comment) + }() + } + // The following code is used to make sure all the operators registered + // in opsNeedsCoveredMask are covered. + commentBuf := strings.Builder{} + if opsAlreadyCoveredMask != opsNeedsCoveredMask { + for i := range opsNeedsCovered { + if opsAlreadyCoveredMask&(1<>>>>>> 82a75542e... executor: fix index join bug caused by innerWorker panic (#31563) diff --git a/executor/index_lookup_hash_join.go b/executor/index_lookup_hash_join.go index dfede97ee03c6..42e17cf334a12 100644 --- a/executor/index_lookup_hash_join.go +++ b/executor/index_lookup_hash_join.go @@ -211,7 +211,6 @@ func (e *IndexNestedLoopHashJoin) finishJoinWorkers(r interface{}) { e.taskCh <- task } if e.cancelFunc != nil { - e.IndexLookUpJoin.ctxCancelReason.Store(err) e.cancelFunc() } } @@ -248,9 +247,6 @@ func (e *IndexNestedLoopHashJoin) Next(ctx context.Context, req *chunk.Chunk) er return result.err } case <-ctx.Done(): - if err := e.IndexLookUpJoin.ctxCancelReason.Load(); err != nil { - return err.(error) - } return ctx.Err() } req.SwapColumns(result.chk) @@ -280,9 +276,6 @@ func (e *IndexNestedLoopHashJoin) runInOrder(ctx context.Context, req *chunk.Chu return result.err } case <-ctx.Done(): - if err := e.IndexLookUpJoin.ctxCancelReason.Load(); err != nil { - return err.(error) - } return ctx.Err() } req.SwapColumns(result.chk) @@ -655,9 +648,6 @@ func (iw *indexHashJoinInnerWorker) doJoinUnordered(ctx context.Context, task *i select { case resultCh <- joinResult: case <-ctx.Done(): - if err := iw.lookup.ctxCancelReason.Load(); err != nil { - return err.(error) - } return ctx.Err() } joinResult, ok = iw.getNewJoinResult(ctx) @@ -806,9 +796,6 @@ func (iw *indexHashJoinInnerWorker) doJoinInOrder(ctx context.Context, task *ind select { case resultCh <- joinResult: case <-ctx.Done(): - if err := iw.lookup.ctxCancelReason.Load(); err != nil { - return err.(error) - } return ctx.Err() } joinResult, ok = iw.getNewJoinResult(ctx) diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index db0ec2c3756d8..162ab8cb6db55 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -82,9 +82,8 @@ type IndexLookUpJoin struct { memTracker *memory.Tracker // track memory usage. - stats *indexLookUpJoinRuntimeStats - ctxCancelReason atomic.Value - finished *atomic.Value + stats *indexLookUpJoinRuntimeStats + finished *atomic.Value } type outerCtx struct { @@ -311,9 +310,6 @@ func (e *IndexLookUpJoin) getFinishedTask(ctx context.Context) (*lookUpJoinTask, select { case task = <-e.resultCh: case <-ctx.Done(): - if err := e.ctxCancelReason.Load(); err != nil { - return nil, err.(error) - } return nil, ctx.Err() } if task == nil { @@ -326,9 +322,6 @@ func (e *IndexLookUpJoin) getFinishedTask(ctx context.Context) (*lookUpJoinTask, return nil, err } case <-ctx.Done(): - if err := e.ctxCancelReason.Load(); err != nil { - return nil, err.(error) - } return nil, ctx.Err() } @@ -361,8 +354,6 @@ func (ow *outerWorker) run(ctx context.Context, wg *sync.WaitGroup) { err := errors.Errorf("%v", r) task.doneCh <- err ow.pushToChan(ctx, task, ow.resultCh) - ow.lookup.ctxCancelReason.Store(err) - ow.lookup.cancelFunc() } close(ow.resultCh) close(ow.innerCh) @@ -482,8 +473,6 @@ func (iw *innerWorker) run(ctx context.Context, wg *sync.WaitGroup) { err := errors.Errorf("%v", r) // "task != nil" is guaranteed when panic happened. task.doneCh <- err - iw.lookup.ctxCancelReason.Store(err) - iw.lookup.cancelFunc() } wg.Done() }() @@ -699,9 +688,6 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa for { select { case <-ctx.Done(): - if err := iw.lookup.ctxCancelReason.Load(); err != nil { - return err.(error) - } return ctx.Err() default: } From 235994415cbbcb421daf37d99be4222e9546dfe1 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Tue, 12 Apr 2022 20:10:20 +0800 Subject: [PATCH 2/4] fix conflict Signed-off-by: guo-shaoge --- executor/executor_test.go | 301 +++++--------------------------------- 1 file changed, 39 insertions(+), 262 deletions(-) diff --git a/executor/executor_test.go b/executor/executor_test.go index ff210eea3997c..0807a3387d5db 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -9009,6 +9009,45 @@ func (s *testSuite) TestCTEWithIndexLookupJoinDeadLock(c *C) { } } +func (s *testSerialSuite) TestIndexJoin31494(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + + tk.MustExec("drop table if exists t1, t2;") + tk.MustExec("create table t1(a int(11) default null, b int(11) default null, key(b));") + insertStr := "insert into t1 values(1, 1)" + for i := 1; i < 32768; i++ { + insertStr += fmt.Sprintf(", (%d, %d)", i, i) + } + tk.MustExec(insertStr) + tk.MustExec("create table t2(a int(11) default null, b int(11) default null, c int(11) default null)") + insertStr = "insert into t2 values(1, 1, 1)" + for i := 1; i < 32768; i++ { + insertStr += fmt.Sprintf(", (%d, %d, %d)", i, i, i) + } + tk.MustExec(insertStr) + sm := &mockSessionManager1{ + PS: make([]*util.ProcessInfo, 0), + } + tk.Se.SetSessionManager(sm) + s.domain.ExpensiveQueryHandle().SetSessionManager(sm) + defer config.RestoreFunc()() + config.UpdateGlobal(func(conf *config.Config) { + conf.OOMAction = config.OOMActionCancel + }) + c.Assert(tk.Se.Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil), IsTrue) + tk.MustExec("set @@tidb_mem_quota_query=2097152;") + // This bug will be reproduced in 10 times. + for i := 0; i < 10; i++ { + err := tk.QueryToErr("select /*+ inl_join(t1) */ * from t1 right join t2 on t1.b=t2.b;") + c.Assert(err, NotNil) + c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*") + err = tk.QueryToErr("select /*+ inl_hash_join(t1) */ * from t1 right join t2 on t1.b=t2.b;") + c.Assert(err, NotNil) + c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*") + } +} + func (s *testSerialSuite) TestIssue28650(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") @@ -9066,265 +9105,3 @@ func (s *testSerialSuite) TestIssue28650(c *C) { }() } } -<<<<<<< HEAD -======= - -func (s *testSerialSuite) TestIssue30289(c *C) { - tk := testkit.NewTestKit(c, s.store) - tk.MustExec("use test") - fpName := "github.com/pingcap/tidb/executor/issue30289" - c.Assert(failpoint.Enable(fpName, `return(true)`), IsNil) - defer func() { - c.Assert(failpoint.Disable(fpName), IsNil) - }() - tk.MustExec("drop table if exists t") - tk.MustExec("create table t(a int)") - err := tk.QueryToErr("select /*+ hash_join(t1) */ * from t t1 join t t2 on t1.a=t2.a") - c.Assert(err.Error(), Matches, "issue30289 build return error") -} - -func (s *testSerialSuite) TestIssue29498(c *C) { - tk := testkit.NewTestKit(c, s.store) - tk.MustExec("use test") - tk.MustExec("DROP TABLE IF EXISTS t1;") - tk.MustExec("CREATE TABLE t1 (t3 TIME(3), d DATE, t TIME);") - tk.MustExec("INSERT INTO t1 VALUES ('00:00:00.567', '2002-01-01', '00:00:02');") - - res := tk.MustQuery("SELECT CONCAT(IFNULL(t3, d)) AS col1 FROM t1;") - row := res.Rows()[0][0].(string) - c.Assert(len(row), Equals, mysql.MaxDatetimeWidthNoFsp+3+1) - c.Assert(row[len(row)-12:], Equals, "00:00:00.567") - - res = tk.MustQuery("SELECT IFNULL(t3, d) AS col1 FROM t1;") - row = res.Rows()[0][0].(string) - c.Assert(len(row), Equals, mysql.MaxDatetimeWidthNoFsp+3+1) - c.Assert(row[len(row)-12:], Equals, "00:00:00.567") - - res = tk.MustQuery("SELECT CONCAT(IFNULL(t, d)) AS col1 FROM t1;") - row = res.Rows()[0][0].(string) - c.Assert(len(row), Equals, mysql.MaxDatetimeWidthNoFsp) - c.Assert(row[len(row)-8:], Equals, "00:00:02") - - res = tk.MustQuery("SELECT IFNULL(t, d) AS col1 FROM t1;") - row = res.Rows()[0][0].(string) - c.Assert(len(row), Equals, mysql.MaxDatetimeWidthNoFsp) - c.Assert(row[len(row)-8:], Equals, "00:00:02") - - res = tk.MustQuery("SELECT CONCAT(xx) FROM (SELECT t3 AS xx FROM t1 UNION SELECT d FROM t1) x ORDER BY -xx LIMIT 1;") - row = res.Rows()[0][0].(string) - c.Assert(len(row), Equals, mysql.MaxDatetimeWidthNoFsp+3+1) - c.Assert(row[len(row)-12:], Equals, "00:00:00.567") - - res = tk.MustQuery("SELECT CONCAT(CASE WHEN d IS NOT NULL THEN t3 ELSE d END) AS col1 FROM t1;") - row = res.Rows()[0][0].(string) - c.Assert(len(row), Equals, mysql.MaxDatetimeWidthNoFsp+3+1) - c.Assert(row[len(row)-12:], Equals, "00:00:00.567") -} - -// Test invoke Close without invoking Open before for each operators. -func (s *testSerialSuite) TestUnreasonablyClose(c *C) { - defer testleak.AfterTest(c)() - - is := infoschema.MockInfoSchema([]*model.TableInfo{plannercore.MockSignedTable(), plannercore.MockUnsignedTable()}) - se, err := session.CreateSession4Test(s.store) - c.Assert(err, IsNil) - _, err = se.Execute(context.Background(), "use test") - c.Assert(err, IsNil) - // To enable the shuffleExec operator. - _, err = se.Execute(context.Background(), "set @@tidb_merge_join_concurrency=4") - c.Assert(err, IsNil) - - var opsNeedsCovered = []plannercore.PhysicalPlan{ - &plannercore.PhysicalHashJoin{}, - &plannercore.PhysicalMergeJoin{}, - &plannercore.PhysicalIndexJoin{}, - &plannercore.PhysicalIndexHashJoin{}, - &plannercore.PhysicalTableReader{}, - &plannercore.PhysicalIndexReader{}, - &plannercore.PhysicalIndexLookUpReader{}, - &plannercore.PhysicalIndexMergeReader{}, - &plannercore.PhysicalApply{}, - &plannercore.PhysicalHashAgg{}, - &plannercore.PhysicalStreamAgg{}, - &plannercore.PhysicalLimit{}, - &plannercore.PhysicalSort{}, - &plannercore.PhysicalTopN{}, - &plannercore.PhysicalCTE{}, - &plannercore.PhysicalCTETable{}, - &plannercore.PhysicalMaxOneRow{}, - &plannercore.PhysicalProjection{}, - &plannercore.PhysicalSelection{}, - &plannercore.PhysicalTableDual{}, - &plannercore.PhysicalWindow{}, - &plannercore.PhysicalShuffle{}, - &plannercore.PhysicalUnionAll{}, - } - executorBuilder := executor.NewMockExecutorBuilderForTest(se, is, nil, math.MaxUint64, false, "global") - - var opsNeedsCoveredMask uint64 = 1< t1.a) AS a from t as t1) t", - "select /*+ hash_agg() */ count(f) from t group by a", - "select /*+ stream_agg() */ count(f) from t group by a", - "select * from t order by a, f", - "select * from t order by a, f limit 1", - "select * from t limit 1", - "select (select t1.a from t t1 where t1.a > t2.a) as a from t t2;", - "select a + 1 from t", - "select count(*) a from t having a > 1", - "select * from t where a = 1.1", - "with recursive cte1(c1) as (select 1 union select c1 + 1 from cte1 limit 5 offset 0) select * from cte1", - "select /*+use_index_merge(t, c_d_e, f)*/ * from t where c < 1 or f > 2", - "select sum(f) over (partition by f) from t", - "select /*+ merge_join(t1)*/ * from t t1 join t t2 on t1.d = t2.d", - "select a from t union all select a from t", - } { - comment := Commentf("case:%v sql:%s", i, tc) - c.Assert(err, IsNil, comment) - stmt, err := s.ParseOneStmt(tc, "", "") - c.Assert(err, IsNil, comment) - - err = se.NewTxn(context.Background()) - c.Assert(err, IsNil, comment) - p, _, err := planner.Optimize(context.TODO(), se, stmt, is) - c.Assert(err, IsNil, comment) - // This for loop level traverses the plan tree to get which operators are covered. - for child := []plannercore.PhysicalPlan{p.(plannercore.PhysicalPlan)}; len(child) != 0; { - newChild := make([]plannercore.PhysicalPlan, 0, len(child)) - for _, ch := range child { - found := false - for k, t := range opsNeedsCovered { - if reflect.TypeOf(t) == reflect.TypeOf(ch) { - opsAlreadyCoveredMask |= 1 << k - found = true - break - } - } - c.Assert(found, IsTrue, Commentf("case: %v sql: %s operator %v is not registered in opsNeedsCoveredMask", i, tc, reflect.TypeOf(ch))) - switch x := ch.(type) { - case *plannercore.PhysicalCTE: - newChild = append(newChild, x.RecurPlan) - newChild = append(newChild, x.SeedPlan) - continue - case *plannercore.PhysicalShuffle: - newChild = append(newChild, x.DataSources...) - newChild = append(newChild, x.Tails...) - continue - } - newChild = append(newChild, ch.Children()...) - } - child = newChild - } - - e := executorBuilder.Build(p) - - func() { - defer func() { - r := recover() - buf := make([]byte, 4096) - stackSize := runtime.Stack(buf, false) - buf = buf[:stackSize] - c.Assert(r, IsNil, Commentf("case: %v\n sql: %s\n error stack: %v", i, tc, string(buf))) - }() - c.Assert(e.Close(), IsNil, comment) - }() - } - // The following code is used to make sure all the operators registered - // in opsNeedsCoveredMask are covered. - commentBuf := strings.Builder{} - if opsAlreadyCoveredMask != opsNeedsCoveredMask { - for i := range opsNeedsCovered { - if opsAlreadyCoveredMask&(1<>>>>>> 82a75542e... executor: fix index join bug caused by innerWorker panic (#31563) From 0e5f9188bea11752e718c4ef7c79675deeb0fd27 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Thu, 14 Apr 2022 14:11:56 +0800 Subject: [PATCH 3/4] reduce inserted rows to avoid case run too long Signed-off-by: guo-shaoge --- executor/executor_test.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/executor/executor_test.go b/executor/executor_test.go index a1ba9ac08321a..16eba088aaf2e 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -9016,13 +9016,13 @@ func (s *testSerialSuite) TestIndexJoin31494(c *C) { tk.MustExec("drop table if exists t1, t2;") tk.MustExec("create table t1(a int(11) default null, b int(11) default null, key(b));") insertStr := "insert into t1 values(1, 1)" - for i := 1; i < 32768; i++ { + for i := 1; i < 3000; i++ { insertStr += fmt.Sprintf(", (%d, %d)", i, i) } tk.MustExec(insertStr) tk.MustExec("create table t2(a int(11) default null, b int(11) default null, c int(11) default null)") insertStr = "insert into t2 values(1, 1, 1)" - for i := 1; i < 32768; i++ { + for i := 1; i < 3000; i++ { insertStr += fmt.Sprintf(", (%d, %d, %d)", i, i, i) } tk.MustExec(insertStr) @@ -9036,9 +9036,11 @@ func (s *testSerialSuite) TestIndexJoin31494(c *C) { conf.OOMAction = config.OOMActionCancel }) c.Assert(tk.Se.Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil), IsTrue) - tk.MustExec("set @@tidb_mem_quota_query=2097152;") - // This bug will be reproduced in 10 times. - for i := 0; i < 10; i++ { + tk.MustExec("set @@tidb_mem_quota_query=200000;") + tk.MustExec("set @@tidb_max_chunk_size=32;") + tk.MustExec("set @@tidb_index_join_batch_size=1;") + // This bug will likely be reproduced in 30 times. + for i := 0; i < 30; i++ { err := tk.QueryToErr("select /*+ inl_join(t1) */ * from t1 right join t2 on t1.b=t2.b;") c.Assert(err, NotNil) c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*") From b40fbdff1383e255ec31b49eae0fccca41a5db3e Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Thu, 14 Apr 2022 15:31:09 +0800 Subject: [PATCH 4/4] restore config in defer Signed-off-by: guo-shaoge --- executor/executor_test.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/executor/executor_test.go b/executor/executor_test.go index 16eba088aaf2e..a6a2a36dc8e75 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -9036,9 +9036,18 @@ func (s *testSerialSuite) TestIndexJoin31494(c *C) { conf.OOMAction = config.OOMActionCancel }) c.Assert(tk.Se.Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil), IsTrue) + oriMemQuota := tk.MustQuery("select @@tidb_mem_quota_query;").Rows()[0][0] + oriMaxChkSize := tk.MustQuery("select @@tidb_max_chunk_size;").Rows()[0][0] + oriBatchSize := tk.MustQuery("select @@tidb_index_join_batch_size;").Rows()[0][0] tk.MustExec("set @@tidb_mem_quota_query=200000;") tk.MustExec("set @@tidb_max_chunk_size=32;") tk.MustExec("set @@tidb_index_join_batch_size=1;") + defer func() { + // Restore everything. + tk.MustExec(fmt.Sprintf("set @@tidb_mem_quota_query=%s;", oriMemQuota.(string))) + tk.MustExec(fmt.Sprintf("set @@tidb_max_chunk_size=%s;", oriMaxChkSize.(string))) + tk.MustExec(fmt.Sprintf("set @@tidb_index_join_batch_size=%s;", oriBatchSize.(string))) + }() // This bug will likely be reproduced in 30 times. for i := 0; i < 30; i++ { err := tk.QueryToErr("select /*+ inl_join(t1) */ * from t1 right join t2 on t1.b=t2.b;")