diff --git a/collector/cmd/metadata-provider/main.go b/collector/cmd/metadata-provider/main.go
new file mode 100644
index 000000000..ddc35d7f0
--- /dev/null
+++ b/collector/cmd/metadata-provider/main.go
@@ -0,0 +1,37 @@
+package main
+
+import (
+	"flag"
+	"fmt"
+	"log"
+	"net/http"
+	_ "net/http/pprof"
+
+	"github.com/Kindling-project/kindling/collector/pkg/metadata/kubernetes"
+	"github.com/Kindling-project/kindling/collector/pkg/metadata/metaprovider/service"
+)
+
+func main() {
+	authType := flag.String("authType", "serviceAccount", "AuthType describes the type of authentication to use for the K8s API; supported values are 'kubeConfig' and 'serviceAccount'")
+	kubeConfigPath := flag.String("kubeConfig", "/root/.kube/config", "kubeConfig is the file path to your kubeconfig; only used when authType is 'kubeConfig'")
+	httpPort := flag.Int("http-port", 9504, "the HTTP port on which the metadata service is exposed")
+	enableFetchReplicaset := flag.Bool("enableFetchReplicaset", false, "controls whether to fetch ReplicaSet information. The default value is false. It should be enabled if the ReplicaSet is used to control pods in the third-party CRD except for Deployment.")
+	logInterval := flag.Int("logInterval", 120, "interval in seconds at which to log how many events the metadata-provider has received; default 120")
+
+	flag.Parse()
+
+	config := &service.Config{
+		KubeAuthType:          kubernetes.AuthType(*authType),
+		KubeConfigDir:         *kubeConfigPath,
+		EnableFetchReplicaSet: *enableFetchReplicaset,
+		LogInterval:           *logInterval,
+	}
+
+	if mdw, err := service.NewMetaDataWrapper(config); err != nil {
+		log.Fatalf("failed to create MetaDataWrapper, err: %v", err)
+	} else {
+		http.HandleFunc("/listAndWatch", mdw.ListAndWatch)
+		log.Printf("[http] service started on port: %d", *httpPort)
+		log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", *httpPort), nil))
+	}
+}
diff --git a/collector/docker/Dockerfile-metadata-provider b/collector/docker/Dockerfile-metadata-provider
new file mode 100644
index 000000000..91f4c4c42
--- /dev/null
+++ b/collector/docker/Dockerfile-metadata-provider
@@ -0,0 +1,14 @@
+FROM golang:1.20.2-bullseye AS builder
+WORKDIR /build
+
+ENV GOPROXY https://goproxy.cn
+COPY go.mod go.sum ./
+RUN go mod download && go mod verify
+
+COPY . .
+RUN go build -v -o metadata-provider ./cmd/metadata-provider
+
+FROM debian:bullseye-slim AS runner
+WORKDIR /app
+COPY --from=builder /build/metadata-provider /app/
+CMD ["/app/metadata-provider"]
\ No newline at end of file
diff --git a/collector/pkg/component/consumer/processor/k8sprocessor/config.go b/collector/pkg/component/consumer/processor/k8sprocessor/config.go
index 709416c8c..2c68d84e8 100644
--- a/collector/pkg/component/consumer/processor/k8sprocessor/config.go
+++ b/collector/pkg/component/consumer/processor/k8sprocessor/config.go
@@ -18,11 +18,17 @@ type Config struct {
 	// Set "Enable" false if you want to run the agent in the non-Kubernetes environment.
 	// Otherwise, the agent will panic if it can't connect to the API-server.
 	Enable bool `mapstructure:"enable"`
+
+	// MetaDataProviderConfig is an optional config that lets the agent use another source of K8s metadata, named metadata-provider.
+	// It is used to reduce the load that agents would otherwise put directly on the API server.
+	// Set "metadata_provider_config.enable" to true and "metadata_provider_config.endpoint" to the target service to enable it.
+	MetaDataProviderConfig *kubernetes.MetaDataProviderConfig `mapstructure:"metadata_provider_config"`
 }
 
 var DefaultConfig Config = Config{
-	KubeAuthType:      "serviceAccount",
-	KubeConfigDir:     "/root/.kube/config",
-	GraceDeletePeriod: 60,
-	Enable:            true,
+	KubeAuthType:           "serviceAccount",
+	KubeConfigDir:          "/root/.kube/config",
+	GraceDeletePeriod:      60,
+	Enable:                 true,
+	MetaDataProviderConfig: &kubernetes.MetaDataProviderConfig{Enable: false, EnableTrace: false, Endpoint: ""},
 }
diff --git a/collector/pkg/component/consumer/processor/k8sprocessor/kubernetes_processor.go b/collector/pkg/component/consumer/processor/k8sprocessor/kubernetes_processor.go
index 3d85bc71b..baa1ba4b7 100644
--- a/collector/pkg/component/consumer/processor/k8sprocessor/kubernetes_processor.go
+++ b/collector/pkg/component/consumer/processor/k8sprocessor/kubernetes_processor.go
@@ -12,6 +12,8 @@ import (
 	"github.com/Kindling-project/kindling/collector/pkg/model"
 	"github.com/Kindling-project/kindling/collector/pkg/model/constlabels"
 	"github.com/Kindling-project/kindling/collector/pkg/model/constnames"
+
+	mpclient "github.com/Kindling-project/kindling/collector/pkg/metadata/metaprovider/client"
 )
 
 const (
@@ -49,6 +51,10 @@ func NewKubernetesProcessor(cfg interface{}, telemetry *component.TelemetryTools
 		kubernetes.WithGraceDeletePeriod(config.GraceDeletePeriod),
 		kubernetes.WithFetchReplicaSet(config.EnableFetchReplicaSet),
 	)
+	if config.MetaDataProviderConfig != nil && config.MetaDataProviderConfig.Enable {
+		cli := mpclient.NewMetaDataWrapperClient(config.MetaDataProviderConfig.Endpoint, config.MetaDataProviderConfig.EnableTrace)
+		options = append(options, kubernetes.WithMetaDataProviderConfig(config.MetaDataProviderConfig, cli.ListAndWatch))
+	}
 	err := kubernetes.InitK8sHandler(options...)
 	if err != nil {
 		telemetry.Logger.Panicf("Failed to initialize [%s]: %v. Set the option 'enable' false if you want to run the agent in the non-Kubernetes environment.", K8sMetadata, err)
diff --git a/collector/pkg/metadata/kubernetes/config.go b/collector/pkg/metadata/kubernetes/config.go
index bb6bfbeec..77e7ff857 100644
--- a/collector/pkg/metadata/kubernetes/config.go
+++ b/collector/pkg/metadata/kubernetes/config.go
@@ -1,6 +1,10 @@
 package kubernetes
 
-import "time"
+import (
+	"time"
+
+	"k8s.io/client-go/tools/cache"
+)
 
 // config contains optional settings for connecting to kubernetes.
 type config struct {
@@ -14,6 +18,23 @@ type config struct {
 	// The default value is false. It should be enabled if the ReplicaSet
 	// is used to control pods in the third-party CRD except for Deployment.
 	EnableFetchReplicaSet bool
+
+	MetaDataProviderConfig *MetaDataProviderConfig `mapstructure:"metadata_provider_config"`
+
+	listAndWatchFromProvider func(setup SetPreprocessingMetaDataCache) error
+	podEventHander           cache.ResourceEventHandler
+	rsEventHander            cache.ResourceEventHandler
+	nodeEventHander          cache.ResourceEventHandler
+	serviceEventHander       cache.ResourceEventHandler
+}
+
+type MetaDataProviderConfig struct {
+	Enable bool `mapstructure:"enable"`
+	// EnableTrace prints every piece of K8s metadata received from the metadata-provider; used for debugging.
+	EnableTrace bool `mapstructure:"enable_trace"`
+	// Endpoint is the address where the metadata-provider is deployed and serving,
+	// e.g. "http://localhost:9504"
+	Endpoint string `mapstructure:"endpoint"`
 }
 
 type Option func(cfg *config)
@@ -47,3 +68,34 @@ func WithFetchReplicaSet(fetch bool) Option {
 		cfg.EnableFetchReplicaSet = fetch
 	}
 }
+
+func WithMetaDataProviderConfig(mpCfg *MetaDataProviderConfig, listAndWatch func(SetPreprocessingMetaDataCache) error) Option {
+	return func(cfg *config) {
+		cfg.MetaDataProviderConfig = mpCfg
+		cfg.listAndWatchFromProvider = listAndWatch
+	}
+}
+
+func WithPodEventHander(handler cache.ResourceEventHandler) Option {
+	return func(cfg *config) {
+		cfg.podEventHander = handler
+	}
+}
+
+func WithServiceEventHander(handler cache.ResourceEventHandler) Option {
+	return func(cfg *config) {
+		cfg.serviceEventHander = handler
+	}
+}
+
+func WithNodeEventHander(handler cache.ResourceEventHandler) Option {
+	return func(cfg *config) {
+		cfg.nodeEventHander = handler
+	}
+}
+
+func WithReplicaSetEventHander(handler cache.ResourceEventHandler) Option {
+	return func(cfg *config) {
+		cfg.rsEventHander = handler
+	}
+}
diff --git a/collector/pkg/metadata/kubernetes/hostport_map.go b/collector/pkg/metadata/kubernetes/hostport_map.go
index 2203dd931..97cf1e9d6 100644
--- a/collector/pkg/metadata/kubernetes/hostport_map.go
+++ b/collector/pkg/metadata/kubernetes/hostport_map.go
@@ -1,6 +1,9 @@
 package kubernetes
 
-import "sync"
+import (
+	"strconv"
+	"sync"
+)
 
 type IpPortKey struct {
 	Ip string
@@ -8,30 +11,37 @@
 }
 
 type HostPortMap struct {
-	hostPortInfo sync.Map
+	HostPortInfo map[string]*K8sContainerInfo
+	mutex        sync.RWMutex
 }
 
-func newHostPortMap() *HostPortMap {
+func NewHostPortMap() *HostPortMap {
 	return &HostPortMap{
-		hostPortInfo: sync.Map{},
+		HostPortInfo: make(map[string]*K8sContainerInfo),
 	}
 }
 
 func (m *HostPortMap) add(ip string, port uint32, containerInfo *K8sContainerInfo) {
-	key := IpPortKey{ip, port}
-	m.hostPortInfo.Store(key, containerInfo)
+	key := ip + ":" + strconv.FormatUint(uint64(port), 10)
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+	m.HostPortInfo[key] = containerInfo
 }
 
 func (m *HostPortMap) get(ip string, port uint32) (*K8sContainerInfo, bool) {
-	key := IpPortKey{ip, port}
-	containerInfo, ok := m.hostPortInfo.Load(key)
+	key := ip + ":" + strconv.FormatUint(uint64(port), 10)
+	m.mutex.RLock()
+	defer m.mutex.RUnlock()
+	containerInfo, ok := m.HostPortInfo[key]
 	if !ok {
 		return nil, false
 	}
-	return containerInfo.(*K8sContainerInfo), true
+	return containerInfo, true
 }
 
 func (m *HostPortMap) delete(ip string, port uint32) {
-	key := IpPortKey{ip, port}
-	m.hostPortInfo.Delete(key)
+	key := ip + ":" + strconv.FormatUint(uint64(port), 10)
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+	delete(m.HostPortInfo, key)
 }
diff --git a/collector/pkg/metadata/kubernetes/initk8s.go b/collector/pkg/metadata/kubernetes/initk8s.go
index b6f430a91..dd984ede1 100644
--- a/collector/pkg/metadata/kubernetes/initk8s.go
+++ b/collector/pkg/metadata/kubernetes/initk8s.go
@@ -2,6 +2,7 @@ package kubernetes
 
 import (
 	"fmt"
+	"log"
 	"net"
 	"net/http"
 	"os"
@@ -16,6 +17,8 @@ import (
 // AuthType describes the type of authentication to use for the K8s API
 type AuthType string
 
+var ReWatch bool
+
 const (
 	// AuthTypeNone means no auth is required
 	AuthTypeNone AuthType = "none"
@@ -74,26 +77,72 @@ func InitK8sHandler(options ...Option) error {
 			option(&k8sConfig)
 		}
-		clientSet, err := initClientSet(string(k8sConfig.KubeAuthType), k8sConfig.KubeConfigDir)
-		if err != nil {
-			retErr = fmt.Errorf("cannot connect to kubernetes: %w", err)
-			return
-		}
-		IsInitSuccess = true
-		go NodeWatch(clientSet)
-		time.Sleep(1 * time.Second)
-		if k8sConfig.EnableFetchReplicaSet {
-			go RsWatch(clientSet)
-			time.Sleep(1 * time.Second)
+		if k8sConfig.MetaDataProviderConfig != nil && k8sConfig.MetaDataProviderConfig.Enable {
+			retErr = initWatcherFromMetadataProvider(k8sConfig)
+		} else {
+			retErr = initWatcherFromAPIServer(k8sConfig)
 		}
-		go ServiceWatch(clientSet)
-		time.Sleep(1 * time.Second)
-		go PodWatch(clientSet, k8sConfig.GraceDeletePeriod)
-		time.Sleep(1 * time.Second)
 	})
 	return retErr
 }
+func initWatcherFromAPIServer(k8sConfig config) error {
+	clientSet, err := initClientSet(string(k8sConfig.KubeAuthType), k8sConfig.KubeConfigDir)
+	if err != nil {
+		return fmt.Errorf("cannot connect to kubernetes: %w", err)
+	}
+	IsInitSuccess = true
+	go NodeWatch(clientSet, k8sConfig.nodeEventHander)
+	time.Sleep(1 * time.Second)
+	if k8sConfig.EnableFetchReplicaSet {
+		go RsWatch(clientSet, k8sConfig.rsEventHander)
+		time.Sleep(1 * time.Second)
+	}
+	go ServiceWatch(clientSet, k8sConfig.serviceEventHander)
+	time.Sleep(1 * time.Second)
+	go PodWatch(clientSet, k8sConfig.GraceDeletePeriod, k8sConfig.podEventHander)
+	time.Sleep(1 * time.Second)
+	return nil
+}
+
+func initWatcherFromMetadataProvider(k8sConfig config) error {
+	stopCh := make(chan struct{})
+	// Enable PodDeleteGrace
+	go podDeleteLoop(10*time.Second, k8sConfig.GraceDeletePeriod, stopCh)
+	go watchFromMPWithRetry(k8sConfig)
+
+	// rewatch from MP every 30 minutes
+	ReWatch = false
+	go func() {
+		ticker := time.NewTicker(30 * time.Minute)
+		for range ticker.C {
+			clearK8sMap()
+			ReWatch = true
+		}
+	}()
+	return nil
+}
+
+func watchFromMPWithRetry(k8sConfig config) {
+	for {
+		for i := 0; i < 3; i++ {
+			if err := k8sConfig.listAndWatchFromProvider(SetupCache); err == nil {
+				i = 0
+				// received the ReWatch signal: clear the cache and rewatch from MP
+				// TODO logger
+				log.Printf("clear K8sCache and rewatch from MP")
+				continue
+			} else {
+				log.Printf("listAndWatch from provider failed! Error: %v", err)
+			}
+		}
+
+		// Failed after 3 times
+		log.Printf("listAndWatch from provider failed 3 times, will retry after 1 minute")
+		time.Sleep(1 * time.Minute)
+	}
+}
+
 func initClientSet(authType string, dir string) (*k8s.Clientset, error) {
 	return makeClient(APIConfig{
 		AuthType: AuthType(authType),
@@ -173,3 +222,24 @@ func createRestConfig(apiConf APIConfig) (*rest.Config, error) {
 
 	return authConf, nil
 }
+
+func clearK8sMap() {
+	GlobalPodInfo = newPodMap()
+	GlobalNodeInfo = newNodeMap()
+	GlobalRsInfo = newReplicaSetMap()
+	GlobalServiceInfo = newServiceMap()
+}
+
+func RLockMetadataCache() {
+	MetaDataCache.cMut.RLock()
+	MetaDataCache.pMut.RLock()
+	MetaDataCache.sMut.RLock()
+	MetaDataCache.HostPortInfo.mutex.RLock()
+}
+
+func RUnlockMetadataCache() {
+	MetaDataCache.HostPortInfo.mutex.RUnlock()
+	MetaDataCache.sMut.RUnlock()
+	MetaDataCache.pMut.RUnlock()
+	MetaDataCache.cMut.RUnlock()
+}
diff --git a/collector/pkg/metadata/kubernetes/initk8s_test.go b/collector/pkg/metadata/kubernetes/initk8s_test.go
index 4c10b553b..beab4b94e 100644
--- a/collector/pkg/metadata/kubernetes/initk8s_test.go
+++ b/collector/pkg/metadata/kubernetes/initk8s_test.go
@@ -12,17 +12,17 @@ func TestWatch(t *testing.T) {
 	if err != nil {
 		t.Fatalf("cannot init clientSet, %s", err)
 	}
-	go NodeWatch(clientSet)
-	go RsWatch(clientSet)
-	go ServiceWatch(clientSet)
-	go PodWatch(clientSet, 60*time.Second)
+	go NodeWatch(clientSet, nil)
+	go RsWatch(clientSet, nil)
+	go ServiceWatch(clientSet, nil)
+	go PodWatch(clientSet, 60*time.Second, nil)
 	time.Sleep(2 * time.Second)
-	content, _ := json.Marshal(globalRsInfo)
+	content, _ := json.Marshal(GlobalRsInfo)
 	fmt.Println(string(content))
-	content, _ = json.Marshal(globalServiceInfo)
+	content, _ = json.Marshal(GlobalServiceInfo)
 	fmt.Println(string(content))
-	content, _ = json.Marshal(globalPodInfo)
+	content, _ = json.Marshal(GlobalPodInfo)
 	fmt.Println(string(content))
-	content, _ = json.Marshal(globalNodeInfo)
+	content, _ = json.Marshal(GlobalNodeInfo)
 	fmt.Println(string(content))
 }
diff --git a/collector/pkg/metadata/kubernetes/k8scache.go b/collector/pkg/metadata/kubernetes/k8scache.go
index 4083094be..2fdb93b9e 100644
--- a/collector/pkg/metadata/kubernetes/k8scache.go
+++ b/collector/pkg/metadata/kubernetes/k8scache.go
@@ -6,6 +6,8 @@ import (
 	"sync"
 )
 
+type SetPreprocessingMetaDataCache func(cache *K8sMetaDataCache, nodeMap *NodeMap, serviceMap *ServiceMap, rsMap *ReplicaSetMap)
+
 type K8sContainerInfo struct {
 	ContainerId string
 	Name        string
@@ -54,7 +56,7 @@ func (s *K8sServiceInfo) emptySelf() {
 
 type K8sMetaDataCache struct {
 	cMut            sync.RWMutex
-	containerIdInfo map[string]*K8sContainerInfo
+	ContainerIdInfo map[string]*K8sContainerInfo
 	//
 	//	"192.168.1.14": { // podIp
 	//		9093: k8sResInfo,
@@ -69,20 +71,20 @@ type K8sMetaDataCache struct {
 	//	}
 	//}
 	pMut            sync.RWMutex
-	ipContainerInfo map[string]map[uint32]*K8sContainerInfo
+	IpContainerInfo map[string]map[uint32]*K8sContainerInfo
 
 	sMut          sync.RWMutex
-	ipServiceInfo map[string]map[uint32]*K8sServiceInfo
+	IpServiceInfo map[string]map[uint32]*K8sServiceInfo
 
-	hostPortInfo *HostPortMap
+	HostPortInfo *HostPortMap
 }
 
 func New() *K8sMetaDataCache {
 	c := &K8sMetaDataCache{
-		containerIdInfo: make(map[string]*K8sContainerInfo),
-		ipContainerInfo: make(map[string]map[uint32]*K8sContainerInfo),
-		ipServiceInfo:   make(map[string]map[uint32]*K8sServiceInfo),
-		hostPortInfo:    newHostPortMap(),
+		ContainerIdInfo: make(map[string]*K8sContainerInfo),
+		IpContainerInfo: make(map[string]map[uint32]*K8sContainerInfo),
+ IpServiceInfo: make(map[string]map[uint32]*K8sServiceInfo), + HostPortInfo: NewHostPortMap(), } return c @@ -90,13 +92,13 @@ func New() *K8sMetaDataCache { func (c *K8sMetaDataCache) AddByContainerId(containerId string, resource *K8sContainerInfo) { c.cMut.Lock() - c.containerIdInfo[containerId] = resource + c.ContainerIdInfo[containerId] = resource c.cMut.Unlock() } func (c *K8sMetaDataCache) GetByContainerId(containerId string) (*K8sContainerInfo, bool) { c.cMut.RLock() - res, ok := c.containerIdInfo[containerId] + res, ok := c.ContainerIdInfo[containerId] c.cMut.RUnlock() if ok { return res, ok @@ -106,7 +108,7 @@ func (c *K8sMetaDataCache) GetByContainerId(containerId string) (*K8sContainerIn func (c *K8sMetaDataCache) GetPodByContainerId(containerId string) (*K8sPodInfo, bool) { c.cMut.RLock() - containerInfo, ok := c.containerIdInfo[containerId] + containerInfo, ok := c.ContainerIdInfo[containerId] c.cMut.RUnlock() if ok { return containerInfo.RefPodInfo, ok @@ -116,13 +118,13 @@ func (c *K8sMetaDataCache) GetPodByContainerId(containerId string) (*K8sPodInfo, func (c *K8sMetaDataCache) DeleteByContainerId(containerId string) { c.cMut.Lock() - delete(c.containerIdInfo, containerId) + delete(c.ContainerIdInfo, containerId) c.cMut.Unlock() } func (c *K8sMetaDataCache) AddContainerByIpPort(ip string, port uint32, resource *K8sContainerInfo) { c.pMut.RLock() - portContainerInfo, ok := c.ipContainerInfo[ip] + portContainerInfo, ok := c.IpContainerInfo[ip] c.pMut.RUnlock() if ok { c.pMut.Lock() @@ -132,14 +134,14 @@ func (c *K8sMetaDataCache) AddContainerByIpPort(ip string, port uint32, resource portContainerInfo = make(map[uint32]*K8sContainerInfo) portContainerInfo[port] = resource c.pMut.Lock() - c.ipContainerInfo[ip] = portContainerInfo + c.IpContainerInfo[ip] = portContainerInfo c.pMut.Unlock() } } func (c *K8sMetaDataCache) GetContainerByIpPort(ip string, port uint32) (*K8sContainerInfo, bool) { c.pMut.RLock() - portContainerInfo, ok := c.ipContainerInfo[ip] + portContainerInfo, ok := c.IpContainerInfo[ip] defer c.pMut.RUnlock() if !ok { return nil, false @@ -176,7 +178,7 @@ func (c *K8sMetaDataCache) GetPodByIpPort(ip string, port uint32) (*K8sPodInfo, func (c *K8sMetaDataCache) GetPodByIp(ip string) (*K8sPodInfo, bool) { c.pMut.RLock() - portContainerInfo, ok := c.ipContainerInfo[ip] + portContainerInfo, ok := c.IpContainerInfo[ip] defer c.pMut.RUnlock() if !ok { return nil, false @@ -192,7 +194,7 @@ func (c *K8sMetaDataCache) GetPodByIp(ip string) (*K8sPodInfo, bool) { func (c *K8sMetaDataCache) DeleteContainerByIpPort(ip string, port uint32) { c.pMut.RLock() - portContainerInfo, ok := c.ipContainerInfo[ip] + portContainerInfo, ok := c.IpContainerInfo[ip] c.pMut.RUnlock() if !ok { return @@ -200,26 +202,26 @@ func (c *K8sMetaDataCache) DeleteContainerByIpPort(ip string, port uint32) { c.pMut.Lock() delete(portContainerInfo, port) if len(portContainerInfo) == 0 { - delete(c.ipContainerInfo, ip) + delete(c.IpContainerInfo, ip) } c.pMut.Unlock() } func (c *K8sMetaDataCache) AddContainerByHostIpPort(hostIp string, hostPort uint32, containerInfo *K8sContainerInfo) { - c.hostPortInfo.add(hostIp, hostPort, containerInfo) + c.HostPortInfo.add(hostIp, hostPort, containerInfo) } func (c *K8sMetaDataCache) GetContainerByHostIpPort(hostIp string, hostPort uint32) (*K8sContainerInfo, bool) { - return c.hostPortInfo.get(hostIp, hostPort) + return c.HostPortInfo.get(hostIp, hostPort) } func (c *K8sMetaDataCache) DeleteContainerByHostIpPort(hostIp string, hostPort uint32) { - 
 c.hostPortInfo.delete(hostIp, hostPort)
+	c.HostPortInfo.delete(hostIp, hostPort)
 }
 
 func (c *K8sMetaDataCache) AddServiceByIpPort(ip string, port uint32, resource *K8sServiceInfo) {
 	c.sMut.RLock()
-	portServiceInfo, ok := c.ipServiceInfo[ip]
+	portServiceInfo, ok := c.IpServiceInfo[ip]
 	c.sMut.RUnlock()
 	if ok {
 		c.sMut.Lock()
@@ -229,14 +231,14 @@ func (c *K8sMetaDataCache) AddServiceByIpPort(ip string, port uint32, resource *
 		portServiceInfo = make(map[uint32]*K8sServiceInfo)
 		portServiceInfo[port] = resource
 		c.sMut.Lock()
-		c.ipServiceInfo[ip] = portServiceInfo
+		c.IpServiceInfo[ip] = portServiceInfo
 		c.sMut.Unlock()
 	}
 }
 
 func (c *K8sMetaDataCache) GetServiceByIpPort(ip string, port uint32) (*K8sServiceInfo, bool) {
 	c.sMut.RLock()
-	portServiceInfo, ok := c.ipServiceInfo[ip]
+	portServiceInfo, ok := c.IpServiceInfo[ip]
 	defer c.sMut.RUnlock()
 	if !ok {
 		return nil, false
@@ -250,7 +252,7 @@ func (c *K8sMetaDataCache) GetServiceByIpPort(ip string, port uint32) (*K8sServi
 
 func (c *K8sMetaDataCache) DeleteServiceByIpPort(ip string, port uint32) {
 	c.sMut.RLock()
-	portServiceInfo, ok := c.ipServiceInfo[ip]
+	portServiceInfo, ok := c.IpServiceInfo[ip]
 	c.sMut.RUnlock()
 	if !ok {
 		return
@@ -258,33 +260,62 @@ func (c *K8sMetaDataCache) DeleteServiceByIpPort(ip string, port uint32) {
 	c.sMut.Lock()
 	delete(portServiceInfo, port)
 	if len(portServiceInfo) == 0 {
-		delete(c.ipServiceInfo, ip)
+		delete(c.IpServiceInfo, ip)
 	}
 	c.sMut.Unlock()
 }
 
 func (c *K8sMetaDataCache) ClearAll() {
 	c.pMut.Lock()
-	c.ipContainerInfo = make(map[string]map[uint32]*K8sContainerInfo)
+	c.IpContainerInfo = make(map[string]map[uint32]*K8sContainerInfo)
 	c.pMut.Unlock()
 	c.sMut.Lock()
-	c.ipServiceInfo = make(map[string]map[uint32]*K8sServiceInfo)
+	c.IpServiceInfo = make(map[string]map[uint32]*K8sServiceInfo)
 	c.sMut.Unlock()
 	c.cMut.Lock()
-	c.containerIdInfo = make(map[string]*K8sContainerInfo)
+	c.ContainerIdInfo = make(map[string]*K8sContainerInfo)
 	c.cMut.Unlock()
 }
 
 func (c *K8sMetaDataCache) String() string {
-	containerIdPodJson, _ := json.Marshal(c.containerIdInfo)
-	ipContainerJson, _ := json.Marshal(c.ipContainerInfo)
-	ipServiceJson, _ := json.Marshal(c.ipServiceInfo)
+	containerIdPodJson, _ := json.Marshal(c.ContainerIdInfo)
+	ipContainerJson, _ := json.Marshal(c.IpContainerInfo)
+	ipServiceJson, _ := json.Marshal(c.IpServiceInfo)
 	return fmt.Sprintf("{\"containerIdPodInfo\": %s, \"ipContainerInfo\": %s, \"ipServiceInfo\": %s}",
 		string(containerIdPodJson), string(ipContainerJson), string(ipServiceJson))
 }
 
 func (c *K8sMetaDataCache) GetNodeNameByIp(ip string) (string, bool) {
-	return globalNodeInfo.getNodeName(ip)
+	return GlobalNodeInfo.getNodeName(ip)
+}
+
+func SetupCache(cache *K8sMetaDataCache, nodeMap *NodeMap, serviceMap *ServiceMap, rsMap *ReplicaSetMap) {
+	if cache != nil {
+		if cache.ContainerIdInfo != nil {
+			for _, containersInfo := range cache.ContainerIdInfo {
+				GlobalPodInfo.add(containersInfo.RefPodInfo)
+			}
+			MetaDataCache.ContainerIdInfo = cache.ContainerIdInfo
+		}
+		if cache.HostPortInfo != nil {
+			MetaDataCache.HostPortInfo = cache.HostPortInfo
+		}
+		if cache.IpContainerInfo != nil {
+			MetaDataCache.IpContainerInfo = cache.IpContainerInfo
+		}
+		if cache.IpServiceInfo != nil {
+			MetaDataCache.IpServiceInfo = cache.IpServiceInfo
+		}
+	}
+	if nodeMap != nil {
+		GlobalNodeInfo = nodeMap
+	}
+	if serviceMap != nil {
+		GlobalServiceInfo = serviceMap
+	}
+	if rsMap != nil {
+		GlobalRsInfo = rsMap
+	}
 }
diff --git a/collector/pkg/metadata/kubernetes/k8scache_test.go
b/collector/pkg/metadata/kubernetes/k8scache_test.go index 178d7d49a..9fa7b68a7 100644 --- a/collector/pkg/metadata/kubernetes/k8scache_test.go +++ b/collector/pkg/metadata/kubernetes/k8scache_test.go @@ -24,7 +24,7 @@ func TestK8sMetaDataCache_AddPodByIpPort(t *testing.T) { portContainerInfo := make(map[uint32]*K8sContainerInfo) portContainerInfo[port] = containerInfo cacheManual := New() - cacheManual.ipContainerInfo[containerInfo.RefPodInfo.Ip] = portContainerInfo + cacheManual.IpContainerInfo[containerInfo.RefPodInfo.Ip] = portContainerInfo cacheFunc := New() cacheFunc.AddContainerByIpPort(containerInfo.RefPodInfo.Ip, port, containerInfo) @@ -81,12 +81,12 @@ func TestK8sMetaDataCache_DeleteServiceByIpPort(t *testing.T) { Selector: nil, } MetaDataCache.AddServiceByIpPort("192.168.2.1", 80, serviceInfo) - if len(MetaDataCache.ipServiceInfo) != 1 && - len(MetaDataCache.ipServiceInfo["192.168.2.1"]) != 1 { + if len(MetaDataCache.IpServiceInfo) != 1 && + len(MetaDataCache.IpServiceInfo["192.168.2.1"]) != 1 { t.Fatalf("no service was added") } MetaDataCache.DeleteServiceByIpPort("192.168.2.1", 80) - if len(MetaDataCache.ipServiceInfo) != 0 { + if len(MetaDataCache.IpServiceInfo) != 0 { t.Fatalf("cache is not empty after deleting service") } } diff --git a/collector/pkg/metadata/kubernetes/node_watch.go b/collector/pkg/metadata/kubernetes/node_watch.go index f5d7b0456..4143a6204 100644 --- a/collector/pkg/metadata/kubernetes/node_watch.go +++ b/collector/pkg/metadata/kubernetes/node_watch.go @@ -17,18 +17,18 @@ type NodeInfo struct { Labels map[string]string } -type nodeMap struct { +type NodeMap struct { Info map[string]*NodeInfo mutex sync.RWMutex } -func newNodeMap() *nodeMap { - return &nodeMap{ +func newNodeMap() *NodeMap { + return &NodeMap{ Info: make(map[string]*NodeInfo), } } -func (n *nodeMap) add(info *NodeInfo) { +func (n *NodeMap) add(info *NodeInfo) { if info == nil { return } @@ -37,7 +37,7 @@ func (n *nodeMap) add(info *NodeInfo) { n.mutex.Unlock() } -func (n *nodeMap) getNodeName(ip string) (string, bool) { +func (n *NodeMap) getNodeName(ip string) (string, bool) { n.mutex.RLock() ret, ok := n.Info[ip] n.mutex.RUnlock() @@ -47,7 +47,7 @@ func (n *nodeMap) getNodeName(ip string) (string, bool) { return ret.Name, true } -func (n *nodeMap) getAllNodeAddresses() []string { +func (n *NodeMap) getAllNodeAddresses() []string { ret := make([]string, 0) n.mutex.RLock() for address := range n.Info { @@ -57,15 +57,15 @@ func (n *nodeMap) getAllNodeAddresses() []string { return ret } -func (n *nodeMap) delete(name string) { +func (n *NodeMap) delete(name string) { n.mutex.Lock() delete(n.Info, name) n.mutex.Unlock() } -var globalNodeInfo = newNodeMap() +var GlobalNodeInfo = newNodeMap() -func NodeWatch(clientSet *kubernetes.Clientset) { +func NodeWatch(clientSet *kubernetes.Clientset, handler cache.ResourceEventHandler) { stopper := make(chan struct{}) defer close(stopper) @@ -81,11 +81,16 @@ func NodeWatch(clientSet *kubernetes.Clientset) { return } - informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: AddNode, - UpdateFunc: UpdateNode, - DeleteFunc: DeleteNode, - }) + if handler != nil { + informer.AddEventHandler(handler) + } else { + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: AddNode, + UpdateFunc: UpdateNode, + DeleteFunc: DeleteNode, + }) + } + // TODO: use workqueue to avoid blocking <-stopper } @@ -103,7 +108,7 @@ func AddNode(obj interface{}) { nI.Ip = nodeAddress.Address } } - globalNodeInfo.add(nI) + GlobalNodeInfo.add(nI) } func 
UpdateNode(objOld interface{}, objNew interface{}) { @@ -125,5 +130,5 @@ func DeleteNode(obj interface{}) { return } } - globalNodeInfo.delete(node.Name) + GlobalNodeInfo.delete(node.Name) } diff --git a/collector/pkg/metadata/kubernetes/pod_delete.go b/collector/pkg/metadata/kubernetes/pod_delete.go index 3887038cf..cfc96a39c 100644 --- a/collector/pkg/metadata/kubernetes/pod_delete.go +++ b/collector/pkg/metadata/kubernetes/pod_delete.go @@ -62,7 +62,7 @@ func podDeleteLoop(interval time.Duration, gracePeriod time.Duration, stopCh cha func deletePodInfo(podInfo *deletedPodInfo) { if podInfo.name != "" { - deletePodInfo, ok := globalPodInfo.delete(podInfo.namespace, podInfo.name) + deletePodInfo, ok := GlobalPodInfo.delete(podInfo.namespace, podInfo.name) if ok { localWorkloadMap.delete(deletePodInfo.Namespace, deletePodInfo.WorkloadName) } diff --git a/collector/pkg/metadata/kubernetes/pod_delete_test.go b/collector/pkg/metadata/kubernetes/pod_delete_test.go index 855725918..2c480bd07 100644 --- a/collector/pkg/metadata/kubernetes/pod_delete_test.go +++ b/collector/pkg/metadata/kubernetes/pod_delete_test.go @@ -7,13 +7,13 @@ import ( func TestDeleteLoop(t *testing.T) { pod := CreatePod(true) - onAdd(pod) + AddPod(pod) verifyIfPodExist(true, t) if len(podDeleteQueue) != 0 { t.Fatalf("PodDeleteQueue should be 0, but is %d", len(podDeleteQueue)) } - onDelete(pod) + DeletePod(pod) verifyIfPodExist(true, t) if len(podDeleteQueue) != 1 { t.Fatalf("PodDeleteQueue should be 1, but is %d", len(podDeleteQueue)) @@ -44,7 +44,7 @@ func TestDeleteLoop(t *testing.T) { } func verifyIfPodExist(exist bool, t *testing.T) { - _, ok := globalPodInfo.get("CustomNamespace", "deploy-1a2b3c4d-5e6f7") + _, ok := GlobalPodInfo.get("CustomNamespace", "deploy-1a2b3c4d-5e6f7") if ok != exist { t.Fatalf("Finding pod at globalPodInfo. Expect %v, but get %v", false, ok) } diff --git a/collector/pkg/metadata/kubernetes/pod_watch.go b/collector/pkg/metadata/kubernetes/pod_watch.go index bdb3d5e99..101e526ee 100644 --- a/collector/pkg/metadata/kubernetes/pod_watch.go +++ b/collector/pkg/metadata/kubernetes/pod_watch.go @@ -42,7 +42,7 @@ type workloadInfo struct { WorkloadKind string } -var globalPodInfo = newPodMap() +var GlobalPodInfo = newPodMap() // localWorkloadMap only stores the workload whose pods are in the local Node. // The workload metadata will be sent to prometheus and used to filter metrics. 
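// ---------------------------------------------------------------------------
// NOTE (illustrative sketch, not part of the change): with AddPod/UpdatePod/
// DeletePod exported below and the With*EventHander options added in
// kubernetes/config.go, a caller can wrap the default bookkeeping and inject
// its own handler into the watchers. The wrapper here (plain logging) and the
// standalone main() are hypothetical; the metadata-provider's real handlers
// additionally broadcast every event to connected agents.
// ---------------------------------------------------------------------------
package main

import (
	"log"

	"k8s.io/client-go/tools/cache"

	"github.com/Kindling-project/kindling/collector/pkg/metadata/kubernetes"
)

func main() {
	podHandler := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			log.Println("pod event received") // custom side effect before the default cache update
			kubernetes.AddPod(obj)
		},
		UpdateFunc: kubernetes.UpdatePod,
		DeleteFunc: kubernetes.DeletePod,
	}
	// Assumes the package defaults (auth type, kubeconfig path) suit the environment.
	if err := kubernetes.InitK8sHandler(kubernetes.WithPodEventHander(podHandler)); err != nil {
		log.Fatalf("init k8s handler: %v", err)
	}
	select {} // keep the informers running
}
// ---------------------------------------------------------------------------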
@@ -164,7 +164,7 @@ func (m *podMap) getPodsMatchSelectors(namespace string, selectors map[string]st return retPodInfoSlice } -func PodWatch(clientSet *kubernetes.Clientset, graceDeletePeriod time.Duration) { +func PodWatch(clientSet *kubernetes.Clientset, graceDeletePeriod time.Duration, handler cache.ResourceEventHandler) { stopper := make(chan struct{}) defer close(stopper) @@ -180,17 +180,23 @@ func PodWatch(clientSet *kubernetes.Clientset, graceDeletePeriod time.Duration) runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync")) return } + go podDeleteLoop(10*time.Second, graceDeletePeriod, stopper) - informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: onAdd, - UpdateFunc: OnUpdate, - DeleteFunc: onDelete, - }) + + if handler == nil { + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: AddPod, + UpdateFunc: UpdatePod, + DeleteFunc: DeletePod, + }) + } else { + informer.AddEventHandler(handler) + } // TODO: use workqueue to avoid blocking <-stopper } -func onAdd(obj interface{}) { +func AddPod(obj interface{}) { pod := obj.(*corev1.Pod) // Find the controller workload of the pod @@ -199,7 +205,7 @@ func onAdd(obj interface{}) { rsUpdateMutex.RUnlock() // Find one of the services of the pod - serviceInfoSlice := globalServiceInfo.GetServiceMatchLabels(pod.Namespace, pod.Labels) + serviceInfoSlice := GlobalServiceInfo.GetServiceMatchLabels(pod.Namespace, pod.Labels) var serviceInfo *K8sServiceInfo if len(serviceInfoSlice) == 0 { serviceInfo = nil @@ -281,7 +287,7 @@ func onAdd(obj interface{}) { } } } - globalPodInfo.add(cachePodInfo) + GlobalPodInfo.add(cachePodInfo) nodeName, _ := os.LookupEnv("MY_NODE_NAME") //workloadMap only restore the workload in this machine if cachePodInfo.NodeName == nodeName { @@ -303,7 +309,7 @@ func getControllerKindName(pod *corev1.Pod) (workloadKind string, workloadName s // The owner of Pod is ReplicaSet, and it is Workload such as Deployment for ReplicaSet. // Therefore, find ReplicaSet's name in 'globalRsInfo' to find which kind of workload // the Pod belongs to. - if workload, ok := globalRsInfo.GetOwnerReference(mapKey(pod.Namespace, owner.Name)); ok { + if workload, ok := GlobalRsInfo.GetOwnerReference(mapKey(pod.Namespace, owner.Name)); ok { workloadKind = CompleteGVK(workload.APIVersion, strings.ToLower(workload.Kind)) workloadName = workload.Name } else { @@ -331,7 +337,7 @@ func extractDeploymentName(replicaSetName string) string { return "" } -func OnUpdate(objOld interface{}, objNew interface{}) { +func UpdatePod(objOld interface{}, objNew interface{}) { oldPod := objOld.(*corev1.Pod) newPod := objNew.(*corev1.Pod) if oldPod.ResourceVersion == newPod.ResourceVersion { @@ -340,13 +346,13 @@ func OnUpdate(objOld interface{}, objNew interface{}) { return } - oldCachePod, ok := globalPodInfo.get(oldPod.Namespace, oldPod.Name) + oldCachePod, ok := GlobalPodInfo.get(oldPod.Namespace, oldPod.Name) if !ok { - onAdd(objNew) + AddPod(objNew) return } // Always override the old pod in the cache - onAdd(objNew) + AddPod(objNew) // Delay delete the pod using the difference between the old pod and the new one deletedPodInfo := &deletedPodInfo{ @@ -419,7 +425,7 @@ func OnUpdate(objOld interface{}, objNew interface{}) { podDeleteQueueMut.Unlock() } -func onDelete(obj interface{}) { +func DeletePod(obj interface{}) { // Maybe get DeletedFinalStateUnknown instead of *corev1.Pod. 
// Fix https://github.com/KindlingProject/kindling/issues/445 pod, ok := obj.(*corev1.Pod) diff --git a/collector/pkg/metadata/kubernetes/pod_watch_test.go b/collector/pkg/metadata/kubernetes/pod_watch_test.go index 269646c61..31bdc133b 100644 --- a/collector/pkg/metadata/kubernetes/pod_watch_test.go +++ b/collector/pkg/metadata/kubernetes/pod_watch_test.go @@ -32,37 +32,37 @@ func TestTruncateContainerId(t *testing.T) { } func TestOnAdd(t *testing.T) { - globalPodInfo = &podMap{ + GlobalPodInfo = &podMap{ Info: make(map[string]map[string]*K8sPodInfo), } - globalServiceInfo = &ServiceMap{ + GlobalServiceInfo = &ServiceMap{ ServiceMap: make(map[string]map[string]*K8sServiceInfo), } - globalRsInfo = &ReplicaSetMap{ + GlobalRsInfo = &ReplicaSetMap{ Info: make(map[string]Controller), } // First add service, and then add pod - onAddService(CreateService()) - onAddReplicaSet(CreateReplicaSet()) - onAdd(CreatePod(true)) + AddService(CreateService()) + AddReplicaSet(CreateReplicaSet()) + AddPod(CreatePod(true)) t.Log(MetaDataCache) // Delete podInfo must not affect serviceMap - onDelete(CreatePod(true)) + DeletePod(CreatePod(true)) t.Log(MetaDataCache) // Empty all the metadata - onDeleteService(CreateService()) + DeleteService(CreateService()) t.Log(MetaDataCache) } // ISSUE https://github.com/KindlingProject/kindling/issues/229 func TestOnAddPodWhileReplicaSetUpdating(t *testing.T) { - globalPodInfo = &podMap{ + GlobalPodInfo = &podMap{ Info: make(map[string]map[string]*K8sPodInfo), } - globalServiceInfo = &ServiceMap{ + GlobalServiceInfo = &ServiceMap{ ServiceMap: make(map[string]map[string]*K8sServiceInfo), } - globalRsInfo = &ReplicaSetMap{ + GlobalRsInfo = &ReplicaSetMap{ Info: make(map[string]Controller), } // Firstly deployment created and add old RS and old POD @@ -77,19 +77,19 @@ func TestOnAddPodWhileReplicaSetUpdating(t *testing.T) { newPOD := CreatePod(true) newPOD.SetResourceVersion("new") newPOD.OwnerReferences[0].Controller = &controller - onAddReplicaSet(oldRs) - onAdd(oldPOD) + AddReplicaSet(oldRs) + AddPod(oldPOD) // Secondly POD&RS were been updated go func() { for i := 0; i < 1000; i++ { - OnUpdateReplicaSet(oldRs, newRs) + UpdateReplicaSet(oldRs, newRs) } }() for i := 0; i < 100; i++ { - OnUpdate(oldPOD, newPOD) + UpdatePod(oldPOD, newPOD) // Thirdly check the pod's workload_kind pod, ok := MetaDataCache.GetPodByContainerId(TruncateContainerId(newPOD.Status.ContainerStatuses[0].ContainerID)) require.True(t, ok, "failed to get target POD") @@ -98,19 +98,19 @@ func TestOnAddPodWhileReplicaSetUpdating(t *testing.T) { } func TestOnAddLowercaseWorkload(t *testing.T) { - globalPodInfo = &podMap{ + GlobalPodInfo = &podMap{ Info: make(map[string]map[string]*K8sPodInfo), } - globalServiceInfo = &ServiceMap{ + GlobalServiceInfo = &ServiceMap{ ServiceMap: make(map[string]map[string]*K8sServiceInfo), } - globalRsInfo = &ReplicaSetMap{ + GlobalRsInfo = &ReplicaSetMap{ Info: make(map[string]Controller), } higherCase := "DaemonSet" lowerCase := "daemonset" isController := true - onAdd(&corev1.Pod{ + AddPod(&corev1.Pod{ TypeMeta: metav1.TypeMeta{}, ObjectMeta: metav1.ObjectMeta{ OwnerReferences: []metav1.OwnerReference{{ @@ -212,14 +212,14 @@ func TestUpdateAndDelayDelete(t *testing.T) { } podIp := addObj.Status.PodIP port := addObj.Spec.Containers[0].Ports[0].ContainerPort - onAdd(addObj) + AddPod(addObj) _, ok := MetaDataCache.GetContainerByIpPort(podIp, uint32(port)) if !ok { t.Fatalf("Not found container [%s:%d]", podIp, port) } stopCh := make(chan struct{}) go 
podDeleteLoop(100*time.Millisecond, 500*time.Millisecond, stopCh) - OnUpdate(addObj, updateObj) + UpdatePod(addObj, updateObj) // Check if the new container can be found assertFindPod(t, updateObj) @@ -247,10 +247,10 @@ func TestUpdateAndDelayDeleteWhenOnlyPodIpChanged(t *testing.T) { updateObj := new(corev1.Pod) _ = json.Unmarshal([]byte(updateObjJson), updateObj) - onAdd(addObj) + AddPod(addObj) stopCh := make(chan struct{}) go podDeleteLoop(100*time.Millisecond, 500*time.Millisecond, stopCh) - OnUpdate(addObj, updateObj) + UpdatePod(addObj, updateObj) // Check if the new container can be found assertFindPod(t, updateObj) @@ -277,10 +277,10 @@ func TestUpdateAndDelayDeleteWhenOnlyPortChanged(t *testing.T) { updateObj := new(corev1.Pod) _ = json.Unmarshal([]byte(updateObjJson), updateObj) - onAdd(addObj) + AddPod(addObj) stopCh := make(chan struct{}) go podDeleteLoop(100*time.Millisecond, 500*time.Millisecond, stopCh) - OnUpdate(addObj, updateObj) + UpdatePod(addObj, updateObj) // Check if new container can be found assertFindPod(t, updateObj) @@ -318,12 +318,12 @@ func TestDelayDeleteThenAddWithSameIP(t *testing.T) { stopCh := make(chan struct{}) go podDeleteLoop(20*time.Millisecond, 100*time.Millisecond, stopCh) - onAdd(addObj) + AddPod(addObj) // Check if the container can be found assertFindPod(t, addObj) - onDelete(deletedObj) - onAdd(newAddObj) + DeletePod(deletedObj) + AddPod(newAddObj) time.Sleep(200 * time.Millisecond) // Check if the new container can be found diff --git a/collector/pkg/metadata/kubernetes/replicaset_watch.go b/collector/pkg/metadata/kubernetes/replicaset_watch.go index 152e73dc9..fc83175aa 100644 --- a/collector/pkg/metadata/kubernetes/replicaset_watch.go +++ b/collector/pkg/metadata/kubernetes/replicaset_watch.go @@ -24,7 +24,7 @@ type ReplicaSetMap struct { mut sync.RWMutex } -var globalRsInfo = newReplicaSetMap() +var GlobalRsInfo = newReplicaSetMap() var rsUpdateMutex sync.RWMutex type Controller struct { @@ -58,7 +58,7 @@ func (rs *ReplicaSetMap) deleteOwnerReference(key string) { rs.mut.Unlock() } -func RsWatch(clientSet *kubernetes.Clientset) { +func RsWatch(clientSet *kubernetes.Clientset, handler cache.ResourceEventHandler) { stopper := make(chan struct{}) defer close(stopper) @@ -67,11 +67,15 @@ func RsWatch(clientSet *kubernetes.Clientset) { informer := rsInformer.Informer() defer runtime.HandleCrash() - informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: onAddReplicaSet, - UpdateFunc: OnUpdateReplicaSet, - DeleteFunc: onDeleteReplicaSet, - }) + if handler != nil { + informer.AddEventHandler(handler) + } else { + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: AddReplicaSet, + UpdateFunc: UpdateReplicaSet, + DeleteFunc: DeleteReplicaSet, + }) + } go factory.Start(stopper) @@ -83,7 +87,7 @@ func RsWatch(clientSet *kubernetes.Clientset) { <-stopper } -func onAddReplicaSet(obj interface{}) { +func AddReplicaSet(obj interface{}) { rs := obj.(*appv1.ReplicaSet) ownerRef := metav1.GetControllerOfNoCopy(rs) if ownerRef == nil { @@ -94,10 +98,10 @@ func onAddReplicaSet(obj interface{}) { Kind: ownerRef.Kind, APIVersion: ownerRef.APIVersion, } - globalRsInfo.put(mapKey(rs.Namespace, rs.Name), controller) + GlobalRsInfo.put(mapKey(rs.Namespace, rs.Name), controller) } -func OnUpdateReplicaSet(objOld interface{}, objNew interface{}) { +func UpdateReplicaSet(objOld interface{}, objNew interface{}) { oldRs := objOld.(*appv1.ReplicaSet) newRs := objNew.(*appv1.ReplicaSet) if newRs.ResourceVersion == oldRs.ResourceVersion { @@ 
-105,12 +109,12 @@ func OnUpdateReplicaSet(objOld interface{}, objNew interface{}) { } rsUpdateMutex.Lock() // TODO: re-implement the updated logic to reduce computation - onDeleteReplicaSet(objOld) - onAddReplicaSet(objNew) + DeleteReplicaSet(objOld) + AddReplicaSet(objNew) rsUpdateMutex.Unlock() } -func onDeleteReplicaSet(obj interface{}) { +func DeleteReplicaSet(obj interface{}) { // Maybe get DeletedFinalStateUnknown instead of *corev1.Pod. // Fix https://github.com/KindlingProject/kindling/issues/445 rs, ok := obj.(*appv1.ReplicaSet) @@ -124,5 +128,5 @@ func onDeleteReplicaSet(obj interface{}) { return } } - globalRsInfo.deleteOwnerReference(mapKey(rs.Namespace, rs.Name)) + GlobalRsInfo.deleteOwnerReference(mapKey(rs.Namespace, rs.Name)) } diff --git a/collector/pkg/metadata/kubernetes/replicaset_watch_test.go b/collector/pkg/metadata/kubernetes/replicaset_watch_test.go index 5026d7f12..708ee8d8d 100644 --- a/collector/pkg/metadata/kubernetes/replicaset_watch_test.go +++ b/collector/pkg/metadata/kubernetes/replicaset_watch_test.go @@ -1,19 +1,22 @@ package kubernetes -import "testing" -import appv1 "k8s.io/api/apps/v1" -import apimachinery "k8s.io/apimachinery/pkg/apis/meta/v1" +import ( + "testing" + + appv1 "k8s.io/api/apps/v1" + apimachinery "k8s.io/apimachinery/pkg/apis/meta/v1" +) func InitGlobalRsInfo() { - globalRsInfo = &ReplicaSetMap{ + GlobalRsInfo = &ReplicaSetMap{ Info: make(map[string]Controller), } } func TestOnAddReplicaSet(t *testing.T) { InitGlobalRsInfo() - onAddReplicaSet(CreateReplicaSet()) - owner, ok := globalRsInfo.GetOwnerReference(mapKey("CustomNamespace", "deploy-1a2b3c4d")) + AddReplicaSet(CreateReplicaSet()) + owner, ok := GlobalRsInfo.GetOwnerReference(mapKey("CustomNamespace", "deploy-1a2b3c4d")) if !ok || owner.Kind != "Deployment" || owner.APIVersion != "apps/v1" { t.Errorf("Error") } diff --git a/collector/pkg/metadata/kubernetes/service_watch.go b/collector/pkg/metadata/kubernetes/service_watch.go index 5a232d7e4..a87edd2b1 100644 --- a/collector/pkg/metadata/kubernetes/service_watch.go +++ b/collector/pkg/metadata/kubernetes/service_watch.go @@ -29,7 +29,7 @@ type ServiceMap struct { mut sync.RWMutex } -var globalServiceInfo = newServiceMap() +var GlobalServiceInfo = newServiceMap() var serviceUpdatedMutex sync.Mutex func newServiceMap() *ServiceMap { @@ -89,7 +89,7 @@ func (s *ServiceMap) delete(namespace string, serviceName string) { s.mut.Unlock() } -func ServiceWatch(clientSet *kubernetes.Clientset) { +func ServiceWatch(clientSet *kubernetes.Clientset, handler cache.ResourceEventHandler) { stopper := make(chan struct{}) defer close(stopper) @@ -105,16 +105,20 @@ func ServiceWatch(clientSet *kubernetes.Clientset) { return } - informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: onAddService, - UpdateFunc: OnUpdateService, - DeleteFunc: onDeleteService, - }) + if handler != nil { + informer.AddEventHandler(handler) + } else { + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: AddService, + UpdateFunc: UpdateService, + DeleteFunc: DeleteService, + }) + } // TODO: use workqueue to avoid blocking <-stopper } -func onAddService(obj interface{}) { +func AddService(obj interface{}) { service := obj.(*corev1.Service) sI := &K8sServiceInfo{ Ip: service.Spec.ClusterIP, @@ -123,9 +127,9 @@ func onAddService(obj interface{}) { isNodePort: service.Spec.Type == "NodePort", Selector: service.Spec.Selector, } - globalServiceInfo.add(sI) + GlobalServiceInfo.add(sI) // When new service is added, podInfo should be updated - 
podInfoSlice := globalPodInfo.getPodsMatchSelectors(sI.Namespace, sI.Selector) + podInfoSlice := GlobalPodInfo.getPodsMatchSelectors(sI.Namespace, sI.Selector) for _, podInfo := range podInfoSlice { for _, containerId := range podInfo.ContainerIds { if podInfo, ok := MetaDataCache.GetPodByContainerId(containerId); ok { @@ -150,7 +154,7 @@ func onAddService(obj interface{}) { for _, port := range service.Spec.Ports { MetaDataCache.AddServiceByIpPort(service.Spec.ClusterIP, uint32(port.Port), sI) if sI.isNodePort { - nodeAddresses := globalNodeInfo.getAllNodeAddresses() + nodeAddresses := GlobalNodeInfo.getAllNodeAddresses() for _, nodeAddress := range nodeAddresses { MetaDataCache.AddServiceByIpPort(nodeAddress, uint32(port.NodePort), sI) } @@ -158,7 +162,7 @@ func onAddService(obj interface{}) { } } -func OnUpdateService(objOld interface{}, objNew interface{}) { +func UpdateService(objOld interface{}, objNew interface{}) { oldSvc := objOld.(*corev1.Service) newSvc := objNew.(*corev1.Service) if oldSvc.ResourceVersion == newSvc.ResourceVersion { @@ -166,12 +170,12 @@ func OnUpdateService(objOld interface{}, objNew interface{}) { } serviceUpdatedMutex.Lock() // TODO: re-implement the updated logic to reduce computation - onDeleteService(objOld) - onAddService(objNew) + DeleteService(objOld) + AddService(objNew) serviceUpdatedMutex.Unlock() } -func onDeleteService(obj interface{}) { +func DeleteService(obj interface{}) { // Maybe get DeletedFinalStateUnknown instead of *corev1.Pod. // Fix https://github.com/KindlingProject/kindling/issues/445 service, ok := obj.(*corev1.Service) @@ -186,7 +190,7 @@ func onDeleteService(obj interface{}) { } } // 'delete' will delete all such service in MetaDataCache - globalServiceInfo.delete(service.Namespace, service.Name) + GlobalServiceInfo.delete(service.Namespace, service.Name) ip := service.Spec.ClusterIP if ip == "" || ip == "None" { return @@ -194,7 +198,7 @@ func onDeleteService(obj interface{}) { for _, port := range service.Spec.Ports { MetaDataCache.DeleteServiceByIpPort(ip, uint32(port.Port)) if service.Spec.Type == "NodePort" { - nodeAddresses := globalNodeInfo.getAllNodeAddresses() + nodeAddresses := GlobalNodeInfo.getAllNodeAddresses() for _, nodeAddress := range nodeAddresses { MetaDataCache.DeleteServiceByIpPort(nodeAddress, uint32(port.NodePort)) } diff --git a/collector/pkg/metadata/kubernetes/service_watch_test.go b/collector/pkg/metadata/kubernetes/service_watch_test.go index cc588839a..7e23f2c14 100644 --- a/collector/pkg/metadata/kubernetes/service_watch_test.go +++ b/collector/pkg/metadata/kubernetes/service_watch_test.go @@ -116,40 +116,40 @@ func TestServiceMap_GetServiceMatchLabels(t *testing.T) { } func TestOnAddService(t *testing.T) { - globalPodInfo = &podMap{ + GlobalPodInfo = &podMap{ Info: make(map[string]map[string]*K8sPodInfo), } - globalServiceInfo = &ServiceMap{ + GlobalServiceInfo = &ServiceMap{ ServiceMap: make(map[string]map[string]*K8sServiceInfo), } - globalRsInfo = &ReplicaSetMap{ + GlobalRsInfo = &ReplicaSetMap{ Info: make(map[string]Controller), } // First add pod, and then add service - onAddReplicaSet(CreateReplicaSet()) - onAdd(CreatePod(true)) - onAddService(CreateService()) + AddReplicaSet(CreateReplicaSet()) + AddPod(CreatePod(true)) + AddService(CreateService()) t.Log(MetaDataCache) // Delete service must empty the serviceInfo referenced by podInfo - onDeleteService(CreateService()) + DeleteService(CreateService()) t.Log(MetaDataCache) // Empty all the metadata - onDelete(CreatePod(true)) + 
DeletePod(CreatePod(true)) t.Log(MetaDataCache) } func TestServiceMap_Delete(t *testing.T) { - globalPodInfo = &podMap{ + GlobalPodInfo = &podMap{ Info: make(map[string]map[string]*K8sPodInfo), } - globalServiceInfo = &ServiceMap{ + GlobalServiceInfo = &ServiceMap{ ServiceMap: make(map[string]map[string]*K8sServiceInfo), } - globalRsInfo = &ReplicaSetMap{ + GlobalRsInfo = &ReplicaSetMap{ Info: make(map[string]Controller), } - onAddService(CreateService()) - onAdd(CreatePod(true)) + AddService(CreateService()) + AddPod(CreatePod(true)) containerId := "1a2b3c4d5e6f" podFromCache, ok := MetaDataCache.GetPodByContainerId(containerId) if !ok { @@ -169,7 +169,7 @@ func TestServiceMap_Delete(t *testing.T) { if !reflect.DeepEqual(serviceFromCache, expectedService) { t.Errorf("Before delete method is invoked %v is expected, but get %v", expectedService, serviceFromCache) } - globalServiceInfo.delete("CustomNamespace", "CustomService") + GlobalServiceInfo.delete("CustomNamespace", "CustomService") expectedService = &K8sServiceInfo{ Ip: "", ServiceName: "", diff --git a/collector/pkg/metadata/metaprovider/api/api.go b/collector/pkg/metadata/metaprovider/api/api.go new file mode 100644 index 000000000..daa16b12c --- /dev/null +++ b/collector/pkg/metadata/metaprovider/api/api.go @@ -0,0 +1,120 @@ +package api + +import ( + "encoding/json" + "fmt" + "strings" + + appv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + + "github.com/Kindling-project/kindling/collector/pkg/metadata/kubernetes" +) + +type AddObj func(obj interface{}) + +type UpdateObj func(oldObj interface{}, newObj interface{}) + +type DeleteObj func(obj interface{}) + +type Operation string + +const ( + Add Operation = "add" + Update Operation = "update" + Delete Operation = "delete" +) + +type MetaDataService interface { + ListAndWatch() +} + +type MetaDataRsyncResponse struct { + Type string // pod,rs,node,service + Operation string // add,update,delete + NewObj interface{} + OldObj interface{} +} + +type MetaDataVO struct { + Type string // pod,rs,node,service + Operation string // add,update,delete + NewObj json.RawMessage + OldObj json.RawMessage +} + +type ListVO struct { + Cache *kubernetes.K8sMetaDataCache + GlobalNodeInfo *kubernetes.NodeMap + GlobalRsInfo *kubernetes.ReplicaSetMap + GlobalServiceInfo *kubernetes.ServiceMap +} + +func MetaDataVO2String(resp *MetaDataVO) string { + var str strings.Builder + str.WriteString(fmt.Sprintf("Operation: [%s], ResType: [%s]", resp.Operation, resp.Type)) + + switch resp.Type { + case "pod": + if resp.NewObj != nil { + obj := corev1.Pod{} + err := json.Unmarshal(resp.NewObj, &obj) + if err == nil { + str.WriteString(fmt.Sprintf(", newObj: [%s/%s]", obj.Namespace, obj.Name)) + } + } + if resp.OldObj != nil { + obj := corev1.Pod{} + err := json.Unmarshal(resp.OldObj, &obj) + if err == nil { + str.WriteString(fmt.Sprintf(", oldObj: [%s/%s]", obj.Namespace, obj.Name)) + } + } + case "rs": + if resp.NewObj != nil { + obj := appv1.ReplicaSet{} + err := json.Unmarshal(resp.NewObj, &obj) + if err == nil { + str.WriteString(fmt.Sprintf(", newObj: [%s/%s]", obj.Namespace, obj.Name)) + } + } + if resp.OldObj != nil { + obj := appv1.ReplicaSet{} + err := json.Unmarshal(resp.OldObj, &obj) + if err == nil { + str.WriteString(fmt.Sprintf(", oldObj: [%s/%s]", obj.Namespace, obj.Name)) + } + } + case "service": + if resp.NewObj != nil { + obj := corev1.Service{} + err := json.Unmarshal(resp.NewObj, &obj) + if err == nil { + str.WriteString(fmt.Sprintf(", newObj: [%s/%s]", obj.Namespace, obj.Name)) + } + 
 		}
+		if resp.OldObj != nil {
+			obj := corev1.Service{}
+			err := json.Unmarshal(resp.OldObj, &obj)
+			if err == nil {
+				str.WriteString(fmt.Sprintf(", oldObj: [%s/%s]", obj.Namespace, obj.Name))
+			}
+		}
+	case "node":
+		if resp.NewObj != nil {
+			obj := corev1.Node{}
+			err := json.Unmarshal(resp.NewObj, &obj)
+			if err == nil {
+				str.WriteString(fmt.Sprintf(", newObj: [%s]", obj.Name))
+			}
+		}
+		if resp.OldObj != nil {
+			obj := corev1.Node{}
+			err := json.Unmarshal(resp.OldObj, &obj)
+			if err == nil {
+				str.WriteString(fmt.Sprintf(", oldObj: [%s]", obj.Name))
+			}
+		}
+	}
+	return str.String()
+}
diff --git a/collector/pkg/metadata/metaprovider/client/client.go b/collector/pkg/metadata/metaprovider/client/client.go
new file mode 100644
index 000000000..e7d275576
--- /dev/null
+++ b/collector/pkg/metadata/metaprovider/client/client.go
@@ -0,0 +1,118 @@
+package metadataclient
+
+import (
+	"bufio"
+	"encoding/json"
+	"fmt"
+	"io"
+	"log"
+	"net"
+	"net/http"
+	"time"
+
+	"github.com/Kindling-project/kindling/collector/pkg/metadata/kubernetes"
+	"github.com/Kindling-project/kindling/collector/pkg/metadata/metaprovider/api"
+)
+
+type Client struct {
+	// tls wrap
+	cli      http.Client
+	endpoint string
+	debug    bool
+}
+
+func NewMetaDataWrapperClient(endpoint string, debug bool) *Client {
+	return &Client{
+		cli:      *createHTTPClient(),
+		debug:    debug,
+		endpoint: endpoint + "/listAndWatch",
+	}
+}
+
+func (c *Client) ListAndWatch(setup kubernetes.SetPreprocessingMetaDataCache) error {
+	// handler cache.ResourceEventHandler,
+	resp, err := c.cli.Get(c.endpoint)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode == 500 {
+		return fmt.Errorf("provider server is not ready yet, please wait")
+	}
+	reader := bufio.NewReaderSize(resp.Body, 1024*32)
+	b, _ := reader.ReadBytes('\n')
+	// var listData api.MetaData
+	// json.Unmarshal(b, &listData)
+	// cache := kubernetes.K8sMetaDataCache{}
+	listVO := api.ListVO{}
+	err = json.Unmarshal(b, &listVO)
+	if err != nil {
+		// this connection attempt failed; wait and retry
+		return err
+	}
+
+	if c.debug {
+		formatCache, _ := json.MarshalIndent(listVO.Cache, "", "\t")
+		log.Printf("K8sCache Init:%s\n", string(formatCache))
+	}
+
+	setup(listVO.Cache, listVO.GlobalNodeInfo, listVO.GlobalServiceInfo, listVO.GlobalRsInfo)
+	SetEnableTraceFromMPClient(c.debug)
+	for {
+		b, err := reader.ReadBytes('\n')
+		if err != nil {
+			if err == io.EOF {
+				log.Printf("remote server sent unexpected EOF, shutting down, err: %v", err)
+				break
+			} else {
+				log.Printf("received unexpected error during watch, err: %v", err)
+				break
+			}
+		}
+		var resp api.MetaDataVO
+		err = json.Unmarshal(b, &resp)
+		if err != nil {
+			log.Printf("parse response failed, err: %v", err)
+			continue
+		}
+		c.apply(&resp)
+		if kubernetes.ReWatch {
+			kubernetes.ReWatch = false
+			break
+		}
+	}
+	return nil
+}
+
+func (c *Client) apply(resp *api.MetaDataVO) {
+	var err error
+	switch resp.Type {
+	case "pod":
+		err = podUnwrapperHander.Apply(resp)
+	case "service":
+		err = serviceUnwrapperHander.Apply(resp)
+	case "rs":
+		err = relicaSetUnwrapperHander.Apply(resp)
+	case "node":
+		err = nodeUnwrapperHander.Apply(resp)
+	default:
+		// TODO Detail
+	}
+
+	if err != nil {
+		log.Panicf("ERROR: convert k8sMetadata failed, err: %v", err)
+	}
+}
+
+func createHTTPClient() *http.Client {
+	client := &http.Client{
+		Transport: &http.Transport{
+			Proxy: http.ProxyFromEnvironment,
+			DialContext: (&net.Dialer{
+				Timeout:   30 * time.Second,
+				KeepAlive: 30 * time.Second,
+			}).DialContext,
+		},
+	}
+	return client
+}
diff --git
a/collector/pkg/metadata/metaprovider/client/client_test.go b/collector/pkg/metadata/metaprovider/client/client_test.go new file mode 100644 index 000000000..2a96c8b1d --- /dev/null +++ b/collector/pkg/metadata/metaprovider/client/client_test.go @@ -0,0 +1,31 @@ +package metadataclient + +import ( + "fmt" + "testing" + + "github.com/Kindling-project/kindling/collector/pkg/metadata/kubernetes" +) + +// func TestClients_ListAndWatch(t *testing.T) { +// for i := 0; i < 1000; i++ { +// go func() { +// cli := NewMetaDataWrapperClient("http://localhost:9504", true) +// err := cli.ListAndWatch(kubernetes.SetupCache) +// if err != nil { +// fmt.Println(err) +// } +// }() +// fmt.Printf("init: %d\n", i) +// } + +// select {} +// } + +func TestClient_ListAndWatch(t *testing.T) { + cli := NewMetaDataWrapperClient("http://localhost:9504", true) + err := cli.ListAndWatch(kubernetes.SetupCache) + if err != nil { + fmt.Println(err) + } +} diff --git a/collector/pkg/metadata/metaprovider/client/unwrapper_handler.go b/collector/pkg/metadata/metaprovider/client/unwrapper_handler.go new file mode 100644 index 000000000..6faa11ecb --- /dev/null +++ b/collector/pkg/metadata/metaprovider/client/unwrapper_handler.go @@ -0,0 +1,129 @@ +package metadataclient + +import ( + "encoding/json" + "fmt" + "log" + + "github.com/Kindling-project/kindling/collector/pkg/metadata/kubernetes" + "github.com/Kindling-project/kindling/collector/pkg/metadata/metaprovider/api" + appv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" +) + +type unwrapper func([]byte) (interface{}, error) + +// globalOption to show each message received from MP +var enableTrace bool + +func SetEnableTraceFromMPClient(enable bool) { + enableTrace = enable +} + +type UnwrapperHandler struct { + add api.AddObj + update api.UpdateObj + delete api.DeleteObj + + unwrapper +} + +var podUnwrapperHander = NewUnwrapperHander( + kubernetes.AddPod, + kubernetes.UpdatePod, + kubernetes.DeletePod, + func(b []byte) (interface{}, error) { + obj := corev1.Pod{} + err := json.Unmarshal(b, &obj) + return &obj, err + }, +) + +var serviceUnwrapperHander = NewUnwrapperHander( + kubernetes.AddService, + kubernetes.UpdateService, + kubernetes.DeleteService, + func(b []byte) (interface{}, error) { + obj := corev1.Service{} + err := json.Unmarshal(b, &obj) + return &obj, err + }, +) + +var nodeUnwrapperHander = NewUnwrapperHander( + kubernetes.AddNode, + kubernetes.UpdateNode, + kubernetes.DeleteNode, + func(b []byte) (interface{}, error) { + obj := corev1.Node{} + err := json.Unmarshal(b, &obj) + return &obj, err + }, +) + +var relicaSetUnwrapperHander = NewUnwrapperHander( + kubernetes.AddReplicaSet, + kubernetes.UpdateReplicaSet, + kubernetes.DeleteReplicaSet, + func(b []byte) (interface{}, error) { + obj := appv1.ReplicaSet{} + err := json.Unmarshal(b, &obj) + return &obj, err + }, +) + +func NewUnwrapperHander(add api.AddObj, update api.UpdateObj, delete api.DeleteObj, unwrapper unwrapper) UnwrapperHandler { + return UnwrapperHandler{ + add: add, + update: update, + delete: delete, + unwrapper: unwrapper, + } +} + +func (uw *UnwrapperHandler) Apply(resp *api.MetaDataVO) error { + if enableTrace { + log.Println(api.MetaDataVO2String(resp)) + } + switch resp.Operation { + case string(api.Add): + if resp.NewObj == nil { + return fmt.Errorf("operation [add] missing data [newObj]") + } + if obj, err := uw.unwrapper(resp.NewObj); err == nil { + uw.add(obj) + return nil + } else { + return err + } + case string(api.Update): + if resp.NewObj == nil { + return 
fmt.Errorf("operation [update] missing data [newObj]")
+        }
+        if resp.OldObj == nil {
+            return fmt.Errorf("operation [update] missing data [oldObj]")
+        }
+        oldObj, err := uw.unwrapper(resp.OldObj)
+        if err != nil {
+            return err
+        }
+        newObj, err := uw.unwrapper(resp.NewObj)
+        if err != nil {
+            return err
+        }
+        uw.update(oldObj, newObj)
+        return nil
+    case string(api.Delete):
+        if resp.OldObj == nil {
+            return fmt.Errorf("operation [delete] missing data [oldObj]")
+        }
+        if obj, err := uw.unwrapper(resp.OldObj); err == nil {
+            uw.delete(obj)
+            return nil
+        } else {
+            return err
+        }
+    default:
+        return fmt.Errorf("unexpected operation: %s", resp.Operation)
+    }
+}
diff --git a/collector/pkg/metadata/metaprovider/ioutil/writeflusher.go b/collector/pkg/metadata/metaprovider/ioutil/writeflusher.go
new file mode 100644
index 000000000..df3c39395
--- /dev/null
+++ b/collector/pkg/metadata/metaprovider/ioutil/writeflusher.go
@@ -0,0 +1,92 @@
+package ioutil
+
+import (
+    "io"
+    "sync"
+)
+
+// WriteFlusher wraps the Write and Flush operations, ensuring that every write
+// is a flush. In addition, the Close method can be called to intercept
+// Read/Write calls if the target's lifecycle has already ended.
+type WriteFlusher struct {
+    w           io.Writer
+    flusher     flusher
+    flushed     chan struct{}
+    flushedOnce sync.Once
+    closed      chan struct{}
+    closeLock   sync.Mutex
+}
+
+type flusher interface {
+    Flush()
+}
+
+var errWriteFlusherClosed = io.EOF
+
+func (wf *WriteFlusher) Write(b []byte) (n int, err error) {
+    select {
+    case <-wf.closed:
+        return 0, errWriteFlusherClosed
+    default:
+    }
+
+    n, err = wf.w.Write(b)
+    wf.Flush() // every write is a flush.
+    return n, err
+}
+
+// Flush the stream immediately.
+func (wf *WriteFlusher) Flush() {
+    select {
+    case <-wf.closed:
+        return
+    default:
+    }
+
+    wf.flushedOnce.Do(func() {
+        close(wf.flushed)
+    })
+    wf.flusher.Flush()
+}
+
+// Flushed reports whether the stream has been flushed: it returns true if it
+// has, false otherwise.
+func (wf *WriteFlusher) Flushed() bool {
+    // BUG(stevvooe): Remove this method. Its use is inherently racy. Seems to
+    // be used to detect whether or not a response code has been issued.
+    // Another hook should be used instead.
+    var flushed bool
+    select {
+    case <-wf.flushed:
+        flushed = true
+    default:
+    }
+    return flushed
+}
+
+// Close closes the write flusher, disallowing any further writes to the
+// target. After the flusher is closed, all calls to write or flush will
+// result in an error.
+func (wf *WriteFlusher) Close() error {
+    wf.closeLock.Lock()
+    defer wf.closeLock.Unlock()
+
+    select {
+    case <-wf.closed:
+        return errWriteFlusherClosed
+    default:
+        close(wf.closed)
+    }
+    return nil
+}
+
+// NewWriteFlusher returns a new WriteFlusher.
+func NewWriteFlusher(w io.Writer) *WriteFlusher {
+    var fl flusher
+    if f, ok := w.(flusher); ok {
+        fl = f
+    } else {
+        return nil
+    }
+    return &WriteFlusher{w: w, flusher: fl, closed: make(chan struct{}), flushed: make(chan struct{})}
+}
diff --git a/collector/pkg/metadata/metaprovider/service/config.go b/collector/pkg/metadata/metaprovider/service/config.go
new file mode 100644
index 000000000..aafbfda0c
--- /dev/null
+++ b/collector/pkg/metadata/metaprovider/service/config.go
@@ -0,0 +1,13 @@
+package service
+
+import "github.com/Kindling-project/kindling/collector/pkg/metadata/kubernetes"
+
+type Config struct {
+    KubeAuthType  kubernetes.AuthType
+    KubeConfigDir string
+    // EnableFetchReplicaSet controls whether to fetch ReplicaSet information.
+    // The default value is false.
+    // It should be enabled if a ReplicaSet is used to control pods via a third-party CRD other than Deployment.
+    EnableFetchReplicaSet bool
+    LogInterval           int
+}
diff --git a/collector/pkg/metadata/metaprovider/service/handler.go b/collector/pkg/metadata/metaprovider/service/handler.go
new file mode 100644
index 000000000..11c15d3fe
--- /dev/null
+++ b/collector/pkg/metadata/metaprovider/service/handler.go
@@ -0,0 +1,113 @@
+package service
+
+import (
+    "github.com/Kindling-project/kindling/collector/pkg/metadata/metaprovider/api"
+    corev1 "k8s.io/api/core/v1"
+    "k8s.io/client-go/tools/cache"
+)
+
+type boardcast func(data api.MetaDataRsyncResponse)
+
+type K8sResourceHandler struct {
+    resType string
+
+    add    api.AddObj
+    update api.UpdateObj
+    delete api.DeleteObj
+
+    boardcast
+}
+
+func (m *K8sResourceHandler) AddObj(obj interface{}) {
+    m.add(obj)
+    m.boardcast(api.MetaDataRsyncResponse{
+        Type:      m.resType,
+        Operation: string(api.Add),
+        NewObj:    obj,
+    })
+}
+
+func (m *K8sResourceHandler) UpdateObj(objOld interface{}, objNew interface{}) {
+    m.update(objOld, objNew)
+    m.boardcast(api.MetaDataRsyncResponse{
+        Type:      m.resType,
+        Operation: string(api.Update),
+        OldObj:    objOld,
+        NewObj:    objNew,
+    })
+}
+
+func (m *K8sResourceHandler) DeleteObj(obj interface{}) {
+    m.delete(obj)
+    m.boardcast(api.MetaDataRsyncResponse{
+        Type:      m.resType,
+        Operation: string(api.Delete),
+        OldObj:    obj,
+    })
+}
+
+type PodResourceHandler struct {
+    K8sResourceHandler
+}
+
+func (prh *PodResourceHandler) AddPod(obj interface{}) {
+    decreasePodInfo(obj)
+    prh.K8sResourceHandler.AddObj(obj)
+}
+
+func (prh *PodResourceHandler) UpdatePod(objOld interface{}, objNew interface{}) {
+    decreasePodInfo(objOld)
+    decreasePodInfo(objNew)
+    prh.K8sResourceHandler.UpdateObj(objOld, objNew)
+}
+
+func (prh *PodResourceHandler) DeleteObj(obj interface{}) {
+    decreasePodInfo(obj)
+    prh.K8sResourceHandler.DeleteObj(obj)
+}
+
+func NewHandler(typeName string, add api.AddObj, update api.UpdateObj, delete api.DeleteObj, boardcast boardcast) cache.ResourceEventHandlerFuncs {
+    handler := K8sResourceHandler{
+        resType:   typeName,
+        add:       add,
+        update:    update,
+        delete:    delete,
+        boardcast: boardcast,
+    }
+
+    if typeName == "pod" {
+        prh := PodResourceHandler{handler}
+
+        return cache.ResourceEventHandlerFuncs{
+            AddFunc:    prh.AddPod,
+            UpdateFunc: prh.UpdatePod,
+            DeleteFunc: prh.DeleteObj,
+        }
+    }
+
+    return cache.ResourceEventHandlerFuncs{
+        AddFunc:    handler.AddObj,
+        UpdateFunc: handler.UpdateObj,
+        DeleteFunc: handler.DeleteObj,
+    }
+}
+
+func decreasePodInfo(obj interface{}) {
+    pod, ok := obj.(*corev1.Pod)
+    if !ok {
+        deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
+        if !ok {
+            return
+        }
+        pod, ok = deletedState.Obj.(*corev1.Pod)
+        if !ok {
+            return
+        }
+    }
+
+    if pod != nil {
+        pod.ManagedFields = nil
+        pod.Spec.Volumes = nil
+        pod.Status.Conditions = nil
+    }
+}
diff --git a/collector/pkg/metadata/metaprovider/service/service.go b/collector/pkg/metadata/metaprovider/service/service.go
new file mode 100644
index 000000000..f173641b5
--- /dev/null
+++ b/collector/pkg/metadata/metaprovider/service/service.go
@@ -0,0 +1,283 @@
+package service
+
+import (
+    "context"
+    "encoding/json"
+    "errors"
+    "fmt"
+    "log"
+    "net"
+    "net/http"
+    "strings"
+    "sync"
+    "sync/atomic"
+    "time"
+
+    "golang.org/x/sync/semaphore"
+
+    "github.com/Kindling-project/kindling/collector/pkg/metadata/kubernetes"
+    "github.com/Kindling-project/kindling/collector/pkg/metadata/metaprovider/api"
+
"github.com/Kindling-project/kindling/collector/pkg/metadata/metaprovider/ioutil" +) + +var sem = semaphore.NewWeighted(int64(16)) + +type MetaDataWrapper struct { + flushersMap sync.Map + historyClientCount atomic.Int32 + historyDisconnectClientCount atomic.Int32 + // Signal + stopCh chan struct{} + + ReceivedPodsEventCount EventCount + ReceivedRsEventCount EventCount + ReceivedServiceEventCount EventCount + ReceivedNodesEventCount EventCount +} + +type EventCount struct { + Add atomic.Int64 + Delete atomic.Int64 + Update atomic.Int64 +} + +func (c *EventCount) AddEvent(operation string) { + switch operation { + case "add": + c.Add.Add(1) + case "update": + c.Update.Add(1) + case "delete": + c.Delete.Add(1) + } +} + +func (c *EventCount) String() string { + return fmt.Sprintf("[ Add: %d,Update: %d,Delete: %d ]", c.Add.Load(), c.Update.Load(), c.Delete.Load()) +} + +func NewMetaDataWrapper(config *Config) (*MetaDataWrapper, error) { + mp := &MetaDataWrapper{ + stopCh: make(chan struct{}), + } + + // DEBUG Watcher Size + if config.LogInterval > 0 { + go func() { + log.Printf("Log Status interval: %d(s)", config.LogInterval) + ticker := time.NewTicker(time.Duration(config.LogInterval) * time.Second) + for { + select { + case <-ticker.C: + log.Printf("Received Events: Pod: %s; ReplicaSet: %s; Node: %s; Service: %s\n", + mp.ReceivedPodsEventCount.String(), + mp.ReceivedRsEventCount.String(), + mp.ReceivedNodesEventCount.String(), + mp.ReceivedServiceEventCount.String(), + ) + + kubernetes.RLockMetadataCache() + log.Printf("Cached Resources Counts: Containers:%d, ReplicaSet: %d\n", + len(kubernetes.MetaDataCache.ContainerIdInfo), + len(kubernetes.GlobalRsInfo.Info), + ) // kubernetes.GlobalPodInfo.Info + kubernetes.RUnlockMetadataCache() + case <-mp.stopCh: + ticker.Stop() + return + } + } + }() + } + + var options []kubernetes.Option + options = append(options, kubernetes.WithAuthType(config.KubeAuthType)) + options = append(options, kubernetes.WithKubeConfigDir(config.KubeConfigDir)) + options = append(options, kubernetes.WithGraceDeletePeriod(0)) + options = append(options, kubernetes.WithFetchReplicaSet(config.EnableFetchReplicaSet)) + options = append(options, kubernetes.WithPodEventHander(NewHandler( + "pod", + kubernetes.AddPod, + kubernetes.UpdatePod, + kubernetes.DeletePod, + mp.broadcast, + ))) + options = append(options, kubernetes.WithReplicaSetEventHander(NewHandler( + "rs", + kubernetes.AddReplicaSet, + kubernetes.UpdateReplicaSet, + kubernetes.DeleteReplicaSet, + mp.broadcast, + ))) + options = append(options, kubernetes.WithNodeEventHander(NewHandler( + "node", + kubernetes.AddNode, + kubernetes.UpdateNode, + kubernetes.DeleteNode, + mp.broadcast, + ))) + options = append(options, kubernetes.WithServiceEventHander(NewHandler( + "service", + kubernetes.AddService, + kubernetes.UpdateService, + kubernetes.DeleteService, + mp.broadcast, + ))) + return mp, kubernetes.InitK8sHandler(options...) 
+}
+
+func (s *MetaDataWrapper) broadcast(data api.MetaDataRsyncResponse) {
+    switch data.Type {
+    case "pod":
+        s.ReceivedPodsEventCount.AddEvent(data.Operation)
+    case "rs":
+        s.ReceivedRsEventCount.AddEvent(data.Operation)
+    case "node":
+        s.ReceivedNodesEventCount.AddEvent(data.Operation)
+    case "service":
+        s.ReceivedServiceEventCount.AddEvent(data.Operation)
+    }
+    b, _ := json.Marshal(data)
+    idleMax := 10 * time.Second
+    idleTimeout := time.NewTimer(idleMax)
+    defer idleTimeout.Stop()
+    s.flushersMap.Range(func(_, flusher interface{}) bool {
+        idleTimeout.Reset(idleMax)
+        f := flusher.(*Watcher)
+        // If an event cannot be delivered to the target client before idleMax expires
+        // (each client buffers up to 10 events), the client is considered unavailable:
+        // close its write channel and remove it from the flusher map.
+        select {
+        case f.eventChannel <- append(b, '\n'):
+        case <-idleTimeout.C:
+            log.Printf("Event Flush Timeout, Disconnect client [%s] from server side!\n", f.IP)
+            f.Close()
+            s.RemoveFlusher(f)
+            return true
+        }
+        return true
+    })
+}
+
+func (s *MetaDataWrapper) list() ([]byte, error) {
+    resp := api.ListVO{
+        Cache:             kubernetes.MetaDataCache,
+        GlobalNodeInfo:    kubernetes.GlobalNodeInfo,
+        GlobalRsInfo:      kubernetes.GlobalRsInfo,
+        GlobalServiceInfo: kubernetes.GlobalServiceInfo,
+    }
+    return json.Marshal(resp)
+}
+
+func GetIPFromRequest(r *http.Request) (string, error) {
+    ip := r.Header.Get("X-Real-IP")
+    if net.ParseIP(ip) != nil {
+        return ip, nil
+    }
+
+    ip = r.Header.Get("X-Forwarded-For")
+    for _, i := range strings.Split(ip, ",") {
+        if net.ParseIP(i) != nil {
+            return i, nil
+        }
+    }
+
+    ip, _, err := net.SplitHostPort(r.RemoteAddr)
+    if err != nil {
+        return "", err
+    }
+
+    if net.ParseIP(ip) != nil {
+        return ip, nil
+    }
+
+    return "", errors.New("no valid ip found")
+}
+
+func (s *MetaDataWrapper) ListAndWatch(w http.ResponseWriter, r *http.Request) {
+    ctx := r.Context()
+    f := ioutil.NewWriteFlusher(w)
+    ip, err := GetIPFromRequest(r)
+    if err != nil {
+        ip = "unknown"
+    }
+    if watcher, err := s.ListWithSemaphore(ctx, f, ip); err != nil {
+        log.Printf("Failed to acquire semaphore: %v", err)
+        w.WriteHeader(http.StatusInternalServerError)
+        return
+    } else {
+        watcher.watch(ctx, s.stopCh)
+        s.RemoveFlusher(watcher)
+    }
+}
+
+func (s *MetaDataWrapper) ListWithSemaphore(ctx context.Context, f *ioutil.WriteFlusher, ip string) (*Watcher, error) {
+    if err := sem.Acquire(ctx, 1); err != nil {
+        return nil, err
+    }
+    defer sem.Release(1)
+    kubernetes.RLockMetadataCache()
+    defer kubernetes.RUnlockMetadataCache()
+    b, _ := s.list()
+    f.Write(append(b, '\n'))
+
+    w := &Watcher{
+        eventChannel: make(chan []byte, 10),
+        IP:           ip,
+        WriteFlusher: f,
+    }
+    s.AddFlusher(w)
+    addCount := s.historyClientCount.Load()
+    disconnectCount := s.historyDisconnectClientCount.Load()
+    log.Printf("Add Watcher: client is from IP: %s, current Watcher Size: %d", ip, addCount-disconnectCount)
+    return w, nil
+}
+
+type Watcher struct {
+    Id           int32
+    IP           string
+    eventChannel chan []byte
+    *ioutil.WriteFlusher
+
+    isClosed atomic.Bool
+}
+
+func (s *MetaDataWrapper) AddFlusher(w *Watcher) {
+    w.Id = s.historyClientCount.Add(1)
+    w.isClosed.Store(false)
+    s.flushersMap.Store(w.Id, w)
+}
+
+func (s *MetaDataWrapper) RemoveFlusher(w *Watcher) {
+    if w.isClosed.Swap(true) {
+        // If the old value was already true, the broadcast goroutine has already closed this watcher; return directly.
+        return
+    }
+    disconnectCount := s.historyDisconnectClientCount.Add(1)
+    addCount := s.historyClientCount.Load()
+    log.Printf("Remove Watcher: client from IP: %s is disconnected, current Watcher Size: %d", w.IP, addCount-disconnectCount)
+    s.flushersMap.Delete(w.Id)
+}
+
+func (w *Watcher) watch(ctx context.Context, stopCh <-chan struct{}) {
+    defer w.Close()
+    for {
+        select {
+        case data := <-w.eventChannel:
+            _, err := w.WriteFlusher.Write(data)
+            if err != nil {
+                log.Printf("remote connection closed during flush, Error: %v\n", err)
+                return
+            }
+        case <-ctx.Done():
+            // client disconnected normally
+            return
+        case <-stopCh:
+            // TODO clear eventChannel and return
+            return
+        }
+    }
+}
+
+func (s *MetaDataWrapper) Shutdown() {
+    // TODO Check channel
+    close(s.stopCh)
+}
diff --git a/collector/pkg/metadata/metaprovider/service/service_test.go b/collector/pkg/metadata/metaprovider/service/service_test.go
new file mode 100644
index 000000000..da29f44e5
--- /dev/null
+++ b/collector/pkg/metadata/metaprovider/service/service_test.go
@@ -0,0 +1,36 @@
+package service
+
+import (
+    "flag"
+    "fmt"
+    "log"
+    "net/http"
+    "testing"
+
+    "github.com/Kindling-project/kindling/collector/pkg/metadata/kubernetes"
+)
+
+func TestMetaDataWrapper_ListAndWatch(t *testing.T) {
+    authType := flag.String("authType", "kubeConfig", "AuthType describes the type of authentication to use for the K8s API, supports 'kubeConfig' or 'serviceAccount'.")
+    kubeConfigPath := flag.String("kubeConfig", "/root/.kube/config", "kubeConfig describes the file path to your kubeConfig, only used when authType is 'kubeConfig'")
+    httpPort := flag.Int("http-port", 9504, "port describes which port will be used to expose data")
+    enableFetchReplicaset := flag.Bool("enableFetchReplicaset", false, "controls whether to fetch ReplicaSet information. The default value is false. It should be enabled if a ReplicaSet is used to control pods via a third-party CRD other than Deployment.")
+    logInterval := flag.Int("logInterval", 10, "Interval (seconds) at which to log how many events mp received")
+
+    flag.Parse()
+
+    config := &Config{
+        KubeAuthType:          kubernetes.AuthType(*authType),
+        KubeConfigDir:         *kubeConfigPath,
+        EnableFetchReplicaSet: *enableFetchReplicaset,
+        LogInterval:           *logInterval,
+    }
+
+    if mdw, err := NewMetaDataWrapper(config); err != nil {
+        log.Fatalf("create MetaData Wrapper failed, err: %v", err)
+    } else {
+        http.HandleFunc("/listAndWatch", mdw.ListAndWatch)
+        log.Printf("[http] service start at port: %d", *httpPort)
+        log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", *httpPort), nil))
+    }
+}
diff --git a/deploy/metadata-provider/metadata-provider-deploy.yml b/deploy/metadata-provider/metadata-provider-deploy.yml
new file mode 100644
index 000000000..5f1b897b5
--- /dev/null
+++ b/deploy/metadata-provider/metadata-provider-deploy.yml
@@ -0,0 +1,53 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  labels:
+    k8s-app: metadata-provider
+  name: metadata-provider
+  namespace: kindling
+spec:
+  selector:
+    matchLabels:
+      k8s-app: metadata-provider
+  template:
+    metadata:
+      labels:
+        k8s-app: metadata-provider
+    spec:
+      serviceAccount: kindling-agent
+      containers:
+      - name: metadata-provider
+        image: kindlingproject/metadata-provider:v1.0
+        command: ["./metadata-provider"]
+        args:
+        - --http-port=9504
+        - --authType=serviceAccount
+        # - --kubeConfig=/root/.kube/config
+        imagePullPolicy: Always
+        securityContext:
+          privileged: true
+        resources:
+          limits:
+            memory: 1Gi
+          requests:
+            memory: 300Mi
+        ports:
+        - containerPort: 9504
+          protocol: TCP
+          name: http
+      restartPolicy: Always
+      terminationGracePeriodSeconds: 30
+
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: metadata-provider
+  namespace: kindling
+spec:
+  ports:
+  - port: 9504
+    protocol: TCP
+    targetPort: http
+  selector:
+    k8s-app: metadata-provider
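
For reference, a minimal standalone consumer of this service might look like the sketch below. It reuses NewMetaDataWrapperClient and kubernetes.SetupCache exactly as client_test.go does; the reconnect loop, the 5-second backoff, and the hard-coded endpoint (http://metadata-provider.kindling:9504, derived from the Service manifest above) are illustrative assumptions rather than part of this change.

package main

import (
    "log"
    "time"

    "github.com/Kindling-project/kindling/collector/pkg/metadata/kubernetes"
    metadataclient "github.com/Kindling-project/kindling/collector/pkg/metadata/metaprovider/client"
)

func main() {
    // Endpoint assumed from the Service manifest: <name>.<namespace>:<port>.
    cli := metadataclient.NewMetaDataWrapperClient("http://metadata-provider.kindling:9504", false)
    for {
        // ListAndWatch blocks: it applies the initial snapshot via kubernetes.SetupCache,
        // then streams add/update/delete events until the connection ends.
        if err := cli.ListAndWatch(kubernetes.SetupCache); err != nil {
            log.Printf("listAndWatch ended, retrying in 5s: %v", err)
        }
        time.Sleep(5 * time.Second) // illustrative backoff before reconnecting
    }
}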