diff --git a/pubsub/gcppubsub/gcppubsub.go b/pubsub/gcppubsub/gcppubsub.go index eaa7838007..a75d5d3f67 100644 --- a/pubsub/gcppubsub/gcppubsub.go +++ b/pubsub/gcppubsub/gcppubsub.go @@ -186,7 +186,8 @@ const Scheme = "gcppubsub" // // The following query parameters are supported: // -// - max_recv_batch_size: sets SubscriptionOptions.MaxBatchSize +// - max_recv_batch_size: sets SubscriptionOptions.MaxBatchSize. +// - nacklazy: sets SubscriberOptions.NackLazy. The value must be parseable by `strconv.ParseBool`. // // Currently their use is limited to subscribers. type URLOpener struct { @@ -236,6 +237,13 @@ func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsu } opts.MaxBatchSize = maxBatchSize + case "nacklazy": + var err error + nackLazy, err := queryParameterBool(value) + if err != nil { + return nil, fmt.Errorf("open subscription %v: invalid query parameter %q: %v", u, param, err) + } + opts.NackLazy = nackLazy default: return nil, fmt.Errorf("open subscription %v: invalid query parameter %q", u, param) } @@ -436,6 +444,15 @@ type SubscriptionOptions struct { // MaxBatchSize caps the maximum batch size used when retrieving messages. It defaults to 1000. MaxBatchSize int + // NackLazy determines what Nack does. + // + // By default, Nack uses ModifyAckDeadline to set the ack deadline + // for the nacked message to 0, so that it will be redelivered immediately. + // Set NackLazy to true to bypass this behavior; Nack will do nothing, + // and the message will be redelivered after the existing ack deadline + // expires. + NackLazy bool + // ReceiveBatcherOptions adds constraints to the default batching done for receives. ReceiveBatcherOptions batcher.Options @@ -560,6 +577,9 @@ func (s *subscription) CanNack() bool { return true } // SendNacks implements driver.Subscription.SendNacks. func (s *subscription) SendNacks(ctx context.Context, ids []driver.AckID) error { + if s.options.NackLazy { + return nil + } ids2 := make([]string, 0, len(ids)) for _, id := range ids { ids2 = append(ids2, id.(string)) @@ -610,3 +630,10 @@ func queryParameterInt(value []string) (int, error) { return strconv.Atoi(value[0]) } +func queryParameterBool(value []string) (bool, error) { + if len(value) > 1 { + return false, fmt.Errorf("expected only one parameter value, got: %v", len(value)) + } + + return strconv.ParseBool(value[0]) +} diff --git a/pubsub/gcppubsub/gcppubsub_test.go b/pubsub/gcppubsub/gcppubsub_test.go index 0c5cbf9a07..2d2677668d 100644 --- a/pubsub/gcppubsub/gcppubsub_test.go +++ b/pubsub/gcppubsub/gcppubsub_test.go @@ -412,12 +412,16 @@ func TestOpenSubscriptionFromURL(t *testing.T) { {"gcppubsub://projects/myproject/subscriptions/mysub", false}, // Invalid parameter. {"gcppubsub://myproject/mysub?param=value", true}, - // Valid parameters + // Valid max_recv_batch_size {"gcppubsub://projects/myproject/subscriptions/mysub?max_recv_batch_size=1", false}, - // Invalid parameters + // Invalid max_recv_batch_size {"gcppubsub://projects/myproject/subscriptions/mysub?max_recv_batch_size=0", true}, - // Invalid parameters + // Invalid max_recv_batch_size {"gcppubsub://projects/myproject/subscriptions/mysub?max_recv_batch_size=1001", true}, + // Valid nacklazy + {"gcppubsub://projects/myproject/subscriptions/mysub?nacklazy=true", false}, + // Invalid nacklazy + {"gcppubsub://projects/myproject/subscriptions/mysub?nacklazy=foo", true}, } ctx := context.Background()