diff --git a/config/provisioners/in-memory-channel/in-memory-channel.yaml b/config/provisioners/in-memory-channel/in-memory-channel.yaml index 51dc7b1a957..0445e341e22 100644 --- a/config/provisioners/in-memory-channel/in-memory-channel.yaml +++ b/config/provisioners/in-memory-channel/in-memory-channel.yaml @@ -92,6 +92,14 @@ rules: - watch - create - update + - apiGroups: + - "" # Core API Group. + resources: + - events + verbs: + - create + - patch + - update --- diff --git a/contrib/kafka/pkg/controller/channel/reconcile_test.go b/contrib/kafka/pkg/controller/channel/reconcile_test.go index 715c1d9efff..6fdb03bd29b 100644 --- a/contrib/kafka/pkg/controller/channel/reconcile_test.go +++ b/contrib/kafka/pkg/controller/channel/reconcile_test.go @@ -37,7 +37,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -223,13 +222,13 @@ var testCases = []controllertesting.TestCase{ } func TestAllCases(t *testing.T) { - recorder := record.NewBroadcaster().NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) for _, tc := range testCases { tc.ReconcileKey = fmt.Sprintf("%s/%s", testNS, channelName) tc.IgnoreTimes = true c := tc.GetClient() + recorder := tc.GetEventRecorder() logger := provisioners.NewProvisionerLoggerFromConfig(provisioners.NewLoggingConfig()) r := &reconciler{ client: c, @@ -239,7 +238,7 @@ func TestAllCases(t *testing.T) { kafkaClusterAdmin: &mockClusterAdmin{}, } t.Logf("Running test %s", tc.Name) - t.Run(tc.Name, tc.Runner(t, r, c)) + t.Run(tc.Name, tc.Runner(t, r, c, recorder)) } } diff --git a/contrib/kafka/pkg/controller/reconcile_test.go b/contrib/kafka/pkg/controller/reconcile_test.go index 3a4798b169e..b8f3d4772da 100644 --- a/contrib/kafka/pkg/controller/reconcile_test.go +++ b/contrib/kafka/pkg/controller/reconcile_test.go @@ -29,7 +29,6 @@ import ( metav1 
"k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -132,10 +131,10 @@ var testCases = []controllertesting.TestCase{ } func TestAllCases(t *testing.T) { - recorder := record.NewBroadcaster().NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) for _, tc := range testCases { c := tc.GetClient() + recorder := tc.GetEventRecorder() logger := provisioners.NewProvisionerLoggerFromConfig(provisioners.NewLoggingConfig()) r := &reconciler{ client: c, @@ -144,7 +143,7 @@ func TestAllCases(t *testing.T) { config: getControllerConfig(), } t.Logf("Running test %s", tc.Name) - t.Run(tc.Name, tc.Runner(t, r, c)) + t.Run(tc.Name, tc.Runner(t, r, c, recorder)) } } diff --git a/pkg/controller/eventing/inmemory/channel/reconcile.go b/pkg/controller/eventing/inmemory/channel/reconcile.go index c1c0d3d1cce..079a3b62d38 100644 --- a/pkg/controller/eventing/inmemory/channel/reconcile.go +++ b/pkg/controller/eventing/inmemory/channel/reconcile.go @@ -39,6 +39,13 @@ import ( const ( finalizerName = controllerAgentName + + // Name of the corev1.Events emitted from the reconciliation process + channelReconciled = "ChannelReconciled" + channelUpdateStatusFailed = "ChannelUpdateStatusFailed" + channelConfigSyncFailed = "ChannelConfigSyncFailed" + k8sServiceCreateFailed = "K8sServiceCreateFailed" + virtualServiceCreateFailed = "VirtualServiceCreateFailed" ) type reconciler struct { @@ -93,10 +100,14 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err logger.Info("Error reconciling Channel", zap.Error(err)) // Note that we do not return the error here, because we want to update the Status // regardless of the error. 
+ } else { + logger.Info("Channel reconciled") + r.recorder.Eventf(c, corev1.EventTypeNormal, channelReconciled, "Channel reconciled: %q", c.Name) } if updateStatusErr := util.UpdateChannel(ctx, r.client, c); updateStatusErr != nil { logger.Info("Error updating Channel Status", zap.Error(updateStatusErr)) + r.recorder.Eventf(c, corev1.EventTypeWarning, channelUpdateStatusFailed, "Failed to update Channel's status: %v", updateStatusErr) return reconcile.Result{}, updateStatusErr } @@ -124,7 +135,8 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel) // We always need to sync the Channel config, so do it first. if err := r.syncChannelConfig(ctx); err != nil { - logger.Info("Error updating syncing the Channel config", zap.Error(err)) + logger.Info("Error syncing the Channel config", zap.Error(err)) + r.recorder.Eventf(c, corev1.EventTypeWarning, channelConfigSyncFailed, "Failed to sync Channel config: %v", err) return err } @@ -140,6 +152,7 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel) svc, err := util.CreateK8sService(ctx, r.client, c) if err != nil { logger.Info("Error creating the Channel's K8s Service", zap.Error(err)) + r.recorder.Eventf(c, corev1.EventTypeWarning, k8sServiceCreateFailed, "Failed to reconcile Channel's K8s Service: %v", err) return err } c.Status.SetAddress(controller.ServiceHostName(svc.Name, svc.Namespace)) @@ -147,6 +160,7 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel) _, err = util.CreateVirtualService(ctx, r.client, c, svc) if err != nil { logger.Info("Error creating the Virtual Service for the Channel", zap.Error(err)) + r.recorder.Eventf(c, corev1.EventTypeWarning, virtualServiceCreateFailed, "Failed to reconcile Virtual Service for the Channel: %v", err) return err } diff --git a/pkg/controller/eventing/inmemory/channel/reconcile_test.go b/pkg/controller/eventing/inmemory/channel/reconcile_test.go index 27bb7298e23..7c698de1262 100 --- 
a/pkg/controller/eventing/inmemory/channel/reconcile_test.go +++ b/pkg/controller/eventing/inmemory/channel/reconcile_test.go @@ -38,7 +38,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" ) @@ -178,6 +177,15 @@ var ( }, }, } + + // map of events to set test cases' expectations easier + events = map[string]corev1.Event{ + channelReconciled: {Reason: channelReconciled, Type: corev1.EventTypeNormal}, + channelUpdateStatusFailed: {Reason: channelUpdateStatusFailed, Type: corev1.EventTypeWarning}, + channelConfigSyncFailed: {Reason: channelConfigSyncFailed, Type: corev1.EventTypeWarning}, + k8sServiceCreateFailed: {Reason: k8sServiceCreateFailed, Type: corev1.EventTypeWarning}, + virtualServiceCreateFailed: {Reason: virtualServiceCreateFailed, Type: corev1.EventTypeWarning}, + } ) func init() { @@ -252,6 +260,9 @@ func TestReconcile(t *testing.T) { makeDeletingChannel(), }, WantErrMsg: testErrorMessage, + WantEvent: []corev1.Event{ + events[channelConfigSyncFailed], + }, }, { Name: "Channel deleted - finalizer removed", @@ -261,6 +272,9 @@ func TestReconcile(t *testing.T) { WantPresent: []runtime.Object{ makeDeletingChannelWithoutFinalizer(), }, + WantEvent: []corev1.Event{ + events[channelReconciled], + }, }, { Name: "Channel config sync fails - can't list Channels", @@ -271,6 +285,9 @@ func TestReconcile(t *testing.T) { MockLists: errorListingChannels(), }, WantErrMsg: testErrorMessage, + WantEvent: []corev1.Event{ + events[channelConfigSyncFailed], + }, }, { Name: "Channel config sync fails - can't get ConfigMap", @@ -281,6 +298,9 @@ func TestReconcile(t *testing.T) { MockGets: errorGettingConfigMap(), }, WantErrMsg: testErrorMessage, + WantEvent: []corev1.Event{ + events[channelConfigSyncFailed], + }, }, { Name: "Channel config sync fails - can't create ConfigMap", @@ -291,6 
+311,9 @@ func TestReconcile(t *testing.T) { MockCreates: errorCreatingConfigMap(), }, WantErrMsg: testErrorMessage, + WantEvent: []corev1.Event{ + events[channelConfigSyncFailed], + }, }, { Name: "Channel config sync fails - can't update ConfigMap", @@ -302,6 +325,9 @@ func TestReconcile(t *testing.T) { MockUpdates: errorUpdatingConfigMap(), }, WantErrMsg: testErrorMessage, + WantEvent: []corev1.Event{ + events[channelConfigSyncFailed], + }, }, { Name: "K8s service get fails", @@ -316,6 +342,9 @@ func TestReconcile(t *testing.T) { makeChannelWithFinalizer(), }, WantErrMsg: testErrorMessage, + WantEvent: []corev1.Event{ + events[k8sServiceCreateFailed], + }, }, { Name: "K8s service creation fails", @@ -331,6 +360,9 @@ func TestReconcile(t *testing.T) { makeChannelWithFinalizer(), }, WantErrMsg: testErrorMessage, + WantEvent: []corev1.Event{ + events[k8sServiceCreateFailed], + }, }, { Name: "Virtual service get fails", @@ -349,6 +381,9 @@ func TestReconcile(t *testing.T) { makeChannelWithFinalizerAndAddress(), }, WantErrMsg: testErrorMessage, + WantEvent: []corev1.Event{ + events[virtualServiceCreateFailed], + }, }, { Name: "Virtual service creation fails", @@ -366,6 +401,9 @@ func TestReconcile(t *testing.T) { makeChannelWithFinalizerAndAddress(), }, WantErrMsg: testErrorMessage, + WantEvent: []corev1.Event{ + events[virtualServiceCreateFailed], + }, }, { Name: "Channel get for update fails", @@ -379,6 +417,9 @@ func TestReconcile(t *testing.T) { MockGets: errorOnSecondChannelGet(), }, WantErrMsg: testErrorMessage, + WantEvent: []corev1.Event{ + events[channelReconciled], events[channelUpdateStatusFailed], + }, }, { Name: "Channel update fails", @@ -392,6 +433,9 @@ func TestReconcile(t *testing.T) { MockUpdates: errorUpdatingChannel(), }, WantErrMsg: testErrorMessage, + WantEvent: []corev1.Event{ + events[channelReconciled], events[channelUpdateStatusFailed], + }, }, { Name: "Channel status update fails", InitialState: []runtime.Object{ @@ -404,6 +448,9 @@ func 
TestReconcile(t *testing.T) { MockStatusUpdates: errorUpdatingChannelStatus(), }, WantErrMsg: testErrorMessage, + WantEvent: []corev1.Event{ + events[channelReconciled], events[channelUpdateStatusFailed], + }, }, { Name: "Channel reconcile successful - Channel list follows pagination", InitialState: []runtime.Object{ @@ -424,6 +471,9 @@ func TestReconcile(t *testing.T) { makeVirtualService(), makeConfigMapWithVerifyConfigMapData(), }, + WantEvent: []corev1.Event{ + events[channelReconciled], + }, }, { Name: "Channel reconcile successful - Channel has no subscribers", @@ -464,15 +514,19 @@ func TestReconcile(t *testing.T) { makeVirtualService(), makeConfigMapWithVerifyConfigMapData(), }, + WantEvent: []corev1.Event{ + events[channelReconciled], + }, }, } - recorder := record.NewBroadcaster().NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) + for _, tc := range testCases { configMapKey := types.NamespacedName{ Namespace: cmNamespace, Name: cmName, } c := tc.GetClient() + recorder := tc.GetEventRecorder() r := &reconciler{ client: c, recorder: recorder, @@ -483,7 +537,7 @@ func TestReconcile(t *testing.T) { tc.ReconcileKey = fmt.Sprintf("/%s", cName) } tc.IgnoreTimes = true - t.Run(tc.Name, tc.Runner(t, r, c)) + t.Run(tc.Name, tc.Runner(t, r, c, recorder)) } } diff --git a/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile.go b/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile.go index 5eb68ef5250..15e861f8896 100644 --- a/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile.go +++ b/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile.go @@ -40,6 +40,12 @@ const ( // Channel is the name of the Channel resource in eventing.knative.dev/v1alpha1. 
Channel = "Channel" + + // Name of the corev1.Events emitted from the reconciliation process + ccpReconciled = "CcpReconciled" + ccpUpdateStatusFailed = "CcpUpdateStatusFailed" + k8sServiceCreateFailed = "K8sServiceCreateFailed" + k8sServiceDeleteFailed = "K8sServiceDeleteFailed" ) type reconciler struct { @@ -98,10 +104,14 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err logger.Info("Error reconciling ClusterChannelProvisioner", zap.Error(err)) // Note that we do not return the error here, because we want to update the Status // regardless of the error. + } else { + logger.Info("ClusterChannelProvisioner reconciled") + r.recorder.Eventf(ccp, corev1.EventTypeNormal, ccpReconciled, "ClusterChannelProvisioner reconciled: %q", ccp.Name) } if updateStatusErr := util.UpdateClusterChannelProvisionerStatus(ctx, r.client, ccp); updateStatusErr != nil { logger.Info("Error updating ClusterChannelProvisioner Status", zap.Error(updateStatusErr)) + r.recorder.Eventf(ccp, corev1.EventTypeWarning, ccpUpdateStatusFailed, "Failed to update ClusterChannelProvisioner's status: %v", updateStatusErr) return reconcile.Result{}, updateStatusErr } @@ -140,6 +150,7 @@ func (r *reconciler) reconcile(ctx context.Context, ccp *eventingv1alpha1.Cluste if err != nil { logger.Info("Error creating the ClusterChannelProvisioner's K8s Service", zap.Error(err)) + r.recorder.Eventf(ccp, corev1.EventTypeWarning, k8sServiceCreateFailed, "Failed to reconcile ClusterChannelProvisioner's K8s Service: %v", err) return err } @@ -153,6 +164,7 @@ func (r *reconciler) reconcile(ctx context.Context, ccp *eventingv1alpha1.Cluste err = r.deleteOldDispatcherService(ctx, ccp) if err != nil { logger.Info("Error deleting the old ClusterChannelProvisioner's K8s Service", zap.Error(err)) + r.recorder.Eventf(ccp, corev1.EventTypeWarning, k8sServiceDeleteFailed, "Failed to delete the old ClusterChannelProvisioner's K8s Service: %v", err) return err } diff --git 
a/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile_test.go b/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile_test.go index 8c299ffb288..4ad88ddd2de 100644 --- a/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile_test.go +++ b/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile_test.go @@ -29,7 +29,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" @@ -51,6 +50,14 @@ var ( deletionTime = metav1.Now().Rfc3339Copy() truePointer = true + + // map of events to set test cases' expectations easier + events = map[string]corev1.Event{ + ccpReconciled: {Reason: ccpReconciled, Type: corev1.EventTypeNormal}, + ccpUpdateStatusFailed: {Reason: ccpUpdateStatusFailed, Type: corev1.EventTypeWarning}, + k8sServiceCreateFailed: {Reason: k8sServiceCreateFailed, Type: corev1.EventTypeWarning}, + k8sServiceDeleteFailed: {Reason: k8sServiceDeleteFailed, Type: corev1.EventTypeWarning}, + } ) func init() { @@ -156,6 +163,9 @@ func TestReconcile(t *testing.T) { InitialState: []runtime.Object{ makeDeletingClusterChannelProvisioner(), }, + WantEvent: []corev1.Event{ + events[ccpReconciled], + }, }, { Name: "Create dispatcher fails", @@ -168,6 +178,9 @@ func TestReconcile(t *testing.T) { }, }, WantErrMsg: testErrorMessage, + WantEvent: []corev1.Event{ + events[k8sServiceCreateFailed], + }, }, { Name: "Create dispatcher - already exists", @@ -178,6 +191,9 @@ func TestReconcile(t *testing.T) { WantPresent: []runtime.Object{ makeReadyClusterChannelProvisioner(), }, + WantEvent: []corev1.Event{ + events[ccpReconciled], + }, }, { Name: "Delete old dispatcher", @@ -192,6 +208,9 @@ func TestReconcile(t *testing.T) { WantAbsent: []runtime.Object{ makeOldK8sService(), }, + WantEvent: []corev1.Event{ + events[ccpReconciled], + }, }, 
{ Name: "Create dispatcher - not owned by CCP", @@ -202,6 +221,9 @@ func TestReconcile(t *testing.T) { WantPresent: []runtime.Object{ makeReadyClusterChannelProvisioner(), }, + WantEvent: []corev1.Event{ + events[ccpReconciled], + }, }, { Name: "Create dispatcher succeeds", @@ -212,6 +234,9 @@ func TestReconcile(t *testing.T) { makeReadyClusterChannelProvisioner(), makeK8sService(), }, + WantEvent: []corev1.Event{ + events[ccpReconciled], + }, }, { Name: "Create dispatcher succeeds - request is namespace-scoped", @@ -223,6 +248,9 @@ func TestReconcile(t *testing.T) { makeK8sService(), }, ReconcileKey: fmt.Sprintf("%s/%s", testNS, Name), + WantEvent: []corev1.Event{ + events[ccpReconciled], + }, }, { Name: "Error getting CCP for updating Status", @@ -235,6 +263,9 @@ func TestReconcile(t *testing.T) { MockGets: oneSuccessfulClusterChannelProvisionerGet(), }, WantErrMsg: testErrorMessage, + WantEvent: []corev1.Event{ + events[ccpReconciled], events[ccpUpdateStatusFailed], + }, }, { Name: "Error updating Status", @@ -249,11 +280,15 @@ func TestReconcile(t *testing.T) { }, }, WantErrMsg: testErrorMessage, + WantEvent: []corev1.Event{ + events[ccpReconciled], events[ccpUpdateStatusFailed], + }, }, } - recorder := record.NewBroadcaster().NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) + for _, tc := range testCases { c := tc.GetClient() + recorder := tc.GetEventRecorder() r := &reconciler{ client: c, recorder: recorder, @@ -263,7 +298,7 @@ func TestReconcile(t *testing.T) { tc.ReconcileKey = fmt.Sprintf("/%s", Name) } tc.IgnoreTimes = true - t.Run(tc.Name, tc.Runner(t, r, c)) + t.Run(tc.Name, tc.Runner(t, r, c, recorder)) } } diff --git a/pkg/controller/eventing/subscription/reconcile.go b/pkg/controller/eventing/subscription/reconcile.go index d867dfd6987..def95e2a1a0 100644 --- a/pkg/controller/eventing/subscription/reconcile.go +++ b/pkg/controller/eventing/subscription/reconcile.go @@ -42,6 +42,14 @@ import ( const ( finalizerName = 
controllerAgentName + + // Name of the corev1.Events emitted from the reconciliation process + subscriptionReconciled = "SubscriptionReconciled" + subscriptionUpdateStatusFailed = "SubscriptionUpdateStatusFailed" + physicalChannelSyncFailed = "PhysicalChannelSyncFailed" + channelReferenceFetchFailed = "ChannelReferenceFetchFailed" + subscriberResolveFailed = "SubscriberResolveFailed" + resultResolveFailed = "ResultResolveFailed" ) // Reconcile compares the actual state with the desired, and attempts to @@ -65,8 +73,16 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err // Reconcile this copy of the Subscription and then write back any status // updates regardless of whether the reconcile error out. err = r.reconcile(subscription) + if err != nil { + glog.Warningf("Error reconciling Subscription: %v", err) + } else { + glog.Info("Subscription reconciled") + r.recorder.Eventf(subscription, corev1.EventTypeNormal, subscriptionReconciled, "Subscription reconciled: %q", subscription.Name) + } + if _, updateStatusErr := r.updateStatus(subscription.DeepCopy()); updateStatusErr != nil { glog.Warningf("Failed to update subscription status: %v", updateStatusErr) + r.recorder.Eventf(subscription, corev1.EventTypeWarning, subscriptionUpdateStatusFailed, "Failed to update Subscription's status: %v", updateStatusErr) return reconcile.Result{}, updateStatusErr } @@ -93,6 +109,7 @@ func (r *reconciler) reconcile(subscription *v1alpha1.Subscription) error { err := r.syncPhysicalChannel(subscription, true) if err != nil { glog.Warningf("Failed to sync physical from Channel : %s", err) + r.recorder.Eventf(subscription, corev1.EventTypeWarning, physicalChannelSyncFailed, "Failed to sync physical Channel: %v", err) return err } } @@ -104,11 +121,13 @@ func (r *reconciler) reconcile(subscription *v1alpha1.Subscription) error { _, err = r.fetchObjectReference(subscription.Namespace, &subscription.Spec.Channel) if err != nil { glog.Warningf("Failed to validate 
`channel` exists: %+v, %v", subscription.Spec.Channel, err) + r.recorder.Eventf(subscription, corev1.EventTypeWarning, channelReferenceFetchFailed, "Failed to validate spec.channel exists: %v", err) return err } if subscriberURI, err := r.resolveSubscriberSpec(subscription.Namespace, subscription.Spec.Subscriber); err != nil { glog.Warningf("Failed to resolve Subscriber %+v : %s", *subscription.Spec.Subscriber, err) + r.recorder.Eventf(subscription, corev1.EventTypeWarning, subscriberResolveFailed, "Failed to resolve spec.subscriber: %v", err) return err } else { subscription.Status.PhysicalSubscription.SubscriberURI = subscriberURI @@ -117,6 +136,7 @@ func (r *reconciler) reconcile(subscription *v1alpha1.Subscription) error { if replyURI, err := r.resolveResult(subscription.Namespace, subscription.Spec.Reply); err != nil { glog.Warningf("Failed to resolve Result %v : %v", subscription.Spec.Reply, err) + r.recorder.Eventf(subscription, corev1.EventTypeWarning, resultResolveFailed, "Failed to resolve spec.reply: %v", err) return err } else { subscription.Status.PhysicalSubscription.ReplyURI = replyURI @@ -131,6 +151,7 @@ func (r *reconciler) reconcile(subscription *v1alpha1.Subscription) error { err = r.syncPhysicalChannel(subscription, false) if err != nil { glog.Warningf("Failed to sync physical Channel : %s", err) + r.recorder.Eventf(subscription, corev1.EventTypeWarning, physicalChannelSyncFailed, "Failed to sync physical Channel: %v", err) return err } // Everything went well, set the fact that subscriptions have been modified diff --git a/pkg/controller/eventing/subscription/reconcile_test.go b/pkg/controller/eventing/subscription/reconcile_test.go index 4b7383e7583..d719fcccf4b 100644 --- a/pkg/controller/eventing/subscription/reconcile_test.go +++ b/pkg/controller/eventing/subscription/reconcile_test.go @@ -31,7 +31,6 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" - "k8s.io/client-go/tools/record" 
"sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -41,6 +40,16 @@ var ( // deletionTime is used when objects are marked as deleted. Rfc3339Copy() // truncates to seconds to match the loss of precision during serialization. deletionTime = metav1.Now().Rfc3339Copy() + + // map of events to set test cases' expectations easier + events = map[string]corev1.Event{ + subscriptionReconciled: {Reason: subscriptionReconciled, Type: corev1.EventTypeNormal}, + subscriptionUpdateStatusFailed: {Reason: subscriptionUpdateStatusFailed, Type: corev1.EventTypeWarning}, + physicalChannelSyncFailed: {Reason: physicalChannelSyncFailed, Type: corev1.EventTypeWarning}, + channelReferenceFetchFailed: {Reason: channelReferenceFetchFailed, Type: corev1.EventTypeWarning}, + subscriberResolveFailed: {Reason: subscriberResolveFailed, Type: corev1.EventTypeWarning}, + resultResolveFailed: {Reason: resultResolveFailed, Type: corev1.EventTypeWarning}, + } ) const ( @@ -78,6 +87,9 @@ var testCases = []controllertesting.TestCase{ Subscription(), }, WantErrMsg: `channels.eventing.knative.dev "fromchannel" not found`, + WantEvent: []corev1.Event{ + events[channelReferenceFetchFailed], + }, }, { Name: "subscription, but From is not subscribable", InitialState: []runtime.Object{ @@ -88,7 +100,10 @@ var testCases = []controllertesting.TestCase{ // failure for now, until upstream is fixed. It should actually fail saying that there is no // Spec.Subscribers field. 
WantErrMsg: "invalid JSON document", - Scheme: scheme.Scheme, + WantEvent: []corev1.Event{ + events[physicalChannelSyncFailed], + }, + Scheme: scheme.Scheme, Objects: []runtime.Object{ // Source channel &unstructured.Unstructured{ @@ -147,6 +162,9 @@ var testCases = []controllertesting.TestCase{ WantPresent: []runtime.Object{ Subscription().UnknownConditions(), }, + WantEvent: []corev1.Event{ + events[subscriberResolveFailed], + }, Scheme: scheme.Scheme, Objects: []runtime.Object{ // Source channel @@ -173,7 +191,10 @@ var testCases = []controllertesting.TestCase{ Subscription().UnknownConditions(), }, WantErrMsg: "status does not contain address", - Scheme: scheme.Scheme, + WantEvent: []corev1.Event{ + events[subscriberResolveFailed], + }, + Scheme: scheme.Scheme, Objects: []runtime.Object{ // Source channel &unstructured.Unstructured{ @@ -213,7 +234,10 @@ var testCases = []controllertesting.TestCase{ Subscription().UnknownConditions().PhysicalSubscriber(targetDNS), }, WantErrMsg: `channels.eventing.knative.dev "resultchannel" not found`, - Scheme: scheme.Scheme, + WantEvent: []corev1.Event{ + events[resultResolveFailed], + }, + Scheme: scheme.Scheme, Objects: []runtime.Object{ // Source channel &unstructured.Unstructured{ @@ -258,6 +282,9 @@ var testCases = []controllertesting.TestCase{ // Subscription().ReferencesResolved(), Subscription().UnknownConditions().PhysicalSubscriber(targetDNS), }, + WantEvent: []corev1.Event{ + events[resultResolveFailed], + }, Scheme: scheme.Scheme, Objects: []runtime.Object{ // Source channel @@ -318,7 +345,10 @@ var testCases = []controllertesting.TestCase{ Subscription().ReferencesResolved().PhysicalSubscriber(targetDNS).Reply(), }, WantErrMsg: "invalid JSON document", - Scheme: scheme.Scheme, + WantEvent: []corev1.Event{ + events[physicalChannelSyncFailed], + }, + Scheme: scheme.Scheme, Objects: []runtime.Object{ // Source channel &unstructured.Unstructured{ @@ -383,7 +413,10 @@ var testCases = []controllertesting.TestCase{ 
Subscription().NilReply().ReferencesResolved().PhysicalSubscriber(targetDNS), }, WantErrMsg: "invalid JSON document", - Scheme: scheme.Scheme, + WantEvent: []corev1.Event{ + events[physicalChannelSyncFailed], + }, + Scheme: scheme.Scheme, Objects: []runtime.Object{ // Source channel &unstructured.Unstructured{ @@ -429,7 +462,10 @@ var testCases = []controllertesting.TestCase{ Subscription().ReferencesResolved().PhysicalSubscriber(targetDNS).EmptyNonNilReply(), }, WantErrMsg: "invalid JSON document", - Scheme: scheme.Scheme, + WantEvent: []corev1.Event{ + events[physicalChannelSyncFailed], + }, + Scheme: scheme.Scheme, Objects: []runtime.Object{ // Source channel &unstructured.Unstructured{ @@ -475,7 +511,10 @@ var testCases = []controllertesting.TestCase{ Subscription().ReferencesResolved().PhysicalSubscriber(targetDNS).EmptyNonNilReply(), }, WantErrMsg: "invalid JSON document", - Scheme: scheme.Scheme, + WantEvent: []corev1.Event{ + events[physicalChannelSyncFailed], + }, + Scheme: scheme.Scheme, Objects: []runtime.Object{ // Source channel &unstructured.Unstructured{ @@ -520,7 +559,10 @@ var testCases = []controllertesting.TestCase{ Subscription().NilSubscriber().ReferencesResolved().Reply(), }, WantErrMsg: "invalid JSON document", - Scheme: scheme.Scheme, + WantEvent: []corev1.Event{ + events[physicalChannelSyncFailed], + }, + Scheme: scheme.Scheme, Objects: []runtime.Object{ // Source channel &unstructured.Unstructured{ @@ -570,7 +612,10 @@ var testCases = []controllertesting.TestCase{ Subscription().NilReply().ReferencesResolved().PhysicalSubscriber(targetDNS), }, WantErrMsg: "invalid JSON document", - Scheme: scheme.Scheme, + WantEvent: []corev1.Event{ + events[physicalChannelSyncFailed], + }, + Scheme: scheme.Scheme, Objects: []runtime.Object{ // Source channel &unstructured.Unstructured{ @@ -614,7 +659,10 @@ var testCases = []controllertesting.TestCase{ Subscription().NilSubscriber().ReferencesResolved().Reply(), }, WantErrMsg: "invalid JSON document", - 
Scheme: scheme.Scheme, + WantEvent: []corev1.Event{ + events[physicalChannelSyncFailed], + }, + Scheme: scheme.Scheme, Objects: []runtime.Object{ // Source channel &unstructured.Unstructured{ @@ -663,7 +711,10 @@ var testCases = []controllertesting.TestCase{ Subscription().EmptyNonNilSubscriber().ReferencesResolved().Reply(), }, WantErrMsg: "invalid JSON document", - Scheme: scheme.Scheme, + WantEvent: []corev1.Event{ + events[physicalChannelSyncFailed], + }, + Scheme: scheme.Scheme, Objects: []runtime.Object{ // Source channel &unstructured.Unstructured{ @@ -709,7 +760,10 @@ var testCases = []controllertesting.TestCase{ Subscription().ToK8sService().UnknownConditions(), }, WantErrMsg: "services \"testk8sservice\" not found", - Scheme: scheme.Scheme, + WantEvent: []corev1.Event{ + events[subscriberResolveFailed], + }, + Scheme: scheme.Scheme, Objects: []runtime.Object{ // Source channel &unstructured.Unstructured{ @@ -740,7 +794,10 @@ var testCases = []controllertesting.TestCase{ Subscription().ToK8sService().ReferencesResolved().PhysicalSubscriber(k8sServiceDNS).Reply(), }, WantErrMsg: "invalid JSON document", - Scheme: scheme.Scheme, + WantEvent: []corev1.Event{ + events[physicalChannelSyncFailed], + }, + Scheme: scheme.Scheme, Objects: []runtime.Object{ // Source channel &unstructured.Unstructured{ @@ -800,6 +857,9 @@ var testCases = []controllertesting.TestCase{ WantPresent: []runtime.Object{ Subscription().ReferencesResolved().PhysicalSubscriber(targetDNS).Reply(), }, + WantEvent: []corev1.Event{ + events[physicalChannelSyncFailed], + }, Scheme: scheme.Scheme, Objects: []runtime.Object{ // Source with a reference to the From Channel @@ -895,6 +955,9 @@ var testCases = []controllertesting.TestCase{ Subscription().Renamed().ReferencesResolved().PhysicalSubscriber(targetDNS).Reply(), Subscription().DifferentChannel(), }, + WantEvent: []corev1.Event{ + events[physicalChannelSyncFailed], + }, Scheme: scheme.Scheme, Objects: []runtime.Object{ // Source with a 
reference to the From Channel @@ -988,6 +1051,9 @@ var testCases = []controllertesting.TestCase{ // out. //getChannelWithOtherSubscription(), }, + WantEvent: []corev1.Event{ + events[physicalChannelSyncFailed], + }, Objects: []runtime.Object{ // Source channel &unstructured.Unstructured{ @@ -1019,11 +1085,11 @@ var testCases = []controllertesting.TestCase{ } func TestAllCases(t *testing.T) { - recorder := record.NewBroadcaster().NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) for _, tc := range testCases { c := tc.GetClient() dc := tc.GetDynamicClient() + recorder := tc.GetEventRecorder() r := &reconciler{ client: c, @@ -1033,7 +1099,7 @@ func TestAllCases(t *testing.T) { } tc.ReconcileKey = fmt.Sprintf("%s/%s", testNS, subscriptionName) tc.IgnoreTimes = true - t.Run(tc.Name, tc.Runner(t, r, c)) + t.Run(tc.Name, tc.Runner(t, r, c, recorder)) } } diff --git a/pkg/controller/testing/mock_event_recorder.go b/pkg/controller/testing/mock_event_recorder.go new file mode 100644 index 00000000000..793d8f56fe5 --- /dev/null +++ b/pkg/controller/testing/mock_event_recorder.go @@ -0,0 +1,59 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" +) + +// MockEventRecorder is a recorder.EventRecorder that saves emitted v1 Events. 
+type MockEventRecorder struct { + events []corev1.Event +} + +func NewEventRecorder() *MockEventRecorder { + return &MockEventRecorder{} +} + +func (m *MockEventRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) { + appendEvent(eventtype, reason, m) +} + +func (m *MockEventRecorder) Event(object runtime.Object, eventtype, reason, message string) { + appendEvent(eventtype, reason, m) +} + +func (m *MockEventRecorder) PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{}) { + appendEvent(eventtype, reason, m) +} + +func (m *MockEventRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) { + appendEvent(eventtype, reason, m) +} + +// Helper function to append an event type and reason to the MockEventRecorder +// Only interested in type and reason of Events for now. If we are planning on verifying other fields in +// the test cases, we need to include them here. +func appendEvent(eventtype, reason string, m *MockEventRecorder) { + event := corev1.Event{ + Reason: reason, + Type: eventtype, + } + m.events = append(m.events, event) +} diff --git a/pkg/controller/testing/table.go b/pkg/controller/testing/table.go index a7691c59d3d..8075d406a02 100644 --- a/pkg/controller/testing/table.go +++ b/pkg/controller/testing/table.go @@ -19,9 +19,12 @@ package testing import ( "context" "fmt" + "reflect" "strings" "testing" + corev1 "k8s.io/api/core/v1" + "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/knative/pkg/apis" @@ -70,6 +73,10 @@ type TestCase struct { // after reconciliation completes. WantAbsent []runtime.Object + // WantEvent holds the list of events expected to exist after + // reconciliation completes. + WantEvent []corev1.Event + // Mocks that tamper with the client's responses. 
Mocks Mocks @@ -93,7 +100,7 @@ type TestCase struct { } // Runner returns a testing func that can be passed to t.Run. -func (tc *TestCase) Runner(t *testing.T, r reconcile.Reconciler, c *MockClient) func(t *testing.T) { +func (tc *TestCase) Runner(t *testing.T, r reconcile.Reconciler, c *MockClient, recorder *MockEventRecorder) func(t *testing.T) { return func(t *testing.T) { result, recErr := tc.Reconcile(r) @@ -116,6 +123,10 @@ func (tc *TestCase) Runner(t *testing.T, r reconcile.Reconciler, c *MockClient) t.Error(err) } + if err := tc.VerifyWantEvent(recorder); err != nil { + t.Error(err) + } + for _, av := range tc.AdditionalVerification { av(t, tc) } @@ -137,6 +148,11 @@ func (tc *TestCase) GetClient() *MockClient { return NewMockClient(innerClient, tc.Mocks) } +// GetEventRecorder returns the mockEventRecorder to use for this test case. +func (tc *TestCase) GetEventRecorder() *MockEventRecorder { + return NewEventRecorder() +} + // Reconcile calls the given reconciler's Reconcile() function with the test // case's reconcile request. func (tc *TestCase) Reconcile(r reconcile.Reconciler) (reconcile.Result, error) { @@ -265,6 +281,23 @@ func (tc *TestCase) VerifyWantAbsent(c client.Client) error { return nil } +// VerifyWantEvent verifies that the eventRecorder does contain the events +// expected in the same order as they were emitted after reconciliation. 
+func (tc *TestCase) VerifyWantEvent(eventRecorder *MockEventRecorder) error { + if !reflect.DeepEqual(tc.WantEvent, eventRecorder.events) { + return fmt.Errorf("expected %s, got %s", getEventsAsString(tc.WantEvent), getEventsAsString(eventRecorder.events)) + } + return nil +} + +func getEventsAsString(events []corev1.Event) []string { + eventsAsString := make([]string, 0, len(events)) + for _, event := range events { + eventsAsString = append(eventsAsString, fmt.Sprintf("(%s,%s)", event.Reason, event.Type)) + } + return eventsAsString +} + func buildAllObjects(objs []runtime.Object) []runtime.Object { builtObjs := []runtime.Object{} for _, obj := range objs { diff --git a/pkg/provisioners/gcppubsub/controller/channel/reconcile_test.go b/pkg/provisioners/gcppubsub/controller/channel/reconcile_test.go index 1435ac6534c..c50e8880dc2 100644 --- a/pkg/provisioners/gcppubsub/controller/channel/reconcile_test.go +++ b/pkg/provisioners/gcppubsub/controller/channel/reconcile_test.go @@ -43,7 +43,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" ) @@ -693,9 +692,10 @@ func TestReconcile(t *testing.T) { WantErrMsg: testErrorMessage, }, } - recorder := record.NewBroadcaster().NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) + for _, tc := range testCases { c := tc.GetClient() + recorder := tc.GetEventRecorder() r := &reconciler{ client: c, recorder: recorder, @@ -710,7 +710,7 @@ func TestReconcile(t *testing.T) { tc.ReconcileKey = fmt.Sprintf("/%s", cName) } tc.IgnoreTimes = true - t.Run(tc.Name, tc.Runner(t, r, c)) + t.Run(tc.Name, tc.Runner(t, r, c, recorder)) } } diff --git a/pkg/provisioners/gcppubsub/controller/clusterchannelprovisioner/reconcile_test.go b/pkg/provisioners/gcppubsub/controller/clusterchannelprovisioner/reconcile_test.go 
index c8f867ca85d..fdb33a8d6e2 100644 --- a/pkg/provisioners/gcppubsub/controller/clusterchannelprovisioner/reconcile_test.go +++ b/pkg/provisioners/gcppubsub/controller/clusterchannelprovisioner/reconcile_test.go @@ -30,7 +30,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" ) @@ -184,9 +183,10 @@ func TestReconcile(t *testing.T) { WantErrMsg: testErrorMessage, }, } - recorder := record.NewBroadcaster().NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) + for _, tc := range testCases { c := tc.GetClient() + recorder := tc.GetEventRecorder() r := &reconciler{ client: c, recorder: recorder, @@ -196,7 +196,7 @@ func TestReconcile(t *testing.T) { tc.ReconcileKey = fmt.Sprintf("/%s", Name) } tc.IgnoreTimes = true - t.Run(tc.Name, tc.Runner(t, r, c)) + t.Run(tc.Name, tc.Runner(t, r, c, recorder)) } } diff --git a/pkg/provisioners/gcppubsub/dispatcher/dispatcher/reconcile_test.go b/pkg/provisioners/gcppubsub/dispatcher/dispatcher/reconcile_test.go index 79420cd335e..491838d4367 100644 --- a/pkg/provisioners/gcppubsub/dispatcher/dispatcher/reconcile_test.go +++ b/pkg/provisioners/gcppubsub/dispatcher/dispatcher/reconcile_test.go @@ -45,7 +45,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" ) @@ -338,9 +337,10 @@ func TestReconcile(t *testing.T) { // Note - we do not test update status since this dispatcher only adds // finalizers to the channel } - recorder := record.NewBroadcaster().NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) + for _, tc := range testCases { c := tc.GetClient() + recorder := 
tc.GetEventRecorder() r := &reconciler{ client: c, recorder: recorder, @@ -383,7 +383,7 @@ func TestReconcile(t *testing.T) { } tc.AdditionalVerification = append(tc.AdditionalVerification, cc.verify) tc.IgnoreTimes = true - t.Run(tc.Name, tc.Runner(t, r, c)) + t.Run(tc.Name, tc.Runner(t, r, c, recorder)) } } diff --git a/pkg/provisioners/natss/controller/channel/reconcile_test.go b/pkg/provisioners/natss/controller/channel/reconcile_test.go index c245e28ba24..0dc4b341e24 100644 --- a/pkg/provisioners/natss/controller/channel/reconcile_test.go +++ b/pkg/provisioners/natss/controller/channel/reconcile_test.go @@ -30,7 +30,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -115,10 +114,10 @@ var testCases = []controllertesting.TestCase{ } func TestAllCases(t *testing.T) { - recorder := record.NewBroadcaster().NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) for _, tc := range testCases { c := tc.GetClient() + recorder := tc.GetEventRecorder() logger := provisioners.NewProvisionerLoggerFromConfig(provisioners.NewLoggingConfig()) r := &reconciler{ client: c, @@ -126,7 +125,7 @@ func TestAllCases(t *testing.T) { logger: logger.Desugar(), } t.Logf("Running test %s", tc.Name) - t.Run(tc.Name, tc.Runner(t, r, c)) + t.Run(tc.Name, tc.Runner(t, r, c, recorder)) } } diff --git a/pkg/provisioners/natss/controller/clusterchannelprovisioner/reconcile_test.go b/pkg/provisioners/natss/controller/clusterchannelprovisioner/reconcile_test.go index 95882ea60e4..fe6e4d4a8bd 100644 --- a/pkg/provisioners/natss/controller/clusterchannelprovisioner/reconcile_test.go +++ b/pkg/provisioners/natss/controller/clusterchannelprovisioner/reconcile_test.go @@ -31,7 +31,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/intstr" 
"k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -154,10 +153,10 @@ var testCases = []controllertesting.TestCase{ } func TestAllCases(t *testing.T) { - recorder := record.NewBroadcaster().NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) for _, tc := range testCases { c := tc.GetClient() + recorder := tc.GetEventRecorder() logger := provisioners.NewProvisionerLoggerFromConfig(provisioners.NewLoggingConfig()) r := &reconciler{ client: c, @@ -166,7 +165,7 @@ func TestAllCases(t *testing.T) { } tc.IgnoreTimes = true t.Logf("Running test %s", tc.Name) - t.Run(tc.Name, tc.Runner(t, r, c)) + t.Run(tc.Name, tc.Runner(t, r, c, recorder)) } }