Skip to content

Commit

Permalink
ttl: reschedule scan tasks after update task state (#39891)
Browse files Browse the repository at this point in the history
close #39890
  • Loading branch information
YangKeao committed Dec 20, 2022
1 parent 06290cd commit f9bccbd
Showing 1 changed file with 10 additions and 3 deletions.
13 changes: 10 additions & 3 deletions ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,13 @@ func (m *JobManager) jobLoop() error {
}
cancel()
case <-updateScanTaskStateTicker:
m.updateTaskState()
if m.updateTaskState() {
m.rescheduleJobs(se, now)
}
case <-m.notifyStateCh:
m.updateTaskState()
if m.updateTaskState() {
m.rescheduleJobs(se, now)
}
case <-jobCheckTicker:
m.checkFinishedJob(se, now)
m.checkNotOwnJob()
Expand Down Expand Up @@ -212,7 +216,8 @@ func (m *JobManager) resizeWorkers(workers []worker, count int, factory func() w
return workers, nil
}

func (m *JobManager) updateTaskState() {
// updateTaskState polls the result from scan worker and returns whether there are result polled
func (m *JobManager) updateTaskState() bool {
results := m.pollScanWorkerResults()
for _, result := range results {
job := findJobWithTableID(m.runningJobs, result.task.tbl.ID)
Expand All @@ -223,6 +228,8 @@ func (m *JobManager) updateTaskState() {
job.scanTaskErr = multierr.Append(job.scanTaskErr, result.err)
}
}

return len(results) > 0
}

func (m *JobManager) pollScanWorkerResults() []*ttlScanTaskExecResult {
Expand Down

0 comments on commit f9bccbd

Please sign in to comment.