Skip to content

Commit

Permalink
executor: fix deadlock in dml statement with cte when oom panic actio…
Browse files Browse the repository at this point in the history
…n was triggered (#49192) (#49201)

close #49096
  • Loading branch information
ti-chi-bot authored Dec 8, 2023
1 parent f8de777 commit bb5be54
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 9 deletions.
27 changes: 18 additions & 9 deletions executor/cte.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,24 @@ func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {

// Close implements the Executor interface.
func (e *CTEExec) Close() (err error) {
e.producer.resTbl.Lock()
if !e.producer.closed {
// closeProducer() only close seedExec and recursiveExec, will not touch resTbl.
// It means you can still read resTbl after call closeProducer().
// You can even call all three functions(openProducer/produce/closeProducer) in CTEExec.Next().
// Separating these three function calls is only to follow the abstraction of the volcano model.
err = e.producer.closeProducer()
}
e.producer.resTbl.Unlock()
func() {
e.producer.resTbl.Lock()
defer e.producer.resTbl.Unlock()
if !e.producer.closed {
failpoint.Inject("mock_cte_exec_panic_avoid_deadlock", func(v failpoint.Value) {
ok := v.(bool)
if ok {
// mock an oom panic, returning ErrMemoryExceedForQuery for error identification in recovery work.
panic(memory.PanicMemoryExceedWarnMsg)
}
})
// closeProducer() only close seedExec and recursiveExec, will not touch resTbl.
// It means you can still read resTbl after call closeProducer().
// You can even call all three functions(openProducer/produce/closeProducer) in CTEExec.Next().
// Separating these three function calls is only to follow the abstraction of the volcano model.
err = e.producer.closeProducer()
}
}()
if err != nil {
return err
}
Expand Down
31 changes: 31 additions & 0 deletions executor/cte_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,37 @@ import (
"golang.org/x/exp/slices"
)

func TestCTEIssue49096(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test;")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/mock_cte_exec_panic_avoid_deadlock", "return(true)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/mock_cte_exec_panic_avoid_deadlock"))
}()
insertStr := "insert into t1 values(0)"
rowNum := 10
vals := make([]int, rowNum)
vals[0] = 0
for i := 1; i < rowNum; i++ {
v := rand.Intn(100)
vals[i] = v
insertStr += fmt.Sprintf(", (%d)", v)
}
tk.MustExec("drop table if exists t1, t2;")
tk.MustExec("create table t1(c1 int);")
tk.MustExec("create table t2(c1 int);")
tk.MustExec(insertStr)
// should be insert statement, otherwise it couldn't step int resetCTEStorageMap in handleNoDelay func.
sql := "insert into t2 with cte1 as ( " +
"select c1 from t1) " +
"select c1 from cte1 natural join (select * from cte1 where c1 > 0) cte2 order by c1;"
err := tk.ExecToErr(sql)
require.NotNil(t, err)
require.Equal(t, "Your query has been cancelled due to exceeding the allowed memory limit", err.Error())
}

func TestBasicCTE(t *testing.T) {
store := testkit.CreateMockStore(t)

Expand Down

0 comments on commit bb5be54

Please sign in to comment.