Skip to content

Commit

Permalink
store/tikv: call Next() after copIterator closed lead to goroutine le…
Browse files Browse the repository at this point in the history
…ak (pingcap#5624)

After Close(), worker groutine receive from copIterator.finished and
exit directly, without writing any thing to taskCh.
Next() receives from taskCh and may hang forever, cause the caller goroutine
leak.
  • Loading branch information
tiancaiamao committed Jan 16, 2018
1 parent 0961907 commit 81f0318
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 1 deletion.
33 changes: 33 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"flag"
"fmt"
"os"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -2306,3 +2307,35 @@ func (s *testSuite) TestTableScanWithPointRanges(c *C) {
tk.MustExec("insert into t values(1), (5), (10)")
tk.MustQuery("select * from t where id in(1, 2, 10)").Check(testkit.Rows("1", "10"))
}

func (s *testSuite) TestEarlyClose(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("create table earlyclose (id int primary key)")

// Insert 1000 rows.
var values []string
for i := 0; i < 1000; i++ {
values = append(values, fmt.Sprintf("(%d)", i))
}
tk.MustExec("insert earlyclose values " + strings.Join(values, ","))

// Get table ID for split.
dom := sessionctx.GetDomain(tk.Se)
is := dom.InfoSchema()
tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("earlyclose"))
c.Assert(err, IsNil)
tblID := tbl.Meta().ID

// Split the table.
s.cluster.SplitTable(s.mvccStore, tblID, 500)

for i := 0; i < 500; i++ {
rss, err := tk.Se.Execute("select * from earlyclose order by id")
c.Assert(err, IsNil)
rs := rss[0]
_, err = rs.Next()
c.Assert(err, IsNil)
rs.Close()
}
}
16 changes: 15 additions & 1 deletion store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,15 @@ func (it *copIterator) run(ctx goctx.Context) {
})
}

func recvFromRespCh(respCh <-chan copResponse, finished <-chan struct{}) (resp copResponse, ok bool, exit bool) {
select {
case resp, ok = <-respCh:
case <-finished:
exit = true
}
return
}

func (it *copIterator) sendToTaskCh(ctx goctx.Context, t *copTask, taskCh chan<- *copTask) (exit bool) {
select {
case taskCh <- t:
Expand Down Expand Up @@ -475,13 +484,18 @@ func (it *copIterator) Next() ([]byte, error) {
return nil, nil
}
} else {
var closed bool
for {
if it.curr >= len(it.tasks) {
// Resp will be nil if iterator is finished.
return nil, nil
}
task := it.tasks[it.curr]
resp, ok = <-task.respChan
resp, ok, closed = recvFromRespCh(task.respChan, it.finished)
if closed {
// Close() is already called, so Next() is invalid.
return nil, nil
}
if ok {
break
}
Expand Down

0 comments on commit 81f0318

Please sign in to comment.