Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Defaultcontroller #843

Merged
merged 8 commits into from
Mar 7, 2019
Merged
1 change: 1 addition & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"net/http"
"time"

"github.com/knative/eventing/pkg/reconciler/v1alpha1/channel"
"github.com/knative/eventing/pkg/reconciler/v1alpha1/subscription"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/controller"
Expand Down Expand Up @@ -119,6 +120,7 @@ func main() {
// manager run it.
providers := []ProvideFunc{
subscription.ProvideController,
channel.ProvideController,
}
for _, provider := range providers {
if _, err := provider(mgr); err != nil {
Expand Down
21 changes: 20 additions & 1 deletion pkg/apis/eventing/v1alpha1/channel_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type ChannelSpec struct {
Subscribable *eventingduck.Subscribable `json:"subscribable,omitempty"`
}

var chanCondSet = duckv1alpha1.NewLivingConditionSet(ChannelConditionProvisioned, ChannelConditionAddressable)
var chanCondSet = duckv1alpha1.NewLivingConditionSet(ChannelConditionProvisioned, ChannelConditionAddressable, ChannelConditionProvisionerInstalled)

// ChannelStatus represents the current state of a Channel.
type ChannelStatus struct {
Expand Down Expand Up @@ -119,6 +119,10 @@ const (
// ChannelConditionAddressable has status true when this Channel meets
// the Addressable contract and has a non-empty hostname.
ChannelConditionAddressable duckv1alpha1.ConditionType = "Addressable"

// ChannelConditionProvisionerFound has status true when the channel is being watched
// by the provisioner's channel controller (in other words, the provisioner is installed)
ChannelConditionProvisionerInstalled duckv1alpha1.ConditionType = "ProvisionerInstalled"
)

// GetCondition returns the condition currently associated with the given type, or nil.
Expand All @@ -134,6 +138,11 @@ func (cs *ChannelStatus) IsReady() bool {
// InitializeConditions sets relevant unset conditions to Unknown state.
func (cs *ChannelStatus) InitializeConditions() {
chanCondSet.Manage(cs).InitializeConditions()
// Channel-default-controller sets ChannelConditionProvisionerInstalled=False, and it needs to be set to True by individual controllers
// This is done so that each individual channel controller gets it for free.
// It is also implied here that the channel-default-controller never calls InitializeConditions(), while individual channel controllers
// call InitializeConditions() as one of the first things in its reconcile loop.
cs.MarkProvisionerInstalled()
}

// MarkProvisioned sets ChannelConditionProvisioned condition to True state.
akashrv marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -146,6 +155,16 @@ func (cs *ChannelStatus) MarkNotProvisioned(reason, messageFormat string, messag
chanCondSet.Manage(cs).MarkFalse(ChannelConditionProvisioned, reason, messageFormat, messageA...)
}

// MarkProvisionerInstalled sets ChannelConditionProvisionerInstalled condition to True state.
func (cs *ChannelStatus) MarkProvisionerInstalled() {
chanCondSet.Manage(cs).MarkTrue(ChannelConditionProvisionerInstalled)
akashrv marked this conversation as resolved.
Show resolved Hide resolved
}

// MarkProvisionerNotInstalled sets ChannelConditionProvisionerInstalled condition to False state.
func (cs *ChannelStatus) MarkProvisionerNotInstalled(reason, messageFormat string, messageA ...interface{}) {
chanCondSet.Manage(cs).MarkFalse(ChannelConditionProvisionerInstalled, reason, messageFormat, messageA...)
akashrv marked this conversation as resolved.
Show resolved Hide resolved
}

// SetAddress makes this Channel addressable by setting the hostname. It also
// sets the ChannelConditionAddressable to true.
func (cs *ChannelStatus) SetAddress(hostname string) {
Expand Down
16 changes: 13 additions & 3 deletions pkg/apis/eventing/v1alpha1/channel_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ func TestChannelInitializeConditions(t *testing.T) {
}, {
Type: ChannelConditionProvisioned,
Status: corev1.ConditionUnknown,
}, {
Type: ChannelConditionProvisionerInstalled,
Status: corev1.ConditionTrue,
}, {
Type: ChannelConditionReady,
Status: corev1.ConditionUnknown,
Expand All @@ -121,6 +124,9 @@ func TestChannelInitializeConditions(t *testing.T) {
}, {
Type: ChannelConditionProvisioned,
Status: corev1.ConditionFalse,
}, {
Type: ChannelConditionProvisionerInstalled,
Status: corev1.ConditionTrue,
}, {
Type: ChannelConditionReady,
Status: corev1.ConditionUnknown,
Expand All @@ -141,12 +147,15 @@ func TestChannelInitializeConditions(t *testing.T) {
}, {
Type: ChannelConditionProvisioned,
Status: corev1.ConditionTrue,
}, {
Type: ChannelConditionProvisionerInstalled,
Status: corev1.ConditionTrue,
}, {
Type: ChannelConditionReady,
Status: corev1.ConditionUnknown,
}}},
},
}
}},
},
}}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
Expand Down Expand Up @@ -178,6 +187,7 @@ func TestChannelIsReady(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
cs := &ChannelStatus{}
cs.InitializeConditions()
if test.markProvisioned {
cs.MarkProvisioned()
} else {
Expand Down
141 changes: 141 additions & 0 deletions pkg/reconciler/v1alpha1/channel/channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
Copyright 2019 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package channel

import (
"context"

corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"

"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/source"

"github.com/golang/glog"
"github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/dynamic"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

const (
// controllerAgentName is the string used by this controller to identify
// itself when creating events.
controllerAgentName = "channel-default-controller"
channelReconciled = "ChannelReconciled"
channelUpdateStatusFailed = "ChannelUpdateStatusFailed"
)

type reconciler struct {
client client.Client
restConfig *rest.Config
dynamicClient dynamic.Interface
recorder record.EventRecorder
}

// Verify the struct implements reconcile.Reconciler
var _ reconcile.Reconciler = &reconciler{}

// ProvideController returns a Channel controller.
// This Channel controller is a default controller for channels of all provisioner kinds
func ProvideController(mgr manager.Manager) (controller.Controller, error) {
// Setup a new controller to Reconcile channel
c, err := controller.New(controllerAgentName, mgr, controller.Options{
Reconciler: &reconciler{
recorder: mgr.GetRecorder(controllerAgentName),
},
})
if err != nil {
return nil, err
}

// Watch channel events
// This controller is no-op when Channels are deleted
if err := c.Watch(
&source.Kind{Type: &v1alpha1.Channel{}},
&handler.EnqueueRequestForObject{},
predicate.Funcs{
DeleteFunc: func(event.DeleteEvent) bool {
return false
},
}); err != nil {
return nil, err
}

return c, nil
}

// Reconcile will check if the channel is being watched by provisioner's channel controller
// This will improve UX. See https://github.com/knative/eventing/issues/779
func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) {
glog.Infof("Reconciling channel %s", request)
akashrv marked this conversation as resolved.
Show resolved Hide resolved
ch := &v1alpha1.Channel{}

// Controller-runtime client Get() always deep copies the object. Hence no need to again deep copy it
err := r.client.Get(context.TODO(), request.NamespacedName, ch)

if errors.IsNotFound(err) {
glog.Errorf("could not find channel %s\n", request)
return reconcile.Result{}, nil
}

if err != nil {
glog.Errorf("could not fetch channel %s: %s\n", request, err)
return reconcile.Result{}, err
}

err = r.reconcile(ch)

if err != nil {
glog.Warningf("Error reconciling channel %s: %s. Will retry.", request, err)
r.recorder.Eventf(ch, corev1.EventTypeWarning, channelUpdateStatusFailed, "Failed to update channel status: %s", request)
return reconcile.Result{Requeue: true}, err
}
glog.Infof("Successfully reconciled channel %s", request)
r.recorder.Eventf(ch, corev1.EventTypeNormal, channelReconciled, "Channel reconciled: %s", request)
return reconcile.Result{Requeue: false}, nil
}

func (r *reconciler) reconcile(ch *v1alpha1.Channel) error {
// Do not Initialize() Status in channel-default-controller. It will set ChannelConditionProvisionerInstalled=True
// Directly call GetCondition(). If the Status was never initialized then GetCondition() will return nil and
// IsUnknown() will return true
c := ch.Status.GetCondition(v1alpha1.ChannelConditionProvisionerInstalled)

if c.IsUnknown() {
ch.Status.MarkProvisionerNotInstalled(
"Provisioner not found.",
"Specified provisioner [Name:%s Kind:%s] is not installed or not controlling the channel.",
ch.Spec.Provisioner.Name,
ch.Spec.Provisioner.Kind,
)
err := r.client.Status().Update(context.TODO(), ch)
return err
}
return nil
}

func (r *reconciler) InjectClient(c client.Client) error {
r.client = c
return nil
}
Loading