diff --git a/CHANGELOG.md b/CHANGELOG.md index 5646911006..f7ecb1b82d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed - fix(k8sprocessor): delay deleting the metadata from owner resources [#1242] +- fix(k8sprocessor): handle missed k8s resource deletions correctly [#1277] [#1230]: https://github.com/SumoLogic/sumologic-otel-collector/pull/1230 [#1242]: https://github.com/SumoLogic/sumologic-otel-collector/pull/1242 +[#1277]: https://github.com/SumoLogic/sumologic-otel-collector/pull/1277 diff --git a/pkg/processor/k8sprocessor/kube/client.go b/pkg/processor/k8sprocessor/kube/client.go index d125239e54..2ab729aa6c 100644 --- a/pkg/processor/k8sprocessor/kube/client.go +++ b/pkg/processor/k8sprocessor/kube/client.go @@ -193,11 +193,28 @@ func (c *WatchClient) handlePodUpdate(old, new interface{}) { func (c *WatchClient) handlePodDelete(obj interface{}) { observability.RecordPodDeleted() - if pod, ok := obj.(*api_v1.Pod); ok { - c.forgetPod(pod) - } else { + + var pod *api_v1.Pod + + switch obj := obj.(type) { + case *api_v1.Pod: + pod = obj + case cache.DeletedFinalStateUnknown: + prev, ok := obj.Obj.(*api_v1.Pod) + if !ok { + c.logger.Error( + "object received was DeletedFinalStateUnknown but did not contain api_v1.Pod", + zap.Any("received", obj), + ) + return + } + pod = prev + default: c.logger.Error("object received was not of type api_v1.Pod", zap.Any("received", obj)) + return } + + c.forgetPod(pod) } func (c *WatchClient) deleteLoop(interval time.Duration, gracePeriod time.Duration) { diff --git a/pkg/processor/k8sprocessor/kube/client_test.go b/pkg/processor/k8sprocessor/kube/client_test.go index dd98dae10d..dddaf5b62c 100644 --- a/pkg/processor/k8sprocessor/kube/client_test.go +++ b/pkg/processor/k8sprocessor/kube/client_test.go @@ -37,6 +37,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" 
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" ) @@ -1233,6 +1234,36 @@ func Test_PodsGetAddedAndDeletedFromCache(t *testing.T) { require.NoError(t, err) eventuallyNPodsInCache(t, 0) }) + + t.Run("with deleted final state unknown", func(t *testing.T) { + pod := &api_v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-pod", + Namespace: namespace, + UID: "f15f0585-a0bc-43a3-96e4-dd2eace75392", + }, + } + + _, err = c.kc.CoreV1().Pods(namespace). + Create(context.Background(), pod, metav1.CreateOptions{}) + require.NoError(t, err) + eventuallyNPodsInCache(t, 2) + + // Rather than set up a stub Informer just for this case, bypass the + // informer + fake k8s client entirely. Manually call the delete + // handler with DeletedFinalStateUnknown. + c.handlePodDelete(cache.DeletedFinalStateUnknown{ + Key: fmt.Sprintf("%s/my-pod", namespace), + Obj: pod, + }) + defer func() { + err = c.kc.CoreV1().Pods(namespace). + Delete(context.Background(), pod.Name, metav1.DeleteOptions{}) + require.NoError(t, err) + }() + + eventuallyNPodsInCache(t, 0) + }) } func newTestClientWithRulesAndFilters(t *testing.T, e ExtractionRules, f Filters) (*WatchClient, *observer.ObservedLogs) { diff --git a/pkg/processor/k8sprocessor/kube/owner.go b/pkg/processor/k8sprocessor/kube/owner.go index 5e39d2a77f..b2c6550e15 100644 --- a/pkg/processor/k8sprocessor/kube/owner.go +++ b/pkg/processor/k8sprocessor/kube/owner.go @@ -254,9 +254,28 @@ func (op *OwnerCache) upsertNamespace(obj interface{}) { } func (op *OwnerCache) deleteNamespace(obj interface{}) { - namespace := obj.(*api_v1.Namespace) + var ns *api_v1.Namespace + + switch obj := obj.(type) { + case *api_v1.Namespace: + ns = obj + case cache.DeletedFinalStateUnknown: + prev, ok := obj.Obj.(*api_v1.Namespace) + if !ok { + op.logger.Error( + "object received was DeletedFinalStateUnknown but did not contain api_v1.Namespace", + zap.Any("received", obj), + ) + return + } + ns = prev + default: + 
op.logger.Error("object received was not of type api_v1.Namespace", zap.Any("received", obj)) + return + } + op.nsMutex.Lock() - delete(op.namespaces, namespace.Name) + delete(op.namespaces, ns.Name) op.nsMutex.Unlock() } @@ -326,8 +345,31 @@ func (op *OwnerCache) addOwnerInformer( } func (op *OwnerCache) deleteObject(obj interface{}) { + var metaObj meta_v1.Object + + switch obj := obj.(type) { + case meta_v1.Object: + metaObj = obj + case cache.DeletedFinalStateUnknown: + prev, ok := obj.Obj.(meta_v1.Object) + if !ok { + op.logger.Error( + "object received was DeletedFinalStateUnknown but did not contain meta_v1.Object", + zap.Any("received", obj), + ) + return + } + metaObj = prev + default: + op.logger.Error( + "object received was not of type meta_v1.Object", + zap.Any("received", obj), + ) + return + } + op.ownersMutex.Lock() - delete(op.objectOwners, string(obj.(meta_v1.Object).GetUID())) + delete(op.objectOwners, string(metaObj.GetUID())) op.ownersMutex.Unlock() } @@ -410,7 +452,28 @@ func (op *OwnerCache) deleteEndpointFromPod(pod string, endpoint string) { } func (op *OwnerCache) genericEndpointOp(obj interface{}, endpointFunc func(pod string, endpoint string)) { - ep := obj.(*api_v1.Endpoints) + var ep *api_v1.Endpoints + + switch obj := obj.(type) { + case *api_v1.Endpoints: + ep = obj + case cache.DeletedFinalStateUnknown: + prev, ok := obj.Obj.(*api_v1.Endpoints) + if !ok { + op.logger.Error( + "object received was DeletedFinalStateUnknown but did not contain api_v1.Endpoints", + zap.Any("received", obj), + ) + return + } + ep = prev + default: + op.logger.Error( + "object received was not of type api_v1.Endpoints", + zap.Any("received", obj), + ) + return + } for _, it := range ep.Subsets { for _, addr := range it.Addresses {