-
Notifications
You must be signed in to change notification settings - Fork 408
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
ingress: add nodepool endpoints filtering for nginx ingress controller
When users access service through nodepool ingress, ingress controller will balance the users rquests to the endpoints binding to the service, the ingress controller in a nodepool should only balance the requests to endpoints belonging to this nodepool, so the service endpoints need to be filtered to exclude endpoints from other nodepools. Note: Currently we leverage yurthub data filtering framework to achieve the purpose, which is actually a node level sidecar solution, if a nodepool level data filtering sidecar is implemented in future, nodepool ingress data filtering is suggested to switch to that solution. Signed-off-by: zhenggu1 <zhengguang.zhang@intel.com>
- Loading branch information
zhenggu1
committed
Dec 28, 2021
1 parent
c23c8d1
commit ce91eac
Showing
4 changed files
with
342 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,168 @@ | ||
/* | ||
Copyright 2021 The OpenYurt Authors. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package ingresscontroller | ||
|
||
import ( | ||
"fmt" | ||
"io" | ||
"net/http" | ||
|
||
v1 "k8s.io/api/core/v1" | ||
"k8s.io/client-go/informers" | ||
listers "k8s.io/client-go/listers/core/v1" | ||
"k8s.io/client-go/tools/cache" | ||
"k8s.io/klog/v2" | ||
|
||
"github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" | ||
"github.com/openyurtio/openyurt/pkg/yurthub/filter" | ||
filterutil "github.com/openyurtio/openyurt/pkg/yurthub/filter/util" | ||
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" | ||
"github.com/openyurtio/openyurt/pkg/yurthub/util" | ||
yurtinformers "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/informers/externalversions" | ||
appslisters "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/listers/apps/v1alpha1" | ||
) | ||
|
||
// Register registers a filter | ||
func Register(filters *filter.Filters) { | ||
filters.Register(filter.IngressControllerFilterName, func() (filter.Interface, error) { | ||
return NewFilter(), nil | ||
}) | ||
} | ||
|
||
func NewFilter() *ingressControllerFilter { | ||
return &ingressControllerFilter{ | ||
Approver: filter.NewApprover("nginx-ingress-controller", "endpoints", []string{"list", "watch"}...), | ||
workingMode: util.WorkingModeEdge, | ||
stopCh: make(chan struct{}), | ||
} | ||
} | ||
|
||
type ingressControllerFilter struct { | ||
*filter.Approver | ||
serviceLister listers.ServiceLister | ||
serviceSynced cache.InformerSynced | ||
nodepoolLister appslisters.NodePoolLister | ||
nodePoolSynced cache.InformerSynced | ||
nodeGetter filter.NodeGetter | ||
nodeSynced cache.InformerSynced | ||
nodeName string | ||
serializerManager *serializer.SerializerManager | ||
workingMode util.WorkingMode | ||
stopCh chan struct{} | ||
} | ||
|
||
func (ssf *ingressControllerFilter) SetWorkingMode(mode util.WorkingMode) error { | ||
ssf.workingMode = mode | ||
return nil | ||
} | ||
|
||
func (ssf *ingressControllerFilter) SetSharedInformerFactory(factory informers.SharedInformerFactory) error { | ||
ssf.serviceLister = factory.Core().V1().Services().Lister() | ||
ssf.serviceSynced = factory.Core().V1().Services().Informer().HasSynced | ||
|
||
if ssf.workingMode == util.WorkingModeCloud { | ||
klog.Infof("prepare list/watch to sync node(%s) for cloud working mode", ssf.nodeName) | ||
ssf.nodeSynced = factory.Core().V1().Nodes().Informer().HasSynced | ||
ssf.nodeGetter = factory.Core().V1().Nodes().Lister().Get | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (ssf *ingressControllerFilter) SetYurtSharedInformerFactory(yurtFactory yurtinformers.SharedInformerFactory) error { | ||
ssf.nodepoolLister = yurtFactory.Apps().V1alpha1().NodePools().Lister() | ||
ssf.nodePoolSynced = yurtFactory.Apps().V1alpha1().NodePools().Informer().HasSynced | ||
|
||
return nil | ||
} | ||
|
||
func (ssf *ingressControllerFilter) SetNodeName(nodeName string) error { | ||
ssf.nodeName = nodeName | ||
|
||
return nil | ||
} | ||
|
||
func (ssf *ingressControllerFilter) SetStorageWrapper(s cachemanager.StorageWrapper) error { | ||
if len(ssf.nodeName) == 0 { | ||
return fmt.Errorf("node name for ingressControllerFilter is not ready") | ||
} | ||
|
||
// hub agent will list/watch node from kube-apiserver when hub agent work as cloud mode | ||
if ssf.workingMode == util.WorkingModeCloud { | ||
return nil | ||
} | ||
klog.Infof("prepare local disk storage to sync node(%s) for edge working mode", ssf.nodeName) | ||
|
||
nodeKey := fmt.Sprintf("kubelet/nodes/%s", ssf.nodeName) | ||
ssf.nodeSynced = func() bool { | ||
obj, err := s.Get(nodeKey) | ||
if err != nil || obj == nil { | ||
return false | ||
} | ||
|
||
if _, ok := obj.(*v1.Node); !ok { | ||
return false | ||
} | ||
|
||
return true | ||
} | ||
|
||
ssf.nodeGetter = func(name string) (*v1.Node, error) { | ||
obj, err := s.Get(nodeKey) | ||
if err != nil { | ||
return nil, err | ||
} else if obj == nil { | ||
return nil, fmt.Errorf("node(%s) is not ready", name) | ||
} | ||
|
||
if node, ok := obj.(*v1.Node); ok { | ||
return node, nil | ||
} | ||
|
||
return nil, fmt.Errorf("node(%s) is not found", name) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (ssf *ingressControllerFilter) SetSerializerManager(s *serializer.SerializerManager) error { | ||
ssf.serializerManager = s | ||
return nil | ||
} | ||
|
||
func (ssf *ingressControllerFilter) Approve(comp, resource, verb string) bool { | ||
if !ssf.Approver.Approve(comp, resource, verb) { | ||
return false | ||
} | ||
|
||
if ok := cache.WaitForCacheSync(ssf.stopCh, ssf.nodeSynced, ssf.serviceSynced, ssf.nodePoolSynced); !ok { | ||
return false | ||
} | ||
|
||
return true | ||
} | ||
|
||
func (ssf *ingressControllerFilter) Filter(req *http.Request, rc io.ReadCloser, stopCh <-chan struct{}) (int, io.ReadCloser, error) { | ||
s := filterutil.CreateSerializer(req, ssf.serializerManager) | ||
if s == nil { | ||
klog.Errorf("skip filter, failed to create serializer in ingressControllerFilter") | ||
return 0, rc, nil | ||
} | ||
|
||
handler := NewIngressControllerFilterHandler(ssf.nodeName, s, ssf.serviceLister, ssf.nodepoolLister, ssf.nodeGetter) | ||
return filter.NewFilterReadCloser(req, rc, handler, s, filter.IngressControllerFilterName, stopCh) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,168 @@ | ||
/* | ||
Copyright 2021 The OpenYurt Authors. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package ingresscontroller | ||
|
||
import ( | ||
"io" | ||
|
||
v1 "k8s.io/api/core/v1" | ||
"k8s.io/apimachinery/pkg/watch" | ||
listers "k8s.io/client-go/listers/core/v1" | ||
"k8s.io/klog/v2" | ||
|
||
"github.com/openyurtio/openyurt/pkg/yurthub/filter" | ||
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" | ||
nodepoolv1alpha1 "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/apis/apps/v1alpha1" | ||
appslisters "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/listers/apps/v1alpha1" | ||
) | ||
|
||
type ingressControllerFilterHandler struct { | ||
nodeName string | ||
serializer *serializer.Serializer | ||
serviceLister listers.ServiceLister | ||
nodePoolLister appslisters.NodePoolLister | ||
nodeGetter filter.NodeGetter | ||
} | ||
|
||
func NewIngressControllerFilterHandler( | ||
nodeName string, | ||
serializer *serializer.Serializer, | ||
serviceLister listers.ServiceLister, | ||
nodePoolLister appslisters.NodePoolLister, | ||
nodeGetter filter.NodeGetter) filter.Handler { | ||
return &ingressControllerFilterHandler{ | ||
nodeName: nodeName, | ||
serializer: serializer, | ||
serviceLister: serviceLister, | ||
nodePoolLister: nodePoolLister, | ||
nodeGetter: nodeGetter, | ||
} | ||
} | ||
|
||
//ObjectResponseFilter filter the endpoints from get response object and return the bytes | ||
func (fh *ingressControllerFilterHandler) ObjectResponseFilter(b []byte) ([]byte, error) { | ||
eps, err := fh.serializer.Decode(b) | ||
if err != nil || eps == nil { | ||
klog.Errorf("skip filter, failed to decode response in ObjectResponseFilter of ingressControllerFilterHandler, %v", err) | ||
return b, nil | ||
} | ||
|
||
endpointsList, ok := eps.(*v1.EndpointsList) | ||
if !ok { | ||
return b, nil | ||
} | ||
//filter endpoints | ||
var items []v1.Endpoints | ||
for i := range endpointsList.Items { | ||
item := fh.reassembleEndpoint(&endpointsList.Items[i]) | ||
if item != nil { | ||
items = append(items, *item) | ||
} | ||
} | ||
endpointsList.Items = items | ||
|
||
return fh.serializer.Encode(endpointsList) | ||
} | ||
|
||
//FilterWatchObject filter the endpoints from watch response object and return the bytes | ||
func (fh *ingressControllerFilterHandler) StreamResponseFilter(rc io.ReadCloser, ch chan watch.Event) error { | ||
defer func() { | ||
close(ch) | ||
}() | ||
|
||
d, err := fh.serializer.WatchDecoder(rc) | ||
if err != nil { | ||
klog.Errorf("StreamResponseFilter of ingressControllerFilterHandler ended with error, %v", err) | ||
return err | ||
} | ||
for { | ||
watchType, obj, err := d.Decode() | ||
if err != nil { | ||
return err | ||
} | ||
var wEvent watch.Event | ||
wEvent.Type = watchType | ||
endpoints, ok := obj.(*v1.Endpoints) | ||
if ok { | ||
item := fh.reassembleEndpoint(endpoints) | ||
if item == nil { | ||
continue | ||
} | ||
wEvent.Object = item | ||
} else { | ||
wEvent.Object = obj | ||
} | ||
klog.V(5).Infof("filter watch decode endpoint: type: %s, obj=%#+v", watchType, endpoints) | ||
ch <- wEvent | ||
} | ||
} | ||
|
||
// reassembleEndpoints will filter the valid endpoints to its nodepool | ||
func (fh *ingressControllerFilterHandler) reassembleEndpoint(endpoints *v1.Endpoints) *v1.Endpoints { | ||
svcName := endpoints.Name | ||
_, err := fh.serviceLister.Services(endpoints.Namespace).Get(svcName) | ||
if err != nil { | ||
klog.Infof("skip reassemble endpoints, failed to get service %s/%s, err: %v", endpoints.Namespace, svcName, err) | ||
return endpoints | ||
} | ||
// filter the endpoints on the node which is in the same nodepool with current node | ||
currentNode, err := fh.nodeGetter(fh.nodeName) | ||
if err != nil { | ||
klog.Infof("skip reassemble endpoints, failed to get current node %s, err: %v", fh.nodeName, err) | ||
return endpoints | ||
} | ||
if nodePoolName, ok := currentNode.Labels[nodepoolv1alpha1.LabelCurrentNodePool]; ok { | ||
nodePool, err := fh.nodePoolLister.Get(nodePoolName) | ||
if err != nil { | ||
klog.Infof("skip reassemble endpoints, failed to get nodepool %s, err: %v", nodePoolName, err) | ||
return endpoints | ||
} | ||
isNodePoolValidEps := false | ||
var newEpsAddr []v1.EndpointAddress | ||
for i := range endpoints.Subsets { | ||
for j := range endpoints.Subsets[i].Addresses { | ||
nodeName := endpoints.Subsets[i].Addresses[j].NodeName | ||
if nodeName == nil { | ||
//ignore endpoints whose NodeName is not set, for example "kubernetes" | ||
continue | ||
} | ||
if inSameNodePool(*nodeName, nodePool.Status.Nodes) { | ||
isNodePoolValidEps = true | ||
newEpsAddr = append(newEpsAddr, endpoints.Subsets[i].Addresses[j]) | ||
klog.Infof("endpoints/%s address/%s with nodeName/%s is valid to nodepool/%s", | ||
svcName, endpoints.Subsets[i].Addresses[j].IP, *nodeName, nodePoolName) | ||
} | ||
} | ||
endpoints.Subsets[i].Addresses = newEpsAddr | ||
newEpsAddr = nil | ||
} | ||
if !isNodePoolValidEps { | ||
return nil | ||
} | ||
} | ||
return endpoints | ||
} | ||
|
||
func inSameNodePool(nodeName string, nodeList []string) bool { | ||
for _, n := range nodeList { | ||
if nodeName == n { | ||
return true | ||
} | ||
} | ||
|
||
return false | ||
} |