Skip to content

Commit

Permalink
Propagate the entire delivery spec to the Channel (#4042)
Browse files Browse the repository at this point in the history
* Propagate the entire delivery spec to the Channel

The subscription reconciler didn't propagate the entire `deliverySpec`
to the channel, only the `deadLetterSink` was propagated.

Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>

* Apply suggestion

Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>
  • Loading branch information
pierDipi authored Sep 14, 2020
1 parent 9913fad commit 5a54e9c
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 24 deletions.
37 changes: 24 additions & 13 deletions pkg/reconciler/subscription/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,7 @@ func (r *Reconciler) updateChannelAddSubscriptionV1Alpha1(ctx context.Context, c
SubscriberURI: sub.Status.PhysicalSubscription.SubscriberURI,
ReplyURI: sub.Status.PhysicalSubscription.ReplyURI,
DeadLetterSinkURI: sub.Status.PhysicalSubscription.DeadLetterSinkURI,
Delivery: deliverySpec(sub),
}},
}
return
Expand All @@ -536,6 +537,7 @@ func (r *Reconciler) updateChannelAddSubscriptionV1Alpha1(ctx context.Context, c
channel.Spec.Subscribable.Subscribers[i].SubscriberURI = sub.Status.PhysicalSubscription.SubscriberURI
channel.Spec.Subscribable.Subscribers[i].ReplyURI = sub.Status.PhysicalSubscription.ReplyURI
channel.Spec.Subscribable.Subscribers[i].DeadLetterSinkURI = sub.Status.PhysicalSubscription.DeadLetterSinkURI
channel.Spec.Subscribable.Subscribers[i].Delivery = deliverySpec(sub)
return
}
}
Expand All @@ -548,6 +550,7 @@ func (r *Reconciler) updateChannelAddSubscriptionV1Alpha1(ctx context.Context, c
SubscriberURI: sub.Status.PhysicalSubscription.SubscriberURI,
ReplyURI: sub.Status.PhysicalSubscription.ReplyURI,
DeadLetterSinkURI: sub.Status.PhysicalSubscription.DeadLetterSinkURI,
Delivery: deliverySpec(sub),
})
}

Expand All @@ -558,15 +561,7 @@ func (r *Reconciler) updateChannelAddSubscriptionV1Beta1(ctx context.Context, ch
channel.Spec.Subscribers[i].Generation = sub.Generation
channel.Spec.Subscribers[i].SubscriberURI = sub.Status.PhysicalSubscription.SubscriberURI
channel.Spec.Subscribers[i].ReplyURI = sub.Status.PhysicalSubscription.ReplyURI
// Only set the deadletter sink if it's not nil. Otherwise we'll just end up patching
// empty delivery in there.
if sub.Status.PhysicalSubscription.DeadLetterSinkURI != nil {
channel.Spec.Subscribers[i].Delivery = &eventingduckv1beta1.DeliverySpec{
DeadLetterSink: &duckv1.Destination{
URI: sub.Status.PhysicalSubscription.DeadLetterSinkURI,
},
}
}
channel.Spec.Subscribers[i].Delivery = deliverySpec(sub)
return
}
}
Expand All @@ -576,17 +571,33 @@ func (r *Reconciler) updateChannelAddSubscriptionV1Beta1(ctx context.Context, ch
Generation: sub.Generation,
SubscriberURI: sub.Status.PhysicalSubscription.SubscriberURI,
ReplyURI: sub.Status.PhysicalSubscription.ReplyURI,
Delivery: deliverySpec(sub),
}

// Must not have been found. Add it.
channel.Spec.Subscribers = append(channel.Spec.Subscribers, toAdd)
}

func deliverySpec(sub *v1.Subscription) *eventingduckv1beta1.DeliverySpec {

var delivery *eventingduckv1beta1.DeliverySpec

// Only set the deadletter sink if it's not nil. Otherwise we'll just end up patching
// empty delivery in there.
if sub.Status.PhysicalSubscription.DeadLetterSinkURI != nil {
toAdd.Delivery = &eventingduckv1beta1.DeliverySpec{
delivery = &eventingduckv1beta1.DeliverySpec{
DeadLetterSink: &duckv1.Destination{
URI: sub.Status.PhysicalSubscription.DeadLetterSinkURI,
},
}
}

// Must not have been found. Add it.
channel.Spec.Subscribers = append(channel.Spec.Subscribers, toAdd)
if sub.Spec.Delivery != nil && (sub.Spec.Delivery.BackoffDelay != nil || sub.Spec.Delivery.Retry != nil || sub.Spec.Delivery.BackoffPolicy != nil) {
if delivery == nil {
delivery = &eventingduckv1beta1.DeliverySpec{}
}
delivery.BackoffPolicy = (*eventingduckv1beta1.BackoffPolicyType)(sub.Spec.Delivery.BackoffPolicy)
delivery.Retry = sub.Spec.Delivery.Retry
delivery.BackoffDelay = sub.Spec.Delivery.BackoffDelay
}
return delivery
}
118 changes: 107 additions & 11 deletions pkg/reconciler/subscription/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ import (
"fmt"
"testing"

"k8s.io/utils/pointer"
"knative.dev/pkg/injection/clients/dynamicclient"

eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
eventingclient "knative.dev/eventing/pkg/client/injection/client"
"knative.dev/pkg/injection/clients/dynamicclient"

corev1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
Expand All @@ -33,14 +35,6 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
clientgotesting "k8s.io/client-go/testing"
eventingduck "knative.dev/eventing/pkg/apis/duck/v1"
eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/eventing/pkg/client/injection/ducks/duck/v1alpha1/channelable"
"knative.dev/eventing/pkg/client/injection/ducks/duck/v1alpha1/channelablecombined"
"knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1/subscription"
"knative.dev/eventing/pkg/duck"
"knative.dev/eventing/pkg/utils"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
duckv1alpha1 "knative.dev/pkg/apis/duck/v1alpha1"
Expand All @@ -50,10 +44,20 @@ import (
logtesting "knative.dev/pkg/logging/testing"
"knative.dev/pkg/resolver"

eventingduck "knative.dev/eventing/pkg/apis/duck/v1"
eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/eventing/pkg/client/injection/ducks/duck/v1alpha1/channelable"
"knative.dev/eventing/pkg/client/injection/ducks/duck/v1alpha1/channelablecombined"
"knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1/subscription"
"knative.dev/eventing/pkg/duck"
"knative.dev/eventing/pkg/utils"

. "knative.dev/pkg/reconciler/testing"

_ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/channel/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/inmemorychannel/fake"
. "knative.dev/eventing/pkg/reconciler/testing/v1"
. "knative.dev/pkg/reconciler/testing"
)

const (
Expand Down Expand Up @@ -134,6 +138,8 @@ func init() {
}

func TestAllCases(t *testing.T) {
linear := eventingduck.BackoffPolicyLinear

table := TableTest{
{
Name: "bad workqueue key",
Expand Down Expand Up @@ -1001,7 +1007,91 @@ func TestAllCases(t *testing.T) {
}),
patchFinalizers(testNS, "a-"+subscriptionName),
},
}, {
},
{
Name: "v1 imc+two subscribers for a channel - update delivery - full delivery spec",
Objects: []runtime.Object{
NewSubscription("a-"+subscriptionName, testNS,
WithSubscriptionUID("a-"+subscriptionUID),
WithSubscriptionChannel(imcV1GVK, channelName),
WithSubscriptionSubscriberRef(serviceGVK, serviceName, testNS),
WithSubscriptionDeliverySpec(&eventingduck.DeliverySpec{
DeadLetterSink: &duckv1.Destination{
Ref: &duckv1.KReference{
APIVersion: subscriberGVK.Group + "/" + subscriberGVK.Version,
Kind: subscriberGVK.Kind,
Name: dlcName,
Namespace: testNS,
},
},
Retry: pointer.Int32Ptr(10),
BackoffPolicy: &linear,
BackoffDelay: pointer.StringPtr("PT1S"),
}),
),
NewUnstructured(subscriberGVK, dlcName, testNS,
WithUnstructuredAddressable(dlcDNS),
),
NewInMemoryChannel(channelName, testNS,
WithInitInMemoryChannelConditions,
WithInMemoryChannelSubscribers(nil),
WithInMemoryChannelAddress(channelDNS),
WithInMemoryChannelReadySubscriber("a-"+subscriptionUID),
WithInMemoryChannelReadySubscriber("b-"+subscriptionUID),
),
NewService(serviceName, testNS),
},
Key: testNS + "/" + "a-" + subscriptionName,
WantErr: false,
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", "a-"+subscriptionName),
Eventf(corev1.EventTypeNormal, "SubscriberSync", "Subscription was synchronized to channel %q", channelName),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewSubscription("a-"+subscriptionName, testNS,
WithSubscriptionUID("a-"+subscriptionUID),
WithSubscriptionChannel(imcV1GVK, channelName),
WithSubscriptionSubscriberRef(serviceGVK, serviceName, testNS),
// The first reconciliation will initialize the status conditions.
WithInitSubscriptionConditions,
MarkReferencesResolved,
MarkAddedToChannel,
WithSubscriptionPhysicalSubscriptionSubscriber(serviceURIWithPath),
WithSubscriptionDeliverySpec(&eventingduck.DeliverySpec{
DeadLetterSink: &duckv1.Destination{
Ref: &duckv1.KReference{
APIVersion: subscriberGVK.Group + "/" + subscriberGVK.Version,
Kind: subscriberGVK.Kind,
Name: dlcName,
Namespace: testNS,
},
},
Retry: pointer.Int32Ptr(10),
BackoffPolicy: &linear,
BackoffDelay: pointer.StringPtr("PT1S"),
}),
WithSubscriptionDeadLetterSinkURI(dlcURI),
),
}},
WantPatches: []clientgotesting.PatchActionImpl{
patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{
{
UID: "a-" + subscriptionUID,
SubscriberURI: serviceURIWithPath,
Delivery: &eventingduck.DeliverySpec{
DeadLetterSink: &duckv1.Destination{
URI: apis.HTTP("dlc.mynamespace.svc.cluster.local"),
},
Retry: pointer.Int32Ptr(10),
BackoffPolicy: &linear,
BackoffDelay: pointer.StringPtr("PT1S"),
},
},
}),
patchFinalizers(testNS, "a-"+subscriptionName),
},
},
{
Name: "v1 imc+deleted - channel patch succeeded",
Objects: []runtime.Object{
NewSubscription(subscriptionName, testNS,
Expand Down Expand Up @@ -1130,6 +1220,12 @@ func TestAllCases(t *testing.T) {
}, false, logger))
}

func WithSubscriptionDeliverySpec(d *eventingduck.DeliverySpec) SubscriptionOption {
return func(v *messagingv1.Subscription) {
v.Spec.Delivery = d
}
}

func patchSubscribers(namespace, name string, subscribers []eventingduck.SubscriberSpec) clientgotesting.PatchActionImpl {
action := clientgotesting.PatchActionImpl{}
action.Name = name
Expand Down

0 comments on commit 5a54e9c

Please sign in to comment.