Skip to content

Commit

Permalink
refator subnet enable lb
Browse files Browse the repository at this point in the history
  • Loading branch information
zbb88888 committed Apr 11, 2023
1 parent a5d1b9e commit dcdc421
Show file tree
Hide file tree
Showing 15 changed files with 137 additions and 306 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ CONTROL_PLANE_TAINTS = node-role.kubernetes.io/master node-role.kubernetes.io/co

CHART_UPGRADE_RESTART_OVS=$(shell echo $${CHART_UPGRADE_RESTART_OVS:-false})

MULTUS_IMAGE = ghcr.lank8s.cn/k8snetworkplumbingwg/multus-cni:snapshot
MULTUS_IMAGE = ghcr.io/k8snetworkplumbingwg/multus-cni:snapshot
MULTUS_YAML = https://raw.githubusercontent.com/k8snetworkplumbingwg/multus-cni/master/deployments/multus-daemonset.yml

KUBEVIRT_VERSION = v0.58.0
Expand Down
6 changes: 3 additions & 3 deletions pkg/apis/kubeovn/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,9 @@ type SubnetSpec struct {

Acls []Acl `json:"acls,omitempty"`

U2OInterconnection bool `json:"u2oInterconnection,omitempty"`
EnableLb *bool `json:"enableLb,omitempty"`
EnableEcmp bool `json:"enableEcmp,omitempty"`
U2OInterconnection bool `json:"u2oInterconnection,omitempty"`
EnableLb bool `json:"enableLb,omitempty"`
EnableEcmp bool `json:"enableEcmp,omitempty"`
}

type Acl struct {
Expand Down
5 changes: 0 additions & 5 deletions pkg/apis/kubeovn/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/controller/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (c *Controller) initDefaultLogicalSwitch() error {
NatOutgoing: true,
GatewayType: kubeovnv1.GWDistributedType,
Protocol: util.CheckProtocol(c.config.DefaultCIDR),
EnableLb: &c.config.EnableLb,
EnableLb: c.config.EnableLb,
},
}
if c.config.NetworkType == util.NetworkTypeVlan {
Expand Down
99 changes: 57 additions & 42 deletions pkg/controller/subnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,7 @@ func (c *Controller) enqueueUpdateSubnet(old, new interface{}) {
oldSubnet.Spec.EnableIPv6RA != newSubnet.Spec.EnableIPv6RA ||
oldSubnet.Spec.IPv6RAConfigs != newSubnet.Spec.IPv6RAConfigs ||
oldSubnet.Spec.Protocol != newSubnet.Spec.Protocol ||
(oldSubnet.Spec.EnableLb == nil && newSubnet.Spec.EnableLb != nil) ||
(oldSubnet.Spec.EnableLb != nil && newSubnet.Spec.EnableLb == nil) ||
(oldSubnet.Spec.EnableLb != nil && newSubnet.Spec.EnableLb != nil && *oldSubnet.Spec.EnableLb != *newSubnet.Spec.EnableLb) ||
oldSubnet.Spec.EnableLb != newSubnet.Spec.EnableLb ||
oldSubnet.Spec.EnableEcmp != newSubnet.Spec.EnableEcmp ||
!reflect.DeepEqual(oldSubnet.Spec.Acls, newSubnet.Spec.Acls) ||
oldSubnet.Spec.U2OInterconnection != newSubnet.Spec.U2OInterconnection {
Expand Down Expand Up @@ -313,8 +311,10 @@ func formatSubnet(subnet *kubeovnv1.Subnet, c *Controller) error {
changed = true
}
if subnet.Spec.Vpc == "" {
if subnet.Spec.Provider != "" && strings.HasSuffix(subnet.Spec.Provider, util.OvnProvider) {
if subnet.Spec.Provider != "" && !strings.HasSuffix(subnet.Spec.Provider, util.OvnProvider) {
klog.Infof("subnet %s is not ovn subnet, no vpc", subnet.Name)
changed = true
subnet.Spec.Vpc = ""
} else {
changed = true
subnet.Spec.Vpc = util.DefaultVpc
Expand All @@ -332,18 +332,20 @@ func formatSubnet(subnet *kubeovnv1.Subnet, c *Controller) error {
}
}

if subnet.Spec.EnableLb == nil && subnet.Name != c.config.NodeSwitch {
if subnet.Spec.Provider != "" && strings.HasSuffix(subnet.Spec.Provider, util.OvnProvider) {
klog.Infof("subnet %s is not ovn subnet, can not enable lb", subnet.Name)
if !subnet.Spec.EnableLb && subnet.Name != c.config.NodeSwitch {
if subnet.Spec.Provider != "" && !strings.HasSuffix(subnet.Spec.Provider, util.OvnProvider) {
klog.Infof("subnet %s is non ovn subnet, disable switch lb", subnet.Name)
changed = true
subnet.Spec.EnableLb = false
} else {
changed = true
subnet.Spec.EnableLb = &c.config.EnableLb
subnet.Spec.EnableLb = c.config.EnableLb
}
}
// set join subnet Spec.EnableLb to nil
if subnet.Spec.EnableLb != nil && subnet.Name == c.config.NodeSwitch {
if subnet.Spec.EnableLb && subnet.Name == c.config.NodeSwitch {
changed = true
subnet.Spec.EnableLb = nil
subnet.Spec.EnableLb = false
}

klog.Infof("format subnet %v, changed %v", subnet.Name, changed)
Expand Down Expand Up @@ -491,7 +493,10 @@ func (c Controller) patchSubnetStatus(subnet *kubeovnv1.Subnet, reason string, e
} else {
subnet.Status.Validated(reason, "")
c.recorder.Eventf(subnet, v1.EventTypeNormal, reason, errStr)
if reason == "SetPrivateLogicalSwitchSuccess" || reason == "ResetLogicalSwitchAclSuccess" || reason == "ReconcileCentralizedGatewaySuccess" {
if reason == "SetPrivateLogicalSwitchSuccess" ||
reason == "ResetLogicalSwitchAclSuccess" ||
reason == "ReconcileCentralizedGatewaySuccess" ||
reason == "SetNonOvnSubnetSuccess" {
subnet.Status.Ready(reason, "")
}
}
Expand Down Expand Up @@ -549,11 +554,43 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error {
return err
}

if subnet.Spec.Protocol == kubeovnv1.ProtocolDual {
err = calcDualSubnetStatusIP(subnet, c)
} else {
err = calcSubnetStatusIP(subnet, c)
}
if err != nil {
klog.Errorf("calculate subnet %s used ip failed, %v", subnet.Name, err)
return err
}

if err := c.ipam.AddOrUpdateSubnet(subnet.Name, subnet.Spec.CIDRBlock, subnet.Spec.Gateway, subnet.Spec.ExcludeIps); err != nil {
return err
}

if err = util.ValidateSubnet(*subnet); err != nil {
klog.Errorf("failed to validate subnet %s, %v", subnet.Name, err)
c.patchSubnetStatus(subnet, "ValidateLogicalSwitchFailed", err.Error())
return err
} else {
c.patchSubnetStatus(subnet, "ValidateLogicalSwitchSuccess", "")
}

if subnet.Spec.Vpc == "" && !isOvnSubnet(subnet) {
// subnet provider is not ovn, and vpc is empty, should not reconcile
c.patchSubnetStatus(subnet, "SetNonOvnSubnetSuccess", "")

subnet.Status.EnsureStandardConditions()
klog.Infof("non ovn subnet %s is ready", subnet.Name)
return nil
}

vpc, err := c.vpcsLister.Get(subnet.Spec.Vpc)
if err != nil {
klog.Errorf("failed to get subnet's vpc '%s', %v", subnet.Spec.Vpc, err)
return err
}

if !vpc.Status.Standby {
err = fmt.Errorf("the vpc '%s' not standby yet", vpc.Name)
klog.Error(err)
Expand Down Expand Up @@ -583,20 +620,6 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error {
}
}

if subnet.Spec.Protocol == kubeovnv1.ProtocolDual {
err = calcDualSubnetStatusIP(subnet, c)
} else {
err = calcSubnetStatusIP(subnet, c)
}
if err != nil {
klog.Errorf("calculate subnet %s used ip failed, %v", subnet.Name, err)
return err
}

if err := c.ipam.AddOrUpdateSubnet(subnet.Name, subnet.Spec.CIDRBlock, subnet.Spec.Gateway, subnet.Spec.ExcludeIps); err != nil {
return err
}

if err := c.reconcileU2OInterconnectionIP(subnet); err != nil {
klog.Errorf("failed to reconcile underlay subnet %s to overlay interconnection %v", subnet.Name, err)
return err
Expand All @@ -606,14 +629,6 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error {
return nil
}

if err = util.ValidateSubnet(*subnet); err != nil {
klog.Errorf("failed to validate subnet %s, %v", subnet.Name, err)
c.patchSubnetStatus(subnet, "ValidateLogicalSwitchFailed", err.Error())
return err
} else {
c.patchSubnetStatus(subnet, "ValidateLogicalSwitchSuccess", "")
}

subnetList, err := c.subnetsLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to list subnets %v", err)
Expand Down Expand Up @@ -709,16 +724,16 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error {
}
}

lbs := []string{
vpc.Status.TcpLoadBalancer,
vpc.Status.TcpSessionLoadBalancer,
vpc.Status.UdpLoadBalancer,
vpc.Status.UdpSessionLoadBalancer,
vpc.Status.SctpLoadBalancer,
vpc.Status.SctpSessionLoadBalancer,
}
if c.config.EnableLb && subnet.Name != c.config.NodeSwitch {
if subnet.Spec.EnableLb != nil && *subnet.Spec.EnableLb {
lbs := []string{
vpc.Status.TcpLoadBalancer,
vpc.Status.TcpSessionLoadBalancer,
vpc.Status.UdpLoadBalancer,
vpc.Status.UdpSessionLoadBalancer,
vpc.Status.SctpLoadBalancer,
vpc.Status.SctpSessionLoadBalancer,
}
if subnet.Spec.EnableLb {
if err := c.ovnClient.LogicalSwitchUpdateLoadBalancers(subnet.Name, ovsdb.MutateOperationInsert, lbs...); err != nil {
c.patchSubnetStatus(subnet, "AddLbToLogicalSwitchFailed", err.Error())
return err
Expand Down
3 changes: 1 addition & 2 deletions pkg/controller/vpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -833,8 +833,7 @@ func (c *Controller) handleAddVpcExternal(key string) error {
vpc.Name, types.MergePatchType, bytes, metav1.PatchOptions{}, "status"); err != nil {
return err
}
cachedEip, err = c.ovnEipsLister.Get(lrpEipName)
if err != nil {
if _, err = c.ovnEipsLister.Get(lrpEipName); err != nil {
return err
}
if err := c.patchLrpOvnEipEnableBfdLabel(lrpEipName, vpc.Spec.EnableBfd); err != nil {
Expand Down
38 changes: 5 additions & 33 deletions pkg/controller/vpc_nat_gw_eip.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@ package controller
import (
"context"
"fmt"
"net"
"strings"

k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"net"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"strings"

kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
"github.com/kubeovn/kube-ovn/pkg/ovs"
Expand Down Expand Up @@ -233,10 +234,6 @@ func (c *Controller) handleAddIptablesEip(key string) error {
return err
}
}
if err = c.patchEipIP(key, v4ip); err != nil {
klog.Errorf("failed to patch status for eip %s, %v", key, err)
return err
}
if eipV4Cidr, err = c.getEipV4Cidr(v4ip); err != nil {
klog.Errorf("failed to get eip cidr, err: %v", err)
return err
Expand Down Expand Up @@ -545,12 +542,12 @@ func (c *Controller) createOrUpdateCrdEip(key, v4ip, v6ip, mac, natGwDp string)
}
} else {
eip := cachedEip.DeepCopy()
if v4ip != "" && mac != "" {
if v4ip != "" {
klog.V(3).Infof("update eip cr %s", key)
eip.Spec.MacAddress = mac
eip.Spec.V4ip = v4ip
eip.Spec.V6ip = v6ip
eip.Spec.NatGwDp = natGwDp
eip.Spec.MacAddress = mac
if _, err := c.config.KubeOvnClient.KubeovnV1().IptablesEIPs().Update(context.Background(), eip, metav1.UpdateOptions{}); err != nil {
errMsg := fmt.Errorf("failed to update eip crd %s, %v", key, err)
klog.Error(errMsg)
Expand Down Expand Up @@ -672,31 +669,6 @@ func (c *Controller) handleDelIptablesEipFinalizer(key string) error {
return nil
}

func (c *Controller) patchEipIP(key, v4ip string) error {
oriEip, err := c.iptablesEipsLister.Get(key)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
return err
}
eip := oriEip.DeepCopy()
eip.Status.IP = v4ip
bytes, err := eip.Status.Bytes()
if err != nil {
return err
}
if _, err = c.config.KubeOvnClient.KubeovnV1().IptablesEIPs().Patch(context.Background(), key, types.MergePatchType,
bytes, metav1.PatchOptions{}, "status"); err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
klog.Errorf("failed to patch eip %s, %v", eip.Name, err)
return err
}
return nil
}

func (c *Controller) patchEipStatus(key, v4ip, redo, nat string, ready bool) error {
oriEip, err := c.iptablesEipsLister.Get(key)
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions test/e2e/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (
"k8s.io/kubernetes/test/e2e/framework"
admissionapi "k8s.io/pod-security-admission/api"

nadclientcs "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/clientset/versioned"
attachnetclientset "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/clientset/versioned"

"github.com/onsi/ginkgo/v2"

kubeovncs "github.com/kubeovn/kube-ovn/pkg/client/clientset/versioned"
Expand All @@ -27,8 +28,7 @@ type Framework struct {
KubeContext string
*framework.Framework
KubeOVNClientSet kubeovncs.Interface
NadClient nadclientcs.Interface

AttachNetClient attachnetclientset.Interface
// master/release-1.10/...
ClusterVersion string
// 999.999 for master
Expand Down Expand Up @@ -124,14 +124,14 @@ func (f *Framework) BeforeEach() {
ExpectNoError(err)
}

if f.NadClient == nil {
if f.AttachNetClient == nil {
ginkgo.By("Creating a nad client")
config, err := framework.LoadConfig()
ExpectNoError(err)

config.QPS = f.Options.ClientQPS
config.Burst = f.Options.ClientBurst
f.NadClient, err = nadclientcs.NewForConfig(config)
f.AttachNetClient, err = attachnetclientset.NewForConfig(config)
ExpectNoError(err)
}

Expand Down
2 changes: 2 additions & 0 deletions test/e2e/framework/iptables-dnat.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,10 @@ func (c *IptablesDnatClient) DeleteSync(name string) {
func (c *IptablesDnatClient) WaitToBeReady(name string, timeout time.Duration) bool {
for start := time.Now(); time.Since(start) < timeout; time.Sleep(poll) {
if c.Get(name).Status.Ready {
Logf("dnat %s is ready ", name)
return true
}
Logf("dnat %s is not ready ", name)
}
return false
}
Expand Down
5 changes: 4 additions & 1 deletion test/e2e/framework/iptables-eip.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,12 @@ func (c *IptablesEIPClient) DeleteSync(name string) {
// WaitToBeReady returns whether the iptables eip is ready within timeout.
func (c *IptablesEIPClient) WaitToBeReady(name string, timeout time.Duration) bool {
for start := time.Now(); time.Since(start) < timeout; time.Sleep(poll) {
if c.Get(name).Status.Ready {
eip := c.Get(name)
if eip.Status.Ready && eip.Status.IP != "" && eip.Spec.V4ip != "" {
Logf("eip %s is ready ", name)
return true
}
Logf("eip %s is not ready ", name)
}
return false
}
Expand Down
2 changes: 2 additions & 0 deletions test/e2e/framework/iptables-fip.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,10 @@ func (c *IptablesFIPClient) DeleteSync(name string) {
func (c *IptablesFIPClient) WaitToBeReady(name string, timeout time.Duration) bool {
for start := time.Now(); time.Since(start) < timeout; time.Sleep(poll) {
if c.Get(name).Status.Ready {
Logf("fip %s is ready ", name)
return true
}
Logf("fip %s is not ready ", name)
}
return false
}
Expand Down
2 changes: 2 additions & 0 deletions test/e2e/framework/iptables-snat.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,10 @@ func (c *IptablesSnatClient) DeleteSync(name string) {
func (c *IptablesSnatClient) WaitToBeReady(name string, timeout time.Duration) bool {
for start := time.Now(); time.Since(start) < timeout; time.Sleep(poll) {
if c.Get(name).Status.Ready {
Logf("snat %s is ready ", name)
return true
}
Logf("snat %s is not ready ", name)
}
return false
}
Expand Down
Loading

0 comments on commit dcdc421

Please sign in to comment.