diff --git a/.github/workflows/go_test.yml b/.github/workflows/go_test.yml index 0a5e8e1c..9f19d0bd 100644 --- a/.github/workflows/go_test.yml +++ b/.github/workflows/go_test.yml @@ -13,7 +13,6 @@ jobs: matrix: go-version: - "1.21" - - "1.20" name: lint and test runs-on: ubuntu-latest steps: diff --git a/errors.go b/errors.go index 021cc7a1..eb8c0a1f 100644 --- a/errors.go +++ b/errors.go @@ -29,6 +29,7 @@ var ( ErrWithDistributedElector = fmt.Errorf("gocron: WithDistributedElector: elector must not be nil") ErrWithClockNil = fmt.Errorf("gocron: WithClock: clock must not be nil") ErrWithLocationNil = fmt.Errorf("gocron: WithLocation: location must not be nil") + ErrWithLoggerNil = fmt.Errorf("gocron: WithLogger: logger must not be nil") ErrWithNameEmpty = fmt.Errorf("gocron: WithName: name must not be empty") ErrWithStartDateTimePast = fmt.Errorf("gocron: WithStartDateTime: start must not be in the past") ) diff --git a/example_test.go b/example_test.go index 099825cd..e0c9d47c 100644 --- a/example_test.go +++ b/example_test.go @@ -2,6 +2,7 @@ package gocron_test import ( "fmt" + "log/slog" "sync" "time" @@ -525,6 +526,14 @@ func ExampleWithLocation() { ) } +func ExampleWithLogger() { + _, _ = NewScheduler( + WithLogger( + NewJsonSlogLogger(slog.LevelInfo), + ), + ) +} + func ExampleWithName() { s, _ := NewScheduler() diff --git a/executor.go b/executor.go index 0458a9a9..b53403d6 100644 --- a/executor.go +++ b/executor.go @@ -2,6 +2,7 @@ package gocron import ( "context" + "strconv" "sync" "time" @@ -11,6 +12,7 @@ import ( type executor struct { ctx context.Context cancel context.CancelFunc + logger Logger stopCh chan struct{} jobsIDsIn chan uuid.UUID jobIDsOut chan uuid.UUID @@ -38,6 +40,8 @@ type limitModeConfig struct { } func (e *executor) start() { + e.logger.Debug("executor started") + // creating the executor's context here as the executor // is the only goroutine that should access this context // any other uses within the executor should create a context @@ -69,7 +73,7 @@ func (e *executor) start() { // if not, spin up the required number i.e. limit! e.limitMode.started = true for i := e.limitMode.limit; i > 0; i-- { - go e.limitModeRunner(e.limitMode.in, e.limitMode.done, e.limitMode.mode, e.limitMode.rescheduleLimiter) + go e.limitModeRunner("limitMode-"+strconv.Itoa(int(i)), e.limitMode.in, e.limitMode.done, e.limitMode.mode, e.limitMode.rescheduleLimiter) } } @@ -129,7 +133,7 @@ func (e *executor) start() { runner.rescheduleLimiter = make(chan struct{}, 1) } e.singletonRunners[id] = runner - go e.limitModeRunner(runner.in, runner.done, j.singletonLimitMode, runner.rescheduleLimiter) + go e.limitModeRunner("singleton-"+id.String(), runner.in, runner.done, j.singletonLimitMode, runner.rescheduleLimiter) } if j.singletonLimitMode == LimitModeReschedule { @@ -163,6 +167,7 @@ func (e *executor) start() { } }() case <-e.stopCh: + e.logger.Debug("stopping executor") // we've been asked to stop. This is either because the scheduler has been told // to stop all jobs or the scheduler has been asked to completely shutdown. // @@ -181,6 +186,7 @@ func (e *executor) start() { // wait for standard jobs to complete go func() { + e.logger.Debug("waiting for standard jobs to complete") go func() { // this is done in a separate goroutine, so we aren't // blocked by the WaitGroup's Wait call in the event @@ -188,13 +194,16 @@ func (e *executor) start() { // This particular goroutine could leak in the event that // some long-running standard job doesn't complete. standardJobsWg.Wait() + e.logger.Debug("standard jobs completed") waitForJobs <- struct{}{} }() <-waiterCtx.Done() + }() // wait for per job singleton limit mode runner jobs to complete go func() { + e.logger.Debug("waiting for singleton jobs to complete") For: for _, sr := range e.singletonRunners { select { @@ -207,12 +216,14 @@ func (e *executor) start() { case <-waiterCtx.Done(): return default: + e.logger.Debug("singleton jobs completed") waitForSingletons <- struct{}{} } }() // wait for limit mode runners to complete go func() { + e.logger.Debug("waiting for limit mode jobs to complete") if e.limitMode != nil { For: for i := e.limitMode.limit; i > 0; i-- { @@ -227,6 +238,7 @@ func (e *executor) start() { case <-waiterCtx.Done(): return default: + e.logger.Debug("limit mode jobs completed") waitForLimitMode <- struct{}{} } }() @@ -252,8 +264,10 @@ func (e *executor) start() { } if count < 3 { e.done <- ErrStopTimedOut + e.logger.Debug("executor stopped - timed out") } else { e.done <- nil + e.logger.Debug("executor stopped") } waiterCancel() return @@ -261,7 +275,8 @@ func (e *executor) start() { } } -func (e *executor) limitModeRunner(in chan uuid.UUID, done chan struct{}, limitMode LimitMode, rescheduleLimiter chan struct{}) { +func (e *executor) limitModeRunner(name string, in chan uuid.UUID, done chan struct{}, limitMode LimitMode, rescheduleLimiter chan struct{}) { + e.logger.Debug("limitModeRunner starting", "name", name) for { select { case id := <-in: @@ -280,6 +295,7 @@ func (e *executor) limitModeRunner(in chan uuid.UUID, done chan struct{}, limitM } } case <-e.ctx.Done(): + e.logger.Debug("limitModeRunner shutting down", "name", name) select { case done <- struct{}{}: default: diff --git a/go.mod b/go.mod index e365b58e..2f41e002 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/go-co-op/gocron/v2 -go 1.20 +go 1.21 require ( github.com/google/uuid v1.4.0 diff --git a/job.go b/job.go index 8b9f5f51..2c9ebb3b 100644 --- a/job.go +++ b/job.go @@ -5,13 +5,13 @@ import ( "errors" "fmt" "math/rand" + "slices" "strings" "time" "github.com/google/uuid" "github.com/jonboulle/clockwork" "github.com/robfig/cron/v3" - "golang.org/x/exp/slices" ) // internalJob stores the information needed by the scheduler diff --git a/job_test.go b/job_test.go index 7e52beda..82a521cb 100644 --- a/job_test.go +++ b/job_test.go @@ -313,7 +313,7 @@ func TestJob_LastRun(t *testing.T) { testTime := time.Date(2000, 1, 1, 0, 0, 0, 0, time.Local) fakeClock := clockwork.NewFakeClockAt(testTime) - s, err := NewScheduler( + s, err := newTestScheduler( WithClock(fakeClock), ) require.NoError(t, err) diff --git a/logger.go b/logger.go new file mode 100644 index 00000000..25f43cec --- /dev/null +++ b/logger.go @@ -0,0 +1,67 @@ +package gocron + +import ( + "log/slog" + "os" +) + +// Logger is the interface that wraps the basic logging methods +// used by gocron. The methods are modeled after the standard +// library slog package. The default logger is a no-op logger. +// To enable logging, use one of the provided New*Logger functions +// or implement your own Logger. The actual level of Log that is logged +// is handled by the implementation. +type Logger interface { + Debug(msg string, args ...any) + Error(msg string, args ...any) + Info(msg string, args ...any) + Warn(msg string, args ...any) +} + +var _ Logger = (*noOpLogger)(nil) + +type noOpLogger struct{} + +func (_ noOpLogger) Debug(_ string, _ ...any) {} +func (_ noOpLogger) Error(_ string, _ ...any) {} +func (_ noOpLogger) Info(_ string, _ ...any) {} +func (_ noOpLogger) Warn(_ string, _ ...any) {} + +var _ Logger = (*slogLogger)(nil) + +type slogLogger struct { + sl *slog.Logger +} + +func NewJsonSlogLogger(level slog.Level) Logger { + return NewSlogLogger( + slog.New( + slog.NewJSONHandler( + os.Stdout, + &slog.HandlerOptions{ + Level: level, + }, + ), + ), + ) +} + +func NewSlogLogger(sl *slog.Logger) Logger { + return &slogLogger{sl: sl} +} + +func (l *slogLogger) Debug(msg string, args ...any) { + l.sl.Debug(msg, args...) +} + +func (l *slogLogger) Error(msg string, args ...any) { + l.sl.Error(msg, args...) +} + +func (l *slogLogger) Info(msg string, args ...any) { + l.sl.Info(msg, args...) +} + +func (l *slogLogger) Warn(msg string, args ...any) { + l.sl.Warn(msg, args...) +} diff --git a/scheduler.go b/scheduler.go index ff000e61..86e3ab26 100644 --- a/scheduler.go +++ b/scheduler.go @@ -3,10 +3,9 @@ package gocron import ( "context" "reflect" + "slices" "time" - "golang.org/x/exp/slices" - "github.com/google/uuid" "github.com/jonboulle/clockwork" ) @@ -39,6 +38,7 @@ type scheduler struct { clock clockwork.Clock started bool globalJobOptions []JobOption + logger Logger startCh chan struct{} startedCh chan struct{} @@ -66,6 +66,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { stopCh: make(chan struct{}), stopTimeout: time.Second * 10, singletonRunners: make(map[uuid.UUID]singletonRunner), + logger: &noOpLogger{}, jobsIDsIn: make(chan uuid.UUID), jobIDsOut: make(chan uuid.UUID), @@ -80,6 +81,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { jobs: make(map[uuid.UUID]internalJob), location: time.Local, clock: clockwork.NewRealClock(), + logger: &noOpLogger{}, newJobCh: make(chan internalJob), removeJobCh: make(chan uuid.UUID), @@ -99,6 +101,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { } go func() { + s.logger.Info("new scheduler created") for { select { case id := <-s.exec.jobIDsOut: @@ -151,6 +154,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { // about jobs. func (s *scheduler) stopScheduler() { + s.logger.Debug("stopping scheduler") s.exec.stopCh <- struct{}{} s.started = false for _, j := range s.jobs { @@ -162,6 +166,7 @@ func (s *scheduler) stopScheduler() { j.ctx, j.cancel = context.WithCancel(s.shutdownCtx) s.jobs[id] = j } + s.logger.Debug("scheduler stopped") } func (s *scheduler) selectAllJobsOutRequest(out allJobsOutRequest) { @@ -262,6 +267,7 @@ func (s *scheduler) selectRemoveJobsByTags(tags []string) { } func (s *scheduler) selectStart() { + s.logger.Debug("scheduler starting") go s.exec.start() s.started = true @@ -284,6 +290,7 @@ func (s *scheduler) selectStart() { s.jobs[id] = j } s.startedCh <- struct{}{} + s.logger.Info("scheduler started") } // ----------------------------------------------- @@ -546,6 +553,17 @@ func WithLocation(location *time.Location) SchedulerOption { } } +func WithLogger(logger Logger) SchedulerOption { + return func(s *scheduler) error { + if logger == nil { + return ErrWithLoggerNil + } + s.logger = logger + s.exec.logger = logger + return nil + } +} + // WithStopTimeout sets the amount of time the Scheduler should // wait gracefully for jobs to complete before returning when // StopJobs() or Shutdown() are called. diff --git a/scheduler_test.go b/scheduler_test.go index c2c2708d..8d4ea95e 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -3,6 +3,7 @@ package gocron import ( "context" "log" + "log/slog" "testing" "time" @@ -11,6 +12,18 @@ import ( "go.uber.org/goleak" ) +func newTestScheduler(options ...SchedulerOption) (Scheduler, error) { + // default test options + out := []SchedulerOption{ + WithLogger(NewJsonSlogLogger(slog.LevelDebug)), + WithStopTimeout(time.Second), + } + + // append any additional options 2nd to override defaults if needed + out = append(out, options...) + return NewScheduler(out...) +} + func TestScheduler_OneSecond_NoOptions(t *testing.T) { defer goleak.VerifyNone(t) cronNoOptionsCh := make(chan struct{}, 10) @@ -51,7 +64,7 @@ func TestScheduler_OneSecond_NoOptions(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s, err := NewScheduler() + s, err := newTestScheduler() require.NoError(t, err) _, err = s.NewJob(tt.jd, tt.tsk) @@ -133,7 +146,7 @@ func TestScheduler_LongRunningJobs(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s, err := NewScheduler(tt.options...) + s, err := newTestScheduler(tt.options...) require.NoError(t, err) _, err = s.NewJob(tt.jd, tt.tsk, tt.opts...) @@ -204,7 +217,7 @@ func TestScheduler_Update(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s, err := NewScheduler() + s, err := newTestScheduler() require.NoError(t, err) j, err := s.NewJob(tt.initialJob, tt.tsk) @@ -282,7 +295,7 @@ func TestScheduler_StopTimeout(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { testDoneCtx, cancel := context.WithCancel(context.Background()) - s, err := NewScheduler( + s, err := newTestScheduler( WithStopTimeout(time.Millisecond * 100), ) require.NoError(t, err) @@ -304,7 +317,7 @@ func TestScheduler_Shutdown(t *testing.T) { goleak.VerifyNone(t) t.Run("start, stop, start, shutdown", func(t *testing.T) { - s, err := NewScheduler( + s, err := newTestScheduler( WithStopTimeout(time.Second), ) require.NoError(t, err) @@ -334,13 +347,13 @@ func TestScheduler_Shutdown(t *testing.T) { }) t.Run("calling Job methods after shutdown errors", func(t *testing.T) { - s, err := NewScheduler( + s, err := newTestScheduler( WithStopTimeout(time.Second), ) require.NoError(t, err) j, err := s.NewJob( DurationJob( - 5*time.Millisecond, + 100*time.Millisecond, ), NewTask( func() {}, @@ -448,7 +461,7 @@ func TestScheduler_NewJob(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s, err := NewScheduler() + s, err := newTestScheduler() require.NoError(t, err) _, err = s.NewJob(tt.jd, tt.tsk, tt.opts...) @@ -713,7 +726,7 @@ func TestScheduler_NewJobErrors(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s, err := NewScheduler() + s, err := newTestScheduler() require.NoError(t, err) _, err = s.NewJob(tt.jd, NewTask(func() {}), tt.opts...) @@ -745,7 +758,7 @@ func TestScheduler_Singleton(t *testing.T) { t.Run(tt.name, func(t *testing.T) { jobRanCh := make(chan struct{}, 10) - s, err := NewScheduler( + s, err := newTestScheduler( WithStopTimeout(1 * time.Second), ) require.NoError(t, err) @@ -817,9 +830,9 @@ func TestScheduler_LimitMode(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s, err := NewScheduler( + s, err := newTestScheduler( WithLimitConcurrentJobs(tt.limit, tt.limitMode), - WithStopTimeout(time.Second), + WithStopTimeout(2*time.Second), ) require.NoError(t, err) diff --git a/util.go b/util.go index 8d9cfc7e..f5390787 100644 --- a/util.go +++ b/util.go @@ -3,12 +3,12 @@ package gocron import ( "context" "reflect" + "slices" "sync" "time" "github.com/google/uuid" "golang.org/x/exp/maps" - "golang.org/x/exp/slices" ) func callJobFuncWithParams(jobFunc any, params ...any) error {