Skip to content

Commit

Permalink
executor: send a task with error to the resultCh when panic happen (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored Nov 30, 2021
1 parent 89c010a commit 83b273a
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 0 deletions.
1 change: 1 addition & 0 deletions executor/index_lookup_hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ func (ow *indexHashJoinOuterWorker) run(ctx context.Context) {
defer trace.StartRegion(ctx, "IndexHashJoinOuterWorker").End()
defer close(ow.innerCh)
for {
failpoint.Inject("TestIssue30211", nil)
task, err := ow.buildTask(ctx)
failpoint.Inject("testIndexHashJoinOuterWorkerErr", func() {
err = errors.New("mockIndexHashJoinOuterWorkerErr")
Expand Down
3 changes: 3 additions & 0 deletions executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"unsafe"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
Expand Down Expand Up @@ -362,6 +363,7 @@ func (ow *outerWorker) run(ctx context.Context, wg *sync.WaitGroup) {
task := &lookUpJoinTask{doneCh: make(chan error, 1)}
err := errors.Errorf("%v", r)
task.doneCh <- err
ow.pushToChan(ctx, task, ow.resultCh)
ow.lookup.ctxCancelReason.Store(err)
ow.lookup.cancelFunc()
}
Expand All @@ -370,6 +372,7 @@ func (ow *outerWorker) run(ctx context.Context, wg *sync.WaitGroup) {
wg.Done()
}()
for {
failpoint.Inject("TestIssue30211", nil)
task, err := ow.buildTask(ctx)
if err != nil {
task.doneCh <- err
Expand Down
36 changes: 36 additions & 0 deletions executor/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2610,3 +2610,39 @@ func (s *testSuiteJoinSerial) TestIssue25902(c *C) {
tk.MustQuery("select * from tt1 where ts in (select ts from tt2);").Check(testkit.Rows())
tk.MustExec("set @@session.time_zone = @tmp;")
}

func (s *testSuiteJoinSerial) TestIssue30211(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1, t2;")
tk.MustExec("create table t1(a int, index(a));")
tk.MustExec("create table t2(a int, index(a));")
func() {
fpName := "github.com/pingcap/tidb/executor/TestIssue30211"
c.Assert(failpoint.Enable(fpName, `panic("TestIssue30211 IndexJoinPanic")`), IsNil)
defer func() {
c.Assert(failpoint.Disable(fpName), IsNil)
}()
err := tk.QueryToErr("select /*+ inl_join(t1) */ * from t1 join t2 on t1.a = t2.a;").Error()
c.Assert(err, Matches, "failpoint panic: TestIssue30211 IndexJoinPanic")

err = tk.QueryToErr("select /*+ inl_hash_join(t1) */ * from t1 join t2 on t1.a = t2.a;").Error()
c.Assert(err, Matches, "failpoint panic: TestIssue30211 IndexJoinPanic")
}()
tk.MustExec("insert into t1 values(1),(2);")
tk.MustExec("insert into t2 values(1),(1),(2),(2);")
tk.MustExec("set @@tidb_mem_quota_query=8000;")
tk.MustExec("set tidb_index_join_batch_size = 1;")
config.UpdateGlobal(func(conf *config.Config) {
conf.OOMAction = config.OOMActionCancel
})
defer func() {
config.UpdateGlobal(func(conf *config.Config) {
conf.OOMAction = config.OOMActionLog
})
}()
err := tk.QueryToErr("select /*+ inl_join(t1) */ * from t1 join t2 on t1.a = t2.a;").Error()
c.Assert(strings.Contains(err, "Out Of Memory Quota"), IsTrue)
err = tk.QueryToErr("select /*+ inl_hash_join(t1) */ * from t1 join t2 on t1.a = t2.a;").Error()
c.Assert(strings.Contains(err, "Out Of Memory Quota"), IsTrue)
}

0 comments on commit 83b273a

Please sign in to comment.