Skip to content

Commit e245a93

Browse files
authored
ttl: fix ttl job manager will panic if the status cache doesn't contain table (#41069)
close #41067, close #41068
1 parent 1e54ee4 commit e245a93

File tree

2 files changed

+6
-7
lines changed

2 files changed

+6
-7
lines changed

ttl/ttlworker/job_manager.go

+5-6
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ func (m *JobManager) jobLoop() error {
153153

154154
scheduleTaskTicker := time.Tick(getTaskManagerLoopTickerInterval())
155155
updateTaskHeartBeatTicker := time.Tick(ttlTaskHeartBeatTickerInterval)
156-
taskCheckTicker := time.Tick(getTaskManagerLoopTickerInterval())
156+
taskCheckTicker := time.Tick(time.Second * 5)
157157
checkScanTaskFinishedTicker := time.Tick(getTaskManagerLoopTickerInterval())
158158

159159
cmdWatcher := m.cmdCli.WatchCommand(m.ctx)
@@ -535,6 +535,7 @@ func (m *JobManager) couldTrySchedule(tableStatus *cache.TableStatus, table *cac
535535
// It could be nil, nil, if the table query doesn't return error but the job has been locked by other instances.
536536
func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *cache.PhysicalTable, now time.Time, ignoreScheduleInterval bool) (*ttlJob, error) {
537537
var expireTime time.Time
538+
var jobID string
538539

539540
err := se.RunInTxn(ctx, func() error {
540541
sql, args := cache.SelectFromTTLTableStatusWithID(table.ID)
@@ -574,7 +575,7 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
574575
return err
575576
}
576577

577-
jobID := uuid.New().String()
578+
jobID = uuid.New().String()
578579
jobExist := false
579580
if len(tableStatus.CurrentJobID) > 0 {
580581
// don't create new job if there is already one running
@@ -629,7 +630,7 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
629630
return nil, err
630631
}
631632

632-
job := m.createNewJob(expireTime, now, table)
633+
job := m.createNewJob(jobID, expireTime, now, table)
633634

634635
// job is created, notify every scan managers to fetch new tasks
635636
err = m.notificationCli.Notify(m.ctx, scanTaskNotificationType, job.id)
@@ -639,9 +640,7 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
639640
return job, nil
640641
}
641642

642-
func (m *JobManager) createNewJob(expireTime time.Time, now time.Time, table *cache.PhysicalTable) *ttlJob {
643-
id := m.tableStatusCache.Tables[table.ID].CurrentJobID
644-
643+
func (m *JobManager) createNewJob(id string, expireTime time.Time, now time.Time, table *cache.PhysicalTable) *ttlJob {
645644
return &ttlJob{
646645
id: id,
647646
ownerID: m.id,

ttl/ttlworker/scan.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ func (t *ttlScanTask) doScan(ctx context.Context, delCh chan<- *ttlDeleteTask, s
166166
zap.String("SQL", sql),
167167
zap.Int("retryTimes", retryTimes),
168168
zap.Bool("needRetry", needRetry),
169-
zap.Error(err),
169+
zap.Error(sqlErr),
170170
)
171171

172172
if !needRetry {

0 commit comments

Comments
 (0)