Skip to content

Commit

Permalink
pubsub/awssnssqs: Support lazy mode for Nacks (#3194)
Browse files Browse the repository at this point in the history
  • Loading branch information
vangent authored Dec 28, 2022
1 parent 7d69099 commit be80e70
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 0 deletions.
24 changes: 24 additions & 0 deletions pubsub/awssnssqs/awssnssqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ const SQSScheme = "awssqs"
//
// - raw (for "awssqs" Subscriptions only): sets SubscriberOptions.Raw. The
// value must be parseable by `strconv.ParseBool`.
// - nacklazy (for "awssqs" Subscriptions only): sets SubscriberOptions.NackLazy. The
// value must be parseable by `strconv.ParseBool`.
// - waittime: sets SubscriberOptions.WaitTime, in time.ParseDuration formats.
//
// See gocloud.dev/aws/ConfigFromURLParams for other query parameters
Expand Down Expand Up @@ -284,6 +286,14 @@ func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsu
}
q.Del("raw")
}
if nackLazyStr := q.Get("nacklazy"); nackLazyStr != "" {
var err error
opts.NackLazy, err = strconv.ParseBool(nackLazyStr)
if err != nil {
return nil, fmt.Errorf("invalid value %q for nacklazy: %v", nackLazyStr, err)
}
q.Del("nacklazy")
}
if waitTimeStr := q.Get("waittime"); waitTimeStr != "" {
var err error
opts.WaitTime, err = time.ParseDuration(waitTimeStr)
Expand Down Expand Up @@ -918,6 +928,17 @@ type SubscriptionOptions struct {
// See https://aws.amazon.com/sns/faqs/#Raw_message_delivery.
Raw bool

// NackLazy determines what Nack does.
//
// By default, Nack uses ChangeMessageVisibility to set the VisibilityTimeout
// for the nacked message to 0, so that it will be redelivered immediately.
// Set NackLazy to true to bypass this behavior; Nack will do nothing,
// and the message will be redelivered after the existing VisibilityTimeout
// expires (defaults to 30s, but can be configured per queue).
//
// See https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html.
NackLazy bool

// WaitTime passed to ReceiveMessage to enable long polling.
// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html#sqs-long-polling.
// Note that a non-zero WaitTime can delay delivery of messages
Expand Down Expand Up @@ -1199,6 +1220,9 @@ func (s *subscription) CanNack() bool { return true }

// SendNacks implements driver.Subscription.SendNacks.
func (s *subscription) SendNacks(ctx context.Context, ids []driver.AckID) error {
if s.opts.NackLazy {
return nil
}
if s.useV2 {
req := &sqsv2.ChangeMessageVisibilityBatchInput{QueueUrl: aws.String(s.qURL)}
for _, id := range ids {
Expand Down
4 changes: 4 additions & 0 deletions pubsub/awssnssqs/awssnssqs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,10 @@ func TestOpenSubscriptionFromURL(t *testing.T) {
{"awssqs://sqs.us-east-2.amazonaws.com/99999/my-queue?raw=1", false},
// Invalid raw.
{"awssqs://sqs.us-east-2.amazonaws.com/99999/my-queue?raw=foo", true},
// OK, setting nacklazy.
{"awssqs://sqs.us-east-2.amazonaws.com/99999/my-queue?nacklazy=1", false},
// Invalid nacklazy.
{"awssqs://sqs.us-east-2.amazonaws.com/99999/my-queue?nacklazy=foo", true},
// OK, setting waittime.
{"awssqs://sqs.us-east-2.amazonaws.com/99999/my-queue?waittime=5s", false},
// OK, setting usev2.
Expand Down

0 comments on commit be80e70

Please sign in to comment.