From f6bfdc8e6850c4337bdc29a224e4ee976ecd0ea6 Mon Sep 17 00:00:00 2001 From: Ping Xiang Date: Tue, 1 Jun 2021 17:03:50 +0000 Subject: [PATCH 1/4] add cluster-level metrics to container insight receiver --- internal/aws/k8s/k8sclient/clientset.go | 4 +- internal/aws/k8s/k8sclient/node.go | 7 +- internal/aws/k8s/k8sclient/node_test.go | 2 +- internal/aws/k8s/k8sclient/pod.go | 7 +- internal/aws/k8s/k8sclient/pod_test.go | 2 +- .../awscontainerinsightreceiver/factory.go | 14 +- receiver/awscontainerinsightreceiver/go.mod | 10 + receiver/awscontainerinsightreceiver/go.sum | 27 ++ .../internal/k8sapiserver/k8sapiserver.go | 236 +++++++++++++++++- .../k8sapiserver/k8sapiserver_test.go | 221 +++++++++++++++- .../awscontainerinsightreceiver/receiver.go | 15 +- .../receiver_test.go | 37 ++- 12 files changed, 512 insertions(+), 70 deletions(-) diff --git a/internal/aws/k8s/k8sclient/clientset.go b/internal/aws/k8s/k8sclient/clientset.go index 49d1bc5a9079..046701e23bc7 100644 --- a/internal/aws/k8s/k8sclient/clientset.go +++ b/internal/aws/k8s/k8sclient/clientset.go @@ -211,11 +211,11 @@ func (c *K8sClient) Shutdown() { c.Ep = nil } if c.Pod != nil && !reflect.ValueOf(c.Pod).IsNil() { - c.Pod.shutdown() + c.Pod.Shutdown() c.Pod = nil } if c.Node != nil && !reflect.ValueOf(c.Node).IsNil() { - c.Node.shutdown() + c.Node.Shutdown() c.Node = nil } if c.Job != nil && !reflect.ValueOf(c.Job).IsNil() { diff --git a/internal/aws/k8s/k8sclient/node.go b/internal/aws/k8s/k8sclient/node.go index 5c9ce2552082..1349a259d3e1 100644 --- a/internal/aws/k8s/k8sclient/node.go +++ b/internal/aws/k8s/k8sclient/node.go @@ -41,9 +41,8 @@ type NodeClient interface { ClusterFailedNodeCount() int // Get the number of nodes for current cluster ClusterNodeCount() int - - // shutdown is only used internally by clientset to stop the NodeClient - shutdown() + // Shutdown stops the NodeClient + Shutdown() } type nodeClientOption func(*nodeClient) @@ -143,7 +142,7 @@ func newNodeClient(clientSet 
kubernetes.Interface, logger *zap.Logger, options . return c } -func (c *nodeClient) shutdown() { +func (c *nodeClient) Shutdown() { c.mu.Lock() defer c.mu.Unlock() diff --git a/internal/aws/k8s/k8sclient/node_test.go b/internal/aws/k8s/k8sclient/node_test.go index 2d669fed27bd..c5afb974ae90 100644 --- a/internal/aws/k8s/k8sclient/node_test.go +++ b/internal/aws/k8s/k8sclient/node_test.go @@ -317,7 +317,7 @@ func TestNodeClient(t *testing.T) { assert.Equal(t, clusterNodeCount, expectedClusterNodeCount) assert.Equal(t, clusterFailedNodeCount, expectedClusterFailedNodeCount) - client.shutdown() + client.Shutdown() assert.True(t, client.stopped) } diff --git a/internal/aws/k8s/k8sclient/pod.go b/internal/aws/k8s/k8sclient/pod.go index 1f6e1c92839e..10092671064a 100644 --- a/internal/aws/k8s/k8sclient/pod.go +++ b/internal/aws/k8s/k8sclient/pod.go @@ -31,9 +31,8 @@ import ( type PodClient interface { // Get the mapping between the namespace and the number of belonging pods NamespaceToRunningPodNum() map[string]int - - // shutdown is only used internally by clientset to stop the PodClient - shutdown() + // Shutdown stops the PodClient + Shutdown() } type podClientOption func(*podClient) @@ -107,7 +106,7 @@ func newPodClient(clientSet kubernetes.Interface, logger *zap.Logger, options .. 
return c } -func (c *podClient) shutdown() { +func (c *podClient) Shutdown() { c.mu.Lock() defer c.mu.Unlock() close(c.stopChan) diff --git a/internal/aws/k8s/k8sclient/pod_test.go b/internal/aws/k8s/k8sclient/pod_test.go index 460510c4f938..ca2c08810662 100644 --- a/internal/aws/k8s/k8sclient/pod_test.go +++ b/internal/aws/k8s/k8sclient/pod_test.go @@ -188,7 +188,7 @@ func TestPodClient_NamespaceToRunningPodNum(t *testing.T) { resultMap := client.NamespaceToRunningPodNum() log.Printf("NamespaceToRunningPodNum (len=%v): %v", len(resultMap), awsutil.Prettify(resultMap)) assert.True(t, reflect.DeepEqual(resultMap, expectedMap)) - client.shutdown() + client.Shutdown() assert.True(t, client.stopped) } diff --git a/receiver/awscontainerinsightreceiver/factory.go b/receiver/awscontainerinsightreceiver/factory.go index b9abd207bbee..68d1a686c610 100644 --- a/receiver/awscontainerinsightreceiver/factory.go +++ b/receiver/awscontainerinsightreceiver/factory.go @@ -22,11 +22,6 @@ import ( "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/receiver/receiverhelper" - "go.uber.org/zap" - - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor" - hostInfo "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8sapiserver" ) // Factory for awscontainerinsightreceiver @@ -68,12 +63,5 @@ func createMetricsReceiver( rCfg := baseCfg.(*Config) logger := params.Logger - hostInfo, err := hostInfo.NewInfo(rCfg.CollectionInterval, logger) - // TODO: I will need to change the code here to let cadvisor and k8sapiserver return err as well - if err != nil { - logger.Warn("failed to initialize hostInfo", zap.Error(err)) - } - cadvisor := cadvisor.New(rCfg.ContainerOrchestrator, hostInfo, logger) - k8sapiserver := 
k8sapiserver.New(hostInfo, logger) - return New(logger, rCfg, consumer, cadvisor, k8sapiserver) + return New(logger, rCfg, consumer) } diff --git a/receiver/awscontainerinsightreceiver/go.mod b/receiver/awscontainerinsightreceiver/go.mod index d7e7b2a7061d..bd2999a4e939 100644 --- a/receiver/awscontainerinsightreceiver/go.mod +++ b/receiver/awscontainerinsightreceiver/go.mod @@ -5,10 +5,20 @@ go 1.16 require ( github.com/aws/aws-sdk-go v1.38.55 github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil v0.0.0-00010101000000-000000000000 + github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight v0.0.0-00010101000000-000000000000 + github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s v0.0.0-00010101000000-000000000000 github.com/shirou/gopsutil v3.21.5+incompatible github.com/stretchr/testify v1.7.0 go.opentelemetry.io/collector v0.27.1-0.20210608105628-44a4ae746c3c go.uber.org/zap v1.17.0 + k8s.io/api v0.21.0 + k8s.io/apimachinery v0.21.0 + k8s.io/client-go v0.21.0 + k8s.io/klog v1.0.0 ) replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil => ./../../internal/aws/awsutil + +replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight => ./../../internal/aws/containerinsight + +replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s => ./../../internal/aws/k8s diff --git a/receiver/awscontainerinsightreceiver/go.sum b/receiver/awscontainerinsightreceiver/go.sum index 7261e239fe09..2663dc1076ce 100644 --- a/receiver/awscontainerinsightreceiver/go.sum +++ b/receiver/awscontainerinsightreceiver/go.sum @@ -50,14 +50,17 @@ github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 h1:w+iIsaOQNcT7O github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8= github.com/Azure/go-autorest v14.2.0+incompatible 
h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs= github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24= +github.com/Azure/go-autorest/autorest v0.11.1/go.mod h1:JFgpikqFJ/MleTTxwepExTKnFUKKszPS8UavbQYUMuw= github.com/Azure/go-autorest/autorest v0.11.12/go.mod h1:eipySxLmqSyC5s5k1CLupqet0PSENBEDP93LQ9a8QYw= github.com/Azure/go-autorest/autorest v0.11.18 h1:90Y4srNYrwOtAgVo3ndrQkTYn6kf1Eg/AjTFJ8Is2aM= github.com/Azure/go-autorest/autorest v0.11.18/go.mod h1:dSiJPy22c3u0OtOKDNttNgqpNFY/GeWa7GH/Pz56QRA= +github.com/Azure/go-autorest/autorest/adal v0.9.0/go.mod h1:/c022QCutn2P7uY+/oQWWNcK9YU+MH96NgK+jErpbcg= github.com/Azure/go-autorest/autorest/adal v0.9.5/go.mod h1:B7KF7jKIeC9Mct5spmyCB/A8CG/sEz1vwIRGv/bbw7A= github.com/Azure/go-autorest/autorest/adal v0.9.13 h1:Mp5hbtOePIzM8pJVRa3YLrWWmZtoxRXqUEzCfJt3+/Q= github.com/Azure/go-autorest/autorest/adal v0.9.13/go.mod h1:W/MM4U6nLxnIskrw4UwWzlHfGjwUS50aOsc/I3yuU8M= github.com/Azure/go-autorest/autorest/date v0.3.0 h1:7gUk1U5M/CQbp9WoqinNzJar+8KY+LPI6wiWrP/myHw= github.com/Azure/go-autorest/autorest/date v0.3.0/go.mod h1:BI0uouVdmngYNUzGWeSYnokU+TrmwEsOqdt8Y6sso74= +github.com/Azure/go-autorest/autorest/mocks v0.4.0/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k= github.com/Azure/go-autorest/autorest/mocks v0.4.1 h1:K0laFcLE6VLTOwNgSxaGbUcLPuGXlNkbVvq4cW4nIHk= github.com/Azure/go-autorest/autorest/mocks v0.4.1/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k= github.com/Azure/go-autorest/autorest/to v0.4.0 h1:oXVqrxakqqV1UZdSazDOPOLvOIz+XA683u8EctwboHk= @@ -130,6 +133,7 @@ github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQ github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go v1.34.28/go.mod h1:H7NKnBqNVzoTJpGfLrQkkD+ytBA93eiDYi/+8rV9s48= github.com/aws/aws-sdk-go v1.38.3/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro= +github.com/aws/aws-sdk-go v1.38.52/go.mod 
h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro= github.com/aws/aws-sdk-go v1.38.55 h1:1Wv5CE1Zy0hJ6MJUQ1ekFiCsNKBK5W69+towYQ1P4Vs= github.com/aws/aws-sdk-go v1.38.55/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro= github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g= @@ -187,6 +191,7 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:ma github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/creack/pty v1.1.11 h1:07n33Z8lZxZ2qwegKbObQohDhXDQxiMMz1NOUGYlesw= github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/crossdock/crossdock-go v0.0.0-20160816171116-049aabb0122b/go.mod h1:v9FBN7gdVTpiD/+LZ7Po0UKvROyT87uLVxTHVky/dlQ= github.com/dave/jennifer v1.2.0/go.mod h1:fIb+770HOpJ2fmN9EPPKOqm1vMGhB+TwXKMZhrIygKg= @@ -215,6 +220,7 @@ github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5Xh github.com/docker/go-units v0.3.3/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw= github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= +github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= @@ -259,6 +265,7 @@ github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWo github.com/fsnotify/fsnotify 
v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/gdamore/encoding v1.0.0/go.mod h1:alR0ol34c49FCSBLjhosxzcPHQbf2trDkoo5dl+VrEg= github.com/gdamore/tcell v1.3.0/go.mod h1:Hjvr+Ofd+gLglo7RYKxxnzCBmev3BzsS67MebKS4zMM= +github.com/ghodss/yaml v0.0.0-20150909031657-73d445a93680/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/globalsign/mgo v0.0.0-20180905125535-1ca0a4f7cbcb/go.mod h1:xkRDCp4j0OGD1HRkm4kmhM+pmpv3AKq5SU7GMg4oO/Q= github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8/go.mod h1:xkRDCp4j0OGD1HRkm4kmhM+pmpv3AKq5SU7GMg4oO/Q= @@ -276,6 +283,7 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V github.com/go-logfmt/logfmt v0.5.0 h1:TrB8swr/68K7m9CcGut2g3UOihhbcbiMAYiuTXdEih4= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas= +github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU= github.com/go-logr/logr v0.4.0 h1:K7/B1jt6fIBQVd4Owv2MqGQClcgf0R266+7C/QjRcLc= github.com/go-logr/logr v0.4.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU= github.com/go-ole/go-ole v1.2.5 h1:t4MGB5xEDZvXI+0rMjjsfBsD7yAgp/s9ZDkL1JndXwY= @@ -507,7 +515,9 @@ github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2z github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/gorilla/securecookie v1.1.1 h1:miw7JPhV+b/lAHSXz4qd/nN9jRiAFV5FwjeKyCS8BvQ= github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1 h1:DHd3rPN5lE3Ts3D8rKkQ8x/0kqfeNmBAaiSi+o7FsgI= 
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= @@ -580,10 +590,12 @@ github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb/go.mod h1:+NfK9FKe github.com/hashicorp/yamux v0.0.0-20190923154419-df201c70410d/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM= github.com/hetznercloud/hcloud-go v1.24.0 h1:/CeHDzhH3Fhm83pjxvE3xNNLbvACl0Lu1/auJ83gG5U= github.com/hetznercloud/hcloud-go v1.24.0/go.mod h1:3YmyK8yaZZ48syie6xpm3dt26rtB6s65AisBHylXYFA= +github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmKTg= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= +github.com/imdario/mergo v0.3.5 h1:JboBksRwiiAJWvIYJVo46AfV+IAIKZpfrSzVKj42R4Q= github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= @@ -769,11 +781,13 @@ github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.11.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.12.0 
h1:Iw5WCbBcaAAd0fpRb1c9r5YCylv4XDoCSigm1zLevwU= github.com/onsi/ginkgo v1.12.0/go.mod h1:oUhWkIvk5aDxtKvDDuw8gItl8pKl42LzjC9KZE0HfGg= github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= +github.com/onsi/gomega v1.9.0 h1:R1uwffexN6Pr340GtYRIdZmAiN4J+iw6WG4wog1DUXg= github.com/onsi/gomega v1.9.0/go.mod h1:Ho0h+IUsWyvy1OpqCwxlQ/21gkhVunqlU8fDGcoTdcA= github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= @@ -1038,6 +1052,7 @@ go.opentelemetry.io/collector v0.27.1-0.20210608105628-44a4ae746c3c/go.mod h1:AP go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/automaxprocs v1.4.0/go.mod h1:/mTEdr7LvHhs0v7mjdxDreTz1OG5zdZGqgOnhWiR/+Q= @@ -1045,12 +1060,14 @@ go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= +go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= go.uber.org/multierr 
v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM= +go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ= go.uber.org/zap v1.17.0 h1:MTjgFu6ZLKvY6Pvaqk97GlxNBuMpV4Hy/3P6tRGlI2U= go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= @@ -1113,6 +1130,7 @@ golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzB golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.4.1 h1:Kvvh58BN8Y9/lBi7hTekvtMpm07eUZ0ck5pRHpsMWrY= golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180530234432-1e491301e022/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1257,6 +1275,7 @@ golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200831180312-196b9ba8737a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys 
v0.0.0-20201112073958-5cba982894dd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201214210602-f9fddec55a1e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -1291,6 +1310,7 @@ golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba h1:O8mE0/t419eoIwhTFpKVkHiTs/Igowgfkj25AcZrtiE= golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -1503,6 +1523,7 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/fsnotify/fsnotify.v1 v1.4.7 h1:XNNYLJHt73EyYiCZi6+xjupS9CpvmiDgjPTAjrBlQbo= gopkg.in/fsnotify/fsnotify.v1 v1.4.7/go.mod h1:Fyux9zXlo4rWoMSIzpn9fDAYjalPqJ/K1qJ27s+7ltE= @@ -1515,6 +1536,7 
@@ gopkg.in/ini.v1 v1.52.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= gopkg.in/square/go-jose.v2 v2.5.1 h1:7odma5RETjNHWJnR32wx8t+Io4djHE1PqxCFx3iiZ2w= gopkg.in/square/go-jose.v2 v2.5.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/warnings.v0 v0.1.2/go.mod h1:jksf8JmL6Qr/oQM2OXTHunEvvTAsrWBLb6OOjuVWRNI= gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= @@ -1544,18 +1566,23 @@ honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= honnef.co/go/tools v0.2.0/go.mod h1:lPVVZ2BS5TfnjLyizF7o7hv7j9/L+8cZY2hLyjP9cGY= +k8s.io/api v0.20.4/go.mod h1:++lNL1AJMkDymriNniQsWRkMDzRaX2Y/POTUi8yvqYQ= k8s.io/api v0.21.0 h1:gu5iGF4V6tfVCQ/R+8Hc0h7H1JuEhzyEi9S4R5LM8+Y= k8s.io/api v0.21.0/go.mod h1:+YbrhBBGgsxbF6o6Kj4KJPJnBmAKuXDeS3E18bgHNVU= +k8s.io/apimachinery v0.20.4/go.mod h1:WlLqWAHZGg07AeltaI0MV5uk1Omp8xaN0JGLY6gkRpU= k8s.io/apimachinery v0.21.0 h1:3Fx+41if+IRavNcKOz09FwEXDBG6ORh6iMsTSelhkMA= k8s.io/apimachinery v0.21.0/go.mod h1:jbreFvJo3ov9rj7eWT7+sYiRx+qZuCYXwWT1bcDswPY= +k8s.io/client-go v0.20.4/go.mod h1:LiMv25ND1gLUdBeYxBIwKpkSC5IsozMMmOOeSJboP+k= k8s.io/client-go v0.21.0 h1:n0zzzJsAQmJngpC0IhgFcApZyoGXPrDIAD601HD09ag= k8s.io/client-go v0.21.0/go.mod h1:nNBytTF9qPFDEhoqgEPaarobC8QPae13bElIVHzIglA= k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0= k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8= k8s.io/klog v1.0.0/go.mod 
h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I= k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE= +k8s.io/klog/v2 v2.4.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y= k8s.io/klog/v2 v2.8.0 h1:Q3gmuM9hKEjefWFFYF0Mat+YyFJvsUyYuwyNNJ5C9Ts= k8s.io/klog/v2 v2.8.0/go.mod h1:hy9LJ/NvuK+iVyP4Ehqva4HxZG/oXyIS3n3Jmire4Ec= +k8s.io/kube-openapi v0.0.0-20201113171705-d219536bb9fd/go.mod h1:WOJ3KddDSol4tAGcJo0Tvi+dK12EcqSLqcWsryKMpfM= k8s.io/kube-openapi v0.0.0-20210305001622-591a79e4bda7 h1:vEx13qjvaZ4yfObSSXW7BrMc/KQBBT/Jyee8XtLf4x0= k8s.io/kube-openapi v0.0.0-20210305001622-591a79e4bda7/go.mod h1:wXW5VT87nVfh/iLV8FpR2uDvrFyomxbtb1KivDbvPTE= k8s.io/utils v0.0.0-20201110183641-67b214c5f920 h1:CbnUZsM497iRC5QMVkHwyl8s2tB3g7yaSHkYPkpgelw= diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go index 66b871cc613c..a81c07c26dd8 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go @@ -16,54 +16,270 @@ package k8sapiserver import ( "context" + "errors" + "fmt" + "os" + "strconv" + "time" "go.opentelemetry.io/collector/consumer/pdata" "go.uber.org/zap" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes/scheme" + v1core "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" + "k8s.io/client-go/tools/record" + "k8s.io/klog" + + ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient" +) + +const ( + lockName = "aoc-clusterleader" ) +// eventBroadcaster is adapted from record.EventBroadcaster +type 
eventBroadcaster interface { + // StartRecordingToSink starts sending events received from this EventBroadcaster to the given + // sink. The return value can be ignored or used to stop recording, if desired. + StartRecordingToSink(sink record.EventSink) watch.Interface + // StartLogging starts sending events received from this EventBroadcaster to the given logging + // function. The return value can be ignored or used to stop recording, if desired. + StartLogging(logf func(format string, args ...interface{})) watch.Interface + // NewRecorder returns an EventRecorder that can be used to send events to this EventBroadcaster + // with the event source set to the given event source. + NewRecorder(scheme *runtime.Scheme, source v1.EventSource) record.EventRecorder +} + // K8sAPIServer is a struct that produces metrics from kubernetes api server type K8sAPIServer struct { + nodeName string //get the value from downward API logger *zap.Logger clusterNameProvider clusterNameProvider cancel context.CancelFunc + + leading bool + k8sClient *k8sclient.K8sClient + + // the following can be set to mocks in testing + broadcaster eventBroadcaster + // the close of isLeadingC indicates the leader election is done. 
This is used in testing + isLeadingC chan bool } type clusterNameProvider interface { GetClusterName() string } +type k8sAPIServerOption func(*K8sAPIServer) + // New creates a k8sApiServer which can generate cluster-level metrics -func New(clusterNameProvider clusterNameProvider, logger *zap.Logger) *K8sAPIServer { +func New(clusterNameProvider clusterNameProvider, logger *zap.Logger, options ...k8sAPIServerOption) (*K8sAPIServer, error) { _, cancel := context.WithCancel(context.Background()) k := &K8sAPIServer{ logger: logger, clusterNameProvider: clusterNameProvider, + k8sClient: k8sclient.Get(logger), + broadcaster: record.NewBroadcaster(), cancel: cancel, } - if err := k.start(); err != nil { - k.logger.Warn("Fail to start k8sapiserver", zap.Error(err)) - return nil + for _, opt := range options { + opt(k) + } + + if k.k8sClient == nil { + return nil, errors.New("failed to start k8sapiserver because k8sclient is nil") } - return k + if err := k.init(); err != nil { + return nil, fmt.Errorf("fail to initialize k8sapiserver, err: %v", err) + } + + return k, nil } // GetMetrics returns an array of metrics func (k *K8sAPIServer) GetMetrics() []pdata.Metrics { - // TODO: add the logic to generate the metrics var result []pdata.Metrics + + //don't emit metrics if the cluster name is not detected + clusterName := k.clusterNameProvider.GetClusterName() + if clusterName == "" { + k.logger.Warn("Failed to detect cluster name. 
Drop all metrics") + return result + } + + if k.leading { + k.logger.Info("collect data from K8s API Server...") + timestampNs := strconv.FormatInt(time.Now().UnixNano(), 10) + client := k.k8sClient + + fields := map[string]interface{}{ + "cluster_failed_node_count": client.Node.ClusterFailedNodeCount(), + "cluster_node_count": client.Node.ClusterNodeCount(), + } + attributes := map[string]string{ + ci.ClusterNameKey: clusterName, + ci.MetricType: ci.TypeCluster, + ci.Timestamp: timestampNs, + ci.Version: "0", + } + if k.nodeName != "" { + attributes["NodeName"] = k.nodeName + } + md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger) + result = append(result, md) + + for service, podNum := range client.Ep.ServiceToPodNum() { + fields := map[string]interface{}{ + "service_number_of_running_pods": podNum, + } + attributes := map[string]string{ + ci.ClusterNameKey: clusterName, + ci.MetricType: ci.TypeClusterService, + ci.Timestamp: timestampNs, + ci.TypeService: service.ServiceName, + ci.K8sNamespace: service.Namespace, + ci.Version: "0", + } + if k.nodeName != "" { + attributes["NodeName"] = k.nodeName + } + md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger) + result = append(result, md) + } + + for namespace, podNum := range client.Pod.NamespaceToRunningPodNum() { + fields := map[string]interface{}{ + "namespace_number_of_running_pods": podNum, + } + attributes := map[string]string{ + ci.ClusterNameKey: clusterName, + ci.MetricType: ci.TypeClusterNamespace, + ci.Timestamp: timestampNs, + ci.K8sNamespace: namespace, + ci.Version: "0", + } + if k.nodeName != "" { + attributes["NodeName"] = k.nodeName + } + md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger) + result = append(result, md) + } + } return result } -func (k *K8sAPIServer) start() error { - //TODO: add implementation +func (k *K8sAPIServer) init() error { + var ctx context.Context + ctx, k.cancel = context.WithCancel(context.Background()) + + k.nodeName = os.Getenv("HOST_NAME") + if 
k.nodeName == "" { + return errors.New("missing environment variable HOST_NAME. Please check your deployment YAML config") + } + + lockNamespace := os.Getenv("K8S_NAMESPACE") + if lockNamespace == "" { + return errors.New("missing environment variable K8S_NAMESPACE. Please check your deployment YAML config") + } + + configMapInterface := k.k8sClient.ClientSet.CoreV1().ConfigMaps(lockNamespace) + if configMap, err := configMapInterface.Get(ctx, lockName, metav1.GetOptions{}); configMap == nil || err != nil { + k.logger.Info(fmt.Sprintf("Cannot get the leader config map: %v, try to create the config map...", err)) + configMap, err = configMapInterface.Create(ctx, + &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: lockNamespace, + Name: lockName, + }, + }, metav1.CreateOptions{}) + k.logger.Info(fmt.Sprintf("configMap: %v, err: %v", configMap, err)) + } + + lock, err := resourcelock.New( + resourcelock.ConfigMapsResourceLock, + lockNamespace, lockName, + k.k8sClient.ClientSet.CoreV1(), + k.k8sClient.ClientSet.CoordinationV1(), + resourcelock.ResourceLockConfig{ + Identity: k.nodeName, + EventRecorder: k.createRecorder(lockName, lockNamespace), + }) + if err != nil { + k.logger.Warn("Failed to create resource lock", zap.Error(err)) + return err + } + + go k.startLeaderElection(ctx, lock) + return nil } -// Stop stops the k8sApiServer -func (k *K8sAPIServer) Stop() { +// Shutdown stops the k8sApiServer +func (k *K8sAPIServer) Shutdown() { if k.cancel != nil { k.cancel() } } + +func (k *K8sAPIServer) startLeaderElection(ctx context.Context, lock resourcelock.Interface) { + + for { + leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ + Lock: lock, + // IMPORTANT: you MUST ensure that any code you have that + // is protected by the lease must terminate **before** + // you call cancel. 
Otherwise, you could have a background + // loop still running and another process could + // get elected before your background loop finished, violating + // the stated goal of the lease. + LeaseDuration: 60 * time.Second, + RenewDeadline: 15 * time.Second, + RetryPeriod: 5 * time.Second, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: func(ctx context.Context) { + k.logger.Info(fmt.Sprintf("k8sapiserver OnStartedLeading: %s", k.nodeName)) + // we're notified when we start + k.leading = true + + if k.isLeadingC != nil { + // this executes only in testing + close(k.isLeadingC) + } + }, + OnStoppedLeading: func() { + k.logger.Info(fmt.Sprintf("k8sapiserver OnStoppedLeading: %s", k.nodeName)) + // we can do cleanup here, or after the RunOrDie method returns + k.leading = false + //node and pod are only used for cluster level metrics, endpoint is used for decorator too. + k.k8sClient.Node.Shutdown() + k.k8sClient.Pod.Shutdown() + }, + OnNewLeader: func(identity string) { + k.logger.Info(fmt.Sprintf("k8sapiserver Switch New Leader: %s", identity)) + }, + }, + }) + + select { + case <-ctx.Done(): //when leader election ends, the channel ctx.Done() will be closed + k.logger.Info(fmt.Sprintf("k8sapiserver shutdown Leader Election: %s", k.nodeName)) + return + default: + } + } +} + +func (k *K8sAPIServer) createRecorder(name, namespace string) record.EventRecorder { + k.broadcaster.StartLogging(klog.Infof) + clientSet := k.k8sClient.ClientSet + k.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(clientSet.CoreV1().RESTClient()).Events(namespace)}) + return k.broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: name}) +} diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go index 86cc6a5c3917..b0bf6286edfd 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go 
+++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go @@ -15,20 +15,223 @@ package k8sapiserver import ( + "fmt" + "os" + "strings" "testing" - "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "go.opentelemetry.io/collector/consumer/pdata" "go.uber.org/zap" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/record" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host" + ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient" ) -func TestK8sapiserver(t *testing.T) { - machineInfo, _ := host.NewInfo(time.Minute, zap.NewNop()) - k := New(machineInfo, zap.NewNop()) - k.start() - assert.NotNil(t, k) - assert.Nil(t, k.GetMetrics()) - k.Stop() +func NewService(name, namespace string) k8sclient.Service { + return k8sclient.Service{ServiceName: name, Namespace: namespace} +} + +var mockClient = new(MockClient) + +var mockK8sClient = &k8sclient.K8sClient{ + Pod: mockClient, + Node: mockClient, + Ep: mockClient, + ClientSet: fake.NewSimpleClientset(), +} + +type MockClient struct { + k8sclient.PodClient + k8sclient.NodeClient + k8sclient.EpClient + + mock.Mock +} + +// k8sclient.PodClient +func (client *MockClient) NamespaceToRunningPodNum() map[string]int { + args := client.Called() + return args.Get(0).(map[string]int) +} + +// k8sclient.NodeClient +func (client *MockClient) ClusterFailedNodeCount() int { + args := client.Called() + return args.Get(0).(int) +} + +func (client *MockClient) ClusterNodeCount() int { + args := client.Called() + return args.Get(0).(int) +} + +// k8sclient.EpClient +func (client *MockClient) ServiceToPodNum() map[k8sclient.Service]int { + args := client.Called() + return 
args.Get(0).(map[k8sclient.Service]int) +} + +func (client *MockClient) Shutdown() { +} + +type mockEventBroadcaster struct { +} + +func (m *mockEventBroadcaster) StartRecordingToSink(sink record.EventSink) watch.Interface { + return watch.NewFake() +} + +func (m *mockEventBroadcaster) StartLogging(logf func(format string, args ...interface{})) watch.Interface { + return watch.NewFake() +} + +func (m *mockEventBroadcaster) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) record.EventRecorder { + return record.NewFakeRecorder(100) +} + +func getStringAttrVal(m pdata.Metrics, key string) string { + rm := m.ResourceMetrics().At(0) + attributes := rm.Resource().Attributes() + if attributeValue, ok := attributes.Get(key); ok { + return attributeValue.StringVal() + } + return "" +} + +func assertMetricValueEqual(t *testing.T, m pdata.Metrics, metricName string, expected int64) { + rm := m.ResourceMetrics().At(0) + ilms := rm.InstrumentationLibraryMetrics() + + for j := 0; j < ilms.Len(); j++ { + metricSlice := ilms.At(j).Metrics() + for i := 0; i < metricSlice.Len(); i++ { + metric := metricSlice.At(i) + if metric.Name() == metricName { + if metric.DataType() == pdata.MetricDataTypeIntGauge { + assert.Equal(t, expected, metric.IntGauge().DataPoints().At(0).Value()) + return + } + + msg := fmt.Sprintf("Metric with name: %v has wrong type.", metricName) + assert.Fail(t, msg) + } + } + } + + msg := fmt.Sprintf("No metric with name: %v", metricName) + assert.Fail(t, msg) +} + +type MockClusterNameProvicer struct { +} + +func (m MockClusterNameProvicer) GetClusterName() string { + return "cluster-name" +} + +func TestK8sAPIServer_New(t *testing.T) { + k8sClientOption := func(k *K8sAPIServer) { + k.k8sClient = nil + } + k8sAPIServer, err := New(MockClusterNameProvicer{}, zap.NewNop(), k8sClientOption) + assert.Nil(t, k8sAPIServer) + assert.NotNil(t, err) +} + +func TestK8sAPIServer_GetMetrics(t *testing.T) { + hostName, err := os.Hostname() + assert.NoError(t, err) + 
k8sClientOption := func(k *K8sAPIServer) { + k.k8sClient = mockK8sClient + } + leadingOption := func(k *K8sAPIServer) { + k.leading = true + } + broadcasterOption := func(k *K8sAPIServer) { + k.broadcaster = &mockEventBroadcaster{} + } + isLeadingCOption := func(k *K8sAPIServer) { + k.isLeadingC = make(chan bool) + } + + originalHostName := os.Getenv("HOST_NAME") + originalNamespace := os.Getenv("K8S_NAMESPACE") + os.Setenv("HOST_NAME", hostName) + os.Setenv("K8S_NAMESPACE", "namespace") + k8sAPIServer, err := New(MockClusterNameProvicer{}, zap.NewNop(), k8sClientOption, + leadingOption, broadcasterOption, isLeadingCOption) + + assert.NotNil(t, k8sAPIServer) + assert.Nil(t, err) + + mockClient.On("NamespaceToRunningPodNum").Return(map[string]int{"default": 2}) + mockClient.On("ClusterFailedNodeCount").Return(1) + mockClient.On("ClusterNodeCount").Return(1) + mockClient.On("ServiceToPodNum").Return( + map[k8sclient.Service]int{ + NewService("service1", "kube-system"): 1, + NewService("service2", "kube-system"): 1, + }, + ) + + <-k8sAPIServer.isLeadingC + metrics := k8sAPIServer.GetMetrics() + assert.NoError(t, err) + + /* + tags: map[Timestamp:1557291396709 Type:Cluster], fields: map[cluster_failed_node_count:1 cluster_node_count:1], + tags: map[Service:service2 Timestamp:1557291396709 Type:ClusterService], fields: map[service_number_of_running_pods:1], + tags: map[Service:service1 Timestamp:1557291396709 Type:ClusterService], fields: map[service_number_of_running_pods:1], + tags: map[Namespace:default Timestamp:1557291396709 Type:ClusterNamespace], fields: map[namespace_number_of_running_pods:2], + */ + for _, metric := range metrics { + assert.Equal(t, "cluster-name", getStringAttrVal(metric, ci.ClusterNameKey)) + if metricType := getStringAttrVal(metric, ci.MetricType); metricType == ci.TypeCluster { + assertMetricValueEqual(t, metric, "cluster_failed_node_count", int64(1)) + assertMetricValueEqual(t, metric, "cluster_node_count", int64(1)) + } else if metricType 
== ci.TypeClusterService { + assertMetricValueEqual(t, metric, "service_number_of_running_pods", int64(1)) + if serviceTag := getStringAttrVal(metric, ci.TypeService); serviceTag != "service1" && serviceTag != "service2" { + assert.Fail(t, "Expect to see a tag named as Service") + } + if namespaceTag := getStringAttrVal(metric, ci.K8sNamespace); namespaceTag != "kube-system" { + assert.Fail(t, "Expect to see a tag named as Namespace") + } + } else if metricType == ci.TypeClusterNamespace { + assertMetricValueEqual(t, metric, "namespace_number_of_running_pods", int64(2)) + assert.Equal(t, "default", getStringAttrVal(metric, ci.K8sNamespace)) + } else { + assert.Fail(t, "Unexpected metric type: "+metricType) + } + } + + k8sAPIServer.Shutdown() + // restore env variables + os.Setenv("HOST_NAME", originalHostName) + os.Setenv("K8S_NAMESPACE", originalNamespace) +} + +func TestK8sAPIServer_init(t *testing.T) { + k8sAPIServer := &K8sAPIServer{} + + err := k8sAPIServer.init() + assert.NotNil(t, err) + assert.True(t, strings.HasPrefix(err.Error(), "missing environment variable HOST_NAME")) + + originalHostName := os.Getenv("HOST_NAME") + os.Setenv("HOST_NAME", "hostname") + + err = k8sAPIServer.init() + assert.NotNil(t, err) + assert.True(t, strings.HasPrefix(err.Error(), "missing environment variable K8S_NAMESPACE")) + + // restore env variables + os.Setenv("HOST_NAME", originalHostName) } diff --git a/receiver/awscontainerinsightreceiver/receiver.go b/receiver/awscontainerinsightreceiver/receiver.go index 2952503aa499..6053f7b5a7d4 100644 --- a/receiver/awscontainerinsightreceiver/receiver.go +++ b/receiver/awscontainerinsightreceiver/receiver.go @@ -24,6 +24,10 @@ import ( "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/obsreport" "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor" + hostInfo 
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8sapiserver" ) var _ component.MetricsReceiver = (*awsContainerInsightReceiver)(nil) @@ -46,9 +50,7 @@ type awsContainerInsightReceiver struct { func New( logger *zap.Logger, config *Config, - nextConsumer consumer.Metrics, - cadvisor MetricsProvider, - k8sapiserver MetricsProvider) (component.MetricsReceiver, error) { + nextConsumer consumer.Metrics) (component.MetricsReceiver, error) { if nextConsumer == nil { return nil, componenterror.ErrNilNextConsumer } @@ -57,8 +59,6 @@ func New( logger: logger, nextConsumer: nextConsumer, config: config, - cadvisor: cadvisor, - k8sapiserver: k8sapiserver, } return r, nil } @@ -66,6 +66,11 @@ func New( // Start collecting metrics from cadvisor and k8s api server (if it is an elected leader) func (acir *awsContainerInsightReceiver) Start(ctx context.Context, host component.Host) error { ctx, acir.cancel = context.WithCancel(obsreport.ReceiverContext(ctx, acir.config.ID(), "http")) + //ignore the error for now, will address it in later PR + machineInfo, _ := hostInfo.NewInfo(acir.config.CollectionInterval, acir.logger) + acir.cadvisor = cadvisor.New(acir.config.ContainerOrchestrator, machineInfo, acir.logger) + acir.k8sapiserver, _ = k8sapiserver.New(machineInfo, acir.logger) + // TODO: add more intialization code go func() { diff --git a/receiver/awscontainerinsightreceiver/receiver_test.go b/receiver/awscontainerinsightreceiver/receiver_test.go index 4f4f1c985ff1..ea78ac5e0bb3 100644 --- a/receiver/awscontainerinsightreceiver/receiver_test.go +++ b/receiver/awscontainerinsightreceiver/receiver_test.go @@ -25,11 +25,22 @@ import ( "go.uber.org/zap" ) -type MockMetricsProvider struct { +// Mock cadvisor +type MockCadvisor struct { +} + +func (c *MockCadvisor) GetMetrics() []pdata.Metrics { + md := 
pdata.NewMetrics() + return []pdata.Metrics{md} +} + +// Mock k8sapiserver +type MockK8sAPIServer struct { } -func (m *MockMetricsProvider) GetMetrics() []pdata.Metrics { - return []pdata.Metrics{} +func (m *MockK8sAPIServer) GetMetrics() []pdata.Metrics { + md := pdata.NewMetrics() + return []pdata.Metrics{md} } func TestReceiver(t *testing.T) { @@ -38,8 +49,6 @@ func TestReceiver(t *testing.T) { zap.NewNop(), cfg, consumertest.NewNop(), - &MockCadvisor{}, - &MockCadvisor{}, ) require.NoError(t, err) @@ -61,8 +70,6 @@ func TestReceiverForNilConsumer(t *testing.T) { zap.NewNop(), cfg, nil, - &MockCadvisor{}, - &MockCadvisor{}, ) require.NotNil(t, err) @@ -75,8 +82,6 @@ func TestCollectData(t *testing.T) { zap.NewNop(), cfg, new(consumertest.MetricsSink), - &MockCadvisor{}, - &MockCadvisor{}, ) require.NoError(t, err) @@ -85,7 +90,7 @@ func TestCollectData(t *testing.T) { r := metricsReceiver.(*awsContainerInsightReceiver) r.Start(context.Background(), nil) ctx := context.Background() - + r.k8sapiserver = &MockK8sAPIServer{} err = r.collectData(ctx) require.Nil(t, err) @@ -96,23 +101,12 @@ func TestCollectData(t *testing.T) { require.NotNil(t, err) } -//Mock cadvisor -type MockCadvisor struct { -} - -func (c *MockCadvisor) GetMetrics() []pdata.Metrics { - md := pdata.NewMetrics() - return []pdata.Metrics{md} -} - func TestCollectDataWithErrConsumer(t *testing.T) { cfg := createDefaultConfig().(*Config) metricsReceiver, err := New( zap.NewNop(), cfg, consumertest.NewErr(errors.New("an error")), - &MockCadvisor{}, - &MockCadvisor{}, ) require.NoError(t, err) @@ -121,6 +115,7 @@ func TestCollectDataWithErrConsumer(t *testing.T) { r := metricsReceiver.(*awsContainerInsightReceiver) r.Start(context.Background(), nil) r.cadvisor = &MockCadvisor{} + r.k8sapiserver = &MockK8sAPIServer{} ctx := context.Background() err = r.collectData(ctx) From 49821855196c266014d3456d76c1a5afc10751e9 Mon Sep 17 00:00:00 2001 From: Ping Xiang Date: Wed, 2 Jun 2021 20:32:09 +0000 Subject: 
[PATCH 2/4] minor changes to address comments --- .../internal/k8sapiserver/k8sapiserver.go | 10 +++++----- .../internal/k8sapiserver/k8sapiserver_test.go | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go index a81c07c26dd8..8f5c6c1fd34c 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go @@ -29,7 +29,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes/scheme" - v1core "k8s.io/client-go/kubernetes/typed/core/v1" + corev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/client-go/tools/record" @@ -40,7 +40,7 @@ import ( ) const ( - lockName = "aoc-clusterleader" + lockName = "otel-container-insight-clusterleader" ) // eventBroadcaster is adpated from record.EventBroadcaster @@ -182,12 +182,12 @@ func (k *K8sAPIServer) init() error { k.nodeName = os.Getenv("HOST_NAME") if k.nodeName == "" { - return errors.New("missing environment variable HOST_NAME. Please check your deployment YAML config") + return errors.New("environment variable HOST_NAME is not set in k8s deployment config") } lockNamespace := os.Getenv("K8S_NAMESPACE") if lockNamespace == "" { - return errors.New("missing environment variable K8S_NAMESPACE. 
Please check your deployment YAML config") + return errors.New("environment variable K8S_NAMESPACE is not set in k8s deployment config") } configMapInterface := k.k8sClient.ClientSet.CoreV1().ConfigMaps(lockNamespace) @@ -280,6 +280,6 @@ func (k *K8sAPIServer) startLeaderElection(ctx context.Context, lock resourceloc func (k *K8sAPIServer) createRecorder(name, namespace string) record.EventRecorder { k.broadcaster.StartLogging(klog.Infof) clientSet := k.k8sClient.ClientSet - k.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(clientSet.CoreV1().RESTClient()).Events(namespace)}) + k.broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: corev1.New(clientSet.CoreV1().RESTClient()).Events(namespace)}) return k.broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: name}) } diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go index b0bf6286edfd..9ba2429ad778 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go @@ -223,14 +223,14 @@ func TestK8sAPIServer_init(t *testing.T) { err := k8sAPIServer.init() assert.NotNil(t, err) - assert.True(t, strings.HasPrefix(err.Error(), "missing environment variable HOST_NAME")) + assert.True(t, strings.HasPrefix(err.Error(), "environment variable HOST_NAME is not set")) originalHostName := os.Getenv("HOST_NAME") os.Setenv("HOST_NAME", "hostname") err = k8sAPIServer.init() assert.NotNil(t, err) - assert.True(t, strings.HasPrefix(err.Error(), "missing environment variable K8S_NAMESPACE")) + assert.True(t, strings.HasPrefix(err.Error(), "environment variable K8S_NAMESPACE is not set")) // restore env variables os.Setenv("HOST_NAME", originalHostName) From 055d8842f7ce37300995f266b8ae23acf890fe2a Mon Sep 17 00:00:00 2001 From: Ping Xiang Date: Sat, 
5 Jun 2021 00:35:39 +0000 Subject: [PATCH 3/4] add lock to protect the leading field --- .../internal/k8sapiserver/k8sapiserver.go | 113 +++++++++++------- 1 file changed, 70 insertions(+), 43 deletions(-) diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go index 8f5c6c1fd34c..5d00104ac82d 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go @@ -20,6 +20,7 @@ import ( "fmt" "os" "strconv" + "sync" "time" "go.opentelemetry.io/collector/consumer/pdata" @@ -63,7 +64,9 @@ type K8sAPIServer struct { clusterNameProvider clusterNameProvider cancel context.CancelFunc - leading bool + mu sync.Mutex + leading bool + k8sClient *k8sclient.K8sClient // the following can be set to mocks in testing @@ -108,26 +111,48 @@ func New(clusterNameProvider clusterNameProvider, logger *zap.Logger, options .. func (k *K8sAPIServer) GetMetrics() []pdata.Metrics { var result []pdata.Metrics - //don't emit metrics if the cluster name is not detected + // don't generate any metrics if the current collector is not the leader + if !k.leading { + return result + } + + // don't emit metrics if the cluster name is not detected clusterName := k.clusterNameProvider.GetClusterName() if clusterName == "" { k.logger.Warn("Failed to detect cluster name. 
Drop all metrics") return result } - if k.leading { - k.logger.Info("collect data from K8s API Server...") - timestampNs := strconv.FormatInt(time.Now().UnixNano(), 10) - client := k.k8sClient + k.logger.Info("collect data from K8s API Server...") + timestampNs := strconv.FormatInt(time.Now().UnixNano(), 10) + client := k.k8sClient + + fields := map[string]interface{}{ + "cluster_failed_node_count": client.Node.ClusterFailedNodeCount(), + "cluster_node_count": client.Node.ClusterNodeCount(), + } + attributes := map[string]string{ + ci.ClusterNameKey: clusterName, + ci.MetricType: ci.TypeCluster, + ci.Timestamp: timestampNs, + ci.Version: "0", + } + if k.nodeName != "" { + attributes["NodeName"] = k.nodeName + } + md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger) + result = append(result, md) + for service, podNum := range client.Ep.ServiceToPodNum() { fields := map[string]interface{}{ - "cluster_failed_node_count": client.Node.ClusterFailedNodeCount(), - "cluster_node_count": client.Node.ClusterNodeCount(), + "service_number_of_running_pods": podNum, } attributes := map[string]string{ ci.ClusterNameKey: clusterName, - ci.MetricType: ci.TypeCluster, + ci.MetricType: ci.TypeClusterService, ci.Timestamp: timestampNs, + ci.TypeService: service.ServiceName, + ci.K8sNamespace: service.Namespace, ci.Version: "0", } if k.nodeName != "" { @@ -135,44 +160,26 @@ func (k *K8sAPIServer) GetMetrics() []pdata.Metrics { } md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger) result = append(result, md) + } - for service, podNum := range client.Ep.ServiceToPodNum() { - fields := map[string]interface{}{ - "service_number_of_running_pods": podNum, - } - attributes := map[string]string{ - ci.ClusterNameKey: clusterName, - ci.MetricType: ci.TypeClusterService, - ci.Timestamp: timestampNs, - ci.TypeService: service.ServiceName, - ci.K8sNamespace: service.Namespace, - ci.Version: "0", - } - if k.nodeName != "" { - attributes["NodeName"] = k.nodeName - } - md := 
ci.ConvertToOTLPMetrics(fields, attributes, k.logger) - result = append(result, md) + for namespace, podNum := range client.Pod.NamespaceToRunningPodNum() { + fields := map[string]interface{}{ + "namespace_number_of_running_pods": podNum, } - - for namespace, podNum := range client.Pod.NamespaceToRunningPodNum() { - fields := map[string]interface{}{ - "namespace_number_of_running_pods": podNum, - } - attributes := map[string]string{ - ci.ClusterNameKey: clusterName, - ci.MetricType: ci.TypeClusterNamespace, - ci.Timestamp: timestampNs, - ci.K8sNamespace: namespace, - ci.Version: "0", - } - if k.nodeName != "" { - attributes["NodeName"] = k.nodeName - } - md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger) - result = append(result, md) + attributes := map[string]string{ + ci.ClusterNameKey: clusterName, + ci.MetricType: ci.TypeClusterNamespace, + ci.Timestamp: timestampNs, + ci.K8sNamespace: namespace, + ci.Version: "0", + } + if k.nodeName != "" { + attributes["NodeName"] = k.nodeName } + md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger) + result = append(result, md) } + return result } @@ -247,16 +254,36 @@ func (k *K8sAPIServer) startLeaderElection(ctx context.Context, lock resourceloc OnStartedLeading: func(ctx context.Context) { k.logger.Info(fmt.Sprintf("k8sapiserver OnStartedLeading: %s", k.nodeName)) // we're notified when we start + k.mu.Lock() k.leading = true + k.mu.Unlock() if k.isLeadingC != nil { // this executes only in testing close(k.isLeadingC) } + + for { + k.mu.Lock() + leading := k.leading + k.mu.Unlock() + if !leading { + k.logger.Info("no longer leading") + return + } + select { + case <-ctx.Done(): + k.logger.Info("ctx cancelled") + return + case <-time.After(time.Second): + } + } }, OnStoppedLeading: func() { k.logger.Info(fmt.Sprintf("k8sapiserver OnStoppedLeading: %s", k.nodeName)) // we can do cleanup here, or after the RunOrDie method returns + k.mu.Lock() + defer k.mu.Unlock() k.leading = false //node and pod are only 
used for cluster level metrics, endpoint is used for decorator too. k.k8sClient.Node.Shutdown() From db2cc90679651467abcf2d10ccea4edfbf11bf3b Mon Sep 17 00:00:00 2001 From: Ping Xiang Date: Mon, 7 Jun 2021 22:34:34 +0000 Subject: [PATCH 4/4] add lock in GetMetrics --- .../internal/k8sapiserver/k8sapiserver.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go index 5d00104ac82d..cd20b54839f3 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go @@ -112,6 +112,8 @@ func (k *K8sAPIServer) GetMetrics() []pdata.Metrics { var result []pdata.Metrics // don't generate any metrics if the current collector is not the leader + k.mu.Lock() + defer k.mu.Unlock() if !k.leading { return result }