Skip to content
This repository has been archived by the owner on Jun 19, 2022. It is now read-only.

Commit

Permalink
Implement the Broker delivery spec (#1536)
Browse files Browse the repository at this point in the history
* initial commit; contains some random typo fixes; initial broker delivery spec implementation; adjusted trigger tests

* don't create the dead letter topic if it doesn't already exist; remove accidentally committed file

* update-deps

* added default maximum backoff

* fixed test to use default maximum backoff

* adjust webhook to sync broker SetDefaults; deal with nil delivery spec

* addressing some PR comments

* address comments and add validation tests for the dead letter sink format

* added docs

* addressed PR comments

* forgot to fix dependencies
  • Loading branch information
tommyreddad authored Aug 7, 2020
1 parent fbe6c1e commit e768a6e
Show file tree
Hide file tree
Showing 15 changed files with 322 additions and 49 deletions.
4 changes: 4 additions & 0 deletions cmd/webhook/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"log"

brokerv1beta1 "github.com/google/knative-gcp/pkg/apis/broker/v1beta1"
"github.com/google/knative-gcp/pkg/apis/configs/gcpauth"
"github.com/google/knative-gcp/pkg/apis/events"
eventsv1 "github.com/google/knative-gcp/pkg/apis/events/v1"
Expand Down Expand Up @@ -52,6 +53,9 @@ import (
)

var types = map[schema.GroupVersionKind]resourcesemantics.GenericCRD{
// For group eventing.knative.dev.
brokerv1beta1.SchemeGroupVersion.WithKind("Broker"): &brokerv1beta1.Broker{},

// For group messaging.cloud.google.com.
messagingv1alpha1.SchemeGroupVersion.WithKind("Channel"): &messagingv1alpha1.Channel{},
messagingv1beta1.SchemeGroupVersion.WithKind("Channel"): &messagingv1beta1.Channel{},
Expand Down
27 changes: 27 additions & 0 deletions docs/spec/delivery.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Delivery Implementation

This document outlines how GCP Brokers handle the translation from the
[Knative Eventing delivery specification](https://github.com/knative/eventing/tree/master/docs/delivery)
to the [Cloud Pub/Sub subscription configuration](https://pkg.go.dev/cloud.google.com/go/pubsub?tab=doc#SubscriptionConfig).

The Knative Eventing delivery specification allows for the configuration of a
backoff retry policy and a dead letter policy.

## Dead Letter Policy

A Pub/Sub subscription has its dead letter policy configured through the subscription configuration member `DeadLetterPolicy`.

The Knative dead letter policy is specified through the following parameters in the Knative Eventing delivery spec:
* `DeadLetterSink`: We only allow special URLs as the dead letter sink, of the form `pubsub://[dead_letter_sink_topic]`. We assume that if a topic is specified, it already exists.
* `Retry`: This is the number of delivery attempts until the event is forwarded to the dead letter topic. Mapped to the Pub/Sub dead letter policy's `MaxDeliveryAttempts`.

## Retry Policy

A Pub/Sub subscription has its backoff retry policy configured through the subscription configuration member `RetryPolicy`.

Pub/Sub supports an [exponential backoff](https://en.wikipedia.org/wiki/Exponential_backoff) retry policy based on the specified minimum backoff delay, and capped to some specified maximum backoff delay.

* `BackoffDelay`: This is mapped to the retry policy's `MinimumBackoff`.
* `BackoffPolicy`: There are two options for the backoff policy:
* `linear`: In this case, the retry policy's `MaximumBackoff` is set to be equal to the `MinimumBackoff`, which sets the delay between each retry to be equal.
* `exponential`: In this case, the retry policy's `MaximumBackoff` is set to 600 seconds, which is the largest value allowed by Pub/Sub.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/google/wire v0.4.0
github.com/googleapis/gax-go/v2 v2.0.5
github.com/kelseyhightower/envconfig v1.4.0
github.com/rickb777/date v1.13.0
go.opencensus.io v0.22.5-0.20200714042313-af30f77c5f65
go.opentelemetry.io/otel v0.3.0 // indirect
go.uber.org/multierr v1.5.0
Expand Down
31 changes: 29 additions & 2 deletions pkg/apis/broker/v1beta1/broker_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,37 @@ package v1beta1

import (
"context"

eventingduckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1"
)

var (
// DefaultBackoffDelay is the default backoff delay used in the backoff retry policy
// for the Broker delivery spec.
DefaultBackoffDelay = "PT1S"
// DefaultBackoffPolicy is the default backoff policy type used in the backoff retry
// policy for the Broker delivery spec.
DefaultBackoffPolicy = eventingduckv1beta1.BackoffPolicyExponential
// DefaultRetry is the default number of maximum delivery attempts for unacked messages
// before they are sent to a dead letter topic in the Broker delivery spec, in
// case a dead letter topic is specified. Without a dead letter topic specified,
// the retry count is infinite.
DefaultRetry int32 = 6
)

// SetDefaults sets the default field values for a Broker.
func (b *Broker) SetDefaults(ctx context.Context) {
// The Google Cloud Broker doesn't have any custom defaults. The
// eventing webhook will add the usual defaults.
// Set the default delivery spec.
if b.Spec.Delivery == nil {
b.Spec.Delivery = &eventingduckv1beta1.DeliverySpec{}
}
if b.Spec.Delivery.BackoffPolicy == nil &&
b.Spec.Delivery.BackoffDelay == nil {
b.Spec.Delivery.BackoffPolicy = &DefaultBackoffPolicy
b.Spec.Delivery.BackoffDelay = &DefaultBackoffDelay
}
if b.Spec.Delivery.Retry == nil && b.Spec.Delivery.DeadLetterSink != nil {
b.Spec.Delivery.Retry = &DefaultRetry
}
// Besides this, the eventing webhook will add the usual defaults.
}
4 changes: 2 additions & 2 deletions pkg/apis/broker/v1beta1/broker_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ func (bs *BrokerStatus) SetAddress(url *apis.URL) {
}
}

func (bs *BrokerStatus) MarkBrokerCelllUnknown(reason, format string, args ...interface{}) {
func (bs *BrokerStatus) MarkBrokerCellUnknown(reason, format string, args ...interface{}) {
brokerCondSet.Manage(bs).MarkUnknown(BrokerConditionBrokerCell, reason, format, args...)
}

func (bs *BrokerStatus) MarkBrokerCelllFailed(reason, format string, args ...interface{}) {
func (bs *BrokerStatus) MarkBrokerCellFailed(reason, format string, args ...interface{}) {
brokerCondSet.Manage(bs).MarkFalse(BrokerConditionBrokerCell, reason, format, args...)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/apis/broker/v1beta1/broker_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,9 +348,9 @@ func TestBrokerConditionStatus(t *testing.T) {
if test.brokerCellStatus == corev1.ConditionTrue {
bs.MarkBrokerCellReady()
} else if test.brokerCellStatus == corev1.ConditionFalse {
bs.MarkBrokerCelllFailed("Unable to create brokercell", "induced failure")
bs.MarkBrokerCellFailed("Unable to create brokercell", "induced failure")
} else {
bs.MarkBrokerCelllUnknown("Unable to create brokercell", "induced unknown")
bs.MarkBrokerCellUnknown("Unable to create brokercell", "induced unknown")
}
if test.subscriptionStatus == corev1.ConditionTrue {
bs.MarkSubscriptionReady()
Expand Down
33 changes: 31 additions & 2 deletions pkg/apis/broker/v1beta1/broker_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,41 @@ package v1beta1
import (
"context"

eventingduckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
)

// Validate verifies that the Broker is valid.
func (b *Broker) Validate(ctx context.Context) *apis.FieldError {
// The Google Cloud Broker doesn't have any custom validations. The
// eventing webhook will run the usual validations.
// We validate the GCP Broker's delivery spec. The eventing webhook will run
// the other usual validations.
return ValidateDeliverySpec(ctx, b.Spec.Delivery).ViaField("spec", "delivery")
}

func ValidateDeliverySpec(ctx context.Context, spec *eventingduckv1beta1.DeliverySpec) *apis.FieldError {
if spec == nil {
return nil
}
return ValidateDeadLetterSink(ctx, spec.DeadLetterSink).ViaField("deadLetterSink")
}

func ValidateDeadLetterSink(ctx context.Context, sink *duckv1.Destination) *apis.FieldError {
if sink == nil {
return nil
}
if sink.URI == nil {
return apis.ErrMissingField("uri")
}
if scheme := sink.URI.Scheme; scheme != "pubsub" {
return apis.ErrInvalidValue("Dead letter sink URI scheme should be pubsub", "uri")
}
topicID := sink.URI.Host
if topicID == "" {
return apis.ErrInvalidValue("Dead letter topic must not be empty", "uri")
}
if len(topicID) > 255 {
return apis.ErrInvalidValue("Dead letter topic maximum length is 255 characters", "uri")
}
return nil
}
74 changes: 74 additions & 0 deletions pkg/apis/broker/v1beta1/broker_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@ package v1beta1

import (
"context"
"strings"
"testing"

"github.com/google/go-cmp/cmp"
eventingduckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
)

func TestBroker_Validate(t *testing.T) {
Expand All @@ -27,3 +33,71 @@ func TestBroker_Validate(t *testing.T) {
t.Errorf("expected nil, got %v", err)
}
}

func TestDeliverySpec_Validate(t *testing.T) {
ds := &eventingduckv1beta1.DeliverySpec{}
if err := ValidateDeliverySpec(context.TODO(), ds); err != nil {
t.Errorf("expected nil, got %v", err)
}
}

func TestDeadLetterSink_Validate(t *testing.T) {
tests := []struct {
name string
deadLetterSink *duckv1.Destination
want *apis.FieldError
}{{
name: "invalid dead letter sink missing uri",
deadLetterSink: &duckv1.Destination{},
want: apis.ErrMissingField("uri"),
}, {
name: "invalid dead letter sink uri scheme",
deadLetterSink: &duckv1.Destination{
URI: &apis.URL{
Scheme: "http",
Host: "test-topic-id",
Path: "/",
},
},
want: apis.ErrInvalidValue("Dead letter sink URI scheme should be pubsub", "uri"),
}, {
name: "invalid empty dead letter topic id",
deadLetterSink: &duckv1.Destination{
URI: &apis.URL{
Scheme: "pubsub",
Host: "",
Path: "/",
},
},
want: apis.ErrInvalidValue("Dead letter topic must not be empty", "uri"),
}, {
name: "invalid dead letter topic id too long",
deadLetterSink: &duckv1.Destination{
URI: &apis.URL{
Scheme: "pubsub",
Host: strings.Repeat("x", 256),
Path: "/",
},
},
want: apis.ErrInvalidValue("Dead letter topic maximum length is 255 characters", "uri"),
}, {
name: "valid dead letter topic",
deadLetterSink: &duckv1.Destination{
URI: &apis.URL{
Scheme: "pubsub",
Host: "test-topic-id",
Path: "/",
},
},
}}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got := ValidateDeadLetterSink(context.Background(), test.deadLetterSink)
//got := test.spec.Validate(context.Background())
if diff := cmp.Diff(test.want.Error(), got.Error()); diff != "" {
t.Errorf("ValidateDeadLetterSink (-want, +got) = %v", diff)
}
})
}
}
8 changes: 4 additions & 4 deletions pkg/reconciler/broker/create_brokercell.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (r *Reconciler) ensureBrokerCellExists(ctx context.Context, b *brokerv1beta

if err != nil && !apierrs.IsNotFound(err) {
logging.FromContext(ctx).Error("Error reconciling brokercell", zap.String("namespace", b.Namespace), zap.String("broker", b.Name), zap.Error(err))
b.Status.MarkBrokerCelllUnknown("BrokerCellUnknown", "Failed to get brokercell %s/%s", bc.Namespace, bc.Name)
b.Status.MarkBrokerCellUnknown("BrokerCellUnknown", "Failed to get brokercell %s/%s", bc.Namespace, bc.Name)
return err
}

Expand All @@ -53,7 +53,7 @@ func (r *Reconciler) ensureBrokerCellExists(ctx context.Context, b *brokerv1beta
bc, err = r.RunClientSet.InternalV1alpha1().BrokerCells(want.Namespace).Create(want)
if err != nil && !apierrs.IsAlreadyExists(err) {
logging.FromContext(ctx).Error("Error creating brokercell", zap.String("namespace", b.Namespace), zap.String("broker", b.Name), zap.Error(err))
b.Status.MarkBrokerCelllFailed("BrokerCellCreationFailed", "Failed to create %s/%s", want.Namespace, want.Name)
b.Status.MarkBrokerCellFailed("BrokerCellCreationFailed", "Failed to create %s/%s", want.Namespace, want.Name)
return err
}
if apierrs.IsAlreadyExists(err) {
Expand All @@ -63,7 +63,7 @@ func (r *Reconciler) ensureBrokerCellExists(ctx context.Context, b *brokerv1beta
bc, err = r.RunClientSet.InternalV1alpha1().BrokerCells(want.Namespace).Get(want.Name, metav1.GetOptions{})
if err != nil {
logging.FromContext(ctx).Error("Failed to get the brokercell from the API server", zap.String("namespace", b.Namespace), zap.String("broker", b.Name), zap.Error(err))
b.Status.MarkBrokerCelllUnknown("BrokerCellUnknown", "Failed to get the brokercell from the API server %s/%s", want.Namespace, want.Name)
b.Status.MarkBrokerCellUnknown("BrokerCellUnknown", "Failed to get the brokercell from the API server %s/%s", want.Namespace, want.Name)
return err
}
}
Expand All @@ -75,7 +75,7 @@ func (r *Reconciler) ensureBrokerCellExists(ctx context.Context, b *brokerv1beta
if bc.Status.IsReady() {
b.Status.MarkBrokerCellReady()
} else {
b.Status.MarkBrokerCelllUnknown("BrokerCellNotReady", "Brokercell %s/%s is not ready", bc.Namespace, bc.Name)
b.Status.MarkBrokerCellUnknown("BrokerCellNotReady", "Brokercell %s/%s is not ready", bc.Namespace, bc.Name)
}

//TODO(#1019) Use the IngressTemplate of brokercell.
Expand Down
12 changes: 10 additions & 2 deletions pkg/reconciler/testing/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
brokerv1beta1 "github.com/google/knative-gcp/pkg/apis/broker/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
eventingduckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1"
eventingv1beta1 "knative.dev/eventing/pkg/apis/eventing/v1beta1"
"knative.dev/pkg/apis"
)
Expand Down Expand Up @@ -123,13 +124,13 @@ func WithBrokerReadyURI(address *apis.URL) BrokerOption {

func WithBrokerBrokerCellFailed(reason, msg string) BrokerOption {
return func(b *brokerv1beta1.Broker) {
b.Status.MarkBrokerCelllFailed(reason, msg)
b.Status.MarkBrokerCellFailed(reason, msg)
}
}

func WithBrokerBrokerCellUnknown(reason, msg string) BrokerOption {
return func(b *brokerv1beta1.Broker) {
b.Status.MarkBrokerCelllUnknown(reason, msg)
b.Status.MarkBrokerCellUnknown(reason, msg)
}
}

Expand Down Expand Up @@ -159,3 +160,10 @@ func WithBrokerClass(bc string) BrokerOption {
func WithBrokerSetDefaults(b *brokerv1beta1.Broker) {
b.SetDefaults(context.Background())
}

// WithBrokerDeliverySpec sets the Broker's delivery spec.
func WithBrokerDeliverySpec(deliverySpec *eventingduckv1beta1.DeliverySpec) BrokerOption {
return func(b *brokerv1beta1.Broker) {
b.Spec.Delivery = deliverySpec
}
}
14 changes: 14 additions & 0 deletions pkg/reconciler/testing/pstest.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,20 @@ func SubscriptionHasRetryPolicy(id string, wantPolicy *pubsub.RetryPolicy) func(
}
}

func SubscriptionHasDeadLetterPolicy(id string, wantPolicy *pubsub.DeadLetterPolicy) func(*testing.T, *rtesting.TableRow) {
return func(t *testing.T, r *rtesting.TableRow) {
c := getPubsubClient(r)
sub := c.Subscription(id)
cfg, err := sub.Config(context.Background())
if err != nil {
t.Errorf("Error getting pubsub config: %v", err)
}
if diff := cmp.Diff(wantPolicy, cfg.DeadLetterPolicy); diff != "" {
t.Errorf("Pubsub config dead letter policy (-want,+got): %v", diff)
}
}
}

func OnlySubscriptions(ids ...string) func(*testing.T, *rtesting.TableRow) {
return func(t *testing.T, r *rtesting.TableRow) {
c := getPubsubClient(r)
Expand Down
11 changes: 1 addition & 10 deletions pkg/reconciler/trigger/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package trigger

import (
"context"
"time"

"cloud.google.com/go/pubsub"
"github.com/kelseyhightower/envconfig"
Expand Down Expand Up @@ -60,9 +59,6 @@ var filterBroker = pkgreconciler.AnnotationFilterFunc(eventingv1beta1.BrokerClas

type envConfig struct {
ProjectID string `envconfig:"PROJECT_ID"`

MinRetryBackoff time.Duration `envconfig:"MIN_RETRY_BACKOFF" default:"1s"`
MaxRetryBackoff time.Duration `envconfig:"MAX_RETRY_BACKOFF" default:"1m"`
}

func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
Expand All @@ -79,11 +75,7 @@ func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl
if err != nil {
logging.FromContext(ctx).Error("Failed to get project ID", zap.Error(err))
}
// Set up the pub/sub retry policy.
retryPolicy := &pubsub.RetryPolicy{
MaximumBackoff: env.MaxRetryBackoff,
MinimumBackoff: env.MinRetryBackoff,
}

// Attempt to create a pubsub client for all worker threads to use. If this
// fails, pass a nil value to the Reconciler. They will attempt to
// create a client on reconcile.
Expand All @@ -104,7 +96,6 @@ func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl
brokerLister: brokerinformer.Get(ctx).Lister(),
pubsubClient: client,
projectID: projectID,
retryPolicy: retryPolicy,
}

impl := triggerreconciler.NewImpl(ctx, r, withAgentAndFinalizer)
Expand Down
Loading

0 comments on commit e768a6e

Please sign in to comment.