diff --git a/api/pkg/apis/v1alpha1/providers/states/k8s/k8s.go b/api/pkg/apis/v1alpha1/providers/states/k8s/k8s.go index 3500bbcce..cff8c9697 100644 --- a/api/pkg/apis/v1alpha1/providers/states/k8s/k8s.go +++ b/api/pkg/apis/v1alpha1/providers/states/k8s/k8s.go @@ -173,8 +173,6 @@ func (s *K8sStateProvider) Upsert(ctx context.Context, entry states.UpsertReques var err error = nil defer observ_utils.CloseSpanWithError(span, &err) - sLog.Info(" P (K8s State): upsert state") - namespace := model.ReadPropertyCompat(entry.Metadata, "namespace", nil) group := model.ReadPropertyCompat(entry.Metadata, "group", nil) version := model.ReadPropertyCompat(entry.Metadata, "version", nil) @@ -184,6 +182,7 @@ func (s *K8sStateProvider) Upsert(ctx context.Context, entry states.UpsertReques if namespace == "" { namespace = "default" } + sLog.Info(" P (K8s State): upsert state %s in namespace %s, traceId: %s", entry.Value.ID, namespace, span.SpanContext().TraceID().String()) resourceId := schema.GroupVersionResource{ Group: group, @@ -197,7 +196,8 @@ func (s *K8sStateProvider) Upsert(ctx context.Context, entry states.UpsertReques } j, _ := json.Marshal(entry.Value.Body) - item, err := s.DynamicClient.Resource(resourceId).Namespace(namespace).Get(ctx, entry.Value.ID, metav1.GetOptions{}) + var item *unstructured.Unstructured + item, err = s.DynamicClient.Resource(resourceId).Namespace(namespace).Get(ctx, entry.Value.ID, metav1.GetOptions{}) if err != nil { template := fmt.Sprintf(`{"apiVersion":"%s/v1", "kind": "%s", "metadata": {}}`, group, kind) var unc *unstructured.Unstructured @@ -252,6 +252,7 @@ func (s *K8sStateProvider) Upsert(ctx context.Context, entry states.UpsertReques item.SetLabels(metadata.Labels) item.SetAnnotations(metadata.Annotations) } + getResourceVersion := false if v, ok := dict["spec"]; ok { item.Object["spec"] = v @@ -260,8 +261,18 @@ func (s *K8sStateProvider) Upsert(ctx context.Context, entry states.UpsertReques sLog.Errorf(" P (K8s State): failed to update object: %v", err) return "", err } + getResourceVersion = true } if v, ok := dict["status"]; ok { + if getResourceVersion { + // Get latest resource version in case the the object spec is also updated + item, err = s.DynamicClient.Resource(resourceId).Namespace(namespace).Get(ctx, entry.Value.ID, metav1.GetOptions{}) + if err != nil { + sLog.Errorf(" P (K8s State): failed to get object when trying to update status: %v", err) + return "", err + } + } + if vMap, ok := v.(map[string]interface{}); ok { status := &unstructured.Unstructured{ Object: map[string]interface{}{ @@ -310,13 +321,13 @@ func (s *K8sStateProvider) List(ctx context.Context, request states.ListRequest) var err error = nil defer observ_utils.CloseSpanWithError(span, &err) - sLog.Info(" P (K8s State): list state") - namespace := model.ReadPropertyCompat(request.Metadata, "namespace", nil) group := model.ReadPropertyCompat(request.Metadata, "group", nil) version := model.ReadPropertyCompat(request.Metadata, "version", nil) resource := model.ReadPropertyCompat(request.Metadata, "resource", nil) + sLog.Infof(" P (K8s State): list state for %s.%s in namespace %s, traceId: %s", resource, group, namespace, span.SpanContext().TraceID().String()) + var namespaces []string if namespace == "" { ret, err := s.ListAllNamespaces(ctx, version) @@ -422,8 +433,6 @@ func (s *K8sStateProvider) Delete(ctx context.Context, request states.DeleteRequ var err error = nil defer observ_utils.CloseSpanWithError(span, &err) - sLog.Info(" P (K8s State): delete state") - namespace := model.ReadPropertyCompat(request.Metadata, "namespace", nil) group := model.ReadPropertyCompat(request.Metadata, "group", nil) version := model.ReadPropertyCompat(request.Metadata, "version", nil) @@ -437,6 +446,7 @@ func (s *K8sStateProvider) Delete(ctx context.Context, request states.DeleteRequ if namespace == "" { namespace = "default" } + sLog.Infof(" P (K8s State): delete state %s in namespace %s, traceId: %s", request.ID, namespace, span.SpanContext().TraceID().String()) if request.ID == "" { err := v1alpha2.NewCOAError(nil, "found invalid request ID", v1alpha2.BadRequest) @@ -458,8 +468,6 @@ func (s *K8sStateProvider) Get(ctx context.Context, request states.GetRequest) ( var err error = nil defer observ_utils.CloseSpanWithError(span, &err) - sLog.Info(" P (K8s State): get state") - namespace := model.ReadPropertyCompat(request.Metadata, "namespace", nil) group := model.ReadPropertyCompat(request.Metadata, "group", nil) version := model.ReadPropertyCompat(request.Metadata, "version", nil) @@ -469,6 +477,8 @@ func (s *K8sStateProvider) Get(ctx context.Context, request states.GetRequest) ( namespace = "default" } + sLog.Infof(" P (K8s State): get state %s in namespace %s, traceId: %s", request.ID, namespace, span.SpanContext().TraceID().String()) + resourceId := schema.GroupVersionResource{ Group: group, Version: version, diff --git a/api/pkg/apis/v1alpha1/vendors/stage-vendor.go b/api/pkg/apis/v1alpha1/vendors/stage-vendor.go index a0860f2e8..10fba49c5 100644 --- a/api/pkg/apis/v1alpha1/vendors/stage-vendor.go +++ b/api/pkg/apis/v1alpha1/vendors/stage-vendor.go @@ -83,19 +83,29 @@ func (s *StageVendor) Init(config vendors.VendorConfig, factories []managers.IMa return v1alpha2.NewCOAError(nil, "event body is not an activation job", v1alpha2.BadRequest) } campaignName := api_utils.ReplaceSeperator(actData.Campaign) + campaign, err := s.CampaignsManager.GetState(context.TODO(), campaignName, actData.Namespace) if err != nil { log.Error("V (Stage): unable to find campaign: %+v", err) + err = s.reportActivationStatusWithBadRequest(actData.Activation, actData.Namespace, err) + // If report status succeeded, return an empty err so the subscribe function will not be retried + // The actual error will be stored in Activation cr return err } activation, err := s.ActivationsManager.GetState(context.TODO(), actData.Activation, actData.Namespace) if err != nil { log.Error("V (Stage): unable to find activation: %+v", err) + err = s.reportActivationStatusWithBadRequest(actData.Activation, actData.Namespace, err) + // If report status succeeded, return an empty err so the subscribe function will not be retried + // The actual error will be stored in Activation cr return err } evt, err := s.StageManager.HandleActivationEvent(context.TODO(), actData, *campaign.Spec, activation) if err != nil { + err = s.reportActivationStatusWithBadRequest(actData.Activation, actData.Namespace, err) + // If report status succeeded, return an empty err so the subscribe function will not be retried + // The actual error will be stored in Activation cr return err } @@ -122,30 +132,22 @@ func (s *StageVendor) Init(config vendors.VendorConfig, factories []managers.IMa err := json.Unmarshal(jData, &triggerData) if err != nil { err = v1alpha2.NewCOAError(nil, "event body is not an activation job", v1alpha2.BadRequest) - status.Status = v1alpha2.BadRequest - status.StatusMessage = v1alpha2.BadRequest.String() - status.ErrorMessage = err.Error() - status.IsActive = false sLog.Errorf("V (Stage): failed to deserialize activation data: %v", err) - err = s.ActivationsManager.ReportStatus(context.TODO(), triggerData.Activation, triggerData.Namespace, status) - if err != nil { - sLog.Errorf("V (Stage): failed to report error status: %v (%v)", status.ErrorMessage, err) - } + err = s.reportActivationStatusWithBadRequest(triggerData.Activation, triggerData.Namespace, err) + // If report status succeeded, return an empty err so the subscribe function will not be retried + // The actual error will be stored in Activation cr + return err } status.Outputs["__namespace"] = triggerData.Namespace campaignName := api_utils.ReplaceSeperator(triggerData.Campaign) campaign, err := s.CampaignsManager.GetState(context.TODO(), campaignName, triggerData.Namespace) if err != nil { - status.Status = v1alpha2.BadRequest - status.StatusMessage = v1alpha2.BadRequest.String() - status.ErrorMessage = err.Error() - status.IsActive = false sLog.Errorf("V (Stage): failed to get campaign spec: %v", err) - err = s.ActivationsManager.ReportStatus(context.TODO(), triggerData.Activation, triggerData.Namespace, status) - if err != nil { - sLog.Errorf("V (Stage): failed to report error status: %v (%v)", status.ErrorMessage, err) - } + err = s.reportActivationStatusWithBadRequest(triggerData.Activation, triggerData.Namespace, err) + // If report status succeeded, return an empty err so the subscribe function will not be retried + // The actual error will be stored in Activation cr + return err } status.Stage = triggerData.Stage status.ActivationGeneration = triggerData.ActivationGeneration @@ -306,3 +308,20 @@ func (s *StageVendor) Init(config vendors.VendorConfig, factories []managers.IMa }) return nil } + +func (s *StageVendor) reportActivationStatusWithBadRequest(activation string, namespace string, err error) error { + status := model.ActivationStatus{ + Stage: "", + NextStage: "", + Outputs: map[string]interface{}{}, + Status: v1alpha2.BadRequest, + StatusMessage: v1alpha2.BadRequest.String(), + ErrorMessage: err.Error(), + IsActive: true, + } + err = s.ActivationsManager.ReportStatus(context.TODO(), activation, namespace, status) + if err != nil { + sLog.Errorf("V (Stage): failed to report error status on activtion %s/%s: %v (%v)", namespace, activation, status.ErrorMessage, err) + } + return err +} diff --git a/k8s/apis/workflow/v1/activation_webhook.go b/k8s/apis/workflow/v1/activation_webhook.go new file mode 100644 index 000000000..14242646d --- /dev/null +++ b/k8s/apis/workflow/v1/activation_webhook.go @@ -0,0 +1,183 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT license. + * SPDX-License-Identifier: MIT + */ + +package v1 + +import ( + "context" + "fmt" + "gopls-workspace/apis/metrics/v1" + "strings" + "time" + + "github.com/eclipse-symphony/symphony/k8s/utils" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/validation/field" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/webhook" + "sigs.k8s.io/controller-runtime/pkg/webhook/admission" +) + +// log is for logging in this package. +var activationlog = logf.Log.WithName("activation-resource") +var myActivationClient client.Client +var activationWebhookValidationMetrics *metrics.Metrics + +func (r *Activation) SetupWebhookWithManager(mgr ctrl.Manager) error { + myActivationClient = mgr.GetClient() + mgr.GetFieldIndexer().IndexField(context.Background(), &Activation{}, ".metadata.name", func(rawObj client.Object) []string { + activation := rawObj.(*Activation) + return []string{activation.Name} + }) + + // initialize the controller operation metrics + if activationWebhookValidationMetrics == nil { + metrics, err := metrics.New() + if err != nil { + return err + } + activationWebhookValidationMetrics = metrics + } + + return ctrl.NewWebhookManagedBy(mgr). + For(r). + Complete() +} + +// TODO(user): EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! + +//+kubebuilder:webhook:path=/mutate-workflow-symphony-v1-activation,mutating=true,failurePolicy=fail,sideEffects=None,groups=workflow.symphony,resources=activations,verbs=create;update,versions=v1,name=mactivation.kb.io,admissionReviewVersions=v1 + +var _ webhook.Defaulter = &Activation{} + +// Default implements webhook.Defaulter so a webhook will be registered for the type +func (r *Activation) Default() { + activationlog.Info("default", "name", r.Name) +} + +// TODO(user): change verbs to "verbs=create;update;delete" if you want to enable deletion validation. + +//+kubebuilder:webhook:path=/validate-workflow-symphony-v1-activation,mutating=false,failurePolicy=fail,sideEffects=None,groups=workflow.symphony,resources=activations,verbs=create;update,versions=v1,name=mactivation.kb.io,admissionReviewVersions=v1 + +var _ webhook.Validator = &Activation{} + +// ValidateCreate implements webhook.Validator so a webhook will be registered for the type +func (r *Activation) ValidateCreate() (admission.Warnings, error) { + activationlog.Info("validate create", "name", r.Name) + + validateCreateTime := time.Now() + validationError := r.validateCreateActivation() + if validationError != nil { + activationWebhookValidationMetrics.ControllerValidationLatency( + validateCreateTime, + metrics.CreateOperationType, + metrics.InvalidResource, + metrics.CatalogResourceType) + } else { + activationWebhookValidationMetrics.ControllerValidationLatency( + validateCreateTime, + metrics.CreateOperationType, + metrics.ValidResource, + metrics.CatalogResourceType) + } + + return nil, validationError +} + +// ValidateUpdate implements webhook.Validator so a webhook will be registered for the type +func (r *Activation) ValidateUpdate(old runtime.Object) (admission.Warnings, error) { + activationlog.Info("validate update", "name", r.Name) + validateUpdateTime := time.Now() + oldActivation, ok := old.(*Activation) + if !ok { + return nil, fmt.Errorf("expected an Activation object") + } + // Compare the Spec of the current and old Activation objects + validationError := r.validateSpecOnUpdate(oldActivation) + if validationError != nil { + activationWebhookValidationMetrics.ControllerValidationLatency( + validateUpdateTime, + metrics.UpdateOperationType, + metrics.InvalidResource, + metrics.InstanceResourceType, + ) + } else { + activationWebhookValidationMetrics.ControllerValidationLatency( + validateUpdateTime, + metrics.UpdateOperationType, + metrics.ValidResource, + metrics.InstanceResourceType, + ) + } + return nil, validationError +} + +// ValidateDelete implements webhook.Validator so a webhook will be registered for the type +func (r *Activation) ValidateDelete() (admission.Warnings, error) { + activationlog.Info("validate delete", "name", r.Name) + + return nil, nil +} + +func (r *Activation) validateCreateActivation() error { + var allErrs field.ErrorList + + if err := r.validateNameOnCreate(); err != nil { + allErrs = append(allErrs, err) + } + if err := r.validateCampaignOnCreate(); err != nil { + allErrs = append(allErrs, err) + } + if len(allErrs) == 0 { + return nil + } + + return apierrors.NewInvalid(schema.GroupKind{Group: "workflow.symphony", Kind: "Activation"}, r.Name, allErrs) +} + +func (r *Activation) validateNameOnCreate() *field.Error { + if strings.Contains(r.ObjectMeta.Name, "-") { + return field.Invalid(field.NewPath("metadata").Child("name"), r.ObjectMeta.Name, "name must not contain '-'") + } + return nil +} +func (r *Activation) validateCampaignOnCreate() *field.Error { + if r.Spec.Campaign == "" { + return field.Invalid(field.NewPath("spec").Child("campaign"), r.Spec.Campaign, "campaign must not be empty") + } + campaignName := utils.ReplaceLastSeperator(r.Spec.Campaign, ":", "-") + var campaign Campaign + err := myActivationClient.Get(context.Background(), client.ObjectKey{Name: campaignName, Namespace: r.Namespace}, &campaign) + if err != nil { + return field.Invalid(field.NewPath("spec").Child("campaign"), r.Spec.Campaign, "campaign doesn't exist") + } + return nil +} + +func (r *Activation) validateSpecOnUpdate(oldActivation *Activation) error { + var allErrs field.ErrorList + if r.Spec.Campaign != oldActivation.Spec.Campaign { + allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("campaign"), r.Spec.Campaign, "updates to activation spec.Campaign are not allowed")) + } + if r.Spec.Stage != oldActivation.Spec.Stage { + allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("stage"), r.Spec.Stage, "updates to activation spec.Stage are not allowed")) + } + if r.Spec.Inputs.String() != oldActivation.Spec.Inputs.String() { + allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("inputs"), r.Spec.Inputs, "updates to activation spec.Inputs are not allowed")) + } + if r.Spec.Generation != oldActivation.Spec.Generation { + allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("generation"), r.Spec.Generation, "updates to activation spec.Generation are not allowed")) + } + if len(allErrs) == 0 { + return nil + } + + return apierrors.NewInvalid(schema.GroupKind{Group: "workflow.symphony", Kind: "Activation"}, r.Name, allErrs) +} diff --git a/k8s/config/oss/webhook/manifests.yaml b/k8s/config/oss/webhook/manifests.yaml index 89adaa190..a356ea748 100644 --- a/k8s/config/oss/webhook/manifests.yaml +++ b/k8s/config/oss/webhook/manifests.yaml @@ -124,6 +124,26 @@ webhooks: resources: - solutioncontainers sideEffects: None +- admissionReviewVersions: + - v1 + clientConfig: + service: + name: webhook-service + namespace: system + path: /mutate-workflow-symphony-v1-activation + failurePolicy: Fail + name: mactivation.kb.io + rules: + - apiGroups: + - workflow.symphony + apiVersions: + - v1 + operations: + - CREATE + - UPDATE + resources: + - activations + sideEffects: None - admissionReviewVersions: - v1 clientConfig: @@ -333,6 +353,26 @@ webhooks: resources: - solutioncontainers sideEffects: None +- admissionReviewVersions: + - v1 + clientConfig: + service: + name: webhook-service + namespace: system + path: /validate-workflow-symphony-v1-activation + failurePolicy: Fail + name: mactivation.kb.io + rules: + - apiGroups: + - workflow.symphony + apiVersions: + - v1 + operations: + - CREATE + - UPDATE + resources: + - activations + sideEffects: None - admissionReviewVersions: - v1 clientConfig: diff --git a/k8s/main.go b/k8s/main.go index bfc615a05..206d4253d 100644 --- a/k8s/main.go +++ b/k8s/main.go @@ -294,6 +294,10 @@ func main() { setupLog.Error(err, "unable to create webhook", "webhook", "Skill") os.Exit(1) } + if err = (&workflowv1.Activation{}).SetupWebhookWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create webhook", "webhook", "Skill") + os.Exit(1) + } if err = (&federationv1.Catalog{}).SetupWebhookWithManager(mgr); err != nil { setupLog.Error(err, "unable to create webhook", "webhook", "Catalog") os.Exit(1) diff --git a/packages/helm/symphony/templates/symphony-core/symphonyk8s.yaml b/packages/helm/symphony/templates/symphony-core/symphonyk8s.yaml index cb85fcc57..42004edd4 100644 --- a/packages/helm/symphony/templates/symphony-core/symphonyk8s.yaml +++ b/packages/helm/symphony/templates/symphony-core/symphonyk8s.yaml @@ -2765,6 +2765,26 @@ webhooks: resources: - solutioncontainers sideEffects: None +- admissionReviewVersions: + - v1 + clientConfig: + service: + name: '{{ include "symphony.fullname" . }}-webhook-service' + namespace: '{{ .Release.Namespace }}' + path: /mutate-workflow-symphony-v1-activation + failurePolicy: Fail + name: mactivation.kb.io + rules: + - apiGroups: + - workflow.symphony + apiVersions: + - v1 + operations: + - CREATE + - UPDATE + resources: + - activations + sideEffects: None - admissionReviewVersions: - v1 clientConfig: @@ -2977,6 +2997,26 @@ webhooks: resources: - solutioncontainers sideEffects: None +- admissionReviewVersions: + - v1 + clientConfig: + service: + name: '{{ include "symphony.fullname" . }}-webhook-service' + namespace: '{{ .Release.Namespace }}' + path: /validate-workflow-symphony-v1-activation + failurePolicy: Fail + name: mactivation.kb.io + rules: + - apiGroups: + - workflow.symphony + apiVersions: + - v1 + operations: + - CREATE + - UPDATE + resources: + - activations + sideEffects: None - admissionReviewVersions: - v1 clientConfig: diff --git a/test/integration/scenarios/04.workflow/magefile.go b/test/integration/scenarios/04.workflow/magefile.go index e5f9925a7..0b38e0519 100644 --- a/test/integration/scenarios/04.workflow/magefile.go +++ b/test/integration/scenarios/04.workflow/magefile.go @@ -59,6 +59,10 @@ var ( testVerify = []string{ "./verify/...", } + + CampaignNotExistActivation = "test/integration/scenarios/04.workflow/manifest/activation-campaignnotexist.yaml" + + WithStageActivation = "test/integration/scenarios/04.workflow/manifest/activation-stage.yaml" ) // Entry point for running the tests @@ -80,6 +84,10 @@ func Test() error { if err != nil { return err } + err = FaultTest(namespace) + if err != nil { + return err + } } return nil @@ -181,6 +189,25 @@ func Verify() error { return nil } +func FaultTest(namespace string) error { + repoPath := os.Getenv("REPO_PATH") + if repoPath == "" { + repoPath = "../../../../" + } + var err error + CampaignNotExistActivationAbs := filepath.Join(repoPath, CampaignNotExistActivation) + err = shellcmd.Command(fmt.Sprintf("kubectl apply -f %s -n %s", CampaignNotExistActivationAbs, namespace)).Run() + if err == nil { + return fmt.Errorf("fault test failed for non-existing campaign") + } + WithStageActivationAbs := filepath.Join(repoPath, WithStageActivation) + err = shellcmd.Command(fmt.Sprintf("kubectl apply -f %s -n %s", WithStageActivationAbs, namespace)).Run() + if err == nil { + return fmt.Errorf("fault test failed for non-existing campaign") + } + return nil +} + // Clean up func Cleanup() { localenvCmd(fmt.Sprintf("dumpSymphonyLogsForTest '%s'", TEST_NAME), "") diff --git a/test/integration/scenarios/04.workflow/manifest/activation-campaignnotexist.yaml b/test/integration/scenarios/04.workflow/manifest/activation-campaignnotexist.yaml new file mode 100644 index 000000000..c0219095b --- /dev/null +++ b/test/integration/scenarios/04.workflow/manifest/activation-campaignnotexist.yaml @@ -0,0 +1,7 @@ +apiVersion: workflow.symphony/v1 +kind: Activation +metadata: + name: 04workflow2 +spec: + campaign: "04campaign:v2" + \ No newline at end of file diff --git a/test/integration/scenarios/04.workflow/manifest/activation-stage.yaml b/test/integration/scenarios/04.workflow/manifest/activation-stage.yaml new file mode 100644 index 000000000..7afe89c36 --- /dev/null +++ b/test/integration/scenarios/04.workflow/manifest/activation-stage.yaml @@ -0,0 +1,8 @@ +apiVersion: workflow.symphony/v1 +kind: Activation +metadata: + name: 04workflow +spec: + campaign: "04campaign:v1" + stage: "deploy" + \ No newline at end of file