Skip to content

Commit

Permalink
store : add store limit to restrain bad store from occupying too much…
Browse files Browse the repository at this point in the history
… token limit. (pingcap#12779)
  • Loading branch information
AilinKid committed Nov 14, 2019
1 parent 6f9ae22 commit 14f4b19
Show file tree
Hide file tree
Showing 20 changed files with 162 additions and 11 deletions.
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,9 @@ type TiKVClient struct {
// If a Region has not been accessed for more than the given duration (in seconds), it
// will be reloaded from the PD.
RegionCacheTTL uint `toml:"region-cache-ttl" json:"region-cache-ttl"`
// StoreLimit caps the number of requests TiDB keeps in flight to a single store.
// Once a store has reached the limit, further requests to it return an error, which
// prevents one store from occupying too many tokens at the dispatching level.
StoreLimit int64 `toml:"store-limit" json:"store-limit"`
}

// Binlog is the config for binlog.
Expand Down Expand Up @@ -403,6 +406,7 @@ var defaultConf = Config{
BatchWaitSize: 8,

RegionCacheTTL: 600,
StoreLimit: 0,
},
Binlog: Binlog{
WriteTimeout: "15s",
Expand Down
5 changes: 5 additions & 0 deletions config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,11 @@ batch-wait-size = 8
# will be reloaded from the PD.
region-cache-ttl = 600

# store-limit restrains TiDB from sending more requests to a store that has reached the limit.
# Once a store is at the limit, subsequent requests to that same store return an error.
# The default value 0 turns the store limit off.
store-limit = 0

[txn-local-latches]
# Enable local latches for transactions. Enable it when
# there are lots of conflicts between transactions.
Expand Down
2 changes: 2 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ split-region-max-num=10000
commit-timeout="41s"
max-batch-size=128
region-cache-ttl=6000
store-limit=0
[stmt-summary]
max-stmt-count=1000
max-sql-length=1024
Expand All @@ -84,6 +85,7 @@ max-sql-length=1024
c.Assert(conf.TiKVClient.CommitTimeout, Equals, "41s")
c.Assert(conf.TiKVClient.MaxBatchSize, Equals, uint(128))
c.Assert(conf.TiKVClient.RegionCacheTTL, Equals, uint(6000))
c.Assert(conf.TiKVClient.StoreLimit, Equals, int64(0))
c.Assert(conf.TokenLimit, Equals, uint(1000))
c.Assert(conf.SplitRegionMaxNum, Equals, uint64(10000))
c.Assert(conf.StmtSummary.MaxStmtCount, Equals, uint(1000))
Expand Down
12 changes: 12 additions & 0 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,18 @@ func (s *testSuite2) TestSetVar(c *C) {
tk.MustQuery("select @@tidb_record_plan_in_slow_log;").Check(testkit.Rows("1"))
tk.MustExec("set @@tidb_record_plan_in_slow_log = 0")
tk.MustQuery("select @@tidb_record_plan_in_slow_log;").Check(testkit.Rows("0"))

tk.MustQuery("select @@tidb_store_limit;").Check(testkit.Rows("0"))
tk.MustExec("set @@tidb_store_limit = 100")
tk.MustQuery("select @@tidb_store_limit;").Check(testkit.Rows("100"))
tk.MustQuery("select @@session.tidb_store_limit;").Check(testkit.Rows("100"))
tk.MustQuery("select @@global.tidb_store_limit;").Check(testkit.Rows("0"))
tk.MustExec("set @@tidb_store_limit = 0")

tk.MustExec("set global tidb_store_limit = 100")
tk.MustQuery("select @@tidb_store_limit;").Check(testkit.Rows("0"))
tk.MustQuery("select @@session.tidb_store_limit;").Check(testkit.Rows("0"))
tk.MustQuery("select @@global.tidb_store_limit;").Check(testkit.Rows("100"))
}

func (s *testSuite2) TestSetCharset(c *C) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ require (
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20191018025622-fbf07f9804da
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd
github.com/pingcap/parser v0.0.0-20191108104421-a8f0a51789f3
github.com/pingcap/parser v0.0.0-20191114042208-6edc40403122
github.com/pingcap/pd v1.1.0-beta.0.20191018040858-0d9d9d67d029
github.com/pingcap/tidb-tools v3.0.6-0.20191106033616-90632dda3863+incompatible
github.com/pingcap/tipb v0.0.0-20191101114505-cbd0e985c780
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,8 @@ github.com/pingcap/kvproto v0.0.0-20191018025622-fbf07f9804da h1:dLmHmFq44tIq7yW
github.com/pingcap/kvproto v0.0.0-20191018025622-fbf07f9804da/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd h1:hWDol43WY5PGhsh3+8794bFHY1bPrmu6bTalpssCrGg=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pingcap/parser v0.0.0-20191108104421-a8f0a51789f3 h1:igazArQuuuyXhR6T27JB7mfynBkEnnczL1A0kun9s4k=
github.com/pingcap/parser v0.0.0-20191108104421-a8f0a51789f3/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
github.com/pingcap/parser v0.0.0-20191114042208-6edc40403122 h1:acSSlo1gyfBsqbjaqxRTbO7qjUzLDMP+ITToFBpCDMc=
github.com/pingcap/parser v0.0.0-20191114042208-6edc40403122/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
github.com/pingcap/pd v1.1.0-beta.0.20191018040858-0d9d9d67d029 h1:nJp6nnW70cGtmOiRnjvoU8fSxhb5QwGNgYr/AtyPCgo=
github.com/pingcap/pd v1.1.0-beta.0.20191018040858-0d9d9d67d029/go.mod h1:+F+Zp4BoMM7TbDKCeosXO+X9A+IWaw/T3/gRBo1sr6Q=
github.com/pingcap/tidb-tools v3.0.6-0.20191106033616-90632dda3863+incompatible h1:H1jg0aDWz2SLRh3hNBo2HFtnuHtudIUvBumU7syRkic=
Expand Down
1 change: 1 addition & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func RegisterMetrics() {
prometheus.MustRegister(StmtNodeCounter)
prometheus.MustRegister(DbStmtNodeCounter)
prometheus.MustRegister(StoreQueryFeedbackCounter)
prometheus.MustRegister(GetStoreLimitErrorCounter)
prometheus.MustRegister(TiKVBackoffCounter)
prometheus.MustRegister(TiKVBackoffHistogram)
prometheus.MustRegister(TiKVCoprocessorHistogram)
Expand Down
8 changes: 8 additions & 0 deletions metrics/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ var (
Help: "Counter of storing query feedback.",
}, []string{LblType})

// GetStoreLimitErrorCounter counts requests rejected because the target store's
// token count had already reached the store limit. It is labelled by store
// address and store ID.
// NOTE(review): the subsystem is "statistics" although the counter is incremented
// from the TiKV client request path — confirm this is the intended metric name.
GetStoreLimitErrorCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "tidb",
Subsystem: "statistics",
Name: "get_store_limit_token_error",
Help: "store token is up to the limit, probably because one of the stores is the hotspot or unavailable",
}, []string{LblAddress, LblStore})

SignificantFeedbackCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "tidb",
Expand Down
1 change: 1 addition & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1716,6 +1716,7 @@ var builtinGlobalVariable = []string{
variable.TiDBTxnMode,
variable.TiDBEnableStmtSummary,
variable.TiDBMaxDeltaSchemaCount,
variable.TiDBStoreLimit,
}

var (
Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/storeutil"
"github.com/pingcap/tidb/util/timeutil"
)

Expand Down Expand Up @@ -842,6 +843,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
} else if strings.EqualFold(val, "leader") || len(val) == 0 {
s.ReplicaRead = kv.ReplicaReadLeader
}
case TiDBStoreLimit:
storeutil.StoreLimit.Store(tidbOptInt64(val, DefTiDBStoreLimit))
}
s.systems[name] = val
return nil
Expand Down
2 changes: 2 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package variable
import (
"strconv"
"strings"
"sync/atomic"

"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
Expand Down Expand Up @@ -714,6 +715,7 @@ var defaultSysVars = []*SysVar{
{ScopeSession, TiDBExpensiveQueryTimeThreshold, strconv.Itoa(DefTiDBExpensiveQueryTimeThreshold)},
{ScopeSession, TiDBReplicaRead, "leader"},
{ScopeGlobal | ScopeSession, TiDBEnableStmtSummary, "0"},
{ScopeGlobal | ScopeSession, TiDBStoreLimit, strconv.FormatInt(atomic.LoadInt64(&config.GetGlobalConfig().TiKVClient.StoreLimit), 10)},
}

// SynonymsSysVariables is synonyms of system variables.
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,9 @@ const (

// TiDBEnableStmtSummary indicates whether the statement summary is enabled.
TiDBEnableStmtSummary = "tidb_enable_stmt_summary"

// TiDBStoreLimit limits the number of requests in flight to a single store; 0 means no limit.
TiDBStoreLimit = "tidb_store_limit"
)

// Default TiDB system variable values.
Expand Down Expand Up @@ -358,6 +361,7 @@ const (
DefTiDBWaitSplitRegionFinish = true
DefTiDBExpensiveQueryTimeThreshold = 60 // 60s
DefWaitSplitRegionTimeout = 300 // 300s
DefTiDBStoreLimit = 0
)

// Process global variables.
Expand Down
2 changes: 2 additions & 0 deletions sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,8 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
return value, ErrWrongValueForVar.GenWithStackByArgs(name, value)
case SQLSelectLimit:
return checkUInt64SystemVar(name, value, 0, math.MaxUint64, vars)
case TiDBStoreLimit:
return checkInt64SystemVar(name, value, 0, math.MaxInt64, vars)
case SyncBinlog:
return checkUInt64SystemVar(name, value, 0, 4294967295, vars)
case TableDefinitionCache:
Expand Down
6 changes: 4 additions & 2 deletions store/mockstore/mocktikv/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/tablecodec"
"go.uber.org/atomic"
)

// Cluster simulates a TiKV cluster. It focuses on management and the change of
Expand Down Expand Up @@ -586,8 +587,9 @@ func (r *Region) incVersion() {

// Store is the Store's meta data.
type Store struct {
meta *metapb.Store
cancel bool // return context.Cancelled error when cancel is true.
meta *metapb.Store
cancel bool // return context.Cancelled error when cancel is true.
tokenCount atomic.Int64
}

func newStore(storeID uint64, addr string) *Store {
Expand Down
1 change: 1 addition & 0 deletions store/tikv/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ var (
ErrRegionUnavailable = terror.ClassTiKV.New(mysql.ErrRegionUnavailable, mysql.MySQLErrName[mysql.ErrRegionUnavailable])
ErrTiKVServerBusy = terror.ClassTiKV.New(mysql.ErrTiKVServerBusy, mysql.MySQLErrName[mysql.ErrTiKVServerBusy])
ErrGCTooEarly = terror.ClassTiKV.New(mysql.ErrGCTooEarly, mysql.MySQLErrName[mysql.ErrGCTooEarly])
ErrTokenLimit = terror.ClassTiKV.New(mysql.ErrTiKVStoreLimit, mysql.MySQLErrName[mysql.ErrTiKVStoreLimit])
)

// ErrDeadlock wraps *kvrpcpb.Deadlock to implement the error interface.
Expand Down
12 changes: 7 additions & 5 deletions store/tikv/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/util/logutil"
atomic2 "go.uber.org/atomic"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -990,11 +991,12 @@ func (r *Region) ContainsByEnd(key []byte) bool {

// Store contains a kv process's address.
type Store struct {
addr string // loaded store address
storeID uint64 // store's id
state uint64 // unsafe store storeState
resolveMutex sync.Mutex // protect pd from concurrent init requests
fail uint32 // store fail count, see RegionStore.storeFails
addr string // loaded store address
storeID uint64 // store's id
state uint64 // unsafe store storeState
resolveMutex sync.Mutex // protect pd from concurrent init requests
fail uint32 // store fail count, see RegionStore.storeFails
tokenCount atomic2.Int64 // used store token count
}

type resolveState uint64
Expand Down
32 changes: 32 additions & 0 deletions store/tikv/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package tikv

import (
"context"
"strconv"
"sync/atomic"
"time"

Expand All @@ -26,6 +27,7 @@ import (
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/storeutil"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -148,6 +150,13 @@ func (s *RegionRequestSender) sendReqToRegion(bo *Backoffer, ctx *RPCContext, re
if e := tikvrpc.SetContext(req, ctx.Meta, ctx.Peer); e != nil {
return nil, false, errors.Trace(e)
}
// judge the store limit switch.
if limit := storeutil.StoreLimit.Load(); limit > 0 {
if err := s.getStoreToken(ctx.Store, limit); err != nil {
return nil, false, err
}
defer s.releaseStoreToken(ctx.Store)
}
resp, err = s.client.SendRequest(bo.ctx, ctx.Addr, req, timeout)
if err != nil {
s.rpcError = err
Expand All @@ -159,6 +168,29 @@ func (s *RegionRequestSender) sendReqToRegion(bo *Backoffer, ctx *RPCContext, re
return
}

// getStoreToken tries to take one token for the store st.
//
// It returns nil and increments st.tokenCount when the current count is below
// limit. Otherwise it bumps GetStoreLimitErrorCounter for the store and returns
// ErrTokenLimit carrying the store ID.
//
// The load and the increment are two separate atomic operations rather than a
// compare-and-swap loop, so under concurrency the limit is a soft bound: several
// callers may pass the check at once. This trade-off is deliberate, to keep the
// hot path free of retry loops.
func (s *RegionRequestSender) getStoreToken(st *Store, limit int64) error {
	if used := st.tokenCount.Load(); used >= limit {
		// The store is saturated: record the rejection and fail fast.
		storeID := strconv.FormatUint(st.storeID, 10)
		metrics.GetStoreLimitErrorCounter.WithLabelValues(st.addr, storeID).Inc()
		return ErrTokenLimit.GenWithStackByArgs(st.storeID)
	}
	// Not synchronized with the check above; see the function comment.
	st.tokenCount.Add(1)
	return nil
}

// releaseStoreToken gives back one token previously taken for the store st.
//
// If the count is not positive there is nothing to release; a warning is logged
// and the counter is left untouched. The check and the decrement are separate
// atomic operations (no compare-and-swap loop), mirroring getStoreToken.
func (s *RegionRequestSender) releaseStoreToken(st *Store) {
	if st.tokenCount.Load() <= 0 {
		logutil.Logger(context.Background()).Warn("release store token failed, count equals to 0")
		return
	}
	st.tokenCount.Sub(1)
}

func (s *RegionRequestSender) onSendFail(bo *Backoffer, ctx *RPCContext, err error) error {
// If it failed because the context is cancelled by ourself, don't retry.
if errors.Cause(err) == context.Canceled {
Expand Down
45 changes: 45 additions & 0 deletions store/tikv/region_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/util/storeutil"
"google.golang.org/grpc"
)

Expand All @@ -42,7 +43,20 @@ type testRegionRequestSuite struct {
mvccStore mocktikv.MVCCStore
}

// testStoreLimitSuite holds the fixtures for the store token limit tests.
// All fields are populated by SetUpTest.
type testStoreLimitSuite struct {
// cluster is the mock TiKV cluster the suite runs against.
cluster *mocktikv.Cluster
// storeIDs, peerIDs, regionID and leaderPeer are the values returned by
// mocktikv.BootstrapWithMultiStores for the bootstrapped cluster.
storeIDs []uint64
peerIDs []uint64
regionID uint64
leaderPeer uint64
// cache is the region cache built on the mock PD client; closed in TearDownTest.
cache *RegionCache
// bo is a no-op backoffer created with NewNoopBackoff.
bo *Backoffer
// regionRequestSender is the sender under test, wired to the mock RPC client.
regionRequestSender *RegionRequestSender
// mvccStore backs the mock RPC client.
mvccStore mocktikv.MVCCStore
}

// Register both suites. Suite is presumably the test framework's registration
// hook, imported outside this view — confirm against the file's import block.
var _ = Suite(&testRegionRequestSuite{})
var _ = Suite(&testStoreLimitSuite{})

func (s *testRegionRequestSuite) SetUpTest(c *C) {
s.cluster = mocktikv.NewCluster()
Expand All @@ -55,10 +69,41 @@ func (s *testRegionRequestSuite) SetUpTest(c *C) {
s.regionRequestSender = NewRegionRequestSender(s.cache, client)
}

// SetUpTest builds a fresh fixture: a mock cluster bootstrapped with three
// stores, a region cache on top of the mock PD client, a no-op backoffer, and a
// RegionRequestSender that talks to the mock RPC client.
func (s *testStoreLimitSuite) SetUpTest(c *C) {
	cluster := mocktikv.NewCluster()
	s.cluster = cluster
	s.storeIDs, s.peerIDs, s.regionID, s.leaderPeer = mocktikv.BootstrapWithMultiStores(cluster, 3)

	s.cache = NewRegionCache(&codecPDClient{mocktikv.NewPDClient(cluster)})
	s.bo = NewNoopBackoff(context.Background())

	mvccStore := mocktikv.MustNewMVCCStore()
	s.mvccStore = mvccStore
	rpcClient := mocktikv.NewRPCClient(cluster, mvccStore)
	s.regionRequestSender = NewRegionRequestSender(s.cache, rpcClient)
}

// TearDownTest closes the suite's region cache.
func (s *testRegionRequestSuite) TearDownTest(c *C) {
s.cache.Close()
}

// TearDownTest closes the region cache created in SetUpTest.
func (s *testStoreLimitSuite) TearDownTest(c *C) {
s.cache.Close()
}

// TestStoreTokenLimit checks that a request is rejected with the store-limit
// error when the target store's token count has already reached the limit.
func (s *testStoreLimitSuite) TestStoreTokenLimit(c *C) {
	req := &tikvrpc.Request{Type: tikvrpc.CmdPrewrite, Prewrite: &kvrpcpb.PrewriteRequest{}, Context: kvrpcpb.Context{}}
	region, err := s.cache.LocateRegionByID(s.bo, s.regionID)
	c.Assert(err, IsNil)
	c.Assert(region, NotNil)

	// StoreLimit is process-wide state. Restore it with defer so that a failed
	// assertion below cannot leak the limit of 500 into other tests.
	oldStoreLimit := storeutil.StoreLimit.Load()
	defer storeutil.StoreLimit.Store(oldStoreLimit)
	storeutil.StoreLimit.Store(500)

	// Saturate the first store: its used token count now equals the limit.
	s.cache.getStoreByStoreID(s.storeIDs[0]).tokenCount.Store(500)

	// The cluster has a single region, so the request for regionID is routed
	// to that region's leader.
	resp, err := s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second)
	c.Assert(err, NotNil)
	c.Assert(resp, IsNil)
	c.Assert(err.Error(), Equals, "[tikv:9008]Store token is up to the limit, store id = 1")
}

func (s *testRegionRequestSuite) TestOnSendFailedWithStoreRestart(c *C) {
req := &tikvrpc.Request{
Type: tikvrpc.CmdRawPut,
Expand Down
6 changes: 5 additions & 1 deletion tidb-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/printer"
"github.com/pingcap/tidb/util/signal"
"github.com/pingcap/tidb/util/storeutil"
"github.com/pingcap/tidb/util/systimemon"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/push"
Expand Down Expand Up @@ -352,7 +353,7 @@ func loadConfig() string {
// hotReloadConfigItems lists all config items which support hot-reload.
var hotReloadConfigItems = []string{"Performance.MaxProcs", "Performance.MaxMemory", "Performance.CrossJoin",
"Performance.FeedbackProbability", "Performance.QueryFeedbackLimit", "Performance.PseudoEstimateRatio",
"OOMAction", "MemQuotaQuery", "StmtSummary.MaxStmtCount", "StmtSummary.MaxSQLLength"}
"OOMAction", "MemQuotaQuery", "StmtSummary.MaxStmtCount", "StmtSummary.MaxSQLLength", "TiKVClient.StoreLimit"}

func reloadConfig(nc, c *config.Config) {
// Just a part of config items need to be reload explicitly.
Expand All @@ -378,6 +379,9 @@ func reloadConfig(nc, c *config.Config) {
if nc.Performance.PseudoEstimateRatio != c.Performance.PseudoEstimateRatio {
statistics.RatioOfPseudoEstimate.Store(nc.Performance.PseudoEstimateRatio)
}
if nc.TiKVClient.StoreLimit != c.TiKVClient.StoreLimit {
storeutil.StoreLimit.Store(nc.TiKVClient.StoreLimit)
}
}

func overrideConfig() {
Expand Down
Loading

0 comments on commit 14f4b19

Please sign in to comment.