Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ttl: implement ttl job schedule framework #39472

Merged
merged 2 commits into from
Dec 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ddl/ttl.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func onTTLInfoChange(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err er

if ttlInfo != nil {
// if the TTL_ENABLE is not set explicitly, use the original value
if ttlInfoEnable == nil {
if ttlInfoEnable == nil && tblInfo.TTLInfo != nil {
ttlInfo.Enable = tblInfo.TTLInfo.Enable
}
tblInfo.TTLInfo = ttlInfo
Expand Down
1 change: 1 addition & 0 deletions domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ go_library(
"//statistics",
"//statistics/handle",
"//telemetry",
"//ttl/ttlworker",
"//types",
"//util",
"//util/chunk",
Expand Down
29 changes: 29 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/telemetry"
"github.com/pingcap/tidb/ttl/ttlworker"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/dbterror"
Expand Down Expand Up @@ -121,6 +122,7 @@ type Domain struct {
expiredTimeStamp4PC types.Time
logBackupAdvancer *daemon.OwnerDaemon
historicalStatsWorker *HistoricalStatsWorker
ttlJobManager *ttlworker.JobManager

serverID uint64
serverIDSession *concurrency.Session
Expand Down Expand Up @@ -1059,6 +1061,10 @@ func (do *Domain) Init(
return err
}

do.wg.Run(func() {
YangKeao marked this conversation as resolved.
Show resolved Hide resolved
do.runTTLJobManager(ctx)
})

return nil
}

Expand Down Expand Up @@ -2446,6 +2452,29 @@ func (do *Domain) serverIDKeeper() {
}
}

func (do *Domain) runTTLJobManager(ctx context.Context) {
ttlJobManager := ttlworker.NewJobManager(do.ddl.GetID(), do.sysSessionPool, do.store)
ttlJobManager.Start()
do.ttlJobManager = ttlJobManager

// TODO: read the worker count from `do.sysVarCache` and resize the workers
ttlworker.ScanWorkersCount.Store(4)
ttlworker.DeleteWorkerCount.Store(4)

<-do.exit

ttlJobManager.Stop()
err := ttlJobManager.WaitStopped(ctx, 30*time.Second)
if err != nil {
logutil.BgLogger().Warn("fail to wait until the ttl job manager stop", zap.Error(err))
}
}

// TTLJobManager returns the ttl job manager on this domain
func (do *Domain) TTLJobManager() *ttlworker.JobManager {
return do.ttlJobManager
}

func init() {
initByLDFlagsForGlobalKill()
telemetry.GetDomainInfoSchema = func(ctx sessionctx.Context) infoschema.InfoSchema {
Expand Down
1 change: 0 additions & 1 deletion ttl/cache/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ go_library(
importpath = "github.com/pingcap/tidb/ttl/cache",
visibility = ["//visibility:public"],
deps = [
"//infoschema",
"//kv",
"//parser/ast",
"//parser/model",
Expand Down
4 changes: 4 additions & 0 deletions ttl/cache/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,7 @@ func (bc *baseCache) ShouldUpdate() bool {
func (bc *baseCache) SetInterval(interval time.Duration) {
bc.interval = interval
}

func (bc *baseCache) GetInterval() time.Duration {
return bc.interval
}
20 changes: 5 additions & 15 deletions ttl/cache/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@ package cache
import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/ttl/session"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)
Expand All @@ -41,26 +39,18 @@ func NewInfoSchemaCache(updateInterval time.Duration) *InfoSchemaCache {
}

// Update updates the info schema cache
func (isc *InfoSchemaCache) Update(sctx sessionctx.Context) error {
is, ok := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
if !ok {
return errors.New("fail to get domain info schema from session")
}

ext, ok := is.(*infoschema.SessionExtendedInfoSchema)
if !ok {
return errors.New("fail to get extended info schema")
}
func (isc *InfoSchemaCache) Update(se session.Session) error {
is := se.SessionInfoSchema()

if isc.schemaVer == ext.SchemaMetaVersion() {
if isc.schemaVer == is.SchemaMetaVersion() {
return nil
}

newTables := make(map[int64]*PhysicalTable, len(isc.Tables))
for _, db := range is.AllSchemas() {
for _, tbl := range is.SchemaTables(db.Name) {
tblInfo := tbl.Meta()
if tblInfo.TTLInfo == nil || tblInfo.State != model.StatePublic {
if tblInfo.TTLInfo == nil || !tblInfo.TTLInfo.Enable || tblInfo.State != model.StatePublic {
continue
}

Expand Down
8 changes: 5 additions & 3 deletions ttl/cache/infoschema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tidb/server"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/ttl/cache"
"github.com/pingcap/tidb/ttl/session"
"github.com/stretchr/testify/assert"
)

Expand All @@ -36,18 +37,19 @@ func TestInfoSchemaCache(t *testing.T) {
conn := server.CreateMockConn(t, sv)
sctx := conn.Context().Session
tk := testkit.NewTestKitWithSession(t, store, sctx)
se := session.NewSession(sctx, sctx, func() {})

isc := cache.NewInfoSchemaCache(time.Hour)

// test should update
assert.True(t, isc.ShouldUpdate())
assert.NoError(t, isc.Update(sctx))
assert.NoError(t, isc.Update(se))
assert.False(t, isc.ShouldUpdate())

// test new tables are synced
assert.Equal(t, 0, len(isc.Tables))
tk.MustExec("create table test.t(created_at datetime) ttl = created_at + INTERVAL 5 YEAR")
assert.NoError(t, isc.Update(sctx))
assert.NoError(t, isc.Update(se))
assert.Equal(t, 1, len(isc.Tables))
for _, table := range isc.Tables {
assert.Equal(t, "t", table.TableInfo.Name.L)
Expand All @@ -62,7 +64,7 @@ func TestInfoSchemaCache(t *testing.T) {
partition p1 values less than (2000)
)
`)
assert.NoError(t, isc.Update(sctx))
assert.NoError(t, isc.Update(se))
assert.Equal(t, 2, len(isc.Tables))
partitions := []string{}
for id, table := range isc.Tables {
Expand Down
2 changes: 1 addition & 1 deletion ttl/cache/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (t *PhysicalTable) ValidateKey(key []types.Datum) error {

// EvalExpireTime returns the expired time
func (t *PhysicalTable) EvalExpireTime(ctx context.Context, se session.Session, now time.Time) (expire time.Time, err error) {
tz := se.GetSessionVars().TimeZone
tz := se.GetSessionVars().Location()

expireExpr := t.TTLInfo.IntervalExprStr
unit := ast.TimeUnitType(t.TTLInfo.IntervalTimeUnit)
Expand Down
17 changes: 12 additions & 5 deletions ttl/cache/ttlstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package cache

import (
"context"
"fmt"
"time"

"github.com/pingcap/tidb/sessionctx"
Expand All @@ -35,12 +36,17 @@ const (
JobStatusCancelling = "cancelling"
// JobStatusCancelled means this job has been canceled successfully
JobStatusCancelled = "cancelled"
// JobStatusError means this job is in error status
JobStatusError = "error"
// JobStatusTimeout means this job has timeout
JobStatusTimeout = "timeout"
)

const selectFromTTLTableStatus = "SELECT table_id,parent_table_id,table_statistics,last_job_id,last_job_start_time,last_job_finish_time,last_job_ttl_expire,last_job_summary,current_job_id,current_job_owner_id,current_job_owner_addr,current_job_owner_hb_time,current_job_start_time,current_job_ttl_expire,current_job_state,current_job_status,current_job_status_update_time FROM mysql.tidb_ttl_table_status"

// SelectFromTTLTableStatusWithID returns an SQL statement to get the table status from table id
func SelectFromTTLTableStatusWithID(tableID int64) string {
return selectFromTTLTableStatus + fmt.Sprintf(" WHERE table_id = %d", tableID)
}

// TableStatus contains the corresponding information in the system table `mysql.tidb_ttl_table_status`
type TableStatus struct {
TableID int64
Expand Down Expand Up @@ -89,7 +95,7 @@ func (tsc *TableStatusCache) Update(ctx context.Context, se session.Session) err

newTables := make(map[int64]*TableStatus, len(rows))
for _, row := range rows {
status, err := rowToTableStatus(se, row)
status, err := RowToTableStatus(se, row)
if err != nil {
return err
}
Expand All @@ -101,9 +107,10 @@ func (tsc *TableStatusCache) Update(ctx context.Context, se session.Session) err
return nil
}

func rowToTableStatus(sctx sessionctx.Context, row chunk.Row) (*TableStatus, error) {
// RowToTableStatus converts a row to table status
func RowToTableStatus(sctx sessionctx.Context, row chunk.Row) (*TableStatus, error) {
var err error
timeZone := sctx.GetSessionVars().TimeZone
timeZone := sctx.GetSessionVars().Location()

status := &TableStatus{
TableID: row.GetInt64(0),
Expand Down
8 changes: 8 additions & 0 deletions ttl/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package session

import (
"context"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/infoschema"
Expand All @@ -41,6 +42,8 @@ type Session interface {
ResetWithGlobalTimeZone(ctx context.Context) error
// Close closes the session
Close()
// Now returns the current time in location specified by session var
Now() time.Time
}

type session struct {
Expand Down Expand Up @@ -145,3 +148,8 @@ func (s *session) Close() {
s.closeFn = nil
}
}

// Now returns the current time in the location of time_zone session var
func (s *session) Now() time.Time {
return time.Now().In(s.Context.GetSessionVars().Location())
}
11 changes: 11 additions & 0 deletions ttl/ttlworker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,18 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "ttlworker",
srcs = [
"config.go",
"del.go",
"job.go",
"job_manager.go",
"scan.go",
"session.go",
"worker.go",
],
importpath = "github.com/pingcap/tidb/ttl/ttlworker",
visibility = ["//visibility:public"],
deps = [
"//kv",
"//parser/terror",
"//sessionctx",
"//sessionctx/variable",
Expand All @@ -22,9 +26,12 @@ go_library(
"//util/chunk",
"//util/logutil",
"//util/sqlexec",
"//util/timeutil",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@org_golang_x_time//rate",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_multierr//:multierr",
"@org_uber_go_zap//:zap",
],
)
Expand All @@ -33,10 +40,13 @@ go_test(
name = "ttlworker_test",
srcs = [
"del_test.go",
"job_manager_test.go",
"job_test.go",
"scan_test.go",
"session_test.go",
],
YangKeao marked this conversation as resolved.
Show resolved Hide resolved
embed = [":ttlworker"],
flaky = True,
deps = [
"//infoschema",
"//parser/ast",
Expand All @@ -49,6 +59,7 @@ go_test(
"//util/chunk",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_x_time//rate",
],
Expand Down
50 changes: 50 additions & 0 deletions ttl/ttlworker/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ttlworker

import (
"time"

"go.uber.org/atomic"
)

// TODO: the following functions should be put in the variable pkg to avoid cyclic dependency after adding variables for the TTL
// some of them are only used in test

const jobManagerLoopTickerInterval = 10 * time.Second

const updateInfoSchemaCacheInterval = time.Minute
const updateTTLTableStatusCacheInterval = 10 * time.Minute

const ttlInternalSQLTimeout = 30 * time.Second
const ttlJobTimeout = 6 * time.Hour

// TODO: add this variable to the sysvar
const ttlJobInterval = time.Hour
YangKeao marked this conversation as resolved.
Show resolved Hide resolved

// TODO: add these variables to the sysvar
var ttlJobScheduleWindowStartTime, _ = time.Parse(timeFormat, "2006-01-02 00:00:00")
var ttlJobScheduleWindowEndTime, _ = time.Parse(timeFormat, "2006-01-02 23:59:00")

// TODO: migrate these two count to sysvar

// ScanWorkersCount defines the count of scan worker
var ScanWorkersCount = atomic.NewUint64(0)

// DeleteWorkerCount defines the count of delete worker
var DeleteWorkerCount = atomic.NewUint64(0)

const resizeWorkersInterval = 30 * time.Second
const splitScanCount = 64
2 changes: 1 addition & 1 deletion ttl/ttlworker/del_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func TestTTLDeleteTaskDoDelete(t *testing.T) {

for _, c := range cases {
invokes = 0
retryRows := c.task.doDelete(context.TODO(), s)
retryRows := c.task.doDelete(context.Background(), s)
require.Equal(t, 4, invokes)
if c.retryRows == nil {
require.Nil(t, retryRows)
Expand Down
Loading