Skip to content

Commit

Permalink
docs
Browse files Browse the repository at this point in the history
  • Loading branch information
mbretter committed Nov 12, 2024
1 parent 9c96e13 commit 28a1e11
Showing 1 changed file with 13 additions and 0 deletions.
13 changes: 13 additions & 0 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,15 @@ type PublishOptions struct {
Tries int
}

// NewPublishOptions creates a new PublishOptions with default settings.
func NewPublishOptions() *PublishOptions {
return &PublishOptions{
MaxTries: 0,
Tries: -1,
}
}

// SetMaxTries sets the maximum number of retry attempts for publishing. Returns the updated PublishOptions instance.
func (p *PublishOptions) SetMaxTries(maxTries uint) *PublishOptions {
p.MaxTries = maxTries
return p
Expand Down Expand Up @@ -128,6 +130,7 @@ func (q *Queue) Publish(topic string, payload any, opts ...*PublishOptions) (*Ta
return &t, nil
}

// GetNext retrieves the next item from the queue for the given topic, marks it as running, and increments its tries count.
func (q *Queue) GetNext(topic string) (*Task, error) {
t := Task{}
res := q.db.FindOneAndUpdate(bson.M{
Expand All @@ -153,6 +156,7 @@ func (q *Queue) GetNext(topic string) (*Task, error) {
return &t, nil
}

// GetNextById retrieves the next pending task by its ID, transitions it to the running state, and increments its tries count.
func (q *Queue) GetNextById(id primitive.ObjectID) (*Task, error) {
t := Task{}
res := q.db.FindOneAndUpdate(bson.M{
Expand All @@ -178,12 +182,15 @@ func (q *Queue) GetNextById(id primitive.ObjectID) (*Task, error) {
return &t, nil
}

// Reschedule republishes a task to the queue, retaining its topic, payload, tries, and maxTries settings.
func (q *Queue) Reschedule(task *Task) (*Task, error) {
return q.Publish(task.Topic, task.Payload, NewPublishOptions().setTries(task.Tries).SetMaxTries(task.MaxTries))
}

type Callback func(t Task)

// Subscribe listens for new tasks on a given topic and calls the provided callback when a new task is available.
// It processes unprocessed tasks scheduled before starting the watch and continuously monitors for new tasks.
func (q *Queue) Subscribe(topic string, cb Callback) error {
pipeline := bson.D{{"$match", bson.D{
{"operationType", "insert"},
Expand Down Expand Up @@ -240,6 +247,7 @@ func (q *Queue) Subscribe(topic string, cb Callback) error {
return nil
}

// Ack acknowledges a task completion by its ID, updating its state to "completed" and setting the completion timestamp.
func (q *Queue) Ack(id string) error {
oId, err := primitive.ObjectIDFromHex(id)
if err != nil {
Expand All @@ -254,6 +262,7 @@ func (q *Queue) Ack(id string) error {
}})
}

// Err updates the state of a task to "error" by its ID, setting the completion time and storing the error message.
func (q *Queue) Err(id string, err error) error {
oId, e := primitive.ObjectIDFromHex(id)
if e != nil {
Expand All @@ -269,6 +278,9 @@ func (q *Queue) Err(id string, err error) error {
})
}

// Selfcare re-schedules long-running tasks and sets tasks exceeding max tries to error state.
// It updates tasks in an ongoing state that haven't been acknowledged within a specific timeout period.
// If timeout is zero, the default timeout value is used. Optionally, tasks can be filtered by topic.
func (q *Queue) Selfcare(topic string, timeout time.Duration) error {
// re-schedule long-running tasks
// this only happens if the processor could not ack the task, i.e. the application crashed
Expand Down Expand Up @@ -319,6 +331,7 @@ func (q *Queue) Selfcare(topic string, timeout time.Duration) error {
return nil
}

// CreateIndexes creates MongoDB indexes for the task collection to improve query performance and manage TTL for completed tasks.
func (q *Queue) CreateIndexes() error {
err := q.db.CreateIndexes([]mongo.IndexModel{{
Keys: bson.D{{"topic", 1}, {"state", 1}},
Expand Down

0 comments on commit 28a1e11

Please sign in to comment.