Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: fix many leaks of the test case #26909

Merged
merged 3 commits into from
Aug 6, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1052,8 +1052,9 @@ func (s *testStateChangeSuite) TestParallelAlterModifyColumn(c *C) {
f := func(c *C, err1, err2 error) {
c.Assert(err1, IsNil)
c.Assert(err2, IsNil)
_, err := s.se.Execute(context.Background(), "select * from t")
rs, err := s.se.Execute(context.Background(), "select * from t")
c.Assert(err, IsNil)
c.Assert(rs[0].Close(), IsNil)
}
s.testControlParallelExecSQL(c, sql, sql, f)
}
Expand All @@ -1064,8 +1065,9 @@ func (s *testStateChangeSuite) TestParallelAddGeneratedColumnAndAlterModifyColum
f := func(c *C, err1, err2 error) {
c.Assert(err1, IsNil)
c.Assert(err2.Error(), Equals, "[ddl:8200]Unsupported modify column: oldCol is a dependent column 'a' for generated column")
_, err := s.se.Execute(context.Background(), "select * from t")
rs, err := s.se.Execute(context.Background(), "select * from t")
c.Assert(err, IsNil)
c.Assert(rs[0].Close(), IsNil)
}
s.testControlParallelExecSQL(c, sql1, sql2, f)
}
Expand All @@ -1076,8 +1078,9 @@ func (s *testStateChangeSuite) TestParallelAlterModifyColumnAndAddPK(c *C) {
f := func(c *C, err1, err2 error) {
c.Assert(err1, IsNil)
c.Assert(err2.Error(), Equals, "[ddl:8200]Unsupported modify column: this column has primary key flag")
_, err := s.se.Execute(context.Background(), "select * from t")
rs, err := s.se.Execute(context.Background(), "select * from t")
c.Assert(err, IsNil)
c.Assert(rs[0].Close(), IsNil)
}
s.testControlParallelExecSQL(c, sql1, sql2, f)
}
Expand Down Expand Up @@ -1361,12 +1364,24 @@ func (s *testStateChangeSuiteBase) testControlParallelExecSQL(c *C, sql1, sql2 s
wg.Add(2)
go func() {
defer wg.Done()
_, err1 = se.Execute(context.Background(), sql1)
var rss []sqlexec.RecordSet
rss, err1 = se.Execute(context.Background(), sql1)
if err1 == nil && len(rss) > 0 {
for _, rs := range rss {
c.Assert(rs.Close(), IsNil)
}
}
}()
go func() {
defer wg.Done()
<-ch
_, err2 = se1.Execute(context.Background(), sql2)
var rss []sqlexec.RecordSet
rss, err2 = se1.Execute(context.Background(), sql2)
if err2 == nil && len(rss) > 0 {
for _, rs := range rss {
c.Assert(rs.Close(), IsNil)
}
}
}()

wg.Wait()
Expand Down
12 changes: 6 additions & 6 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,12 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e Executor) (sqlex
ctx = opentracing.ContextWithSpan(ctx, span1)
}

var err error
defer func() {
terror.Log(e.Close())
a.logAudit()
}()

tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
// Check if "tidb_snapshot" is set for the write executors.
// In history read mode, we can not do write operations.
switch e.(type) {
Expand All @@ -581,12 +587,6 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e Executor) (sqlex
}
}

var err error
defer func() {
terror.Log(e.Close())
a.logAudit()
}()

err = Next(ctx, e, newFirstChunk(e))
if err != nil {
return nil, err
Expand Down
29 changes: 23 additions & 6 deletions executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/executor/aggfuncs"
"github.com/pingcap/tidb/expression"
Expand Down Expand Up @@ -298,14 +299,17 @@ func (e *HashAggExec) Close() error {

// Open implements the Executor Open interface.
func (e *HashAggExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return err
}
failpoint.Inject("mockHashAggExecBaseExecutorOpenReturnedError", func(val failpoint.Value) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The failpoint is moved before the Open, because after calling Open and then error, the children Executor is not closed.

if val.(bool) {
failpoint.Return(errors.New("mock HashAggExec.baseExecutor.Open returned error"))
}
})

if err := e.baseExecutor.Open(ctx); err != nil {
return err
}
// If panic here, the children executor should be closed because they are open.
defer closeBaseExecutor(&e.baseExecutor)
e.prepared = false

e.memTracker = memory.NewTracker(e.id, -1)
Expand Down Expand Up @@ -344,6 +348,15 @@ func (e *HashAggExec) initForUnparallelExec() {
}
}

func closeBaseExecutor(b *baseExecutor) {
if r := recover(); r != nil {
// Release the resource, but throw the panic again and let the top level handle it.
terror.Log(b.Close())
logutil.BgLogger().Warn("panic in Open(), close base executor and throw exception again")
panic(r)
}
}

func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) {
sessionVars := e.ctx.GetSessionVars()
finalConcurrency := sessionVars.HashAggFinalConcurrency()
Expand Down Expand Up @@ -1218,14 +1231,18 @@ type StreamAggExec struct {

// Open implements the Executor Open interface.
func (e *StreamAggExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return err
}
failpoint.Inject("mockStreamAggExecBaseExecutorOpenReturnedError", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(errors.New("mock StreamAggExec.baseExecutor.Open returned error"))
}
})

if err := e.baseExecutor.Open(ctx); err != nil {
return err
}
// If panic in Open, the children executor should be closed because they are open.
defer closeBaseExecutor(&e.baseExecutor)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the panic in Open case, we do not handle the resource release well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually here is not just a problem in the test code, it's also a bug of the product.

set @@tidb_memtracker_size = xxx;
insert into select * from t1 where a in select b from t2 ... 
// If Out of memory, panic in Open, then there will be goroutine leak 


e.childResult = newFirstChunk(e.children[0])
e.executed = false
e.isChildReturnEmpty = true
Expand Down
2 changes: 1 addition & 1 deletion executor/executor_pkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import (
"github.com/pingcap/tidb/util/tableutil"
)

var _ = Suite(&testExecSuite{})
var _ = SerialSuites(&testExecSuite{})
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
var _ = SerialSuites(&testExecSerialSuite{})

// Note: it's a tricky way to export the `inspectionSummaryRules` and `inspectionRules` for unit test but invisible for normal code
Expand Down
4 changes: 4 additions & 0 deletions executor/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ func (e *ExplainExec) Open(ctx context.Context) error {
// Close implements the Executor Close interface.
func (e *ExplainExec) Close() error {
e.rows = nil
if e.analyzeExec != nil && !e.executed {
// Open(), but Next() is not called.
return e.analyzeExec.Close()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some test code run the following logic:

rs, err := Execute("explan analyze xxxxx")

// Next() is not called

rs.Close()

And the analyzeExec is not released ...
Fix for it.

}
return nil
}

Expand Down
17 changes: 13 additions & 4 deletions executor/seqtest/prepared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,22 @@ func (s *seqTestSuite) TestPrepared(c *C) {
tk.MustExec("create table prepare_test (id int PRIMARY KEY AUTO_INCREMENT, c1 int, c2 int, c3 int default 1)")
tk.MustExec("insert prepare_test (c1) values (1),(2),(NULL)")

tk.MustExec(`prepare stmt_test_1 from 'select id from prepare_test where id > ?'; set @a = 1; execute stmt_test_1 using @a;`)
tk.MustExec(`prepare stmt_test_1 from 'select id from prepare_test where id > ?';`)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Multiple statement is ban in the real code.
Some test code are still using them ... and the resulted record set seems not to be closed correctly.

tk.MustExec(`set @a = 1;`)
tk.MustExec(`execute stmt_test_1 using @a;`)
tk.MustExec(`prepare stmt_test_2 from 'select 1'`)
// Prepare multiple statement is not allowed.
_, err = tk.Exec(`prepare stmt_test_3 from 'select id from prepare_test where id > ?;select id from prepare_test where id > ?;'`)
c.Assert(executor.ErrPrepareMulti.Equal(err), IsTrue)

// The variable count does not match.
_, err = tk.Exec(`prepare stmt_test_4 from 'select id from prepare_test where id > ? and id < ?'; set @a = 1; execute stmt_test_4 using @a;`)
tk.MustExec(`prepare stmt_test_4 from 'select id from prepare_test where id > ? and id < ?';`)
tk.MustExec(`set @a = 1;`)
_, err = tk.Exec(`execute stmt_test_4 using @a;`)
c.Assert(plannercore.ErrWrongParamCount.Equal(err), IsTrue)
// Prepare and deallocate prepared statement immediately.
tk.MustExec(`prepare stmt_test_5 from 'select id from prepare_test where id > ?'; deallocate prepare stmt_test_5;`)
tk.MustExec(`prepare stmt_test_5 from 'select id from prepare_test where id > ?';`)
tk.MustExec(`deallocate prepare stmt_test_5;`)

// Statement not found.
_, err = tk.Exec("deallocate prepare stmt_test_5")
Expand Down Expand Up @@ -166,8 +172,11 @@ func (s *seqTestSuite) TestPrepared(c *C) {
c.Assert(err, IsNil)

// Should success as the changed schema do not affect the prepared statement.
_, err = tk.Se.ExecutePreparedStmt(ctx, stmtID, []types.Datum{types.NewDatum(1)})
rs, err = tk.Se.ExecutePreparedStmt(ctx, stmtID, []types.Datum{types.NewDatum(1)})
c.Assert(err, IsNil)
if rs != nil {
rs.Close()
}

// Drop a column so the prepared statement become invalid.
query = "select c1, c2 from prepare_test where c1 = ?"
Expand Down
11 changes: 6 additions & 5 deletions executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,12 +211,15 @@ func (s *testStaleTxnSerialSuite) TestSelectAsOf(c *C) {
} else if testcase.preSec > 0 {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/assertStaleTSOWithTolerance", fmt.Sprintf(`return(%d)`, time.Now().Unix()-testcase.preSec)), IsNil)
}
_, err := tk.Exec(testcase.sql)
rs, err := tk.Exec(testcase.sql)
if len(testcase.errorStr) != 0 {
c.Assert(err, ErrorMatches, testcase.errorStr)
continue
}
c.Assert(err, IsNil, Commentf("sql:%s, error stack %v", testcase.sql, errors.ErrorStack(err)))
if rs != nil {
rs.Close()
}
if testcase.expectPhysicalTS > 0 {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/assertStaleTSO"), IsNil)
} else if testcase.preSec > 0 {
Expand Down Expand Up @@ -696,8 +699,7 @@ func (s *testStaleTxnSerialSuite) TestValidateReadOnlyInStalenessTransaction(c *
c.Log(testcase.name)
tk.MustExec(`START TRANSACTION READ ONLY AS OF TIMESTAMP NOW(3);`)
if testcase.isValidate {
_, err := tk.Exec(testcase.sql)
c.Assert(err, IsNil)
tk.MustExec(testcase.sql)
} else {
err := tk.ExecToErr(testcase.sql)
c.Assert(err, NotNil)
Expand All @@ -706,8 +708,7 @@ func (s *testStaleTxnSerialSuite) TestValidateReadOnlyInStalenessTransaction(c *
tk.MustExec("commit")
tk.MustExec("set transaction read only as of timestamp NOW(3);")
if testcase.isValidate {
_, err := tk.Exec(testcase.sql)
c.Assert(err, IsNil)
tk.MustExec(testcase.sql)
} else {
err := tk.ExecToErr(testcase.sql)
c.Assert(err, NotNil)
Expand Down
5 changes: 2 additions & 3 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,8 +350,7 @@ func (s *testIntegrationSerialSuite) TestNoneAccessPathsFoundByIsolationRead(c *
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int primary key)")

_, err := tk.Exec("select * from t")
c.Assert(err, IsNil)
tk.MustExec("select * from t")

tk.MustExec("set @@session.tidb_isolation_read_engines = 'tiflash'")

Expand All @@ -360,7 +359,7 @@ func (s *testIntegrationSerialSuite) TestNoneAccessPathsFoundByIsolationRead(c *
"TableReader 10000.00 root data:TableFullScan",
"└─TableFullScan 10000.00 cop[tikv] table:stats_meta keep order:false, stats:pseudo"))

_, err = tk.Exec("select * from t")
_, err := tk.Exec("select * from t")
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[planner:1815]Internal : Can not find access path matching 'tidb_isolation_read_engines'(value: 'tiflash'). Available values are 'tikv'.")

Expand Down
4 changes: 1 addition & 3 deletions session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -1378,9 +1378,6 @@ func upgradeToVer67(s Session, ver int64) {
if err != nil {
logutil.BgLogger().Fatal("upgradeToVer67 error", zap.Error(err))
}
if rs != nil {
defer terror.Call(rs.Close)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The defer close the result too late, do it explicitly.

func() {
    t1, err := Execute("sql1 ...")
    defer t1.Close()

    t2, err := Execute("sql2" ...)
    // t1 is not closed, so statement context is still in use 
    // and we will have trouble reuse the statement context.
}

}
req := rs.NewChunk()
iter := chunk.NewIterator4Chunk(req)
p := parser.New()
Expand All @@ -1395,6 +1392,7 @@ func upgradeToVer67(s Session, ver int64) {
}
updateBindInfo(iter, p, bindMap)
}
terror.Call(rs.Close)

mustExecute(s, "DELETE FROM mysql.bind_info where source != 'builtin'")
for original, bind := range bindMap {
Expand Down
12 changes: 8 additions & 4 deletions session/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,14 @@ func (s *testBootstrapSuite) TestBootstrap(c *C) {
c.Assert(se.Auth(&auth.UserIdentity{Username: "root", Hostname: "anyhost"}, []byte(""), []byte("")), IsTrue)
mustExecSQL(c, se, "USE test;")
// Check privilege tables.
mustExecSQL(c, se, "SELECT * from mysql.global_priv;")
mustExecSQL(c, se, "SELECT * from mysql.db;")
mustExecSQL(c, se, "SELECT * from mysql.tables_priv;")
mustExecSQL(c, se, "SELECT * from mysql.columns_priv;")
rs := mustExecSQL(c, se, "SELECT * from mysql.global_priv;")
c.Assert(rs.Close(), IsNil)
rs = mustExecSQL(c, se, "SELECT * from mysql.db;")
c.Assert(rs.Close(), IsNil)
rs = mustExecSQL(c, se, "SELECT * from mysql.tables_priv;")
c.Assert(rs.Close(), IsNil)
rs = mustExecSQL(c, se, "SELECT * from mysql.columns_priv;")
c.Assert(rs.Close(), IsNil)
// Check privilege tables.
r = mustExecSQL(c, se, "SELECT COUNT(*) from mysql.global_variables;")
c.Assert(r, NotNil)
Expand Down
20 changes: 15 additions & 5 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1328,8 +1328,9 @@ func (s *testSessionSuite) TestPrepare(c *C) {
c.Assert(id, Equals, uint32(1))
c.Assert(ps, Equals, 1)
tk.MustExec(`set @a=1`)
_, err = tk.Se.ExecutePreparedStmt(ctx, id, []types.Datum{types.NewDatum("1")})
rs, err := tk.Se.ExecutePreparedStmt(ctx, id, []types.Datum{types.NewDatum("1")})
c.Assert(err, IsNil)
rs.Close()
err = tk.Se.DropPreparedStmt(id)
c.Assert(err, IsNil)

Expand All @@ -1349,7 +1350,7 @@ func (s *testSessionSuite) TestPrepare(c *C) {
tk.MustExec("insert multiexec values (1, 1), (2, 2)")
id, _, _, err = tk.Se.PrepareStmt("select a from multiexec where b = ? order by b")
c.Assert(err, IsNil)
rs, err := tk.Se.ExecutePreparedStmt(ctx, id, []types.Datum{types.NewDatum(1)})
rs, err = tk.Se.ExecutePreparedStmt(ctx, id, []types.Datum{types.NewDatum(1)})
c.Assert(err, IsNil)
rs.Close()
rs, err = tk.Se.ExecutePreparedStmt(ctx, id, []types.Datum{types.NewDatum(2)})
Expand Down Expand Up @@ -1963,17 +1964,26 @@ func (s *testSessionSuite3) TestCaseInsensitive(c *C) {

tk.MustExec("create table T (a text, B int)")
tk.MustExec("insert t (A, b) values ('aaa', 1)")
rs, _ := tk.Exec("select * from t")
rs, err := tk.Exec("select * from t")
c.Assert(err, IsNil)
fields := rs.Fields()
c.Assert(fields[0].ColumnAsName.O, Equals, "a")
c.Assert(fields[1].ColumnAsName.O, Equals, "B")
rs, _ = tk.Exec("select A, b from t")
rs.Close()

rs, err = tk.Exec("select A, b from t")
c.Assert(err, IsNil)
fields = rs.Fields()
c.Assert(fields[0].ColumnAsName.O, Equals, "A")
c.Assert(fields[1].ColumnAsName.O, Equals, "b")
rs, _ = tk.Exec("select a as A from t where A > 0")
rs.Close()

rs, err = tk.Exec("select a as A from t where A > 0")
c.Assert(err, IsNil)
fields = rs.Fields()
c.Assert(fields[0].ColumnAsName.O, Equals, "A")
rs.Close()

tk.MustExec("update T set b = B + 1")
tk.MustExec("update T set B = b + 1")
tk.MustQuery("select b from T").Check(testkit.Rows("3"))
Expand Down