Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
wshwsh12 committed Jun 20, 2023
1 parent 8bc3826 commit c7a69b4
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 98 deletions.
45 changes: 0 additions & 45 deletions server/conn_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,42 +233,13 @@ func (cc *clientConn) handleStmtExecute(ctx context.Context, data []byte) (err e
// Currently the first return value is used to fallback to TiKV when TiFlash is down.
func (cc *clientConn) executePreparedStmtAndWriteResult(ctx context.Context, stmt PreparedStatement, args []types.Datum, useCursor bool) (bool, error) {
vars := (&cc.ctx).GetSessionVars()
<<<<<<< HEAD
rs, err := stmt.Execute(ctx, args)
=======
prepStmt, err := vars.GetPreparedStmtByID(uint32(stmt.ID()))
if err != nil {
return true, errors.Annotate(err, cc.preparedStmt2String(uint32(stmt.ID())))
}
execStmt := &ast.ExecuteStmt{
BinaryArgs: args,
PrepStmt: prepStmt,
}

// For the combination of `ComPrepare` and `ComExecute`, the statement name is stored in the client side, and the
// TiDB only has the ID, so don't try to construct an `EXECUTE SOMETHING`. Use the original prepared statement here
// instead.
sql := ""
planCacheStmt, ok := prepStmt.(*plannercore.PlanCacheStmt)
if ok {
sql = planCacheStmt.StmtText
}
execStmt.SetText(charset.EncodingUTF8Impl, sql)
rs, err := (&cc.ctx).ExecuteStmt(ctx, execStmt)
if rs != nil {
defer terror.Call(rs.Close)
}
>>>>>>> d41793e9691 (server: fix memtracker leak with cursor (#44257))
if err != nil {
// If error is returned during the planner phase or the executor.Open
// phase, the rs will be nil, and StmtCtx.MemTracker StmtCtx.DiskTracker
// will not be detached. We need to detach them manually.
if sv := cc.ctx.GetSessionVars(); sv != nil && sv.StmtCtx != nil {
sv.StmtCtx.DetachMemDiskTracker()
}
return true, errors.Annotate(err, cc.preparedStmt2String(uint32(stmt.ID())))
}

if rs == nil {
if useCursor {
vars.SetStatusFlag(mysql.ServerStatusCursorExists, false)
Expand Down Expand Up @@ -316,26 +287,10 @@ func (cc *clientConn) executePreparedStmtAndWriteResult(ctx context.Context, stm
cl.OnFetchReturned()
}

<<<<<<< HEAD
// as the `Next` of `ResultSet` will never be called, all rows have been cached inside it. We could close this
// `ResultSet`.
err = rs.Close()
if err != nil {
return false, err
}
=======
stmt.SetCursorActive(true)
>>>>>>> d41793e9691 (server: fix memtracker leak with cursor (#44257))

// explicitly flush columnInfo to client.
return false, cc.flush(ctx)
}
<<<<<<< HEAD
defer terror.Call(rs.Close)
retryable, err := cc.writeResultset(ctx, rs, true, 0, 0)
=======
retryable, err := cc.writeResultSet(ctx, rs, true, cc.ctx.Status(), 0)
>>>>>>> d41793e9691 (server: fix memtracker leak with cursor (#44257))
if err != nil {
return retryable, errors.Annotate(err, cc.preparedStmt2String(uint32(stmt.ID())))
}
Expand Down
53 changes: 0 additions & 53 deletions server/conn_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"bytes"
"context"
"encoding/binary"
"fmt"
"testing"

"github.com/pingcap/tidb/parser/mysql"
Expand Down Expand Up @@ -372,55 +371,3 @@ func TestCursorWithParams(t *testing.T) {
0x0, 0x1, 0x0, 0x0,
)))
}

func TestCursorDetachMemTracker(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
srv := CreateMockServer(t, store)
srv.SetDomain(dom)
defer srv.Close()

appendUint32 := binary.LittleEndian.AppendUint32
ctx := context.Background()
c := CreateMockConn(t, srv).(*mockConn)

tk := testkit.NewTestKitWithSession(t, store, c.Context().Session)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(id_1 int, id_2 int)")
tk.MustExec("insert into t values (1, 1), (1, 2)")
tk.MustExec("set global tidb_mem_oom_action = 'CANCEL'")
defer tk.MustExec("set global tidb_mem_oom_action= DEFAULT")
// TODO: find whether it's expected to have one child at the beginning
require.Len(t, tk.Session().GetSessionVars().MemTracker.GetChildrenForTest(), 1)

// execute a normal statement, it'll success
stmt, _, _, err := c.Context().Prepare("select count(id_2) from t")
require.NoError(t, err)

require.NoError(t, c.Dispatch(ctx, append(
appendUint32([]byte{mysql.ComStmtExecute}, uint32(stmt.ID())),
mysql.CursorTypeReadOnly, 0x1, 0x0, 0x0, 0x0,
)))
maxConsumed := tk.Session().GetSessionVars().MemTracker.MaxConsumed()

// testkit also uses `PREPARE` related calls to run statement with arguments.
// format the SQL to avoid the interference from testkit.
tk.MustExec(fmt.Sprintf("set tidb_mem_quota_query=%d", maxConsumed/2))
require.Len(t, tk.Session().GetSessionVars().MemTracker.GetChildrenForTest(), 1)
// This query should exceed the memory limitation during `openExecutor`
require.Error(t, c.Dispatch(ctx, append(
appendUint32([]byte{mysql.ComStmtExecute}, uint32(stmt.ID())),
mysql.CursorTypeReadOnly, 0x1, 0x0, 0x0, 0x0,
)))
require.Len(t, tk.Session().GetSessionVars().MemTracker.GetChildrenForTest(), 1)

// The next query should succeed
tk.MustExec(fmt.Sprintf("set tidb_mem_quota_query=%d", maxConsumed+1))
require.Len(t, tk.Session().GetSessionVars().MemTracker.GetChildrenForTest(), 1)
// This query should succeed
require.NoError(t, c.Dispatch(ctx, append(
appendUint32([]byte{mysql.ComStmtExecute}, uint32(stmt.ID())),
mysql.CursorTypeReadOnly, 0x1, 0x0, 0x0, 0x0,
)))
require.Len(t, tk.Session().GetSessionVars().MemTracker.GetChildrenForTest(), 1)
}

0 comments on commit c7a69b4

Please sign in to comment.