Skip to content

Commit

Permalink
Feat: Decouple queue (#1)
Browse files Browse the repository at this point in the history
Hide the implementation of the queue behind an interface
and allow custom implementations to be used instead.
  • Loading branch information
romshark authored Jun 27, 2021
1 parent e9c1f3f commit 13921c4
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 62 deletions.
82 changes: 82 additions & 0 deletions sched/internal/queue/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package queue

import (
"github.com/huandu/skiplist"
"github.com/segmentio/ksuid"
)

func New() Queue {
return Queue{
l: skiplist.New(
skiplist.GreaterThanFunc(func(a, b interface{}) int {
s1, s2 := a.(ksuid.KSUID).String(), b.(ksuid.KSUID).String()
if s1 > s2 {
return 1
} else if s1 < s2 {
return -1
}
return 0
}),
),
}
}

type Queue struct {
l *skiplist.SkipList
}

func (q Queue) Set(id ksuid.KSUID, fn func()) (setAtFront bool) {
e := q.l.Set(id, job{ID: id, Fn: fn})
return e.Prev() == nil
}

func (q Queue) Has(id ksuid.KSUID) bool {
return q.l.Get(id) != nil
}

func (q Queue) Front() (ksuid.KSUID, func()) {
if e := q.l.Front(); e != nil {
v := e.Value.(job)
return v.ID, v.Fn
}
return ksuid.KSUID{}, nil
}

func (q Queue) Remove(id ksuid.KSUID) (removed bool) {
e := q.l.Remove(id)
return e != nil
}

func (q Queue) Len() int {
return q.l.Len()
}

func (q Queue) Scan(
after ksuid.KSUID,
fn func(ksuid.KSUID, func()) bool,
) (afterFound bool) {
var start *skiplist.Element
var zero ksuid.KSUID
if after != zero {
if start = q.l.Get(after); start == nil {
return false
}
start = start.Next()
} else {
start = q.l.Front()
}

for e := start; e != nil; e = e.Next() {
j := e.Value.(job)
if !fn(j.ID, j.Fn) {
return true
}
}
return true
}

// job is a job descriptor.
type job struct {
ID ksuid.KSUID
Fn func()
}
126 changes: 64 additions & 62 deletions sched/sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"sync"
"time"

"github.com/huandu/skiplist"
"github.com/romshark/sched/sched/internal/queue"
"github.com/segmentio/ksuid"
)

Expand Down Expand Up @@ -33,11 +33,24 @@ type TimeProvider interface {
AfterFunc(Duration, func()) Timer
}

type defaultTimeProvider struct{}
type QueueReader interface {
Has(ksuid.KSUID) bool
Len() int
Scan(
after ksuid.KSUID,
fn func(ksuid.KSUID, func()) bool,
) (afterFound bool)
}

type QueueWriter interface {
Set(ksuid.KSUID, func()) (setAtFront bool)
Front() (ksuid.KSUID, func())
Remove(ksuid.KSUID) (ok bool)
}

func (p defaultTimeProvider) Now() Time { return time.Now() }
func (p defaultTimeProvider) AfterFunc(d Duration, fn func()) Timer {
return time.AfterFunc(d, fn)
type QueueReadWriter interface {
QueueReader
QueueWriter
}

// DefaultScheduler is the default Scheduler
Expand Down Expand Up @@ -95,20 +108,28 @@ func Scan(after Job, fn func(job Job, jobFn func()) bool) (ok bool) {

// New creates a new scheduler with the given time offset.
func New(timeOffset Duration) *Scheduler {
return NewWith(timeOffset, nil, nil)
}

// NewWith is similar to New but replaces the default time provider
// and queue implementation.
// If t == nil then standard time package is used by default.
// If q == nil then sched/internal/queue.Queue is used by default.
func NewWith(
timeOffset Duration,
t TimeProvider,
q QueueReadWriter,
) *Scheduler {
if t == nil {
t = timeProvider{}
}
if q == nil {
q = queue.New()
}
return &Scheduler{
provider: defaultTimeProvider{},
provider: t,
queue: q,
timeOffset: timeOffset,
queue: skiplist.New(
skiplist.GreaterThanFunc(func(a, b interface{}) int {
s1, s2 := a.(Job).String(), b.(Job).String()
if s1 > s2 {
return 1
} else if s1 < s2 {
return -1
}
return 0
}),
),
}
}

Expand All @@ -127,9 +148,10 @@ type Scheduler struct {
provider TimeProvider
lock sync.RWMutex
timeOffset Duration
queue *skiplist.SkipList
queue QueueReadWriter
scheduled struct {
job
ID Job
Fn func()
Timer
}
}
Expand All @@ -156,21 +178,19 @@ func (s *Scheduler) Schedule(in Duration, fn func()) (Job, error) {
if err != nil {
return Job{}, fmt.Errorf("generating unique KSUID: %w", err)
}
j := job{ID: id, Fn: fn}

s.lock.Lock()
defer s.lock.Unlock()

if e := s.queue.Get(id); e != nil {
if s.queue.Has(ksuid.KSUID(id)) {
return Job{}, fmt.Errorf("identifier collision: %s", id.String())
}

e := s.queue.Set(id, j)
if e.Prev() != nil {
if !s.queue.Set(ksuid.KSUID(id), fn) {
return id, nil
}

s.execute(j)
s.execute(id, fn)

return id, nil
}
Expand All @@ -181,12 +201,11 @@ func (s *Scheduler) Cancel(id Job) bool {
s.lock.Lock()
defer s.lock.Unlock()

e := s.queue.Remove(id)
if e == nil {
if !s.queue.Remove(ksuid.KSUID(id)) {
return false
}

if e.Value.(job).ID == s.scheduled.ID {
if id == s.scheduled.ID {
// Canceled currently scheduled job
s.scheduleFirstFromQueue()
}
Expand Down Expand Up @@ -216,10 +235,10 @@ func (s *Scheduler) AdvanceToNext() (newOffset, advancedBy Duration) {
return s.timeOffset, 0
}

by := s.scheduled.job.ID.Due().Sub(s.now())
by := s.scheduled.ID.Due().Sub(s.now())
s.timeOffset += by

e := s.makeExecutable(s.scheduled.job)
e := s.makeExecutable(s.scheduled.ID, s.scheduled.Fn)

if s.scheduled.Timer != nil {
s.scheduled.Timer.Stop()
Expand Down Expand Up @@ -259,34 +278,22 @@ func (s *Scheduler) Scan(
s.lock.RLock()
defer s.lock.RUnlock()

var start *skiplist.Element
var zero Job
if after != zero {
if start = s.queue.Get(after); start == nil {
return false
}
start = start.Next()
} else {
start = s.queue.Front()
}

for e := start; e != nil; e = e.Next() {
j := e.Value.(job)
if !fn(j.ID, j.Fn) {
return true
}
}
return true
return s.queue.Scan(
ksuid.KSUID(after),
func(id ksuid.KSUID, job func()) bool {
return fn(Job(id), job)
},
)
}

func (s *Scheduler) makeExecutable(j job) func() {
func (s *Scheduler) makeExecutable(id Job, fn func()) func() {
return func() {
j.Fn()
fn()

s.lock.Lock()
defer s.lock.Unlock()

s.queue.Remove(j.ID)
s.queue.Remove(ksuid.KSUID(id))

// Schedule next if any
s.scheduleFirstFromQueue()
Expand All @@ -295,14 +302,14 @@ func (s *Scheduler) makeExecutable(j job) func() {

// execute either executes the job immediately
// or schedules the job for deferred execution.
func (s *Scheduler) execute(j job) {
e := s.makeExecutable(j)
func (s *Scheduler) execute(id Job, fn func()) {
e := s.makeExecutable(id, fn)

if s.scheduled.Timer != nil {
s.scheduled.Timer.Stop()
}
s.scheduled.job = j
d := j.ID.Due().Sub(s.now())
s.scheduled.ID, s.scheduled.Fn = id, fn
d := id.Due().Sub(s.now())
if d < 1 {
// Execute immediately
s.scheduled.Timer = nil
Expand All @@ -316,8 +323,9 @@ func (s *Scheduler) execute(j job) {
// scheduleFirstFromQueue takes the first job from the queue
// and schedules it for execution.
func (s *Scheduler) scheduleFirstFromQueue() {
if n := s.queue.Front(); n != nil {
s.execute(n.Value.(job))
id, fn := s.queue.Front()
if fn != nil {
s.execute(Job(id), fn)
}
}

Expand All @@ -335,12 +343,6 @@ func newJobID(tm Time) (Job, error) {
return Job(k), nil
}

// job is a job descriptor.
type job struct {
ID Job
Fn func()
}

// Job is a unique job identifier.
type Job ksuid.KSUID

Expand Down
13 changes: 13 additions & 0 deletions sched/time_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package sched

import "time"

type timeProvider struct{}

func (p timeProvider) Now() Time {
return time.Now()
}

func (p timeProvider) AfterFunc(d Duration, fn func()) Timer {
return time.AfterFunc(d, fn)
}

0 comments on commit 13921c4

Please sign in to comment.