Skip to content

Commit

Permalink
Feature/add custom k8s metadata collector (#580)
Browse files Browse the repository at this point in the history
Signed-off-by: niejiangang <niejiangang@harmonycloud.cn>
  • Loading branch information
NeJan2020 authored Nov 3, 2023
1 parent 19b8017 commit f55d64d
Show file tree
Hide file tree
Showing 29 changed files with 1,427 additions and 191 deletions.
37 changes: 37 additions & 0 deletions collector/cmd/metadata-provider/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package main

import (
"flag"
"fmt"
"log"
"net/http"
_ "net/http/pprof"

"github.com/Kindling-project/kindling/collector/pkg/metadata/kubernetes"
"github.com/Kindling-project/kindling/collector/pkg/metadata/metaprovider/service"
)

// main starts the metadata-provider service: it parses CLI flags, builds the
// service config, and exposes the /listAndWatch HTTP endpoint on the
// configured port. The process exits fatally if the wrapper cannot be created
// or the HTTP server stops.
func main() {
	authType := flag.String("authType", "serviceAccount", "AuthType describes the type of authentication to use for the K8s API, support 'kubeConfig' or 'serviceAccount'. ")
	kubeConfigPath := flag.String("kubeConfig", "/root/.kube/config", "kubeConfig describe the filePath to your kubeConfig,only used when authType is 'kubeConfig'")
	httpPort := flag.Int("http-port", 9504, "port describe which port will be used to expose data")
	enableFetchReplicaset := flag.Bool("enableFetchReplicaset", false, "controls whether to fetch ReplicaSet information. The default value is false. It should be enabled if the ReplicaSet is used to control pods in the third-party CRD except for Deployment.")
	logInterval := flag.Int("logInterval", 120, "Interval(Second) to show how many event mp received, default 120s")

	flag.Parse()

	config := &service.Config{
		KubeAuthType:          kubernetes.AuthType(*authType),
		KubeConfigDir:         *kubeConfigPath,
		EnableFetchReplicaSet: *enableFetchReplicaset,
		LogInterval:           *logInterval,
	}

	// Guard-clause style: bail out on construction failure, then serve.
	mdw, err := service.NewMetaDataWrapper(config)
	if err != nil {
		log.Fatalf("create MetaData Wrapper failed, err: %v", err)
	}
	http.HandleFunc("/listAndWatch", mdw.ListAndWatch)
	log.Printf("[http] service start at port: %d", *httpPort)
	log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", *httpPort), nil))
}
14 changes: 14 additions & 0 deletions collector/docker/Dockerfile-metadata-provider
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Build stage: compile the metadata-provider binary.
FROM golang:1.20.2-bullseye AS builder
WORKDIR /build

# Use a China-local Go module proxy to speed up module downloads.
# key=value form: the legacy space-separated ENV form is deprecated.
ENV GOPROXY=https://goproxy.cn
# Copy module manifests first so the download layer is cached independently
# of source changes.
COPY go.mod go.sum ./
RUN go mod download && go mod verify

COPY . .
RUN go build -v -o metadata-provider ./cmd/metadata-provider

# Runtime stage: minimal image carrying only the compiled binary.
FROM debian:bullseye-slim AS runner
WORKDIR /app
COPY --from=builder /build/metadata-provider /app/
CMD ["/app/metadata-provider"]
14 changes: 10 additions & 4 deletions collector/pkg/component/consumer/processor/k8sprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,17 @@ type Config struct {
// Set "Enable" false if you want to run the agent in the non-Kubernetes environment.
// Otherwise, the agent will panic if it can't connect to the API-server.
Enable bool `mapstructure:"enable"`

// MetaDataProviderConfig is optional config to use another source of K8sMetadata named metadata-provider
// Used to reduce the stress caused by agent directly on APIServer
// Set "metadata_provider_config.enable" true and "metadata_provider_config.endpoint" as target service to enable it
MetaDataProviderConfig *kubernetes.MetaDataProviderConfig `mapstructure:"metadata_provider_config"`
}

// DefaultConfig is the default k8sprocessor configuration: serviceAccount
// auth, 60s grace-delete period, processor enabled, and the metadata-provider
// source disabled.
// (The source span interleaved old and new diff lines, producing duplicate
// fields; this is the reconstructed post-commit version.)
var DefaultConfig Config = Config{
	KubeAuthType:           "serviceAccount",
	KubeConfigDir:          "/root/.kube/config",
	GraceDeletePeriod:      60,
	Enable:                 true,
	MetaDataProviderConfig: &kubernetes.MetaDataProviderConfig{Enable: false, EnableTrace: false, Endpoint: ""},
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/Kindling-project/kindling/collector/pkg/model"
"github.com/Kindling-project/kindling/collector/pkg/model/constlabels"
"github.com/Kindling-project/kindling/collector/pkg/model/constnames"

mpclient "github.com/Kindling-project/kindling/collector/pkg/metadata/metaprovider/client"
)

const (
Expand Down Expand Up @@ -49,6 +51,10 @@ func NewKubernetesProcessor(cfg interface{}, telemetry *component.TelemetryTools
kubernetes.WithGraceDeletePeriod(config.GraceDeletePeriod),
kubernetes.WithFetchReplicaSet(config.EnableFetchReplicaSet),
)
if config.MetaDataProviderConfig != nil && config.MetaDataProviderConfig.Enable {
cli := mpclient.NewMetaDataWrapperClient(config.MetaDataProviderConfig.Endpoint, config.MetaDataProviderConfig.EnableTrace)
options = append(options, kubernetes.WithMetaDataProviderConfig(config.MetaDataProviderConfig, cli.ListAndWatch))
}
err := kubernetes.InitK8sHandler(options...)
if err != nil {
telemetry.Logger.Panicf("Failed to initialize [%s]: %v. Set the option 'enable' false if you want to run the agent in the non-Kubernetes environment.", K8sMetadata, err)
Expand Down
54 changes: 53 additions & 1 deletion collector/pkg/metadata/kubernetes/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package kubernetes

import (
	"time"

	"k8s.io/client-go/tools/cache"
)

// config contains optional settings for connecting to kubernetes.
type config struct {
Expand All @@ -14,6 +18,23 @@ type config struct {
// The default value is false. It should be enabled if the ReplicaSet
// is used to control pods in the third-party CRD except for Deployment.
EnableFetchReplicaSet bool

MetaDataProviderConfig *MetaDataProviderConfig `mapstructure:"metadata_provider_config"`

listAndWatchFromProvider func(setup SetPreprocessingMetaDataCache) error
podEventHander cache.ResourceEventHandler
rsEventHander cache.ResourceEventHandler
nodeEventHander cache.ResourceEventHandler
serviceEventHander cache.ResourceEventHandler
}

// MetaDataProviderConfig configures the optional metadata-provider service,
// an alternative source of K8s metadata used to reduce the load the agent
// puts directly on the API-server.
type MetaDataProviderConfig struct {
	// Enable turns the metadata-provider source on.
	Enable bool `mapstructure:"enable"`
	// EnableTrace prints every K8s Metadata event received from the
	// metadata-provider; used for debug only.
	EnableTrace bool `mapstructure:"enable_trace"`
	// Endpoint is where the metadata-provider is deployed and provides
	// service, e.g. "http://localhost:9504".
	Endpoint string `mapstructure:"endpoint"`
}

type Option func(cfg *config)
Expand Down Expand Up @@ -47,3 +68,34 @@ func WithFetchReplicaSet(fetch bool) Option {
cfg.EnableFetchReplicaSet = fetch
}
}

// WithMetaDataProviderConfig makes the metadata-provider service the source
// of K8s metadata instead of the API-server. listAndWatch is the client
// callback invoked to stream metadata into the local cache.
func WithMetaDataProviderConfig(mpCfg *MetaDataProviderConfig, listAndWatch func(SetPreprocessingMetaDataCache) error) Option {
	return func(cfg *config) {
		cfg.MetaDataProviderConfig = mpCfg
		cfg.listAndWatchFromProvider = listAndWatch
	}
}

// WithPodEventHander sets an extra cache.ResourceEventHandler that is passed
// to the pod watcher.
// NOTE(review): "Hander" is a typo for "Handler", but renaming the exported
// function would break callers.
func WithPodEventHander(handler cache.ResourceEventHandler) Option {
	return func(cfg *config) {
		cfg.podEventHander = handler
	}
}

// WithServiceEventHander sets an extra cache.ResourceEventHandler that is
// passed to the service watcher. (Same "Hander" naming typo as its siblings;
// kept for compatibility.)
func WithServiceEventHander(handler cache.ResourceEventHandler) Option {
	return func(cfg *config) {
		cfg.serviceEventHander = handler
	}
}

// WithNodeEventHander sets an extra cache.ResourceEventHandler that is passed
// to the node watcher. (Same "Hander" naming typo as its siblings; kept for
// compatibility.)
func WithNodeEventHander(handler cache.ResourceEventHandler) Option {
	return func(cfg *config) {
		cfg.nodeEventHander = handler
	}
}

// WithReplicaSetEventHander sets an extra cache.ResourceEventHandler that is
// passed to the ReplicaSet watcher; only used when EnableFetchReplicaSet is
// on. (Same "Hander" naming typo as its siblings; kept for compatibility.)
func WithReplicaSetEventHander(handler cache.ResourceEventHandler) Option {
	return func(cfg *config) {
		cfg.rsEventHander = handler
	}
}
32 changes: 21 additions & 11 deletions collector/pkg/metadata/kubernetes/hostport_map.go
Original file line number Diff line number Diff line change
@@ -1,37 +1,47 @@
package kubernetes

import (
	"strconv"
	"sync"
)

// IpPortKey is an (IP, port) pair formerly used as the sync.Map key of
// HostPortMap.
// NOTE(review): after the switch to "ip:port" string keys this type appears
// unreferenced in this file — confirm no external users, then remove.
type IpPortKey struct {
	Ip string
	Port uint32
}

// HostPortMap maps "ip:port" keys to the container bound to that host port.
// All accesses go through add/get/delete, which guard the map with mutex, so
// it is safe for concurrent use.
// (The source span interleaved old and new diff lines — both the sync.Map
// field and the plain map — producing an invalid struct; this is the
// reconstructed post-commit version.)
type HostPortMap struct {
	HostPortInfo map[string]*K8sContainerInfo
	mutex        sync.RWMutex
}

func newHostPortMap() *HostPortMap {
func NewHostPortMap() *HostPortMap {
return &HostPortMap{
hostPortInfo: sync.Map{},
HostPortInfo: make(map[string]*K8sContainerInfo),
}
}

// add stores containerInfo under the "ip:port" key, replacing any existing
// entry. Safe for concurrent use.
func (m *HostPortMap) add(ip string, port uint32, containerInfo *K8sContainerInfo) {
	key := ip + ":" + strconv.FormatUint(uint64(port), 10)
	m.mutex.Lock()
	defer m.mutex.Unlock()
	m.HostPortInfo[key] = containerInfo
}

// get returns the container bound to ip:port, or (nil, false) when no entry
// exists. Safe for concurrent use.
func (m *HostPortMap) get(ip string, port uint32) (*K8sContainerInfo, bool) {
	key := ip + ":" + strconv.FormatUint(uint64(port), 10)
	m.mutex.RLock()
	defer m.mutex.RUnlock()
	containerInfo, ok := m.HostPortInfo[key]
	if !ok {
		return nil, false
	}
	return containerInfo, true
}

// delete removes the entry for ip:port; it is a no-op when the key is absent.
// Safe for concurrent use.
func (m *HostPortMap) delete(ip string, port uint32) {
	key := ip + ":" + strconv.FormatUint(uint64(port), 10)
	m.mutex.Lock()
	defer m.mutex.Unlock()
	delete(m.HostPortInfo, key)
}
100 changes: 85 additions & 15 deletions collector/pkg/metadata/kubernetes/initk8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kubernetes

import (
"fmt"
"log"
"net"
"net/http"
"os"
Expand All @@ -16,6 +17,8 @@ import (
// AuthType describes the type of authentication to use for the K8s API
type AuthType string

var ReWatch bool

const (
// AuthTypeNone means no auth is required
AuthTypeNone AuthType = "none"
Expand Down Expand Up @@ -74,26 +77,72 @@ func InitK8sHandler(options ...Option) error {
option(&k8sConfig)
}

clientSet, err := initClientSet(string(k8sConfig.KubeAuthType), k8sConfig.KubeConfigDir)
if err != nil {
retErr = fmt.Errorf("cannot connect to kubernetes: %w", err)
return
}
IsInitSuccess = true
go NodeWatch(clientSet)
time.Sleep(1 * time.Second)
if k8sConfig.EnableFetchReplicaSet {
go RsWatch(clientSet)
time.Sleep(1 * time.Second)
if k8sConfig.MetaDataProviderConfig != nil && k8sConfig.MetaDataProviderConfig.Enable {
retErr = initWatcherFromMetadataProvider(k8sConfig)
} else {
retErr = initWatcherFromAPIServer(k8sConfig)
}
go ServiceWatch(clientSet)
time.Sleep(1 * time.Second)
go PodWatch(clientSet, k8sConfig.GraceDeletePeriod)
time.Sleep(1 * time.Second)
})
return retErr
}

// initWatcherFromAPIServer connects to the K8s API-server and launches the
// watcher goroutines (node, optional ReplicaSet, service, pod) in that order,
// pausing one second after each so the caches warm up sequentially.
func initWatcherFromAPIServer(k8sConfig config) error {
	clientSet, err := initClientSet(string(k8sConfig.KubeAuthType), k8sConfig.KubeConfigDir)
	if err != nil {
		return fmt.Errorf("cannot connect to kubernetes: %w", err)
	}
	IsInitSuccess = true

	// startWatcher launches one watcher and waits a second before the next,
	// preserving the original warm-up ordering.
	startWatcher := func(watch func()) {
		go watch()
		time.Sleep(1 * time.Second)
	}

	startWatcher(func() { NodeWatch(clientSet, k8sConfig.nodeEventHander) })
	if k8sConfig.EnableFetchReplicaSet {
		startWatcher(func() { RsWatch(clientSet, k8sConfig.rsEventHander) })
	}
	startWatcher(func() { ServiceWatch(clientSet, k8sConfig.serviceEventHander) })
	startWatcher(func() { PodWatch(clientSet, k8sConfig.GraceDeletePeriod, k8sConfig.podEventHander) })
	return nil
}

// initWatcherFromMetadataProvider starts metadata syncing backed by the
// external metadata-provider service instead of the API-server. It launches:
//   - podDeleteLoop, to keep the GraceDeletePeriod behavior for deleted pods,
//   - watchFromMPWithRetry, the long-running list-and-watch loop,
//   - a 30-minute ticker that clears the global metadata maps and raises the
//     ReWatch flag so the watch loop performs a fresh full list.
//
// NOTE(review): ReWatch is a plain package-level bool written here and
// presumably read elsewhere without synchronization — a data race; consider
// sync/atomic. Verify against the readers.
// NOTE(review): stopCh is never closed and the ticker is never stopped, so
// these goroutines live for the process lifetime — acceptable for a
// once-per-process init, but worth confirming.
func initWatcherFromMetadataProvider(k8sConfig config) error {
	stopCh := make(chan struct{})
	// Enable PodDeleteGrace
	go podDeleteLoop(10*time.Second, k8sConfig.GraceDeletePeriod, stopCh)
	go watchFromMPWithRetry(k8sConfig)

	// rewatch from MP every 30 minute
	ReWatch = false
	go func() {
		ticker := time.NewTicker(30 * time.Minute)
		for range ticker.C {
			clearK8sMap()
			ReWatch = true
		}
	}()
	return nil
}

// watchFromMPWithRetry runs the metadata-provider list-and-watch loop
// forever. listAndWatchFromProvider blocks while streaming; it returns nil
// when a re-watch was requested (the cache was cleared and a fresh list is
// needed) and an error on failure. After 3 consecutive failures the loop
// backs off for one minute, then starts over.
//
// Fixes vs. original: the error was logged with the %d verb (go vet flags
// printf-verb mismatch) → %v; "failled"/"3 time" typos in log messages; the
// failure counter was reset with `i = 0; continue`, which the loop's i++
// immediately bumped to 1, so a success only allowed 2 further failures —
// now a success truly resets the counter.
func watchFromMPWithRetry(k8sConfig config) {
	for {
		for failures := 0; failures < 3; {
			err := k8sConfig.listAndWatchFromProvider(SetupCache)
			if err == nil {
				// Received the ReWatch signal: the cache was cleared, so
				// immediately list-and-watch again from the provider.
				// TODO logger
				log.Printf("clear K8sCache and rewatch from MP")
				failures = 0
				continue
			}
			log.Printf("listAndWatch From Provider failed! Error: %v", err)
			failures++
		}

		// Failed 3 consecutive times; back off before retrying.
		log.Printf("listAndWatch From Provider failed for 3 times, will retry after 1 minute")
		time.Sleep(1 * time.Minute)
	}
}

func initClientSet(authType string, dir string) (*k8s.Clientset, error) {
return makeClient(APIConfig{
AuthType: AuthType(authType),
Expand Down Expand Up @@ -173,3 +222,24 @@ func createRestConfig(apiConf APIConfig) (*rest.Config, error) {

return authConf, nil
}

// clearK8sMap resets the global K8s metadata maps (pod, node, ReplicaSet,
// service) to fresh empty instances. Called before a re-watch from the
// metadata-provider so stale entries from the previous watch do not linger.
// NOTE(review): the swaps are not synchronized with concurrent readers of
// these globals — confirm the access pattern is tolerant of this.
func clearK8sMap() {
	GlobalPodInfo = newPodMap()
	GlobalNodeInfo = newNodeMap()
	GlobalRsInfo = newReplicaSetMap()
	GlobalServiceInfo = newServiceMap()
}

// RLockMetadataCache acquires the read locks of every mutex guarding
// MetaDataCache (cMut, pMut, sMut — presumably container/pod/service maps;
// verify in the cache definition — plus the host-port map) so a caller can
// read a consistent snapshot. Must be paired with RUnlockMetadataCache; both
// functions must keep the same fixed acquisition order to avoid deadlock.
func RLockMetadataCache() {
	MetaDataCache.cMut.RLock()
	MetaDataCache.pMut.RLock()
	MetaDataCache.sMut.RLock()
	MetaDataCache.HostPortInfo.mutex.RLock()
}

// RUnlockMetadataCache releases the read locks taken by RLockMetadataCache,
// in the reverse order of acquisition.
func RUnlockMetadataCache() {
	MetaDataCache.HostPortInfo.mutex.RUnlock()
	MetaDataCache.sMut.RUnlock()
	MetaDataCache.pMut.RUnlock()
	MetaDataCache.cMut.RUnlock()
}
16 changes: 8 additions & 8 deletions collector/pkg/metadata/kubernetes/initk8s_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@ func TestWatch(t *testing.T) {
if err != nil {
t.Fatalf("cannot init clientSet, %s", err)
}
go NodeWatch(clientSet)
go RsWatch(clientSet)
go ServiceWatch(clientSet)
go PodWatch(clientSet, 60*time.Second)
go NodeWatch(clientSet, nil)
go RsWatch(clientSet, nil)
go ServiceWatch(clientSet, nil)
go PodWatch(clientSet, 60*time.Second, nil)
time.Sleep(2 * time.Second)
content, _ := json.Marshal(globalRsInfo)
content, _ := json.Marshal(GlobalRsInfo)
fmt.Println(string(content))
content, _ = json.Marshal(globalServiceInfo)
content, _ = json.Marshal(GlobalServiceInfo)
fmt.Println(string(content))
content, _ = json.Marshal(globalPodInfo)
content, _ = json.Marshal(GlobalPodInfo)
fmt.Println(string(content))
content, _ = json.Marshal(globalNodeInfo)
content, _ = json.Marshal(GlobalNodeInfo)
fmt.Println(string(content))
}
Loading

0 comments on commit f55d64d

Please sign in to comment.