diff --git a/protocol/pubsub/v2/internal/connection.go b/protocol/pubsub/v2/internal/connection.go index bdd430b28..673231dfa 100644 --- a/protocol/pubsub/v2/internal/connection.go +++ b/protocol/pubsub/v2/internal/connection.go @@ -73,6 +73,12 @@ type Connection struct { // MessageOrdering enables message ordering for all topics and subscriptions. // This can only be set prior to first call of any function. MessageOrdering bool + // Filter is an expression written in the Cloud Pub/Sub filter language. If + // non-empty, then only `PubsubMessage`s whose `attributes` field matches the + // filter are delivered on this subscription. If empty, then no messages are + // filtered out. Cannot be changed after the subscription is created. + // This can only be set prior to first call of any function. + Filter string } const ( @@ -234,6 +240,7 @@ func (c *Connection) getOrCreateSubscriptionInfo(ctx context.Context, getAlready AckDeadline: *c.AckDeadline, RetentionDuration: *c.RetentionDuration, EnableMessageOrdering: c.MessageOrdering, + Filter: c.Filter, }) if si.err != nil { return diff --git a/protocol/pubsub/v2/options.go b/protocol/pubsub/v2/options.go index fd8e0d1d7..2e6b328e2 100644 --- a/protocol/pubsub/v2/options.go +++ b/protocol/pubsub/v2/options.go @@ -115,6 +115,37 @@ func WithSubscriptionAndTopicID(subscriptionID, topicID string) Option { } } +// WithSubscriptionIDAndFilter sets the subscription and topic IDs for pubsub transport. +// This option can be used multiple times. +func WithSubscriptionIDAndFilter(subscriptionID, filter string) Option { + return func(t *Protocol) error { + if t.subscriptions == nil { + t.subscriptions = make([]subscriptionWithTopic, 0) + } + t.subscriptions = append(t.subscriptions, subscriptionWithTopic{ + subscriptionID: subscriptionID, + filter: filter, + }) + return nil + } +} + +// WithSubscriptionTopicIDAndFilter sets the subscription with filter option and topic IDs for pubsub transport. +// This option can be used multiple times. +func WithSubscriptionTopicIDAndFilter(subscriptionID, topicID, filter string) Option { + return func(t *Protocol) error { + if t.subscriptions == nil { + t.subscriptions = make([]subscriptionWithTopic, 0) + } + t.subscriptions = append(t.subscriptions, subscriptionWithTopic{ + subscriptionID: subscriptionID, + topicID: topicID, + filter: filter, + }) + return nil + } +} + // WithSubscriptionIDFromEnv sets the subscription ID for pubsub transport from // a given environment variable name. func WithSubscriptionIDFromEnv(key string) Option { @@ -135,6 +166,33 @@ func WithSubscriptionIDFromDefaultEnv() Option { return WithSubscriptionIDFromEnv(DefaultSubscriptionEnvKey) } +// WithFilter sets the subscription filter for pubsub transport. +func WithFilter(filter string) Option { + return func(t *Protocol) error { + if t.subscriptions == nil { + t.subscriptions = make([]subscriptionWithTopic, 0) + } + t.subscriptions = append(t.subscriptions, subscriptionWithTopic{ + filter: filter, + }) + return nil + } +} + +// WithFilterFromEnv sets the subscription filter for pubsub transport from +// a given environment variable name. +func WithFilterFromEnv(key string) Option { + return func(t *Protocol) error { + v := os.Getenv(key) + if v == "" { + return fmt.Errorf("unable to load subscription filter, %q environment variable not set", key) + } + + opt := WithFilter(v) + return opt(t) + } +} + // AllowCreateTopic sets if the transport can create a topic if it does not // exist. func AllowCreateTopic(allow bool) Option { diff --git a/protocol/pubsub/v2/protocol.go b/protocol/pubsub/v2/protocol.go index e8e127bc1..fee092694 100644 --- a/protocol/pubsub/v2/protocol.go +++ b/protocol/pubsub/v2/protocol.go @@ -26,6 +26,7 @@ const ( type subscriptionWithTopic struct { topicID string subscriptionID string + filter string } // Protocol acts as both a pubsub topic and a pubsub subscription .