Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

Commit

Permalink
Bump KafkaChannel types to v1beta1 (#1356)
Browse files Browse the repository at this point in the history
* Bump KafkaChannel types to v1beta1

* Formatting fixes
  • Loading branch information
aliok authored Jul 1, 2020
1 parent ecf19f6 commit e81b71d
Show file tree
Hide file tree
Showing 35 changed files with 2,951 additions and 2 deletions.
24 changes: 22 additions & 2 deletions hack/update-codegen.sh
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ for DIR in "${API_DIRS_SOURCES_AND_BINDINGS[@]}"; do
--go-header-file ${REPO_ROOT_DIR}/hack/boilerplate.go.txt
done

# Channels
API_DIRS_CHANNELS=(kafka/channel/pkg natss/pkg)
# NatssChannel
API_DIRS_CHANNELS=(natss/pkg)

for DIR in "${API_DIRS_CHANNELS[@]}"; do
# generate the code with:
Expand All @@ -126,6 +126,26 @@ for DIR in "${API_DIRS_CHANNELS[@]}"; do
--go-header-file ${REPO_ROOT_DIR}/hack/boilerplate.go.txt
done

# KafkaChannel
API_DIRS_CHANNELS=(kafka/channel/pkg)

for DIR in "${API_DIRS_CHANNELS[@]}"; do
# generate the code with:
# --output-base because this script should also be able to run inside the vendor dir of
# k8s.io/kubernetes. The output-base is needed for the generators to output into the vendor dir
# instead of the $GOPATH directly. For normal projects this can be dropped.
${CODEGEN_PKG}/generate-groups.sh "deepcopy,client,informer,lister" \
"knative.dev/eventing-contrib/${DIR}/client" "knative.dev/eventing-contrib/${DIR}/apis" \
"messaging:v1alpha1 messaging:v1beta1" \
--go-header-file ${REPO_ROOT_DIR}/hack/boilerplate.go.txt

# Knative Injection
${KNATIVE_CODEGEN_PKG}/hack/generate-knative.sh "injection" \
"knative.dev/eventing-contrib/${DIR}/client" "knative.dev/eventing-contrib/${DIR}/apis" \
"messaging:v1alpha1 messaging:v1beta1" \
--go-header-file ${REPO_ROOT_DIR}/hack/boilerplate.go.txt
done

# Depends on generate-groups.sh to install bin/deepcopy-gen
${GOPATH}/bin/deepcopy-gen \
-O zz_generated.deepcopy \
Expand Down
20 changes: 20 additions & 0 deletions kafka/channel/pkg/apis/messaging/v1beta1/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package v1beta1 is the v1beta1 version of the API.
// +k8s:deepcopy-gen=package
// +groupName=messaging.knative.dev
package v1beta1
50 changes: 50 additions & 0 deletions kafka/channel/pkg/apis/messaging/v1beta1/kafka_channel_defaults.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1beta1

import (
"context"

"knative.dev/eventing/pkg/apis/messaging"

"knative.dev/eventing-contrib/kafka/channel/pkg/utils"
)

func (c *KafkaChannel) SetDefaults(ctx context.Context) {
// Set the duck subscription to the stored version of the duck
// we support. Reason for this is that the stored version will
// not get a chance to get modified, but for newer versions
// conversion webhook will be able to take a crack at it and
// can modify it to match the duck shape.
if c.Annotations == nil {
c.Annotations = make(map[string]string)
}
if _, ok := c.Annotations[messaging.SubscribableDuckVersionAnnotation]; !ok {
c.Annotations[messaging.SubscribableDuckVersionAnnotation] = "v1"
}

c.Spec.SetDefaults(ctx)
}

func (cs *KafkaChannelSpec) SetDefaults(ctx context.Context) {
if cs.NumPartitions == 0 {
cs.NumPartitions = utils.DefaultNumPartitions
}
if cs.ReplicationFactor == 0 {
cs.ReplicationFactor = utils.DefaultReplicationFactor
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1beta1

import (
"context"
"testing"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"knative.dev/eventing-contrib/kafka/channel/pkg/utils"

"github.com/google/go-cmp/cmp"
)

const (
testNumPartitions = 10
testReplicationFactor = 5
)

func TestKafkaChannelDefaults(t *testing.T) {
testCases := map[string]struct {
initial KafkaChannel
expected KafkaChannel
}{
"nil spec": {
initial: KafkaChannel{},
expected: KafkaChannel{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{"messaging.knative.dev/subscribable": "v1"},
},
Spec: KafkaChannelSpec{
NumPartitions: utils.DefaultNumPartitions,
ReplicationFactor: utils.DefaultReplicationFactor,
},
},
},
"numPartitions not set": {
initial: KafkaChannel{
Spec: KafkaChannelSpec{
ReplicationFactor: testReplicationFactor,
},
},
expected: KafkaChannel{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{"messaging.knative.dev/subscribable": "v1"},
},
Spec: KafkaChannelSpec{
NumPartitions: utils.DefaultNumPartitions,
ReplicationFactor: testReplicationFactor,
},
},
},
"replicationFactor not set": {
initial: KafkaChannel{
Spec: KafkaChannelSpec{
NumPartitions: testNumPartitions,
},
},
expected: KafkaChannel{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{"messaging.knative.dev/subscribable": "v1"},
},
Spec: KafkaChannelSpec{
NumPartitions: testNumPartitions,
ReplicationFactor: utils.DefaultReplicationFactor,
},
},
},
}
for n, tc := range testCases {
t.Run(n, func(t *testing.T) {
tc.initial.SetDefaults(context.TODO())
if diff := cmp.Diff(tc.expected, tc.initial); diff != "" {
t.Fatalf("Unexpected defaults (-want, +got): %s", diff)
}
})
}
}
168 changes: 168 additions & 0 deletions kafka/channel/pkg/apis/messaging/v1beta1/kafka_channel_lifecycle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1beta1

import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
)

var kc = apis.NewLivingConditionSet(
KafkaChannelConditionTopicReady,
KafkaChannelConditionDispatcherReady,
KafkaChannelConditionServiceReady,
KafkaChannelConditionEndpointsReady,
KafkaChannelConditionAddressable,
KafkaChannelConditionChannelServiceReady,
KafkaChannelConditionConfigReady)

const (
// KafkaChannelConditionReady has status True when all subconditions below have been set to True.
KafkaChannelConditionReady = apis.ConditionReady

// KafkaChannelConditionDispatcherReady has status True when a Dispatcher deployment is ready
// Keyed off appsv1.DeploymentAvailable, which means minimum available replicas required are up
// and running for at least minReadySeconds.
KafkaChannelConditionDispatcherReady apis.ConditionType = "DispatcherReady"

// KafkaChannelConditionServiceReady has status True when a k8s Service is ready. This
// basically just means it exists because there's no meaningful status in Service. See Endpoints
// below.
KafkaChannelConditionServiceReady apis.ConditionType = "ServiceReady"

// KafkaChannelConditionEndpointsReady has status True when a k8s Service Endpoints are backed
// by at least one endpoint.
KafkaChannelConditionEndpointsReady apis.ConditionType = "EndpointsReady"

// KafkaChannelConditionAddressable has status true when this KafkaChannel meets
// the Addressable contract and has a non-empty URL.
KafkaChannelConditionAddressable apis.ConditionType = "Addressable"

// KafkaChannelConditionServiceReady has status True when a k8s Service representing the channel is ready.
// Because this uses ExternalName, there are no endpoints to check.
KafkaChannelConditionChannelServiceReady apis.ConditionType = "ChannelServiceReady"

// KafkaChannelConditionTopicReady has status True when the Kafka topic to use by the channel exists.
KafkaChannelConditionTopicReady apis.ConditionType = "TopicReady"

// KafkaChannelConditionConfigReady has status True when the Kafka configuration to use by the channel exists and is valid
// (ie. the connection has been established).
KafkaChannelConditionConfigReady apis.ConditionType = "ConfigurationReady"
)

// GetConditionSet retrieves the condition set for this resource. Implements the KRShaped interface.
func (*KafkaChannel) GetConditionSet() apis.ConditionSet {
return kc
}

// GetCondition returns the condition currently associated with the given type, or nil.
func (cs *KafkaChannelStatus) GetCondition(t apis.ConditionType) *apis.Condition {
return kc.Manage(cs).GetCondition(t)
}

// IsReady returns true if the resource is ready overall.
func (cs *KafkaChannelStatus) IsReady() bool {
return kc.Manage(cs).IsHappy()
}

// InitializeConditions sets relevant unset conditions to Unknown state.
func (cs *KafkaChannelStatus) InitializeConditions() {
kc.Manage(cs).InitializeConditions()
}

// SetAddress sets the address (as part of Addressable contract) and marks the correct condition.
func (cs *KafkaChannelStatus) SetAddress(url *apis.URL) {
if cs.Address == nil {
cs.Address = &duckv1.Addressable{}
}
if url != nil {
cs.Address.URL = url
kc.Manage(cs).MarkTrue(KafkaChannelConditionAddressable)
} else {
cs.Address.URL = nil
kc.Manage(cs).MarkFalse(KafkaChannelConditionAddressable, "EmptyURL", "URL is nil")
}
}

func (cs *KafkaChannelStatus) MarkDispatcherFailed(reason, messageFormat string, messageA ...interface{}) {
kc.Manage(cs).MarkFalse(KafkaChannelConditionDispatcherReady, reason, messageFormat, messageA...)
}

func (cs *KafkaChannelStatus) MarkDispatcherUnknown(reason, messageFormat string, messageA ...interface{}) {
kc.Manage(cs).MarkUnknown(KafkaChannelConditionDispatcherReady, reason, messageFormat, messageA...)
}

// TODO: Unify this with the ones from Eventing. Say: Broker, Trigger.
func (cs *KafkaChannelStatus) PropagateDispatcherStatus(ds *appsv1.DeploymentStatus) {
for _, cond := range ds.Conditions {
if cond.Type == appsv1.DeploymentAvailable {
if cond.Status == corev1.ConditionTrue {
kc.Manage(cs).MarkTrue(KafkaChannelConditionDispatcherReady)
} else if cond.Status == corev1.ConditionFalse {
cs.MarkDispatcherFailed("DispatcherDeploymentFalse", "The status of Dispatcher Deployment is False: %s : %s", cond.Reason, cond.Message)
} else if cond.Status == corev1.ConditionUnknown {
cs.MarkDispatcherUnknown("DispatcherDeploymentUnknown", "The status of Dispatcher Deployment is Unknown: %s : %s", cond.Reason, cond.Message)
}
}
}
}

func (cs *KafkaChannelStatus) MarkServiceFailed(reason, messageFormat string, messageA ...interface{}) {
kc.Manage(cs).MarkFalse(KafkaChannelConditionServiceReady, reason, messageFormat, messageA...)
}

func (cs *KafkaChannelStatus) MarkServiceUnknown(reason, messageFormat string, messageA ...interface{}) {
kc.Manage(cs).MarkUnknown(KafkaChannelConditionServiceReady, reason, messageFormat, messageA...)
}

func (cs *KafkaChannelStatus) MarkServiceTrue() {
kc.Manage(cs).MarkTrue(KafkaChannelConditionServiceReady)
}

func (cs *KafkaChannelStatus) MarkChannelServiceFailed(reason, messageFormat string, messageA ...interface{}) {
kc.Manage(cs).MarkFalse(KafkaChannelConditionChannelServiceReady, reason, messageFormat, messageA...)
}

func (cs *KafkaChannelStatus) MarkChannelServiceTrue() {
kc.Manage(cs).MarkTrue(KafkaChannelConditionChannelServiceReady)
}

func (cs *KafkaChannelStatus) MarkEndpointsFailed(reason, messageFormat string, messageA ...interface{}) {
kc.Manage(cs).MarkFalse(KafkaChannelConditionEndpointsReady, reason, messageFormat, messageA...)
}

func (cs *KafkaChannelStatus) MarkEndpointsTrue() {
kc.Manage(cs).MarkTrue(KafkaChannelConditionEndpointsReady)
}

func (cs *KafkaChannelStatus) MarkTopicTrue() {
kc.Manage(cs).MarkTrue(KafkaChannelConditionTopicReady)
}

func (cs *KafkaChannelStatus) MarkTopicFailed(reason, messageFormat string, messageA ...interface{}) {
kc.Manage(cs).MarkFalse(KafkaChannelConditionTopicReady, reason, messageFormat, messageA...)
}

func (cs *KafkaChannelStatus) MarkConfigTrue() {
kc.Manage(cs).MarkTrue(KafkaChannelConditionConfigReady)
}

func (cs *KafkaChannelStatus) MarkConfigFailed(reason, messageFormat string, messageA ...interface{}) {
kc.Manage(cs).MarkFalse(KafkaChannelConditionConfigReady, reason, messageFormat, messageA...)
}
Loading

0 comments on commit e81b71d

Please sign in to comment.