Skip to content

Commit

Permalink
Integrate components and fix bugs for EKS Container Insights (#3846)
Browse files Browse the repository at this point in the history
* clean up code

* set region in aws session for ec2 tags

* add decoration with host info

* add config for metric decoration

* add decoration with k8sdecorator

* initialize k8s clients on demand

* collect metric in second half of a minute to avoid jitter issue

* add kubernetes and sources attribute to cluster-level metrics

* address comments

* accept decorator as an option for cadvisor

* validate implementation of interface defined in cadvisor
  • Loading branch information
pxaws authored Jun 24, 2021
1 parent 47087da commit 31211aa
Show file tree
Hide file tree
Showing 33 changed files with 622 additions and 153 deletions.
194 changes: 154 additions & 40 deletions internal/aws/k8s/k8sclient/clientset.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,19 @@ var mu = &sync.Mutex{}

var optionsToK8sClient = map[string]*K8sClient{}

// stopper is implemented by every resource client that runs a background
// informer/reflector and therefore needs an explicit stop signal.
type stopper interface {
	shutdown()
}

// shutdownClient stops the given client (when non-nil) while holding mu, then
// runs afterShutdown (typically used by the caller to clear its cached
// reference). The unlock is deferred so a panicking shutdown cannot leak the
// lock, and a typed-nil pointer inside the interface is treated as absent —
// mirroring the reflect guard used elsewhere in this file.
func shutdownClient(client stopper, mu *sync.Mutex, afterShutdown func()) {
	mu.Lock()
	defer mu.Unlock()
	if client != nil && !reflect.ValueOf(client).IsNil() {
		client.shutdown()
		afterShutdown()
	}
}

type cacheReflector interface {
LastSyncResourceVersion() string
Run(<-chan struct{})
Expand Down Expand Up @@ -132,19 +145,54 @@ func Get(logger *zap.Logger, options ...Option) *K8sClient {
return optionsToK8sClient[strOptions]
}

// epClientWithStopper is the internal view of an endpoint client: the public
// EpClient query surface plus the private stopper hook used during shutdown.
type epClientWithStopper interface {
	EpClient
	stopper
}

// jobClientWithStopper pairs the public JobClient surface with stopper.
type jobClientWithStopper interface {
	JobClient
	stopper
}

// nodeClientWithStopper pairs the public NodeClient surface with stopper.
type nodeClientWithStopper interface {
	NodeClient
	stopper
}

// podClientWithStopper pairs the public PodClient surface with stopper.
type podClientWithStopper interface {
	PodClient
	stopper
}

// replicaSetClientWithStopper pairs the public ReplicaSetClient surface with
// stopper.
type replicaSetClientWithStopper interface {
	ReplicaSetClient
	stopper
}

// K8sClient bundles lazily created, mutex-guarded clients for the Kubernetes
// resources this package queries (endpoints, pods, nodes, jobs, replica sets).
// Instances are cached in optionsToK8sClient and shared; each resource client
// is built on first Get*Client call and torn down via the Shutdown* methods.
type K8sClient struct {
	kubeConfigPath       string        // path to the kubeconfig file, if not using in-cluster config
	initSyncPollInterval time.Duration // poll interval while waiting for initial cache sync
	initSyncPollTimeout  time.Duration // give-up timeout for the initial cache sync

	// clientSet is the underlying client-go interface from which every
	// resource-specific client below is constructed.
	clientSet kubernetes.Interface

	// syncChecker verifies reflector caches have completed their first sync.
	syncChecker *reflectorSyncChecker

	// Each resource client has its own mutex guarding lazy creation (in the
	// corresponding Get*Client) and teardown (in Shutdown*).
	epMu sync.Mutex
	ep   epClientWithStopper

	podMu sync.Mutex
	pod   podClientWithStopper

	nodeMu sync.Mutex
	node   nodeClientWithStopper

	jobMu sync.Mutex
	job   jobClientWithStopper

	rsMu       sync.Mutex
	replicaSet replicaSetClientWithStopper

	logger *zap.Logger
}
Expand Down Expand Up @@ -177,55 +225,121 @@ func (c *K8sClient) init(logger *zap.Logger, options ...Option) error {
return err
}

syncChecker := &reflectorSyncChecker{
c.syncChecker = &reflectorSyncChecker{
pollInterval: c.initSyncPollInterval,
pollTimeout: c.initSyncPollTimeout,
logger: c.logger,
}

c.ClientSet = client
c.Ep = newEpClient(client, c.logger, epSyncCheckerOption(syncChecker))
c.Pod = newPodClient(client, c.logger, podSyncCheckerOption(syncChecker))
c.Node = newNodeClient(client, c.logger, nodeSyncCheckerOption(syncChecker))
c.clientSet = client
c.ep = nil
c.pod = nil
c.node = nil
c.job = nil
c.replicaSet = nil

c.Job, err = newJobClient(client, c.logger, jobSyncCheckerOption(syncChecker))
if err != nil {
c.logger.Error("use an no-op job client instead because of error", zap.Error(err))
c.Job = &noOpJobClient{}
return nil
}

func (c *K8sClient) GetEpClient() EpClient {
c.epMu.Lock()
if c.ep == nil {
c.ep = newEpClient(c.clientSet, c.logger, epSyncCheckerOption(c.syncChecker))
}
c.ReplicaSet, err = newReplicaSetClient(client, c.logger, replicaSetSyncCheckerOption(syncChecker))
if err != nil {
c.logger.Error("use an no-op replica set client instead because of error", zap.Error(err))
c.ReplicaSet = &noOpReplicaSetClient{}
c.epMu.Unlock()
return c.ep
}

// ShutdownEpClient stops the lazily created endpoint client, if one exists,
// and clears the cached reference so a later GetEpClient rebuilds it.
func (c *K8sClient) ShutdownEpClient() {
	release := func() { c.ep = nil }
	shutdownClient(c.ep, &c.epMu, release)
}

// GetPodClient lazily constructs (on first use) and returns the shared pod
// client.
//
// The unlock is deferred so the returned value is read while the lock is
// still held; the original unlocked before `return c.pod`, letting a
// concurrent ShutdownPodClient nil the field between unlock and return.
func (c *K8sClient) GetPodClient() PodClient {
	c.podMu.Lock()
	defer c.podMu.Unlock()
	if c.pod == nil {
		c.pod = newPodClient(c.clientSet, c.logger, podSyncCheckerOption(c.syncChecker))
	}
	return c.pod
}

return nil
// ShutdownPodClient stops the lazily created pod client, if one exists, and
// clears the cached reference so a later GetPodClient rebuilds it.
func (c *K8sClient) ShutdownPodClient() {
	release := func() { c.pod = nil }
	shutdownClient(c.pod, &c.podMu, release)
}

// GetNodeClient lazily constructs (on first use) and returns the shared node
// client.
//
// The unlock is deferred so the returned value is read while the lock is
// still held; the original unlocked before `return c.node`, letting a
// concurrent ShutdownNodeClient nil the field between unlock and return.
func (c *K8sClient) GetNodeClient() NodeClient {
	c.nodeMu.Lock()
	defer c.nodeMu.Unlock()
	if c.node == nil {
		c.node = newNodeClient(c.clientSet, c.logger, nodeSyncCheckerOption(c.syncChecker))
	}
	return c.node
}

// ShutdownNodeClient stops the lazily created node client, if one exists, and
// clears the cached reference so a later GetNodeClient rebuilds it.
func (c *K8sClient) ShutdownNodeClient() {
	release := func() { c.node = nil }
	shutdownClient(c.node, &c.nodeMu, release)
}

// GetJobClient lazily constructs (on first use) and returns the shared job
// client. If construction fails, a no-op client is cached instead so callers
// always receive a usable (possibly inert) JobClient.
//
// Changes vs. original: the unlock is deferred so the returned value is read
// under the lock (the original unlocked before `return c.job`); err is scoped
// to the branch that needs it; log message grammar fixed ("use a no-op").
func (c *K8sClient) GetJobClient() JobClient {
	c.jobMu.Lock()
	defer c.jobMu.Unlock()
	if c.job == nil {
		job, err := newJobClient(c.clientSet, c.logger, jobSyncCheckerOption(c.syncChecker))
		if err != nil {
			c.logger.Error("use a no-op job client instead because of error", zap.Error(err))
			c.job = &noOpJobClient{}
		} else {
			c.job = job
		}
	}
	return c.job
}

// ShutdownJobClient stops the lazily created job client, if one exists, and
// clears the cached reference so a later GetJobClient rebuilds it.
func (c *K8sClient) ShutdownJobClient() {
	release := func() { c.job = nil }
	shutdownClient(c.job, &c.jobMu, release)
}

// GetReplicaSetClient lazily constructs (on first use) and returns the shared
// replica set client. If construction fails, a no-op client is cached instead
// so callers always receive a usable (possibly inert) ReplicaSetClient.
//
// Changes vs. original: the unlock is deferred so the returned value is read
// under the lock (the original unlocked before `return c.replicaSet`); err is
// scoped to the branch that needs it; log message grammar fixed.
// The reflect.IsNil guard is kept: it additionally rebuilds the client when
// the interface holds a typed-nil pointer — NOTE(review): the other getters
// lack this guard; confirm whether they need it too.
func (c *K8sClient) GetReplicaSetClient() ReplicaSetClient {
	c.rsMu.Lock()
	defer c.rsMu.Unlock()
	if c.replicaSet == nil || reflect.ValueOf(c.replicaSet).IsNil() {
		rs, err := newReplicaSetClient(c.clientSet, c.logger, replicaSetSyncCheckerOption(c.syncChecker))
		if err != nil {
			c.logger.Error("use a no-op replica set client instead because of error", zap.Error(err))
			c.replicaSet = &noOpReplicaSetClient{}
		} else {
			c.replicaSet = rs
		}
	}
	return c.replicaSet
}

// ShutdownReplicaSetClient stops the lazily created replica set client, if
// one exists, and clears the cached reference so a later GetReplicaSetClient
// rebuilds it.
func (c *K8sClient) ShutdownReplicaSetClient() {
	release := func() { c.replicaSet = nil }
	shutdownClient(c.replicaSet, &c.rsMu, release)
}

// GetClientSet returns the underlying kubernetes.Interface from which the
// resource-specific clients are built.
func (c *K8sClient) GetClientSet() kubernetes.Interface {
	return c.clientSet
}

// Shutdown stops K8sClient
func (c *K8sClient) Shutdown() {
mu.Lock()
defer mu.Unlock()

if c.Ep != nil && !reflect.ValueOf(c.Ep).IsNil() {
c.Ep.shutdown()
c.Ep = nil
}
if c.Pod != nil && !reflect.ValueOf(c.Pod).IsNil() {
c.Pod.Shutdown()
c.Pod = nil
}
if c.Node != nil && !reflect.ValueOf(c.Node).IsNil() {
c.Node.Shutdown()
c.Node = nil
}
if c.Job != nil && !reflect.ValueOf(c.Job).IsNil() {
c.Job.shutdown()
c.Job = nil
}
if c.ReplicaSet != nil && !reflect.ValueOf(c.ReplicaSet).IsNil() {
c.ReplicaSet.shutdown()
c.ReplicaSet = nil
}
c.ShutdownEpClient()
c.ShutdownPodClient()
c.ShutdownNodeClient()
c.ShutdownJobClient()
c.ShutdownReplicaSetClient()

// remove the current instance of k8s client from map
for key, val := range optionsToK8sClient {
Expand Down
16 changes: 11 additions & 5 deletions internal/aws/k8s/k8sclient/clientset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,18 @@ func TestGetShutdown(t *testing.T) {
InitSyncPollTimeout(20*time.Nanosecond),
)
assert.Equal(t, 1, len(optionsToK8sClient))
assert.NotNil(t, k8sClient.GetClientSet())
assert.NotNil(t, k8sClient.GetEpClient())
assert.NotNil(t, k8sClient.GetJobClient())
assert.NotNil(t, k8sClient.GetNodeClient())
assert.NotNil(t, k8sClient.GetPodClient())
assert.NotNil(t, k8sClient.GetReplicaSetClient())
k8sClient.Shutdown()
assert.Nil(t, k8sClient.Ep)
assert.Nil(t, k8sClient.Job)
assert.Nil(t, k8sClient.Node)
assert.Nil(t, k8sClient.Pod)
assert.Nil(t, k8sClient.ReplicaSet)
assert.Nil(t, k8sClient.ep)
assert.Nil(t, k8sClient.job)
assert.Nil(t, k8sClient.node)
assert.Nil(t, k8sClient.pod)
assert.Nil(t, k8sClient.replicaSet)
assert.Equal(t, 0, len(optionsToK8sClient))
removeTempKubeConfig()
}
3 changes: 0 additions & 3 deletions internal/aws/k8s/k8sclient/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@ type EpClient interface {
PodKeyToServiceNames() map[string][]string
// Get the mapping between the service and the number of belonging pods
ServiceToPodNum() map[Service]int

// shutdown is only used internally by clientset to stop the EpClient
shutdown()
}

type epClientOption func(*epClient)
Expand Down
3 changes: 0 additions & 3 deletions internal/aws/k8s/k8sclient/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ const (
type JobClient interface {
// get the mapping between job and cronjob
JobToCronJob() map[string]string

// shutdown is only used internally by clientset to stop the JobClient
shutdown()
}

type noOpJobClient struct {
Expand Down
4 changes: 1 addition & 3 deletions internal/aws/k8s/k8sclient/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ type NodeClient interface {
ClusterFailedNodeCount() int
// Get the number of nodes for current cluster
ClusterNodeCount() int
// Shutdown stops the NodeClient
Shutdown()
}

type nodeClientOption func(*nodeClient)
Expand Down Expand Up @@ -142,7 +140,7 @@ func newNodeClient(clientSet kubernetes.Interface, logger *zap.Logger, options .
return c
}

func (c *nodeClient) Shutdown() {
func (c *nodeClient) shutdown() {
c.mu.Lock()
defer c.mu.Unlock()

Expand Down
2 changes: 1 addition & 1 deletion internal/aws/k8s/k8sclient/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func TestNodeClient(t *testing.T) {

assert.Equal(t, clusterNodeCount, expectedClusterNodeCount)
assert.Equal(t, clusterFailedNodeCount, expectedClusterFailedNodeCount)
client.Shutdown()
client.shutdown()
assert.True(t, client.stopped)
}

Expand Down
4 changes: 1 addition & 3 deletions internal/aws/k8s/k8sclient/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ import (
type PodClient interface {
// Get the mapping between the namespace and the number of belonging pods
NamespaceToRunningPodNum() map[string]int
// Shutdown stops the PodClient
Shutdown()
}

type podClientOption func(*podClient)
Expand Down Expand Up @@ -106,7 +104,7 @@ func newPodClient(clientSet kubernetes.Interface, logger *zap.Logger, options ..
return c
}

func (c *podClient) Shutdown() {
func (c *podClient) shutdown() {
c.mu.Lock()
defer c.mu.Unlock()
close(c.stopChan)
Expand Down
2 changes: 1 addition & 1 deletion internal/aws/k8s/k8sclient/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func TestPodClient_NamespaceToRunningPodNum(t *testing.T) {
resultMap := client.NamespaceToRunningPodNum()
log.Printf("NamespaceToRunningPodNum (len=%v): %v", len(resultMap), awsutil.Prettify(resultMap))
assert.True(t, reflect.DeepEqual(resultMap, expectedMap))
client.Shutdown()
client.shutdown()
assert.True(t, client.stopped)
}

Expand Down
3 changes: 0 additions & 3 deletions internal/aws/k8s/k8sclient/replicaset.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ const (
type ReplicaSetClient interface {
// Get the mapping between replica set and deployment
ReplicaSetToDeployment() map[string]string

// shutdown is only used internally by clientset to stop the ReplicaSetClient
shutdown()
}

type noOpReplicaSetClient struct {
Expand Down
8 changes: 8 additions & 0 deletions receiver/awscontainerinsightreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,12 @@ type Config struct {

// ContainerOrchestrator is the type of container orchestration service, e.g. eks or ecs. The default is eks.
ContainerOrchestrator string `mapstructure:"container_orchestrator"`

// Whether to add the associated service name as attribute. The default is true
TagService bool `mapstructure:"add_service_as_attribute"`

// The "PodName" attribute is set based on the name of the relevant controllers like Daemonset, Job, ReplicaSet, ReplicationController, ...
// If it can not be set that way and PrefFullPodName is true, the "PodName" attribute is set to the pod's own name.
// The default value is false
PrefFullPodName bool `mapstructure:"prefer_full_pod_name"`
}
2 changes: 2 additions & 0 deletions receiver/awscontainerinsightreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,7 @@ func TestLoadConfig(t *testing.T) {
ReceiverSettings: config.NewReceiverSettings(config.NewIDWithName(typeStr, "collection_interval_settings")),
CollectionInterval: 60 * time.Second,
ContainerOrchestrator: "eks",
TagService: true,
PrefFullPodName: false,
})
}
8 changes: 8 additions & 0 deletions receiver/awscontainerinsightreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ const (

// Default container orchestrator service is aws eks
defaultContainerOrchestrator = "eks"

// Metrics is tagged with service name by default
defaultTagService = true

// Don't use pod full name by default (as the full names contain suffix with random characters)
defaultPrefFullPodName = false
)

// NewFactory creates a factory for AWS container insight receiver
Expand All @@ -50,6 +56,8 @@ func createDefaultConfig() config.Receiver {
ReceiverSettings: config.NewReceiverSettings(config.NewID(typeStr)),
CollectionInterval: defaultCollectionInterval,
ContainerOrchestrator: defaultContainerOrchestrator,
TagService: defaultTagService,
PrefFullPodName: defaultPrefFullPodName,
}
}

Expand Down
Loading

0 comments on commit 31211aa

Please sign in to comment.