From 27e466f4b4a04f8b170671d31f41a8510f07104c Mon Sep 17 00:00:00 2001 From: Nicolai Cornelis Date: Wed, 23 Oct 2024 15:56:42 +0200 Subject: [PATCH] Fix commonNack logic to retain original functionality Adjust error message change if-else to switch --- sqsjobs/item.go | 52 ++++++++++++++++++++++++------------------------- 1 file changed, 25 insertions(+), 27 deletions(-) diff --git a/sqsjobs/item.go b/sqsjobs/item.go index aab17d8..1e6d0ee 100644 --- a/sqsjobs/item.go +++ b/sqsjobs/item.go @@ -2,7 +2,6 @@ package sqsjobs import ( "context" - "log" "maps" "strconv" "strings" @@ -28,7 +27,7 @@ const ( BinaryType string = "Binary" ApproximateReceiveCount string = "ApproximateReceiveCount" fifoSuffix string = ".fifo" - pipelineStoppedError string = "Failed to ACK/NACK the job. The pipeline is probably stopped." + pipelineStoppedError string = "Failed to ACK/NACK or requeue the job. The pipeline is probably stopped." ) var _ jobs.Job = (*Item)(nil) @@ -157,7 +156,10 @@ func (i *Item) Ack() error { return nil } -func commonNack(i *Item) error { +func (i *Item) commonNack(requeue bool, delay int) error { + if atomic.LoadUint64(i.Options.stopped) == 1 { + return errors.Str(pipelineStoppedError) + } defer func() { i.Options.cond.Signal() atomic.AddInt64(i.Options.msgInFlight, ^int64(0)) @@ -166,8 +168,17 @@ func commonNack(i *Item) error { if i.Options.AutoAck { return nil } + if requeue { + // requeue message + err := i.Requeue(nil, delay) + if err != nil { + return err + } - if !i.Options.RetainFailedJobs { + return nil + } + switch { + case !i.Options.RetainFailedJobs: // requeue as new message err := i.Options.requeueFn(context.Background(), i) if err != nil { @@ -182,7 +193,7 @@ func commonNack(i *Item) error { if err != nil { return err } - } else if i.Options.ErrorVisibilityTimeout > 0 { + case i.Options.ErrorVisibilityTimeout > 0: // If error visibility is defined change the visibility timeout of the job that failed _, err := i.Options.client.ChangeMessageVisibility(context.Background(), &sqs.ChangeMessageVisibilityInput{ QueueUrl: i.Options.queue, @@ -192,39 +203,26 @@ func commonNack(i *Item) error { if err != nil { var notInFlight *types.MessageNotInflight - if stderr.As(err, ¬InFlight) { - // I would like to log info/warning here, but not sure how to get the correct logger - log.Println("MessageNotInFlight; ignoring ChangeMessageVisibility") - } else { + // We ignore this error. If the message is not in flight, we cannot change the visibility. This may happen + // if processing takes longer than the timeout for the message, and no other works pick it up. Should be + // very rare though. + if !stderr.As(err, ¬InFlight) { return err } } - } // else dont do anything; wait for VisibilityTimeout to expire. + default: + // dont do anything; wait for VisibilityTimeout to expire. + } return nil } func (i *Item) Nack() error { - if atomic.LoadUint64(i.Options.stopped) == 1 { - return errors.Str(pipelineStoppedError) - } - return commonNack(i) + return i.commonNack(false, 0) } func (i *Item) NackWithOptions(requeue bool, delay int) error { - if atomic.LoadUint64(i.Options.stopped) == 1 { - return errors.Str(pipelineStoppedError) - } - if requeue { - // requeue message - err := i.Requeue(nil, delay) - if err != nil { - return err - } - - return nil - } - return commonNack(i) + return i.commonNack(requeue, delay) } func (i *Item) Requeue(headers map[string][]string, delay int) error {