Skip to content

Commit

Permalink
pubsub/gcppubsub: Support lazy mode for Nacks (#3195)
Browse files Browse the repository at this point in the history
  • Loading branch information
vangent authored Dec 28, 2022
1 parent be80e70 commit 43ed5a4
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 4 deletions.
29 changes: 28 additions & 1 deletion pubsub/gcppubsub/gcppubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,8 @@ const Scheme = "gcppubsub"
//
// The following query parameters are supported:
//
// - max_recv_batch_size: sets SubscriptionOptions.MaxBatchSize
// - max_recv_batch_size: sets SubscriptionOptions.MaxBatchSize.
// - nacklazy: sets SubscriberOptions.NackLazy. The value must be parseable by `strconv.ParseBool`.
//
// Currently their use is limited to subscribers.
type URLOpener struct {
Expand Down Expand Up @@ -236,6 +237,13 @@ func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsu
}

opts.MaxBatchSize = maxBatchSize
case "nacklazy":
var err error
nackLazy, err := queryParameterBool(value)
if err != nil {
return nil, fmt.Errorf("open subscription %v: invalid query parameter %q: %v", u, param, err)
}
opts.NackLazy = nackLazy
default:
return nil, fmt.Errorf("open subscription %v: invalid query parameter %q", u, param)
}
Expand Down Expand Up @@ -436,6 +444,15 @@ type SubscriptionOptions struct {
// MaxBatchSize caps the maximum batch size used when retrieving messages. It defaults to 1000.
MaxBatchSize int

// NackLazy determines what Nack does.
//
// By default, Nack uses ModifyAckDeadline to set the ack deadline
// for the nacked message to 0, so that it will be redelivered immediately.
// Set NackLazy to true to bypass this behavior; Nack will do nothing,
// and the message will be redelivered after the existing ack deadline
// expires.
NackLazy bool

// ReceiveBatcherOptions adds constraints to the default batching done for receives.
ReceiveBatcherOptions batcher.Options

Expand Down Expand Up @@ -560,6 +577,9 @@ func (s *subscription) CanNack() bool { return true }

// SendNacks implements driver.Subscription.SendNacks.
func (s *subscription) SendNacks(ctx context.Context, ids []driver.AckID) error {
if s.options.NackLazy {
return nil
}
ids2 := make([]string, 0, len(ids))
for _, id := range ids {
ids2 = append(ids2, id.(string))
Expand Down Expand Up @@ -610,3 +630,10 @@ func queryParameterInt(value []string) (int, error) {

return strconv.Atoi(value[0])
}
func queryParameterBool(value []string) (bool, error) {
if len(value) > 1 {
return false, fmt.Errorf("expected only one parameter value, got: %v", len(value))
}

return strconv.ParseBool(value[0])
}
10 changes: 7 additions & 3 deletions pubsub/gcppubsub/gcppubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,12 +412,16 @@ func TestOpenSubscriptionFromURL(t *testing.T) {
{"gcppubsub://projects/myproject/subscriptions/mysub", false},
// Invalid parameter.
{"gcppubsub://myproject/mysub?param=value", true},
// Valid parameters
// Valid max_recv_batch_size
{"gcppubsub://projects/myproject/subscriptions/mysub?max_recv_batch_size=1", false},
// Invalid parameters
// Invalid max_recv_batch_size
{"gcppubsub://projects/myproject/subscriptions/mysub?max_recv_batch_size=0", true},
// Invalid parameters
// Invalid max_recv_batch_size
{"gcppubsub://projects/myproject/subscriptions/mysub?max_recv_batch_size=1001", true},
// Valid nacklazy
{"gcppubsub://projects/myproject/subscriptions/mysub?nacklazy=true", false},
// Invalid nacklazy
{"gcppubsub://projects/myproject/subscriptions/mysub?nacklazy=foo", true},
}

ctx := context.Background()
Expand Down

0 comments on commit 43ed5a4

Please sign in to comment.