Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receiver/k8s_cluster] Do not store unused service data in k8s API cache #23434

Merged
merged 1 commit into from
Jun 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions receiver/k8sclusterreceiver/informer_transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/node"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/pod"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/replicaset"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/service"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/statefulset"
)

Expand All @@ -35,6 +36,8 @@ func transformObject(object interface{}) (interface{}, error) {
return demonset.Transform(o), nil
case *appsv1.StatefulSet:
return statefulset.Transform(o), nil
case *corev1.Service:
return service.Transform(o), nil
}
return object, nil
}
19 changes: 19 additions & 0 deletions receiver/k8sclusterreceiver/informer_transform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,25 @@ func TestTransformObject(t *testing.T) {
},
same: false,
},
{
name: "service",
object: &corev1.Service{
Spec: corev1.ServiceSpec{
Selector: map[string]string{
"app": "my-app",
},
Type: corev1.ServiceTypeClusterIP,
},
},
want: &corev1.Service{
Spec: corev1.ServiceSpec{
Selector: map[string]string{
"app": "my-app",
},
},
},
same: false,
},
{
// This is a case where we don't transform the object.
name: "hpa",
Expand Down
20 changes: 2 additions & 18 deletions receiver/k8sclusterreceiver/internal/pod/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package pod // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/pod"

import (
"fmt"
"strings"
"time"

Expand All @@ -17,7 +16,6 @@ import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"

Expand All @@ -26,6 +24,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/container"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/service"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/utils"
)

Expand Down Expand Up @@ -184,7 +183,7 @@ func GetMetadata(pod *corev1.Pod, mc *metadata.Store, logger *zap.Logger) map[ex
}

if mc.Services != nil {
meta = maps.MergeStringMaps(meta, getPodServiceTags(pod, mc.Services))
meta = maps.MergeStringMaps(meta, service.GetPodServiceTags(pod, mc.Services))
}

if mc.Jobs != nil {
Expand Down Expand Up @@ -268,21 +267,6 @@ func logError(err error, ref *v1.OwnerReference, podUID types.UID, logger *zap.L
)
}

// getPodServiceTags returns a set of services associated with the pod.
func getPodServiceTags(pod *corev1.Pod, services cache.Store) map[string]string {
properties := map[string]string{}

for _, ser := range services.List() {
serObj := ser.(*corev1.Service)
if serObj.Namespace == pod.Namespace &&
labels.Set(serObj.Spec.Selector).AsSelectorPreValidated().Matches(labels.Set(pod.Labels)) {
properties[fmt.Sprintf("%s%s", constants.K8sServicePrefix, serObj.Name)] = ""
}
}

return properties
}

// getWorkloadProperties returns workload metadata for provided owner reference.
func getWorkloadProperties(ref *v1.OwnerReference, labelKey string) map[string]string {
uidKey := metadata.GetOTelUIDFromKind(strings.ToLower(ref.Kind))
Expand Down
40 changes: 40 additions & 0 deletions receiver/k8sclusterreceiver/internal/service/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package service // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/service"
import (
"fmt"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata"
)

// Transform transforms the pod to remove the fields that we don't use to reduce RAM utilization.
// IMPORTANT: Make sure to update this function before using new service fields.
func Transform(service *corev1.Service) *corev1.Service {
return &corev1.Service{
ObjectMeta: metadata.TransformObjectMeta(service.ObjectMeta),
Spec: corev1.ServiceSpec{
Selector: service.Spec.Selector,
},
}
}

// GetPodServiceTags returns a set of services associated with the pod.
func GetPodServiceTags(pod *corev1.Pod, services cache.Store) map[string]string {
properties := map[string]string{}

for _, ser := range services.List() {
serObj := ser.(*corev1.Service)
if serObj.Namespace == pod.Namespace &&
labels.Set(serObj.Spec.Selector).AsSelectorPreValidated().Matches(labels.Set(pod.Labels)) {
properties[fmt.Sprintf("%s%s", constants.K8sServicePrefix, serObj.Name)] = ""
}
}

return properties
}
55 changes: 55 additions & 0 deletions receiver/k8sclusterreceiver/internal/service/service_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package service

import (
"testing"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestTransform(t *testing.T) {
originalService := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "my-service",
Namespace: "default",
Labels: map[string]string{
"app": "my-app",
},
Annotations: map[string]string{
"annotation1": "value1",
},
},
Spec: corev1.ServiceSpec{
Selector: map[string]string{
"app": "my-app",
},
Ports: []corev1.ServicePort{
{
Name: "http",
Port: 80,
Protocol: corev1.ProtocolTCP,
},
},
Type: corev1.ServiceTypeClusterIP,
},
}
wantService := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "my-service",
Namespace: "default",
Labels: map[string]string{
"app": "my-app",
},
},
Spec: corev1.ServiceSpec{
Selector: map[string]string{
"app": "my-app",
},
},
}
assert.EqualValues(t, wantService, Transform(originalService))
}