Skip to content

Commit

Permalink
*: move config file option committer-concurrency to sysvar (#34368)
Browse files Browse the repository at this point in the history
ref #33769
  • Loading branch information
espresso98 authored May 13, 2022
1 parent bbfbe13 commit 0017963
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 10 deletions.
21 changes: 14 additions & 7 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"os/user"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"

Expand All @@ -32,6 +33,7 @@ import (
zaplog "github.com/pingcap/log"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/tikvutil"
"github.com/pingcap/tidb/util/versioninfo"
tikvcfg "github.com/tikv/client-go/v2/config"
tracing "github.com/uber/jaeger-client-go/config"
Expand Down Expand Up @@ -140,6 +142,9 @@ var (
// DeprecatedOptions indicates the config options existing in some other sections in config file.
// They should be moved to [instance] section.
DeprecatedOptions []InstanceConfigSection

// TikvConfigLock protects against concurrent tikv config refresh
TikvConfigLock sync.Mutex
)

// Config contains configuration options.
Expand Down Expand Up @@ -265,9 +270,10 @@ func (c *Config) UpdateTempStoragePath() {
}
}

func (c *Config) getTiKVConfig() *tikvcfg.Config {
// GetTiKVConfig returns configuration options from tikvcfg
func (c *Config) GetTiKVConfig() *tikvcfg.Config {
return &tikvcfg.Config{
CommitterConcurrency: c.Performance.CommitterConcurrency,
CommitterConcurrency: int(tikvutil.CommitterConcurrency.Load()),
MaxTxnTTL: c.Performance.MaxTxnTTL,
TiKVClient: c.TiKVClient,
Security: c.Security.ClusterSecurity(),
Expand Down Expand Up @@ -600,9 +606,8 @@ type Performance struct {
RunAutoAnalyze bool `toml:"run-auto-analyze" json:"run-auto-analyze"`
DistinctAggPushDown bool `toml:"distinct-agg-push-down" json:"distinct-agg-push-down"`
// Whether enable projection push down for coprocessors (both tikv & tiflash), default false.
ProjectionPushDown bool `toml:"projection-push-down" json:"projection-push-down"`
CommitterConcurrency int `toml:"committer-concurrency" json:"committer-concurrency"`
MaxTxnTTL uint64 `toml:"max-txn-ttl" json:"max-txn-ttl"`
ProjectionPushDown bool `toml:"projection-push-down" json:"projection-push-down"`
MaxTxnTTL uint64 `toml:"max-txn-ttl" json:"max-txn-ttl"`
// Deprecated
MemProfileInterval string `toml:"-" json:"-"`
IndexUsageSyncLease string `toml:"index-usage-sync-lease" json:"index-usage-sync-lease"`
Expand Down Expand Up @@ -825,7 +830,6 @@ var defaultConf = Config{
TxnTotalSizeLimit: DefTxnTotalSizeLimit,
DistinctAggPushDown: false,
ProjectionPushDown: false,
CommitterConcurrency: defTiKVCfg.CommitterConcurrency,
MaxTxnTTL: defTiKVCfg.MaxTxnTTL, // 1hour
// TODO: set indexUsageSyncLease to 60s.
IndexUsageSyncLease: "0s",
Expand Down Expand Up @@ -907,7 +911,9 @@ func GetGlobalConfig() *Config {
// StoreGlobalConfig stores a new config to the globalConf. It mostly uses in the test to avoid some data races.
func StoreGlobalConfig(config *Config) {
globalConf.Store(config)
cfg := *config.getTiKVConfig()
TikvConfigLock.Lock()
defer TikvConfigLock.Unlock()
cfg := *config.GetTiKVConfig()
tikvcfg.StoreGlobalConfig(&cfg)
}

Expand Down Expand Up @@ -938,6 +944,7 @@ var deprecatedConfig = map[string]struct{}{
"enable-batch-dml": {}, // use tidb_enable_batch_dml
"mem-quota-query": {},
"query-log-max-len": {},
"performance.committer-concurrency": {},
}

func isAllDeprecatedConfigItems(items []string) bool {
Expand Down
3 changes: 0 additions & 3 deletions config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,6 @@ txn-total-size-limit = 104857600
# NOTE: Increasing this limit may cause performance problems.
txn-entry-size-limit = 6291456

# The max number of running concurrency two phase committer request for an SQL.
committer-concurrency = 128

# max lifetime of transaction ttl manager.
max-txn-ttl = 3600000

Expand Down
10 changes: 10 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ import (
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/stmtsummary"
"github.com/pingcap/tidb/util/tikvutil"
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
"github.com/pingcap/tidb/util/versioninfo"
tikvcfg "github.com/tikv/client-go/v2/config"
tikvstore "github.com/tikv/client-go/v2/kv"
atomic2 "go.uber.org/atomic"
)
Expand Down Expand Up @@ -699,6 +701,14 @@ var defaultSysVars = []*SysVar{
}, GetGlobal: func(s *SessionVars) (string, error) {
return fmt.Sprint(QueryLogMaxLen.Load()), nil
}},
{Scope: ScopeGlobal, Name: TiDBCommitterConcurrency, Value: strconv.Itoa(DefTiDBCommitterConcurrency), Type: TypeInt, MinValue: 1, MaxValue: 10000, SetGlobal: func(s *SessionVars, val string) error {
tikvutil.CommitterConcurrency.Store(int32(TidbOptInt64(val, DefTiDBCommitterConcurrency)))
cfg := config.GetGlobalConfig().GetTiKVConfig()
tikvcfg.StoreGlobalConfig(cfg)
return nil
}, GetGlobal: func(s *SessionVars) (string, error) {
return fmt.Sprint(tikvutil.CommitterConcurrency.Load()), nil
}},
{Scope: ScopeGlobal, Name: TiDBMemQuotaAnalyze, Value: strconv.Itoa(DefTiDBMemQuotaAnalyze), Type: TypeInt, MinValue: -1, MaxValue: math.MaxInt64,
GetGlobal: func(s *SessionVars) (string, error) {
return strconv.FormatInt(GetMemQuotaAnalyze(), 10), nil
Expand Down
26 changes: 26 additions & 0 deletions sessionctx/variable/sysvar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -960,3 +960,29 @@ func TestTiDBQueryLogMaxLen(t *testing.T) {
require.Equal(t, val, fmt.Sprintf("%d", expected))
require.NoError(t, err)
}

func TestTiDBCommitterConcurrency(t *testing.T) {
sv := GetSysVar(TiDBCommitterConcurrency)
vars := NewSessionVars()

newVal := 1024
val, err := sv.Validate(vars, fmt.Sprintf("%d", newVal), ScopeGlobal)
require.Equal(t, val, "1024")
require.NoError(t, err)

// out of range
newVal = 10001
expected := 10000
val, err = sv.Validate(vars, fmt.Sprintf("%d", newVal), ScopeGlobal)
// expected to truncate
require.Equal(t, val, fmt.Sprintf("%d", expected))
require.NoError(t, err)

// min value out of range
newVal = 0
expected = 1
val, err = sv.Validate(vars, fmt.Sprintf("%d", newVal), ScopeGlobal)
// expected to set to min value
require.Equal(t, val, fmt.Sprintf("%d", expected))
require.NoError(t, err)
}
3 changes: 3 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,8 @@ const (
TiDBMemQuotaBindingCache = "tidb_mem_quota_binding_cache"
// TiDBRCReadCheckTS indicates the tso optimization for read-consistency read is enabled.
TiDBRCReadCheckTS = "tidb_rc_read_check_ts"
// TiDBCommitterConcurrency controls the number of running concurrent requests in the commit phase.
TiDBCommitterConcurrency = "tidb_committer_concurrency"
// TiDBEnableBatchDML enables batch dml.
TiDBEnableBatchDML = "tidb_enable_batch_dml"
// TiDBStatsCacheMemQuota records stats cache quota
Expand Down Expand Up @@ -855,6 +857,7 @@ const (
DefTiDBStatsCacheMemQuota = 0
MaxTiDBStatsCacheMemQuota = 1024 * 1024 * 1024 * 1024 // 1TB
DefTiDBQueryLogMaxLen = 4096
DefTiDBCommitterConcurrency = 128
DefTiDBBatchDMLIgnoreError = false
DefTiDBMemQuotaAnalyze = -1
)
Expand Down
22 changes: 22 additions & 0 deletions util/tikvutil/tikvutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tikvutil

import (
"go.uber.org/atomic"
)

// CommitterConcurrency stores the current value of the sysvar tidb_committer_concurrency
var CommitterConcurrency = atomic.NewInt32(128)

0 comments on commit 0017963

Please sign in to comment.