Skip to content

Commit

Permalink
Fix commonNack logic to retain original functionality
Browse files Browse the repository at this point in the history
Adjust error message
change if-else to switch
  • Loading branch information
nickdnk committed Oct 23, 2024
1 parent cfc4ad9 commit 27e466f
Showing 1 changed file with 25 additions and 27 deletions.
52 changes: 25 additions & 27 deletions sqsjobs/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package sqsjobs

import (
"context"
"log"
"maps"
"strconv"
"strings"
Expand All @@ -28,7 +27,7 @@ const (
BinaryType string = "Binary"
ApproximateReceiveCount string = "ApproximateReceiveCount"
fifoSuffix string = ".fifo"
pipelineStoppedError string = "Failed to ACK/NACK the job. The pipeline is probably stopped."
pipelineStoppedError string = "Failed to ACK/NACK or requeue the job. The pipeline is probably stopped."
)

var _ jobs.Job = (*Item)(nil)
Expand Down Expand Up @@ -157,7 +156,10 @@ func (i *Item) Ack() error {
return nil
}

func commonNack(i *Item) error {
func (i *Item) commonNack(requeue bool, delay int) error {
if atomic.LoadUint64(i.Options.stopped) == 1 {
return errors.Str(pipelineStoppedError)
}
defer func() {
i.Options.cond.Signal()
atomic.AddInt64(i.Options.msgInFlight, ^int64(0))
Expand All @@ -166,8 +168,17 @@ func commonNack(i *Item) error {
if i.Options.AutoAck {
return nil
}
if requeue {
// requeue message
err := i.Requeue(nil, delay)
if err != nil {
return err
}

if !i.Options.RetainFailedJobs {
return nil
}
switch {
case !i.Options.RetainFailedJobs:
// requeue as new message
err := i.Options.requeueFn(context.Background(), i)
if err != nil {
Expand All @@ -182,7 +193,7 @@ func commonNack(i *Item) error {
if err != nil {
return err
}
} else if i.Options.ErrorVisibilityTimeout > 0 {
case i.Options.ErrorVisibilityTimeout > 0:
// If error visibility is defined change the visibility timeout of the job that failed
_, err := i.Options.client.ChangeMessageVisibility(context.Background(), &sqs.ChangeMessageVisibilityInput{
QueueUrl: i.Options.queue,
Expand All @@ -192,39 +203,26 @@ func commonNack(i *Item) error {

if err != nil {
var notInFlight *types.MessageNotInflight
if stderr.As(err, &notInFlight) {
// I would like to log info/warning here, but not sure how to get the correct logger
log.Println("MessageNotInFlight; ignoring ChangeMessageVisibility")
} else {
// We ignore this error. If the message is not in flight, we cannot change the visibility. This may happen
// if processing takes longer than the timeout for the message, and no other works pick it up. Should be
// very rare though.
if !stderr.As(err, &notInFlight) {
return err
}
}
} // else dont do anything; wait for VisibilityTimeout to expire.
default:
// dont do anything; wait for VisibilityTimeout to expire.
}

return nil
}

func (i *Item) Nack() error {
if atomic.LoadUint64(i.Options.stopped) == 1 {
return errors.Str(pipelineStoppedError)
}
return commonNack(i)
return i.commonNack(false, 0)
}

func (i *Item) NackWithOptions(requeue bool, delay int) error {
if atomic.LoadUint64(i.Options.stopped) == 1 {
return errors.Str(pipelineStoppedError)
}
if requeue {
// requeue message
err := i.Requeue(nil, delay)
if err != nil {
return err
}

return nil
}
return commonNack(i)
return i.commonNack(requeue, delay)
}

func (i *Item) Requeue(headers map[string][]string, delay int) error {
Expand Down

0 comments on commit 27e466f

Please sign in to comment.