Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add webhook for activation in case campaign doesn't exist #302

Merged
merged 10 commits into from
Jun 17, 2024
14 changes: 13 additions & 1 deletion api/pkg/apis/v1alpha1/providers/states/k8s/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ func (s *K8sStateProvider) Upsert(ctx context.Context, entry states.UpsertReques
}

j, _ := json.Marshal(entry.Value.Body)
item, err := s.DynamicClient.Resource(resourceId).Namespace(namespace).Get(ctx, entry.Value.ID, metav1.GetOptions{})
var item *unstructured.Unstructured
item, err = s.DynamicClient.Resource(resourceId).Namespace(namespace).Get(ctx, entry.Value.ID, metav1.GetOptions{})
if err != nil {
template := fmt.Sprintf(`{"apiVersion":"%s/v1", "kind": "%s", "metadata": {}}`, group, kind)
var unc *unstructured.Unstructured
Expand Down Expand Up @@ -252,6 +253,7 @@ func (s *K8sStateProvider) Upsert(ctx context.Context, entry states.UpsertReques
item.SetLabels(metadata.Labels)
item.SetAnnotations(metadata.Annotations)
}
getResourceVersion := false
if v, ok := dict["spec"]; ok {
item.Object["spec"] = v

Expand All @@ -260,8 +262,18 @@ func (s *K8sStateProvider) Upsert(ctx context.Context, entry states.UpsertReques
sLog.Errorf(" P (K8s State): failed to update object: %v", err)
return "", err
}
getResourceVersion = true
}
if v, ok := dict["status"]; ok {
if getResourceVersion {
// Get latest resource version in case the the object spec is also updated
RemindD marked this conversation as resolved.
Show resolved Hide resolved
item, err = s.DynamicClient.Resource(resourceId).Namespace(namespace).Get(ctx, entry.Value.ID, metav1.GetOptions{})
if err != nil {
sLog.Errorf(" P (K8s State): failed to get object when trying to update status: %v", err)
return "", err
}
}

if vMap, ok := v.(map[string]interface{}); ok {
status := &unstructured.Unstructured{
Object: map[string]interface{}{
Expand Down
51 changes: 35 additions & 16 deletions api/pkg/apis/v1alpha1/vendors/stage-vendor.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,19 +83,29 @@ func (s *StageVendor) Init(config vendors.VendorConfig, factories []managers.IMa
return v1alpha2.NewCOAError(nil, "event body is not an activation job", v1alpha2.BadRequest)
}
campaignName := api_utils.ReplaceSeperator(actData.Campaign)

campaign, err := s.CampaignsManager.GetState(context.TODO(), campaignName, actData.Namespace)
if err != nil {
log.Error("V (Stage): unable to find campaign: %+v", err)
err = s.reportActivationStatusWithBadRequest(actData.Activation, actData.Namespace, err)
// If report status succeeded, return an empty err so the subscribe function will not be retried
// The actual error will be stored in Activation cr
return err
}
activation, err := s.ActivationsManager.GetState(context.TODO(), actData.Activation, actData.Namespace)
if err != nil {
log.Error("V (Stage): unable to find activation: %+v", err)
err = s.reportActivationStatusWithBadRequest(actData.Activation, actData.Namespace, err)
// If report status succeeded, return an empty err so the subscribe function will not be retried
// The actual error will be stored in Activation cr
return err
}

evt, err := s.StageManager.HandleActivationEvent(context.TODO(), actData, *campaign.Spec, activation)
if err != nil {
err = s.reportActivationStatusWithBadRequest(actData.Activation, actData.Namespace, err)
// If report status succeeded, return an empty err so the subscribe function will not be retried
// The actual error will be stored in Activation cr
return err
}

Expand All @@ -122,30 +132,22 @@ func (s *StageVendor) Init(config vendors.VendorConfig, factories []managers.IMa
err := json.Unmarshal(jData, &triggerData)
if err != nil {
err = v1alpha2.NewCOAError(nil, "event body is not an activation job", v1alpha2.BadRequest)
status.Status = v1alpha2.BadRequest
status.StatusMessage = v1alpha2.BadRequest.String()
status.ErrorMessage = err.Error()
status.IsActive = false
sLog.Errorf("V (Stage): failed to deserialize activation data: %v", err)
err = s.ActivationsManager.ReportStatus(context.TODO(), triggerData.Activation, triggerData.Namespace, status)
if err != nil {
sLog.Errorf("V (Stage): failed to report error status: %v (%v)", status.ErrorMessage, err)
}
err = s.reportActivationStatusWithBadRequest(triggerData.Activation, triggerData.Namespace, err)
// If report status succeeded, return an empty err so the subscribe function will not be retried
RemindD marked this conversation as resolved.
Show resolved Hide resolved
// The actual error will be stored in Activation cr
return err
}
status.Outputs["__namespace"] = triggerData.Namespace

campaignName := api_utils.ReplaceSeperator(triggerData.Campaign)
campaign, err := s.CampaignsManager.GetState(context.TODO(), campaignName, triggerData.Namespace)
if err != nil {
status.Status = v1alpha2.BadRequest
status.StatusMessage = v1alpha2.BadRequest.String()
status.ErrorMessage = err.Error()
status.IsActive = false
sLog.Errorf("V (Stage): failed to get campaign spec: %v", err)
err = s.ActivationsManager.ReportStatus(context.TODO(), triggerData.Activation, triggerData.Namespace, status)
if err != nil {
sLog.Errorf("V (Stage): failed to report error status: %v (%v)", status.ErrorMessage, err)
}
err = s.reportActivationStatusWithBadRequest(triggerData.Activation, triggerData.Namespace, err)
// If report status succeeded, return an empty err so the subscribe function will not be retried
// The actual error will be stored in Activation cr
return err
}
status.Stage = triggerData.Stage
status.ActivationGeneration = triggerData.ActivationGeneration
Expand Down Expand Up @@ -306,3 +308,20 @@ func (s *StageVendor) Init(config vendors.VendorConfig, factories []managers.IMa
})
return nil
}

func (s *StageVendor) reportActivationStatusWithBadRequest(activation string, namespace string, err error) error {
status := model.ActivationStatus{
Stage: "",
NextStage: "",
Outputs: map[string]interface{}{},
Status: v1alpha2.BadRequest,
StatusMessage: v1alpha2.BadRequest.String(),
ErrorMessage: err.Error(),
IsActive: true,
}
err = s.ActivationsManager.ReportStatus(context.TODO(), activation, namespace, status)
if err != nil {
sLog.Errorf("V (Stage): failed to report error status on activtion %s/%s: %v (%v)", namespace, activation, status.ErrorMessage, err)
}
return err
}
177 changes: 177 additions & 0 deletions k8s/apis/workflow/v1/activation_webhook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT license.
* SPDX-License-Identifier: MIT
*/

package v1

import (
"context"
"fmt"
"gopls-workspace/apis/metrics/v1"
"strings"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/validation/field"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/webhook"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)

// log is for logging in this package.
var activationlog = logf.Log.WithName("activation-resource")
var myActivationClient client.Client
var activationWebhookValidationMetrics *metrics.Metrics

func (r *Activation) SetupWebhookWithManager(mgr ctrl.Manager) error {
myActivationClient = mgr.GetClient()
mgr.GetFieldIndexer().IndexField(context.Background(), &Activation{}, ".metadata.name", func(rawObj client.Object) []string {
activation := rawObj.(*Activation)
return []string{activation.Name}
})

// initialize the controller operation metrics
if activationWebhookValidationMetrics == nil {
metrics, err := metrics.New()
if err != nil {
return err
}
activationWebhookValidationMetrics = metrics
}

return ctrl.NewWebhookManagedBy(mgr).
For(r).
Complete()
}

// TODO(user): EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!

//+kubebuilder:webhook:path=/mutate-workflow-symphony-v1-activation,mutating=true,failurePolicy=fail,sideEffects=None,groups=workflow.symphony,resources=activations,verbs=create;update,versions=v1,name=mactivation.kb.io,admissionReviewVersions=v1

var _ webhook.Defaulter = &Activation{}

// Default implements webhook.Defaulter so a webhook will be registered for the type
func (r *Activation) Default() {
activationlog.Info("default", "name", r.Name)
}

// TODO(user): change verbs to "verbs=create;update;delete" if you want to enable deletion validation.

//+kubebuilder:webhook:path=/validate-workflow-symphony-v1-activation,mutating=false,failurePolicy=fail,sideEffects=None,groups=workflow.symphony,resources=activations,verbs=create;update,versions=v1,name=mactivation.kb.io,admissionReviewVersions=v1

var _ webhook.Validator = &Activation{}

// ValidateCreate implements webhook.Validator so a webhook will be registered for the type
func (r *Activation) ValidateCreate() (admission.Warnings, error) {
RemindD marked this conversation as resolved.
Show resolved Hide resolved
activationlog.Info("validate create", "name", r.Name)

validateCreateTime := time.Now()
validationError := r.validateCreateActivation()
if validationError != nil {
activationWebhookValidationMetrics.ControllerValidationLatency(
validateCreateTime,
metrics.CreateOperationType,
metrics.InvalidResource,
metrics.CatalogResourceType)
} else {
activationWebhookValidationMetrics.ControllerValidationLatency(
validateCreateTime,
metrics.CreateOperationType,
metrics.ValidResource,
metrics.CatalogResourceType)
}

return nil, validationError
}

// ValidateUpdate implements webhook.Validator so a webhook will be registered for the type
func (r *Activation) ValidateUpdate(old runtime.Object) (admission.Warnings, error) {
activationlog.Info("validate update", "name", r.Name)
validateUpdateTime := time.Now()
oldActivation, ok := old.(*Activation)
if !ok {
return nil, fmt.Errorf("expected an Activation object")
}
// Compare the Spec of the current and old Activation objects
validationError := r.validateSpecOnUpdate(oldActivation)
if validationError != nil {
activationWebhookValidationMetrics.ControllerValidationLatency(
validateUpdateTime,
metrics.UpdateOperationType,
metrics.InvalidResource,
metrics.InstanceResourceType,
)
} else {
activationWebhookValidationMetrics.ControllerValidationLatency(
validateUpdateTime,
metrics.UpdateOperationType,
metrics.ValidResource,
metrics.InstanceResourceType,
)
}
return nil, validationError
}

// ValidateDelete implements webhook.Validator so a webhook will be registered for the type
func (r *Activation) ValidateDelete() (admission.Warnings, error) {
activationlog.Info("validate delete", "name", r.Name)

return nil, nil
}

func (r *Activation) validateCreateActivation() error {
var allErrs field.ErrorList

if err := r.validateNameOnCreate(); err != nil {
allErrs = append(allErrs, err)
}
if err := r.validateCampaignOnCreate(); err != nil {
allErrs = append(allErrs, err)
}
if len(allErrs) == 0 {
return nil
}

return apierrors.NewInvalid(schema.GroupKind{Group: "workflow.symphony", Kind: "Activation"}, r.Name, allErrs)
}

func (r *Activation) validateNameOnCreate() *field.Error {
if strings.Contains(r.ObjectMeta.Name, "-") {
return field.Invalid(field.NewPath("metadata").Child("name"), r.ObjectMeta.Name, "name must not contain '-'")
}
return nil
}
func (r *Activation) validateCampaignOnCreate() *field.Error {
if r.Spec.Campaign == "" {
return field.Invalid(field.NewPath("spec").Child("campaign"), r.Spec.Campaign, "campaign must not be empty")
}
campaignName := strings.Replace(r.Spec.Campaign, ":", "-", -1)
RemindD marked this conversation as resolved.
Show resolved Hide resolved
var campaign Campaign
err := myActivationClient.Get(context.Background(), client.ObjectKey{Name: campaignName, Namespace: r.Namespace}, &campaign)
if err != nil {
return field.Invalid(field.NewPath("spec").Child("campaign"), r.Spec.Campaign, "campaign doesn't exist")
}
return nil
}

func (r *Activation) validateSpecOnUpdate(oldActivation *Activation) *field.Error {
if r.Spec.Campaign != oldActivation.Spec.Campaign {
return field.Invalid(field.NewPath("spec").Child("campaign"), r.Spec.Campaign, "updates to activation spec.Campaign are not allowed")
}
if r.Spec.Stage != oldActivation.Spec.Stage {
return field.Invalid(field.NewPath("spec").Child("stage"), r.Spec.Stage, "updates to activation spec.Stage are not allowed")
}
if r.Spec.Inputs.String() != oldActivation.Spec.Inputs.String() {
return field.Invalid(field.NewPath("spec").Child("inputs"), r.Spec.Inputs, "updates to activation spec.Inputs are not allowed")
}
if r.Spec.Generation != oldActivation.Spec.Generation {
return field.Invalid(field.NewPath("spec").Child("generation"), r.Spec.Generation, "updates to activation spec.Generation are not allowed")
}
return nil
}
40 changes: 40 additions & 0 deletions k8s/config/oss/webhook/manifests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,26 @@ webhooks:
resources:
- solutioncontainers
sideEffects: None
- admissionReviewVersions:
- v1
clientConfig:
service:
name: webhook-service
namespace: system
path: /mutate-workflow-symphony-v1-activation
failurePolicy: Fail
name: mactivation.kb.io
rules:
- apiGroups:
- workflow.symphony
apiVersions:
- v1
operations:
- CREATE
- UPDATE
resources:
- activations
sideEffects: None
- admissionReviewVersions:
- v1
clientConfig:
Expand Down Expand Up @@ -333,6 +353,26 @@ webhooks:
resources:
- solutioncontainers
sideEffects: None
- admissionReviewVersions:
- v1
clientConfig:
service:
name: webhook-service
namespace: system
path: /validate-workflow-symphony-v1-activation
failurePolicy: Fail
name: mactivation.kb.io
rules:
- apiGroups:
- workflow.symphony
apiVersions:
- v1
operations:
- CREATE
- UPDATE
resources:
- activations
sideEffects: None
- admissionReviewVersions:
- v1
clientConfig:
Expand Down
4 changes: 4 additions & 0 deletions k8s/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,10 @@ func main() {
setupLog.Error(err, "unable to create webhook", "webhook", "Skill")
os.Exit(1)
}
if err = (&workflowv1.Activation{}).SetupWebhookWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create webhook", "webhook", "Skill")
os.Exit(1)
}
if err = (&federationv1.Catalog{}).SetupWebhookWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create webhook", "webhook", "Catalog")
os.Exit(1)
Expand Down
Loading
Loading