Skip to content

Commit

Permalink
update PE to support a new ns scoped pods field and status conditions
Browse files Browse the repository at this point in the history
  • Loading branch information
haouc committed Feb 1, 2024
1 parent f3d022d commit 8092ba2
Show file tree
Hide file tree
Showing 7 changed files with 227 additions and 23 deletions.
34 changes: 34 additions & 0 deletions api/v1alpha1/policyendpoint_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,46 @@ type PolicyEndpointSpec struct {

// Egress is the list of egress rules containing resolved network addresses
Egress []EndpointInfo `json:"egress,omitempty"`

// AllPodsInNamespace is a boolean value indicating whether all pods in the policy namespace should be selected
// +optional
AllPodsInNamespace bool `json:"allPodsInNamespace,omitempty"`
}

// PolicyEndpointStatus defines the observed state of PolicyEndpoint
type PolicyEndpointStatus struct {
	// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
	// Important: Run "make" to regenerate code after modifying this file

	// Conditions records the latest observations about this PolicyEndpoint,
	// e.g. that its rule slices were binpacked by the controller or that a
	// patch attempt failed. See PolicyEndpointCondition for the entry shape.
	// +optional
	Conditions []PolicyEndpointCondition `json:"conditions,omitempty"`
}

// PolicyEndpointConditionType labels the kind of condition recorded in a
// PolicyEndpoint's status.
type PolicyEndpointConditionType string

const (
	// Packed indicates the controller binpacked this PolicyEndpoint's rule
	// slices (ingress/egress/pod-selector lists) into existing slices.
	Packed PolicyEndpointConditionType = "PackedPolicyEndpoint"
	// Updated records the outcome of the controller patching this
	// PolicyEndpoint (set with a False status when a patch attempt fails).
	Updated PolicyEndpointConditionType = "PatchedPolicyEndpoint"
)

// PolicyEndpointCondition describes the state of a PolicyEndpoint at a certain
// point in time. For example, a Packed condition is recorded when the
// controller binpacks a PolicyEndpoint's rule slices.
type PolicyEndpointCondition struct {
	// Type of PolicyEndpoint condition.
	// NOTE(review): marked +optional but the json tag lacks omitempty, so an
	// empty value still serializes — confirm this is intended.
	// +optional
	Type PolicyEndpointConditionType `json:"type"`
	// Status of the condition, one of True, False, Unknown.
	// NOTE(review): same +optional/omitempty mismatch as Type — confirm.
	// +optional
	Status corev1.ConditionStatus `json:"status"`
	// Last time the condition transitioned from one status to another.
	// +optional
	LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"`
	// The reason for the condition's last transition.
	// +optional
	Reason string `json:"reason,omitempty"`
	// A human readable message indicating details about the transition.
	// +optional
	Message string `json:"message,omitempty"`
}

//+kubebuilder:object:root=true
Expand Down
31 changes: 31 additions & 0 deletions config/crd/bases/networking.k8s.aws_policyendpoints.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ spec:
spec:
description: PolicyEndpointSpec defines the desired state of PolicyEndpoint
properties:
allPodsInNamespace:
description: AllPodsInNameSpace is the boolean value indicating should
all pods in the policy namespace be selected
type: boolean
egress:
description: Egress is the list of egress rules containing resolved
network addresses
Expand Down Expand Up @@ -225,6 +229,33 @@ spec:
type: object
status:
description: PolicyEndpointStatus defines the observed state of PolicyEndpoint
properties:
conditions:
items:
description: PolicyEndpointCondition describes the state of a PolicyEndpoint
at a certain point. For example, binpacking PE slices should be
updated as a condition change
properties:
lastTransitionTime:
description: Last time the condition transitioned from one status
to another.
format: date-time
type: string
message:
description: A human readable message indicating details about
the transition.
type: string
reason:
description: The reason for the condition's last transition.
type: string
status:
description: Status of the condition, one of True, False, Unknown.
type: string
type:
description: Type of PolicyEndpoint condition.
type: string
type: object
type: array
type: object
type: object
served: true
Expand Down
111 changes: 89 additions & 22 deletions pkg/policyendpoints/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"strconv"

"golang.org/x/exp/maps"

"github.com/go-logr/logr"
"github.com/pkg/errors"
"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
networking "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -21,6 +23,8 @@ import (
policyinfo "github.com/aws/amazon-network-policy-controller-k8s/api/v1alpha1"
"github.com/aws/amazon-network-policy-controller-k8s/pkg/k8s"
"github.com/aws/amazon-network-policy-controller-k8s/pkg/resolvers"
"github.com/aws/amazon-network-policy-controller-k8s/pkg/utils/conditions"
"github.com/aws/amazon-network-policy-controller-k8s/pkg/utils/conversions"
)

type PolicyEndpointsManager interface {
Expand All @@ -39,6 +43,19 @@ func NewPolicyEndpointsManager(k8sClient client.Client, endpointChunkSize int, l
}
}

const (
	// Bit positions for each packing flag within the packed-state integer
	// returned by computePolicyEndpoints.
	ingressShift = 2
	egressShift  = 1
	psShift      = 0

	// Masks derived from the shifts above: ingress, egress and pod-selector
	// packing flags respectively (values 4, 2 and 1).
	ingBit = 1 << ingressShift
	egBit  = 1 << egressShift
	psBit  = 1 << psShift

	// Condition reasons recorded on PolicyEndpoint status updates.
	reasonBinPacking = "PEBinPacked"
	reasonPatching   = "PEPatched"
)

var _ PolicyEndpointsManager = (*policyEndpointsManager)(nil)

type policyEndpointsManager struct {
Expand All @@ -60,12 +77,9 @@ func (m *policyEndpointsManager) Reconcile(ctx context.Context, policy *networki
client.MatchingFields{IndexKeyPolicyReferenceName: policy.Name}); err != nil {
return err
}
existingPolicyEndpoints := make([]policyinfo.PolicyEndpoint, 0, len(policyEndpointList.Items))
for _, policyEndpoint := range policyEndpointList.Items {
existingPolicyEndpoints = append(existingPolicyEndpoints, policyEndpoint)
}

createList, updateList, deleteList, err := m.computePolicyEndpoints(policy, existingPolicyEndpoints, ingressRules, egressRules, podSelectorEndpoints)
createList, updateList, deleteList, packed, err := m.computePolicyEndpoints(policy, policyEndpointList.Items, ingressRules, egressRules, podSelectorEndpoints)
m.logger.Info("the controller is packing PE rules", "Packed", conversions.IntToBool(packed))
if err != nil {
return err
}
Expand All @@ -79,17 +93,42 @@ func (m *policyEndpointsManager) Reconcile(ctx context.Context, policy *networki

for _, policyEndpoint := range updateList {
oldRes := &policyinfo.PolicyEndpoint{}
if err := m.k8sClient.Get(ctx, k8s.NamespacedName(&policyEndpoint), oldRes); err != nil {
peId := k8s.NamespacedName(&policyEndpoint)
if err := m.k8sClient.Get(ctx, peId, oldRes); err != nil {
return err
}
if equality.Semantic.DeepEqual(oldRes.Spec, policyEndpoint.Spec) {
m.logger.V(1).Info("Policy endpoint already up to date", "id", k8s.NamespacedName(&policyEndpoint))
m.logger.V(1).Info("Policy endpoint already up to date", "id", peId)
continue
}

if err := m.k8sClient.Patch(ctx, &policyEndpoint, client.MergeFrom(oldRes)); err != nil {
if cErr := conditions.UpdatePEConditions(ctx, m.k8sClient,
peId,
m.logger,
policyinfo.Updated,
corev1.ConditionFalse,
reasonPatching,
fmt.Sprintf("patching policy endpoint failed: %s", err.Error()),
); cErr != nil {
m.logger.Error(cErr, "Adding PE patch failure condition updates to PE failed", "PENamespacedName", peId)
}
return err
}
m.logger.Info("Updated policy endpoint", "id", k8s.NamespacedName(&policyEndpoint))
m.logger.Info("Updated policy endpoint", "id", peId)

if packed > 0 {
if err := conditions.UpdatePEConditions(ctx, m.k8sClient,
peId,
m.logger,
policyinfo.Packed,
corev1.ConditionTrue,
reasonBinPacking,
fmt.Sprintf("binpacked network policy endpoint slices on Ingress - %t, Egress - %t, PodSelector - %t", packed&ingBit>>ingressShift == 1, packed&egBit>>egressShift == 1, packed&psBit>>psShift == 1),
); err != nil {
m.logger.Error(err, "Adding binpacking condition updates to PE failed", "PENamespacedName", peId)
}
}
}

for _, policyEndpoint := range deleteList {
Expand Down Expand Up @@ -123,7 +162,7 @@ func (m *policyEndpointsManager) Cleanup(ctx context.Context, policy *networking
func (m *policyEndpointsManager) computePolicyEndpoints(policy *networking.NetworkPolicy,
existingPolicyEndpoints []policyinfo.PolicyEndpoint, ingressEndpoints []policyinfo.EndpointInfo,
egressEndpoints []policyinfo.EndpointInfo, podSelectorEndpoints []policyinfo.PodEndpoint) ([]policyinfo.PolicyEndpoint,
[]policyinfo.PolicyEndpoint, []policyinfo.PolicyEndpoint, error) {
[]policyinfo.PolicyEndpoint, []policyinfo.PolicyEndpoint, int, error) {

// Loop through ingressEndpoints, egressEndpoints and podSelectorEndpoints and put in map
// also populate them into policy endpoints
Expand All @@ -138,11 +177,11 @@ func (m *policyEndpointsManager) computePolicyEndpoints(policy *networking.Netwo
var deletePolicyEndpoints []policyinfo.PolicyEndpoint

// packing new ingress rules
createPolicyEndpoints, doNotDeleteIngress := m.packingIngressRules(policy, ingressEndpointsMap, createPolicyEndpoints, modifiedEndpoints, potentialDeletes)
createPolicyEndpoints, doNotDeleteIngress, ingPacked := m.packingIngressRules(policy, ingressEndpointsMap, createPolicyEndpoints, modifiedEndpoints, potentialDeletes)
// packing new egress rules
createPolicyEndpoints, doNotDeleteEgress := m.packingEgressRules(policy, egressEndpointsMap, createPolicyEndpoints, modifiedEndpoints, potentialDeletes)
createPolicyEndpoints, doNotDeleteEgress, egPacked := m.packingEgressRules(policy, egressEndpointsMap, createPolicyEndpoints, modifiedEndpoints, potentialDeletes)
// packing new pod selector
createPolicyEndpoints, doNotDeletePs := m.packingPodSelectorEndpoints(policy, podSelectorEndpointSet.UnsortedList(), createPolicyEndpoints, modifiedEndpoints, potentialDeletes)
createPolicyEndpoints, doNotDeletePs, psPacked := m.packingPodSelectorEndpoints(policy, podSelectorEndpointSet.UnsortedList(), createPolicyEndpoints, modifiedEndpoints, potentialDeletes)

doNotDelete.Insert(doNotDeleteIngress.UnsortedList()...)
doNotDelete.Insert(doNotDeleteEgress.UnsortedList()...)
Expand All @@ -167,7 +206,7 @@ func (m *policyEndpointsManager) computePolicyEndpoints(policy *networking.Netwo
}
}

return createPolicyEndpoints, updatePolicyEndpoints, deletePolicyEndpoints, nil
return createPolicyEndpoints, updatePolicyEndpoints, deletePolicyEndpoints, (conversions.BoolToint(ingPacked) << ingressShift) | (conversions.BoolToint(egPacked) << egressShift) | (conversions.BoolToint(psPacked) << psShift), nil
}

func (m *policyEndpointsManager) newPolicyEndpoint(policy *networking.NetworkPolicy,
Expand Down Expand Up @@ -202,6 +241,13 @@ func (m *policyEndpointsManager) newPolicyEndpoint(policy *networking.NetworkPol
Egress: egressRules,
},
}

// if no pod selector is specified, the controller adds a boolean value true to AllPodsInNamespace
if policy.Spec.PodSelector.Size() == 0 {
m.logger.Info("Creating a new PE but requested NP doesn't have pod selector", "NPName", policy.Name, "NPNamespace", policy.Namespace)
policyEndpoint.Spec.AllPodsInNamespace = true
}

return policyEndpoint
}

Expand Down Expand Up @@ -319,24 +365,32 @@ func (m *policyEndpointsManager) processExistingPolicyEndpoints(
// it returns the ingress rules packed in policy endpoints and a set of policy endpoints that need to be kept.
func (m *policyEndpointsManager) packingIngressRules(policy *networking.NetworkPolicy,
rulesMap map[string]policyinfo.EndpointInfo,
createPolicyEndpoints, modifiedEndpoints, potentialDeletes []policyinfo.PolicyEndpoint) ([]policyinfo.PolicyEndpoint, sets.Set[types.NamespacedName]) {
createPolicyEndpoints, modifiedEndpoints, potentialDeletes []policyinfo.PolicyEndpoint) ([]policyinfo.PolicyEndpoint, sets.Set[types.NamespacedName], bool) {
doNotDelete := sets.Set[types.NamespacedName]{}
chunkStartIdx := 0
chunkEndIdx := 0
ingressList := maps.Keys(rulesMap)

packed := false

// try to fill existing policy endpoints first and then new ones if needed
for _, sliceToCheck := range [][]policyinfo.PolicyEndpoint{modifiedEndpoints, potentialDeletes, createPolicyEndpoints} {
for i := range sliceToCheck {
// reset start pointer if end pointer is updated
chunkStartIdx = chunkEndIdx

// Instead of adding the entire chunk we should try to add to full the slice
if len(sliceToCheck[i].Spec.Ingress) < m.endpointChunkSize && chunkEndIdx < len(ingressList) {
// when new ingress rule list is greater than available spots in current non-empty PE rule's list, we do binpacking
spots := m.endpointChunkSize - len(sliceToCheck[i].Spec.Ingress)
packed = spots > 0 && len(sliceToCheck[i].Spec.Ingress) > 0 && spots < len(ingressList)

if spots > 0 && chunkEndIdx < len(ingressList) {
for len(sliceToCheck[i].Spec.Ingress)+(chunkEndIdx-chunkStartIdx+1) < m.endpointChunkSize && chunkEndIdx < len(ingressList)-1 {
chunkEndIdx++
}

sliceToCheck[i].Spec.Ingress = append(sliceToCheck[i].Spec.Ingress, m.getListOfEndpointInfoFromHash(lo.Slice(ingressList, chunkStartIdx, chunkEndIdx+1), rulesMap)...)

// move the end to next available index to prepare next appending
chunkEndIdx++
}
Expand All @@ -355,26 +409,33 @@ func (m *policyEndpointsManager) packingIngressRules(policy *networking.NetworkP
createPolicyEndpoints = append(createPolicyEndpoints, newEP)
}
}
return createPolicyEndpoints, doNotDelete
return createPolicyEndpoints, doNotDelete, packed
}

// packingEgressRules iterates over egress rules across available policy endpoints and required egress rule changes.
// it returns the egress rules packed in policy endpoints and a set of policy endpoints that need to be kept.
func (m *policyEndpointsManager) packingEgressRules(policy *networking.NetworkPolicy,
rulesMap map[string]policyinfo.EndpointInfo,
createPolicyEndpoints, modifiedEndpoints, potentialDeletes []policyinfo.PolicyEndpoint) ([]policyinfo.PolicyEndpoint, sets.Set[types.NamespacedName]) {
createPolicyEndpoints, modifiedEndpoints, potentialDeletes []policyinfo.PolicyEndpoint) ([]policyinfo.PolicyEndpoint, sets.Set[types.NamespacedName], bool) {
doNotDelete := sets.Set[types.NamespacedName]{}
chunkStartIdx := 0
chunkEndIdx := 0
egressList := maps.Keys(rulesMap)

packed := false

// try to fill existing policy endpoints first and then new ones if needed
for _, sliceToCheck := range [][]policyinfo.PolicyEndpoint{modifiedEndpoints, potentialDeletes, createPolicyEndpoints} {
for i := range sliceToCheck {
// reset start pointer if end pointer is updated
chunkStartIdx = chunkEndIdx

// Instead of adding the entire chunk we should try to add to full the slice
if len(sliceToCheck[i].Spec.Egress) < m.endpointChunkSize && chunkEndIdx < len(egressList) {
// when new egress rule list is greater than available spots in current non-empty PE rule's list, we do binpacking
spots := m.endpointChunkSize - len(sliceToCheck[i].Spec.Egress)
packed = spots > 0 && len(sliceToCheck[i].Spec.Egress) > 0 && spots < len(egressList)

if spots > 0 && chunkEndIdx < len(egressList) {
for len(sliceToCheck[i].Spec.Egress)+(chunkEndIdx-chunkStartIdx+1) < m.endpointChunkSize && chunkEndIdx < len(egressList)-1 {
chunkEndIdx++
}
Expand All @@ -398,26 +459,32 @@ func (m *policyEndpointsManager) packingEgressRules(policy *networking.NetworkPo
createPolicyEndpoints = append(createPolicyEndpoints, newEP)
}
}
return createPolicyEndpoints, doNotDelete
return createPolicyEndpoints, doNotDelete, packed
}

// packingPodSelectorEndpoints iterates over pod selectors across available policy endpoints and required pod selector changes.
// it returns the pod selectors packed in policy endpoints and a set of policy endpoints that need to be kept.
func (m *policyEndpointsManager) packingPodSelectorEndpoints(policy *networking.NetworkPolicy,
psList []policyinfo.PodEndpoint,
createPolicyEndpoints, modifiedEndpoints, potentialDeletes []policyinfo.PolicyEndpoint) ([]policyinfo.PolicyEndpoint, sets.Set[types.NamespacedName]) {
createPolicyEndpoints, modifiedEndpoints, potentialDeletes []policyinfo.PolicyEndpoint) ([]policyinfo.PolicyEndpoint, sets.Set[types.NamespacedName], bool) {

doNotDelete := sets.Set[types.NamespacedName]{}
chunkStartIdx := 0
chunkEndIdx := 0
packed := false

// try to fill existing policy endpoints first and then new ones if needed
for _, sliceToCheck := range [][]policyinfo.PolicyEndpoint{modifiedEndpoints, potentialDeletes, createPolicyEndpoints} {
for i := range sliceToCheck {
// reset start pointer if end pointer is updated
chunkStartIdx = chunkEndIdx

// Instead of adding the entire chunk we should try to add to full the slice
if len(sliceToCheck[i].Spec.PodSelectorEndpoints) < m.endpointChunkSize && chunkEndIdx < len(psList) {
// when new pods list is greater than available spots in current non-empty PS list, we do binpacking
spots := m.endpointChunkSize - len(sliceToCheck[i].Spec.PodSelectorEndpoints)
packed = spots > 0 && len(sliceToCheck[i].Spec.PodSelectorEndpoints) > 0 && spots < len(psList)

if spots > 0 && chunkEndIdx < len(psList) {
for len(sliceToCheck[i].Spec.PodSelectorEndpoints)+(chunkEndIdx-chunkStartIdx+1) < m.endpointChunkSize && chunkEndIdx < len(psList)-1 {
chunkEndIdx++
}
Expand All @@ -441,5 +508,5 @@ func (m *policyEndpointsManager) packingPodSelectorEndpoints(policy *networking.
createPolicyEndpoints = append(createPolicyEndpoints, newEP)
}
}
return createPolicyEndpoints, doNotDelete
return createPolicyEndpoints, doNotDelete, packed
}
2 changes: 1 addition & 1 deletion pkg/policyendpoints/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ func Test_policyEndpointsManager_computePolicyEndpoints(t *testing.T) {
m := &policyEndpointsManager{
endpointChunkSize: tt.fields.endpointChunkSize,
}
createList, updateList, deleteList, err := m.computePolicyEndpoints(tt.args.policy, tt.args.policyEndpoints,
createList, updateList, deleteList, _, err := m.computePolicyEndpoints(tt.args.policy, tt.args.policyEndpoints,
tt.args.ingressRules, tt.args.egressRules, tt.args.podselectorEndpoints)

if len(tt.wantErr) > 0 {
Expand Down
Loading

0 comments on commit 8092ba2

Please sign in to comment.