Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emitting corev1 Events for debugging purposes #746

Merged
merged 12 commits into from
Jan 25, 2019
Merged
6 changes: 6 additions & 0 deletions config/provisioners/in-memory-channel/in-memory-channel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ rules:
- watch
- create
- update
- apiGroups:
- "" # Core API Group.
resources:
- events
verbs:
nachocano marked this conversation as resolved.
Show resolved Hide resolved
- create

---

Expand Down
5 changes: 2 additions & 3 deletions contrib/kafka/pkg/controller/channel/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand Down Expand Up @@ -223,13 +222,13 @@ var testCases = []controllertesting.TestCase{
}

func TestAllCases(t *testing.T) {
recorder := record.NewBroadcaster().NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

for _, tc := range testCases {
tc.ReconcileKey = fmt.Sprintf("%s/%s", testNS, channelName)
tc.IgnoreTimes = true

c := tc.GetClient()
recorder := tc.GetEventRecorder()
logger := provisioners.NewProvisionerLoggerFromConfig(provisioners.NewLoggingConfig())
r := &reconciler{
client: c,
Expand All @@ -239,7 +238,7 @@ func TestAllCases(t *testing.T) {
kafkaClusterAdmin: &mockClusterAdmin{},
}
t.Logf("Running test %s", tc.Name)
t.Run(tc.Name, tc.Runner(t, r, c))
t.Run(tc.Name, tc.Runner(t, r, c, recorder))
}
}

Expand Down
5 changes: 2 additions & 3 deletions contrib/kafka/pkg/controller/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
Expand Down Expand Up @@ -132,10 +131,10 @@ var testCases = []controllertesting.TestCase{
}

func TestAllCases(t *testing.T) {
recorder := record.NewBroadcaster().NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

for _, tc := range testCases {
c := tc.GetClient()
recorder := tc.GetEventRecorder()
logger := provisioners.NewProvisionerLoggerFromConfig(provisioners.NewLoggingConfig())
r := &reconciler{
client: c,
Expand All @@ -144,7 +143,7 @@ func TestAllCases(t *testing.T) {
config: getControllerConfig(),
}
t.Logf("Running test %s", tc.Name)
t.Run(tc.Name, tc.Runner(t, r, c))
t.Run(tc.Name, tc.Runner(t, r, c, recorder))
}
}

Expand Down
16 changes: 15 additions & 1 deletion pkg/controller/eventing/inmemory/channel/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ import (

const (
finalizerName = controllerAgentName

// Name of the corev1.Events emitted from the reconciliation process
channelReconciled = "ChannelReconciled"
channelUpdateStatusFailed = "ChannelUpdateStatusFailed"
channelConfigSyncFailed = "ChannelConfigSyncFailed"
k8sServiceCreateFailed = "K8sServiceCreateFailed"
virtualServiceCreateFailed = "VirtualServiceCreateFailed"
)

type reconciler struct {
Expand Down Expand Up @@ -93,10 +100,14 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err
logger.Info("Error reconciling Channel", zap.Error(err))
// Note that we do not return the error here, because we want to update the Status
// regardless of the error.
} else {
logger.Info("Channel reconciled")
r.recorder.Eventf(c, corev1.EventTypeNormal, channelReconciled, "Channel reconciled: %q", c.Name)
}

if updateStatusErr := util.UpdateChannel(ctx, r.client, c); updateStatusErr != nil {
logger.Info("Error updating Channel Status", zap.Error(updateStatusErr))
r.recorder.Eventf(c, corev1.EventTypeWarning, channelUpdateStatusFailed, "Failed to update Channel's status: %v", err)
return reconcile.Result{}, updateStatusErr
}

Expand Down Expand Up @@ -124,7 +135,8 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel)

// We always need to sync the Channel config, so do it first.
if err := r.syncChannelConfig(ctx); err != nil {
logger.Info("Error updating syncing the Channel config", zap.Error(err))
logger.Info("Error syncing the Channel config", zap.Error(err))
r.recorder.Eventf(c, corev1.EventTypeWarning, channelConfigSyncFailed, "Failed to sync Channel config: %v", err)
return err
}

Expand All @@ -140,13 +152,15 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel)
svc, err := util.CreateK8sService(ctx, r.client, c)
if err != nil {
logger.Info("Error creating the Channel's K8s Service", zap.Error(err))
r.recorder.Eventf(c, corev1.EventTypeWarning, k8sServiceCreateFailed, "Failed to reconcile Channel's K8s Service: %v", err)
return err
}
c.Status.SetAddress(controller.ServiceHostName(svc.Name, svc.Namespace))

_, err = util.CreateVirtualService(ctx, r.client, c, svc)
if err != nil {
logger.Info("Error creating the Virtual Service for the Channel", zap.Error(err))
r.recorder.Eventf(c, corev1.EventTypeWarning, virtualServiceCreateFailed, "Failed to reconcile Virtual Service for the Channel: %v", err)
return err
}

Expand Down
60 changes: 57 additions & 3 deletions pkg/controller/eventing/inmemory/channel/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)
Expand Down Expand Up @@ -178,6 +177,15 @@ var (
},
},
}

// map of events to set test cases' expectations easier
events = map[string]corev1.Event{
channelReconciled: {Reason: channelReconciled, Type: corev1.EventTypeNormal},
channelUpdateStatusFailed: {Reason: channelUpdateStatusFailed, Type: corev1.EventTypeWarning},
channelConfigSyncFailed: {Reason: channelConfigSyncFailed, Type: corev1.EventTypeWarning},
k8sServiceCreateFailed: {Reason: k8sServiceCreateFailed, Type: corev1.EventTypeWarning},
virtualServiceCreateFailed: {Reason: virtualServiceCreateFailed, Type: corev1.EventTypeWarning},
}
)

func init() {
Expand Down Expand Up @@ -252,6 +260,9 @@ func TestReconcile(t *testing.T) {
makeDeletingChannel(),
},
WantErrMsg: testErrorMessage,
WantEvent: []corev1.Event{
events[channelConfigSyncFailed],
},
},
{
Name: "Channel deleted - finalizer removed",
Expand All @@ -261,6 +272,9 @@ func TestReconcile(t *testing.T) {
WantPresent: []runtime.Object{
makeDeletingChannelWithoutFinalizer(),
},
WantEvent: []corev1.Event{
events[channelReconciled],
},
},
{
Name: "Channel config sync fails - can't list Channels",
Expand All @@ -271,6 +285,9 @@ func TestReconcile(t *testing.T) {
MockLists: errorListingChannels(),
},
WantErrMsg: testErrorMessage,
WantEvent: []corev1.Event{
events[channelConfigSyncFailed],
},
},
{
Name: "Channel config sync fails - can't get ConfigMap",
Expand All @@ -281,6 +298,9 @@ func TestReconcile(t *testing.T) {
MockGets: errorGettingConfigMap(),
},
WantErrMsg: testErrorMessage,
WantEvent: []corev1.Event{
events[channelConfigSyncFailed],
},
},
{
Name: "Channel config sync fails - can't create ConfigMap",
Expand All @@ -291,6 +311,9 @@ func TestReconcile(t *testing.T) {
MockCreates: errorCreatingConfigMap(),
},
WantErrMsg: testErrorMessage,
WantEvent: []corev1.Event{
events[channelConfigSyncFailed],
},
},
{
Name: "Channel config sync fails - can't update ConfigMap",
Expand All @@ -302,6 +325,9 @@ func TestReconcile(t *testing.T) {
MockUpdates: errorUpdatingConfigMap(),
},
WantErrMsg: testErrorMessage,
WantEvent: []corev1.Event{
events[channelConfigSyncFailed],
},
},
{
Name: "K8s service get fails",
Expand All @@ -316,6 +342,9 @@ func TestReconcile(t *testing.T) {
makeChannelWithFinalizer(),
},
WantErrMsg: testErrorMessage,
WantEvent: []corev1.Event{
events[k8sServiceCreateFailed],
},
},
{
Name: "K8s service creation fails",
Expand All @@ -331,6 +360,9 @@ func TestReconcile(t *testing.T) {
makeChannelWithFinalizer(),
},
WantErrMsg: testErrorMessage,
WantEvent: []corev1.Event{
events[k8sServiceCreateFailed],
},
},
{
Name: "Virtual service get fails",
Expand All @@ -349,6 +381,9 @@ func TestReconcile(t *testing.T) {
makeChannelWithFinalizerAndAddress(),
},
WantErrMsg: testErrorMessage,
WantEvent: []corev1.Event{
events[virtualServiceCreateFailed],
},
},
{
Name: "Virtual service creation fails",
Expand All @@ -366,6 +401,9 @@ func TestReconcile(t *testing.T) {
makeChannelWithFinalizerAndAddress(),
},
WantErrMsg: testErrorMessage,
WantEvent: []corev1.Event{
events[virtualServiceCreateFailed],
},
},
{
Name: "Channel get for update fails",
Expand All @@ -379,6 +417,9 @@ func TestReconcile(t *testing.T) {
MockGets: errorOnSecondChannelGet(),
},
WantErrMsg: testErrorMessage,
WantEvent: []corev1.Event{
events[channelReconciled], events[channelUpdateStatusFailed],
},
},
{
Name: "Channel update fails",
Expand All @@ -392,6 +433,9 @@ func TestReconcile(t *testing.T) {
MockUpdates: errorUpdatingChannel(),
},
WantErrMsg: testErrorMessage,
WantEvent: []corev1.Event{
events[channelReconciled], events[channelUpdateStatusFailed],
},
}, {
Name: "Channel status update fails",
InitialState: []runtime.Object{
Expand All @@ -404,6 +448,9 @@ func TestReconcile(t *testing.T) {
MockStatusUpdates: errorUpdatingChannelStatus(),
},
WantErrMsg: testErrorMessage,
WantEvent: []corev1.Event{
events[channelReconciled], events[channelUpdateStatusFailed],
},
}, {
Name: "Channel reconcile successful - Channel list follows pagination",
InitialState: []runtime.Object{
Expand All @@ -424,6 +471,9 @@ func TestReconcile(t *testing.T) {
makeVirtualService(),
makeConfigMapWithVerifyConfigMapData(),
},
WantEvent: []corev1.Event{
events[channelReconciled],
},
},
{
Name: "Channel reconcile successful - Channel has no subscribers",
Expand Down Expand Up @@ -464,15 +514,19 @@ func TestReconcile(t *testing.T) {
makeVirtualService(),
makeConfigMapWithVerifyConfigMapData(),
},
WantEvent: []corev1.Event{
events[channelReconciled],
},
},
}
recorder := record.NewBroadcaster().NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

for _, tc := range testCases {
configMapKey := types.NamespacedName{
Namespace: cmNamespace,
Name: cmName,
}
c := tc.GetClient()
recorder := tc.GetEventRecorder()
r := &reconciler{
client: c,
recorder: recorder,
Expand All @@ -483,7 +537,7 @@ func TestReconcile(t *testing.T) {
tc.ReconcileKey = fmt.Sprintf("/%s", cName)
}
tc.IgnoreTimes = true
t.Run(tc.Name, tc.Runner(t, r, c))
t.Run(tc.Name, tc.Runner(t, r, c, recorder))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ const (

// Channel is the name of the Channel resource in eventing.knative.dev/v1alpha1.
Channel = "Channel"

// Name of the corev1.Events emitted from the reconciliation process
ccpReconciled = "CcpReconciled"
ccpUpdateStatusFailed = "CcpUpdateStatusFailed"
k8sServiceCreateFailed = "K8sServiceCreateFailed"
k8sServiceDeleteFailed = "K8sServiceDeleteFailed"
)

type reconciler struct {
Expand Down Expand Up @@ -98,10 +104,14 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err
logger.Info("Error reconciling ClusterChannelProvisioner", zap.Error(err))
// Note that we do not return the error here, because we want to update the Status
// regardless of the error.
} else {
logger.Info("ClusterChannelProvisioner reconciled")
r.recorder.Eventf(ccp, corev1.EventTypeNormal, ccpReconciled, "ClusterChannelProvisioner reconciled: %q", ccp.Name)
}

if updateStatusErr := util.UpdateClusterChannelProvisionerStatus(ctx, r.client, ccp); updateStatusErr != nil {
logger.Info("Error updating ClusterChannelProvisioner Status", zap.Error(updateStatusErr))
r.recorder.Eventf(ccp, corev1.EventTypeWarning, ccpUpdateStatusFailed, "Failed to update ClusterChannelProvisioner's status: %v", err)
return reconcile.Result{}, updateStatusErr
}

Expand Down Expand Up @@ -140,6 +150,7 @@ func (r *reconciler) reconcile(ctx context.Context, ccp *eventingv1alpha1.Cluste

if err != nil {
logger.Info("Error creating the ClusterChannelProvisioner's K8s Service", zap.Error(err))
r.recorder.Eventf(ccp, corev1.EventTypeWarning, k8sServiceCreateFailed, "Failed to reconcile ClusterChannelProvisioner's K8s Service: %v", err)
return err
}

Expand All @@ -153,6 +164,7 @@ func (r *reconciler) reconcile(ctx context.Context, ccp *eventingv1alpha1.Cluste
err = r.deleteOldDispatcherService(ctx, ccp)
if err != nil {
logger.Info("Error deleting the old ClusterChannelProvisioner's K8s Service", zap.Error(err))
r.recorder.Eventf(ccp, corev1.EventTypeWarning, k8sServiceDeleteFailed, "Failed to delete the old ClusterChannelProvisioner's K8s Service: %v", err)
return err
}

Expand Down
Loading