Skip to content

Commit

Permalink
Fix upsert of Jobs on cluster (#66)
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshVanL authored Jan 21, 2025
1 parent e29e00e commit 6874185
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 6 deletions.
6 changes: 5 additions & 1 deletion internal/informer/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,11 @@ func (i *Informer) handleEvent(ev *clientv3.Event) (*Event, error) {
}

if !i.part.IsJobManaged(job.GetPartitionId()) {
return nil, nil
return &Event{
IsPut: false,
Key: kv.Key,
Job: nil,
}, nil
}

if !isPut && ev.Kv != nil {
Expand Down
23 changes: 19 additions & 4 deletions internal/informer/informer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,14 @@ func Test_Run(t *testing.T) {
require.NoError(t, err)

for i := range 2 {
select {
case ev := <-ch:
assert.False(t, ev.IsPut)
assert.Nil(t, ev.Job)
case <-time.After(time.Second):
t.Fatalf("timed out waiting for event %d", i)
}

select {
case ev := <-ch:
assert.True(t, ev.IsPut)
Expand Down Expand Up @@ -224,9 +232,13 @@ func Test_Run(t *testing.T) {
require.NoError(t, err)

expEvents := []*Event{
{IsPut: false, Job: nil, Key: []byte("abc/jobs/1")},
{IsPut: true, Job: &jobs[0], Key: []byte("abc/jobs/2")},
{IsPut: false, Job: nil, Key: []byte("abc/jobs/3")},
{IsPut: true, Job: &jobs[1], Key: []byte("abc/jobs/4")},
{IsPut: false, Job: nil, Key: []byte("abc/jobs/1")},
{IsPut: false, Job: &jobs[0], Key: []byte("abc/jobs/2")},
{IsPut: false, Job: nil, Key: []byte("abc/jobs/5")},
{IsPut: true, Job: &jobs[2], Key: []byte("abc/jobs/6")},
{IsPut: false, Job: &jobs[1], Key: []byte("abc/jobs/4")},
}
Expand All @@ -243,7 +255,7 @@ func Test_Run(t *testing.T) {
select {
case ev := <-ch:
t.Fatalf("unexpected event: %v", ev)
default:
case <-time.After(time.Second):
}
})
}
Expand Down Expand Up @@ -285,7 +297,7 @@ func Test_handleEvent(t *testing.T) {
expEvent: nil,
expErr: true,
},
"if job is for different partition, return nil": {
"if job is for different partition, return delete event": {
ev: &clientv3.Event{
Type: clientv3.EventTypePut,
Kv: &mvccpb.KeyValue{
Expand All @@ -294,8 +306,11 @@ func Test_handleEvent(t *testing.T) {
},
},
expCollectorPops: []string{"abc/counters/2"},
expEvent: nil,
expErr: false,
expEvent: &Event{
IsPut: false,
Key: []byte("abc/jobs/2"),
},
expErr: false,
},
"if job is for partition, return job on PUT": {
ev: &clientv3.Event{
Expand Down
5 changes: 4 additions & 1 deletion internal/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,11 @@ func (q *Queue) HandleInformerEvent(ctx context.Context, e *informer.Event) erro
}

if _, ok := q.counterCache.LoadAndDelete(string(e.Key)); ok {
q.collector.Push(q.key.CounterKey(jobName))
q.queue.Dequeue(string(e.Key))

if e.Job != nil {
q.collector.Push(q.key.CounterKey(jobName))
}
}

return nil
Expand Down
25 changes: 25 additions & 0 deletions tests/suite/upsert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,28 @@ func Test_upsert(t *testing.T) {
require.NoError(t, err)
assert.Empty(t, resp.Kvs)
}

func Test_upsert_duetime(t *testing.T) {
t.Parallel()

cron := integration.NewBase(t, 3)

job := &api.Job{
DueTime: ptr.Of("1s"),
Schedule: ptr.Of("@every 5s"),
}
now := time.Now().Format(time.RFC3339)
require.NoError(t, cron.API().Add(cron.Context(), now, job))

time.Sleep(time.Second * 2)
assert.Equal(t, 1, cron.Triggered())

job = &api.Job{
DueTime: ptr.Of("20s"),
Schedule: ptr.Of("@every 5s"),
}
require.NoError(t, cron.API().Add(cron.Context(), now, job))

time.Sleep(time.Second * 5)
assert.Equal(t, 1, cron.Triggered())
}

0 comments on commit 6874185

Please sign in to comment.