Skip to content

Commit

Permalink
server: implement spill disk for cursorFetch result (pingcap#45163) (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jul 31, 2023
1 parent e44839c commit 4a18a3d
Show file tree
Hide file tree
Showing 10 changed files with 478 additions and 137 deletions.
2 changes: 2 additions & 0 deletions expression/expression.go
Original file line number Diff line number Diff line change
Expand Up @@ -1567,6 +1567,8 @@ func Args2Expressions4Test(args ...interface{}) []Expression {
ft = types.NewFieldType(mysql.TypeVarString)
case types.KindMysqlTime:
ft = types.NewFieldType(mysql.TypeTimestamp)
case types.KindBytes:
ft = types.NewFieldType(mysql.TypeBlob)
default:
exprs[i] = nil
continue
Expand Down
2 changes: 1 addition & 1 deletion parser/mysql/errname.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ var MySQLErrName = map[uint16]*ErrMessage{
ErrKeyRefDoNotMatchTableRef: Message("Key reference and table reference don't match", nil),
ErrOperandColumns: Message("Operand should contain %d column(s)", nil),
ErrSubqueryNo1Row: Message("Subquery returns more than 1 row", nil),
ErrUnknownStmtHandler: Message("Unknown prepared statement handler (%.*s) given to %s", nil),
ErrUnknownStmtHandler: Message("Unknown prepared statement handler %s given to %s", nil),
ErrCorruptHelpDB: Message("Help database is corrupt or does not exist", nil),
ErrCyclicReference: Message("Cyclic reference on subqueries", nil),
ErrAutoConvert: Message("Converting column '%s' from %s to %s", nil),
Expand Down
67 changes: 35 additions & 32 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -2277,7 +2277,12 @@ func (cc *clientConn) writeResultset(ctx context.Context, rs ResultSet, binary b
cc.initResultEncoder(ctx)
defer cc.rsEncoder.clean()
if mysql.HasCursorExistsFlag(serverStatus) {
if err := cc.writeChunksWithFetchSize(ctx, rs, serverStatus, fetchSize); err != nil {
crs, ok := rs.(cursorResultSet)
if !ok {
// this branch is actually unreachable
return false, errors.New("this cursor is not a resultSet")
}
if err := cc.writeChunksWithFetchSize(ctx, crs, serverStatus, fetchSize); err != nil {
return false, err
}
return false, cc.flush(ctx)
Expand Down Expand Up @@ -2411,43 +2416,27 @@ func (cc *clientConn) writeChunks(ctx context.Context, rs ResultSet, binary bool
// binary specifies the way to dump data. It throws any error while dumping data.
// serverStatus, a flag bit represents server information.
// fetchSize, the desired number of rows to be fetched each time when client uses cursor.
func (cc *clientConn) writeChunksWithFetchSize(ctx context.Context, rs ResultSet, serverStatus uint16, fetchSize int) error {
fetchedRows := rs.GetFetchedRows()

// tell the client COM_STMT_FETCH has finished by setting proper serverStatus,
// and close ResultSet.
if len(fetchedRows) == 0 {
serverStatus &^= mysql.ServerStatusCursorExists
serverStatus |= mysql.ServerStatusLastRowSend
return cc.writeEOF(ctx, serverStatus)
}

// construct the rows sent to the client according to fetchSize.
var curRows []chunk.Row
if fetchSize < len(fetchedRows) {
curRows = fetchedRows[:fetchSize]
fetchedRows = fetchedRows[fetchSize:]
} else {
curRows = fetchedRows
fetchedRows = fetchedRows[:0]
}
rs.StoreFetchedRows(fetchedRows)

func (cc *clientConn) writeChunksWithFetchSize(ctx context.Context, rs cursorResultSet, serverStatus uint16, fetchSize int) error {
var (
stmtDetail *execdetails.StmtExecDetails
err error
start time.Time
)
data := cc.alloc.AllocWithLen(4, 1024)
var stmtDetail *execdetails.StmtExecDetails
stmtDetailRaw := ctx.Value(execdetails.StmtExecDetailKey)
if stmtDetailRaw != nil {
//nolint:forcetypeassert
stmtDetail = stmtDetailRaw.(*execdetails.StmtExecDetails)
}
var (
err error
start time.Time
)
if stmtDetail != nil {
start = time.Now()
}
for _, row := range curRows {

iter := rs.GetRowContainerReader()
// send the rows to the client according to fetchSize.
for i := 0; i < fetchSize && iter.Current() != iter.End(); i++ {
row := iter.Current()

data = data[0:4]
data, err = dumpBinaryRow(data, rs.Columns(), row, cc.rsEncoder)
if err != nil {
Expand All @@ -2456,16 +2445,30 @@ func (cc *clientConn) writeChunksWithFetchSize(ctx context.Context, rs ResultSet
if err = cc.writePacket(data); err != nil {
return err
}

iter.Next()
}
if iter.Error() != nil {
return iter.Error()
}

// tell the client COM_STMT_FETCH has finished by setting proper serverStatus,
// and close ResultSet.
if iter.Current() == iter.End() {
serverStatus &^= mysql.ServerStatusCursorExists
serverStatus |= mysql.ServerStatusLastRowSend
}

// don't include the time consumed by `cl.OnFetchReturned()` in the `WriteSQLRespDuration`
if stmtDetail != nil {
stmtDetail.WriteSQLRespDuration += time.Since(start)
}

if cl, ok := rs.(fetchNotifier); ok {
cl.OnFetchReturned()
}
if stmtDetail != nil {
start = time.Now()
}

start = time.Now()
err = cc.writeEOF(ctx, serverStatus)
if stmtDetail != nil {
stmtDetail.WriteSQLRespDuration += time.Since(start)
Expand Down
166 changes: 134 additions & 32 deletions server/conn_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
Expand All @@ -53,15 +54,19 @@ import (
"github.com/pingcap/tidb/parser/terror"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
storeerr "github.com/pingcap/tidb/store/driver/error"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/topsql"
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
"github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
)

func (cc *clientConn) handleStmtPrepare(ctx context.Context, sql string) error {
Expand Down Expand Up @@ -203,7 +208,11 @@ func (cc *clientConn) handleStmtExecute(ctx context.Context, data []byte) (err e
}

err = parseExecArgs(cc.ctx.GetSessionVars().StmtCtx, args, stmt.BoundParams(), nullBitmaps, stmt.GetParamsType(), paramValues, cc.inputDecoder)
stmt.Reset()
// This `.Reset` resets the arguments, so it's fine to just ignore the error (and the it'll be reset again in the following routine)
errReset := stmt.Reset()
if errReset != nil {
logutil.Logger(ctx).Warn("fail to reset statement in EXECUTE command", zap.Error(errReset))
}
if err != nil {
return errors.Annotate(err, cc.preparedStmt2String(stmtID))
}
Expand Down Expand Up @@ -265,6 +274,26 @@ func (cc *clientConn) executePreparedStmtAndWriteResult(ctx context.Context, stm
PrepStmt: prepStmt,
}

// first, try to clear the left cursor if there is one
if useCursor && stmt.GetCursorActive() {
if stmt.GetResultSet() != nil && stmt.GetResultSet().GetRowContainerReader() != nil {
stmt.GetResultSet().GetRowContainerReader().Close()
}
if stmt.GetRowContainer() != nil {
stmt.GetRowContainer().GetMemTracker().Detach()
stmt.GetRowContainer().GetDiskTracker().Detach()
err := stmt.GetRowContainer().Close()
if err != nil {
logutil.Logger(ctx).Error(
"Fail to close rowContainer before executing statement. May cause resource leak",
zap.Error(err))
}
stmt.StoreRowContainer(nil)
}
stmt.StoreResultSet(nil)
stmt.SetCursorActive(false)
}

// For the combination of `ComPrepare` and `ComExecute`, the statement name is stored in the client side, and the
// TiDB only has the ID, so don't try to construct an `EXECUTE SOMETHING`. Use the original prepared statement here
// instead.
Expand Down Expand Up @@ -306,42 +335,83 @@ func (cc *clientConn) executePreparedStmtAndWriteResult(ctx context.Context, stm
// we should hold the ResultSet in PreparedStatement for next stmt_fetch, and only send back ColumnInfo.
// Tell the client cursor exists in server by setting proper serverStatus.
if useCursor {
crs := wrapWithCursor(rs)

cc.initResultEncoder(ctx)
defer cc.rsEncoder.clean()
// fetch all results of the resultSet, and stored them locally, so that the future `FETCH` command can read
// the rows directly to avoid running executor and accessing shared params/variables in the session
// NOTE: chunk should not be allocated from the connection allocator, which will reset after executing this command
// but the rows are still needed in the following FETCH command.
//
// TODO: trace the memory used here
chk := rs.NewChunk(nil)
var rows []chunk.Row

// create the row container to manage spill
// this `rowContainer` will be released when the statement (or the connection) is closed.
rowContainer := chunk.NewRowContainer(crs.FieldTypes(), vars.MaxChunkSize)
rowContainer.GetMemTracker().AttachTo(vars.MemTracker)
rowContainer.GetMemTracker().SetLabel(memory.LabelForCursorFetch)
rowContainer.GetDiskTracker().AttachTo(vars.DiskTracker)
rowContainer.GetDiskTracker().SetLabel(memory.LabelForCursorFetch)
if variable.EnableTmpStorageOnOOM.Load() {
failpoint.Inject("testCursorFetchSpill", func(val failpoint.Value) {
if val, ok := val.(bool); val && ok {
actionSpill := rowContainer.ActionSpillForTest()
defer actionSpill.WaitForTest()
}
})
action := memory.NewActionWithPriority(rowContainer.ActionSpill(), memory.DefCursorFetchSpillPriority)
vars.MemTracker.FallbackOldAndSetNewAction(action)
}
defer func() {
if err != nil {
rowContainer.GetMemTracker().Detach()
rowContainer.GetDiskTracker().Detach()
errCloseRowContainer := rowContainer.Close()
if errCloseRowContainer != nil {
logutil.Logger(ctx).Error("Fail to close rowContainer in error handler. May cause resource leak",
zap.NamedError("original-error", err), zap.NamedError("close-error", errCloseRowContainer))
}
}
}()

for {
if err = rs.Next(ctx, chk); err != nil {
chk := crs.NewChunk(nil)

if err = crs.Next(ctx, chk); err != nil {
return false, err
}
rowCount := chk.NumRows()
if rowCount == 0 {
break
}
// filling fetchedRows with chunk
for i := 0; i < rowCount; i++ {
row := chk.GetRow(i)
rows = append(rows, row)

err = rowContainer.Add(chk)
if err != nil {
return false, err
}
chk = chunk.Renew(chk, vars.MaxChunkSize)
}
rs.StoreFetchedRows(rows)

stmt.StoreResultSet(rs)
if err = cc.writeColumnInfo(rs.Columns()); err != nil {
return false, err
}
if cl, ok := rs.(fetchNotifier); ok {
reader := chunk.NewRowContainerReader(rowContainer)
crs.StoreRowContainerReader(reader)
stmt.StoreResultSet(crs)
stmt.StoreRowContainer(rowContainer)
if cl, ok := crs.(fetchNotifier); ok {
cl.OnFetchReturned()
}

stmt.SetCursorActive(true)
defer func() {
if err != nil {
reader.Close()

// the resultSet and rowContainer have been closed in former "defer" statement.
stmt.StoreResultSet(nil)
stmt.StoreRowContainer(nil)
stmt.SetCursorActive(false)
}
}()

if err = cc.writeColumnInfo(crs.Columns()); err != nil {
return false, err
}

// explicitly flush columnInfo to client.
err = cc.writeEOF(ctx, cc.ctx.Status())
Expand All @@ -368,6 +438,12 @@ func (cc *clientConn) handleStmtFetch(ctx context.Context, data []byte) (err err
cc.ctx.GetSessionVars().ClearAlloc(nil, false)
cc.ctx.GetSessionVars().SetStatusFlag(mysql.ServerStatusCursorExists, true)
defer cc.ctx.GetSessionVars().SetStatusFlag(mysql.ServerStatusCursorExists, false)
// Reset the warn count. TODO: consider whether it's better to reset the whole session context/statement context.
if cc.ctx.GetSessionVars().StmtCtx != nil {
cc.ctx.GetSessionVars().StmtCtx.SetWarnings(nil)
}
cc.ctx.GetSessionVars().SysErrorCount = 0
cc.ctx.GetSessionVars().SysWarningCount = 0

stmtID, fetchSize, err := parseStmtFetchCmd(data)
if err != nil {
Expand All @@ -379,6 +455,21 @@ func (cc *clientConn) handleStmtFetch(ctx context.Context, data []byte) (err err
return errors.Annotate(mysql.NewErr(mysql.ErrUnknownStmtHandler,
strconv.FormatUint(uint64(stmtID), 10), "stmt_fetch"), cc.preparedStmt2String(stmtID))
}
if !stmt.GetCursorActive() {
return errors.Annotate(mysql.NewErr(mysql.ErrSpCursorNotOpen), cc.preparedStmt2String(stmtID))
}
// from now on, we have made sure: the statement has an active cursor
// then if facing any error, this cursor should be reset
defer func() {
if err != nil {
errReset := stmt.Reset()
if errReset != nil {
logutil.Logger(ctx).Error("Fail to reset statement in error handler. May cause resource leak.",
zap.NamedError("original-error", err), zap.NamedError("reset-error", errReset))
}
}
}()

if topsqlstate.TopSQLEnabled() {
prepareObj, _ := cc.preparedStmtID2CachePreparedStmt(stmtID)
if prepareObj != nil && prepareObj.SQLDigest != nil {
Expand All @@ -391,23 +482,22 @@ func (cc *clientConn) handleStmtFetch(ctx context.Context, data []byte) (err err
}
cc.ctx.SetProcessInfo(sql, time.Now(), mysql.ComStmtExecute, 0)
rs := stmt.GetResultSet()
if rs == nil {
return errors.Annotate(mysql.NewErr(mysql.ErrUnknownStmtHandler,
strconv.FormatUint(uint64(stmtID), 10), "stmt_fetch_rs"), cc.preparedStmt2String(stmtID))
}

sendingEOF := false
// if the `fetchedRows` are empty before writing result, we could say the `FETCH` command will send EOF
if len(rs.GetFetchedRows()) == 0 {
sendingEOF = true
}
_, err = cc.writeResultset(ctx, rs, true, cc.ctx.Status(), int(fetchSize))
// if the iterator reached the end before writing result, we could say the `FETCH` command will send EOF
if rs.GetRowContainerReader().Current() == rs.GetRowContainerReader().End() {
// also reset the statement when the cursor reaches the end
// don't overwrite the `err` in outer scope, to avoid redundant `Reset()` in `defer` statement (though, it's not
// a big problem, as the `Reset()` function call is idempotent.)
err := stmt.Reset()
if err != nil {
logutil.Logger(ctx).Error("Fail to reset statement when FETCH command reaches the end. May cause resource leak",
zap.NamedError("error", err))
}
}
if err != nil {
return errors.Annotate(err, cc.preparedStmt2String(stmtID))
}
if sendingEOF {
stmt.SetCursorActive(false)
}

return nil
}
Expand Down Expand Up @@ -768,6 +858,10 @@ func (cc *clientConn) handleStmtSendLongData(data []byte) (err error) {
}

func (cc *clientConn) handleStmtReset(ctx context.Context, data []byte) (err error) {
// A reset command should reset the statement to the state when it was right after prepare
// Then the following state should be cleared:
// 1.The opened cursor, including the rowContainer (and its cursor/memTracker).
// 2.The argument sent through `SEND_LONG_DATA`.
if len(data) < 4 {
return mysql.ErrMalformPacket
}
Expand All @@ -778,8 +872,16 @@ func (cc *clientConn) handleStmtReset(ctx context.Context, data []byte) (err err
return mysql.NewErr(mysql.ErrUnknownStmtHandler,
strconv.Itoa(stmtID), "stmt_reset")
}
stmt.Reset()
stmt.StoreResultSet(nil)
err = stmt.Reset()
if err != nil {
// Both server and client cannot handle the error case well, so just left an error and return OK.
// It's fine to receive further `EXECUTE` command even the `Reset` function call failed.
logutil.Logger(ctx).Error("Fail to close statement in error handler of RESET command. May cause resource leak",
zap.NamedError("original-error", err), zap.NamedError("close-error", err))

return cc.writeOK(ctx)
}

return cc.writeOK(ctx)
}

Expand Down
Loading

0 comments on commit 4a18a3d

Please sign in to comment.