diff --git a/server/conn_stmt.go b/server/conn_stmt.go index 283437483c149..f33216c4cb240 100644 --- a/server/conn_stmt.go +++ b/server/conn_stmt.go @@ -233,42 +233,13 @@ func (cc *clientConn) handleStmtExecute(ctx context.Context, data []byte) (err e // Currently the first return value is used to fallback to TiKV when TiFlash is down. func (cc *clientConn) executePreparedStmtAndWriteResult(ctx context.Context, stmt PreparedStatement, args []types.Datum, useCursor bool) (bool, error) { vars := (&cc.ctx).GetSessionVars() -<<<<<<< HEAD rs, err := stmt.Execute(ctx, args) -======= - prepStmt, err := vars.GetPreparedStmtByID(uint32(stmt.ID())) - if err != nil { - return true, errors.Annotate(err, cc.preparedStmt2String(uint32(stmt.ID()))) - } - execStmt := &ast.ExecuteStmt{ - BinaryArgs: args, - PrepStmt: prepStmt, - } - - // For the combination of `ComPrepare` and `ComExecute`, the statement name is stored in the client side, and the - // TiDB only has the ID, so don't try to construct an `EXECUTE SOMETHING`. Use the original prepared statement here - // instead. - sql := "" - planCacheStmt, ok := prepStmt.(*plannercore.PlanCacheStmt) - if ok { - sql = planCacheStmt.StmtText - } - execStmt.SetText(charset.EncodingUTF8Impl, sql) - rs, err := (&cc.ctx).ExecuteStmt(ctx, execStmt) if rs != nil { defer terror.Call(rs.Close) } ->>>>>>> d41793e9691 (server: fix memtracker leak with cursor (#44257)) if err != nil { - // If error is returned during the planner phase or the executor.Open - // phase, the rs will be nil, and StmtCtx.MemTracker StmtCtx.DiskTracker - // will not be detached. We need to detach them manually. - if sv := cc.ctx.GetSessionVars(); sv != nil && sv.StmtCtx != nil { - sv.StmtCtx.DetachMemDiskTracker() - } return true, errors.Annotate(err, cc.preparedStmt2String(uint32(stmt.ID()))) } - if rs == nil { if useCursor { vars.SetStatusFlag(mysql.ServerStatusCursorExists, false) @@ -316,26 +287,10 @@ func (cc *clientConn) executePreparedStmtAndWriteResult(ctx context.Context, stm cl.OnFetchReturned() } -<<<<<<< HEAD - // as the `Next` of `ResultSet` will never be called, all rows have been cached inside it. We could close this - // `ResultSet`. - err = rs.Close() - if err != nil { - return false, err - } -======= - stmt.SetCursorActive(true) ->>>>>>> d41793e9691 (server: fix memtracker leak with cursor (#44257)) - // explicitly flush columnInfo to client. return false, cc.flush(ctx) } -<<<<<<< HEAD - defer terror.Call(rs.Close) retryable, err := cc.writeResultset(ctx, rs, true, 0, 0) -======= - retryable, err := cc.writeResultSet(ctx, rs, true, cc.ctx.Status(), 0) ->>>>>>> d41793e9691 (server: fix memtracker leak with cursor (#44257)) if err != nil { return retryable, errors.Annotate(err, cc.preparedStmt2String(uint32(stmt.ID()))) } diff --git a/server/conn_stmt_test.go b/server/conn_stmt_test.go index ee9470873177d..7773f8c8903fa 100644 --- a/server/conn_stmt_test.go +++ b/server/conn_stmt_test.go @@ -18,7 +18,6 @@ import ( "bytes" "context" "encoding/binary" - "fmt" "testing" "github.com/pingcap/tidb/parser/mysql" @@ -372,55 +371,3 @@ func TestCursorWithParams(t *testing.T) { 0x0, 0x1, 0x0, 0x0, ))) } - -func TestCursorDetachMemTracker(t *testing.T) { - store, dom := testkit.CreateMockStoreAndDomain(t) - srv := CreateMockServer(t, store) - srv.SetDomain(dom) - defer srv.Close() - - appendUint32 := binary.LittleEndian.AppendUint32 - ctx := context.Background() - c := CreateMockConn(t, srv).(*mockConn) - - tk := testkit.NewTestKitWithSession(t, store, c.Context().Session) - tk.MustExec("use test") - tk.MustExec("drop table if exists t") - tk.MustExec("create table t(id_1 int, id_2 int)") - tk.MustExec("insert into t values (1, 1), (1, 2)") - tk.MustExec("set global tidb_mem_oom_action = 'CANCEL'") - defer tk.MustExec("set global tidb_mem_oom_action= DEFAULT") - // TODO: find whether it's expected to have one child at the beginning - require.Len(t, tk.Session().GetSessionVars().MemTracker.GetChildrenForTest(), 1) - - // execute a normal statement, it'll success - stmt, _, _, err := c.Context().Prepare("select count(id_2) from t") - require.NoError(t, err) - - require.NoError(t, c.Dispatch(ctx, append( - appendUint32([]byte{mysql.ComStmtExecute}, uint32(stmt.ID())), - mysql.CursorTypeReadOnly, 0x1, 0x0, 0x0, 0x0, - ))) - maxConsumed := tk.Session().GetSessionVars().MemTracker.MaxConsumed() - - // testkit also uses `PREPARE` related calls to run statement with arguments. - // format the SQL to avoid the interference from testkit. - tk.MustExec(fmt.Sprintf("set tidb_mem_quota_query=%d", maxConsumed/2)) - require.Len(t, tk.Session().GetSessionVars().MemTracker.GetChildrenForTest(), 1) - // This query should exceed the memory limitation during `openExecutor` - require.Error(t, c.Dispatch(ctx, append( - appendUint32([]byte{mysql.ComStmtExecute}, uint32(stmt.ID())), - mysql.CursorTypeReadOnly, 0x1, 0x0, 0x0, 0x0, - ))) - require.Len(t, tk.Session().GetSessionVars().MemTracker.GetChildrenForTest(), 1) - - // The next query should succeed - tk.MustExec(fmt.Sprintf("set tidb_mem_quota_query=%d", maxConsumed+1)) - require.Len(t, tk.Session().GetSessionVars().MemTracker.GetChildrenForTest(), 1) - // This query should succeed - require.NoError(t, c.Dispatch(ctx, append( - appendUint32([]byte{mysql.ComStmtExecute}, uint32(stmt.ID())), - mysql.CursorTypeReadOnly, 0x1, 0x0, 0x0, 0x0, - ))) - require.Len(t, tk.Session().GetSessionVars().MemTracker.GetChildrenForTest(), 1) -}