Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: move config file option tidb_txn_total_size_limit and tidb_txn_entry_size_limit to sysvar #34448

Open
wants to merge 23 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 8 additions & 12 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,6 @@ import (
// Config number limitations
const (
MaxLogFileSize = 4096 // MB
// DefTxnEntrySizeLimit is the default value of TxnEntrySizeLimit.
DefTxnEntrySizeLimit = 6 * 1024 * 1024
// DefTxnTotalSizeLimit is the default value of TxnTotalSizeLimit.
DefTxnTotalSizeLimit = 100 * 1024 * 1024
// DefMaxIndexLength is the maximum index length(in bytes). This value is consistent with MySQL.
DefMaxIndexLength = 3072
// DefMaxOfMaxIndexLength is the maximum index length(in bytes) for TiDB v3.0.7 and previous version.
Expand Down Expand Up @@ -598,8 +594,6 @@ type Performance struct {
PseudoEstimateRatio float64 `toml:"pseudo-estimate-ratio" json:"pseudo-estimate-ratio"`
ForcePriority string `toml:"force-priority" json:"force-priority"`
BindInfoLease string `toml:"bind-info-lease" json:"bind-info-lease"`
TxnEntrySizeLimit uint64 `toml:"txn-entry-size-limit" json:"txn-entry-size-limit"`
TxnTotalSizeLimit uint64 `toml:"txn-total-size-limit" json:"txn-total-size-limit"`
TCPKeepAlive bool `toml:"tcp-keep-alive" json:"tcp-keep-alive"`
TCPNoDelay bool `toml:"tcp-no-delay" json:"tcp-no-delay"`
CrossJoin bool `toml:"cross-join" json:"cross-join"`
Expand All @@ -617,6 +611,12 @@ type Performance struct {
StatsLoadConcurrency uint `toml:"stats-load-concurrency" json:"stats-load-concurrency"`
StatsLoadQueueSize uint `toml:"stats-load-queue-size" json:"stats-load-queue-size"`
EnableStatsCacheMemQuota bool `toml:"enable-stats-cache-mem-quota" json:"enable-stats-cache-mem-quota"`
// The following items are deprecated. We need to keep them here temporarily
// to support the upgrade process. They can be removed in future.

// TxnEntrySizeLimit and TxnTotalSizeLimit are unused since bootstrap v91.
TxnEntrySizeLimit uint64 `toml:"txn-entry-size-limit" json:"txn-entry-size-limit"`
TxnTotalSizeLimit uint64 `toml:"txn-total-size-limit" json:"txn-total-size-limit"`
}

// PlanCache is the PlanCache section of the config.
Expand Down Expand Up @@ -826,8 +826,6 @@ var defaultConf = Config{
PseudoEstimateRatio: 0.8,
ForcePriority: "NO_PRIORITY",
BindInfoLease: "3s",
TxnEntrySizeLimit: DefTxnEntrySizeLimit,
TxnTotalSizeLimit: DefTxnTotalSizeLimit,
DistinctAggPushDown: false,
ProjectionPushDown: false,
MaxTxnTTL: defTiKVCfg.MaxTxnTTL, // 1hour
Expand Down Expand Up @@ -944,6 +942,8 @@ var deprecatedConfig = map[string]struct{}{
"enable-batch-dml": {}, // use tidb_enable_batch_dml
"mem-quota-query": {},
"query-log-max-len": {},
"performance.txn-total-size-limit": {}, // use tidb_txn_total_size_limit
"performance.txn-entry-size-limit": {}, // use tidb_txn_entry_size_limit
"performance.committer-concurrency": {},
}

Expand Down Expand Up @@ -1114,10 +1114,6 @@ func (c *Config) Valid() error {
return err
}

if c.Performance.TxnTotalSizeLimit > 1<<40 {
return fmt.Errorf("txn-total-size-limit should be less than %d", 1<<40)
}

if c.Instance.MemoryUsageAlarmRatio > 1 || c.Instance.MemoryUsageAlarmRatio < 0 {
return fmt.Errorf("tidb_memory_usage_alarm_ratio in [Instance] must be greater than or equal to 0 and less than or equal to 1")
}
Expand Down
13 changes: 1 addition & 12 deletions config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -251,17 +251,6 @@ bind-info-lease = "3s"
# Whether support pushing down aggregation with distinct to cop task
distinct-agg-push-down = false

# The limitation of the size in byte for the entries in one transaction.
# If using TiKV as the storage, the entry represents a key/value pair.
# NOTE: If binlog is enabled with Kafka (e.g. arbiter cluster),
# this value should be less than 1073741824(1G) because this is the maximum size that can be handled by Kafka.
# If binlog is disabled or binlog is enabled without Kafka, this value should be less than 1099511627776(1T).
txn-total-size-limit = 104857600

# The limitation of the size in byte for each entry in one transaction.
# NOTE: Increasing this limit may cause performance problems.
txn-entry-size-limit = 6291456

# max lifetime of transaction ttl manager.
max-txn-ttl = 3600000

Expand Down Expand Up @@ -394,7 +383,7 @@ capacity-mb = 1000.0
[binlog]
# enable to write binlog.
# NOTE: If binlog is enabled with Kafka (e.g. arbiter cluster),
# txn-total-size-limit should be less than 1073741824(1G) because this is the maximum size that can be handled by Kafka.
# The sysvar tidb_txn_total_size_limit should be less than 1073741824(1G) because this is the maximum size that can be handled by Kafka.
enable = false

# WriteTimeout specifies how long it will wait for writing binlog to pump.
Expand Down
22 changes: 0 additions & 22 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ func TestConfig(t *testing.T) {
conf.Binlog.Enable = true
conf.Binlog.IgnoreError = true
conf.Binlog.Strategy = "hash"
conf.Performance.TxnTotalSizeLimit = 1000
conf.TiKVClient.CommitTimeout = "10s"
conf.TiKVClient.RegionCacheTTL = 600
conf.Instance.EnableSlowLog.Store(logutil.DefaultTiDBEnableSlowLog)
Expand Down Expand Up @@ -221,7 +220,6 @@ enable-enum-length-limit = false
stores-refresh-interval = 30
enable-forwarding = true
[performance]
txn-total-size-limit=2000
tcp-no-delay = false
[tikv-client]
commit-timeout="41s"
Expand Down Expand Up @@ -267,7 +265,6 @@ grpc-max-send-msg-size = 40960
require.Equal(t, "hash", conf.Binlog.Strategy)

// Test that the value will be overwritten by the config file.
require.Equal(t, uint64(2000), conf.Performance.TxnTotalSizeLimit)
require.True(t, conf.AlterPrimaryKey)
require.False(t, conf.Performance.TCPNoDelay)

Expand Down Expand Up @@ -490,25 +487,6 @@ func TestOOMActionValid(t *testing.T) {
}
}

// TestTxnTotalSizeLimitValid verifies that Config.Valid accepts
// txn-total-size-limit values up to and including 1<<40 and rejects
// anything larger.
func TestTxnTotalSizeLimitValid(t *testing.T) {
	conf := NewConfig()
	cases := []struct {
		limit uint64
		valid bool
	}{
		{4 << 10, true},
		{10 << 30, true},
		{10<<30 + 1, true},
		{1 << 40, true},
		{1<<40 + 1, false},
	}
	for _, c := range cases {
		conf.Performance.TxnTotalSizeLimit = c.limit
		err := conf.Valid()
		require.Equal(t, c.valid, err == nil)
	}
}

func TestPreparePlanCacheValid(t *testing.T) {
conf := NewConfig()
tests := map[PreparedPlanCache]bool{
Expand Down
10 changes: 2 additions & 8 deletions executor/seqtest/seq_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -935,16 +935,10 @@ func TestCartesianProduct(t *testing.T) {
func TestBatchInsertDelete(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()

originLimit := atomic.LoadUint64(&kv.TxnTotalSizeLimit)
defer func() {
atomic.StoreUint64(&kv.TxnTotalSizeLimit, originLimit)
}()
// Set the limitation to a small value, make it easier to reach the limitation.
atomic.StoreUint64(&kv.TxnTotalSizeLimit, 5500)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set global tidb_txn_total_size_limit = 5500")
morgo marked this conversation as resolved.
Show resolved Hide resolved
defer tk.MustExec("set global tidb_txn_total_size_limit = default")
tk.MustExec("drop table if exists batch_insert")
tk.MustExec("create table batch_insert (c int)")
tk.MustExec("drop table if exists batch_insert_on_duplicate")
Expand Down
6 changes: 3 additions & 3 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
deadlockpb "github.com/pingcap/kvproto/pkg/deadlock"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/trxevents"
Expand All @@ -32,6 +31,7 @@ import (
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
pd "github.com/tikv/pd/client"
"go.uber.org/atomic"
)

// UnCommitIndexKVFlag uses to indicate the index key/value is no need to commit.
Expand All @@ -46,9 +46,9 @@ const UnCommitIndexKVFlag byte = '1'
// Those limits is enforced to make sure the transaction can be well handled by TiKV.
var (
// TxnEntrySizeLimit is limit of single entry size (len(key) + len(value)).
TxnEntrySizeLimit uint64 = config.DefTxnEntrySizeLimit
TxnEntrySizeLimit = atomic.NewUint64(10485760) //DefTiDBTxnEntrySizeLimit
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not using DefTiDBTxnEntrySizeLimit directly? BTW 10485760 is not equal to DefTiDBTxnEntrySizeLimit((6 * 1024 * 1024))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Afaik this is due to circular dependencies since sessionctx/variable imports kv. But I agree we can change the starting value to 6M. It will be overwritten in the startup procedure though, as the sysvar cache is populated.

// TxnTotalSizeLimit is limit of the sum of all entry size.
TxnTotalSizeLimit uint64 = config.DefTxnTotalSizeLimit
TxnTotalSizeLimit = atomic.NewUint64(100 * 1024 * 1024) //DefTiDBTxnTotalSizeLimit
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto

)

// Getter is the interface for the Get method.
Expand Down
2 changes: 1 addition & 1 deletion planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2522,7 +2522,7 @@ func (b *PlanBuilder) buildAnalyzeAllIndex(as *ast.AnalyzeTableStmt, opts map[as
}

// CMSketchSizeLimit indicates the size limit of CMSketch.
var CMSketchSizeLimit = kv.TxnEntrySizeLimit / binary.MaxVarintLen32
var CMSketchSizeLimit = kv.TxnEntrySizeLimit.Load() / binary.MaxVarintLen32
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This variable is initialized once and will not be updated with the change of TxnEntrySizeLimit, it could be a risk.

@XuHuaiyu @chrysan Do you have any idea how this should work with the new tidb_entry_size_limit variable?


var analyzeOptionLimit = map[ast.AnalyzeOptionType]uint64{
ast.AnalyzeOptNumBuckets: 1024,
Expand Down
10 changes: 3 additions & 7 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/kv"
tmysql "github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/util/versioninfo"
Expand Down Expand Up @@ -868,17 +867,14 @@ func (cli *testServerClient) runTestLoadData(t *testing.T, server *Server) {
"xxx row4_col1 - 900\n" +
"xxx row5_col1 - row5_col3")
require.NoError(t, err)

originalTxnTotalSizeLimit := kv.TxnTotalSizeLimit
// If the MemBuffer can't be committed once in each batch, it will return an error like "transaction is too large".
kv.TxnTotalSizeLimit = 10240
defer func() { kv.TxnTotalSizeLimit = originalTxnTotalSizeLimit }()

// support ClientLocalFiles capability
cli.runTestsOnNewDB(t, func(config *mysql.Config) {
config.AllowAllFiles = true
config.Params["sql_mode"] = "''"
}, "LoadData", func(dbt *testkit.DBTestKit) {
// If the MemBuffer can't be committed once in each batch, it will return an error like "transaction is too large".
dbt.MustExec("set global tidb_txn_total_size_limit = 10240")
morgo marked this conversation as resolved.
Show resolved Hide resolved
defer dbt.MustExec("set global tidb_txn_total_size_limit = default")
dbt.MustExec("set @@tidb_dml_batch_size = 3")
dbt.MustExec("create table test (a varchar(255), b varchar(255) default 'default value', c int not null auto_increment, primary key(c))")
dbt.MustExec("create view v1 as select 1")
Expand Down
12 changes: 12 additions & 0 deletions session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,8 @@ const (
version89 = 89
// version90 converts enable-batch-dml to a sysvar
version90 = 90
// version91 converts txn-total-size-limit and txn-entry-size-limit to sysvars
version91 = 91
)

// currentBootstrapVersion is defined as a variable, so we can modify its value for testing.
Expand Down Expand Up @@ -715,6 +717,7 @@ var (
upgradeToVer88,
upgradeToVer89,
upgradeToVer90,
upgradeToVer91,
}
)

Expand Down Expand Up @@ -1847,6 +1850,15 @@ func upgradeToVer90(s Session, ver int64) {
valStr := variable.BoolToOnOff(config.GetGlobalConfig().EnableBatchDML)
importConfigOption(s, "enable-batch-dml", variable.TiDBEnableBatchDML, valStr)
}
func upgradeToVer91(s Session, ver int64) {
if ver >= version91 {
return
}
valStr := strconv.FormatUint(uint64(config.GetGlobalConfig().Performance.TxnEntrySizeLimit), 10)
importConfigOption(s, "txn-entry-size-limit", variable.TiDBTxnEntrySizeLimit, valStr)
vStr := strconv.FormatUint(uint64(config.GetGlobalConfig().Performance.TxnTotalSizeLimit), 10)
importConfigOption(s, "txn-total-size-limit", variable.TiDBTxnTotalSizeLimit, vStr)
Alkaagr81 marked this conversation as resolved.
Show resolved Hide resolved
}

func writeOOMAction(s Session) {
comment := "oom-action is `log` by default in v3.0.x, `cancel` by default in v4.0.11+"
Expand Down
2 changes: 1 addition & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -874,7 +874,7 @@ func (s *session) doCommitWithRetry(ctx context.Context) error {
zap.String("txn", s.txn.GoString()))
// Transactions will retry 2 ~ commitRetryLimit times.
// We make larger transactions retry less times to prevent cluster resource outage.
txnSizeRate := float64(txnSize) / float64(kv.TxnTotalSizeLimit)
txnSizeRate := float64(txnSize) / float64(kv.TxnTotalSizeLimit.Load())
maxRetryCount := commitRetryLimit - int64(float64(commitRetryLimit-1)*txnSizeRate)
err = s.retry(ctx, uint(maxRetryCount))
} else if !errIsNoisy(err) {
Expand Down
12 changes: 12 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,18 @@ var defaultSysVars = []*SysVar{
}},

/* The system variables below have GLOBAL scope */
{Scope: ScopeGlobal, Name: TiDBTxnEntrySizeLimit, Value: strconv.FormatInt(DefTiDBTxnEntrySizeLimit, 10), Type: TypeInt, MinValue: 1 << 10, MaxValue: 120 << 20, GetGlobal: func(sv *SessionVars) (string, error) {
return strconv.FormatUint(kv.TxnEntrySizeLimit.Load(), 10), nil
}, SetGlobal: func(s *SessionVars, val string) error {
kv.TxnEntrySizeLimit.Store(uint64(TidbOptInt64(val, DefTiDBTxnEntrySizeLimit)))
return nil
}},
{Scope: ScopeGlobal, Name: TiDBTxnTotalSizeLimit, Value: strconv.FormatInt(DefTiDBTxnTotalSizeLimit, 10), Type: TypeInt, MinValue: 1 << 10, MaxValue: 1 << 40, GetGlobal: func(sv *SessionVars) (string, error) {
return strconv.FormatUint(kv.TxnTotalSizeLimit.Load(), 10), nil
}, SetGlobal: func(s *SessionVars, val string) error {
kv.TxnTotalSizeLimit.Store(uint64(TidbOptInt64(val, DefTiDBTxnTotalSizeLimit)))
return nil
}},
{Scope: ScopeGlobal, Name: MaxPreparedStmtCount, Value: strconv.FormatInt(DefMaxPreparedStmtCount, 10), Type: TypeInt, MinValue: -1, MaxValue: 1048576},
{Scope: ScopeGlobal, Name: InitConnect, Value: ""},
/* TiDB specific variables */
Expand Down
50 changes: 50 additions & 0 deletions sessionctx/variable/sysvar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -961,6 +961,56 @@ func TestTiDBQueryLogMaxLen(t *testing.T) {
require.NoError(t, err)
}

// TestTiDBTxnTotalSizeLimit checks validation of the
// tidb_txn_total_size_limit sysvar: in-range values are kept as-is and
// out-of-range values are clamped to the nearest bound.
func TestTiDBTxnTotalSizeLimit(t *testing.T) {
	sv := GetSysVar(TiDBTxnTotalSizeLimit)
	vars := NewSessionVars()

	// An in-range value is accepted unchanged.
	// Use uint64 explicitly: the max-bound test value (1<<40 + 1) does
	// not fit in a 32-bit int, so an untyped `int` would fail to
	// compile on 386/arm builds.
	newVal := uint64(10485760)
	val, err := sv.Validate(vars, fmt.Sprintf("%d", newVal), ScopeGlobal)
	require.Equal(t, "10485760", val)
	require.NoError(t, err)

	// A value above the maximum (1<<40) is truncated to the maximum.
	newVal = 1099511627777
	expected := uint64(1099511627776)
	val, err = sv.Validate(vars, fmt.Sprintf("%d", newVal), ScopeGlobal)
	require.Equal(t, fmt.Sprintf("%d", expected), val)
	require.NoError(t, err)

	// A value below the minimum (1<<10) is raised to the minimum.
	newVal = 1022
	expected = 1024
	val, err = sv.Validate(vars, fmt.Sprintf("%d", newVal), ScopeGlobal)
	require.Equal(t, fmt.Sprintf("%d", expected), val)
	require.NoError(t, err)
}
// TestTiDBTxnEntrySizeLimit exercises validation of the
// tidb_txn_entry_size_limit sysvar: in-range values pass through
// unchanged and out-of-range values are clamped to the nearest bound.
func TestTiDBTxnEntrySizeLimit(t *testing.T) {
	sv := GetSysVar(TiDBTxnEntrySizeLimit)
	vars := NewSessionVars()

	cases := []struct {
		input    int
		expected int
	}{
		{10485760, 10485760},   // in range: unchanged
		{125829121, 125829120}, // above max (120<<20): truncated to max
		{1022, 1024},           // below min (1<<10): raised to min
	}
	for _, c := range cases {
		val, err := sv.Validate(vars, fmt.Sprintf("%d", c.input), ScopeGlobal)
		require.Equal(t, fmt.Sprintf("%d", c.expected), val)
		require.NoError(t, err)
	}
}
func TestTiDBCommitterConcurrency(t *testing.T) {
sv := GetSysVar(TiDBCommitterConcurrency)
vars := NewSessionVars()
Expand Down
8 changes: 8 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,10 @@ const (
// TiDB vars that have only global scope

const (
// TiDBTxnEntrySizeLimit is the size limit of a single row of data in TiDB
TiDBTxnEntrySizeLimit = "tidb_txn_entry_size_limit"
// TiDBTxnTotalSizeLimit is the size limit of a single transaction
TiDBTxnTotalSizeLimit = "tidb_txn_total_size_limit"
// TiDBGCEnable turns garbage collection on or OFF
TiDBGCEnable = "tidb_gc_enable"
// TiDBGCRunInterval sets the interval that GC runs
Expand Down Expand Up @@ -857,6 +861,8 @@ const (
DefTiDBStatsCacheMemQuota = 0
MaxTiDBStatsCacheMemQuota = 1024 * 1024 * 1024 * 1024 // 1TB
DefTiDBQueryLogMaxLen = 4096
DefTiDBTxnTotalSizeLimit = 100 * 1024 * 1024
DefTiDBTxnEntrySizeLimit = 6 * 1024 * 1024
DefTiDBCommitterConcurrency = 128
DefTiDBBatchDMLIgnoreError = false
DefTiDBMemQuotaAnalyze = -1
Expand Down Expand Up @@ -897,6 +903,8 @@ var (
StatsLoadPseudoTimeout = atomic.NewBool(DefTiDBStatsLoadPseudoTimeout)
MemQuotaBindingCache = atomic.NewInt64(DefTiDBMemQuotaBindingCache)
GCMaxWaitTime = atomic.NewInt64(DefTiDBGCMaxWaitTime)
TxnTotalSizeLimit uint64 = DefTiDBTxnTotalSizeLimit
TxnEntrySizeLimit uint64 = DefTiDBTxnEntrySizeLimit
StatsCacheMemQuota = atomic.NewInt64(DefTiDBStatsCacheMemQuota)
)

Expand Down
5 changes: 2 additions & 3 deletions store/driver/txn/txn_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package txn

import (
"context"
"sync/atomic"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
Expand Down Expand Up @@ -48,8 +47,8 @@ type tikvTxn struct {
func NewTiKVTxn(txn *tikv.KVTxn) kv.Transaction {
txn.SetKVFilter(TiDBKVFilter{})

entryLimit := atomic.LoadUint64(&kv.TxnEntrySizeLimit)
totalLimit := atomic.LoadUint64(&kv.TxnTotalSizeLimit)
entryLimit := kv.TxnEntrySizeLimit.Load()
totalLimit := kv.TxnTotalSizeLimit.Load()
txn.GetUnionStore().SetEntrySizeLimit(entryLimit, totalLimit)

return &tikvTxn{txn, make(map[int64]*model.TableInfo), nil}
Expand Down
Loading