Skip to content
This repository has been archived by the owner on Sep 15, 2023. It is now read-only.

Commit

Permalink
Merge pull request #102 from askuy/feature/optimizek8swatch
Browse files Browse the repository at this point in the history
optimize k8s endpoints watch
  • Loading branch information
askuy authored May 26, 2021
2 parents 9a9a314 + 5744bd1 commit 0b78d30
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 12 deletions.
10 changes: 3 additions & 7 deletions ek8s/examples/kubegrpc/config.toml
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
[k8s]
addr=""
token=""
namespaces=["default"]

[registry]
scheme = "k8s" # grpc resolver默认scheme为"k8s",你可以自行修改

[grpc.test]
debug = true # 开启后并加上export EGO_DEBUG=true,可以看到每次grpc请求,配置名、地址、耗时、请求数据、响应数据
addr = "k8s:///test:9090"
#balancerName = "round_robin" # 默认值
#dialTimeout = "1s" # 默认值
#enableAccessInterceptor = true
#enableAccessInterceptorReply = true
#enableAccessInterceptorReq = true
#enableAccessInterceptorRes = true
#enableAccessInterceptorReq = true

10 changes: 5 additions & 5 deletions ek8s/watcher_app.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (c *WatcherApp) watch(ctx context.Context, ns string) error {
)

informer := informersFactory.Core().V1().Pods()
c.logger.Debug("k8s watch prefix", zap.String("appname", c.appName), zap.String("kind", c.kind), zap.String("kind", c.kind))
c.logger.Debug("k8s watch pods", zap.String("appname", c.appName), zap.String("kind", c.kind), zap.String("kind", c.kind))
informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.addPod,
UpdateFunc: c.updatePod,
Expand All @@ -63,24 +63,24 @@ func (c *WatcherApp) watch(ctx context.Context, ns string) error {
// 启动该命名空间里监听
go informersFactory.Start(ctx.Done())
case KindEndpoints:
label, err := c.getServicesSelector(ns, c.appName)
endPoints, err := c.CoreV1().Endpoints(ns).Get(context.Background(), c.getDeploymentName(c.appName), metav1.GetOptions{})
if err != nil {
return err
}
c.logger.Debug("watch prefix label", zap.String("appname", c.appName), zap.String("label", label), zap.String("kind", c.kind))

c.logger.Info("k8s watch endpoints", zap.String("appname", c.appName), zap.String("namespace", endPoints.Namespace), zap.String("endPointName", endPoints.Name), zap.String("kind", c.kind))
informersFactory := informers.NewSharedInformerFactoryWithOptions(
c.Clientset,
defaultResync,
informers.WithNamespace(ns),
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = label
options.FieldSelector = "metadata.name=" + endPoints.Name
// todo
options.ResourceVersion = "0"
}),
)

informer := informersFactory.Core().V1().Endpoints()
c.logger.Debug("k8s watch prefix", zap.String("appname", c.appName), zap.String("kind", c.kind))
informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.addEndpoints,
UpdateFunc: c.updateEndpoints,
Expand Down

0 comments on commit 0b78d30

Please sign in to comment.