diff --git a/go.mod b/go.mod index 05c4416b41..d639fcf99c 100644 --- a/go.mod +++ b/go.mod @@ -38,6 +38,7 @@ require ( k8s.io/api v0.18.1 k8s.io/apimachinery v0.18.1 k8s.io/client-go v11.0.1-0.20190805182717-6502b5e7b1b5+incompatible + k8s.io/utils v0.0.0-20200124190032-861946025e34 knative.dev/eventing v0.15.1-0.20200625220028-1e3a03b620b6 knative.dev/pkg v0.0.0-20200626022628-f1ee372577e1 knative.dev/serving v0.15.1-0.20200626061427-ce9c1723e56a diff --git a/kafka/source/pkg/apis/bindings/v1alpha1/kafka_conversion.go b/kafka/source/pkg/apis/bindings/v1alpha1/kafka_conversion.go new file mode 100644 index 0000000000..5c4f444b8d --- /dev/null +++ b/kafka/source/pkg/apis/bindings/v1alpha1/kafka_conversion.go @@ -0,0 +1,140 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "context" + "fmt" + + "knative.dev/eventing-contrib/kafka/source/pkg/apis/bindings/v1beta1" + bindingsv1beta1 "knative.dev/eventing-contrib/kafka/source/pkg/apis/bindings/v1beta1" + "knative.dev/pkg/apis" +) + +// ConvertTo implements apis.Convertible. +// Converts source (from v1alpha1.KafkaBinding) into v1beta1.KafkaBinding +func (source *KafkaBinding) ConvertTo(ctx context.Context, obj apis.Convertible) error { + switch sink := obj.(type) { + case *v1beta1.KafkaBinding: + kafkaAuthSpec := bindingsv1beta1.KafkaAuthSpec{} + if err := source.Spec.KafkaAuthSpec.ConvertTo(ctx, &kafkaAuthSpec); err != nil { + return err + } + + sink.ObjectMeta = source.ObjectMeta + sink.Spec = v1beta1.KafkaBindingSpec{ + BindingSpec: source.Spec.BindingSpec, + KafkaAuthSpec: kafkaAuthSpec, + } + sink.Status.Status = source.Status.Status + source.Status.Status.ConvertTo(ctx, &sink.Status.Status) + return nil + default: + return fmt.Errorf("Unknown conversion, got: %T", sink) + } +} + +// ConvertFrom implements apis.Convertible. +// Converts obj from v1beta1.KafkaBinding into v1alpha1.KafkaBinding +func (sink *KafkaBinding) ConvertFrom(ctx context.Context, obj apis.Convertible) error { + switch source := obj.(type) { + case *v1beta1.KafkaBinding: + kafkaAuthSpec := KafkaAuthSpec{} + if err := kafkaAuthSpec.ConvertFrom(ctx, &source.Spec.KafkaAuthSpec); err != nil { + return err + } + + sink.ObjectMeta = source.ObjectMeta + sink.Spec = KafkaBindingSpec{ + BindingSpec: source.Spec.BindingSpec, + KafkaAuthSpec: kafkaAuthSpec, + } + sink.Status.Status = source.Status.Status + source.Status.Status.ConvertTo(ctx, &source.Status.Status) + return nil + default: + return fmt.Errorf("Unknown conversion, got: %T", source) + } +} + +// ConvertTo implements apis.Convertible. +// Converts source (from v1alpha1.KafkaAuthSpec) into v1beta1.KafkaAuthSpec +func (source *KafkaAuthSpec) ConvertTo(_ context.Context, obj apis.Convertible) error { + switch sink := obj.(type) { + case *v1beta1.KafkaAuthSpec: + sink.BootstrapServers = source.BootstrapServers + sink.Net = bindingsv1beta1.KafkaNetSpec{ + SASL: bindingsv1beta1.KafkaSASLSpec{ + Enable: source.Net.SASL.Enable, + User: bindingsv1beta1.SecretValueFromSource{ + SecretKeyRef: source.Net.SASL.User.SecretKeyRef, + }, + Password: bindingsv1beta1.SecretValueFromSource{ + SecretKeyRef: source.Net.SASL.Password.SecretKeyRef}, + }, + TLS: bindingsv1beta1.KafkaTLSSpec{ + Enable: source.Net.TLS.Enable, + Cert: bindingsv1beta1.SecretValueFromSource{ + SecretKeyRef: source.Net.TLS.Cert.SecretKeyRef, + }, + Key: bindingsv1beta1.SecretValueFromSource{ + SecretKeyRef: source.Net.TLS.Key.SecretKeyRef, + }, + CACert: bindingsv1beta1.SecretValueFromSource{ + SecretKeyRef: source.Net.TLS.CACert.SecretKeyRef, + }, + }, + } + return nil + default: + return fmt.Errorf("Unknown conversion, got: %T", sink) + } +} + +// ConvertFrom implements apis.Convertible. +// Converts obj from v1beta1.KafkaAuthSpec into v1alpha1.KafkaAuthSpec +func (sink *KafkaAuthSpec) ConvertFrom(_ context.Context, obj apis.Convertible) error { + switch source := obj.(type) { + case *v1beta1.KafkaAuthSpec: + sink.BootstrapServers = source.BootstrapServers + sink.Net = KafkaNetSpec{ + SASL: KafkaSASLSpec{ + Enable: source.Net.SASL.Enable, + User: SecretValueFromSource{ + SecretKeyRef: source.Net.SASL.User.SecretKeyRef, + }, + Password: SecretValueFromSource{ + SecretKeyRef: source.Net.SASL.Password.SecretKeyRef}, + }, + TLS: KafkaTLSSpec{ + Enable: source.Net.TLS.Enable, + Cert: SecretValueFromSource{ + SecretKeyRef: source.Net.TLS.Cert.SecretKeyRef, + }, + Key: SecretValueFromSource{ + SecretKeyRef: source.Net.TLS.Key.SecretKeyRef, + }, + CACert: SecretValueFromSource{ + SecretKeyRef: source.Net.TLS.CACert.SecretKeyRef, + }, + }, + } + return nil + default: + return fmt.Errorf("Unknown conversion, got: %T", source) + } +} diff --git a/kafka/source/pkg/apis/bindings/v1alpha1/kafka_conversion_test.go b/kafka/source/pkg/apis/bindings/v1alpha1/kafka_conversion_test.go new file mode 100644 index 0000000000..ccfc09dccb --- /dev/null +++ b/kafka/source/pkg/apis/bindings/v1alpha1/kafka_conversion_test.go @@ -0,0 +1,324 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "context" + "testing" + + duckv1alpha1 "knative.dev/pkg/apis/duck/v1alpha1" + "knative.dev/pkg/tracker" + + "github.com/google/go-cmp/cmp" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/pointer" + "knative.dev/eventing-contrib/kafka/source/pkg/apis/bindings/v1beta1" + "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" +) + +func TestKafkaBindingConversionBadType(t *testing.T) { + good, bad := &KafkaBinding{}, &KafkaBinding{} + + if err := good.ConvertTo(context.Background(), bad); err == nil { + t.Errorf("ConvertTo() = %#v, wanted error", bad) + } + + if err := good.ConvertFrom(context.Background(), bad); err == nil { + t.Errorf("ConvertFrom() = %#v, wanted error", good) + } +} + +// Test v1alpha1 -> v1beta1 -> v1alpha1 +func TestKafkaBindingConversionRoundTripV1alpha1(t *testing.T) { + // Just one for now, just adding the for loop for ease of future changes. + versions := []apis.Convertible{&v1beta1.KafkaBinding{}} + + tests := []struct { + name string + in *KafkaBinding + }{{ + name: "min configuration", + in: &KafkaBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka-binding-name", + Namespace: "kafka-binding-ns", + Generation: 17, + }, + Spec: KafkaBindingSpec{}, + Status: KafkaBindingStatus{ + Status: duckv1.Status{ + Conditions: duckv1.Conditions{}, + }, + }, + }, + }, { + name: "full configuration", + in: &KafkaBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka-binding-name", + Namespace: "kafka-binding-ns", + Generation: 17, + }, + Spec: KafkaBindingSpec{ + BindingSpec: duckv1alpha1.BindingSpec{ + Subject: tracker.Reference{ + APIVersion: "bindingAPIVersion", + Kind: "bindingKind", + Namespace: "bindingNamespace", + Name: "bindingName", + }, + }, + KafkaAuthSpec: KafkaAuthSpec{ + BootstrapServers: []string{"bootstrap-server-1", "bootstrap-server-2"}, + Net: KafkaNetSpec{ + SASL: KafkaSASLSpec{ + Enable: true, + User: SecretValueFromSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "sasl-user-secret-local-obj-ref", + }, + Key: "sasl-user-secret-key", + Optional: pointer.BoolPtr(true), + }, + }, + Password: SecretValueFromSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "sasl-password-secret-local-obj-ref", + }, + Key: "sasl-password-secret-key", + Optional: pointer.BoolPtr(true), + }, + }, + }, + TLS: KafkaTLSSpec{ + Enable: true, + Cert: SecretValueFromSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "tls-cert-secret-local-obj-ref", + }, + Key: "tls-cert-secret-key", + Optional: pointer.BoolPtr(true), + }, + }, + Key: SecretValueFromSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "tls-key-secret-local-obj-ref", + }, + Key: "tls-key-secret-key", + Optional: pointer.BoolPtr(true), + }, + }, + CACert: SecretValueFromSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "tls-cacert-secret-local-obj-ref", + }, + Key: "tls-cacert-secret-key", + Optional: pointer.BoolPtr(true), + }, + }, + }, + }, + }, + }, + Status: KafkaBindingStatus{ + Status: duckv1.Status{ + ObservedGeneration: 1, + Conditions: duckv1.Conditions{{ + Type: "Ready", + Status: "True", + }}, + Annotations: map[string]string{ + "foo": "bar", + "hi": "hello", + }, + }, + }, + }, + }} + + for _, test := range tests { + for _, version := range versions { + t.Run(test.name, func(t *testing.T) { + ver := version + if err := test.in.ConvertTo(context.Background(), ver); err != nil { + t.Errorf("ConvertTo() = %v", err) + } + + got := &KafkaBinding{} + if err := got.ConvertFrom(context.Background(), ver); err != nil { + t.Errorf("ConvertFrom() = %v", err) + } + + // Since on the way down, we lose the DeprecatedSourceAndType, + // convert the in to equivalent out. + fixed := fixKafkaBindingDeprecated(test.in) + + if diff := cmp.Diff(fixed, got); diff != "" { + t.Errorf("roundtrip (-want, +got) = %v", diff) + } + }) + } + } +} + +// Test v1beta1 -> v1alpha1 -> v1beta1 +func TestKafkaBindingConversionRoundTripV1beta1(t *testing.T) { + // Just one for now, just adding the for loop for ease of future changes. + versions := []apis.Convertible{&KafkaBinding{}} + + tests := []struct { + name string + in *v1beta1.KafkaBinding + }{{ + name: "min configuration", + in: &v1beta1.KafkaBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka-binding-name", + Namespace: "kafka-binding-ns", + Generation: 17, + }, + Spec: v1beta1.KafkaBindingSpec{}, + Status: v1beta1.KafkaBindingStatus{ + Status: duckv1.Status{ + Conditions: duckv1.Conditions{}, + }, + }, + }, + }, { + name: "full configuration", + in: &v1beta1.KafkaBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka-binding-name", + Namespace: "kafka-binding-ns", + Generation: 17, + }, + Spec: v1beta1.KafkaBindingSpec{ + BindingSpec: duckv1alpha1.BindingSpec{ + Subject: tracker.Reference{ + APIVersion: "bindingAPIVersion", + Kind: "bindingKind", + Namespace: "bindingNamespace", + Name: "bindingName", + }, + }, + KafkaAuthSpec: v1beta1.KafkaAuthSpec{ + BootstrapServers: []string{"bootstrap-server-1", "bootstrap-server-2"}, + Net: v1beta1.KafkaNetSpec{ + SASL: v1beta1.KafkaSASLSpec{ + Enable: true, + User: v1beta1.SecretValueFromSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "sasl-user-secret-local-obj-ref", + }, + Key: "sasl-user-secret-key", + Optional: pointer.BoolPtr(true), + }, + }, + Password: v1beta1.SecretValueFromSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "sasl-password-secret-local-obj-ref", + }, + Key: "sasl-password-secret-key", + Optional: pointer.BoolPtr(true), + }, + }, + }, + TLS: v1beta1.KafkaTLSSpec{ + Enable: false, + Cert: v1beta1.SecretValueFromSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "tls-cert-secret-local-obj-ref", + }, + Key: "tls-cert-secret-key", + Optional: pointer.BoolPtr(true), + }, + }, + Key: v1beta1.SecretValueFromSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "tls-key-secret-local-obj-ref", + }, + Key: "tls-key-secret-key", + Optional: pointer.BoolPtr(true), + }, + }, + CACert: v1beta1.SecretValueFromSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "tls-cacert-secret-local-obj-ref", + }, + Key: "tls-cacert-secret-key", + Optional: pointer.BoolPtr(true), + }, + }, + }, + }, + }, + }, + Status: v1beta1.KafkaBindingStatus{ + Status: duckv1.Status{ + ObservedGeneration: 1, + Conditions: duckv1.Conditions{{ + Type: "Ready", + Status: "True", + }}, + Annotations: map[string]string{ + "foo": "bar", + "hi": "hello", + }, + }, + }, + }, + }} + + for _, test := range tests { + for _, version := range versions { + t.Run(test.name, func(t *testing.T) { + ver := version + if err := ver.ConvertFrom(context.Background(), test.in); err != nil { + t.Errorf("ConvertDown() = %v", err) + } + got := &v1beta1.KafkaBinding{} + if err := ver.ConvertTo(context.Background(), got); err != nil { + t.Errorf("ConvertUp() = %v", err) + } + + if diff := cmp.Diff(test.in, got); diff != "" { + t.Errorf("roundtrip (-want, +got) = %v", diff) + } + }) + } + } +} + +// Since v1beta1 to v1alpha1 is lossy but semantically equivalent, +// fix that so diff works. +func fixKafkaBindingDeprecated(in *KafkaBinding) *KafkaBinding { + //in.Spec.ServiceAccountName = "" + //in.Spec.Resources = KafkaResourceSpec{} + return in +} diff --git a/kafka/source/pkg/apis/bindings/v1beta1/kafka_conversion.go b/kafka/source/pkg/apis/bindings/v1beta1/kafka_conversion.go new file mode 100644 index 0000000000..4465370e05 --- /dev/null +++ b/kafka/source/pkg/apis/bindings/v1beta1/kafka_conversion.go @@ -0,0 +1,44 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1beta1 + +import ( + "context" + "fmt" + + "knative.dev/pkg/apis" +) + +// ConvertTo implements apis.Convertible +func (source *KafkaBinding) ConvertTo(_ context.Context, sink apis.Convertible) error { + return fmt.Errorf("v1beta1 is the highest known version, got: %T", sink) +} + +// ConvertFrom implements apis.Convertible +func (sink *KafkaBinding) ConvertFrom(_ context.Context, source apis.Convertible) error { + return fmt.Errorf("v1beta1 is the highest known version, got: %T", source) +} + +// ConvertTo implements apis.Convertible +func (source *KafkaAuthSpec) ConvertTo(_ context.Context, sink apis.Convertible) error { + return fmt.Errorf("v1beta1 is the highest known version, got: %T", sink) +} + +// ConvertFrom implements apis.Convertible +func (sink *KafkaAuthSpec) ConvertFrom(_ context.Context, source apis.Convertible) error { + return fmt.Errorf("v1beta1 is the highest known version, got: %T", source) +} diff --git a/kafka/source/pkg/apis/bindings/v1beta1/kafka_conversion_test.go b/kafka/source/pkg/apis/bindings/v1beta1/kafka_conversion_test.go new file mode 100644 index 0000000000..6183acd3f6 --- /dev/null +++ b/kafka/source/pkg/apis/bindings/v1beta1/kafka_conversion_test.go @@ -0,0 +1,34 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1beta1 + +import ( + "context" + "testing" +) + +func TestKafkaBindingConversionBadType(t *testing.T) { + good, bad := &KafkaBinding{}, &KafkaBinding{} + + if err := good.ConvertTo(context.Background(), bad); err == nil { + t.Errorf("ConvertTo() = %#v, wanted error", bad) + } + + if err := good.ConvertFrom(context.Background(), bad); err == nil { + t.Errorf("ConvertFrom() = %#v, wanted error", good) + } +} diff --git a/kafka/source/pkg/apis/sources/v1alpha1/kafka_conversion.go b/kafka/source/pkg/apis/sources/v1alpha1/kafka_conversion.go index 53e29af63c..c513d6ccb6 100644 --- a/kafka/source/pkg/apis/sources/v1alpha1/kafka_conversion.go +++ b/kafka/source/pkg/apis/sources/v1alpha1/kafka_conversion.go @@ -21,7 +21,7 @@ import ( "fmt" "reflect" - "knative.dev/eventing-contrib/kafka/source/pkg/apis/bindings/v1alpha1" + bindingsv1alpha1 "knative.dev/eventing-contrib/kafka/source/pkg/apis/bindings/v1alpha1" bindingsv1beta1 "knative.dev/eventing-contrib/kafka/source/pkg/apis/bindings/v1beta1" "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1beta1" "knative.dev/pkg/apis" @@ -33,43 +33,19 @@ import ( func (source *KafkaSource) ConvertTo(ctx context.Context, obj apis.Convertible) error { switch sink := obj.(type) { case *v1beta1.KafkaSource: + kafkaAuthSpec := bindingsv1beta1.KafkaAuthSpec{} + if err := source.Spec.KafkaAuthSpec.ConvertTo(ctx, &kafkaAuthSpec); err != nil { + return err + } + sink.ObjectMeta = source.ObjectMeta sink.Spec = v1beta1.KafkaSourceSpec{ - KafkaAuthSpec: bindingsv1beta1.KafkaAuthSpec{ - BootstrapServers: source.Spec.KafkaAuthSpec.BootstrapServers, - Net: bindingsv1beta1.KafkaNetSpec{ - SASL: bindingsv1beta1.KafkaSASLSpec{ - Enable: source.Spec.KafkaAuthSpec.Net.SASL.Enable, - User: bindingsv1beta1.SecretValueFromSource{ - SecretKeyRef: source.Spec.KafkaAuthSpec.Net.SASL.User.SecretKeyRef, - }, - Password: bindingsv1beta1.SecretValueFromSource{ - SecretKeyRef: source.Spec.KafkaAuthSpec.Net.SASL.Password.SecretKeyRef}, - }, - TLS: bindingsv1beta1.KafkaTLSSpec{ - Enable: source.Spec.KafkaAuthSpec.Net.TLS.Enable, - Cert: bindingsv1beta1.SecretValueFromSource{ - SecretKeyRef: source.Spec.KafkaAuthSpec.Net.TLS.Cert.SecretKeyRef, - }, - Key: bindingsv1beta1.SecretValueFromSource{ - SecretKeyRef: source.Spec.KafkaAuthSpec.Net.TLS.Key.SecretKeyRef, - }, - CACert: bindingsv1beta1.SecretValueFromSource{ - SecretKeyRef: source.Spec.KafkaAuthSpec.Net.TLS.CACert.SecretKeyRef, - }, - }, - }, - }, + KafkaAuthSpec: kafkaAuthSpec, Topics: source.Spec.Topics, ConsumerGroup: source.Spec.ConsumerGroup, } - sink.Status = v1beta1.KafkaSourceStatus{ - duckv1.SourceStatus{ - Status: source.Status.Status, - SinkURI: source.Status.SinkURI, - CloudEventAttributes: source.Status.CloudEventAttributes, - }, - } + sink.Status.Status = source.Status.Status + source.Status.Status.ConvertTo(ctx, &sink.Status.Status) // Optionals if source.Spec.Sink != nil { sink.Spec.Sink = *source.Spec.Sink.DeepCopy() @@ -77,6 +53,12 @@ func (source *KafkaSource) ConvertTo(ctx context.Context, obj apis.Convertible) if source.Status.SinkURI != nil { sink.Status.SinkURI = source.Status.SinkURI.DeepCopy() } + if source.Status.CloudEventAttributes != nil { + sink.Status.CloudEventAttributes = make([]duckv1.CloudEventAttributes, len(source.Status.CloudEventAttributes)) + for i, cea := range source.Status.CloudEventAttributes { + sink.Status.CloudEventAttributes[i] = cea + } + } return nil default: return fmt.Errorf("Unknown conversion, got: %T", sink) @@ -88,33 +70,14 @@ func (source *KafkaSource) ConvertTo(ctx context.Context, obj apis.Convertible) func (sink *KafkaSource) ConvertFrom(ctx context.Context, obj apis.Convertible) error { switch source := obj.(type) { case *v1beta1.KafkaSource: + kafkaAuthSpec := bindingsv1alpha1.KafkaAuthSpec{} + if err := kafkaAuthSpec.ConvertFrom(ctx, &source.Spec.KafkaAuthSpec); err != nil { + return err + } + sink.ObjectMeta = source.ObjectMeta sink.Spec = KafkaSourceSpec{ - KafkaAuthSpec: v1alpha1.KafkaAuthSpec{ - BootstrapServers: source.Spec.KafkaAuthSpec.BootstrapServers, - Net: v1alpha1.KafkaNetSpec{ - SASL: v1alpha1.KafkaSASLSpec{ - Enable: source.Spec.KafkaAuthSpec.Net.SASL.Enable, - User: v1alpha1.SecretValueFromSource{ - SecretKeyRef: source.Spec.KafkaAuthSpec.Net.SASL.User.SecretKeyRef, - }, - Password: v1alpha1.SecretValueFromSource{ - SecretKeyRef: source.Spec.KafkaAuthSpec.Net.SASL.Password.SecretKeyRef}, - }, - TLS: v1alpha1.KafkaTLSSpec{ - Enable: source.Spec.KafkaAuthSpec.Net.TLS.Enable, - Cert: v1alpha1.SecretValueFromSource{ - SecretKeyRef: source.Spec.KafkaAuthSpec.Net.TLS.Cert.SecretKeyRef, - }, - Key: v1alpha1.SecretValueFromSource{ - SecretKeyRef: source.Spec.KafkaAuthSpec.Net.TLS.Key.SecretKeyRef, - }, - CACert: v1alpha1.SecretValueFromSource{ - SecretKeyRef: source.Spec.KafkaAuthSpec.Net.TLS.CACert.SecretKeyRef, - }, - }, - }, - }, + KafkaAuthSpec: kafkaAuthSpec, Topics: source.Spec.Topics, ConsumerGroup: source.Spec.ConsumerGroup, Sink: source.Spec.Sink.DeepCopy(), @@ -122,6 +85,18 @@ func (sink *KafkaSource) ConvertFrom(ctx context.Context, obj apis.Convertible) if reflect.DeepEqual(*sink.Spec.Sink, duckv1.Destination{}) { sink.Spec.Sink = nil } + sink.Status.Status = source.Status.Status + source.Status.Status.ConvertTo(ctx, &source.Status.Status) + // Optionals + if source.Status.SinkURI != nil { + sink.Status.SinkURI = source.Status.SinkURI.DeepCopy() + } + if source.Status.CloudEventAttributes != nil { + sink.Status.CloudEventAttributes = make([]duckv1.CloudEventAttributes, len(source.Status.CloudEventAttributes)) + for i, cea := range source.Status.CloudEventAttributes { + sink.Status.CloudEventAttributes[i] = cea + } + } return nil default: return fmt.Errorf("Unknown conversion, got: %T", source) diff --git a/kafka/source/pkg/apis/sources/v1alpha1/kafka_conversion_test.go b/kafka/source/pkg/apis/sources/v1alpha1/kafka_conversion_test.go new file mode 100644 index 0000000000..3c49db2a46 --- /dev/null +++ b/kafka/source/pkg/apis/sources/v1alpha1/kafka_conversion_test.go @@ -0,0 +1,377 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "context" + "testing" + + "github.com/google/go-cmp/cmp" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/pointer" + bindingsv1alpha1 "knative.dev/eventing-contrib/kafka/source/pkg/apis/bindings/v1alpha1" + bindingsv1beta1 "knative.dev/eventing-contrib/kafka/source/pkg/apis/bindings/v1beta1" + "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1beta1" + "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" +) + +func TestKafkaSourceConversionBadType(t *testing.T) { + good, bad := &KafkaSource{}, &KafkaSource{} + + if err := good.ConvertTo(context.Background(), bad); err == nil { + t.Errorf("ConvertTo() = %#v, wanted error", bad) + } + + if err := good.ConvertFrom(context.Background(), bad); err == nil { + t.Errorf("ConvertFrom() = %#v, wanted error", good) + } +} + +// Test v1alpha1 -> v1beta1 -> v1alpha1 +func TestKafkaSourceConversionRoundTripV1alpha1(t *testing.T) { + // Just one for now, just adding the for loop for ease of future changes. + versions := []apis.Convertible{&v1beta1.KafkaSource{}} + + tests := []struct { + name string + in *KafkaSource + }{{ + name: "min configuration", + in: &KafkaSource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka-source-name", + Namespace: "kafka-source-ns", + Generation: 17, + }, + Spec: KafkaSourceSpec{}, + Status: KafkaSourceStatus{ + SourceStatus: duckv1.SourceStatus{ + Status: duckv1.Status{ + Conditions: duckv1.Conditions{}, + }, + }, + }, + }, + }, { + name: "full configuration", + in: &KafkaSource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka-source-name", + Namespace: "kafka-source-ns", + Generation: 17, + }, + Spec: KafkaSourceSpec{ + KafkaAuthSpec: bindingsv1alpha1.KafkaAuthSpec{ + BootstrapServers: []string{"bootstrap-server-1", "bootstrap-server-2"}, + Net: bindingsv1alpha1.KafkaNetSpec{ + SASL: bindingsv1alpha1.KafkaSASLSpec{ + Enable: true, + User: bindingsv1alpha1.SecretValueFromSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "sasl-user-secret-local-obj-ref", + }, + Key: "sasl-user-secret-key", + Optional: pointer.BoolPtr(true), + }, + }, + Password: bindingsv1alpha1.SecretValueFromSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "sasl-password-secret-local-obj-ref", + }, + Key: "sasl-password-secret-key", + Optional: pointer.BoolPtr(true), + }, + }, + }, + TLS: bindingsv1alpha1.KafkaTLSSpec{ + Enable: true, + Cert: bindingsv1alpha1.SecretValueFromSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "tls-cert-secret-local-obj-ref", + }, + Key: "tls-cert-secret-key", + Optional: pointer.BoolPtr(true), + }, + }, + Key: bindingsv1alpha1.SecretValueFromSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "tls-key-secret-local-obj-ref", + }, + Key: "tls-key-secret-key", + Optional: pointer.BoolPtr(true), + }, + }, + CACert: bindingsv1alpha1.SecretValueFromSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "tls-cacert-secret-local-obj-ref", + }, + Key: "tls-cacert-secret-key", + Optional: pointer.BoolPtr(true), + }, + }, + }, + }, + }, + Topics: []string{"topic1", "topic2"}, + ConsumerGroup: "consumer-group", + Sink: &duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "sink-kind", + Namespace: "sink-namespace", + Name: "sink-name", + APIVersion: "sink-api-version", + }, + URI: apis.HTTP("sink-uri"), + }, + ServiceAccountName: "kafka-sa-name", + Resources: KafkaResourceSpec{ + Requests: KafkaRequestsSpec{ + ResourceCPU: "5", + ResourceMemory: "100m", + }, + Limits: KafkaLimitsSpec{ + ResourceCPU: "10", + ResourceMemory: "200m", + }, + }, + }, + Status: KafkaSourceStatus{ + SourceStatus: duckv1.SourceStatus{ + Status: duckv1.Status{ + ObservedGeneration: 1, + Conditions: duckv1.Conditions{{ + Type: "Ready", + Status: "True", + }}, + Annotations: map[string]string{ + "foo": "bar", + "hi": "hello", + }, + }, + SinkURI: apis.HTTP("sink-uri"), + CloudEventAttributes: []duckv1.CloudEventAttributes{ + { + Type: "ce-attr-1-type", + Source: "ce-attr-1-source", + }, { + Type: "ce-attr-2-type", + Source: "ce-attr-2-source", + }, + }, + }, + }, + }, + }} + + for _, test := range tests { + for _, version := range versions { + t.Run(test.name, func(t *testing.T) { + ver := version + if err := test.in.ConvertTo(context.Background(), ver); err != nil { + t.Errorf("ConvertTo() = %v", err) + } + + got := &KafkaSource{} + if err := got.ConvertFrom(context.Background(), ver); err != nil { + t.Errorf("ConvertFrom() = %v", err) + } + + // Since on the way down, we lose the DeprecatedSourceAndType, + // convert the in to equivalent out. + fixed := fixKafkaSourceDeprecated(test.in) + + if diff := cmp.Diff(fixed, got); diff != "" { + t.Errorf("roundtrip (-want, +got) = %v", diff) + } + }) + } + } +} + +// Test v1beta1 -> v1alpha1 -> v1beta1 +func TestKafkaSourceConversionRoundTripV1beta1(t *testing.T) { + // Just one for now, just adding the for loop for ease of future changes. + versions := []apis.Convertible{&KafkaSource{}} + + tests := []struct { + name string + in *v1beta1.KafkaSource + }{{ + name: "min configuration", + in: &v1beta1.KafkaSource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka-source-name", + Namespace: "kafka-source-ns", + Generation: 17, + }, + Spec: v1beta1.KafkaSourceSpec{}, + Status: v1beta1.KafkaSourceStatus{ + SourceStatus: duckv1.SourceStatus{ + Status: duckv1.Status{ + Conditions: duckv1.Conditions{}, + }, + }, + }, + }, + }, { + name: "full configuration", + in: &v1beta1.KafkaSource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka-source-name", + Namespace: "kafka-source-ns", + Generation: 17, + }, + Spec: v1beta1.KafkaSourceSpec{ + SourceSpec: duckv1.SourceSpec{ + Sink: duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "sink-kind", + Namespace: "sink-namespace", + Name: "sink-name", + APIVersion: "sink-api-version", + }, + URI: apis.HTTP("sink-uri"), + }, + // don't specify the following as v1alpha1 doesn't have that + //CloudEventOverrides: &duckv1.CloudEventOverrides{ + // Extensions: map[string]string{ + // "ext1": "foo", + // "ext2": "bar", + // }, + //}, + }, + KafkaAuthSpec: bindingsv1beta1.KafkaAuthSpec{ + BootstrapServers: []string{"bootstrap-server-1", "bootstrap-server-2"}, + Net: bindingsv1beta1.KafkaNetSpec{ + SASL: bindingsv1beta1.KafkaSASLSpec{ + Enable: true, + User: bindingsv1beta1.SecretValueFromSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "sasl-user-secret-local-obj-ref", + }, + Key: "sasl-user-secret-key", + Optional: pointer.BoolPtr(true), + }, + }, + Password: bindingsv1beta1.SecretValueFromSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "sasl-password-secret-local-obj-ref", + }, + Key: "sasl-password-secret-key", + Optional: pointer.BoolPtr(true), + }, + }, + }, + TLS: bindingsv1beta1.KafkaTLSSpec{ + Enable: false, + Cert: bindingsv1beta1.SecretValueFromSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "tls-cert-secret-local-obj-ref", + }, + Key: "tls-cert-secret-key", + Optional: pointer.BoolPtr(true), + }, + }, + Key: bindingsv1beta1.SecretValueFromSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "tls-key-secret-local-obj-ref", + }, + Key: "tls-key-secret-key", + Optional: pointer.BoolPtr(true), + }, + }, + CACert: bindingsv1beta1.SecretValueFromSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "tls-cacert-secret-local-obj-ref", + }, + Key: "tls-cacert-secret-key", + Optional: pointer.BoolPtr(true), + }, + }, + }, + }, + }, + Topics: []string{"topic1", "topic2"}, + ConsumerGroup: "consumer-group", + }, + Status: v1beta1.KafkaSourceStatus{ + SourceStatus: duckv1.SourceStatus{ + Status: duckv1.Status{ + ObservedGeneration: 1, + Conditions: duckv1.Conditions{{ + Type: "Ready", + Status: "True", + }}, + Annotations: map[string]string{ + "foo": "bar", + "hi": "hello", + }, + }, + SinkURI: apis.HTTP("sink-uri"), + CloudEventAttributes: []duckv1.CloudEventAttributes{ + { + Type: "ce-attr-1-type", + Source: "ce-attr-1-source", + }, { + Type: "ce-attr-2-type", + Source: "ce-attr-2-source", + }, + }, + }, + }, + }, + }} + + for _, test := range tests { + for _, version := range versions { + t.Run(test.name, func(t *testing.T) { + ver := version + if err := ver.ConvertFrom(context.Background(), test.in); err != nil { + t.Errorf("ConvertDown() = %v", err) + } + got := &v1beta1.KafkaSource{} + if err := ver.ConvertTo(context.Background(), got); err != nil { + t.Errorf("ConvertUp() = %v", err) + } + + if diff := cmp.Diff(test.in, got); diff != "" { + t.Errorf("roundtrip (-want, +got) = %v", diff) + } + }) + } + } +} + +// Since v1beta1 to v1alpha1 is lossy but semantically equivalent, +// fix that so diff works. +func fixKafkaSourceDeprecated(in *KafkaSource) *KafkaSource { + in.Spec.ServiceAccountName = "" + in.Spec.Resources = KafkaResourceSpec{} + return in +} diff --git a/vendor/modules.txt b/vendor/modules.txt index cb315a2d20..f37c9cfd1e 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1093,6 +1093,7 @@ k8s.io/kube-openapi/pkg/generators/rules k8s.io/kube-openapi/pkg/util/proto k8s.io/kube-openapi/pkg/util/sets # k8s.io/utils v0.0.0-20200124190032-861946025e34 +## explicit k8s.io/utils/buffer k8s.io/utils/integer k8s.io/utils/pointer