Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ttl: Implement scan and delete task for TTL #39481

Merged
merged 32 commits into from
Dec 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
4f04c73
ttl: implement scan task
lcwangchao Nov 29, 2022
c646f59
update
lcwangchao Nov 29, 2022
784eb2e
update
lcwangchao Nov 29, 2022
b2abd9d
update
lcwangchao Nov 30, 2022
4de83eb
update
lcwangchao Nov 30, 2022
4141cff
Merge branch 'master' into ttl_scan
lcwangchao Dec 1, 2022
f31dbd5
update
lcwangchao Dec 1, 2022
d264384
update
lcwangchao Dec 1, 2022
5cffd05
update
lcwangchao Dec 1, 2022
32d669b
ttl: implement deleteWorker
lcwangchao Dec 1, 2022
8c70348
Merge branch 'master' into ttl_scan
lcwangchao Dec 1, 2022
ce17b01
update
lcwangchao Dec 1, 2022
dcbb731
update
lcwangchao Dec 1, 2022
c22e38a
update
lcwangchao Dec 1, 2022
8866d9b
remove blank line
lcwangchao Dec 1, 2022
a92cad3
update
lcwangchao Dec 2, 2022
8e3a9ad
Merge branch 'master' into ttl_scan
lcwangchao Dec 2, 2022
ea62729
update bazel
lcwangchao Dec 2, 2022
108a3fe
update
lcwangchao Dec 2, 2022
f22f506
update
lcwangchao Dec 2, 2022
bb7771f
update
lcwangchao Dec 2, 2022
e58f5b0
Merge branch 'master' into ttl_scan
lcwangchao Dec 2, 2022
4ec1626
update
lcwangchao Dec 2, 2022
a762ceb
Merge branch 'ttl_scan' of github.com:lcwangchao/tidb into ttl_scan
lcwangchao Dec 2, 2022
56027a6
Merge branch 'master' into ttl_scan
lcwangchao Dec 2, 2022
1cfff23
update
lcwangchao Dec 2, 2022
d5199e6
update
lcwangchao Dec 2, 2022
132cc14
update
lcwangchao Dec 5, 2022
03b8b5f
Merge branch 'master' into ttl_scan
lcwangchao Dec 5, 2022
5a37bb9
Merge branch 'master' into ttl_scan
ti-chi-bot Dec 5, 2022
d8b77d5
Merge branch 'master' into ttl_scan
ti-chi-bot Dec 5, 2022
bd795fb
Merge branch 'master' into ttl_scan
ti-chi-bot Dec 5, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -2127,6 +2127,45 @@ var defaultSysVars = []*SysVar{
s.EnableReuseCheck = TiDBOptOn(val)
return nil
}},
{Scope: ScopeGlobal, Name: TiDBTTLJobEnable, Value: BoolToOnOff(DefTiDBTTLJobEnable), Type: TypeBool, SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error {
EnableTTLJob.Store(TiDBOptOn(s))
return nil
}, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) {
return BoolToOnOff(EnableTTLJob.Load()), nil
}},
{Scope: ScopeGlobal, Name: TiDBTTLScanBatchSize, Value: strconv.Itoa(DefTiDBTTLScanBatchSize), Type: TypeInt, MinValue: DefTiDBTTLScanBatchMinSize, MaxValue: DefTiDBTTLScanBatchMaxSize, SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error {
val, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return err
}
TTLScanBatchSize.Store(val)
return nil
}, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) {
val := TTLScanBatchSize.Load()
return strconv.FormatInt(val, 10), nil
}},
{Scope: ScopeGlobal, Name: TiDBTTLDeleteBatchSize, Value: strconv.Itoa(DefTiDBTTLDeleteBatchSize), Type: TypeInt, MinValue: DefTiDBTTLDeleteBatchMinSize, MaxValue: DefTiDBTTLDeleteBatchMaxSize, SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error {
val, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return err
}
TTLDeleteBatchSize.Store(val)
return nil
}, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) {
val := TTLDeleteBatchSize.Load()
return strconv.FormatInt(val, 10), nil
}},
{Scope: ScopeGlobal, Name: TiDBTTLDeleteRateLimit, Value: strconv.Itoa(DefTiDBTTLDeleteRateLimit), Type: TypeInt, MinValue: 0, MaxValue: math.MaxInt64, SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error {
val, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return err
}
TTLDeleteRateLimit.Store(val)
return nil
}, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) {
val := TTLDeleteRateLimit.Load()
return strconv.FormatInt(val, 10), nil
}},
{
Scope: ScopeGlobal | ScopeSession, Name: TiDBStoreBatchSize, Value: strconv.FormatInt(DefTiDBStoreBatchSize, 10),
Type: TypeInt, MinValue: 0, MaxValue: 25000, SetSession: func(s *SessionVars, val string) error {
Expand Down
20 changes: 20 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,14 @@ const (
TiDBGOGCTunerThreshold = "tidb_gogc_tuner_threshold"
// TiDBExternalTS is the ts to read through when the `TiDBEnableExternalTsRead` is on
TiDBExternalTS = "tidb_external_ts"
// TiDBTTLJobEnable is used to enable/disable scheduling ttl job
TiDBTTLJobEnable = "tidb_ttl_job_enable"
// TiDBTTLScanBatchSize is used to control the batch size in the SELECT statement for TTL jobs
TiDBTTLScanBatchSize = "tidb_ttl_scan_batch_size"
// TiDBTTLDeleteBatchSize is used to control the batch size in the DELETE statement for TTL jobs
TiDBTTLDeleteBatchSize = "tidb_ttl_delete_batch_size"
// TiDBTTLDeleteRateLimit is used to control the delete rate limit for TTL jobs in each node
TiDBTTLDeleteRateLimit = "tidb_ttl_delete_rate_limit"
// PasswordReuseHistory limit a few passwords to reuse.
PasswordReuseHistory = "password_history"
// PasswordReuseTime limit how long passwords can be reused.
Expand Down Expand Up @@ -1115,6 +1123,14 @@ const (
DefTiDBUseAlloc = false
DefTiDBEnablePlanReplayerCapture = false
DefTiDBIndexMergeIntersectionConcurrency = ConcurrencyUnset
DefTiDBTTLJobEnable = true
DefTiDBTTLScanBatchSize = 500
DefTiDBTTLScanBatchMaxSize = 10240
DefTiDBTTLScanBatchMinSize = 1
DefTiDBTTLDeleteBatchSize = 500
DefTiDBTTLDeleteBatchMaxSize = 10240
DefTiDBTTLDeleteBatchMinSize = 1
DefTiDBTTLDeleteRateLimit = 0
DefPasswordReuseHistory = 0
DefPasswordReuseTime = 0
DefTiDBStoreBatchSize = 0
Expand Down Expand Up @@ -1180,6 +1196,10 @@ var (
PasswordValidationMixedCaseCount = atomic.NewInt32(1)
PasswordValidtaionNumberCount = atomic.NewInt32(1)
PasswordValidationSpecialCharCount = atomic.NewInt32(1)
EnableTTLJob = atomic.NewBool(DefTiDBTTLJobEnable)
TTLScanBatchSize = atomic.NewInt64(DefTiDBTTLScanBatchSize)
TTLDeleteBatchSize = atomic.NewInt64(DefTiDBTTLDeleteBatchSize)
TTLDeleteRateLimit = atomic.NewInt64(DefTiDBTTLDeleteRateLimit)
PasswordHistory = atomic.NewInt64(DefPasswordReuseHistory)
PasswordReuseInterval = atomic.NewInt64(DefPasswordReuseTime)
IsSandBoxModeEnabled = atomic.NewBool(false)
Expand Down
7 changes: 7 additions & 0 deletions ttl/cache/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ func getTableKeyColumns(tbl *model.TableInfo) ([]*model.ColumnInfo, []*types.Fie

// PhysicalTable is used to provide some information for a physical table in TTL job
type PhysicalTable struct {
// ID is the physical ID of the table
ID int64
// Schema is the database name of the table
Schema model.CIStr
*model.TableInfo
Expand Down Expand Up @@ -92,11 +94,13 @@ func NewPhysicalTable(schema model.CIStr, tbl *model.TableInfo, partition model.
return nil, err
}

var physicalID int64
var partitionDef *model.PartitionDefinition
if tbl.Partition == nil {
if partition.L != "" {
return nil, errors.Errorf("table '%s.%s' is not a partitioned table", schema, tbl.Name)
}
physicalID = tbl.ID
} else {
if partition.L == "" {
return nil, errors.Errorf("partition name is required, table '%s.%s' is a partitioned table", schema, tbl.Name)
Expand All @@ -112,9 +116,12 @@ func NewPhysicalTable(schema model.CIStr, tbl *model.TableInfo, partition model.
if partitionDef == nil {
return nil, errors.Errorf("partition '%s' is not found in ttl table '%s.%s'", partition.O, schema, tbl.Name)
}

physicalID = partitionDef.ID
}

return &PhysicalTable{
ID: physicalID,
Schema: schema,
TableInfo: tbl,
Partition: partition,
Expand Down
4 changes: 3 additions & 1 deletion ttl/cache/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestNewTTLTable(t *testing.T) {
physicalTbls = append(physicalTbls, ttlTbl)
} else {
for _, partition := range tblInfo.Partition.Definitions {
ttlTbl, err := cache.NewPhysicalTable(model.NewCIStr(c.db), tblInfo, model.NewCIStr(partition.Name.O))
ttlTbl, err := cache.NewPhysicalTable(model.NewCIStr(c.db), tblInfo, partition.Name)
if c.timeCol == "" {
require.Error(t, err)
continue
Expand All @@ -131,10 +131,12 @@ func TestNewTTLTable(t *testing.T) {
require.Same(t, timeColumn, ttlTbl.TimeColumn)

if tblInfo.Partition == nil {
require.Equal(t, ttlTbl.TableInfo.ID, ttlTbl.ID)
require.Equal(t, "", ttlTbl.Partition.L)
require.Nil(t, ttlTbl.PartitionDef)
} else {
def := tblInfo.Partition.Definitions[i]
require.Equal(t, def.ID, ttlTbl.ID)
require.Equal(t, def.Name.L, ttlTbl.Partition.L)
require.Equal(t, def, *(ttlTbl.PartitionDef))
}
Expand Down
5 changes: 4 additions & 1 deletion ttl/session/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"//kv",
"//parser/terror",
"//sessionctx",
"//sessionctx/variable",
"//sessiontxn",
"//util/chunk",
"//util/sqlexec",
Expand All @@ -22,10 +23,12 @@ go_test(
srcs = [
"main_test.go",
"session_test.go",
"sysvar_test.go",
],
embed = [":session"],
flaky = True,
deps = [
":session",
"//sessionctx/variable",
"//testkit",
"//testkit/testsetup",
"@com_github_pingcap_errors//:errors",
Expand Down
24 changes: 24 additions & 0 deletions ttl/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/sqlexec"
Expand All @@ -36,6 +37,8 @@ type Session interface {
ExecuteSQL(ctx context.Context, sql string, args ...interface{}) ([]chunk.Row, error)
// RunInTxn executes the specified function in a txn
RunInTxn(ctx context.Context, fn func() error) (err error)
// ResetWithGlobalTimeZone resets the session time zone to global time zone
ResetWithGlobalTimeZone(ctx context.Context) error
// Close closes the session
Close()
}
Expand Down Expand Up @@ -112,6 +115,27 @@ func (s *session) RunInTxn(ctx context.Context, fn func() error) (err error) {
return err
}

// ResetWithGlobalTimeZone resets the session time zone to global time zone
func (s *session) ResetWithGlobalTimeZone(ctx context.Context) error {
sessVar := s.GetSessionVars()
globalTZ, err := sessVar.GetGlobalSystemVar(ctx, variable.TimeZone)
if err != nil {
return err
}

tz, err := sessVar.GetSessionOrGlobalSystemVar(ctx, variable.TimeZone)
if err != nil {
return err
}

if globalTZ == tz {
return nil
}

_, err = s.ExecuteSQL(ctx, "SET @@time_zone=@@global.time_zone")
return err
}

// Close closes the session
func (s *session) Close() {
if s.closeFn != nil {
Expand Down
17 changes: 15 additions & 2 deletions ttl/session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package session
package session_test

import (
"context"
"testing"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/ttl/session"
"github.com/stretchr/testify/require"
)

Expand All @@ -28,7 +29,7 @@ func TestSessionRunInTxn(t *testing.T) {
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(id int primary key, v int)")
se := NewSession(tk.Session(), tk.Session(), nil)
se := session.NewSession(tk.Session(), tk.Session(), nil)
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")

Expand All @@ -50,3 +51,15 @@ func TestSessionRunInTxn(t *testing.T) {
}))
tk2.MustQuery("select * from t order by id asc").Check(testkit.Rows("1 10", "3 30"))
}

func TestSessionResetTimeZone(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@global.time_zone='UTC'")
tk.MustExec("set @@time_zone='Asia/Shanghai'")

se := session.NewSession(tk.Session(), tk.Session(), nil)
tk.MustQuery("select @@time_zone").Check(testkit.Rows("Asia/Shanghai"))
require.NoError(t, se.ResetWithGlobalTimeZone(context.TODO()))
tk.MustQuery("select @@time_zone").Check(testkit.Rows("UTC"))
}
125 changes: 125 additions & 0 deletions ttl/session/sysvar_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package session_test

import (
"fmt"
"strconv"
"testing"

"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/require"
)

func TestSysVarTTLJobEnable(t *testing.T) {
origEnableDDL := variable.EnableTTLJob.Load()
defer func() {
variable.EnableTTLJob.Store(origEnableDDL)
}()

store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@global.tidb_ttl_job_enable=0")
require.False(t, variable.EnableTTLJob.Load())
tk.MustQuery("select @@global.tidb_ttl_job_enable").Check(testkit.Rows("0"))
tk.MustQuery("select @@tidb_ttl_job_enable").Check(testkit.Rows("0"))

tk.MustExec("set @@global.tidb_ttl_job_enable=1")
require.True(t, variable.EnableTTLJob.Load())
tk.MustQuery("select @@global.tidb_ttl_job_enable").Check(testkit.Rows("1"))
tk.MustQuery("select @@tidb_ttl_job_enable").Check(testkit.Rows("1"))

tk.MustExec("set @@global.tidb_ttl_job_enable=0")
require.False(t, variable.EnableTTLJob.Load())
tk.MustQuery("select @@global.tidb_ttl_job_enable").Check(testkit.Rows("0"))
tk.MustQuery("select @@tidb_ttl_job_enable").Check(testkit.Rows("0"))
}

func TestSysVarTTLScanBatchSize(t *testing.T) {
origScanBatchSize := variable.TTLScanBatchSize.Load()
defer func() {
variable.TTLScanBatchSize.Store(origScanBatchSize)
}()

store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@global.tidb_ttl_scan_batch_size=789")
require.Equal(t, int64(789), variable.TTLScanBatchSize.Load())
tk.MustQuery("select @@global.tidb_ttl_scan_batch_size").Check(testkit.Rows("789"))
tk.MustQuery("select @@tidb_ttl_scan_batch_size").Check(testkit.Rows("789"))

tk.MustExec("set @@global.tidb_ttl_scan_batch_size=0")
require.Equal(t, int64(1), variable.TTLScanBatchSize.Load())
tk.MustQuery("select @@global.tidb_ttl_scan_batch_size").Check(testkit.Rows("1"))
tk.MustQuery("select @@tidb_ttl_scan_batch_size").Check(testkit.Rows("1"))

maxVal := int64(variable.DefTiDBTTLScanBatchMaxSize)
tk.MustExec(fmt.Sprintf("set @@global.tidb_ttl_scan_batch_size=%d", maxVal+1))
require.Equal(t, maxVal, variable.TTLScanBatchSize.Load())
tk.MustQuery("select @@global.tidb_ttl_scan_batch_size").Check(testkit.Rows(strconv.FormatInt(maxVal, 10)))
tk.MustQuery("select @@tidb_ttl_scan_batch_size").Check(testkit.Rows(strconv.FormatInt(maxVal, 10)))
}

func TestSysVarTTLScanDeleteBatchSize(t *testing.T) {
origScanBatchSize := variable.TTLScanBatchSize.Load()
defer func() {
variable.TTLScanBatchSize.Store(origScanBatchSize)
}()

store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@global.tidb_ttl_delete_batch_size=789")
require.Equal(t, int64(789), variable.TTLDeleteBatchSize.Load())
tk.MustQuery("select @@global.tidb_ttl_delete_batch_size").Check(testkit.Rows("789"))
tk.MustQuery("select @@tidb_ttl_delete_batch_size").Check(testkit.Rows("789"))

tk.MustExec("set @@global.tidb_ttl_delete_batch_size=0")
require.Equal(t, int64(1), variable.TTLDeleteBatchSize.Load())
tk.MustQuery("select @@global.tidb_ttl_delete_batch_size").Check(testkit.Rows("1"))
tk.MustQuery("select @@tidb_ttl_delete_batch_size").Check(testkit.Rows("1"))

maxVal := int64(variable.DefTiDBTTLDeleteBatchMaxSize)
tk.MustExec(fmt.Sprintf("set @@global.tidb_ttl_delete_batch_size=%d", maxVal+1))
require.Equal(t, maxVal, variable.TTLDeleteBatchSize.Load())
tk.MustQuery("select @@global.tidb_ttl_delete_batch_size").Check(testkit.Rows(strconv.FormatInt(maxVal, 10)))
tk.MustQuery("select @@tidb_ttl_delete_batch_size").Check(testkit.Rows(strconv.FormatInt(maxVal, 10)))
}

func TestSysVarTTLScanDeleteLimit(t *testing.T) {
origDeleteLimit := variable.TTLDeleteRateLimit.Load()
defer func() {
variable.TTLDeleteRateLimit.Store(origDeleteLimit)
}()

store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustQuery("select @@global.tidb_ttl_delete_rate_limit").Check(testkit.Rows("0"))

tk.MustExec("set @@global.tidb_ttl_delete_rate_limit=100000")
require.Equal(t, int64(100000), variable.TTLDeleteRateLimit.Load())
tk.MustQuery("select @@global.tidb_ttl_delete_rate_limit").Check(testkit.Rows("100000"))
tk.MustQuery("select @@tidb_ttl_delete_rate_limit").Check(testkit.Rows("100000"))

tk.MustExec("set @@global.tidb_ttl_delete_rate_limit=0")
require.Equal(t, int64(0), variable.TTLDeleteRateLimit.Load())
tk.MustQuery("select @@global.tidb_ttl_delete_rate_limit").Check(testkit.Rows("0"))
tk.MustQuery("select @@tidb_ttl_delete_rate_limit").Check(testkit.Rows("0"))

tk.MustExec("set @@global.tidb_ttl_delete_rate_limit=-1")
require.Equal(t, int64(0), variable.TTLDeleteRateLimit.Load())
tk.MustQuery("select @@global.tidb_ttl_delete_rate_limit").Check(testkit.Rows("0"))
tk.MustQuery("select @@tidb_ttl_delete_rate_limit").Check(testkit.Rows("0"))
}
Loading