From e287edfdcd6011808bff6999b5518264ec0dabb9 Mon Sep 17 00:00:00 2001 From: John Roesler Date: Mon, 9 Oct 2023 16:14:00 -0500 Subject: [PATCH] fix concurrent access to jobs map (#589) * fix concurrent access to jobs map * replace all internal uses of RemoveByReference --- scheduler.go | 105 ++++++++++++++++++++++------------------------ scheduler_test.go | 60 ++++++++++++++++---------- 2 files changed, 90 insertions(+), 75 deletions(-) diff --git a/scheduler.go b/scheduler.go index e9e6507d..72f5c0c3 100644 --- a/scheduler.go +++ b/scheduler.go @@ -104,11 +104,13 @@ func (s *Scheduler) StartAsync() { func (s *Scheduler) start() { s.executor.start() s.setRunning(true) - s.runJobs(s.jobsMap()) + s.runJobs() } -func (s *Scheduler) runJobs(jobs map[uuid.UUID]*Job) { - for _, job := range jobs { +func (s *Scheduler) runJobs() { + s.jobsMutex.RLock() + defer s.jobsMutex.RUnlock() + for _, job := range s.jobs { ctx, cancel := context.WithCancel(context.Background()) job.mu.Lock() job.ctx = ctx @@ -151,12 +153,6 @@ func (s *Scheduler) JobsMap() map[uuid.UUID]*Job { return jobs } -func (s *Scheduler) jobsMap() map[uuid.UUID]*Job { - s.jobsMutex.RLock() - defer s.jobsMutex.RUnlock() - return s.jobs -} - // Name sets the name of the current job. // // If the scheduler is running using WithDistributedLocker(), the job name is used @@ -167,12 +163,6 @@ func (s *Scheduler) Name(name string) *Scheduler { return s } -func (s *Scheduler) setJobs(jobs map[uuid.UUID]*Job) { - s.jobsMutex.Lock() - defer s.jobsMutex.Unlock() - s.jobs = jobs -} - // Len returns the number of Jobs in the Scheduler func (s *Scheduler) Len() int { s.jobsMutex.RLock() @@ -231,7 +221,7 @@ func (s *Scheduler) scheduleNextRun(job *Job) (bool, nextRun) { } if !job.shouldRun() { - s.RemoveByReference(job) + _ = s.RemoveByID(job) return false, nextRun{} } @@ -510,13 +500,15 @@ func (s *Scheduler) roundToMidnightAndAddDSTAware(t time.Time, d time.Duration) // NextRun datetime when the next Job should run. func (s *Scheduler) NextRun() (*Job, time.Time) { - if len(s.jobsMap()) <= 0 { + s.jobsMutex.RLock() + defer s.jobsMutex.RUnlock() + if len(s.jobs) <= 0 { return nil, time.Time{} } var jobID uuid.UUID var nearestRun time.Time - for _, job := range s.jobsMap() { + for _, job := range s.jobs { nr := job.NextRun() if (nr.Before(nearestRun) || nearestRun.IsZero()) && s.now().Before(nr) { nearestRun = nr @@ -524,7 +516,7 @@ func (s *Scheduler) NextRun() (*Job, time.Time) { } } - return s.jobsMap()[jobID], nearestRun + return s.jobs[jobID], nearestRun } // EveryRandom schedules a new period Job that runs at random intervals @@ -650,7 +642,9 @@ func (s *Scheduler) RunAll() { // RunAllWithDelay runs all Jobs with the provided delay in between each Job func (s *Scheduler) RunAllWithDelay(d time.Duration) { - for _, job := range s.jobsMap() { + s.jobsMutex.RLock() + defer s.jobsMutex.RUnlock() + for _, job := range s.jobs { s.run(job) s.time.Sleep(d) } @@ -698,16 +692,13 @@ func (s *Scheduler) Remove(job interface{}) { // RemoveByReference removes specific Job by reference func (s *Scheduler) RemoveByReference(job *Job) { - s.removeJobsUniqueTags(job) - s.removeByCondition(func(someJob *Job) bool { - job.mu.RLock() - defer job.mu.RUnlock() - return someJob == job - }) + _ = s.RemoveByID(job) } func (s *Scheduler) findJobByTaskName(name string) *Job { - for _, job := range s.jobsMap() { + s.jobsMutex.RLock() + defer s.jobsMutex.RUnlock() + for _, job := range s.jobs { if job.funcName == name { return job } @@ -727,15 +718,14 @@ func (s *Scheduler) removeJobsUniqueTags(job *Job) { } func (s *Scheduler) removeByCondition(shouldRemove func(*Job) bool) { - retainedJobs := make(map[uuid.UUID]*Job, 0) - for _, job := range s.jobsMap() { - if !shouldRemove(job) { - retainedJobs[job.id] = job - } else { + s.jobsMutex.Lock() + defer s.jobsMutex.Unlock() + for _, job := range s.jobs { + if shouldRemove(job) { s.stopJob(job) + delete(s.jobs, job.id) } } - s.setJobs(retainedJobs) } func (s *Scheduler) stopJob(job *Job) { @@ -760,7 +750,7 @@ func (s *Scheduler) RemoveByTags(tags ...string) error { } for _, job := range jobs { - s.RemoveByReference(job) + _ = s.RemoveByID(job) } return nil } @@ -780,7 +770,7 @@ func (s *Scheduler) RemoveByTagsAny(tags ...string) error { } for job := range mJob { - s.RemoveByReference(job) + _ = s.RemoveByID(job) } return errs @@ -788,9 +778,12 @@ func (s *Scheduler) RemoveByTagsAny(tags ...string) error { // RemoveByID removes the job from the scheduler looking up by id func (s *Scheduler) RemoveByID(job *Job) error { - if _, ok := s.jobsMap()[job.id]; ok { + s.jobsMutex.Lock() + defer s.jobsMutex.Unlock() + if _, ok := s.jobs[job.id]; ok { + s.removeJobsUniqueTags(job) s.stopJob(job) - delete(s.jobsMap(), job.id) + delete(s.jobs, job.id) return nil } return ErrJobNotFound @@ -800,8 +793,10 @@ func (s *Scheduler) RemoveByID(job *Job) error { func (s *Scheduler) FindJobsByTag(tags ...string) ([]*Job, error) { var jobs []*Job + s.jobsMutex.RLock() + defer s.jobsMutex.RUnlock() Jobs: - for _, job := range s.jobsMap() { + for _, job := range s.jobs { if job.hasTags(tags...) { jobs = append(jobs, job) continue Jobs @@ -872,7 +867,9 @@ func (s *Scheduler) SingletonModeAll() { // TaskPresent checks if specific job's function was added to the scheduler. func (s *Scheduler) TaskPresent(j interface{}) bool { - for _, job := range s.jobsMap() { + s.jobsMutex.RLock() + defer s.jobsMutex.RUnlock() + for _, job := range s.jobs { if job.funcName == getFunctionName(j) { return true } @@ -880,25 +877,21 @@ func (s *Scheduler) TaskPresent(j interface{}) bool { return false } -// To avoid the recursive read lock on s.jobsMap() and this function, -// creating this new function and distributing the lock between jobPresent, _jobPresent -func (s *Scheduler) _jobPresent(j *Job, jobs map[uuid.UUID]*Job) bool { +func (s *Scheduler) jobPresent(j *Job) bool { s.jobsMutex.RLock() defer s.jobsMutex.RUnlock() - if _, ok := jobs[j.id]; ok { + if _, ok := s.jobs[j.id]; ok { return true } return false } -func (s *Scheduler) jobPresent(j *Job) bool { - return s._jobPresent(j, s.jobsMap()) -} - // Clear clears all Jobs from this scheduler func (s *Scheduler) Clear() { s.stopJobs() - s.setJobs(make(map[uuid.UUID]*Job, 0)) + s.jobsMutex.Lock() + defer s.jobsMutex.Unlock() + s.jobs = make(map[uuid.UUID]*Job) // If unique tags was enabled, delete all the tags loaded in the tags sync.Map if s.tagsUnique { s.tags.Range(func(key interface{}, value interface{}) bool { @@ -954,7 +947,7 @@ func (s *Scheduler) doCommon(jobFun interface{}, params ...interface{}) (*Job, e if job.error != nil { // delete the job from the scheduler as this job // cannot be executed - s.RemoveByReference(job) + _ = s.RemoveByID(job) return nil, job.error } @@ -965,7 +958,7 @@ func (s *Scheduler) doCommon(jobFun interface{}, params ...interface{}) (*Job, e if val.Kind() != reflect.Func { // delete the job for the same reason as above - s.RemoveByReference(job) + _ = s.RemoveByID(job) return nil, ErrNotAFunction } @@ -992,13 +985,13 @@ func (s *Scheduler) doCommon(jobFun interface{}, params ...interface{}) (*Job, e } if len(params) != expectedParamLength { - s.RemoveByReference(job) + _ = s.RemoveByID(job) job.error = wrapOrError(job.error, ErrWrongParams) return nil, job.error } if job.runWithDetails && val.Type().In(len(params)).Kind() != reflect.ValueOf(*job).Kind() { - s.RemoveByReference(job) + _ = s.RemoveByID(job) job.error = wrapOrError(job.error, ErrDoWithJobDetails) return nil, job.error } @@ -1078,7 +1071,9 @@ func (s *Scheduler) Tag(t ...string) *Scheduler { // GetAllTags returns all tags. func (s *Scheduler) GetAllTags() []string { var tags []string - for _, job := range s.jobsMap() { + s.jobsMutex.RLock() + defer s.jobsMutex.RUnlock() + for _, job := range s.jobs { tags = append(tags, job.Tags()...) } return tags @@ -1514,7 +1509,9 @@ func (s *Scheduler) WithDistributedElector(e Elector) { // If a new job is added, an additional call to this method, or the job specific // version must be executed in order for the new job to trigger event listeners. func (s *Scheduler) RegisterEventListeners(eventListeners ...EventListener) { - for _, job := range s.jobsMap() { + s.jobsMutex.RLock() + defer s.jobsMutex.RUnlock() + for _, job := range s.jobs { job.RegisterEventListeners(eventListeners...) } } diff --git a/scheduler_test.go b/scheduler_test.go index 0b024731..d794dee6 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -588,9 +588,11 @@ func TestScheduler_RemoveByReference(t *testing.T) { assert.Equal(t, 2, s.Len(), "Incorrect number of jobs") - s.RemoveByReference(job1) - assert.NotContains(t, s.jobsMap(), job1.id) - assert.Contains(t, s.jobsMap(), job2.id) + _ = s.RemoveByID(job1) + s.jobsMutex.RLock() + defer s.jobsMutex.RUnlock() + assert.NotContains(t, s.jobs, job1.id) + assert.Contains(t, s.jobs, job2.id) }) t.Run("remove from running scheduler", func(t *testing.T) { @@ -605,7 +607,7 @@ func TestScheduler_RemoveByReference(t *testing.T) { s.StartAsync() - s.RemoveByReference(j) + _ = s.RemoveByID(j) select { case <-time.After(200 * time.Millisecond): @@ -631,15 +633,19 @@ func TestScheduler_RemoveByTags(t *testing.T) { j2, err := s.Every(1).Second().Tag(tag2).Do(taskWithParams, 2, "world") // index 1 require.NoError(t, err) + s.jobsMutex.RLock() // check j1 tags is equal with tag "a" (tag1) - assert.Equal(t, s.jobsMap()[j1.id].Tags()[0], tag1, "Job With Tag 'a' is removed from index 0") + assert.Equal(t, s.jobs[j1.id].Tags()[0], tag1, "Job With Tag 'a' is removed from index 0") + s.jobsMutex.RUnlock() err = s.RemoveByTags(tag1) require.NoError(t, err) assert.Equal(t, 1, s.Len(), "Incorrect number of jobs after removing 1 job") + s.jobsMutex.RLock() // check j2 tags is equal with tag "tag two" (tag2) after removing "a" - assert.Equal(t, s.jobsMap()[j2.id].Tags()[0], tag2, "Job With Tag 'tag two' is removed from index 0") + assert.Equal(t, s.jobs[j2.id].Tags()[0], tag2, "Job With Tag 'tag two' is removed from index 0") + s.jobsMutex.RUnlock() // Removing Non-Existent Job with "a" because already removed above (will not removing any jobs because tag not match) err = s.RemoveByTags(tag1) @@ -679,16 +685,20 @@ func TestScheduler_RemoveByTags(t *testing.T) { require.NoError(t, err) // check j1 tags contains tag "a" (tag1) and "abc" (tag3) - assert.Contains(t, s.jobsMap()[j1.id].Tags(), tag1, "Job With Tag 'a' is removed from index 0") - assert.Contains(t, s.jobsMap()[j1.id].Tags(), tag3, "Job With Tag 'abc' is removed from index 0") + s.jobsMutex.RLock() + assert.Contains(t, s.jobs[j1.id].Tags(), tag1, "Job With Tag 'a' is removed from index 0") + assert.Contains(t, s.jobs[j1.id].Tags(), tag3, "Job With Tag 'abc' is removed from index 0") + s.jobsMutex.RUnlock() err = s.RemoveByTags(tag1, tag3) require.NoError(t, err) assert.Equal(t, 1, s.Len(), "Incorrect number of jobs after removing 1 job") // check j2 tags is equal with tag "a" (tag1) and "ab" (tag2) after removing "a"+"abc" - assert.Contains(t, s.jobsMap()[j2.id].Tags(), tag1, "Job With Tag 'a' is removed from index 0") - assert.Contains(t, s.jobsMap()[j2.id].Tags(), tag2, "Job With Tag 'ab' is removed from index 0") + s.jobsMutex.RLock() + assert.Contains(t, s.jobs[j2.id].Tags(), tag1, "Job With Tag 'a' is removed from index 0") + assert.Contains(t, s.jobs[j2.id].Tags(), tag2, "Job With Tag 'ab' is removed from index 0") + s.jobsMutex.RUnlock() // Removing Non-Existent Job with "a"+"abc" because already removed above (will not removing any jobs because tag not match) err = s.RemoveByTags(tag1, tag3) @@ -730,7 +740,9 @@ func TestScheduler_RemoveByTagsAny(t *testing.T) { require.NoError(t, err) // check j1 tags is equal with tag "a" (tag1) - assert.Equal(t, s.jobsMap()[j1.id].Tags()[0], tag1, "Job With Tag 'a' is removed from index 0") + s.jobsMutex.RLock() + assert.Equal(t, s.jobs[j1.id].Tags()[0], tag1, "Job With Tag 'a' is removed from index 0") + s.jobsMutex.RUnlock() err = s.RemoveByTagsAny(tag1, tag2) require.NoError(t, err) @@ -774,8 +786,10 @@ func TestScheduler_RemoveByTagsAny(t *testing.T) { require.NoError(t, err) // check j1 tags contains tag "a" (tag1) and "abc" (tag3) - assert.Contains(t, s.jobsMap()[j1.id].Tags(), tag1, "Job With Tag 'a' is removed from index 0") - assert.Contains(t, s.jobsMap()[j1.id].Tags(), tag3, "Job With Tag 'abc' is removed from index 0") + s.jobsMutex.RLock() + assert.Contains(t, s.jobs[j1.id].Tags(), tag1, "Job With Tag 'a' is removed from index 0") + assert.Contains(t, s.jobs[j1.id].Tags(), tag3, "Job With Tag 'abc' is removed from index 0") + s.jobsMutex.RUnlock() err = s.RemoveByTagsAny(tag1, tag2, tag3) require.NoError(t, err) @@ -814,8 +828,10 @@ func TestScheduler_Jobs(t *testing.T) { _, _ = s.Every(2).Minutes().Do(task) _, _ = s.Every(3).Minutes().Do(task) _, _ = s.Every(4).Minutes().Do(task) - js := s.jobsMap() - assert.Len(t, js, 4) + + s.jobsMutex.RLock() + defer s.jobsMutex.RUnlock() + assert.Len(t, s.jobs, 4) } func TestScheduler_Len(t *testing.T) { @@ -1476,7 +1492,8 @@ func TestScheduler_SingletonMode(t *testing.T) { time.Sleep(200 * time.Millisecond) if tc.removeJob { - s.RemoveByReference(j) + err = s.RemoveByID(j) + require.NoError(t, err) time.Sleep(300 * time.Millisecond) } s.Stop() @@ -1513,7 +1530,8 @@ func TestScheduler_SingletonModeAll(t *testing.T) { time.Sleep(200 * time.Millisecond) if tc.removeJob { - s.RemoveByReference(j) + err = s.RemoveByID(j) + require.NoError(t, err) time.Sleep(300 * time.Millisecond) } s.Stop() @@ -1675,9 +1693,9 @@ func TestScheduler_SetMaxConcurrentJobs(t *testing.T) { } if tc.removeJobs { - s.RemoveByReference(j1) - s.RemoveByReference(j2) - s.RemoveByReference(j3) + _ = s.RemoveByID(j1) + _ = s.RemoveByID(j2) + _ = s.RemoveByID(j3) } else { s.Stop() } @@ -1894,7 +1912,7 @@ func TestScheduler_Update(t *testing.T) { require.NoError(t, err) time.Sleep(550 * time.Millisecond) - s.RemoveByReference(j) + _ = s.RemoveByID(j) j, err = s.Every("750ms").WaitForSchedule().Do(func() { counterMutex.Lock(); defer counterMutex.Unlock(); counter++ }) require.NoError(t, err)