Skip to content

Commit

Permalink
Queue lock by key (by Josh) (#30)
Browse files Browse the repository at this point in the history
* Lock by key when executing cron queue

Use a per job key mutex to lock execution of a trigger. A per key lock
prevents writes of new jobs from being blocked by actively executing
triggers.

Signed-off-by: joshvanl <me@joshvanl.dev>

* Update go.mod

Signed-off-by: joshvanl <me@joshvanl.dev>

* Lint

Signed-off-by: joshvanl <me@joshvanl.dev>

* Lint

Signed-off-by: joshvanl <me@joshvanl.dev>

* Fix go.mod + update dapr/kit

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Fix .golangci.yaml to match pinned lint version.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Pin go version without using toolchain.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Fix a bunch of lint after go update.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

---------

Signed-off-by: joshvanl <me@joshvanl.dev>
Signed-off-by: Artur Souza <asouza.pro@gmail.com>
Co-authored-by: joshvanl <me@joshvanl.dev>
  • Loading branch information
artursouza and JoshVanL authored Jul 22, 2024
1 parent a46af5f commit 30016dc
Show file tree
Hide file tree
Showing 16 changed files with 313 additions and 395 deletions.
1 change: 1 addition & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ linters:
- gci
- funlen
- maintidx
- containedctx
linters-settings:
goimports:
local-prefixes: github.com/diagridio
2 changes: 1 addition & 1 deletion api.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (c *cron) Delete(ctx context.Context, name string) error {
return err
}

return c.queue.Dequeue(c.key.JobKey(name))
return nil
}

// validateName validates the name of a job.
Expand Down
7 changes: 4 additions & 3 deletions api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,8 @@ func Test_validateName(t *testing.T) {
}

for _, test := range tests {
test := test
name := test.name
expErr := test.expErr
t.Run(test.name, func(t *testing.T) {
t.Parallel()
c, err := New(Options{
Expand All @@ -506,8 +507,8 @@ func Test_validateName(t *testing.T) {
TriggerFn: func(context.Context, *api.TriggerRequest) bool { return true },
})
require.NoError(t, err)
err = c.(*cron).validateName(test.name)
assert.Equal(t, test.expErr, err != nil, "%v", err)
err = c.(*cron).validateName(name)
assert.Equal(t, expErr, err != nil, "%v", err)
})
}
}
41 changes: 24 additions & 17 deletions cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ type cron struct {
wg sync.WaitGroup
// queueLock prevents an informed schedule from overwriting a job as it is
// being triggered, i.e. prevent a PUT and mid-trigger race condition.
queueLock sync.RWMutex
queueLock concurrency.MutexMap[string]
}

// New creates a new cron instance.
Expand Down Expand Up @@ -178,6 +178,7 @@ func New(opts Options) (Interface, error) {
readyCh: make(chan struct{}),
closeCh: make(chan struct{}),
errCh: make(chan error),
queueLock: concurrency.NewMutexMap[string](),
}, nil
}

Expand All @@ -190,12 +191,20 @@ func (c *cron) Run(ctx context.Context) error {

c.queue = queue.NewProcessor[string, *counter.Counter](
func(counter *counter.Counter) {
c.queueLock.RLock()
c.queueLock.Lock(counter.Key())
if ctx.Err() != nil {
c.queueLock.Unlock(counter.Key())
return
}

c.wg.Add(1)
go func() {
defer c.queueLock.RUnlock()
defer c.wg.Done()
c.handleTrigger(ctx, counter)
if c.handleTrigger(ctx, counter) {
c.queueLock.Unlock(counter.Key())
} else {
c.queueLock.DeleteUnlock(counter.Key())
}
}()
},
).WithClock(c.clock)
Expand Down Expand Up @@ -259,7 +268,8 @@ func (c *cron) Run(ctx context.Context) error {
}

// handleTrigger handles triggering a schedule job.
func (c *cron) handleTrigger(ctx context.Context, counter *counter.Counter) {
// Returns true if the job is being re-enqueued, false otherwise.
func (c *cron) handleTrigger(ctx context.Context, counter *counter.Counter) bool {
if !c.triggerFn(ctx, counter.TriggerRequest()) {
// If the trigger function returns false, i.e. failed client side,
// re-enqueue the job immediately.
Expand All @@ -269,7 +279,7 @@ func (c *cron) handleTrigger(ctx context.Context, counter *counter.Counter) {
case c.errCh <- err:
}
}
return
return true
}

ok, err := counter.Trigger(ctx)
Expand All @@ -283,29 +293,26 @@ func (c *cron) handleTrigger(ctx context.Context, counter *counter.Counter) {
case c.errCh <- err:
}
}

return true
}

return false
}

// handleInformerEvent handles an etcd informed event.
// TODO: @joshvanl: add a safe per key read lock to prevent locking all
// triggers and an unrelated write. Must be able to handle a key being
// de-queued and unlocked (deleted) whilst an Add schedule is waiting on the
// lock, and visa versa. I don't think there is much if any we gain though as
// we _always_ hack to lock somewhere..
func (c *cron) handleInformerEvent(ctx context.Context, e *informer.Event) error {
c.queueLock.Lock()
defer c.queueLock.Unlock()

select {
case <-ctx.Done():
if ctx.Err() != nil {
return ctx.Err()
default:
}

c.queueLock.Lock(string(e.Key))
if e.IsPut {
defer c.queueLock.Unlock(string(e.Key))
return c.schedule(ctx, c.key.JobName(e.Key), e.Job)
}

defer c.queueLock.DeleteUnlock(string(e.Key))
return c.queue.Dequeue(string(e.Key))
}

Expand Down
Loading

0 comments on commit 30016dc

Please sign in to comment.