Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrite the onUpdate and delete method of pod #245

Merged
merged 7 commits into from
Jun 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions collector/metadata/kubernetes/k8scache.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@ type K8sContainerInfo struct {
}

type K8sPodInfo struct {
Ip string
PodName string
Ip string
PodName string
Ports []int32
HostPorts []int32
ContainerIds []string
Labels map[string]string
// TODO: There may be multiple kinds of workload or services for the same pod
WorkloadKind string
WorkloadName string
Expand Down
54 changes: 30 additions & 24 deletions collector/metadata/kubernetes/pod_delete.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
// Copyright 2020 OpenTelemetry Authors
// Source: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/processor/k8sattributesprocessor/internal/kube/client.go
// Modification: Use deletedPodInfo as deleted elements and delete our cache map as needed.

package kubernetes

import (
"sync"
"time"

corev1 "k8s.io/api/core/v1"
)

var (
Expand All @@ -15,8 +15,18 @@ var (
)

type deleteRequest struct {
pod *corev1.Pod
ts time.Time
podInfo *deletedPodInfo
ts time.Time
}

type deletedPodInfo struct {
name string
namespace string
containerIds []string
ip string
ports []int32
hostIp string
hostPorts []int32
}

// deleteLoop deletes pods from cache periodically.
Expand All @@ -40,7 +50,7 @@ func podDeleteLoop(interval time.Duration, gracePeriod time.Duration, stopCh cha
podDeleteQueue = podDeleteQueue[cutoff:]
podDeleteQueueMut.Unlock()
for _, d := range toDelete {
deletePod(d.pod)
deletePodInfo(d.podInfo)
}

case <-stopCh:
Expand All @@ -49,28 +59,24 @@ func podDeleteLoop(interval time.Duration, gracePeriod time.Duration, stopCh cha
}
}

func deletePod(pod *corev1.Pod) {
for i := 0; i < len(pod.Status.ContainerStatuses); i++ {
containerId := pod.Status.ContainerStatuses[i].ContainerID
realContainerId := TruncateContainerId(containerId)
if realContainerId == "" {
continue
}
MetaDataCache.DeleteByContainerId(realContainerId)
func deletePodInfo(podInfo *deletedPodInfo) {
if podInfo.name != "" {
globalPodInfo.delete(podInfo.namespace, podInfo.name)
}

for _, container := range pod.Spec.Containers {
if len(container.Ports) == 0 {
MetaDataCache.DeleteContainerByIpPort(pod.Status.PodIP, 0)
continue
if len(podInfo.containerIds) != 0 {
for i := 0; i < len(podInfo.containerIds); i++ {
MetaDataCache.DeleteByContainerId(podInfo.containerIds[i])
}
for _, port := range container.Ports {
}
if podInfo.ip != "" && len(podInfo.ports) != 0 {
for _, port := range podInfo.ports {
// Assume that PodIP:Port can't be reused in a few seconds
MetaDataCache.DeleteContainerByIpPort(pod.Status.PodIP, uint32(port.ContainerPort))
// If hostPort is specified, add the container using HostIP and HostPort
if port.HostPort != 0 {
MetaDataCache.DeleteContainerByHostIpPort(pod.Status.HostIP, uint32(port.HostPort))
}
MetaDataCache.DeleteContainerByIpPort(podInfo.ip, uint32(port))
}
}
if podInfo.hostIp != "" && len(podInfo.hostPorts) != 0 {
for _, port := range podInfo.hostPorts {
MetaDataCache.DeleteContainerByHostIpPort(podInfo.hostIp, uint32(port))
}
}
}
14 changes: 5 additions & 9 deletions collector/metadata/kubernetes/pod_delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,12 @@ import (
func TestDeleteLoop(t *testing.T) {
pod := CreatePod(true)
onAdd(pod)
_, ok := globalPodInfo.get("CustomNamespace", "deploy-1a2b3c4d-5e6f7")
if !ok {
t.Fatalf("Finding pod at globalPodInfo. Expect %v, but get %v", true, ok)
}
verifyIfPodExist(true, t)
if len(podDeleteQueue) != 0 {
t.Fatalf("PodDeleteQueue should be 0, but is %d", len(podDeleteQueue))
}

onDelete(pod)
_, ok = globalPodInfo.get("CustomNamespace", "deploy-1a2b3c4d-5e6f7")
if ok {
t.Fatalf("Finding pod at globalPodInfo. Expect %v, but get %v", false, ok)
}
verifyIfPodExist(true, t)
if len(podDeleteQueue) != 1 {
t.Fatalf("PodDeleteQueue should be 1, but is %d", len(podDeleteQueue))
Expand Down Expand Up @@ -52,7 +44,11 @@ func TestDeleteLoop(t *testing.T) {
}

func verifyIfPodExist(exist bool, t *testing.T) {
_, ok := MetaDataCache.GetByContainerId("1a2b3c4d5e6f")
_, ok := globalPodInfo.get("CustomNamespace", "deploy-1a2b3c4d-5e6f7")
if ok != exist {
t.Fatalf("Finding pod at globalPodInfo. Expect %v, but get %v", false, ok)
}
_, ok = MetaDataCache.GetByContainerId("1a2b3c4d5e6f")
if ok != exist {
t.Errorf("Finding container using containerid. Expect %v, but get %v", exist, ok)
}
Expand Down
Loading