Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

Commit

Permalink
KafkaSource v1beta1<>v1alpha1 conversion test
Browse files Browse the repository at this point in the history
  • Loading branch information
aliok committed Jun 25, 2020
1 parent c31935f commit 1901fe7
Show file tree
Hide file tree
Showing 2 changed files with 236 additions and 7 deletions.
27 changes: 20 additions & 7 deletions kafka/source/pkg/apis/sources/v1alpha1/kafka_conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,21 @@ func (source *KafkaSource) ConvertTo(ctx context.Context, obj apis.Convertible)
Topics: source.Spec.Topics,
ConsumerGroup: source.Spec.ConsumerGroup,
}
sink.Status = v1beta1.KafkaSourceStatus{
duckv1.SourceStatus{
Status: source.Status.Status,
SinkURI: source.Status.SinkURI,
CloudEventAttributes: source.Status.CloudEventAttributes,
},
}
sink.Status.Status = source.Status.Status
source.Status.Status.ConvertTo(ctx, &sink.Status.Status)
// Optionals
if source.Spec.Sink != nil {
sink.Spec.Sink = *source.Spec.Sink.DeepCopy()
}
if source.Status.SinkURI != nil {
sink.Status.SinkURI = source.Status.SinkURI.DeepCopy()
}
if source.Status.CloudEventAttributes != nil {
sink.Status.CloudEventAttributes = make([]duckv1.CloudEventAttributes, len(source.Status.CloudEventAttributes))
for i, cea := range source.Status.CloudEventAttributes {
sink.Status.CloudEventAttributes[i] = cea
}
}
return nil
default:
return fmt.Errorf("Unknown conversion, got: %T", sink)
Expand Down Expand Up @@ -122,6 +123,18 @@ func (sink *KafkaSource) ConvertFrom(ctx context.Context, obj apis.Convertible)
if reflect.DeepEqual(*sink.Spec.Sink, duckv1.Destination{}) {
sink.Spec.Sink = nil
}
sink.Status.Status = source.Status.Status
source.Status.Status.ConvertTo(ctx, &source.Status.Status)
// Optionals
if source.Status.SinkURI != nil {
sink.Status.SinkURI = source.Status.SinkURI.DeepCopy()
}
if source.Status.CloudEventAttributes != nil {
sink.Status.CloudEventAttributes = make([]duckv1.CloudEventAttributes, len(source.Status.CloudEventAttributes))
for i, cea := range source.Status.CloudEventAttributes {
sink.Status.CloudEventAttributes[i] = cea
}
}
return nil
default:
return fmt.Errorf("Unknown conversion, got: %T", source)
Expand Down
216 changes: 216 additions & 0 deletions kafka/source/pkg/apis/sources/v1alpha1/kafka_conversion_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
/*
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1alpha1

import (
"context"
"github.com/google/go-cmp/cmp"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
"knative.dev/eventing-contrib/kafka/source/pkg/apis/bindings/v1alpha1"
v1beta1 "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1beta1"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
"testing"
)

func TestKafkaSourceConversionBadType(t *testing.T) {
good, bad := &KafkaSource{}, &KafkaSource{}

if err := good.ConvertTo(context.Background(), bad); err == nil {
t.Errorf("ConvertTo() = %#v, wanted error", bad)
}

if err := good.ConvertFrom(context.Background(), bad); err == nil {
t.Errorf("ConvertFrom() = %#v, wanted error", good)
}
}

// Test v1alpha1 -> v1beta1 -> v1alpha1
func TestKafkaSourceConversionRoundTripV1beta1(t *testing.T) {
// Just one for now, just adding the for loop for ease of future changes.
versions := []apis.Convertible{&v1beta1.KafkaSource{}}

tests := []struct {
name string
in *KafkaSource
}{{
name: "min configuration",
in: &KafkaSource{
ObjectMeta: metav1.ObjectMeta{
Name: "kafka-source-name",
Namespace: "kafka-source-ns",
Generation: 17,
},
Spec: KafkaSourceSpec{},
Status: KafkaSourceStatus{
SourceStatus: duckv1.SourceStatus{
Status: duckv1.Status{
Conditions: duckv1.Conditions{},
},
},
},
},
}, {
name: "full configuration",
in: &KafkaSource{
ObjectMeta: metav1.ObjectMeta{
Name: "kafka-source-name",
Namespace: "kafka-source-ns",
Generation: 17,
},
Spec: KafkaSourceSpec{
KafkaAuthSpec: v1alpha1.KafkaAuthSpec{
BootstrapServers: []string{"bootstrap-server-1", "bootstrap-server-2"},
Net: v1alpha1.KafkaNetSpec{
SASL: v1alpha1.KafkaSASLSpec{
Enable: true,
User: v1alpha1.SecretValueFromSource{
SecretKeyRef: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Name: "sasl-user-secret-local-obj-ref",
},
Key: "sasl-user-secret-key",
Optional: pointer.BoolPtr(true),
},
},
Password: v1alpha1.SecretValueFromSource{
SecretKeyRef: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Name: "sasl-password-secret-local-obj-ref",
},
Key: "sasl-password-secret-key",
Optional: pointer.BoolPtr(true),
},
},
},
TLS: v1alpha1.KafkaTLSSpec{
Enable: false,
Cert: v1alpha1.SecretValueFromSource{
SecretKeyRef: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Name: "tls-cert-secret-local-obj-ref",
},
Key: "tls-cert-secret-key",
Optional: pointer.BoolPtr(true),
},
},
Key: v1alpha1.SecretValueFromSource{
SecretKeyRef: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Name: "tls-key-secret-local-obj-ref",
},
Key: "tls-key-secret-key",
Optional: pointer.BoolPtr(true),
},
},
CACert: v1alpha1.SecretValueFromSource{
SecretKeyRef: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Name: "tls-cacert-secret-local-obj-ref",
},
Key: "tls-cacert-secret-key",
Optional: pointer.BoolPtr(true),
},
},
},
},
},
Topics: []string{"topic1", "topic2"},
ConsumerGroup: "consumer-group",
Sink: &duckv1.Destination{
Ref: &duckv1.KReference{
Kind: "sink-kind",
Namespace: "sink-namespace",
Name: "sink-name",
APIVersion: "sink-api-version",
},
URI: apis.HTTP("sink-uri"),
},
ServiceAccountName: "kafka-sa-name",
Resources: KafkaResourceSpec{
Requests: KafkaRequestsSpec{
ResourceCPU: "5",
ResourceMemory: "100m",
},
Limits: KafkaLimitsSpec{
ResourceCPU: "10",
ResourceMemory: "200m",
},
},
},
Status: KafkaSourceStatus{
SourceStatus: duckv1.SourceStatus{
Status: duckv1.Status{
ObservedGeneration: 1,
Conditions: duckv1.Conditions{{
Type: "Ready",
Status: "True",
}},
Annotations: map[string]string{
"foo": "bar",
"hi": "hello",
},
},
SinkURI: apis.HTTP("sink-uri"),
CloudEventAttributes: []duckv1.CloudEventAttributes{
{
Type: "ce-attr-1-type",
Source: "ce-attr-1-source",
}, {
Type: "ce-attr-2-type",
Source: "ce-attr-2-source",
},
},
},
},
},
}}

for _, test := range tests {
for _, version := range versions {
t.Run(test.name, func(t *testing.T) {
ver := version
if err := test.in.ConvertTo(context.Background(), ver); err != nil {
t.Errorf("ConvertTo() = %v", err)
}

got := &KafkaSource{}
if err := got.ConvertFrom(context.Background(), ver); err != nil {
t.Errorf("ConvertFrom() = %v", err)
}

// Since on the way down, we lose the DeprecatedSourceAndType,
// convert the in to equivalent out.
fixed := fixKafkaSourceDeprecated(test.in)

if diff := cmp.Diff(fixed, got); diff != "" {
t.Errorf("roundtrip (-want, +got) = %v", diff)
}
})
}
}
}

// Since v1beta1 to v1alpha1 is lossy but semantically equivalent,
// fix that so diff works.
func fixKafkaSourceDeprecated(in *KafkaSource) *KafkaSource {
in.Spec.ServiceAccountName = ""
in.Spec.Resources = KafkaResourceSpec{}
return in
}

0 comments on commit 1901fe7

Please sign in to comment.