Skip to content

Commit

Permalink
adding changes to disable automatic bind throttle
Browse files Browse the repository at this point in the history
  • Loading branch information
rajesh-1983 committed Apr 19, 2024
1 parent d5699eb commit a2a3b1b
Show file tree
Hide file tree
Showing 3 changed files with 282 additions and 33 deletions.
50 changes: 32 additions & 18 deletions lib/bindevict.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,22 @@ type BindEvict struct {
// evicted binds get throttled to have overall steady state during bad bind queries
// nested map uses sqlhash "bindName|bindValue"
BindThrottle map[uint32]map[string]*BindThrottle
lock sync.Mutex
lock sync.Mutex
}

func GetBindEvict() *BindEvict {
cfg := gBindEvict.Load()
if cfg == nil {
out := BindEvict{BindThrottle:make(map[uint32]map[string]*BindThrottle)}
out := BindEvict{BindThrottle: make(map[uint32]map[string]*BindThrottle)}
gBindEvict.Store(&out)
return &out
}
return cfg.(*BindEvict)
}
func (this *BindEvict) Copy() *BindEvict {
out := BindEvict{BindThrottle:make(map[uint32]map[string]*BindThrottle)}
for k,v := range this.BindThrottle {

func (bindEvict *BindEvict) Copy() *BindEvict {
out := BindEvict{BindThrottle: make(map[uint32]map[string]*BindThrottle)}
for k, v := range bindEvict.BindThrottle {
out.BindThrottle[k] = v
}
return &out
Expand All @@ -77,37 +78,44 @@ func NormalizeBindName(bindName0 string) string {

func (entry *BindThrottle) decrAllowEveryX(y int) {
if y >= 2 && logger.GetLogger().V(logger.Warning) {
info := fmt.Sprintf("hash:%d bindName:%s val:%s allowEveryX:%d-%d",entry.Sqlhash, entry.Name, entry.Value, entry.AllowEveryX, y)
info := fmt.Sprintf("hash:%d bindName:%s val:%s allowEveryX:%d-%d", entry.Sqlhash, entry.Name, entry.Value, entry.AllowEveryX, y)
logger.GetLogger().Log(logger.Warning, "bind throttle decr", info)
}
entry.AllowEveryX -= y
if entry.AllowEveryX > 0 {
return
}
entry.AllowEveryX = 0
GetBindEvict().lock.Lock()
defer GetBindEvict().lock.Unlock()
bindEvict := GetBindEvict()
bindEvict.lock.Lock()
defer bindEvict.lock.Unlock()
// delete entry
if len(GetBindEvict().BindThrottle[entry.Sqlhash]) == 1 {
updateCopy := GetBindEvict().Copy()
if len(bindEvict.BindThrottle[entry.Sqlhash]) == 1 {
updateCopy := bindEvict.Copy()
delete(updateCopy.BindThrottle, entry.Sqlhash)
gBindEvict.Store(updateCopy)
} else {
// copy everything except bindKV (skipping it is deleting it)
bindKV := fmt.Sprintf("%s|%s", entry.Name, entry.Value)
updateCopy := make(map[string]*BindThrottle)
for k,v := range GetBindEvict().BindThrottle[entry.Sqlhash] {
for k, v := range bindEvict.BindThrottle[entry.Sqlhash] {
if k == bindKV {
continue
}
updateCopy[k] = v
}
GetBindEvict().BindThrottle[entry.Sqlhash] = updateCopy
if len(updateCopy) > 0 {
bindEvict.BindThrottle[entry.Sqlhash] = updateCopy
} else {
delete(bindEvict.BindThrottle, entry.Sqlhash)
}
gBindEvict.Store(bindEvict)
}
}

func (entry *BindThrottle) incrAllowEveryX() {
if logger.GetLogger().V(logger.Warning) {
info := fmt.Sprintf("hash:%d bindName:%s val:%s prev:%d",entry.Sqlhash, entry.Name, entry.Value, entry.AllowEveryX)
info := fmt.Sprintf("hash:%d bindName:%s val:%s prev:%d", entry.Sqlhash, entry.Name, entry.Value, entry.AllowEveryX)
logger.GetLogger().Log(logger.Warning, "bind throttle incr", info)
}
entry.AllowEveryX = 3*entry.AllowEveryX + 1
Expand All @@ -116,10 +124,16 @@ func (entry *BindThrottle) incrAllowEveryX() {
}
}

func (be *BindEvict) ShouldBlock(sqlhash uint32, bindKV map[string]string, heavyUsage bool) (bool, *BindThrottle) {
GetBindEvict().lock.Lock()
sqlBinds := GetBindEvict().BindThrottle[sqlhash]
GetBindEvict().lock.Unlock()
func (bindEvict *BindEvict) ShouldBlock(sqlhash uint32, bindKV map[string]string, heavyUsage bool) (bool, *BindThrottle) {
entryTime := time.Now()
defer func() {
if logger.GetLogger().V(logger.Verbose) {
logger.GetLogger().Log(logger.Info, fmt.Sprintf("bind throttle check operation exec duration is %v microseconds Bind-eviction decrese per sec %v", time.Now().Sub(entryTime).Microseconds(), GetConfig().BindEvictionDecrPerSec))
}
}()
bindEvict.lock.Lock()
sqlBinds := bindEvict.BindThrottle[sqlhash]
bindEvict.lock.Unlock()
for k0, v := range bindKV /*parseBinds(request)*/ {
k := NormalizeBindName(k0)
concatKey := fmt.Sprintf("%s|%s", k, v)
Expand Down Expand Up @@ -149,7 +163,7 @@ func (be *BindEvict) ShouldBlock(sqlhash uint32, bindKV map[string]string, heavy
entry.RecentAttempt.Store(&now)
entry.AllowEveryXCount++
if entry.AllowEveryXCount < entry.AllowEveryX {
return true/*block*/, entry
return true /*block*/, entry
}
entry.AllowEveryXCount = 0

Expand Down
28 changes: 13 additions & 15 deletions lib/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ type Config struct {
// time_skew_threshold_error(15)
TimeSkewThresholdErrorSec int
// max_stranded_time_interval(2000)
StrandedWorkerTimeoutMs int
StrandedWorkerTimeoutMs int
HighLoadStrandedWorkerTimeoutMs int
HighLoadSkipInitiateRecoverPct int
HighLoadPct int
InitLimitPct int
HighLoadSkipInitiateRecoverPct int
HighLoadPct int
InitLimitPct int

// the worker scheduler policy
LifoScheduler bool
Expand All @@ -110,7 +110,7 @@ type Config struct {
HostnamePrefix map[string]string
ShardingCrossKeysErr bool

CfgFromTns bool
CfgFromTns bool
CfgFromTnsOverrideNumShards int // -1 no-override
CfgFromTnsOverrideTaf int // -1 no-override, 0 override-false, 1 override-true
CfgFromTnsOverrideRWSplit int // -1 no-override, readChildPct
Expand Down Expand Up @@ -156,8 +156,8 @@ type Config struct {
// when numWorkers changes, it will write to this channel, for worker manager to update
numWorkersCh chan int

EnableConnLimitCheck bool
EnableQueryBindBlocker bool
EnableConnLimitCheck bool
EnableQueryBindBlocker bool
QueryBindBlockerMinSqlPrefix int

// taf testing
Expand All @@ -169,7 +169,7 @@ type Config struct {
EnableDanglingWorkerRecovery bool

GoStatsInterval int
RandomStartMs int
RandomStartMs int

// The max number of database connections to be established per second
MaxDbConnectsPerSec int
Expand Down Expand Up @@ -274,10 +274,9 @@ func InitConfig() error {
gAppConfig.StrandedWorkerTimeoutMs = cdb.GetOrDefaultInt("max_stranded_time_interval", 2000)
gAppConfig.HighLoadStrandedWorkerTimeoutMs = cdb.GetOrDefaultInt("high_load_max_stranded_time_interval", 600111)
gAppConfig.HighLoadSkipInitiateRecoverPct = cdb.GetOrDefaultInt("high_load_skip_initiate_recover_pct", 80)
gAppConfig.HighLoadPct = cdb.GetOrDefaultInt("high_load_pct", 130) // >100 disabled
gAppConfig.HighLoadPct = cdb.GetOrDefaultInt("high_load_pct", 130) // >100 disabled
gAppConfig.InitLimitPct = cdb.GetOrDefaultInt("init_limit_pct", 125) // >100 disabled


gAppConfig.StateLogInterval = cdb.GetOrDefaultInt("state_log_interval", 1)
if gAppConfig.StateLogInterval <= 0 {
gAppConfig.StateLogInterval = 1
Expand All @@ -300,7 +299,7 @@ func InitConfig() error {
gAppConfig.ChildExecutable = "postgresworker"
}
} else {
// db type is not supported
// db type is not supported
return errors.New("database type must be either Oracle or MySQL")
}

Expand Down Expand Up @@ -422,12 +421,11 @@ func InitConfig() error {
default_evict_names := fmt.Sprintf("id,num,%s", SrcPrefixAppKey)
gAppConfig.BindEvictionNames = cdb.GetOrDefaultString("bind_eviction_names", default_evict_names)
gAppConfig.BindEvictionThresholdPct = cdb.GetOrDefaultInt("bind_eviction_threshold_pct", 60)
fmt.Sscanf(cdb.GetOrDefaultString("bind_eviction_decr_per_sec", "10.0"),
fmt.Sscanf(cdb.GetOrDefaultString("bind_eviction_decr_per_sec", "10000000.0"),
"%f", &gAppConfig.BindEvictionDecrPerSec)

gAppConfig.SkipEvictRegex= cdb.GetOrDefaultString("skip_eviction_host_prefix","")
gAppConfig.EvictRegex= cdb.GetOrDefaultString("eviction_host_prefix", "")

gAppConfig.SkipEvictRegex = cdb.GetOrDefaultString("skip_eviction_host_prefix", "")
gAppConfig.EvictRegex = cdb.GetOrDefaultString("eviction_host_prefix", "")

gAppConfig.BouncerEnabled = cdb.GetOrDefaultBool("bouncer_enabled", true)
gAppConfig.BouncerStartupDelay = cdb.GetOrDefaultInt("bouncer_startup_delay", 10)
Expand Down
Loading

0 comments on commit a2a3b1b

Please sign in to comment.