Skip to content

Commit

Permalink
refactor: use DAG for applying RLP overrides
Browse files Browse the repository at this point in the history
  • Loading branch information
KevFan committed Apr 12, 2024
1 parent 9875bb0 commit fe2a40e
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 35 deletions.
4 changes: 2 additions & 2 deletions controllers/ratelimitpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (r *RateLimitPolicyReconciler) reconcileResources(ctx context.Context, rlp
return err
}

if err := r.reconcileLimits(ctx, rlp, targetNetworkObject); err != nil {
if err := r.reconcileLimits(ctx, rlp); err != nil {
return err
}

Expand All @@ -195,7 +195,7 @@ func (r *RateLimitPolicyReconciler) deleteResources(ctx context.Context, rlp *ku
return err
}

if err := r.deleteLimits(ctx, rlp, targetNetworkObject); err != nil && !apierrors.IsNotFound(err) {
if err := r.deleteLimits(ctx, rlp); err != nil && !apierrors.IsNotFound(err) {
return err
}

Expand Down
119 changes: 90 additions & 29 deletions controllers/ratelimitpolicy_limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,32 @@ package controllers

import (
"context"
"slices"
"sort"

"github.com/go-logr/logr"
limitadorv1alpha1 "github.com/kuadrant/limitador-operator/api/v1alpha1"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
gatewayapiv1 "sigs.k8s.io/gateway-api/apis/v1"

kuadrantv1beta2 "github.com/kuadrant/kuadrant-operator/api/v1beta2"
"github.com/kuadrant/kuadrant-operator/pkg/common"
"github.com/kuadrant/kuadrant-operator/pkg/library/gatewayapi"
kuadrantgatewayapi "github.com/kuadrant/kuadrant-operator/pkg/library/gatewayapi"
"github.com/kuadrant/kuadrant-operator/pkg/library/kuadrant"
"github.com/kuadrant/kuadrant-operator/pkg/library/utils"
"github.com/kuadrant/kuadrant-operator/pkg/rlptools"
)

func (r *RateLimitPolicyReconciler) reconcileLimits(ctx context.Context, rlp *kuadrantv1beta2.RateLimitPolicy, targetNetworkObject client.Object) error {
func (r *RateLimitPolicyReconciler) reconcileLimits(ctx context.Context, rlp *kuadrantv1beta2.RateLimitPolicy) error {
rlpRefs, err := r.TargetRefReconciler.GetAllGatewayPolicyRefs(ctx, rlp)
if err != nil {
return err
}
return r.reconcileLimitador(ctx, rlp, append(rlpRefs, client.ObjectKeyFromObject(rlp)), targetNetworkObject)
return r.reconcileLimitador(ctx, rlp, append(rlpRefs, client.ObjectKeyFromObject(rlp)))
}

func (r *RateLimitPolicyReconciler) deleteLimits(ctx context.Context, rlp *kuadrantv1beta2.RateLimitPolicy, targetNetworkObject client.Object) error {
func (r *RateLimitPolicyReconciler) deleteLimits(ctx context.Context, rlp *kuadrantv1beta2.RateLimitPolicy) error {
rlpRefs, err := r.TargetRefReconciler.GetAllGatewayPolicyRefs(ctx, rlp)
if err != nil {
return err
Expand All @@ -34,14 +37,14 @@ func (r *RateLimitPolicyReconciler) deleteLimits(ctx context.Context, rlp *kuadr
return rlpRef.Name != rlp.Name || rlpRef.Namespace != rlp.Namespace
})

return r.reconcileLimitador(ctx, rlp, rlpRefsWithoutRLP, targetNetworkObject)
return r.reconcileLimitador(ctx, rlp, rlpRefsWithoutRLP)
}

func (r *RateLimitPolicyReconciler) reconcileLimitador(ctx context.Context, rlp *kuadrantv1beta2.RateLimitPolicy, rlpRefs []client.ObjectKey, targetNetworkObject client.Object) error {
func (r *RateLimitPolicyReconciler) reconcileLimitador(ctx context.Context, rlp *kuadrantv1beta2.RateLimitPolicy, rlpRefs []client.ObjectKey) error {
logger, _ := logr.FromContext(ctx)
logger = logger.WithName("reconcileLimitador").WithValues("rlp refs", utils.Map(rlpRefs, func(ref client.ObjectKey) string { return ref.String() }))

rateLimitIndex, err := r.buildRateLimitIndex(ctx, rlpRefs, targetNetworkObject)
rateLimitIndex, err := r.buildRateLimitIndex(ctx, rlpRefs)
if err != nil {
return err
}
Expand Down Expand Up @@ -89,10 +92,15 @@ func (r *RateLimitPolicyReconciler) reconcileLimitador(ctx context.Context, rlp
return nil
}

func (r *RateLimitPolicyReconciler) buildRateLimitIndex(ctx context.Context, rlpRefs []client.ObjectKey, targetNetworkObject client.Object) (*rlptools.RateLimitIndex, error) {
func (r *RateLimitPolicyReconciler) buildRateLimitIndex(ctx context.Context, rlpRefs []client.ObjectKey) (*rlptools.RateLimitIndex, error) {
logger, _ := logr.FromContext(ctx)
logger = logger.WithName("buildRateLimitIndex").WithValues("ratelimitpolicies", rlpRefs)

t, err := r.generateTopology(ctx)
if err != nil {
return nil, err
}

rateLimitIndex := rlptools.NewRateLimitIndex()

for _, rlpKey := range rlpRefs {
Expand All @@ -107,9 +115,7 @@ func (r *RateLimitPolicyReconciler) buildRateLimitIndex(ctx context.Context, rlp
return nil, err
}

if err := r.applyOverrides(ctx, rlp, targetNetworkObject); err != nil {
return nil, err
}
r.applyOverrides(ctx, rlp, t)

rateLimitIndex.Set(rlpKey, rlptools.LimitadorRateLimitsFromRLP(rlp))
}
Expand All @@ -118,27 +124,82 @@ func (r *RateLimitPolicyReconciler) buildRateLimitIndex(ctx context.Context, rlp
}

// applyOverrides checks for any overrides set for the RateLimitPolicy.
// It iterates through the RateLimitPolicyList to find overrides for the provided target HTTPRoute.
// If an override is found, it updates the limits in the RateLimitPolicySpec in accordingly.
func (r *RateLimitPolicyReconciler) applyOverrides(ctx context.Context, rlp *kuadrantv1beta2.RateLimitPolicy, targetNetworkObject client.Object) error {
if route, ok := targetNetworkObject.(*gatewayapiv1.HTTPRoute); ok {
rlpList := &kuadrantv1beta2.RateLimitPolicyList{}
if err := r.Client().List(ctx, rlpList); err != nil {
return err
// It iterates through the slice of policies to find overrides for the provided target HTTPRoute.
// If an override is found, it updates the limits in the RateLimitPolicySpec accordingly.
func (r *RateLimitPolicyReconciler) applyOverrides(ctx context.Context, rlp *kuadrantv1beta2.RateLimitPolicy, t *kuadrantgatewayapi.Topology) {
logger, _ := logr.FromContext(ctx)
logger = logger.WithName("applyOverrides")

topologyIndexes := kuadrantgatewayapi.NewTopologyIndexes(t)

var policies []kuadrantgatewayapi.Policy
// For each gw, get all the policies if the current rlp is within the topology for this gateway
for _, gw := range t.Gateways() {
policyList := topologyIndexes.PoliciesFromGateway(gw.Gateway)
policyKeys := utils.Map(policyList, func(p kuadrantgatewayapi.Policy) client.ObjectKey {
return client.ObjectKeyFromObject(p)
})

// this policy is potentially affected by other policies from this gateway
if slices.Contains(policyKeys, client.ObjectKeyFromObject(rlp)) {
policies = append(policies, policyList...)
}
}

filteredPolicies := utils.Filter(policies, func(policy kuadrantgatewayapi.Policy) bool {
// HTTPRoute RLPs should only care about overrides from gateways
if kuadrantgatewayapi.IsTargetRefHTTPRoute(rlp.GetTargetRef()) {
return kuadrantgatewayapi.IsTargetRefGateway(policy.GetTargetRef())
}
// Gateway RLPs are not affected by other Gateway RLPs
return false
})

for _, p := range rlpList.Items {
clientKeys := gatewayapi.GetRouteAcceptedGatewayParentKeys(route)
for _, clientKey := range clientKeys {
if gatewayapi.IsTargetRefGateway(p.GetTargetRef()) &&
clientKey.Name == string(p.Spec.TargetRef.Name) && clientKey.Namespace == p.Namespace {
if p.Spec.Overrides != nil {
rlp.Spec.CommonSpec().Limits = p.Spec.Overrides.Limits
}
}
}
// Sort by TargetRefKind and creation timestamp
// Gateways RLPs are listed first in the order of oldest policy
sort.Sort(kuadrantgatewayapi.PolicyByTargetRefKindAndCreationTimeStamp(filteredPolicies))

// Iterate in order of precedence until finding a block of overrides
for _, policy := range filteredPolicies {
p := policy.(*kuadrantv1beta2.RateLimitPolicy)
if p.Spec.Overrides != nil {
rlp.Spec.CommonSpec().Limits = p.Spec.Overrides.Limits
logger.V(1).Info("applying overrides from parent policy", "parentPolicy", client.ObjectKeyFromObject(p))
break
}
}
}

return nil
func (r *RateLimitPolicyReconciler) generateTopology(ctx context.Context) (*kuadrantgatewayapi.Topology, error) {
logger, _ := logr.FromContext(ctx)

gwList := &gatewayapiv1.GatewayList{}
err := r.Client().List(ctx, gwList)
logger.V(1).Info("topology: list gateways", "#Gateways", len(gwList.Items), "err", err)
if err != nil {
return nil, err
}

routeList := &gatewayapiv1.HTTPRouteList{}
err = r.Client().List(ctx, routeList)
logger.V(1).Info("topology: list httproutes", "#HTTPRoutes", len(routeList.Items), "err", err)
if err != nil {
return nil, err
}

rlpList := &kuadrantv1beta2.RateLimitPolicyList{}
err = r.Client().List(ctx, rlpList)
logger.V(1).Info("topology: list rate limit policies", "#RLPS", len(rlpList.Items), "err", err)
if err != nil {
return nil, err
}

policies := utils.Map(rlpList.Items, func(p kuadrantv1beta2.RateLimitPolicy) kuadrantgatewayapi.Policy { return &p })

return kuadrantgatewayapi.NewTopology(
kuadrantgatewayapi.WithGateways(utils.Map(gwList.Items, ptr.To[gatewayapiv1.Gateway])),
kuadrantgatewayapi.WithRoutes(utils.Map(routeList.Items, ptr.To[gatewayapiv1.HTTPRoute])),
kuadrantgatewayapi.WithPolicies(policies),
kuadrantgatewayapi.WithLogger(logger),
)
}
26 changes: 25 additions & 1 deletion pkg/library/gatewayapi/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,31 @@ func (a PolicyByCreationTimestamp) Less(i, j int) bool {
p1Time := ptr.To(a[i].GetCreationTimestamp())
p2Time := ptr.To(a[j].GetCreationTimestamp())
if !p1Time.Equal(p2Time) {
return ptr.To(a[i].GetCreationTimestamp()).Before(ptr.To(a[j].GetCreationTimestamp()))
return p1Time.Before(p2Time)
}

// The policy appearing first in alphabetical order by "{namespace}/{name}".
return client.ObjectKeyFromObject(a[i]).String() < client.ObjectKeyFromObject(a[j]).String()
}

type PolicyByTargetRefKindAndCreationTimeStamp []Policy

func (a PolicyByTargetRefKindAndCreationTimeStamp) Len() int { return len(a) }
func (a PolicyByTargetRefKindAndCreationTimeStamp) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a PolicyByTargetRefKindAndCreationTimeStamp) Less(i, j int) bool {
targetRef1 := a[i].GetTargetRef()
targetRef2 := a[j].GetTargetRef()

// Compare kind first
if targetRef1.Kind != targetRef2.Kind {
return targetRef1.Kind < targetRef2.Kind
}

// Then compare timestamp
p1Time := ptr.To(a[i].GetCreationTimestamp())
p2Time := ptr.To(a[j].GetCreationTimestamp())
if !p1Time.Equal(p2Time) {
return p1Time.Before(p2Time)
}

// The policy appearing first in alphabetical order by "{namespace}/{name}".
Expand Down
72 changes: 69 additions & 3 deletions pkg/library/gatewayapi/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
gatewayapiv1 "sigs.k8s.io/gateway-api/apis/v1"
gatewayapiv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
)

Expand Down Expand Up @@ -105,12 +106,77 @@ func TestPolicyByCreationTimestamp(t *testing.T) {
}
}

func createTestPolicy(name string, creationTime time.Time) *TestPolicy {
return &TestPolicy{
func TestPolicyByTargetRef(t *testing.T) {
testCases := []struct {
name string
policies []Policy
sortedPolicies []Policy
}{
{
name: "nil input",
policies: nil,
sortedPolicies: nil,
},
{
name: "empty slices",
policies: make([]Policy, 0),
sortedPolicies: make([]Policy, 0),
},
{
name: "by kind, and creation date",
policies: []Policy{
createTestPolicy("ccc", time.Date(2020, time.November, 10, 23, 0, 0, 0, time.UTC), withTargetRefKind("Gateway")),
createTestPolicy("bbb", time.Date(2000, time.November, 10, 23, 0, 0, 0, time.UTC), withTargetRefKind("HTTPRoute")),
createTestPolicy("aaa", time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), withTargetRefKind("Gateway")),
},
sortedPolicies: []Policy{
createTestPolicy("aaa", time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), withTargetRefKind("Gateway")),
createTestPolicy("ccc", time.Date(2020, time.November, 10, 23, 0, 0, 0, time.UTC), withTargetRefKind("Gateway")),
createTestPolicy("bbb", time.Date(2000, time.November, 10, 23, 0, 0, 0, time.UTC), withTargetRefKind("HTTPRoute")),
},
},
{
name: "by kind, and then name when creation date equal",
policies: []Policy{
createTestPolicy("ccc", time.Date(2020, time.November, 10, 23, 0, 0, 0, time.UTC), withTargetRefKind("Gateway")),
createTestPolicy("bbb", time.Date(2020, time.November, 10, 23, 0, 0, 0, time.UTC), withTargetRefKind("HTTPRoute")),
createTestPolicy("aaa", time.Date(2020, time.November, 10, 23, 0, 0, 0, time.UTC), withTargetRefKind("Gateway")),
},
sortedPolicies: []Policy{
createTestPolicy("aaa", time.Date(2020, time.November, 10, 23, 0, 0, 0, time.UTC), withTargetRefKind("Gateway")),
createTestPolicy("ccc", time.Date(2020, time.November, 10, 23, 0, 0, 0, time.UTC), withTargetRefKind("Gateway")),
createTestPolicy("bbb", time.Date(2020, time.November, 10, 23, 0, 0, 0, time.UTC), withTargetRefKind("HTTPRoute")),
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(subT *testing.T) {
sort.Sort(PolicyByTargetRefKindAndCreationTimeStamp(tc.policies))
if !reflect.DeepEqual(tc.policies, tc.sortedPolicies) {
subT.Errorf("expected=%v; got=%v", tc.sortedPolicies, tc.policies)
}
})
}
}

func createTestPolicy(name string, creationTime time.Time, mutateFn ...func(p *TestPolicy)) *TestPolicy {
p := &TestPolicy{
ObjectMeta: metav1.ObjectMeta{
Namespace: "testnamespace",
Name: name,
CreationTimestamp: metav1.Time{creationTime},
CreationTimestamp: metav1.Time{Time: creationTime},
},
}
for _, fn := range mutateFn {
fn(p)
}

return p
}

func withTargetRefKind(targetRefKind string) func(p *TestPolicy) {
return func(p *TestPolicy) {
p.TargetRef = gatewayapiv1alpha2.PolicyTargetReference{Kind: gatewayapiv1.Kind(targetRefKind)}
}
}

0 comments on commit fe2a40e

Please sign in to comment.