Skip to content

Commit

Permalink
test Selfcare
Browse files Browse the repository at this point in the history
  • Loading branch information
mbretter committed Nov 4, 2024
1 parent 3fcddd6 commit 10cbd66
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 10 deletions.
2 changes: 1 addition & 1 deletion cmd/demo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func main() {
}

if *selfcare {
err := qu.Selfcare(nil)
err := qu.Selfcare("")
if err != nil {
log.Fatal(err)
}
Expand Down
26 changes: 17 additions & 9 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,18 +208,18 @@ func (q *Queue) Err(id string, err error) error {
})
}

func (q *Queue) Selfcare(topic *string) error {
func (q *Queue) Selfcare(topic string) error {
// re-schedule long-running tasks
// this only happens if the processor could not ack the task, i.e. the application crashed
query := bson.M{
"state": StateRunning,
"meta.dispatched": bson.M{"$lt": time.Now().Add(DefaultTimeout)},
"meta.dispatched": bson.M{"$lt": nowFunc().Add(DefaultTimeout)},
}
if topic != nil {
query["topic"] = *topic
if len(topic) > 0 {
query["topic"] = topic
}

_ = q.db.UpdateMany(
err1 := q.db.UpdateMany(
query,
bson.M{"$set": bson.M{
"state": StatePending,
Expand All @@ -231,17 +231,25 @@ func (q *Queue) Selfcare(topic *string) error {
"state": StatePending,
"$expr": bson.M{"$gte": bson.A{"$tries", "$maxtries"}},
}
if topic != nil {
query["topic"] = *topic
if len(topic) > 0 {
query["topic"] = topic
}

_ = q.db.UpdateMany(
err2 := q.db.UpdateMany(
query,
bson.M{"$set": bson.M{
"state": StateError,
"meta.completed": time.Now()},
"meta.completed": nowFunc()},
})

if err1 != nil {
return err1
}

if err2 != nil {
return err2
}

return nil
}

Expand Down
81 changes: 81 additions & 0 deletions queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,3 +415,84 @@ func TestQueue_Err(t *testing.T) {
})
}
}

func TestQueue_Selftest(t *testing.T) {
setNowFunc(func() time.Time {
t, _ := time.Parse(time.DateTime, "2024-11-04 15:04:05")
return t
})

tests := []struct {
name string
topic string
error1 error
error2 error
}{
{
name: "Success",
topic: "",
},
{
name: "Success with topic",
topic: "user.delete",
},
{
name: "Reschedule failed",
topic: "",
error1: errors.New("FindOneAndUpdate1"),
},
{
name: "Set maxtries to error failed",
topic: "",
error2: errors.New("FindOneAndUpdate2"),
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
dbMock := NewDbInterfaceMock(t)

q := NewQueue(dbMock)

query1 := bson.M{
"state": StateRunning,
"meta.dispatched": bson.M{"$lt": nowFunc().Add(DefaultTimeout)},
}

if tt.topic != "" {
query1["topic"] = tt.topic
}

dbMock.EXPECT().UpdateMany(query1,
bson.M{"$set": bson.M{
"state": StatePending,
"meta.dispatched": nil},
}).Return(tt.error1)

query2 := bson.M{
"state": StatePending,
"$expr": bson.M{"$gte": bson.A{"$tries", "$maxtries"}},
}

if tt.topic != "" {
query2["topic"] = tt.topic
}

dbMock.EXPECT().UpdateMany(query2,
bson.M{"$set": bson.M{
"state": StateError,
"meta.completed": nowFunc()},
}).Return(tt.error2)

err := q.Selfcare(tt.topic)

if tt.error1 != nil {
assert.Equal(t, tt.error1, err)
} else if tt.error2 != nil {
assert.Equal(t, tt.error2, err)
} else {
assert.Equal(t, nil, err)
}
})
}
}

0 comments on commit 10cbd66

Please sign in to comment.