Skip to content

Commit

Permalink
test subscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
mbretter committed Oct 29, 2024
1 parent 410a251 commit 225e586
Show file tree
Hide file tree
Showing 6 changed files with 458 additions and 24 deletions.
1 change: 1 addition & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ packages:
github.com/mbretter/go-mongodb-queue:
interfaces:
DbInterface:
ChangeStreamInterface:
174 changes: 174 additions & 0 deletions changestreaminterface_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 8 additions & 2 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,17 @@ type DbInterface interface {
FindOneAndUpdate(filter interface{}, update interface{}, opts ...*options.FindOneAndUpdateOptions) *mongo.SingleResult
UpdateOne(filter interface{}, update interface{}) error
UpdateMany(filter interface{}, update interface{}) error
Watch(pipeline interface{}) (*mongo.ChangeStream, error)
Watch(pipeline interface{}) (ChangeStreamInterface, error)
CreateIndexes(index []mongo.IndexModel) error
Context() context.Context
}

type ChangeStreamInterface interface {
Next(ctx context.Context) bool
Decode(v interface{}) error
Close(ctx context.Context) error
}

type StdDb struct {
context context.Context
collection *mongo.Collection
Expand Down Expand Up @@ -64,7 +70,7 @@ func (d *StdDb) UpdateMany(filter interface{}, update interface{}) error {
return err
}

func (d *StdDb) Watch(pipeline interface{}) (*mongo.ChangeStream, error) {
func (d *StdDb) Watch(pipeline interface{}) (ChangeStreamInterface, error) {
return d.collection.Watch(d.context, pipeline)
}

Expand Down
14 changes: 7 additions & 7 deletions dbinterface_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 19 additions & 15 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ type Task struct {
Meta Meta
}

type event struct {
Task Task `bson:"fullDocument"`
}

func NewQueue(db DbInterface) *Queue {
queue := Queue{
db: db,
Expand Down Expand Up @@ -92,7 +96,7 @@ func (q *Queue) GetNext(topic string) (*Task, error) {
"$expr": bson.M{"$lt": bson.A{"$tries", "$maxtries"}},
},
bson.M{
"$set": bson.M{"state": StateRunning, "meta.dispatched": time.Now()},
"$set": bson.M{"state": StateRunning, "meta.dispatched": nowFunc()},
"$inc": bson.M{"tries": 1},
},
options.FindOneAndUpdate().SetSort(bson.D{{"meta.scheduled", 1}}),
Expand Down Expand Up @@ -140,36 +144,36 @@ func (q *Queue) Subscribe(topic string, cb Callback) error {
}

for stream.Next(q.db.Context()) {
var event struct {
Task Task `bson:"fullDocument"`
}
var evt event

if err := stream.Decode(&event); err != nil {
if err := stream.Decode(&evt); err != nil {
continue
}

task := evt.Task

// already processed
if event.Task.Meta.Created.Before(processedUntil) {
if task.Meta.Created.Before(processedUntil) {
continue
}

event.Task.State = StateRunning
now := time.Now()
event.Task.Meta.Dispatched = &now
task.State = StateRunning
now := nowFunc()
task.Meta.Dispatched = &now

err := q.db.UpdateOne(
bson.M{"_id": event.Task.Id},
bson.M{"_id": task.Id},
bson.M{"$set": bson.M{
"state": event.Task.State,
"meta.dispatched": event.Task.Meta.Dispatched,
"state": task.State,
"meta.dispatched": task.Meta.Dispatched,
}})

if err != nil {
_ = q.Err(event.Task.Id.Hex(), err)
_ = q.Err(task.Id.Hex(), err)
continue
}

cb(event.Task)
cb(task)
}

return nil
Expand Down Expand Up @@ -199,7 +203,7 @@ func (q *Queue) Err(id string, err error) error {
bson.M{"_id": oId},
bson.M{"$set": bson.M{
"state": StateError,
"meta.completed": time.Now(),
"meta.completed": nowFunc(),
"message": err.Error()},
})
}
Expand Down
Loading

0 comments on commit 225e586

Please sign in to comment.