Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/add custom k8s metadata collector #580

Merged
37 changes: 37 additions & 0 deletions collector/cmd/metadata-provider/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package main

import (
"flag"
"fmt"
"log"
"net/http"
_ "net/http/pprof"

"github.com/Kindling-project/kindling/collector/pkg/metadata/kubernetes"
"github.com/Kindling-project/kindling/collector/pkg/metadata/metaprovider/service"
)

// main starts the metadata-provider service: it builds a metadata wrapper
// around the K8s API (auth configured via flags) and serves the cached
// metadata to agents over HTTP at /listAndWatch.
func main() {
	// Command-line flags.
	authType := flag.String("authType", "serviceAccount", "AuthType describes the type of authentication to use for the K8s API, support 'kubeConfig' or 'serviceAccount'. ")
	kubeConfigPath := flag.String("kubeConfig", "/root/.kube/config", "kubeConfig describe the filePath to your kubeConfig,only used when authType is 'kubeConfig'")
	httpPort := flag.Int("http-port", 9504, "port describe which port will be used to expose data")
	enableFetchReplicaset := flag.Bool("enableFetchReplicaset", false, "controls whether to fetch ReplicaSet information. The default value is false. It should be enabled if the ReplicaSet is used to control pods in the third-party CRD except for Deployment.")
	logInterval := flag.Int("logInterval", 120, "Interval(Second) to show how many event mp received, default 120s")

	flag.Parse()

	config := &service.Config{
		KubeAuthType:          kubernetes.AuthType(*authType),
		KubeConfigDir:         *kubeConfigPath,
		EnableFetchReplicaSet: *enableFetchReplicaset,
		LogInterval:           *logInterval,
	}

	mdw, err := service.NewMetaDataWrapper(config)
	if err != nil {
		// Fatalf exits the process; the service is useless without the wrapper.
		log.Fatalf("create MetaData Wrapper failed, err: %v", err)
	}
	http.HandleFunc("/listAndWatch", mdw.ListAndWatch)
	log.Printf("[http] service start at port: %d", *httpPort)
	// ListenAndServe blocks; it returns only on error, which is fatal here.
	log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", *httpPort), nil))
}
14 changes: 14 additions & 0 deletions collector/docker/Dockerfile-metadata-provider
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Build stage: compile the metadata-provider binary.
FROM golang:1.20.2-bullseye AS builder
WORKDIR /build

# Go module proxy mirror (China); speeds up / unblocks module downloads.
ENV GOPROXY https://goproxy.cn
# Copy only the module files first so the dependency download layer is
# cached across source-code changes.
COPY go.mod go.sum ./
RUN go mod download && go mod verify

# Copy the rest of the source and build the service binary.
COPY . .
RUN go build -v -o metadata-provider ./cmd/metadata-provider

# Runtime stage: slim image containing only the compiled binary.
FROM debian:bullseye-slim AS runner
WORKDIR /app
COPY --from=builder /build/metadata-provider /app/
CMD ["/app/metadata-provider"]
14 changes: 10 additions & 4 deletions collector/pkg/component/consumer/processor/k8sprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,17 @@ type Config struct {
// Set "Enable" false if you want to run the agent in the non-Kubernetes environment.
// Otherwise, the agent will panic if it can't connect to the API-server.
Enable bool `mapstructure:"enable"`

// MetaDataProviderConfig is an optional config that enables an alternative source of
// K8s metadata, the metadata-provider service, reducing the load the agent would
// otherwise put on the API server directly.
// Set "metadata_provider_config.enable" to true and "metadata_provider_config.endpoint" to the target service to enable it.
MetaDataProviderConfig *kubernetes.MetaDataProviderConfig `mapstructure:"metadata_provider_config"`
}

var DefaultConfig Config = Config{
KubeAuthType: "serviceAccount",
KubeConfigDir: "/root/.kube/config",
GraceDeletePeriod: 60,
Enable: true,
KubeAuthType: "serviceAccount",
KubeConfigDir: "/root/.kube/config",
GraceDeletePeriod: 60,
Enable: true,
MetaDataProviderConfig: &kubernetes.MetaDataProviderConfig{Enable: false, EnableTrace: false, Endpoint: ""},
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/Kindling-project/kindling/collector/pkg/model"
"github.com/Kindling-project/kindling/collector/pkg/model/constlabels"
"github.com/Kindling-project/kindling/collector/pkg/model/constnames"

mpclient "github.com/Kindling-project/kindling/collector/pkg/metadata/metaprovider/client"
)

const (
Expand Down Expand Up @@ -49,6 +51,10 @@ func NewKubernetesProcessor(cfg interface{}, telemetry *component.TelemetryTools
kubernetes.WithGraceDeletePeriod(config.GraceDeletePeriod),
kubernetes.WithFetchReplicaSet(config.EnableFetchReplicaSet),
)
if config.MetaDataProviderConfig != nil && config.MetaDataProviderConfig.Enable {
cli := mpclient.NewMetaDataWrapperClient(config.MetaDataProviderConfig.Endpoint, config.MetaDataProviderConfig.EnableTrace)
options = append(options, kubernetes.WithMetaDataProviderConfig(config.MetaDataProviderConfig, cli.ListAndWatch))
}
err := kubernetes.InitK8sHandler(options...)
if err != nil {
telemetry.Logger.Panicf("Failed to initialize [%s]: %v. Set the option 'enable' false if you want to run the agent in the non-Kubernetes environment.", K8sMetadata, err)
Expand Down
54 changes: 53 additions & 1 deletion collector/pkg/metadata/kubernetes/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package kubernetes

import "time"
import (
"time"

"k8s.io/client-go/tools/cache"
)

// config contains optional settings for connecting to kubernetes.
type config struct {
Expand All @@ -14,6 +18,23 @@ type config struct {
// The default value is false. It should be enabled if the ReplicaSet
// is used to control pods in the third-party CRD except for Deployment.
EnableFetchReplicaSet bool

MetaDataProviderConfig *MetaDataProviderConfig `mapstructure:"metadata_provider_config"`

listAndWatchFromProvider func(setup SetPreprocessingMetaDataCache) error
podEventHander cache.ResourceEventHandler
rsEventHander cache.ResourceEventHandler
nodeEventHander cache.ResourceEventHandler
serviceEventHander cache.ResourceEventHandler
}

// MetaDataProviderConfig configures the optional metadata-provider service as
// the source of K8s metadata, instead of watching the API server directly.
type MetaDataProviderConfig struct {
	Enable bool `mapstructure:"enable"`
	// EnableTrace prints every K8s metadata event received from the
	// metadata-provider; intended for debugging only.
	EnableTrace bool `mapstructure:"enable_trace"`
	// Endpoint is where the metadata-provider is deployed and provides service,
	// e.g. "http://localhost:9504".
	Endpoint string `mapstructure:"endpoint"`
}

type Option func(cfg *config)
Expand Down Expand Up @@ -47,3 +68,34 @@ func WithFetchReplicaSet(fetch bool) Option {
cfg.EnableFetchReplicaSet = fetch
}
}

// WithMetaDataProviderConfig switches metadata collection to the external
// metadata-provider service. listAndWatch is the client function that streams
// metadata events from the provider into the local cache.
func WithMetaDataProviderConfig(mpCfg *MetaDataProviderConfig, listAndWatch func(SetPreprocessingMetaDataCache) error) Option {
	return func(cfg *config) {
		cfg.MetaDataProviderConfig = mpCfg
		cfg.listAndWatchFromProvider = listAndWatch
	}
}

// WithPodEventHander sets an additional handler invoked on pod informer events.
// NOTE(review): "Hander" is a typo of "Handler"; kept because renaming the
// exported name would break callers.
func WithPodEventHander(handler cache.ResourceEventHandler) Option {
	return func(cfg *config) {
		cfg.podEventHander = handler
	}
}

// WithServiceEventHander sets an additional handler invoked on service informer
// events. NOTE(review): "Hander" is a typo of "Handler"; kept because renaming
// the exported name would break callers.
func WithServiceEventHander(handler cache.ResourceEventHandler) Option {
	return func(cfg *config) {
		cfg.serviceEventHander = handler
	}
}

// WithNodeEventHander sets an additional handler invoked on node informer
// events. NOTE(review): "Hander" is a typo of "Handler"; kept because renaming
// the exported name would break callers.
func WithNodeEventHander(handler cache.ResourceEventHandler) Option {
	return func(cfg *config) {
		cfg.nodeEventHander = handler
	}
}

// WithReplicaSetEventHander sets an additional handler invoked on ReplicaSet
// informer events. NOTE(review): "Hander" is a typo of "Handler"; kept because
// renaming the exported name would break callers.
func WithReplicaSetEventHander(handler cache.ResourceEventHandler) Option {
	return func(cfg *config) {
		cfg.rsEventHander = handler
	}
}
32 changes: 21 additions & 11 deletions collector/pkg/metadata/kubernetes/hostport_map.go
Original file line number Diff line number Diff line change
@@ -1,37 +1,47 @@
package kubernetes

import "sync"
import (
"strconv"
"sync"
)

type IpPortKey struct {
Ip string
Port uint32
}

type HostPortMap struct {
hostPortInfo sync.Map
HostPortInfo map[string]*K8sContainerInfo
mutex sync.RWMutex
}

func newHostPortMap() *HostPortMap {
func NewHostPortMap() *HostPortMap {
return &HostPortMap{
hostPortInfo: sync.Map{},
HostPortInfo: make(map[string]*K8sContainerInfo),
}
}

func (m *HostPortMap) add(ip string, port uint32, containerInfo *K8sContainerInfo) {
key := IpPortKey{ip, port}
m.hostPortInfo.Store(key, containerInfo)
key := ip + ":" + strconv.FormatUint(uint64(port), 10)
m.mutex.Lock()
defer m.mutex.Unlock()
m.HostPortInfo[key] = containerInfo
}

func (m *HostPortMap) get(ip string, port uint32) (*K8sContainerInfo, bool) {
key := IpPortKey{ip, port}
containerInfo, ok := m.hostPortInfo.Load(key)
key := ip + ":" + strconv.FormatUint(uint64(port), 10)
m.mutex.RLock()
defer m.mutex.RUnlock()
containerInfo, ok := m.HostPortInfo[key]
if !ok {
return nil, false
}
return containerInfo.(*K8sContainerInfo), true
return containerInfo, true
}

func (m *HostPortMap) delete(ip string, port uint32) {
key := IpPortKey{ip, port}
m.hostPortInfo.Delete(key)
key := ip + ":" + strconv.FormatUint(uint64(port), 10)
m.mutex.Lock()
defer m.mutex.Unlock()
delete(m.HostPortInfo, key)
}
100 changes: 85 additions & 15 deletions collector/pkg/metadata/kubernetes/initk8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kubernetes

import (
"fmt"
"log"
"net"
"net/http"
"os"
Expand All @@ -16,6 +17,8 @@ import (
// AuthType describes the type of authentication to use for the K8s API
type AuthType string

var ReWatch bool

const (
// AuthTypeNone means no auth is required
AuthTypeNone AuthType = "none"
Expand Down Expand Up @@ -74,26 +77,72 @@ func InitK8sHandler(options ...Option) error {
option(&k8sConfig)
}

clientSet, err := initClientSet(string(k8sConfig.KubeAuthType), k8sConfig.KubeConfigDir)
if err != nil {
retErr = fmt.Errorf("cannot connect to kubernetes: %w", err)
return
}
IsInitSuccess = true
go NodeWatch(clientSet)
time.Sleep(1 * time.Second)
if k8sConfig.EnableFetchReplicaSet {
go RsWatch(clientSet)
time.Sleep(1 * time.Second)
if k8sConfig.MetaDataProviderConfig != nil && k8sConfig.MetaDataProviderConfig.Enable {
retErr = initWatcherFromMetadataProvider(k8sConfig)
} else {
retErr = initWatcherFromAPIServer(k8sConfig)
}
go ServiceWatch(clientSet)
time.Sleep(1 * time.Second)
go PodWatch(clientSet, k8sConfig.GraceDeletePeriod)
time.Sleep(1 * time.Second)
})
return retErr
}

// initWatcherFromAPIServer connects directly to the K8s API server and starts
// the informer watchers (node, optional ReplicaSet, service, pod) as
// goroutines. The 1-second sleeps stagger the watchers — presumably so each
// cache warms up before the next watcher starts; do not reorder without
// confirming the startup dependency (TODO confirm).
func initWatcherFromAPIServer(k8sConfig config) error {
	clientSet, err := initClientSet(string(k8sConfig.KubeAuthType), k8sConfig.KubeConfigDir)
	if err != nil {
		return fmt.Errorf("cannot connect to kubernetes: %w", err)
	}
	// Mark init successful before the watchers launch.
	IsInitSuccess = true
	go NodeWatch(clientSet, k8sConfig.nodeEventHander)
	time.Sleep(1 * time.Second)
	// ReplicaSet watching is optional; only needed for third-party CRDs that
	// control pods through ReplicaSets other than Deployment.
	if k8sConfig.EnableFetchReplicaSet {
		go RsWatch(clientSet, k8sConfig.rsEventHander)
		time.Sleep(1 * time.Second)
	}
	go ServiceWatch(clientSet, k8sConfig.serviceEventHander)
	time.Sleep(1 * time.Second)
	go PodWatch(clientSet, k8sConfig.GraceDeletePeriod, k8sConfig.podEventHander)
	time.Sleep(1 * time.Second)
	return nil
}

// initWatcherFromMetadataProvider starts metadata collection from the external
// metadata-provider service instead of the API server. It launches the
// grace-delete loop, the provider watch loop, and a periodic full re-sync.
// Always returns nil.
func initWatcherFromMetadataProvider(k8sConfig config) error {
	// NOTE(review): stopCh is never closed, so podDeleteLoop runs for the
	// process lifetime — confirm this is intended.
	stopCh := make(chan struct{})
	// Enable PodDeleteGrace
	go podDeleteLoop(10*time.Second, k8sConfig.GraceDeletePeriod, stopCh)
	go watchFromMPWithRetry(k8sConfig)

	// rewatch from MP every 30 minute
	// NOTE(review): ReWatch is written here and from the ticker goroutine
	// without synchronization — potential data race with its readers; verify.
	ReWatch = false
	go func() {
		ticker := time.NewTicker(30 * time.Minute)
		for range ticker.C {
			// Drop all cached metadata and signal a fresh list-and-watch.
			clearK8sMap()
			ReWatch = true
		}
	}()
	return nil
}

// watchFromMPWithRetry runs the provider list-and-watch loop forever.
// A nil return from listAndWatchFromProvider means the ReWatch signal was
// received (cache already cleared), so it reconnects immediately and resets
// the failure budget. After 3 consecutive failures it backs off for a minute
// and starts over. Never returns.
func watchFromMPWithRetry(k8sConfig config) {
	for {
		failures := 0
		for failures < 3 {
			if err := k8sConfig.listAndWatchFromProvider(SetupCache); err == nil {
				// Success: the original code intended to reset the retry
				// counter here, but `i = 0; continue` still ran the loop's
				// i++ post-statement, leaving only 2 retries. Reset fully.
				failures = 0
				// TODO logger
				log.Printf("clear K8sCache and rewatch from MP")
				continue
			} else {
				failures++
				// %v, not %d: err is an error, not an integer.
				log.Printf("listAndWatch From Provider failed! Error: %v", err)
			}
		}

		// Failed 3 times in a row; back off before trying again.
		log.Printf("listAndWatch From Provider failed for 3 times, will retry after 1 minute")
		time.Sleep(1 * time.Minute)
	}
}

func initClientSet(authType string, dir string) (*k8s.Clientset, error) {
return makeClient(APIConfig{
AuthType: AuthType(authType),
Expand Down Expand Up @@ -173,3 +222,24 @@ func createRestConfig(apiConf APIConfig) (*rest.Config, error) {

return authConf, nil
}

// clearK8sMap replaces the global pod/node/ReplicaSet/service maps with fresh
// empty ones, discarding all cached K8s metadata before a re-watch.
// NOTE(review): the four replacements are not atomic; concurrent readers may
// briefly see a mix of old and new maps — confirm callers tolerate this.
func clearK8sMap() {
	GlobalPodInfo = newPodMap()
	GlobalNodeInfo = newNodeMap()
	GlobalRsInfo = newReplicaSetMap()
	GlobalServiceInfo = newServiceMap()
}

// RLockMetadataCache read-locks every lock guarding MetaDataCache (container,
// pod, service, host-port). Callers must release with RUnlockMetadataCache,
// which unlocks in exactly the reverse order; the fixed acquisition order
// here prevents deadlock with other multi-lock holders.
func RLockMetadataCache() {
	MetaDataCache.cMut.RLock()
	MetaDataCache.pMut.RLock()
	MetaDataCache.sMut.RLock()
	MetaDataCache.HostPortInfo.mutex.RLock()
}

// RUnlockMetadataCache releases the read locks taken by RLockMetadataCache,
// in reverse acquisition order. Must only be called after RLockMetadataCache.
func RUnlockMetadataCache() {
	MetaDataCache.HostPortInfo.mutex.RUnlock()
	MetaDataCache.sMut.RUnlock()
	MetaDataCache.pMut.RUnlock()
	MetaDataCache.cMut.RUnlock()
}
16 changes: 8 additions & 8 deletions collector/pkg/metadata/kubernetes/initk8s_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@ func TestWatch(t *testing.T) {
if err != nil {
t.Fatalf("cannot init clientSet, %s", err)
}
go NodeWatch(clientSet)
go RsWatch(clientSet)
go ServiceWatch(clientSet)
go PodWatch(clientSet, 60*time.Second)
go NodeWatch(clientSet, nil)
go RsWatch(clientSet, nil)
go ServiceWatch(clientSet, nil)
go PodWatch(clientSet, 60*time.Second, nil)
time.Sleep(2 * time.Second)
content, _ := json.Marshal(globalRsInfo)
content, _ := json.Marshal(GlobalRsInfo)
fmt.Println(string(content))
content, _ = json.Marshal(globalServiceInfo)
content, _ = json.Marshal(GlobalServiceInfo)
fmt.Println(string(content))
content, _ = json.Marshal(globalPodInfo)
content, _ = json.Marshal(GlobalPodInfo)
fmt.Println(string(content))
content, _ = json.Marshal(globalNodeInfo)
content, _ = json.Marshal(GlobalNodeInfo)
fmt.Println(string(content))
}
Loading