From d5d6147689c9b61183016fcf54fbb4e75b2ff4b3 Mon Sep 17 00:00:00 2001 From: Mitali Salvi <44349099+mitali-salvi@users.noreply.github.com> Date: Mon, 3 Jun 2024 15:28:20 -0400 Subject: [PATCH] Implementing CI for EKS as a systemd service (#212) --- internal/k8sconfig/config.go | 6 ++ .../awscontainerinsightreceiver/config.go | 12 ++++ .../config_test.go | 14 +++++ .../internal/host/hostinfo.go | 24 ++++++-- .../internal/host/hostinfo_test.go | 56 +++++++++---------- .../internal/host/nodeCapacity.go | 19 ++++--- .../internal/host/nodeCapacity_test.go | 24 ++++---- .../internal/k8swindows/kubelet/client.go | 2 +- .../stores/kubeletutil/kubeletclient.go | 13 ++++- .../stores/kubeletutil/kubeletclient_test.go | 2 +- .../internal/stores/localnode.go | 7 ++- .../internal/stores/localnode_test.go | 17 ++++-- .../internal/stores/podstore.go | 53 +++++++++++------- .../internal/stores/servicestore.go | 8 ++- .../internal/stores/store.go | 16 +++--- .../awscontainerinsightreceiver/receiver.go | 33 ++++++----- .../receiver_test.go | 23 ++++++++ .../testdata/config.yaml | 5 ++ 18 files changed, 230 insertions(+), 104 deletions(-) diff --git a/internal/k8sconfig/config.go b/internal/k8sconfig/config.go index b1ef343d97b2..8b81497a1e1f 100644 --- a/internal/k8sconfig/config.go +++ b/internal/k8sconfig/config.go @@ -55,6 +55,9 @@ type APIConfig struct { // from `~/.kube/config`. AuthType AuthType `mapstructure:"auth_type"` + // When using auth_type `kubeConfig`, override default kubeConfig with custom path + KubeConfigPath string `mapstructure:"kube_config_path"` + // When using auth_type `kubeConfig`, override the current context. Context string `mapstructure:"context"` } @@ -87,6 +90,9 @@ func CreateRestConfig(apiConf APIConfig) (*rest.Config, error) { switch authType { case AuthTypeKubeConfig: loadingRules := clientcmd.NewDefaultClientConfigLoadingRules() + if apiConf.KubeConfigPath != "" { + loadingRules.ExplicitPath = apiConf.KubeConfigPath + } configOverrides := &clientcmd.ConfigOverrides{} if apiConf.Context != "" { configOverrides.CurrentContext = apiConf.Context diff --git a/receiver/awscontainerinsightreceiver/config.go b/receiver/awscontainerinsightreceiver/config.go index 708ea1726747..4a2ae21c1ef6 100644 --- a/receiver/awscontainerinsightreceiver/config.go +++ b/receiver/awscontainerinsightreceiver/config.go @@ -60,4 +60,16 @@ type Config struct { // EnableAcceleratedComputeMetrics enabled features with accelerated compute resources where metrics are scraped from vendor specific sources EnableAcceleratedComputeMetrics bool `mapstructure:"accelerated_compute_metrics"` + + // KubeConfigPath is an optional attribute to override the default kube config path in an EC2 environment + KubeConfigPath string `mapstructure:"kube_config_path"` + + // HostIP is an optional attribute to override the default host_ip in an EC2 environment + HostIP string `mapstructure:"host_ip"` + + // HostName is an optional attribute to override the default host_name in an EC2 environment + HostName string `mapstructure:"host_name"` + + // RunOnSystemd is an optional attribute to run the receiver in an EC2 environment + RunOnSystemd bool `mapstructure:"run_on_systemd,omitempty"` } diff --git a/receiver/awscontainerinsightreceiver/config_test.go b/receiver/awscontainerinsightreceiver/config_test.go index dbaf5aeecd02..3d95c624f2c6 100644 --- a/receiver/awscontainerinsightreceiver/config_test.go +++ b/receiver/awscontainerinsightreceiver/config_test.go @@ -87,6 +87,20 @@ func TestLoadConfig(t *testing.T) { EnableControlPlaneMetrics: true, }, }, + { + id: component.NewIDWithName(metadata.Type, "custom_kube_config_path"), + expected: &Config{ + CollectionInterval: 60 * time.Second, + ContainerOrchestrator: "eks", + TagService: true, + PrefFullPodName: false, + LeaderLockName: "otel-container-insight-clusterleader", + KubeConfigPath: "custom_kube_config_path", + HostIP: "1.2.3.4", + HostName: "test-hostname", + RunOnSystemd: true, + }, + }, } for _, tt := range tests { diff --git a/receiver/awscontainerinsightreceiver/internal/host/hostinfo.go b/receiver/awscontainerinsightreceiver/internal/host/hostinfo.go index 96dc9d50ea54..51c9e1668f65 100644 --- a/receiver/awscontainerinsightreceiver/internal/host/hostinfo.go +++ b/receiver/awscontainerinsightreceiver/internal/host/hostinfo.go @@ -23,6 +23,7 @@ type Info struct { refreshInterval time.Duration containerOrchestrator string clusterName string + isSystemdEnabled bool // flag to indicate if agent is running on systemd in EC2 environment instanceIDReadyC chan bool // close of this channel indicates instance ID is ready instanceIPReadyC chan bool // close of this channel indicates instance Ip is ready @@ -35,17 +36,30 @@ type Info struct { ec2Tags ec2TagsProvider awsSessionCreator func(*zap.Logger, awsutil.ConnAttr, *awsutil.AWSSessionSettings) (*aws.Config, *session.Session, error) - nodeCapacityCreator func(*zap.Logger, ...nodeCapacityOption) (nodeCapacityProvider, error) + nodeCapacityCreator func(*zap.Logger, ...Option) (nodeCapacityProvider, error) ec2MetadataCreator func(context.Context, *session.Session, time.Duration, chan bool, chan bool, bool, int, *zap.Logger, ...ec2MetadataOption) ec2MetadataProvider ebsVolumeCreator func(context.Context, *session.Session, string, string, time.Duration, *zap.Logger, ...ebsVolumeOption) ebsVolumeProvider ec2TagsCreator func(context.Context, *session.Session, string, string, string, time.Duration, *zap.Logger, ...ec2TagsOption) ec2TagsProvider } -type Option func(*Info) +type Option func(any) func WithClusterName(name string) Option { - return func(info *Info) { - info.clusterName = name + return func(info any) { + if i, ok := info.(*Info); ok { + i.clusterName = name + } + } +} + +func WithSystemdEnabled(enabled bool) Option { + return func(info any) { + switch i := info.(type) { + case *Info: + i.isSystemdEnabled = enabled + case *nodeCapacity: + i.isSystemdEnabled = enabled + } } } @@ -75,7 +89,7 @@ func NewInfo(awsSessionSettings awsutil.AWSSessionSettings, containerOrchestrato opt(mInfo) } - nodeCapacity, err := mInfo.nodeCapacityCreator(logger) + nodeCapacity, err := mInfo.nodeCapacityCreator(logger, WithSystemdEnabled(mInfo.isSystemdEnabled)) if err != nil { return nil, fmt.Errorf("failed to initialize NodeCapacity: %w", err) } diff --git a/receiver/awscontainerinsightreceiver/internal/host/hostinfo_test.go b/receiver/awscontainerinsightreceiver/internal/host/hostinfo_test.go index 4d843af50e1f..b37f1bdc98d1 100644 --- a/receiver/awscontainerinsightreceiver/internal/host/hostinfo_test.go +++ b/receiver/awscontainerinsightreceiver/internal/host/hostinfo_test.go @@ -72,8 +72,8 @@ func (m *mockEC2Tags) getAutoScalingGroupName() string { func TestInfo(t *testing.T) { // test the case when nodeCapacity fails to initialize - nodeCapacityCreatorOpt := func(m *Info) { - m.nodeCapacityCreator = func(*zap.Logger, ...nodeCapacityOption) (nodeCapacityProvider, error) { + nodeCapacityCreatorOpt := func(m any) { + m.(*Info).nodeCapacityCreator = func(*zap.Logger, ...Option) (nodeCapacityProvider, error) { return nil, errors.New("error") } } @@ -82,13 +82,13 @@ func TestInfo(t *testing.T) { assert.Error(t, err) // test the case when aws session fails to initialize - nodeCapacityCreatorOpt = func(m *Info) { - m.nodeCapacityCreator = func(*zap.Logger, ...nodeCapacityOption) (nodeCapacityProvider, error) { + nodeCapacityCreatorOpt = func(m any) { + m.(*Info).nodeCapacityCreator = func(*zap.Logger, ...Option) (nodeCapacityProvider, error) { return &mockNodeCapacity{}, nil } } - awsSessionCreatorOpt := func(m *Info) { - m.awsSessionCreator = func(*zap.Logger, awsutil.ConnAttr, *awsutil.AWSSessionSettings) (*aws.Config, *session.Session, error) { + awsSessionCreatorOpt := func(m any) { + m.(*Info).awsSessionCreator = func(*zap.Logger, awsutil.ConnAttr, *awsutil.AWSSessionSettings) (*aws.Config, *session.Session, error) { return nil, nil, errors.New("error") } } @@ -97,25 +97,25 @@ func TestInfo(t *testing.T) { assert.Error(t, err) // test normal case where everything is working - awsSessionCreatorOpt = func(m *Info) { - m.awsSessionCreator = func(*zap.Logger, awsutil.ConnAttr, *awsutil.AWSSessionSettings) (*aws.Config, *session.Session, error) { + awsSessionCreatorOpt = func(m any) { + m.(*Info).awsSessionCreator = func(*zap.Logger, awsutil.ConnAttr, *awsutil.AWSSessionSettings) (*aws.Config, *session.Session, error) { return &aws.Config{}, &session.Session{}, nil } } - ec2MetadataCreatorOpt := func(m *Info) { - m.ec2MetadataCreator = func(context.Context, *session.Session, time.Duration, chan bool, chan bool, bool, int, *zap.Logger, + ec2MetadataCreatorOpt := func(m any) { + m.(*Info).ec2MetadataCreator = func(context.Context, *session.Session, time.Duration, chan bool, chan bool, bool, int, *zap.Logger, ...ec2MetadataOption) ec2MetadataProvider { return &mockEC2Metadata{} } } - ebsVolumeCreatorOpt := func(m *Info) { - m.ebsVolumeCreator = func(context.Context, *session.Session, string, string, time.Duration, *zap.Logger, + ebsVolumeCreatorOpt := func(m any) { + m.(*Info).ebsVolumeCreator = func(context.Context, *session.Session, string, string, time.Duration, *zap.Logger, ...ebsVolumeOption) ebsVolumeProvider { return &mockEBSVolume{} } } - ec2TagsCreatorOpt := func(m *Info) { - m.ec2TagsCreator = func(context.Context, *session.Session, string, string, string, time.Duration, *zap.Logger, + ec2TagsCreatorOpt := func(m any) { + m.(*Info).ec2TagsCreator = func(context.Context, *session.Session, string, string, string, time.Duration, *zap.Logger, ...ec2TagsOption) ec2TagsProvider { return &mockEC2Tags{} } @@ -154,8 +154,8 @@ func TestInfo(t *testing.T) { func TestInfoForECS(t *testing.T) { // test the case when nodeCapacity fails to initialize - nodeCapacityCreatorOpt := func(m *Info) { - m.nodeCapacityCreator = func(*zap.Logger, ...nodeCapacityOption) (nodeCapacityProvider, error) { + nodeCapacityCreatorOpt := func(m any) { + m.(*Info).nodeCapacityCreator = func(*zap.Logger, ...Option) (nodeCapacityProvider, error) { return nil, errors.New("error") } } @@ -164,13 +164,13 @@ func TestInfoForECS(t *testing.T) { assert.Error(t, err) // test the case when aws session fails to initialize - nodeCapacityCreatorOpt = func(m *Info) { - m.nodeCapacityCreator = func(*zap.Logger, ...nodeCapacityOption) (nodeCapacityProvider, error) { + nodeCapacityCreatorOpt = func(m any) { + m.(*Info).nodeCapacityCreator = func(*zap.Logger, ...Option) (nodeCapacityProvider, error) { return &mockNodeCapacity{}, nil } } - awsSessionCreatorOpt := func(m *Info) { - m.awsSessionCreator = func(*zap.Logger, awsutil.ConnAttr, *awsutil.AWSSessionSettings) (*aws.Config, *session.Session, error) { + awsSessionCreatorOpt := func(m any) { + m.(*Info).awsSessionCreator = func(*zap.Logger, awsutil.ConnAttr, *awsutil.AWSSessionSettings) (*aws.Config, *session.Session, error) { return nil, nil, errors.New("error") } } @@ -179,25 +179,25 @@ func TestInfoForECS(t *testing.T) { assert.Error(t, err) // test normal case where everything is working - awsSessionCreatorOpt = func(m *Info) { - m.awsSessionCreator = func(*zap.Logger, awsutil.ConnAttr, *awsutil.AWSSessionSettings) (*aws.Config, *session.Session, error) { + awsSessionCreatorOpt = func(m any) { + m.(*Info).awsSessionCreator = func(*zap.Logger, awsutil.ConnAttr, *awsutil.AWSSessionSettings) (*aws.Config, *session.Session, error) { return &aws.Config{}, &session.Session{}, nil } } - ec2MetadataCreatorOpt := func(m *Info) { - m.ec2MetadataCreator = func(context.Context, *session.Session, time.Duration, chan bool, chan bool, bool, int, *zap.Logger, + ec2MetadataCreatorOpt := func(m any) { + m.(*Info).ec2MetadataCreator = func(context.Context, *session.Session, time.Duration, chan bool, chan bool, bool, int, *zap.Logger, ...ec2MetadataOption) ec2MetadataProvider { return &mockEC2Metadata{} } } - ebsVolumeCreatorOpt := func(m *Info) { - m.ebsVolumeCreator = func(context.Context, *session.Session, string, string, time.Duration, *zap.Logger, + ebsVolumeCreatorOpt := func(m any) { + m.(*Info).ebsVolumeCreator = func(context.Context, *session.Session, string, string, time.Duration, *zap.Logger, ...ebsVolumeOption) ebsVolumeProvider { return &mockEBSVolume{} } } - ec2TagsCreatorOpt := func(m *Info) { - m.ec2TagsCreator = func(context.Context, *session.Session, string, string, string, time.Duration, *zap.Logger, + ec2TagsCreatorOpt := func(m any) { + m.(*Info).ec2TagsCreator = func(context.Context, *session.Session, string, string, string, time.Duration, *zap.Logger, ...ec2TagsOption) ec2TagsProvider { return &mockEC2Tags{} } diff --git a/receiver/awscontainerinsightreceiver/internal/host/nodeCapacity.go b/receiver/awscontainerinsightreceiver/internal/host/nodeCapacity.go index 0edf374dc850..c9be674be56e 100644 --- a/receiver/awscontainerinsightreceiver/internal/host/nodeCapacity.go +++ b/receiver/awscontainerinsightreceiver/internal/host/nodeCapacity.go @@ -22,9 +22,10 @@ type nodeCapacityProvider interface { } type nodeCapacity struct { - memCapacity int64 - cpuCapacity int64 - logger *zap.Logger + memCapacity int64 + cpuCapacity int64 + isSystemdEnabled bool // flag to indicate if agent is running on systemd in EC2 environment + logger *zap.Logger // osLstat returns a FileInfo describing the named file. osLstat func(name string) (os.FileInfo, error) @@ -32,9 +33,7 @@ type nodeCapacity struct { cpuInfo func(ctx context.Context) ([]cpu.InfoStat, error) } -type nodeCapacityOption func(*nodeCapacity) - -func newNodeCapacity(logger *zap.Logger, options ...nodeCapacityOption) (nodeCapacityProvider, error) { +func newNodeCapacity(logger *zap.Logger, options ...Option) (nodeCapacityProvider, error) { nc := &nodeCapacity{ logger: logger, osLstat: os.Lstat, @@ -48,10 +47,14 @@ func newNodeCapacity(logger *zap.Logger, options ...nodeCapacityOption) (nodeCap ctx := context.Background() if runtime.GOOS != ci.OperatingSystemWindows { - if _, err := nc.osLstat(hostProc); os.IsNotExist(err) { + procPath := hostProc + if nc.isSystemdEnabled { + procPath = "/proc" + } + if _, err := nc.osLstat(procPath); os.IsNotExist(err) { return nil, err } - envMap := common.EnvMap{common.HostProcEnvKey: hostProc} + envMap := common.EnvMap{common.HostProcEnvKey: procPath} ctx = context.WithValue(ctx, common.EnvKey, envMap) } diff --git a/receiver/awscontainerinsightreceiver/internal/host/nodeCapacity_test.go b/receiver/awscontainerinsightreceiver/internal/host/nodeCapacity_test.go index 8ab6ebe1cf8b..13d01274496d 100644 --- a/receiver/awscontainerinsightreceiver/internal/host/nodeCapacity_test.go +++ b/receiver/awscontainerinsightreceiver/internal/host/nodeCapacity_test.go @@ -20,8 +20,8 @@ import ( func TestNodeCapacity(t *testing.T) { // no proc directory - lstatOption := func(nc *nodeCapacity) { - nc.osLstat = func(string) (os.FileInfo, error) { + lstatOption := func(nc any) { + nc.(*nodeCapacity).osLstat = func(string) (os.FileInfo, error) { return nil, os.ErrNotExist } } @@ -30,19 +30,19 @@ func TestNodeCapacity(t *testing.T) { assert.Error(t, err) // can't set environment variables - lstatOption = func(nc *nodeCapacity) { - nc.osLstat = func(string) (os.FileInfo, error) { + lstatOption = func(nc any) { + nc.(*nodeCapacity).osLstat = func(string) (os.FileInfo, error) { return nil, nil } } - virtualMemOption := func(nc *nodeCapacity) { - nc.virtualMemory = func(context.Context) (*mem.VirtualMemoryStat, error) { + virtualMemOption := func(nc any) { + nc.(*nodeCapacity).virtualMemory = func(context.Context) (*mem.VirtualMemoryStat, error) { return nil, errors.New("error") } } - cpuInfoOption := func(nc *nodeCapacity) { - nc.cpuInfo = func(context.Context) ([]cpu.InfoStat, error) { + cpuInfoOption := func(nc any) { + nc.(*nodeCapacity).cpuInfo = func(context.Context) ([]cpu.InfoStat, error) { return nil, errors.New("error") } } @@ -53,15 +53,15 @@ func TestNodeCapacity(t *testing.T) { assert.Equal(t, int64(0), nc.getNumCores()) // normal case where everything is working - virtualMemOption = func(nc *nodeCapacity) { - nc.virtualMemory = func(context.Context) (*mem.VirtualMemoryStat, error) { + virtualMemOption = func(nc any) { + nc.(*nodeCapacity).virtualMemory = func(context.Context) (*mem.VirtualMemoryStat, error) { return &mem.VirtualMemoryStat{ Total: 1024, }, nil } } - cpuInfoOption = func(nc *nodeCapacity) { - nc.cpuInfo = func(context.Context) ([]cpu.InfoStat, error) { + cpuInfoOption = func(nc any) { + nc.(*nodeCapacity).cpuInfo = func(context.Context) ([]cpu.InfoStat, error) { return []cpu.InfoStat{ {}, {}, diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet/client.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet/client.go index 6592d842f95f..28194910d3d2 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet/client.go +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet/client.go @@ -32,7 +32,7 @@ func (kp *kubeletProvider) getClient() (*kubeletutil.KubeletClient, error) { if kp.client != nil { return kp.client, nil } - kclient, err := kubeletutil.NewKubeletClient(kp.hostIP, kp.hostPort, kp.logger) + kclient, err := kubeletutil.NewKubeletClient(kp.hostIP, kp.hostPort, "", kp.logger) if err != nil { kp.logger.Error("failed to initialize new kubelet client, ", zap.Error(err)) return nil, err diff --git a/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil/kubeletclient.go b/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil/kubeletclient.go index 8ec921a04322..1873d2480911 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil/kubeletclient.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil/kubeletclient.go @@ -24,7 +24,7 @@ type KubeletClient struct { restClient kubelet.Client } -func NewKubeletClient(kubeIP string, port string, logger *zap.Logger) (*KubeletClient, error) { +func NewKubeletClient(kubeIP string, port string, kubeConfigPath string, logger *zap.Logger) (*KubeletClient, error) { kubeClient := &KubeletClient{ Port: port, KubeIP: kubeIP, @@ -36,12 +36,21 @@ func NewKubeletClient(kubeIP string, port string, logger *zap.Logger) (*KubeletC } endpoint = endpoint + ":" + port - // use service account for authentication + // use service account for authentication by default clientConfig := &kubelet.ClientConfig{ APIConfig: k8sconfig.APIConfig{ AuthType: k8sconfig.AuthTypeServiceAccount, }, } + if kubeConfigPath != "" { + // use kube-config for authentication + clientConfig = &kubelet.ClientConfig{ + APIConfig: k8sconfig.APIConfig{ + AuthType: k8sconfig.AuthTypeKubeConfig, + KubeConfigPath: kubeConfigPath, + }, + } + } clientProvider, err := kubeletNewClientProvider(endpoint, clientConfig, logger) if err != nil { diff --git a/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil/kubeletclient_test.go b/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil/kubeletclient_test.go index 053769c8b97d..c766b66457fb 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil/kubeletclient_test.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil/kubeletclient_test.go @@ -62,7 +62,7 @@ func TestNewKubeletClient(t *testing.T) { }, } for _, tt := range tests { - client, err := NewKubeletClient(tt.kubeIP, tt.port, zap.NewNop()) + client, err := NewKubeletClient(tt.kubeIP, tt.port, "", zap.NewNop()) require.NoError(t, err) assert.Equal(t, client.KubeIP, tt.kubeIP) fc := (client.restClient).(*fakeClient) diff --git a/receiver/awscontainerinsightreceiver/internal/stores/localnode.go b/receiver/awscontainerinsightreceiver/internal/stores/localnode.go index 510d576c0448..eddfec048273 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/localnode.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/localnode.go @@ -41,10 +41,13 @@ type Decorator interface { Shutdown() error } -func NewLocalNodeDecorator(logger *zap.Logger, containerOrchestrator string, hostInfo hostInfo, options ...Option) (*LocalNodeDecorator, error) { +func NewLocalNodeDecorator(logger *zap.Logger, containerOrchestrator string, hostInfo hostInfo, hostName string, options ...Option) (*LocalNodeDecorator, error) { nodeName := os.Getenv(ci.HostName) if nodeName == "" && containerOrchestrator == ci.EKS { - return nil, fmt.Errorf("missing environment variable %s. Please check your deployment YAML config", ci.HostName) + nodeName = hostName + if nodeName == "" { + return nil, fmt.Errorf("missing environment variable %s. Please check your deployment YAML config or agent config", ci.HostName) + } } d := &LocalNodeDecorator{ diff --git a/receiver/awscontainerinsightreceiver/internal/stores/localnode_test.go b/receiver/awscontainerinsightreceiver/internal/stores/localnode_test.go index b7de79de5188..50ae5d43b53b 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/localnode_test.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/localnode_test.go @@ -20,25 +20,32 @@ var logger = zap.NewNop() func TestNewLocalNodeDecorator(t *testing.T) { // don't set HostName environment variable, expect error in eks - d, err := NewLocalNodeDecorator(logger, "eks", nil) + d, err := NewLocalNodeDecorator(logger, "eks", nil, "") assert.Nil(t, d) assert.Error(t, err) // don't expect error in ecs - d, err = NewLocalNodeDecorator(logger, "ecs", nil) + d, err = NewLocalNodeDecorator(logger, "ecs", nil, "") assert.NotNil(t, d) assert.NoError(t, err) + assert.Empty(t, d.nodeName) + + d, err = NewLocalNodeDecorator(logger, "eks", nil, "test-hostname") + assert.NotNil(t, d) + assert.NoError(t, err) + assert.Equal(t, d.nodeName, "test-hostname") t.Setenv(ci.HostName, "host") - d, err = NewLocalNodeDecorator(logger, "eks", nil) + d, err = NewLocalNodeDecorator(logger, "eks", nil, "") assert.NotNil(t, d) assert.NoError(t, err) + assert.Equal(t, d.nodeName, "host") } func TestEbsVolumeInfo(t *testing.T) { t.Setenv(ci.HostName, "host") hostInfo := testutils.MockHostInfo{} - d, err := NewLocalNodeDecorator(logger, "eks", hostInfo) + d, err := NewLocalNodeDecorator(logger, "eks", hostInfo, "") assert.NotNil(t, d) assert.NoError(t, err) @@ -129,7 +136,7 @@ func TestExpectedTags(t *testing.T) { for name, testCase := range testCases { t.Run(name, func(t *testing.T) { - d, err := NewLocalNodeDecorator(logger, testCase.containerOrchestrator, hostInfo, WithK8sDecorator(k8sDecorator), WithECSInfo(&ecsInfo)) + d, err := NewLocalNodeDecorator(logger, testCase.containerOrchestrator, hostInfo, "", WithK8sDecorator(k8sDecorator), WithECSInfo(&ecsInfo)) assert.NotNil(t, d) assert.NoError(t, err) diff --git a/receiver/awscontainerinsightreceiver/internal/stores/podstore.go b/receiver/awscontainerinsightreceiver/internal/stores/podstore.go index fcec43518c6f..2239733d6302 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/podstore.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/podstore.go @@ -122,8 +122,8 @@ type PodStore struct { includeEnhancedMetrics bool } -func NewPodStore(hostIP string, prefFullPodName bool, addFullPodNameMetricLabel bool, includeEnhancedMetrics bool, logger *zap.Logger) (*PodStore, error) { - podClient, err := kubeletutil.NewKubeletClient(hostIP, ci.KubeSecurePort, logger) +func NewPodStore(hostIP string, prefFullPodName bool, addFullPodNameMetricLabel bool, includeEnhancedMetrics bool, kubeConfigPath string, hostName string, isSystemdEnabled bool, logger *zap.Logger) (*PodStore, error) { + podClient, err := kubeletutil.NewKubeletClient(hostIP, ci.KubeSecurePort, kubeConfigPath, logger) if err != nil { return nil, err } @@ -135,15 +135,28 @@ func NewPodStore(hostIP string, prefFullPodName bool, addFullPodNameMetricLabel nodeName := os.Getenv(ci.HostName) if nodeName == "" { - return nil, fmt.Errorf("missing environment variable %s. Please check your deployment YAML config", ci.HostName) + nodeName = hostName + if nodeName == "" { + return nil, fmt.Errorf("missing environment variable %s. Please check your deployment YAML config or passed as part of the agent config", ci.HostName) + } + } + + k8sClient := &k8sclient.K8sClient{} + nodeInfo := &nodeInfo{ + nodeName: nodeName, + provider: nil, + logger: logger, } + if !isSystemdEnabled { + k8sClient = k8sclient.Get(logger, + k8sclient.NodeSelector(fields.OneTermEqualSelector("metadata.name", nodeName)), + k8sclient.CaptureNodeLevelInfo(true), + ) - k8sClient := k8sclient.Get(logger, - k8sclient.NodeSelector(fields.OneTermEqualSelector("metadata.name", nodeName)), - k8sclient.CaptureNodeLevelInfo(true), - ) - if k8sClient == nil { - return nil, errors.New("failed to start pod store because k8sclient is nil") + if k8sClient == nil { + return nil, errors.New("failed to start pod store because k8sclient is nil") + } + nodeInfo = newNodeInfo(nodeName, k8sClient.GetNodeClient(), logger) } podStore := &PodStore{ @@ -151,7 +164,7 @@ func NewPodStore(hostIP string, prefFullPodName bool, addFullPodNameMetricLabel prevMeasurements: sync.Map{}, //prevMeasurements: make(map[string]*mapWithExpiry), podClient: podClient, - nodeInfo: newNodeInfo(nodeName, k8sClient.GetNodeClient(), logger), + nodeInfo: nodeInfo, prefFullPodName: prefFullPodName, includeEnhancedMetrics: includeEnhancedMetrics, k8sClient: k8sClient, @@ -358,7 +371,7 @@ func (p *PodStore) decorateNode(metric CIMetric) { metric.AddField(ci.MetricName(ci.TypeNode, ci.RunningPodCount), nodeStats.podCnt) metric.AddField(ci.MetricName(ci.TypeNode, ci.RunningContainerCount), nodeStats.containerCnt) - if p.includeEnhancedMetrics { + if p.includeEnhancedMetrics && p.nodeInfo.provider != nil { if nodeStatusCapacityPods, ok := p.nodeInfo.getNodeStatusCapacityPods(); ok { metric.AddField(ci.MetricName(ci.TypeNode, ci.StatusCapacityPods), nodeStatusCapacityPods) } @@ -723,14 +736,16 @@ func (p *PodStore) addPodOwnersAndPodName(metric CIMetric, pod *corev1.Pod, kube kind := owner.Kind name := owner.Name if owner.Kind == ci.ReplicaSet { - replicaSetClient := p.k8sClient.GetReplicaSetClient() - rsToDeployment := replicaSetClient.ReplicaSetToDeployment() - if parent := rsToDeployment[owner.Name]; parent != "" { - kind = ci.Deployment - name = parent - } else if parent := parseDeploymentFromReplicaSet(owner.Name); parent != "" { - kind = ci.Deployment - name = parent + if p.k8sClient != nil { + replicaSetClient := p.k8sClient.GetReplicaSetClient() + rsToDeployment := replicaSetClient.ReplicaSetToDeployment() + if parent := rsToDeployment[owner.Name]; parent != "" { + kind = ci.Deployment + name = parent + } else if parent := parseDeploymentFromReplicaSet(owner.Name); parent != "" { + kind = ci.Deployment + name = parent + } } } else if owner.Kind == ci.Job { if parent := parseCronJobFromJob(owner.Name); parent != "" { diff --git a/receiver/awscontainerinsightreceiver/internal/stores/servicestore.go b/receiver/awscontainerinsightreceiver/internal/stores/servicestore.go index 8d28474b0e39..94b4f21be8d3 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/servicestore.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/servicestore.go @@ -29,12 +29,18 @@ type ServiceStore struct { logger *zap.Logger } -func NewServiceStore(logger *zap.Logger) (*ServiceStore, error) { +func NewServiceStore(kubeConfigPath string, logger *zap.Logger) (*ServiceStore, error) { s := &ServiceStore{ podKeyToServiceNamesMap: make(map[string][]string), logger: logger, } k8sClient := k8sclient.Get(logger) + if kubeConfigPath != "" { + k8sClient = k8sclient.Get(logger, + k8sclient.KubeConfigPath(kubeConfigPath), + ) + } + if k8sClient == nil { return nil, errors.New("failed to start service store because k8sclient is nil") } diff --git a/receiver/awscontainerinsightreceiver/internal/stores/store.go b/receiver/awscontainerinsightreceiver/internal/stores/store.go index 3752d4ac98d9..b5f6d5ba7c34 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/store.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/store.go @@ -43,10 +43,13 @@ type K8sDecorator struct { podStore *PodStore } -func NewK8sDecorator(ctx context.Context, tagService bool, prefFullPodName bool, addFullPodNameMetricLabel bool, addContainerNameMetricLabel bool, includeEnhancedMetrics bool, logger *zap.Logger) (*K8sDecorator, error) { +func NewK8sDecorator(ctx context.Context, tagService bool, prefFullPodName bool, addFullPodNameMetricLabel bool, addContainerNameMetricLabel bool, includeEnhancedMetrics bool, kubeConfigPath string, customHostIP string, customHostName string, isSystemd bool, logger *zap.Logger) (*K8sDecorator, error) { hostIP := os.Getenv("HOST_IP") if hostIP == "" { - return nil, errors.New("environment variable HOST_IP is not set in k8s deployment config") + hostIP = customHostIP + if hostIP == "" { + return nil, errors.New("environment variable HOST_IP is not set in k8s deployment config or passed as part of the agent config") + } } k := &K8sDecorator{ @@ -54,7 +57,7 @@ func NewK8sDecorator(ctx context.Context, tagService bool, prefFullPodName bool, addContainerNameMetricLabel: addContainerNameMetricLabel, } - podstore, err := NewPodStore(hostIP, prefFullPodName, addFullPodNameMetricLabel, includeEnhancedMetrics, logger) + podstore, err := NewPodStore(hostIP, prefFullPodName, addFullPodNameMetricLabel, includeEnhancedMetrics, kubeConfigPath, customHostName, isSystemd, logger) if err != nil { return nil, err } @@ -62,11 +65,10 @@ func NewK8sDecorator(ctx context.Context, tagService bool, prefFullPodName bool, k.stores = append(k.stores, podstore) if tagService { - servicestore, err := NewServiceStore(logger) - if err != nil { - return nil, err + servicestore, err := NewServiceStore(kubeConfigPath, logger) + if err == nil { + k.stores = append(k.stores, servicestore) } - k.stores = append(k.stores, servicestore) } go func() { diff --git a/receiver/awscontainerinsightreceiver/receiver.go b/receiver/awscontainerinsightreceiver/receiver.go index 79b463f40209..7758ccf55f15 100644 --- a/receiver/awscontainerinsightreceiver/receiver.go +++ b/receiver/awscontainerinsightreceiver/receiver.go @@ -72,16 +72,17 @@ func newAWSContainerInsightReceiver( func (acir *awsContainerInsightReceiver) Start(ctx context.Context, host component.Host) error { ctx, acir.cancel = context.WithCancel(ctx) - hostinfo, err := hostInfo.NewInfo(acir.config.AWSSessionSettings, acir.config.ContainerOrchestrator, acir.config.CollectionInterval, acir.settings.Logger, hostInfo.WithClusterName(acir.config.ClusterName)) + hostinfo, err := hostInfo.NewInfo(acir.config.AWSSessionSettings, acir.config.ContainerOrchestrator, acir.config.CollectionInterval, acir.settings.Logger, hostInfo.WithClusterName(acir.config.ClusterName), hostInfo.WithSystemdEnabled(acir.config.RunOnSystemd)) if err != nil { return err } if acir.config.ContainerOrchestrator == ci.EKS { - k8sDecorator, err := stores.NewK8sDecorator(ctx, acir.config.TagService, acir.config.PrefFullPodName, acir.config.AddFullPodNameMetricLabel, acir.config.AddContainerNameMetricLabel, acir.config.EnableControlPlaneMetrics, acir.settings.Logger) - acir.decorators = append(acir.decorators, k8sDecorator) + k8sDecorator, err := stores.NewK8sDecorator(ctx, acir.config.TagService, acir.config.PrefFullPodName, acir.config.AddFullPodNameMetricLabel, acir.config.AddContainerNameMetricLabel, acir.config.EnableControlPlaneMetrics, acir.config.KubeConfigPath, acir.config.HostIP, acir.config.HostName, acir.config.RunOnSystemd, acir.settings.Logger) if err != nil { - return err + acir.settings.Logger.Warn("Unable to start K8s decorator", zap.Error(err)) + } else { + acir.decorators = append(acir.decorators, k8sDecorator) } if runtime.GOOS == ci.OperatingSystemWindows { @@ -91,11 +92,12 @@ func (acir *awsContainerInsightReceiver) Start(ctx context.Context, host compone } } else { localnodeDecorator, err := stores.NewLocalNodeDecorator(acir.settings.Logger, acir.config.ContainerOrchestrator, - hostinfo, stores.WithK8sDecorator(k8sDecorator)) + hostinfo, acir.config.HostName, stores.WithK8sDecorator(k8sDecorator)) if err != nil { - return err + acir.settings.Logger.Warn("Unable to start local node decorator", zap.Error(err)) + } else { + acir.decorators = append(acir.decorators, localnodeDecorator) } - acir.decorators = append(acir.decorators, localnodeDecorator) acir.containerMetricsProvider, err = cadvisor.New(acir.config.ContainerOrchestrator, hostinfo, acir.settings.Logger, cadvisor.WithDecorator(localnodeDecorator)) @@ -107,17 +109,22 @@ func (acir *awsContainerInsightReceiver) Start(ctx context.Context, host compone leaderElection, err = k8sapiserver.NewLeaderElection(acir.settings.Logger, k8sapiserver.WithLeaderLockName(acir.config.LeaderLockName), k8sapiserver.WithLeaderLockUsingConfigMapOnly(acir.config.LeaderLockUsingConfigMapOnly)) if err != nil { - return err + acir.settings.Logger.Warn("Unable to elect leader node", zap.Error(err)) } acir.k8sapiserver, err = k8sapiserver.NewK8sAPIServer(hostinfo, acir.settings.Logger, leaderElection, acir.config.AddFullPodNameMetricLabel, acir.config.EnableControlPlaneMetrics, acir.config.EnableAcceleratedComputeMetrics) if err != nil { - return err + acir.k8sapiserver = nil + acir.settings.Logger.Warn("Unable to connect to api-server", zap.Error(err)) } - err = acir.initPrometheusScraper(ctx, host, hostinfo, leaderElection) - if err != nil { - acir.settings.Logger.Debug("Unable to start kube apiserver prometheus scraper", zap.Error(err)) + + if acir.k8sapiserver != nil { + err = acir.initPrometheusScraper(ctx, host, hostinfo, leaderElection) + if err != nil { + acir.settings.Logger.Warn("Unable to start kube apiserver prometheus scraper", zap.Error(err)) + } } + err = acir.initDcgmScraper(ctx, host, hostinfo, k8sDecorator) if err != nil { acir.settings.Logger.Debug("Unable to start dcgm scraper", zap.Error(err)) @@ -143,7 +150,7 @@ func (acir *awsContainerInsightReceiver) Start(ctx context.Context, host compone } localnodeDecorator, err := stores.NewLocalNodeDecorator(acir.settings.Logger, acir.config.ContainerOrchestrator, - hostinfo, stores.WithECSInfo(ecsInfo)) + hostinfo, acir.config.HostName, stores.WithECSInfo(ecsInfo)) if err != nil { return err } diff --git a/receiver/awscontainerinsightreceiver/receiver_test.go b/receiver/awscontainerinsightreceiver/receiver_test.go index 7ffde0d67335..8eed8d7b625a 100644 --- a/receiver/awscontainerinsightreceiver/receiver_test.go +++ b/receiver/awscontainerinsightreceiver/receiver_test.go @@ -135,3 +135,26 @@ func TestCollectDataWithECS(t *testing.T) { err = r.collectData(ctx) require.Error(t, err) } + +func TestCollectDataWithSystemd(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.ContainerOrchestrator = ci.EKS + cfg.KubeConfigPath = "/tmp/kube-config" + cfg.HostIP = "1.2.3.4" + metricsReceiver, err := newAWSContainerInsightReceiver( + componenttest.NewNopTelemetrySettings(), + cfg, + new(consumertest.MetricsSink), + ) + + require.NoError(t, err) + require.NotNil(t, metricsReceiver) + + r := metricsReceiver.(*awsContainerInsightReceiver) + _ = r.Start(context.Background(), nil) + ctx := context.Background() + + r.containerMetricsProvider = &mockCadvisor{} + err = r.collectData(ctx) + require.Nil(t, err) +} diff --git a/receiver/awscontainerinsightreceiver/testdata/config.yaml b/receiver/awscontainerinsightreceiver/testdata/config.yaml index 68c276ea24d8..a4881409e80d 100644 --- a/receiver/awscontainerinsightreceiver/testdata/config.yaml +++ b/receiver/awscontainerinsightreceiver/testdata/config.yaml @@ -10,3 +10,8 @@ awscontainerinsightreceiver/leader_lock_using_config_map_only: leader_lock_using_config_map_only: true awscontainerinsightreceiver/enable_control_plane_metrics: enable_control_plane_metrics: true +awscontainerinsightreceiver/custom_kube_config_path: + kube_config_path: "custom_kube_config_path" + host_ip: "1.2.3.4" + host_name: "test-hostname" + run_on_systemd: true