Skip to content

Commit c39331d

Browse files
authored
Merge branch 'release-6.5' into cherry-pick-39878-to-release-6.5
2 parents a57de99 + 4dfc339 commit c39331d

8 files changed

+154
-18
lines changed

ttl/cache/infoschema_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func TestInfoSchemaCache(t *testing.T) {
3434
conn := server.CreateMockConn(t, sv)
3535
sctx := conn.Context().Session
3636
tk := testkit.NewTestKitWithSession(t, store, sctx)
37-
se := session.NewSession(sctx, sctx, func() {})
37+
se := session.NewSession(sctx, sctx, func(_ session.Session) {})
3838

3939
isc := cache.NewInfoSchemaCache(time.Hour)
4040

ttl/cache/ttlstatus_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func TestTTLStatusCache(t *testing.T) {
3636
conn := server.CreateMockConn(t, sv)
3737
sctx := conn.Context().Session
3838
tk := testkit.NewTestKitWithSession(t, store, sctx)
39-
ttlSession := session.NewSession(sctx, tk.Session(), func() {})
39+
ttlSession := session.NewSession(sctx, tk.Session(), func(_ session.Session) {})
4040

4141
isc := cache.NewTableStatusCache(time.Hour)
4242

ttl/session/session.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,11 @@ type Session interface {
5050
type session struct {
5151
sessionctx.Context
5252
sqlExec sqlexec.SQLExecutor
53-
closeFn func()
53+
closeFn func(Session)
5454
}
5555

5656
// NewSession creates a new Session
57-
func NewSession(sctx sessionctx.Context, sqlExec sqlexec.SQLExecutor, closeFn func()) Session {
57+
func NewSession(sctx sessionctx.Context, sqlExec sqlexec.SQLExecutor, closeFn func(Session)) Session {
5858
return &session{
5959
Context: sctx,
6060
sqlExec: sqlExec,
@@ -99,7 +99,7 @@ func (s *session) RunInTxn(ctx context.Context, fn func() error) (err error) {
9999
defer tracer.EnterPhase(tracer.Phase())
100100

101101
tracer.EnterPhase(metrics.PhaseBeginTxn)
102-
if _, err = s.ExecuteSQL(ctx, "BEGIN"); err != nil {
102+
if _, err = s.ExecuteSQL(ctx, "BEGIN OPTIMISTIC"); err != nil {
103103
return err
104104
}
105105
tracer.EnterPhase(metrics.PhaseOther)
@@ -150,7 +150,7 @@ func (s *session) ResetWithGlobalTimeZone(ctx context.Context) error {
150150
// Close closes the session
151151
func (s *session) Close() {
152152
if s.closeFn != nil {
153-
s.closeFn()
153+
s.closeFn(s)
154154
s.Context = nil
155155
s.sqlExec = nil
156156
s.closeFn = nil

ttl/ttlworker/BUILD.bazel

+7
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ go_test(
4040
name = "ttlworker_test",
4141
srcs = [
4242
"del_test.go",
43+
"job_manager_integration_test.go",
4344
"job_manager_test.go",
4445
"job_test.go",
4546
"scan_test.go",
@@ -52,15 +53,21 @@ go_test(
5253
"//parser/ast",
5354
"//parser/model",
5455
"//parser/mysql",
56+
"//session",
5557
"//sessionctx",
5658
"//sessionctx/variable",
59+
"//testkit",
5760
"//ttl/cache",
61+
"//ttl/session",
5862
"//types",
5963
"//util/chunk",
64+
"//util/logutil",
6065
"@com_github_ngaut_pools//:pools",
6166
"@com_github_pingcap_errors//:errors",
6267
"@com_github_stretchr_testify//assert",
6368
"@com_github_stretchr_testify//require",
6469
"@org_golang_x_time//rate",
70+
"@org_uber_go_atomic//:atomic",
71+
"@org_uber_go_zap//:zap",
6572
],
6673
)

ttl/ttlworker/job_manager.go

+6-7
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import (
3232
)
3333

3434
const insertNewTableIntoStatusTemplate = "INSERT INTO mysql.tidb_ttl_table_status (table_id,parent_table_id) VALUES (%d, %d)"
35-
const setTableStatusOwnerTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_id = UUID(), current_job_owner_id = '%s',current_job_start_time = '%s',current_job_status = 'waiting',current_job_status_update_time = '%s',current_job_ttl_expire = '%s',current_job_owner_hb_time = '%s' WHERE (current_job_owner_id IS NULL OR current_job_owner_hb_time < '%s') AND table_id = %d"
35+
const setTableStatusOwnerTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_id = UUID(), current_job_owner_id = '%s',current_job_start_time = '%s',current_job_status = 'waiting',current_job_status_update_time = '%s',current_job_ttl_expire = '%s',current_job_owner_hb_time = '%s' WHERE table_id = %d"
3636
const updateHeartBeatTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_owner_hb_time = '%s' WHERE table_id = %d AND current_job_owner_id = '%s'"
3737

3838
const timeFormat = "2006-01-02 15:04:05"
@@ -41,8 +41,8 @@ func insertNewTableIntoStatusSQL(tableID int64, parentTableID int64) string {
4141
return fmt.Sprintf(insertNewTableIntoStatusTemplate, tableID, parentTableID)
4242
}
4343

44-
func setTableStatusOwnerSQL(tableID int64, now time.Time, currentJobTTLExpire time.Time, maxHBTime time.Time, id string) string {
45-
return fmt.Sprintf(setTableStatusOwnerTemplate, id, now.Format(timeFormat), now.Format(timeFormat), currentJobTTLExpire.Format(timeFormat), now.Format(timeFormat), maxHBTime.Format(timeFormat), tableID)
44+
func setTableStatusOwnerSQL(tableID int64, now time.Time, currentJobTTLExpire time.Time, id string) string {
45+
return fmt.Sprintf(setTableStatusOwnerTemplate, id, now.Format(timeFormat), now.Format(timeFormat), currentJobTTLExpire.Format(timeFormat), now.Format(timeFormat), tableID)
4646
}
4747

4848
func updateHeartBeatSQL(tableID int64, now time.Time, id string) string {
@@ -492,11 +492,10 @@ func (m *JobManager) couldTrySchedule(table *cache.TableStatus, now time.Time) b
492492
// localJob and return it.
493493
// It could be nil, nil, if the table query doesn't return error but the job has been locked by other instances.
494494
func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *cache.PhysicalTable, now time.Time) (*ttlJob, error) {
495-
maxHBTime := now.Add(-2 * jobManagerLoopTickerInterval)
496495
var expireTime time.Time
497496

498497
err := se.RunInTxn(ctx, func() error {
499-
rows, err := se.ExecuteSQL(ctx, cache.SelectFromTTLTableStatusWithID(table.TableInfo.ID))
498+
rows, err := se.ExecuteSQL(ctx, cache.SelectFromTTLTableStatusWithID(table.ID))
500499
if err != nil {
501500
return err
502501
}
@@ -506,7 +505,7 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
506505
if err != nil {
507506
return err
508507
}
509-
rows, err = se.ExecuteSQL(ctx, cache.SelectFromTTLTableStatusWithID(table.TableInfo.ID))
508+
rows, err = se.ExecuteSQL(ctx, cache.SelectFromTTLTableStatusWithID(table.ID))
510509
if err != nil {
511510
return err
512511
}
@@ -527,7 +526,7 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
527526
return err
528527
}
529528

530-
_, err = se.ExecuteSQL(ctx, setTableStatusOwnerSQL(table.ID, now, expireTime, maxHBTime, m.id))
529+
_, err = se.ExecuteSQL(ctx, setTableStatusOwnerSQL(table.ID, now, expireTime, m.id))
531530

532531
return err
533532
})
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
// Copyright 2022 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package ttlworker_test
16+
17+
import (
18+
"context"
19+
"fmt"
20+
"sync"
21+
"testing"
22+
"time"
23+
24+
"github.com/pingcap/tidb/parser/ast"
25+
"github.com/pingcap/tidb/parser/model"
26+
dbsession "github.com/pingcap/tidb/session"
27+
"github.com/pingcap/tidb/sessionctx/variable"
28+
"github.com/pingcap/tidb/testkit"
29+
"github.com/pingcap/tidb/ttl/cache"
30+
"github.com/pingcap/tidb/ttl/session"
31+
"github.com/pingcap/tidb/ttl/ttlworker"
32+
"github.com/pingcap/tidb/util/logutil"
33+
"github.com/stretchr/testify/require"
34+
"go.uber.org/atomic"
35+
"go.uber.org/zap"
36+
)
37+
38+
func TestParallelLockNewJob(t *testing.T) {
39+
store := testkit.CreateMockStore(t)
40+
41+
sessionFactory := func() session.Session {
42+
dbSession, err := dbsession.CreateSession4Test(store)
43+
require.NoError(t, err)
44+
se := session.NewSession(dbSession, dbSession, nil)
45+
46+
_, err = se.ExecuteSQL(context.Background(), "ROLLBACK")
47+
require.NoError(t, err)
48+
_, err = se.ExecuteSQL(context.Background(), "set tidb_retry_limit=0")
49+
require.NoError(t, err)
50+
51+
return se
52+
}
53+
54+
storedTTLJobRunInterval := variable.TTLJobRunInterval.Load()
55+
variable.TTLJobRunInterval.Store(0)
56+
defer func() {
57+
variable.TTLJobRunInterval.Store(storedTTLJobRunInterval)
58+
}()
59+
60+
testTable := &cache.PhysicalTable{ID: 2, TableInfo: &model.TableInfo{ID: 1, TTLInfo: &model.TTLInfo{IntervalExprStr: "1", IntervalTimeUnit: int(ast.TimeUnitDay)}}}
61+
// simply lock a new job
62+
m := ttlworker.NewJobManager("test-id", nil, store)
63+
se := sessionFactory()
64+
job, err := m.LockNewJob(context.Background(), se, testTable, time.Now())
65+
require.NoError(t, err)
66+
job.Finish(se, time.Now())
67+
68+
// lock one table in parallel, only one of them should lock successfully
69+
testTimes := 100
70+
concurrency := 5
71+
for i := 0; i < testTimes; i++ {
72+
successCounter := atomic.NewUint64(0)
73+
successJob := &ttlworker.TTLJob{}
74+
75+
wg := sync.WaitGroup{}
76+
for j := 0; j < concurrency; j++ {
77+
jobManagerID := fmt.Sprintf("test-ttl-manager-%d", j)
78+
wg.Add(1)
79+
go func() {
80+
m := ttlworker.NewJobManager(jobManagerID, nil, store)
81+
82+
se := sessionFactory()
83+
job, err := m.LockNewJob(context.Background(), se, testTable, time.Now())
84+
if err == nil {
85+
successCounter.Add(1)
86+
successJob = job
87+
} else {
88+
logutil.BgLogger().Error("lock new job with error", zap.Error(err))
89+
}
90+
wg.Done()
91+
}()
92+
}
93+
wg.Wait()
94+
95+
require.Equal(t, uint64(1), successCounter.Load())
96+
successJob.Finish(se, time.Now())
97+
}
98+
}

ttl/ttlworker/job_manager_test.go

+20-4
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/pingcap/tidb/parser/mysql"
2525
"github.com/pingcap/tidb/sessionctx/variable"
2626
"github.com/pingcap/tidb/ttl/cache"
27+
"github.com/pingcap/tidb/ttl/session"
2728
"github.com/pingcap/tidb/types"
2829
"github.com/pingcap/tidb/util/chunk"
2930
"github.com/stretchr/testify/assert"
@@ -139,6 +140,22 @@ func (m *JobManager) SetScanWorkers4Test(workers []worker) {
139140
m.scanWorkers = workers
140141
}
141142

143+
// TTLJob exports the ttlJob for test
144+
type TTLJob = ttlJob
145+
146+
// LockNewJob is an exported version of lockNewJob for test
147+
func (m *JobManager) LockNewJob(ctx context.Context, se session.Session, table *cache.PhysicalTable, now time.Time) (*TTLJob, error) {
148+
return m.lockNewJob(ctx, se, table, now)
149+
}
150+
151+
func (j *ttlJob) Finish(se session.Session, now time.Time) {
152+
j.finish(se, now)
153+
}
154+
155+
func (j *ttlJob) ID() string {
156+
return j.id
157+
}
158+
142159
func newMockTTLJob(tbl *cache.PhysicalTable, status cache.JobStatus) *ttlJob {
143160
statistics := &ttlStatistics{}
144161
return &ttlJob{tbl: tbl, ctx: context.Background(), statistics: statistics, status: status, tasks: []*ttlScanTask{{ctx: context.Background(), tbl: tbl, statistics: statistics}}}
@@ -195,7 +212,6 @@ func TestReadyForNewJobTables(t *testing.T) {
195212
func TestLockNewTable(t *testing.T) {
196213
now, err := time.Parse(timeFormat, "2022-12-05 17:13:05")
197214
assert.NoError(t, err)
198-
maxHBTime := now.Add(-2 * jobManagerLoopTickerInterval)
199215
expireTime := now
200216

201217
testPhysicalTable := &cache.PhysicalTable{ID: 1, TableInfo: &model.TableInfo{ID: 1, TTLInfo: &model.TTLInfo{ColumnName: model.NewCIStr("test"), IntervalExprStr: "5 Year"}}}
@@ -219,7 +235,7 @@ func TestLockNewTable(t *testing.T) {
219235
newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil,
220236
},
221237
{
222-
setTableStatusOwnerSQL(1, now, expireTime, maxHBTime, "test-id"),
238+
setTableStatusOwnerSQL(1, now, expireTime, "test-id"),
223239
nil, nil,
224240
},
225241
{
@@ -241,7 +257,7 @@ func TestLockNewTable(t *testing.T) {
241257
newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil,
242258
},
243259
{
244-
setTableStatusOwnerSQL(1, now, expireTime, maxHBTime, "test-id"),
260+
setTableStatusOwnerSQL(1, now, expireTime, "test-id"),
245261
nil, nil,
246262
},
247263
{
@@ -255,7 +271,7 @@ func TestLockNewTable(t *testing.T) {
255271
newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil,
256272
},
257273
{
258-
setTableStatusOwnerSQL(1, now, expireTime, maxHBTime, "test-id"),
274+
setTableStatusOwnerSQL(1, now, expireTime, "test-id"),
259275
nil, errors.New("test error message"),
260276
},
261277
}, false, true},

ttl/ttlworker/session.go

+17-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package ttlworker
1616

1717
import (
1818
"context"
19+
"fmt"
1920
"time"
2021

2122
"github.com/ngaut/pools"
@@ -26,7 +27,9 @@ import (
2627
"github.com/pingcap/tidb/ttl/metrics"
2728
"github.com/pingcap/tidb/ttl/session"
2829
"github.com/pingcap/tidb/util/chunk"
30+
"github.com/pingcap/tidb/util/logutil"
2931
"github.com/pingcap/tidb/util/sqlexec"
32+
"go.uber.org/zap"
3033
)
3134

3235
type sessionPool interface {
@@ -57,10 +60,23 @@ func getSession(pool sessionPool) (session.Session, error) {
5760
return nil, errors.Errorf("%T cannot be casted to sqlexec.SQLExecutor", sctx)
5861
}
5962

60-
se := session.NewSession(sctx, exec, func() {
63+
originalRetryLimit := sctx.GetSessionVars().RetryLimit
64+
se := session.NewSession(sctx, exec, func(se session.Session) {
65+
_, err = se.ExecuteSQL(context.Background(), fmt.Sprintf("set tidb_retry_limit=%d", originalRetryLimit))
66+
if err != nil {
67+
logutil.BgLogger().Error("fail to reset tidb_retry_limit", zap.Int64("originalRetryLimit", originalRetryLimit), zap.Error(err))
68+
}
69+
6170
pool.Put(resource)
6271
})
6372

73+
// store and set the retry limit to 0
74+
_, err = se.ExecuteSQL(context.Background(), "set tidb_retry_limit=0")
75+
if err != nil {
76+
se.Close()
77+
return nil, err
78+
}
79+
6480
// Force rollback the session to guarantee the session is not in any explicit transaction
6581
if _, err = se.ExecuteSQL(context.Background(), "ROLLBACK"); err != nil {
6682
se.Close()

0 commit comments

Comments
 (0)