Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Show pods in component health message #2496

Merged
merged 4 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .chloggen/pods-in-bridge-message.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. operator, target allocator, github action)
component: bridge

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Sets pods in the component health map

# One or more tracking issues related to the change
issues: [2489]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: this change adds a requirement for a new permission for the bridge to list and get pods.
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,10 @@ container-push:
container-target-allocator-push:
docker push ${TARGETALLOCATOR_IMG}

.PHONY: container-operator-opamp-bridge-push
container-operator-opamp-bridge-push:
docker push ${OPERATOROPAMPBRIDGE_IMG}

.PHONY: container-target-allocator
container-target-allocator: GOOS = linux
container-target-allocator: targetallocator
Expand Down Expand Up @@ -333,7 +337,7 @@ endif


.PHONY: load-image-operator-opamp-bridge
load-image-operator-opamp-bridge:
load-image-operator-opamp-bridge: container-operator-opamp-bridge
kind load --name $(KIND_CLUSTER_NAME) docker-image ${OPERATOROPAMPBRIDGE_IMG}

.PHONY: cert-manager
Expand Down
70 changes: 61 additions & 9 deletions cmd/operator-opamp-bridge/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"context"
"fmt"
"strings"
"time"

"github.com/go-logr/logr"
Expand All @@ -29,6 +30,7 @@ import (
"k8s.io/utils/clock"
"sigs.k8s.io/yaml"

"github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1"
"github.com/open-telemetry/opentelemetry-operator/cmd/operator-opamp-bridge/config"
"github.com/open-telemetry/opentelemetry-operator/cmd/operator-opamp-bridge/metrics"
"github.com/open-telemetry/opentelemetry-operator/cmd/operator-opamp-bridge/operator"
Expand All @@ -37,7 +39,7 @@ import (
type Agent struct {
logger logr.Logger

appliedKeys map[collectorKey]bool
appliedKeys map[kubeResourceKey]bool
clock clock.Clock
startTime uint64
lastHash []byte
Expand Down Expand Up @@ -65,7 +67,7 @@ func NewAgent(logger logr.Logger, applier operator.ConfigApplier, config *config
config: config,
applier: applier,
logger: logger,
appliedKeys: map[collectorKey]bool{},
appliedKeys: map[kubeResourceKey]bool{},
instanceId: config.GetNewInstanceId(),
agentDescription: config.GetDescription(),
remoteConfigEnabled: config.RemoteConfigEnabled(),
Expand All @@ -85,7 +87,7 @@ func NewAgent(logger logr.Logger, applier operator.ConfigApplier, config *config

// getHealth is called every heartbeat interval to report health.
func (agent *Agent) getHealth() *protobufs.ComponentHealth {
healthMap, err := agent.generateComponentHealthMap()
healthMap, err := agent.generateCollectorPoolHealth()
if err != nil {
return &protobufs.ComponentHealth{
Healthy: false,
Expand All @@ -102,20 +104,70 @@ func (agent *Agent) getHealth() *protobufs.ComponentHealth {
}
}

// generateComponentHealthMap allows the bridge to report the status of the collector pools it owns.
// TODO: implement enhanced health messaging.
func (agent *Agent) generateComponentHealthMap() (map[string]*protobufs.ComponentHealth, error) {
// generateCollectorPoolHealth allows the bridge to report the status of the collector pools it owns.
// TODO: implement enhanced health messaging using the collector's new healthcheck extension:
// https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/26661
func (agent *Agent) generateCollectorPoolHealth() (map[string]*protobufs.ComponentHealth, error) {
cols, err := agent.applier.ListInstances()
if err != nil {
return nil, err
}
healthMap := map[string]*protobufs.ComponentHealth{}
for _, col := range cols {
key := newCollectorKey(col.GetNamespace(), col.GetName())
key := newKubeResourceKey(col.GetNamespace(), col.GetName())
podMap, err := agent.generateCollectorHealth(agent.getCollectorSelector(col), col.GetNamespace())
if err != nil {
return nil, err
}
healthMap[key.String()] = &protobufs.ComponentHealth{
StartTimeUnixNano: uint64(col.ObjectMeta.GetCreationTimestamp().UnixNano()),
StatusTimeUnixNano: uint64(agent.clock.Now().UnixNano()),
Status: col.Status.Scale.StatusReplicas,
ComponentHealthMap: podMap,
}
}
return healthMap, nil
}

// getCollectorSelector destructures the collectors scale selector if present, if uses the labelmap from the operator.
func (agent *Agent) getCollectorSelector(col v1alpha1.OpenTelemetryCollector) map[string]string {
if len(col.Status.Scale.Selector) > 0 {
selMap := map[string]string{}
for _, kvPair := range strings.Split(col.Status.Scale.Selector, ",") {
kv := strings.Split(kvPair, "=")
// skip malformed pairs
if len(kv) != 2 {
continue
}
selMap[kv[0]] = kv[1]
}
return selMap
}
return map[string]string{
"app.kubernetes.io/managed-by": "opentelemetry-operator",
"app.kubernetes.io/instance": fmt.Sprintf("%s.%s", col.GetNamespace(), col.GetName()),
"app.kubernetes.io/part-of": "opentelemetry",
"app.kubernetes.io/component": "opentelemetry-collector",
}
TylerHelmuth marked this conversation as resolved.
Show resolved Hide resolved
}

func (agent *Agent) generateCollectorHealth(selectorLabels map[string]string, namespace string) (map[string]*protobufs.ComponentHealth, error) {
pods, err := agent.applier.GetCollectorPods(selectorLabels, namespace)
if err != nil {
return nil, err
}
healthMap := map[string]*protobufs.ComponentHealth{}
for _, item := range pods.Items {
key := newKubeResourceKey(item.GetNamespace(), item.GetName())
healthy := true
if item.Status.Phase != "Running" {
healthy = false
}
healthMap[key.String()] = &protobufs.ComponentHealth{
StartTimeUnixNano: uint64(item.Status.StartTime.UnixNano()),
StatusTimeUnixNano: uint64(agent.clock.Now().UnixNano()),
Status: string(item.Status.Phase),
Healthy: healthy,
}
}
return healthMap, nil
Expand Down Expand Up @@ -232,7 +284,7 @@ func (agent *Agent) getEffectiveConfig(ctx context.Context) (*protobufs.Effectiv
agent.logger.Error(err, "failed to marhsal config")
return nil, err
}
mapKey := newCollectorKey(instance.GetNamespace(), instance.GetName())
mapKey := newKubeResourceKey(instance.GetNamespace(), instance.GetName())
instanceMap[mapKey.String()] = &protobufs.AgentConfigFile{
Body: marshaled,
ContentType: "yaml",
Expand Down Expand Up @@ -277,7 +329,7 @@ func (agent *Agent) applyRemoteConfig(config *protobufs.AgentRemoteConfig) (*pro
if len(key) == 0 || len(file.Body) == 0 {
continue
}
colKey, err := collectorKeyFromKey(key)
colKey, err := kubeResourceFromKey(key)
if err != nil {
multiErr = multierr.Append(multiErr, err)
continue
Expand Down
78 changes: 75 additions & 3 deletions cmd/operator-opamp-bridge/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ import (
"github.com/spf13/pflag"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
testingclock "k8s.io/utils/clock/testing"
runtimeClient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

"github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1"
Expand All @@ -49,9 +51,11 @@ const (
testNamespace = "testnamespace"
testCollectorName = "collector"
otherCollectorName = "other"
thirdCollectorName = "third"
emptyConfigHash = ""
testCollectorKey = testNamespace + "/" + testCollectorName
otherCollectorKey = testNamespace + "/" + otherCollectorName
thirdCollectorKey = otherCollectorName + "/" + thirdCollectorName

agentTestFileName = "testdata/agent.yaml"
agentTestFileHttpName = "testdata/agenthttpbasic.yaml"
Expand All @@ -73,6 +77,32 @@ var (
invalidYamlConfigHash = getConfigHash(testCollectorKey, collectorInvalidFile)
updatedYamlConfigHash = getConfigHash(testCollectorKey, collectorUpdatedFile)
otherUpdatedYamlConfigHash = getConfigHash(otherCollectorKey, collectorUpdatedFile)

podTime = metav1.NewTime(time.UnixMicro(1704748549000000))
mockPodList = &v1.PodList{
TypeMeta: metav1.TypeMeta{
Kind: "PodList",
APIVersion: "v1",
},
Items: []v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: thirdCollectorName + "-1",
Namespace: otherCollectorName,
Labels: map[string]string{
"app.kubernetes.io/managed-by": "opentelemetry-operator",
"app.kubernetes.io/instance": fmt.Sprintf("%s.%s", otherCollectorName, thirdCollectorName),
"app.kubernetes.io/part-of": "opentelemetry",
"app.kubernetes.io/component": "opentelemetry-collector",
},
},
Spec: v1.PodSpec{},
Status: v1.PodStatus{
StartTime: &podTime,
Phase: v1.PodRunning,
},
},
}}
)

func getConfigHash(key, file string) string {
Expand Down Expand Up @@ -130,16 +160,17 @@ func (m *mockOpampClient) SetPackageStatuses(_ *protobufs.PackageStatuses) error
return nil
}

func getFakeApplier(t *testing.T, conf *config.Config) *operator.Client {
func getFakeApplier(t *testing.T, conf *config.Config, lists ...runtimeClient.ObjectList) *operator.Client {
schemeBuilder := runtime.NewSchemeBuilder(func(s *runtime.Scheme) error {
s.AddKnownTypes(v1alpha1.GroupVersion, &v1alpha1.OpenTelemetryCollector{}, &v1alpha1.OpenTelemetryCollectorList{})
s.AddKnownTypes(v1.SchemeGroupVersion, &v1.Pod{}, &v1.PodList{})
metav1.AddToGroupVersion(s, v1alpha1.GroupVersion)
return nil
})
scheme := runtime.NewScheme()
err := schemeBuilder.AddToScheme(scheme)
require.NoError(t, err, "Should be able to add custom types")
c := fake.NewClientBuilder().WithScheme(scheme)
c := fake.NewClientBuilder().WithLists(lists...).WithScheme(scheme)
return operator.NewClient("test-bridge", l, c.Build(), conf.GetComponentsAllowed())
}

Expand Down Expand Up @@ -205,6 +236,7 @@ func TestAgent_getHealth(t *testing.T) {
LastError: "",
Status: "",
StatusTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
ComponentHealthMap: map[string]*protobufs.ComponentHealth{},
},
},
},
Expand Down Expand Up @@ -236,13 +268,53 @@ func TestAgent_getHealth(t *testing.T) {
LastError: "",
Status: "",
StatusTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
ComponentHealthMap: map[string]*protobufs.ComponentHealth{},
},
"testnamespace/other": {
Healthy: false, // we're working with mocks so the status will never be reconciled.
StartTimeUnixNano: collectorStartTime,
LastError: "",
Status: "",
StatusTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
ComponentHealthMap: map[string]*protobufs.ComponentHealth{},
},
},
},
},
},
{
name: "with pod health",
fields: fields{
configFile: agentTestFileName,
},
args: args{
ctx: context.Background(),
configs: []map[string]string{
{
thirdCollectorKey: collectorBasicFile,
},
},
},
want: []*protobufs.ComponentHealth{
{
Healthy: true,
StartTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
StatusTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
ComponentHealthMap: map[string]*protobufs.ComponentHealth{
"other/third": {
Healthy: false, // we're working with mocks so the status will never be reconciled.
StartTimeUnixNano: collectorStartTime,
LastError: "",
Status: "",
StatusTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
ComponentHealthMap: map[string]*protobufs.ComponentHealth{
otherCollectorName + "/" + thirdCollectorName + "-1": {
Healthy: true,
Status: "Running",
StatusTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
StartTimeUnixNano: uint64(podTime.UnixNano()),
},
},
},
},
},
Expand All @@ -255,7 +327,7 @@ func TestAgent_getHealth(t *testing.T) {
conf := config.NewConfig(logr.Discard())
loadErr := config.LoadFromFile(conf, tt.fields.configFile)
require.NoError(t, loadErr, "should be able to load config")
applier := getFakeApplier(t, conf)
applier := getFakeApplier(t, conf, mockPodList)
agent := NewAgent(l, applier, conf, mockClient)
agent.clock = fakeClock
err := agent.Start()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,24 @@ import (
"strings"
)

type collectorKey struct {
type kubeResourceKey struct {
name string
namespace string
}

func newCollectorKey(namespace string, name string) collectorKey {
return collectorKey{name: name, namespace: namespace}
func newKubeResourceKey(namespace string, name string) kubeResourceKey {
return kubeResourceKey{name: name, namespace: namespace}
}

func collectorKeyFromKey(key string) (collectorKey, error) {
func kubeResourceFromKey(key string) (kubeResourceKey, error) {
s := strings.Split(key, "/")
// We expect map keys to be of the form name/namespace
if len(s) != 2 {
return collectorKey{}, errors.New("invalid key")
return kubeResourceKey{}, errors.New("invalid key")
}
return newCollectorKey(s[0], s[1]), nil
return newKubeResourceKey(s[0], s[1]), nil
}

func (k collectorKey) String() string {
func (k kubeResourceKey) String() string {
return fmt.Sprintf("%s/%s", k.namespace, k.name)
}
Loading
Loading