Skip to content

Commit

Permalink
add kubevirt live migration optimize (#4773)
Browse files Browse the repository at this point in the history
* add kubevirt live migration optimize

Signed-off-by: clyi <clyi@alauda.io>
  • Loading branch information
changluyi committed Dec 27, 2024
1 parent 7bc30db commit 0c2ba35
Show file tree
Hide file tree
Showing 11 changed files with 778 additions and 234 deletions.
1 change: 1 addition & 0 deletions charts/kube-ovn/templates/controller-deploy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ spec:
- --enable-anp={{- .Values.func.ENABLE_ANP }}
- --ovsdb-con-timeout={{- .Values.func.OVSDB_CON_TIMEOUT }}
- --ovsdb-inactivity-timeout={{- .Values.func.OVSDB_INACTIVITY_TIMEOUT }}
- --enable-live-migration-optimize={{- .Values.func.ENABLE_LIVE_MIGRATION_OPTIMIZE }}
securityContext:
runAsUser: {{ include "kubeovn.runAsUser" . }}
privileged: false
Expand Down
8 changes: 8 additions & 0 deletions charts/kube-ovn/templates/ovn-CR.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,14 @@ rules:
verbs:
- approve
- sign
- apiGroups:
- kubevirt.io
resources:
- virtualmachineinstancemigrations
verbs:
- "list"
- "watch"
- "get"
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
Expand Down
1 change: 1 addition & 0 deletions charts/kube-ovn/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func:
SET_VXLAN_TX_OFF: false
OVSDB_CON_TIMEOUT: 3
OVSDB_INACTIVITY_TIMEOUT: 10
ENABLE_LIVE_MIGRATION_OPTIMIZE: true

ipv4:
POD_CIDR: "10.16.0.0/16"
Expand Down
16 changes: 16 additions & 0 deletions dist/images/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ ENABLE_ANP=${ENABLE_ANP:-false}
SET_VXLAN_TX_OFF=${SET_VXLAN_TX_OFF:-false}
OVSDB_CON_TIMEOUT=${OVSDB_CON_TIMEOUT:-3}
OVSDB_INACTIVITY_TIMEOUT=${OVSDB_INACTIVITY_TIMEOUT:-10}
ENABLE_LIVE_MIGRATION_OPTIMIZE=${ENABLE_LIVE_MIGRATION_OPTIMIZE:-true}

# debug
DEBUG_WRAPPER=${DEBUG_WRAPPER:-}
Expand Down Expand Up @@ -3211,6 +3212,20 @@ rules:
verbs:
- approve
- sign
- apiGroups:
- kubevirt.io
resources:
- virtualmachineinstancemigrations
verbs:
- "list"
- "watch"
- "get"
- apiGroups:
- apiextensions.k8s.io
resources:
- customresourcedefinitions
verbs:
- get
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
Expand Down Expand Up @@ -4329,6 +4344,7 @@ spec:
- --enable-anp=$ENABLE_ANP
- --ovsdb-con-timeout=$OVSDB_CON_TIMEOUT
- --ovsdb-inactivity-timeout=$OVSDB_INACTIVITY_TIMEOUT
- --enable-live-migration-optimize=$ENABLE_LIVE_MIGRATION_OPTIMIZE
securityContext:
runAsUser: ${RUN_AS_USER}
privileged: false
Expand Down
225 changes: 166 additions & 59 deletions go.mod

Large diffs are not rendered by default.

409 changes: 355 additions & 54 deletions go.sum

Large diffs are not rendered by default.

71 changes: 41 additions & 30 deletions pkg/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

attachnetclientset "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/clientset/versioned"
"github.com/spf13/pflag"
extClientSet "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
Expand Down Expand Up @@ -39,8 +40,8 @@ type Configuration struct {
AnpClient anpclientset.Interface
AttachNetClient attachnetclientset.Interface
KubevirtClient kubecli.KubevirtClient
ExtClient extClientSet.Interface

// with no timeout
KubeFactoryClient kubernetes.Interface
KubeOvnFactoryClient clientset.Interface

Expand Down Expand Up @@ -85,16 +86,17 @@ type Configuration struct {
LsDnatModDlDst bool
LsCtSkipDstLportIPs bool

EnableLb bool
EnableNP bool
EnableEipSnat bool
EnableExternalVpc bool
EnableEcmp bool
EnableKeepVMIP bool
EnableLbSvc bool
EnableMetrics bool
EnableANP bool
EnableOVNIPSec bool
EnableLb bool
EnableNP bool
EnableEipSnat bool
EnableExternalVpc bool
EnableEcmp bool
EnableKeepVMIP bool
EnableLbSvc bool
EnableMetrics bool
EnableANP bool
EnableOVNIPSec bool
EnableLiveMigrationOptimize bool

ExternalGatewaySwitch string
ExternalGatewayConfigNS string
Expand Down Expand Up @@ -154,25 +156,26 @@ func ParseFlags() (*Configuration, error) {
argSecureServing = pflag.Bool("secure-serving", false, "Enable secure serving")
argNodePgProbeTime = pflag.Int("nodepg-probe-time", 1, "The probe interval for node port-group, the unit is minute")

argNetworkType = pflag.String("network-type", util.NetworkTypeGeneve, "The ovn network type")
argDefaultProviderName = pflag.String("default-provider-name", "provider", "The vlan or vxlan type default provider interface name")
argDefaultInterfaceName = pflag.String("default-interface-name", "", "The default host interface name in the vlan/vxlan type")
argDefaultExchangeLinkName = pflag.Bool("default-exchange-link-name", false, "exchange link names of OVS bridge and the provider nic in the default provider-network")
argDefaultVlanName = pflag.String("default-vlan-name", "ovn-vlan", "The default vlan name")
argDefaultVlanID = pflag.Int("default-vlan-id", 1, "The default vlan id")
argLsDnatModDlDst = pflag.Bool("ls-dnat-mod-dl-dst", true, "Set ethernet destination address for DNAT on logical switch")
argLsCtSkipDstLportIPs = pflag.Bool("ls-ct-skip-dst-lport-ips", true, "Skip conntrack for direct traffic between lports")
argPodNicType = pflag.String("pod-nic-type", "veth-pair", "The default pod network nic implementation type")
argEnableLb = pflag.Bool("enable-lb", true, "Enable load balancer")
argEnableNP = pflag.Bool("enable-np", true, "Enable network policy support")
argEnableEipSnat = pflag.Bool("enable-eip-snat", true, "Enable EIP and SNAT")
argEnableExternalVpc = pflag.Bool("enable-external-vpc", true, "Enable external vpc support")
argEnableEcmp = pflag.Bool("enable-ecmp", false, "Enable ecmp route for centralized subnet")
argKeepVMIP = pflag.Bool("keep-vm-ip", true, "Whether to keep ip for kubevirt pod when pod is rebuild")
argEnableLbSvc = pflag.Bool("enable-lb-svc", false, "Whether to support loadbalancer service")
argEnableMetrics = pflag.Bool("enable-metrics", true, "Whether to support metrics query")
argEnableANP = pflag.Bool("enable-anp", false, "Enable support for admin network policy and baseline admin network policy")
argEnableOVNIPSec = pflag.Bool("enable-ovn-ipsec", false, "Whether to enable ovn ipsec")
argNetworkType = pflag.String("network-type", util.NetworkTypeGeneve, "The ovn network type")
argDefaultProviderName = pflag.String("default-provider-name", "provider", "The vlan or vxlan type default provider interface name")
argDefaultInterfaceName = pflag.String("default-interface-name", "", "The default host interface name in the vlan/vxlan type")
argDefaultExchangeLinkName = pflag.Bool("default-exchange-link-name", false, "exchange link names of OVS bridge and the provider nic in the default provider-network")
argDefaultVlanName = pflag.String("default-vlan-name", "ovn-vlan", "The default vlan name")
argDefaultVlanID = pflag.Int("default-vlan-id", 1, "The default vlan id")
argLsDnatModDlDst = pflag.Bool("ls-dnat-mod-dl-dst", true, "Set ethernet destination address for DNAT on logical switch")
argLsCtSkipDstLportIPs = pflag.Bool("ls-ct-skip-dst-lport-ips", true, "Skip conntrack for direct traffic between lports")
argPodNicType = pflag.String("pod-nic-type", "veth-pair", "The default pod network nic implementation type")
argEnableLb = pflag.Bool("enable-lb", true, "Enable load balancer")
argEnableNP = pflag.Bool("enable-np", true, "Enable network policy support")
argEnableEipSnat = pflag.Bool("enable-eip-snat", true, "Enable EIP and SNAT")
argEnableExternalVpc = pflag.Bool("enable-external-vpc", true, "Enable external vpc support")
argEnableEcmp = pflag.Bool("enable-ecmp", false, "Enable ecmp route for centralized subnet")
argKeepVMIP = pflag.Bool("keep-vm-ip", true, "Whether to keep ip for kubevirt pod when pod is rebuild")
argEnableLbSvc = pflag.Bool("enable-lb-svc", false, "Whether to support loadbalancer service")
argEnableMetrics = pflag.Bool("enable-metrics", true, "Whether to support metrics query")
argEnableANP = pflag.Bool("enable-anp", false, "Enable support for admin network policy and baseline admin network policy")
argEnableOVNIPSec = pflag.Bool("enable-ovn-ipsec", false, "Whether to enable ovn ipsec")
argEnableLiveMigrationOptimize = pflag.Bool("enable-live-migration-optimize", true, "Whether to enable kubevirt live migration optimize")

argExternalGatewayConfigNS = pflag.String("external-gateway-config-ns", "kube-system", "The namespace of configmap external-gateway-config, default: kube-system")
argExternalGatewaySwitch = pflag.String("external-gateway-switch", "external", "The name of the external gateway switch which is a ovs bridge to provide external network, default: external")
Expand Down Expand Up @@ -265,6 +268,7 @@ func ParseFlags() (*Configuration, error) {
EnableLbSvc: *argEnableLbSvc,
EnableMetrics: *argEnableMetrics,
EnableOVNIPSec: *argEnableOVNIPSec,
EnableLiveMigrationOptimize: *argEnableLiveMigrationOptimize,
BfdMinTx: *argBfdMinTx,
BfdMinRx: *argBfdMinRx,
BfdDetectMult: *argBfdDetectMult,
Expand Down Expand Up @@ -378,6 +382,13 @@ func (config *Configuration) initKubeClient() error {
}
config.KubeOvnClient = kubeOvnClient

ExtClient, err := extClientSet.NewForConfig(cfg)
if err != nil {
klog.Errorf("init extentsion client failed %v", err)
return err
}
config.ExtClient = ExtClient

cfg.ContentType = "application/vnd.kubernetes.protobuf"
cfg.AcceptContentTypes = "application/vnd.kubernetes.protobuf,application/json"
kubeClient, err := kubernetes.NewForConfig(cfg)
Expand Down
44 changes: 42 additions & 2 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/keymutex"
kubevirtController "kubevirt.io/kubevirt/pkg/controller"
v1alpha1 "sigs.k8s.io/network-policy-api/apis/v1alpha1"
anpinformer "sigs.k8s.io/network-policy-api/pkg/client/informers/externalversions"
anplister "sigs.k8s.io/network-policy-api/pkg/client/listers/apis/v1alpha1"
Expand Down Expand Up @@ -258,6 +259,11 @@ type Controller struct {
csrSynced cache.InformerSynced
addOrUpdateCsrQueue workqueue.TypedRateLimitingInterface[string]

vmiMigrationSynced cache.InformerSynced
addOrUpdateVMIMigrationQueue workqueue.TypedRateLimitingInterface[string]
kubevirtInformerFactory kubevirtController.KubeInformerFactory
hasKubevirtVMIMigration bool

recorder record.EventRecorder
informerFactory kubeinformers.SharedInformerFactory
cmInformerFactory kubeinformers.SharedInformerFactory
Expand Down Expand Up @@ -301,6 +307,8 @@ func Run(ctx context.Context, config *Configuration) {
listOption.AllowWatchBookmarks = true
}))

kubevirtInformerFactory := kubevirtController.NewKubeInformerFactory(config.KubevirtClient.RestClient(), config.KubevirtClient, nil, util.KubevirtNamespace)

vpcInformer := kubeovnInformerFactory.Kubeovn().V1().Vpcs()
vpcNatGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcNatGateways()
subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()
Expand Down Expand Up @@ -331,6 +339,7 @@ func Run(ctx context.Context, config *Configuration) {
anpInformer := anpInformerFactory.Policy().V1alpha1().AdminNetworkPolicies()
banpInformer := anpInformerFactory.Policy().V1alpha1().BaselineAdminNetworkPolicies()
csrInformer := informerFactory.Certificates().V1().CertificateSigningRequests()
vmiMigrationInformer := kubevirtInformerFactory.VirtualMachineInstanceMigration()

numKeyLocks := runtime.NumCPU() * 2
if numKeyLocks < config.WorkerNum*2 {
Expand Down Expand Up @@ -505,7 +514,11 @@ func Run(ctx context.Context, config *Configuration) {

csrLister: csrInformer.Lister(),
csrSynced: csrInformer.Informer().HasSynced,
addOrUpdateCsrQueue: newTypedRateLimitingQueue[string]("AddOrUpdateCSR", nil),
addOrUpdateCsrQueue: newTypedRateLimitingQueue[string]("AddOrUpdateCSR", custCrdRateLimiter),

vmiMigrationSynced: vmiMigrationInformer.HasSynced,
addOrUpdateVMIMigrationQueue: newTypedRateLimitingQueue[string]("AddOrUpdateVMIMigration", nil),
kubevirtInformerFactory: kubevirtInformerFactory,

recorder: recorder,
informerFactory: informerFactory,
Expand Down Expand Up @@ -591,6 +604,11 @@ func Run(ctx context.Context, config *Configuration) {
controller.kubeovnInformerFactory.Start(ctx.Done())
controller.anpInformerFactory.Start(ctx.Done())

controller.hasKubevirtVMIMigration = controller.isVMIMigrationCRDInstalled()
if controller.config.EnableLiveMigrationOptimize && controller.hasKubevirtVMIMigration {
kubevirtInformerFactory.Start(ctx.Done())
}

klog.Info("Waiting for informer caches to sync")
cacheSyncs := []cache.InformerSynced{
controller.vpcNatGatewaySynced, controller.vpcSynced, controller.subnetSynced,
Expand All @@ -610,6 +628,11 @@ func Run(ctx context.Context, config *Configuration) {
if controller.config.EnableANP {
cacheSyncs = append(cacheSyncs, controller.anpsSynced, controller.banpsSynced)
}

if controller.config.EnableLiveMigrationOptimize && controller.hasKubevirtVMIMigration {
cacheSyncs = append(cacheSyncs, controller.vmiMigrationSynced)
}

if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
util.LogFatalAndExit(nil, "failed to wait for caches to sync")
}
Expand Down Expand Up @@ -847,6 +870,16 @@ func Run(ctx context.Context, config *Configuration) {
util.LogFatalAndExit(err, "failed to add csr event handler")
}
}

if config.EnableLiveMigrationOptimize && controller.hasKubevirtVMIMigration {
if _, err = vmiMigrationInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueAddVMIMigration,
UpdateFunc: controller.enqueueUpdateVMIMigration,
}); err != nil {
util.LogFatalAndExit(err, "failed to add VMI Migration event handler")
}
}

controller.Run(ctx)
}

Expand Down Expand Up @@ -1039,6 +1072,10 @@ func (c *Controller) shutdown() {
c.syncSgPortsQueue.ShutDown()

c.addOrUpdateCsrQueue.ShutDown()

if c.config.EnableLiveMigrationOptimize {
c.addOrUpdateVMIMigrationQueue.ShutDown()
}
}

func (c *Controller) startWorkers(ctx context.Context) {
Expand All @@ -1055,7 +1092,6 @@ func (c *Controller) startWorkers(ctx context.Context) {
go wait.Until(runWorker("update snat for vpc nat gateway", c.updateVpcSnatQueue, c.handleUpdateVpcSnat), time.Second, ctx.Done())
go wait.Until(runWorker("update subnet route for vpc nat gateway", c.updateVpcSubnetQueue, c.handleUpdateNatGwSubnetRoute), time.Second, ctx.Done())
go wait.Until(runWorker("add/update csr", c.addOrUpdateCsrQueue, c.handleAddOrUpdateCsr), time.Second, ctx.Done())

// add default and join subnet and wait them ready
go wait.Until(runWorker("add/update subnet", c.addOrUpdateSubnetQueue, c.handleAddOrUpdateSubnet), time.Second, ctx.Done())
go wait.Until(runWorker("add/update ippool", c.addOrUpdateIPPoolQueue, c.handleAddOrUpdateIPPool), time.Second, ctx.Done())
Expand Down Expand Up @@ -1244,6 +1280,10 @@ func (c *Controller) startWorkers(ctx context.Context) {
go wait.Until(runWorker("update base admin network policy", c.updateBanpQueue, c.handleUpdateBanp), time.Second, ctx.Done())
go wait.Until(runWorker("delete base admin network policy", c.deleteBanpQueue, c.handleDeleteBanp), time.Second, ctx.Done())
}

if c.config.EnableLiveMigrationOptimize && c.hasKubevirtVMIMigration {
go wait.Until(runWorker("add/update vmiMigration ", c.addOrUpdateVMIMigrationQueue, c.handleAddOrUpdateVMIMigration), 50*time.Millisecond, ctx.Done())
}
}

func (c *Controller) allSubnetReady(subnets ...string) (bool, error) {
Expand Down
Loading

0 comments on commit 0c2ba35

Please sign in to comment.