Skip to content

Commit

Permalink
Deduplicate mutator controller logic (#1474)
Browse files Browse the repository at this point in the history
Fixes #1449

Signed-off-by: Max Smythe <smythe@google.com>
  • Loading branch information
maxsmythe committed Aug 9, 2021
1 parent f33db10 commit 407611a
Show file tree
Hide file tree
Showing 7 changed files with 352 additions and 787 deletions.
260 changes: 16 additions & 244 deletions pkg/controller/mutators/assign/assign_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,45 +17,19 @@ limitations under the License.
package assign

import (
"context"
"fmt"
"time"

opa "github.com/open-policy-agent/frameworks/constraint/pkg/client"
mutationsv1alpha1 "github.com/open-policy-agent/gatekeeper/apis/mutations/v1alpha1"
statusv1beta1 "github.com/open-policy-agent/gatekeeper/apis/status/v1beta1"
ctrlmutators "github.com/open-policy-agent/gatekeeper/pkg/controller/mutators"
"github.com/open-policy-agent/gatekeeper/pkg/controller/mutatorstatus"
"github.com/open-policy-agent/gatekeeper/pkg/logging"
"github.com/open-policy-agent/gatekeeper/pkg/controller/mutators/core"
"github.com/open-policy-agent/gatekeeper/pkg/mutation"
"github.com/open-policy-agent/gatekeeper/pkg/mutation/mutators"
"github.com/open-policy-agent/gatekeeper/pkg/mutation/types"
"github.com/open-policy-agent/gatekeeper/pkg/readiness"
"github.com/open-policy-agent/gatekeeper/pkg/util"
"github.com/open-policy-agent/gatekeeper/pkg/watch"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
apiTypes "k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)

var log = logf.Log.WithName("controller").WithValues(logging.Process, "assign_controller")

var gvkAssign = schema.GroupVersionKind{
Group: mutationsv1alpha1.GroupVersion.Group,
Version: mutationsv1alpha1.GroupVersion.Version,
Kind: "Assign",
}

type Adder struct {
MutationSystem *mutation.System
Tracker *readiness.Tracker
Expand All @@ -65,8 +39,21 @@ type Adder struct {
// Add creates a new Assign Controller and adds it to the Manager. The Manager will set fields on the Controller
// and Start it when the Manager is Started.
func (a *Adder) Add(mgr manager.Manager) error {
r := newReconciler(mgr, a.MutationSystem, a.Tracker, a.GetPod)
return add(mgr, r)
adder := core.Adder{
Tracker: a.Tracker,
GetPod: a.GetPod,
MutationSystem: a.MutationSystem,
Kind: "Assign",
NewMutationObj: func() client.Object { return &mutationsv1alpha1.Assign{} },
MutatorFor: func(obj client.Object) (types.Mutator, error) {
// The type is provided by the `NewObj` function above. If we
// are fed the wrong type, this is a non-recoverable error and we
// may as well crash for visibility
assign := obj.(*mutationsv1alpha1.Assign) // nolint:forcetypeassert
return mutators.MutatorForAssign(assign)
},
}
return adder.Add(mgr)
}

func (a *Adder) InjectOpa(o *opa.Client) {}
Expand All @@ -86,218 +73,3 @@ func (a *Adder) InjectGetPod(getPod func() (*corev1.Pod, error)) {
func (a *Adder) InjectMutationSystem(mutationSystem *mutation.System) {
a.MutationSystem = mutationSystem
}

// newReconciler returns a new reconcile.Reconciler.
func newReconciler(mgr manager.Manager, mutationSystem *mutation.System, tracker *readiness.Tracker, getPod func() (*corev1.Pod, error)) *Reconciler {
r := &Reconciler{
system: mutationSystem,
Client: mgr.GetClient(),
tracker: tracker,
getPod: getPod,
scheme: mgr.GetScheme(),
reporter: ctrlmutators.NewStatsReporter(),
cache: ctrlmutators.NewMutationCache(),
}
if getPod == nil {
r.getPod = r.defaultGetPod
}
return r
}

// add adds a new Controller to mgr with r as the reconcile.Reconciler.
func add(mgr manager.Manager, r reconcile.Reconciler) error {
if !*mutation.MutationEnabled {
return nil
}

// Create a new controller
c, err := controller.New("assign-controller", mgr, controller.Options{Reconciler: r})
if err != nil {
return err
}

// Watch for changes to Assign
if err = c.Watch(
&source.Kind{Type: &mutationsv1alpha1.Assign{}},
&handler.EnqueueRequestForObject{}); err != nil {
return err
}

// Watch for changes to MutatorPodStatus
err = c.Watch(
&source.Kind{Type: &statusv1beta1.MutatorPodStatus{}},
handler.EnqueueRequestsFromMapFunc(mutatorstatus.PodStatusToMutatorMapper(true, "Assign", util.EventPackerMapFunc())),
)
if err != nil {
return err
}
return nil
}

// Reconciler reconciles a Assign object.
type Reconciler struct {
client.Client
system *mutation.System
tracker *readiness.Tracker
getPod func() (*corev1.Pod, error)
scheme *runtime.Scheme
reporter ctrlmutators.StatsReporter
cache *ctrlmutators.Cache
}

// +kubebuilder:rbac:groups=mutations.gatekeeper.sh,resources=*,verbs=get;list;watch;create;update;patch;delete

// Reconcile reads that state of the cluster for a Assign object and makes changes based on the state read
// and what is in the Assign.Spec.
// TODO (https://github.com/open-policy-agent/gatekeeper/issues/1449): DRY this and assignmetadata_controller.go .
func (r *Reconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
log.Info("Reconcile", "request", request)
startTime := time.Now()

deleted := false
assign := &mutationsv1alpha1.Assign{}
err := r.Get(ctx, request.NamespacedName, assign)
if err != nil {
if !errors.IsNotFound(err) {
return reconcile.Result{}, err
}

deleted = true
assign = &mutationsv1alpha1.Assign{
ObjectMeta: metav1.ObjectMeta{
Name: request.NamespacedName.Name,
Namespace: request.NamespacedName.Namespace,
},
TypeMeta: metav1.TypeMeta{
Kind: "Assign",
APIVersion: fmt.Sprintf("%s/%s", mutationsv1alpha1.GroupVersion.Group, mutationsv1alpha1.GroupVersion.Version),
},
}
}
deleted = deleted || !assign.GetDeletionTimestamp().IsZero()
tracker := r.tracker.For(gvkAssign)

mID := types.MakeID(assign)

if deleted {
tracker.CancelExpect(assign)
r.cache.Remove(mID)

if err := r.system.Remove(mID); err != nil {
log.Error(err, "Remove failed", "resource", request.NamespacedName)
return reconcile.Result{}, err
}

sName, err := statusv1beta1.KeyForMutatorID(util.GetPodName(), mID)
if err != nil {
return reconcile.Result{}, err
}
status := &statusv1beta1.MutatorPodStatus{}
status.SetName(sName)
status.SetNamespace(util.GetNamespace())
if err := r.Delete(ctx, status); err != nil {
if !errors.IsNotFound(err) {
return reconcile.Result{}, err
}
}

return reconcile.Result{}, nil
}

ingestionStatus := ctrlmutators.MutatorStatusError
// encasing this call in a function prevents the arguments from being evaluated early
defer func() { r.reportMutator(mID, ingestionStatus, startTime) }()

status, err := r.getOrCreatePodStatus(mID)
if err != nil {
log.Info("could not get/create pod status object", "error", err)
return reconcile.Result{}, err
}
status.Status.MutatorUID = assign.GetUID()
status.Status.ObservedGeneration = assign.GetGeneration()
status.Status.Errors = nil

mutator, err := mutators.MutatorForAssign(assign)
if err != nil {
log.Error(err, "Creating mutator for resource failed", "resource", request.NamespacedName)
tracker.TryCancelExpect(assign)
status.Status.Errors = append(status.Status.Errors, statusv1beta1.MutatorError{Message: err.Error()})
if err2 := r.Update(ctx, status); err != nil {
log.Error(err2, "could not update mutator status")
}
return reconcile.Result{}, err
}

if err := r.system.Upsert(mutator); err != nil {
log.Error(err, "Insert failed", "resource", request.NamespacedName)
tracker.TryCancelExpect(assign)
status.Status.Errors = append(status.Status.Errors, statusv1beta1.MutatorError{Message: err.Error()})
if err2 := r.Update(ctx, status); err != nil {
log.Error(err2, "could not update mutator status")
}
return reconcile.Result{}, err
}

tracker.Observe(assign)
status.Status.Enforced = true

if err := r.Update(ctx, status); err != nil {
log.Error(err, "could not update mutator status")
return reconcile.Result{}, err
}

ingestionStatus = ctrlmutators.MutatorStatusActive
return reconcile.Result{}, nil
}

func (r *Reconciler) getOrCreatePodStatus(mutatorID types.ID) (*statusv1beta1.MutatorPodStatus, error) {
statusObj := &statusv1beta1.MutatorPodStatus{}
sName, err := statusv1beta1.KeyForMutatorID(util.GetPodName(), mutatorID)
if err != nil {
return nil, err
}
key := apiTypes.NamespacedName{Name: sName, Namespace: util.GetNamespace()}
if err := r.Get(context.TODO(), key, statusObj); err != nil {
if !errors.IsNotFound(err) {
return nil, err
}
} else {
return statusObj, nil
}
pod, err := r.getPod()
if err != nil {
return nil, err
}
statusObj, err = statusv1beta1.NewMutatorStatusForPod(pod, mutatorID, r.scheme)
if err != nil {
return nil, err
}
if err := r.Create(context.TODO(), statusObj); err != nil {
return nil, err
}
return statusObj, nil
}

func (r *Reconciler) defaultGetPod() (*corev1.Pod, error) {
// require injection of GetPod in order to control what client we use to
// guarantee we don't inadvertently create a watch
panic("GetPod must be injected")
}

func (r *Reconciler) reportMutator(mID types.ID, ingestionStatus ctrlmutators.MutatorIngestionStatus, startTime time.Time) {
r.cache.Upsert(mID, ingestionStatus)

if r.reporter == nil {
return
}

if err := r.reporter.ReportMutatorIngestionRequest(ingestionStatus, time.Since(startTime)); err != nil {
log.Error(err, "failed to report mutator ingestion request")
}

for status, count := range r.cache.Tally() {
if err := r.reporter.ReportMutatorsStatus(status, count); err != nil {
log.Error(err, "failed to report mutator status request")
}
}
}
Loading

0 comments on commit 407611a

Please sign in to comment.