Skip to content

Commit

Permalink
make sure the mtchannelbasedbroker uses trigger.spec.delivery (#5267)
Browse files Browse the repository at this point in the history
* make sure the mtchannelbasedbroker uses trigger.spec.delivery

* lint
  • Loading branch information
Scott Nichols authored Apr 15, 2021
1 parent 71153e6 commit e745733
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 3 deletions.
8 changes: 7 additions & 1 deletion pkg/reconciler/broker/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,13 @@ func (r *Reconciler) subscribeToBrokerChannel(ctx context.Context, b *eventingv1
Name: b.Name,
Namespace: b.Namespace,
}
expected := resources.NewSubscription(t, brokerTrigger, brokerObjRef, uri, b.Spec.Delivery)

delivery := t.Spec.Delivery
if delivery == nil {
delivery = b.Spec.Delivery
}

expected := resources.NewSubscription(t, brokerTrigger, brokerObjRef, uri, delivery)

sub, err := r.subscriptionLister.Subscriptions(t.Namespace).Get(expected.Name)
// If the resource doesn't exist, we'll create it.
Expand Down
87 changes: 85 additions & 2 deletions pkg/reconciler/broker/trigger/trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,12 @@ import (
"testing"

cloudevents "github.com/cloudevents/sdk-go/v2"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"

clientgotesting "k8s.io/client-go/testing"
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
v1 "knative.dev/eventing/pkg/apis/duck/v1"
Expand All @@ -53,6 +51,7 @@ import (
fakedynamicclient "knative.dev/pkg/injection/clients/dynamicclient/fake"
logtesting "knative.dev/pkg/logging/testing"
"knative.dev/pkg/network"
"knative.dev/pkg/ptr"
"knative.dev/pkg/resolver"

_ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger/fake"
Expand Down Expand Up @@ -246,6 +245,70 @@ func TestReconcile(t *testing.T) {
WithTriggerSubscribedUnknown("SubscriptionNotConfigured", "Subscription has not yet been reconciled."),
WithTriggerStatusSubscriberURI(subscriberURI)),
}},
}, {
Name: "Creates subscription with retry from trigger",
Key: testKey,
Objects: []runtime.Object{
NewBroker(brokerName, testNS,
WithBrokerClass(eventing.MTChannelBrokerClassValue),
WithBrokerConfig(config()),
WithInitBrokerConditions,
WithBrokerReady,
WithChannelAddressAnnotation(triggerChannelURL),
WithChannelAPIVersionAnnotation(triggerChannelAPIVersion),
WithChannelKindAnnotation(triggerChannelKind),
WithChannelNameAnnotation(triggerChannelName)),
NewTrigger(triggerName, testNS, brokerName,
WithTriggerUID(triggerUID),
WithTriggerSubscriberURI(subscriberURI),
WithTriggerRetry(5, nil, nil)),
},
WantCreates: []runtime.Object{
resources.NewSubscription(makeTrigger(testNS), createTriggerChannelRef(), makeBrokerRef(), makeServiceURI(), makeDelivery(nil, "", ptr.Int32(5), nil, nil)),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewTrigger(triggerName, testNS, brokerName,
WithTriggerUID(triggerUID),
WithTriggerSubscriberURI(subscriberURI),
WithTriggerRetry(5, nil, nil),
WithTriggerBrokerReady(),
WithTriggerDependencyReady(),
WithTriggerSubscriberResolvedSucceeded(),
WithTriggerSubscribedUnknown("SubscriptionNotConfigured", "Subscription has not yet been reconciled."),
WithTriggerStatusSubscriberURI(subscriberURI)),
}},
}, {
Name: "Creates subscription with dlq from trigger",
Key: testKey,
Objects: []runtime.Object{
NewBroker(brokerName, testNS,
WithBrokerClass(eventing.MTChannelBrokerClassValue),
WithBrokerConfig(config()),
WithInitBrokerConditions,
WithBrokerReady,
WithChannelAddressAnnotation(triggerChannelURL),
WithChannelAPIVersionAnnotation(triggerChannelAPIVersion),
WithChannelKindAnnotation(triggerChannelKind),
WithChannelNameAnnotation(triggerChannelName)),
NewTrigger(triggerName, testNS, brokerName,
WithTriggerUID(triggerUID),
WithTriggerSubscriberURI(subscriberURI),
WithTriggerDeadLeaderSink(nil, "http://example.com")),
},
WantCreates: []runtime.Object{
resources.NewSubscription(makeTrigger(testNS), createTriggerChannelRef(), makeBrokerRef(), makeServiceURI(), makeDelivery(nil, "http://example.com", nil, nil, nil)),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewTrigger(triggerName, testNS, brokerName,
WithTriggerUID(triggerUID),
WithTriggerSubscriberURI(subscriberURI),
WithTriggerDeadLeaderSink(nil, "http://example.com"),
WithTriggerBrokerReady(),
WithTriggerDependencyReady(),
WithTriggerSubscriberResolvedSucceeded(),
WithTriggerSubscribedUnknown("SubscriptionNotConfigured", "Subscription has not yet been reconciled."),
WithTriggerStatusSubscriberURI(subscriberURI)),
}},
}, {
Name: "Subscription Create fails",
Key: testKey,
Expand Down Expand Up @@ -951,10 +1014,30 @@ func makeBrokerRef() *corev1.ObjectReference {
Name: brokerName,
}
}

func makeEmptyDelivery() *eventingduckv1.DeliverySpec {
return nil
}

func makeDelivery(ref *duckv1.KReference, uri string, retry *int32, backoffPolicy *v1.BackoffPolicyType, backoffDelay *string) *eventingduckv1.DeliverySpec {
ds := &v1.DeliverySpec{
Retry: retry,
BackoffPolicy: backoffPolicy,
BackoffDelay: backoffDelay,
}
if ref != nil || uri != "" {
var u *apis.URL
if uri != "" {
u, _ = apis.ParseURL(uri)
}
ds.DeadLetterSink = &duckv1.Destination{
Ref: ref,
URI: u,
}
}
return ds
}

func allBrokerObjectsReadyPlus(objs ...runtime.Object) []runtime.Object {
brokerObjs := []runtime.Object{
NewBroker(brokerName, testNS,
Expand Down
29 changes: 29 additions & 0 deletions pkg/reconciler/testing/v1/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

eventingv1 "knative.dev/eventing/pkg/apis/duck/v1"
v1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/ptr"
)

// TriggerOption enables further configuration of a Trigger.
Expand Down Expand Up @@ -56,6 +58,33 @@ func WithTriggerSubscriberURI(rawurl string) TriggerOption {
}
}

func WithTriggerDeadLeaderSink(ref *duckv1.KReference, uri string) TriggerOption {
return func(t *v1.Trigger) {
if t.Spec.Delivery == nil {
t.Spec.Delivery = new(eventingv1.DeliverySpec)
}
var u *apis.URL
if uri != "" {
u, _ = apis.ParseURL(uri)
}
t.Spec.Delivery.DeadLetterSink = &duckv1.Destination{
Ref: ref,
URI: u,
}
}
}

func WithTriggerRetry(count int32, backoffPolicy *eventingv1.BackoffPolicyType, backoffDelay *string) TriggerOption {
return func(t *v1.Trigger) {
if t.Spec.Delivery == nil {
t.Spec.Delivery = new(eventingv1.DeliverySpec)
}
t.Spec.Delivery.Retry = ptr.Int32(count)
t.Spec.Delivery.BackoffPolicy = backoffPolicy
t.Spec.Delivery.BackoffDelay = backoffDelay
}
}

func WithTriggerSubscriberRef(gvk metav1.GroupVersionKind, name, namespace string) TriggerOption {
return func(t *v1.Trigger) {
t.Spec.Subscriber = duckv1.Destination{
Expand Down

0 comments on commit e745733

Please sign in to comment.