diff --git a/api/v1alpha1/policyendpoint_types.go b/api/v1alpha1/policyendpoint_types.go index eadeecd..4c82857 100644 --- a/api/v1alpha1/policyendpoint_types.go +++ b/api/v1alpha1/policyendpoint_types.go @@ -93,12 +93,46 @@ type PolicyEndpointSpec struct { // Egress is the list of egress rules containing resolved network addresses Egress []EndpointInfo `json:"egress,omitempty"` + + // AllPodsInNameSpace is the boolean value indicating should all pods in the policy namespace be selected + // +optional + AllPodsInNamespace bool `json:"allPodsInNamespace,omitempty"` } // PolicyEndpointStatus defines the observed state of PolicyEndpoint type PolicyEndpointStatus struct { // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster // Important: Run "make" to regenerate code after modifying this file + + // +optional + Conditions []PolicyEndpointCondition `json:"conditions,omitempty"` +} + +type PolicyEndpointConditionType string + +const ( + Packed PolicyEndpointConditionType = "PackedPolicyEndpoint" + Updated PolicyEndpointConditionType = "PatchedPolicyEndpoint" +) + +// PolicyEndpointCondition describes the state of a PolicyEndpoint at a certain point. +// For example, binpacking PE slices should be updated as a condition change +type PolicyEndpointCondition struct { + // Type of PolicyEndpoint condition. + // +optional + Type PolicyEndpointConditionType `json:"type"` + // Status of the condition, one of True, False, Unknown. + // +optional + Status corev1.ConditionStatus `json:"status"` + // Last time the condition transitioned from one status to another. + // +optional + LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"` + // The reason for the condition's last transition. + // +optional + Reason string `json:"reason,omitempty"` + // A human readable message indicating details about the transition. + // +optional + Message string `json:"message,omitempty"` } //+kubebuilder:object:root=true diff --git a/config/crd/bases/networking.k8s.aws_policyendpoints.yaml b/config/crd/bases/networking.k8s.aws_policyendpoints.yaml index 859579b..af4025a 100644 --- a/config/crd/bases/networking.k8s.aws_policyendpoints.yaml +++ b/config/crd/bases/networking.k8s.aws_policyendpoints.yaml @@ -35,6 +35,10 @@ spec: spec: description: PolicyEndpointSpec defines the desired state of PolicyEndpoint properties: + allPodsInNamespace: + description: AllPodsInNameSpace is the boolean value indicating should + all pods in the policy namespace be selected + type: boolean egress: description: Egress is the list of egress rules containing resolved network addresses @@ -225,6 +229,33 @@ spec: type: object status: description: PolicyEndpointStatus defines the observed state of PolicyEndpoint + properties: + conditions: + items: + description: PolicyEndpointCondition describes the state of a PolicyEndpoint + at a certain point. For example, binpacking PE slices should be + updated as a condition change + properties: + lastTransitionTime: + description: Last time the condition transitioned from one status + to another. + format: date-time + type: string + message: + description: A human readable message indicating details about + the transition. + type: string + reason: + description: The reason for the condition's last transition. + type: string + status: + description: Status of the condition, one of True, False, Unknown. + type: string + type: + description: Type of PolicyEndpoint condition. + type: string + type: object + type: array type: object type: object served: true diff --git a/pkg/policyendpoints/manager.go b/pkg/policyendpoints/manager.go index 15ba92a..eee5b65 100644 --- a/pkg/policyendpoints/manager.go +++ b/pkg/policyendpoints/manager.go @@ -4,6 +4,7 @@ import ( "context" "crypto/sha256" "encoding/hex" + "fmt" "strconv" "golang.org/x/exp/maps" @@ -11,6 +12,7 @@ import ( "github.com/go-logr/logr" "github.com/pkg/errors" "github.com/samber/lo" + corev1 "k8s.io/api/core/v1" networking "k8s.io/api/networking/v1" "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -21,6 +23,8 @@ import ( policyinfo "github.com/aws/amazon-network-policy-controller-k8s/api/v1alpha1" "github.com/aws/amazon-network-policy-controller-k8s/pkg/k8s" "github.com/aws/amazon-network-policy-controller-k8s/pkg/resolvers" + "github.com/aws/amazon-network-policy-controller-k8s/pkg/utils/conditions" + "github.com/aws/amazon-network-policy-controller-k8s/pkg/utils/conversions" ) type PolicyEndpointsManager interface { @@ -39,6 +43,19 @@ func NewPolicyEndpointsManager(k8sClient client.Client, endpointChunkSize int, l } } +const ( + ingressShift = 2 + egressShift = 1 + psShift = 0 + + ingBit = 4 + egBit = 2 + psBit = 1 + + reasonBinPacking = "PEBinPacked" + reasonPatching = "PEPatched" +) + var _ PolicyEndpointsManager = (*policyEndpointsManager)(nil) type policyEndpointsManager struct { @@ -60,12 +77,9 @@ func (m *policyEndpointsManager) Reconcile(ctx context.Context, policy *networki client.MatchingFields{IndexKeyPolicyReferenceName: policy.Name}); err != nil { return err } - existingPolicyEndpoints := make([]policyinfo.PolicyEndpoint, 0, len(policyEndpointList.Items)) - for _, policyEndpoint := range policyEndpointList.Items { - existingPolicyEndpoints = append(existingPolicyEndpoints, policyEndpoint) - } - createList, updateList, deleteList, err := m.computePolicyEndpoints(policy, existingPolicyEndpoints, ingressRules, egressRules, podSelectorEndpoints) + createList, updateList, deleteList, packed, err := m.computePolicyEndpoints(policy, policyEndpointList.Items, ingressRules, egressRules, podSelectorEndpoints) + m.logger.Info("the controller is packing PE rules", "Packed", conversions.IntToBool(packed)) if err != nil { return err } @@ -79,17 +93,42 @@ func (m *policyEndpointsManager) Reconcile(ctx context.Context, policy *networki for _, policyEndpoint := range updateList { oldRes := &policyinfo.PolicyEndpoint{} - if err := m.k8sClient.Get(ctx, k8s.NamespacedName(&policyEndpoint), oldRes); err != nil { + peId := k8s.NamespacedName(&policyEndpoint) + if err := m.k8sClient.Get(ctx, peId, oldRes); err != nil { return err } if equality.Semantic.DeepEqual(oldRes.Spec, policyEndpoint.Spec) { - m.logger.V(1).Info("Policy endpoint already up to date", "id", k8s.NamespacedName(&policyEndpoint)) + m.logger.V(1).Info("Policy endpoint already up to date", "id", peId) continue } + if err := m.k8sClient.Patch(ctx, &policyEndpoint, client.MergeFrom(oldRes)); err != nil { + if cErr := conditions.UpdatePEConditions(ctx, m.k8sClient, + peId, + m.logger, + policyinfo.Updated, + corev1.ConditionFalse, + reasonPatching, + fmt.Sprintf("patching policy endpoint failed: %s", err.Error()), + ); cErr != nil { + m.logger.Error(cErr, "Adding PE patch failure condition updates to PE failed", "PENamespacedName", peId) + } return err } - m.logger.Info("Updated policy endpoint", "id", k8s.NamespacedName(&policyEndpoint)) + m.logger.Info("Updated policy endpoint", "id", peId) + + if packed > 0 { + if err := conditions.UpdatePEConditions(ctx, m.k8sClient, + peId, + m.logger, + policyinfo.Packed, + corev1.ConditionTrue, + reasonBinPacking, + fmt.Sprintf("binpacked network policy endpoint slices on Ingress - %t, Egress - %t, PodSelector - %t", packed&ingBit>>ingressShift == 1, packed&egBit>>egressShift == 1, packed&psBit>>psShift == 1), + ); err != nil { + m.logger.Error(err, "Adding bingpacking condition updates to PE failed", "PENamespacedName", peId) + } + } } for _, policyEndpoint := range deleteList { @@ -123,7 +162,7 @@ func (m *policyEndpointsManager) Cleanup(ctx context.Context, policy *networking func (m *policyEndpointsManager) computePolicyEndpoints(policy *networking.NetworkPolicy, existingPolicyEndpoints []policyinfo.PolicyEndpoint, ingressEndpoints []policyinfo.EndpointInfo, egressEndpoints []policyinfo.EndpointInfo, podSelectorEndpoints []policyinfo.PodEndpoint) ([]policyinfo.PolicyEndpoint, - []policyinfo.PolicyEndpoint, []policyinfo.PolicyEndpoint, error) { + []policyinfo.PolicyEndpoint, []policyinfo.PolicyEndpoint, int, error) { // Loop through ingressEndpoints, egressEndpoints and podSelectorEndpoints and put in map // also populate them into policy endpoints @@ -138,11 +177,11 @@ func (m *policyEndpointsManager) computePolicyEndpoints(policy *networking.Netwo var deletePolicyEndpoints []policyinfo.PolicyEndpoint // packing new ingress rules - createPolicyEndpoints, doNotDeleteIngress := m.packingIngressRules(policy, ingressEndpointsMap, createPolicyEndpoints, modifiedEndpoints, potentialDeletes) + createPolicyEndpoints, doNotDeleteIngress, ingPacked := m.packingIngressRules(policy, ingressEndpointsMap, createPolicyEndpoints, modifiedEndpoints, potentialDeletes) // packing new egress rules - createPolicyEndpoints, doNotDeleteEgress := m.packingEgressRules(policy, egressEndpointsMap, createPolicyEndpoints, modifiedEndpoints, potentialDeletes) + createPolicyEndpoints, doNotDeleteEgress, egPacked := m.packingEgressRules(policy, egressEndpointsMap, createPolicyEndpoints, modifiedEndpoints, potentialDeletes) // packing new pod selector - createPolicyEndpoints, doNotDeletePs := m.packingPodSelectorEndpoints(policy, podSelectorEndpointSet.UnsortedList(), createPolicyEndpoints, modifiedEndpoints, potentialDeletes) + createPolicyEndpoints, doNotDeletePs, psPacked := m.packingPodSelectorEndpoints(policy, podSelectorEndpointSet.UnsortedList(), createPolicyEndpoints, modifiedEndpoints, potentialDeletes) doNotDelete.Insert(doNotDeleteIngress.UnsortedList()...) doNotDelete.Insert(doNotDeleteEgress.UnsortedList()...) @@ -167,7 +206,7 @@ func (m *policyEndpointsManager) computePolicyEndpoints(policy *networking.Netwo } } - return createPolicyEndpoints, updatePolicyEndpoints, deletePolicyEndpoints, nil + return createPolicyEndpoints, updatePolicyEndpoints, deletePolicyEndpoints, (conversions.BoolToint(ingPacked) << ingressShift) | (conversions.BoolToint(egPacked) << egressShift) | (conversions.BoolToint(psPacked) << psShift), nil } func (m *policyEndpointsManager) newPolicyEndpoint(policy *networking.NetworkPolicy, @@ -202,6 +241,13 @@ func (m *policyEndpointsManager) newPolicyEndpoint(policy *networking.NetworkPol Egress: egressRules, }, } + + // if no pod selector is specified, the controller adds a boolean value true to AllPodsInNamespace + if policy.Spec.PodSelector.Size() == 0 { + m.logger.Info("Creating a new PE but requested NP doesn't have pod selector", "NPName", policy.Name, "NPNamespace", policy.Namespace) + policyEndpoint.Spec.AllPodsInNamespace = true + } + return policyEndpoint } @@ -319,24 +365,32 @@ func (m *policyEndpointsManager) processExistingPolicyEndpoints( // it returns the ingress rules packed in policy endpoints and a set of policy endpoints that need to be kept. func (m *policyEndpointsManager) packingIngressRules(policy *networking.NetworkPolicy, rulesMap map[string]policyinfo.EndpointInfo, - createPolicyEndpoints, modifiedEndpoints, potentialDeletes []policyinfo.PolicyEndpoint) ([]policyinfo.PolicyEndpoint, sets.Set[types.NamespacedName]) { + createPolicyEndpoints, modifiedEndpoints, potentialDeletes []policyinfo.PolicyEndpoint) ([]policyinfo.PolicyEndpoint, sets.Set[types.NamespacedName], bool) { doNotDelete := sets.Set[types.NamespacedName]{} chunkStartIdx := 0 chunkEndIdx := 0 ingressList := maps.Keys(rulesMap) + packed := false + // try to fill existing polciy endpoints first and then new ones if needed for _, sliceToCheck := range [][]policyinfo.PolicyEndpoint{modifiedEndpoints, potentialDeletes, createPolicyEndpoints} { for i := range sliceToCheck { // reset start pointer if end pointer is updated chunkStartIdx = chunkEndIdx + // Instead of adding the entire chunk we should try to add to full the slice - if len(sliceToCheck[i].Spec.Ingress) < m.endpointChunkSize && chunkEndIdx < len(ingressList) { + // when new ingress rule list is greater than available spots in current non-empty PE rule's list, we do binpacking + spots := m.endpointChunkSize - len(sliceToCheck[i].Spec.Ingress) + packed = spots > 0 && len(sliceToCheck[i].Spec.Ingress) > 0 && spots < len(ingressList) + + if spots > 0 && chunkEndIdx < len(ingressList) { for len(sliceToCheck[i].Spec.Ingress)+(chunkEndIdx-chunkStartIdx+1) < m.endpointChunkSize && chunkEndIdx < len(ingressList)-1 { chunkEndIdx++ } sliceToCheck[i].Spec.Ingress = append(sliceToCheck[i].Spec.Ingress, m.getListOfEndpointInfoFromHash(lo.Slice(ingressList, chunkStartIdx, chunkEndIdx+1), rulesMap)...) + // move the end to next available index to prepare next appending chunkEndIdx++ } @@ -355,26 +409,33 @@ func (m *policyEndpointsManager) packingIngressRules(policy *networking.NetworkP createPolicyEndpoints = append(createPolicyEndpoints, newEP) } } - return createPolicyEndpoints, doNotDelete + return createPolicyEndpoints, doNotDelete, packed } // packingEgressRules iterates over egress rules across available policy endpoints and required egress rule changes. // it returns the egress rules packed in policy endpoints and a set of policy endpoints that need to be kept. func (m *policyEndpointsManager) packingEgressRules(policy *networking.NetworkPolicy, rulesMap map[string]policyinfo.EndpointInfo, - createPolicyEndpoints, modifiedEndpoints, potentialDeletes []policyinfo.PolicyEndpoint) ([]policyinfo.PolicyEndpoint, sets.Set[types.NamespacedName]) { + createPolicyEndpoints, modifiedEndpoints, potentialDeletes []policyinfo.PolicyEndpoint) ([]policyinfo.PolicyEndpoint, sets.Set[types.NamespacedName], bool) { doNotDelete := sets.Set[types.NamespacedName]{} chunkStartIdx := 0 chunkEndIdx := 0 egressList := maps.Keys(rulesMap) + packed := false + // try to fill existing polciy endpoints first and then new ones if needed for _, sliceToCheck := range [][]policyinfo.PolicyEndpoint{modifiedEndpoints, potentialDeletes, createPolicyEndpoints} { for i := range sliceToCheck { // reset start pointer if end pointer is updated chunkStartIdx = chunkEndIdx + // Instead of adding the entire chunk we should try to add to full the slice - if len(sliceToCheck[i].Spec.Egress) < m.endpointChunkSize && chunkEndIdx < len(egressList) { + // when new egress rule list is greater than available spots in current non-empty PE rule's list, we do binpacking + spots := m.endpointChunkSize - len(sliceToCheck[i].Spec.Egress) + packed = spots > 0 && len(sliceToCheck[i].Spec.Egress) > 0 && spots < len(egressList) + + if spots > 0 && chunkEndIdx < len(egressList) { for len(sliceToCheck[i].Spec.Egress)+(chunkEndIdx-chunkStartIdx+1) < m.endpointChunkSize && chunkEndIdx < len(egressList)-1 { chunkEndIdx++ } @@ -398,26 +459,32 @@ func (m *policyEndpointsManager) packingEgressRules(policy *networking.NetworkPo createPolicyEndpoints = append(createPolicyEndpoints, newEP) } } - return createPolicyEndpoints, doNotDelete + return createPolicyEndpoints, doNotDelete, packed } // packingPodSelectorEndpoints iterates over pod selectors across available policy endpoints and required pod selector changes. // it returns the pod selectors packed in policy endpoints and a set of policy endpoints that need to be kept. func (m *policyEndpointsManager) packingPodSelectorEndpoints(policy *networking.NetworkPolicy, psList []policyinfo.PodEndpoint, - createPolicyEndpoints, modifiedEndpoints, potentialDeletes []policyinfo.PolicyEndpoint) ([]policyinfo.PolicyEndpoint, sets.Set[types.NamespacedName]) { + createPolicyEndpoints, modifiedEndpoints, potentialDeletes []policyinfo.PolicyEndpoint) ([]policyinfo.PolicyEndpoint, sets.Set[types.NamespacedName], bool) { doNotDelete := sets.Set[types.NamespacedName]{} chunkStartIdx := 0 chunkEndIdx := 0 + packed := false // try to fill existing polciy endpoints first and then new ones if needed for _, sliceToCheck := range [][]policyinfo.PolicyEndpoint{modifiedEndpoints, potentialDeletes, createPolicyEndpoints} { for i := range sliceToCheck { // reset start pointer if end pointer is updated chunkStartIdx = chunkEndIdx + // Instead of adding the entire chunk we should try to add to full the slice - if len(sliceToCheck[i].Spec.PodSelectorEndpoints) < m.endpointChunkSize && chunkEndIdx < len(psList) { + // when new pods list is greater than available spots in current non-empty PS list, we do binpacking + spots := m.endpointChunkSize - len(sliceToCheck[i].Spec.PodSelectorEndpoints) + packed = spots > 0 && len(sliceToCheck[i].Spec.PodSelectorEndpoints) > 0 && spots < len(psList) + + if spots > 0 && chunkEndIdx < len(psList) { for len(sliceToCheck[i].Spec.PodSelectorEndpoints)+(chunkEndIdx-chunkStartIdx+1) < m.endpointChunkSize && chunkEndIdx < len(psList)-1 { chunkEndIdx++ } @@ -441,5 +508,5 @@ func (m *policyEndpointsManager) packingPodSelectorEndpoints(policy *networking. createPolicyEndpoints = append(createPolicyEndpoints, newEP) } } - return createPolicyEndpoints, doNotDelete + return createPolicyEndpoints, doNotDelete, packed } diff --git a/pkg/policyendpoints/manager_test.go b/pkg/policyendpoints/manager_test.go index 2f7a15d..c16173f 100644 --- a/pkg/policyendpoints/manager_test.go +++ b/pkg/policyendpoints/manager_test.go @@ -448,7 +448,7 @@ func Test_policyEndpointsManager_computePolicyEndpoints(t *testing.T) { m := &policyEndpointsManager{ endpointChunkSize: tt.fields.endpointChunkSize, } - createList, updateList, deleteList, err := m.computePolicyEndpoints(tt.args.policy, tt.args.policyEndpoints, + createList, updateList, deleteList, _, err := m.computePolicyEndpoints(tt.args.policy, tt.args.policyEndpoints, tt.args.ingressRules, tt.args.egressRules, tt.args.podselectorEndpoints) if len(tt.wantErr) > 0 { diff --git a/pkg/utils/conditions/conditions.go b/pkg/utils/conditions/conditions.go new file mode 100644 index 0000000..f71399f --- /dev/null +++ b/pkg/utils/conditions/conditions.go @@ -0,0 +1,40 @@ +package conditions + +import ( + "context" + + policyinfo "github.com/aws/amazon-network-policy-controller-k8s/api/v1alpha1" + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func UpdatePEConditions(ctx context.Context, k8sClient client.Client, key types.NamespacedName, log logr.Logger, + cType policyinfo.PolicyEndpointConditionType, + cStatus corev1.ConditionStatus, + cReason string, + cMsg string) error { + pe := &policyinfo.PolicyEndpoint{} + var err error + if err = k8sClient.Get(ctx, key, pe); err != nil { + log.Error(err, "getting PE for conditions update failed", "PEName", pe.Name, "PENamespace", pe.Namespace) + } else { + copy := pe.DeepCopy() + cond := policyinfo.PolicyEndpointCondition{ + Type: cType, + Status: cStatus, + LastTransitionTime: metav1.Now(), + Reason: cReason, + Message: cMsg, + } + copy.Status.Conditions = append(copy.Status.Conditions, cond) + log.Info("the controller added condition to PE", "PEName", copy.Name, "PENamespace", copy.Namespace, "Conditions", copy.Status.Conditions) + if err = k8sClient.Status().Patch(ctx, copy, client.MergeFrom(pe)); err != nil { + log.Error(err, "updating PE status failed", "PEName", pe.Name, "PENamespace", pe.Namespace) + } + } + + return err +} diff --git a/pkg/utils/conversions/conversions.go b/pkg/utils/conversions/conversions.go new file mode 100644 index 0000000..1e1fa08 --- /dev/null +++ b/pkg/utils/conversions/conversions.go @@ -0,0 +1,14 @@ +package conversions + +// before golang supports the conversion natively, we use this function to convert bool to int. +// tracking golang support at https://github.com/golang/go/issues/64825 +func BoolToint(b bool) int { + if b { + return 1 + } + return 0 +} + +func IntToBool(i int) bool { + return i == 1 +} diff --git a/pkg/utils/conversions/conversions_test.go b/pkg/utils/conversions/conversions_test.go new file mode 100644 index 0000000..3b44a81 --- /dev/null +++ b/pkg/utils/conversions/conversions_test.go @@ -0,0 +1,18 @@ +package conversions + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestBoolToIntConversion(t *testing.T) { + assert.Equal(t, 1, BoolToint(true)) + assert.Equal(t, 0, BoolToint(false)) +} + +func TestIntToBoolConversion(t *testing.T) { + assert.Equal(t, false, IntToBool(0)) + assert.Equal(t, true, IntToBool(1)) + assert.Equal(t, false, IntToBool(2)) +}