diff --git a/cmd/webhook/main.go b/cmd/webhook/main.go index e1e309f945..66e6a368c0 100644 --- a/cmd/webhook/main.go +++ b/cmd/webhook/main.go @@ -20,6 +20,7 @@ import ( "context" "log" + brokerv1beta1 "github.com/google/knative-gcp/pkg/apis/broker/v1beta1" "github.com/google/knative-gcp/pkg/apis/configs/gcpauth" "github.com/google/knative-gcp/pkg/apis/events" eventsv1 "github.com/google/knative-gcp/pkg/apis/events/v1" @@ -52,6 +53,9 @@ import ( ) var types = map[schema.GroupVersionKind]resourcesemantics.GenericCRD{ + // For group eventing.knative.dev. + brokerv1beta1.SchemeGroupVersion.WithKind("Broker"): &brokerv1beta1.Broker{}, + // For group messaging.cloud.google.com. messagingv1alpha1.SchemeGroupVersion.WithKind("Channel"): &messagingv1alpha1.Channel{}, messagingv1beta1.SchemeGroupVersion.WithKind("Channel"): &messagingv1beta1.Channel{}, diff --git a/docs/spec/delivery.md b/docs/spec/delivery.md new file mode 100644 index 0000000000..0b6a20a9ba --- /dev/null +++ b/docs/spec/delivery.md @@ -0,0 +1,27 @@ +# Delivery Implementation + +This document outlines how GCP Brokers handle the translation from the +[Knative Eventing delivery specification](https://github.com/knative/eventing/tree/master/docs/delivery) +to the [Cloud Pub/Sub subscription configuration](https://pkg.go.dev/cloud.google.com/go/pubsub?tab=doc#SubscriptionConfig). + +The Knative Eventing delivery specification allows for the configuration of a +backoff retry policy and a dead letter policy. + +## Dead Letter Policy + +A Pub/Sub subscription has its dead letter policy configured through the subscription configuration member `DeadLetterPolicy`. + +The Knative dead letter policy is specified through the following parameters in the Knative Eventing delivery spec: +* `DeadLetterSink`: We only allow special URLs as the dead letter sink, of the form `pubsub://[dead_letter_sink_topic]`. We assume that if a topic is specified, it already exists. +* `Retry`: This is the number of delivery attempts until the event is forwarded to the dead letter topic. Mapped to the Pub/Sub dead letter policy's `MaxDeliveryAttempts`. + +## Retry Policy + +A Pub/Sub subscription has its backoff retry policy configured through the subscription configuration member `RetryPolicy`. + +Pub/Sub supports an [exponential backoff](https://en.wikipedia.org/wiki/Exponential_backoff) retry policy based on the specified minimum backoff delay, and capped to some specified maximum backoff delay. + +* `BackoffDelay`: This is mapped to the retry policy's `MinimumBackoff`. +* `BackoffPolicy`: There are two options for the backoff policy: + * `linear`: In this case, the retry policy's `MaximumBackoff` is set to be equal to the `MinimumBackoff`, which sets the delay between each retry to be equal. + * `exponential`: In this case, the retry policy's `MaximumBackoff` is set to 600 seconds, which is the largest value allowed by Pub/Sub. diff --git a/go.mod b/go.mod index 3199e05df0..56dfdf3ab7 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/google/wire v0.4.0 github.com/googleapis/gax-go/v2 v2.0.5 github.com/kelseyhightower/envconfig v1.4.0 + github.com/rickb777/date v1.13.0 go.opencensus.io v0.22.5-0.20200714042313-af30f77c5f65 go.opentelemetry.io/otel v0.3.0 // indirect go.uber.org/multierr v1.5.0 diff --git a/pkg/apis/broker/v1beta1/broker_defaults.go b/pkg/apis/broker/v1beta1/broker_defaults.go index e675e25a66..aa4d456324 100644 --- a/pkg/apis/broker/v1beta1/broker_defaults.go +++ b/pkg/apis/broker/v1beta1/broker_defaults.go @@ -18,10 +18,37 @@ package v1beta1 import ( "context" + + eventingduckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1" +) + +var ( + // DefaultBackoffDelay is the default backoff delay used in the backoff retry policy + // for the Broker delivery spec. + DefaultBackoffDelay = "PT1S" + // DefaultBackoffPolicy is the default backoff policy type used in the backoff retry + // policy for the Broker delivery spec. + DefaultBackoffPolicy = eventingduckv1beta1.BackoffPolicyExponential + // DefaultRetry is the default number of maximum delivery attempts for unacked messages + // before they are sent to a dead letter topic in the Broker delivery spec, in + // case a dead letter topic is specified. Without a dead letter topic specified, + // the retry count is infinite. + DefaultRetry int32 = 6 ) // SetDefaults sets the default field values for a Broker. func (b *Broker) SetDefaults(ctx context.Context) { - // The Google Cloud Broker doesn't have any custom defaults. The - // eventing webhook will add the usual defaults. + // Set the default delivery spec. + if b.Spec.Delivery == nil { + b.Spec.Delivery = &eventingduckv1beta1.DeliverySpec{} + } + if b.Spec.Delivery.BackoffPolicy == nil && + b.Spec.Delivery.BackoffDelay == nil { + b.Spec.Delivery.BackoffPolicy = &DefaultBackoffPolicy + b.Spec.Delivery.BackoffDelay = &DefaultBackoffDelay + } + if b.Spec.Delivery.Retry == nil && b.Spec.Delivery.DeadLetterSink != nil { + b.Spec.Delivery.Retry = &DefaultRetry + } + // Besides this, the eventing webhook will add the usual defaults. } diff --git a/pkg/apis/broker/v1beta1/broker_lifecycle.go b/pkg/apis/broker/v1beta1/broker_lifecycle.go index 2fe0e62570..60170974b0 100644 --- a/pkg/apis/broker/v1beta1/broker_lifecycle.go +++ b/pkg/apis/broker/v1beta1/broker_lifecycle.go @@ -70,11 +70,11 @@ func (bs *BrokerStatus) SetAddress(url *apis.URL) { } } -func (bs *BrokerStatus) MarkBrokerCelllUnknown(reason, format string, args ...interface{}) { +func (bs *BrokerStatus) MarkBrokerCellUnknown(reason, format string, args ...interface{}) { brokerCondSet.Manage(bs).MarkUnknown(BrokerConditionBrokerCell, reason, format, args...) } -func (bs *BrokerStatus) MarkBrokerCelllFailed(reason, format string, args ...interface{}) { +func (bs *BrokerStatus) MarkBrokerCellFailed(reason, format string, args ...interface{}) { brokerCondSet.Manage(bs).MarkFalse(BrokerConditionBrokerCell, reason, format, args...) } diff --git a/pkg/apis/broker/v1beta1/broker_lifecycle_test.go b/pkg/apis/broker/v1beta1/broker_lifecycle_test.go index 9adf21b3b6..98fd88d516 100644 --- a/pkg/apis/broker/v1beta1/broker_lifecycle_test.go +++ b/pkg/apis/broker/v1beta1/broker_lifecycle_test.go @@ -348,9 +348,9 @@ func TestBrokerConditionStatus(t *testing.T) { if test.brokerCellStatus == corev1.ConditionTrue { bs.MarkBrokerCellReady() } else if test.brokerCellStatus == corev1.ConditionFalse { - bs.MarkBrokerCelllFailed("Unable to create brokercell", "induced failure") + bs.MarkBrokerCellFailed("Unable to create brokercell", "induced failure") } else { - bs.MarkBrokerCelllUnknown("Unable to create brokercell", "induced unknown") + bs.MarkBrokerCellUnknown("Unable to create brokercell", "induced unknown") } if test.subscriptionStatus == corev1.ConditionTrue { bs.MarkSubscriptionReady() diff --git a/pkg/apis/broker/v1beta1/broker_validation.go b/pkg/apis/broker/v1beta1/broker_validation.go index ebb598b407..2d1c2cf19f 100644 --- a/pkg/apis/broker/v1beta1/broker_validation.go +++ b/pkg/apis/broker/v1beta1/broker_validation.go @@ -19,12 +19,41 @@ package v1beta1 import ( "context" + eventingduckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1" "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" ) // Validate verifies that the Broker is valid. func (b *Broker) Validate(ctx context.Context) *apis.FieldError { - // The Google Cloud Broker doesn't have any custom validations. The - // eventing webhook will run the usual validations. + // We validate the GCP Broker's delivery spec. The eventing webhook will run + // the other usual validations. + return ValidateDeliverySpec(ctx, b.Spec.Delivery).ViaField("spec", "delivery") +} + +func ValidateDeliverySpec(ctx context.Context, spec *eventingduckv1beta1.DeliverySpec) *apis.FieldError { + if spec == nil { + return nil + } + return ValidateDeadLetterSink(ctx, spec.DeadLetterSink).ViaField("deadLetterSink") +} + +func ValidateDeadLetterSink(ctx context.Context, sink *duckv1.Destination) *apis.FieldError { + if sink == nil { + return nil + } + if sink.URI == nil { + return apis.ErrMissingField("uri") + } + if scheme := sink.URI.Scheme; scheme != "pubsub" { + return apis.ErrInvalidValue("Dead letter sink URI scheme should be pubsub", "uri") + } + topicID := sink.URI.Host + if topicID == "" { + return apis.ErrInvalidValue("Dead letter topic must not be empty", "uri") + } + if len(topicID) > 255 { + return apis.ErrInvalidValue("Dead letter topic maximum length is 255 characters", "uri") + } return nil } diff --git a/pkg/apis/broker/v1beta1/broker_validation_test.go b/pkg/apis/broker/v1beta1/broker_validation_test.go index 463a0a6cf4..91c79b09ec 100644 --- a/pkg/apis/broker/v1beta1/broker_validation_test.go +++ b/pkg/apis/broker/v1beta1/broker_validation_test.go @@ -18,7 +18,13 @@ package v1beta1 import ( "context" + "strings" "testing" + + "github.com/google/go-cmp/cmp" + eventingduckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1" + "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" ) func TestBroker_Validate(t *testing.T) { @@ -27,3 +33,71 @@ func TestBroker_Validate(t *testing.T) { t.Errorf("expected nil, got %v", err) } } + +func TestDeliverySpec_Validate(t *testing.T) { + ds := &eventingduckv1beta1.DeliverySpec{} + if err := ValidateDeliverySpec(context.TODO(), ds); err != nil { + t.Errorf("expected nil, got %v", err) + } +} + +func TestDeadLetterSink_Validate(t *testing.T) { + tests := []struct { + name string + deadLetterSink *duckv1.Destination + want *apis.FieldError + }{{ + name: "invalid dead letter sink missing uri", + deadLetterSink: &duckv1.Destination{}, + want: apis.ErrMissingField("uri"), + }, { + name: "invalid dead letter sink uri scheme", + deadLetterSink: &duckv1.Destination{ + URI: &apis.URL{ + Scheme: "http", + Host: "test-topic-id", + Path: "/", + }, + }, + want: apis.ErrInvalidValue("Dead letter sink URI scheme should be pubsub", "uri"), + }, { + name: "invalid empty dead letter topic id", + deadLetterSink: &duckv1.Destination{ + URI: &apis.URL{ + Scheme: "pubsub", + Host: "", + Path: "/", + }, + }, + want: apis.ErrInvalidValue("Dead letter topic must not be empty", "uri"), + }, { + name: "invalid dead letter topic id too long", + deadLetterSink: &duckv1.Destination{ + URI: &apis.URL{ + Scheme: "pubsub", + Host: strings.Repeat("x", 256), + Path: "/", + }, + }, + want: apis.ErrInvalidValue("Dead letter topic maximum length is 255 characters", "uri"), + }, { + name: "valid dead letter topic", + deadLetterSink: &duckv1.Destination{ + URI: &apis.URL{ + Scheme: "pubsub", + Host: "test-topic-id", + Path: "/", + }, + }, + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + got := ValidateDeadLetterSink(context.Background(), test.deadLetterSink) + //got := test.spec.Validate(context.Background()) + if diff := cmp.Diff(test.want.Error(), got.Error()); diff != "" { + t.Errorf("ValidateDeadLetterSink (-want, +got) = %v", diff) + } + }) + } +} diff --git a/pkg/reconciler/broker/create_brokercell.go b/pkg/reconciler/broker/create_brokercell.go index 4de4dd8801..07017be9d3 100644 --- a/pkg/reconciler/broker/create_brokercell.go +++ b/pkg/reconciler/broker/create_brokercell.go @@ -44,7 +44,7 @@ func (r *Reconciler) ensureBrokerCellExists(ctx context.Context, b *brokerv1beta if err != nil && !apierrs.IsNotFound(err) { logging.FromContext(ctx).Error("Error reconciling brokercell", zap.String("namespace", b.Namespace), zap.String("broker", b.Name), zap.Error(err)) - b.Status.MarkBrokerCelllUnknown("BrokerCellUnknown", "Failed to get brokercell %s/%s", bc.Namespace, bc.Name) + b.Status.MarkBrokerCellUnknown("BrokerCellUnknown", "Failed to get brokercell %s/%s", bc.Namespace, bc.Name) return err } @@ -53,7 +53,7 @@ func (r *Reconciler) ensureBrokerCellExists(ctx context.Context, b *brokerv1beta bc, err = r.RunClientSet.InternalV1alpha1().BrokerCells(want.Namespace).Create(want) if err != nil && !apierrs.IsAlreadyExists(err) { logging.FromContext(ctx).Error("Error creating brokercell", zap.String("namespace", b.Namespace), zap.String("broker", b.Name), zap.Error(err)) - b.Status.MarkBrokerCelllFailed("BrokerCellCreationFailed", "Failed to create %s/%s", want.Namespace, want.Name) + b.Status.MarkBrokerCellFailed("BrokerCellCreationFailed", "Failed to create %s/%s", want.Namespace, want.Name) return err } if apierrs.IsAlreadyExists(err) { @@ -63,7 +63,7 @@ func (r *Reconciler) ensureBrokerCellExists(ctx context.Context, b *brokerv1beta bc, err = r.RunClientSet.InternalV1alpha1().BrokerCells(want.Namespace).Get(want.Name, metav1.GetOptions{}) if err != nil { logging.FromContext(ctx).Error("Failed to get the brokercell from the API server", zap.String("namespace", b.Namespace), zap.String("broker", b.Name), zap.Error(err)) - b.Status.MarkBrokerCelllUnknown("BrokerCellUnknown", "Failed to get the brokercell from the API server %s/%s", want.Namespace, want.Name) + b.Status.MarkBrokerCellUnknown("BrokerCellUnknown", "Failed to get the brokercell from the API server %s/%s", want.Namespace, want.Name) return err } } @@ -75,7 +75,7 @@ func (r *Reconciler) ensureBrokerCellExists(ctx context.Context, b *brokerv1beta if bc.Status.IsReady() { b.Status.MarkBrokerCellReady() } else { - b.Status.MarkBrokerCelllUnknown("BrokerCellNotReady", "Brokercell %s/%s is not ready", bc.Namespace, bc.Name) + b.Status.MarkBrokerCellUnknown("BrokerCellNotReady", "Brokercell %s/%s is not ready", bc.Namespace, bc.Name) } //TODO(#1019) Use the IngressTemplate of brokercell. diff --git a/pkg/reconciler/testing/broker.go b/pkg/reconciler/testing/broker.go index c13bfeac09..c8d5c1dcda 100644 --- a/pkg/reconciler/testing/broker.go +++ b/pkg/reconciler/testing/broker.go @@ -23,6 +23,7 @@ import ( brokerv1beta1 "github.com/google/knative-gcp/pkg/apis/broker/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + eventingduckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1" eventingv1beta1 "knative.dev/eventing/pkg/apis/eventing/v1beta1" "knative.dev/pkg/apis" ) @@ -123,13 +124,13 @@ func WithBrokerReadyURI(address *apis.URL) BrokerOption { func WithBrokerBrokerCellFailed(reason, msg string) BrokerOption { return func(b *brokerv1beta1.Broker) { - b.Status.MarkBrokerCelllFailed(reason, msg) + b.Status.MarkBrokerCellFailed(reason, msg) } } func WithBrokerBrokerCellUnknown(reason, msg string) BrokerOption { return func(b *brokerv1beta1.Broker) { - b.Status.MarkBrokerCelllUnknown(reason, msg) + b.Status.MarkBrokerCellUnknown(reason, msg) } } @@ -159,3 +160,10 @@ func WithBrokerClass(bc string) BrokerOption { func WithBrokerSetDefaults(b *brokerv1beta1.Broker) { b.SetDefaults(context.Background()) } + +// WithBrokerDeliverySpec sets the Broker's delivery spec. +func WithBrokerDeliverySpec(deliverySpec *eventingduckv1beta1.DeliverySpec) BrokerOption { + return func(b *brokerv1beta1.Broker) { + b.Spec.Delivery = deliverySpec + } +} diff --git a/pkg/reconciler/testing/pstest.go b/pkg/reconciler/testing/pstest.go index 6b71009706..2dadd3f943 100644 --- a/pkg/reconciler/testing/pstest.go +++ b/pkg/reconciler/testing/pstest.go @@ -128,6 +128,20 @@ func SubscriptionHasRetryPolicy(id string, wantPolicy *pubsub.RetryPolicy) func( } } +func SubscriptionHasDeadLetterPolicy(id string, wantPolicy *pubsub.DeadLetterPolicy) func(*testing.T, *rtesting.TableRow) { + return func(t *testing.T, r *rtesting.TableRow) { + c := getPubsubClient(r) + sub := c.Subscription(id) + cfg, err := sub.Config(context.Background()) + if err != nil { + t.Errorf("Error getting pubsub config: %v", err) + } + if diff := cmp.Diff(wantPolicy, cfg.DeadLetterPolicy); diff != "" { + t.Errorf("Pubsub config dead letter policy (-want,+got): %v", diff) + } + } +} + func OnlySubscriptions(ids ...string) func(*testing.T, *rtesting.TableRow) { return func(t *testing.T, r *rtesting.TableRow) { c := getPubsubClient(r) diff --git a/pkg/reconciler/trigger/controller.go b/pkg/reconciler/trigger/controller.go index a6a2249dbf..72762341b8 100644 --- a/pkg/reconciler/trigger/controller.go +++ b/pkg/reconciler/trigger/controller.go @@ -18,7 +18,6 @@ package trigger import ( "context" - "time" "cloud.google.com/go/pubsub" "github.com/kelseyhightower/envconfig" @@ -60,9 +59,6 @@ var filterBroker = pkgreconciler.AnnotationFilterFunc(eventingv1beta1.BrokerClas type envConfig struct { ProjectID string `envconfig:"PROJECT_ID"` - - MinRetryBackoff time.Duration `envconfig:"MIN_RETRY_BACKOFF" default:"1s"` - MaxRetryBackoff time.Duration `envconfig:"MAX_RETRY_BACKOFF" default:"1m"` } func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl { @@ -79,11 +75,7 @@ func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl if err != nil { logging.FromContext(ctx).Error("Failed to get project ID", zap.Error(err)) } - // Set up the pub/sub retry policy. - retryPolicy := &pubsub.RetryPolicy{ - MaximumBackoff: env.MaxRetryBackoff, - MinimumBackoff: env.MinRetryBackoff, - } + // Attempt to create a pubsub client for all worker threads to use. If this // fails, pass a nil value to the Reconciler. They will attempt to // create a client on reconcile. @@ -104,7 +96,6 @@ func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl brokerLister: brokerinformer.Get(ctx).Lister(), pubsubClient: client, projectID: projectID, - retryPolicy: retryPolicy, } impl := triggerreconciler.NewImpl(ctx, r, withAgentAndFinalizer) diff --git a/pkg/reconciler/trigger/trigger.go b/pkg/reconciler/trigger/trigger.go index bdc67ad899..68ffc9b253 100644 --- a/pkg/reconciler/trigger/trigger.go +++ b/pkg/reconciler/trigger/trigger.go @@ -19,12 +19,15 @@ package trigger import ( "context" "fmt" + "time" + "github.com/rickb777/date/period" "go.uber.org/multierr" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" apierrs "k8s.io/apimachinery/pkg/api/errors" + eventingduckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1" "knative.dev/eventing/pkg/duck" "knative.dev/eventing/pkg/logging" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -47,6 +50,10 @@ const ( // Name of the corev1.Events emitted from the Trigger reconciliation process. triggerReconciled = "TriggerReconciled" triggerFinalized = "TriggerFinalized" + + // Default maximum backoff duration used in the backoff retry policy for + // pubsub subscriptions. 600 seconds is the longest supported time. + defaultMaximumBackoff = 600 * time.Second ) // Reconciler implements controller.Reconciler for Trigger resources. @@ -66,9 +73,6 @@ type Reconciler struct { // pubsubClient is used as the Pubsub client when present. pubsubClient *pubsub.Client - - // retryPolicy defines the retry policy for pubsub messages. - retryPolicy *pubsub.RetryPolicy } // Check that TriggerReconciler implements Interface @@ -109,7 +113,7 @@ func (r *Reconciler) reconcile(ctx context.Context, t *brokerv1beta1.Trigger, b return err } - if err := r.reconcileRetryTopicAndSubscription(ctx, t); err != nil { + if err := r.reconcileRetryTopicAndSubscription(ctx, t, b.Spec.Delivery); err != nil { return err } @@ -168,7 +172,7 @@ func hasGCPBrokerFinalizer(t *brokerv1beta1.Trigger) bool { return false } -func (r *Reconciler) reconcileRetryTopicAndSubscription(ctx context.Context, trig *brokerv1beta1.Trigger) error { +func (r *Reconciler) reconcileRetryTopicAndSubscription(ctx context.Context, trig *brokerv1beta1.Trigger, deliverySpec *eventingduckv1beta1.DeliverySpec) error { logger := logging.FromContext(ctx) logger.Debug("Reconciling retry topic") // get ProjectID from metadata @@ -216,12 +220,24 @@ func (r *Reconciler) reconcileRetryTopicAndSubscription(ctx context.Context, tri //TODO uncomment when eventing webhook allows this //trig.Status.TopicID = topic.ID() + retryPolicy := getPubsubRetryPolicy(deliverySpec) + if err != nil { + logger.Error("Error getting broker retry policy", zap.Error(err)) + return err + } + deadLetterPolicy := getPubsubDeadLetterPolicy(projectID, deliverySpec) + if err != nil { + logger.Error("Error getting broker dead letter policy", zap.Error(err)) + return err + } + // Check if PullSub exists, and if not, create it. subID := resources.GenerateRetrySubscriptionName(trig) subConfig := pubsub.SubscriptionConfig{ - Topic: topic, - Labels: labels, - RetryPolicy: r.retryPolicy, + Topic: topic, + Labels: labels, + RetryPolicy: retryPolicy, + DeadLetterPolicy: deadLetterPolicy, //TODO(grantr): configure these settings? // AckDeadline // RetentionDuration @@ -236,6 +252,55 @@ func (r *Reconciler) reconcileRetryTopicAndSubscription(ctx context.Context, tri return nil } +// getPubsubRetryPolicy gets the eventing retry policy from the Broker delivery +// spec and translates it to a pubsub retry policy. +func getPubsubRetryPolicy(spec *eventingduckv1beta1.DeliverySpec) *pubsub.RetryPolicy { + var backoffDelay *string + var backoffPolicy *eventingduckv1beta1.BackoffPolicyType + if spec == nil { + backoffDelay = &brokerv1beta1.DefaultBackoffDelay + backoffPolicy = &brokerv1beta1.DefaultBackoffPolicy + } else { + backoffDelay = spec.BackoffDelay + backoffPolicy = spec.BackoffPolicy + } + // The Broker delivery spec is translated to a pubsub retry policy in the + // manner defined in the following post: + // https://github.com/google/knative-gcp/issues/1392#issuecomment-655617873 + p, _ := period.Parse(*backoffDelay) + minimumBackoff, _ := p.Duration() + var maximumBackoff time.Duration + switch *backoffPolicy { + case eventingduckv1beta1.BackoffPolicyLinear: + maximumBackoff = minimumBackoff + case eventingduckv1beta1.BackoffPolicyExponential: + maximumBackoff = defaultMaximumBackoff + } + return &pubsub.RetryPolicy{ + MinimumBackoff: minimumBackoff, + MaximumBackoff: maximumBackoff, + } +} + +// getPubsubDeadLetterPolicy gets the eventing dead letter policy from the +// Broker delivery spec and translates it to a pubsub dead letter policy. +func getPubsubDeadLetterPolicy(projectID string, spec *eventingduckv1beta1.DeliverySpec) *pubsub.DeadLetterPolicy { + if spec == nil || spec.DeadLetterSink == nil { + return nil + } + var retry *int32 + if spec.Retry == nil { + retry = &brokerv1beta1.DefaultRetry + } else { + retry = spec.Retry + } + // Translate to the pubsub dead letter policy format. + return &pubsub.DeadLetterPolicy{ + MaxDeliveryAttempts: int(*retry), + DeadLetterTopic: fmt.Sprintf("projects/%s/topics/%s", projectID, spec.DeadLetterSink.URI.Host), + } +} + func (r *Reconciler) deleteRetryTopicAndSubscription(ctx context.Context, trig *brokerv1beta1.Trigger) error { logger := logging.FromContext(ctx) logger.Debug("Deleting retry topic") @@ -269,7 +334,7 @@ func (r *Reconciler) deleteRetryTopicAndSubscription(ctx context.Context, trig * err = multierr.Append(nil, pubsubReconciler.DeleteTopic(ctx, topicID, trig, &trig.Status)) // Delete pull subscription if it exists. subID := resources.GenerateRetrySubscriptionName(trig) - err = multierr.Append(nil, pubsubReconciler.DeleteSubscription(ctx, subID, trig, &trig.Status)) + err = multierr.Append(err, pubsubReconciler.DeleteSubscription(ctx, subID, trig, &trig.Status)) return err } diff --git a/pkg/reconciler/trigger/trigger_test.go b/pkg/reconciler/trigger/trigger_test.go index 1a66f9551d..3d6dacff26 100644 --- a/pkg/reconciler/trigger/trigger_test.go +++ b/pkg/reconciler/trigger/trigger_test.go @@ -30,7 +30,10 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" clientgotesting "k8s.io/client-go/testing" + eventingduckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1" "knative.dev/eventing/pkg/duck" + "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/client/injection/ducks/duck/v1/addressable" "knative.dev/pkg/client/injection/ducks/duck/v1/conditions" "knative.dev/pkg/configmap" @@ -61,22 +64,37 @@ const ( ) var ( + backoffPolicy = eventingduckv1beta1.BackoffPolicyLinear + backoffDelay = "PT5S" + deadLetterTopicID = "test-dead-letter-topic-id" + retry int32 = 3 + testKey = fmt.Sprintf("%s/%s", testNS, triggerName) triggerFinalizerUpdatedEvent = Eventf(corev1.EventTypeNormal, "FinalizerUpdate", `Updated "test-trigger" finalizers`) triggerReconciledEvent = Eventf(corev1.EventTypeNormal, "TriggerReconciled", `Trigger reconciled: "testnamespace/test-trigger"`) triggerFinalizedEvent = Eventf(corev1.EventTypeNormal, "TriggerFinalized", `Trigger finalized: "testnamespace/test-trigger"`) topicCreatedEvent = Eventf(corev1.EventTypeNormal, "TopicCreated", `Created PubSub topic "cre-tgr_testnamespace_test-trigger_abc123"`) + topicDeletedEvent = Eventf(corev1.EventTypeNormal, "TopicDeleted", `Deleted PubSub topic "cre-tgr_testnamespace_test-trigger_abc123"`) + deadLetterTopicCreatedEvent = Eventf(corev1.EventTypeNormal, "TopicCreated", `Created PubSub topic "test-dead-letter-topic-id"`) subscriptionCreatedEvent = Eventf(corev1.EventTypeNormal, "SubscriptionCreated", `Created PubSub subscription "cre-tgr_testnamespace_test-trigger_abc123"`) + subscriptionDeletedEvent = Eventf(corev1.EventTypeNormal, "SubscriptionDeleted", `Deleted PubSub subscription "cre-tgr_testnamespace_test-trigger_abc123"`) subscriberAPIVersion = fmt.Sprintf("%s/%s", subscriberGroup, subscriberVersion) subscriberGVK = metav1.GroupVersionKind{ Group: subscriberGroup, Version: subscriberVersion, Kind: subscriberKind, } - retryPolicy = &pubsub.RetryPolicy{ - MaximumBackoff: time.Minute, - MinimumBackoff: time.Second, + brokerDeliverySpec = &eventingduckv1beta1.DeliverySpec{ + BackoffDelay: &backoffDelay, + BackoffPolicy: &backoffPolicy, + Retry: &retry, + DeadLetterSink: &duckv1.Destination{ + URI: &apis.URL{ + Scheme: "pubsub", + Host: deadLetterTopicID, + }, + }, } ) @@ -133,8 +151,8 @@ func TestAllCasesTrigger(t *testing.T) { WithTriggerSetDefaults), }, WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "TopicDeleted", `Deleted PubSub topic "cre-tgr_testnamespace_test-trigger_abc123"`), - Eventf(corev1.EventTypeNormal, "SubscriptionDeleted", `Deleted PubSub subscription "cre-tgr_testnamespace_test-trigger_abc123"`), + topicDeletedEvent, + subscriptionDeletedEvent, triggerFinalizerUpdatedEvent, triggerFinalizedEvent, }, @@ -159,8 +177,8 @@ func TestAllCasesTrigger(t *testing.T) { ), }, WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "TopicDeleted", `Deleted PubSub topic "cre-tgr_testnamespace_test-trigger_abc123"`), - Eventf(corev1.EventTypeNormal, "SubscriptionDeleted", `Deleted PubSub subscription "cre-tgr_testnamespace_test-trigger_abc123"`), + topicDeletedEvent, + subscriptionDeletedEvent, triggerFinalizedEvent, }, OtherTestData: map[string]interface{}{ @@ -185,8 +203,8 @@ func TestAllCasesTrigger(t *testing.T) { ), }, WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "TopicDeleted", `Deleted PubSub topic "cre-tgr_testnamespace_test-trigger_abc123"`), - Eventf(corev1.EventTypeNormal, "SubscriptionDeleted", `Deleted PubSub subscription "cre-tgr_testnamespace_test-trigger_abc123"`), + topicDeletedEvent, + subscriptionDeletedEvent, triggerFinalizedEvent, }, OtherTestData: map[string]interface{}{ @@ -211,8 +229,8 @@ func TestAllCasesTrigger(t *testing.T) { ), }, WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "TopicDeleted", `Deleted PubSub topic "cre-tgr_testnamespace_test-trigger_abc123"`), - Eventf(corev1.EventTypeNormal, "SubscriptionDeleted", `Deleted PubSub subscription "cre-tgr_testnamespace_test-trigger_abc123"`), + topicDeletedEvent, + subscriptionDeletedEvent, triggerFinalizedEvent, }, OtherTestData: map[string]interface{}{ @@ -262,6 +280,11 @@ func TestAllCasesTrigger(t *testing.T) { PostConditions: []func(*testing.T, *TableRow){ OnlyTopics("cre-tgr_testnamespace_test-trigger_abc123"), OnlySubscriptions("cre-tgr_testnamespace_test-trigger_abc123"), + SubscriptionHasRetryPolicy("cre-tgr_testnamespace_test-trigger_abc123", + &pubsub.RetryPolicy{ + MaximumBackoff: 600 * time.Second, + MinimumBackoff: time.Second, + }), }, }, { @@ -306,6 +329,7 @@ func TestAllCasesTrigger(t *testing.T) { WithInitBrokerConditions, WithBrokerReady("url"), WithBrokerSetDefaults, + WithBrokerDeliverySpec(brokerDeliverySpec), ), makeSubscriberAddressableAsUnstructured(), NewTrigger(triggerName, testNS, brokerName, @@ -335,14 +359,23 @@ func TestAllCasesTrigger(t *testing.T) { WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, triggerName, finalizerName), }, - OtherTestData: map[string]interface{}{}, + OtherTestData: map[string]interface{}{ + "pre": []PubsubAction{ + Topic("test-dead-letter-topic-id"), + }, + }, PostConditions: []func(*testing.T, *TableRow){ - OnlyTopics("cre-tgr_testnamespace_test-trigger_abc123"), + OnlyTopics("cre-tgr_testnamespace_test-trigger_abc123", "test-dead-letter-topic-id"), OnlySubscriptions("cre-tgr_testnamespace_test-trigger_abc123"), SubscriptionHasRetryPolicy("cre-tgr_testnamespace_test-trigger_abc123", &pubsub.RetryPolicy{ - MaximumBackoff: time.Minute, - MinimumBackoff: time.Second, + MaximumBackoff: 5 * time.Second, + MinimumBackoff: 5 * time.Second, + }), + SubscriptionHasDeadLetterPolicy("cre-tgr_testnamespace_test-trigger_abc123", + &pubsub.DeadLetterPolicy{ + MaxDeliveryAttempts: 3, + DeadLetterTopic: "projects/test-project-id/topics/test-dead-letter-topic-id", }), }, }, @@ -375,7 +408,6 @@ func TestAllCasesTrigger(t *testing.T) { uriResolver: resolver.NewURIResolver(ctx, func(types.NamespacedName) {}), projectID: testProject, pubsubClient: psclient, - retryPolicy: retryPolicy, } return triggerreconciler.NewReconciler(ctx, r.Logger, r.RunClientSet, listers.GetTriggerLister(), r.Recorder, r, withAgentAndFinalizer(nil)) diff --git a/vendor/modules.txt b/vendor/modules.txt index 4e9fe0022e..de769f9aae 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -296,6 +296,7 @@ github.com/prometheus/procfs/internal/util github.com/prometheus/statsd_exporter/pkg/mapper github.com/prometheus/statsd_exporter/pkg/mapper/fsm # github.com/rickb777/date v1.13.0 +## explicit github.com/rickb777/date/period # github.com/rickb777/plural v1.2.1 github.com/rickb777/plural