From 63bee9a641d5ff3671338028c5d793ba7652be12 Mon Sep 17 00:00:00 2001 From: Eguzki Astiz Lezaun Date: Tue, 9 Apr 2024 23:51:05 +0200 Subject: [PATCH] limitador cluster envoy filter leverages DAG --- api/v1beta2/ratelimitpolicy_types.go | 6 -- ...imitador_cluster_envoyfilter_controller.go | 59 +++++++------ .../rate_limiting_wasmplugin_controller.go | 82 +------------------ controllers/suite_test.go | 7 ++ main.go | 9 ++ .../fieldindexers/httproute_parents.go | 44 ++++++++++ pkg/rlptools/topology_index.go | 62 ++++++++++++++ 7 files changed, 159 insertions(+), 110 deletions(-) create mode 100644 pkg/library/fieldindexers/httproute_parents.go create mode 100644 pkg/rlptools/topology_index.go diff --git a/api/v1beta2/ratelimitpolicy_types.go b/api/v1beta2/ratelimitpolicy_types.go index c7a2910c0..fb7c2f7a2 100644 --- a/api/v1beta2/ratelimitpolicy_types.go +++ b/api/v1beta2/ratelimitpolicy_types.go @@ -55,7 +55,6 @@ const ( ExcludeOperator WhenConditionOperator = "excl" MatchesOperator WhenConditionOperator = "matches" - RateLimitPolicyBackReferenceAnnotationName = "kuadrant.io/ratelimitpolicies" RateLimitPolicyDirectReferenceAnnotationName = "kuadrant.io/ratelimitpolicy" ) @@ -170,7 +169,6 @@ func (s *RateLimitPolicyStatus) Equals(other *RateLimitPolicyStatus, logger logr } var _ kuadrant.Policy = &RateLimitPolicy{} -var _ kuadrant.Referrer = &RateLimitPolicy{} // +kubebuilder:object:root=true // +kubebuilder:subresource:status @@ -256,10 +254,6 @@ func (r *RateLimitPolicy) Kind() string { return r.TypeMeta.Kind } -func (r *RateLimitPolicy) BackReferenceAnnotationName() string { - return RateLimitPolicyBackReferenceAnnotationName -} - func (r *RateLimitPolicy) DirectReferenceAnnotationName() string { return RateLimitPolicyDirectReferenceAnnotationName } diff --git a/controllers/limitador_cluster_envoyfilter_controller.go b/controllers/limitador_cluster_envoyfilter_controller.go index 218e7da1b..858402ccf 100644 --- a/controllers/limitador_cluster_envoyfilter_controller.go +++ b/controllers/limitador_cluster_envoyfilter_controller.go @@ -29,17 +29,18 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/handler" gatewayapiv1 "sigs.k8s.io/gateway-api/apis/v1" - "github.com/kuadrant/kuadrant-operator/api/v1beta2" + kuadrantv1beta2 "github.com/kuadrant/kuadrant-operator/api/v1beta2" "github.com/kuadrant/kuadrant-operator/pkg/common" kuadrantistioutils "github.com/kuadrant/kuadrant-operator/pkg/istio" "github.com/kuadrant/kuadrant-operator/pkg/library/kuadrant" + "github.com/kuadrant/kuadrant-operator/pkg/library/mappers" "github.com/kuadrant/kuadrant-operator/pkg/library/reconcilers" "github.com/kuadrant/kuadrant-operator/pkg/library/utils" + "github.com/kuadrant/kuadrant-operator/pkg/rlptools" ) // LimitadorClusterEnvoyFilterReconciler reconciles a EnvoyFilter object with limitador's cluster @@ -76,28 +77,22 @@ func (r *LimitadorClusterEnvoyFilterReconciler) Reconcile(eventCtx context.Conte logger.V(1).Info(string(jsonData)) } - err := r.reconcileRateLimitingClusterEnvoyFilter(ctx, gw) - + desired, err := r.desiredRateLimitingClusterEnvoyFilter(ctx, gw) if err != nil { return ctrl.Result{}, err } - logger.Info("EnvoyFilter reconciled successfully") - return ctrl.Result{}, nil -} - -func (r *LimitadorClusterEnvoyFilterReconciler) reconcileRateLimitingClusterEnvoyFilter(ctx context.Context, gw *gatewayapiv1.Gateway) error { - desired, err := r.desiredRateLimitingClusterEnvoyFilter(ctx, gw) + err = r.ReconcileResource(ctx, &istioclientnetworkingv1alpha3.EnvoyFilter{}, desired, kuadrantistioutils.AlwaysUpdateEnvoyFilter) if err != nil { - return err + return ctrl.Result{}, err } - err = r.ReconcileResource(ctx, &istioclientnetworkingv1alpha3.EnvoyFilter{}, desired, kuadrantistioutils.AlwaysUpdateEnvoyFilter) if err != nil { - return err + return ctrl.Result{}, err } - return nil + logger.Info("EnvoyFilter reconciled successfully") + return ctrl.Result{}, nil } func (r *LimitadorClusterEnvoyFilterReconciler) desiredRateLimitingClusterEnvoyFilter(ctx context.Context, gw *gatewayapiv1.Gateway) (*istioclientnetworkingv1alpha3.EnvoyFilter, error) { @@ -123,11 +118,15 @@ func (r *LimitadorClusterEnvoyFilterReconciler) desiredRateLimitingClusterEnvoyF }, } - gateway := kuadrant.GatewayWrapper{Gateway: gw, Referrer: &v1beta2.RateLimitPolicy{}} - rlpRefs := gateway.PolicyRefs() - logger.V(1).Info("desiredRateLimitingClusterEnvoyFilter", "rlpRefs", rlpRefs) + t, err := rlptools.TopologyIndexesFromGateway(ctx, r.Client(), gw) + if err != nil { + return nil, err + } + rateLimitPolicies := t.PoliciesFromGateway(gw) + + logger.V(1).Info("desiredRateLimitingClusterEnvoyFilter", "#RLPS", len(rateLimitPolicies)) - if len(rlpRefs) < 1 { + if len(rateLimitPolicies) < 1 { utils.TagObjectToDelete(ef) return ef, nil } @@ -165,11 +164,25 @@ func (r *LimitadorClusterEnvoyFilterReconciler) desiredRateLimitingClusterEnvoyF // SetupWithManager sets up the controller with the Manager. func (r *LimitadorClusterEnvoyFilterReconciler) SetupWithManager(mgr ctrl.Manager) error { + httpRouteToParentGatewaysEventMapper := mappers.NewHTTPRouteToParentGatewaysEventMapper( + mappers.WithLogger(r.Logger().WithName("httpRouteToParentGatewaysEventMapper")), + ) + + rlpToParentGatewaysEventMapper := mappers.NewPolicyToParentGatewaysEventMapper( + mappers.WithLogger(r.Logger().WithName("ratelimitpolicyToParentGatewaysEventMapper")), + mappers.WithClient(r.Client()), + ) + return ctrl.NewControllerManagedBy(mgr). - // Limitador cluster EnvoyFilter controller only cares about - // the annotation having references to RLP's - // kuadrant.io/ratelimitpolicies - For(&gatewayapiv1.Gateway{}, builder.WithPredicates(predicate.AnnotationChangedPredicate{})). + For(&gatewayapiv1.Gateway{}). Owns(&istioclientnetworkingv1alpha3.EnvoyFilter{}). + Watches( + &gatewayapiv1.HTTPRoute{}, + handler.EnqueueRequestsFromMapFunc(httpRouteToParentGatewaysEventMapper.Map), + ). + Watches( + &kuadrantv1beta2.RateLimitPolicy{}, + handler.EnqueueRequestsFromMapFunc(rlpToParentGatewaysEventMapper.Map), + ). Complete(r) } diff --git a/controllers/rate_limiting_wasmplugin_controller.go b/controllers/rate_limiting_wasmplugin_controller.go index 2e6b2c7be..a9427845e 100644 --- a/controllers/rate_limiting_wasmplugin_controller.go +++ b/controllers/rate_limiting_wasmplugin_controller.go @@ -19,7 +19,6 @@ package controllers import ( "context" "encoding/json" - "fmt" "sort" "github.com/go-logr/logr" @@ -27,7 +26,6 @@ import ( istioclientgoextensionv1alpha1 "istio.io/client-go/pkg/apis/extensions/v1alpha1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/handler" @@ -44,10 +42,6 @@ import ( "github.com/kuadrant/kuadrant-operator/pkg/rlptools/wasm" ) -const ( - HTTPRouteGatewayParentField = ".metadata.parentRefs.gateway" -) - // RateLimitingWASMPluginReconciler reconciles a WASMPlugin object for rate limiting type RateLimitingWASMPluginReconciler struct { *reconcilers.BaseReconciler @@ -160,7 +154,7 @@ func (r *RateLimitingWASMPluginReconciler) wasmPluginConfig(ctx context.Context, RateLimitPolicies: make([]wasm.RateLimitPolicy, 0), } - t, err := r.topologyIndexesFromGateway(ctx, gw) + t, err := rlptools.TopologyIndexesFromGateway(ctx, r.Client(), gw) if err != nil { return nil, err } @@ -190,48 +184,6 @@ func (r *RateLimitingWASMPluginReconciler) wasmPluginConfig(ctx context.Context, return wasmPlugin, nil } -func (r *RateLimitingWASMPluginReconciler) topologyIndexesFromGateway(ctx context.Context, gw *gatewayapiv1.Gateway) (*kuadrantgatewayapi.TopologyIndexes, error) { - logger, err := logr.FromContext(ctx) - if err != nil { - return nil, err - } - - routeList := &gatewayapiv1.HTTPRouteList{} - // Get all the routes having the gateway as parent - err = r.Client().List(ctx, routeList, client.MatchingFields{HTTPRouteGatewayParentField: client.ObjectKeyFromObject(gw).String()}) - logger.V(1).Info("topologyIndexesFromGateway: list httproutes from gateway", - "gateway", client.ObjectKeyFromObject(gw), - "#HTTPRoutes", len(routeList.Items), - "err", err) - if err != nil { - return nil, err - } - - rlpList := &kuadrantv1beta2.RateLimitPolicyList{} - // Get all the rate limit policies - err = r.Client().List(ctx, rlpList) - logger.V(1).Info("topologyIndexesFromGateway: list rate limit policies", - "#RLPS", len(rlpList.Items), - "err", err) - if err != nil { - return nil, err - } - - policies := utils.Map(rlpList.Items, func(p kuadrantv1beta2.RateLimitPolicy) kuadrantgatewayapi.Policy { return &p }) - - t, err := kuadrantgatewayapi.NewTopology( - kuadrantgatewayapi.WithGateways([]*gatewayapiv1.Gateway{gw}), - kuadrantgatewayapi.WithRoutes(utils.Map(routeList.Items, ptr.To)), - kuadrantgatewayapi.WithPolicies(policies), - kuadrantgatewayapi.WithLogger(logger), - ) - if err != nil { - return nil, err - } - - return kuadrantgatewayapi.NewTopologyIndexes(t), nil -} - func (r *RateLimitingWASMPluginReconciler) wasmRateLimitPolicy(ctx context.Context, t *kuadrantgatewayapi.TopologyIndexes, rlp *kuadrantv1beta2.RateLimitPolicy, gw *gatewayapiv1.Gateway) (*wasm.RateLimitPolicy, error) { route, err := r.routeFromRLP(ctx, t, rlp, gw) if err != nil { @@ -320,40 +272,8 @@ func (r *RateLimitingWASMPluginReconciler) routeFromRLP(ctx context.Context, t * return route, nil } -// addHTTPRouteByGatewayIndexer declares an index key that we can later use with the client as a pseudo-field name, -// allowing to query all the routes parented by a given gateway -// to prevent creating the same index field multiple times, the function is declared private to be -// called only by this controller -func addHTTPRouteByGatewayIndexer(mgr ctrl.Manager, baseLogger logr.Logger) error { - if err := mgr.GetFieldIndexer().IndexField(context.Background(), &gatewayapiv1.HTTPRoute{}, HTTPRouteGatewayParentField, func(rawObj client.Object) []string { - // grab the route object, extract the parents - route, assertionOk := rawObj.(*gatewayapiv1.HTTPRoute) - if !assertionOk { - baseLogger.V(1).Error(fmt.Errorf("%T is not a *gatewayapiv1.HTTPRoute", rawObj), "cannot map") - return nil - } - - logger := baseLogger.WithValues("route", client.ObjectKeyFromObject(route).String()) - - return utils.Map(kuadrantgatewayapi.GetRouteAcceptedGatewayParentKeys(route), func(key client.ObjectKey) string { - logger.V(1).Info("new gateway added", "key", key.String()) - return key.String() - }) - }); err != nil { - return err - } - - return nil -} - // SetupWithManager sets up the controller with the Manager. func (r *RateLimitingWASMPluginReconciler) SetupWithManager(mgr ctrl.Manager) error { - // Add custom indexer - err := addHTTPRouteByGatewayIndexer(mgr, r.Logger().WithName("routeByGatewayIndexer")) - if err != nil { - return err - } - httpRouteToParentGatewaysEventMapper := mappers.NewHTTPRouteToParentGatewaysEventMapper( mappers.WithLogger(r.Logger().WithName("httpRouteToParentGatewaysEventMapper")), ) diff --git a/controllers/suite_test.go b/controllers/suite_test.go index 2c1396818..d627ebc94 100644 --- a/controllers/suite_test.go +++ b/controllers/suite_test.go @@ -48,6 +48,7 @@ import ( kuadrantv1alpha1 "github.com/kuadrant/kuadrant-operator/api/v1alpha1" kuadrantv1beta1 "github.com/kuadrant/kuadrant-operator/api/v1beta1" kuadrantv1beta2 "github.com/kuadrant/kuadrant-operator/api/v1beta2" + "github.com/kuadrant/kuadrant-operator/pkg/library/fieldindexers" "github.com/kuadrant/kuadrant-operator/pkg/library/kuadrant" "github.com/kuadrant/kuadrant-operator/pkg/library/reconcilers" "github.com/kuadrant/kuadrant-operator/pkg/log" @@ -138,6 +139,12 @@ var _ = BeforeSuite(func() { }) Expect(err).ToNot(HaveOccurred()) + err = fieldindexers.HTTPRouteIndexByGateway( + mgr, + log.Log.WithName("kuadrant").WithName("indexer").WithName("routeIndexByGateway"), + ) + Expect(err).ToNot(HaveOccurred()) + authPolicyBaseReconciler := reconcilers.NewBaseReconciler( mgr.GetClient(), mgr.GetScheme(), mgr.GetAPIReader(), log.Log.WithName("authpolicy"), diff --git a/main.go b/main.go index 57c42031e..460b9dbc9 100644 --- a/main.go +++ b/main.go @@ -50,6 +50,7 @@ import ( kuadrantv1beta1 "github.com/kuadrant/kuadrant-operator/api/v1beta1" kuadrantv1beta2 "github.com/kuadrant/kuadrant-operator/api/v1beta2" "github.com/kuadrant/kuadrant-operator/controllers" + "github.com/kuadrant/kuadrant-operator/pkg/library/fieldindexers" "github.com/kuadrant/kuadrant-operator/pkg/library/kuadrant" "github.com/kuadrant/kuadrant-operator/pkg/library/reconcilers" "github.com/kuadrant/kuadrant-operator/pkg/log" @@ -132,6 +133,14 @@ func main() { os.Exit(1) } + if err := fieldindexers.HTTPRouteIndexByGateway( + mgr, + log.Log.WithName("kuadrant").WithName("indexer").WithName("routeIndexByGateway"), + ); err != nil { + setupLog.Error(err, "unable to add indexer") + os.Exit(1) + } + kuadrantBaseReconciler := reconcilers.NewBaseReconciler( mgr.GetClient(), mgr.GetScheme(), mgr.GetAPIReader(), log.Log.WithName("kuadrant"), diff --git a/pkg/library/fieldindexers/httproute_parents.go b/pkg/library/fieldindexers/httproute_parents.go new file mode 100644 index 000000000..2a3557980 --- /dev/null +++ b/pkg/library/fieldindexers/httproute_parents.go @@ -0,0 +1,44 @@ +package fieldindexers + +import ( + "context" + "fmt" + + "github.com/go-logr/logr" + "github.com/kuadrant/kuadrant-operator/pkg/library/utils" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + gatewayapiv1 "sigs.k8s.io/gateway-api/apis/v1" + + kuadrantgatewayapi "github.com/kuadrant/kuadrant-operator/pkg/library/gatewayapi" +) + +const ( + HTTPRouteGatewayParentField = ".metadata.parentRefs.gateway" +) + +// HTTPRouteByGatewayIndexer declares an index key that we can later use with the client as a pseudo-field name, +// allowing to query all the routes parented by a given gateway +// to prevent creating the same index field multiple times, the function is declared private to be +// called only by this controller +func HTTPRouteIndexByGateway(mgr ctrl.Manager, baseLogger logr.Logger) error { + if err := mgr.GetFieldIndexer().IndexField(context.Background(), &gatewayapiv1.HTTPRoute{}, HTTPRouteGatewayParentField, func(rawObj client.Object) []string { + // grab the route object, extract the parents + route, assertionOk := rawObj.(*gatewayapiv1.HTTPRoute) + if !assertionOk { + baseLogger.V(1).Error(fmt.Errorf("%T is not a *gatewayapiv1.HTTPRoute", rawObj), "cannot map") + return nil + } + + logger := baseLogger.WithValues("route", client.ObjectKeyFromObject(route).String()) + + return utils.Map(kuadrantgatewayapi.GetRouteAcceptedGatewayParentKeys(route), func(key client.ObjectKey) string { + logger.V(1).Info("new gateway added", "key", key.String()) + return key.String() + }) + }); err != nil { + return err + } + + return nil +} diff --git a/pkg/rlptools/topology_index.go b/pkg/rlptools/topology_index.go new file mode 100644 index 000000000..62bed78ac --- /dev/null +++ b/pkg/rlptools/topology_index.go @@ -0,0 +1,62 @@ +package rlptools + +import ( + "context" + + "github.com/go-logr/logr" + "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client" + gatewayapiv1 "sigs.k8s.io/gateway-api/apis/v1" + + kuadrantv1beta2 "github.com/kuadrant/kuadrant-operator/api/v1beta2" + "github.com/kuadrant/kuadrant-operator/pkg/library/fieldindexers" + kuadrantgatewayapi "github.com/kuadrant/kuadrant-operator/pkg/library/gatewayapi" + "github.com/kuadrant/kuadrant-operator/pkg/library/utils" +) + +func TopologyIndexesFromGateway(ctx context.Context, cl client.Client, gw *gatewayapiv1.Gateway) (*kuadrantgatewayapi.TopologyIndexes, error) { + logger, err := logr.FromContext(ctx) + if err != nil { + return nil, err + } + + routeList := &gatewayapiv1.HTTPRouteList{} + // Get all the routes having the gateway as parent + err = cl.List( + ctx, + routeList, + client.MatchingFields{ + fieldindexers.HTTPRouteGatewayParentField: client.ObjectKeyFromObject(gw).String(), + }) + logger.V(1).Info("topologyIndexesFromGateway: list httproutes from gateway", + "gateway", client.ObjectKeyFromObject(gw), + "#HTTPRoutes", len(routeList.Items), + "err", err) + if err != nil { + return nil, err + } + + rlpList := &kuadrantv1beta2.RateLimitPolicyList{} + // Get all the rate limit policies + err = cl.List(ctx, rlpList) + logger.V(1).Info("topologyIndexesFromGateway: list rate limit policies", + "#RLPS", len(rlpList.Items), + "err", err) + if err != nil { + return nil, err + } + + policies := utils.Map(rlpList.Items, func(p kuadrantv1beta2.RateLimitPolicy) kuadrantgatewayapi.Policy { return &p }) + + t, err := kuadrantgatewayapi.NewTopology( + kuadrantgatewayapi.WithGateways([]*gatewayapiv1.Gateway{gw}), + kuadrantgatewayapi.WithRoutes(utils.Map(routeList.Items, ptr.To)), + kuadrantgatewayapi.WithPolicies(policies), + kuadrantgatewayapi.WithLogger(logger), + ) + if err != nil { + return nil, err + } + + return kuadrantgatewayapi.NewTopologyIndexes(t), nil +}