Skip to content

Commit

Permalink
add a logger
Browse files Browse the repository at this point in the history
  • Loading branch information
JohnRoesler committed Nov 6, 2023
1 parent b689a51 commit 3fc9116
Show file tree
Hide file tree
Showing 11 changed files with 145 additions and 22 deletions.
1 change: 0 additions & 1 deletion .github/workflows/go_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ jobs:
matrix:
go-version:
- "1.21"
- "1.20"
name: lint and test
runs-on: ubuntu-latest
steps:
Expand Down
1 change: 1 addition & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ var (
ErrWithDistributedElector = fmt.Errorf("gocron: WithDistributedElector: elector must not be nil")
ErrWithClockNil = fmt.Errorf("gocron: WithClock: clock must not be nil")
ErrWithLocationNil = fmt.Errorf("gocron: WithLocation: location must not be nil")
ErrWithLoggerNil = fmt.Errorf("gocron: WithLogger: logger must not be nil")
ErrWithNameEmpty = fmt.Errorf("gocron: WithName: name must not be empty")
ErrWithStartDateTimePast = fmt.Errorf("gocron: WithStartDateTime: start must not be in the past")
)
Expand Down
9 changes: 9 additions & 0 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gocron_test

import (
"fmt"
"log/slog"
"sync"
"time"

Expand Down Expand Up @@ -525,6 +526,14 @@ func ExampleWithLocation() {
)
}

func ExampleWithLogger() {
_, _ = NewScheduler(
WithLogger(
NewJsonSlogLogger(slog.LevelInfo),
),
)
}

func ExampleWithName() {
s, _ := NewScheduler()

Expand Down
22 changes: 19 additions & 3 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gocron

import (
"context"
"strconv"
"sync"
"time"

Expand All @@ -11,6 +12,7 @@ import (
type executor struct {
ctx context.Context
cancel context.CancelFunc
logger Logger
stopCh chan struct{}
jobsIDsIn chan uuid.UUID
jobIDsOut chan uuid.UUID
Expand Down Expand Up @@ -38,6 +40,8 @@ type limitModeConfig struct {
}

func (e *executor) start() {
e.logger.Debug("executor started")

// creating the executor's context here as the executor
// is the only goroutine that should access this context
// any other uses within the executor should create a context
Expand Down Expand Up @@ -69,7 +73,7 @@ func (e *executor) start() {
// if not, spin up the required number i.e. limit!
e.limitMode.started = true
for i := e.limitMode.limit; i > 0; i-- {
go e.limitModeRunner(e.limitMode.in, e.limitMode.done, e.limitMode.mode, e.limitMode.rescheduleLimiter)
go e.limitModeRunner("limitMode-"+strconv.Itoa(int(i)), e.limitMode.in, e.limitMode.done, e.limitMode.mode, e.limitMode.rescheduleLimiter)
}
}

Expand Down Expand Up @@ -129,7 +133,7 @@ func (e *executor) start() {
runner.rescheduleLimiter = make(chan struct{}, 1)
}
e.singletonRunners[id] = runner
go e.limitModeRunner(runner.in, runner.done, j.singletonLimitMode, runner.rescheduleLimiter)
go e.limitModeRunner("singleton-"+id.String(), runner.in, runner.done, j.singletonLimitMode, runner.rescheduleLimiter)
}

if j.singletonLimitMode == LimitModeReschedule {
Expand Down Expand Up @@ -163,6 +167,7 @@ func (e *executor) start() {
}
}()
case <-e.stopCh:
e.logger.Debug("stopping executor")
// we've been asked to stop. This is either because the scheduler has been told
// to stop all jobs or the scheduler has been asked to completely shutdown.
//
Expand All @@ -181,20 +186,24 @@ func (e *executor) start() {

// wait for standard jobs to complete
go func() {
e.logger.Debug("waiting for standard jobs to complete")
go func() {
// this is done in a separate goroutine, so we aren't
// blocked by the WaitGroup's Wait call in the event
// that the waiter context is cancelled.
// This particular goroutine could leak in the event that
// some long-running standard job doesn't complete.
standardJobsWg.Wait()
e.logger.Debug("standard jobs completed")
waitForJobs <- struct{}{}
}()
<-waiterCtx.Done()

}()

// wait for per job singleton limit mode runner jobs to complete
go func() {
e.logger.Debug("waiting for singleton jobs to complete")
For:
for _, sr := range e.singletonRunners {
select {
Expand All @@ -207,12 +216,14 @@ func (e *executor) start() {
case <-waiterCtx.Done():
return
default:
e.logger.Debug("singleton jobs completed")
waitForSingletons <- struct{}{}
}
}()

// wait for limit mode runners to complete
go func() {
e.logger.Debug("waiting for limit mode jobs to complete")
if e.limitMode != nil {
For:
for i := e.limitMode.limit; i > 0; i-- {
Expand All @@ -227,6 +238,7 @@ func (e *executor) start() {
case <-waiterCtx.Done():
return
default:
e.logger.Debug("limit mode jobs completed")
waitForLimitMode <- struct{}{}
}
}()
Expand All @@ -252,16 +264,19 @@ func (e *executor) start() {
}
if count < 3 {
e.done <- ErrStopTimedOut
e.logger.Debug("executor stopped - timed out")
} else {
e.done <- nil
e.logger.Debug("executor stopped")
}
waiterCancel()
return
}
}
}

func (e *executor) limitModeRunner(in chan uuid.UUID, done chan struct{}, limitMode LimitMode, rescheduleLimiter chan struct{}) {
func (e *executor) limitModeRunner(name string, in chan uuid.UUID, done chan struct{}, limitMode LimitMode, rescheduleLimiter chan struct{}) {
e.logger.Debug("limitModeRunner starting", "name", name)
for {
select {
case id := <-in:
Expand All @@ -280,6 +295,7 @@ func (e *executor) limitModeRunner(in chan uuid.UUID, done chan struct{}, limitM
}
}
case <-e.ctx.Done():
e.logger.Debug("limitModeRunner shutting down", "name", name)
select {
case done <- struct{}{}:
default:
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/go-co-op/gocron/v2

go 1.20
go 1.21

require (
github.com/google/uuid v1.4.0
Expand Down
2 changes: 1 addition & 1 deletion job.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import (
"errors"
"fmt"
"math/rand"
"slices"
"strings"
"time"

"github.com/google/uuid"
"github.com/jonboulle/clockwork"
"github.com/robfig/cron/v3"
"golang.org/x/exp/slices"
)

// internalJob stores the information needed by the scheduler
Expand Down
2 changes: 1 addition & 1 deletion job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func TestJob_LastRun(t *testing.T) {
testTime := time.Date(2000, 1, 1, 0, 0, 0, 0, time.Local)
fakeClock := clockwork.NewFakeClockAt(testTime)

s, err := NewScheduler(
s, err := newTestScheduler(
WithClock(fakeClock),
)
require.NoError(t, err)
Expand Down
67 changes: 67 additions & 0 deletions logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package gocron

import (
"log/slog"
"os"
)

// Logger is the interface that wraps the basic logging methods
// used by gocron. The methods are modeled after the standard
// library slog package. The default logger is a no-op logger.
// To enable logging, use one of the provided New*Logger functions
// or implement your own Logger. The actual level of Log that is logged
// is handled by the implementation.
type Logger interface {
Debug(msg string, args ...any)
Error(msg string, args ...any)
Info(msg string, args ...any)
Warn(msg string, args ...any)
}

var _ Logger = (*noOpLogger)(nil)

type noOpLogger struct{}

func (_ noOpLogger) Debug(_ string, _ ...any) {}
func (_ noOpLogger) Error(_ string, _ ...any) {}
func (_ noOpLogger) Info(_ string, _ ...any) {}
func (_ noOpLogger) Warn(_ string, _ ...any) {}

var _ Logger = (*slogLogger)(nil)

type slogLogger struct {
sl *slog.Logger
}

func NewJsonSlogLogger(level slog.Level) Logger {
return NewSlogLogger(
slog.New(
slog.NewJSONHandler(
os.Stdout,
&slog.HandlerOptions{
Level: level,
},
),
),
)
}

func NewSlogLogger(sl *slog.Logger) Logger {
return &slogLogger{sl: sl}
}

func (l *slogLogger) Debug(msg string, args ...any) {
l.sl.Debug(msg, args...)
}

func (l *slogLogger) Error(msg string, args ...any) {
l.sl.Error(msg, args...)
}

func (l *slogLogger) Info(msg string, args ...any) {
l.sl.Info(msg, args...)
}

func (l *slogLogger) Warn(msg string, args ...any) {
l.sl.Warn(msg, args...)
}
22 changes: 20 additions & 2 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ package gocron
import (
"context"
"reflect"
"slices"
"time"

"golang.org/x/exp/slices"

"github.com/google/uuid"
"github.com/jonboulle/clockwork"
)
Expand Down Expand Up @@ -39,6 +38,7 @@ type scheduler struct {
clock clockwork.Clock
started bool
globalJobOptions []JobOption
logger Logger

startCh chan struct{}
startedCh chan struct{}
Expand Down Expand Up @@ -66,6 +66,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
stopCh: make(chan struct{}),
stopTimeout: time.Second * 10,
singletonRunners: make(map[uuid.UUID]singletonRunner),
logger: &noOpLogger{},

jobsIDsIn: make(chan uuid.UUID),
jobIDsOut: make(chan uuid.UUID),
Expand All @@ -80,6 +81,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
jobs: make(map[uuid.UUID]internalJob),
location: time.Local,
clock: clockwork.NewRealClock(),
logger: &noOpLogger{},

newJobCh: make(chan internalJob),
removeJobCh: make(chan uuid.UUID),
Expand All @@ -99,6 +101,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
}

go func() {
s.logger.Info("new scheduler created")
for {
select {
case id := <-s.exec.jobIDsOut:
Expand Down Expand Up @@ -151,6 +154,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
// about jobs.

func (s *scheduler) stopScheduler() {
s.logger.Debug("stopping scheduler")
s.exec.stopCh <- struct{}{}
s.started = false
for _, j := range s.jobs {
Expand All @@ -162,6 +166,7 @@ func (s *scheduler) stopScheduler() {
j.ctx, j.cancel = context.WithCancel(s.shutdownCtx)
s.jobs[id] = j
}
s.logger.Debug("scheduler stopped")
}

func (s *scheduler) selectAllJobsOutRequest(out allJobsOutRequest) {
Expand Down Expand Up @@ -262,6 +267,7 @@ func (s *scheduler) selectRemoveJobsByTags(tags []string) {
}

func (s *scheduler) selectStart() {
s.logger.Debug("scheduler starting")
go s.exec.start()

s.started = true
Expand All @@ -284,6 +290,7 @@ func (s *scheduler) selectStart() {
s.jobs[id] = j
}
s.startedCh <- struct{}{}
s.logger.Info("scheduler started")
}

// -----------------------------------------------
Expand Down Expand Up @@ -546,6 +553,17 @@ func WithLocation(location *time.Location) SchedulerOption {
}
}

func WithLogger(logger Logger) SchedulerOption {
return func(s *scheduler) error {
if logger == nil {
return ErrWithLoggerNil
}
s.logger = logger
s.exec.logger = logger
return nil
}
}

// WithStopTimeout sets the amount of time the Scheduler should
// wait gracefully for jobs to complete before returning when
// StopJobs() or Shutdown() are called.
Expand Down
Loading

0 comments on commit 3fc9116

Please sign in to comment.