From cecedc85083dc520c6b0e3835bc221e5474e2b89 Mon Sep 17 00:00:00 2001 From: Ville Aikas <11279988+vaikas@users.noreply.github.com> Date: Sun, 26 Apr 2020 02:05:50 -0700 Subject: [PATCH] fix issue 3014 (#3049) * fix issue 3014 * fix the unit tests and fix typo on the duck version * really ville, really?? * deep copy before modifying * do not blindly overwrite the subscribable in defaults, doh --- .../v1alpha1/in_memory_channel_conversion.go | 9 +++++++++ .../in_memory_channel_conversion_test.go | 13 +++++++++++++ .../v1beta1/in_memory_channel_defaults.go | 11 ++++++++--- pkg/reconciler/subscription/subscription.go | 18 ++++++++++++++++-- test/e2e/channel_single_event_test.go | 14 ++++++++++++++ .../e2e/helpers/channel_single_event_helper.go | 11 +++++++++++ test/lib/duck/resource_checks.go | 2 +- 7 files changed, 72 insertions(+), 6 deletions(-) diff --git a/pkg/apis/messaging/v1alpha1/in_memory_channel_conversion.go b/pkg/apis/messaging/v1alpha1/in_memory_channel_conversion.go index 468b84ddedd..da60791ef80 100644 --- a/pkg/apis/messaging/v1alpha1/in_memory_channel_conversion.go +++ b/pkg/apis/messaging/v1alpha1/in_memory_channel_conversion.go @@ -27,6 +27,7 @@ import ( duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" eventingduck "knative.dev/eventing/pkg/apis/duck/v1alpha1" duckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1" + "knative.dev/eventing/pkg/apis/messaging" "knative.dev/eventing/pkg/apis/messaging/v1beta1" ) @@ -36,6 +37,10 @@ func (source *InMemoryChannel) ConvertTo(ctx context.Context, obj apis.Convertib switch sink := obj.(type) { case *v1beta1.InMemoryChannel: sink.ObjectMeta = source.ObjectMeta + if sink.Annotations == nil { + sink.Annotations = make(map[string]string) + } + sink.Annotations[messaging.SubscribableDuckVersionAnnotation] = "v1beta1" source.Status.ConvertTo(ctx, &sink.Status) return source.Spec.ConvertTo(ctx, &sink.Spec) default: @@ -84,6 +89,10 @@ func (sink *InMemoryChannel) ConvertFrom(ctx context.Context, obj apis.Convertib sink.ObjectMeta = source.ObjectMeta sink.Status.ConvertFrom(ctx, source.Status) sink.Spec.ConvertFrom(ctx, source.Spec) + if sink.Annotations == nil { + sink.Annotations = make(map[string]string) + } + sink.Annotations[messaging.SubscribableDuckVersionAnnotation] = "v1alpha1" return nil default: return fmt.Errorf("unknown version, got: %T", source) diff --git a/pkg/apis/messaging/v1alpha1/in_memory_channel_conversion_test.go b/pkg/apis/messaging/v1alpha1/in_memory_channel_conversion_test.go index 1e07de92989..e17bba967c8 100644 --- a/pkg/apis/messaging/v1alpha1/in_memory_channel_conversion_test.go +++ b/pkg/apis/messaging/v1alpha1/in_memory_channel_conversion_test.go @@ -26,6 +26,7 @@ import ( "k8s.io/utils/pointer" eventingduck "knative.dev/eventing/pkg/apis/duck/v1alpha1" eventingduckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1" + "knative.dev/eventing/pkg/apis/messaging" "knative.dev/eventing/pkg/apis/messaging/v1beta1" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -160,6 +161,12 @@ func TestInMemoryChannelConversion(t *testing.T) { if err := got.ConvertFrom(context.Background(), ver); err != nil { t.Errorf("ConvertFrom() = %v", err) } + // Make sure the annotation specifies the correct duck. + if test.in.Annotations == nil { + test.in.Annotations = make(map[string]string) + } + test.in.Annotations[messaging.SubscribableDuckVersionAnnotation] = "v1alpha1" + if diff := cmp.Diff(test.in, got); diff != "" { t.Errorf("roundtrip (-want, +got) = %v", diff) } @@ -285,6 +292,12 @@ func TestInMemoryChannelConversionWithV1Beta1(t *testing.T) { if err := ver.ConvertTo(context.Background(), got); err != nil { t.Errorf("ConvertFrom() = %v", err) } + // Make sure the annotation specifies the correct duck. + if test.in.Annotations == nil { + test.in.Annotations = make(map[string]string) + } + test.in.Annotations[messaging.SubscribableDuckVersionAnnotation] = "v1beta1" + if diff := cmp.Diff(test.in, got); diff != "" { t.Errorf("roundtrip (-want, +got) = %v", diff) } diff --git a/pkg/apis/messaging/v1beta1/in_memory_channel_defaults.go b/pkg/apis/messaging/v1beta1/in_memory_channel_defaults.go index 9105e51d4fc..e82c88d584e 100644 --- a/pkg/apis/messaging/v1beta1/in_memory_channel_defaults.go +++ b/pkg/apis/messaging/v1beta1/in_memory_channel_defaults.go @@ -23,12 +23,17 @@ import ( ) func (imc *InMemoryChannel) SetDefaults(ctx context.Context) { - // Set the duck subscription to indicate that we support - // v1beta1 version of the Subscribable. + // Set the duck subscription to the stored version of the duck + // we support. Reason for this is that the stored version will + // not get a chance to get modified, but for newer versions + // conversion webhook will be able to take a crack at it and + // can modify it to match the duck shape. if imc.Annotations == nil { imc.Annotations = make(map[string]string) } - imc.Annotations[messaging.SubscribableDuckVersionAnnotation] = "v1beta1" + if _, ok := imc.Annotations[messaging.SubscribableDuckVersionAnnotation]; !ok { + imc.Annotations[messaging.SubscribableDuckVersionAnnotation] = "v1alpha1" + } imc.Spec.SetDefaults(ctx) } diff --git a/pkg/reconciler/subscription/subscription.go b/pkg/reconciler/subscription/subscription.go index eb3c0c98a20..028555407d4 100644 --- a/pkg/reconciler/subscription/subscription.go +++ b/pkg/reconciler/subscription/subscription.go @@ -347,7 +347,7 @@ func (r *Reconciler) trackAndFetchChannel(ctx context.Context, sub *v1alpha1.Sub // If the Channel is a channels.messaging type (hence, it's only a factory for // underlying channels), fetch and validate the "backing" channel. func (r *Reconciler) getChannel(ctx context.Context, sub *v1alpha1.Subscription) (*eventingduckv1alpha1.ChannelableCombined, pkgreconciler.Event) { - logging.FromContext(ctx).Info("GETTING channel", zap.Any("channel", sub.Spec.Channel)) + logging.FromContext(ctx).Info("Getting channel", zap.Any("channel", sub.Spec.Channel)) // 1. Track the channel pointed by subscription. // a. If channel is a Channel.messaging.knative.dev @@ -357,6 +357,7 @@ func (r *Reconciler) getChannel(ctx context.Context, sub *v1alpha1.Subscription) } gvk := obj.GetObjectKind().GroupVersionKind() + // Test to see if the channel is Channel.messaging because it is going // to have a "backing" channel that is what we need to actually operate on // as well as keep track of. @@ -408,7 +409,20 @@ func (r *Reconciler) getChannel(ctx context.Context, sub *v1alpha1.Subscription) logging.FromContext(ctx).Error("Failed to convert to Channelable Object", zap.Any("channel", sub.Spec.Channel), zap.Error(err)) return nil, err } - return ch, nil + + retCh := ch.DeepCopy() + gvk = retCh.GetObjectKind().GroupVersionKind() + // IMC has been know to lie about the duck version it supports. We know that + // v1alpha1 supports v1alpha1 Subscribable duck so override it here. + // If there are other channels that have this lying behaviour, add them here... + if gvk.Kind == "InMemoryChannel" && gvk.Version == "v1alpha1" { + if retCh.Annotations == nil { + retCh.Annotations = make(map[string]string) + } + retCh.Annotations[messaging.SubscribableDuckVersionAnnotation] = "v1alpha1" + } + + return retCh, nil } func isNilOrEmptyDeliveryDeadLetterSink(delivery *eventingduckv1beta1.DeliverySpec) bool { diff --git a/test/e2e/channel_single_event_test.go b/test/e2e/channel_single_event_test.go index 9890cfc0a31..e0a240b9dc1 100644 --- a/test/e2e/channel_single_event_test.go +++ b/test/e2e/channel_single_event_test.go @@ -38,6 +38,7 @@ func TestSingleBinaryEventForChannel(t *testing.T) { t, cloudevents.Binary, "v1alpha1", + "", channelTestRunner, ) } @@ -47,6 +48,7 @@ func TestSingleStructuredEventForChannel(t *testing.T) { t, cloudevents.Structured, "v1alpha1", + "", channelTestRunner, ) } @@ -56,6 +58,17 @@ func TestSingleBinaryEventForChannelV1Beta1(t *testing.T) { t, cloudevents.Binary, "v1beta1", + "", + channelTestRunner, + ) +} + +func TestSingleBinaryEventForChannelV1Beta1SubscribeToV1Alpha1(t *testing.T) { + helpers.SingleEventForChannelTestHelper( + t, + cloudevents.Binary, + "v1beta1", + "messaging.knative.dev/v1alpha1", channelTestRunner, ) } @@ -65,6 +78,7 @@ func TestSingleStructuredEventForChannelV1Beta1(t *testing.T) { t, cloudevents.Structured, "v1beta1", + "", channelTestRunner, ) } diff --git a/test/e2e/helpers/channel_single_event_helper.go b/test/e2e/helpers/channel_single_event_helper.go index 1f5a13b80cc..b2210a7f715 100644 --- a/test/e2e/helpers/channel_single_event_helper.go +++ b/test/e2e/helpers/channel_single_event_helper.go @@ -36,8 +36,14 @@ const ( ) // SingleEventForChannelTestHelper is the helper function for channel_single_event_test +// channelVersion can be used to override which version you want to create the +// subscription against. For example, you could create a v1beta1 channel, but create +// a subscription to its v1alpha1 version by using channelVersion to override it. +// channelVersion == "" means that the version of the channel subscribed to is not +// modified. func SingleEventForChannelTestHelper(t *testing.T, encoding string, subscriptionVersion subscriptionVersion, + channelVersion string, channelTestRunner lib.ChannelTestRunner, options ...lib.SetupClientOption) { channelName := "e2e-singleevent-channel-" + encoding @@ -57,6 +63,11 @@ func SingleEventForChannelTestHelper(t *testing.T, encoding string, pod := resources.EventLoggerPod(loggerPodName) client.CreatePodOrFail(pod, lib.WithService(loggerPodName)) + // If the caller specified a different version, override it here. + if channelVersion != "" { + st.Logf("Changing API version from: %q to %q", channel.APIVersion, channelVersion) + channel.APIVersion = channelVersion + } // create subscription to subscribe the channel, and forward the received events to the logger service switch subscriptionVersion { case subscriptionV1alpha1: diff --git a/test/lib/duck/resource_checks.go b/test/lib/duck/resource_checks.go index 96c8ee762d8..7bdf25efbf3 100644 --- a/test/lib/duck/resource_checks.go +++ b/test/lib/duck/resource_checks.go @@ -35,7 +35,7 @@ import ( const ( // The interval and timeout used for polling in checking resource states. interval = 1 * time.Second - timeout = 4 * time.Minute + timeout = 2 * time.Minute ) // WaitForResourceReady polls the status of the MetaResource from client