From 47cda9029b6afd94d0ac54a8b321e334c168ad82 Mon Sep 17 00:00:00 2001 From: Sunny Date: Thu, 1 Jun 2023 00:24:57 +0000 Subject: [PATCH] Refactor event handler - Break down the EventServer.handleEvent() implementation into multiple smaller functions which are extensively tested on their own. - New implementation of filter Alerts for Event - New implementation of Event matches Alert - Remove any readiness check on Alert or Provider. - Convert the event handler test from controllers/ dir to work with just EventServer without any reconciler, keeping all the test cases and slightly modified test set up code. Signed-off-by: Sunny --- internal/server/event_handlers.go | 520 +++++++----- internal/server/event_handlers_test.go | 890 +++++++++++++++++++- internal/server/event_server_test.go | 402 ++++++++- internal/server/testdata/gitrepo2.yaml | 13 + internal/server/testdata/kustomization.yaml | 10 + internal/server/testdata/repo.yaml | 13 + 6 files changed, 1589 insertions(+), 259 deletions(-) create mode 100644 internal/server/testdata/gitrepo2.yaml create mode 100644 internal/server/testdata/kustomization.yaml create mode 100644 internal/server/testdata/repo.yaml diff --git a/internal/server/event_handlers.go b/internal/server/event_handlers.go index c49081221..816a6da90 100644 --- a/internal/server/event_handlers.go +++ b/internal/server/event_handlers.go @@ -22,21 +22,22 @@ import ( "errors" "fmt" "net/http" + "net/url" "regexp" "time" - "github.com/fluxcd/pkg/runtime/conditions" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/yaml" eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1" "github.com/fluxcd/pkg/masktoken" apiv1 "github.com/fluxcd/notification-controller/api/v1" - apiv1beta2 "github.com/fluxcd/notification-controller/api/v1beta2" + apiv1beta3 "github.com/fluxcd/notification-controller/api/v1beta3" "github.com/fluxcd/notification-controller/internal/notifier" ) @@ -47,65 +48,9 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request) ctx, cancel := context.WithTimeout(r.Context(), 15*time.Second) defer cancel() - var allAlerts apiv1beta2.AlertList - err := s.kubeClient.List(ctx, &allAlerts) + alerts, err := s.getAllAlertsForEvent(ctx, event) if err != nil { - s.logger.Error(err, "listing alerts failed") - w.WriteHeader(http.StatusBadRequest) - return - } - - // find matching alerts - alerts := make([]apiv1beta2.Alert, 0) - each_alert: - for _, alert := range allAlerts.Items { - // skip suspended and not ready alerts - isReady := conditions.IsReady(&alert) - if alert.Spec.Suspend || !isReady { - continue each_alert - } - - // skip alert if the message does not match any regex from the inclusion list - if len(alert.Spec.InclusionList) > 0 { - var include bool - for _, inclusionRegex := range alert.Spec.InclusionList { - if r, err := regexp.Compile(inclusionRegex); err == nil { - if r.Match([]byte(event.Message)) { - include = true - break - } - } else { - s.logger.Error(err, fmt.Sprintf("failed to compile inclusion regex: %s", inclusionRegex)) - } - } - if !include { - continue each_alert - } - } - - // skip alert if the message matches a regex from the exclusion list - if len(alert.Spec.ExclusionList) > 0 { - for _, exclusionRegex := range alert.Spec.ExclusionList { - if r, err := regexp.Compile(exclusionRegex); err == nil { - if r.Match([]byte(event.Message)) { - continue each_alert - } - } else { - s.logger.Error(err, fmt.Sprintf("failed to compile exclusion regex: %s", exclusionRegex)) - } - } - } - - // filter alerts by object and severity - for _, source := range alert.Spec.EventSources { - if source.Namespace == "" { - source.Namespace = alert.Namespace - } - - if s.eventMatchesAlert(ctx, event, source, alert.Spec.EventSeverity) { - alerts = append(alerts, alert) - } - } + s.logger.Error(err, "failed to get alerts for the event: %w", err) } if len(alerts) == 0 { @@ -122,199 +67,322 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request) "name", event.InvolvedObject.Name, "namespace", event.InvolvedObject.Namespace) - // dispatch notifications + // Dispatch notifications. for _, alert := range alerts { - // verify if event comes from a different namespace - if s.noCrossNamespaceRefs && event.InvolvedObject.Namespace != alert.Namespace { - accessDenied := fmt.Errorf( - "alert '%s/%s' can't process event from '%s/%s/%s', cross-namespace references have been blocked", - alert.Namespace, alert.Name, event.InvolvedObject.Kind, event.InvolvedObject.Namespace, event.InvolvedObject.Name) - s.logger.Error(accessDenied, "Discarding event, access denied to cross-namespace sources") - continue + if err := s.dispatchNotification(ctx, event, alert); err != nil { + s.logger.Error(err, "failed to dispatch notification to provider", + "reconciler kind", apiv1beta3.ProviderKind, + "name", alert.Spec.ProviderRef.Name, + "namespace", alert.Namespace) } + } + + w.WriteHeader(http.StatusAccepted) + } +} - var provider apiv1beta2.Provider - providerName := types.NamespacedName{Namespace: alert.Namespace, Name: alert.Spec.ProviderRef.Name} +func (s *EventServer) getAllAlertsForEvent(ctx context.Context, event *eventv1.Event) ([]apiv1beta3.Alert, error) { + var allAlerts apiv1beta3.AlertList + err := s.kubeClient.List(ctx, &allAlerts) + if err != nil { + return nil, fmt.Errorf("failed listing alerts: %w", err) + } - err = s.kubeClient.Get(ctx, providerName, &provider) - if err != nil { - s.logger.Error(err, "failed to read provider", - "reconciler kind", apiv1beta2.ProviderKind, - "name", providerName.Name, - "namespace", providerName.Namespace) - continue + return s.filterAlertsForEvent(ctx, allAlerts.Items, event), nil +} + +// filterAlertsForEvent filters a given set of alerts against a given event, +// checking if the event matches with any of the alert event sources and is +// allowed by the exclusion list. +func (s *EventServer) filterAlertsForEvent(ctx context.Context, alerts []apiv1beta3.Alert, event *eventv1.Event) []apiv1beta3.Alert { + results := make([]apiv1beta3.Alert, 0) + for _, alert := range alerts { + // Skip suspended alert. + if alert.Spec.Suspend { + continue + } + // Check if the event matches any of the alert sources. + if !s.eventMatchesAlertSources(ctx, event, alert) { + continue + } + // Check if the event message is allowed for the alert based on the + // inclusion list. + if !s.messageIsIncluded(event.Message, alert.Spec.InclusionList) { + continue + } + // Check if the event message is allowed for the alert based on the + // exclusion list. + if s.messageIsExcluded(event.Message, alert.Spec.ExclusionList) { + continue + } + results = append(results, alert) + } + return results +} + +// eventMatchesAlertSources returns if a given event matches with any of the +// alert sources. +func (s *EventServer) eventMatchesAlertSources(ctx context.Context, event *eventv1.Event, alert apiv1beta3.Alert) bool { + for _, source := range alert.Spec.EventSources { + if source.Namespace == "" { + source.Namespace = alert.Namespace + } + if s.eventMatchesAlert(ctx, event, source, alert.Spec.EventSeverity) { + return true + } + } + return false +} + +// messageIsIncluded returns if the given message matches with the inclusion +// rules. +func (s *EventServer) messageIsIncluded(msg string, inclusionList []string) bool { + if len(inclusionList) == 0 { + return true + } + + for _, exp := range inclusionList { + if r, err := regexp.Compile(exp); err == nil { + if r.Match([]byte(msg)) { + return true } + } else { + // TODO: Record event on the respective Alert object. + s.logger.Error(err, fmt.Sprintf("failed to compile inclusion regex: %s", exp)) + } + } + return false +} - if provider.Spec.Suspend { - continue +// messageIsExcluded returns if the given message matches with the exclusion +// rules. +func (s *EventServer) messageIsExcluded(msg string, exclusionList []string) bool { + if len(exclusionList) == 0 { + return false + } + + for _, exp := range exclusionList { + if r, err := regexp.Compile(exp); err == nil { + if r.Match([]byte(msg)) { + return true } + } else { + // TODO: Record event on the respective Alert object. + s.logger.Error(err, fmt.Sprintf("failed to compile exclusion regex: %s", exp)) + } + } + return false +} - webhook := provider.Spec.Address - username := provider.Spec.Username - proxy := provider.Spec.Proxy - token := "" - password := "" - headers := make(map[string]string) - if provider.Spec.SecretRef != nil { - var secret corev1.Secret - secretName := types.NamespacedName{Namespace: alert.Namespace, Name: provider.Spec.SecretRef.Name} - - err = s.kubeClient.Get(ctx, secretName, &secret) - if err != nil { - s.logger.Error(err, "failed to read secret", - "reconciler kind", apiv1beta2.ProviderKind, - "name", providerName.Name, - "namespace", providerName.Namespace) - continue - } - - if address, ok := secret.Data["address"]; ok { - webhook = string(address) - } - - if p, ok := secret.Data["password"]; ok { - password = string(p) - } - - if p, ok := secret.Data["proxy"]; ok { - proxy = string(p) - } - - if t, ok := secret.Data["token"]; ok { - token = string(t) - } - - if u, ok := secret.Data["username"]; ok { - username = string(u) - } - - if h, ok := secret.Data["headers"]; ok { - err := yaml.Unmarshal(h, &headers) - if err != nil { - s.logger.Error(err, "failed to read headers from secret", - "reconciler kind", apiv1beta2.ProviderKind, - "name", providerName.Name, - "namespace", providerName.Namespace) - continue - } - } +// dispatchNotification constructs and sends notification from the given event +// and alert data. +func (s *EventServer) dispatchNotification(ctx context.Context, event *eventv1.Event, alert apiv1beta3.Alert) error { + sender, notification, token, timeout, err := s.getNotificationParams(ctx, event, alert) + if err != nil { + return err + } + // Skip when either sender or notification couldn't be created. + if sender == nil || notification == nil { + return nil + } + + go func(n notifier.Interface, e eventv1.Event) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + if err := n.Post(ctx, e); err != nil { + maskedErrStr, maskErr := masktoken.MaskTokenFromString(err.Error(), token) + if maskErr != nil { + err = maskErr + } else { + err = errors.New(maskedErrStr) } + // TODO: Record failed event on the associated Alert object. + s.logger.Error(err, "failed to send notification", + "reconciler kind", event.InvolvedObject.Kind, + "name", event.InvolvedObject.Name, + "namespace", event.InvolvedObject.Namespace) + } + }(sender, *notification) + + return nil +} + +// getNotificationParams constructs the notification parameters from the given +// event and alert, and returns a notifier, event, token and timeout for sending +// the notification. The returned event is a mutated form of the input event +// based on the alert configuration. +func (s *EventServer) getNotificationParams(ctx context.Context, event *eventv1.Event, alert apiv1beta3.Alert) (notifier.Interface, *eventv1.Event, string, time.Duration, error) { + // Check if event comes from a different namespace. + if s.noCrossNamespaceRefs && event.InvolvedObject.Namespace != alert.Namespace { + accessDenied := fmt.Errorf( + "alert '%s/%s' can't process event from '%s/%s/%s', cross-namespace references have been blocked", + alert.Namespace, alert.Name, event.InvolvedObject.Kind, event.InvolvedObject.Namespace, event.InvolvedObject.Name) + return nil, nil, "", 0, fmt.Errorf("discarding event, access denied to cross-namespace sources: %w", accessDenied) + } + + var provider apiv1beta3.Provider + providerName := types.NamespacedName{Namespace: alert.Namespace, Name: alert.Spec.ProviderRef.Name} + + err := s.kubeClient.Get(ctx, providerName, &provider) + if err != nil { + return nil, nil, "", 0, fmt.Errorf("failed to read provider: %w", err) + } + + // Skip if the provider is suspended. + if provider.Spec.Suspend { + return nil, nil, "", 0, nil + } + + sender, token, err := createNotifier(ctx, s.kubeClient, provider) + if err != nil { + return nil, nil, "", 0, fmt.Errorf("failed to initialize notifier: %w", err) + } + + notification := *event.DeepCopy() + s.enhanceEventWithAlertMetadata(¬ification, alert) - var certPool *x509.CertPool - if provider.Spec.CertSecretRef != nil { - var secret corev1.Secret - secretName := types.NamespacedName{Namespace: alert.Namespace, Name: provider.Spec.CertSecretRef.Name} - - err = s.kubeClient.Get(ctx, secretName, &secret) - if err != nil { - s.logger.Error(err, "failed to read secret", - "reconciler kind", apiv1beta2.ProviderKind, - "name", providerName.Name, - "namespace", providerName.Namespace) - continue - } - - caFile, ok := secret.Data["caFile"] - if !ok { - s.logger.Error(err, "failed to read secret key caFile", - "reconciler kind", apiv1beta2.ProviderKind, - "name", providerName.Name, - "namespace", providerName.Namespace) - continue - } - - certPool = x509.NewCertPool() - ok = certPool.AppendCertsFromPEM(caFile) - if !ok { - s.logger.Error(err, "could not append to cert pool", - "reconciler kind", apiv1beta2.ProviderKind, - "name", providerName.Name, - "namespace", providerName.Namespace) - continue - } + return sender, ¬ification, token, provider.GetTimeout(), nil +} + +// createNotifier returns a notifier.Interface for the given Provider. +func createNotifier(ctx context.Context, kubeClient client.Client, provider apiv1beta3.Provider) (notifier.Interface, string, error) { + webhook := provider.Spec.Address + username := provider.Spec.Username + proxy := provider.Spec.Proxy + token := "" + password := "" + headers := make(map[string]string) + if provider.Spec.SecretRef != nil { + var secret corev1.Secret + secretName := types.NamespacedName{Namespace: provider.Namespace, Name: provider.Spec.SecretRef.Name} + + err := kubeClient.Get(ctx, secretName, &secret) + if err != nil { + return nil, "", fmt.Errorf("failed to read secret: %w", err) + } + + if address, ok := secret.Data["address"]; ok { + webhook = string(address) + _, err := url.Parse(webhook) + if err != nil { + return nil, "", fmt.Errorf("invalid address in secret '%s': %w", webhook, err) } + } - if webhook == "" { - s.logger.Error(nil, "provider has no address", - "reconciler kind", apiv1beta2.ProviderKind, - "name", providerName.Name, - "namespace", providerName.Namespace) - continue + if p, ok := secret.Data["password"]; ok { + password = string(p) + } + + if p, ok := secret.Data["proxy"]; ok { + proxy = string(p) + _, err := url.Parse(proxy) + if err != nil { + return nil, "", fmt.Errorf("invalid proxy in secret '%s': %w", proxy, err) } + } - factory := notifier.NewFactory(webhook, proxy, username, provider.Spec.Channel, token, headers, certPool, password, string(provider.UID)) - sender, err := factory.Notifier(provider.Spec.Type) + if t, ok := secret.Data["token"]; ok { + token = string(t) + } + + if u, ok := secret.Data["username"]; ok { + username = string(u) + } + + if h, ok := secret.Data["headers"]; ok { + err := yaml.Unmarshal(h, &headers) if err != nil { - s.logger.Error(err, "failed to initialize provider", - "reconciler kind", apiv1beta2.ProviderKind, - "name", providerName.Name, - "namespace", providerName.Namespace) - continue + return nil, "", fmt.Errorf("failed to read headers from secret: %w", err) } + } + } + + var certPool *x509.CertPool + if provider.Spec.CertSecretRef != nil { + var secret corev1.Secret + secretName := types.NamespacedName{Namespace: provider.Namespace, Name: provider.Spec.CertSecretRef.Name} - notification := *event.DeepCopy() - s.enhanceEventWithAlertMetadata(¬ification, alert) - - go func(n notifier.Interface, e eventv1.Event) { - ctx, cancel := context.WithTimeout(context.Background(), provider.GetTimeout()) - defer cancel() - if err := n.Post(ctx, e); err != nil { - maskedErrStr, maskErr := masktoken.MaskTokenFromString(err.Error(), token) - if maskErr != nil { - err = maskErr - } else { - err = errors.New(maskedErrStr) - } - s.logger.Error(err, "failed to send notification", - "reconciler kind", event.InvolvedObject.Kind, - "name", event.InvolvedObject.Name, - "namespace", event.InvolvedObject.Namespace) - } - }(sender, notification) + err := kubeClient.Get(ctx, secretName, &secret) + if err != nil { + return nil, "", fmt.Errorf("failed to read cert secret: %w", err) } - w.WriteHeader(http.StatusAccepted) + caFile, ok := secret.Data["caFile"] + if !ok { + return nil, "", fmt.Errorf("failed to read secret key caFile: %w", err) + } + + certPool = x509.NewCertPool() + ok = certPool.AppendCertsFromPEM(caFile) + if !ok { + return nil, "", fmt.Errorf("could not append to cert pool: %w", err) + } + } + + if webhook == "" { + return nil, "", fmt.Errorf("provider has no address") } + + factory := notifier.NewFactory(webhook, proxy, username, provider.Spec.Channel, token, headers, certPool, password, string(provider.UID)) + sender, err := factory.Notifier(provider.Spec.Type) + if err != nil { + return nil, "", fmt.Errorf("failed to initialize notifier: %w", err) + } + return sender, token, nil } +// eventMatchesAlert returns if a given event matches with the given alert +// source configuration and severity. func (s *EventServer) eventMatchesAlert(ctx context.Context, event *eventv1.Event, source apiv1.CrossNamespaceObjectReference, severity string) bool { - if event.InvolvedObject.Namespace == source.Namespace && event.InvolvedObject.Kind == source.Kind { - if event.Severity == severity || severity == eventv1.EventSeverityInfo { - labelMatch := true - if source.Name == "*" && source.MatchLabels != nil { - var obj metav1.PartialObjectMetadata - obj.SetGroupVersionKind(event.InvolvedObject.GroupVersionKind()) - obj.SetName(event.InvolvedObject.Name) - obj.SetNamespace(event.InvolvedObject.Namespace) - - if err := s.kubeClient.Get(ctx, types.NamespacedName{ - Namespace: event.InvolvedObject.Namespace, - Name: event.InvolvedObject.Name, - }, &obj); err != nil { - s.logger.Error(err, "error getting object", "kind", event.InvolvedObject.Kind, - "name", event.InvolvedObject.Name, "apiVersion", event.InvolvedObject.APIVersion) - } - - sel, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ - MatchLabels: source.MatchLabels, - }) - if err != nil { - s.logger.Error(err, fmt.Sprintf("error using matchLabels from event source '%s'", source.Name)) - } - - labelMatch = sel.Matches(labels.Set(obj.GetLabels())) - } + // No match if the event and source don't have the same namespace and kind. + if event.InvolvedObject.Namespace != source.Namespace || + event.InvolvedObject.Kind != source.Kind { + return false + } - if source.Name == "*" && labelMatch || event.InvolvedObject.Name == source.Name { - return true - } - } + // No match if the alert severity doesn't match the event severity and + // the alert severity isn't info. + if event.Severity != severity && severity != eventv1.EventSeverityInfo { + return false } - return false + // No match if the source name isn't wildcard, and source and event names + // don't match. + if source.Name != "*" && source.Name != event.InvolvedObject.Name { + return false + } + + // Match if no match labels specified. + if source.MatchLabels == nil { + return true + } + + // Perform label selector matching. + var obj metav1.PartialObjectMetadata + obj.SetGroupVersionKind(event.InvolvedObject.GroupVersionKind()) + obj.SetName(event.InvolvedObject.Name) + obj.SetNamespace(event.InvolvedObject.Namespace) + + if err := s.kubeClient.Get(ctx, types.NamespacedName{ + Namespace: event.InvolvedObject.Namespace, + Name: event.InvolvedObject.Name, + }, &obj); err != nil { + s.logger.Error(err, "error getting object", "kind", event.InvolvedObject.Kind, + "name", event.InvolvedObject.Name, "apiVersion", event.InvolvedObject.APIVersion) + } + + sel, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ + MatchLabels: source.MatchLabels, + }) + if err != nil { + s.logger.Error(err, fmt.Sprintf("error using matchLabels from event source '%s'", source.Name)) + } + + return sel.Matches(labels.Set(obj.GetLabels())) } -func (s *EventServer) enhanceEventWithAlertMetadata(event *eventv1.Event, alert apiv1beta2.Alert) { +// enhanceEventWithAlertMetadata enhances the event with Alert metadata. +func (s *EventServer) enhanceEventWithAlertMetadata(event *eventv1.Event, alert apiv1beta3.Alert) { meta := event.Metadata if meta == nil { meta = make(map[string]string) @@ -325,7 +393,7 @@ func (s *EventServer) enhanceEventWithAlertMetadata(event *eventv1.Event, alert meta[key] = value } else { s.logger.Info("metadata key found in the existing set of metadata", - "reconciler kind", apiv1beta2.AlertKind, + "reconciler kind", apiv1beta3.AlertKind, "name", alert.Name, "namespace", alert.Namespace, "key", key) diff --git a/internal/server/event_handlers_test.go b/internal/server/event_handlers_test.go index 668802e0a..209a97623 100644 --- a/internal/server/event_handlers_test.go +++ b/internal/server/event_handlers_test.go @@ -17,32 +17,896 @@ limitations under the License. package server import ( + "context" + "net/http" + "net/http/httptest" + "strconv" "testing" - "github.com/go-logr/logr" . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake" + log "sigs.k8s.io/controller-runtime/pkg/log" - apiv1beta2 "github.com/fluxcd/notification-controller/api/v1beta2" eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1" + "github.com/fluxcd/pkg/apis/meta" + + apiv1 "github.com/fluxcd/notification-controller/api/v1" + apiv1beta3 "github.com/fluxcd/notification-controller/api/v1beta3" ) +func TestFilterAlertsForEvent(t *testing.T) { + testNamespace := "foo-ns" + + testProvider := &apiv1beta3.Provider{} + testProvider.Name = "provider-foo" + testProvider.Namespace = testNamespace + testProvider.Spec = apiv1beta3.ProviderSpec{ + Type: "generic", + Address: "https://example.com", + } + + // Event involved object. + involvedObj := corev1.ObjectReference{ + APIVersion: "kustomize.toolkit.fluxcd.io/v1", + Kind: "Kustomization", + Name: "foo", + Namespace: testNamespace, + } + testEvent := &eventv1.Event{ + InvolvedObject: involvedObj, + Message: "some excluded message", + } + + tests := []struct { + name string + alertSpecs []apiv1beta3.AlertSpec + resultAlertCount int + }{ + { + name: "all match", + alertSpecs: []apiv1beta3.AlertSpec{ + { + EventSources: []apiv1.CrossNamespaceObjectReference{ + { + Kind: "Kustomization", + Name: "*", + }, + }, + }, + { + EventSources: []apiv1.CrossNamespaceObjectReference{ + { + Kind: "Kustomization", + Name: "foo", + }, + }, + }, + }, + resultAlertCount: 2, + }, + { + name: "some suspended alerts", + alertSpecs: []apiv1beta3.AlertSpec{ + { + EventSources: []apiv1.CrossNamespaceObjectReference{ + { + Kind: "Kustomization", + Name: "*", + }, + }, + Suspend: true, + }, + { + EventSources: []apiv1.CrossNamespaceObjectReference{ + { + Kind: "Kustomization", + Name: "foo", + }, + }, + }, + }, + resultAlertCount: 1, + }, + { + name: "alerts with inclusion list unmatch", + alertSpecs: []apiv1beta3.AlertSpec{ + { + EventSources: []apiv1.CrossNamespaceObjectReference{ + { + Kind: "Kustomization", + Name: "*", + }, + }, + InclusionList: []string{"some unmatch include"}, + }, + }, + resultAlertCount: 0, + }, + { + name: "alerts with inclusion list match", + alertSpecs: []apiv1beta3.AlertSpec{ + { + EventSources: []apiv1.CrossNamespaceObjectReference{ + { + Kind: "Kustomization", + Name: "*", + }, + }, + InclusionList: []string{"some unmatch include"}, + }, + { + EventSources: []apiv1.CrossNamespaceObjectReference{ + { + Kind: "Kustomization", + Name: "*", + }, + }, + InclusionList: []string{"some"}, + }, + }, + resultAlertCount: 1, + }, + { + name: "alerts with invalid inclusion rule", + alertSpecs: []apiv1beta3.AlertSpec{ + { + EventSources: []apiv1.CrossNamespaceObjectReference{ + { + Kind: "Kustomization", + Name: "*", + }, + }, + InclusionList: []string{"["}, + }, + }, + resultAlertCount: 0, + }, + { + name: "alerts with exclusion list match", + alertSpecs: []apiv1beta3.AlertSpec{ + { + EventSources: []apiv1.CrossNamespaceObjectReference{ + { + Kind: "Kustomization", + Name: "*", + }, + }, + }, + { + EventSources: []apiv1.CrossNamespaceObjectReference{ + { + Kind: "Kustomization", + Name: "foo", + }, + }, + ExclusionList: []string{"excluded message"}, + }, + }, + resultAlertCount: 1, + }, + { + name: "alerts with invalid exclusion rule", + alertSpecs: []apiv1beta3.AlertSpec{ + { + EventSources: []apiv1.CrossNamespaceObjectReference{ + { + Kind: "Kustomization", + Name: "*", + }, + }, + }, + { + EventSources: []apiv1.CrossNamespaceObjectReference{ + { + Kind: "Kustomization", + Name: "foo", + }, + }, + ExclusionList: []string{"["}, + }, + }, + resultAlertCount: 2, + }, + { + name: "alerts with inclusion and exclusion list match", + alertSpecs: []apiv1beta3.AlertSpec{ + { + EventSources: []apiv1.CrossNamespaceObjectReference{ + { + Kind: "Kustomization", + Name: "*", + }, + }, + }, + { + EventSources: []apiv1.CrossNamespaceObjectReference{ + { + Kind: "Kustomization", + Name: "foo", + }, + }, + InclusionList: []string{"excluded message"}, + ExclusionList: []string{"excluded message"}, + }, + }, + resultAlertCount: 1, + }, + { + name: "event source NS is not overwritten by alert NS", + alertSpecs: []apiv1beta3.AlertSpec{ + { + EventSources: []apiv1.CrossNamespaceObjectReference{ + { + Kind: "Kustomization", + Name: "*", + Namespace: "foo-bar", + }, + }, + }, + }, + resultAlertCount: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + + alerts := []apiv1beta3.Alert{} + for i, alertSpec := range tt.alertSpecs { + // Add the default provider ref for this test. + alertSpec.ProviderRef = meta.LocalObjectReference{Name: testProvider.Name} + // Create new Alert with the spec. + alert := apiv1beta3.Alert{} + alert.Name = "test-alert-" + strconv.Itoa(i) + alert.Namespace = testNamespace + alert.Spec = alertSpec + alerts = append(alerts, alert) + } + + // Create fake objects and event server. + scheme := runtime.NewScheme() + g.Expect(apiv1beta3.AddToScheme(scheme)).ToNot(HaveOccurred()) + builder := fakeclient.NewClientBuilder().WithScheme(scheme) + builder.WithObjects(testProvider) + eventServer := EventServer{ + kubeClient: builder.Build(), + logger: log.Log, + } + + result := eventServer.filterAlertsForEvent(context.TODO(), alerts, testEvent) + g.Expect(len(result)).To(Equal(tt.resultAlertCount)) + }) + } +} + +func TestDispatchNotification(t *testing.T) { + testNamespace := "foo-ns" + + // Run test notification receiver server. + rcvServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + })) + defer rcvServer.Close() + + testProvider := &apiv1beta3.Provider{} + testProvider.Name = "provider-foo" + testProvider.Namespace = testNamespace + testProvider.Spec = apiv1beta3.ProviderSpec{ + Type: "generic", + Address: rcvServer.URL, + } + + testAlert := &apiv1beta3.Alert{} + testAlert.Name = "alert-foo" + testAlert.Namespace = testNamespace + testAlert.Spec = apiv1beta3.AlertSpec{ + ProviderRef: meta.LocalObjectReference{Name: testProvider.Name}, + } + + // Event involved object. + involvedObj := corev1.ObjectReference{ + APIVersion: "kustomize.toolkit.fluxcd.io/v1", + Kind: "Kustomization", + Name: "foo", + Namespace: testNamespace, + } + testEvent := &eventv1.Event{InvolvedObject: involvedObj} + + tests := []struct { + name string + providerNamespace string + providerSuspended bool + wantErr bool + }{ + { + name: "dispatch notification successfully", + }, + { + name: "provider in different namespace", + providerNamespace: "bar-ns", + wantErr: true, + }, + { + name: "provider suspended, skip", + providerSuspended: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + + alert := testAlert.DeepCopy() + provider := testProvider.DeepCopy() + + // Override the alert and provider with test parameters. + if tt.providerNamespace != "" { + provider.Namespace = tt.providerNamespace + } + provider.Spec.Suspend = tt.providerSuspended + + // Create fake objects and event server. + scheme := runtime.NewScheme() + g.Expect(apiv1beta3.AddToScheme(scheme)).ToNot(HaveOccurred()) + g.Expect(corev1.AddToScheme(scheme)).ToNot(HaveOccurred()) + builder := fakeclient.NewClientBuilder().WithScheme(scheme) + builder.WithObjects(provider) + eventServer := EventServer{ + kubeClient: builder.Build(), + logger: log.Log, + } + + err := eventServer.dispatchNotification(context.TODO(), testEvent, *alert) + g.Expect(err != nil).To(Equal(tt.wantErr)) + }) + } +} + +func TestGetNotificationParams(t *testing.T) { + testNamespace := "foo-ns" + + // Provider secret. + testSecret := &corev1.Secret{} + testSecret.Name = "secret-foo" + testSecret.Namespace = testNamespace + + testProvider := &apiv1beta3.Provider{} + testProvider.Name = "provider-foo" + testProvider.Namespace = testNamespace + testProvider.Spec = apiv1beta3.ProviderSpec{ + Type: "generic", + Address: "https://example.com", + SecretRef: &meta.LocalObjectReference{Name: testSecret.Name}, + } + + testAlert := &apiv1beta3.Alert{} + testAlert.Name = "alert-foo" + testAlert.Namespace = testNamespace + testAlert.Spec = apiv1beta3.AlertSpec{ + ProviderRef: meta.LocalObjectReference{Name: testProvider.Name}, + } + + // Event involved object. + involvedObj := corev1.ObjectReference{ + APIVersion: "kustomize.toolkit.fluxcd.io/v1", + Kind: "Kustomization", + Name: "foo", + Namespace: testNamespace, + } + testEvent := &eventv1.Event{InvolvedObject: involvedObj} + + tests := []struct { + name string + alertNamespace string + alertSummary string + alertEventMetadata map[string]string + providerNamespace string + providerSuspended bool + secretNamespace string + noCrossNSRefs bool + eventMetadata map[string]string + wantErr bool + }{ + { + name: "event src and alert in diff NS", + alertNamespace: "bar-ns", + providerNamespace: "bar-ns", + secretNamespace: "bar-ns", + }, + { + name: "event src and alert in diff NS with no cross NS refs", + alertNamespace: "bar-ns", + providerNamespace: "bar-ns", + noCrossNSRefs: true, + wantErr: true, + }, + { + name: "provider not found", + providerNamespace: "bar-ns", + wantErr: true, + }, + { + name: "provider secret in diff NS but provider suspended", + providerSuspended: true, + secretNamespace: "bar-ns", + }, + { + name: "provider secret in different NS, fail to create notifier", + secretNamespace: "bar-ns", + wantErr: true, + }, + { + name: "alert with summary, no event metadata", + alertSummary: "some summary text", + }, + { + name: "alert with summary, with event metadata", + alertSummary: "some summary text", + eventMetadata: map[string]string{ + "foo": "bar", + "summary": "baz", + }, + }, + { + name: "alert with event metadata", + alertEventMetadata: map[string]string{ + "aaa": "bbb", + "ccc": "ddd", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + + alert := testAlert.DeepCopy() + provider := testProvider.DeepCopy() + secret := testSecret.DeepCopy() + event := testEvent.DeepCopy() + + // Override the alert, provider, secret and event with test + // parameters. + if tt.alertNamespace != "" { + alert.Namespace = tt.alertNamespace + } + if tt.alertSummary != "" { + alert.Spec.Summary = tt.alertSummary + } + if tt.alertEventMetadata != nil { + alert.Spec.EventMetadata = tt.alertEventMetadata + } + if tt.providerNamespace != "" { + provider.Namespace = tt.providerNamespace + } + provider.Spec.Suspend = tt.providerSuspended + if tt.secretNamespace != "" { + secret.Namespace = tt.secretNamespace + } + if tt.eventMetadata != nil { + event.Metadata = tt.eventMetadata + } + + // Create fake objects and event server. + scheme := runtime.NewScheme() + g.Expect(apiv1beta3.AddToScheme(scheme)).ToNot(HaveOccurred()) + g.Expect(corev1.AddToScheme(scheme)).ToNot(HaveOccurred()) + builder := fakeclient.NewClientBuilder().WithScheme(scheme) + builder.WithObjects(provider, secret) + eventServer := EventServer{ + kubeClient: builder.Build(), + logger: log.Log, + noCrossNamespaceRefs: tt.noCrossNSRefs, + } + + _, n, _, _, err := eventServer.getNotificationParams(context.TODO(), event, *alert) + g.Expect(err != nil).To(Equal(tt.wantErr)) + if tt.alertSummary != "" { + g.Expect(n.Metadata["summary"]).To(Equal(tt.alertSummary)) + } + // NOTE: This is performing simple check. Thorough test for event + // metadata is performed in TestEnhanceEventWithAlertMetadata. + if tt.alertEventMetadata != nil { + for k, v := range tt.alertEventMetadata { + g.Expect(n.Metadata).To(HaveKeyWithValue(k, v)) + } + } + }) + } +} + +func TestCreateNotifier(t *testing.T) { + secretName := "foo-secret" + certSecretName := "cert-secret" + tests := []struct { + name string + providerSpec *apiv1beta3.ProviderSpec + secretData map[string][]byte + certSecretData map[string][]byte + wantErr bool + }{ + { + name: "no address, no secret ref", + providerSpec: &apiv1beta3.ProviderSpec{ + Type: "slack", + }, + wantErr: true, + }, + { + name: "valid address, no secret ref", + providerSpec: &apiv1beta3.ProviderSpec{ + Type: "slack", + Address: "https://example.com", + }, + }, + { + name: "reference to non-existing secret ref", + providerSpec: &apiv1beta3.ProviderSpec{ + Type: "slack", + SecretRef: &meta.LocalObjectReference{Name: "foo"}, + }, + wantErr: true, + }, + { + name: "reference to secret with valid address, proxy, headers", + providerSpec: &apiv1beta3.ProviderSpec{ + Type: "slack", + SecretRef: &meta.LocalObjectReference{Name: secretName}, + }, + secretData: map[string][]byte{ + "address": []byte("https://example.com"), + "proxy": []byte("https://exampleproxy.com"), + "headers": []byte(`foo: bar`), + }, + }, + { + name: "reference to secret with invalid address", + providerSpec: &apiv1beta3.ProviderSpec{ + Type: "slack", + SecretRef: &meta.LocalObjectReference{Name: secretName}, + }, + secretData: map[string][]byte{ + "address": []byte("https://example.com|"), + }, + wantErr: true, + }, + { + name: "reference to secret with invalid proxy", + providerSpec: &apiv1beta3.ProviderSpec{ + Type: "slack", + SecretRef: &meta.LocalObjectReference{Name: secretName}, + }, + secretData: map[string][]byte{ + "address": []byte("https://example.com"), + "proxy": []byte("https://exampleproxy.com|"), + }, + wantErr: true, + }, + { + name: "invalid headers in secret reference", + providerSpec: &apiv1beta3.ProviderSpec{ + Type: "slack", + SecretRef: &meta.LocalObjectReference{Name: secretName}, + }, + secretData: map[string][]byte{ + "address": []byte("https://example.com"), + "headers": []byte("foo"), + }, + wantErr: true, + }, + { + name: "invalid spec address overridden by valid secret ref address", + providerSpec: &apiv1beta3.ProviderSpec{ + Type: "slack", + SecretRef: &meta.LocalObjectReference{Name: secretName}, + Address: "https://example.com|", + }, + secretData: map[string][]byte{ + "address": []byte("https://example.com"), + }, + }, + { + name: "invalid spec proxy overridden by valid secret ref proxy", + providerSpec: &apiv1beta3.ProviderSpec{ + Type: "slack", + SecretRef: &meta.LocalObjectReference{Name: secretName}, + Proxy: "https://example.com|", + }, + secretData: map[string][]byte{ + "address": []byte("https://example.com"), + "proxy": []byte("https://example.com"), + }, + }, + { + name: "reference to non-existing cert secret", + providerSpec: &apiv1beta3.ProviderSpec{ + Type: "slack", + Address: "https://example.com", + CertSecretRef: &meta.LocalObjectReference{Name: "some-secret"}, + }, + wantErr: true, + }, + { + name: "reference to cert secret without caFile data", + providerSpec: &apiv1beta3.ProviderSpec{ + Type: "slack", + Address: "https://example.com", + CertSecretRef: &meta.LocalObjectReference{Name: certSecretName}, + }, + certSecretData: map[string][]byte{ + "aaa": []byte("bbb"), + }, + wantErr: true, + }, + { + name: "cert secret reference with valid CA", + providerSpec: &apiv1beta3.ProviderSpec{ + Type: "slack", + Address: "https://example.com", + CertSecretRef: &meta.LocalObjectReference{Name: certSecretName}, + }, + certSecretData: map[string][]byte{ + // Based on https://pkg.go.dev/crypto/tls#X509KeyPair example. + "caFile": []byte(`-----BEGIN CERTIFICATE----- +MIIBhTCCASugAwIBAgIQIRi6zePL6mKjOipn+dNuaTAKBggqhkjOPQQDAjASMRAw +DgYDVQQKEwdBY21lIENvMB4XDTE3MTAyMDE5NDMwNloXDTE4MTAyMDE5NDMwNlow +EjEQMA4GA1UEChMHQWNtZSBDbzBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABD0d +7VNhbWvZLWPuj/RtHFjvtJBEwOkhbN/BnnE8rnZR8+sbwnc/KhCk3FhnpHZnQz7B +5aETbbIgmuvewdjvSBSjYzBhMA4GA1UdDwEB/wQEAwICpDATBgNVHSUEDDAKBggr +BgEFBQcDATAPBgNVHRMBAf8EBTADAQH/MCkGA1UdEQQiMCCCDmxvY2FsaG9zdDo1 +NDUzgg4xMjcuMC4wLjE6NTQ1MzAKBggqhkjOPQQDAgNIADBFAiEA2zpJEPQyz6/l +Wf86aX6PepsntZv2GYlA5UpabfT2EZICICpJ5h/iI+i341gBmLiAFQOyTDT+/wQc +6MF9+Yw1Yy0t +-----END CERTIFICATE-----`), + }, + }, + { + name: "cert secret reference with invalid CA", + providerSpec: &apiv1beta3.ProviderSpec{ + Type: "slack", + Address: "https://example.com", + CertSecretRef: &meta.LocalObjectReference{Name: certSecretName}, + }, + certSecretData: map[string][]byte{ + "caFile": []byte(`aaaaa`), + }, + wantErr: true, + }, + { + name: "unsupported provider", + providerSpec: &apiv1beta3.ProviderSpec{ + Type: "foo", + Address: "https://example.com", + }, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + + // Create fake objects and event server. + scheme := runtime.NewScheme() + g.Expect(apiv1beta3.AddToScheme(scheme)).ToNot(HaveOccurred()) + g.Expect(corev1.AddToScheme(scheme)).ToNot(HaveOccurred()) + builder := fakeclient.NewClientBuilder().WithScheme(scheme) + if tt.secretData != nil { + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{Name: secretName}, + Data: tt.secretData, + } + builder.WithObjects(secret) + } + if tt.certSecretData != nil { + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{Name: certSecretName}, + Data: tt.certSecretData, + } + builder.WithObjects(secret) + } + provider := apiv1beta3.Provider{Spec: *tt.providerSpec} + + _, _, err := createNotifier(context.TODO(), builder.Build(), provider) + g.Expect(err != nil).To(Equal(tt.wantErr)) + }) + } +} + +func TestEventMatchesAlert(t *testing.T) { + testNamespace := "foo-ns" + involvedObj := corev1.ObjectReference{ + APIVersion: "kustomize.toolkit.fluxcd.io/v1", + Kind: "Kustomization", + Name: "foo", + Namespace: testNamespace, + } + + tests := []struct { + name string + event *eventv1.Event + source apiv1.CrossNamespaceObjectReference + severity string + resourcesFile string + wantResult bool + }{ + { + name: "source and event namespace mismatch", + event: &eventv1.Event{InvolvedObject: involvedObj}, + source: apiv1.CrossNamespaceObjectReference{ + Kind: "Kustomization", + Name: "*", + Namespace: "test-ns", + }, + severity: "info", + wantResult: false, + }, + { + name: "source and event kind mismatch", + event: &eventv1.Event{InvolvedObject: involvedObj}, + source: apiv1.CrossNamespaceObjectReference{ + Kind: "GitRepository", + Name: "*", + Namespace: testNamespace, + }, + severity: "info", + wantResult: false, + }, + { + name: "event and alert severity mismatch, alert severity error", + event: &eventv1.Event{ + InvolvedObject: involvedObj, + Severity: "info", + }, + source: apiv1.CrossNamespaceObjectReference{ + Kind: "Kustomization", + Name: "*", + Namespace: testNamespace, + }, + severity: "error", + wantResult: false, + }, + { + name: "event and alert severity mismatch, alert severity info", + event: &eventv1.Event{ + InvolvedObject: involvedObj, + Severity: "error", + }, + source: apiv1.CrossNamespaceObjectReference{ + Kind: "Kustomization", + Name: "*", + Namespace: testNamespace, + }, + severity: "info", + wantResult: true, + }, + { + name: "source with matching kind and namespace, any name", + event: &eventv1.Event{InvolvedObject: involvedObj}, + source: apiv1.CrossNamespaceObjectReference{ + Kind: "Kustomization", + Name: "*", + Namespace: testNamespace, + }, + severity: "info", + wantResult: true, + }, + { + name: "source with matching kind and namespace, unmatched name", + event: &eventv1.Event{InvolvedObject: involvedObj}, + source: apiv1.CrossNamespaceObjectReference{ + Kind: "Kustomization", + Name: "bar", + Namespace: testNamespace, + }, + severity: "info", + wantResult: false, + }, + { + name: "source with matching kind and namespace, matched name", + event: &eventv1.Event{InvolvedObject: involvedObj}, + source: apiv1.CrossNamespaceObjectReference{ + Kind: "Kustomization", + Name: "foo", + Namespace: testNamespace, + }, + severity: "info", + wantResult: true, + }, + { + name: "label selector match", + resourcesFile: "./testdata/kustomization.yaml", + event: &eventv1.Event{InvolvedObject: involvedObj}, + source: apiv1.CrossNamespaceObjectReference{ + Kind: "Kustomization", + Name: "*", + Namespace: testNamespace, + MatchLabels: map[string]string{ + "app": "podinfo", + }, + }, + severity: "info", + wantResult: true, + }, + { + name: "label selector mismatch", + resourcesFile: "./testdata/kustomization.yaml", + event: &eventv1.Event{InvolvedObject: involvedObj}, + source: apiv1.CrossNamespaceObjectReference{ + Kind: "Kustomization", + Name: "*", + Namespace: testNamespace, + MatchLabels: map[string]string{ + "aaa": "bbb", + }, + }, + severity: "info", + wantResult: false, + }, + { + name: "label selector, object not found", + event: &eventv1.Event{InvolvedObject: involvedObj}, + source: apiv1.CrossNamespaceObjectReference{ + Kind: "Kustomization", + Name: "*", + Namespace: testNamespace, + MatchLabels: map[string]string{ + "aaa": "bbb", + }, + }, + severity: "info", + wantResult: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + + scheme := runtime.NewScheme() + g.Expect(apiv1beta3.AddToScheme(scheme)).ToNot(HaveOccurred()) + + builder := fakeclient.NewClientBuilder().WithScheme(scheme) + + // Create pre-existing resource from manifest file. + if tt.resourcesFile != "" { + obj, err := readManifest(tt.resourcesFile, testNamespace) + g.Expect(err).ToNot(HaveOccurred()) + builder.WithObjects(obj) + } + + eventServer := EventServer{ + kubeClient: builder.Build(), + logger: log.Log, + } + + result := eventServer.eventMatchesAlert(context.TODO(), tt.event, tt.source, tt.severity) + g.Expect(result).To(Equal(tt.wantResult)) + }) + } +} + func TestEnhanceEventWithAlertMetadata(t *testing.T) { - s := &EventServer{logger: logr.New(nil)} + s := &EventServer{logger: log.Log} for name, tt := range map[string]struct { event eventv1.Event - alert apiv1beta2.Alert + alert apiv1beta3.Alert expectedMetadata map[string]string }{ "empty metadata": { event: eventv1.Event{}, - alert: apiv1beta2.Alert{}, + alert: apiv1beta3.Alert{}, expectedMetadata: nil, }, "enhanced with summary": { event: eventv1.Event{}, - alert: apiv1beta2.Alert{ - Spec: apiv1beta2.AlertSpec{ + alert: apiv1beta3.Alert{ + Spec: apiv1beta3.AlertSpec{ Summary: "summary", }, }, @@ -56,8 +920,8 @@ func TestEnhanceEventWithAlertMetadata(t *testing.T) { "summary": "original summary", }, }, - alert: apiv1beta2.Alert{ - Spec: apiv1beta2.AlertSpec{ + alert: apiv1beta3.Alert{ + Spec: apiv1beta3.AlertSpec{ Summary: "summary", }, }, @@ -67,8 +931,8 @@ func TestEnhanceEventWithAlertMetadata(t *testing.T) { }, "enhanced with metadata": { event: eventv1.Event{}, - alert: apiv1beta2.Alert{ - Spec: apiv1beta2.AlertSpec{ + alert: apiv1beta3.Alert{ + Spec: apiv1beta3.AlertSpec{ EventMetadata: map[string]string{ "foo": "bar", }, @@ -84,8 +948,8 @@ func TestEnhanceEventWithAlertMetadata(t *testing.T) { "foo": "baz", }, }, - alert: apiv1beta2.Alert{ - Spec: apiv1beta2.AlertSpec{ + alert: apiv1beta3.Alert{ + Spec: apiv1beta3.AlertSpec{ EventMetadata: map[string]string{ "foo": "bar", }, diff --git a/internal/server/event_server_test.go b/internal/server/event_server_test.go index fc829e368..a40c77a83 100644 --- a/internal/server/event_server_test.go +++ b/internal/server/event_server_test.go @@ -21,29 +21,324 @@ import ( "context" "encoding/json" "fmt" + "net" "net/http" "net/http/httptest" + "os" + "strconv" + "strings" "testing" "time" - "github.com/onsi/gomega" + . "github.com/onsi/gomega" "github.com/sethvargo/go-limiter/httplimit" "github.com/sethvargo/go-limiter/memorystore" + prommetrics "github.com/slok/go-http-metrics/metrics/prometheus" + "github.com/slok/go-http-metrics/middleware" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + yamlutil "k8s.io/apimachinery/pkg/util/yaml" + fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake" + log "sigs.k8s.io/controller-runtime/pkg/log" eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1" + "github.com/fluxcd/pkg/apis/meta" + + apiv1 "github.com/fluxcd/notification-controller/api/v1" + apiv1beta3 "github.com/fluxcd/notification-controller/api/v1beta3" ) +func TestEventServer(t *testing.T) { + g := NewWithT(t) + + testNamespace := "foo-ns" + var req *http.Request + + // Run receiver server. + rcvServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + req = r + w.WriteHeader(200) + })) + defer rcvServer.Close() + + provider := &apiv1beta3.Provider{} + provider.Name = "provider-foo" + provider.Namespace = testNamespace + provider.Spec = apiv1beta3.ProviderSpec{ + Type: "generic", + Address: rcvServer.URL, + } + + testAlert := &apiv1beta3.Alert{} + testAlert.Name = "alert-foo" + testAlert.Namespace = testNamespace + testAlert.Spec = apiv1beta3.AlertSpec{ + ProviderRef: meta.LocalObjectReference{Name: provider.Name}, + EventSeverity: "info", + EventSources: []apiv1.CrossNamespaceObjectReference{ + { + Kind: "Bucket", + Name: "hyacinth", + Namespace: testNamespace, + }, + { + Kind: "Kustomization", + Name: "*", + }, + { + Kind: "GitRepository", + Name: "*", + MatchLabels: map[string]string{ + "app": "podinfo", + }, + }, + { + Kind: "Kustomization", + Name: "*", + Namespace: "test", + }, + }, + } + + // Create objects to be used as involved object in the test events. + repo1, err := readManifest("./testdata/repo.yaml", testNamespace) + g.Expect(err).ToNot(HaveOccurred()) + repo2, err := readManifest("./testdata/gitrepo2.yaml", testNamespace) + g.Expect(err).ToNot(HaveOccurred()) + + scheme := runtime.NewScheme() + g.Expect(apiv1beta3.AddToScheme(scheme)).ToNot(HaveOccurred()) + g.Expect(corev1.AddToScheme(scheme)).ToNot(HaveOccurred()) + + // Create a fake kube client with the above objects. + builder := fakeclient.NewClientBuilder().WithScheme(scheme) + builder.WithObjects(provider, repo1, repo2) + kclient := builder.Build() + + // Get a free port to run the event server at. + l, err := net.Listen("tcp", ":0") + g.Expect(err).ToNot(HaveOccurred()) + eventServerPort := strconv.Itoa(l.Addr().(*net.TCPAddr).Port) + g.Expect(l.Close()).ToNot(HaveOccurred()) + + // Create the event server to test. + eventMdlw := middleware.New(middleware.Config{ + Recorder: prommetrics.NewRecorder(prommetrics.Config{ + Prefix: "gotk_event", + }), + }) + store, err := memorystore.New(&memorystore.Config{ + Interval: 5 * time.Minute, + }) + if err != nil { + t.Fatalf("failed to create memory storage") + } + // eventServer := NewEventServer("127.0.0.1:"+eventServerPort, log.Log, builder.Build(), true) + eventServer := NewEventServer("127.0.0.1:"+eventServerPort, log.Log, kclient, true) + stopCh := make(chan struct{}) + go eventServer.ListenAndServe(stopCh, eventMdlw, store) + defer close(stopCh) + + // Create a base event which is copied and mutated in the test cases. + testEvent := eventv1.Event{ + InvolvedObject: corev1.ObjectReference{ + Kind: "Bucket", + Name: "hyacinth", + Namespace: testNamespace, + }, + Severity: "info", + Timestamp: metav1.Now(), + Message: "well that happened", + Reason: "event-happened", + ReportingController: "source-controller", + } + + tests := []struct { + name string + inclusionList []string + exclusionList []string + modifyEventFunc func(e *eventv1.Event) *eventv1.Event + forwarded bool + }{ + { + name: "forwards when source is a match", + modifyEventFunc: func(e *eventv1.Event) *eventv1.Event { return e }, + forwarded: true, + }, + { + name: "drops event when source Kind does not match", + modifyEventFunc: func(e *eventv1.Event) *eventv1.Event { + e.InvolvedObject.Kind = "GitRepository" + return e + }, + forwarded: false, + }, + { + name: "drops event when source name does not match", + modifyEventFunc: func(e *eventv1.Event) *eventv1.Event { + e.InvolvedObject.Name = "slop" + return e + }, + forwarded: false, + }, + { + name: "drops event when source namespace does not match", + modifyEventFunc: func(e *eventv1.Event) *eventv1.Event { + e.InvolvedObject.Namespace = "all-buckets" + return e + }, + forwarded: false, + }, + { + name: "forwards when message matches inclusion list", + inclusionList: []string{"^included"}, + modifyEventFunc: func(e *eventv1.Event) *eventv1.Event { + e.Message = "included" + return e + }, + forwarded: true, + }, + { + name: "drops when message does not match inclusion list", + inclusionList: []string{"^included"}, + modifyEventFunc: func(e *eventv1.Event) *eventv1.Event { + e.Message = "not included" + return e + }, + forwarded: false, + }, + { + name: "drops event that is matched by exclusion", + exclusionList: []string{ + "doesnotoccur", // Not intended to match. + "excluded", + }, + modifyEventFunc: func(e *eventv1.Event) *eventv1.Event { + e.Message = "this is excluded" + return e + }, + forwarded: false, + }, + { + name: "drops when message matches inclusion and exclusion list", + inclusionList: []string{"^included"}, + exclusionList: []string{"excluded"}, + modifyEventFunc: func(e *eventv1.Event) *eventv1.Event { + e.Message = "included excluded" + return e + }, + forwarded: false, + }, + { + name: "forwards events when name wildcard is used", + modifyEventFunc: func(e *eventv1.Event) *eventv1.Event { + e.InvolvedObject.Kind = "Kustomization" + e.InvolvedObject.Name = "test" + e.InvolvedObject.Namespace = testNamespace + e.Message = "test" + return e + }, + forwarded: true, + }, + { + name: "forwards events when the label matches", + modifyEventFunc: func(e *eventv1.Event) *eventv1.Event { + e.InvolvedObject.Kind = "GitRepository" + e.InvolvedObject.Name = "podinfo" + e.InvolvedObject.APIVersion = "source.toolkit.fluxcd.io/v1" + e.InvolvedObject.Namespace = testNamespace + e.Message = "test" + return e + }, + forwarded: true, + }, + { + name: "drops events when the labels don't match", + modifyEventFunc: func(e *eventv1.Event) *eventv1.Event { + e.InvolvedObject.Kind = "GitRepository" + e.InvolvedObject.Name = "podinfo-two" + e.InvolvedObject.APIVersion = "source.toolkit.fluxcd.io/v1" + e.InvolvedObject.Namespace = testNamespace + e.Message = "test" + return e + }, + forwarded: false, + }, + { + name: "drops events for cross-namespace sources", + modifyEventFunc: func(e *eventv1.Event) *eventv1.Event { + e.InvolvedObject.Kind = "Kustomization" + e.InvolvedObject.Name = "test" + e.InvolvedObject.Namespace = "test" + e.Message = "test" + return e + }, + forwarded: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + + // Reset the common receiver server request variable. + req = nil + + // Create the test alert. + alert := testAlert.DeepCopy() + if tt.inclusionList != nil { + alert.Spec.InclusionList = tt.inclusionList + } + if tt.exclusionList != nil { + alert.Spec.ExclusionList = tt.exclusionList + } + g.Expect(kclient.Create(context.TODO(), alert)).ToNot(HaveOccurred()) + defer func() { + g.Expect(kclient.Delete(context.TODO(), alert)) + }() + + // Create the test event. + event := testEvent.DeepCopy() + event = tt.modifyEventFunc(event) + + buf := &bytes.Buffer{} + g.Expect(json.NewEncoder(buf).Encode(event)).To(Succeed()) + res, err := http.Post("http://localhost:"+eventServerPort, "application/json", buf) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(res.StatusCode).To(Equal(http.StatusAccepted)) // Event server responds with 202 Accepted. + + if tt.forwarded { + g.Eventually(func() bool { + return req == nil + }, "2s", "0.1s").Should(BeFalse()) + } else { + // Check filtered requests. + // + // The event_server does forwarding in a goroutine, after + // responding to the POST of the event. This makes it + // difficult to know whether the provider has filtered the + // event, or just not run the goroutine yet. For now, use a + // timeout (and consistently so it can fail early). + g.Consistently(func() bool { + return req == nil + }, "1s", "0.1s").Should(BeTrue()) + } + }) + } +} + func TestEventKeyFunc(t *testing.T) { - g := gomega.NewGomegaWithT(t) + g := NewWithT(t) // Setup middleware store, err := memorystore.New(&memorystore.Config{ Interval: 10 * time.Minute, }) - g.Expect(err).ShouldNot(gomega.HaveOccurred()) + g.Expect(err).ShouldNot(HaveOccurred()) middleware, err := httplimit.NewMiddleware(store, eventKeyFunc) - g.Expect(err).ShouldNot(gomega.HaveOccurred()) + g.Expect(err).ShouldNot(HaveOccurred()) handler := middleware.Handle(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) })) @@ -58,7 +353,7 @@ func TestEventKeyFunc(t *testing.T) { }{ { involvedObject: corev1.ObjectReference{ - APIVersion: "kustomize.toolkit.fluxcd.io/v1beta1", + APIVersion: "kustomize.toolkit.fluxcd.io/v1", Kind: "Kustomization", Name: "1", Namespace: "1", @@ -69,7 +364,7 @@ func TestEventKeyFunc(t *testing.T) { }, { involvedObject: corev1.ObjectReference{ - APIVersion: "kustomize.toolkit.fluxcd.io/v1beta1", + APIVersion: "kustomize.toolkit.fluxcd.io/v1", Kind: "Kustomization", Name: "1", Namespace: "1", @@ -80,7 +375,7 @@ func TestEventKeyFunc(t *testing.T) { }, { involvedObject: corev1.ObjectReference{ - APIVersion: "kustomize.toolkit.fluxcd.io/v1beta1", + APIVersion: "kustomize.toolkit.fluxcd.io/v1", Kind: "Kustomization", Name: "1", Namespace: "1", @@ -91,7 +386,7 @@ func TestEventKeyFunc(t *testing.T) { }, { involvedObject: corev1.ObjectReference{ - APIVersion: "kustomize.toolkit.fluxcd.io/v1beta1", + APIVersion: "kustomize.toolkit.fluxcd.io/v1", Kind: "Kustomization", Name: "2", Namespace: "2", @@ -102,7 +397,7 @@ func TestEventKeyFunc(t *testing.T) { }, { involvedObject: corev1.ObjectReference{ - APIVersion: "kustomize.toolkit.fluxcd.io/v1beta1", + APIVersion: "kustomize.toolkit.fluxcd.io/v1", Kind: "Kustomization", Name: "3", Namespace: "3", @@ -113,7 +408,7 @@ func TestEventKeyFunc(t *testing.T) { }, { involvedObject: corev1.ObjectReference{ - APIVersion: "kustomize.toolkit.fluxcd.io/v1beta1", + APIVersion: "kustomize.toolkit.fluxcd.io/v1", Kind: "Kustomization", Name: "2", Namespace: "2", @@ -124,7 +419,7 @@ func TestEventKeyFunc(t *testing.T) { }, { involvedObject: corev1.ObjectReference{ - APIVersion: "kustomize.toolkit.fluxcd.io/v1beta1", + APIVersion: "kustomize.toolkit.fluxcd.io/v1", Kind: "Kustomization", Name: "4", Namespace: "4", @@ -138,7 +433,7 @@ func TestEventKeyFunc(t *testing.T) { }, { involvedObject: corev1.ObjectReference{ - APIVersion: "kustomize.toolkit.fluxcd.io/v1beta1", + APIVersion: "kustomize.toolkit.fluxcd.io/v1", Kind: "Kustomization", Name: "4", Namespace: "4", @@ -152,7 +447,7 @@ func TestEventKeyFunc(t *testing.T) { }, { involvedObject: corev1.ObjectReference{ - APIVersion: "kustomize.toolkit.fluxcd.io/v1beta1", + APIVersion: "kustomize.toolkit.fluxcd.io/v1", Kind: "Kustomization", Name: "4", Namespace: "4", @@ -166,7 +461,7 @@ func TestEventKeyFunc(t *testing.T) { }, { involvedObject: corev1.ObjectReference{ - APIVersion: "kustomize.toolkit.fluxcd.io/v1beta1", + APIVersion: "kustomize.toolkit.fluxcd.io/v1", Kind: "Kustomization", Name: "4", Namespace: "4", @@ -180,7 +475,7 @@ func TestEventKeyFunc(t *testing.T) { }, { involvedObject: corev1.ObjectReference{ - APIVersion: "kustomize.toolkit.fluxcd.io/v1beta1", + APIVersion: "kustomize.toolkit.fluxcd.io/v1", Kind: "Kustomization", Name: "4", Namespace: "4", @@ -194,7 +489,7 @@ func TestEventKeyFunc(t *testing.T) { }, { involvedObject: corev1.ObjectReference{ - APIVersion: "kustomize.toolkit.fluxcd.io/v1beta1", + APIVersion: "kustomize.toolkit.fluxcd.io/v1", Kind: "Kustomization", Name: "4", Namespace: "4", @@ -217,7 +512,7 @@ func TestEventKeyFunc(t *testing.T) { } cleanupMetadata(event) eventData, err := json.Marshal(event) - g.Expect(err).ShouldNot(gomega.HaveOccurred()) + g.Expect(err).ShouldNot(HaveOccurred()) res := httptest.NewRecorder() req := httptest.NewRequest("POST", "/", bytes.NewBuffer(eventData)) @@ -226,11 +521,78 @@ func TestEventKeyFunc(t *testing.T) { handler.ServeHTTP(res, reqWithEvent) if tt.rateLimit { - g.Expect(res.Code).Should(gomega.Equal(429)) - g.Expect(res.Header().Get("X-Ratelimit-Remaining")).Should(gomega.Equal("0")) + g.Expect(res.Code).Should(Equal(429)) + g.Expect(res.Header().Get("X-Ratelimit-Remaining")).Should(Equal("0")) } else { - g.Expect(res.Code).Should(gomega.Equal(200)) + g.Expect(res.Code).Should(Equal(200)) } }) } } + +func TestCleanupMetadata(t *testing.T) { + group := "kustomize.toolkit.fluxcd.io" + involvedObj := corev1.ObjectReference{ + APIVersion: "kustomize.toolkit.fluxcd.io/v1", + Kind: "Kustomization", + Name: "foo", + Namespace: "foo-ns", + } + + tests := []struct { + name string + event *eventv1.Event + wantMeta map[string]string + }{ + { + name: "event with no metadata", + event: &eventv1.Event{InvolvedObject: involvedObj}, + wantMeta: map[string]string{}, + }, + { + name: "event with metadata", + event: &eventv1.Event{ + InvolvedObject: involvedObj, + Metadata: map[string]string{ + group + "/foo": "fooval", + group + "/bar": "barval", + group + "/" + eventv1.MetaChecksumKey: "aaaaa", + group + "/" + eventv1.MetaDigestKey: "bbbbbbbb", + "source.toolkit.fluxcd.io/baz": "bazval", + group + "/zzz": "zzzz", + group + "/aa/bb": "cc", + }, + }, + wantMeta: map[string]string{ + "foo": "fooval", + "bar": "barval", + "zzz": "zzzz", + "aa/bb": "cc", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + + cleanupMetadata(tt.event) + + g.Expect(tt.event.Metadata).To(BeEquivalentTo(tt.wantMeta)) + }) + } +} + +func readManifest(path, namespace string) (*unstructured.Unstructured, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, err + } + yml := fmt.Sprintf(string(data), namespace) + reader := yamlutil.NewYAMLOrJSONDecoder(strings.NewReader(string(yml)), 2048) + obj := &unstructured.Unstructured{} + if err := reader.Decode(obj); err != nil { + return nil, err + } + return obj, nil +} diff --git a/internal/server/testdata/gitrepo2.yaml b/internal/server/testdata/gitrepo2.yaml new file mode 100644 index 000000000..94995e0c2 --- /dev/null +++ b/internal/server/testdata/gitrepo2.yaml @@ -0,0 +1,13 @@ +--- +apiVersion: source.toolkit.fluxcd.io/v1 +kind: GitRepository +metadata: + name: podinfo-two + namespace: "%[1]s" + labels: + app: podinfo-two +spec: + interval: 1m + url: https://github.com/stefanprodan/podinfo + ref: + semver: 6.0.x diff --git a/internal/server/testdata/kustomization.yaml b/internal/server/testdata/kustomization.yaml new file mode 100644 index 000000000..601869678 --- /dev/null +++ b/internal/server/testdata/kustomization.yaml @@ -0,0 +1,10 @@ +--- +apiVersion: kustomize.toolkit.fluxcd.io/v1 +kind: Kustomization +metadata: + name: foo + namespace: "%[1]s" + labels: + app: podinfo +spec: + interval: 1m diff --git a/internal/server/testdata/repo.yaml b/internal/server/testdata/repo.yaml new file mode 100644 index 000000000..1fa6334ea --- /dev/null +++ b/internal/server/testdata/repo.yaml @@ -0,0 +1,13 @@ +--- +apiVersion: source.toolkit.fluxcd.io/v1 +kind: GitRepository +metadata: + name: podinfo + namespace: "%[1]s" + labels: + app: podinfo +spec: + interval: 1m + url: https://github.com/stefanprodan/podinfo + ref: + semver: 6.0.x