Skip to content

Commit

Permalink
Propagate the entire delivery spec to the Channel (#4042) (#4044)
Browse files Browse the repository at this point in the history
* Propagate the entire delivery spec to the Channel

The subscription reconciler didn't propagate the entire `deliverySpec`
to the channel, only the `deadLetterSink` was propagated.

Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>

* Apply suggestion

Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>
  • Loading branch information
pierDipi authored Sep 14, 2020
1 parent 4051c5d commit 34de810
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 33 deletions.
37 changes: 24 additions & 13 deletions pkg/reconciler/subscription/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,7 @@ func (r *Reconciler) updateChannelAddSubscriptionV1Alpha1(ctx context.Context, c
SubscriberURI: sub.Status.PhysicalSubscription.SubscriberURI,
ReplyURI: sub.Status.PhysicalSubscription.ReplyURI,
DeadLetterSinkURI: sub.Status.PhysicalSubscription.DeadLetterSinkURI,
Delivery: deliverySpec(sub),
}},
}
return
Expand All @@ -536,6 +537,7 @@ func (r *Reconciler) updateChannelAddSubscriptionV1Alpha1(ctx context.Context, c
channel.Spec.Subscribable.Subscribers[i].SubscriberURI = sub.Status.PhysicalSubscription.SubscriberURI
channel.Spec.Subscribable.Subscribers[i].ReplyURI = sub.Status.PhysicalSubscription.ReplyURI
channel.Spec.Subscribable.Subscribers[i].DeadLetterSinkURI = sub.Status.PhysicalSubscription.DeadLetterSinkURI
channel.Spec.Subscribable.Subscribers[i].Delivery = deliverySpec(sub)
return
}
}
Expand All @@ -548,6 +550,7 @@ func (r *Reconciler) updateChannelAddSubscriptionV1Alpha1(ctx context.Context, c
SubscriberURI: sub.Status.PhysicalSubscription.SubscriberURI,
ReplyURI: sub.Status.PhysicalSubscription.ReplyURI,
DeadLetterSinkURI: sub.Status.PhysicalSubscription.DeadLetterSinkURI,
Delivery: deliverySpec(sub),
})
}

Expand All @@ -558,15 +561,7 @@ func (r *Reconciler) updateChannelAddSubscriptionV1Beta1(ctx context.Context, ch
channel.Spec.Subscribers[i].Generation = sub.Generation
channel.Spec.Subscribers[i].SubscriberURI = sub.Status.PhysicalSubscription.SubscriberURI
channel.Spec.Subscribers[i].ReplyURI = sub.Status.PhysicalSubscription.ReplyURI
// Only set the deadletter sink if it's not nil. Otherwise we'll just end up patching
// empty delivery in there.
if sub.Status.PhysicalSubscription.DeadLetterSinkURI != nil {
channel.Spec.Subscribers[i].Delivery = &eventingduckv1beta1.DeliverySpec{
DeadLetterSink: &duckv1.Destination{
URI: sub.Status.PhysicalSubscription.DeadLetterSinkURI,
},
}
}
channel.Spec.Subscribers[i].Delivery = deliverySpec(sub)
return
}
}
Expand All @@ -576,17 +571,33 @@ func (r *Reconciler) updateChannelAddSubscriptionV1Beta1(ctx context.Context, ch
Generation: sub.Generation,
SubscriberURI: sub.Status.PhysicalSubscription.SubscriberURI,
ReplyURI: sub.Status.PhysicalSubscription.ReplyURI,
Delivery: deliverySpec(sub),
}

// Must not have been found. Add it.
channel.Spec.Subscribers = append(channel.Spec.Subscribers, toAdd)
}

func deliverySpec(sub *v1beta1.Subscription) *eventingduckv1beta1.DeliverySpec {

var delivery *eventingduckv1beta1.DeliverySpec

// Only set the deadletter sink if it's not nil. Otherwise we'll just end up patching
// empty delivery in there.
if sub.Status.PhysicalSubscription.DeadLetterSinkURI != nil {
toAdd.Delivery = &eventingduckv1beta1.DeliverySpec{
delivery = &eventingduckv1beta1.DeliverySpec{
DeadLetterSink: &duckv1.Destination{
URI: sub.Status.PhysicalSubscription.DeadLetterSinkURI,
},
}
}

// Must not have been found. Add it.
channel.Spec.Subscribers = append(channel.Spec.Subscribers, toAdd)
if sub.Spec.Delivery != nil && (sub.Spec.Delivery.BackoffDelay != nil || sub.Spec.Delivery.Retry != nil || sub.Spec.Delivery.BackoffPolicy != nil) {
if delivery == nil {
delivery = &eventingduckv1beta1.DeliverySpec{}
}
delivery.BackoffPolicy = (*eventingduckv1beta1.BackoffPolicyType)(sub.Spec.Delivery.BackoffPolicy)
delivery.Retry = sub.Spec.Delivery.Retry
delivery.BackoffDelay = sub.Spec.Delivery.BackoffDelay
}
return delivery
}
111 changes: 91 additions & 20 deletions pkg/reconciler/subscription/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@ import (
"fmt"
"testing"

"k8s.io/utils/pointer"
"knative.dev/pkg/injection/clients/dynamicclient"

eventingv1beta1 "knative.dev/eventing/pkg/apis/eventing/v1beta1"

eventingclient "knative.dev/eventing/pkg/client/injection/client"
"knative.dev/pkg/injection/clients/dynamicclient"

corev1 "k8s.io/api/core/v1"
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
Expand All @@ -33,14 +36,6 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
clientgotesting "k8s.io/client-go/testing"
eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
eventingduck "knative.dev/eventing/pkg/apis/duck/v1beta1"
messagingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1"
"knative.dev/eventing/pkg/client/injection/ducks/duck/v1alpha1/channelable"
"knative.dev/eventing/pkg/client/injection/ducks/duck/v1alpha1/channelablecombined"
"knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1beta1/subscription"
"knative.dev/eventing/pkg/duck"
"knative.dev/eventing/pkg/utils"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
duckv1alpha1 "knative.dev/pkg/apis/duck/v1alpha1"
Expand All @@ -50,10 +45,22 @@ import (
logtesting "knative.dev/pkg/logging/testing"
"knative.dev/pkg/resolver"

eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
eventingduck "knative.dev/eventing/pkg/apis/duck/v1beta1"
messagingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1"
"knative.dev/eventing/pkg/client/injection/ducks/duck/v1alpha1/channelable"
"knative.dev/eventing/pkg/client/injection/ducks/duck/v1alpha1/channelablecombined"
"knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1beta1/subscription"
"knative.dev/eventing/pkg/duck"
"knative.dev/eventing/pkg/utils"

. "knative.dev/pkg/reconciler/testing"

_ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/channel/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/inmemorychannel/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1beta1/channel/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1beta1/inmemorychannel/fake"
. "knative.dev/eventing/pkg/reconciler/testing/v1beta1"
. "knative.dev/pkg/reconciler/testing"
)

const (
Expand All @@ -67,6 +74,8 @@ const (
subscriptionName = "testsubscription"
testNS = "testnamespace"
subscriptionGeneration = 1

finalizerName = "subscriptions.messaging.knative.dev"
)

// subscriptions have: channel -> SUB -> subscriber -viaSub-> reply
Expand Down Expand Up @@ -147,6 +156,8 @@ func init() {
}

func TestAllCases(t *testing.T) {
linear := eventingduck.BackoffPolicyLinear

table := TableTest{
{
Name: "bad workqueue key",
Expand Down Expand Up @@ -1222,6 +1233,39 @@ func TestAllCases(t *testing.T) {
WithChannelableReadySubscriber("a-"+subscriptionUID),
WithChannelableReadySubscriber("b-"+subscriptionUID),
),
},
},
{
Name: "v1 imc+two subscribers for a channel - update delivery - full delivery spec",
Objects: []runtime.Object{
NewSubscription("a-"+subscriptionName, testNS,
WithSubscriptionUID("a-"+subscriptionUID),
WithSubscriptionChannel(imcV1Beta1GVK, channelName),
WithSubscriptionSubscriberRef(serviceGVK, serviceName, testNS),
WithSubscriptionDeliverySpec(&eventingduck.DeliverySpec{
DeadLetterSink: &duckv1.Destination{
Ref: &duckv1.KReference{
APIVersion: subscriberGVK.Group + "/" + subscriberGVK.Version,
Kind: subscriberGVK.Kind,
Name: dlcName,
Namespace: testNS,
},
},
Retry: pointer.Int32Ptr(10),
BackoffPolicy: &linear,
BackoffDelay: pointer.StringPtr("PT1S"),
}),
),
NewUnstructured(subscriberGVK, dlcName, testNS,
WithUnstructuredAddressable(dlcDNS),
),
NewInMemoryChannel(channelName, testNS,
WithInitInMemoryChannelConditions,
WithInMemoryChannelSubscribers(nil),
WithInMemoryChannelAddress(channelDNS),
WithInMemoryChannelReadySubscriber("a-"+subscriptionUID),
WithInMemoryChannelReadySubscriber("b-"+subscriptionUID),
),
NewService(serviceName, testNS),
},
Key: testNS + "/" + "a-" + subscriptionName,
Expand All @@ -1233,24 +1277,49 @@ func TestAllCases(t *testing.T) {
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewSubscription("a-"+subscriptionName, testNS,
WithSubscriptionUID("a-"+subscriptionUID),
WithSubscriptionChannel(channelableV1Alpha1GVK, channelName),
WithSubscriptionChannel(imcV1Beta1GVK, channelName),
WithSubscriptionSubscriberRef(serviceGVK, serviceName, testNS),
// The first reconciliation will initialize the status conditions.
WithInitSubscriptionConditions,
MarkReferencesResolved,
MarkAddedToChannel,
WithSubscriptionPhysicalSubscriptionSubscriber(serviceURIWithPath),
WithSubscriptionDeliverySpec(&eventingduck.DeliverySpec{
DeadLetterSink: &duckv1.Destination{
Ref: &duckv1.KReference{
APIVersion: subscriberGVK.Group + "/" + subscriberGVK.Version,
Kind: subscriberGVK.Kind,
Name: dlcName,
Namespace: testNS,
},
},
Retry: pointer.Int32Ptr(10),
BackoffPolicy: &linear,
BackoffDelay: pointer.StringPtr("PT1S"),
}),
WithSubscriptionDeadLetterSinkURI(dlcURI),
),
}},
WantPatches: []clientgotesting.PatchActionImpl{
patchSubscribersV1Alpha1(testNS, channelName, []eventingduckv1alpha1.SubscriberSpec{
{UID: "b-" + subscriptionUID},
{UID: "a-" + subscriptionUID, SubscriberURI: serviceURIWithPath},
patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{
{
UID: "a-" + subscriptionUID,
SubscriberURI: serviceURIWithPath,
Delivery: &eventingduck.DeliverySpec{
DeadLetterSink: &duckv1.Destination{
URI: apis.HTTP("dlc.mynamespace.svc.cluster.local"),
},
Retry: pointer.Int32Ptr(10),
BackoffPolicy: &linear,
BackoffDelay: pointer.StringPtr("PT1S"),
},
},
}),
patchFinalizers(testNS, "a-"+subscriptionName),
},
}, {
Name: "v1beta1 imc+deleted - channel patch succeeded",
},
{
Name: "v1 imc+deleted - channel patch succeeded",
Objects: []runtime.Object{
NewSubscription(subscriptionName, testNS,
WithSubscriptionUID(subscriptionUID),
Expand Down Expand Up @@ -1440,6 +1509,12 @@ func patchSubscribersV1Alpha1(namespace, name string, subscribers []eventingduck
return action
}

func WithSubscriptionDeliverySpec(d *eventingduck.DeliverySpec) SubscriptionOption {
return func(v *messagingv1beta1.Subscription) {
v.Spec.Delivery = d
}
}

func patchSubscribers(namespace, name string, subscribers []eventingduck.SubscriberSpec) clientgotesting.PatchActionImpl {
action := clientgotesting.PatchActionImpl{}
action.Name = name
Expand Down Expand Up @@ -1470,10 +1545,6 @@ func patchSubscribers(namespace, name string, subscribers []eventingduck.Subscri
return action
}

const (
finalizerName = "subscriptions.messaging.knative.dev"
)

func patchFinalizers(namespace, name string) clientgotesting.PatchActionImpl {
action := clientgotesting.PatchActionImpl{}
action.Name = name
Expand Down

0 comments on commit 34de810

Please sign in to comment.