Skip to content

Commit

Permalink
support service without selectors
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel-GrunbergerCA <danielg@armosec.io>
  • Loading branch information
Daniel-GrunbergerCA committed Oct 26, 2023
1 parent 2614613 commit 1899181
Showing 1 changed file with 117 additions and 63 deletions.
180 changes: 117 additions & 63 deletions pkg/networkmanager/network_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package networkmanager
import (
"context"
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
"net"
Expand All @@ -25,10 +26,14 @@ import (
"github.com/kubescape/k8s-interface/k8sinterface"
"github.com/kubescape/k8s-interface/workloadinterface"
"github.com/kubescape/storage/pkg/apis/softwarecomposition/v1beta1"
"github.com/kubescape/storage/pkg/generated/clientset/versioned/scheme"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/utils/ptr"
)

Expand Down Expand Up @@ -376,21 +381,22 @@ func (am *NetworkManager) generateNetworkNeighborsEntries(namespace string, netw
}

selector := svc.GetServiceSelector()

if len(selector) == 0 {
logger.L().Debug("service selector is empty", helpers.String("service name", networkEvent.Destination.Name))
if err = am.handleServiceWithNoSelectors(svc, networkEvent, egressIdentifiersMap, ingressIdentifiersMap); err != nil {
logger.L().Error("failed to handle service with no selectors", helpers.String("reason", err.Error()), helpers.String("service name", networkEvent.Destination.Name))
}
continue
}

neighborEntry.PodSelector = &metav1.LabelSelector{
MatchLabels: selector,
}

if namespaceLabels := getNamespaceMatchLabels(networkEvent.Destination.Namespace, namespace); namespaceLabels != nil {
neighborEntry.NamespaceSelector = &metav1.LabelSelector{
MatchLabels: namespaceLabels,
} else {
neighborEntry.PodSelector = &metav1.LabelSelector{
MatchLabels: selector,
}
if namespaceLabels := getNamespaceMatchLabels(networkEvent.Destination.Namespace, namespace); namespaceLabels != nil {
neighborEntry.NamespaceSelector = &metav1.LabelSelector{
MatchLabels: namespaceLabels,
}
}
}

} else {
if networkEvent.Destination.IPAddress == "127.0.0.1" {
// No need to generate for localhost
Expand All @@ -399,58 +405,7 @@ func (am *NetworkManager) generateNetworkNeighborsEntries(namespace string, netw
neighborEntry.IPAddress = networkEvent.Destination.IPAddress
}

portIdentifier := generatePortIdentifier(networkEvent)

neighborEntry.Ports = []v1beta1.NetworkPort{
{
Protocol: v1beta1.Protocol(networkEvent.Protocol),
Port: ptr.To(int32(networkEvent.Port)),
Name: portIdentifier,
}}

neighborEntry.Type = internalTrafficType
if len(networkEvent.GetDestinationPodLabels()) == 0 {
neighborEntry.Type = externalTrafficType
}

// generate identifier for this neighborEntry
identifier, err := generateNeighborsIdentifier(neighborEntry)
if err != nil {
// if we fail to hash, use a random identifier so at least we have the data on the crd
logger.L().Error("failed to hash identifier", helpers.String("identifier", identifier), helpers.String("error", err.Error()))
identifier = uuid.New().String()
}

if networkEvent.PktType == "OUTGOING" {
if existingNeighborEntry, ok := egressIdentifiersMap[identifier]; ok {
// if we already have this identifier, check if there is a new port
for _, port := range existingNeighborEntry.Ports {
if port.Name == portIdentifier {
// port already exists in neighborEntry.Ports...
continue
}
// new port, add it to same entry
neighborEntry.Ports = append(existingNeighborEntry.Ports, neighborEntry.Ports...)
}
}
neighborEntry.Identifier = identifier
egressIdentifiersMap[identifier] = neighborEntry
} else {
if existingNeighborEntry, ok := ingressIdentifiersMap[identifier]; ok {
// if we already have this identifier, check if there is a new port
for _, port := range existingNeighborEntry.Ports {
if port.Name == portIdentifier {
// port already exists in neighborEntry.Ports...
continue
}
// new port, add it to same entry
neighborEntry.Ports = append(existingNeighborEntry.Ports, neighborEntry.Ports...)
break
}
}
neighborEntry.Identifier = identifier
ingressIdentifiersMap[identifier] = neighborEntry
}
saveNeighborEntry(networkEvent, neighborEntry, egressIdentifiersMap, ingressIdentifiersMap)

}

Expand All @@ -467,6 +422,105 @@ func (am *NetworkManager) generateNetworkNeighborsEntries(namespace string, netw
return networkNeighborsSpec
}

func saveNeighborEntry(networkEvent NetworkEvent, neighborEntry v1beta1.NetworkNeighbor, egressIdentifiersMap map[string]v1beta1.NetworkNeighbor, ingressIdentifiersMap map[string]v1beta1.NetworkNeighbor) {

portIdentifier := generatePortIdentifier(networkEvent)

neighborEntry.Ports = []v1beta1.NetworkPort{
{
Protocol: v1beta1.Protocol(networkEvent.Protocol),
Port: ptr.To(int32(networkEvent.Port)),
Name: portIdentifier,
}}

neighborEntry.Type = internalTrafficType
if neighborEntry.NamespaceSelector == nil && neighborEntry.PodSelector == nil {
neighborEntry.Type = externalTrafficType
}

// generate identifier for this neighborEntry
identifier, err := generateNeighborsIdentifier(neighborEntry)
if err != nil {
// if we fail to hash, use a random identifier so at least we have the data on the crd
logger.L().Debug("failed to hash identifier", helpers.String("identifier", identifier), helpers.String("error", err.Error()))
identifier = uuid.New().String()
}

if networkEvent.PktType == "OUTGOING" {
addToMap(egressIdentifiersMap, identifier, portIdentifier, neighborEntry)
} else {
addToMap(ingressIdentifiersMap, identifier, portIdentifier, neighborEntry)
}
}

// addToMap adds neighborEntry to identifiersMap, if identifier already exists, it will add the ports to the existing entry
func addToMap(identifiersMap map[string]v1beta1.NetworkNeighbor, identifier string, portIdentifier string, neighborEntry v1beta1.NetworkNeighbor) {
if existingNeighborEntry, ok := identifiersMap[identifier]; ok {
found := false
for _, port := range existingNeighborEntry.Ports {
if port.Name == portIdentifier {
found = true
break
}
}
if !found {
neighborEntry.Ports = append(existingNeighborEntry.Ports, neighborEntry.Ports...)
}
}
neighborEntry.Identifier = identifier
identifiersMap[identifier] = neighborEntry
}

func (am *NetworkManager) handleServiceWithNoSelectors(svc workloadinterface.IWorkload, networkEvent NetworkEvent, egressIdentifiersMap map[string]v1beta1.NetworkNeighbor, ingressIdentifiersMap map[string]v1beta1.NetworkNeighbor) error {

// retrieve endpoint
endpoints, err := am.k8sClient.GetWorkload(networkEvent.Destination.Namespace, "Endpoint", networkEvent.Destination.Name)

endpointsMap := endpoints.GetObject()

endpointsBytes, err := json.Marshal(endpointsMap)
if err != nil {
return err
}

decoder := serializer.NewCodecFactory(scheme.Scheme).UniversalDecoder()
obj := &v1.Endpoints{}
err = runtime.DecodeInto(decoder, endpointsBytes, obj)
if err != nil {
return err
}

for _, subset := range obj.Subsets {
for _, address := range subset.Addresses {
neighborEntry := v1beta1.NetworkNeighbor{
IPAddress: address.IP,
Type: internalTrafficType,
}
for _, ports := range subset.Ports {
neighborEntry.Ports = append(neighborEntry.Ports, v1beta1.NetworkPort{
Protocol: v1beta1.Protocol(ports.Protocol),
Port: ptr.To(int32(ports.Port)),
Name: fmt.Sprintf("%s-%d", ports.Protocol, ports.Port),
})
}

identifier, err := generateNeighborsIdentifier(neighborEntry)
if err != nil {
identifier = uuid.New().String()
}
neighborEntry.Identifier = identifier

if networkEvent.PktType == "OUTGOING" {
egressIdentifiersMap[identifier] = neighborEntry
} else {
ingressIdentifiersMap[identifier] = neighborEntry
}
}
}

return nil
}

func getNamespaceMatchLabels(destinationNamespace, sourceNamespace string) map[string]string {
if destinationNamespace != sourceNamespace {
// from version 1.22, all namespace have the kubernetes.io/metadata.name label
Expand Down

0 comments on commit 1899181

Please sign in to comment.