From 10cbd66ab112e6d78545c9428ea35e202807f740 Mon Sep 17 00:00:00 2001 From: Michael Bretterklieber Date: Mon, 4 Nov 2024 17:02:03 +0100 Subject: [PATCH] test Selfcare --- cmd/demo/main.go | 2 +- queue.go | 26 ++++++++++------ queue_test.go | 81 ++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 99 insertions(+), 10 deletions(-) diff --git a/cmd/demo/main.go b/cmd/demo/main.go index 9088039..f9b53dd 100644 --- a/cmd/demo/main.go +++ b/cmd/demo/main.go @@ -107,7 +107,7 @@ func main() { } if *selfcare { - err := qu.Selfcare(nil) + err := qu.Selfcare("") if err != nil { log.Fatal(err) } diff --git a/queue.go b/queue.go index b2d6c61..068a804 100644 --- a/queue.go +++ b/queue.go @@ -208,18 +208,18 @@ func (q *Queue) Err(id string, err error) error { }) } -func (q *Queue) Selfcare(topic *string) error { +func (q *Queue) Selfcare(topic string) error { // re-schedule long-running tasks // this only happens if the processor could not ack the task, i.e. the application crashed query := bson.M{ "state": StateRunning, - "meta.dispatched": bson.M{"$lt": time.Now().Add(DefaultTimeout)}, + "meta.dispatched": bson.M{"$lt": nowFunc().Add(DefaultTimeout)}, } - if topic != nil { - query["topic"] = *topic + if len(topic) > 0 { + query["topic"] = topic } - _ = q.db.UpdateMany( + err1 := q.db.UpdateMany( query, bson.M{"$set": bson.M{ "state": StatePending, @@ -231,17 +231,25 @@ func (q *Queue) Selfcare(topic *string) error { "state": StatePending, "$expr": bson.M{"$gte": bson.A{"$tries", "$maxtries"}}, } - if topic != nil { - query["topic"] = *topic + if len(topic) > 0 { + query["topic"] = topic } - _ = q.db.UpdateMany( + err2 := q.db.UpdateMany( query, bson.M{"$set": bson.M{ "state": StateError, - "meta.completed": time.Now()}, + "meta.completed": nowFunc()}, }) + if err1 != nil { + return err1 + } + + if err2 != nil { + return err2 + } + return nil } diff --git a/queue_test.go b/queue_test.go index 6c51730..fe28832 100644 --- a/queue_test.go +++ b/queue_test.go @@ -415,3 +415,84 @@ func TestQueue_Err(t *testing.T) { }) } } + +func TestQueue_Selftest(t *testing.T) { + setNowFunc(func() time.Time { + t, _ := time.Parse(time.DateTime, "2024-11-04 15:04:05") + return t + }) + + tests := []struct { + name string + topic string + error1 error + error2 error + }{ + { + name: "Success", + topic: "", + }, + { + name: "Success with topic", + topic: "user.delete", + }, + { + name: "Reschedule failed", + topic: "", + error1: errors.New("FindOneAndUpdate1"), + }, + { + name: "Set maxtries to error failed", + topic: "", + error2: errors.New("FindOneAndUpdate2"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + dbMock := NewDbInterfaceMock(t) + + q := NewQueue(dbMock) + + query1 := bson.M{ + "state": StateRunning, + "meta.dispatched": bson.M{"$lt": nowFunc().Add(DefaultTimeout)}, + } + + if tt.topic != "" { + query1["topic"] = tt.topic + } + + dbMock.EXPECT().UpdateMany(query1, + bson.M{"$set": bson.M{ + "state": StatePending, + "meta.dispatched": nil}, + }).Return(tt.error1) + + query2 := bson.M{ + "state": StatePending, + "$expr": bson.M{"$gte": bson.A{"$tries", "$maxtries"}}, + } + + if tt.topic != "" { + query2["topic"] = tt.topic + } + + dbMock.EXPECT().UpdateMany(query2, + bson.M{"$set": bson.M{ + "state": StateError, + "meta.completed": nowFunc()}, + }).Return(tt.error2) + + err := q.Selfcare(tt.topic) + + if tt.error1 != nil { + assert.Equal(t, tt.error1, err) + } else if tt.error2 != nil { + assert.Equal(t, tt.error2, err) + } else { + assert.Equal(t, nil, err) + } + }) + } +}