diff --git a/kafka/source/pkg/apis/sources/v1alpha1/kafka_conversion.go b/kafka/source/pkg/apis/sources/v1alpha1/kafka_conversion.go index 53e29af63c..0ebba048b8 100644 --- a/kafka/source/pkg/apis/sources/v1alpha1/kafka_conversion.go +++ b/kafka/source/pkg/apis/sources/v1alpha1/kafka_conversion.go @@ -63,13 +63,8 @@ func (source *KafkaSource) ConvertTo(ctx context.Context, obj apis.Convertible) Topics: source.Spec.Topics, ConsumerGroup: source.Spec.ConsumerGroup, } - sink.Status = v1beta1.KafkaSourceStatus{ - duckv1.SourceStatus{ - Status: source.Status.Status, - SinkURI: source.Status.SinkURI, - CloudEventAttributes: source.Status.CloudEventAttributes, - }, - } + sink.Status.Status = source.Status.Status + source.Status.Status.ConvertTo(ctx, &sink.Status.Status) // Optionals if source.Spec.Sink != nil { sink.Spec.Sink = *source.Spec.Sink.DeepCopy() @@ -77,6 +72,12 @@ func (source *KafkaSource) ConvertTo(ctx context.Context, obj apis.Convertible) if source.Status.SinkURI != nil { sink.Status.SinkURI = source.Status.SinkURI.DeepCopy() } + if source.Status.CloudEventAttributes != nil { + sink.Status.CloudEventAttributes = make([]duckv1.CloudEventAttributes, len(source.Status.CloudEventAttributes)) + for i, cea := range source.Status.CloudEventAttributes { + sink.Status.CloudEventAttributes[i] = cea + } + } return nil default: return fmt.Errorf("Unknown conversion, got: %T", sink) @@ -122,6 +123,18 @@ func (sink *KafkaSource) ConvertFrom(ctx context.Context, obj apis.Convertible) if reflect.DeepEqual(*sink.Spec.Sink, duckv1.Destination{}) { sink.Spec.Sink = nil } + sink.Status.Status = source.Status.Status + source.Status.Status.ConvertTo(ctx, &source.Status.Status) + // Optionals + if source.Status.SinkURI != nil { + sink.Status.SinkURI = source.Status.SinkURI.DeepCopy() + } + if source.Status.CloudEventAttributes != nil { + sink.Status.CloudEventAttributes = make([]duckv1.CloudEventAttributes, len(source.Status.CloudEventAttributes)) + for i, cea := range source.Status.CloudEventAttributes { + sink.Status.CloudEventAttributes[i] = cea + } + } return nil default: return fmt.Errorf("Unknown conversion, got: %T", source) diff --git a/kafka/source/pkg/apis/sources/v1alpha1/kafka_conversion_test.go b/kafka/source/pkg/apis/sources/v1alpha1/kafka_conversion_test.go new file mode 100644 index 0000000000..5ee08dceef --- /dev/null +++ b/kafka/source/pkg/apis/sources/v1alpha1/kafka_conversion_test.go @@ -0,0 +1,216 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "context" + "github.com/google/go-cmp/cmp" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/pointer" + "knative.dev/eventing-contrib/kafka/source/pkg/apis/bindings/v1alpha1" + v1beta1 "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1beta1" + "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" + "testing" +) + +func TestKafkaSourceConversionBadType(t *testing.T) { + good, bad := &KafkaSource{}, &KafkaSource{} + + if err := good.ConvertTo(context.Background(), bad); err == nil { + t.Errorf("ConvertTo() = %#v, wanted error", bad) + } + + if err := good.ConvertFrom(context.Background(), bad); err == nil { + t.Errorf("ConvertFrom() = %#v, wanted error", good) + } +} + +// Test v1alpha1 -> v1beta1 -> v1alpha1 +func TestKafkaSourceConversionRoundTripV1beta1(t *testing.T) { + // Just one for now, just adding the for loop for ease of future changes. + versions := []apis.Convertible{&v1beta1.KafkaSource{}} + + tests := []struct { + name string + in *KafkaSource + }{{ + name: "min configuration", + in: &KafkaSource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka-source-name", + Namespace: "kafka-source-ns", + Generation: 17, + }, + Spec: KafkaSourceSpec{}, + Status: KafkaSourceStatus{ + SourceStatus: duckv1.SourceStatus{ + Status: duckv1.Status{ + Conditions: duckv1.Conditions{}, + }, + }, + }, + }, + }, { + name: "full configuration", + in: &KafkaSource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka-source-name", + Namespace: "kafka-source-ns", + Generation: 17, + }, + Spec: KafkaSourceSpec{ + KafkaAuthSpec: v1alpha1.KafkaAuthSpec{ + BootstrapServers: []string{"bootstrap-server-1", "bootstrap-server-2"}, + Net: v1alpha1.KafkaNetSpec{ + SASL: v1alpha1.KafkaSASLSpec{ + Enable: true, + User: v1alpha1.SecretValueFromSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "sasl-user-secret-local-obj-ref", + }, + Key: "sasl-user-secret-key", + Optional: pointer.BoolPtr(true), + }, + }, + Password: v1alpha1.SecretValueFromSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "sasl-password-secret-local-obj-ref", + }, + Key: "sasl-password-secret-key", + Optional: pointer.BoolPtr(true), + }, + }, + }, + TLS: v1alpha1.KafkaTLSSpec{ + Enable: false, + Cert: v1alpha1.SecretValueFromSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "tls-cert-secret-local-obj-ref", + }, + Key: "tls-cert-secret-key", + Optional: pointer.BoolPtr(true), + }, + }, + Key: v1alpha1.SecretValueFromSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "tls-key-secret-local-obj-ref", + }, + Key: "tls-key-secret-key", + Optional: pointer.BoolPtr(true), + }, + }, + CACert: v1alpha1.SecretValueFromSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "tls-cacert-secret-local-obj-ref", + }, + Key: "tls-cacert-secret-key", + Optional: pointer.BoolPtr(true), + }, + }, + }, + }, + }, + Topics: []string{"topic1", "topic2"}, + ConsumerGroup: "consumer-group", + Sink: &duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "sink-kind", + Namespace: "sink-namespace", + Name: "sink-name", + APIVersion: "sink-api-version", + }, + URI: apis.HTTP("sink-uri"), + }, + ServiceAccountName: "kafka-sa-name", + Resources: KafkaResourceSpec{ + Requests: KafkaRequestsSpec{ + ResourceCPU: "5", + ResourceMemory: "100m", + }, + Limits: KafkaLimitsSpec{ + ResourceCPU: "10", + ResourceMemory: "200m", + }, + }, + }, + Status: KafkaSourceStatus{ + SourceStatus: duckv1.SourceStatus{ + Status: duckv1.Status{ + ObservedGeneration: 1, + Conditions: duckv1.Conditions{{ + Type: "Ready", + Status: "True", + }}, + Annotations: map[string]string{ + "foo": "bar", + "hi": "hello", + }, + }, + SinkURI: apis.HTTP("sink-uri"), + CloudEventAttributes: []duckv1.CloudEventAttributes{ + { + Type: "ce-attr-1-type", + Source: "ce-attr-1-source", + }, { + Type: "ce-attr-2-type", + Source: "ce-attr-2-source", + }, + }, + }, + }, + }, + }} + + for _, test := range tests { + for _, version := range versions { + t.Run(test.name, func(t *testing.T) { + ver := version + if err := test.in.ConvertTo(context.Background(), ver); err != nil { + t.Errorf("ConvertTo() = %v", err) + } + + got := &KafkaSource{} + if err := got.ConvertFrom(context.Background(), ver); err != nil { + t.Errorf("ConvertFrom() = %v", err) + } + + // Since on the way down, we lose the DeprecatedSourceAndType, + // convert the in to equivalent out. + fixed := fixKafkaSourceDeprecated(test.in) + + if diff := cmp.Diff(fixed, got); diff != "" { + t.Errorf("roundtrip (-want, +got) = %v", diff) + } + }) + } + } +} + +// Since v1beta1 to v1alpha1 is lossy but semantically equivalent, +// fix that so diff works. +func fixKafkaSourceDeprecated(in *KafkaSource) *KafkaSource { + in.Spec.ServiceAccountName = "" + in.Spec.Resources = KafkaResourceSpec{} + return in +}