Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for MAX_EXECUTION_TIME. #10541

Merged
merged 21 commits into from
Jun 24, 2019
Merged
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions executor/adapter.go
Original file line number Diff line number Diff line change
@@ -50,7 +50,7 @@ import (

// processinfoSetter is the interface use to set current running process info.
type processinfoSetter interface {
SetProcessInfo(string, time.Time, byte)
SetProcessInfo(string, time.Time, byte, uint64)
}

// recordSet wraps an executor, implements sqlexec.RecordSet interface
@@ -240,8 +240,9 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
sql = ss.SecureText()
}
}
maxExecutionTime := getMaxExecutionTime(sctx, a.StmtNode)
// Update processinfo, ShowProcess() will use it.
pi.SetProcessInfo(sql, time.Now(), cmd)
pi.SetProcessInfo(sql, time.Now(), cmd, maxExecutionTime)
a.Ctx.GetSessionVars().StmtCtx.StmtType = GetStmtLabel(a.StmtNode)
}

@@ -280,6 +281,20 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
}, nil
}

// getMaxExecutionTime get the max execution timeout value.
func getMaxExecutionTime(sctx sessionctx.Context, stmtNode ast.StmtNode) uint64 {
ret := sctx.GetSessionVars().MaxExecutionTime
if sel, ok := stmtNode.(*ast.SelectStmt); ok {
for _, hint := range sel.TableHints {
if hint.HintName.L == variable.MaxExecutionTime {
ret = hint.MaxExecutionTime
break
}
}
}
return ret
}

type chunkRowRecordSet struct {
rows []chunk.Row
idx int
19 changes: 16 additions & 3 deletions server/conn.go
Original file line number Diff line number Diff line change
@@ -50,6 +50,7 @@ import (

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/auth"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
@@ -262,6 +263,11 @@ func (cc *clientConn) readPacket() ([]byte, error) {
}

func (cc *clientConn) writePacket(data []byte) error {
failpoint.Inject("FakeClientConn", func() {
if cc.pkt == nil {
failpoint.Return(nil)
}
})
return cc.pkt.writePacket(data)
}

@@ -847,8 +853,9 @@ func (cc *clientConn) dispatch(ctx context.Context, data []byte) error {
defer func() {
// if handleChangeUser failed, cc.ctx may be nil
if cc.ctx != nil {
cc.ctx.SetProcessInfo("", t, mysql.ComSleep)
cc.ctx.SetProcessInfo("", t, mysql.ComSleep, 0)
}

cc.server.releaseToken(token)
span.Finish()
}()
@@ -863,9 +870,9 @@ func (cc *clientConn) dispatch(ctx context.Context, data []byte) error {
switch cmd {
case mysql.ComPing, mysql.ComStmtClose, mysql.ComStmtSendLongData, mysql.ComStmtReset,
mysql.ComSetOption, mysql.ComChangeUser:
cc.ctx.SetProcessInfo("", t, cmd)
cc.ctx.SetProcessInfo("", t, cmd, 0)
case mysql.ComInitDB:
cc.ctx.SetProcessInfo("use "+dataStr, t, cmd)
cc.ctx.SetProcessInfo("use "+dataStr, t, cmd, 0)
}

switch cmd {
@@ -928,6 +935,11 @@ func (cc *clientConn) useDB(ctx context.Context, db string) (err error) {
}

func (cc *clientConn) flush() error {
failpoint.Inject("FakeClientConn", func() {
if cc.pkt == nil {
failpoint.Return(nil)
}
})
return cc.pkt.flush()
}

@@ -1258,6 +1270,7 @@ func (cc *clientConn) writeResultset(ctx context.Context, rs ResultSet, binary b
if err != nil {
return err
}

return cc.flush()
}

2 changes: 1 addition & 1 deletion server/conn_stmt.go
Original file line number Diff line number Diff line change
@@ -223,7 +223,7 @@ func (cc *clientConn) handleStmtFetch(ctx context.Context, data []byte) (err err
if prepared, ok := cc.ctx.GetStatement(int(stmtID)).(*TiDBStatement); ok {
sql = prepared.sql
}
cc.ctx.SetProcessInfo(sql, time.Now(), mysql.ComStmtExecute)
cc.ctx.SetProcessInfo(sql, time.Now(), mysql.ComStmtExecute, 0)
rs := stmt.GetResultSet()
if rs == nil {
return mysql.NewErr(mysql.ErrUnknownStmtHandler,
69 changes: 69 additions & 0 deletions server/conn_test.go
Original file line number Diff line number Diff line change
@@ -18,9 +18,11 @@ import (
"bytes"
"context"
"encoding/binary"
"fmt"
"io"

. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
@@ -363,3 +365,70 @@ func mapBelong(m1, m2 map[string]string) bool {
}
return true
}

func (ts ConnTestSuite) TestConnExecutionTimeout(c *C) {
//There is no underlying netCon, use failpoint to avoid panic
c.Assert(failpoint.Enable("github.com/pingcap/tidb/server/FakeClientConn", "return(1)"), IsNil)

c.Parallel()
var err error
ts.store, err = mockstore.NewMockTikvStore()
c.Assert(err, IsNil)
ts.dom, err = session.BootstrapSession(ts.store)
c.Assert(err, IsNil)
se, err := session.CreateSession4Test(ts.store)
c.Assert(err, IsNil)

connID := 1
se.SetConnectionID(uint64(connID))
tc := &TiDBContext{
session: se,
stmts: make(map[int]*TiDBStatement),
}
cc := &clientConn{
connectionID: uint32(connID),
server: &Server{
capability: defaultCapability,
},
ctx: tc,
alloc: arena.NewAllocator(32 * 1024),
}
srv := &Server{
clients: map[uint32]*clientConn{
uint32(connID): cc,
},
}
handle := ts.dom.ExpensiveQueryHandle().SetSessionManager(srv)
go handle.Run()
defer handle.Close()

_, err = se.Execute(context.Background(), "use test;")
c.Assert(err, IsNil)
_, err = se.Execute(context.Background(), "CREATE TABLE testTable2 (id bigint PRIMARY KEY, age int)")
c.Assert(err, IsNil)
for i := 0; i < 10; i++ {
str := fmt.Sprintf("insert into testTable2 values(%d, %d)", i, i%80)
_, err = se.Execute(context.Background(), str)
c.Assert(err, IsNil)
}

_, err = se.Execute(context.Background(), "select SLEEP(1);")
c.Assert(err, IsNil)

_, err = se.Execute(context.Background(), "set @@max_execution_time = 500;")
c.Assert(err, IsNil)

err = cc.handleQuery(context.Background(), "select * FROM testTable2 WHERE SLEEP(1);")
c.Assert(err, NotNil)

_, err = se.Execute(context.Background(), "set @@max_execution_time = 0;")
c.Assert(err, IsNil)

err = cc.handleQuery(context.Background(), "select * FROM testTable2 WHERE SLEEP(1);")
c.Assert(err, IsNil)

err = cc.handleQuery(context.Background(), "select /*+ MAX_EXECUTION_TIME(100)*/ * FROM testTable2 WHERE SLEEP(1);")
c.Assert(err, NotNil)

c.Assert(failpoint.Disable("github.com/pingcap/tidb/server/FakeClientConn"), IsNil)
}
2 changes: 1 addition & 1 deletion server/driver.go
Original file line number Diff line number Diff line change
@@ -51,7 +51,7 @@ type QueryCtx interface {
// SetValue saves a value associated with this context for key.
SetValue(key fmt.Stringer, value interface{})

SetProcessInfo(sql string, t time.Time, command byte)
SetProcessInfo(sql string, t time.Time, command byte, maxExecutionTime uint64)

// CommitTxn commits the transaction operations.
CommitTxn(ctx context.Context) error
4 changes: 2 additions & 2 deletions server/driver_tidb.go
Original file line number Diff line number Diff line change
@@ -212,8 +212,8 @@ func (tc *TiDBContext) CommitTxn(ctx context.Context) error {
}

// SetProcessInfo implements QueryCtx SetProcessInfo method.
func (tc *TiDBContext) SetProcessInfo(sql string, t time.Time, command byte) {
tc.session.SetProcessInfo(sql, t, command)
func (tc *TiDBContext) SetProcessInfo(sql string, t time.Time, command byte, maxExecutionTime uint64) {
tc.session.SetProcessInfo(sql, t, command, maxExecutionTime)
}

// RollbackTxn implements QueryCtx RollbackTxn method.
26 changes: 14 additions & 12 deletions server/server.go
Original file line number Diff line number Diff line change
@@ -84,12 +84,13 @@ func init() {
}

var (
errUnknownFieldType = terror.ClassServer.New(codeUnknownFieldType, "unknown field type")
errInvalidPayloadLen = terror.ClassServer.New(codeInvalidPayloadLen, "invalid payload length")
errInvalidSequence = terror.ClassServer.New(codeInvalidSequence, "invalid sequence")
errInvalidType = terror.ClassServer.New(codeInvalidType, "invalid type")
errNotAllowedCommand = terror.ClassServer.New(codeNotAllowedCommand, "the used command is not allowed with this TiDB version")
errAccessDenied = terror.ClassServer.New(codeAccessDenied, mysql.MySQLErrName[mysql.ErrAccessDenied])
errUnknownFieldType = terror.ClassServer.New(codeUnknownFieldType, "unknown field type")
errInvalidPayloadLen = terror.ClassServer.New(codeInvalidPayloadLen, "invalid payload length")
errInvalidSequence = terror.ClassServer.New(codeInvalidSequence, "invalid sequence")
errInvalidType = terror.ClassServer.New(codeInvalidType, "invalid type")
errNotAllowedCommand = terror.ClassServer.New(codeNotAllowedCommand, "the used command is not allowed with this TiDB version")
errAccessDenied = terror.ClassServer.New(codeAccessDenied, mysql.MySQLErrName[mysql.ErrAccessDenied])
errMaxExecTimeExceeded = terror.ClassServer.New(codeMaxExecTimeExceeded, mysql.MySQLErrName[mysql.ErrMaxExecTimeExceeded])
)

// DefaultCapability is the capability of the server when it is created using the default configuration.
@@ -107,7 +108,7 @@ type Server struct {
driver IDriver
listener net.Listener
socket net.Listener
rwlock *sync.RWMutex
rwlock sync.RWMutex
concurrentLimiter *TokenLimiter
clients map[uint32]*clientConn
capability uint32
@@ -199,7 +200,6 @@ func NewServer(cfg *config.Config, driver IDriver) (*Server, error) {
cfg: cfg,
driver: driver,
concurrentLimiter: NewTokenLimiter(cfg.TokenLimit),
rwlock: &sync.RWMutex{},
clients: make(map[uint32]*clientConn),
stopListenerCh: make(chan struct{}, 1),
}
@@ -618,14 +618,16 @@ const (
codeInvalidSequence = 3
codeInvalidType = 4

codeNotAllowedCommand = 1148
codeAccessDenied = mysql.ErrAccessDenied
codeNotAllowedCommand = 1148
codeAccessDenied = mysql.ErrAccessDenied
codeMaxExecTimeExceeded = mysql.ErrMaxExecTimeExceeded
)

func init() {
serverMySQLErrCodes := map[terror.ErrCode]uint16{
codeNotAllowedCommand: mysql.ErrNotAllowedCommand,
codeAccessDenied: mysql.ErrAccessDenied,
codeNotAllowedCommand: mysql.ErrNotAllowedCommand,
codeAccessDenied: mysql.ErrAccessDenied,
codeMaxExecTimeExceeded: mysql.ErrMaxExecTimeExceeded,
}
terror.ErrClassToMySQLCodes[terror.ClassServer] = serverMySQLErrCodes
}
33 changes: 21 additions & 12 deletions session/session.go
Original file line number Diff line number Diff line change
@@ -116,7 +116,7 @@ type Session interface {
SetClientCapability(uint32) // Set client capability flags.
SetConnectionID(uint64)
SetCommandValue(byte)
SetProcessInfo(string, time.Time, byte)
SetProcessInfo(string, time.Time, byte, uint64)
SetTLSState(*tls.ConnectionState)
SetCollation(coID int) error
SetSessionManager(util.SessionManager)
@@ -829,6 +829,10 @@ func createSessionFunc(store kv.Storage) pools.Factory {
if err != nil {
return nil, err
}
err = variable.SetSessionSystemVar(se.sessionVars, variable.MaxExecutionTime, types.NewUintDatum(0))
if err != nil {
return nil, errors.Trace(err)
}
se.sessionVars.CommonGlobalLoaded = true
se.sessionVars.InRestrictedSQL = true
return se, nil
@@ -845,6 +849,10 @@ func createSessionWithDomainFunc(store kv.Storage) func(*domain.Domain) (pools.R
if err != nil {
return nil, err
}
err = variable.SetSessionSystemVar(se.sessionVars, variable.MaxExecutionTime, types.NewUintDatum(0))
if err != nil {
return nil, errors.Trace(err)
}
se.sessionVars.CommonGlobalLoaded = true
se.sessionVars.InRestrictedSQL = true
return se, nil
@@ -956,18 +964,19 @@ func (s *session) ParseSQL(ctx context.Context, sql, charset, collation string)
return s.parser.Parse(sql, charset, collation)
}

func (s *session) SetProcessInfo(sql string, t time.Time, command byte) {
func (s *session) SetProcessInfo(sql string, t time.Time, command byte, maxExecutionTime uint64) {
pi := util.ProcessInfo{
ID: s.sessionVars.ConnectionID,
DB: s.sessionVars.CurrentDB,
Command: command,
Plan: s.currentPlan,
Time: t,
State: s.Status(),
Info: sql,
CurTxnStartTS: s.sessionVars.TxnCtx.StartTS,
StmtCtx: s.sessionVars.StmtCtx,
StatsInfo: plannercore.GetStatsInfo,
ID: s.sessionVars.ConnectionID,
DB: s.sessionVars.CurrentDB,
Command: command,
Plan: s.currentPlan,
Time: t,
State: s.Status(),
Info: sql,
CurTxnStartTS: s.sessionVars.TxnCtx.StartTS,
StmtCtx: s.sessionVars.StmtCtx,
StatsInfo: plannercore.GetStatsInfo,
MaxExecutionTime: maxExecutionTime,
}
if s.sessionVars.User != nil {
pi.User = s.sessionVars.User.Username
26 changes: 26 additions & 0 deletions session/session_test.go
Original file line number Diff line number Diff line change
@@ -2640,6 +2640,32 @@ func (s *testSessionSuite) TestTxnGoString(c *C) {
c.Assert(fmt.Sprintf("%#v", txn), Equals, "Txn{state=invalid}")
}

func (s *testSessionSuite) TestMaxExeucteTime(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)

tk.MustExec("create table MaxExecTime( id int,name varchar(128),age int);")
tk.MustExec("begin")
tk.MustExec("insert into MaxExecTime (id,name,age) values (1,'john',18),(2,'lary',19),(3,'lily',18);")

tk.MustQuery("select @@MAX_EXECUTION_TIME;").Check(testkit.Rows("0"))
tk.MustQuery("select @@global.MAX_EXECUTION_TIME;").Check(testkit.Rows("0"))
tk.MustQuery("select /*+ MAX_EXECUTION_TIME(1000) */ * FROM MaxExecTime;")

tk.MustExec("set @@global.MAX_EXECUTION_TIME = 300;")
tk.MustQuery("select * FROM MaxExecTime;")

tk.MustExec("set @@MAX_EXECUTION_TIME = 150;")
tk.MustQuery("select * FROM MaxExecTime;")

tk.MustQuery("select @@global.MAX_EXECUTION_TIME;").Check(testkit.Rows("300"))
tk.MustQuery("select @@MAX_EXECUTION_TIME;").Check(testkit.Rows("150"))

tk.MustExec("set @@global.MAX_EXECUTION_TIME = 0;")
tk.MustExec("set @@MAX_EXECUTION_TIME = 0;")
tk.MustExec("commit")
tk.MustExec("drop table if exists MaxExecTime;")
}

func (s *testSessionSuite) TestGrantViewRelated(c *C) {
tkRoot := testkit.NewTestKitWithInit(c, s.store)
tkUser := testkit.NewTestKitWithInit(c, s.store)
26 changes: 17 additions & 9 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
@@ -111,7 +111,7 @@ type TransactionContext struct {
TableDeltaMap map[int64]TableDelta
IsPessimistic bool

// For metrics.
// CreateTime For metrics.
CreateTime time.Time
StatementCount int
}
@@ -205,25 +205,24 @@ type SessionVars struct {
PreparedStmtNameToID map[string]uint32
// preparedStmtID is id of prepared statement.
preparedStmtID uint32
// params for prepared statements
// PreparedParams params for prepared statements
PreparedParams []types.Datum

// ActiveRoles stores active roles for current user
ActiveRoles []*auth.RoleIdentity

// retry information
RetryInfo *RetryInfo
// Should be reset on transaction finished.
// TxnCtx Should be reset on transaction finished.
TxnCtx *TransactionContext

// KVVars is the variables for KV storage.
KVVars *kv.Variables

// TxnIsolationLevelOneShot is used to implements "set transaction isolation level ..."
TxnIsolationLevelOneShot struct {
// state 0 means default
// state 1 means it's set in current transaction.
// state 2 means it should be used in current transaction.
// State 0 means default
// State 1 means it's set in current transaction.
// State 2 means it should be used in current transaction.
State int
Value string
}
@@ -367,7 +366,7 @@ type SessionVars struct {
// CommandValue indicates which command current session is doing.
CommandValue uint32

// TIDBOptJoinOrderAlgoThreshold defines the minimal number of join nodes
// TiDBOptJoinReorderThreshold defines the minimal number of join nodes
// to use the greedy join reorder algorithm.
TiDBOptJoinReorderThreshold int

@@ -383,6 +382,11 @@ type SessionVars struct {
// LowResolutionTSO is used for reading data with low resolution TSO which is updated once every two seconds.
LowResolutionTSO bool

// MaxExecutionTime is the timeout for select statement, in milliseconds.
// If the value is 0, timeouts are not enabled.
// See https://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html#sysvar_max_execution_time
MaxExecutionTime uint64

// Killed is a flag to indicate that this query is killed.
Killed uint32
}
@@ -692,6 +696,9 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
if isAutocommit {
s.SetStatusFlag(mysql.ServerStatusInTrans, false)
}
case MaxExecutionTime:
timeoutMS := tidbOptPositiveInt32(val, 0)
s.MaxExecutionTime = uint64(timeoutMS)
case TiDBSkipUTF8Check:
s.SkipUTF8Check = TiDBOptOn(val)
case TiDBOptAggPushDown:
@@ -848,6 +855,7 @@ const (
TxnIsolation = "tx_isolation"
TransactionIsolation = "transaction_isolation"
TxnIsolationOneShot = "tx_isolation_one_shot"
MaxExecutionTime = "max_execution_time"
)

// these variables are useless for TiDB, but still need to validate their values for some compatible issues.
@@ -894,7 +902,7 @@ type Concurrency struct {
// HashAggPartialConcurrency is the number of concurrent hash aggregation partial worker.
HashAggPartialConcurrency int

// HashAggPartialConcurrency is the number of concurrent hash aggregation final worker.
// HashAggFinalConcurrency is the number of concurrent hash aggregation final worker.
HashAggFinalConcurrency int

// IndexSerialScanConcurrency is the number of concurrent index serial scan worker.
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
@@ -174,6 +174,7 @@ var defaultSysVars = []*SysVar{
{ScopeGlobal, "innodb_max_undo_log_size", ""},
{ScopeGlobal | ScopeSession, "range_alloc_block_size", "4096"},
{ScopeGlobal, ConnectTimeout, "10"},
{ScopeGlobal | ScopeSession, MaxExecutionTime, "0"},
{ScopeGlobal | ScopeSession, CollationServer, mysql.DefaultCollationName},
{ScopeNone, "have_rtree_keys", "YES"},
{ScopeGlobal, "innodb_old_blocks_pct", "37"},
13 changes: 7 additions & 6 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
@@ -336,8 +336,8 @@ const (
DefTiDBDDLSlowOprThreshold = 300
DefTiDBUseFastAnalyze = false
DefTiDBSkipIsolationLevelCheck = false
DefTiDBExpensiveQueryTimeThreshold = 60 // 60s
DefTiDBWaitSplitRegionFinish = true
DefTiDBExpensiveQueryTimeThreshold = 60 // 60s
DefWaitSplitRegionTimeout = 300 // 300s
)

@@ -352,9 +352,10 @@ var (
MaxDDLReorgBatchSize int32 = 10240
MinDDLReorgBatchSize int32 = 32
// DDLSlowOprThreshold is the threshold for ddl slow operations, uint is millisecond.
DDLSlowOprThreshold uint32 = DefTiDBDDLSlowOprThreshold
ForcePriority = int32(DefTiDBForcePriority)
ServerHostname, _ = os.Hostname()
MaxOfMaxAllowedPacket uint64 = 1073741824
ExpensiveQueryTimeThreshold uint64 = DefTiDBExpensiveQueryTimeThreshold
DDLSlowOprThreshold uint32 = DefTiDBDDLSlowOprThreshold
ForcePriority = int32(DefTiDBForcePriority)
ServerHostname, _ = os.Hostname()
MaxOfMaxAllowedPacket uint64 = 1073741824
ExpensiveQueryTimeThreshold uint64 = DefTiDBExpensiveQueryTimeThreshold
MinExpensiveQueryTimeThreshold uint64 = 10 //10s
)
6 changes: 5 additions & 1 deletion sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
@@ -424,6 +424,8 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
return value, nil
}
return value, ErrWrongValueForVar.GenWithStackByArgs(name, value)
case MaxExecutionTime:
return checkUInt64SystemVar(name, value, 0, math.MaxUint64, vars)
case TiDBEnableTablePartition:
switch {
case strings.EqualFold(value, "ON") || value == "1":
@@ -438,6 +440,8 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
return checkUInt64SystemVar(name, value, uint64(MinDDLReorgBatchSize), uint64(MaxDDLReorgBatchSize), vars)
case TiDBDDLErrorCountLimit:
return checkUInt64SystemVar(name, value, uint64(0), math.MaxInt64, vars)
case TiDBExpensiveQueryTimeThreshold:
return checkUInt64SystemVar(name, value, MinExpensiveQueryTimeThreshold, math.MaxInt64, vars)
case TiDBIndexLookupConcurrency, TiDBIndexLookupJoinConcurrency, TiDBIndexJoinBatchSize,
TiDBIndexLookupSize,
TiDBHashJoinConcurrency,
@@ -446,7 +450,7 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
TiDBDistSQLScanConcurrency,
TiDBIndexSerialScanConcurrency, TiDBDDLReorgWorkerCount,
TiDBBackoffLockFast, TiDBBackOffWeight,
TiDBDMLBatchSize, TiDBOptimizerSelectivityLevel, TiDBExpensiveQueryTimeThreshold:
TiDBDMLBatchSize, TiDBOptimizerSelectivityLevel:
v, err := strconv.Atoi(value)
if err != nil {
return value, ErrWrongTypeForVar.GenWithStackByArgs(name)
2 changes: 1 addition & 1 deletion tidb-server/main.go
Original file line number Diff line number Diff line change
@@ -28,7 +28,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/pd/client"
pd "github.com/pingcap/pd/client"
pumpcli "github.com/pingcap/tidb-tools/tidb-binlog/pump_client"
"github.com/pingcap/tidb/bindinfo"
"github.com/pingcap/tidb/config"
26 changes: 15 additions & 11 deletions util/expensivequery/expensivequery.go
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@ import (
"fmt"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

@@ -30,6 +31,7 @@ import (

// Handle is the handler for expensive query.
type Handle struct {
mu sync.RWMutex
exitCh chan struct{}
sm util.SessionManager
}
@@ -49,36 +51,38 @@ func (eqh *Handle) SetSessionManager(sm util.SessionManager) *Handle {
// Run starts a expensive query checker goroutine at the start time of the server.
func (eqh *Handle) Run() {
threshold := atomic.LoadUint64(&variable.ExpensiveQueryTimeThreshold)
curInterval := time.Second * time.Duration(threshold)
ticker := time.NewTicker(curInterval / 2)
// use 100ms as tickInterval temply, may use given interval or use defined variable later
tickInterval := time.Millisecond * time.Duration(100)
ticker := time.NewTicker(tickInterval)
for {
select {
case <-ticker.C:
if log.GetLevel() > zapcore.WarnLevel {
continue
}
processInfo := eqh.sm.ShowProcessList()
for _, info := range processInfo {
if len(info.Info) == 0 || info.ExceedExpensiveTimeThresh {
continue
}
if costTime := time.Since(info.Time); costTime >= curInterval {
costTime := time.Since(info.Time)
if costTime >= time.Second*time.Duration(threshold) && log.GetLevel() <= zapcore.WarnLevel {
logExpensiveQuery(costTime, info)
info.ExceedExpensiveTimeThresh = true

} else if info.MaxExecutionTime > 0 && costTime > time.Duration(info.MaxExecutionTime)*time.Millisecond {
eqh.sm.Kill(info.ID, true)
}
}
threshold = atomic.LoadUint64(&variable.ExpensiveQueryTimeThreshold)
if newInterval := time.Second * time.Duration(threshold); curInterval != newInterval {
curInterval = newInterval
ticker.Stop()
ticker = time.NewTicker(curInterval / 2)
}
case <-eqh.exitCh:
return
}
}
}

// Close closes the handle and release the background goroutine.
func (eqh *Handle) Close() {
close(eqh.exitCh)
}

// LogOnQueryExceedMemQuota prints a log when memory usage of connID is out of memory quota.
func (eqh *Handle) LogOnQueryExceedMemQuota(connID uint64) {
if log.GetLevel() > zapcore.WarnLevel {
3 changes: 3 additions & 0 deletions util/processinfo.go
Original file line number Diff line number Diff line change
@@ -36,6 +36,9 @@ type ProcessInfo struct {
StmtCtx *stmtctx.StatementContext
StatsInfo func(interface{}) map[string]uint64
ExceedExpensiveTimeThresh bool
// MaxExecutionTime is the timeout for select statement, in milliseconds.
// If the query takes too long, kill it.
MaxExecutionTime uint64
}

// ToRowForShow returns []interface{} for the row data of "SHOW [FULL] PROCESSLIST".