diff --git a/internal/aws/k8s/k8sclient/clientset.go b/internal/aws/k8s/k8sclient/clientset.go
index 046701e23bc7..092bac316a02 100644
--- a/internal/aws/k8s/k8sclient/clientset.go
+++ b/internal/aws/k8s/k8sclient/clientset.go
@@ -45,6 +45,19 @@ var mu = &sync.Mutex{}
 
 var optionsToK8sClient = map[string]*K8sClient{}
 
+type stopper interface {
+	shutdown()
+}
+
+func shutdownClient(client stopper, mu *sync.Mutex, afterShutdown func()) {
+	mu.Lock()
+	if client != nil {
+		client.shutdown()
+		afterShutdown()
+	}
+	mu.Unlock()
+}
+
 type cacheReflector interface {
 	LastSyncResourceVersion() string
 	Run(<-chan struct{})
@@ -132,19 +145,54 @@ func Get(logger *zap.Logger, options ...Option) *K8sClient {
 	return optionsToK8sClient[strOptions]
 }
 
+type epClientWithStopper interface {
+	EpClient
+	stopper
+}
+
+type jobClientWithStopper interface {
+	JobClient
+	stopper
+}
+
+type nodeClientWithStopper interface {
+	NodeClient
+	stopper
+}
+
+type podClientWithStopper interface {
+	PodClient
+	stopper
+}
+
+type replicaSetClientWithStopper interface {
+	ReplicaSetClient
+	stopper
+}
+
 type K8sClient struct {
 	kubeConfigPath       string
 	initSyncPollInterval time.Duration
 	initSyncPollTimeout  time.Duration
 
-	ClientSet kubernetes.Interface
+	clientSet kubernetes.Interface
+
+	syncChecker *reflectorSyncChecker
+
+	epMu sync.Mutex
+	ep   epClientWithStopper
 
-	Ep   EpClient
-	Pod  PodClient
-	Node NodeClient
+	podMu sync.Mutex
+	pod   podClientWithStopper
 
-	Job        JobClient
-	ReplicaSet ReplicaSetClient
+	nodeMu sync.Mutex
+	node   nodeClientWithStopper
+
+	jobMu sync.Mutex
+	job   jobClientWithStopper
+
+	rsMu       sync.Mutex
+	replicaSet replicaSetClientWithStopper
 
 	logger *zap.Logger
 }
@@ -177,55 +225,121 @@ func (c *K8sClient) init(logger *zap.Logger, options ...Option) error {
 		return err
 	}
 
-	syncChecker := &reflectorSyncChecker{
+	c.syncChecker = &reflectorSyncChecker{
 		pollInterval: c.initSyncPollInterval,
 		pollTimeout:  c.initSyncPollTimeout,
 		logger:       c.logger,
 	}
 
-	c.ClientSet = client
-	c.Ep = newEpClient(client, c.logger, epSyncCheckerOption(syncChecker))
-	c.Pod = newPodClient(client, c.logger, podSyncCheckerOption(syncChecker))
-	c.Node = newNodeClient(client, c.logger, nodeSyncCheckerOption(syncChecker))
+	c.clientSet = client
+	c.ep = nil
+	c.pod = nil
+	c.node = nil
+	c.job = nil
+	c.replicaSet = nil
 
-	c.Job, err = newJobClient(client, c.logger, jobSyncCheckerOption(syncChecker))
-	if err != nil {
-		c.logger.Error("use an no-op job client instead because of error", zap.Error(err))
-		c.Job = &noOpJobClient{}
+	return nil
+}
+
+func (c *K8sClient) GetEpClient() EpClient {
+	c.epMu.Lock()
+	if c.ep == nil {
+		c.ep = newEpClient(c.clientSet, c.logger, epSyncCheckerOption(c.syncChecker))
 	}
-	c.ReplicaSet, err = newReplicaSetClient(client, c.logger, replicaSetSyncCheckerOption(syncChecker))
-	if err != nil {
-		c.logger.Error("use an no-op replica set client instead because of error", zap.Error(err))
-		c.ReplicaSet = &noOpReplicaSetClient{}
+	c.epMu.Unlock()
+	return c.ep
+}
+
+func (c *K8sClient) ShutdownEpClient() {
+	shutdownClient(c.ep, &c.epMu, func() {
+		c.ep = nil
+	})
+}
+
+func (c *K8sClient) GetPodClient() PodClient {
+	c.podMu.Lock()
+	if c.pod == nil {
+		c.pod = newPodClient(c.clientSet, c.logger, podSyncCheckerOption(c.syncChecker))
 	}
+	c.podMu.Unlock()
+	return c.pod
+}
 
-	return nil
+func (c *K8sClient) ShutdownPodClient() {
+	shutdownClient(c.pod, &c.podMu, func() {
+		c.pod = nil
+	})
+}
+
+func (c *K8sClient) GetNodeClient() NodeClient {
+	c.nodeMu.Lock()
+	if c.node == nil {
+		c.node = newNodeClient(c.clientSet, c.logger, nodeSyncCheckerOption(c.syncChecker))
+	}
+	c.nodeMu.Unlock()
+	return c.node
+}
+
+func (c *K8sClient) ShutdownNodeClient() {
+	shutdownClient(c.node, &c.nodeMu, func() {
+		c.node = nil
+	})
+}
+
+func (c *K8sClient) GetJobClient() JobClient {
+	var err error
+	c.jobMu.Lock()
+	if c.job == nil {
+		c.job, err = newJobClient(c.clientSet, c.logger, jobSyncCheckerOption(c.syncChecker))
+		if err != nil {
+			c.logger.Error("use a no-op job client instead because of error", zap.Error(err))
+			c.job = &noOpJobClient{}
+		}
+	}
+	c.jobMu.Unlock()
+	return c.job
+}
+
+func (c *K8sClient) ShutdownJobClient() {
+	shutdownClient(c.job, &c.jobMu, func() {
+		c.job = nil
+	})
 }
 
+func (c *K8sClient) GetReplicaSetClient() ReplicaSetClient {
+	var err error
+	c.rsMu.Lock()
+	if c.replicaSet == nil || reflect.ValueOf(c.replicaSet).IsNil() {
+		c.replicaSet, err = newReplicaSetClient(c.clientSet, c.logger, replicaSetSyncCheckerOption(c.syncChecker))
+		if err != nil {
+			c.logger.Error("use a no-op replica set client instead because of error", zap.Error(err))
+			c.replicaSet = &noOpReplicaSetClient{}
+		}
+	}
+	c.rsMu.Unlock()
+	return c.replicaSet
+}
+
+func (c *K8sClient) ShutdownReplicaSetClient() {
+	shutdownClient(c.replicaSet, &c.rsMu, func() {
+		c.replicaSet = nil
+	})
+}
+
+func (c *K8sClient) GetClientSet() kubernetes.Interface {
+	return c.clientSet
+}
+
+// Shutdown stops K8sClient
 func (c *K8sClient) Shutdown() {
 	mu.Lock()
 	defer mu.Unlock()
 
-	if c.Ep != nil && !reflect.ValueOf(c.Ep).IsNil() {
-		c.Ep.shutdown()
-		c.Ep = nil
-	}
-	if c.Pod != nil && !reflect.ValueOf(c.Pod).IsNil() {
-		c.Pod.Shutdown()
-		c.Pod = nil
-	}
-	if c.Node != nil && !reflect.ValueOf(c.Node).IsNil() {
-		c.Node.Shutdown()
-		c.Node = nil
-	}
-	if c.Job != nil && !reflect.ValueOf(c.Job).IsNil() {
-		c.Job.shutdown()
-		c.Job = nil
-	}
-	if c.ReplicaSet != nil && !reflect.ValueOf(c.ReplicaSet).IsNil() {
-		c.ReplicaSet.shutdown()
-		c.ReplicaSet = nil
-	}
+	c.ShutdownEpClient()
+	c.ShutdownPodClient()
+	c.ShutdownNodeClient()
+	c.ShutdownJobClient()
+	c.ShutdownReplicaSetClient()
 
 	// remove the current instance of k8s client from map
 	for key, val := range optionsToK8sClient {
diff --git a/internal/aws/k8s/k8sclient/clientset_test.go b/internal/aws/k8s/k8sclient/clientset_test.go
index 4189aa58ec62..9f4e844f734a 100644
--- a/internal/aws/k8s/k8sclient/clientset_test.go
+++ b/internal/aws/k8s/k8sclient/clientset_test.go
@@ -31,12 +31,18 @@ func TestGetShutdown(t *testing.T) {
 		InitSyncPollTimeout(20*time.Nanosecond),
 	)
 	assert.Equal(t, 1, len(optionsToK8sClient))
+	assert.NotNil(t, k8sClient.GetClientSet())
+	assert.NotNil(t, k8sClient.GetEpClient())
+	assert.NotNil(t, k8sClient.GetJobClient())
+	assert.NotNil(t, k8sClient.GetNodeClient())
+	assert.NotNil(t, k8sClient.GetPodClient())
+	assert.NotNil(t, k8sClient.GetReplicaSetClient())
 	k8sClient.Shutdown()
-	assert.Nil(t, k8sClient.Ep)
-	assert.Nil(t, k8sClient.Job)
-	assert.Nil(t, k8sClient.Node)
-	assert.Nil(t, k8sClient.Pod)
-	assert.Nil(t, k8sClient.ReplicaSet)
+	assert.Nil(t, k8sClient.ep)
+	assert.Nil(t, k8sClient.job)
+	assert.Nil(t, k8sClient.node)
+	assert.Nil(t, k8sClient.pod)
+	assert.Nil(t, k8sClient.replicaSet)
 	assert.Equal(t, 0, len(optionsToK8sClient))
 	removeTempKubeConfig()
}
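Reviewer note: the clientset change above replaces eagerly constructed, exported client fields with mutex-guarded lazy getters plus matching Shutdown methods. A minimal standalone sketch of that get-or-create/tear-down shape, assuming a hypothetical `resource` type (names here are illustrative, not part of this PR):

```go
package main

import (
	"fmt"
	"sync"
)

type resource struct{ name string }

func (r *resource) shutdown() { fmt.Println("stopping", r.name) }

type manager struct {
	mu sync.Mutex
	r  *resource
}

// getResource creates the resource on first use; later calls reuse it.
func (m *manager) getResource() *resource {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.r == nil {
		m.r = &resource{name: "ep"}
	}
	return m.r
}

// shutdownResource stops the resource (if any) and clears the field so a
// later getResource call can lazily re-create it, e.g. after losing and
// regaining leader election.
func (m *manager) shutdownResource() {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.r != nil {
		m.r.shutdown()
		m.r = nil
	}
}

func main() {
	m := &manager{}
	_ = m.getResource()
	m.shutdownResource()
}
```

One caveat worth flagging in review: `shutdownClient(c.ep, &c.epMu, ...)` evaluates `c.ep` as an argument before `mu.Lock()` runs, so a concurrent `GetEpClient`/`ShutdownEpClient` pair may be reported by `go test -race`; reading the field only after taking the lock (as in the sketch) avoids that.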
diff --git a/internal/aws/k8s/k8sclient/endpoint.go b/internal/aws/k8s/k8sclient/endpoint.go
index 7deede876111..722a64e56cf2 100644
--- a/internal/aws/k8s/k8sclient/endpoint.go
+++ b/internal/aws/k8s/k8sclient/endpoint.go
@@ -48,9 +48,6 @@ type EpClient interface {
 	PodKeyToServiceNames() map[string][]string
 	// Get the mapping between the service and the number of belonging pods
 	ServiceToPodNum() map[Service]int
-
-	// shutdown is only used internally by clientset to stop the EpClient
-	shutdown()
 }
 
 type epClientOption func(*epClient)
diff --git a/internal/aws/k8s/k8sclient/job.go b/internal/aws/k8s/k8sclient/job.go
index b69060464320..ea028800fcea 100644
--- a/internal/aws/k8s/k8sclient/job.go
+++ b/internal/aws/k8s/k8sclient/job.go
@@ -36,9 +36,6 @@ const (
 type JobClient interface {
 	// get the mapping between job and cronjob
 	JobToCronJob() map[string]string
-
-	// shutdown is only used internally by clientset to stop the JobClient
-	shutdown()
 }
 
 type noOpJobClient struct {
diff --git a/internal/aws/k8s/k8sclient/node.go b/internal/aws/k8s/k8sclient/node.go
index 1349a259d3e1..3b29577aadea 100644
--- a/internal/aws/k8s/k8sclient/node.go
+++ b/internal/aws/k8s/k8sclient/node.go
@@ -41,8 +41,6 @@ type NodeClient interface {
 	ClusterFailedNodeCount() int
 	// Get the number of nodes for current cluster
 	ClusterNodeCount() int
-	// Shutdown stops the NodeClient
-	Shutdown()
 }
 
 type nodeClientOption func(*nodeClient)
@@ -142,7 +140,7 @@ func newNodeClient(clientSet kubernetes.Interface, logger *zap.Logger, options .
 	return c
 }
 
-func (c *nodeClient) Shutdown() {
+func (c *nodeClient) shutdown() {
 	c.mu.Lock()
 	defer c.mu.Unlock()
diff --git a/internal/aws/k8s/k8sclient/node_test.go b/internal/aws/k8s/k8sclient/node_test.go
index c5afb974ae90..2d669fed27bd 100644
--- a/internal/aws/k8s/k8sclient/node_test.go
+++ b/internal/aws/k8s/k8sclient/node_test.go
@@ -317,7 +317,7 @@ func TestNodeClient(t *testing.T) {
 	assert.Equal(t, clusterNodeCount, expectedClusterNodeCount)
 	assert.Equal(t, clusterFailedNodeCount, expectedClusterFailedNodeCount)
 
-	client.Shutdown()
+	client.shutdown()
 	assert.True(t, client.stopped)
 }
diff --git a/internal/aws/k8s/k8sclient/pod.go b/internal/aws/k8s/k8sclient/pod.go
index 10092671064a..56629a67d04d 100644
--- a/internal/aws/k8s/k8sclient/pod.go
+++ b/internal/aws/k8s/k8sclient/pod.go
@@ -31,8 +31,6 @@ import (
 type PodClient interface {
 	// Get the mapping between the namespace and the number of belonging pods
 	NamespaceToRunningPodNum() map[string]int
-	// Shutdown stops the PodClient
-	Shutdown()
 }
 
 type podClientOption func(*podClient)
@@ -106,7 +104,7 @@ func newPodClient(clientSet kubernetes.Interface, logger *zap.Logger, options ..
 	return c
 }
 
-func (c *podClient) Shutdown() {
+func (c *podClient) shutdown() {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 	close(c.stopChan)
diff --git a/internal/aws/k8s/k8sclient/pod_test.go b/internal/aws/k8s/k8sclient/pod_test.go
index ca2c08810662..460510c4f938 100644
--- a/internal/aws/k8s/k8sclient/pod_test.go
+++ b/internal/aws/k8s/k8sclient/pod_test.go
@@ -188,7 +188,7 @@ func TestPodClient_NamespaceToRunningPodNum(t *testing.T) {
 	resultMap := client.NamespaceToRunningPodNum()
 	log.Printf("NamespaceToRunningPodNum (len=%v): %v", len(resultMap), awsutil.Prettify(resultMap))
 	assert.True(t, reflect.DeepEqual(resultMap, expectedMap))
-	client.Shutdown()
+	client.shutdown()
 	assert.True(t, client.stopped)
 }
diff --git a/internal/aws/k8s/k8sclient/replicaset.go b/internal/aws/k8s/k8sclient/replicaset.go
index c7751f992ec7..2f155a57b988 100644
--- a/internal/aws/k8s/k8sclient/replicaset.go
+++ b/internal/aws/k8s/k8sclient/replicaset.go
@@ -36,9 +36,6 @@ const (
 type ReplicaSetClient interface {
 	// Get the mapping between replica set and deployment
 	ReplicaSetToDeployment() map[string]string
-
-	// shutdown is only used internally by clientset to stop the ReplicaSetClient
-	shutdown()
 }
 
 type noOpReplicaSetClient struct {
diff --git a/receiver/awscontainerinsightreceiver/config.go b/receiver/awscontainerinsightreceiver/config.go
index ad41da2b542e..25fd64381ddc 100644
--- a/receiver/awscontainerinsightreceiver/config.go
+++ b/receiver/awscontainerinsightreceiver/config.go
@@ -29,4 +29,12 @@ type Config struct {
 
 	// ContainerOrchestrator is the type of container orchestration service, e.g. eks or ecs. The default is eks.
 	ContainerOrchestrator string `mapstructure:"container_orchestrator"`
+
+	// Whether to add the associated service name as an attribute. The default is true.
+	TagService bool `mapstructure:"add_service_as_attribute"`
+
+	// The "PodName" attribute is set based on the name of the relevant controller, like Daemonset, Job, ReplicaSet, ReplicationController, ...
+	// If it cannot be set that way and PrefFullPodName is true, the "PodName" attribute is set to the pod's own name.
+	// The default value is false.
+	PrefFullPodName bool `mapstructure:"prefer_full_pod_name"`
 }
diff --git a/receiver/awscontainerinsightreceiver/config_test.go b/receiver/awscontainerinsightreceiver/config_test.go
index f39414d998e5..0994eb1a77b1 100644
--- a/receiver/awscontainerinsightreceiver/config_test.go
+++ b/receiver/awscontainerinsightreceiver/config_test.go
@@ -52,5 +52,7 @@ func TestLoadConfig(t *testing.T) {
 			ReceiverSettings:      config.NewReceiverSettings(config.NewIDWithName(typeStr, "collection_interval_settings")),
 			CollectionInterval:    60 * time.Second,
 			ContainerOrchestrator: "eks",
+			TagService:            true,
+			PrefFullPodName:       false,
 		})
 }
diff --git a/receiver/awscontainerinsightreceiver/factory.go b/receiver/awscontainerinsightreceiver/factory.go
index 68d1a686c610..f1c5d6f1638f 100644
--- a/receiver/awscontainerinsightreceiver/factory.go
+++ b/receiver/awscontainerinsightreceiver/factory.go
@@ -34,6 +34,12 @@ const (
 
 	// Default container orchestrator service is aws eks
 	defaultContainerOrchestrator = "eks"
+
+	// Metrics are tagged with the service name by default
+	defaultTagService = true
+
+	// Don't use the full pod name by default (as full pod names contain a suffix of random characters)
+	defaultPrefFullPodName = false
 )
 
 // NewFactory creates a factory for AWS container insight receiver
@@ -50,6 +56,8 @@ func createDefaultConfig() config.Receiver {
 		ReceiverSettings:      config.NewReceiverSettings(config.NewID(typeStr)),
 		CollectionInterval:    defaultCollectionInterval,
 		ContainerOrchestrator: defaultContainerOrchestrator,
+		TagService:            defaultTagService,
+		PrefFullPodName:       defaultPrefFullPodName,
 	}
}
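Reviewer note: the two new knobs are exposed through their `mapstructure` tags (`add_service_as_attribute`, `prefer_full_pod_name`). This is not how the collector itself loads configuration, but the tag semantics can be demonstrated in isolation with the underlying library; the map values below are illustrative:

```go
package main

import (
	"fmt"

	"github.com/mitchellh/mapstructure" // the library behind the collector's `mapstructure` tags
)

// Config mirrors just the two new receiver fields from this PR.
type Config struct {
	TagService      bool `mapstructure:"add_service_as_attribute"`
	PrefFullPodName bool `mapstructure:"prefer_full_pod_name"`
}

func main() {
	// What a user would put under the receiver's YAML section,
	// already parsed into a generic map.
	raw := map[string]interface{}{
		"add_service_as_attribute": false,
		"prefer_full_pod_name":     true,
	}

	cfg := Config{TagService: true, PrefFullPodName: false} // defaults from factory.go
	if err := mapstructure.Decode(raw, &cfg); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg) // {TagService:false PrefFullPodName:true}
}
```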
diff --git a/receiver/awscontainerinsightreceiver/go.sum b/receiver/awscontainerinsightreceiver/go.sum
index 5871d125f93a..d72e8939a85f 100644
--- a/receiver/awscontainerinsightreceiver/go.sum
+++ b/receiver/awscontainerinsightreceiver/go.sum
@@ -210,6 +210,7 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:ma
 github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
 github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
 github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
+github.com/creack/pty v1.1.11 h1:07n33Z8lZxZ2qwegKbObQohDhXDQxiMMz1NOUGYlesw=
 github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
 github.com/crossdock/crossdock-go v0.0.0-20160816171116-049aabb0122b/go.mod h1:v9FBN7gdVTpiD/+LZ7Po0UKvROyT87uLVxTHVky/dlQ=
 github.com/cyphar/filepath-securejoin v0.2.2 h1:jCwT2GTP+PY5nBz3c/YL5PAIbusElVrPujOBSCj8xRg=
@@ -549,7 +550,9 @@ github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2z
 github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
 github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
 github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
+github.com/gorilla/securecookie v1.1.1 h1:miw7JPhV+b/lAHSXz4qd/nN9jRiAFV5FwjeKyCS8BvQ=
 github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
+github.com/gorilla/sessions v1.2.1 h1:DHd3rPN5lE3Ts3D8rKkQ8x/0kqfeNmBAaiSi+o7FsgI=
 github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
 github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
 github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
@@ -1223,6 +1226,7 @@ golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzB
 golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
 golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
 golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/mod v0.4.1 h1:Kvvh58BN8Y9/lBi7hTekvtMpm07eUZ0ck5pRHpsMWrY=
 golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
 golang.org/x/net v0.0.0-20180530234432-1e491301e022/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux.go
index f5d3288ff832..f57eb8b92ae3 100644
--- a/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux.go
+++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux.go
@@ -76,18 +76,35 @@ var defaultCreateManager = func(memoryCache *memory.InMemoryCache, sysfs sysfs.S
 	return manager.New(memoryCache, sysfs, houskeepingConfig, includedMetricsSet, collectorHTTPClient, rawContainerCgroupPathPrefixWhiteList, perfEventsFile)
 }
 
-type cadvisorOption func(*Cadvisor)
+// Option is a function that can be used to configure the Cadvisor struct
+type Option func(*Cadvisor)
 
-func cadvisorManagerCreator(f createCadvisorManager) cadvisorOption {
+func cadvisorManagerCreator(f createCadvisorManager) Option {
 	return func(c *Cadvisor) {
 		c.createCadvisorManager = f
 	}
 }
 
+// WithDecorator constructs an option for configuring the metric decorator
+func WithDecorator(d Decorator) Option {
+	return func(c *Cadvisor) {
+		c.k8sDecorator = d
+	}
+}
+
 type hostInfo interface {
 	GetNumCores() int64
 	GetMemoryCapacity() int64
 	GetClusterName() string
+	GetEBSVolumeID(string) string
+	ExtractEbsIDsUsedByKubernetes() map[string]string
+	GetInstanceID() string
+	GetInstanceType() string
+	GetAutoScalingGroupName() string
+}
+
+type Decorator interface {
+	Decorate(*extractors.CAdvisorMetric) *extractors.CAdvisorMetric
 }
 
 type Cadvisor struct {
@@ -97,6 +114,7 @@ type Cadvisor struct {
 	manager               cadvisorManager
 	version               string
 	hostInfo              hostInfo
+	k8sDecorator          Decorator
 	containerOrchestrator string
 }
 
@@ -107,7 +125,7 @@ func init() {
 }
 
 // New creates a Cadvisor struct which can generate metrics from embedded cadvisor lib
-func New(containerOrchestrator string, hostInfo hostInfo, logger *zap.Logger, options ...cadvisorOption) (*Cadvisor, error) {
+func New(containerOrchestrator string, hostInfo hostInfo, logger *zap.Logger, options ...Option) (*Cadvisor, error) {
 	nodeName := os.Getenv("HOST_NAME")
 	if nodeName == "" {
 		return nil, errors.New("missing environment variable HOST_NAME. Please check your deployment YAML config")
@@ -140,6 +158,63 @@ func GetMetricsExtractors() []extractors.MetricExtractor {
 	return metricsExtractors
 }
 
+func (c *Cadvisor) addEbsVolumeInfo(tags map[string]string, ebsVolumeIdsUsedAsPV map[string]string) {
+	deviceName, ok := tags[ci.DiskDev]
+	if !ok {
+		return
+	}
+
+	if c.hostInfo != nil {
+		if volID := c.hostInfo.GetEBSVolumeID(deviceName); volID != "" {
+			tags[ci.HostEbsVolumeID] = volID
+		}
+	}
+
+	if tags[ci.MetricType] == ci.TypeContainerFS || tags[ci.MetricType] == ci.TypeNodeFS ||
+		tags[ci.MetricType] == ci.TypeNodeDiskIO || tags[ci.MetricType] == ci.TypeContainerDiskIO {
+		if volID := ebsVolumeIdsUsedAsPV[deviceName]; volID != "" {
+			tags[ci.EbsVolumeID] = volID
+		}
+	}
+}
+
+func (c *Cadvisor) decorateMetrics(cadvisormetrics []*extractors.CAdvisorMetric) []*extractors.CAdvisorMetric {
+	ebsVolumeIdsUsedAsPV := c.hostInfo.ExtractEbsIDsUsedByKubernetes()
+	var result []*extractors.CAdvisorMetric
+	for _, m := range cadvisormetrics {
+		tags := m.GetTags()
+		c.addEbsVolumeInfo(tags, ebsVolumeIdsUsedAsPV)
+
+		//add version
+		tags[ci.Version] = c.version
+
+		//add NodeName for node, pod and container
+		metricType := tags[ci.MetricType]
+		if c.nodeName != "" && (ci.IsNode(metricType) || ci.IsInstance(metricType) ||
+			ci.IsPod(metricType) || ci.IsContainer(metricType)) {
+			tags[ci.NodeNameKey] = c.nodeName
+		}
+
+		//add instance id and type
+		if instanceID := c.hostInfo.GetInstanceID(); instanceID != "" {
+			tags[ci.InstanceID] = instanceID
+		}
+		if instanceType := c.hostInfo.GetInstanceType(); instanceType != "" {
+			tags[ci.InstanceType] = instanceType
+		}
+
+		//add cluster name and auto scaling group name
+		tags[ci.ClusterNameKey] = c.hostInfo.GetClusterName()
+		tags[ci.AutoScalingGroupNameKey] = c.hostInfo.GetAutoScalingGroupName()
+
+		out := c.k8sDecorator.Decorate(m)
+		if out != nil {
+			result = append(result, out)
+		}
+	}
+
+	return result
+}
+
 // GetMetrics generates metrics from cadvisor
 func (c *Cadvisor) GetMetrics() []pdata.Metrics {
 	c.logger.Debug("collect data from cadvisor...")
@@ -165,7 +240,9 @@ func (c *Cadvisor) GetMetrics() []pdata.Metrics {
 	}
 
 	c.logger.Debug("cadvisor containers stats", zap.Int("size", len(containerinfos)))
-	results := processContainers(containerinfos, c.hostInfo, c.containerOrchestrator, c.logger)
+	out := processContainers(containerinfos, c.hostInfo, c.containerOrchestrator, c.logger)
+
+	results := c.decorateMetrics(out)
 
 	for _, cadvisorMetric := range results {
 		md := ci.ConvertToOTLPMetrics(cadvisorMetric.GetFields(), cadvisorMetric.GetTags(), c.logger)
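Reviewer note: the cadvisor change promotes the unexported `cadvisorOption` to an exported functional-options API (`Option`, `WithDecorator`) so the receiver can inject the Kubernetes decorator without widening `New`'s signature. A self-contained sketch of the same pattern, with hypothetical names:

```go
package main

import "fmt"

type server struct {
	port    int
	logName string
}

// Option mutates the struct under construction; exported constructors
// like WithPort keep the fields themselves unexported.
type Option func(*server)

func WithPort(p int) Option {
	return func(s *server) { s.port = p }
}

func WithLogName(n string) Option {
	return func(s *server) { s.logName = n }
}

func newServer(options ...Option) *server {
	s := &server{port: 8080, logName: "default"} // defaults first
	for _, opt := range options {
		opt(s) // each option overrides one default
	}
	return s
}

func main() {
	fmt.Printf("%+v\n", *newServer(WithPort(9090))) // {port:9090 logName:default}
}
```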
diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux_test.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux_test.go
index 9c51fa64fb2a..82d8951f22a1 100644
--- a/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux_test.go
+++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux_test.go
@@ -30,6 +30,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"go.uber.org/zap"
 
+	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/testutils"
 )
 
@@ -72,24 +73,32 @@ var mockCreateManagerWithError = func(memoryCache *memory.InMemoryCache, sysfs s
 	return nil, errors.New("error")
 }
 
+type MockK8sDecorator struct {
+}
+
+func (m *MockK8sDecorator) Decorate(metric *extractors.CAdvisorMetric) *extractors.CAdvisorMetric {
+	return metric
+}
+
 func TestGetMetrics(t *testing.T) {
 	// normal case
 	originalHostName := os.Getenv("HOST_NAME")
 	os.Setenv("HOST_NAME", "host")
 	hostInfo := testutils.MockHostInfo{ClusterName: "cluster"}
+	k8sdecoratorOption := WithDecorator(&MockK8sDecorator{})
 
 	mockCreateManager := func(memoryCache *memory.InMemoryCache, sysfs sysfs.SysFs, houskeepingConfig manager.HouskeepingConfig,
 		includedMetricsSet container.MetricSet, collectorHTTPClient *http.Client, rawContainerCgroupPathPrefixWhiteList []string,
 		perfEventsFile string) (cadvisorManager, error) {
 		return &mockCadvisorManager{t: t}, nil
 	}
-	c, err := New("eks", hostInfo, zap.NewNop(), cadvisorManagerCreator(mockCreateManager))
+	c, err := New("eks", hostInfo, zap.NewNop(), cadvisorManagerCreator(mockCreateManager), k8sdecoratorOption)
 	assert.NotNil(t, c)
 	assert.Nil(t, err)
 	assert.NotNil(t, c.GetMetrics())
 	os.Setenv("HOST_NAME", originalHostName)
 
 	// no environmental variable
-	c, err = New("eks", hostInfo, zap.NewNop(), cadvisorManagerCreator(mockCreateManager))
+	c, err = New("eks", hostInfo, zap.NewNop(), cadvisorManagerCreator(mockCreateManager), k8sdecoratorOption)
 	assert.Nil(t, c)
 	assert.NotNil(t, err)
 
@@ -97,7 +106,7 @@ func TestGetMetrics(t *testing.T) {
 	originalHostName = os.Getenv("HOST_NAME")
 	os.Setenv("HOST_NAME", "host")
 	hostInfo = testutils.MockHostInfo{}
-	c, err = New("eks", hostInfo, zap.NewNop(), cadvisorManagerCreator(mockCreateManager2))
+	c, err = New("eks", hostInfo, zap.NewNop(), cadvisorManagerCreator(mockCreateManager2), k8sdecoratorOption)
 	assert.NotNil(t, c)
 	assert.Nil(t, err)
 	assert.Nil(t, c.GetMetrics())
@@ -107,7 +116,7 @@ func TestGetMetrics(t *testing.T) {
 	originalHostName = os.Getenv("HOST_NAME")
 	os.Setenv("HOST_NAME", "host")
 	hostInfo = testutils.MockHostInfo{ClusterName: "cluster"}
-	c, err = New("eks", hostInfo, zap.NewNop(), cadvisorManagerCreator(mockCreateManager2))
+	c, err = New("eks", hostInfo, zap.NewNop(), cadvisorManagerCreator(mockCreateManager2), k8sdecoratorOption)
 	assert.Nil(t, c)
 	assert.NotNil(t, err)
 	os.Setenv("HOST_NAME", originalHostName)
@@ -116,7 +125,7 @@ func TestGetMetrics(t *testing.T) {
 	originalHostName = os.Getenv("HOST_NAME")
 	os.Setenv("HOST_NAME", "host")
 	hostInfo = testutils.MockHostInfo{ClusterName: "cluster"}
-	c, err = New("eks", hostInfo, zap.NewNop(), cadvisorManagerCreator(mockCreateManagerWithError))
+	c, err = New("eks", hostInfo, zap.NewNop(), cadvisorManagerCreator(mockCreateManagerWithError), k8sdecoratorOption)
 	assert.Nil(t, c)
 	assert.NotNil(t, err)
diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_nolinux.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_nolinux.go
index 8b9d86c364c7..996a722d9b53 100644
--- a/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_nolinux.go
+++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_nolinux.go
@@ -19,6 +19,8 @@ package cadvisor
 import (
 	"go.opentelemetry.io/collector/consumer/pdata"
 	"go.uber.org/zap"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors"
 )
 
 // cadvisor doesn't support windows, define the dummy functions
@@ -33,8 +35,22 @@ type hostInfo interface {
 type Cadvisor struct {
 }
 
+type Decorator interface {
+	Decorate(*extractors.CAdvisorMetric) *extractors.CAdvisorMetric
+}
+
+// Option is a function that can be used to configure the Cadvisor struct
+type Option func(*Cadvisor)
+
+// WithDecorator constructs an option for configuring the metric decorator
+func WithDecorator(d interface{}) Option {
+	return func(c *Cadvisor) {
+		// do nothing
+	}
+}
+
 // New is a dummy function to construct a dummy Cadvisor struct for windows
-func New(containerOrchestrator string, hostInfo hostInfo, logger *zap.Logger) (*Cadvisor, error) {
+func New(containerOrchestrator string, hostInfo hostInfo, logger *zap.Logger, options ...Option) (*Cadvisor, error) {
 	return &Cadvisor{}, nil
 }
diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/extractor.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/extractor.go
index 2788eb392bef..dbb5dc5c0003 100644
--- a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/extractor.go
+++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/extractor.go
@@ -86,6 +86,34 @@ func (c *CAdvisorMetric) AddTags(tags map[string]string) {
 	}
 }
 
+func (c *CAdvisorMetric) HasField(key string) bool {
+	return c.fields[key] != nil
+}
+
+func (c *CAdvisorMetric) AddField(key string, val interface{}) {
+	c.fields[key] = val
+}
+
+func (c *CAdvisorMetric) GetField(key string) interface{} {
+	return c.fields[key]
+}
+
+func (c *CAdvisorMetric) HasTag(key string) bool {
+	return c.tags[key] != ""
+}
+
+func (c *CAdvisorMetric) AddTag(key, val string) {
+	c.tags[key] = val
+}
+
+func (c *CAdvisorMetric) GetTag(key string) string {
+	return c.tags[key]
+}
+
+func (c *CAdvisorMetric) RemoveTag(key string) {
+	delete(c.tags, key)
+}
+
 func (c *CAdvisorMetric) Merge(src *CAdvisorMetric) {
 	// If there is any conflict, keep the fields with earlier timestamp
 	for k, v := range src.fields {
diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/testutils/helpers.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/testutils/helpers.go
index fa702c91acd2..85306593701d 100644
--- a/receiver/awscontainerinsightreceiver/internal/cadvisor/testutils/helpers.go
+++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/testutils/helpers.go
@@ -62,3 +62,23 @@ type MockHostInfo struct {
 func (m MockHostInfo) GetClusterName() string {
 	return m.ClusterName
 }
+
+func (m MockHostInfo) GetEBSVolumeID(string) string {
+	return "ebs-volume-id"
+}
+
+func (m MockHostInfo) GetInstanceID() string {
+	return "instance-id"
+}
+
+func (m MockHostInfo) GetInstanceType() string {
+	return "instance-id"
+}
+
+func (m MockHostInfo) GetAutoScalingGroupName() string {
+	return "asg"
+}
+
+func (m MockHostInfo) ExtractEbsIDsUsedByKubernetes() map[string]string {
+	return map[string]string{}
+}
diff --git a/receiver/awscontainerinsightreceiver/internal/host/ec2tags.go b/receiver/awscontainerinsightreceiver/internal/host/ec2tags.go
index 4c3bdcab6039..68ef3f9288d9 100644
--- a/receiver/awscontainerinsightreceiver/internal/host/ec2tags.go
+++ b/receiver/awscontainerinsightreceiver/internal/host/ec2tags.go
@@ -55,11 +55,11 @@ type ec2Tags struct {
 
 type ec2TagsOption func(*ec2Tags)
 
-func newEC2Tags(ctx context.Context, session *session.Session, instanceID string,
+func newEC2Tags(ctx context.Context, session *session.Session, instanceID string, region string,
 	refreshInterval time.Duration, logger *zap.Logger, options ...ec2TagsOption) ec2TagsProvider {
 	et := &ec2Tags{
 		instanceID:      instanceID,
-		client:          ec2.New(session),
+		client:          ec2.New(session, aws.NewConfig().WithRegion(region)),
 		refreshInterval: refreshInterval,
 		maxJitterTime:   3 * time.Second,
 		logger:          logger,
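Reviewer note: the ec2tags change pins the EC2 client to an explicit region instead of relying on whatever the shared session carries. A standalone sketch of the same construction with the AWS SDK for Go v1 (the region value is illustrative):

```go
package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/ec2"
)

func main() {
	// A shared session may have no region configured on a node; passing a
	// per-service *aws.Config makes the DescribeTags calls target the
	// node's own region regardless of the session defaults.
	sess := session.Must(session.NewSession())
	client := ec2.New(sess, aws.NewConfig().WithRegion("us-west-2"))
	fmt.Println(client.SigningRegion) // us-west-2
}
```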
diff --git a/receiver/awscontainerinsightreceiver/internal/host/ec2tags_test.go b/receiver/awscontainerinsightreceiver/internal/host/ec2tags_test.go
index ebb4308b3727..1ae47e675de1 100644
--- a/receiver/awscontainerinsightreceiver/internal/host/ec2tags_test.go
+++ b/receiver/awscontainerinsightreceiver/internal/host/ec2tags_test.go
@@ -78,7 +78,7 @@ func TestEC2Tags(t *testing.T) {
 	isSucessOption := func(e *ec2Tags) {
 		e.isSucess = make(chan bool)
 	}
-	et := newEC2Tags(ctx, sess, "instanceId", time.Millisecond, zap.NewNop(), clientOption,
+	et := newEC2Tags(ctx, sess, "instanceId", "us-west-2", time.Millisecond, zap.NewNop(), clientOption,
 		maxJitterOption, isSucessOption)
 
 	// wait for ec2 tags are fetched
diff --git a/receiver/awscontainerinsightreceiver/internal/host/hostinfo.go b/receiver/awscontainerinsightreceiver/internal/host/hostinfo.go
index 13cc8b316b8e..39647da8029c 100644
--- a/receiver/awscontainerinsightreceiver/internal/host/hostinfo.go
+++ b/receiver/awscontainerinsightreceiver/internal/host/hostinfo.go
@@ -46,7 +46,7 @@ type Info struct {
 	nodeCapacityCreator func(*zap.Logger, ...nodeCapacityOption) (nodeCapacityProvider, error)
 	ec2MetadataCreator  func(context.Context, *session.Session, time.Duration, chan bool, *zap.Logger, ...ec2MetadataOption) ec2MetadataProvider
 	ebsVolumeCreator    func(context.Context, *session.Session, string, string, time.Duration, *zap.Logger, ...ebsVolumeOption) ebsVolumeProvider
-	ec2TagsCreator      func(context.Context, *session.Session, string, time.Duration, *zap.Logger, ...ec2TagsOption) ec2TagsProvider
+	ec2TagsCreator      func(context.Context, *session.Session, string, string, time.Duration, *zap.Logger, ...ec2TagsOption) ec2TagsProvider
 }
 
 type machineInfoOption func(*Info)
@@ -107,7 +107,7 @@ func (m *Info) lazyInitEBSVolume(ctx context.Context) {
 func (m *Info) lazyInitEC2Tags(ctx context.Context) {
 	//wait until the instance id is ready
 	<-m.instanceIDReadyC
-	m.ec2Tags = m.ec2TagsCreator(ctx, m.awsSession, m.GetInstanceID(), m.refreshInterval, m.logger)
+	m.ec2Tags = m.ec2TagsCreator(ctx, m.awsSession, m.GetInstanceID(), m.GetRegion(), m.refreshInterval, m.logger)
 	close(m.ec2TagsReadyC)
 }
 
@@ -163,6 +163,14 @@ func (m *Info) GetAutoScalingGroupName() string {
 	return ""
 }
 
+// ExtractEbsIDsUsedByKubernetes extracts the EBS volume IDs used by the Kubernetes cluster from the host mount file
+func (m *Info) ExtractEbsIDsUsedByKubernetes() map[string]string {
+	if m.ebsVolume != nil {
+		return m.ebsVolume.extractEbsIDsUsedByKubernetes()
+	}
+	return map[string]string{}
+}
+
 // Shutdown stops the host Info
 func (m *Info) Shutdown() {
 	m.cancel()
diff --git a/receiver/awscontainerinsightreceiver/internal/host/hostinfo_test.go b/receiver/awscontainerinsightreceiver/internal/host/hostinfo_test.go
index d3c43af46a9f..fc9ebb722f3a 100644
--- a/receiver/awscontainerinsightreceiver/internal/host/hostinfo_test.go
+++ b/receiver/awscontainerinsightreceiver/internal/host/hostinfo_test.go
@@ -121,7 +121,7 @@ func TestInfo(t *testing.T) {
 		}
 	}
 	ec2TagsCreatorOpt := func(m *Info) {
-		m.ec2TagsCreator = func(context.Context, *session.Session, string, time.Duration, *zap.Logger,
+		m.ec2TagsCreator = func(context.Context, *session.Session, string, string, time.Duration, *zap.Logger,
 			...ec2TagsOption) ec2TagsProvider {
 			return &mockEC2Tags{}
 		}
diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go
index cd20b54839f3..654b7f85871a 100644
--- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go
+++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go
@@ -29,6 +29,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
 	corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
 	"k8s.io/client-go/tools/leaderelection"
@@ -57,6 +58,15 @@ type eventBroadcaster interface {
 	NewRecorder(scheme *runtime.Scheme, source v1.EventSource) record.EventRecorder
 }
 
+type K8sClient interface {
+	GetClientSet() kubernetes.Interface
+	GetEpClient() k8sclient.EpClient
+	GetNodeClient() k8sclient.NodeClient
+	GetPodClient() k8sclient.PodClient
+	ShutdownNodeClient()
+	ShutdownPodClient()
+}
+
 // K8sAPIServer is a struct that produces metrics from kubernetes api server
 type K8sAPIServer struct {
 	nodeName            string //get the value from downward API
@@ -67,7 +77,10 @@ type K8sAPIServer struct {
 	mu      sync.Mutex
 	leading bool
 
-	k8sClient *k8sclient.K8sClient
+	k8sClient  K8sClient //*k8sclient.K8sClient
+	epClient   k8sclient.EpClient
+	nodeClient k8sclient.NodeClient
+	podClient  k8sclient.PodClient
 
 	// the following can be set to mocks in testing
 	broadcaster eventBroadcaster
@@ -83,13 +96,11 @@ type k8sAPIServerOption func(*K8sAPIServer)
 
 // New creates a k8sApiServer which can generate cluster-level metrics
 func New(clusterNameProvider clusterNameProvider, logger *zap.Logger, options ...k8sAPIServerOption) (*K8sAPIServer, error) {
-	_, cancel := context.WithCancel(context.Background())
 	k := &K8sAPIServer{
 		logger:              logger,
 		clusterNameProvider: clusterNameProvider,
 		k8sClient:           k8sclient.Get(logger),
 		broadcaster:         record.NewBroadcaster(),
-		cancel:              cancel,
 	}
 
 	for _, opt := range options {
@@ -127,11 +138,10 @@ func (k *K8sAPIServer) GetMetrics() []pdata.Metrics {
 	k.logger.Info("collect data from K8s API Server...")
 	timestampNs := strconv.FormatInt(time.Now().UnixNano(), 10)
 
-	client := k.k8sClient
 	fields := map[string]interface{}{
-		"cluster_failed_node_count": client.Node.ClusterFailedNodeCount(),
-		"cluster_node_count":        client.Node.ClusterNodeCount(),
+		"cluster_failed_node_count": k.nodeClient.ClusterFailedNodeCount(),
+		"cluster_node_count":        k.nodeClient.ClusterNodeCount(),
 	}
 	attributes := map[string]string{
 		ci.ClusterNameKey: clusterName,
@@ -142,10 +152,11 @@ func (k *K8sAPIServer) GetMetrics() []pdata.Metrics {
 	if k.nodeName != "" {
 		attributes["NodeName"] = k.nodeName
 	}
+	attributes[ci.SourcesKey] = "[\"apiserver\"]"
 	md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger)
 	result = append(result, md)
 
-	for service, podNum := range client.Ep.ServiceToPodNum() {
+	for service, podNum := range k.epClient.ServiceToPodNum() {
 		fields := map[string]interface{}{
 			"service_number_of_running_pods": podNum,
 		}
@@ -160,11 +171,14 @@ func (k *K8sAPIServer) GetMetrics() []pdata.Metrics {
 		if k.nodeName != "" {
 			attributes["NodeName"] = k.nodeName
 		}
+		attributes[ci.SourcesKey] = "[\"apiserver\"]"
+		attributes[ci.Kubernetes] = fmt.Sprintf("{\"namespace_name\":\"%s\",\"service_name\":\"%s\"}",
+			service.Namespace, service.ServiceName)
 		md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger)
 		result = append(result, md)
 	}
 
-	for namespace, podNum := range client.Pod.NamespaceToRunningPodNum() {
+	for namespace, podNum := range k.podClient.NamespaceToRunningPodNum() {
 		fields := map[string]interface{}{
 			"namespace_number_of_running_pods": podNum,
 		}
@@ -178,6 +192,8 @@ func (k *K8sAPIServer) GetMetrics() []pdata.Metrics {
 		if k.nodeName != "" {
 			attributes["NodeName"] = k.nodeName
 		}
+		attributes[ci.SourcesKey] = "[\"apiserver\"]"
+		attributes[ci.Kubernetes] = fmt.Sprintf("{\"namespace_name\":\"%s\"}", namespace)
 		md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger)
 		result = append(result, md)
 	}
@@ -199,7 +215,8 @@ func (k *K8sAPIServer) init() error {
 		return errors.New("environment variable K8S_NAMESPACE is not set in k8s deployment config")
 	}
 
-	configMapInterface := k.k8sClient.ClientSet.CoreV1().ConfigMaps(lockNamespace)
+	clientSet := k.k8sClient.GetClientSet()
+	configMapInterface := clientSet.CoreV1().ConfigMaps(lockNamespace)
 	if configMap, err := configMapInterface.Get(ctx, lockName, metav1.GetOptions{}); configMap == nil || err != nil {
 		k.logger.Info(fmt.Sprintf("Cannot get the leader config map: %v, try to create the config map...", err))
 		configMap, err = configMapInterface.Create(ctx,
@@ -215,8 +232,8 @@ func (k *K8sAPIServer) init() error {
 	lock, err := resourcelock.New(
 		resourcelock.ConfigMapsResourceLock,
 		lockNamespace, lockName,
-		k.k8sClient.ClientSet.CoreV1(),
-		k.k8sClient.ClientSet.CoordinationV1(),
+		clientSet.CoreV1(),
+		clientSet.CoordinationV1(),
 		resourcelock.ResourceLockConfig{
 			Identity:      k.nodeName,
 			EventRecorder: k.createRecorder(lockName, lockNamespace),
@@ -258,6 +275,10 @@ func (k *K8sAPIServer) startLeaderElection(ctx context.Context, lock resourceloc
 				// we're notified when we start
 				k.mu.Lock()
 				k.leading = true
+				// always retrieve clients in case previous ones shut down during leader switching
+				k.nodeClient = k.k8sClient.GetNodeClient()
+				k.podClient = k.k8sClient.GetPodClient()
+				k.epClient = k.k8sClient.GetEpClient()
 				k.mu.Unlock()
 
 				if k.isLeadingC != nil {
@@ -288,8 +309,8 @@ func (k *K8sAPIServer) startLeaderElection(ctx context.Context, lock resourceloc
 				defer k.mu.Unlock()
 				k.leading = false
 				//node and pod are only used for cluster level metrics, endpoint is used for decorator too.
-				k.k8sClient.Node.Shutdown()
-				k.k8sClient.Pod.Shutdown()
+				k.k8sClient.ShutdownNodeClient()
+				k.k8sClient.ShutdownPodClient()
 			},
 			OnNewLeader: func(identity string) {
 				k.logger.Info(fmt.Sprintf("k8sapiserver Switch New Leader: %s", identity))
@@ -308,7 +329,7 @@ func (k *K8sAPIServer) startLeaderElection(ctx context.Context, lock resourceloc
 
 func (k *K8sAPIServer) createRecorder(name, namespace string) record.EventRecorder {
 	k.broadcaster.StartLogging(klog.Infof)
-	clientSet := k.k8sClient.ClientSet
+	clientSet := k.k8sClient.GetClientSet()
 	k.broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: corev1.New(clientSet.CoreV1().RESTClient()).Events(namespace)})
 	return k.broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: name})
 }
diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go
index 9ba2429ad778..fe96cffcd3c2 100644
--- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go
+++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go
@@ -27,6 +27,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/fake"
 	"k8s.io/client-go/tools/record"
 
@@ -40,11 +41,31 @@ func NewService(name, namespace string) k8sclient.Service {
 
 var mockClient = new(MockClient)
 
-var mockK8sClient = &k8sclient.K8sClient{
-	Pod:       mockClient,
-	Node:      mockClient,
-	Ep:        mockClient,
-	ClientSet: fake.NewSimpleClientset(),
+type mockK8sClient struct {
+}
+
+func (m *mockK8sClient) GetClientSet() kubernetes.Interface {
+	return fake.NewSimpleClientset()
+}
+
+func (m *mockK8sClient) GetEpClient() k8sclient.EpClient {
+	return mockClient
+}
+
+func (m *mockK8sClient) GetNodeClient() k8sclient.NodeClient {
+	return mockClient
+}
+
+func (m *mockK8sClient) GetPodClient() k8sclient.PodClient {
+	return mockClient
+}
+
+func (m *mockK8sClient) ShutdownNodeClient() {
+
+}
+
+func (m *mockK8sClient) ShutdownPodClient() {
+
 }
 
 type MockClient struct {
@@ -78,9 +99,6 @@ func (client *MockClient) ServiceToPodNum() map[k8sclient.Service]int {
 	return args.Get(0).(map[k8sclient.Service]int)
 }
 
-func (client *MockClient) Shutdown() {
-}
-
 type mockEventBroadcaster struct {
 }
 
@@ -149,7 +167,7 @@ func TestK8sAPIServer_GetMetrics(t *testing.T) {
 	hostName, err := os.Hostname()
 	assert.NoError(t, err)
 	k8sClientOption := func(k *K8sAPIServer) {
-		k.k8sClient = mockK8sClient
+		k.k8sClient = &mockK8sClient{}
 	}
 	leadingOption := func(k *K8sAPIServer) {
 		k.leading = true
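Reviewer note: the OnStartedLeading/OnStoppedLeading changes are the reason the clientset gained lazy getters — only the elected leader should run the node/pod informers, and a replica that loses and later regains the lease needs fresh clients. A minimal standalone sketch of that acquire-on-start / release-on-stop shape, using plain types rather than the real client-go leaderelection callbacks:

```go
package main

import "fmt"

type client interface{ shutdown() }

type nodeClient struct{}

func (n *nodeClient) shutdown() { fmt.Println("node informer stopped") }

type provider struct{ node *nodeClient }

// getNode lazily (re)creates the informer-backed client.
func (p *provider) getNode() client {
	if p.node == nil {
		p.node = &nodeClient{}
		fmt.Println("node informer started")
	}
	return p.node
}

func (p *provider) shutdownNode() {
	if p.node != nil {
		p.node.shutdown()
		p.node = nil
	}
}

func main() {
	p := &provider{}

	onStartedLeading := func() { _ = p.getNode() }  // acquire when elected
	onStoppedLeading := func() { p.shutdownNode() } // release when the lease is lost

	onStartedLeading()
	onStoppedLeading()
	onStartedLeading() // re-election: a fresh client is created
}
```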
diff --git a/receiver/awscontainerinsightreceiver/internal/stores/podstore.go b/receiver/awscontainerinsightreceiver/internal/stores/podstore.go
index b9b59fee6b30..c6a4c9bf1ed3 100644
--- a/receiver/awscontainerinsightreceiver/internal/stores/podstore.go
+++ b/receiver/awscontainerinsightreceiver/internal/stores/podstore.go
@@ -15,6 +15,7 @@
 package stores
 
 import (
+	"context"
 	"errors"
 	"fmt"
 	"log"
@@ -90,8 +91,8 @@ func newMapWithExpiry(ttl time.Duration) *mapWithExpiry {
 	}
 }
 
-type replicaSetInfo interface {
-	ReplicaSetToDeployment() map[string]string
+type replicaSetInfoProvider interface {
+	GetReplicaSetClient() k8sclient.ReplicaSetClient
 }
 
 type podClient interface {
@@ -102,7 +103,7 @@ type PodStore struct {
 	cache            *mapWithExpiry
 	prevMeasurements map[string]*mapWithExpiry //preMeasurements per each Type (Pod, Container, etc)
 	podClient        podClient
-	replicasetInfo   replicaSetInfo
+	k8sClient        replicaSetInfoProvider
 	lastRefreshed    time.Time
 	nodeInfo         *nodeInfo
 	prefFullPodName  bool
@@ -132,7 +133,8 @@ func NewPodStore(hostIP string, prefFullPodName bool, logger *zap.Logger) (*PodS
 		podClient:        podClient,
 		nodeInfo:         newNodeInfo(logger),
 		prefFullPodName:  prefFullPodName,
-		replicasetInfo:   k8sClient.ReplicaSet,
+		k8sClient:        k8sClient,
+		logger:           logger,
 	}
 
 	return podStore, nil
@@ -162,17 +164,22 @@ func (p *PodStore) setPrevMeasurement(metricType, metricKey string, content inte
 	prevMeasurement.Set(metricKey, content)
 }
 
-func (p *PodStore) RefreshTick() {
+// RefreshTick triggers a refresh of the pod store.
+// It is called at relatively short intervals (e.g. 1 second).
+// We can't refresh at a fixed interval because the Decorate(...) function calls
+// refresh(...) on demand whenever the pod metadata for a given metric is not yet
+// in the cache, which makes the refresh interval irregular.
+func (p *PodStore) RefreshTick(ctx context.Context) {
 	now := time.Now()
 	if now.Sub(p.lastRefreshed) >= refreshInterval {
-		p.refresh(now)
+		p.refresh(ctx, now)
 		// call cleanup every refresh cycle
 		p.cleanup(now)
 		p.lastRefreshed = now
 	}
 }
 
-func (p *PodStore) Decorate(metric CIMetric, kubernetesBlob map[string]interface{}) bool {
+func (p *PodStore) Decorate(ctx context.Context, metric CIMetric, kubernetesBlob map[string]interface{}) bool {
 	if metric.GetTag(ci.MetricType) == ci.TypeNode {
 		p.decorateNode(metric)
 	} else if metric.GetTag(ci.K8sPodNameKey) != "" {
@@ -185,7 +192,7 @@ func (p *PodStore) Decorate(ctx context.Context, metric CIMetric, kubernetesBlob
 		entry := p.getCachedEntry(podKey)
 		if entry == nil {
 			p.logger.Debug(fmt.Sprintf("no pod is found for %s, refresh the cache now...", podKey))
-			p.refresh(time.Now())
+			p.refresh(ctx, time.Now())
 			entry = p.getCachedEntry(podKey)
 		}
@@ -228,8 +235,16 @@ func (p *PodStore) setCachedEntry(podKey string, entry *cachedEntry) {
 	p.cache.Set(podKey, entry)
 }
 
-func (p *PodStore) refresh(now time.Time) {
-	podList, _ := p.podClient.ListPods()
+func (p *PodStore) refresh(ctx context.Context, now time.Time) {
+	var podList []corev1.Pod
+	var err error
+	doRefresh := func() {
+		podList, err = p.podClient.ListPods()
+		if err != nil {
+			p.logger.Error("failed to get pod from kubelet", zap.Error(err))
+		}
+	}
+	refreshWithTimeout(ctx, doRefresh, refreshInterval)
 	p.refreshInternal(now, podList)
 }
@@ -374,7 +389,7 @@ func (p *PodStore) decorateMem(metric CIMetric, pod *corev1.Pod) {
 			// only set podLimit when all the containers has limit
 			if ok && podMemLimit != 0 {
 				metric.AddField(ci.MetricName(ci.TypePod, ci.MemLimit), podMemLimit)
-				metric.AddField(ci.MetricName(ci.TypePod, ci.MemUtilizationOverPodLimit), podMemWorkingset.(float64)/float64(podMemLimit)*100)
+				metric.AddField(ci.MetricName(ci.TypePod, ci.MemUtilizationOverPodLimit), float64(podMemWorkingset.(uint64))/float64(podMemLimit)*100)
 			}
 		}
 	} else if metric.GetTag(ci.MetricType) == ci.TypeContainer {
@@ -555,7 +570,8 @@ func (p *PodStore) addPodOwnersAndPodName(metric CIMetric, pod *corev1.Pod, kube
 			kind := owner.Kind
 			name := owner.Name
 			if owner.Kind == ci.ReplicaSet {
-				rsToDeployment := p.replicasetInfo.ReplicaSetToDeployment()
+				replicaSetClient := p.k8sClient.GetReplicaSetClient()
+				rsToDeployment := replicaSetClient.ReplicaSetToDeployment()
 				if parent := rsToDeployment[owner.Name]; parent != "" {
 					kind = ci.Deployment
 					name = parent
diff --git a/receiver/awscontainerinsightreceiver/internal/stores/podstore_test.go b/receiver/awscontainerinsightreceiver/internal/stores/podstore_test.go
index 208396b71d6f..f3bfcb06aeb2 100644
--- a/receiver/awscontainerinsightreceiver/internal/stores/podstore_test.go
+++ b/receiver/awscontainerinsightreceiver/internal/stores/podstore_test.go
@@ -15,6 +15,7 @@
 package stores
 
 import (
+	"context"
 	"encoding/json"
 	"fmt"
 	"testing"
@@ -25,6 +26,7 @@ import (
 	corev1 "k8s.io/api/core/v1"
 
 	ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient"
 )
 
 func getBaseTestPodInfo() *corev1.Pod {
@@ -247,7 +249,7 @@ func TestPodStore_decorateMem(t *testing.T) {
 	pod := getBaseTestPodInfo()
 
 	tags := map[string]string{ci.MetricType: ci.TypePod}
-	fields := map[string]interface{}{ci.MetricName(ci.TypePod, ci.MemWorkingset): float64(10 * 1024 * 1024)}
+	fields := map[string]interface{}{ci.MetricName(ci.TypePod, ci.MemWorkingset): uint64(10 * 1024 * 1024)}
 	metric := generateMetric(fields, tags)
 
 	podStore.decorateMem(metric, pod)
@@ -256,7 +258,7 @@ func TestPodStore_decorateMem(t *testing.T) {
 	assert.Equal(t, uint64(52428800), metric.GetField("pod_memory_limit").(uint64))
 	assert.Equal(t, float64(12.5), metric.GetField("pod_memory_reserved_capacity").(float64))
 	assert.Equal(t, float64(20), metric.GetField("pod_memory_utilization_over_pod_limit").(float64))
-	assert.Equal(t, float64(10*1024*1024), metric.GetField("pod_memory_working_set").(float64))
+	assert.Equal(t, uint64(10*1024*1024), metric.GetField("pod_memory_working_set").(uint64))
 
 	tags = map[string]string{ci.MetricType: ci.TypeContainer, ci.ContainerNamekey: "ubuntu"}
 	fields = map[string]interface{}{ci.MetricName(ci.TypeContainer, ci.MemWorkingset): float64(10 * 1024 * 1024)}
@@ -397,14 +399,26 @@ func (m *mockReplicaSetInfo1) ReplicaSetToDeployment() map[string]string {
 	return map[string]string{}
 }
 
+type mockK8sClient1 struct{}
+
+func (m *mockK8sClient1) GetReplicaSetClient() k8sclient.ReplicaSetClient {
+	return &mockReplicaSetInfo1{}
+}
+
 type mockReplicaSetInfo2 struct{}
 
 func (m *mockReplicaSetInfo2) ReplicaSetToDeployment() map[string]string {
 	return map[string]string{"DeploymentTest-sftrz2785": "DeploymentTest"}
 }
 
+type mockK8sClient2 struct{}
+
+func (m *mockK8sClient2) GetReplicaSetClient() k8sclient.ReplicaSetClient {
+	return &mockReplicaSetInfo2{}
+}
+
 func TestPodStore_addPodOwnersAndPodNameFallback(t *testing.T) {
-	podStore := &PodStore{replicasetInfo: &mockReplicaSetInfo1{}}
+	podStore := &PodStore{k8sClient: &mockK8sClient1{}}
 	pod := getBaseTestPodInfo()
 	tags := map[string]string{ci.MetricType: ci.TypePod, ci.ContainerNamekey: "ubuntu"}
 	fields := map[string]interface{}{ci.MetricName(ci.TypePod, ci.CPUTotal): float64(1)}
@@ -438,7 +452,7 @@ func TestPodStore_addPodOwnersAndPodNameFallback(t *testing.T) {
 }
 
 func TestPodStore_addPodOwnersAndPodName(t *testing.T) {
-	podStore := &PodStore{replicasetInfo: &mockReplicaSetInfo2{}}
+	podStore := &PodStore{k8sClient: &mockK8sClient2{}}
 	pod := getBaseTestPodInfo()
 
 	tags := map[string]string{ci.MetricType: ci.TypePod, ci.ContainerNamekey: "ubuntu"}
@@ -571,7 +585,7 @@ func TestPodStore_RefreshTick(t *testing.T) {
 	podStore := getPodStore()
 	podStore.podClient = &mockPodClient{}
 	podStore.lastRefreshed = time.Now().Add(-time.Minute)
-	podStore.RefreshTick()
+	podStore.RefreshTick(context.Background())
 
 	assert.Equal(t, uint64(10), podStore.nodeInfo.nodeStats.cpuReq)
 	assert.Equal(t, uint64(50*1024*1024), podStore.nodeInfo.nodeStats.memReq)
@@ -622,7 +636,8 @@ func TestPodStore_Decorate(t *testing.T) {
 	podStore := getPodStore()
 	podStore.podClient = &mockPodClient{}
 	kubernetesBlob := map[string]interface{}{}
-	ok := podStore.Decorate(metric, kubernetesBlob)
+	ctx := context.Background()
+	ok := podStore.Decorate(ctx, metric, kubernetesBlob)
 	assert.True(t, ok)
 
 	// metric with no namespace
@@ -637,7 +652,7 @@ func TestPodStore_Decorate(t *testing.T) {
 	metric = &mockCIMetric{
 		tags: tags,
 	}
-	ok = podStore.Decorate(metric, kubernetesBlob)
+	ok = podStore.Decorate(ctx, metric, kubernetesBlob)
 	assert.False(t, ok)
 
 	// metric with pod info not in cache
@@ -652,10 +667,10 @@ func TestPodStore_Decorate(t *testing.T) {
 	metric = &mockCIMetric{
 		tags: tags,
 	}
-	ok = podStore.Decorate(metric, kubernetesBlob)
+	ok = podStore.Decorate(ctx, metric, kubernetesBlob)
 	assert.False(t, ok)
 
 	// decorate the same metric with a placeholder item in cache
-	ok = podStore.Decorate(metric, kubernetesBlob)
+	ok = podStore.Decorate(ctx, metric, kubernetesBlob)
 	assert.False(t, ok)
 }
diff --git a/receiver/awscontainerinsightreceiver/internal/stores/servicestore.go b/receiver/awscontainerinsightreceiver/internal/stores/servicestore.go
index 6feb62411361..c4574f547376 100644
--- a/receiver/awscontainerinsightreceiver/internal/stores/servicestore.go
+++ b/receiver/awscontainerinsightreceiver/internal/stores/servicestore.go
@@ -15,6 +15,7 @@
 package stores
 
 import (
+	"context"
 	"errors"
 	"time"
 
@@ -25,7 +26,7 @@ import (
 )
 
 const (
-	refreshIntervalService = 10 //10s
+	refreshIntervalService = 10 * time.Second
 )
 
 type endpointInfo interface {
@@ -48,21 +49,21 @@ func NewServiceStore(logger *zap.Logger) (*ServiceStore, error) {
 	if k8sClient == nil {
 		return nil, errors.New("failed to start service store because k8sclient is nil")
 	}
-	s.endpointInfo = k8sClient.Ep
+	s.endpointInfo = k8sClient.GetEpClient()
 	return s, nil
 }
 
-func (s *ServiceStore) RefreshTick() {
+func (s *ServiceStore) RefreshTick(ctx context.Context) {
 	now := time.Now()
-	if now.Sub(s.lastRefreshed).Seconds() >= refreshIntervalService {
-		s.refresh()
+	if now.Sub(s.lastRefreshed) >= refreshIntervalService {
+		s.refresh(ctx)
 		s.lastRefreshed = now
 	}
 }
 
 // Decorate decorates metrics and update kubernetesBlob
 // service info is not mandatory
-func (s *ServiceStore) Decorate(metric CIMetric, _ map[string]interface{}) bool {
+func (s *ServiceStore) Decorate(ctx context.Context, metric CIMetric, _ map[string]interface{}) bool {
 	if metric.HasTag(ci.K8sPodNameKey) {
 		podKey := createPodKeyFromMetric(metric)
 		if podKey == "" {
@@ -79,9 +80,13 @@ func (s *ServiceStore) Decorate(ctx context.Context, metric CIMetric, _ map[stri
 	return true
 }
 
-func (s *ServiceStore) refresh() {
-	s.podKeyToServiceNamesMap = s.endpointInfo.PodKeyToServiceNames()
-	s.logger.Debug("pod to service name map", zap.Any("podKeyToServiceNamesMap", s.podKeyToServiceNamesMap))
+func (s *ServiceStore) refresh(ctx context.Context) {
+	doRefresh := func() {
+		s.podKeyToServiceNamesMap = s.endpointInfo.PodKeyToServiceNames()
+		s.logger.Debug("pod to service name map", zap.Any("podKeyToServiceNamesMap", s.podKeyToServiceNamesMap))
+	}
+
+	refreshWithTimeout(ctx, doRefresh, refreshIntervalService)
 }
 
 func addServiceNameTag(metric CIMetric, serviceNames []string) {
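Reviewer note: the servicestore constant change (`10 //10s` → `10 * time.Second`) is more than cosmetic. With a bare untyped 10, the old code had to convert to float seconds before comparing; a typed time.Duration composes directly and the compiler rejects accidental unit mixing. A small runnable sketch of the two styles (values are illustrative):

```go
package main

import (
	"fmt"
	"time"
)

const refreshInterval = 10 * time.Second // typed: a time.Duration

func main() {
	lastRefreshed := time.Now().Add(-20 * time.Second)

	// Old style: convert to float seconds before comparing.
	fmt.Println(time.Since(lastRefreshed).Seconds() >= 10) // true

	// New style: compare Durations directly; no unit conversion needed,
	// and mixing in a bare integer is a compile error.
	fmt.Println(time.Since(lastRefreshed) >= refreshInterval) // true
}
```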
diff --git a/receiver/awscontainerinsightreceiver/internal/stores/servicestore_test.go b/receiver/awscontainerinsightreceiver/internal/stores/servicestore_test.go
index ec4fbc8cb6d0..594d924c5692 100644
--- a/receiver/awscontainerinsightreceiver/internal/stores/servicestore_test.go
+++ b/receiver/awscontainerinsightreceiver/internal/stores/servicestore_test.go
@@ -15,6 +15,7 @@
 package stores
 
 import (
+	"context"
 	"testing"
 	"time"
 
@@ -40,8 +41,9 @@ func TestServiceStore(t *testing.T) {
 		endpointInfo: &mockEndpoint{},
 	}
 
+	ctx := context.Background()
 	s.lastRefreshed = time.Now().Add(-20 * time.Second)
-	s.RefreshTick()
+	s.RefreshTick(ctx)
 
 	// test the case when it decorates metrics successfully
 	metric := &mockCIMetric{
@@ -51,7 +53,7 @@ func TestServiceStore(t *testing.T) {
 		},
 	}
 	kubernetesBlob := map[string]interface{}{}
-	ok := s.Decorate(metric, kubernetesBlob)
+	ok := s.Decorate(ctx, metric, kubernetesBlob)
 	assert.True(t, ok)
 	assert.Equal(t, "test-service", metric.GetTag(ci.TypeService))
 
@@ -62,7 +64,7 @@ func TestServiceStore(t *testing.T) {
 		},
 	}
 	kubernetesBlob = map[string]interface{}{}
-	ok = s.Decorate(metric, kubernetesBlob)
+	ok = s.Decorate(ctx, metric, kubernetesBlob)
 	assert.False(t, ok)
 	assert.Equal(t, "", metric.GetTag(ci.TypeService))
 }
diff --git a/receiver/awscontainerinsightreceiver/internal/stores/store.go b/receiver/awscontainerinsightreceiver/internal/stores/store.go
index b7ebbb85b94b..5fc1086a3f83 100644
--- a/receiver/awscontainerinsightreceiver/internal/stores/store.go
+++ b/receiver/awscontainerinsightreceiver/internal/stores/store.go
@@ -14,6 +14,20 @@
 
 package stores
 
+import (
+	"context"
+	"errors"
+	"os"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors"
+)
+
+var _ cadvisor.Decorator = &K8sDecorator{}
+
 // CIMetric represents the raw metric interface for container insights
 type CIMetric interface {
 	HasField(key string) bool
@@ -26,8 +40,71 @@ type CIMetric interface {
 }
 
 type K8sStore interface {
-	Decorate(metric CIMetric, kubernetesBlob map[string]interface{}) bool
-	RefreshTick()
+	Decorate(ctx context.Context, metric CIMetric, kubernetesBlob map[string]interface{}) bool
+	RefreshTick(ctx context.Context)
 }
 
-// TODO: add code to initialize pod and service store and provide api for decorating metrics
+type K8sDecorator struct {
+	stores []K8sStore
+	// We keep ctx in the struct because it is used by Decorate(...) when calling
+	// K8sStore.Decorate(...). It is easier to keep the ctx here than to pass it
+	// as a parameter of Decorate(...). The K8sStore implementations (e.g. podstore)
+	// make network requests in Decorate and thus need a context object for
+	// canceling the requests.
+	ctx context.Context
+}
+
+func NewK8sDecorator(ctx context.Context, tagService bool, prefFullPodName bool, logger *zap.Logger) (*K8sDecorator, error) {
+	hostIP := os.Getenv("HOST_IP")
+	if hostIP == "" {
+		return nil, errors.New("environment variable HOST_IP is not set in k8s deployment config")
+	}
+
+	k := &K8sDecorator{
+		ctx: ctx,
+	}
+
+	podstore, err := NewPodStore(hostIP, prefFullPodName, logger)
+	if err != nil {
+		return nil, err
+	}
+	k.stores = append(k.stores, podstore)
+
+	if tagService {
+		servicestore, err := NewServiceStore(logger)
+		if err != nil {
+			return nil, err
+		}
+		k.stores = append(k.stores, servicestore)
+	}
+
+	go func() {
+		refreshTicker := time.NewTicker(time.Second)
+		for {
+			select {
+			case <-refreshTicker.C:
+				for _, store := range k.stores {
+					store.RefreshTick(k.ctx)
+				}
+			case <-k.ctx.Done():
+				refreshTicker.Stop()
+				return
+			}
+		}
+	}()
+
+	return k, nil
+}
+
+func (k *K8sDecorator) Decorate(metric *extractors.CAdvisorMetric) *extractors.CAdvisorMetric {
+	kubernetesBlob := map[string]interface{}{}
+	for _, store := range k.stores {
+		ok := store.Decorate(k.ctx, metric, kubernetesBlob)
+		if !ok {
+			return nil
+		}
+	}
+
+	AddKubernetesInfo(metric, kubernetesBlob)
+	TagMetricSource(metric)
+	return metric
+}
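Reviewer note: the `var _ cadvisor.Decorator = &K8sDecorator{}` line in store.go is a compile-time interface assertion — if K8sDecorator ever stops satisfying cadvisor.Decorator, the build fails right there rather than at a distant call site. A minimal sketch of the idiom with hypothetical names:

```go
package main

import "fmt"

type Decorator interface {
	Decorate(s string) string
}

type upper struct{}

func (u *upper) Decorate(s string) string { return s + "!" }

// Compile-time check: if *upper ever stops implementing Decorator
// (e.g. a method is renamed), compilation fails on this line. The
// blank identifier makes it zero-cost at runtime.
var _ Decorator = (*upper)(nil)

func main() {
	fmt.Println((&upper{}).Decorate("hi"))
}
```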
diff --git a/receiver/awscontainerinsightreceiver/internal/stores/utils.go b/receiver/awscontainerinsightreceiver/internal/stores/utils.go
index 4beb9b72d62c..b40936670237 100644
--- a/receiver/awscontainerinsightreceiver/internal/stores/utils.go
+++ b/receiver/awscontainerinsightreceiver/internal/stores/utils.go
@@ -15,8 +15,10 @@
 package stores
 
 import (
+	"context"
 	"encoding/json"
 	"strings"
+	"time"
 
 	corev1 "k8s.io/api/core/v1"
 
@@ -130,8 +132,6 @@ func TagMetricSource(metric CIMetric) {
 		sources = append(sources, []string{"cadvisor", "calculated"}...)
 	case ci.TypeContainerDiskIO:
 		sources = append(sources, []string{"cadvisor"}...)
-	case ci.TypeCluster, ci.TypeClusterService, ci.TypeClusterNamespace:
-		sources = append(sources, []string{"apiserver"}...)
 	}
 
 	if len(sources) > 0 {
@@ -169,3 +169,15 @@ func AddKubernetesInfo(metric CIMetric, kubernetesBlob map[string]interface{}) {
 		metric.AddTag(ci.Kubernetes, string(kubernetesInfo))
 	}
 }
+
+func refreshWithTimeout(parentContext context.Context, refresh func(), timeout time.Duration) {
+	ctx, cancel := context.WithTimeout(parentContext, timeout)
+	// spawn a goroutine to process the actual refresh
+	go func(cancel func()) {
+		refresh()
+		cancel()
+	}(cancel)
+	// block until either refresh() has executed or the timeout expires
+	<-ctx.Done()
+	cancel()
+}
diff --git a/receiver/awscontainerinsightreceiver/internal/stores/utils_test.go b/receiver/awscontainerinsightreceiver/internal/stores/utils_test.go
index 3fc3e2d606b0..c5fb3ccd031b 100644
--- a/receiver/awscontainerinsightreceiver/internal/stores/utils_test.go
+++ b/receiver/awscontainerinsightreceiver/internal/stores/utils_test.go
@@ -110,9 +110,6 @@ func TestUtils_TagMetricSource(t *testing.T) {
 		ci.TypeContainer,
 		ci.TypeContainerFS,
 		ci.TypeContainerDiskIO,
-		ci.TypeCluster,
-		ci.TypeClusterService,
-		ci.TypeClusterNamespace,
 	}
 
 	expectedSources := []string{
@@ -126,9 +123,6 @@ func TestUtils_TagMetricSource(t *testing.T) {
 		"[\"cadvisor\",\"pod\",\"calculated\"]",
 		"[\"cadvisor\",\"calculated\"]",
 		"[\"cadvisor\"]",
-		"[\"apiserver\"]",
-		"[\"apiserver\"]",
-		"[\"apiserver\"]",
 	}
 	for i, mtype := range types {
 		tags := map[string]string{
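Reviewer note: refreshWithTimeout bounds how long a caller waits, not how long the refresh runs — if refresh() hangs past the timeout, its goroutine keeps running in the background and only the waiter is released. A standalone sketch showing that behavior (durations are illustrative):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func refreshWithTimeout(parent context.Context, refresh func(), timeout time.Duration) {
	ctx, cancel := context.WithTimeout(parent, timeout)
	defer cancel()
	go func() {
		refresh()
		cancel() // wakes the waiter early when refresh finishes in time
	}()
	<-ctx.Done() // returns on refresh completion, timeout, or parent cancellation
}

func main() {
	start := time.Now()
	refreshWithTimeout(context.Background(), func() {
		time.Sleep(5 * time.Second) // a hung refresh keeps running in the background
	}, 100*time.Millisecond)
	fmt.Println("caller unblocked after", time.Since(start).Round(time.Millisecond))
}
```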
diff --git a/receiver/awscontainerinsightreceiver/receiver.go b/receiver/awscontainerinsightreceiver/receiver.go
index b1bb611eae76..e65230608925 100644
--- a/receiver/awscontainerinsightreceiver/receiver.go
+++ b/receiver/awscontainerinsightreceiver/receiver.go
@@ -22,12 +22,12 @@ import (
 	"go.opentelemetry.io/collector/component/componenterror"
 	"go.opentelemetry.io/collector/consumer"
 	"go.opentelemetry.io/collector/consumer/pdata"
-	"go.opentelemetry.io/collector/obsreport"
 	"go.uber.org/zap"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor"
 	hostInfo "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8sapiserver"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores"
 )
 
 var _ component.MetricsReceiver = (*awsContainerInsightReceiver)(nil)
@@ -65,15 +65,37 @@ func New(
 
 // Start collecting metrics from cadvisor and k8s api server (if it is an elected leader)
 func (acir *awsContainerInsightReceiver) Start(ctx context.Context, host component.Host) error {
-	ctx, acir.cancel = context.WithCancel(obsreport.ReceiverContext(ctx, acir.config.ID(), "http"))
-	//ignore the error for now, will address it in later PR
-	machineInfo, _ := hostInfo.NewInfo(acir.config.CollectionInterval, acir.logger)
-	acir.cadvisor, _ = cadvisor.New(acir.config.ContainerOrchestrator, machineInfo, acir.logger)
-	acir.k8sapiserver, _ = k8sapiserver.New(machineInfo, acir.logger)
+	ctx, acir.cancel = context.WithCancel(context.Background())
 
-	// TODO: add more intialization code
+	hostinfo, err := hostInfo.NewInfo(acir.config.CollectionInterval, acir.logger)
+	if err != nil {
+		return err
+	}
+
+	k8sDecorator, err := stores.NewK8sDecorator(ctx, acir.config.TagService, acir.config.PrefFullPodName, acir.logger)
+	if err != nil {
+		return err
+	}
+
+	decoratorOption := cadvisor.WithDecorator(k8sDecorator)
+	acir.cadvisor, err = cadvisor.New(acir.config.ContainerOrchestrator, hostinfo, acir.logger, decoratorOption)
+	if err != nil {
+		return err
+	}
+
+	acir.k8sapiserver, err = k8sapiserver.New(hostinfo, acir.logger)
+	if err != nil {
+		return err
+	}
 
 	go func() {
+		//cadvisor collects data at dynamic intervals (from 1 to 15 seconds). If the ticker fires
+		//at the beginning of a minute, it might read the data collected at the end of the last minute.
+		//To avoid this, we wait until at least two cadvisor collection intervals have passed before
+		//collecting the metrics.
+		secondsInMin := time.Now().Second()
+		if secondsInMin < 30 {
+			time.Sleep(time.Duration(30-secondsInMin) * time.Second)
+		}
 		ticker := time.NewTicker(acir.config.CollectionInterval)
 		defer ticker.Stop()
diff --git a/receiver/awscontainerinsightreceiver/receiver_test.go b/receiver/awscontainerinsightreceiver/receiver_test.go
index a850477b1ea0..cf3483e76668 100644
--- a/receiver/awscontainerinsightreceiver/receiver_test.go
+++ b/receiver/awscontainerinsightreceiver/receiver_test.go
@@ -58,7 +58,7 @@ func TestReceiver(t *testing.T) {
 
 	ctx := context.Background()
 	err = r.Start(ctx, componenttest.NewNopHost())
-	require.NoError(t, err)
+	require.Error(t, err)
 
 	err = r.Shutdown(ctx)
 	require.NoError(t, err)
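Reviewer note: the start-up delay added to the collection goroutine is easy to verify in isolation — it aligns the first tick to the second half of the current minute so at least two cadvisor housekeeping rounds have happened. A tiny runnable sketch of the same arithmetic (the timestamps are illustrative):

```go
package main

import (
	"fmt"
	"time"
)

// startDelay returns how long to wait before starting the collection
// ticker, mirroring the logic added in Start(): if we are in the first
// half of the minute, sleep until second 30.
func startDelay(now time.Time) time.Duration {
	if s := now.Second(); s < 30 {
		return time.Duration(30-s) * time.Second
	}
	return 0
}

func main() {
	at := time.Date(2021, 6, 1, 10, 0, 12, 0, time.UTC)
	fmt.Println(startDelay(at))                       // 18s
	fmt.Println(startDelay(at.Add(40 * time.Second))) // 0s
}
```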