diff --git a/plugins/processors/awsapplicationsignals/internal/resolver/endpointslicewatcher.go b/plugins/processors/awsapplicationsignals/internal/resolver/endpointslicewatcher.go
new file mode 100644
index 0000000000..00fc80b70b
--- /dev/null
+++ b/plugins/processors/awsapplicationsignals/internal/resolver/endpointslicewatcher.go
@@ -0,0 +1,297 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: MIT
+
+package resolver
+
+import (
+	"fmt"
+	"sync"
+
+	"go.uber.org/zap"
+	discv1 "k8s.io/api/discovery/v1"
+	"k8s.io/client-go/informers"
+	"k8s.io/client-go/tools/cache"
+)
+
+// endpointSliceWatcher watches EndpointSlices and builds:
+// 1. ip/ip:port -> "workload@namespace"
+// 2. service@namespace -> "workload@namespace"
+type endpointSliceWatcher struct {
+	logger            *zap.Logger
+	informer          cache.SharedIndexInformer
+	ipToWorkload      *sync.Map // key: "ip" or "ip:port", val: "workload@ns"
+	serviceToWorkload *sync.Map // key: "service@namespace", val: "workload@ns"
+
+	// For bookkeeping, so we can remove old mappings upon EndpointSlice deletion
+	sliceToKeysMap sync.Map // map[sliceUID string] -> []string of keys we inserted, which are "ip", "ip:port", or "service@namespace"
+	deleter        Deleter
+}
+
+// kvPair holds one mapping from key -> value. The isService flag
+// indicates whether this key is for a Service or for an IP/IP:port.
+type kvPair struct {
+	key       string // key: "ip" or "ip:port" or "service@namespace"
+	value     string // value: "workload@namespace"
+	isService bool   // true if key = "service@namespace"
+}
+
+// newEndpointSliceWatcher creates an EndpointSlice watcher for the new approach (when USE_LIST_POD=false).
+func newEndpointSliceWatcher(
+	logger *zap.Logger,
+	factory informers.SharedInformerFactory,
+	deleter Deleter,
+) *endpointSliceWatcher {
+
+	esInformer := factory.Discovery().V1().EndpointSlices().Informer()
+	err := esInformer.SetTransform(minimizeEndpointSlice)
+	if err != nil {
+		logger.Error("failed to minimize EndpointSlice objects", zap.Error(err))
+	}
+
+	return &endpointSliceWatcher{
+		logger:            logger,
+		informer:          esInformer,
+		ipToWorkload:      &sync.Map{},
+		serviceToWorkload: &sync.Map{},
+		deleter:           deleter,
+	}
+}
+
+// Run starts the endpointSliceWatcher.
+func (w *endpointSliceWatcher) Run(stopCh chan struct{}) {
+	w.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: func(obj interface{}) {
+			w.handleSliceAdd(obj)
+		},
+		UpdateFunc: func(oldObj, newObj interface{}) {
+			w.handleSliceUpdate(oldObj, newObj)
+		},
+		DeleteFunc: func(obj interface{}) {
+			w.handleSliceDelete(obj)
+		},
+	})
+	go w.informer.Run(stopCh)
+}
+
+func (w *endpointSliceWatcher) waitForCacheSync(stopCh chan struct{}) {
+	if !cache.WaitForNamedCacheSync("endpointSliceWatcher", stopCh, w.informer.HasSynced) {
+		w.logger.Fatal("timed out waiting for endpointSliceWatcher cache to sync")
+	}
+	w.logger.Info("endpointSliceWatcher: Cache synced")
+}
+
+// extractEndpointSliceKeyValuePairs computes the relevant mappings from an EndpointSlice.
+//
+// It returns a list of kvPair:
+//   - All IP and IP:port keys (isService=false) -> "workload@ns"
+//   - The Service name key (isService=true) -> first "workload@ns" found
+//
+// This function does NOT modify ipToWorkload or serviceToWorkload. It's purely for computing
+// the pairs, so it can be reused by both add and update methods.
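+//
+// Illustrative example (editor's note; hypothetical values mirroring the unit tests): for an
+// EndpointSlice in namespace "testns" labeled kubernetes.io/service-name=mysvc, with one endpoint
+// whose TargetRef is the Pod "workload-76977669dc-lwx64", address "1.2.3.4", and a single port 80,
+// the returned pairs would be:
+//
+//	{key: "1.2.3.4",      value: "workload@testns", isService: false}
+//	{key: "1.2.3.4:80",   value: "workload@testns", isService: false}
+//	{key: "mysvc@testns", value: "workload@testns", isService: true}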
+func (w *endpointSliceWatcher) extractEndpointSliceKeyValuePairs(slice *discv1.EndpointSlice) []kvPair { + var pairs []kvPair + + for _, endpoint := range slice.Endpoints { + podName := "" + ns := "" + if endpoint.TargetRef != nil { + if endpoint.TargetRef.Kind == "Pod" { + podName = endpoint.TargetRef.Name + } + ns = endpoint.TargetRef.Namespace + + derivedWorkload := inferWorkloadName(podName) + if derivedWorkload == "" { + w.logger.Warn("failed to infer workload name from Pod name", zap.String("podName", podName)) + continue + } + fullWl := derivedWorkload + "@" + ns + + // Build IP and IP:port pairs + for _, addr := range endpoint.Addresses { + // "ip" -> "workload@namespace" + pairs = append(pairs, kvPair{ + key: addr, + value: fullWl, + isService: false, + }) + + // "ip:port" -> "workload@namespace" for each port + for _, portDef := range slice.Ports { + if portDef.Port != nil { + ipPort := fmt.Sprintf("%s:%d", addr, *portDef.Port) + pairs = append(pairs, kvPair{ + key: ipPort, + value: fullWl, + isService: false, + }) + } + } + } + } + + } + + // Service reference: "kubernetes.io/service-name" + svcName := slice.Labels["kubernetes.io/service-name"] + if svcName != "" { + svcFull := svcName + "@" + slice.Namespace + + // If there's at least one endpoint, derive the workload from it + if len(slice.Endpoints) > 0 { + if endpoint := slice.Endpoints[0]; endpoint.TargetRef != nil { + derived := inferWorkloadName(endpoint.TargetRef.Name) + if derived != "" { + firstWl := derived + "@" + endpoint.TargetRef.Namespace + pairs = append(pairs, kvPair{ + key: svcFull, + value: firstWl, + isService: true, + }) + } + } + } + } + + return pairs +} + +// handleSliceAdd handles a new EndpointSlice that wasn't seen before. +// It computes all keys and directly stores them. Then it records those keys +// in sliceToKeysMap so that we can remove them later upon deletion. +func (w *endpointSliceWatcher) handleSliceAdd(obj interface{}) { + newSlice := obj.(*discv1.EndpointSlice) + sliceUID := string(newSlice.UID) + + // Compute all key-value pairs for this new slice + pairs := w.extractEndpointSliceKeyValuePairs(newSlice) + + // Insert them into our ipToWorkload / serviceToWorkload, and track the keys. + var keys []string + for _, kv := range pairs { + if kv.isService { + w.serviceToWorkload.Store(kv.key, kv.value) + } else { + w.ipToWorkload.Store(kv.key, kv.value) + } + keys = append(keys, kv.key) + } + + // Save these keys so we can remove them on delete + w.sliceToKeysMap.Store(sliceUID, keys) +} + +// handleSliceUpdate handles an update from oldSlice -> newSlice. +// Instead of blindly removing all old keys and adding new ones, it diffs them: +// - remove only keys that no longer exist, +// - add only new keys that didn't exist before, +// - keep those that haven't changed. +func (w *endpointSliceWatcher) handleSliceUpdate(oldObj, newObj interface{}) { + oldSlice := oldObj.(*discv1.EndpointSlice) + newSlice := newObj.(*discv1.EndpointSlice) + + oldUID := string(oldSlice.UID) + newUID := string(newSlice.UID) + + // 1) Fetch old keys from sliceToKeysMap (if present). + var oldKeys []string + if val, ok := w.sliceToKeysMap.Load(oldUID); ok { + oldKeys = val.([]string) + } + + // 2) Compute fresh pairs (and thus keys) from the new slice. 
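+	// Worked example (editor's note; hypothetical values mirroring the unit tests): if the old
+	// slice produced the keys {"1.2.3.4", "1.2.3.4:80", "mysvc@testns"} and the new slice produces
+	// {"1.2.3.5", "1.2.3.5:443", "othersvc@testns"}, steps 3 and 4 below delete the first set
+	// (with delay) and store the second; any key present in both sets is left untouched.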
+ newPairs := w.extractEndpointSliceKeyValuePairs(newSlice) + var newKeys []string + for _, kv := range newPairs { + newKeys = append(newKeys, kv.key) + } + + // Convert oldKeys/newKeys to sets for easy diff + oldKeysSet := make(map[string]struct{}, len(oldKeys)) + for _, k := range oldKeys { + oldKeysSet[k] = struct{}{} + } + newKeysSet := make(map[string]struct{}, len(newKeys)) + for _, k := range newKeys { + newKeysSet[k] = struct{}{} + } + + // 3) For each key in oldKeys that doesn't exist in newKeys, remove it + for k := range oldKeysSet { + if _, stillPresent := newKeysSet[k]; !stillPresent { + w.deleter.DeleteWithDelay(w.ipToWorkload, k) + w.deleter.DeleteWithDelay(w.serviceToWorkload, k) + } + } + + // 4) For each key in newKeys that wasn't in oldKeys, we need to store it + // in the appropriate sync.Map. We'll look up the value from newPairs. + for _, kv := range newPairs { + if _, alreadyHad := oldKeysSet[kv.key]; !alreadyHad { + if kv.isService { + w.serviceToWorkload.Store(kv.key, kv.value) + } else { + w.ipToWorkload.Store(kv.key, kv.value) + } + } + } + + // 5) Update sliceToKeysMap for the new slice UID + // (Often the UID doesn't change across updates, but we'll handle it properly.) + w.sliceToKeysMap.Delete(oldUID) + w.sliceToKeysMap.Store(newUID, newKeys) +} + +// handleSliceDelete removes any IP->workload or service->workload keys that were created by this slice. +func (w *endpointSliceWatcher) handleSliceDelete(obj interface{}) { + slice := obj.(*discv1.EndpointSlice) + w.removeSliceKeys(slice) +} + +func (w *endpointSliceWatcher) removeSliceKeys(slice *discv1.EndpointSlice) { + sliceUID := string(slice.UID) + val, ok := w.sliceToKeysMap.Load(sliceUID) + if !ok { + return + } + + keys := val.([]string) + for _, k := range keys { + w.deleter.DeleteWithDelay(w.ipToWorkload, k) + w.deleter.DeleteWithDelay(w.serviceToWorkload, k) + } + w.sliceToKeysMap.Delete(sliceUID) +} + +// minimizeEndpointSlice removes fields that are not required by our mapping logic, +// retaining only the minimal set of fields needed (ObjectMeta.Name, Namespace, UID, Labels, +// Endpoints (with their Addresses and TargetRef) and Ports). +func minimizeEndpointSlice(obj interface{}) (interface{}, error) { + eps, ok := obj.(*discv1.EndpointSlice) + if !ok { + return obj, fmt.Errorf("object is not an EndpointSlice") + } + + // Minimize metadata: we only really need Name, Namespace, UID and Labels. + eps.Annotations = nil + eps.ManagedFields = nil + eps.Finalizers = nil + + // The watcher only uses: + // - eps.Labels["kubernetes.io/service-name"] + // - eps.Namespace (from metadata) + // - eps.UID (from metadata) + // - eps.Endpoints: for each endpoint, its Addresses and TargetRef. + // - eps.Ports: each port's Port (and optionally Name/Protocol) + // + // For each endpoint, clear fields that we don’t use. + for i := range eps.Endpoints { + // We only need Addresses and TargetRef. Hostname, NodeName, and Zone are not used. + eps.Endpoints[i].Hostname = nil + eps.Endpoints[i].NodeName = nil + eps.Endpoints[i].Zone = nil + } + + // No transformation is needed for eps.Ports because we use them directly. 
+ return eps, nil +} diff --git a/plugins/processors/awsapplicationsignals/internal/resolver/endpointslicewatcher_test.go b/plugins/processors/awsapplicationsignals/internal/resolver/endpointslicewatcher_test.go new file mode 100644 index 0000000000..57bb9dc583 --- /dev/null +++ b/plugins/processors/awsapplicationsignals/internal/resolver/endpointslicewatcher_test.go @@ -0,0 +1,296 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package resolver + +import ( + "fmt" + "reflect" + "sort" + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + v1 "k8s.io/api/core/v1" + discv1 "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" +) + +func newEndpointSliceWatcherForTest() *endpointSliceWatcher { + return &endpointSliceWatcher{ + logger: zap.NewNop(), + ipToWorkload: &sync.Map{}, + serviceToWorkload: &sync.Map{}, + deleter: mockDeleter, + } +} + +// createTestEndpointSlice is a helper to build a minimal EndpointSlice. +// The slice will have one Endpoint (with its TargetRef) and a list of Ports. +// svcName is stored in the Labels (key "kubernetes.io/service-name") if non-empty. +func createTestEndpointSlice(uid, namespace, svcName, podName string, addresses []string, portNumbers []int32) *discv1.EndpointSlice { + // Build the port list. + var ports []discv1.EndpointPort + for i, p := range portNumbers { + portVal := p // need a pointer + name := fmt.Sprintf("port-%d", i) + protocol := v1.ProtocolTCP + ports = append(ports, discv1.EndpointPort{ + Name: &name, + Protocol: &protocol, + Port: &portVal, + }) + } + + // Build a single endpoint with the given addresses and a TargetRef. + endpoint := discv1.Endpoint{ + Addresses: addresses, + TargetRef: &v1.ObjectReference{ + Kind: "Pod", + Name: podName, + Namespace: namespace, + }, + } + + labels := map[string]string{} + if svcName != "" { + labels["kubernetes.io/service-name"] = svcName + } + + return &discv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + UID: types.UID(uid), + Namespace: namespace, + Labels: labels, + }, + Endpoints: []discv1.Endpoint{endpoint}, + Ports: ports, + } +} + +// --- Tests --- + +// TestEndpointSliceAddition verifies that when a new EndpointSlice is added, +// the appropriate keys are inserted into the maps. +func TestEndpointSliceAddition(t *testing.T) { + watcher := newEndpointSliceWatcherForTest() + + // Create a test EndpointSlice: + // - UID: "uid-1", Namespace: "testns" + // - Labels: "kubernetes.io/service-name" = "mysvc" + // - One Endpoint with TargetRef.Kind "Pod", Name "workload-69dww", Namespace "testns" + // - Endpoint.Addresses: ["1.2.3.4"] + // - One Port with value 80. + slice := createTestEndpointSlice("uid-1", "testns", "mysvc", "workload-69dww", []string{"1.2.3.4"}, []int32{80}) + + // Call the add handler. + watcher.handleSliceAdd(slice) + + // The dummy inferWorkloadName returns "workload", so full workload becomes "workload@testns" + expectedVal := "workload@testns" + + // We expect the following keys: + // - For the endpoint: "1.2.3.4" and "1.2.3.4:80" + // - From the service label: "mysvc@testns" + var expectedIPKeys = []string{"1.2.3.4", "1.2.3.4:80"} + var expectedSvcKeys = []string{"mysvc@testns"} + + // Verify ipToWorkload. 
+ for _, key := range expectedIPKeys { + val, ok := watcher.ipToWorkload.Load(key) + assert.True(t, ok, "expected ipToWorkload key %s", key) + assert.Equal(t, expectedVal, val, "ipToWorkload[%s] mismatch", key) + } + + // Verify serviceToWorkload. + for _, key := range expectedSvcKeys { + val, ok := watcher.serviceToWorkload.Load(key) + assert.True(t, ok, "expected serviceToWorkload key %s", key) + assert.Equal(t, expectedVal, val, "serviceToWorkload[%s] mismatch", key) + } + + // Verify that sliceToKeysMap recorded all keys. + val, ok := watcher.sliceToKeysMap.Load(string(slice.UID)) + assert.True(t, ok, "expected sliceToKeysMap to contain UID %s", slice.UID) + keysIface := val.([]string) + // Sort for comparison. + sort.Strings(keysIface) + allExpected := append(expectedIPKeys, expectedSvcKeys...) + sort.Strings(allExpected) + assert.Equal(t, allExpected, keysIface, "sliceToKeysMap keys mismatch") +} + +// TestEndpointSliceDeletion verifies that when an EndpointSlice is deleted, +// all keys that were added are removed. +func TestEndpointSliceDeletion(t *testing.T) { + watcher := newEndpointSliceWatcherForTest() + + // Create a test EndpointSlice (same as addition test). + slice := createTestEndpointSlice("uid-1", "testns", "mysvc", "workload-76977669dc-lwx64", []string{"1.2.3.4"}, []int32{80}) + watcher.handleSliceAdd(slice) + + // Now call deletion. + watcher.handleSliceDelete(slice) + + // Verify that the keys are removed from ipToWorkload. + removedKeys := []string{"1.2.3.4", "1.2.3.4:80", "mysvc@testns"} + for _, key := range removedKeys { + _, ok := watcher.ipToWorkload.Load(key) + _, okSvc := watcher.serviceToWorkload.Load(key) + assert.False(t, ok, "expected ipToWorkload key %s to be deleted", key) + assert.False(t, okSvc, "expected serviceToWorkload key %s to be deleted", key) + } + + // Also verify that sliceToKeysMap no longer contains an entry. + _, ok := watcher.sliceToKeysMap.Load(string(slice.UID)) + assert.False(t, ok, "expected sliceToKeysMap entry for UID %s to be deleted", slice.UID) +} + +// TestEndpointSliceUpdate verifies that on updates, keys are added and/or removed as appropriate. +func TestEndpointSliceUpdate(t *testing.T) { + // --- Subtest: Complete change (no overlap) --- + t.Run("complete change", func(t *testing.T) { + watcher := newEndpointSliceWatcherForTest() + + // Old slice: + // UID "uid-2", Namespace "testns", svc label "mysvc", + // One endpoint with TargetRef Name "workload-75d9d5968d-fx8px", Addresses ["1.2.3.4"], Port 80. + oldSlice := createTestEndpointSlice("uid-2", "testns", "mysvc", "workload-75d9d5968d-fx8px", []string{"1.2.3.4"}, []int32{80}) + watcher.handleSliceAdd(oldSlice) + + // New slice: same UID, but svc label changed to "othersvc" + // and a different endpoint: TargetRef Name "workload-6d9b7f8597-wbvxn", Addresses ["1.2.3.5"], Port 443. + newSlice := createTestEndpointSlice("uid-2", "testns", "othersvc", "workload-6d9b7f8597-wbvxn", []string{"1.2.3.5"}, []int32{443}) + + // Call update handler. 
+ watcher.handleSliceUpdate(oldSlice, newSlice) + + expectedVal := "workload@testns" + + // Old keys that should be removed: + // "1.2.3.4" and "1.2.3.4:80" and service key "mysvc@testns" + removedKeys := []string{"1.2.3.4", "1.2.3.4:80", "mysvc@testns"} + for _, key := range removedKeys { + _, ok := watcher.ipToWorkload.Load(key) + _, okSvc := watcher.serviceToWorkload.Load(key) + assert.False(t, ok, "expected ipToWorkload key %s to be removed", key) + assert.False(t, okSvc, "expected serviceToWorkload key %s to be removed", key) + } + + // New keys that should be added: + // "1.2.3.5", "1.2.3.5:443", and service key "othersvc@testns" + addedKeys := []string{"1.2.3.5", "1.2.3.5:443", "othersvc@testns"} + for _, key := range addedKeys { + var val interface{} + var ok bool + // For service key, check serviceToWorkload; for others, check ipToWorkload. + if key == "othersvc@testns" { + val, ok = watcher.serviceToWorkload.Load(key) + } else { + val, ok = watcher.ipToWorkload.Load(key) + } + assert.True(t, ok, "expected key %s to be added", key) + assert.Equal(t, expectedVal, val, "value for key %s mismatch", key) + } + + // Check that sliceToKeysMap now contains exactly the new keys. + val, ok := watcher.sliceToKeysMap.Load(string(newSlice.UID)) + assert.True(t, ok, "expected sliceToKeysMap entry for UID %s", newSlice.UID) + gotKeys := val.([]string) + sort.Strings(gotKeys) + expectedKeys := []string{"1.2.3.5", "1.2.3.5:443", "othersvc@testns"} + sort.Strings(expectedKeys) + assert.True(t, reflect.DeepEqual(expectedKeys, gotKeys), "sliceToKeysMap keys mismatch, got: %v, want: %v", gotKeys, expectedKeys) + }) + + // --- Subtest: Partial overlap --- + t.Run("partial overlap", func(t *testing.T) { + watcher := newEndpointSliceWatcherForTest() + + // Old slice: UID "uid-3", Namespace "testns", svc label "mysvc", + // with one endpoint: TargetRef "workload-6d9b7f8597-b5l2j", Addresses ["1.2.3.4"], Port 80. + oldSlice := createTestEndpointSlice("uid-3", "testns", "mysvc", "workload-6d9b7f8597-b5l2j", []string{"1.2.3.4"}, []int32{80}) + watcher.handleSliceAdd(oldSlice) + + // New slice: same UID, same svc label ("mysvc") but now two endpoints. + // First endpoint: same as before: Addresses ["1.2.3.4"], Port 80. + // Second endpoint: Addresses ["1.2.3.5"], Port 80. + // (Since svc label remains, the service key "mysvc@testns" remains the same.) + // We expect the new keys to be the union of: + // From first endpoint: "1.2.3.4", "1.2.3.4:80" + // From second endpoint: "1.2.3.5", "1.2.3.5:80" + // And the service key "mysvc@testns". + name := "port-0" + protocol := v1.ProtocolTCP + newSlice := &discv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + UID: "uid-3", // same UID + Namespace: "testns", + Labels: map[string]string{ + "kubernetes.io/service-name": "mysvc", + }, + }, + // Two endpoints. + Endpoints: []discv1.Endpoint{ + { + Addresses: []string{"1.2.3.4"}, + TargetRef: &v1.ObjectReference{ + Kind: "Pod", + Name: "workload-6d9b7f8597-b5l2j", + Namespace: "testns", + }, + }, + { + Addresses: []string{"1.2.3.5"}, + TargetRef: &v1.ObjectReference{ + Kind: "Pod", + Name: "workload-6d9b7f8597-fx8px", + Namespace: "testns", + }, + }, + }, + // Single port: 80. + Ports: []discv1.EndpointPort{ + { + Name: &name, + Protocol: &protocol, + Port: func() *int32 { p := int32(80); return &p }(), + }, + }, + } + + // Call update handler. 
+ watcher.handleSliceUpdate(oldSlice, newSlice) + + expectedVal := "workload@testns" + // Expected keys now: + // From endpoint 1: "1.2.3.4", "1.2.3.4:80" + // From endpoint 2: "1.2.3.5", "1.2.3.5:80" + // And service key: "mysvc@testns" + expectedKeysIP := []string{"1.2.3.4", "1.2.3.4:80", "1.2.3.5", "1.2.3.5:80"} + expectedKeysSvc := []string{"mysvc@testns"} + + // Verify that all expected keys are present. + for _, key := range expectedKeysIP { + val, ok := watcher.ipToWorkload.Load(key) + assert.True(t, ok, "expected ipToWorkload key %s", key) + assert.Equal(t, expectedVal, val, "ipToWorkload[%s] mismatch", key) + } + for _, key := range expectedKeysSvc { + val, ok := watcher.serviceToWorkload.Load(key) + assert.True(t, ok, "expected serviceToWorkload key %s", key) + assert.Equal(t, expectedVal, val, "serviceToWorkload[%s] mismatch", key) + } + + // And check that sliceToKeysMap contains the union of the keys. + val, ok := watcher.sliceToKeysMap.Load("uid-3") + assert.True(t, ok, "expected sliceToKeysMap to contain uid-3") + gotKeys := val.([]string) + allExpected := append(expectedKeysIP, expectedKeysSvc...) + sort.Strings(gotKeys) + sort.Strings(allExpected) + assert.True(t, reflect.DeepEqual(allExpected, gotKeys), "sliceToKeysMap keys mismatch, got: %v, want: %v", gotKeys, allExpected) + }) +} diff --git a/plugins/processors/awsapplicationsignals/internal/resolver/kubernetes.go b/plugins/processors/awsapplicationsignals/internal/resolver/kubernetes.go index 262c61cf64..f97be9bc3d 100644 --- a/plugins/processors/awsapplicationsignals/internal/resolver/kubernetes.go +++ b/plugins/processors/awsapplicationsignals/internal/resolver/kubernetes.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "math/rand" + "os" "sync" "time" @@ -39,18 +40,30 @@ const ( ) type kubernetesResolver struct { - logger *zap.Logger - clientset kubernetes.Interface - clusterName string - platformCode string - ipToPod *sync.Map - podToWorkloadAndNamespace *sync.Map + logger *zap.Logger + clientset kubernetes.Interface + clusterName string + platformCode string + // if ListPod api is used, the following maps are needed + ipToPod *sync.Map + podToWorkloadAndNamespace *sync.Map + workloadAndNamespaceToLabels *sync.Map + workloadPodCount map[string]int + + // if ListEndpointSlice api is used, the following maps are needed + ipToWorkloadAndNamespace *sync.Map + + // if ListService api is used, the following maps are needed ipToServiceAndNamespace *sync.Map serviceAndNamespaceToSelectors *sync.Map - workloadAndNamespaceToLabels *sync.Map - serviceToWorkload *sync.Map // computed from serviceAndNamespaceToSelectors and workloadAndNamespaceToLabels every 1 min - workloadPodCount map[string]int - safeStopCh *safeChannel // trace and metric processors share the same kubernetesResolver and might close the same channel separately + + // if ListPod and ListService apis are used, the serviceToWorkload map is computed by ServiceToWorkloadMapper + // from serviceAndNamespaceToSelectors and workloadAndNamespaceToLabels every 1 min + // if ListEndpointSlice is used, we can get serviceToWorkload directly from endpointSlice watcher + serviceToWorkload *sync.Map // + + safeStopCh *safeChannel // trace and metric processors share the same kubernetesResolver and might close the same channel separately + useListPod bool } var ( @@ -78,47 +91,89 @@ func getKubernetesResolver(platformCode, clusterName string, logger *zap.Logger) // jitter calls to the kubernetes api jitterSleep(jitterKubernetesAPISeconds) - sharedInformerFactory := 
informers.NewSharedInformerFactory(clientset, 0) - podInformer := sharedInformerFactory.Core().V1().Pods().Informer() - err = podInformer.SetTransform(minimizePod) - if err != nil { - logger.Error("failed to minimize Pod objects", zap.Error(err)) - } - serviceInformer := sharedInformerFactory.Core().V1().Services().Informer() - err = serviceInformer.SetTransform(minimizeService) - if err != nil { - logger.Error("failed to minimize Service objects", zap.Error(err)) - } + useListPod := (os.Getenv("USE_LIST_POD") == "true") + + if useListPod { + sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0) + podInformer := sharedInformerFactory.Core().V1().Pods().Informer() + err = podInformer.SetTransform(minimizePod) + if err != nil { + logger.Error("failed to minimize Pod objects", zap.Error(err)) + } + serviceInformer := sharedInformerFactory.Core().V1().Services().Informer() + err = serviceInformer.SetTransform(minimizeService) + if err != nil { + logger.Error("failed to minimize Service objects", zap.Error(err)) + } + + timedDeleter := &TimedDeleter{Delay: deletionDelay} + poWatcher := newPodWatcher(logger, podInformer, timedDeleter) + svcWatcher := newServiceWatcher(logger, serviceInformer, timedDeleter) + + safeStopCh := &safeChannel{ch: make(chan struct{}), closed: false} + // initialize the pod and service watchers for the cluster + poWatcher.run(safeStopCh.ch) + svcWatcher.Run(safeStopCh.ch) + // wait for caches to sync (for once) so that clients knows about the pods and services in the cluster + poWatcher.waitForCacheSync(safeStopCh.ch) + svcWatcher.waitForCacheSync(safeStopCh.ch) + + serviceToWorkload := &sync.Map{} + svcToWorkloadMapper := newServiceToWorkloadMapper(svcWatcher.serviceAndNamespaceToSelectors, poWatcher.workloadAndNamespaceToLabels, serviceToWorkload, logger, timedDeleter) + svcToWorkloadMapper.Start(safeStopCh.ch) + + instance = &kubernetesResolver{ + logger: logger, + clientset: clientset, + clusterName: clusterName, + platformCode: platformCode, + ipToServiceAndNamespace: svcWatcher.ipToServiceAndNamespace, + serviceAndNamespaceToSelectors: svcWatcher.serviceAndNamespaceToSelectors, + ipToPod: poWatcher.ipToPod, + podToWorkloadAndNamespace: poWatcher.podToWorkloadAndNamespace, + workloadAndNamespaceToLabels: poWatcher.workloadAndNamespaceToLabels, + serviceToWorkload: serviceToWorkload, + workloadPodCount: poWatcher.workloadPodCount, + ipToWorkloadAndNamespace: nil, + safeStopCh: safeStopCh, + useListPod: useListPod, + } + } else { + sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0) + serviceInformer := sharedInformerFactory.Core().V1().Services().Informer() + err = serviceInformer.SetTransform(minimizeService) + if err != nil { + logger.Error("failed to minimize Service objects", zap.Error(err)) + } + + timedDeleter := &TimedDeleter{Delay: deletionDelay} + svcWatcher := newServiceWatcher(logger, serviceInformer, timedDeleter) + + endptSliceWatcher := newEndpointSliceWatcher(logger, sharedInformerFactory, timedDeleter) + safeStopCh := &safeChannel{ch: make(chan struct{}), closed: false} + // initialize the pod and service watchers for the cluster + svcWatcher.Run(safeStopCh.ch) + endptSliceWatcher.Run(safeStopCh.ch) + // wait for caches to sync (for once) so that clients knows about the pods and services in the cluster + svcWatcher.waitForCacheSync(safeStopCh.ch) + endptSliceWatcher.waitForCacheSync(safeStopCh.ch) + + instance = &kubernetesResolver{ + logger: logger, + clientset: clientset, + clusterName: clusterName, + 
platformCode: platformCode, + ipToWorkloadAndNamespace: endptSliceWatcher.ipToWorkload, // endpointSlice provides pod IP → workload mapping + ipToPod: nil, + podToWorkloadAndNamespace: nil, + workloadAndNamespaceToLabels: nil, + workloadPodCount: nil, + ipToServiceAndNamespace: svcWatcher.ipToServiceAndNamespace, + serviceToWorkload: endptSliceWatcher.serviceToWorkload, // endpointSlice also provides service → workload mapping + safeStopCh: safeStopCh, + useListPod: useListPod, + } - timedDeleter := &TimedDeleter{Delay: deletionDelay} - poWatcher := newPodWatcher(logger, podInformer, timedDeleter) - svcWatcher := newServiceWatcher(logger, serviceInformer, timedDeleter) - - safeStopCh := &safeChannel{ch: make(chan struct{}), closed: false} - // initialize the pod and service watchers for the cluster - poWatcher.run(safeStopCh.ch) - svcWatcher.Run(safeStopCh.ch) - // wait for caches to sync (for once) so that clients knows about the pods and services in the cluster - poWatcher.waitForCacheSync(safeStopCh.ch) - svcWatcher.waitForCacheSync(safeStopCh.ch) - - serviceToWorkload := &sync.Map{} - svcToWorkloadMapper := newServiceToWorkloadMapper(svcWatcher.serviceAndNamespaceToSelectors, poWatcher.workloadAndNamespaceToLabels, serviceToWorkload, logger, timedDeleter) - svcToWorkloadMapper.Start(safeStopCh.ch) - - instance = &kubernetesResolver{ - logger: logger, - clientset: clientset, - clusterName: clusterName, - platformCode: platformCode, - ipToServiceAndNamespace: svcWatcher.ipToServiceAndNamespace, - serviceAndNamespaceToSelectors: svcWatcher.serviceAndNamespaceToSelectors, - ipToPod: poWatcher.ipToPod, - podToWorkloadAndNamespace: poWatcher.podToWorkloadAndNamespace, - workloadAndNamespaceToLabels: poWatcher.workloadAndNamespaceToLabels, - serviceToWorkload: serviceToWorkload, - workloadPodCount: poWatcher.workloadPodCount, - safeStopCh: safeStopCh, } }) @@ -133,9 +188,19 @@ func (e *kubernetesResolver) Stop(_ context.Context) error { // add a method to kubernetesResolver func (e *kubernetesResolver) getWorkloadAndNamespaceByIP(ip string) (string, string, error) { var workload, namespace string - if podKey, ok := e.ipToPod.Load(ip); ok { - pod := podKey.(string) - if workloadKey, ok := e.podToWorkloadAndNamespace.Load(pod); ok { + + if e.useListPod { + // use results from pod watcher + if podKey, ok := e.ipToPod.Load(ip); ok { + pod := podKey.(string) + if workloadKey, ok := e.podToWorkloadAndNamespace.Load(pod); ok { + workload, namespace = extractResourceAndNamespace(workloadKey.(string)) + return workload, namespace, nil + } + } + } else { + // use results from endpoint slice watcher + if workloadKey, ok := e.ipToWorkloadAndNamespace.Load(ip); ok { workload, namespace = extractResourceAndNamespace(workloadKey.(string)) return workload, namespace, nil } @@ -148,7 +213,6 @@ func (e *kubernetesResolver) getWorkloadAndNamespaceByIP(ip string) (string, str return workload, namespace, nil } } - return "", "", errors.New("no kubernetes workload found for ip: " + ip) } diff --git a/plugins/processors/awsapplicationsignals/internal/resolver/kubernetes_test.go b/plugins/processors/awsapplicationsignals/internal/resolver/kubernetes_test.go index 65728bd605..404aa94b50 100644 --- a/plugins/processors/awsapplicationsignals/internal/resolver/kubernetes_test.go +++ b/plugins/processors/awsapplicationsignals/internal/resolver/kubernetes_test.go @@ -42,6 +42,7 @@ func TestEksResolver(t *testing.T) { podToWorkloadAndNamespace: &sync.Map{}, ipToServiceAndNamespace: &sync.Map{}, serviceToWorkload: 
&sync.Map{},
+			useListPod:                true,
 	}

 	ip := "1.2.3.4"
@@ -117,6 +118,7 @@ func TestEksResolver(t *testing.T) {
 			podToWorkloadAndNamespace: &sync.Map{},
 			ipToServiceAndNamespace:   &sync.Map{},
 			serviceToWorkload:         &sync.Map{},
+			useListPod:                true,
 		}

 		// Test case 1: "aws.remote.service" contains IP:Port
diff --git a/plugins/processors/awsapplicationsignals/internal/resolver/kubernetes_utils.go b/plugins/processors/awsapplicationsignals/internal/resolver/kubernetes_utils.go
index 11bc68a3b0..5481a335d2 100644
--- a/plugins/processors/awsapplicationsignals/internal/resolver/kubernetes_utils.go
+++ b/plugins/processors/awsapplicationsignals/internal/resolver/kubernetes_utils.go
@@ -32,10 +32,17 @@ var (
 	// Alphanumeric Mapping: https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/apimachinery/pkg/util/rand/rand.go#L121)
 	replicaSetWithDeploymentNamePattern = fmt.Sprintf(`^(.+)-[%s]{6,10}$`, kubeAllowedStringAlphaNums)
 	deploymentFromReplicaSetPattern     = regexp.MustCompile(replicaSetWithDeploymentNamePattern)
-	// if a pod is launched directly by a replicaSet (with a given name by users), its name has the following pattern:
+	// if a pod is launched directly by a replicaSet or daemonSet (with a given name by users), its name has the following pattern:
 	// Pod name = ReplicaSet name + 5 alphanumeric characters long string
-	podWithReplicaSetNamePattern = fmt.Sprintf(`^(.+)-[%s]{5}$`, kubeAllowedStringAlphaNums)
-	replicaSetFromPodPattern     = regexp.MustCompile(podWithReplicaSetNamePattern)
+	// some code reference for daemon set:
+	// 1. daemonset uses the strategy to create pods: https://github.com/kubernetes/kubernetes/blob/82e3a671e79d1740ab9a3b3fac8a3bb7d065a6fb/pkg/registry/apps/daemonset/strategy.go#L46
+	// 2. the strategy uses SimpleNameGenerator to create names: https://github.com/kubernetes/kubernetes/blob/82e3a671e79d1740ab9a3b3fac8a3bb7d065a6fb/staging/src/k8s.io/apiserver/pkg/storage/names/generate.go#L53
+	// 3. the random name generator only uses non-vowel chars + numbers: https://github.com/kubernetes/kubernetes/blob/82e3a671e79d1740ab9a3b3fac8a3bb7d065a6fb/staging/src/k8s.io/apimachinery/pkg/util/rand/rand.go#L83
+	podWithSuffixPattern                = fmt.Sprintf(`^(.+)-[%s]{5}$`, kubeAllowedStringAlphaNums)
+	replicaSetOrDaemonSetFromPodPattern = regexp.MustCompile(podWithSuffixPattern)
+
+	// Pattern for StatefulSet pods: <statefulset>-<ordinal>
+	reStatefulSet = regexp.MustCompile(`^(.+)-(\d+)$`)
 )

 func attachNamespace(resourceName, namespace string) string {
@@ -66,7 +73,7 @@ func extractWorkloadNameFromRS(replicaSetName string) (string, error) {
 }

 func extractWorkloadNameFromPodName(podName string) (string, error) {
-	match := replicaSetFromPodPattern.FindStringSubmatch(podName)
+	match := replicaSetOrDaemonSetFromPodPattern.FindStringSubmatch(podName)
 	if match != nil {
 		return match[1], nil
 	}
@@ -101,6 +108,40 @@ func getWorkloadAndNamespace(pod *corev1.Pod) string {
 	return workloadAndNamespace
 }

+// inferWorkloadName tries to parse the given podName to find the top-level workload name.
+//
+// 1) If it matches <statefulset>-<ordinal>, return <statefulset>.
+// 2) If it matches <parent>-<5charSuffix>:
+//   - If <parent> is <deployment>-<6-10charSuffix>, return <deployment>.
+//   - Else return <parent> (likely a bare ReplicaSet or DaemonSet).
+//
+// 3) If no pattern matches, return the original podName.
+//
+// Caveat: You can't reliably distinguish DaemonSet vs. bare ReplicaSet by name alone.
+func inferWorkloadName(podName string) string {
+	// 1) Check if it's a StatefulSet pod: <statefulset>-<ordinal>
+	if matches := reStatefulSet.FindStringSubmatch(podName); matches != nil {
+		return matches[1] // e.g. "mysql-0" => "mysql"
+	}
+
+	// 2) Check if it's a Pod with a 5-char random suffix: <parent>-<5Chars>
+	if matches := replicaSetOrDaemonSetFromPodPattern.FindStringSubmatch(podName); matches != nil {
+		parentName := matches[1]
+
+		// If parentName ends with 6-10 random chars, that parent is a Deployment-based ReplicaSet.
+		// So the top-level workload is the first part before that suffix.
+		if rsMatches := deploymentFromReplicaSetPattern.FindStringSubmatch(parentName); rsMatches != nil {
+			return rsMatches[1] // e.g. "nginx-a2b3c4" => "nginx"
+		}
+
+		// Otherwise, it's a "bare" ReplicaSet or DaemonSet; just return parentName.
+		return parentName
+	}
+
+	// 3) If none of the patterns matched, return the entire podName.
+	return podName
+}
+
 const IP_PORT_PATTERN = `^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}):(\d+)$`

 var ipPortRegex = regexp.MustCompile(IP_PORT_PATTERN)
diff --git a/plugins/processors/awsapplicationsignals/internal/resolver/kubernetes_utils_test.go b/plugins/processors/awsapplicationsignals/internal/resolver/kubernetes_utils_test.go
index f8552aedab..0102338814 100644
--- a/plugins/processors/awsapplicationsignals/internal/resolver/kubernetes_utils_test.go
+++ b/plugins/processors/awsapplicationsignals/internal/resolver/kubernetes_utils_test.go
@@ -227,3 +227,32 @@ func TestExtractIPPort(t *testing.T) {
 	assert.Equal(t, "", port)
 	assert.False(t, ok)
 }
+
+func TestInferWorkloadName(t *testing.T) {
+	testCases := []struct {
+		name     string
+		input    string
+		expected string
+	}{
+		{"StatefulSet single digit", "mysql-0", "mysql"},
+		{"StatefulSet multiple digits", "mysql-10", "mysql"},
+		{"ReplicaSet bare pod", "nginx-b2dfg", "nginx"},
+		{"Deployment-based ReplicaSet pod", "nginx-76977669dc-lwx64", "nginx"},
+		{"Non matching", "simplepod", "simplepod"},
+		{"ReplicaSet name with number suffix", "nginx-123-d9stt", "nginx-123"},
+		// In this case the correct value would be "nginx-245678", but unfortunately the parent name also matches the Deployment-based ReplicaSet pattern, so we get "nginx" instead.
+		// However, we think it's an edge case that we have to live with because the pod name alone carries no extra info to tell us the correct workload name.
+		{"Some confusing case with a replicaSet/daemonset name matching the pattern", "nginx-245678-d9stt", "nginx"},
+		{"Some confusing case with a replicaSet/daemonset name not matching the pattern", "nginx-123456-d9stt", "nginx-123456"},
+		{"Empty", "", ""},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			got := inferWorkloadName(tc.input)
+			if got != tc.expected {
+				t.Errorf("inferWorkloadName(%q) = %q; expected %q", tc.input, got, tc.expected)
+			}
+		})
+	}
+}