Skip to content

Commit

Permalink
store : add store limit to restrain bad store from occupying too much…
Browse files Browse the repository at this point in the history
… token limit. (pingcap#12779)
  • Loading branch information
AilinKid authored and XiaTianliang committed Dec 21, 2019
1 parent 63e51af commit dca8644
Show file tree
Hide file tree
Showing 18 changed files with 160 additions and 9 deletions.
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,9 @@ type TiKVClient struct {
// If a Region has not been accessed for more than the given duration (in seconds), it
// will be reloaded from the PD.
RegionCacheTTL uint `toml:"region-cache-ttl" json:"region-cache-ttl"`
// If a store has reached the limit, subsequent requests to it return an error, to
// prevent the store from occupying too many tokens at the dispatching level.
StoreLimit int64 `toml:"store-limit" json:"store-limit"`
}

// Binlog is the config for binlog.
Expand Down Expand Up @@ -505,6 +508,7 @@ var defaultConf = Config{
EnableChunkRPC: true,

RegionCacheTTL: 600,
StoreLimit: 0,
},
Binlog: Binlog{
WriteTimeout: "15s",
Expand Down
5 changes: 5 additions & 0 deletions config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,11 @@ enable-chunk-rpc = true
# will be reloaded from the PD.
region-cache-ttl = 600

# store-limit restrains TiDB from sending requests to a store that has reached the limit.
# Once a store has reached the limit, subsequent requests to that store return an error.
# The default value 0 turns the store limit off.
store-limit = 0

[txn-local-latches]
# Enable local latches for transactions. Enable it when
# there are lots of conflicts between transactions.
Expand Down
2 changes: 2 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ txn-total-size-limit=2000
commit-timeout="41s"
max-batch-size=128
region-cache-ttl=6000
store-limit=0
[stmt-summary]
max-stmt-count=1000
max-sql-length=1024
Expand All @@ -204,6 +205,7 @@ max-sql-length=1024
c.Assert(conf.TiKVClient.CommitTimeout, Equals, "41s")
c.Assert(conf.TiKVClient.MaxBatchSize, Equals, uint(128))
c.Assert(conf.TiKVClient.RegionCacheTTL, Equals, uint(6000))
c.Assert(conf.TiKVClient.StoreLimit, Equals, int64(0))
c.Assert(conf.TokenLimit, Equals, uint(1000))
c.Assert(conf.EnableTableLock, IsTrue)
c.Assert(conf.DelayCleanTableLock, Equals, uint64(5))
Expand Down
12 changes: 12 additions & 0 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,18 @@ func (s *testSuite5) TestSetVar(c *C) {
tk.MustQuery("select @@tidb_record_plan_in_slow_log;").Check(testkit.Rows("1"))
tk.MustExec("set @@tidb_record_plan_in_slow_log = 0")
tk.MustQuery("select @@tidb_record_plan_in_slow_log;").Check(testkit.Rows("0"))

tk.MustQuery("select @@tidb_store_limit;").Check(testkit.Rows("0"))
tk.MustExec("set @@tidb_store_limit = 100")
tk.MustQuery("select @@tidb_store_limit;").Check(testkit.Rows("100"))
tk.MustQuery("select @@session.tidb_store_limit;").Check(testkit.Rows("100"))
tk.MustQuery("select @@global.tidb_store_limit;").Check(testkit.Rows("0"))
tk.MustExec("set @@tidb_store_limit = 0")

tk.MustExec("set global tidb_store_limit = 100")
tk.MustQuery("select @@tidb_store_limit;").Check(testkit.Rows("0"))
tk.MustQuery("select @@session.tidb_store_limit;").Check(testkit.Rows("0"))
tk.MustQuery("select @@global.tidb_store_limit;").Check(testkit.Rows("100"))
}

func (s *testSuite5) TestSetCharset(c *C) {
Expand Down
1 change: 1 addition & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func RegisterMetrics() {
prometheus.MustRegister(StmtNodeCounter)
prometheus.MustRegister(DbStmtNodeCounter)
prometheus.MustRegister(StoreQueryFeedbackCounter)
prometheus.MustRegister(GetStoreLimitErrorCounter)
prometheus.MustRegister(TiKVBackoffCounter)
prometheus.MustRegister(TiKVBackoffHistogram)
prometheus.MustRegister(TiKVCoprocessorHistogram)
Expand Down
8 changes: 8 additions & 0 deletions metrics/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ var (
Help: "Counter of storing query feedback.",
}, []string{LblType})

// GetStoreLimitErrorCounter counts the requests rejected because the target store's
// token count has reached the store limit. It is labeled by store address and store ID.
GetStoreLimitErrorCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "tidb",
Subsystem: "statistics",
Name: "get_store_limit_token_error",
Help: "store token is up to the limit, probably because one of the stores is the hotspot or unavailable",
}, []string{LblAddress, LblStore})

SignificantFeedbackCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "tidb",
Expand Down
1 change: 1 addition & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1841,6 +1841,7 @@ var builtinGlobalVariable = []string{
variable.TiDBUsePlanBaselines,
variable.TiDBEvolvePlanBaselines,
variable.TiDBIsolationReadEngines,
variable.TiDBStoreLimit,
}

var (
Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/storeutil"
"github.com/pingcap/tidb/util/timeutil"
)

Expand Down Expand Up @@ -989,6 +990,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
s.isolationReadEngines[kv.TiFlash] = struct{}{}
}
}
case TiDBStoreLimit:
storeutil.StoreLimit.Store(tidbOptInt64(val, DefTiDBStoreLimit))
}
s.systems[name] = val
return nil
Expand Down
2 changes: 2 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package variable
import (
"strconv"
"strings"
"sync/atomic"

"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
Expand Down Expand Up @@ -720,6 +721,7 @@ var defaultSysVars = []*SysVar{
{ScopeGlobal | ScopeSession, TiDBUsePlanBaselines, BoolToIntStr(DefTiDBUsePlanBaselines)},
{ScopeGlobal | ScopeSession, TiDBEvolvePlanBaselines, BoolToIntStr(DefTiDBEvolvePlanBaselines)},
{ScopeGlobal | ScopeSession, TiDBIsolationReadEngines, "tikv,tiflash"},
{ScopeGlobal | ScopeSession, TiDBStoreLimit, strconv.FormatInt(atomic.LoadInt64(&config.GetGlobalConfig().TiKVClient.StoreLimit), 10)},
}

// SynonymsSysVariables is synonyms of system variables.
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,9 @@ const (
// TiDBIsolationReadEngines indicates the tidb only read from the stores whose engine type is involved in IsolationReadEngines.
// Now, only support TiKV and TiFlash.
TiDBIsolationReadEngines = "tidb_isolation_read_engines"

// TiDBStoreLimit indicates the limit on requests being sent to a single store; 0 means no limit.
TiDBStoreLimit = "tidb_store_limit"
)

// Default TiDB system variable values.
Expand Down Expand Up @@ -418,6 +421,7 @@ const (
DefTiDBUsePlanBaselines = true
DefTiDBEvolvePlanBaselines = false
DefInnodbLockWaitTimeout = 50 // 50s
DefTiDBStoreLimit = 0
)

// Process global variables.
Expand Down
2 changes: 2 additions & 0 deletions sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,8 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
return value, ErrWrongValueForVar.GenWithStackByArgs(name, value)
case SQLSelectLimit:
return checkUInt64SystemVar(name, value, 0, math.MaxUint64, vars)
case TiDBStoreLimit:
return checkInt64SystemVar(name, value, 0, math.MaxInt64, vars)
case SyncBinlog:
return checkUInt64SystemVar(name, value, 0, 4294967295, vars)
case TableDefinitionCache:
Expand Down
6 changes: 4 additions & 2 deletions store/mockstore/mocktikv/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/tablecodec"
"go.uber.org/atomic"
)

// Cluster simulates a TiKV cluster. It focuses on management and the change of
Expand Down Expand Up @@ -639,8 +640,9 @@ func (r *Region) incVersion() {

// Store is the Store's meta data.
type Store struct {
meta *metapb.Store
cancel bool // return context.Cancelled error when cancel is true.
meta *metapb.Store
cancel bool // return context.Cancelled error when cancel is true.
tokenCount atomic.Int64
}

func newStore(storeID uint64, addr string) *Store {
Expand Down
1 change: 1 addition & 0 deletions store/tikv/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var (
ErrQueryInterrupted = terror.ClassTiKV.New(mysql.ErrQueryInterrupted, mysql.MySQLErrName[mysql.ErrQueryInterrupted])
ErrLockAcquireFailAndNoWaitSet = terror.ClassTiKV.New(mysql.ErrLockAcquireFailAndNoWaitSet, mysql.MySQLErrName[mysql.ErrLockAcquireFailAndNoWaitSet])
ErrLockWaitTimeout = terror.ClassTiKV.New(mysql.ErrLockWaitTimeout, mysql.MySQLErrName[mysql.ErrLockWaitTimeout])
ErrTokenLimit = terror.ClassTiKV.New(mysql.ErrTiKVStoreLimit, mysql.MySQLErrName[mysql.ErrTiKVStoreLimit])
)

// ErrDeadlock wraps *kvrpcpb.Deadlock to implement the error interface.
Expand Down
14 changes: 8 additions & 6 deletions store/tikv/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/util/logutil"
atomic2 "go.uber.org/atomic"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -1167,12 +1168,13 @@ func (r *Region) ContainsByEnd(key []byte) bool {

// Store contains a kv process's address.
type Store struct {
addr string // loaded store address
storeID uint64 // store's id
state uint64 // unsafe store storeState
resolveMutex sync.Mutex // protect pd from concurrent init requests
fail uint32 // store fail count, see RegionStore.storeFails
storeType kv.StoreType // type of the store
addr string // loaded store address
storeID uint64 // store's id
state uint64 // unsafe store storeState
resolveMutex sync.Mutex // protect pd from concurrent init requests
fail uint32 // store fail count, see RegionStore.storeFails
storeType kv.StoreType // type of the store
tokenCount atomic2.Int64 // used store token count
}

type resolveState uint64
Expand Down
32 changes: 32 additions & 0 deletions store/tikv/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package tikv

import (
"context"
"strconv"
"sync/atomic"
"time"

Expand All @@ -30,6 +31,7 @@ import (
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/storeutil"
)

// ShuttingDown is a flag to indicate tidb-server is exiting (Ctrl+C signal
Expand Down Expand Up @@ -167,6 +169,13 @@ func (s *RegionRequestSender) sendReqToRegion(bo *Backoffer, ctx *RPCContext, re
if e := tikvrpc.SetContext(req, ctx.Meta, ctx.Peer); e != nil {
return nil, false, errors.Trace(e)
}
// judge the store limit switch.
if limit := storeutil.StoreLimit.Load(); limit > 0 {
if err := s.getStoreToken(ctx.Store, limit); err != nil {
return nil, false, err
}
defer s.releaseStoreToken(ctx.Store)
}
resp, err = s.client.SendRequest(bo.ctx, ctx.Addr, req, timeout)
if err != nil {
s.rpcError = err
Expand All @@ -178,6 +187,29 @@ func (s *RegionRequestSender) sendReqToRegion(bo *Backoffer, ctx *RPCContext, re
return
}

// getStoreToken tries to take one request token of store st.
//
// It returns nil when the token was taken; the caller must then give it back
// with releaseStoreToken once the request has finished. When the store already
// holds limit tokens, it bumps GetStoreLimitErrorCounter for the store and
// returns ErrTokenLimit without taking a token.
func (s *RegionRequestSender) getStoreToken(st *Store, limit int64) error {
	// Reserve first and check afterwards. Add returns the updated count, so the
	// reservation and the check rely on one atomic operation: concurrent callers
	// can never be granted more than limit tokens in total, and no retry loop
	// is needed.
	if st.tokenCount.Add(1) <= limit {
		return nil
	}
	// Over the limit: hand the reservation back before reporting the error.
	// The count may exceed limit for a moment, which can only make a
	// concurrent caller fail early while the store is saturated anyway.
	st.tokenCount.Sub(1)
	metrics.GetStoreLimitErrorCounter.WithLabelValues(st.addr, strconv.FormatUint(st.storeID, 10)).Inc()
	return ErrTokenLimit.GenWithStackByArgs(st.storeID)
}

// releaseStoreToken gives back one token of store st that was previously taken
// by getStoreToken. If the store holds no token, nothing is decremented and a
// warning is logged instead.
func (s *RegionRequestSender) releaseStoreToken(st *Store) {
	// The check and the decrement are two separate atomic operations; this is
	// accepted on purpose to keep the release path free of retry loops.
	if st.tokenCount.Load() <= 0 {
		logutil.BgLogger().Warn("release store token failed, count equals to 0")
		return
	}
	st.tokenCount.Sub(1)
}

func (s *RegionRequestSender) onSendFail(bo *Backoffer, ctx *RPCContext, err error) error {
// If it failed because the context is cancelled by ourself, don't retry.
if errors.Cause(err) == context.Canceled {
Expand Down
45 changes: 45 additions & 0 deletions store/tikv/region_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/util/storeutil"
"google.golang.org/grpc"
)

Expand All @@ -43,7 +44,20 @@ type testRegionRequestSuite struct {
mvccStore mocktikv.MVCCStore
}

// testStoreLimitSuite holds the fixtures for the store token limit test: a mock
// cluster bootstrapped with multiple stores, plus the region cache, backoffer and
// request sender built on top of it (all of them are created in SetUpTest).
type testStoreLimitSuite struct {
cluster *mocktikv.Cluster
storeIDs []uint64
peerIDs []uint64
regionID uint64
leaderPeer uint64
cache *RegionCache
bo *Backoffer
regionRequestSender *RegionRequestSender
mvccStore mocktikv.MVCCStore
}

var _ = Suite(&testRegionRequestSuite{})
var _ = Suite(&testStoreLimitSuite{})

func (s *testRegionRequestSuite) SetUpTest(c *C) {
s.cluster = mocktikv.NewCluster()
Expand All @@ -56,10 +70,41 @@ func (s *testRegionRequestSuite) SetUpTest(c *C) {
s.regionRequestSender = NewRegionRequestSender(s.cache, client)
}

// SetUpTest creates a fresh mock cluster bootstrapped through
// BootstrapWithMultiStores(cluster, 3), and wires the region cache, the no-op
// backoffer and the request sender that the store limit test runs against.
func (s *testStoreLimitSuite) SetUpTest(c *C) {
	cluster := mocktikv.NewCluster()
	s.cluster = cluster
	s.storeIDs, s.peerIDs, s.regionID, s.leaderPeer = mocktikv.BootstrapWithMultiStores(cluster, 3)

	// The region cache talks to a mock PD client backed by the same cluster.
	s.cache = NewRegionCache(&codecPDClient{mocktikv.NewPDClient(cluster)})
	s.bo = NewNoopBackoff(context.Background())

	// Requests are served by a mock RPC client on top of a new MVCC store.
	s.mvccStore = mocktikv.MustNewMVCCStore()
	rpcClient := mocktikv.NewRPCClient(cluster, s.mvccStore)
	s.regionRequestSender = NewRegionRequestSender(s.cache, rpcClient)
}

// TearDownTest closes the suite's cache (s.cache).
func (s *testRegionRequestSuite) TearDownTest(c *C) {
s.cache.Close()
}

// TearDownTest closes the region cache created in SetUpTest.
func (s *testStoreLimitSuite) TearDownTest(c *C) {
s.cache.Close()
}

// TestStoreTokenLimit checks that a request is rejected with the store limit
// error once the target store's token count has reached the configured limit.
func (s *testStoreLimitSuite) TestStoreTokenLimit(c *C) {
	req := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, &kvrpcpb.PrewriteRequest{}, kvrpcpb.Context{})
	region, err := s.cache.LocateRegionByID(s.bo, s.regionID)
	c.Assert(err, IsNil)
	c.Assert(region, NotNil)

	// StoreLimit is a package-level variable shared by every test in the
	// process. Restore it with defer so that a failing assertion below cannot
	// leave the limit of 500 behind for the tests that run afterwards.
	oldStoreLimit := storeutil.StoreLimit.Load()
	storeutil.StoreLimit.Store(500)
	defer storeutil.StoreLimit.Store(oldStoreLimit)

	// Pretend the store already holds every token the limit allows.
	s.cache.getStoreByStoreID(s.storeIDs[0]).tokenCount.Store(500)
	// The cluster has only one region, so the request is sent to that region's
	// leader, which lives on the store saturated above.
	resp, err := s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second)
	c.Assert(err, NotNil)
	c.Assert(resp, IsNil)
	c.Assert(err.Error(), Equals, "[tikv:9008]Store token is up to the limit, store id = 1")
}

func (s *testRegionRequestSuite) TestOnSendFailedWithStoreRestart(c *C) {
req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
Key: []byte("key"),
Expand Down
6 changes: 5 additions & 1 deletion tidb-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/printer"
"github.com/pingcap/tidb/util/signal"
"github.com/pingcap/tidb/util/storeutil"
"github.com/pingcap/tidb/util/sys/linux"
"github.com/pingcap/tidb/util/systimemon"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -387,7 +388,7 @@ func loadConfig() string {
var hotReloadConfigItems = []string{"Performance.MaxProcs", "Performance.MaxMemory", "Performance.CrossJoin",
"Performance.FeedbackProbability", "Performance.QueryFeedbackLimit", "Performance.PseudoEstimateRatio",
"OOMUseTmpStorage", "OOMAction", "MemQuotaQuery", "StmtSummary.MaxStmtCount", "StmtSummary.MaxSQLLength", "Log.QueryLogMaxLen",
"TiKVClient.EnableChunkRPC"}
"TiKVClient.EnableChunkRPC", "TiKVClient.StoreLimit"}

func reloadConfig(nc, c *config.Config) {
// Just a part of config items need to be reload explicitly.
Expand All @@ -410,6 +411,9 @@ func reloadConfig(nc, c *config.Config) {
if nc.Performance.PseudoEstimateRatio != c.Performance.PseudoEstimateRatio {
statistics.RatioOfPseudoEstimate.Store(nc.Performance.PseudoEstimateRatio)
}
if nc.TiKVClient.StoreLimit != c.TiKVClient.StoreLimit {
storeutil.StoreLimit.Store(nc.TiKVClient.StoreLimit)
}
}

func overrideConfig() {
Expand Down
21 changes: 21 additions & 0 deletions util/storeutil/store_vars.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package storeutil

import (
"go.uber.org/atomic"
)

// StoreLimit is the process-wide limit on the token count of a single store.
// The limit is only enforced when the value is greater than 0. It is updated
// when the configuration is reloaded and when the tidb_store_limit system
// variable is set.
var StoreLimit atomic.Int64

0 comments on commit dca8644

Please sign in to comment.