Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds Kafka Channel Provisioner Controllers #468

Merged
merged 32 commits into from
Nov 9, 2018
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
8ea7c13
Kafka Channel Provisioner Controllers
neosab Sep 27, 2018
6637554
Remove controllerRuntimeStart and address PR comments
neosab Sep 28, 2018
0dd7028
Merge remote-tracking branch 'upstream/master' into try_kafka_provisi…
neosab Sep 28, 2018
dd502b0
Merge remote-tracking branch 'upstream/master' into try_kafka_provisi…
neosab Oct 8, 2018
c25ce1b
Remove fetching configmap in controller
neosab Oct 8, 2018
2ec153b
Add more tests, improv coverage
neosab Oct 10, 2018
4c8232d
Add ChannelStatus.IsReady
neosab Oct 10, 2018
72a219a
Merge remote-tracking branch 'upstream/master' into try_kafka_provisi…
neosab Oct 11, 2018
00ab78e
Address PR comments
neosab Oct 15, 2018
5dd3164
Merge remote-tracking branch 'upstream/master' into try_kafka_provisi…
neosab Oct 15, 2018
514773c
Remove unfinished README
neosab Oct 15, 2018
8f83d7c
Switch from logr to zap
neosab Oct 15, 2018
356da32
Provision Channel as Kafka Topic
neosab Oct 10, 2018
bc5515d
Deprovision Channel
neosab Oct 16, 2018
53cb8e4
Merge remote-tracking branch 'upstream/master' into kafka_provision_t…
neosab Oct 18, 2018
9097c21
Merge remote-tracking branch 'upstream/master' into kafka_provision_t…
neosab Oct 24, 2018
dddeeec
Merge remote-tracking branch 'upstream/master' into try_kafka_provisi…
neosab Oct 29, 2018
86dd3f8
Merge matzew/try_kafka_provisioner into try_kafka_provisioner
matzew Oct 29, 2018
b66ec8f
Fix few more after pr 562
neosab Oct 29, 2018
e4131f4
Fix tests and imports
neosab Oct 29, 2018
712fbc4
PR feedback for removing ClusterChannelProvisioner name from configmap
neosab Oct 31, 2018
7577094
Merge remote-tracking branch 'upstream/master' into try_kafka_provisi…
neosab Oct 31, 2018
c5dc969
Adding instructions for Channel provisioner
matzew Nov 5, 2018
9934290
short cut code for missing ...
matzew Nov 5, 2018
8a54f24
Updating to latest Kafka client, and fixing idle bug
matzew Nov 5, 2018
de54ab9
Merge pull request #2 from matzew/try_kafka_provisioner
neosab Nov 6, 2018
99f2132
Merge remote-tracking branch 'upstream/master' into HEAD
neosab Nov 6, 2018
240c125
Fix conflicts and unit tests
neosab Nov 6, 2018
32aa096
Merge remote-tracking branch 'upstream/master' into try_kafka_provisi…
neosab Nov 8, 2018
e9dad0d
Address PR feedback
neosab Nov 8, 2018
0190820
Updating docs, based on feedback
matzew Nov 8, 2018
c301c95
Merge pull request #4 from matzew/doc_feedback
neosab Nov 8, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions config/provisioners/kafka/clusterprovisioner.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Copyright 2018 The Knative Authors
neosab marked this conversation as resolved.
Show resolved Hide resolved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: eventing.knative.dev/v1alpha1
kind: ClusterProvisioner
metadata:
name: kafka
spec:
reconciles:
group: eventing.knative.dev
kind: Channel
neosab marked this conversation as resolved.
Show resolved Hide resolved
28 changes: 28 additions & 0 deletions config/provisioners/kafka/clusterrole.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Copyright 2018 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: kafka-provisioner
rules:
- apiGroups: ["eventing.knative.dev"]
resources: ["clusterprovisioners"]
verbs: ["get", "watch", "list", "update", "patch"]
- apiGroups: ["eventing.knative.dev"]
resources: ["channels"]
verbs: ["get", "watch", "list", "update", "patch"]
- apiGroups: [""]
resources: ["events"]
neosab marked this conversation as resolved.
Show resolved Hide resolved
verbs: ["create", "patch"]
neosab marked this conversation as resolved.
Show resolved Hide resolved
26 changes: 26 additions & 0 deletions config/provisioners/kafka/clusterrolebinding.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Copyright 2018 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
name: kafka-provisioner-manage
subjects:
- kind: ServiceAccount
name: kafka-provisioner
namespace: knative-eventing
roleRef:
kind: ClusterRole
name: kafka-provisioner
apiGroup: rbac.authorization.k8s.io
neosab marked this conversation as resolved.
Show resolved Hide resolved
41 changes: 41 additions & 0 deletions config/provisioners/kafka/controller.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Copyright 2018 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: apps/v1beta1
kind: Deployment
metadata:
name: kafka-provisioner
namespace: knative-eventing
spec:
replicas: 1
template:
metadata:
labels:
app: kafka-provisioner
spec:
serviceAccountName: kafka-provisioner
containers:
- name: kafka-provisioner-controller
image: github.com/knative/eventing/pkg/provisioners/kafka
args: [
"-logtostderr",
neosab marked this conversation as resolved.
Show resolved Hide resolved
"-stderrthreshold", "INFO",
]
volumeMounts:
- name: kafka-provisioner-config
mountPath: /etc/config-provisioner
volumes:
- name: kafka-provisioner-config
configMap:
name: kafka-provisioner-config
neosab marked this conversation as resolved.
Show resolved Hide resolved
24 changes: 24 additions & 0 deletions config/provisioners/kafka/kafka-provisioner-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Copyright 2018 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-provisioner-config
namespace: knative-eventing
data:
# Name of the provisioner that this controller represents
cluster-provisioner-name: kafka
# Broker URL's for the provisioner
brokers: kafkabroker.kafka:9092
neosab marked this conversation as resolved.
Show resolved Hide resolved
19 changes: 19 additions & 0 deletions config/provisioners/kafka/service-account.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Copyright 2018 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: v1
kind: ServiceAccount
metadata:
name: kafka-provisioner
namespace: knative-eventing
neosab marked this conversation as resolved.
Show resolved Hide resolved
10 changes: 10 additions & 0 deletions pkg/apis/eventing/v1alpha1/channel_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,16 @@ func (cs *ChannelStatus) GetCondition(t duckv1alpha1.ConditionType) *duckv1alpha
return chanCondSet.Manage(cs).GetCondition(t)
}

// InitializeConditions sets relevant unset conditions to Unknown state.
func (cs *ChannelStatus) InitializeConditions() {
chanCondSet.Manage(cs).InitializeConditions()
}

// InitializeConditions sets relevant unset conditions to Unknown state.
func (cs *ChannelStatus) MarkAsNotProvisioned(reason, messageFormat string, messageA ...interface{}) {
chanCondSet.Manage(cs).MarkFalse(ChannelConditionProvisioned, reason, messageFormat, messageA...)
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// ChannelList is a collection of Channels.
Expand Down
87 changes: 87 additions & 0 deletions pkg/apis/eventing/v1alpha1/channel_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1"
corev1 "k8s.io/api/core/v1"
)
Expand Down Expand Up @@ -80,3 +81,89 @@ func TestChannelGetCondition(t *testing.T) {
})
}
}

func TestChannelInitializeConditions(t *testing.T) {
tests := []struct {
name string
cs *ChannelStatus
want *ChannelStatus
}{{
name: "empty",
cs: &ChannelStatus{},
want: &ChannelStatus{
Conditions: []duckv1alpha1.Condition{{
Type: ChannelConditionProvisioned,
Status: corev1.ConditionUnknown,
}, {
Type: ChannelConditionReady,
Status: corev1.ConditionUnknown,
}},
},
}, {
name: "one false",
cs: &ChannelStatus{
Conditions: []duckv1alpha1.Condition{{
Type: ChannelConditionProvisioned,
Status: corev1.ConditionFalse,
}},
},
want: &ChannelStatus{
Conditions: []duckv1alpha1.Condition{{
Type: ChannelConditionProvisioned,
Status: corev1.ConditionFalse,
}, {
Type: ChannelConditionReady,
Status: corev1.ConditionUnknown,
}},
},
}, {
name: "one true",
cs: &ChannelStatus{
Conditions: []duckv1alpha1.Condition{{
Type: ChannelConditionProvisioned,
Status: corev1.ConditionTrue,
}},
},
want: &ChannelStatus{
Conditions: []duckv1alpha1.Condition{{
Type: ChannelConditionProvisioned,
Status: corev1.ConditionTrue,
}, {
Type: ChannelConditionReady,
Status: corev1.ConditionUnknown,
}}},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
test.cs.InitializeConditions()
ignore := cmpopts.IgnoreFields(duckv1alpha1.Condition{}, "LastTransitionTime")
if diff := cmp.Diff(test.want, test.cs, ignore); diff != "" {
t.Errorf("unexpected conditions (-want, +got) = %v", diff)
}
})
}
}

func TestChannelStatus_MarkAsNotProvisioned(t *testing.T) {
cs := &ChannelStatus{}
cs.InitializeConditions()
want := &ChannelStatus{
Conditions: []duckv1alpha1.Condition{{
Type: ChannelConditionProvisioned,
Status: corev1.ConditionFalse,
Reason: "Not Provisioned",
Message: "testing",
}, {
Type: ChannelConditionReady,
Status: corev1.ConditionFalse,
Reason: "Not Provisioned",
Message: "testing",
}}}
ignore := cmpopts.IgnoreFields(duckv1alpha1.Condition{}, "LastTransitionTime")
cs.MarkAsNotProvisioned("Not Provisioned", "testing")
if diff := cmp.Diff(want, cs, ignore); diff != "" {
t.Errorf("unexpected conditions (-want, +got) = %v", diff)
}
}
21 changes: 20 additions & 1 deletion pkg/apis/eventing/v1alpha1/cluster_provisioner_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type ClusterProvisionerSpec struct {
Reconciles metav1.GroupKind `json:"reconciles"`
}

var cProvCondSet = duckv1alpha1.NewLivingConditionSet()
var cProvCondSet = duckv1alpha1.NewLivingConditionSet(ClusterProvisionerConditionProvisionerReady)
neosab marked this conversation as resolved.
Show resolved Hide resolved

// ClusterProvisionerStatus is the status for a ClusterProvisioner resource
type ClusterProvisionerStatus struct {
Expand All @@ -82,6 +82,15 @@ type ClusterProvisionerStatus struct {
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
}

const (

// ClusterProvisionerConditionReady has status True when all subconditions below have been set to True.
ClusterProvisionerConditionReady = duckv1alpha1.ConditionReady

// ClusterProvisionerConditionProvisionerReady has status True when the provisioner is ready
ClusterProvisionerConditionProvisionerReady duckv1alpha1.ConditionType = "ProvisionerReady"
)

// GetCondition returns the condition currently associated with the given type, or nil.
func (ps *ClusterProvisionerStatus) GetCondition(t duckv1alpha1.ConditionType) *duckv1alpha1.Condition {
return cProvCondSet.Manage(ps).GetCondition(t)
Expand All @@ -92,6 +101,16 @@ func (ps *ClusterProvisionerStatus) IsReady() bool {
return cProvCondSet.Manage(ps).IsHappy()
}

// MarkProvisionerReady sets the condition that the provisioner is ready to provision backing resource.
func (ps *ClusterProvisionerStatus) MarkProvisionerReady() {
cProvCondSet.Manage(ps).MarkTrue(ClusterProvisionerConditionProvisionerReady)
neosab marked this conversation as resolved.
Show resolved Hide resolved
}

// MarkProvisionerNotReady sets the condition that the provisioner is not ready to provision backing resource.
func (ps *ClusterProvisionerStatus) MarkProvisionerNotReady(reason, messageFormat string, messageA ...interface{}) {
cProvCondSet.Manage(ps).MarkFalse(ClusterProvisionerConditionProvisionerReady, reason, messageFormat, messageA...)
}

// InitializeConditions sets relevant unset conditions to Unknown state.
func (ps *ClusterProvisionerStatus) InitializeConditions() {
cProvCondSet.Manage(ps).InitializeConditions()
Expand Down
21 changes: 20 additions & 1 deletion pkg/apis/eventing/v1alpha1/cluster_provisioner_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,26 @@ func TestClusterProvisionerStatusIsReady(t *testing.T) {
}},
},
want: false,
}}
}, {
name: "mark provisioner ready",
ps: func() *ClusterProvisionerStatus {
ps := &ClusterProvisionerStatus{}
ps.InitializeConditions()
ps.MarkProvisionerReady()
return ps
}(),
want: true,
},
{
name: "mark provisioner not ready",
ps: func() *ClusterProvisionerStatus {
ps := &ClusterProvisionerStatus{}
ps.InitializeConditions()
ps.MarkProvisionerNotReady("Not Ready", "testing")
return ps
}(),
want: false,
}}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
Expand Down
Loading