Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Defaultcontroller #843

Merged
merged 8 commits into from
Mar 7, 2019
2 changes: 2 additions & 0 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"net/http"
"time"

"github.com/knative/eventing/pkg/reconciler/v1alpha1/channel"
"github.com/knative/eventing/pkg/reconciler/v1alpha1/subscription"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/controller"
Expand Down Expand Up @@ -119,6 +120,7 @@ func main() {
// manager run it.
providers := []ProvideFunc{
subscription.ProvideController,
channel.ProvideController,
}
for _, provider := range providers {
if _, err := provider(mgr); err != nil {
Expand Down
21 changes: 20 additions & 1 deletion pkg/apis/eventing/v1alpha1/channel_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type ChannelSpec struct {
Subscribable *eventingduck.Subscribable `json:"subscribable,omitempty"`
}

var chanCondSet = duckv1alpha1.NewLivingConditionSet(ChannelConditionProvisioned, ChannelConditionAddressable)
var chanCondSet = duckv1alpha1.NewLivingConditionSet(ChannelConditionProvisioned, ChannelConditionAddressable, ChannelConditionProvisionerInstalled)

// ChannelStatus represents the current state of a Channel.
type ChannelStatus struct {
Expand Down Expand Up @@ -119,6 +119,10 @@ const (
// ChannelConditionAddressable has status true when this Channel meets
// the Addressable contract and has a non-empty hostname.
ChannelConditionAddressable duckv1alpha1.ConditionType = "Addressable"

// ChannelConditionProvisionerFound has status true when the channel is being watched
// by the provisioner's channel controller (in other words, the provisioner is installed)
ChannelConditionProvisionerInstalled duckv1alpha1.ConditionType = "ProvisionerInstalled"
)

// GetCondition returns the condition currently associated with the given type, or nil.
Expand All @@ -139,11 +143,26 @@ func (cs *ChannelStatus) InitializeConditions() {
// MarkProvisioned sets ChannelConditionProvisioned condition to True state.
akashrv marked this conversation as resolved.
Show resolved Hide resolved
func (cs *ChannelStatus) MarkProvisioned() {
chanCondSet.Manage(cs).MarkTrue(ChannelConditionProvisioned)

akashrv marked this conversation as resolved.
Show resolved Hide resolved
// MarkProvisionedNotInstalled will be set by default controller and needs to be unset by individual controllers
akashrv marked this conversation as resolved.
Show resolved Hide resolved
// This is done so that each individual channel controller gets it for free.
cs.MarkProvisionerInstalled()
akashrv marked this conversation as resolved.
Show resolved Hide resolved
}

// MarkNotProvisioned sets ChannelConditionProvisioned condition to False state.
akashrv marked this conversation as resolved.
Show resolved Hide resolved
func (cs *ChannelStatus) MarkNotProvisioned(reason, messageFormat string, messageA ...interface{}) {
chanCondSet.Manage(cs).MarkFalse(ChannelConditionProvisioned, reason, messageFormat, messageA...)
// MarkProvisionedNotInstalled will be set by default controller and needs to be unset by individual controllers
akashrv marked this conversation as resolved.
Show resolved Hide resolved
// This is done so that each individual channel controller gets it for free.
cs.MarkProvisionerInstalled()
}

func (cs *ChannelStatus) MarkProvisionerInstalled() {
chanCondSet.Manage(cs).MarkTrue(ChannelConditionProvisionerInstalled)
akashrv marked this conversation as resolved.
Show resolved Hide resolved
}

func (cs *ChannelStatus) MarkProvisionerNotInstalled(reason, messageFormat string, messageA ...interface{}) {
chanCondSet.Manage(cs).MarkFalse(ChannelConditionProvisionerInstalled, reason, messageFormat, messageA...)
akashrv marked this conversation as resolved.
Show resolved Hide resolved
}

// SetAddress makes this Channel addressable by setting the hostname. It also
Expand Down
15 changes: 12 additions & 3 deletions pkg/apis/eventing/v1alpha1/channel_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ func TestChannelInitializeConditions(t *testing.T) {
}, {
Type: ChannelConditionProvisioned,
Status: corev1.ConditionUnknown,
}, {
Type: ChannelConditionProvisionerInstalled,
Status: corev1.ConditionUnknown,
akashrv marked this conversation as resolved.
Show resolved Hide resolved
}, {
Type: ChannelConditionReady,
Status: corev1.ConditionUnknown,
Expand All @@ -121,6 +124,9 @@ func TestChannelInitializeConditions(t *testing.T) {
}, {
Type: ChannelConditionProvisioned,
Status: corev1.ConditionFalse,
}, {
Type: ChannelConditionProvisionerInstalled,
Status: corev1.ConditionUnknown,
}, {
Type: ChannelConditionReady,
Status: corev1.ConditionUnknown,
Expand All @@ -141,12 +147,15 @@ func TestChannelInitializeConditions(t *testing.T) {
}, {
Type: ChannelConditionProvisioned,
Status: corev1.ConditionTrue,
}, {
Type: ChannelConditionProvisionerInstalled,
Status: corev1.ConditionUnknown,
}, {
Type: ChannelConditionReady,
Status: corev1.ConditionUnknown,
}}},
},
}
}},
},
}}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
Expand Down
141 changes: 141 additions & 0 deletions pkg/reconciler/v1alpha1/channel/channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
Copyright 2018 The Knative Authors
akashrv marked this conversation as resolved.
Show resolved Hide resolved

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package channel

import (
"context"

corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"

"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/source"

"github.com/golang/glog"
"github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/dynamic"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

const (
// controllerAgentName is the string used by this controller to identify
// itself when creating events.
controllerAgentName = "channel-controller"
akashrv marked this conversation as resolved.
Show resolved Hide resolved
finalizerName = controllerAgentName
akashrv marked this conversation as resolved.
Show resolved Hide resolved
channelReconciled = "ChannelReconciled"
channelUpdateStatusFailed = "ChannelUpdateStatusFailed"
)

type reconciler struct {
client client.Client
restConfig *rest.Config
dynamicClient dynamic.Interface
recorder record.EventRecorder
}

// Verify the struct implements reconcile.Reconciler
var _ reconcile.Reconciler = &reconciler{}

// ProvideController returns a Channel controller.
// This Channel controller is a default controller for channels of all provisioner kinds
func ProvideController(mgr manager.Manager) (controller.Controller, error) {
// Setup a new controller to Reconcile channel
c, err := controller.New(controllerAgentName, mgr, controller.Options{
Reconciler: &reconciler{
recorder: mgr.GetRecorder(controllerAgentName),
},
})
if err != nil {
return nil, err
}

// Watch channel events
// This controller is no-op when Channels are deleted
if err := c.Watch(
&source.Kind{Type: &v1alpha1.Channel{}},
&handler.EnqueueRequestForObject{},
predicate.Funcs{
DeleteFunc: func(event.DeleteEvent) bool {
return false
},
}); err != nil {
return nil, err
}

return c, nil
}

// This detaul channel reconciler will check if the channel is being watched by provisioner's channel controller
akashrv marked this conversation as resolved.
Show resolved Hide resolved
// This will improve UX. See https://github.com/knative/eventing/issues/779
func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) {
glog.Infof("Reconciling channel %v", request)
ch := &v1alpha1.Channel{}

// Controller-runtime client Get() always deep copies the object. Hence no need to again deep copy it
err := r.client.Get(context.TODO(), request.NamespacedName, ch)

if errors.IsNotFound(err) {
glog.Errorf("could not find channel %v\n", request)
return reconcile.Result{}, nil
}

if err != nil {
glog.Errorf("could not fetch channel %v for %+v\n", err, request)
return reconcile.Result{}, err
}

err = r.reconcile(ch)

if err != nil {
glog.Warningf("Error reconciling channel: %v. Will retry.", err)
akashrv marked this conversation as resolved.
Show resolved Hide resolved
r.recorder.Eventf(ch, corev1.EventTypeWarning, channelUpdateStatusFailed, "Failed to update channel status: %v", request.NamespacedName)
return reconcile.Result{Requeue: true}, err
}
glog.Infof("Successfully reconciled channel %v", request.NamespacedName)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should not use %v everywhere for logging but instead things like %q here and %s elsewhere where we might not want quotes.

r.recorder.Eventf(ch, corev1.EventTypeNormal, channelReconciled, "Channel reconciled: %v", request.NamespacedName)
return reconcile.Result{Requeue: false}, nil
}

func (r *reconciler) reconcile(ch *v1alpha1.Channel) error {
ch.Status.InitializeConditions()
akashrv marked this conversation as resolved.
Show resolved Hide resolved

c := ch.Status.GetCondition(v1alpha1.ChannelConditionProvisionerInstalled)

if c.IsUnknown() {
ch.Status.MarkProvisionerNotInstalled(
"Provisioner not found.",
"Specified provisioner [Name:%v Kind:%v] is not installed or not controlling the channel.",
akashrv marked this conversation as resolved.
Show resolved Hide resolved
ch.Spec.Provisioner.Name,
ch.Spec.Provisioner.Kind,
)
err := r.client.Status().Update(context.TODO(), ch)
return err
}
return nil
}

func (r *reconciler) InjectClient(c client.Client) error {
r.client = c
return nil
}
Loading