@@ -18,7 +18,9 @@ import (
18
18
"context"
19
19
"time"
20
20
21
+ "github.com/google/uuid"
21
22
"github.com/pingcap/errors"
23
+ "github.com/pingcap/failpoint"
22
24
"github.com/pingcap/tidb/kv"
23
25
"github.com/pingcap/tidb/sessionctx/variable"
24
26
"github.com/pingcap/tidb/ttl/cache"
@@ -32,7 +34,7 @@ import (
32
34
33
35
const insertNewTableIntoStatusTemplate = "INSERT INTO mysql.tidb_ttl_table_status (table_id,parent_table_id) VALUES (%?, %?)"
34
36
const setTableStatusOwnerTemplate = `UPDATE mysql.tidb_ttl_table_status
35
- SET current_job_id = UUID() ,
37
+ SET current_job_id = %? ,
36
38
current_job_owner_id = %?,
37
39
current_job_start_time = %?,
38
40
current_job_status = 'waiting',
@@ -48,8 +50,8 @@ func insertNewTableIntoStatusSQL(tableID int64, parentTableID int64) (string, []
48
50
return insertNewTableIntoStatusTemplate , []interface {}{tableID , parentTableID }
49
51
}
50
52
51
- func setTableStatusOwnerSQL (tableID int64 , now time.Time , currentJobTTLExpire time.Time , id string ) (string , []interface {}) {
52
- return setTableStatusOwnerTemplate , []interface {}{id , now .Format (timeFormat ), now .Format (timeFormat ), currentJobTTLExpire .Format (timeFormat ), now .Format (timeFormat ), tableID }
53
+ func setTableStatusOwnerSQL (jobID string , tableID int64 , now time.Time , currentJobTTLExpire time.Time , id string ) (string , []interface {}) {
54
+ return setTableStatusOwnerTemplate , []interface {}{jobID , id , now .Format (timeFormat ), now .Format (timeFormat ), currentJobTTLExpire .Format (timeFormat ), now .Format (timeFormat ), tableID }
53
55
}
54
56
55
57
func updateHeartBeatSQL (tableID int64 , now time.Time , id string ) (string , []interface {}) {
@@ -508,6 +510,7 @@ func (m *JobManager) couldTrySchedule(table *cache.TableStatus, now time.Time) b
508
510
// It could be nil, nil, if the table query doesn't return error but the job has been locked by other instances.
509
511
func (m * JobManager ) lockNewJob (ctx context.Context , se session.Session , table * cache.PhysicalTable , now time.Time ) (* ttlJob , error ) {
510
512
var expireTime time.Time
513
+ var jobID string
511
514
512
515
err := se .RunInTxn (ctx , func () error {
513
516
sql , args := cache .SelectFromTTLTableStatusWithID (table .ID )
@@ -544,7 +547,12 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
544
547
return err
545
548
}
546
549
547
- sql , args = setTableStatusOwnerSQL (table .ID , now , expireTime , m .id )
550
+ jobID = uuid .New ().String ()
551
+ failpoint .Inject ("set-job-uuid" , func (val failpoint.Value ) {
552
+ jobID = val .(string )
553
+ })
554
+
555
+ sql , args = setTableStatusOwnerSQL (jobID , table .ID , now , expireTime , m .id )
548
556
_ , err = se .ExecuteSQL (ctx , sql , args ... )
549
557
return errors .Wrapf (err , "execute sql: %s" , sql )
550
558
})
@@ -561,12 +569,10 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
561
569
if err != nil {
562
570
return nil , err
563
571
}
564
- return m .createNewJob (expireTime , now , table )
572
+ return m .createNewJob (jobID , expireTime , now , table )
565
573
}
566
574
567
- func (m * JobManager ) createNewJob (expireTime time.Time , now time.Time , table * cache.PhysicalTable ) (* ttlJob , error ) {
568
- id := m .tableStatusCache .Tables [table .ID ].CurrentJobID
569
-
575
+ func (m * JobManager ) createNewJob (id string , expireTime time.Time , now time.Time , table * cache.PhysicalTable ) (* ttlJob , error ) {
570
576
statistics := & ttlStatistics {}
571
577
572
578
ranges , err := table .SplitScanRanges (m .ctx , m .store , splitScanCount )
0 commit comments