From 17c3e3d8a23543d5e9375f2584faf17cb24b05f2 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Thu, 11 Jan 2018 22:46:58 +0800 Subject: [PATCH 1/2] store/tikv: call Next() after copIterator Closed lead to goroutine leak After Close(), worker groutine receive from copIterator.finished and exit directly, without writing any thing to taskCh. Next() receives from taskCh and may hang forever, cause the caller goroutine leak. --- store/tikv/coprocessor.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go index b828c5c1d00da..bfa6406972cd1 100644 --- a/store/tikv/coprocessor.go +++ b/store/tikv/coprocessor.go @@ -393,6 +393,15 @@ func (it *copIterator) run(ctx goctx.Context) { }) } +func recvFromRespCh(respCh <-chan copResponse, finished <-chan struct{}) (resp copResponse, ok bool, exit bool) { + select { + case resp, ok = <-respCh: + case <-finished: + exit = true + } + return +} + func (it *copIterator) sendToTaskCh(ctx goctx.Context, t *copTask, taskCh chan<- *copTask) (finished bool, canceled bool) { select { case taskCh <- t: @@ -421,13 +430,18 @@ func (it *copIterator) Next() ([]byte, error) { return nil, nil } } else { + var closed bool for { if it.curr >= len(it.tasks) { // Resp will be nil if iterator is finished. return nil, nil } task := it.tasks[it.curr] - resp, ok = <-task.respChan + resp, ok, closed = recvFromRespCh(task.respChan, it.finished) + if closed { + // Close() is already called, so Next() is invalid. + return nil, nil + } if ok { break } From 4ea6d210c7aa73c991889a8ac71664c3c4fd0615 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 15 Jan 2018 18:20:51 +0800 Subject: [PATCH 2/2] add tests --- executor/executor_test.go | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/executor/executor_test.go b/executor/executor_test.go index 47d7d887edb63..9bb4ad288eecf 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -17,6 +17,7 @@ import ( "flag" "fmt" "os" + "strings" "sync" "testing" "time" @@ -2302,3 +2303,35 @@ func (s *testSuite) TestMaxInt64Handle(c *C) { tk.MustExec("delete from t where id = 9223372036854775807") tk.MustQuery("select * from t").Check(nil) } + +func (s *testSuite) TestEarlyClose(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("create table earlyclose (id int primary key)") + + // Insert 1000 rows. + var values []string + for i := 0; i < 1000; i++ { + values = append(values, fmt.Sprintf("(%d)", i)) + } + tk.MustExec("insert earlyclose values " + strings.Join(values, ",")) + + // Get table ID for split. + dom := sessionctx.GetDomain(tk.Se) + is := dom.InfoSchema() + tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("earlyclose")) + c.Assert(err, IsNil) + tblID := tbl.Meta().ID + + // Split the table. + s.cluster.SplitTable(s.mvccStore, tblID, 500) + + for i := 0; i < 500; i++ { + rss, err := tk.Se.Execute("select * from earlyclose order by id") + c.Assert(err, IsNil) + rs := rss[0] + _, err = rs.Next() + c.Assert(err, IsNil) + rs.Close() + } +}