diff --git a/plugins/processors/awsapplicationsignals/internal/resolver/endpointslicewatcher.go b/plugins/processors/awsapplicationsignals/internal/resolver/endpointslicewatcher.go index bf5fd6cb39..3841925ada 100644 --- a/plugins/processors/awsapplicationsignals/internal/resolver/endpointslicewatcher.go +++ b/plugins/processors/awsapplicationsignals/internal/resolver/endpointslicewatcher.go @@ -91,14 +91,15 @@ func (w *endpointSliceWatcher) waitForCacheSync(stopCh chan struct{}) { func (w *endpointSliceWatcher) extractEndpointSliceKeyValuePairs(slice *discv1.EndpointSlice) []kvPair { var pairs []kvPair + isFirstPod := true for _, endpoint := range slice.Endpoints { - podName := "" - ns := "" if endpoint.TargetRef != nil { - if endpoint.TargetRef.Kind == "Pod" { - podName = endpoint.TargetRef.Name + if endpoint.TargetRef.Kind != "Pod" { + continue } - ns = endpoint.TargetRef.Namespace + + podName := endpoint.TargetRef.Name + ns := endpoint.TargetRef.Namespace derivedWorkload := inferWorkloadName(podName) if derivedWorkload == "" { @@ -128,31 +129,21 @@ func (w *endpointSliceWatcher) extractEndpointSliceKeyValuePairs(slice *discv1.E } } } - } - } - - // Service reference: "kubernetes.io/service-name" - svcName := slice.Labels["kubernetes.io/service-name"] - if svcName != "" { - svcFull := svcName + "@" + slice.Namespace - - // If there's at least one endpoint, derive the workload from it - if len(slice.Endpoints) > 0 { - if endpoint := slice.Endpoints[0]; endpoint.TargetRef != nil { - derived := inferWorkloadName(endpoint.TargetRef.Name) - if derived == "" { - w.logger.Warn("failed to infer workload name from Pod name", zap.String("podName", endpoint.TargetRef.Name)) - } else { - firstWl := derived + "@" + endpoint.TargetRef.Namespace + // Build service name -> "workload@namespace" pair from the first pod + if isFirstPod { + svcName := slice.Labels["kubernetes.io/service-name"] + if svcName != "" { + isFirstPod = false pairs = append(pairs, kvPair{ - key: svcFull, - value: firstWl, + key: svcName + "@" + ns, + value: fullWl, isService: true, }) } } } + } return pairs @@ -169,7 +160,7 @@ func (w *endpointSliceWatcher) handleSliceAdd(obj interface{}) { pairs := w.extractEndpointSliceKeyValuePairs(newSlice) // Insert them into our ipToWorkload / serviceToWorkload, and track the keys. - var keys []string + keys := make([]string, 0, len(pairs)) for _, kv := range pairs { if kv.isService { w.serviceToWorkload.Store(kv.key, kv.value) diff --git a/plugins/processors/awsapplicationsignals/internal/resolver/kubernetes.go b/plugins/processors/awsapplicationsignals/internal/resolver/kubernetes.go index f97be9bc3d..45017a99a6 100644 --- a/plugins/processors/awsapplicationsignals/internal/resolver/kubernetes.go +++ b/plugins/processors/awsapplicationsignals/internal/resolver/kubernetes.go @@ -95,20 +95,10 @@ func getKubernetesResolver(platformCode, clusterName string, logger *zap.Logger) if useListPod { sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0) - podInformer := sharedInformerFactory.Core().V1().Pods().Informer() - err = podInformer.SetTransform(minimizePod) - if err != nil { - logger.Error("failed to minimize Pod objects", zap.Error(err)) - } - serviceInformer := sharedInformerFactory.Core().V1().Services().Informer() - err = serviceInformer.SetTransform(minimizeService) - if err != nil { - logger.Error("failed to minimize Service objects", zap.Error(err)) - } - timedDeleter := &TimedDeleter{Delay: deletionDelay} - poWatcher := newPodWatcher(logger, podInformer, timedDeleter) - svcWatcher := newServiceWatcher(logger, serviceInformer, timedDeleter) + + poWatcher := newPodWatcher(logger, sharedInformerFactory, timedDeleter) + svcWatcher := newServiceWatcher(logger, sharedInformerFactory, timedDeleter) safeStopCh := &safeChannel{ch: make(chan struct{}), closed: false} // initialize the pod and service watchers for the cluster @@ -140,16 +130,11 @@ func getKubernetesResolver(platformCode, clusterName string, logger *zap.Logger) } } else { sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0) - serviceInformer := sharedInformerFactory.Core().V1().Services().Informer() - err = serviceInformer.SetTransform(minimizeService) - if err != nil { - logger.Error("failed to minimize Service objects", zap.Error(err)) - } - timedDeleter := &TimedDeleter{Delay: deletionDelay} - svcWatcher := newServiceWatcher(logger, serviceInformer, timedDeleter) + svcWatcher := newServiceWatcher(logger, sharedInformerFactory, timedDeleter) endptSliceWatcher := newEndpointSliceWatcher(logger, sharedInformerFactory, timedDeleter) + safeStopCh := &safeChannel{ch: make(chan struct{}), closed: false} // initialize the pod and service watchers for the cluster svcWatcher.Run(safeStopCh.ch) diff --git a/plugins/processors/awsapplicationsignals/internal/resolver/kubernetes_test.go b/plugins/processors/awsapplicationsignals/internal/resolver/kubernetes_test.go index 404aa94b50..a0ce5b2225 100644 --- a/plugins/processors/awsapplicationsignals/internal/resolver/kubernetes_test.go +++ b/plugins/processors/awsapplicationsignals/internal/resolver/kubernetes_test.go @@ -98,7 +98,7 @@ func TestEksResolver(t *testing.T) { } }) - t.Run("Test Process", func(t *testing.T) { + t.Run("Test Process when useListPod is true", func(t *testing.T) { // helper function to get string values from the attributes getStrAttr := func(attributes pcommon.Map, key string, t *testing.T) string { if value, ok := attributes.Get(key); ok { @@ -159,6 +159,67 @@ func TestEksResolver(t *testing.T) { assert.NoError(t, err) assert.Equal(t, "192.168.1.2", getStrAttr(attributes, attr.AWSRemoteService, t)) }) + + t.Run("Test Process when useListPod is false", func(t *testing.T) { + // helper function to get string values from the attributes + getStrAttr := func(attributes pcommon.Map, key string, t *testing.T) string { + if value, ok := attributes.Get(key); ok { + return value.AsString() + } + t.Errorf("Failed to get value for key: %s", key) + return "" + } + + logger, _ := zap.NewProduction() + resolver := &kubernetesResolver{ + logger: logger, + clusterName: "test", + platformCode: config.PlatformEKS, + ipToWorkloadAndNamespace: &sync.Map{}, + ipToServiceAndNamespace: &sync.Map{}, + serviceToWorkload: &sync.Map{}, + useListPod: false, + } + + // Test case 1: "aws.remote.service" contains IP:Port + attributes := pcommon.NewMap() + attributes.PutStr(attr.AWSRemoteService, "192.0.2.1:8080") + resourceAttributes := pcommon.NewMap() + resolver.ipToWorkloadAndNamespace.Store("192.0.2.1:8080", "test-deployment@test-namespace") + err := resolver.Process(attributes, resourceAttributes) + assert.NoError(t, err) + assert.Equal(t, "test-deployment", getStrAttr(attributes, attr.AWSRemoteService, t)) + assert.Equal(t, "eks:test/test-namespace", getStrAttr(attributes, attr.AWSRemoteEnvironment, t)) + + // Test case 2: "aws.remote.service" contains only IP + attributes = pcommon.NewMap() + attributes.PutStr(attr.AWSRemoteService, "192.0.2.2") + resourceAttributes = pcommon.NewMap() + resolver.ipToWorkloadAndNamespace.Store("192.0.2.2", "test-deployment-2@test-namespace-2") + err = resolver.Process(attributes, resourceAttributes) + assert.NoError(t, err) + assert.Equal(t, "test-deployment-2", getStrAttr(attributes, attr.AWSRemoteService, t)) + assert.Equal(t, "eks:test/test-namespace-2", getStrAttr(attributes, attr.AWSRemoteEnvironment, t)) + + // Test case 3: "aws.remote.service" contains non-ip string + attributes = pcommon.NewMap() + attributes.PutStr(attr.AWSRemoteService, "not-an-ip") + resourceAttributes = pcommon.NewMap() + err = resolver.Process(attributes, resourceAttributes) + assert.NoError(t, err) + assert.Equal(t, "not-an-ip", getStrAttr(attributes, attr.AWSRemoteService, t)) + + // Test case 4: Process with cluster ip + attributes = pcommon.NewMap() + attributes.PutStr(attr.AWSRemoteService, "192.168.1.2") + resourceAttributes = pcommon.NewMap() + resolver.ipToServiceAndNamespace.Store("192.168.1.2", "service1@test-namespace-3") + resolver.serviceToWorkload.Store("service1@test-namespace-3", "service1-deployment@test-namespace-3") + err = resolver.Process(attributes, resourceAttributes) + assert.NoError(t, err) + assert.Equal(t, "service1-deployment", getStrAttr(attributes, attr.AWSRemoteService, t)) + assert.Equal(t, "eks:test/test-namespace-3", getStrAttr(attributes, attr.AWSRemoteEnvironment, t)) + }) } func TestK8sResourceAttributesResolverOnEKS(t *testing.T) { @@ -168,10 +229,9 @@ func TestK8sResourceAttributesResolverOnEKS(t *testing.T) { getStrAttr := func(attributes pcommon.Map, key string, t *testing.T) string { if value, ok := attributes.Get(key); ok { return value.AsString() - } else { - t.Errorf("Failed to get value for key: %s", key) - return "" } + t.Errorf("Failed to get value for key: %s", key) + return "" } resolver := newKubernetesResourceAttributesResolver(config.PlatformEKS, "test-cluster") diff --git a/plugins/processors/awsapplicationsignals/internal/resolver/podwatcher.go b/plugins/processors/awsapplicationsignals/internal/resolver/podwatcher.go index 97b5d0a978..1187f1f8bb 100644 --- a/plugins/processors/awsapplicationsignals/internal/resolver/podwatcher.go +++ b/plugins/processors/awsapplicationsignals/internal/resolver/podwatcher.go @@ -9,6 +9,7 @@ import ( mapset "github.com/deckarep/golang-set/v2" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache" ) @@ -112,14 +113,20 @@ type podWatcher struct { deleter Deleter } -func newPodWatcher(logger *zap.Logger, informer cache.SharedIndexInformer, deleter Deleter) *podWatcher { +func newPodWatcher(logger *zap.Logger, sharedInformerFactory informers.SharedInformerFactory, deleter Deleter) *podWatcher { + podInformer := sharedInformerFactory.Core().V1().Pods().Informer() + err := podInformer.SetTransform(minimizePod) + if err != nil { + logger.Error("failed to minimize Pod objects", zap.Error(err)) + } + return &podWatcher{ ipToPod: &sync.Map{}, podToWorkloadAndNamespace: &sync.Map{}, workloadAndNamespaceToLabels: &sync.Map{}, workloadPodCount: make(map[string]int), logger: logger, - informer: informer, + informer: podInformer, deleter: deleter, } } diff --git a/plugins/processors/awsapplicationsignals/internal/resolver/servicewatcher.go b/plugins/processors/awsapplicationsignals/internal/resolver/servicewatcher.go index da84309821..54207119cc 100644 --- a/plugins/processors/awsapplicationsignals/internal/resolver/servicewatcher.go +++ b/plugins/processors/awsapplicationsignals/internal/resolver/servicewatcher.go @@ -9,6 +9,7 @@ import ( mapset "github.com/deckarep/golang-set/v2" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache" ) @@ -20,12 +21,18 @@ type serviceWatcher struct { deleter Deleter } -func newServiceWatcher(logger *zap.Logger, informer cache.SharedIndexInformer, deleter Deleter) *serviceWatcher { +func newServiceWatcher(logger *zap.Logger, sharedInformerFactory informers.SharedInformerFactory, deleter Deleter) *serviceWatcher { + serviceInformer := sharedInformerFactory.Core().V1().Services().Informer() + err := serviceInformer.SetTransform(minimizeService) + if err != nil { + logger.Error("failed to minimize Service objects", zap.Error(err)) + } + return &serviceWatcher{ ipToServiceAndNamespace: &sync.Map{}, serviceAndNamespaceToSelectors: &sync.Map{}, logger: logger, - informer: informer, + informer: serviceInformer, deleter: deleter, } }