Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: move config file option committer-concurrency to sysvar #34368

Merged
merged 18 commits into from
May 13, 2022
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"os/user"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"

Expand All @@ -32,6 +33,7 @@ import (
zaplog "github.com/pingcap/log"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/tikvutil"
"github.com/pingcap/tidb/util/versioninfo"
tikvcfg "github.com/tikv/client-go/v2/config"
tracing "github.com/uber/jaeger-client-go/config"
Expand Down Expand Up @@ -140,6 +142,9 @@ var (
// DeprecatedOptions indicates the config options existing in some other sections in config file.
// They should be moved to [instance] section.
DeprecatedOptions []InstanceConfigSection

// TikvConfigLock protects against concurrent tikv config refresh
TikvConfigLock sync.Mutex
)

// Config contains configuration options.
Expand Down Expand Up @@ -260,9 +265,10 @@ func (c *Config) UpdateTempStoragePath() {
}
}

func (c *Config) getTiKVConfig() *tikvcfg.Config {
// GetTiKVConfig returns configuration options from tikvcfg
func (c *Config) GetTiKVConfig() *tikvcfg.Config {
return &tikvcfg.Config{
CommitterConcurrency: c.Performance.CommitterConcurrency,
CommitterConcurrency: int(tikvutil.CommitterConcurrency.Load()),
MaxTxnTTL: c.Performance.MaxTxnTTL,
TiKVClient: c.TiKVClient,
Security: c.Security.ClusterSecurity(),
Expand Down Expand Up @@ -595,9 +601,8 @@ type Performance struct {
RunAutoAnalyze bool `toml:"run-auto-analyze" json:"run-auto-analyze"`
DistinctAggPushDown bool `toml:"distinct-agg-push-down" json:"distinct-agg-push-down"`
// Whether enable projection push down for coprocessors (both tikv & tiflash), default false.
ProjectionPushDown bool `toml:"projection-push-down" json:"projection-push-down"`
CommitterConcurrency int `toml:"committer-concurrency" json:"committer-concurrency"`
MaxTxnTTL uint64 `toml:"max-txn-ttl" json:"max-txn-ttl"`
ProjectionPushDown bool `toml:"projection-push-down" json:"projection-push-down"`
MaxTxnTTL uint64 `toml:"max-txn-ttl" json:"max-txn-ttl"`
// Deprecated
MemProfileInterval string `toml:"-" json:"-"`
IndexUsageSyncLease string `toml:"index-usage-sync-lease" json:"index-usage-sync-lease"`
Expand Down Expand Up @@ -820,7 +825,6 @@ var defaultConf = Config{
TxnTotalSizeLimit: DefTxnTotalSizeLimit,
DistinctAggPushDown: false,
ProjectionPushDown: false,
CommitterConcurrency: defTiKVCfg.CommitterConcurrency,
MaxTxnTTL: defTiKVCfg.MaxTxnTTL, // 1hour
// TODO: set indexUsageSyncLease to 60s.
IndexUsageSyncLease: "0s",
Expand Down Expand Up @@ -902,7 +906,9 @@ func GetGlobalConfig() *Config {
// StoreGlobalConfig stores a new config to the globalConf. It mostly uses in the test to avoid some data races.
func StoreGlobalConfig(config *Config) {
globalConf.Store(config)
cfg := *config.getTiKVConfig()
TikvConfigLock.Lock()
defer TikvConfigLock.Unlock()
cfg := *config.GetTiKVConfig()
tikvcfg.StoreGlobalConfig(&cfg)
}

Expand Down Expand Up @@ -932,6 +938,7 @@ var deprecatedConfig = map[string]struct{}{
"stmt-summary.history-size": {},
"mem-quota-query": {},
"query-log-max-len": {},
"performance.committer-concurrency": {},
}

func isAllDeprecatedConfigItems(items []string) bool {
Expand Down
3 changes: 0 additions & 3 deletions config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,6 @@ txn-total-size-limit = 104857600
# NOTE: Increasing this limit may cause performance problems.
txn-entry-size-limit = 6291456

# The max number of running concurrency two phase committer request for an SQL.
committer-concurrency = 128

# max lifetime of transaction ttl manager.
max-txn-ttl = 3600000

Expand Down
10 changes: 10 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ import (
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/stmtsummary"
"github.com/pingcap/tidb/util/tikvutil"
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
"github.com/pingcap/tidb/util/versioninfo"
tikvcfg "github.com/tikv/client-go/v2/config"
tikvstore "github.com/tikv/client-go/v2/kv"
atomic2 "go.uber.org/atomic"
)
Expand Down Expand Up @@ -693,6 +695,14 @@ var defaultSysVars = []*SysVar{
}, GetGlobal: func(s *SessionVars) (string, error) {
return fmt.Sprint(QueryLogMaxLen.Load()), nil
}},
{Scope: ScopeGlobal, Name: TiDBCommitterConcurrency, Value: strconv.Itoa(DefTiDBCommitterConcurrency), Type: TypeInt, MinValue: 1, MaxValue: 10000, SetGlobal: func(s *SessionVars, val string) error {
tikvutil.CommitterConcurrency.Store(int32(TidbOptInt64(val, DefTiDBCommitterConcurrency)))
cfg := config.GetGlobalConfig().GetTiKVConfig()
tikvcfg.StoreGlobalConfig(cfg)
return nil
morgo marked this conversation as resolved.
Show resolved Hide resolved
}, GetGlobal: func(s *SessionVars) (string, error) {
return fmt.Sprint(tikvutil.CommitterConcurrency.Load()), nil
}},
{Scope: ScopeGlobal, Name: TiDBMemQuotaAnalyze, Value: strconv.Itoa(DefTiDBMemQuotaAnalyze), Type: TypeInt, MinValue: -1, MaxValue: math.MaxInt64,
GetGlobal: func(s *SessionVars) (string, error) {
return strconv.FormatInt(GetMemQuotaAnalyze(), 10), nil
Expand Down
26 changes: 26 additions & 0 deletions sessionctx/variable/sysvar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -960,3 +960,29 @@ func TestTiDBQueryLogMaxLen(t *testing.T) {
require.Equal(t, val, fmt.Sprintf("%d", expected))
require.NoError(t, err)
}

func TestTiDBCommitterConcurrency(t *testing.T) {
sv := GetSysVar(TiDBCommitterConcurrency)
vars := NewSessionVars()

newVal := 1024
val, err := sv.Validate(vars, fmt.Sprintf("%d", newVal), ScopeGlobal)
require.Equal(t, val, "1024")
require.NoError(t, err)

// out of range
newVal = 10001
expected := 10000
val, err = sv.Validate(vars, fmt.Sprintf("%d", newVal), ScopeGlobal)
// expected to truncate
require.Equal(t, val, fmt.Sprintf("%d", expected))
require.NoError(t, err)

// min value out of range
newVal = 0
expected = 1
val, err = sv.Validate(vars, fmt.Sprintf("%d", newVal), ScopeGlobal)
// expected to set to min value
require.Equal(t, val, fmt.Sprintf("%d", expected))
require.NoError(t, err)
}
3 changes: 3 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,8 @@ const (
TiDBMemQuotaBindingCache = "tidb_mem_quota_binding_cache"
// TiDBRCReadCheckTS indicates the tso optimization for read-consistency read is enabled.
TiDBRCReadCheckTS = "tidb_rc_read_check_ts"
// TiDBCommitterConcurrency controls the number of running concurrent requests in the commit phase.
TiDBCommitterConcurrency = "tidb_committer_concurrency"
// TiDBStatsCacheMemQuota records stats cache quota
TiDBStatsCacheMemQuota = "tidb_stats_cache_mem_quota"
// TiDBMemQuotaAnalyze indicates the memory quota for all analyze jobs.
Expand Down Expand Up @@ -852,6 +854,7 @@ const (
DefTiDBStatsCacheMemQuota = 0
MaxTiDBStatsCacheMemQuota = 1024 * 1024 * 1024 * 1024 // 1TB
DefTiDBQueryLogMaxLen = 4096
DefTiDBCommitterConcurrency = 128
DefTiDBBatchDMLIgnoreError = false
DefTiDBMemQuotaAnalyze = -1
)
Expand Down
22 changes: 22 additions & 0 deletions util/tikvutil/tikvutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tikvutil

import (
"go.uber.org/atomic"
)

// CommitterConcurrency stores the current value of the sysvar tidb_committer_concurrency
var CommitterConcurrency = atomic.NewInt32(128)
morgo marked this conversation as resolved.
Show resolved Hide resolved