Skip to content

Commit

Permalink
Add an option to control whether to fetch ReplicaSet metadata (#492)
Browse files Browse the repository at this point in the history
Signed-off-by: Daxin Wang <daxinwang@harmonycloud.cn>
  • Loading branch information
dxsup authored Mar 23, 2023
1 parent c44b0c0 commit d489fbe
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 15 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@
1. All notable changes to this project will be documented in this file.
2. Records in this file are not identical to the title of their Pull Requests. A detailed description is necessary for understanding what changes are and why they are made.

## Unreleased
### Enhancements
- Add an option `enable_fetch_replicaset` to control whether to fetch ReplicaSet metadata. The default value is false which aims to release pressure on Kubernetes API server. ([#492](https://github.com/KindlingProject/kindling/pull/492))

### Bug fixes


## v0.7.1 - 2023-03-01
### New features
- Support trace-profiling sampling to reduce data output. One trace is sampled every five seconds for each endpoint by default. ([#446](https://github.com/KindlingProject/kindling/pull/446)[#462](https://github.com/KindlingProject/kindling/pull/462))
Expand Down
7 changes: 7 additions & 0 deletions collector/docker/kindling-collector-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,14 @@ processors:
enable: true
kube_auth_type: kubeConfig
kube_config_dir: /root/.kube/config
# GraceDeletePeriod controls the delay interval after receiving delete event.
# The unit is seconds, and the default value is 60 seconds.
# Should not be lower than 30 seconds.
grace_delete_period: 60
# enable_fetch_replicaset controls whether to fetch ReplicaSet information.
# The default value is false. It should be enabled if the ReplicaSet
# is used to control pods in the third-party CRD except for Deployment.
enable_fetch_replicaset: false
aggregateprocessor:
# Aggregation duration window size. The unit is second.
ticker_interval: 5
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ type Config struct {
// The unit is seconds, and the default value is 60 seconds.
// Should not be lower than 30 seconds.
GraceDeletePeriod int `mapstructure:"grace_delete_period"`
// EnableFetchReplicaSet controls whether to fetch ReplicaSet information.
// The default value is false. It should be enabled if the ReplicaSet
// is used to control pods in the third-party CRD except for Deployment.
EnableFetchReplicaSet bool `mapstructure:"enable_fetch_replicaset"`
// Set "Enable" false if you want to run the agent in the non-Kubernetes environment.
// Otherwise, the agent will panic if it can't connect to the API-server.
Enable bool `mapstructure:"enable"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,12 @@ func NewKubernetesProcessor(cfg interface{}, telemetry *component.TelemetryTools
}
}
var options []kubernetes.Option
options = append(options, kubernetes.WithAuthType(config.KubeAuthType))
options = append(options, kubernetes.WithKubeConfigDir(config.KubeConfigDir))
options = append(options, kubernetes.WithGraceDeletePeriod(config.GraceDeletePeriod))
options = append(options,
kubernetes.WithAuthType(config.KubeAuthType),
kubernetes.WithKubeConfigDir(config.KubeConfigDir),
kubernetes.WithGraceDeletePeriod(config.GraceDeletePeriod),
kubernetes.WithFetchReplicaSet(config.EnableFetchReplicaSet),
)
err := kubernetes.InitK8sHandler(options...)
if err != nil {
telemetry.Logger.Panicf("Failed to initialize [%s]: %v. Set the option 'enable' false if you want to run the agent in the non-Kubernetes environment.", K8sMetadata, err)
Expand Down
11 changes: 11 additions & 0 deletions collector/pkg/metadata/kubernetes/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ type config struct {
// The unit is seconds, and the default value is 60 seconds.
// Should not be lower than 30 seconds.
GraceDeletePeriod time.Duration
// EnableFetchReplicaSet controls whether to fetch ReplicaSet information.
// The default value is false. It should be enabled if the ReplicaSet
// is used to control pods in the third-party CRD except for Deployment.
EnableFetchReplicaSet bool
}

type Option func(cfg *config)
Expand All @@ -36,3 +40,10 @@ func WithGraceDeletePeriod(interval int) Option {
cfg.GraceDeletePeriod = time.Duration(interval) * time.Second
}
}

// WithFetchReplicaSet sets whether to fetch ReplicaSet information.
func WithFetchReplicaSet(fetch bool) Option {
return func(cfg *config) {
cfg.EnableFetchReplicaSet = fetch
}
}
2 changes: 2 additions & 0 deletions collector/pkg/metadata/kubernetes/helper.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package kubernetes

const DeploymentKind = "deployment"

// CompleteGVK returns the complete string of the workload kind.
// If apiVersion is not one of the built-in groupVersion(see scheme.go), return {apiVersion}-{kind};
// return {kind}, otherwise.
Expand Down
21 changes: 11 additions & 10 deletions collector/pkg/metadata/kubernetes/initk8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ const (
AuthTypeServiceAccount AuthType = "serviceAccount"
// AuthTypeKubeConfig uses local credentials like those used by kubectl.
AuthTypeKubeConfig AuthType = "kubeConfig"
// Default kubeconfig path
// DefaultKubeConfigPath Default kubeconfig path
DefaultKubeConfigPath string = "~/.kube/config"
// Default grace delete period is 60 seconds
DefaultGraceDeletePeriod time.Duration = time.Second * 60
// DefaultGraceDeletePeriod is 60 seconds
DefaultGraceDeletePeriod = time.Second * 60
)

var authTypes = map[AuthType]bool{
Expand Down Expand Up @@ -57,7 +57,6 @@ func (c APIConfig) Validate() error {

var (
MetaDataCache = New()
KubeClient *k8s.Clientset
once sync.Once
IsInitSuccess = false
)
Expand All @@ -66,9 +65,10 @@ func InitK8sHandler(options ...Option) error {
var retErr error
once.Do(func() {
k8sConfig := config{
KubeAuthType: AuthTypeKubeConfig,
KubeConfigDir: DefaultKubeConfigPath,
GraceDeletePeriod: DefaultGraceDeletePeriod,
KubeAuthType: AuthTypeKubeConfig,
KubeConfigDir: DefaultKubeConfigPath,
GraceDeletePeriod: DefaultGraceDeletePeriod,
EnableFetchReplicaSet: false,
}
for _, option := range options {
option(&k8sConfig)
Expand All @@ -82,13 +82,14 @@ func InitK8sHandler(options ...Option) error {
IsInitSuccess = true
go NodeWatch(clientSet)
time.Sleep(1 * time.Second)
go RsWatch(clientSet)
time.Sleep(1 * time.Second)
if k8sConfig.EnableFetchReplicaSet {
go RsWatch(clientSet)
time.Sleep(1 * time.Second)
}
go ServiceWatch(clientSet)
time.Sleep(1 * time.Second)
go PodWatch(clientSet, k8sConfig.GraceDeletePeriod)
time.Sleep(1 * time.Second)
KubeClient = clientSet
})
return retErr
}
Expand Down
19 changes: 17 additions & 2 deletions collector/pkg/metadata/kubernetes/pod_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kubernetes
import (
"fmt"
_ "path/filepath"
"regexp"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -223,17 +224,31 @@ func getControllerKindName(pod *corev1.Pod) (workloadKind string, workloadName s
if workload, ok := globalRsInfo.GetOwnerReference(mapKey(pod.Namespace, owner.Name)); ok {
workloadKind = CompleteGVK(workload.APIVersion, strings.ToLower(workload.Kind))
workloadName = workload.Name
return
} else {
// If not found in 'globalRsInfo', just use 'Deployment' as the workload kind.
workloadKind = DeploymentKind
workloadName = extractDeploymentName(owner.Name)
}
return
}
// If the owner of pod is not ReplicaSet or the replicaset has no controller
// If the owner of pod is not ReplicaSet
workloadKind = CompleteGVK(owner.APIVersion, strings.ToLower(owner.Kind))
workloadName = owner.Name
return
}
return
}

var rRegex = regexp.MustCompile(`^(.*)-[0-9a-z]+$`)

func extractDeploymentName(replicaSetName string) string {
parts := rRegex.FindStringSubmatch(replicaSetName)
if len(parts) == 2 {
return parts[1]
}
return ""
}

func OnUpdate(objOld interface{}, objNew interface{}) {
oldPod := objOld.(*corev1.Pod)
newPod := objNew.(*corev1.Pod)
Expand Down
20 changes: 20 additions & 0 deletions collector/pkg/metadata/kubernetes/pod_watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,3 +338,23 @@ func assertFindPod(t *testing.T, pod *corev1.Pod) {
_, find = MetaDataCache.GetContainerByHostIpPort(pod.Status.HostIP, uint32(pod.Spec.Containers[0].Ports[0].HostPort))
assert.True(t, find, "Didn't find the new HostIP Port in MetaDataCache")
}

func Test_extractDeploymentName(t *testing.T) {
type args struct {
replicaSetName string
}
tests := []struct {
name string
args args
want string
}{
{args: args{"nginx-deployment-75675f5897"}, want: "nginx-deployment"},
{args: args{"nginx-75675f5897"}, want: "nginx"},
{args: args{"nginx75675f5897"}, want: ""},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equalf(t, tt.want, extractDeploymentName(tt.args.replicaSetName), "extractDeploymentName(%v)", tt.args.replicaSetName)
})
}
}
7 changes: 7 additions & 0 deletions deploy/agent/kindling-collector-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,14 @@ processors:
enable: true
kube_auth_type: serviceAccount
kube_config_dir: /root/.kube/config
# GraceDeletePeriod controls the delay interval after receiving delete event.
# The unit is seconds, and the default value is 60 seconds.
# Should not be lower than 30 seconds.
grace_delete_period: 60
# enable_fetch_replicaset controls whether to fetch ReplicaSet information.
# The default value is false. It should be enabled if the ReplicaSet
# is used to control pods in the third-party CRD except for Deployment.
enable_fetch_replicaset: false
aggregateprocessor:
# Aggregation duration window size. The unit is second.
ticker_interval: 5
Expand Down

0 comments on commit d489fbe

Please sign in to comment.