From be80e70b3dcf7a6b86481881c7ac0b44a8095178 Mon Sep 17 00:00:00 2001 From: Robert van Gent Date: Wed, 28 Dec 2022 09:51:53 -0800 Subject: [PATCH] pubsub/awssnssqs: Support lazy mode for Nacks (#3194) --- pubsub/awssnssqs/awssnssqs.go | 24 ++++++++++++++++++++++++ pubsub/awssnssqs/awssnssqs_test.go | 4 ++++ 2 files changed, 28 insertions(+) diff --git a/pubsub/awssnssqs/awssnssqs.go b/pubsub/awssnssqs/awssnssqs.go index 320203e67e..fa967633ea 100644 --- a/pubsub/awssnssqs/awssnssqs.go +++ b/pubsub/awssnssqs/awssnssqs.go @@ -214,6 +214,8 @@ const SQSScheme = "awssqs" // // - raw (for "awssqs" Subscriptions only): sets SubscriberOptions.Raw. The // value must be parseable by `strconv.ParseBool`. +// - nacklazy (for "awssqs" Subscriptions only): sets SubscriberOptions.NackLazy. The +// value must be parseable by `strconv.ParseBool`. // - waittime: sets SubscriberOptions.WaitTime, in time.ParseDuration formats. // // See gocloud.dev/aws/ConfigFromURLParams for other query parameters @@ -284,6 +286,14 @@ func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsu } q.Del("raw") } + if nackLazyStr := q.Get("nacklazy"); nackLazyStr != "" { + var err error + opts.NackLazy, err = strconv.ParseBool(nackLazyStr) + if err != nil { + return nil, fmt.Errorf("invalid value %q for nacklazy: %v", nackLazyStr, err) + } + q.Del("nacklazy") + } if waitTimeStr := q.Get("waittime"); waitTimeStr != "" { var err error opts.WaitTime, err = time.ParseDuration(waitTimeStr) @@ -918,6 +928,17 @@ type SubscriptionOptions struct { // See https://aws.amazon.com/sns/faqs/#Raw_message_delivery. Raw bool + // NackLazy determines what Nack does. + // + // By default, Nack uses ChangeMessageVisibility to set the VisibilityTimeout + // for the nacked message to 0, so that it will be redelivered immediately. + // Set NackLazy to true to bypass this behavior; Nack will do nothing, + // and the message will be redelivered after the existing VisibilityTimeout + // expires (defaults to 30s, but can be configured per queue). + // + // See https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html. + NackLazy bool + // WaitTime passed to ReceiveMessage to enable long polling. // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html#sqs-long-polling. // Note that a non-zero WaitTime can delay delivery of messages @@ -1199,6 +1220,9 @@ func (s *subscription) CanNack() bool { return true } // SendNacks implements driver.Subscription.SendNacks. func (s *subscription) SendNacks(ctx context.Context, ids []driver.AckID) error { + if s.opts.NackLazy { + return nil + } if s.useV2 { req := &sqsv2.ChangeMessageVisibilityBatchInput{QueueUrl: aws.String(s.qURL)} for _, id := range ids { diff --git a/pubsub/awssnssqs/awssnssqs_test.go b/pubsub/awssnssqs/awssnssqs_test.go index e60623f4e9..3bf6c8d333 100644 --- a/pubsub/awssnssqs/awssnssqs_test.go +++ b/pubsub/awssnssqs/awssnssqs_test.go @@ -700,6 +700,10 @@ func TestOpenSubscriptionFromURL(t *testing.T) { {"awssqs://sqs.us-east-2.amazonaws.com/99999/my-queue?raw=1", false}, // Invalid raw. {"awssqs://sqs.us-east-2.amazonaws.com/99999/my-queue?raw=foo", true}, + // OK, setting nacklazy. + {"awssqs://sqs.us-east-2.amazonaws.com/99999/my-queue?nacklazy=1", false}, + // Invalid nacklazy. + {"awssqs://sqs.us-east-2.amazonaws.com/99999/my-queue?nacklazy=foo", true}, // OK, setting waittime. {"awssqs://sqs.us-east-2.amazonaws.com/99999/my-queue?waittime=5s", false}, // OK, setting usev2.