diff --git a/config/config-defaults.yaml b/config/config-defaults.yaml index 1c23c72f20a..052f39c87a9 100644 --- a/config/config-defaults.yaml +++ b/config/config-defaults.yaml @@ -54,4 +54,11 @@ data: # default-pod-template contains the default pod template to use # TaskRun and PipelineRun, if none is specified. If a pod template # is specified, the default pod template is ignored. - # default-pod-template: \ No newline at end of file + # default-pod-template: + + # default-cloud-events-sink contains the default CloudEvents sink to be + # used for TaskRun and PipelineRun, when no sink is specified. + # Note that right now it is still not possible to set a PipelineRun or + # TaskRun specific sink, so the default is the only option available. + # If no sink is specified, no CloudEvent is generated + # default-cloud-events-sink: \ No newline at end of file diff --git a/docs/events.md b/docs/events.md index dcdb46748ee..19e4dd97fa6 100644 --- a/docs/events.md +++ b/docs/events.md @@ -8,8 +8,10 @@ weight: 2 Tekton runtime resources, specifically `TaskRuns` and `PipelineRuns`, emit events when they are executed, so that users can monitor their lifecycle -and react to it. Tekton emits [kubernetes events](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.18/#event-v1-core), that can be retrieve from the resource via -`kubectl describe [resource]`. +and react to it. + +Tekton emits [kubernetes events](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.18/#event-v1-core), that can be retrieve from the resource via `kubectl describe [resource]`. +[Optionally](#CloudEvents) Tekton can emit [CloudEvents](https://github.com/cloudevents/spec) too. No events are emitted for `Conditions` today (https://github.com/tektoncd/pipeline/issues/2461). @@ -47,3 +49,24 @@ No events are emitted for `Conditions` today (https://github.com/tektoncd/pipeli successfully. Causes of failure may be: one the `Tasks` failed or the `PipelineRun` was cancelled or timed out. `Failed` events are also triggered in case the `PipelineRun` cannot be executed at all because of validation issues. + +# CloudEvents + +When a sink is [configured](../install.md#configuring-cloudevents-notifications), the following events +will be generated by appropriate controller when a lifecycle event happens for `TaskRun` or +`PipelineRun`. + +The complete table of events: + +Reasource |Event |Event Type +:-------------|:-------:|:---------------------------------------------------------- +TaskRun | Started | dev.tekton.event.taskrun.started.v1 +TaskRun | Running | dev.tekton.event.taskrun.runnning.v1 +TaskRun | Condition Change while Running | dev.tekton.event.taskrun.unknown.v1 +TaskRun | Succeed | dev.tekton.event.taskrun.successful.v1 +TaskRun | Failed | dev.tekton.event.taskrun.failed.v1 +PipelineRun | Started | dev.tekton.event.pipelinerun.started.v1 +PipelineRun | Running | dev.tekton.event.pipelinerun.runnning.v1 +PipelineRun | Condition Change while Running | dev.tekton.event.pipelinerun.unknown.v1 +PipelineRun | Succeed | dev.tekton.event.pipelinerun.successful.v1 +PipelineRun | Failed | dev.tekton.event.pipelinerun.failed.v1 diff --git a/docs/install.md b/docs/install.md index 57d10df40fc..e455c91a05e 100644 --- a/docs/install.md +++ b/docs/install.md @@ -235,6 +235,25 @@ data: bucket.service.account.field.name: GOOGLE_APPLICATION_CREDENTIALS ``` +## Configuring CloudEvents notifications + +When configured so, Tekton can generate `CloudEvents` for `TaskRun` and `PipelineRun` lifecycle +events. The only configuration parameter is the URL of the sink. When not set, no notification is +generared. + +``` +apiVersion: v1 +kind: ConfigMap +metadata: + name: config-defaults + namespace: tekton-pipelines + labels: + app.kubernetes.io/instance: default + app.kubernetes.io/part-of: tekton-pipelines +data: + default-cloud-events-sink: https://my-sink-url +``` + ## Customizing basic execution parameters You can specify your own values that replace the default service account (`ServiceAccount`), timeout (`Timeout`), and Pod template (`PodTemplate`) values used by Tekton Pipelines in `TaskRun` and `PipelineRun` definitions. To do so, modify the ConfigMap `config-defaults` with your desired values. diff --git a/pkg/apis/config/default.go b/pkg/apis/config/default.go index 4f89ca4bd45..04b30a10a82 100644 --- a/pkg/apis/config/default.go +++ b/pkg/apis/config/default.go @@ -35,6 +35,8 @@ const ( defaultManagedByLabelValueKey = "default-managed-by-label-value" DefaultManagedByLabelValue = "tekton-pipelines" defaultPodTemplateKey = "default-pod-template" + defaultCloudEventsSinkKey = "default-cloud-events-sink" + DefaultCloudEventSinkValue = "" ) // Defaults holds the default configurations @@ -44,6 +46,7 @@ type Defaults struct { DefaultServiceAccount string DefaultManagedByLabelValue string DefaultPodTemplate *pod.Template + DefaultCloudEventsSink string } // GetBucketConfigName returns the name of the configmap containing all @@ -68,7 +71,8 @@ func (cfg *Defaults) Equals(other *Defaults) bool { return other.DefaultTimeoutMinutes == cfg.DefaultTimeoutMinutes && other.DefaultServiceAccount == cfg.DefaultServiceAccount && other.DefaultManagedByLabelValue == cfg.DefaultManagedByLabelValue && - other.DefaultPodTemplate.Equals(cfg.DefaultPodTemplate) + other.DefaultPodTemplate.Equals(cfg.DefaultPodTemplate) && + other.DefaultCloudEventsSink == cfg.DefaultCloudEventsSink } // NewDefaultsFromMap returns a Config given a map corresponding to a ConfigMap @@ -76,6 +80,7 @@ func NewDefaultsFromMap(cfgMap map[string]string) (*Defaults, error) { tc := Defaults{ DefaultTimeoutMinutes: DefaultTimeoutMinutes, DefaultManagedByLabelValue: DefaultManagedByLabelValue, + DefaultCloudEventsSink: DefaultCloudEventSinkValue, } if defaultTimeoutMin, ok := cfgMap[defaultTimeoutMinutesKey]; ok { @@ -102,6 +107,10 @@ func NewDefaultsFromMap(cfgMap map[string]string) (*Defaults, error) { tc.DefaultPodTemplate = &podTemplate } + if defaultCloudEventsSink, ok := cfgMap[defaultCloudEventsSinkKey]; ok { + tc.DefaultCloudEventsSink = defaultCloudEventsSink + } + return &tc, nil } diff --git a/pkg/reconciler/events/cloudevent/cloud_event_controller.go b/pkg/reconciler/events/cloudevent/cloud_event_controller.go index 523520273b5..8d6db1d13d2 100644 --- a/pkg/reconciler/events/cloudevent/cloud_event_controller.go +++ b/pkg/reconciler/events/cloudevent/cloud_event_controller.go @@ -18,6 +18,7 @@ package cloudevent import ( "context" + "errors" "time" cloudevents "github.com/cloudevents/sdk-go/v2" @@ -26,7 +27,11 @@ import ( resource "github.com/tektoncd/pipeline/pkg/apis/resource/v1alpha1" "github.com/tektoncd/pipeline/pkg/apis/resource/v1alpha1/cloudevent" "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + controller "knative.dev/pkg/controller" + "knative.dev/pkg/logging" ) // InitializeCloudEvents initializes the CloudEvents part of the @@ -108,3 +113,40 @@ func SendCloudEvents(tr *v1beta1.TaskRun, ceclient CEClient, logger *zap.Sugared } return merr.ErrorOrNil() } + +// SendCloudEventWithRetries sends a cloud event for the specified resource. +// It does not block and it perform retries with backoff using the cloudevents +// sdk-go capabilities. +// It accepts a runtime.Object to avoid making objectWithCondition public since +// it's only used within the events/cloudevents packages. +func SendCloudEventWithRetries(ctx context.Context, object runtime.Object) error { + var ( + o objectWithCondition + ok bool + ) + if o, ok = object.(objectWithCondition); !ok { + return errors.New("Input object does not satisfy objectWithCondition") + } + logger := logging.FromContext(ctx) + ceClient := Get(ctx) + if ceClient == nil { + return errors.New("No cloud events client found in the context") + } + event, err := EventForObjectWithCondition(o) + if err != nil { + return err + } + + go func() { + if result := ceClient.Send(cloudevents.ContextWithRetriesExponentialBackoff(ctx, 10*time.Millisecond, 10), *event); !cloudevents.IsACK(result) { + logger.Warnf("Failed to send cloudevent: %s", result.Error()) + recorder := controller.GetEventRecorder(ctx) + if recorder == nil { + logger.Warnf("No recorder in context, cannot emit error event") + } + recorder.Event(object, corev1.EventTypeWarning, "Cloud Event Failure", result.Error()) + } + }() + + return nil +} diff --git a/pkg/reconciler/events/cloudevent/cloud_event_controller_test.go b/pkg/reconciler/events/cloudevent/cloud_event_controller_test.go index df462fa8488..a237c767835 100644 --- a/pkg/reconciler/events/cloudevent/cloud_event_controller_test.go +++ b/pkg/reconciler/events/cloudevent/cloud_event_controller_test.go @@ -17,7 +17,11 @@ limitations under the License. package cloudevent import ( + "context" + "fmt" + "strings" "testing" + "time" "github.com/google/go-cmp/cmp" tb "github.com/tektoncd/pipeline/internal/builder/v1beta1" @@ -25,8 +29,14 @@ import ( resourcev1alpha1 "github.com/tektoncd/pipeline/pkg/apis/resource/v1alpha1" "github.com/tektoncd/pipeline/pkg/logging" "github.com/tektoncd/pipeline/test/diff" + "go.uber.org/zap" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/record" "knative.dev/pkg/apis" + duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1" + "knative.dev/pkg/controller" + rtesting "knative.dev/pkg/reconciler/testing" ) func TestCloudEventDeliveryFromTargets(t *testing.T) { @@ -283,3 +293,172 @@ func TestInitializeCloudEvents(t *testing.T) { }) } } + +func TestSendCloudEventWithRetries(t *testing.T) { + + objectStatus := duckv1beta1.Status{ + Conditions: []apis.Condition{{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionTrue, + }}, + } + + tests := []struct { + name string + clientBehaviour FakeClientBehaviour + object objectWithCondition + wantCEvent string + wantEvent string + }{{ + name: "test-send-cloud-event-taskrun", + clientBehaviour: FakeClientBehaviour{ + SendSuccessfully: true, + }, + object: &v1beta1.TaskRun{ + ObjectMeta: metav1.ObjectMeta{ + SelfLink: "/taskruns/test1", + }, + Status: v1beta1.TaskRunStatus{Status: objectStatus}, + }, + wantCEvent: "Validation: valid", + wantEvent: "", + }, { + name: "test-send-cloud-event-pipelinerun", + clientBehaviour: FakeClientBehaviour{ + SendSuccessfully: true, + }, + object: &v1beta1.PipelineRun{ + ObjectMeta: metav1.ObjectMeta{ + SelfLink: "/pipelineruns/test1", + }, + Status: v1beta1.PipelineRunStatus{Status: objectStatus}, + }, + wantCEvent: "Validation: valid", + wantEvent: "", + }, { + name: "test-send-cloud-event-failed", + clientBehaviour: FakeClientBehaviour{ + SendSuccessfully: false, + }, + object: &v1beta1.PipelineRun{ + Status: v1beta1.PipelineRunStatus{Status: objectStatus}, + }, + wantCEvent: "", + wantEvent: "Warning Cloud Event Failure", + }} + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctx := setupFakeContext(t, tc.clientBehaviour, true) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + err := SendCloudEventWithRetries(ctx, tc.object) + if err != nil { + t.Fatalf("Unexpected error sending cloud events: %v", err) + } + ceClient := Get(ctx).(FakeClient) + err = checkCloudEvents(t, &ceClient, tc.name, tc.wantCEvent) + if err != nil { + t.Fatalf(err.Error()) + } + recorder := controller.GetEventRecorder(ctx).(*record.FakeRecorder) + err = checkEvents(t, recorder, tc.name, tc.wantEvent) + if err != nil { + t.Fatalf(err.Error()) + } + }) + } +} + +func TestSendCloudEventWithRetriesInvalid(t *testing.T) { + + tests := []struct { + name string + object objectWithCondition + wantCEvent string + wantEvent string + }{{ + name: "test-send-cloud-event-invalid-taskrun", + object: &v1beta1.TaskRun{ + Status: v1beta1.TaskRunStatus{}, + }, + wantCEvent: "Validation: valid", + wantEvent: "", + }, { + name: "test-send-cloud-event-pipelinerun", + object: &v1beta1.PipelineRun{ + Status: v1beta1.PipelineRunStatus{}, + }, + wantCEvent: "Validation: valid", + wantEvent: "", + }} + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctx := setupFakeContext(t, FakeClientBehaviour{ + SendSuccessfully: true, + }, true) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + err := SendCloudEventWithRetries(ctx, tc.object) + if err == nil { + t.Fatalf("Expected an error sending cloud events for invalid object, got none") + } + }) + } +} + +func TestSendCloudEventWithRetriesNoClient(t *testing.T) { + + ctx := setupFakeContext(t, FakeClientBehaviour{}, false) + err := SendCloudEventWithRetries(ctx, &v1beta1.TaskRun{Status: v1beta1.TaskRunStatus{}}) + if err == nil { + t.Fatalf("Expected an error sending cloud events with no client in the context, got none") + } + if d := cmp.Diff("No cloud events client found in the context", err.Error()); d != "" { + t.Fatalf("Unexpected error message %s", diff.PrintWantGot(d)) + } +} + +func setupFakeContext(t *testing.T, behaviour FakeClientBehaviour, withClient bool) context.Context { + var ctx context.Context + ctx, _ = rtesting.SetupFakeContext(t) + if withClient { + ctx = WithClient(ctx, &behaviour) + } + return ctx +} + +func testLogger(t *testing.T) *zap.SugaredLogger { + logger, err := zap.NewDevelopment(zap.AddCaller()) + if err != nil { + t.Fatalf("failed to create logger: %s", err) + } + return logger.Sugar().Named(t.Name()) +} + +func eventFromChannel(c chan string, testName string, wantEvent string) error { + timer := time.NewTimer(1 * time.Second) + select { + case event := <-c: + if wantEvent == "" { + return fmt.Errorf("received event \"%s\" for %s but none expected", event, testName) + } + if !(strings.HasPrefix(event, wantEvent)) { + return fmt.Errorf("expected event \"%s\" but got \"%s\" instead for %s", wantEvent, event, testName) + } + case <-timer.C: + if wantEvent != "" { + return fmt.Errorf("received no events for %s but %s expected", testName, wantEvent) + } + } + return nil +} + +func checkEvents(t *testing.T, fr *record.FakeRecorder, testName string, wantEvent string) error { + t.Helper() + return eventFromChannel(fr.Events, testName, wantEvent) +} + +func checkCloudEvents(t *testing.T, fce *FakeClient, testName string, wantEvent string) error { + t.Helper() + return eventFromChannel(fce.Events, testName, wantEvent) +} diff --git a/pkg/reconciler/events/cloudevent/cloudevent.go b/pkg/reconciler/events/cloudevent/cloudevent.go index 16634741907..1bff4d4b852 100644 --- a/pkg/reconciler/events/cloudevent/cloudevent.go +++ b/pkg/reconciler/events/cloudevent/cloudevent.go @@ -133,6 +133,9 @@ func EventForPipelineRun(pipelineRun *v1beta1.PipelineRun) (*cloudevents.Event, func getEventType(runObject objectWithCondition) (*TektonEventType, error) { c := runObject.GetStatusCondition().GetCondition(apis.ConditionSucceeded) + if c == nil { + return nil, fmt.Errorf("no condition for ConditionSucceeded in %T", runObject) + } var eventType TektonEventType switch { case c.IsUnknown(): diff --git a/pkg/reconciler/events/cloudevent/cloudeventclient.go b/pkg/reconciler/events/cloudevent/cloudeventclient.go index 372f8db4ad5..1cd69b16bfc 100644 --- a/pkg/reconciler/events/cloudevent/cloudeventclient.go +++ b/pkg/reconciler/events/cloudevent/cloudeventclient.go @@ -34,7 +34,9 @@ type CECKey struct{} func withCloudEventClient(ctx context.Context, cfg *rest.Config) context.Context { logger := logging.FromContext(ctx) + p, err := cloudevents.NewHTTP() + if err != nil { logger.Panicf("Error creating the cloudevents http protocol: %s", err) return ctx @@ -52,8 +54,14 @@ func withCloudEventClient(ctx context.Context, cfg *rest.Config) context.Context func Get(ctx context.Context) CEClient { untyped := ctx.Value(CECKey{}) if untyped == nil { - logging.FromContext(ctx).Fatalf( - "Unable to fetch %T from context.", (CEClient)(nil)) + logging.FromContext(ctx).Errorf( + "Unable to fetch client from context.") + return nil } return untyped.(CEClient) } + +// ToContext adds the cloud events client to the context +func ToContext(ctx context.Context, cec CEClient) context.Context { + return context.WithValue(ctx, CECKey{}, cec) +} diff --git a/pkg/reconciler/events/cloudevent/cloudeventsfakeclient.go b/pkg/reconciler/events/cloudevent/cloudeventsfakeclient.go index a2665952b3d..252ba72be7e 100644 --- a/pkg/reconciler/events/cloudevent/cloudeventsfakeclient.go +++ b/pkg/reconciler/events/cloudevent/cloudeventsfakeclient.go @@ -24,44 +24,47 @@ import ( "github.com/cloudevents/sdk-go/v2/protocol" ) +const bufferSize = 100 + // FakeClientBehaviour defines how the client will behave type FakeClientBehaviour struct { SendSuccessfully bool } // FakeClient is a fake CloudEvent client for unit testing -// Holding a pointer to the behaviour allows to change the behaviour of a client +// Holding pointer to the behaviour allows to change the behaviour of a client type FakeClient struct { behaviour *FakeClientBehaviour - event cloudevents.Event + // Modelled after k8s.io/client-go fake recorder + Events chan string } // NewFakeClient is a FakeClient factory, it returns a client for the target func NewFakeClient(behaviour *FakeClientBehaviour) cloudevents.Client { - c := FakeClient{ + return FakeClient{ behaviour: behaviour, + Events: make(chan string, bufferSize), } - return c } var _ cloudevents.Client = (*FakeClient)(nil) // Send fakes the Send method from cloudevents.Client func (c FakeClient) Send(ctx context.Context, event cloudevents.Event) protocol.Result { - c.event = event if c.behaviour.SendSuccessfully { + c.Events <- fmt.Sprintf("%s", event.String()) return nil } - return fmt.Errorf("%s had to fail", event.ID()) + return fmt.Errorf("Had to fail. Event ID: %s", event.ID()) } // Request fakes the Request method from cloudevents.Client func (c FakeClient) Request(ctx context.Context, event cloudevents.Event) (*cloudevents.Event, protocol.Result) { - c.event = event if c.behaviour.SendSuccessfully { + c.Events <- fmt.Sprintf("%v", event.String()) return &event, nil } - return nil, fmt.Errorf("%s had to fail", event.ID()) + return nil, fmt.Errorf("Had to fail. Event ID: %s", event.ID()) } // StartReceiver fakes StartReceiver method from cloudevents.Client diff --git a/pkg/reconciler/events/cloudevent/interface.go b/pkg/reconciler/events/cloudevent/interface.go index de168fd9dbb..b58ad479d9b 100644 --- a/pkg/reconciler/events/cloudevent/interface.go +++ b/pkg/reconciler/events/cloudevent/interface.go @@ -18,12 +18,16 @@ package cloudevent import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "knative.dev/pkg/apis" ) // objectWithCondition is implemented by TaskRun and PipelineRun type objectWithCondition interface { + // Object requires GetObjectKind() and DeepCopyObject() + runtime.Object + // ObjectMetaAccessor requires a GetObjectMeta that returns the ObjectMeta metav1.ObjectMetaAccessor diff --git a/pkg/reconciler/events/event.go b/pkg/reconciler/events/event.go index 4571a3e54c7..54d0d1ede9c 100644 --- a/pkg/reconciler/events/event.go +++ b/pkg/reconciler/events/event.go @@ -16,11 +16,18 @@ limitations under the License. package events import ( + "context" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/tektoncd/pipeline/pkg/apis/config" + "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/record" "knative.dev/pkg/apis" + "knative.dev/pkg/controller" + "knative.dev/pkg/logging" ) const ( @@ -34,16 +41,39 @@ const ( EventReasonError = "Error" ) -// Emit emits an event for object if afterCondition is different from beforeCondition -// -// Status "ConditionUnknown": -// beforeCondition == nil, emit EventReasonStarted -// beforeCondition != nil, emit afterCondition.Reason -// -// Status "ConditionTrue": emit EventReasonSucceded -// Status "ConditionFalse": emit EventReasonFailed +// Emit emits events for object +// Two types of events are supported, k8s and cloud events. // -func Emit(c record.EventRecorder, beforeCondition *apis.Condition, afterCondition *apis.Condition, object runtime.Object) { +// k8s events are always sent if afterCondition is different from beforeCondition +// Cloud events are always sent if enabled, i.e. if a sink is available +func Emit(ctx context.Context, beforeCondition *apis.Condition, afterCondition *apis.Condition, object runtime.Object) { + recorder := controller.GetEventRecorder(ctx) + logger := logging.FromContext(ctx) + configs := config.FromContextOrDefaults(ctx) + sendCloudEvents := (configs.Defaults.DefaultCloudEventsSink != "") + if sendCloudEvents { + ctx = cloudevents.ContextWithTarget(ctx, configs.Defaults.DefaultCloudEventsSink) + } + + sendKubernetesEvents(recorder, beforeCondition, afterCondition, object) + + if sendCloudEvents { + err := cloudevent.SendCloudEventWithRetries(ctx, object) + if err != nil { + logger.Warnf("Failed to emit cloud events %v", err.Error()) + } + } +} + +func sendKubernetesEvents(c record.EventRecorder, beforeCondition *apis.Condition, afterCondition *apis.Condition, object runtime.Object) { + // Events that are going to be sent + // + // Status "ConditionUnknown": + // beforeCondition == nil, emit EventReasonStarted + // beforeCondition != nil, emit afterCondition.Reason + // + // Status "ConditionTrue": emit EventReasonSucceded + // Status "ConditionFalse": emit EventReasonFailed if !equality.Semantic.DeepEqual(beforeCondition, afterCondition) && afterCondition != nil { // If the condition changed, and the target condition is not empty, we send an event switch afterCondition.Status { @@ -82,8 +112,8 @@ func EmitError(c record.EventRecorder, err error, object runtime.Object) { // Status "ConditionTrue": emit EventReasonSucceded // Status "ConditionFalse": emit EventReasonFailed // Deprecated: use Emit -func EmitEvent(c record.EventRecorder, beforeCondition *apis.Condition, afterCondition *apis.Condition, object runtime.Object) { - Emit(c, beforeCondition, afterCondition, object) +func EmitEvent(ctx context.Context, beforeCondition *apis.Condition, afterCondition *apis.Condition, object runtime.Object) { + Emit(ctx, beforeCondition, afterCondition, object) } // EmitErrorEvent emits a failure associated to an error diff --git a/pkg/reconciler/events/event_test.go b/pkg/reconciler/events/event_test.go index 2c605d3c4da..9ba637259c3 100644 --- a/pkg/reconciler/events/event_test.go +++ b/pkg/reconciler/events/event_test.go @@ -19,17 +19,23 @@ package events import ( "errors" "fmt" - "strings" + "regexp" "testing" "time" + "github.com/tektoncd/pipeline/pkg/apis/config" + "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" + "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/record" "knative.dev/pkg/apis" + duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1" + "knative.dev/pkg/controller" + rtesting "knative.dev/pkg/reconciler/testing" ) -func TestEmit(t *testing.T) { +func TestSendKubernetesEvents(t *testing.T) { testcases := []struct { name string before *apis.Condition @@ -136,7 +142,7 @@ func TestEmit(t *testing.T) { for _, ts := range testcases { fr := record.NewFakeRecorder(1) tr := &corev1.Pod{} - Emit(fr, ts.before, ts.after, tr) + sendKubernetesEvents(fr, ts.before, ts.after, tr) err := checkEvents(t, fr, ts.name, ts.wantEvent) if err != nil { @@ -172,16 +178,85 @@ func TestEmitError(t *testing.T) { } } -func checkEvents(t *testing.T, fr *record.FakeRecorder, testName string, wantEvent string) error { - t.Helper() +func TestEmit(t *testing.T) { + objectStatus := duckv1beta1.Status{ + Conditions: []apis.Condition{{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionUnknown, + Reason: v1beta1.PipelineRunReasonStarted.String(), + }}, + } + object := &v1beta1.PipelineRun{ + ObjectMeta: metav1.ObjectMeta{ + SelfLink: "/pipelineruns/test1", + }, + Status: v1beta1.PipelineRunStatus{Status: objectStatus}, + } + after := &apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionUnknown, + Message: "just starting", + } + testcases := []struct { + name string + data map[string]string + wantEvent string + wantCloudEvent string + }{{ + name: "without sink", + data: map[string]string{}, + wantEvent: "Normal Started", + wantCloudEvent: "", + }, { + name: "with empty string sink", + data: map[string]string{"default-cloud-events-sink": ""}, + wantEvent: "Normal Started", + wantCloudEvent: "", + }, { + name: "with sink", + data: map[string]string{"default-cloud-events-sink": "http://mysink"}, + wantEvent: "Normal Started", + wantCloudEvent: `(?s)dev.tekton.event.pipelinerun.started.v1.*test1`, + }} + + for _, tc := range testcases { + // Setup the context and seed test data + ctx, _ := rtesting.SetupFakeContext(t) + ctx = cloudevent.WithClient(ctx, &cloudevent.FakeClientBehaviour{SendSuccessfully: true}) + fakeClient := cloudevent.Get(ctx).(cloudevent.FakeClient) + + // Setup the config and add it to the context + defaults, _ := config.NewDefaultsFromMap(tc.data) + featureFlags, _ := config.NewFeatureFlagsFromMap(map[string]string{}) + cfg := &config.Config{ + Defaults: defaults, + FeatureFlags: featureFlags, + } + ctx = config.ToContext(ctx, cfg) + + recorder := controller.GetEventRecorder(ctx).(*record.FakeRecorder) + Emit(ctx, nil, after, object) + if err := checkEvents(t, recorder, tc.name, tc.wantEvent); err != nil { + t.Fatalf(err.Error()) + } + if err := checkCloudEvents(t, &fakeClient, tc.name, tc.wantCloudEvent); err != nil { + t.Fatalf(err.Error()) + } + } +} + +func eventFromChannel(c chan string, testName string, wantEvent string) error { timer := time.NewTimer(1 * time.Second) select { - case event := <-fr.Events: + case event := <-c: if wantEvent == "" { return fmt.Errorf("received event \"%s\" for %s but none expected", event, testName) } - if !(strings.HasPrefix(event, wantEvent)) { - return fmt.Errorf("expected event \"%s\" but got \"%s\" instead for %s", wantEvent, event, testName) + matching, err := regexp.MatchString(wantEvent, event) + if err == nil { + if !matching { + return fmt.Errorf("expected event \"%s\" but got \"%s\" instead for %s", wantEvent, event, testName) + } } case <-timer.C: if wantEvent != "" { @@ -190,3 +265,13 @@ func checkEvents(t *testing.T, fr *record.FakeRecorder, testName string, wantEve } return nil } + +func checkEvents(t *testing.T, fr *record.FakeRecorder, testName string, wantEvent string) error { + t.Helper() + return eventFromChannel(fr.Events, testName, wantEvent) +} + +func checkCloudEvents(t *testing.T, fce *cloudevent.FakeClient, testName string, wantEvent string) error { + t.Helper() + return eventFromChannel(fce.Events, testName, wantEvent) +} diff --git a/pkg/reconciler/pipelinerun/pipelinerun.go b/pkg/reconciler/pipelinerun/pipelinerun.go index 1730afbe7d0..dc440e2abf8 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun.go +++ b/pkg/reconciler/pipelinerun/pipelinerun.go @@ -150,7 +150,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pr *v1beta1.PipelineRun) // We also want to send the "Started" event as soon as possible for anyone who may be waiting // on the event to perform user facing initialisations, such has reset a CI check status afterCondition := pr.Status.GetCondition(apis.ConditionSucceeded) - events.Emit(controller.GetEventRecorder(ctx), nil, afterCondition, pr) + events.Emit(ctx, nil, afterCondition, pr) // We already sent an event for start, so update `before` with the current status before = pr.Status.GetCondition(apis.ConditionSucceeded) @@ -213,15 +213,14 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pr *v1beta1.PipelineRun) } func (c *Reconciler) finishReconcileUpdateEmitEvents(ctx context.Context, pr *v1beta1.PipelineRun, beforeCondition *apis.Condition, previousError error) error { - recorder := controller.GetEventRecorder(ctx) logger := logging.FromContext(ctx) afterCondition := pr.Status.GetCondition(apis.ConditionSucceeded) - events.Emit(recorder, beforeCondition, afterCondition, pr) + events.Emit(ctx, beforeCondition, afterCondition, pr) _, err := c.updateLabelsAndAnnotations(pr) if err != nil { logger.Warn("Failed to update PipelineRun labels/annotations", zap.Error(err)) - events.EmitError(recorder, err, pr) + events.EmitError(controller.GetEventRecorder(ctx), err, pr) } merr := multierror.Append(previousError, err).ErrorOrNil() diff --git a/pkg/reconciler/pipelinerun/pipelinerun_test.go b/pkg/reconciler/pipelinerun/pipelinerun_test.go index 8ebe3c1c110..130361f4b1b 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun_test.go +++ b/pkg/reconciler/pipelinerun/pipelinerun_test.go @@ -32,6 +32,7 @@ import ( "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" resourcev1alpha1 "github.com/tektoncd/pipeline/pkg/apis/resource/v1alpha1" + "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent" "github.com/tektoncd/pipeline/pkg/reconciler/pipelinerun/resources" taskrunresources "github.com/tektoncd/pipeline/pkg/reconciler/taskrun/resources" ttesting "github.com/tektoncd/pipeline/pkg/reconciler/testing" @@ -102,8 +103,18 @@ func conditionCheckFromTaskRun(tr *v1beta1.TaskRun) *v1beta1.ConditionCheck { return &cc } -func checkEvents(fr *record.FakeRecorder, testName string, wantEvents []string) error { - // The fake recorder runs in a go routine, so the timeout is here to avoid waiting +func checkEvents(t *testing.T, fr *record.FakeRecorder, testName string, wantEvents []string) error { + t.Helper() + return eventFromChannel(fr.Events, testName, wantEvents) +} + +func checkCloudEvents(t *testing.T, fce *cloudevent.FakeClient, testName string, wantEvents []string) error { + t.Helper() + return eventFromChannel(fce.Events, testName, wantEvents) +} + +func eventFromChannel(c chan string, testName string, wantEvents []string) error { + // We get events from a channel, so the timeout is here to avoid waiting // on the channel forever if fewer than expected events are received. // We only hit the timeout in case of failure of the test, so the actual value // of the timeout is not so relevant, it's only used when tests are going to fail. @@ -115,18 +126,23 @@ func checkEvents(fr *record.FakeRecorder, testName string, wantEvents []string) // we exit the loop. If we never receive enough events, the timeout takes us // out of the loop. select { - case event := <-fr.Events: + case event := <-c: foundEvents = append(foundEvents, event) if ii > len(wantEvents)-1 { - return fmt.Errorf("Received event \"%s\" for %s but not more expected", event, testName) + return fmt.Errorf("received event \"%s\" for %s but not more expected", event, testName) } wantEvent := wantEvents[ii] - if !(strings.HasPrefix(event, wantEvent)) { - return fmt.Errorf("Expected event \"%s\" but got \"%s\" instead for %s", wantEvent, event, testName) + matching, err := regexp.MatchString(wantEvent, event) + if err == nil { + if !matching { + return fmt.Errorf("expected event \"%s\" but got \"%s\" instead for %s", wantEvent, event, testName) + } + } else { + return fmt.Errorf("something went wrong matching the event: %s", err) } case <-timer.C: if len(foundEvents) > len(wantEvents) { - return fmt.Errorf("Received %d events for %s but %d expected. Found events: %#v", len(foundEvents), testName, len(wantEvents), foundEvents) + return fmt.Errorf("received %d events for %s but %d expected. Found events: %#v", len(foundEvents), testName, len(wantEvents), foundEvents) } } } @@ -339,7 +355,7 @@ func TestReconcile(t *testing.T) { "Normal Started", "Normal Running Tasks Completed: 0", } - err = checkEvents(testAssets.Recorder, "test-pipeline-run-success", wantEvents) + err = checkEvents(t, testAssets.Recorder, "test-pipeline-run-success", wantEvents) if !(err == nil) { t.Errorf(err.Error()) } @@ -434,7 +450,7 @@ func TestReconcile_PipelineSpecTaskSpec(t *testing.T) { "Normal Started", "Normal Running Tasks Completed: 0", } - err = checkEvents(testAssets.Recorder, "test-pipeline-run-success", wantEvents) + err = checkEvents(t, testAssets.Recorder, "test-pipeline-run-success", wantEvents) if !(err == nil) { t.Errorf(err.Error()) } @@ -612,7 +628,7 @@ func TestReconcile_InvalidPipelineRuns(t *testing.T) { permanentError: true, wantEvents: []string{ "Normal Started", - "Warning Failed Pipeline foo/embedded-pipeline-invalid can't be Run; it has an invalid spec: invalid value \"bad-t@$k\"", + "Warning Failed Pipeline foo/embedded-pipeline-invalid can't be Run; it has an invalid spec", }, }, { name: "invalid-embedded-pipeline-mismatching-parameter-types", @@ -708,7 +724,7 @@ func TestReconcile_InvalidPipelineRuns(t *testing.T) { } // Check generated events match what's expected wantEvents := append(tc.wantEvents, "Warning InternalError 1 error occurred") - err = checkEvents(testAssets.Recorder, tc.pipelineRun.Name, wantEvents) + err = checkEvents(t, testAssets.Recorder, tc.pipelineRun.Name, wantEvents) if !(err == nil) { t.Errorf(err.Error()) } @@ -1058,7 +1074,7 @@ func TestReconcileOnCompletedPipelineRun(t *testing.T) { wantEvents := []string{ "Normal Succeeded All Tasks have completed executing", } - err = checkEvents(testAssets.Recorder, "test-pipeline-run-completed", wantEvents) + err = checkEvents(t, testAssets.Recorder, "test-pipeline-run-completed", wantEvents) if !(err == nil) { t.Errorf(err.Error()) } @@ -1125,7 +1141,7 @@ func TestReconcileOnCancelledPipelineRun(t *testing.T) { wantEvents := []string{ "Warning Failed PipelineRun \"test-pipeline-run-cancelled\" was cancelled", } - err = checkEvents(testAssets.Recorder, "test-pipeline-run-cancelled", wantEvents) + err = checkEvents(t, testAssets.Recorder, "test-pipeline-run-cancelled", wantEvents) if !(err == nil) { t.Errorf(err.Error()) } @@ -1192,7 +1208,7 @@ func TestReconcileWithTimeout(t *testing.T) { wantEvents := []string{ "Warning Failed PipelineRun \"test-pipeline-run-with-timeout\" failed to finish within \"12h0m0s\"", } - err = checkEvents(testAssets.Recorder, "test-pipeline-run-with-timeout", wantEvents) + err = checkEvents(t, testAssets.Recorder, "test-pipeline-run-with-timeout", wantEvents) if !(err == nil) { t.Errorf(err.Error()) } @@ -1316,7 +1332,7 @@ func TestReconcileCancelledFailsTaskRunCancellation(t *testing.T) { "Normal PipelineRunCouldntCancel PipelineRun \"test-pipeline-fails-to-cancel\" was cancelled but had errors trying to cancel TaskRuns", "Warning InternalError 1 error occurred", } - err = checkEvents(testAssets.Recorder, prName, wantEvents) + err = checkEvents(t, testAssets.Recorder, prName, wantEvents) if !(err == nil) { t.Errorf(err.Error()) } @@ -1376,7 +1392,7 @@ func TestReconcileCancelledPipelineRun(t *testing.T) { wantEvents := []string{ "Warning Failed PipelineRun \"test-pipeline-run-cancelled\" was cancelled", } - err = checkEvents(testAssets.Recorder, "test-pipeline-run-cancelled", wantEvents) + err = checkEvents(t, testAssets.Recorder, "test-pipeline-run-cancelled", wantEvents) if !(err == nil) { t.Errorf(err.Error()) } @@ -1636,7 +1652,7 @@ func TestReconcileWithTimeoutAndRetry(t *testing.T) { if status := reconciledRun.Status.TaskRuns["hello-world-1"].Status.GetCondition(apis.ConditionSucceeded).Status; status != tc.conditionSucceeded { t.Fatalf("Succeeded expected to be %s but is %s", tc.conditionSucceeded, status) } - err = checkEvents(testAssets.Recorder, prs[0].Name, tc.wantEvents) + err = checkEvents(t, testAssets.Recorder, prs[0].Name, tc.wantEvents) if !(err == nil) { t.Errorf(err.Error()) } @@ -1967,9 +1983,9 @@ func TestReconcileWithConditionChecks(t *testing.T) { wantEvents := []string{ "Normal Started", - "Normal Running Tasks Completed: 0 (Failed: 0, Cancelled 0), Incomplete: 1, Skipped: 0", + "Normal Running Tasks Completed: 0 \\(Failed: 0, Cancelled 0\\), Incomplete: 1, Skipped: 0", } - err = checkEvents(testAssets.Recorder, "test-pipeline-run-completed", wantEvents) + err = checkEvents(t, testAssets.Recorder, "test-pipeline-run-completed", wantEvents) if !(err == nil) { t.Errorf(err.Error()) } @@ -2099,9 +2115,9 @@ func TestReconcileWithFailingConditionChecks(t *testing.T) { wantEvents := []string{ "Normal Started", - "Normal Running Tasks Completed: 1 (Failed: 0, Cancelled 0), Incomplete: 1, Skipped: 1", + "Normal Running Tasks Completed: 1 \\(Failed: 0, Cancelled 0\\), Incomplete: 1, Skipped: 1", } - err = checkEvents(testAssets.Recorder, "test-pipeline-run-completed", wantEvents) + err = checkEvents(t, testAssets.Recorder, "test-pipeline-run-completed", wantEvents) if !(err == nil) { t.Errorf(err.Error()) } diff --git a/pkg/reconciler/taskrun/taskrun.go b/pkg/reconciler/taskrun/taskrun.go index 06650cef615..f8c4a1c56a6 100644 --- a/pkg/reconciler/taskrun/taskrun.go +++ b/pkg/reconciler/taskrun/taskrun.go @@ -87,7 +87,8 @@ var _ taskrunreconciler.Interface = (*Reconciler)(nil) // resource with the current status of the resource. func (c *Reconciler) ReconcileKind(ctx context.Context, tr *v1beta1.TaskRun) pkgreconciler.Event { logger := logging.FromContext(ctx) - recorder := controller.GetEventRecorder(ctx) + ctx = cloudevent.ToContext(ctx, c.cloudEventClient) + // Read the initial condition before := tr.Status.GetCondition(apis.ConditionSucceeded) @@ -106,7 +107,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, tr *v1beta1.TaskRun) pkg // We also want to send the "Started" event as soon as possible for anyone who may be waiting // on the event to perform user facing initialisations, such has reset a CI check status afterCondition := tr.Status.GetCondition(apis.ConditionSucceeded) - events.Emit(recorder, nil, afterCondition, tr) + events.Emit(ctx, nil, afterCondition, tr) } // If the TaskRun is complete, run some post run fixtures when applicable @@ -196,12 +197,15 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, tr *v1beta1.TaskRun) pkg } func (c *Reconciler) finishReconcileUpdateEmitEvents(ctx context.Context, tr *v1beta1.TaskRun, beforeCondition *apis.Condition, previousError error) error { - recorder := controller.GetEventRecorder(ctx) - afterCondition := tr.Status.GetCondition(apis.ConditionSucceeded) - events.Emit(recorder, beforeCondition, afterCondition, tr) + + // Send k8s events and cloud events (when configured) + events.Emit(ctx, beforeCondition, afterCondition, tr) + _, err := c.updateLabelsAndAnnotations(tr) - events.EmitError(recorder, err, tr) + if err != nil { + events.EmitError(controller.GetEventRecorder(ctx), err, tr) + } if controller.IsPermanentError(previousError) { return controller.NewPermanentError(multierror.Append(previousError, err)) } diff --git a/pkg/reconciler/taskrun/taskrun_test.go b/pkg/reconciler/taskrun/taskrun_test.go index a11db4f5217..15010cd6eac 100644 --- a/pkg/reconciler/taskrun/taskrun_test.go +++ b/pkg/reconciler/taskrun/taskrun_test.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "regexp" "strings" "testing" "time" @@ -313,8 +314,18 @@ func getTaskRunController(t *testing.T, d test.Data) (test.Assets, func()) { }, cancel } -func checkEvents(fr *record.FakeRecorder, testName string, wantEvents []string) error { - // The fake recorder runs in a go routine, so the timeout is here to avoid waiting +func checkEvents(t *testing.T, fr *record.FakeRecorder, testName string, wantEvents []string) error { + t.Helper() + return eventFromChannel(fr.Events, testName, wantEvents) +} + +func checkCloudEvents(t *testing.T, fce *cloudevent.FakeClient, testName string, wantEvents []string) error { + t.Helper() + return eventFromChannel(fce.Events, testName, wantEvents) +} + +func eventFromChannel(c chan string, testName string, wantEvents []string) error { + // We get events from a channel, so the timeout is here to avoid waiting // on the channel forever if fewer than expected events are received. // We only hit the timeout in case of failure of the test, so the actual value // of the timeout is not so relevant, it's only used when tests are going to fail. @@ -326,18 +337,23 @@ func checkEvents(fr *record.FakeRecorder, testName string, wantEvents []string) // we exit the loop. If we never receive enough events, the timeout takes us // out of the loop. select { - case event := <-fr.Events: + case event := <-c: foundEvents = append(foundEvents, event) if ii > len(wantEvents)-1 { - return fmt.Errorf("Received event \"%s\" for %s but not more expected", event, testName) + return fmt.Errorf("received event \"%s\" for %s but not more expected", event, testName) } wantEvent := wantEvents[ii] - if !(strings.HasPrefix(event, wantEvent)) { - return fmt.Errorf("Expected event \"%s\" but got \"%s\" instead for %s", wantEvent, event, testName) + matching, err := regexp.MatchString(wantEvent, event) + if err == nil { + if !matching { + return fmt.Errorf("expected event \"%s\" but got \"%s\" instead for %s", wantEvent, event, testName) + } + } else { + return fmt.Errorf("something went wrong matching the event: %s", err) } case <-timer.C: if len(foundEvents) > len(wantEvents) { - return fmt.Errorf("Received %d events for %s but %d expected. Found events: %#v", len(foundEvents), testName, len(wantEvents), foundEvents) + return fmt.Errorf("received %d events for %s but %d expected. Found events: %#v", len(foundEvents), testName, len(wantEvents), foundEvents) } } } @@ -690,6 +706,89 @@ func TestReconcile_FeatureFlags(t *testing.T) { }) } } + +// TestReconcile_CloudEvents runs reconcile with a cloud event sink configured +// to ensure that events are sent in different cases +func TestReconcile_CloudEvents(t *testing.T) { + simpleTask := tb.Task("test-task", + tb.TaskSpec(tb.Step("foo", + tb.StepName("simple-step"), tb.StepCommand("/mycmd"), tb.StepEnvVar("foo", "bar"), + )), + tb.TaskNamespace("foo"), + ) + taskRun := tb.TaskRun("test-taskrun-not-started", + tb.TaskRunSelfLink("/test/taskrun1"), + tb.TaskRunNamespace("foo"), + tb.TaskRunSpec(tb.TaskRunTaskRef(simpleTask.Name)), + ) + d := test.Data{ + TaskRuns: []*v1beta1.TaskRun{taskRun}, + Tasks: []*v1beta1.Task{simpleTask}, + } + + names.TestingSeed() + d.ConfigMaps = []*corev1.ConfigMap{ + { + ObjectMeta: metav1.ObjectMeta{Name: config.GetDefaultsConfigName(), Namespace: system.GetNamespace()}, + Data: map[string]string{ + "default-cloud-events-sink": "http://synk:8080", + }, + }, + } + + testAssets, cancel := getTaskRunController(t, d) + defer cancel() + c := testAssets.Controller + clients := testAssets.Clients + saName := "default" + if _, err := clients.Kube.CoreV1().ServiceAccounts(taskRun.Namespace).Create(&corev1.ServiceAccount{ + ObjectMeta: metav1.ObjectMeta{ + Name: saName, + Namespace: taskRun.Namespace, + }, + }); err != nil { + t.Fatal(err) + } + + if err := c.Reconciler.Reconcile(context.Background(), getRunName(taskRun)); err != nil { + t.Errorf("expected no error. Got error %v", err) + } + if len(clients.Kube.Actions()) == 0 { + t.Errorf("Expected actions to be logged in the kubeclient, got none") + } + + tr, err := clients.Pipeline.TektonV1beta1().TaskRuns(taskRun.Namespace).Get(taskRun.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("getting updated taskrun: %v", err) + } + condition := tr.Status.GetCondition(apis.ConditionSucceeded) + if condition == nil || condition.Status != corev1.ConditionUnknown { + t.Errorf("Expected fresh TaskRun to have in progress status, but had %v", condition) + } + if condition != nil && condition.Reason != v1beta1.TaskRunReasonRunning.String() { + t.Errorf("Expected reason %q but was %s", v1beta1.TaskRunReasonRunning.String(), condition.Reason) + } + + wantEvents := []string{ + "Normal Start", + "Normal Running", + } + err = checkEvents(t, testAssets.Recorder, "reconcile-cloud-events", wantEvents) + if !(err == nil) { + t.Errorf(err.Error()) + } + + wantCloudEvents := []string{ + `(?s)dev.tekton.event.taskrun.started.v1.*test-taskrun-not-started`, + `(?s)dev.tekton.event.taskrun.running.v1.*test-taskrun-not-started`, + } + ceClient := clients.CloudEvents.(cloudevent.FakeClient) + err = checkCloudEvents(t, &ceClient, "reconcile-cloud-events", wantCloudEvents) + if !(err == nil) { + t.Errorf(err.Error()) + } +} + func TestReconcile(t *testing.T) { taskRunSuccess := tb.TaskRun("test-taskrun-run-success", tb.TaskRunNamespace("foo"), tb.TaskRunSpec( tb.TaskRunTaskRef(simpleTask.Name, tb.TaskRefAPIVersion("a1")), @@ -1306,7 +1405,7 @@ func TestReconcile(t *testing.T) { t.Fatalf("Expected actions to be logged in the kubeclient, got none") } - err = checkEvents(testAssets.Recorder, tc.name, tc.wantEvents) + err = checkEvents(t, testAssets.Recorder, tc.name, tc.wantEvents) if !(err == nil) { t.Errorf(err.Error()) } @@ -1524,7 +1623,7 @@ func TestReconcileInvalidTaskRuns(t *testing.T) { t.Errorf("expected 3 actions (first: list namespaces) created by the reconciler, got %d. Actions: %#v", len(actions), actions) } - err := checkEvents(testAssets.Recorder, tc.name, tc.wantEvents) + err := checkEvents(t, testAssets.Recorder, tc.name, tc.wantEvents) if !(err == nil) { t.Errorf(err.Error()) } @@ -1667,7 +1766,7 @@ func TestReconcilePodUpdateStatus(t *testing.T) { "Normal Running Not all Steps", "Normal Succeeded", } - err = checkEvents(testAssets.Recorder, "test-reconcile-pod-updateStatus", wantEvents) + err = checkEvents(t, testAssets.Recorder, "test-reconcile-pod-updateStatus", wantEvents) if !(err == nil) { t.Errorf(err.Error()) } @@ -1749,7 +1848,7 @@ func TestReconcileOnCancelledTaskRun(t *testing.T) { "Normal Started", "Warning Failed TaskRun \"test-taskrun-run-cancelled\" was cancelled", } - err = checkEvents(testAssets.Recorder, "test-reconcile-on-cancelled-taskrun", wantEvents) + err = checkEvents(t, testAssets.Recorder, "test-reconcile-on-cancelled-taskrun", wantEvents) if !(err == nil) { t.Errorf(err.Error()) } @@ -1848,7 +1947,7 @@ func TestReconcileTimeouts(t *testing.T) { if d := cmp.Diff(tc.expectedStatus, condition, ignoreLastTransitionTime); d != "" { t.Fatalf("Did not get expected condition %s", diff.PrintWantGot(d)) } - err = checkEvents(testAssets.Recorder, tc.taskRun.Name, tc.wantEvents) + err = checkEvents(t, testAssets.Recorder, tc.taskRun.Name, tc.wantEvents) if !(err == nil) { t.Errorf(err.Error()) } @@ -2629,7 +2728,7 @@ func TestReconcileTaskResourceResolutionAndValidation(t *testing.T) { } } - err = checkEvents(testAssets.Recorder, tt.desc, tt.wantEvents) + err = checkEvents(t, testAssets.Recorder, tt.desc, tt.wantEvents) if !(err == nil) { t.Errorf(err.Error()) }