From cdbf3298987e4f10485e3d5099c519da27fdc607 Mon Sep 17 00:00:00 2001 From: clyi Date: Tue, 7 Jan 2025 19:29:57 +0800 Subject: [PATCH] save temp Signed-off-by: clyi --- Makefile | 3 +- dist/images/Dockerfile | 4 +- dist/images/Dockerfile.base | 4 +- dist/images/install.sh | 19 +++-- mocks/pkg/ovs/interface.go | 28 +++++++ pkg/apis/kubeovn/v1/subnet.go | 1 + pkg/controller/config.go | 3 + pkg/controller/endpoint.go | 58 +++++++++++-- pkg/controller/init.go | 6 ++ pkg/controller/subnet.go | 12 ++- pkg/daemon/controller.go | 10 ++- pkg/daemon/controller_linux.go | 42 ++++++++++ pkg/ovs/interface.go | 1 + pkg/ovs/ovn-nb-load_balancer.go | 37 +++++++++ pkg/ovs/ovs-ofctl.go | 140 ++++++++++++++++++++++++++++++++ pkg/util/k8s.go | 18 ++++ pkg/util/validator.go | 6 +- 17 files changed, 367 insertions(+), 25 deletions(-) create mode 100644 pkg/ovs/ovs-ofctl.go diff --git a/Makefile b/Makefile index 956336747d9..690844e5793 100644 --- a/Makefile +++ b/Makefile @@ -134,9 +134,8 @@ build-go-arm: CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build $(GO_BUILD_FLAGS) -buildmode=pie -o $(CURDIR)/dist/images/kube-ovn-controller -v ./cmd/controller .PHONY: build-kube-ovn -build-kube-ovn: build-debug build-go +build-kube-ovn: build-go docker build -t $(REGISTRY)/kube-ovn:$(RELEASE_TAG) --build-arg VERSION=$(RELEASE_TAG) -f dist/images/Dockerfile dist/images/ - docker build -t $(REGISTRY)/kube-ovn:$(LEGACY_TAG) --build-arg VERSION=$(LEGACY_TAG) -f dist/images/Dockerfile dist/images/ .PHONY: build-kube-ovn-dpdk build-kube-ovn-dpdk: build-go diff --git a/dist/images/Dockerfile b/dist/images/Dockerfile index 30a89c6b8a4..91f7f79c439 100644 --- a/dist/images/Dockerfile +++ b/dist/images/Dockerfile @@ -1,7 +1,7 @@ # syntax = docker/dockerfile:experimental ARG VERSION ARG BASE_TAG=$VERSION -FROM kubeovn/kube-ovn-base:$BASE_TAG AS setcap +FROM yichanglu/kube-ovn-base:$BASE_TAG AS setcap COPY *.sh /kube-ovn/ COPY kubectl-ko /kube-ovn/kubectl-ko @@ -22,7 +22,7 @@ RUN ln -s /kube-ovn/kube-ovn-cmd /kube-ovn/kube-ovn-monitor && \ setcap CAP_NET_RAW,CAP_NET_BIND_SERVICE+eip /kube-ovn/kube-ovn-controller && \ setcap CAP_NET_ADMIN,CAP_NET_RAW,CAP_NET_BIND_SERVICE,CAP_SYS_ADMIN+eip /kube-ovn/kube-ovn-daemon -FROM kubeovn/kube-ovn-base:$BASE_TAG +FROM yichanglu/kube-ovn-base:$BASE_TAG COPY --chmod=0644 logrotate/* /etc/logrotate.d/ COPY grace_stop_ovn_controller /usr/share/ovn/scripts/grace_stop_ovn_controller diff --git a/dist/images/Dockerfile.base b/dist/images/Dockerfile.base index 2bbfbecf6d6..28050bf5393 100644 --- a/dist/images/Dockerfile.base +++ b/dist/images/Dockerfile.base @@ -65,7 +65,9 @@ RUN cd /usr/src/ && git clone -b branch-24.03 --depth=1 https://github.com/ovn-o # support dedicated BFD LRP curl -s https://github.com/kubeovn/ovn/commit/40345aa35d03c93cde877ccfa8111346291ebc7c.patch | git apply && \ # skip node local dns ip conntrack when set acl - curl -s https://github.com/kubeovn/ovn/commit/e7d3ba53cdcbc524bb29c54ddb07b83cc4258ed7.patch | git apply + curl -s https://github.com/kubeovn/ovn/commit/e7d3ba53cdcbc524bb29c54ddb07b83cc4258ed7.patch | git apply && \ + # select local backend first + curl -s https://github.com/kubeovn/ovn/commit/e5a123631df32895f6a1fd3796d073f3afe03d44.patch | git apply RUN apt install -y build-essential fakeroot \ autoconf automake bzip2 debhelper-compat dh-exec dh-python dh-sequence-python3 dh-sequence-sphinxdoc \ diff --git a/dist/images/install.sh b/dist/images/install.sh index fc5e2efed02..3c857544326 100755 --- a/dist/images/install.sh +++ b/dist/images/install.sh @@ -47,12 +47,14 @@ OVSDB_INACTIVITY_TIMEOUT=${OVSDB_INACTIVITY_TIMEOUT:-10} ENABLE_LIVE_MIGRATION_OPTIMIZE=${ENABLE_LIVE_MIGRATION_OPTIMIZE:-true} # debug -DEBUG_WRAPPER=${DEBUG_WRAPPER:-} +DEBUG_WRAPPER=${DEBUG_WRAPPER:-true} RUN_AS_USER=65534 # run as nobody if [ "$ENABLE_OVN_IPSEC" = "true" -o -n "$DEBUG_WRAPPER" ]; then RUN_AS_USER=0 fi +RUN_AS_USER=0 + KUBELET_DIR=${KUBELET_DIR:-/var/lib/kubelet} LOG_DIR=${LOG_DIR:-/var/log} @@ -3665,6 +3667,7 @@ rules: - ovn-eips/status - nodes - pods + - vips verbs: - get - list @@ -3956,7 +3959,7 @@ spec: - /kube-ovn/start-db.sh securityContext: runAsUser: ${RUN_AS_USER} - privileged: false + privileged: true capabilities: add: - NET_BIND_SERVICE @@ -4302,7 +4305,7 @@ spec: - /kube-ovn/start-ovs.sh securityContext: runAsUser: ${RUN_AS_USER} - privileged: false + privileged: true capabilities: add: - NET_ADMIN @@ -4729,7 +4732,7 @@ spec: - --image=$REGISTRY/kube-ovn:$VERSION securityContext: runAsUser: ${RUN_AS_USER} - privileged: false + privileged: true capabilities: add: - NET_BIND_SERVICE @@ -4919,7 +4922,7 @@ spec: - --set-vxlan-tx-off=$SET_VXLAN_TX_OFF securityContext: runAsUser: 0 - privileged: false + privileged: true capabilities: add: - NET_ADMIN @@ -5131,7 +5134,7 @@ spec: imagePullPolicy: $IMAGE_PULL_POLICY securityContext: runAsUser: ${RUN_AS_USER} - privileged: false + privileged: true capabilities: add: - NET_BIND_SERVICE @@ -5281,7 +5284,7 @@ spec: - --log_file_max_size=200 securityContext: runAsUser: ${RUN_AS_USER} - privileged: false + privileged: true capabilities: add: - NET_BIND_SERVICE @@ -5502,7 +5505,7 @@ spec: - --alsologtostderr=true securityContext: runAsUser: ${RUN_AS_USER} - privileged: false + privileged: true capabilities: add: - NET_BIND_SERVICE diff --git a/mocks/pkg/ovs/interface.go b/mocks/pkg/ovs/interface.go index 9623d1d0a8b..f5c0bda4afe 100644 --- a/mocks/pkg/ovs/interface.go +++ b/mocks/pkg/ovs/interface.go @@ -1511,6 +1511,20 @@ func (mr *MockLoadBalancerMockRecorder) SetLoadBalancerAffinityTimeout(lbName, t return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetLoadBalancerAffinityTimeout", reflect.TypeOf((*MockLoadBalancer)(nil).SetLoadBalancerAffinityTimeout), lbName, timeout) } +// SetLoadBalancerPreferLocalBackend mocks base method. +func (m *MockLoadBalancer) SetLoadBalancerPreferLocalBackend(lbName string, prefer_local_backend bool) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetLoadBalancerPreferLocalBackend", lbName, prefer_local_backend) + ret0, _ := ret[0].(error) + return ret0 +} + +// SetLoadBalancerPreferLocalBackend indicates an expected call of SetLoadBalancerPreferLocalBackend. +func (mr *MockLoadBalancerMockRecorder) SetLoadBalancerPreferLocalBackend(lbName, prefer_local_backend any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetLoadBalancerPreferLocalBackend", reflect.TypeOf((*MockLoadBalancer)(nil).SetLoadBalancerPreferLocalBackend), lbName, prefer_local_backend) +} + // MockLoadBalancerHealthCheck is a mock of LoadBalancerHealthCheck interface. type MockLoadBalancerHealthCheck struct { ctrl *gomock.Controller @@ -4544,6 +4558,20 @@ func (mr *MockNbClientMockRecorder) SetLoadBalancerAffinityTimeout(lbName, timeo return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetLoadBalancerAffinityTimeout", reflect.TypeOf((*MockNbClient)(nil).SetLoadBalancerAffinityTimeout), lbName, timeout) } +// SetLoadBalancerPreferLocalBackend mocks base method. +func (m *MockNbClient) SetLoadBalancerPreferLocalBackend(lbName string, prefer_local_backend bool) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetLoadBalancerPreferLocalBackend", lbName, prefer_local_backend) + ret0, _ := ret[0].(error) + return ret0 +} + +// SetLoadBalancerPreferLocalBackend indicates an expected call of SetLoadBalancerPreferLocalBackend. +func (mr *MockNbClientMockRecorder) SetLoadBalancerPreferLocalBackend(lbName, prefer_local_backend any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetLoadBalancerPreferLocalBackend", reflect.TypeOf((*MockNbClient)(nil).SetLoadBalancerPreferLocalBackend), lbName, prefer_local_backend) +} + // SetLogicalRouterPortHAChassisGroup mocks base method. func (m *MockNbClient) SetLogicalRouterPortHAChassisGroup(lrpName, haChassisGroupName string) error { m.ctrl.T.Helper() diff --git a/pkg/apis/kubeovn/v1/subnet.go b/pkg/apis/kubeovn/v1/subnet.go index 15ab98bdfaf..1fd9067965f 100644 --- a/pkg/apis/kubeovn/v1/subnet.go +++ b/pkg/apis/kubeovn/v1/subnet.go @@ -85,6 +85,7 @@ type SubnetSpec struct { EnableLb *bool `json:"enableLb,omitempty"` EnableEcmp bool `json:"enableEcmp,omitempty"` EnableMulticastSnoop bool `json:"enableMulticastSnoop,omitempty"` + IsMetalLBAddressPool bool `json:"metallbAddressPool,omitempty"` RouteTable string `json:"routeTable,omitempty"` NamespaceSelectors []metav1.LabelSelector `json:"namespaceSelectors,omitempty"` diff --git a/pkg/controller/config.go b/pkg/controller/config.go index 5e375531d41..2a20d7d70e0 100644 --- a/pkg/controller/config.go +++ b/pkg/controller/config.go @@ -93,6 +93,7 @@ type Configuration struct { EnableEcmp bool EnableKeepVMIP bool EnableLbSvc bool + EnableLbSvcPolicyLocal bool EnableMetrics bool EnableANP bool EnableOVNIPSec bool @@ -175,6 +176,7 @@ func ParseFlags() (*Configuration, error) { argEnableEcmp = pflag.Bool("enable-ecmp", false, "Enable ecmp route for centralized subnet") argKeepVMIP = pflag.Bool("keep-vm-ip", true, "Whether to keep ip for kubevirt pod when pod is rebuild") argEnableLbSvc = pflag.Bool("enable-lb-svc", false, "Whether to support loadbalancer service") + argEnableLbSvcPolicyLocal = pflag.Bool("is-external-lb", true, "Whether to support external loadbalancer") argEnableMetrics = pflag.Bool("enable-metrics", true, "Whether to support metrics query") argEnableANP = pflag.Bool("enable-anp", false, "Enable support for admin network policy and baseline admin network policy") argEnableOVNIPSec = pflag.Bool("enable-ovn-ipsec", false, "Whether to enable ovn ipsec") @@ -271,6 +273,7 @@ func ParseFlags() (*Configuration, error) { GCInterval: *argGCInterval, InspectInterval: *argInspectInterval, EnableLbSvc: *argEnableLbSvc, + EnableLbSvcPolicyLocal: *argEnableLbSvcPolicyLocal, EnableMetrics: *argEnableMetrics, EnableOVNIPSec: *argEnableOVNIPSec, EnableLiveMigrationOptimize: *argEnableLiveMigrationOptimize, diff --git a/pkg/controller/endpoint.go b/pkg/controller/endpoint.go index f9bebd8ac82..3e58b957918 100644 --- a/pkg/controller/endpoint.go +++ b/pkg/controller/endpoint.go @@ -75,6 +75,7 @@ func (c *Controller) handleUpdateEndpoint(key string) error { vip, vpcName, subnetName string ok bool ignoreHealthCheck = true + isPreferLocalBackend = false ) if vip, ok = svc.Annotations[util.SwitchLBRuleVipsAnnotation]; ok { @@ -93,6 +94,20 @@ func (c *Controller) handleUpdateEndpoint(key string) error { return nil } + // 注意这些东西只有在ovn lb开关打开的时候才能用 + if svc.Spec.Type == v1.ServiceTypeLoadBalancer && svc.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal { + if externalIP := util.GetLoadBalancerIP(*svc); err == nil && externalIP != "" { + lbVips = append(lbVips, externalIP) + } else if err != nil { + klog.Errorf("failed to get external load balancer IP for service %s/%s: %v", namespace, name, err) + return err + } + isPreferLocalBackend = true + } else if svc.Spec.Type == v1.ServiceTypeClusterIP && svc.Spec.InternalTrafficPolicy != nil && *svc.Spec.InternalTrafficPolicy == v1.ServiceInternalTrafficPolicyLocal { + isPreferLocalBackend = true + } + + klog.Errorf("clyi lbVips %v", lbVips) if pods, err = c.podsLister.Pods(namespace).List(labels.Set(svc.Spec.Selector).AsSelector()); err != nil { klog.Errorf("failed to get pods for service %s in namespace %s: %v", name, namespace, err) return err @@ -157,19 +172,40 @@ func (c *Controller) handleUpdateEndpoint(key string) error { backends []string ipPortMapping, externals map[string]string ) - - if !ignoreHealthCheck { + isGenIPPortMapping := !ignoreHealthCheck || isPreferLocalBackend + if isGenIPPortMapping { if checkIP, err = c.getHealthCheckVip(subnetName, lbVip); err != nil { klog.Error(err) return err } + + subnet, err := c.subnetsLister.Get(subnetName) + if err != nil { + klog.Errorf("failed to get subnet %s: %v", subnetName, err) + return err + } + + if subnet.Spec.IsMetalLBAddressPool { + vipName := fmt.Sprintf("%s.%s", svc.Name, svc.Namespace) + vip := &kubeovnv1.Vip{ + ObjectMeta: metav1.ObjectMeta{ + Name: vipName, + }, + Spec: kubeovnv1.VipSpec{ + Subnet: subnetName, + }, + } + if _, err = c.config.KubeOvnClient.KubeovnV1().Vips().Create(context.Background(), vip, metav1.CreateOptions{}); err != nil { + klog.Errorf("failed to create vip %s, %v", vipName, err) + return err + } + } + externals = map[string]string{ util.SwitchLBRuleSubnet: subnetName, } } - - ipPortMapping, backends = getIPPortMappingBackend(ep, pods, port, lbVip, checkIP, ignoreHealthCheck) - + ipPortMapping, backends = getIPPortMappingBackend(ep, pods, port, lbVip, checkIP, isGenIPPortMapping) // for performance reason delete lb with no backends if len(backends) != 0 { vip = util.JoinHostPort(lbVip, port.Port) @@ -178,6 +214,14 @@ func (c *Controller) handleUpdateEndpoint(key string) error { klog.Errorf("failed to add vip %s with backends %s to LB %s: %v", lbVip, backends, lb, err) return err } + + if isPreferLocalBackend { + if err = c.OVNNbClient.LoadBalancerUpdateIPPortMapping(lb, vip, ipPortMapping); err != nil { + klog.Errorf("failed to update ip port mapping %s for vip %s to LB %s: %v", ipPortMapping, vip, lb, err) + return err + } + } + if !ignoreHealthCheck && len(ipPortMapping) != 0 { klog.Infof("add health check ip port mapping %v to LB %s", ipPortMapping, lb) if err = c.OVNNbClient.LoadBalancerAddHealthCheck(lb, vip, ignoreHealthCheck, ipPortMapping, externals); err != nil { @@ -321,7 +365,7 @@ func (c *Controller) getHealthCheckVip(subnetName, lbVip string) (string, error) return checkIP, nil } -func getIPPortMappingBackend(endpoints *v1.Endpoints, pods []*v1.Pod, servicePort v1.ServicePort, serviceIP, checkVip string, ignoreHealthCheck bool) (map[string]string, []string) { +func getIPPortMappingBackend(endpoints *v1.Endpoints, pods []*v1.Pod, servicePort v1.ServicePort, serviceIP, checkVip string, isGenIPPortMapping bool) (map[string]string, []string) { var ( ipPortMapping = map[string]string{} backends = []string{} @@ -341,7 +385,7 @@ func getIPPortMappingBackend(endpoints *v1.Endpoints, pods []*v1.Pod, servicePor } for _, address := range subset.Addresses { - if !ignoreHealthCheck && address.TargetRef.Name != "" { + if isGenIPPortMapping && address.TargetRef.Name != "" { ipName := fmt.Sprintf("%s.%s", address.TargetRef.Name, endpoints.Namespace) ipPortMapping[address.IP] = fmt.Sprintf(util.HealthCheckNamedVipTemplate, ipName, checkVip) } diff --git a/pkg/controller/init.go b/pkg/controller/init.go index d278212a900..6e8a8b4a2ae 100644 --- a/pkg/controller/init.go +++ b/pkg/controller/init.go @@ -251,6 +251,12 @@ func (c *Controller) initLB(name, protocol string, sessionAffinity bool) error { } } + err = c.OVNNbClient.SetLoadBalancerPreferLocalBackend(name, c.config.EnableLbSvcPolicyLocal) + if err != nil { + klog.Errorf("failed to set prefer local backend for load balancer %s: %v", name, err) + return err + } + return nil } diff --git a/pkg/controller/subnet.go b/pkg/controller/subnet.go index af8040df8bc..8bc1c81a603 100644 --- a/pkg/controller/subnet.go +++ b/pkg/controller/subnet.go @@ -681,6 +681,11 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error { needRouter := subnet.Spec.Vlan == "" || subnet.Spec.LogicalGateway || (subnet.Status.U2OInterconnectionIP != "" && subnet.Spec.U2OInterconnection) + + if subnet.Spec.Vlan != "" && subnet.Spec.IsMetalLBAddressPool { + needRouter = true + } + // 1. overlay subnet, should add lrp, lrp ip is subnet gw // 2. underlay subnet use logical gw, should add lrp, lrp ip is subnet gw randomAllocateGW := !subnet.Spec.LogicalGateway && vpc.Spec.EnableExternal && subnet.Name == c.config.ExternalGatewaySwitch @@ -1852,8 +1857,11 @@ func (c *Controller) reconcileSubnetSpecialIPs(subnet *kubeovnv1.Subnet) (bool, if subnet.Spec.Vlan != "" && !subnet.Spec.LogicalGateway { u2oInterconnName := fmt.Sprintf(util.U2OInterconnName, subnet.Spec.Vpc, subnet.Name) u2oInterconnLrpName := fmt.Sprintf("%s-%s", subnet.Spec.Vpc, subnet.Name) + + needAllocateU2OIP := false + needAllocateU2OIP = subnet.Spec.U2OInterconnection || subnet.Spec.IsMetalLBAddressPool var v4ip, v6ip string - if subnet.Spec.U2OInterconnection { + if needAllocateU2OIP { v4ip, v6ip, _, err = c.acquireU2OIP(subnet, u2oInterconnName, u2oInterconnLrpName) if err != nil { return isU2OIPChanged, isMcastQuerierIPChanged, err @@ -1862,7 +1870,7 @@ func (c *Controller) reconcileSubnetSpecialIPs(subnet *kubeovnv1.Subnet) (bool, if v4ip != "" || v6ip != "" { isU2OIPChanged = true } - } else if subnet.Status.U2OInterconnectionIP != "" { + } else if !needAllocateU2OIP && subnet.Status.U2OInterconnectionIP != "" { err = c.releaseU2OIP(subnet, u2oInterconnName) if err != nil { return isU2OIPChanged, isMcastQuerierIPChanged, err diff --git a/pkg/daemon/controller.go b/pkg/daemon/controller.go index e16042a0385..9eb5e8d0005 100644 --- a/pkg/daemon/controller.go +++ b/pkg/daemon/controller.go @@ -58,6 +58,9 @@ type Controller struct { nodesLister listerv1.NodeLister nodesSynced cache.InformerSynced + virtualIpsLister kubeovnlister.VipLister + virtualIpsSynced cache.InformerSynced + recorder record.EventRecorder protocol string @@ -89,6 +92,8 @@ func NewController(config *Configuration, stopCh <-chan struct{}, podInformerFac ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips() podInformer := podInformerFactory.Core().V1().Pods() nodeInformer := nodeInformerFactory.Core().V1().Nodes() + virtualIPInformer := kubeovnInformerFactory.Kubeovn().V1().Vips() + // serviceInformer := kubeovnInformerFactory.Kubeovn().V1().Services() controller := &Controller{ config: config, @@ -105,6 +110,9 @@ func NewController(config *Configuration, stopCh <-chan struct{}, podInformerFac subnetsSynced: subnetInformer.Informer().HasSynced, subnetQueue: newTypedRateLimitingQueue[*subnetEvent]("Subnet", nil), + virtualIpsLister: virtualIPInformer.Lister(), + virtualIpsSynced: virtualIPInformer.Informer().HasSynced, + ovnEipsLister: ovnEipInformer.Lister(), ovnEipsSynced: ovnEipInformer.Informer().HasSynced, @@ -135,7 +143,7 @@ func NewController(config *Configuration, stopCh <-chan struct{}, podInformerFac if !cache.WaitForCacheSync(stopCh, controller.providerNetworksSynced, controller.vlansSynced, controller.subnetsSynced, - controller.podsSynced, controller.nodesSynced) { + controller.podsSynced, controller.nodesSynced, controller.virtualIpsSynced) { util.LogFatalAndExit(nil, "failed to wait for caches to sync") } diff --git a/pkg/daemon/controller_linux.go b/pkg/daemon/controller_linux.go index ef34359aeea..2443f812ad7 100644 --- a/pkg/daemon/controller_linux.go +++ b/pkg/daemon/controller_linux.go @@ -245,6 +245,48 @@ func (c *Controller) reconcileRouters(event *subnetEvent) error { } } } + + if subnet.Spec.IsMetalLBAddressPool && subnet.Spec.Vlan != "" { + // vips, err := c.virtualIpsLister.List(labels.Everything()) + // if err != nil { + // klog.Errorf("failed to list vips %v", err) + // return err + // } + + // lrpMAC := subnet.Status.U2OInterconnectionMAC + + // // 获取localnet port的id + + // for _, vip := range vips { + // if vip.Spec.Subnet == subnet.Name { + // vipNameParts := strings.Split(vip.Name, ".") + // if len(vipNameParts) != 2 { + // klog.Errorf("invalid vip name format: %s", vip.Name) + // continue + // } + // svcName := vipNameParts[0] + // svcNamespace := vipNameParts[1] + + // svc, err := c.servicesLister.Services(svcNamespace).Get(svcName) + // if err != nil { + // if k8serrors.IsNotFound(err) { + // klog.Errorf("service %s/%s not found", svcNamespace, svcName) + // continue + // } + // klog.Errorf("failed to get service %s/%s: %v", svcNamespace, svcName, err) + // return err + // } + + // lbServiceIP := svc.Spec.ClusterIP + // lbServicePort := svc.Spec.Ports[0].Port + // flow := fmt.Sprintf("priority=100,ip,nw_dst=%s,tcp,tp_dst=%d,actions=mod_dl_dst:%s,output:%s", lbServiceIP, lbServicePort, lrpMAC, localnetPort) + // cmd := exec.Command("ovs-ofctl", "add-flow", "br-provider", flow) + // if output, err := cmd.CombinedOutput(); err != nil { + // klog.Errorf("failed to add flow %q: %v", output, err) + // } + // } + // } + } } gateway, ok := node.Annotations[util.GatewayAnnotation] diff --git a/pkg/ovs/interface.go b/pkg/ovs/interface.go index d07636d4069..c683853a965 100644 --- a/pkg/ovs/interface.go +++ b/pkg/ovs/interface.go @@ -123,6 +123,7 @@ type LoadBalancer interface { LoadBalancerAddHealthCheck(lbName, vip string, ignoreHealthCheck bool, ipPortMapping, externals map[string]string) error LoadBalancerDeleteHealthCheck(lbName, uuid string) error SetLoadBalancerAffinityTimeout(lbName string, timeout int) error + SetLoadBalancerPreferLocalBackend(lbName string, prefer_local_backend bool) error DeleteLoadBalancers(filter func(lb *ovnnb.LoadBalancer) bool) error GetLoadBalancer(lbName string, ignoreNotFound bool) (*ovnnb.LoadBalancer, error) ListLoadBalancers(filter func(lb *ovnnb.LoadBalancer) bool) ([]ovnnb.LoadBalancer, error) diff --git a/pkg/ovs/ovn-nb-load_balancer.go b/pkg/ovs/ovn-nb-load_balancer.go index 50ae4d16a26..e65a4b0520c 100644 --- a/pkg/ovs/ovn-nb-load_balancer.go +++ b/pkg/ovs/ovn-nb-load_balancer.go @@ -228,6 +228,43 @@ func (c *OVNNbClient) SetLoadBalancerAffinityTimeout(lbName string, timeout int) return nil } +// SetLoadBalancerPreferLocalBackend sets the LB's affinity timeout in seconds +func (c *OVNNbClient) SetLoadBalancerPreferLocalBackend(lbName string, prefer_local_backend bool) error { + var ( + options map[string]string + lb *ovnnb.LoadBalancer + value string + err error + ) + + if lb, err = c.GetLoadBalancer(lbName, false); err != nil { + klog.Errorf("failed to get lb: %v", err) + return err + } + + if prefer_local_backend { + value = "true" + } else { + value = "false" + } + if len(lb.Options) != 0 && lb.Options["prefer_local_backend"] == value { + return nil + } + + options = make(map[string]string, len(lb.Options)+1) + for k, v := range lb.Options { + options[k] = v + } + options["prefer_local_backend"] = value + + lb.Options = options + if err = c.UpdateLoadBalancer(lb, &lb.Options); err != nil { + klog.Error(err) + return fmt.Errorf("failed to set prefer local backend of lb %s to %s: %w", lbName, value, err) + } + return nil +} + // DeleteLoadBalancers delete several loadbalancer once func (c *OVNNbClient) DeleteLoadBalancers(filter func(lb *ovnnb.LoadBalancer) bool) error { var ( diff --git a/pkg/ovs/ovs-ofctl.go b/pkg/ovs/ovs-ofctl.go new file mode 100644 index 00000000000..1c75473b9fe --- /dev/null +++ b/pkg/ovs/ovs-ofctl.go @@ -0,0 +1,140 @@ +package ovs + +// import ( +// "github.com/digitalocean/go-openvswitch/ovs" +// "k8s.io/klog/v2" + +// kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1" +// "github.com/kubeovn/kube-ovn/pkg/util" +// ) + +// func AddOrUpdateUnderlaySubnetSvcLocalOpenFlow(client *ovs.Client, bridgeName, gatewayIP, u2oIP, underlayNic string) error { +// isIPv6 := false +// if util.CheckProtocol(gatewayIP) == kubeovnv1.ProtocolIPv6 { +// isIPv6 = true +// } +// var match *ovs.MatchFlow +// var flow *ovs.Flow +// var inPortID int + +// portInfo, err := client.OpenFlow.DumpPort(bridgeName, underlayNic) +// if err != nil { +// klog.Errorf("failed to dump bridge %s port %s: %v", bridgeName, underlayNic, err) +// return err +// } +// inPortID = portInfo.PortID +// klog.V(3).Infof(" underlayNic %s's portID is %d", underlayNic, inPortID) + +// if isIPv6 { +// match = &ovs.MatchFlow{ +// Protocol: ovs.ProtocolICMPv6, +// InPort: inPortID, +// Matches: []ovs.Match{ +// ovs.ICMP6Type(135), +// ovs.NeighborDiscoveryTarget(u2oIP), +// }, +// Cookie: util.U2OFilterOpenFlowCookieV6, +// } +// // ovs-ofctl add-flow {underlay bridge} "cookie=0x1001,table=0,priority=10000,in_port=1,icmp6,icmp_type=135,nd_target={u2oIP},actions=drop" +// flow = &ovs.Flow{ +// Priority: util.U2OFilterOpenFlowPriority, +// Protocol: ovs.ProtocolICMPv6, +// InPort: inPortID, +// Matches: []ovs.Match{ +// ovs.ICMP6Type(135), +// ovs.NeighborDiscoveryTarget(u2oIP), +// }, +// Cookie: util.U2OFilterOpenFlowCookieV6, +// Actions: []ovs.Action{ovs.Drop()}, +// } +// } else { +// match = &ovs.MatchFlow{ +// Protocol: ovs.ProtocolARP, +// InPort: inPortID, +// Matches: []ovs.Match{ +// ovs.ARPSourceProtocolAddress(gatewayIP), +// ovs.ARPTargetProtocolAddress(u2oIP), +// ovs.ARPOperation(1), +// }, +// Cookie: util.U2OFilterOpenFlowCookieV4, +// } +// // ovs-ofctl add-flow {underlay bridge} "cookie=0x1000,table=0,priority=10000,in_port=1,arp,arp_spa={gatewayIP},arp_tpa={u2oIP},arp_op=1,actions=drop" +// flow = &ovs.Flow{ +// Priority: util.U2OFilterOpenFlowPriority, +// Protocol: ovs.ProtocolARP, +// InPort: inPortID, +// Matches: []ovs.Match{ +// ovs.ARPSourceProtocolAddress(gatewayIP), +// ovs.ARPTargetProtocolAddress(u2oIP), +// ovs.ARPOperation(1), // ARP Request +// }, +// Cookie: util.U2OFilterOpenFlowCookieV4, +// Actions: []ovs.Action{ovs.Drop()}, +// } +// } + +// flows, err := client.OpenFlow.DumpFlowsWithFlowArgs(bridgeName, match) +// if err != nil { +// klog.Errorf("failed to dump flows: %v", err) +// return err +// } + +// // check if the target flow already exist, if exist return +// if len(flows) > 0 { +// return nil +// } + +// // check if any gc flow exist, if exist remove it +// if err := delU2OFilterOpenFlow(client, bridgeName, isIPv6); err != nil { +// return err +// } + +// klog.Infof("add bridge %s u2o filter openflow rule", bridgeName) +// err = client.OpenFlow.AddFlow(bridgeName, flow) +// if err != nil { +// return err +// } + +// return nil +// } + +// func DeleteAllU2OFilterOpenFlow(client *ovs.Client, bridgeName, protocol string) error { +// if protocol == kubeovnv1.ProtocolIPv4 || protocol == kubeovnv1.ProtocolDual { +// if err := delU2OFilterOpenFlow(client, bridgeName, false); err != nil { +// return err +// } +// } +// if protocol == kubeovnv1.ProtocolIPv6 || protocol == kubeovnv1.ProtocolDual { +// if err := delU2OFilterOpenFlow(client, bridgeName, true); err != nil { +// return err +// } +// } +// return nil +// } + +// func delU2OFilterOpenFlow(client *ovs.Client, bridgeName string, isV6 bool) error { +// cookie := util.U2OFilterOpenFlowCookieV4 +// if isV6 { +// cookie = util.U2OFilterOpenFlowCookieV6 +// } + +// match := &ovs.MatchFlow{ +// Cookie: uint64(cookie), +// } + +// oldflows, err := client.OpenFlow.DumpFlowsWithFlowArgs(bridgeName, match) +// if err != nil { +// klog.Errorf("failed to dump flows: %v", err) +// return err +// } + +// if len(oldflows) > 0 { +// klog.Infof("remove bridge %s old u2o filter openflow rule", bridgeName) +// err = client.OpenFlow.DelFlows(bridgeName, match) +// if err != nil { +// klog.Errorf("failed to remove old u2o filter openflow rule: %v", err) +// return err +// } +// } +// return nil +// } diff --git a/pkg/util/k8s.go b/pkg/util/k8s.go index 72b9538c834..16812c7c413 100644 --- a/pkg/util/k8s.go +++ b/pkg/util/k8s.go @@ -127,6 +127,24 @@ func ServiceClusterIPs(svc v1.Service) []string { return ips } +func GetLoadBalancerIP(svc v1.Service) string { + if svc.Spec.Type != v1.ServiceTypeLoadBalancer { + return "" + } + + if len(svc.Status.LoadBalancer.Ingress) == 0 { + return "" + } + + for _, ingress := range svc.Status.LoadBalancer.Ingress { + if ingress.IP != "" { + return ingress.IP + } + } + + return "" +} + func LabelSelectorNotEquals(key, value string) (labels.Selector, error) { requirement, err := labels.NewRequirement(key, selection.NotEquals, []string{value}) if err != nil { diff --git a/pkg/util/validator.go b/pkg/util/validator.go index fd4577ca766..71decc31bc7 100644 --- a/pkg/util/validator.go +++ b/pkg/util/validator.go @@ -165,8 +165,10 @@ func ValidateSubnet(subnet kubeovnv1.Subnet) error { } } - if subnet.Spec.LogicalGateway && subnet.Spec.U2OInterconnection { - return errors.New("logicalGateway and u2oInterconnection can't be opened at the same time") + if (subnet.Spec.LogicalGateway && subnet.Spec.U2OInterconnection) || + (subnet.Spec.LogicalGateway && subnet.Spec.IsMetalLBAddressPool) || + (subnet.Spec.U2OInterconnection && subnet.Spec.IsMetalLBAddressPool) { + return errors.New("logicalGateway, u2oInterconnection, and isMetalLBAddressPool can't be opened at the same time") } if len(subnet.Spec.NatOutgoingPolicyRules) != 0 {