diff --git a/Gopkg.lock b/Gopkg.lock index 90555e1c081..5e96b4ce9c6 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1317,6 +1317,7 @@ "sigs.k8s.io/controller-runtime/pkg/event", "sigs.k8s.io/controller-runtime/pkg/handler", "sigs.k8s.io/controller-runtime/pkg/manager", + "sigs.k8s.io/controller-runtime/pkg/predicate", "sigs.k8s.io/controller-runtime/pkg/reconcile", "sigs.k8s.io/controller-runtime/pkg/runtime/inject", "sigs.k8s.io/controller-runtime/pkg/runtime/log", diff --git a/cmd/controller/main.go b/cmd/controller/main.go index 6e982c3c51d..ec840f8a27b 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -23,6 +23,7 @@ import ( "net/http" "time" + "github.com/knative/eventing/pkg/reconciler/v1alpha1/channel" "github.com/knative/eventing/pkg/reconciler/v1alpha1/subscription" "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/controller" @@ -119,6 +120,7 @@ func main() { // manager run it. providers := []ProvideFunc{ subscription.ProvideController, + channel.ProvideController, } for _, provider := range providers { if _, err := provider(mgr); err != nil { diff --git a/pkg/apis/eventing/v1alpha1/channel_types.go b/pkg/apis/eventing/v1alpha1/channel_types.go index 0771b022ddd..c0fc003aa9a 100644 --- a/pkg/apis/eventing/v1alpha1/channel_types.go +++ b/pkg/apis/eventing/v1alpha1/channel_types.go @@ -77,7 +77,7 @@ type ChannelSpec struct { Subscribable *eventingduck.Subscribable `json:"subscribable,omitempty"` } -var chanCondSet = duckv1alpha1.NewLivingConditionSet(ChannelConditionProvisioned, ChannelConditionAddressable) +var chanCondSet = duckv1alpha1.NewLivingConditionSet(ChannelConditionProvisioned, ChannelConditionAddressable, ChannelConditionProvisionerInstalled) // ChannelStatus represents the current state of a Channel. type ChannelStatus struct { @@ -119,6 +119,10 @@ const ( // ChannelConditionAddressable has status true when this Channel meets // the Addressable contract and has a non-empty hostname. ChannelConditionAddressable duckv1alpha1.ConditionType = "Addressable" + + // ChannelConditionProvisionerFound has status true when the channel is being watched + // by the provisioner's channel controller (in other words, the provisioner is installed) + ChannelConditionProvisionerInstalled duckv1alpha1.ConditionType = "ProvisionerInstalled" ) // GetCondition returns the condition currently associated with the given type, or nil. @@ -134,6 +138,11 @@ func (cs *ChannelStatus) IsReady() bool { // InitializeConditions sets relevant unset conditions to Unknown state. func (cs *ChannelStatus) InitializeConditions() { chanCondSet.Manage(cs).InitializeConditions() + // Channel-default-controller sets ChannelConditionProvisionerInstalled=False, and it needs to be set to True by individual controllers + // This is done so that each individual channel controller gets it for free. + // It is also implied here that the channel-default-controller never calls InitializeConditions(), while individual channel controllers + // call InitializeConditions() as one of the first things in its reconcile loop. + cs.MarkProvisionerInstalled() } // MarkProvisioned sets ChannelConditionProvisioned condition to True state. @@ -146,6 +155,16 @@ func (cs *ChannelStatus) MarkNotProvisioned(reason, messageFormat string, messag chanCondSet.Manage(cs).MarkFalse(ChannelConditionProvisioned, reason, messageFormat, messageA...) } +// MarkProvisionerInstalled sets ChannelConditionProvisionerInstalled condition to True state. +func (cs *ChannelStatus) MarkProvisionerInstalled() { + chanCondSet.Manage(cs).MarkTrue(ChannelConditionProvisionerInstalled) +} + +// MarkProvisionerNotInstalled sets ChannelConditionProvisionerInstalled condition to False state. +func (cs *ChannelStatus) MarkProvisionerNotInstalled(reason, messageFormat string, messageA ...interface{}) { + chanCondSet.Manage(cs).MarkFalse(ChannelConditionProvisionerInstalled, reason, messageFormat, messageA...) +} + // SetAddress makes this Channel addressable by setting the hostname. It also // sets the ChannelConditionAddressable to true. func (cs *ChannelStatus) SetAddress(hostname string) { diff --git a/pkg/apis/eventing/v1alpha1/channel_types_test.go b/pkg/apis/eventing/v1alpha1/channel_types_test.go index 2cbabce3696..1bed66647ec 100644 --- a/pkg/apis/eventing/v1alpha1/channel_types_test.go +++ b/pkg/apis/eventing/v1alpha1/channel_types_test.go @@ -101,6 +101,9 @@ func TestChannelInitializeConditions(t *testing.T) { }, { Type: ChannelConditionProvisioned, Status: corev1.ConditionUnknown, + }, { + Type: ChannelConditionProvisionerInstalled, + Status: corev1.ConditionTrue, }, { Type: ChannelConditionReady, Status: corev1.ConditionUnknown, @@ -121,6 +124,9 @@ func TestChannelInitializeConditions(t *testing.T) { }, { Type: ChannelConditionProvisioned, Status: corev1.ConditionFalse, + }, { + Type: ChannelConditionProvisionerInstalled, + Status: corev1.ConditionTrue, }, { Type: ChannelConditionReady, Status: corev1.ConditionUnknown, @@ -141,12 +147,15 @@ func TestChannelInitializeConditions(t *testing.T) { }, { Type: ChannelConditionProvisioned, Status: corev1.ConditionTrue, + }, { + Type: ChannelConditionProvisionerInstalled, + Status: corev1.ConditionTrue, }, { Type: ChannelConditionReady, Status: corev1.ConditionUnknown, - }}}, - }, - } + }}, + }, + }} for _, test := range tests { t.Run(test.name, func(t *testing.T) { @@ -178,6 +187,7 @@ func TestChannelIsReady(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { cs := &ChannelStatus{} + cs.InitializeConditions() if test.markProvisioned { cs.MarkProvisioned() } else { diff --git a/pkg/reconciler/v1alpha1/channel/channel.go b/pkg/reconciler/v1alpha1/channel/channel.go new file mode 100644 index 00000000000..c02fb49dbfd --- /dev/null +++ b/pkg/reconciler/v1alpha1/channel/channel.go @@ -0,0 +1,141 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package channel + +import ( + "context" + + corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/source" + + "github.com/golang/glog" + "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/client-go/dynamic" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +const ( + // controllerAgentName is the string used by this controller to identify + // itself when creating events. + controllerAgentName = "channel-default-controller" + channelReconciled = "ChannelReconciled" + channelUpdateStatusFailed = "ChannelUpdateStatusFailed" +) + +type reconciler struct { + client client.Client + restConfig *rest.Config + dynamicClient dynamic.Interface + recorder record.EventRecorder +} + +// Verify the struct implements reconcile.Reconciler +var _ reconcile.Reconciler = &reconciler{} + +// ProvideController returns a Channel controller. +// This Channel controller is a default controller for channels of all provisioner kinds +func ProvideController(mgr manager.Manager) (controller.Controller, error) { + // Setup a new controller to Reconcile channel + c, err := controller.New(controllerAgentName, mgr, controller.Options{ + Reconciler: &reconciler{ + recorder: mgr.GetRecorder(controllerAgentName), + }, + }) + if err != nil { + return nil, err + } + + // Watch channel events + // This controller is no-op when Channels are deleted + if err := c.Watch( + &source.Kind{Type: &v1alpha1.Channel{}}, + &handler.EnqueueRequestForObject{}, + predicate.Funcs{ + DeleteFunc: func(event.DeleteEvent) bool { + return false + }, + }); err != nil { + return nil, err + } + + return c, nil +} + +// Reconcile will check if the channel is being watched by provisioner's channel controller +// This will improve UX. See https://github.com/knative/eventing/issues/779 +func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) { + glog.Infof("Reconciling channel %s", request) + ch := &v1alpha1.Channel{} + + // Controller-runtime client Get() always deep copies the object. Hence no need to again deep copy it + err := r.client.Get(context.TODO(), request.NamespacedName, ch) + + if errors.IsNotFound(err) { + glog.Errorf("could not find channel %s\n", request) + return reconcile.Result{}, nil + } + + if err != nil { + glog.Errorf("could not fetch channel %s: %s\n", request, err) + return reconcile.Result{}, err + } + + err = r.reconcile(ch) + + if err != nil { + glog.Warningf("Error reconciling channel %s: %s. Will retry.", request, err) + r.recorder.Eventf(ch, corev1.EventTypeWarning, channelUpdateStatusFailed, "Failed to update channel status: %s", request) + return reconcile.Result{Requeue: true}, err + } + glog.Infof("Successfully reconciled channel %s", request) + r.recorder.Eventf(ch, corev1.EventTypeNormal, channelReconciled, "Channel reconciled: %s", request) + return reconcile.Result{Requeue: false}, nil +} + +func (r *reconciler) reconcile(ch *v1alpha1.Channel) error { + // Do not Initialize() Status in channel-default-controller. It will set ChannelConditionProvisionerInstalled=True + // Directly call GetCondition(). If the Status was never initialized then GetCondition() will return nil and + // IsUnknown() will return true + c := ch.Status.GetCondition(v1alpha1.ChannelConditionProvisionerInstalled) + + if c.IsUnknown() { + ch.Status.MarkProvisionerNotInstalled( + "Provisioner not found.", + "Specified provisioner [Name:%s Kind:%s] is not installed or not controlling the channel.", + ch.Spec.Provisioner.Name, + ch.Spec.Provisioner.Kind, + ) + err := r.client.Status().Update(context.TODO(), ch) + return err + } + return nil +} + +func (r *reconciler) InjectClient(c client.Client) error { + r.client = c + return nil +} diff --git a/pkg/reconciler/v1alpha1/channel/channel_test.go b/pkg/reconciler/v1alpha1/channel/channel_test.go new file mode 100644 index 00000000000..b4980d09076 --- /dev/null +++ b/pkg/reconciler/v1alpha1/channel/channel_test.go @@ -0,0 +1,232 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package channel + +import ( + "context" + "errors" + "fmt" + "testing" + + eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + controllertesting "github.com/knative/eventing/pkg/reconciler/testing" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +const ( + testNamespace = "testnamespace" + testAPIVersion = "eventing.knative.dev/v1alpha1" + testCCPName = "TestProvisioner" + testCCPKind = "ClusterChannelProvisioner" +) + +var ( + events = map[string]corev1.Event{ + channelReconciled: {Reason: channelReconciled, Type: corev1.EventTypeNormal}, + channelUpdateStatusFailed: {Reason: channelUpdateStatusFailed, Type: corev1.EventTypeWarning}, + } +) + +func init() { + // Add types to scheme + eventingv1alpha1.AddToScheme(scheme.Scheme) + duckv1alpha1.AddToScheme(scheme.Scheme) +} + +func TestAllCases(t *testing.T) { + + testCases := []controllertesting.TestCase{ + { + Name: "No channels exist", + WantErr: false, + WantResult: reconcile.Result{}, + ReconcileKey: fmt.Sprintf("%v/%v", "chan-1", testNamespace), + }, { + Name: "Orphaned channel", + WantErr: false, + WantResult: reconcile.Result{}, + ReconcileKey: fmt.Sprintf("%v/%v", testNamespace, "chan-1"), + InitialState: []runtime.Object{ + Channel("chan-1", testNamespace), + Channel("chan-2", testNamespace).WithProvInstalledStatus(corev1.ConditionTrue), + Channel("chan-3", testNamespace).WithProvInstalledStatus(corev1.ConditionFalse), + }, + WantPresent: []runtime.Object{ + Channel("chan-1", testNamespace).WithProvInstalledStatus(corev1.ConditionFalse), + Channel("chan-2", testNamespace).WithProvInstalledStatus(corev1.ConditionTrue), + Channel("chan-3", testNamespace).WithProvInstalledStatus(corev1.ConditionFalse), + }, + WantEvent: []corev1.Event{ + events[channelReconciled], + }, + }, { + Name: "Non-oprphaned channel test 1", + WantErr: false, + WantResult: reconcile.Result{}, + ReconcileKey: fmt.Sprintf("%v/%v", testNamespace, "chan-2"), + InitialState: []runtime.Object{ + Channel("chan-1", testNamespace), + Channel("chan-2", testNamespace).WithProvInstalledStatus(corev1.ConditionTrue), + Channel("chan-3", testNamespace).WithProvInstalledStatus(corev1.ConditionFalse), + }, + WantPresent: []runtime.Object{ + Channel("chan-1", testNamespace), + Channel("chan-2", testNamespace).WithProvInstalledStatus(corev1.ConditionTrue), + Channel("chan-3", testNamespace).WithProvInstalledStatus(corev1.ConditionFalse), + }, + WantEvent: []corev1.Event{ + events[channelReconciled], + }, + }, { + Name: "Non-oprphaned channel test 2", + WantErr: false, + WantResult: reconcile.Result{}, + ReconcileKey: fmt.Sprintf("%v/%v", testNamespace, "chan-3"), + InitialState: []runtime.Object{ + Channel("chan-1", testNamespace), + Channel("chan-2", testNamespace).WithProvInstalledStatus(corev1.ConditionTrue), + Channel("chan-3", testNamespace).WithProvInstalledStatus(corev1.ConditionFalse), + }, + WantPresent: []runtime.Object{ + Channel("chan-1", testNamespace), + Channel("chan-2", testNamespace).WithProvInstalledStatus(corev1.ConditionTrue), + Channel("chan-3", testNamespace).WithProvInstalledStatus(corev1.ConditionFalse), + }, + WantEvent: []corev1.Event{ + events[channelReconciled], + }, + }, { + Name: "Fail orphaned channel status update", + WantErr: true, + WantErrMsg: "Update failed", + WantResult: reconcile.Result{Requeue: true}, + ReconcileKey: fmt.Sprintf("%v/%v", testNamespace, "chan-1"), + InitialState: []runtime.Object{ + Channel("chan-1", testNamespace), + Channel("chan-2", testNamespace).WithProvInstalledStatus(corev1.ConditionTrue), + Channel("chan-3", testNamespace).WithProvInstalledStatus(corev1.ConditionFalse), + }, + WantPresent: []runtime.Object{ + Channel("chan-1", testNamespace), + Channel("chan-2", testNamespace).WithProvInstalledStatus(corev1.ConditionTrue), + Channel("chan-3", testNamespace).WithProvInstalledStatus(corev1.ConditionFalse), + }, + WantEvent: []corev1.Event{ + events[channelUpdateStatusFailed], + }, + Mocks: controllertesting.Mocks{ + MockStatusUpdates: []controllertesting.MockStatusUpdate{failUpdate}, + }, + }, + } + for _, tc := range testCases { + c := tc.GetClient() + dc := tc.GetDynamicClient() + recorder := tc.GetEventRecorder() + + r := &reconciler{ + client: c, + dynamicClient: dc, + restConfig: &rest.Config{}, + recorder: recorder, + } + tc.IgnoreTimes = true + t.Run(tc.Name, tc.Runner(t, r, c, recorder)) + } +} + +func failUpdate(innerClient client.Client, ctx context.Context, obj runtime.Object) (controllertesting.MockHandled, error) { + return controllertesting.Handled, errors.New("Update failed") +} + +type ChannelBuilder struct { + *eventingv1alpha1.Channel +} + +// Verify the Builder implements Buildable +var _ controllertesting.Buildable = &ChannelBuilder{} + +func (s *ChannelBuilder) Build() runtime.Object { + return s.Channel +} + +func Channel(name string, namespace string) *ChannelBuilder { + channel := getTestChannelWithoutStatus(name, namespace) + return &ChannelBuilder{ + Channel: channel, + } +} + +func (cb *ChannelBuilder) WithProvInstalledStatus(provInstalledStatus corev1.ConditionStatus) *ChannelBuilder { + cb.Status = eventingv1alpha1.ChannelStatus{} + + switch provInstalledStatus { + case corev1.ConditionTrue: + cb.Status.MarkProvisionerInstalled() + break + case corev1.ConditionFalse: + cb.Status.MarkProvisionerNotInstalled( + "Provisioner not found.", + "Specified provisioner [Name:%v Kind:%v] is not installed or not controlling the channel.", + testCCPName, + testCCPKind, + ) + break + } + return cb +} + +func getTestChannelWithoutStatus(name string, namespace string) *eventingv1alpha1.Channel { + ch := &eventingv1alpha1.Channel{} + ch.APIVersion = testAPIVersion + ch.Namespace = testNamespace + ch.Kind = "Channel" + ch.Name = name + ch.Namespace = namespace + ch.Spec = getTestChannelSpec() + return ch +} + +func getTestChannelSpec() eventingv1alpha1.ChannelSpec { + chSpec := eventingv1alpha1.ChannelSpec{ + Provisioner: &corev1.ObjectReference{}, + Subscribable: &eventingduck.Subscribable{ + Subscribers: []eventingduck.ChannelSubscriberSpec{ + getTestSubscriberSpec(), + getTestSubscriberSpec(), + }, + }, + } + chSpec.Provisioner.Name = testCCPName + chSpec.Provisioner.APIVersion = testAPIVersion + chSpec.Provisioner.Kind = testCCPKind + return chSpec +} + +func getTestSubscriberSpec() eventingduck.ChannelSubscriberSpec { + return eventingduck.ChannelSubscriberSpec{ + SubscriberURI: "TestSubscriberURI", + ReplyURI: "TestReplyURI", + } +}