From 84a60bd7e8f0420ba98b9a5087ed900136dc7f55 Mon Sep 17 00:00:00 2001 From: Andrea Frittoli Date: Thu, 18 Jun 2020 22:29:29 +0100 Subject: [PATCH] Start emitting CloudEvents for TaskRuns Add a new method 'SendCloudEventWithRetries' to the cloud events controller. It that allows emitting cloud events asynchronously (in a go routine), taking benefit of the retries capabilities of the cloudevents go sdk. Rework the fake client to allow for unit testing of 'SendCloudEventWithRetries'. The new implementation is similar to that of the fake recorder from client-go, which allows to unit test k8s events and cloud events in a similar fashion. Add a new config option default-cloud-events-sink in the defaults config map. This options allows setting a default sink for cloud events. If the default sink is setup, cloud events are sent, else they're disabled. Invoke 'SendCloudEventWithRetries' from the events modules so that we start sending cloud events in the same places we were send k8s events (except for errors). Most of the plumbing is in place to start emitting cloud events for pipeline runs too, however there is a small refactor for the config maps required before we can add that, so this commit only enabled cloud events for task runs for now. --- config/config-defaults.yaml | 9 +- docs/events.md | 27 ++- docs/install.md | 19 ++ pkg/apis/config/default.go | 11 +- .../cloudevent/cloud_event_controller.go | 42 ++++ .../cloudevent/cloud_event_controller_test.go | 179 ++++++++++++++++++ .../events/cloudevent/cloudevent.go | 3 + .../events/cloudevent/cloudeventclient.go | 12 +- .../cloudevent/cloudeventsfakeclient.go | 19 +- pkg/reconciler/events/cloudevent/interface.go | 4 + pkg/reconciler/events/event.go | 52 +++-- pkg/reconciler/events/event_test.go | 101 +++++++++- pkg/reconciler/pipelinerun/pipelinerun.go | 7 +- .../pipelinerun/pipelinerun_test.go | 58 ++++-- pkg/reconciler/taskrun/taskrun.go | 16 +- pkg/reconciler/taskrun/taskrun_test.go | 125 ++++++++++-- 16 files changed, 607 insertions(+), 77 deletions(-) diff --git a/config/config-defaults.yaml b/config/config-defaults.yaml index 1c23c72f20a..052f39c87a9 100644 --- a/config/config-defaults.yaml +++ b/config/config-defaults.yaml @@ -54,4 +54,11 @@ data: # default-pod-template contains the default pod template to use # TaskRun and PipelineRun, if none is specified. If a pod template # is specified, the default pod template is ignored. - # default-pod-template: \ No newline at end of file + # default-pod-template: + + # default-cloud-events-sink contains the default CloudEvents sink to be + # used for TaskRun and PipelineRun, when no sink is specified. + # Note that right now it is still not possible to set a PipelineRun or + # TaskRun specific sink, so the default is the only option available. + # If no sink is specified, no CloudEvent is generated + # default-cloud-events-sink: \ No newline at end of file diff --git a/docs/events.md b/docs/events.md index dcdb46748ee..19e4dd97fa6 100644 --- a/docs/events.md +++ b/docs/events.md @@ -8,8 +8,10 @@ weight: 2 Tekton runtime resources, specifically `TaskRuns` and `PipelineRuns`, emit events when they are executed, so that users can monitor their lifecycle -and react to it. Tekton emits [kubernetes events](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.18/#event-v1-core), that can be retrieve from the resource via -`kubectl describe [resource]`. +and react to it. + +Tekton emits [kubernetes events](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.18/#event-v1-core), that can be retrieve from the resource via `kubectl describe [resource]`. +[Optionally](#CloudEvents) Tekton can emit [CloudEvents](https://github.com/cloudevents/spec) too. No events are emitted for `Conditions` today (https://github.com/tektoncd/pipeline/issues/2461). @@ -47,3 +49,24 @@ No events are emitted for `Conditions` today (https://github.com/tektoncd/pipeli successfully. Causes of failure may be: one the `Tasks` failed or the `PipelineRun` was cancelled or timed out. `Failed` events are also triggered in case the `PipelineRun` cannot be executed at all because of validation issues. + +# CloudEvents + +When a sink is [configured](../install.md#configuring-cloudevents-notifications), the following events +will be generated by appropriate controller when a lifecycle event happens for `TaskRun` or +`PipelineRun`. + +The complete table of events: + +Reasource |Event |Event Type +:-------------|:-------:|:---------------------------------------------------------- +TaskRun | Started | dev.tekton.event.taskrun.started.v1 +TaskRun | Running | dev.tekton.event.taskrun.runnning.v1 +TaskRun | Condition Change while Running | dev.tekton.event.taskrun.unknown.v1 +TaskRun | Succeed | dev.tekton.event.taskrun.successful.v1 +TaskRun | Failed | dev.tekton.event.taskrun.failed.v1 +PipelineRun | Started | dev.tekton.event.pipelinerun.started.v1 +PipelineRun | Running | dev.tekton.event.pipelinerun.runnning.v1 +PipelineRun | Condition Change while Running | dev.tekton.event.pipelinerun.unknown.v1 +PipelineRun | Succeed | dev.tekton.event.pipelinerun.successful.v1 +PipelineRun | Failed | dev.tekton.event.pipelinerun.failed.v1 diff --git a/docs/install.md b/docs/install.md index 57d10df40fc..e455c91a05e 100644 --- a/docs/install.md +++ b/docs/install.md @@ -235,6 +235,25 @@ data: bucket.service.account.field.name: GOOGLE_APPLICATION_CREDENTIALS ``` +## Configuring CloudEvents notifications + +When configured so, Tekton can generate `CloudEvents` for `TaskRun` and `PipelineRun` lifecycle +events. The only configuration parameter is the URL of the sink. When not set, no notification is +generared. + +``` +apiVersion: v1 +kind: ConfigMap +metadata: + name: config-defaults + namespace: tekton-pipelines + labels: + app.kubernetes.io/instance: default + app.kubernetes.io/part-of: tekton-pipelines +data: + default-cloud-events-sink: https://my-sink-url +``` + ## Customizing basic execution parameters You can specify your own values that replace the default service account (`ServiceAccount`), timeout (`Timeout`), and Pod template (`PodTemplate`) values used by Tekton Pipelines in `TaskRun` and `PipelineRun` definitions. To do so, modify the ConfigMap `config-defaults` with your desired values. diff --git a/pkg/apis/config/default.go b/pkg/apis/config/default.go index 4f89ca4bd45..04b30a10a82 100644 --- a/pkg/apis/config/default.go +++ b/pkg/apis/config/default.go @@ -35,6 +35,8 @@ const ( defaultManagedByLabelValueKey = "default-managed-by-label-value" DefaultManagedByLabelValue = "tekton-pipelines" defaultPodTemplateKey = "default-pod-template" + defaultCloudEventsSinkKey = "default-cloud-events-sink" + DefaultCloudEventSinkValue = "" ) // Defaults holds the default configurations @@ -44,6 +46,7 @@ type Defaults struct { DefaultServiceAccount string DefaultManagedByLabelValue string DefaultPodTemplate *pod.Template + DefaultCloudEventsSink string } // GetBucketConfigName returns the name of the configmap containing all @@ -68,7 +71,8 @@ func (cfg *Defaults) Equals(other *Defaults) bool { return other.DefaultTimeoutMinutes == cfg.DefaultTimeoutMinutes && other.DefaultServiceAccount == cfg.DefaultServiceAccount && other.DefaultManagedByLabelValue == cfg.DefaultManagedByLabelValue && - other.DefaultPodTemplate.Equals(cfg.DefaultPodTemplate) + other.DefaultPodTemplate.Equals(cfg.DefaultPodTemplate) && + other.DefaultCloudEventsSink == cfg.DefaultCloudEventsSink } // NewDefaultsFromMap returns a Config given a map corresponding to a ConfigMap @@ -76,6 +80,7 @@ func NewDefaultsFromMap(cfgMap map[string]string) (*Defaults, error) { tc := Defaults{ DefaultTimeoutMinutes: DefaultTimeoutMinutes, DefaultManagedByLabelValue: DefaultManagedByLabelValue, + DefaultCloudEventsSink: DefaultCloudEventSinkValue, } if defaultTimeoutMin, ok := cfgMap[defaultTimeoutMinutesKey]; ok { @@ -102,6 +107,10 @@ func NewDefaultsFromMap(cfgMap map[string]string) (*Defaults, error) { tc.DefaultPodTemplate = &podTemplate } + if defaultCloudEventsSink, ok := cfgMap[defaultCloudEventsSinkKey]; ok { + tc.DefaultCloudEventsSink = defaultCloudEventsSink + } + return &tc, nil } diff --git a/pkg/reconciler/events/cloudevent/cloud_event_controller.go b/pkg/reconciler/events/cloudevent/cloud_event_controller.go index 523520273b5..8d6db1d13d2 100644 --- a/pkg/reconciler/events/cloudevent/cloud_event_controller.go +++ b/pkg/reconciler/events/cloudevent/cloud_event_controller.go @@ -18,6 +18,7 @@ package cloudevent import ( "context" + "errors" "time" cloudevents "github.com/cloudevents/sdk-go/v2" @@ -26,7 +27,11 @@ import ( resource "github.com/tektoncd/pipeline/pkg/apis/resource/v1alpha1" "github.com/tektoncd/pipeline/pkg/apis/resource/v1alpha1/cloudevent" "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + controller "knative.dev/pkg/controller" + "knative.dev/pkg/logging" ) // InitializeCloudEvents initializes the CloudEvents part of the @@ -108,3 +113,40 @@ func SendCloudEvents(tr *v1beta1.TaskRun, ceclient CEClient, logger *zap.Sugared } return merr.ErrorOrNil() } + +// SendCloudEventWithRetries sends a cloud event for the specified resource. +// It does not block and it perform retries with backoff using the cloudevents +// sdk-go capabilities. +// It accepts a runtime.Object to avoid making objectWithCondition public since +// it's only used within the events/cloudevents packages. +func SendCloudEventWithRetries(ctx context.Context, object runtime.Object) error { + var ( + o objectWithCondition + ok bool + ) + if o, ok = object.(objectWithCondition); !ok { + return errors.New("Input object does not satisfy objectWithCondition") + } + logger := logging.FromContext(ctx) + ceClient := Get(ctx) + if ceClient == nil { + return errors.New("No cloud events client found in the context") + } + event, err := EventForObjectWithCondition(o) + if err != nil { + return err + } + + go func() { + if result := ceClient.Send(cloudevents.ContextWithRetriesExponentialBackoff(ctx, 10*time.Millisecond, 10), *event); !cloudevents.IsACK(result) { + logger.Warnf("Failed to send cloudevent: %s", result.Error()) + recorder := controller.GetEventRecorder(ctx) + if recorder == nil { + logger.Warnf("No recorder in context, cannot emit error event") + } + recorder.Event(object, corev1.EventTypeWarning, "Cloud Event Failure", result.Error()) + } + }() + + return nil +} diff --git a/pkg/reconciler/events/cloudevent/cloud_event_controller_test.go b/pkg/reconciler/events/cloudevent/cloud_event_controller_test.go index df462fa8488..a237c767835 100644 --- a/pkg/reconciler/events/cloudevent/cloud_event_controller_test.go +++ b/pkg/reconciler/events/cloudevent/cloud_event_controller_test.go @@ -17,7 +17,11 @@ limitations under the License. package cloudevent import ( + "context" + "fmt" + "strings" "testing" + "time" "github.com/google/go-cmp/cmp" tb "github.com/tektoncd/pipeline/internal/builder/v1beta1" @@ -25,8 +29,14 @@ import ( resourcev1alpha1 "github.com/tektoncd/pipeline/pkg/apis/resource/v1alpha1" "github.com/tektoncd/pipeline/pkg/logging" "github.com/tektoncd/pipeline/test/diff" + "go.uber.org/zap" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/record" "knative.dev/pkg/apis" + duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1" + "knative.dev/pkg/controller" + rtesting "knative.dev/pkg/reconciler/testing" ) func TestCloudEventDeliveryFromTargets(t *testing.T) { @@ -283,3 +293,172 @@ func TestInitializeCloudEvents(t *testing.T) { }) } } + +func TestSendCloudEventWithRetries(t *testing.T) { + + objectStatus := duckv1beta1.Status{ + Conditions: []apis.Condition{{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionTrue, + }}, + } + + tests := []struct { + name string + clientBehaviour FakeClientBehaviour + object objectWithCondition + wantCEvent string + wantEvent string + }{{ + name: "test-send-cloud-event-taskrun", + clientBehaviour: FakeClientBehaviour{ + SendSuccessfully: true, + }, + object: &v1beta1.TaskRun{ + ObjectMeta: metav1.ObjectMeta{ + SelfLink: "/taskruns/test1", + }, + Status: v1beta1.TaskRunStatus{Status: objectStatus}, + }, + wantCEvent: "Validation: valid", + wantEvent: "", + }, { + name: "test-send-cloud-event-pipelinerun", + clientBehaviour: FakeClientBehaviour{ + SendSuccessfully: true, + }, + object: &v1beta1.PipelineRun{ + ObjectMeta: metav1.ObjectMeta{ + SelfLink: "/pipelineruns/test1", + }, + Status: v1beta1.PipelineRunStatus{Status: objectStatus}, + }, + wantCEvent: "Validation: valid", + wantEvent: "", + }, { + name: "test-send-cloud-event-failed", + clientBehaviour: FakeClientBehaviour{ + SendSuccessfully: false, + }, + object: &v1beta1.PipelineRun{ + Status: v1beta1.PipelineRunStatus{Status: objectStatus}, + }, + wantCEvent: "", + wantEvent: "Warning Cloud Event Failure", + }} + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctx := setupFakeContext(t, tc.clientBehaviour, true) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + err := SendCloudEventWithRetries(ctx, tc.object) + if err != nil { + t.Fatalf("Unexpected error sending cloud events: %v", err) + } + ceClient := Get(ctx).(FakeClient) + err = checkCloudEvents(t, &ceClient, tc.name, tc.wantCEvent) + if err != nil { + t.Fatalf(err.Error()) + } + recorder := controller.GetEventRecorder(ctx).(*record.FakeRecorder) + err = checkEvents(t, recorder, tc.name, tc.wantEvent) + if err != nil { + t.Fatalf(err.Error()) + } + }) + } +} + +func TestSendCloudEventWithRetriesInvalid(t *testing.T) { + + tests := []struct { + name string + object objectWithCondition + wantCEvent string + wantEvent string + }{{ + name: "test-send-cloud-event-invalid-taskrun", + object: &v1beta1.TaskRun{ + Status: v1beta1.TaskRunStatus{}, + }, + wantCEvent: "Validation: valid", + wantEvent: "", + }, { + name: "test-send-cloud-event-pipelinerun", + object: &v1beta1.PipelineRun{ + Status: v1beta1.PipelineRunStatus{}, + }, + wantCEvent: "Validation: valid", + wantEvent: "", + }} + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctx := setupFakeContext(t, FakeClientBehaviour{ + SendSuccessfully: true, + }, true) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + err := SendCloudEventWithRetries(ctx, tc.object) + if err == nil { + t.Fatalf("Expected an error sending cloud events for invalid object, got none") + } + }) + } +} + +func TestSendCloudEventWithRetriesNoClient(t *testing.T) { + + ctx := setupFakeContext(t, FakeClientBehaviour{}, false) + err := SendCloudEventWithRetries(ctx, &v1beta1.TaskRun{Status: v1beta1.TaskRunStatus{}}) + if err == nil { + t.Fatalf("Expected an error sending cloud events with no client in the context, got none") + } + if d := cmp.Diff("No cloud events client found in the context", err.Error()); d != "" { + t.Fatalf("Unexpected error message %s", diff.PrintWantGot(d)) + } +} + +func setupFakeContext(t *testing.T, behaviour FakeClientBehaviour, withClient bool) context.Context { + var ctx context.Context + ctx, _ = rtesting.SetupFakeContext(t) + if withClient { + ctx = WithClient(ctx, &behaviour) + } + return ctx +} + +func testLogger(t *testing.T) *zap.SugaredLogger { + logger, err := zap.NewDevelopment(zap.AddCaller()) + if err != nil { + t.Fatalf("failed to create logger: %s", err) + } + return logger.Sugar().Named(t.Name()) +} + +func eventFromChannel(c chan string, testName string, wantEvent string) error { + timer := time.NewTimer(1 * time.Second) + select { + case event := <-c: + if wantEvent == "" { + return fmt.Errorf("received event \"%s\" for %s but none expected", event, testName) + } + if !(strings.HasPrefix(event, wantEvent)) { + return fmt.Errorf("expected event \"%s\" but got \"%s\" instead for %s", wantEvent, event, testName) + } + case <-timer.C: + if wantEvent != "" { + return fmt.Errorf("received no events for %s but %s expected", testName, wantEvent) + } + } + return nil +} + +func checkEvents(t *testing.T, fr *record.FakeRecorder, testName string, wantEvent string) error { + t.Helper() + return eventFromChannel(fr.Events, testName, wantEvent) +} + +func checkCloudEvents(t *testing.T, fce *FakeClient, testName string, wantEvent string) error { + t.Helper() + return eventFromChannel(fce.Events, testName, wantEvent) +} diff --git a/pkg/reconciler/events/cloudevent/cloudevent.go b/pkg/reconciler/events/cloudevent/cloudevent.go index 16634741907..1bff4d4b852 100644 --- a/pkg/reconciler/events/cloudevent/cloudevent.go +++ b/pkg/reconciler/events/cloudevent/cloudevent.go @@ -133,6 +133,9 @@ func EventForPipelineRun(pipelineRun *v1beta1.PipelineRun) (*cloudevents.Event, func getEventType(runObject objectWithCondition) (*TektonEventType, error) { c := runObject.GetStatusCondition().GetCondition(apis.ConditionSucceeded) + if c == nil { + return nil, fmt.Errorf("no condition for ConditionSucceeded in %T", runObject) + } var eventType TektonEventType switch { case c.IsUnknown(): diff --git a/pkg/reconciler/events/cloudevent/cloudeventclient.go b/pkg/reconciler/events/cloudevent/cloudeventclient.go index 372f8db4ad5..1cd69b16bfc 100644 --- a/pkg/reconciler/events/cloudevent/cloudeventclient.go +++ b/pkg/reconciler/events/cloudevent/cloudeventclient.go @@ -34,7 +34,9 @@ type CECKey struct{} func withCloudEventClient(ctx context.Context, cfg *rest.Config) context.Context { logger := logging.FromContext(ctx) + p, err := cloudevents.NewHTTP() + if err != nil { logger.Panicf("Error creating the cloudevents http protocol: %s", err) return ctx @@ -52,8 +54,14 @@ func withCloudEventClient(ctx context.Context, cfg *rest.Config) context.Context func Get(ctx context.Context) CEClient { untyped := ctx.Value(CECKey{}) if untyped == nil { - logging.FromContext(ctx).Fatalf( - "Unable to fetch %T from context.", (CEClient)(nil)) + logging.FromContext(ctx).Errorf( + "Unable to fetch client from context.") + return nil } return untyped.(CEClient) } + +// ToContext adds the cloud events client to the context +func ToContext(ctx context.Context, cec CEClient) context.Context { + return context.WithValue(ctx, CECKey{}, cec) +} diff --git a/pkg/reconciler/events/cloudevent/cloudeventsfakeclient.go b/pkg/reconciler/events/cloudevent/cloudeventsfakeclient.go index a2665952b3d..252ba72be7e 100644 --- a/pkg/reconciler/events/cloudevent/cloudeventsfakeclient.go +++ b/pkg/reconciler/events/cloudevent/cloudeventsfakeclient.go @@ -24,44 +24,47 @@ import ( "github.com/cloudevents/sdk-go/v2/protocol" ) +const bufferSize = 100 + // FakeClientBehaviour defines how the client will behave type FakeClientBehaviour struct { SendSuccessfully bool } // FakeClient is a fake CloudEvent client for unit testing -// Holding a pointer to the behaviour allows to change the behaviour of a client +// Holding pointer to the behaviour allows to change the behaviour of a client type FakeClient struct { behaviour *FakeClientBehaviour - event cloudevents.Event + // Modelled after k8s.io/client-go fake recorder + Events chan string } // NewFakeClient is a FakeClient factory, it returns a client for the target func NewFakeClient(behaviour *FakeClientBehaviour) cloudevents.Client { - c := FakeClient{ + return FakeClient{ behaviour: behaviour, + Events: make(chan string, bufferSize), } - return c } var _ cloudevents.Client = (*FakeClient)(nil) // Send fakes the Send method from cloudevents.Client func (c FakeClient) Send(ctx context.Context, event cloudevents.Event) protocol.Result { - c.event = event if c.behaviour.SendSuccessfully { + c.Events <- fmt.Sprintf("%s", event.String()) return nil } - return fmt.Errorf("%s had to fail", event.ID()) + return fmt.Errorf("Had to fail. Event ID: %s", event.ID()) } // Request fakes the Request method from cloudevents.Client func (c FakeClient) Request(ctx context.Context, event cloudevents.Event) (*cloudevents.Event, protocol.Result) { - c.event = event if c.behaviour.SendSuccessfully { + c.Events <- fmt.Sprintf("%v", event.String()) return &event, nil } - return nil, fmt.Errorf("%s had to fail", event.ID()) + return nil, fmt.Errorf("Had to fail. Event ID: %s", event.ID()) } // StartReceiver fakes StartReceiver method from cloudevents.Client diff --git a/pkg/reconciler/events/cloudevent/interface.go b/pkg/reconciler/events/cloudevent/interface.go index de168fd9dbb..b58ad479d9b 100644 --- a/pkg/reconciler/events/cloudevent/interface.go +++ b/pkg/reconciler/events/cloudevent/interface.go @@ -18,12 +18,16 @@ package cloudevent import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "knative.dev/pkg/apis" ) // objectWithCondition is implemented by TaskRun and PipelineRun type objectWithCondition interface { + // Object requires GetObjectKind() and DeepCopyObject() + runtime.Object + // ObjectMetaAccessor requires a GetObjectMeta that returns the ObjectMeta metav1.ObjectMetaAccessor diff --git a/pkg/reconciler/events/event.go b/pkg/reconciler/events/event.go index 4571a3e54c7..54d0d1ede9c 100644 --- a/pkg/reconciler/events/event.go +++ b/pkg/reconciler/events/event.go @@ -16,11 +16,18 @@ limitations under the License. package events import ( + "context" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/tektoncd/pipeline/pkg/apis/config" + "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/record" "knative.dev/pkg/apis" + "knative.dev/pkg/controller" + "knative.dev/pkg/logging" ) const ( @@ -34,16 +41,39 @@ const ( EventReasonError = "Error" ) -// Emit emits an event for object if afterCondition is different from beforeCondition -// -// Status "ConditionUnknown": -// beforeCondition == nil, emit EventReasonStarted -// beforeCondition != nil, emit afterCondition.Reason -// -// Status "ConditionTrue": emit EventReasonSucceded -// Status "ConditionFalse": emit EventReasonFailed +// Emit emits events for object +// Two types of events are supported, k8s and cloud events. // -func Emit(c record.EventRecorder, beforeCondition *apis.Condition, afterCondition *apis.Condition, object runtime.Object) { +// k8s events are always sent if afterCondition is different from beforeCondition +// Cloud events are always sent if enabled, i.e. if a sink is available +func Emit(ctx context.Context, beforeCondition *apis.Condition, afterCondition *apis.Condition, object runtime.Object) { + recorder := controller.GetEventRecorder(ctx) + logger := logging.FromContext(ctx) + configs := config.FromContextOrDefaults(ctx) + sendCloudEvents := (configs.Defaults.DefaultCloudEventsSink != "") + if sendCloudEvents { + ctx = cloudevents.ContextWithTarget(ctx, configs.Defaults.DefaultCloudEventsSink) + } + + sendKubernetesEvents(recorder, beforeCondition, afterCondition, object) + + if sendCloudEvents { + err := cloudevent.SendCloudEventWithRetries(ctx, object) + if err != nil { + logger.Warnf("Failed to emit cloud events %v", err.Error()) + } + } +} + +func sendKubernetesEvents(c record.EventRecorder, beforeCondition *apis.Condition, afterCondition *apis.Condition, object runtime.Object) { + // Events that are going to be sent + // + // Status "ConditionUnknown": + // beforeCondition == nil, emit EventReasonStarted + // beforeCondition != nil, emit afterCondition.Reason + // + // Status "ConditionTrue": emit EventReasonSucceded + // Status "ConditionFalse": emit EventReasonFailed if !equality.Semantic.DeepEqual(beforeCondition, afterCondition) && afterCondition != nil { // If the condition changed, and the target condition is not empty, we send an event switch afterCondition.Status { @@ -82,8 +112,8 @@ func EmitError(c record.EventRecorder, err error, object runtime.Object) { // Status "ConditionTrue": emit EventReasonSucceded // Status "ConditionFalse": emit EventReasonFailed // Deprecated: use Emit -func EmitEvent(c record.EventRecorder, beforeCondition *apis.Condition, afterCondition *apis.Condition, object runtime.Object) { - Emit(c, beforeCondition, afterCondition, object) +func EmitEvent(ctx context.Context, beforeCondition *apis.Condition, afterCondition *apis.Condition, object runtime.Object) { + Emit(ctx, beforeCondition, afterCondition, object) } // EmitErrorEvent emits a failure associated to an error diff --git a/pkg/reconciler/events/event_test.go b/pkg/reconciler/events/event_test.go index 2c605d3c4da..9ba637259c3 100644 --- a/pkg/reconciler/events/event_test.go +++ b/pkg/reconciler/events/event_test.go @@ -19,17 +19,23 @@ package events import ( "errors" "fmt" - "strings" + "regexp" "testing" "time" + "github.com/tektoncd/pipeline/pkg/apis/config" + "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" + "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/record" "knative.dev/pkg/apis" + duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1" + "knative.dev/pkg/controller" + rtesting "knative.dev/pkg/reconciler/testing" ) -func TestEmit(t *testing.T) { +func TestSendKubernetesEvents(t *testing.T) { testcases := []struct { name string before *apis.Condition @@ -136,7 +142,7 @@ func TestEmit(t *testing.T) { for _, ts := range testcases { fr := record.NewFakeRecorder(1) tr := &corev1.Pod{} - Emit(fr, ts.before, ts.after, tr) + sendKubernetesEvents(fr, ts.before, ts.after, tr) err := checkEvents(t, fr, ts.name, ts.wantEvent) if err != nil { @@ -172,16 +178,85 @@ func TestEmitError(t *testing.T) { } } -func checkEvents(t *testing.T, fr *record.FakeRecorder, testName string, wantEvent string) error { - t.Helper() +func TestEmit(t *testing.T) { + objectStatus := duckv1beta1.Status{ + Conditions: []apis.Condition{{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionUnknown, + Reason: v1beta1.PipelineRunReasonStarted.String(), + }}, + } + object := &v1beta1.PipelineRun{ + ObjectMeta: metav1.ObjectMeta{ + SelfLink: "/pipelineruns/test1", + }, + Status: v1beta1.PipelineRunStatus{Status: objectStatus}, + } + after := &apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionUnknown, + Message: "just starting", + } + testcases := []struct { + name string + data map[string]string + wantEvent string + wantCloudEvent string + }{{ + name: "without sink", + data: map[string]string{}, + wantEvent: "Normal Started", + wantCloudEvent: "", + }, { + name: "with empty string sink", + data: map[string]string{"default-cloud-events-sink": ""}, + wantEvent: "Normal Started", + wantCloudEvent: "", + }, { + name: "with sink", + data: map[string]string{"default-cloud-events-sink": "http://mysink"}, + wantEvent: "Normal Started", + wantCloudEvent: `(?s)dev.tekton.event.pipelinerun.started.v1.*test1`, + }} + + for _, tc := range testcases { + // Setup the context and seed test data + ctx, _ := rtesting.SetupFakeContext(t) + ctx = cloudevent.WithClient(ctx, &cloudevent.FakeClientBehaviour{SendSuccessfully: true}) + fakeClient := cloudevent.Get(ctx).(cloudevent.FakeClient) + + // Setup the config and add it to the context + defaults, _ := config.NewDefaultsFromMap(tc.data) + featureFlags, _ := config.NewFeatureFlagsFromMap(map[string]string{}) + cfg := &config.Config{ + Defaults: defaults, + FeatureFlags: featureFlags, + } + ctx = config.ToContext(ctx, cfg) + + recorder := controller.GetEventRecorder(ctx).(*record.FakeRecorder) + Emit(ctx, nil, after, object) + if err := checkEvents(t, recorder, tc.name, tc.wantEvent); err != nil { + t.Fatalf(err.Error()) + } + if err := checkCloudEvents(t, &fakeClient, tc.name, tc.wantCloudEvent); err != nil { + t.Fatalf(err.Error()) + } + } +} + +func eventFromChannel(c chan string, testName string, wantEvent string) error { timer := time.NewTimer(1 * time.Second) select { - case event := <-fr.Events: + case event := <-c: if wantEvent == "" { return fmt.Errorf("received event \"%s\" for %s but none expected", event, testName) } - if !(strings.HasPrefix(event, wantEvent)) { - return fmt.Errorf("expected event \"%s\" but got \"%s\" instead for %s", wantEvent, event, testName) + matching, err := regexp.MatchString(wantEvent, event) + if err == nil { + if !matching { + return fmt.Errorf("expected event \"%s\" but got \"%s\" instead for %s", wantEvent, event, testName) + } } case <-timer.C: if wantEvent != "" { @@ -190,3 +265,13 @@ func checkEvents(t *testing.T, fr *record.FakeRecorder, testName string, wantEve } return nil } + +func checkEvents(t *testing.T, fr *record.FakeRecorder, testName string, wantEvent string) error { + t.Helper() + return eventFromChannel(fr.Events, testName, wantEvent) +} + +func checkCloudEvents(t *testing.T, fce *cloudevent.FakeClient, testName string, wantEvent string) error { + t.Helper() + return eventFromChannel(fce.Events, testName, wantEvent) +} diff --git a/pkg/reconciler/pipelinerun/pipelinerun.go b/pkg/reconciler/pipelinerun/pipelinerun.go index 1730afbe7d0..dc440e2abf8 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun.go +++ b/pkg/reconciler/pipelinerun/pipelinerun.go @@ -150,7 +150,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pr *v1beta1.PipelineRun) // We also want to send the "Started" event as soon as possible for anyone who may be waiting // on the event to perform user facing initialisations, such has reset a CI check status afterCondition := pr.Status.GetCondition(apis.ConditionSucceeded) - events.Emit(controller.GetEventRecorder(ctx), nil, afterCondition, pr) + events.Emit(ctx, nil, afterCondition, pr) // We already sent an event for start, so update `before` with the current status before = pr.Status.GetCondition(apis.ConditionSucceeded) @@ -213,15 +213,14 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pr *v1beta1.PipelineRun) } func (c *Reconciler) finishReconcileUpdateEmitEvents(ctx context.Context, pr *v1beta1.PipelineRun, beforeCondition *apis.Condition, previousError error) error { - recorder := controller.GetEventRecorder(ctx) logger := logging.FromContext(ctx) afterCondition := pr.Status.GetCondition(apis.ConditionSucceeded) - events.Emit(recorder, beforeCondition, afterCondition, pr) + events.Emit(ctx, beforeCondition, afterCondition, pr) _, err := c.updateLabelsAndAnnotations(pr) if err != nil { logger.Warn("Failed to update PipelineRun labels/annotations", zap.Error(err)) - events.EmitError(recorder, err, pr) + events.EmitError(controller.GetEventRecorder(ctx), err, pr) } merr := multierror.Append(previousError, err).ErrorOrNil() diff --git a/pkg/reconciler/pipelinerun/pipelinerun_test.go b/pkg/reconciler/pipelinerun/pipelinerun_test.go index 8ebe3c1c110..130361f4b1b 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun_test.go +++ b/pkg/reconciler/pipelinerun/pipelinerun_test.go @@ -32,6 +32,7 @@ import ( "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" resourcev1alpha1 "github.com/tektoncd/pipeline/pkg/apis/resource/v1alpha1" + "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent" "github.com/tektoncd/pipeline/pkg/reconciler/pipelinerun/resources" taskrunresources "github.com/tektoncd/pipeline/pkg/reconciler/taskrun/resources" ttesting "github.com/tektoncd/pipeline/pkg/reconciler/testing" @@ -102,8 +103,18 @@ func conditionCheckFromTaskRun(tr *v1beta1.TaskRun) *v1beta1.ConditionCheck { return &cc } -func checkEvents(fr *record.FakeRecorder, testName string, wantEvents []string) error { - // The fake recorder runs in a go routine, so the timeout is here to avoid waiting +func checkEvents(t *testing.T, fr *record.FakeRecorder, testName string, wantEvents []string) error { + t.Helper() + return eventFromChannel(fr.Events, testName, wantEvents) +} + +func checkCloudEvents(t *testing.T, fce *cloudevent.FakeClient, testName string, wantEvents []string) error { + t.Helper() + return eventFromChannel(fce.Events, testName, wantEvents) +} + +func eventFromChannel(c chan string, testName string, wantEvents []string) error { + // We get events from a channel, so the timeout is here to avoid waiting // on the channel forever if fewer than expected events are received. // We only hit the timeout in case of failure of the test, so the actual value // of the timeout is not so relevant, it's only used when tests are going to fail. @@ -115,18 +126,23 @@ func checkEvents(fr *record.FakeRecorder, testName string, wantEvents []string) // we exit the loop. If we never receive enough events, the timeout takes us // out of the loop. select { - case event := <-fr.Events: + case event := <-c: foundEvents = append(foundEvents, event) if ii > len(wantEvents)-1 { - return fmt.Errorf("Received event \"%s\" for %s but not more expected", event, testName) + return fmt.Errorf("received event \"%s\" for %s but not more expected", event, testName) } wantEvent := wantEvents[ii] - if !(strings.HasPrefix(event, wantEvent)) { - return fmt.Errorf("Expected event \"%s\" but got \"%s\" instead for %s", wantEvent, event, testName) + matching, err := regexp.MatchString(wantEvent, event) + if err == nil { + if !matching { + return fmt.Errorf("expected event \"%s\" but got \"%s\" instead for %s", wantEvent, event, testName) + } + } else { + return fmt.Errorf("something went wrong matching the event: %s", err) } case <-timer.C: if len(foundEvents) > len(wantEvents) { - return fmt.Errorf("Received %d events for %s but %d expected. Found events: %#v", len(foundEvents), testName, len(wantEvents), foundEvents) + return fmt.Errorf("received %d events for %s but %d expected. Found events: %#v", len(foundEvents), testName, len(wantEvents), foundEvents) } } } @@ -339,7 +355,7 @@ func TestReconcile(t *testing.T) { "Normal Started", "Normal Running Tasks Completed: 0", } - err = checkEvents(testAssets.Recorder, "test-pipeline-run-success", wantEvents) + err = checkEvents(t, testAssets.Recorder, "test-pipeline-run-success", wantEvents) if !(err == nil) { t.Errorf(err.Error()) } @@ -434,7 +450,7 @@ func TestReconcile_PipelineSpecTaskSpec(t *testing.T) { "Normal Started", "Normal Running Tasks Completed: 0", } - err = checkEvents(testAssets.Recorder, "test-pipeline-run-success", wantEvents) + err = checkEvents(t, testAssets.Recorder, "test-pipeline-run-success", wantEvents) if !(err == nil) { t.Errorf(err.Error()) } @@ -612,7 +628,7 @@ func TestReconcile_InvalidPipelineRuns(t *testing.T) { permanentError: true, wantEvents: []string{ "Normal Started", - "Warning Failed Pipeline foo/embedded-pipeline-invalid can't be Run; it has an invalid spec: invalid value \"bad-t@$k\"", + "Warning Failed Pipeline foo/embedded-pipeline-invalid can't be Run; it has an invalid spec", }, }, { name: "invalid-embedded-pipeline-mismatching-parameter-types", @@ -708,7 +724,7 @@ func TestReconcile_InvalidPipelineRuns(t *testing.T) { } // Check generated events match what's expected wantEvents := append(tc.wantEvents, "Warning InternalError 1 error occurred") - err = checkEvents(testAssets.Recorder, tc.pipelineRun.Name, wantEvents) + err = checkEvents(t, testAssets.Recorder, tc.pipelineRun.Name, wantEvents) if !(err == nil) { t.Errorf(err.Error()) } @@ -1058,7 +1074,7 @@ func TestReconcileOnCompletedPipelineRun(t *testing.T) { wantEvents := []string{ "Normal Succeeded All Tasks have completed executing", } - err = checkEvents(testAssets.Recorder, "test-pipeline-run-completed", wantEvents) + err = checkEvents(t, testAssets.Recorder, "test-pipeline-run-completed", wantEvents) if !(err == nil) { t.Errorf(err.Error()) } @@ -1125,7 +1141,7 @@ func TestReconcileOnCancelledPipelineRun(t *testing.T) { wantEvents := []string{ "Warning Failed PipelineRun \"test-pipeline-run-cancelled\" was cancelled", } - err = checkEvents(testAssets.Recorder, "test-pipeline-run-cancelled", wantEvents) + err = checkEvents(t, testAssets.Recorder, "test-pipeline-run-cancelled", wantEvents) if !(err == nil) { t.Errorf(err.Error()) } @@ -1192,7 +1208,7 @@ func TestReconcileWithTimeout(t *testing.T) { wantEvents := []string{ "Warning Failed PipelineRun \"test-pipeline-run-with-timeout\" failed to finish within \"12h0m0s\"", } - err = checkEvents(testAssets.Recorder, "test-pipeline-run-with-timeout", wantEvents) + err = checkEvents(t, testAssets.Recorder, "test-pipeline-run-with-timeout", wantEvents) if !(err == nil) { t.Errorf(err.Error()) } @@ -1316,7 +1332,7 @@ func TestReconcileCancelledFailsTaskRunCancellation(t *testing.T) { "Normal PipelineRunCouldntCancel PipelineRun \"test-pipeline-fails-to-cancel\" was cancelled but had errors trying to cancel TaskRuns", "Warning InternalError 1 error occurred", } - err = checkEvents(testAssets.Recorder, prName, wantEvents) + err = checkEvents(t, testAssets.Recorder, prName, wantEvents) if !(err == nil) { t.Errorf(err.Error()) } @@ -1376,7 +1392,7 @@ func TestReconcileCancelledPipelineRun(t *testing.T) { wantEvents := []string{ "Warning Failed PipelineRun \"test-pipeline-run-cancelled\" was cancelled", } - err = checkEvents(testAssets.Recorder, "test-pipeline-run-cancelled", wantEvents) + err = checkEvents(t, testAssets.Recorder, "test-pipeline-run-cancelled", wantEvents) if !(err == nil) { t.Errorf(err.Error()) } @@ -1636,7 +1652,7 @@ func TestReconcileWithTimeoutAndRetry(t *testing.T) { if status := reconciledRun.Status.TaskRuns["hello-world-1"].Status.GetCondition(apis.ConditionSucceeded).Status; status != tc.conditionSucceeded { t.Fatalf("Succeeded expected to be %s but is %s", tc.conditionSucceeded, status) } - err = checkEvents(testAssets.Recorder, prs[0].Name, tc.wantEvents) + err = checkEvents(t, testAssets.Recorder, prs[0].Name, tc.wantEvents) if !(err == nil) { t.Errorf(err.Error()) } @@ -1967,9 +1983,9 @@ func TestReconcileWithConditionChecks(t *testing.T) { wantEvents := []string{ "Normal Started", - "Normal Running Tasks Completed: 0 (Failed: 0, Cancelled 0), Incomplete: 1, Skipped: 0", + "Normal Running Tasks Completed: 0 \\(Failed: 0, Cancelled 0\\), Incomplete: 1, Skipped: 0", } - err = checkEvents(testAssets.Recorder, "test-pipeline-run-completed", wantEvents) + err = checkEvents(t, testAssets.Recorder, "test-pipeline-run-completed", wantEvents) if !(err == nil) { t.Errorf(err.Error()) } @@ -2099,9 +2115,9 @@ func TestReconcileWithFailingConditionChecks(t *testing.T) { wantEvents := []string{ "Normal Started", - "Normal Running Tasks Completed: 1 (Failed: 0, Cancelled 0), Incomplete: 1, Skipped: 1", + "Normal Running Tasks Completed: 1 \\(Failed: 0, Cancelled 0\\), Incomplete: 1, Skipped: 1", } - err = checkEvents(testAssets.Recorder, "test-pipeline-run-completed", wantEvents) + err = checkEvents(t, testAssets.Recorder, "test-pipeline-run-completed", wantEvents) if !(err == nil) { t.Errorf(err.Error()) } diff --git a/pkg/reconciler/taskrun/taskrun.go b/pkg/reconciler/taskrun/taskrun.go index 06650cef615..f8c4a1c56a6 100644 --- a/pkg/reconciler/taskrun/taskrun.go +++ b/pkg/reconciler/taskrun/taskrun.go @@ -87,7 +87,8 @@ var _ taskrunreconciler.Interface = (*Reconciler)(nil) // resource with the current status of the resource. func (c *Reconciler) ReconcileKind(ctx context.Context, tr *v1beta1.TaskRun) pkgreconciler.Event { logger := logging.FromContext(ctx) - recorder := controller.GetEventRecorder(ctx) + ctx = cloudevent.ToContext(ctx, c.cloudEventClient) + // Read the initial condition before := tr.Status.GetCondition(apis.ConditionSucceeded) @@ -106,7 +107,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, tr *v1beta1.TaskRun) pkg // We also want to send the "Started" event as soon as possible for anyone who may be waiting // on the event to perform user facing initialisations, such has reset a CI check status afterCondition := tr.Status.GetCondition(apis.ConditionSucceeded) - events.Emit(recorder, nil, afterCondition, tr) + events.Emit(ctx, nil, afterCondition, tr) } // If the TaskRun is complete, run some post run fixtures when applicable @@ -196,12 +197,15 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, tr *v1beta1.TaskRun) pkg } func (c *Reconciler) finishReconcileUpdateEmitEvents(ctx context.Context, tr *v1beta1.TaskRun, beforeCondition *apis.Condition, previousError error) error { - recorder := controller.GetEventRecorder(ctx) - afterCondition := tr.Status.GetCondition(apis.ConditionSucceeded) - events.Emit(recorder, beforeCondition, afterCondition, tr) + + // Send k8s events and cloud events (when configured) + events.Emit(ctx, beforeCondition, afterCondition, tr) + _, err := c.updateLabelsAndAnnotations(tr) - events.EmitError(recorder, err, tr) + if err != nil { + events.EmitError(controller.GetEventRecorder(ctx), err, tr) + } if controller.IsPermanentError(previousError) { return controller.NewPermanentError(multierror.Append(previousError, err)) } diff --git a/pkg/reconciler/taskrun/taskrun_test.go b/pkg/reconciler/taskrun/taskrun_test.go index a11db4f5217..15010cd6eac 100644 --- a/pkg/reconciler/taskrun/taskrun_test.go +++ b/pkg/reconciler/taskrun/taskrun_test.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "regexp" "strings" "testing" "time" @@ -313,8 +314,18 @@ func getTaskRunController(t *testing.T, d test.Data) (test.Assets, func()) { }, cancel } -func checkEvents(fr *record.FakeRecorder, testName string, wantEvents []string) error { - // The fake recorder runs in a go routine, so the timeout is here to avoid waiting +func checkEvents(t *testing.T, fr *record.FakeRecorder, testName string, wantEvents []string) error { + t.Helper() + return eventFromChannel(fr.Events, testName, wantEvents) +} + +func checkCloudEvents(t *testing.T, fce *cloudevent.FakeClient, testName string, wantEvents []string) error { + t.Helper() + return eventFromChannel(fce.Events, testName, wantEvents) +} + +func eventFromChannel(c chan string, testName string, wantEvents []string) error { + // We get events from a channel, so the timeout is here to avoid waiting // on the channel forever if fewer than expected events are received. // We only hit the timeout in case of failure of the test, so the actual value // of the timeout is not so relevant, it's only used when tests are going to fail. @@ -326,18 +337,23 @@ func checkEvents(fr *record.FakeRecorder, testName string, wantEvents []string) // we exit the loop. If we never receive enough events, the timeout takes us // out of the loop. select { - case event := <-fr.Events: + case event := <-c: foundEvents = append(foundEvents, event) if ii > len(wantEvents)-1 { - return fmt.Errorf("Received event \"%s\" for %s but not more expected", event, testName) + return fmt.Errorf("received event \"%s\" for %s but not more expected", event, testName) } wantEvent := wantEvents[ii] - if !(strings.HasPrefix(event, wantEvent)) { - return fmt.Errorf("Expected event \"%s\" but got \"%s\" instead for %s", wantEvent, event, testName) + matching, err := regexp.MatchString(wantEvent, event) + if err == nil { + if !matching { + return fmt.Errorf("expected event \"%s\" but got \"%s\" instead for %s", wantEvent, event, testName) + } + } else { + return fmt.Errorf("something went wrong matching the event: %s", err) } case <-timer.C: if len(foundEvents) > len(wantEvents) { - return fmt.Errorf("Received %d events for %s but %d expected. Found events: %#v", len(foundEvents), testName, len(wantEvents), foundEvents) + return fmt.Errorf("received %d events for %s but %d expected. Found events: %#v", len(foundEvents), testName, len(wantEvents), foundEvents) } } } @@ -690,6 +706,89 @@ func TestReconcile_FeatureFlags(t *testing.T) { }) } } + +// TestReconcile_CloudEvents runs reconcile with a cloud event sink configured +// to ensure that events are sent in different cases +func TestReconcile_CloudEvents(t *testing.T) { + simpleTask := tb.Task("test-task", + tb.TaskSpec(tb.Step("foo", + tb.StepName("simple-step"), tb.StepCommand("/mycmd"), tb.StepEnvVar("foo", "bar"), + )), + tb.TaskNamespace("foo"), + ) + taskRun := tb.TaskRun("test-taskrun-not-started", + tb.TaskRunSelfLink("/test/taskrun1"), + tb.TaskRunNamespace("foo"), + tb.TaskRunSpec(tb.TaskRunTaskRef(simpleTask.Name)), + ) + d := test.Data{ + TaskRuns: []*v1beta1.TaskRun{taskRun}, + Tasks: []*v1beta1.Task{simpleTask}, + } + + names.TestingSeed() + d.ConfigMaps = []*corev1.ConfigMap{ + { + ObjectMeta: metav1.ObjectMeta{Name: config.GetDefaultsConfigName(), Namespace: system.GetNamespace()}, + Data: map[string]string{ + "default-cloud-events-sink": "http://synk:8080", + }, + }, + } + + testAssets, cancel := getTaskRunController(t, d) + defer cancel() + c := testAssets.Controller + clients := testAssets.Clients + saName := "default" + if _, err := clients.Kube.CoreV1().ServiceAccounts(taskRun.Namespace).Create(&corev1.ServiceAccount{ + ObjectMeta: metav1.ObjectMeta{ + Name: saName, + Namespace: taskRun.Namespace, + }, + }); err != nil { + t.Fatal(err) + } + + if err := c.Reconciler.Reconcile(context.Background(), getRunName(taskRun)); err != nil { + t.Errorf("expected no error. Got error %v", err) + } + if len(clients.Kube.Actions()) == 0 { + t.Errorf("Expected actions to be logged in the kubeclient, got none") + } + + tr, err := clients.Pipeline.TektonV1beta1().TaskRuns(taskRun.Namespace).Get(taskRun.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("getting updated taskrun: %v", err) + } + condition := tr.Status.GetCondition(apis.ConditionSucceeded) + if condition == nil || condition.Status != corev1.ConditionUnknown { + t.Errorf("Expected fresh TaskRun to have in progress status, but had %v", condition) + } + if condition != nil && condition.Reason != v1beta1.TaskRunReasonRunning.String() { + t.Errorf("Expected reason %q but was %s", v1beta1.TaskRunReasonRunning.String(), condition.Reason) + } + + wantEvents := []string{ + "Normal Start", + "Normal Running", + } + err = checkEvents(t, testAssets.Recorder, "reconcile-cloud-events", wantEvents) + if !(err == nil) { + t.Errorf(err.Error()) + } + + wantCloudEvents := []string{ + `(?s)dev.tekton.event.taskrun.started.v1.*test-taskrun-not-started`, + `(?s)dev.tekton.event.taskrun.running.v1.*test-taskrun-not-started`, + } + ceClient := clients.CloudEvents.(cloudevent.FakeClient) + err = checkCloudEvents(t, &ceClient, "reconcile-cloud-events", wantCloudEvents) + if !(err == nil) { + t.Errorf(err.Error()) + } +} + func TestReconcile(t *testing.T) { taskRunSuccess := tb.TaskRun("test-taskrun-run-success", tb.TaskRunNamespace("foo"), tb.TaskRunSpec( tb.TaskRunTaskRef(simpleTask.Name, tb.TaskRefAPIVersion("a1")), @@ -1306,7 +1405,7 @@ func TestReconcile(t *testing.T) { t.Fatalf("Expected actions to be logged in the kubeclient, got none") } - err = checkEvents(testAssets.Recorder, tc.name, tc.wantEvents) + err = checkEvents(t, testAssets.Recorder, tc.name, tc.wantEvents) if !(err == nil) { t.Errorf(err.Error()) } @@ -1524,7 +1623,7 @@ func TestReconcileInvalidTaskRuns(t *testing.T) { t.Errorf("expected 3 actions (first: list namespaces) created by the reconciler, got %d. Actions: %#v", len(actions), actions) } - err := checkEvents(testAssets.Recorder, tc.name, tc.wantEvents) + err := checkEvents(t, testAssets.Recorder, tc.name, tc.wantEvents) if !(err == nil) { t.Errorf(err.Error()) } @@ -1667,7 +1766,7 @@ func TestReconcilePodUpdateStatus(t *testing.T) { "Normal Running Not all Steps", "Normal Succeeded", } - err = checkEvents(testAssets.Recorder, "test-reconcile-pod-updateStatus", wantEvents) + err = checkEvents(t, testAssets.Recorder, "test-reconcile-pod-updateStatus", wantEvents) if !(err == nil) { t.Errorf(err.Error()) } @@ -1749,7 +1848,7 @@ func TestReconcileOnCancelledTaskRun(t *testing.T) { "Normal Started", "Warning Failed TaskRun \"test-taskrun-run-cancelled\" was cancelled", } - err = checkEvents(testAssets.Recorder, "test-reconcile-on-cancelled-taskrun", wantEvents) + err = checkEvents(t, testAssets.Recorder, "test-reconcile-on-cancelled-taskrun", wantEvents) if !(err == nil) { t.Errorf(err.Error()) } @@ -1848,7 +1947,7 @@ func TestReconcileTimeouts(t *testing.T) { if d := cmp.Diff(tc.expectedStatus, condition, ignoreLastTransitionTime); d != "" { t.Fatalf("Did not get expected condition %s", diff.PrintWantGot(d)) } - err = checkEvents(testAssets.Recorder, tc.taskRun.Name, tc.wantEvents) + err = checkEvents(t, testAssets.Recorder, tc.taskRun.Name, tc.wantEvents) if !(err == nil) { t.Errorf(err.Error()) } @@ -2629,7 +2728,7 @@ func TestReconcileTaskResourceResolutionAndValidation(t *testing.T) { } } - err = checkEvents(testAssets.Recorder, tt.desc, tt.wantEvents) + err = checkEvents(t, testAssets.Recorder, tt.desc, tt.wantEvents) if !(err == nil) { t.Errorf(err.Error()) }