From 2f2a82509c0e0806ffcbefbd51837572fbc86367 Mon Sep 17 00:00:00 2001 From: Knative Prow Robot Date: Mon, 14 Mar 2022 08:16:18 -0700 Subject: [PATCH] [release-1.3] Set dead letter sink URI in the `Channel` status (#6261) * Generic Channel dead letter sink URI We were dropping the dead letter sink URI in the status of the `Channel`. Signed-off-by: Pierangelo Di Pilato * Add Example_withTemplate Signed-off-by: Pierangelo Di Pilato Co-authored-by: Pierangelo Di Pilato --- config/core/resources/channel.yaml | 3 + docs/eventing-api.md | 5 ++ .../messaging/v1/channel_template_types.go | 3 + test/rekt/channel_test.go | 28 +++++++-- test/rekt/features/channel/features.go | 57 ++++++++++++++++--- test/rekt/resources/channel/channel.go | 48 +++++++++++++++- test/rekt/resources/channel/channel_test.go | 56 ++++++++++++++++++ .../resources/channel_impl/channel_impl.go | 34 ++++++++++- 8 files changed, 218 insertions(+), 16 deletions(-) diff --git a/config/core/resources/channel.yaml b/config/core/resources/channel.yaml index ed9956a2428..87d4aaae206 100644 --- a/config/core/resources/channel.yaml +++ b/config/core/resources/channel.yaml @@ -234,6 +234,9 @@ spec: namespace: description: 'Namespace of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/ This is optional field, it gets defaulted to the object holding it if left out.' type: string + deadLetterSinkUri: + description: DeadLetterSinkURI is the resolved URI of the dead letter sink that will be used as a fallback when not specified by Triggers. + type: string observedGeneration: description: ObservedGeneration is the 'Generation' of the Service that was last processed by the controller. type: integer diff --git a/docs/eventing-api.md b/docs/eventing-api.md index 89aeb3a2f0e..d59f8fea576 100644 --- a/docs/eventing-api.md +++ b/docs/eventing-api.md @@ -3969,6 +3969,11 @@ in verbatim to the Channel CRD as Spec section.

+

ChannelTemplateSpecOption +

+

+

ChannelTemplateSpecOption is an optional function for ChannelTemplateSpec.

+

InMemoryChannelSpec

diff --git a/pkg/apis/messaging/v1/channel_template_types.go b/pkg/apis/messaging/v1/channel_template_types.go index ffa0f4195ef..625249ba1a9 100644 --- a/pkg/apis/messaging/v1/channel_template_types.go +++ b/pkg/apis/messaging/v1/channel_template_types.go @@ -30,3 +30,6 @@ type ChannelTemplateSpec struct { // +optional Spec *runtime.RawExtension `json:"spec,omitempty"` } + +// ChannelTemplateSpecOption is an optional function for ChannelTemplateSpec. +type ChannelTemplateSpecOption func(*ChannelTemplateSpec) error diff --git a/test/rekt/channel_test.go b/test/rekt/channel_test.go index 625c50f6d9f..4164c231802 100644 --- a/test/rekt/channel_test.go +++ b/test/rekt/channel_test.go @@ -23,10 +23,6 @@ import ( "testing" "github.com/cloudevents/sdk-go/v2/binding" - "knative.dev/eventing/test/rekt/features/channel" - ch "knative.dev/eventing/test/rekt/resources/channel" - chimpl "knative.dev/eventing/test/rekt/resources/channel_impl" - "knative.dev/eventing/test/rekt/resources/subscription" duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/system" _ "knative.dev/pkg/system/testing" @@ -34,6 +30,11 @@ import ( "knative.dev/reconciler-test/pkg/k8s" "knative.dev/reconciler-test/pkg/knative" "knative.dev/reconciler-test/pkg/manifest" + + "knative.dev/eventing/test/rekt/features/channel" + ch "knative.dev/eventing/test/rekt/resources/channel" + chimpl "knative.dev/eventing/test/rekt/resources/channel_impl" + "knative.dev/eventing/test/rekt/resources/subscription" ) // TestChannelConformance @@ -248,6 +249,25 @@ func TestChannelDeadLetterSink(t *testing.T) { env.Test(ctx, t, channel.DeadLetterSink(createSubscriberFn)) } +// TestGenericChannelDeadLetterSink tests if the events that cannot be delivered end up in +// the dead letter sink. +func TestGenericChannelDeadLetterSink(t *testing.T) { + t.Parallel() + + ctx, env := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + environment.Managed(t), + ) + + createSubscriberFn := func(ref *duckv1.KReference, uri string) manifest.CfgFn { + return subscription.WithSubscriber(ref, uri) + } + env.Test(ctx, t, channel.DeadLetterSinkGenericChannel(createSubscriberFn)) +} + /* TestEventTransformationForSubscription tests the following scenario: diff --git a/test/rekt/features/channel/features.go b/test/rekt/features/channel/features.go index b4113d872d3..b348fba7abc 100644 --- a/test/rekt/features/channel/features.go +++ b/test/rekt/features/channel/features.go @@ -23,6 +23,14 @@ import ( cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/cloudevents/sdk-go/v2/binding" "github.com/cloudevents/sdk-go/v2/test" + duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/reconciler-test/pkg/eventshub" + "knative.dev/reconciler-test/pkg/eventshub/assert" + "knative.dev/reconciler-test/pkg/feature" + "knative.dev/reconciler-test/pkg/manifest" + "knative.dev/reconciler-test/resources/svc" + + "knative.dev/eventing/test/rekt/resources/channel" "knative.dev/eventing/test/rekt/resources/channel_impl" "knative.dev/eventing/test/rekt/resources/containersource" "knative.dev/eventing/test/rekt/resources/delivery" @@ -30,12 +38,6 @@ import ( "knative.dev/eventing/test/rekt/resources/pingsource" "knative.dev/eventing/test/rekt/resources/source" "knative.dev/eventing/test/rekt/resources/subscription" - duckv1 "knative.dev/pkg/apis/duck/v1" - "knative.dev/reconciler-test/pkg/eventshub" - "knative.dev/reconciler-test/pkg/eventshub/assert" - "knative.dev/reconciler-test/pkg/feature" - "knative.dev/reconciler-test/pkg/manifest" - "knative.dev/reconciler-test/resources/svc" ) func ChannelChain(length int, createSubscriberFn func(ref *duckv1.KReference, uri string) manifest.CfgFn) *feature.Feature { @@ -94,10 +96,47 @@ func DeadLetterSink(createSubscriberFn func(ref *duckv1.KReference, uri string) createSubscriberFn(svc.AsKReference(failer), ""), )) - f.Requirement("channel is ready", channel_impl.IsReady(name)) - f.Requirement("containersource is ready", containersource.IsReady(cs)) + f.Setup("channel is ready", channel_impl.IsReady(name)) + f.Setup("containersource is ready", containersource.IsReady(cs)) + + f.Requirement("Channel has dead letter sink uri", channel_impl.HasDeadLetterSinkURI(name, channel_impl.GVR())) + + f.Assert("dls receives events", assert.OnStore(sink). + MatchEvent(test.HasType("dev.knative.eventing.samples.heartbeat")). + AtLeast(1), + ) + + return f +} + +func DeadLetterSinkGenericChannel(createSubscriberFn func(ref *duckv1.KReference, uri string) manifest.CfgFn) *feature.Feature { + f := feature.NewFeature() + sink := feature.MakeRandomK8sName("sink") + failer := feature.MakeK8sNamePrefix("failer") + cs := feature.MakeRandomK8sName("containersource") + name := feature.MakeRandomK8sName("channel") + + f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiver)) + f.Setup("install failing receiver", eventshub.Install(failer, eventshub.StartReceiver, eventshub.DropFirstN(1))) + f.Setup("install channel", channel.Install(name, + channel.WithTemplate(), + delivery.WithDeadLetterSink(svc.AsKReference(sink), "")), + ) + f.Setup("install containersource", containersource.Install(cs, source.WithSink(channel.AsRef(name), ""))) + f.Setup("install subscription", subscription.Install(feature.MakeRandomK8sName("subscription"), + subscription.WithChannel(channel.AsRef(name)), + createSubscriberFn(svc.AsKReference(failer), ""), + )) + + f.Setup("channel is ready", channel.IsReady(name)) + f.Setup("containersource is ready", containersource.IsReady(cs)) + + f.Requirement("Channel has dead letter sink uri", channel_impl.HasDeadLetterSinkURI(name, channel.GVR())) - f.Assert("dls receives events", assert.OnStore(sink).MatchEvent(test.HasType("dev.knative.eventing.samples.heartbeat")).AtLeast(1)) + f.Assert("dls receives events", assert.OnStore(sink). + MatchEvent(test.HasType("dev.knative.eventing.samples.heartbeat")). + AtLeast(1), + ) return f } diff --git a/test/rekt/resources/channel/channel.go b/test/rekt/resources/channel/channel.go index a63343a6404..36c556c8d27 100644 --- a/test/rekt/resources/channel/channel.go +++ b/test/rekt/resources/channel/channel.go @@ -19,15 +19,21 @@ package channel import ( "context" "embed" + "encoding/json" + "fmt" "time" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" - "knative.dev/eventing/test/rekt/resources/addressable" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/reconciler-test/pkg/feature" "knative.dev/reconciler-test/pkg/k8s" "knative.dev/reconciler-test/pkg/manifest" + + messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" + "knative.dev/eventing/test/rekt/resources/addressable" + "knative.dev/eventing/test/rekt/resources/channel_impl" ) //go:embed *.yaml @@ -81,3 +87,43 @@ func AsRef(name string) *duckv1.KReference { Name: name, } } + +// WithTemplate adds channelTemplate to the Channel's config after apply the provided +// options. +func WithTemplate(options ...messagingv1.ChannelTemplateSpecOption) manifest.CfgFn { + return func(m map[string]interface{}) { + t := withTemplate(options...) + channelTemplate := map[string]interface{}{ + "apiVersion": t.APIVersion, + "kind": t.Kind, + } + m["channelTemplate"] = channelTemplate + if t.Spec != nil { + s := map[string]string{} + bytes, err := t.Spec.MarshalJSON() + if err != nil { + panic(fmt.Errorf("failed to marshal spec: %w", err)) + } + if err := json.Unmarshal(bytes, &s); err != nil { + panic(fmt.Errorf("failed to unmarshal spec '%s': %v", bytes, err)) + } + channelTemplate["spec"] = s + } + } +} + +func withTemplate(options ...messagingv1.ChannelTemplateSpecOption) *messagingv1.ChannelTemplateSpec { + gvk := channel_impl.GVK() + t := &messagingv1.ChannelTemplateSpec{ + TypeMeta: metav1.TypeMeta{ + Kind: gvk.Kind, + APIVersion: gvk.GroupVersion().String(), + }, + } + for _, opt := range options { + if err := opt(t); err != nil { + panic(err) + } + } + return t +} diff --git a/test/rekt/resources/channel/channel_test.go b/test/rekt/resources/channel/channel_test.go index 84cd8746b8d..a1029e6fdc9 100644 --- a/test/rekt/resources/channel/channel_test.go +++ b/test/rekt/resources/channel/channel_test.go @@ -17,9 +17,15 @@ limitations under the License. package channel_test import ( + "embed" + "encoding/json" "os" + "k8s.io/apimachinery/pkg/runtime" "knative.dev/reconciler-test/pkg/manifest" + + messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" + "knative.dev/eventing/test/rekt/resources/channel" ) // The following examples validate the processing of the With* helper methods @@ -106,3 +112,53 @@ func Example_full() { // backoffPolicy: exponential // backoffDelay: "2007-03-01T13:00:00Z/P1Y2M10DT2H30M" } + +//go:embed *.yaml +var yaml embed.FS + +func Example_withTemplate() { + + spec := map[string]string{ + "thing1": "value1", + "thing2": "value2", + } + bytesSpec, err := json.Marshal(spec) + if err != nil { + panic(err) + } + + re := &runtime.RawExtension{ + Raw: bytesSpec, + } + + images := map[string]string{} + cfg := map[string]interface{}{ + "name": "foo", + "namespace": "bar", + } + withTemplate := channel.WithTemplate(func(spec *messagingv1.ChannelTemplateSpec) error { + spec.Spec = re + return nil + }) + withTemplate(cfg) + + files, err := manifest.ExecuteYAML(yaml, images, cfg) + if err != nil { + panic(err) + } + + manifest.OutputYAML(os.Stdout, files) + // Output: + // apiVersion: messaging.knative.dev/v1 + // kind: Channel + // metadata: + // name: foo + // namespace: bar + // spec: + // channelTemplate: + // apiVersion: messaging.knative.dev/v1 + // kind: InMemoryChannel + // spec: + // thing1: value1 + // thing2: value2 +} diff --git a/test/rekt/resources/channel_impl/channel_impl.go b/test/rekt/resources/channel_impl/channel_impl.go index 5efeaecb156..0dbc7de6304 100644 --- a/test/rekt/resources/channel_impl/channel_impl.go +++ b/test/rekt/resources/channel_impl/channel_impl.go @@ -24,14 +24,20 @@ import ( "github.com/kelseyhightower/envconfig" "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - "knative.dev/eventing/test/rekt/resources/addressable" - "knative.dev/eventing/test/rekt/resources/delivery" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/pkg/injection/clients/dynamicclient" + "knative.dev/reconciler-test/pkg/environment" "knative.dev/reconciler-test/pkg/feature" "knative.dev/reconciler-test/pkg/k8s" "knative.dev/reconciler-test/pkg/manifest" + + eventingduck "knative.dev/eventing/pkg/apis/duck/v1" + "knative.dev/eventing/test/rekt/resources/addressable" + "knative.dev/eventing/test/rekt/resources/delivery" ) //go:embed *.yaml @@ -89,6 +95,30 @@ func IsAddressable(name string, timing ...time.Duration) feature.StepFn { return k8s.IsAddressable(GVR(), name, timing...) } +// HasDeadLetterSinkURI asserts that the Channel has the resolved dead letter sink URI +// in the status. +func HasDeadLetterSinkURI(name string, gvr schema.GroupVersionResource) feature.StepFn { + return func(ctx context.Context, t feature.T) { + ns := environment.FromContext(ctx).Namespace() + ch, err := dynamicclient.Get(ctx). + Resource(gvr). + Namespace(ns). + Get(ctx, name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("failed to get %s/%s channel: %v", ns, name, err) + } + + channelable := &eventingduck.Channelable{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(ch.UnstructuredContent(), channelable); err != nil { + t.Fatal(err) + } + + if channelable.Status.DeadLetterSinkURI.String() == "" { + t.Fatalf("channel %s/%s has no dead letter sink uri in the status", ns, name) + } + } +} + // Address returns a Channel's address. func Address(ctx context.Context, name string, timings ...time.Duration) (*apis.URL, error) { return addressable.Address(ctx, GVR(), name, timings...)