Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Queue lock by key (by Josh) #30

Merged
merged 9 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ linters:
- gci
- funlen
- maintidx
- containedctx
linters-settings:
goimports:
local-prefixes: github.com/diagridio
2 changes: 1 addition & 1 deletion api.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (c *cron) Delete(ctx context.Context, name string) error {
return err
}

return c.queue.Dequeue(c.key.JobKey(name))
return nil
}

// validateName validates the name of a job.
Expand Down
7 changes: 4 additions & 3 deletions api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,8 @@ func Test_validateName(t *testing.T) {
}

for _, test := range tests {
test := test
name := test.name
expErr := test.expErr
t.Run(test.name, func(t *testing.T) {
t.Parallel()
c, err := New(Options{
Expand All @@ -506,8 +507,8 @@ func Test_validateName(t *testing.T) {
TriggerFn: func(context.Context, *api.TriggerRequest) bool { return true },
})
require.NoError(t, err)
err = c.(*cron).validateName(test.name)
assert.Equal(t, test.expErr, err != nil, "%v", err)
err = c.(*cron).validateName(name)
assert.Equal(t, expErr, err != nil, "%v", err)
})
}
}
41 changes: 24 additions & 17 deletions cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ type cron struct {
wg sync.WaitGroup
// queueLock prevents an informed schedule from overwriting a job as it is
// being triggered, i.e. prevent a PUT and mid-trigger race condition.
queueLock sync.RWMutex
queueLock concurrency.MutexMap[string]
}

// New creates a new cron instance.
Expand Down Expand Up @@ -178,6 +178,7 @@ func New(opts Options) (Interface, error) {
readyCh: make(chan struct{}),
closeCh: make(chan struct{}),
errCh: make(chan error),
queueLock: concurrency.NewMutexMap[string](),
}, nil
}

Expand All @@ -190,12 +191,20 @@ func (c *cron) Run(ctx context.Context) error {

c.queue = queue.NewProcessor[string, *counter.Counter](
func(counter *counter.Counter) {
c.queueLock.RLock()
c.queueLock.Lock(counter.Key())
if ctx.Err() != nil {
c.queueLock.Unlock(counter.Key())
return
}

c.wg.Add(1)
go func() {
defer c.queueLock.RUnlock()
defer c.wg.Done()
c.handleTrigger(ctx, counter)
if c.handleTrigger(ctx, counter) {
c.queueLock.Unlock(counter.Key())
} else {
c.queueLock.DeleteUnlock(counter.Key())
}
}()
},
).WithClock(c.clock)
Expand Down Expand Up @@ -259,7 +268,8 @@ func (c *cron) Run(ctx context.Context) error {
}

// handleTrigger handles triggering a schedule job.
func (c *cron) handleTrigger(ctx context.Context, counter *counter.Counter) {
// Returns true if the job is being re-enqueued, false otherwise.
func (c *cron) handleTrigger(ctx context.Context, counter *counter.Counter) bool {
if !c.triggerFn(ctx, counter.TriggerRequest()) {
// If the trigger function returns false, i.e. failed client side,
// re-enqueue the job immediately.
Expand All @@ -269,7 +279,7 @@ func (c *cron) handleTrigger(ctx context.Context, counter *counter.Counter) {
case c.errCh <- err:
}
}
return
return true
}

ok, err := counter.Trigger(ctx)
Expand All @@ -283,29 +293,26 @@ func (c *cron) handleTrigger(ctx context.Context, counter *counter.Counter) {
case c.errCh <- err:
}
}

return true
}

return false
}

// handleInformerEvent handles an etcd informed event.
// TODO: @joshvanl: add a safe per key read lock to prevent locking all
// triggers and an unrelated write. Must be able to handle a key being
// de-queued and unlocked (deleted) whilst an Add schedule is waiting on the
// lock, and visa versa. I don't think there is much if any we gain though as
// we _always_ hack to lock somewhere..
func (c *cron) handleInformerEvent(ctx context.Context, e *informer.Event) error {
c.queueLock.Lock()
defer c.queueLock.Unlock()

select {
case <-ctx.Done():
if ctx.Err() != nil {
return ctx.Err()
default:
}

c.queueLock.Lock(string(e.Key))
if e.IsPut {
defer c.queueLock.Unlock(string(e.Key))
return c.schedule(ctx, c.key.JobName(e.Key), e.Job)
}

defer c.queueLock.DeleteUnlock(string(e.Key))
return c.queue.Dequeue(string(e.Key))
}

Expand Down
Loading
Loading