Commit 42f2f53

address comments
pxaws committed Feb 7, 2025
1 parent 191bd5a commit 42f2f53
Showing 5 changed files with 102 additions and 52 deletions.
File 1 of 5: endpointSliceWatcher
@@ -91,14 +91,15 @@ func (w *endpointSliceWatcher) waitForCacheSync(stopCh chan struct{}) {
 func (w *endpointSliceWatcher) extractEndpointSliceKeyValuePairs(slice *discv1.EndpointSlice) []kvPair {
 	var pairs []kvPair
 
+	isFirstPod := true
 	for _, endpoint := range slice.Endpoints {
-		podName := ""
-		ns := ""
 		if endpoint.TargetRef != nil {
-			if endpoint.TargetRef.Kind == "Pod" {
-				podName = endpoint.TargetRef.Name
+			if endpoint.TargetRef.Kind != "Pod" {
+				continue
 			}
-			ns = endpoint.TargetRef.Namespace
+
+			podName := endpoint.TargetRef.Name
+			ns := endpoint.TargetRef.Namespace
 
 			derivedWorkload := inferWorkloadName(podName)
 			if derivedWorkload == "" {
@@ -128,31 +129,21 @@ func (w *endpointSliceWatcher) extractEndpointSliceKeyValuePairs(slice *discv1.EndpointSlice) []kvPair {
 				}
 			}
 		}
-	}
-
-	// Service reference: "kubernetes.io/service-name"
-	svcName := slice.Labels["kubernetes.io/service-name"]
-	if svcName != "" {
-		svcFull := svcName + "@" + slice.Namespace
-
-		// If there's at least one endpoint, derive the workload from it
-		if len(slice.Endpoints) > 0 {
-			if endpoint := slice.Endpoints[0]; endpoint.TargetRef != nil {
-				derived := inferWorkloadName(endpoint.TargetRef.Name)
-				if derived == "" {
-					w.logger.Warn("failed to infer workload name from Pod name", zap.String("podName", endpoint.TargetRef.Name))
-				} else {
-					firstWl := derived + "@" + endpoint.TargetRef.Namespace
+
+			// Build service name -> "workload@namespace" pair from the first pod
+			if isFirstPod {
+				svcName := slice.Labels["kubernetes.io/service-name"]
+				if svcName != "" {
+					isFirstPod = false
 					pairs = append(pairs, kvPair{
-						key:       svcFull,
-						value:     firstWl,
+						key:       svcName + "@" + ns,
+						value:     fullWl,
 						isService: true,
 					})
 				}
 			}
 		}
 	}
 
 	return pairs
Expand All @@ -169,7 +160,7 @@ func (w *endpointSliceWatcher) handleSliceAdd(obj interface{}) {
pairs := w.extractEndpointSliceKeyValuePairs(newSlice)

// Insert them into our ipToWorkload / serviceToWorkload, and track the keys.
var keys []string
keys := make([]string, 0, len(pairs))
for _, kv := range pairs {
if kv.isService {
w.serviceToWorkload.Store(kv.key, kv.value)
Expand Down
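Taken together, the hunks above move the service-pair construction inside the per-endpoint loop: the service's workload is now derived from the first Pod-backed endpoint (skipping non-Pod targets) instead of unconditionally from slice.Endpoints[0], and the namespace comes from that pod's TargetRef rather than from the slice. A minimal sketch of the resulting control flow; kvPair, inferWorkloadName, and the elided address-mapping code are taken from the unexpanded parts of the file, and the continue on an empty workload name is an assumption, not shown in this diff:

// Sketch of the refactored flow (not the verbatim file): everything is
// derived inside the per-endpoint loop, and the service pair reuses the
// first pod's workload and namespace.
func (w *endpointSliceWatcher) extractEndpointSliceKeyValuePairs(slice *discv1.EndpointSlice) []kvPair {
	var pairs []kvPair

	isFirstPod := true
	for _, endpoint := range slice.Endpoints {
		if endpoint.TargetRef == nil || endpoint.TargetRef.Kind != "Pod" {
			continue // only Pod-backed endpoints can yield a workload
		}
		podName := endpoint.TargetRef.Name
		ns := endpoint.TargetRef.Namespace

		derivedWorkload := inferWorkloadName(podName)
		if derivedWorkload == "" {
			w.logger.Warn("failed to infer workload name from Pod name",
				zap.String("podName", podName))
			continue // assumed behavior; not visible in this diff
		}
		fullWl := derivedWorkload + "@" + ns

		// ... elided (unchanged in this commit): append IP and IP:port
		// pairs mapping endpoint addresses to fullWl ...

		// Service name -> "workload@namespace", from the first pod only.
		if isFirstPod {
			if svcName := slice.Labels["kubernetes.io/service-name"]; svcName != "" {
				isFirstPod = false
				pairs = append(pairs, kvPair{
					key:       svcName + "@" + ns,
					value:     fullWl,
					isService: true,
				})
			}
		}
	}

	return pairs
}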
File 2 of 5: kubernetes resolver (getKubernetesResolver)
@@ -95,20 +95,10 @@ func getKubernetesResolver(platformCode, clusterName string, logger *zap.Logger)
 
 	if useListPod {
 		sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0)
-		podInformer := sharedInformerFactory.Core().V1().Pods().Informer()
-		err = podInformer.SetTransform(minimizePod)
-		if err != nil {
-			logger.Error("failed to minimize Pod objects", zap.Error(err))
-		}
-		serviceInformer := sharedInformerFactory.Core().V1().Services().Informer()
-		err = serviceInformer.SetTransform(minimizeService)
-		if err != nil {
-			logger.Error("failed to minimize Service objects", zap.Error(err))
-		}
 
 		timedDeleter := &TimedDeleter{Delay: deletionDelay}
-		poWatcher := newPodWatcher(logger, podInformer, timedDeleter)
-		svcWatcher := newServiceWatcher(logger, serviceInformer, timedDeleter)
+		poWatcher := newPodWatcher(logger, sharedInformerFactory, timedDeleter)
+		svcWatcher := newServiceWatcher(logger, sharedInformerFactory, timedDeleter)
 
 		safeStopCh := &safeChannel{ch: make(chan struct{}), closed: false}
 		// initialize the pod and service watchers for the cluster
@@ -140,16 +130,11 @@ func getKubernetesResolver(platformCode, clusterName string, logger *zap.Logger)
 		}
 	} else {
 		sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0)
-		serviceInformer := sharedInformerFactory.Core().V1().Services().Informer()
-		err = serviceInformer.SetTransform(minimizeService)
-		if err != nil {
-			logger.Error("failed to minimize Service objects", zap.Error(err))
-		}
 
 		timedDeleter := &TimedDeleter{Delay: deletionDelay}
-		svcWatcher := newServiceWatcher(logger, serviceInformer, timedDeleter)
+		svcWatcher := newServiceWatcher(logger, sharedInformerFactory, timedDeleter)
 		endptSliceWatcher := newEndpointSliceWatcher(logger, sharedInformerFactory, timedDeleter)
 
 		safeStopCh := &safeChannel{ch: make(chan struct{}), closed: false}
 		// initialize the pod and service watchers for the cluster
 		svcWatcher.Run(safeStopCh.ch)
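With the informer setup moved into the watcher constructors, both branches of getKubernetesResolver reduce to building one SharedInformerFactory and handing it to each watcher. This works because the factory caches one informer per resource type, so a transform registered by one constructor applies to the informer every other consumer sees. A standalone sketch of that caching behavior (not this repo's code; assumes an in-cluster config):

package main

import (
	"fmt"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

func main() {
	cfg, err := rest.InClusterConfig() // assumes running inside a cluster
	if err != nil {
		panic(err)
	}
	clientset := kubernetes.NewForConfigOrDie(cfg)

	factory := informers.NewSharedInformerFactory(clientset, 0)

	// Repeated calls return the same cached informer instance.
	a := factory.Core().V1().Services().Informer()
	b := factory.Core().V1().Services().Informer()
	fmt.Println(a == b) // true: one shared informer per resource type
}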
File 3 of 5: resolver tests (TestEksResolver)
@@ -98,7 +98,7 @@ func TestEksResolver(t *testing.T) {
 		}
 	})
 
-	t.Run("Test Process", func(t *testing.T) {
+	t.Run("Test Process when useListPod is true", func(t *testing.T) {
 		// helper function to get string values from the attributes
 		getStrAttr := func(attributes pcommon.Map, key string, t *testing.T) string {
 			if value, ok := attributes.Get(key); ok {
@@ -159,6 +159,67 @@
 		assert.NoError(t, err)
 		assert.Equal(t, "192.168.1.2", getStrAttr(attributes, attr.AWSRemoteService, t))
 	})
+
+	t.Run("Test Process when useListPod is false", func(t *testing.T) {
+		// helper function to get string values from the attributes
+		getStrAttr := func(attributes pcommon.Map, key string, t *testing.T) string {
+			if value, ok := attributes.Get(key); ok {
+				return value.AsString()
+			}
+			t.Errorf("Failed to get value for key: %s", key)
+			return ""
+		}
+
+		logger, _ := zap.NewProduction()
+		resolver := &kubernetesResolver{
+			logger:                   logger,
+			clusterName:              "test",
+			platformCode:             config.PlatformEKS,
+			ipToWorkloadAndNamespace: &sync.Map{},
+			ipToServiceAndNamespace:  &sync.Map{},
+			serviceToWorkload:        &sync.Map{},
+			useListPod:               false,
+		}
+
+		// Test case 1: "aws.remote.service" contains IP:Port
+		attributes := pcommon.NewMap()
+		attributes.PutStr(attr.AWSRemoteService, "192.0.2.1:8080")
+		resourceAttributes := pcommon.NewMap()
+		resolver.ipToWorkloadAndNamespace.Store("192.0.2.1:8080", "test-deployment@test-namespace")
+		err := resolver.Process(attributes, resourceAttributes)
+		assert.NoError(t, err)
+		assert.Equal(t, "test-deployment", getStrAttr(attributes, attr.AWSRemoteService, t))
+		assert.Equal(t, "eks:test/test-namespace", getStrAttr(attributes, attr.AWSRemoteEnvironment, t))
+
+		// Test case 2: "aws.remote.service" contains only IP
+		attributes = pcommon.NewMap()
+		attributes.PutStr(attr.AWSRemoteService, "192.0.2.2")
+		resourceAttributes = pcommon.NewMap()
+		resolver.ipToWorkloadAndNamespace.Store("192.0.2.2", "test-deployment-2@test-namespace-2")
+		err = resolver.Process(attributes, resourceAttributes)
+		assert.NoError(t, err)
+		assert.Equal(t, "test-deployment-2", getStrAttr(attributes, attr.AWSRemoteService, t))
+		assert.Equal(t, "eks:test/test-namespace-2", getStrAttr(attributes, attr.AWSRemoteEnvironment, t))
+
+		// Test case 3: "aws.remote.service" contains a non-IP string
+		attributes = pcommon.NewMap()
+		attributes.PutStr(attr.AWSRemoteService, "not-an-ip")
+		resourceAttributes = pcommon.NewMap()
+		err = resolver.Process(attributes, resourceAttributes)
+		assert.NoError(t, err)
+		assert.Equal(t, "not-an-ip", getStrAttr(attributes, attr.AWSRemoteService, t))
+
+		// Test case 4: Process with cluster IP
+		attributes = pcommon.NewMap()
+		attributes.PutStr(attr.AWSRemoteService, "192.168.1.2")
+		resourceAttributes = pcommon.NewMap()
+		resolver.ipToServiceAndNamespace.Store("192.168.1.2", "service1@test-namespace-3")
+		resolver.serviceToWorkload.Store("service1@test-namespace-3", "service1-deployment@test-namespace-3")
+		err = resolver.Process(attributes, resourceAttributes)
+		assert.NoError(t, err)
+		assert.Equal(t, "service1-deployment", getStrAttr(attributes, attr.AWSRemoteService, t))
+		assert.Equal(t, "eks:test/test-namespace-3", getStrAttr(attributes, attr.AWSRemoteEnvironment, t))
+	})
 }
 
 func TestK8sResourceAttributesResolverOnEKS(t *testing.T) {
@@ -168,10 +229,9 @@ func TestK8sResourceAttributesResolverOnEKS(t *testing.T) {
 	getStrAttr := func(attributes pcommon.Map, key string, t *testing.T) string {
 		if value, ok := attributes.Get(key); ok {
 			return value.AsString()
-		} else {
-			t.Errorf("Failed to get value for key: %s", key)
-			return ""
 		}
+		t.Errorf("Failed to get value for key: %s", key)
+		return ""
 	}
 
 	resolver := newKubernetesResourceAttributesResolver(config.PlatformEKS, "test-cluster")
File 4 of 5: podWatcher
@@ -9,6 +9,7 @@ import (
 	mapset "github.com/deckarep/golang-set/v2"
 	"go.uber.org/zap"
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/client-go/informers"
 	"k8s.io/client-go/tools/cache"
 )

@@ -112,14 +113,20 @@ type podWatcher struct {
 	deleter                      Deleter
 }
 
-func newPodWatcher(logger *zap.Logger, informer cache.SharedIndexInformer, deleter Deleter) *podWatcher {
+func newPodWatcher(logger *zap.Logger, sharedInformerFactory informers.SharedInformerFactory, deleter Deleter) *podWatcher {
+	podInformer := sharedInformerFactory.Core().V1().Pods().Informer()
+	err := podInformer.SetTransform(minimizePod)
+	if err != nil {
+		logger.Error("failed to minimize Pod objects", zap.Error(err))
+	}
+
 	return &podWatcher{
 		ipToPod:                      &sync.Map{},
 		podToWorkloadAndNamespace:    &sync.Map{},
 		workloadAndNamespaceToLabels: &sync.Map{},
 		workloadPodCount:             make(map[string]int),
 		logger:                       logger,
-		informer:                     informer,
+		informer:                     podInformer,
 		deleter:                      deleter,
 	}
 }
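The SetTransform call that used to live in getKubernetesResolver now sits next to the informer it configures. A transform is a cache.TransformFunc that rewrites each object before it enters the informer's store, which is how minimizePod (defined elsewhere in this repo; its body is not part of this diff) can cut memory use. A hypothetical transform in that style, for illustration only:

// Hypothetical sketch; the real minimizePod lives elsewhere in this repo
// and its body is not shown in this diff.
func exampleMinimizePod(obj interface{}) (interface{}, error) {
	pod, ok := obj.(*corev1.Pod)
	if !ok {
		// Pass through tombstones and unexpected types unchanged.
		return obj, nil
	}
	// Keep only the fields a watcher like this one reads.
	minimal := &corev1.Pod{}
	minimal.Name = pod.Name
	minimal.Namespace = pod.Namespace
	minimal.Labels = pod.Labels
	minimal.OwnerReferences = pod.OwnerReferences
	minimal.Status.PodIP = pod.Status.PodIP
	minimal.Status.HostIP = pod.Status.HostIP
	return minimal, nil
}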
File 5 of 5: serviceWatcher
@@ -9,6 +9,7 @@ import (
 	mapset "github.com/deckarep/golang-set/v2"
 	"go.uber.org/zap"
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/client-go/informers"
 	"k8s.io/client-go/tools/cache"
 )

@@ -20,12 +21,18 @@ type serviceWatcher struct {
 	deleter                        Deleter
 }
 
-func newServiceWatcher(logger *zap.Logger, informer cache.SharedIndexInformer, deleter Deleter) *serviceWatcher {
+func newServiceWatcher(logger *zap.Logger, sharedInformerFactory informers.SharedInformerFactory, deleter Deleter) *serviceWatcher {
+	serviceInformer := sharedInformerFactory.Core().V1().Services().Informer()
+	err := serviceInformer.SetTransform(minimizeService)
+	if err != nil {
+		logger.Error("failed to minimize Service objects", zap.Error(err))
+	}
+
 	return &serviceWatcher{
 		ipToServiceAndNamespace:        &sync.Map{},
 		serviceAndNamespaceToSelectors: &sync.Map{},
 		logger:                         logger,
-		informer:                       informer,
+		informer:                       serviceInformer,
 		deleter:                        deleter,
 	}
}
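Same pattern as the pod watcher: the constructor derives its informer from the factory and registers the minimizing transform up front. The ordering matters because client-go rejects SetTransform once an informer has started; here a failure is only logged, so the watcher still runs, just without minimized objects. A sketch of the new call shape (names from this diff; stop-channel handling simplified from the safeChannel used in the real code):

factory := informers.NewSharedInformerFactory(clientset, 0)
timedDeleter := &TimedDeleter{Delay: deletionDelay}
svcWatcher := newServiceWatcher(logger, factory, timedDeleter)

stopCh := make(chan struct{})
svcWatcher.Run(stopCh) // Run(chan struct{}) per the resolver changes above
// ... shut down later:
close(stopCh)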