Skip to content

Commit

Permalink
Emitting corev1 Events for debugging purposes (#746)
Browse files Browse the repository at this point in the history
* Adding corev1.Events to in-memory-channel and subscription objects for
easier debugging.

* Adding mock eventRecorder

* Updating test cases. Removing unnecessary events on the success
condition

* Fixing UTs. Adding eventRecorder mock to tc.Runner invocations

* Changes after code review

* Boilerplate back to 2018. Should change it in another PR

* Setting initial capacity of slice

* Adding path verb to in-memory-channel-controller

* Adding update verb for events to in-memory-channel
  • Loading branch information
nachocano authored and knative-prow-robot committed Jan 25, 2019
1 parent ae89f61 commit 030370b
Show file tree
Hide file tree
Showing 16 changed files with 343 additions and 45 deletions.
8 changes: 8 additions & 0 deletions config/provisioners/in-memory-channel/in-memory-channel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,14 @@ rules:
- watch
- create
- update
- apiGroups:
- "" # Core API Group.
resources:
- events
verbs:
- create
- patch
- update

---

Expand Down
5 changes: 2 additions & 3 deletions contrib/kafka/pkg/controller/channel/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand Down Expand Up @@ -223,13 +222,13 @@ var testCases = []controllertesting.TestCase{
}

func TestAllCases(t *testing.T) {
recorder := record.NewBroadcaster().NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

for _, tc := range testCases {
tc.ReconcileKey = fmt.Sprintf("%s/%s", testNS, channelName)
tc.IgnoreTimes = true

c := tc.GetClient()
recorder := tc.GetEventRecorder()
logger := provisioners.NewProvisionerLoggerFromConfig(provisioners.NewLoggingConfig())
r := &reconciler{
client: c,
Expand All @@ -239,7 +238,7 @@ func TestAllCases(t *testing.T) {
kafkaClusterAdmin: &mockClusterAdmin{},
}
t.Logf("Running test %s", tc.Name)
t.Run(tc.Name, tc.Runner(t, r, c))
t.Run(tc.Name, tc.Runner(t, r, c, recorder))
}
}

Expand Down
5 changes: 2 additions & 3 deletions contrib/kafka/pkg/controller/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
Expand Down Expand Up @@ -132,10 +131,10 @@ var testCases = []controllertesting.TestCase{
}

func TestAllCases(t *testing.T) {
recorder := record.NewBroadcaster().NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

for _, tc := range testCases {
c := tc.GetClient()
recorder := tc.GetEventRecorder()
logger := provisioners.NewProvisionerLoggerFromConfig(provisioners.NewLoggingConfig())
r := &reconciler{
client: c,
Expand All @@ -144,7 +143,7 @@ func TestAllCases(t *testing.T) {
config: getControllerConfig(),
}
t.Logf("Running test %s", tc.Name)
t.Run(tc.Name, tc.Runner(t, r, c))
t.Run(tc.Name, tc.Runner(t, r, c, recorder))
}
}

Expand Down
16 changes: 15 additions & 1 deletion pkg/controller/eventing/inmemory/channel/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ import (

const (
finalizerName = controllerAgentName

// Name of the corev1.Events emitted from the reconciliation process
channelReconciled = "ChannelReconciled"
channelUpdateStatusFailed = "ChannelUpdateStatusFailed"
channelConfigSyncFailed = "ChannelConfigSyncFailed"
k8sServiceCreateFailed = "K8sServiceCreateFailed"
virtualServiceCreateFailed = "VirtualServiceCreateFailed"
)

type reconciler struct {
Expand Down Expand Up @@ -93,10 +100,14 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err
logger.Info("Error reconciling Channel", zap.Error(err))
// Note that we do not return the error here, because we want to update the Status
// regardless of the error.
} else {
logger.Info("Channel reconciled")
r.recorder.Eventf(c, corev1.EventTypeNormal, channelReconciled, "Channel reconciled: %q", c.Name)
}

if updateStatusErr := util.UpdateChannel(ctx, r.client, c); updateStatusErr != nil {
logger.Info("Error updating Channel Status", zap.Error(updateStatusErr))
r.recorder.Eventf(c, corev1.EventTypeWarning, channelUpdateStatusFailed, "Failed to update Channel's status: %v", err)
return reconcile.Result{}, updateStatusErr
}

Expand Down Expand Up @@ -124,7 +135,8 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel)

// We always need to sync the Channel config, so do it first.
if err := r.syncChannelConfig(ctx); err != nil {
logger.Info("Error updating syncing the Channel config", zap.Error(err))
logger.Info("Error syncing the Channel config", zap.Error(err))
r.recorder.Eventf(c, corev1.EventTypeWarning, channelConfigSyncFailed, "Failed to sync Channel config: %v", err)
return err
}

Expand All @@ -140,13 +152,15 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel)
svc, err := util.CreateK8sService(ctx, r.client, c)
if err != nil {
logger.Info("Error creating the Channel's K8s Service", zap.Error(err))
r.recorder.Eventf(c, corev1.EventTypeWarning, k8sServiceCreateFailed, "Failed to reconcile Channel's K8s Service: %v", err)
return err
}
c.Status.SetAddress(controller.ServiceHostName(svc.Name, svc.Namespace))

_, err = util.CreateVirtualService(ctx, r.client, c, svc)
if err != nil {
logger.Info("Error creating the Virtual Service for the Channel", zap.Error(err))
r.recorder.Eventf(c, corev1.EventTypeWarning, virtualServiceCreateFailed, "Failed to reconcile Virtual Service for the Channel: %v", err)
return err
}

Expand Down
60 changes: 57 additions & 3 deletions pkg/controller/eventing/inmemory/channel/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)
Expand Down Expand Up @@ -178,6 +177,15 @@ var (
},
},
}

// map of events to set test cases' expectations easier
events = map[string]corev1.Event{
channelReconciled: {Reason: channelReconciled, Type: corev1.EventTypeNormal},
channelUpdateStatusFailed: {Reason: channelUpdateStatusFailed, Type: corev1.EventTypeWarning},
channelConfigSyncFailed: {Reason: channelConfigSyncFailed, Type: corev1.EventTypeWarning},
k8sServiceCreateFailed: {Reason: k8sServiceCreateFailed, Type: corev1.EventTypeWarning},
virtualServiceCreateFailed: {Reason: virtualServiceCreateFailed, Type: corev1.EventTypeWarning},
}
)

func init() {
Expand Down Expand Up @@ -252,6 +260,9 @@ func TestReconcile(t *testing.T) {
makeDeletingChannel(),
},
WantErrMsg: testErrorMessage,
WantEvent: []corev1.Event{
events[channelConfigSyncFailed],
},
},
{
Name: "Channel deleted - finalizer removed",
Expand All @@ -261,6 +272,9 @@ func TestReconcile(t *testing.T) {
WantPresent: []runtime.Object{
makeDeletingChannelWithoutFinalizer(),
},
WantEvent: []corev1.Event{
events[channelReconciled],
},
},
{
Name: "Channel config sync fails - can't list Channels",
Expand All @@ -271,6 +285,9 @@ func TestReconcile(t *testing.T) {
MockLists: errorListingChannels(),
},
WantErrMsg: testErrorMessage,
WantEvent: []corev1.Event{
events[channelConfigSyncFailed],
},
},
{
Name: "Channel config sync fails - can't get ConfigMap",
Expand All @@ -281,6 +298,9 @@ func TestReconcile(t *testing.T) {
MockGets: errorGettingConfigMap(),
},
WantErrMsg: testErrorMessage,
WantEvent: []corev1.Event{
events[channelConfigSyncFailed],
},
},
{
Name: "Channel config sync fails - can't create ConfigMap",
Expand All @@ -291,6 +311,9 @@ func TestReconcile(t *testing.T) {
MockCreates: errorCreatingConfigMap(),
},
WantErrMsg: testErrorMessage,
WantEvent: []corev1.Event{
events[channelConfigSyncFailed],
},
},
{
Name: "Channel config sync fails - can't update ConfigMap",
Expand All @@ -302,6 +325,9 @@ func TestReconcile(t *testing.T) {
MockUpdates: errorUpdatingConfigMap(),
},
WantErrMsg: testErrorMessage,
WantEvent: []corev1.Event{
events[channelConfigSyncFailed],
},
},
{
Name: "K8s service get fails",
Expand All @@ -316,6 +342,9 @@ func TestReconcile(t *testing.T) {
makeChannelWithFinalizer(),
},
WantErrMsg: testErrorMessage,
WantEvent: []corev1.Event{
events[k8sServiceCreateFailed],
},
},
{
Name: "K8s service creation fails",
Expand All @@ -331,6 +360,9 @@ func TestReconcile(t *testing.T) {
makeChannelWithFinalizer(),
},
WantErrMsg: testErrorMessage,
WantEvent: []corev1.Event{
events[k8sServiceCreateFailed],
},
},
{
Name: "Virtual service get fails",
Expand All @@ -349,6 +381,9 @@ func TestReconcile(t *testing.T) {
makeChannelWithFinalizerAndAddress(),
},
WantErrMsg: testErrorMessage,
WantEvent: []corev1.Event{
events[virtualServiceCreateFailed],
},
},
{
Name: "Virtual service creation fails",
Expand All @@ -366,6 +401,9 @@ func TestReconcile(t *testing.T) {
makeChannelWithFinalizerAndAddress(),
},
WantErrMsg: testErrorMessage,
WantEvent: []corev1.Event{
events[virtualServiceCreateFailed],
},
},
{
Name: "Channel get for update fails",
Expand All @@ -379,6 +417,9 @@ func TestReconcile(t *testing.T) {
MockGets: errorOnSecondChannelGet(),
},
WantErrMsg: testErrorMessage,
WantEvent: []corev1.Event{
events[channelReconciled], events[channelUpdateStatusFailed],
},
},
{
Name: "Channel update fails",
Expand All @@ -392,6 +433,9 @@ func TestReconcile(t *testing.T) {
MockUpdates: errorUpdatingChannel(),
},
WantErrMsg: testErrorMessage,
WantEvent: []corev1.Event{
events[channelReconciled], events[channelUpdateStatusFailed],
},
}, {
Name: "Channel status update fails",
InitialState: []runtime.Object{
Expand All @@ -404,6 +448,9 @@ func TestReconcile(t *testing.T) {
MockStatusUpdates: errorUpdatingChannelStatus(),
},
WantErrMsg: testErrorMessage,
WantEvent: []corev1.Event{
events[channelReconciled], events[channelUpdateStatusFailed],
},
}, {
Name: "Channel reconcile successful - Channel list follows pagination",
InitialState: []runtime.Object{
Expand All @@ -424,6 +471,9 @@ func TestReconcile(t *testing.T) {
makeVirtualService(),
makeConfigMapWithVerifyConfigMapData(),
},
WantEvent: []corev1.Event{
events[channelReconciled],
},
},
{
Name: "Channel reconcile successful - Channel has no subscribers",
Expand Down Expand Up @@ -464,15 +514,19 @@ func TestReconcile(t *testing.T) {
makeVirtualService(),
makeConfigMapWithVerifyConfigMapData(),
},
WantEvent: []corev1.Event{
events[channelReconciled],
},
},
}
recorder := record.NewBroadcaster().NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

for _, tc := range testCases {
configMapKey := types.NamespacedName{
Namespace: cmNamespace,
Name: cmName,
}
c := tc.GetClient()
recorder := tc.GetEventRecorder()
r := &reconciler{
client: c,
recorder: recorder,
Expand All @@ -483,7 +537,7 @@ func TestReconcile(t *testing.T) {
tc.ReconcileKey = fmt.Sprintf("/%s", cName)
}
tc.IgnoreTimes = true
t.Run(tc.Name, tc.Runner(t, r, c))
t.Run(tc.Name, tc.Runner(t, r, c, recorder))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ const (

// Channel is the name of the Channel resource in eventing.knative.dev/v1alpha1.
Channel = "Channel"

// Name of the corev1.Events emitted from the reconciliation process
ccpReconciled = "CcpReconciled"
ccpUpdateStatusFailed = "CcpUpdateStatusFailed"
k8sServiceCreateFailed = "K8sServiceCreateFailed"
k8sServiceDeleteFailed = "K8sServiceDeleteFailed"
)

type reconciler struct {
Expand Down Expand Up @@ -98,10 +104,14 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err
logger.Info("Error reconciling ClusterChannelProvisioner", zap.Error(err))
// Note that we do not return the error here, because we want to update the Status
// regardless of the error.
} else {
logger.Info("ClusterChannelProvisioner reconciled")
r.recorder.Eventf(ccp, corev1.EventTypeNormal, ccpReconciled, "ClusterChannelProvisioner reconciled: %q", ccp.Name)
}

if updateStatusErr := util.UpdateClusterChannelProvisionerStatus(ctx, r.client, ccp); updateStatusErr != nil {
logger.Info("Error updating ClusterChannelProvisioner Status", zap.Error(updateStatusErr))
r.recorder.Eventf(ccp, corev1.EventTypeWarning, ccpUpdateStatusFailed, "Failed to update ClusterChannelProvisioner's status: %v", err)
return reconcile.Result{}, updateStatusErr
}

Expand Down Expand Up @@ -140,6 +150,7 @@ func (r *reconciler) reconcile(ctx context.Context, ccp *eventingv1alpha1.Cluste

if err != nil {
logger.Info("Error creating the ClusterChannelProvisioner's K8s Service", zap.Error(err))
r.recorder.Eventf(ccp, corev1.EventTypeWarning, k8sServiceCreateFailed, "Failed to reconcile ClusterChannelProvisioner's K8s Service: %v", err)
return err
}

Expand All @@ -153,6 +164,7 @@ func (r *reconciler) reconcile(ctx context.Context, ccp *eventingv1alpha1.Cluste
err = r.deleteOldDispatcherService(ctx, ccp)
if err != nil {
logger.Info("Error deleting the old ClusterChannelProvisioner's K8s Service", zap.Error(err))
r.recorder.Eventf(ccp, corev1.EventTypeWarning, k8sServiceDeleteFailed, "Failed to delete the old ClusterChannelProvisioner's K8s Service: %v", err)
return err
}

Expand Down
Loading

0 comments on commit 030370b

Please sign in to comment.