Skip to content

Commit

Permalink
Implementing CI for EKS as a systemd service (#212)
Browse files Browse the repository at this point in the history
  • Loading branch information
mitali-salvi authored Jun 3, 2024
1 parent 2728c19 commit d5d6147
Show file tree
Hide file tree
Showing 18 changed files with 230 additions and 104 deletions.
6 changes: 6 additions & 0 deletions internal/k8sconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ type APIConfig struct {
// from `~/.kube/config`.
AuthType AuthType `mapstructure:"auth_type"`

// When using auth_type `kubeConfig`, override default kubeConfig with custom path
KubeConfigPath string `mapstructure:"kube_config_path"`

// When using auth_type `kubeConfig`, override the current context.
Context string `mapstructure:"context"`
}
Expand Down Expand Up @@ -87,6 +90,9 @@ func CreateRestConfig(apiConf APIConfig) (*rest.Config, error) {
switch authType {
case AuthTypeKubeConfig:
loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
if apiConf.KubeConfigPath != "" {
loadingRules.ExplicitPath = apiConf.KubeConfigPath
}
configOverrides := &clientcmd.ConfigOverrides{}
if apiConf.Context != "" {
configOverrides.CurrentContext = apiConf.Context
Expand Down
12 changes: 12 additions & 0 deletions receiver/awscontainerinsightreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,16 @@ type Config struct {

// EnableAcceleratedComputeMetrics enabled features with accelerated compute resources where metrics are scraped from vendor specific sources
EnableAcceleratedComputeMetrics bool `mapstructure:"accelerated_compute_metrics"`

// KubeConfigPath is an optional attribute to override the default kube config path in an EC2 environment
KubeConfigPath string `mapstructure:"kube_config_path"`

// HostIP is an optional attribute to override the default host_ip in an EC2 environment
HostIP string `mapstructure:"host_ip"`

// HostName is an optional attribute to override the default host_name in an EC2 environment
HostName string `mapstructure:"host_name"`

// RunOnSystemd is an optional attribute to run the receiver in an EC2 environment
RunOnSystemd bool `mapstructure:"run_on_systemd,omitempty"`
}
14 changes: 14 additions & 0 deletions receiver/awscontainerinsightreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,20 @@ func TestLoadConfig(t *testing.T) {
EnableControlPlaneMetrics: true,
},
},
{
id: component.NewIDWithName(metadata.Type, "custom_kube_config_path"),
expected: &Config{
CollectionInterval: 60 * time.Second,
ContainerOrchestrator: "eks",
TagService: true,
PrefFullPodName: false,
LeaderLockName: "otel-container-insight-clusterleader",
KubeConfigPath: "custom_kube_config_path",
HostIP: "1.2.3.4",
HostName: "test-hostname",
RunOnSystemd: true,
},
},
}

for _, tt := range tests {
Expand Down
24 changes: 19 additions & 5 deletions receiver/awscontainerinsightreceiver/internal/host/hostinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Info struct {
refreshInterval time.Duration
containerOrchestrator string
clusterName string
isSystemdEnabled bool // flag to indicate if agent is running on systemd in EC2 environment
instanceIDReadyC chan bool // close of this channel indicates instance ID is ready
instanceIPReadyC chan bool // close of this channel indicates instance Ip is ready

Expand All @@ -35,17 +36,30 @@ type Info struct {
ec2Tags ec2TagsProvider

awsSessionCreator func(*zap.Logger, awsutil.ConnAttr, *awsutil.AWSSessionSettings) (*aws.Config, *session.Session, error)
nodeCapacityCreator func(*zap.Logger, ...nodeCapacityOption) (nodeCapacityProvider, error)
nodeCapacityCreator func(*zap.Logger, ...Option) (nodeCapacityProvider, error)
ec2MetadataCreator func(context.Context, *session.Session, time.Duration, chan bool, chan bool, bool, int, *zap.Logger, ...ec2MetadataOption) ec2MetadataProvider
ebsVolumeCreator func(context.Context, *session.Session, string, string, time.Duration, *zap.Logger, ...ebsVolumeOption) ebsVolumeProvider
ec2TagsCreator func(context.Context, *session.Session, string, string, string, time.Duration, *zap.Logger, ...ec2TagsOption) ec2TagsProvider
}

type Option func(*Info)
type Option func(any)

func WithClusterName(name string) Option {
return func(info *Info) {
info.clusterName = name
return func(info any) {
if i, ok := info.(*Info); ok {
i.clusterName = name
}
}
}

func WithSystemdEnabled(enabled bool) Option {
return func(info any) {
switch i := info.(type) {
case *Info:
i.isSystemdEnabled = enabled
case *nodeCapacity:
i.isSystemdEnabled = enabled
}
}
}

Expand Down Expand Up @@ -75,7 +89,7 @@ func NewInfo(awsSessionSettings awsutil.AWSSessionSettings, containerOrchestrato
opt(mInfo)
}

nodeCapacity, err := mInfo.nodeCapacityCreator(logger)
nodeCapacity, err := mInfo.nodeCapacityCreator(logger, WithSystemdEnabled(mInfo.isSystemdEnabled))
if err != nil {
return nil, fmt.Errorf("failed to initialize NodeCapacity: %w", err)
}
Expand Down
56 changes: 28 additions & 28 deletions receiver/awscontainerinsightreceiver/internal/host/hostinfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ func (m *mockEC2Tags) getAutoScalingGroupName() string {

func TestInfo(t *testing.T) {
// test the case when nodeCapacity fails to initialize
nodeCapacityCreatorOpt := func(m *Info) {
m.nodeCapacityCreator = func(*zap.Logger, ...nodeCapacityOption) (nodeCapacityProvider, error) {
nodeCapacityCreatorOpt := func(m any) {
m.(*Info).nodeCapacityCreator = func(*zap.Logger, ...Option) (nodeCapacityProvider, error) {
return nil, errors.New("error")
}
}
Expand All @@ -82,13 +82,13 @@ func TestInfo(t *testing.T) {
assert.Error(t, err)

// test the case when aws session fails to initialize
nodeCapacityCreatorOpt = func(m *Info) {
m.nodeCapacityCreator = func(*zap.Logger, ...nodeCapacityOption) (nodeCapacityProvider, error) {
nodeCapacityCreatorOpt = func(m any) {
m.(*Info).nodeCapacityCreator = func(*zap.Logger, ...Option) (nodeCapacityProvider, error) {
return &mockNodeCapacity{}, nil
}
}
awsSessionCreatorOpt := func(m *Info) {
m.awsSessionCreator = func(*zap.Logger, awsutil.ConnAttr, *awsutil.AWSSessionSettings) (*aws.Config, *session.Session, error) {
awsSessionCreatorOpt := func(m any) {
m.(*Info).awsSessionCreator = func(*zap.Logger, awsutil.ConnAttr, *awsutil.AWSSessionSettings) (*aws.Config, *session.Session, error) {
return nil, nil, errors.New("error")
}
}
Expand All @@ -97,25 +97,25 @@ func TestInfo(t *testing.T) {
assert.Error(t, err)

// test normal case where everything is working
awsSessionCreatorOpt = func(m *Info) {
m.awsSessionCreator = func(*zap.Logger, awsutil.ConnAttr, *awsutil.AWSSessionSettings) (*aws.Config, *session.Session, error) {
awsSessionCreatorOpt = func(m any) {
m.(*Info).awsSessionCreator = func(*zap.Logger, awsutil.ConnAttr, *awsutil.AWSSessionSettings) (*aws.Config, *session.Session, error) {
return &aws.Config{}, &session.Session{}, nil
}
}
ec2MetadataCreatorOpt := func(m *Info) {
m.ec2MetadataCreator = func(context.Context, *session.Session, time.Duration, chan bool, chan bool, bool, int, *zap.Logger,
ec2MetadataCreatorOpt := func(m any) {
m.(*Info).ec2MetadataCreator = func(context.Context, *session.Session, time.Duration, chan bool, chan bool, bool, int, *zap.Logger,
...ec2MetadataOption) ec2MetadataProvider {
return &mockEC2Metadata{}
}
}
ebsVolumeCreatorOpt := func(m *Info) {
m.ebsVolumeCreator = func(context.Context, *session.Session, string, string, time.Duration, *zap.Logger,
ebsVolumeCreatorOpt := func(m any) {
m.(*Info).ebsVolumeCreator = func(context.Context, *session.Session, string, string, time.Duration, *zap.Logger,
...ebsVolumeOption) ebsVolumeProvider {
return &mockEBSVolume{}
}
}
ec2TagsCreatorOpt := func(m *Info) {
m.ec2TagsCreator = func(context.Context, *session.Session, string, string, string, time.Duration, *zap.Logger,
ec2TagsCreatorOpt := func(m any) {
m.(*Info).ec2TagsCreator = func(context.Context, *session.Session, string, string, string, time.Duration, *zap.Logger,
...ec2TagsOption) ec2TagsProvider {
return &mockEC2Tags{}
}
Expand Down Expand Up @@ -154,8 +154,8 @@ func TestInfo(t *testing.T) {

func TestInfoForECS(t *testing.T) {
// test the case when nodeCapacity fails to initialize
nodeCapacityCreatorOpt := func(m *Info) {
m.nodeCapacityCreator = func(*zap.Logger, ...nodeCapacityOption) (nodeCapacityProvider, error) {
nodeCapacityCreatorOpt := func(m any) {
m.(*Info).nodeCapacityCreator = func(*zap.Logger, ...Option) (nodeCapacityProvider, error) {
return nil, errors.New("error")
}
}
Expand All @@ -164,13 +164,13 @@ func TestInfoForECS(t *testing.T) {
assert.Error(t, err)

// test the case when aws session fails to initialize
nodeCapacityCreatorOpt = func(m *Info) {
m.nodeCapacityCreator = func(*zap.Logger, ...nodeCapacityOption) (nodeCapacityProvider, error) {
nodeCapacityCreatorOpt = func(m any) {
m.(*Info).nodeCapacityCreator = func(*zap.Logger, ...Option) (nodeCapacityProvider, error) {
return &mockNodeCapacity{}, nil
}
}
awsSessionCreatorOpt := func(m *Info) {
m.awsSessionCreator = func(*zap.Logger, awsutil.ConnAttr, *awsutil.AWSSessionSettings) (*aws.Config, *session.Session, error) {
awsSessionCreatorOpt := func(m any) {
m.(*Info).awsSessionCreator = func(*zap.Logger, awsutil.ConnAttr, *awsutil.AWSSessionSettings) (*aws.Config, *session.Session, error) {
return nil, nil, errors.New("error")
}
}
Expand All @@ -179,25 +179,25 @@ func TestInfoForECS(t *testing.T) {
assert.Error(t, err)

// test normal case where everything is working
awsSessionCreatorOpt = func(m *Info) {
m.awsSessionCreator = func(*zap.Logger, awsutil.ConnAttr, *awsutil.AWSSessionSettings) (*aws.Config, *session.Session, error) {
awsSessionCreatorOpt = func(m any) {
m.(*Info).awsSessionCreator = func(*zap.Logger, awsutil.ConnAttr, *awsutil.AWSSessionSettings) (*aws.Config, *session.Session, error) {
return &aws.Config{}, &session.Session{}, nil
}
}
ec2MetadataCreatorOpt := func(m *Info) {
m.ec2MetadataCreator = func(context.Context, *session.Session, time.Duration, chan bool, chan bool, bool, int, *zap.Logger,
ec2MetadataCreatorOpt := func(m any) {
m.(*Info).ec2MetadataCreator = func(context.Context, *session.Session, time.Duration, chan bool, chan bool, bool, int, *zap.Logger,
...ec2MetadataOption) ec2MetadataProvider {
return &mockEC2Metadata{}
}
}
ebsVolumeCreatorOpt := func(m *Info) {
m.ebsVolumeCreator = func(context.Context, *session.Session, string, string, time.Duration, *zap.Logger,
ebsVolumeCreatorOpt := func(m any) {
m.(*Info).ebsVolumeCreator = func(context.Context, *session.Session, string, string, time.Duration, *zap.Logger,
...ebsVolumeOption) ebsVolumeProvider {
return &mockEBSVolume{}
}
}
ec2TagsCreatorOpt := func(m *Info) {
m.ec2TagsCreator = func(context.Context, *session.Session, string, string, string, time.Duration, *zap.Logger,
ec2TagsCreatorOpt := func(m any) {
m.(*Info).ec2TagsCreator = func(context.Context, *session.Session, string, string, string, time.Duration, *zap.Logger,
...ec2TagsOption) ec2TagsProvider {
return &mockEC2Tags{}
}
Expand Down
19 changes: 11 additions & 8 deletions receiver/awscontainerinsightreceiver/internal/host/nodeCapacity.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,18 @@ type nodeCapacityProvider interface {
}

type nodeCapacity struct {
memCapacity int64
cpuCapacity int64
logger *zap.Logger
memCapacity int64
cpuCapacity int64
isSystemdEnabled bool // flag to indicate if agent is running on systemd in EC2 environment
logger *zap.Logger

// osLstat returns a FileInfo describing the named file.
osLstat func(name string) (os.FileInfo, error)
virtualMemory func(ctx context.Context) (*mem.VirtualMemoryStat, error)
cpuInfo func(ctx context.Context) ([]cpu.InfoStat, error)
}

type nodeCapacityOption func(*nodeCapacity)

func newNodeCapacity(logger *zap.Logger, options ...nodeCapacityOption) (nodeCapacityProvider, error) {
func newNodeCapacity(logger *zap.Logger, options ...Option) (nodeCapacityProvider, error) {
nc := &nodeCapacity{
logger: logger,
osLstat: os.Lstat,
Expand All @@ -48,10 +47,14 @@ func newNodeCapacity(logger *zap.Logger, options ...nodeCapacityOption) (nodeCap

ctx := context.Background()
if runtime.GOOS != ci.OperatingSystemWindows {
if _, err := nc.osLstat(hostProc); os.IsNotExist(err) {
procPath := hostProc
if nc.isSystemdEnabled {
procPath = "/proc"
}
if _, err := nc.osLstat(procPath); os.IsNotExist(err) {
return nil, err
}
envMap := common.EnvMap{common.HostProcEnvKey: hostProc}
envMap := common.EnvMap{common.HostProcEnvKey: procPath}
ctx = context.WithValue(ctx, common.EnvKey, envMap)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (

func TestNodeCapacity(t *testing.T) {
// no proc directory
lstatOption := func(nc *nodeCapacity) {
nc.osLstat = func(string) (os.FileInfo, error) {
lstatOption := func(nc any) {
nc.(*nodeCapacity).osLstat = func(string) (os.FileInfo, error) {
return nil, os.ErrNotExist
}
}
Expand All @@ -30,19 +30,19 @@ func TestNodeCapacity(t *testing.T) {
assert.Error(t, err)

// can't set environment variables
lstatOption = func(nc *nodeCapacity) {
nc.osLstat = func(string) (os.FileInfo, error) {
lstatOption = func(nc any) {
nc.(*nodeCapacity).osLstat = func(string) (os.FileInfo, error) {
return nil, nil
}
}

virtualMemOption := func(nc *nodeCapacity) {
nc.virtualMemory = func(context.Context) (*mem.VirtualMemoryStat, error) {
virtualMemOption := func(nc any) {
nc.(*nodeCapacity).virtualMemory = func(context.Context) (*mem.VirtualMemoryStat, error) {
return nil, errors.New("error")
}
}
cpuInfoOption := func(nc *nodeCapacity) {
nc.cpuInfo = func(context.Context) ([]cpu.InfoStat, error) {
cpuInfoOption := func(nc any) {
nc.(*nodeCapacity).cpuInfo = func(context.Context) ([]cpu.InfoStat, error) {
return nil, errors.New("error")
}
}
Expand All @@ -53,15 +53,15 @@ func TestNodeCapacity(t *testing.T) {
assert.Equal(t, int64(0), nc.getNumCores())

// normal case where everything is working
virtualMemOption = func(nc *nodeCapacity) {
nc.virtualMemory = func(context.Context) (*mem.VirtualMemoryStat, error) {
virtualMemOption = func(nc any) {
nc.(*nodeCapacity).virtualMemory = func(context.Context) (*mem.VirtualMemoryStat, error) {
return &mem.VirtualMemoryStat{
Total: 1024,
}, nil
}
}
cpuInfoOption = func(nc *nodeCapacity) {
nc.cpuInfo = func(context.Context) ([]cpu.InfoStat, error) {
cpuInfoOption = func(nc any) {
nc.(*nodeCapacity).cpuInfo = func(context.Context) ([]cpu.InfoStat, error) {
return []cpu.InfoStat{
{},
{},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (kp *kubeletProvider) getClient() (*kubeletutil.KubeletClient, error) {
if kp.client != nil {
return kp.client, nil
}
kclient, err := kubeletutil.NewKubeletClient(kp.hostIP, kp.hostPort, kp.logger)
kclient, err := kubeletutil.NewKubeletClient(kp.hostIP, kp.hostPort, "", kp.logger)
if err != nil {
kp.logger.Error("failed to initialize new kubelet client, ", zap.Error(err))
return nil, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type KubeletClient struct {
restClient kubelet.Client
}

func NewKubeletClient(kubeIP string, port string, logger *zap.Logger) (*KubeletClient, error) {
func NewKubeletClient(kubeIP string, port string, kubeConfigPath string, logger *zap.Logger) (*KubeletClient, error) {
kubeClient := &KubeletClient{
Port: port,
KubeIP: kubeIP,
Expand All @@ -36,12 +36,21 @@ func NewKubeletClient(kubeIP string, port string, logger *zap.Logger) (*KubeletC
}
endpoint = endpoint + ":" + port

// use service account for authentication
// use service account for authentication by default
clientConfig := &kubelet.ClientConfig{
APIConfig: k8sconfig.APIConfig{
AuthType: k8sconfig.AuthTypeServiceAccount,
},
}
if kubeConfigPath != "" {
// use kube-config for authentication
clientConfig = &kubelet.ClientConfig{
APIConfig: k8sconfig.APIConfig{
AuthType: k8sconfig.AuthTypeKubeConfig,
KubeConfigPath: kubeConfigPath,
},
}
}

clientProvider, err := kubeletNewClientProvider(endpoint, clientConfig, logger)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestNewKubeletClient(t *testing.T) {
},
}
for _, tt := range tests {
client, err := NewKubeletClient(tt.kubeIP, tt.port, zap.NewNop())
client, err := NewKubeletClient(tt.kubeIP, tt.port, "", zap.NewNop())
require.NoError(t, err)
assert.Equal(t, client.KubeIP, tt.kubeIP)
fc := (client.restClient).(*fakeClient)
Expand Down
Loading

0 comments on commit d5d6147

Please sign in to comment.