From 9643baffd74761568717b28f0e7668272a3856eb Mon Sep 17 00:00:00 2001 From: Francesco Guardiani Date: Thu, 17 Dec 2020 23:12:02 +0100 Subject: [PATCH] Implement #4057 Channeleable.Delivery (#4652) * Implement #4057 Signed-off-by: Francesco Guardiani * Fix Signed-off-by: Francesco Guardiani * Retry Signed-off-by: Francesco Guardiani * Try to fix that ducktype Signed-off-by: Francesco Guardiani * Reverted yaml usage Signed-off-by: Francesco Guardiani * Now this should just works Signed-off-by: Francesco Guardiani * Now it works! Signed-off-by: Francesco Guardiani * Just some doc! Signed-off-by: Francesco Guardiani --- .../messaging/v1/channel_template_types.go | 68 ++++++- .../messaging/v1/zz_generated.deepcopy.go | 24 ++- pkg/reconciler/channel/channel_test.go | 71 +++++++- pkg/reconciler/channel/resources/channel.go | 7 +- pkg/reconciler/mtbroker/resources/channel.go | 3 +- pkg/reconciler/parallel/resources/channel.go | 3 +- pkg/reconciler/sequence/resources/channel.go | 3 +- pkg/reconciler/subscription/subscription.go | 63 ++++--- .../subscription/subscription_test.go | 168 ++++++++++++++++++ pkg/reconciler/testing/v1/channel.go | 6 + pkg/reconciler/testing/v1/inmemorychannel.go | 9 +- test/e2e/channel_dls_test.go | 4 + test/e2e/helpers/channel_dls_test_helper.go | 86 +++++++++ 13 files changed, 475 insertions(+), 40 deletions(-) diff --git a/pkg/apis/messaging/v1/channel_template_types.go b/pkg/apis/messaging/v1/channel_template_types.go index 35f42de7f10..6ef23d80942 100644 --- a/pkg/apis/messaging/v1/channel_template_types.go +++ b/pkg/apis/messaging/v1/channel_template_types.go @@ -17,8 +17,13 @@ limitations under the License. package v1 import ( + "encoding/json" + + "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + + v1 "knative.dev/eventing/pkg/apis/duck/v1" ) // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object @@ -40,8 +45,67 @@ type ChannelTemplateSpecInternal struct { // +optional metav1.ObjectMeta `json:"metadata,omitempty"` - // Spec defines the Spec to use for each channel created. Passed + // Spec includes the Channel CR ChannelableSpec and the physical channel spec. + // In order to create a new ChannelTemplateSpecInternalSpec, you must use NewChannelTemplateSpecInternalSpec + Spec *ChannelTemplateSpecInternalSpec `json:"spec,omitempty"` +} + +// ChannelTemplateSpecInternalSpec merges the "general" spec from Channel CR and the template of the physical channel spec. +// Note that this struct properly implements only Marshalling, unmarshalling doesn't work! +type ChannelTemplateSpecInternalSpec struct { + // ChannelableSpec includes the fields from the Channel Spec section + v1.ChannelableSpec + + // PhysicalChannelSpec includes the fields from the physical channel Spec. Passed // in verbatim to the Channel CRD as Spec section. // +optional - Spec *runtime.RawExtension `json:"spec,omitempty"` + PhysicalChannelSpec *runtime.RawExtension +} + +// NewChannelTemplateSpecInternalSpec creates a new ChannelTemplateSpecInternalSpec, returning nil if channelableSpec is empty and physicalChannelSpec is nil. +func NewChannelTemplateSpecInternalSpec(channelableSpec v1.ChannelableSpec, physicalChannelSpec *runtime.RawExtension) *ChannelTemplateSpecInternalSpec { + if physicalChannelSpec == nil && equality.Semantic.DeepEqual(channelableSpec, v1.ChannelableSpec{}) { + return nil + } + return &ChannelTemplateSpecInternalSpec{ + ChannelableSpec: channelableSpec, + PhysicalChannelSpec: physicalChannelSpec, + } +} + +func (s ChannelTemplateSpecInternalSpec) MarshalJSON() ([]byte, error) { + // Check if empty + if s.PhysicalChannelSpec == nil && equality.Semantic.DeepEqual(s.ChannelableSpec, v1.ChannelableSpec{}) { + return []byte{}, nil + } + + // Let's merge the channel template spec and the channelable spec from channel + channelableSpec := make(map[string]interface{}) + physicalChannelTemplateSpec := make(map[string]interface{}) + + rawChannelSpec, err := json.Marshal(s.ChannelableSpec) + if err != nil { + return nil, err + } + if err := json.Unmarshal(rawChannelSpec, &channelableSpec); err != nil { + return nil, err + } + + if s.PhysicalChannelSpec != nil { + rawPhysicalChannelTemplateSpec, err := json.Marshal(s.PhysicalChannelSpec) + if err != nil { + return nil, err + } + if err := json.Unmarshal(rawPhysicalChannelTemplateSpec, &physicalChannelTemplateSpec); err != nil { + return nil, err + } + } + + // Merge the two maps into channelableSpec + for k, v := range physicalChannelTemplateSpec { + channelableSpec[k] = v + } + + // Just return the merged map marshalled + return json.Marshal(channelableSpec) } diff --git a/pkg/apis/messaging/v1/zz_generated.deepcopy.go b/pkg/apis/messaging/v1/zz_generated.deepcopy.go index f5158622a99..90d160a32b1 100644 --- a/pkg/apis/messaging/v1/zz_generated.deepcopy.go +++ b/pkg/apis/messaging/v1/zz_generated.deepcopy.go @@ -169,7 +169,7 @@ func (in *ChannelTemplateSpecInternal) DeepCopyInto(out *ChannelTemplateSpecInte in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) if in.Spec != nil { in, out := &in.Spec, &out.Spec - *out = new(runtime.RawExtension) + *out = new(ChannelTemplateSpecInternalSpec) (*in).DeepCopyInto(*out) } return @@ -193,6 +193,28 @@ func (in *ChannelTemplateSpecInternal) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ChannelTemplateSpecInternalSpec) DeepCopyInto(out *ChannelTemplateSpecInternalSpec) { + *out = *in + in.ChannelableSpec.DeepCopyInto(&out.ChannelableSpec) + if in.PhysicalChannelSpec != nil { + in, out := &in.PhysicalChannelSpec, &out.PhysicalChannelSpec + *out = new(runtime.RawExtension) + (*in).DeepCopyInto(*out) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ChannelTemplateSpecInternalSpec. +func (in *ChannelTemplateSpecInternalSpec) DeepCopy() *ChannelTemplateSpecInternalSpec { + if in == nil { + return nil + } + out := new(ChannelTemplateSpecInternalSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *InMemoryChannel) DeepCopyInto(out *InMemoryChannel) { *out = *in diff --git a/pkg/reconciler/channel/channel_test.go b/pkg/reconciler/channel/channel_test.go index 1aaa2e3f707..36e9b34690a 100644 --- a/pkg/reconciler/channel/channel_test.go +++ b/pkg/reconciler/channel/channel_test.go @@ -25,6 +25,16 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" clientgotesting "k8s.io/client-go/testing" + "k8s.io/utils/pointer" + "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/pkg/configmap" + "knative.dev/pkg/controller" + fakedynamicclient "knative.dev/pkg/injection/clients/dynamicclient/fake" + logtesting "knative.dev/pkg/logging/testing" + "knative.dev/pkg/network" + . "knative.dev/pkg/reconciler/testing" + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" v1 "knative.dev/eventing/pkg/apis/duck/v1" v1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" @@ -35,14 +45,6 @@ import ( channelreconciler "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1/channel" "knative.dev/eventing/pkg/duck" . "knative.dev/eventing/pkg/reconciler/testing/v1" - "knative.dev/pkg/apis" - duckv1 "knative.dev/pkg/apis/duck/v1" - "knative.dev/pkg/configmap" - "knative.dev/pkg/controller" - fakedynamicclient "knative.dev/pkg/injection/clients/dynamicclient/fake" - logtesting "knative.dev/pkg/logging/testing" - "knative.dev/pkg/network" - . "knative.dev/pkg/reconciler/testing" ) const ( @@ -54,6 +56,10 @@ var ( testKey = fmt.Sprintf("%s/%s", testNS, channelName) backingChannelHostname = network.GetServiceHostname("foo", "bar") + + deliverySpec = &eventingduckv1.DeliverySpec{ + Retry: pointer.Int32Ptr(10), + } ) func init() { @@ -176,6 +182,55 @@ func TestReconcile(t *testing.T) { WithChannelNoAddress(), WithBackingChannelUnknown("BackingChannelNotConfigured", "BackingChannel has not yet been reconciled.")), }}, + }, { + Name: "Backing channel created with delivery", + Key: testKey, + Objects: []runtime.Object{ + NewChannel(channelName, testNS, + WithChannelTemplate(channelCRD()), + WithInitChannelConditions, + WithBackingChannelObjRef(backingChannelObjRef()), + WithBackingChannelReady, + WithChannelDelivery(deliverySpec), + WithChannelAddress(backingChannelHostname)), + }, + WantCreates: []runtime.Object{ + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "messaging.knative.dev/v1", + "kind": "InMemoryChannel", + "metadata": map[string]interface{}{ + "creationTimestamp": nil, + "namespace": testNS, + "name": channelName, + "ownerReferences": []interface{}{ + map[string]interface{}{ + "apiVersion": "messaging.knative.dev/v1", + "blockOwnerDeletion": true, + "controller": true, + "kind": "Channel", + "name": channelName, + "uid": "", + }, + }, + }, + "spec": map[string]interface{}{ + "delivery": map[string]interface{}{ + "retry": int64(10), + }, + }, + }, + }, + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewChannel(channelName, testNS, + WithChannelTemplate(channelCRD()), + WithInitChannelConditions, + WithBackingChannelObjRef(backingChannelObjRef()), + WithChannelNoAddress(), + WithChannelDelivery(deliverySpec), + WithBackingChannelUnknown("BackingChannelNotConfigured", "BackingChannel has not yet been reconciled.")), + }}, }, { Name: "Generation Bump", Key: testKey, diff --git a/pkg/reconciler/channel/resources/channel.go b/pkg/reconciler/channel/resources/channel.go index 3d2c9da7fe5..33921862542 100644 --- a/pkg/reconciler/channel/resources/channel.go +++ b/pkg/reconciler/channel/resources/channel.go @@ -19,10 +19,10 @@ package resources import ( "encoding/json" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "knative.dev/pkg/kmeta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" v1 "knative.dev/eventing/pkg/apis/messaging/v1" ) @@ -41,7 +41,10 @@ func NewChannel(c *v1.Channel) (*unstructured.Unstructured, error) { Name: c.Name, Namespace: c.Namespace, }, - Spec: c.Spec.ChannelTemplate.Spec, + Spec: v1.NewChannelTemplateSpecInternalSpec( + c.Spec.ChannelableSpec, + c.Spec.ChannelTemplate.Spec, + ), } raw, err := json.Marshal(template) if err != nil { diff --git a/pkg/reconciler/mtbroker/resources/channel.go b/pkg/reconciler/mtbroker/resources/channel.go index e86cec3f1f2..89fb4e70a28 100644 --- a/pkg/reconciler/mtbroker/resources/channel.go +++ b/pkg/reconciler/mtbroker/resources/channel.go @@ -22,6 +22,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + duckv1 "knative.dev/eventing/pkg/apis/duck/v1" messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" "knative.dev/pkg/kmeta" @@ -53,7 +54,7 @@ func NewChannel(channelType string, owner kmeta.OwnerRefable, channelTemplate *m Labels: l, Annotations: map[string]string{eventing.ScopeAnnotationKey: eventing.ScopeCluster}, }, - Spec: channelTemplate.Spec, + Spec: messagingv1.NewChannelTemplateSpecInternalSpec(duckv1.ChannelableSpec{}, channelTemplate.Spec), } raw, err := json.Marshal(template) if err != nil { diff --git a/pkg/reconciler/parallel/resources/channel.go b/pkg/reconciler/parallel/resources/channel.go index 2ee567486a5..af9bf180133 100644 --- a/pkg/reconciler/parallel/resources/channel.go +++ b/pkg/reconciler/parallel/resources/channel.go @@ -24,6 +24,7 @@ import ( "knative.dev/pkg/kmeta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + duckv1 "knative.dev/eventing/pkg/apis/duck/v1" v1 "knative.dev/eventing/pkg/apis/flows/v1" messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" ) @@ -54,7 +55,7 @@ func NewChannel(name string, p *v1.Parallel) (*unstructured.Unstructured, error) Name: name, Namespace: p.Namespace, }, - Spec: p.Spec.ChannelTemplate.Spec, + Spec: messagingv1.NewChannelTemplateSpecInternalSpec(duckv1.ChannelableSpec{}, p.Spec.ChannelTemplate.Spec), } raw, err := json.Marshal(template) if err != nil { diff --git a/pkg/reconciler/sequence/resources/channel.go b/pkg/reconciler/sequence/resources/channel.go index c4073203f9c..51f9d023615 100644 --- a/pkg/reconciler/sequence/resources/channel.go +++ b/pkg/reconciler/sequence/resources/channel.go @@ -24,6 +24,7 @@ import ( "knative.dev/pkg/kmeta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + duckv1 "knative.dev/eventing/pkg/apis/duck/v1" v1 "knative.dev/eventing/pkg/apis/flows/v1" messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" ) @@ -49,7 +50,7 @@ func NewChannel(name string, p *v1.Sequence) (*unstructured.Unstructured, error) Name: name, Namespace: p.Namespace, }, - Spec: p.Spec.ChannelTemplate.Spec, + Spec: messagingv1.NewChannelTemplateSpecInternalSpec(duckv1.ChannelableSpec{}, p.Spec.ChannelTemplate.Spec), } raw, err := json.Marshal(template) if err != nil { diff --git a/pkg/reconciler/subscription/subscription.go b/pkg/reconciler/subscription/subscription.go index b91d09d6756..201570e9539 100644 --- a/pkg/reconciler/subscription/subscription.go +++ b/pkg/reconciler/subscription/subscription.go @@ -96,7 +96,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, subscription *v1.Subscri } // Make sure all the URI's that are suppose to be in status are up to date. - if event := r.resolveSubscriptionURIs(ctx, subscription); event != nil { + if event := r.resolveSubscriptionURIs(ctx, subscription, channel); event != nil { return event } @@ -173,7 +173,7 @@ func (r Reconciler) syncChannel(ctx context.Context, channel *eventingduckv1alph return nil } -func (r *Reconciler) resolveSubscriptionURIs(ctx context.Context, subscription *v1.Subscription) pkgreconciler.Event { +func (r *Reconciler) resolveSubscriptionURIs(ctx context.Context, subscription *v1.Subscription, channel *eventingduckv1alpha1.ChannelableCombined) pkgreconciler.Event { // Everything that was supposed to be resolved was, so flip the status bit on that. subscription.Status.MarkReferencesResolvedUnknown("Resolving", "Subscription resolution interrupted.") @@ -185,7 +185,13 @@ func (r *Reconciler) resolveSubscriptionURIs(ctx context.Context, subscription * return err } - if err := r.resolveDeadLetterSink(ctx, subscription); err != nil { + var err error + if subscription.Spec.Delivery == nil && channel.Spec.Delivery != nil { + err = r.resolveDeadLetterSink(ctx, channel.Spec.Delivery.DeadLetterSink, subscription) + } else if subscription.Spec.Delivery != nil { + err = r.resolveDeadLetterSink(ctx, subscription.Spec.Delivery.DeadLetterSink, subscription) + } + if err != nil { return err } @@ -248,16 +254,15 @@ func (r *Reconciler) resolveReply(ctx context.Context, subscription *v1.Subscrip return nil } -func (r *Reconciler) resolveDeadLetterSink(ctx context.Context, subscription *v1.Subscription) pkgreconciler.Event { +func (r *Reconciler) resolveDeadLetterSink(ctx context.Context, deadLetterSink *duckv1.Destination, subscription *v1.Subscription) pkgreconciler.Event { // Resolve DeadLetterSink. - delivery := subscription.Spec.Delivery.DeepCopy() - if !isNilOrEmptyDeliveryDeadLetterSink(delivery) { + if deadLetterSink != nil { // Populate the namespace for the dead letter sink since it is in the namespace - if delivery.DeadLetterSink.Ref != nil { - delivery.DeadLetterSink.Ref.Namespace = subscription.Namespace + if deadLetterSink.Ref != nil { + deadLetterSink.Ref.Namespace = subscription.Namespace } - deadLetterSink, err := r.destinationResolver.URIFromDestinationV1(ctx, *delivery.DeadLetterSink, subscription) + deadLetterSink, err := r.destinationResolver.URIFromDestinationV1(ctx, *deadLetterSink, subscription) if err != nil { logging.FromContext(ctx).Warnw("Failed to resolve spec.delivery.deadLetterSink", zap.Error(err), @@ -412,11 +417,6 @@ func (r *Reconciler) getChannel(ctx context.Context, sub *v1.Subscription) (*eve return ch.DeepCopy(), nil } -func isNilOrEmptyDeliveryDeadLetterSink(delivery *eventingduckv1.DeliverySpec) bool { - return delivery == nil || equality.Semantic.DeepEqual(delivery, &eventingduckv1.DeliverySpec{}) || - delivery.DeadLetterSink == nil -} - func isNilOrEmptyDestination(destination *duckv1.Destination) bool { return destination == nil || equality.Semantic.DeepEqual(destination, &duckv1.Destination{}) } @@ -524,7 +524,7 @@ func (r *Reconciler) updateChannelAddSubscriptionV1Alpha1(channel *eventingduckv SubscriberURI: sub.Status.PhysicalSubscription.SubscriberURI, ReplyURI: sub.Status.PhysicalSubscription.ReplyURI, DeadLetterSinkURI: sub.Status.PhysicalSubscription.DeadLetterSinkURI, - Delivery: deliverySpec(sub), + Delivery: deliverySpec(sub, channel), }}, } return @@ -537,7 +537,7 @@ func (r *Reconciler) updateChannelAddSubscriptionV1Alpha1(channel *eventingduckv channel.Spec.Subscribable.Subscribers[i].SubscriberURI = sub.Status.PhysicalSubscription.SubscriberURI channel.Spec.Subscribable.Subscribers[i].ReplyURI = sub.Status.PhysicalSubscription.ReplyURI channel.Spec.Subscribable.Subscribers[i].DeadLetterSinkURI = sub.Status.PhysicalSubscription.DeadLetterSinkURI - channel.Spec.Subscribable.Subscribers[i].Delivery = deliverySpec(sub) + channel.Spec.Subscribable.Subscribers[i].Delivery = deliverySpec(sub, channel) return } } @@ -550,7 +550,7 @@ func (r *Reconciler) updateChannelAddSubscriptionV1Alpha1(channel *eventingduckv SubscriberURI: sub.Status.PhysicalSubscription.SubscriberURI, ReplyURI: sub.Status.PhysicalSubscription.ReplyURI, DeadLetterSinkURI: sub.Status.PhysicalSubscription.DeadLetterSinkURI, - Delivery: deliverySpec(sub), + Delivery: deliverySpec(sub, channel), }) } @@ -561,7 +561,7 @@ func (r *Reconciler) updateChannelAddSubscriptionV1Beta1(channel *eventingduckv1 channel.Spec.Subscribers[i].Generation = sub.Generation channel.Spec.Subscribers[i].SubscriberURI = sub.Status.PhysicalSubscription.SubscriberURI channel.Spec.Subscribers[i].ReplyURI = sub.Status.PhysicalSubscription.ReplyURI - channel.Spec.Subscribers[i].Delivery = deliverySpec(sub) + channel.Spec.Subscribers[i].Delivery = deliverySpec(sub, channel) return } } @@ -571,16 +571,33 @@ func (r *Reconciler) updateChannelAddSubscriptionV1Beta1(channel *eventingduckv1 Generation: sub.Generation, SubscriberURI: sub.Status.PhysicalSubscription.SubscriberURI, ReplyURI: sub.Status.PhysicalSubscription.ReplyURI, - Delivery: deliverySpec(sub), + Delivery: deliverySpec(sub, channel), } // Must not have been found. Add it. channel.Spec.Subscribers = append(channel.Spec.Subscribers, toAdd) } -func deliverySpec(sub *v1.Subscription) *eventingduckv1beta1.DeliverySpec { - - var delivery *eventingduckv1beta1.DeliverySpec +func deliverySpec(sub *v1.Subscription, channel *eventingduckv1alpha1.ChannelableCombined) (delivery *eventingduckv1beta1.DeliverySpec) { + if sub.Spec.Delivery == nil && channel.Spec.Delivery != nil { + // Default to the channel spec + if sub.Status.PhysicalSubscription.DeadLetterSinkURI != nil { + delivery = &eventingduckv1beta1.DeliverySpec{ + DeadLetterSink: &duckv1.Destination{ + URI: sub.Status.PhysicalSubscription.DeadLetterSinkURI, + }, + } + } + if channel.Spec.Delivery.BackoffDelay != nil || channel.Spec.Delivery.Retry != nil || channel.Spec.Delivery.BackoffPolicy != nil { + if delivery == nil { + delivery = &eventingduckv1beta1.DeliverySpec{} + } + delivery.BackoffPolicy = channel.Spec.Delivery.BackoffPolicy + delivery.Retry = channel.Spec.Delivery.Retry + delivery.BackoffDelay = channel.Spec.Delivery.BackoffDelay + } + return + } // Only set the deadletter sink if it's not nil. Otherwise we'll just end up patching // empty delivery in there. @@ -599,5 +616,5 @@ func deliverySpec(sub *v1.Subscription) *eventingduckv1beta1.DeliverySpec { delivery.Retry = sub.Spec.Delivery.Retry delivery.BackoffDelay = sub.Spec.Delivery.BackoffDelay } - return delivery + return } diff --git a/pkg/reconciler/subscription/subscription_test.go b/pkg/reconciler/subscription/subscription_test.go index cac873bad11..4e64856db8a 100644 --- a/pkg/reconciler/subscription/subscription_test.go +++ b/pkg/reconciler/subscription/subscription_test.go @@ -66,6 +66,7 @@ const ( channelName = "origin" serviceName = "service" dlcName = "dlc" + dlc2Name = "dlc2" subscriptionUID = subscriptionName + "-abc-123" subscriptionName = "testsubscription" @@ -93,6 +94,8 @@ var ( dlcDNS = "dlc.mynamespace.svc." + network.GetClusterDomainName() dlcURI = apis.HTTP(dlcDNS) + dlc2DNS = "dlc2.mynamespace.svc." + network.GetClusterDomainName() + subscriberGVK = metav1.GroupVersionKind{ Group: "eventing.knative.dev", Version: "v1alpha1", @@ -1093,6 +1096,171 @@ func TestAllCases(t *testing.T) { patchFinalizers(testNS, "a-"+subscriptionName), }, }, + { + Name: "v1 imc - delivery defaulting - full delivery spec", + Objects: []runtime.Object{ + NewSubscription("a-"+subscriptionName, testNS, + WithSubscriptionUID("a-"+subscriptionUID), + WithSubscriptionChannel(imcV1GVK, channelName), + WithSubscriptionSubscriberRef(serviceGVK, serviceName, testNS), + ), + NewUnstructured(subscriberGVK, dlcName, testNS, + WithUnstructuredAddressable(dlcDNS), + ), + NewInMemoryChannel(channelName, testNS, + WithInitInMemoryChannelConditions, + WithInMemoryChannelSubscribers(nil), + WithInMemoryChannelAddress(channelDNS), + WithInMemoryChannelReadySubscriber("a-"+subscriptionUID), + WithInMemoryChannelDelivery(&eventingduck.DeliverySpec{ + DeadLetterSink: &duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: subscriberGVK.Group + "/" + subscriberGVK.Version, + Kind: subscriberGVK.Kind, + Name: dlcName, + Namespace: testNS, + }, + }, + Retry: pointer.Int32Ptr(10), + BackoffPolicy: &linear, + BackoffDelay: pointer.StringPtr("PT1S"), + }), + ), + }, + Key: testNS + "/" + "a-" + subscriptionName, + WantErr: false, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", "a-"+subscriptionName), + Eventf(corev1.EventTypeNormal, "SubscriberSync", "Subscription was synchronized to channel %q", channelName), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewSubscription("a-"+subscriptionName, testNS, + WithSubscriptionUID("a-"+subscriptionUID), + WithSubscriptionChannel(imcV1GVK, channelName), + WithSubscriptionSubscriberRef(serviceGVK, serviceName, testNS), + // The first reconciliation will initialize the status conditions. + WithInitSubscriptionConditions, + MarkReferencesResolved, + MarkAddedToChannel, + WithSubscriptionPhysicalSubscriptionSubscriber(serviceURIWithPath), + WithSubscriptionDeadLetterSinkURI(dlcURI), + ), + }}, + WantPatches: []clientgotesting.PatchActionImpl{ + patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ + { + UID: "a-" + subscriptionUID, + SubscriberURI: serviceURIWithPath, + Delivery: &eventingduck.DeliverySpec{ + DeadLetterSink: &duckv1.Destination{ + URI: apis.HTTP("dlc.mynamespace.svc.cluster.local"), + }, + Retry: pointer.Int32Ptr(10), + BackoffPolicy: &linear, + BackoffDelay: pointer.StringPtr("PT1S"), + }, + }, + }), + patchFinalizers(testNS, "a-"+subscriptionName), + }, + }, + { + Name: "v1 imc - don't default delivery - full delivery spec", + Objects: []runtime.Object{ + NewSubscription("a-"+subscriptionName, testNS, + WithSubscriptionUID("a-"+subscriptionUID), + WithSubscriptionChannel(imcV1GVK, channelName), + WithSubscriptionSubscriberRef(serviceGVK, serviceName, testNS), + WithSubscriptionDeliverySpec(&eventingduck.DeliverySpec{ + DeadLetterSink: &duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: subscriberGVK.Group + "/" + subscriberGVK.Version, + Kind: subscriberGVK.Kind, + Name: dlcName, + Namespace: testNS, + }, + }, + Retry: pointer.Int32Ptr(10), + BackoffPolicy: &linear, + BackoffDelay: pointer.StringPtr("PT1S"), + }), + ), + NewUnstructured(subscriberGVK, dlcName, testNS, + WithUnstructuredAddressable(dlcDNS), + ), + NewUnstructured(subscriberGVK, dlc2Name, testNS, + WithUnstructuredAddressable(dlc2DNS), + ), + NewInMemoryChannel(channelName, testNS, + WithInitInMemoryChannelConditions, + WithInMemoryChannelSubscribers(nil), + WithInMemoryChannelAddress(channelDNS), + WithInMemoryChannelReadySubscriber("a-"+subscriptionUID), + WithInMemoryChannelDelivery(&eventingduck.DeliverySpec{ + DeadLetterSink: &duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: subscriberGVK.Group + "/" + subscriberGVK.Version, + Kind: subscriberGVK.Kind, + Name: dlc2Name, + Namespace: testNS, + }, + }, + Retry: pointer.Int32Ptr(20), + BackoffPolicy: &linear, + BackoffDelay: pointer.StringPtr("PT10S"), + }), + ), + }, + Key: testNS + "/" + "a-" + subscriptionName, + WantErr: false, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", "a-"+subscriptionName), + Eventf(corev1.EventTypeNormal, "SubscriberSync", "Subscription was synchronized to channel %q", channelName), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewSubscription("a-"+subscriptionName, testNS, + WithSubscriptionUID("a-"+subscriptionUID), + WithSubscriptionChannel(imcV1GVK, channelName), + WithSubscriptionSubscriberRef(serviceGVK, serviceName, testNS), + // The first reconciliation will initialize the status conditions. + WithInitSubscriptionConditions, + MarkReferencesResolved, + MarkAddedToChannel, + WithSubscriptionPhysicalSubscriptionSubscriber(serviceURIWithPath), + WithSubscriptionDeliverySpec(&eventingduck.DeliverySpec{ + DeadLetterSink: &duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: subscriberGVK.Group + "/" + subscriberGVK.Version, + Kind: subscriberGVK.Kind, + Name: dlcName, + Namespace: testNS, + }, + }, + Retry: pointer.Int32Ptr(10), + BackoffPolicy: &linear, + BackoffDelay: pointer.StringPtr("PT1S"), + }), + WithSubscriptionDeadLetterSinkURI(dlcURI), + ), + }}, + WantPatches: []clientgotesting.PatchActionImpl{ + patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ + { + UID: "a-" + subscriptionUID, + SubscriberURI: serviceURIWithPath, + Delivery: &eventingduck.DeliverySpec{ + DeadLetterSink: &duckv1.Destination{ + URI: apis.HTTP("dlc.mynamespace.svc.cluster.local"), + }, + Retry: pointer.Int32Ptr(10), + BackoffPolicy: &linear, + BackoffDelay: pointer.StringPtr("PT1S"), + }, + }, + }), + patchFinalizers(testNS, "a-"+subscriptionName), + }, + }, { Name: "v1 imc+deleted - channel patch succeeded", Objects: []runtime.Object{ diff --git a/pkg/reconciler/testing/v1/channel.go b/pkg/reconciler/testing/v1/channel.go index bee1c09db24..066c1506ace 100644 --- a/pkg/reconciler/testing/v1/channel.go +++ b/pkg/reconciler/testing/v1/channel.go @@ -102,6 +102,12 @@ func WithBackingChannelReady(c *eventingv1.Channel) { c.Status.MarkBackingChannelReady() } +func WithChannelDelivery(d *eventingduckv1.DeliverySpec) ChannelOption { + return func(c *eventingv1.Channel) { + c.Spec.Delivery = d + } +} + func WithBackingChannelObjRef(objRef *duckv1.KReference) ChannelOption { return func(c *eventingv1.Channel) { c.Status.Channel = objRef diff --git a/pkg/reconciler/testing/v1/inmemorychannel.go b/pkg/reconciler/testing/v1/inmemorychannel.go index d44cabfbf22..a0b9c4e4abd 100644 --- a/pkg/reconciler/testing/v1/inmemorychannel.go +++ b/pkg/reconciler/testing/v1/inmemorychannel.go @@ -22,11 +22,12 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "knative.dev/pkg/apis" + duckv1 "knative.dev/eventing/pkg/apis/duck/v1" "knative.dev/eventing/pkg/apis/eventing" "knative.dev/eventing/pkg/apis/messaging" v1 "knative.dev/eventing/pkg/apis/messaging/v1" - "knative.dev/pkg/apis" ) // InMemoryChannelOption enables further configuration of a v1.InMemoryChannel. @@ -171,6 +172,12 @@ func WithInMemoryChannelReadySubscriberAndGeneration(uid string, observedGenerat } } +func WithInMemoryChannelDelivery(d *duckv1.DeliverySpec) InMemoryChannelOption { + return func(c *v1.InMemoryChannel) { + c.Spec.Delivery = d + } +} + func WithInMemoryChannelStatusSubscribers(subscriberStatuses []duckv1.SubscriberStatus) InMemoryChannelOption { return func(imc *v1.InMemoryChannel) { imc.Status.Subscribers = subscriberStatuses diff --git a/test/e2e/channel_dls_test.go b/test/e2e/channel_dls_test.go index 4ccfa451a82..edd95217908 100644 --- a/test/e2e/channel_dls_test.go +++ b/test/e2e/channel_dls_test.go @@ -33,3 +33,7 @@ func TestChannelDeadLetterSinkV1Beta1(t *testing.T) { func TestChannelDeadLetterSinkV1(t *testing.T) { helpers.ChannelDeadLetterSinkTestHelper(context.Background(), t, helpers.SubscriptionV1, channelTestRunner) } + +func TestChannelDeadLetterSinkDefault(t *testing.T) { + helpers.ChannelDeadLetterSinkDefaultTestHelper(context.Background(), t, channelTestRunner) +} diff --git a/test/e2e/helpers/channel_dls_test_helper.go b/test/e2e/helpers/channel_dls_test_helper.go index ca2742ad1d4..65091040602 100644 --- a/test/e2e/helpers/channel_dls_test_helper.go +++ b/test/e2e/helpers/channel_dls_test_helper.go @@ -24,8 +24,14 @@ import ( cloudevents "github.com/cloudevents/sdk-go/v2" . "github.com/cloudevents/sdk-go/v2/test" "github.com/google/uuid" + "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/pointer" + duckv1 "knative.dev/pkg/apis/duck/v1" + + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" + messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" testlib "knative.dev/eventing/test/lib" "knative.dev/eventing/test/lib/recordevents" "knative.dev/eventing/test/lib/resources" @@ -100,3 +106,83 @@ func ChannelDeadLetterSinkTestHelper( )) }) } + +// ChannelDeadLetterDefaultSinkTestHelper is the helper function for channel_deadlettersink_test, but setting the delivery from the channel spec +func ChannelDeadLetterSinkDefaultTestHelper( + ctx context.Context, + t *testing.T, + channelTestRunner testlib.ComponentsTestRunner, + options ...testlib.SetupClientOption) { + const ( + senderName = "e2e-channelchain-sender" + recordEventsPodName = "e2e-channel-dls-recordevents-pod" + channelName = "e2e-channel-dls" + ) + channelGK := messagingv1.SchemeGroupVersion.WithKind("Channel").GroupKind() + // subscriptionNames corresponds to Subscriptions + subscriptionNames := []string{"e2e-channel-dls-subs1"} + + channelTestRunner.RunTests(t, testlib.FeatureBasic, func(st *testing.T, channel metav1.TypeMeta) { + thisChannelGk := channel.GroupVersionKind().GroupKind() + if equality.Semantic.DeepEqual(thisChannelGk, channelGK) { + st.Skip("It doesn't make sense to create a messaging.Channel with a backing messaging.Channel") + return + } + + client := testlib.Setup(st, true, options...) + defer testlib.TearDown(client) + + // create channel + client.CreateChannelV1WithDefaultOrFail(&messagingv1.Channel{ + ObjectMeta: metav1.ObjectMeta{ + Name: channelName, + Namespace: client.Namespace, + }, + Spec: messagingv1.ChannelSpec{ + ChannelTemplate: &messagingv1.ChannelTemplateSpec{ + TypeMeta: channel, + }, + ChannelableSpec: eventingduckv1.ChannelableSpec{ + Delivery: &eventingduckv1.DeliverySpec{ + DeadLetterSink: &duckv1.Destination{ + Ref: resources.KnativeRefForService(recordEventsPodName, client.Namespace), + }, + Retry: pointer.Int32Ptr(10), + }, + }, + }, + }) + client.WaitForResourcesReadyOrFail(&channel) + + // create event logger pod and service as the subscriber + eventTracker, _ := recordevents.StartEventRecordOrFail(ctx, client, recordEventsPodName) + // create subscriptions that subscribe to a service that does not exist + client.CreateSubscriptionsV1OrFail( + subscriptionNames, + channelName, + &channel, + resources.WithSubscriberForSubscriptionV1("does-not-exist"), + ) + + // wait for all test resources to be ready, so that we can start sending events + client.WaitForAllTestResourcesReadyOrFail(ctx) + + // send CloudEvent to the first channel + event := cloudevents.NewEvent() + event.SetID("test") + eventSource := fmt.Sprintf("http://%s.svc/", senderName) + event.SetSource(eventSource) + event.SetType(testlib.DefaultEventType) + body := fmt.Sprintf(`{"msg":"TestChannelDeadLetterSink %s"}`, uuid.New().String()) + if err := event.SetData(cloudevents.ApplicationJSON, []byte(body)); err != nil { + t.Fatal("Cannot set the payload of the event:", err.Error()) + } + client.SendEventToAddressable(ctx, senderName, channelName, &channel, event) + + // check if the logging service receives the correct number of event messages + eventTracker.AssertAtLeast(len(subscriptionNames), recordevents.MatchEvent( + HasSource(eventSource), + HasData([]byte(body)), + )) + }) +}