Skip to content

Commit

Permalink
Merge pull request vmware-tanzu#637 from zhengxiexie/zhengxie/refacto…
Browse files Browse the repository at this point in the history
…r_initialize

Refactor startup process in HA mode
  • Loading branch information
zhengxiexie authored Aug 6, 2024
2 parents 9046db2 + ab5a9bd commit 7e0f497
Show file tree
Hide file tree
Showing 18 changed files with 57 additions and 110 deletions.
77 changes: 50 additions & 27 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

crdv1alpha1 "github.com/vmware-tanzu/nsx-operator/pkg/apis/crd.nsx.vmware.com/v1alpha1"
Expand All @@ -30,25 +31,27 @@ import (
networkinfocontroller "github.com/vmware-tanzu/nsx-operator/pkg/controllers/networkinfo"
networkpolicycontroller "github.com/vmware-tanzu/nsx-operator/pkg/controllers/networkpolicy"
"github.com/vmware-tanzu/nsx-operator/pkg/controllers/node"
nsxserviceaccountcontroller "github.com/vmware-tanzu/nsx-operator/pkg/controllers/nsxserviceaccount"
"github.com/vmware-tanzu/nsx-operator/pkg/controllers/pod"
securitypolicycontroller "github.com/vmware-tanzu/nsx-operator/pkg/controllers/securitypolicy"
"github.com/vmware-tanzu/nsx-operator/pkg/controllers/service"
staticroutecontroller "github.com/vmware-tanzu/nsx-operator/pkg/controllers/staticroute"
"github.com/vmware-tanzu/nsx-operator/pkg/controllers/subnet"
"github.com/vmware-tanzu/nsx-operator/pkg/controllers/subnetport"
"github.com/vmware-tanzu/nsx-operator/pkg/controllers/subnetset"
nodeservice "github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/node"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/staticroute"
subnetservice "github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/subnet"
subnetportservice "github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/subnetport"

commonctl "github.com/vmware-tanzu/nsx-operator/pkg/controllers/common"
nsxserviceaccountcontroller "github.com/vmware-tanzu/nsx-operator/pkg/controllers/nsxserviceaccount"
"github.com/vmware-tanzu/nsx-operator/pkg/logger"
"github.com/vmware-tanzu/nsx-operator/pkg/metrics"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/common"
ipaddressallocationservice "github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/ipaddressallocation"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/ippool"
nodeservice "github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/node"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/nsxserviceaccount"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/staticroute"
subnetservice "github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/subnet"
subnetportservice "github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/subnetport"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/vpc"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/util"
)
Expand Down Expand Up @@ -109,6 +112,7 @@ func StartNSXServiceAccountController(mgr ctrl.Manager, commonService common.Ser
log.Error(err, "failed to create controller", "controller", "NSXServiceAccount")
os.Exit(1)
}
go commonctl.GenericGarbageCollector(make(chan bool), common.GCInterval, nsxServiceAccountReconcile.CollectGarbage)
}

func StartIPPoolController(mgr ctrl.Manager, ipPoolService *ippool.IPPoolService, vpcService common.VPCServiceProvider) {
Expand All @@ -124,6 +128,7 @@ func StartIPPoolController(mgr ctrl.Manager, ipPoolService *ippool.IPPoolService
log.Error(err, "failed to create controller", "controller", "IPPool")
os.Exit(1)
}
go commonctl.GenericGarbageCollector(make(chan bool), common.GCInterval, ippoolReconcile.CollectGarbage)
}

func StartNetworkInfoController(mgr ctrl.Manager, vpcService *vpc.VPCService) {
Expand All @@ -137,6 +142,7 @@ func StartNetworkInfoController(mgr ctrl.Manager, vpcService *vpc.VPCService) {
log.Error(err, "failed to create networkinfo controller", "controller", "NetworkInfo")
os.Exit(1)
}
go commonctl.GenericGarbageCollector(make(chan bool), common.GCInterval, networkInfoReconciler.CollectGarbage)
}

func StartNamespaceController(mgr ctrl.Manager, cf *config.NSXOperatorConfig, vpcService common.VPCServiceProvider) {
Expand Down Expand Up @@ -168,28 +174,7 @@ func StartIPAddressAllocationController(mgr ctrl.Manager, ipAddressAllocationSer
}
}

func main() {
log.Info("starting NSX Operator")
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
HealthProbeBindAddress: config.ProbeAddr,
Metrics: metricsserver.Options{BindAddress: config.MetricsAddr},
LeaderElection: cf.HAEnabled(),
LeaderElectionNamespace: nsxOperatorNamespace,
LeaderElectionID: "nsx-operator",
})
if err != nil {
log.Error(err, "failed to init manager")
os.Exit(1)
}

// nsxClient is used to interact with NSX API.
nsxClient := nsx.GetClient(cf)
if nsxClient == nil {
log.Error(err, "failed to get nsx client")
os.Exit(1)
}

func startServiceController(mgr manager.Manager, nsxClient *nsx.Client) {
// Embed the common commonService to sub-services.
commonService := common.Service{
Client: mgr.GetClient(),
Expand All @@ -209,6 +194,7 @@ func main() {
}
log.Info("VPC mode is enabled")

var err error
vpcService, err = vpc.InitializeVPC(commonService)
if err != nil {
log.Error(err, "failed to initialize vpc commonService", "controller", "VPC")
Expand Down Expand Up @@ -275,6 +261,43 @@ func main() {
StartNSXServiceAccountController(mgr, commonService)
}

}

func electMaster(mgr manager.Manager, nsxClient *nsx.Client) {
log.Info("I'm trying to be elected as master")
<-mgr.Elected()
log.Info("I'm the master now")
startServiceController(mgr, nsxClient)
}

func main() {
log.Info("starting NSX Operator")
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
HealthProbeBindAddress: config.ProbeAddr,
Metrics: metricsserver.Options{BindAddress: config.MetricsAddr},
LeaderElection: cf.HAEnabled(),
LeaderElectionNamespace: nsxOperatorNamespace,
LeaderElectionID: "nsx-operator",
})
if err != nil {
log.Error(err, "failed to init manager")
os.Exit(1)
}

// nsxClient is used to interact with NSX API.
nsxClient := nsx.GetClient(cf)
if nsxClient == nil {
log.Error(nil, "failed to get nsx client")
os.Exit(1)
}

if cf.HAEnabled() {
go electMaster(mgr, nsxClient)
} else {
startServiceController(mgr, nsxClient)
}

if metrics.AreMetricsExposed(cf) {
go updateHealthMetricsPeriodically(nsxClient)
}
Expand Down
4 changes: 0 additions & 4 deletions pkg/controllers/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,3 @@ func GenericGarbageCollector(cancel chan bool, timeout time.Duration, f func(ctx
}
}
}

func GcOnce(gc GarbageCollector, once *sync.Once) {
once.Do(func() { go GenericGarbageCollector(make(chan bool), servicecommon.GCInterval, gc.CollectGarbage) })
}
5 changes: 0 additions & 5 deletions pkg/controllers/ippool/ippool_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"fmt"
"regexp"
"sync"

"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -39,7 +38,6 @@ var (
resultNormal = common.ResultNormal
resultRequeue = common.ResultRequeue
MetricResType = common.MetricResTypeIPPool
once sync.Once
)

// IPPoolReconciler reconciles a IPPool object
Expand Down Expand Up @@ -115,9 +113,6 @@ func (r *IPPoolReconciler) setReadyStatusTrue(ctx *context.Context, ippool *v1al
}

func (r *IPPoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// Use once.Do to ensure gc is called only once
common.GcOnce(r, &once)

obj := &v1alpha2.IPPool{}
log.Info("reconciling ippool CR", "ippool", req.NamespacedName)
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerSyncTotal, MetricResType)
Expand Down
6 changes: 0 additions & 6 deletions pkg/controllers/ippool/ippool_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"errors"
"reflect"
"sync"
"testing"

"github.com/agiledragon/gomonkey/v2"
Expand Down Expand Up @@ -114,11 +113,6 @@ func TestIPPoolReconciler_Reconcile(t *testing.T) {
ctx := context.Background()
req := controllerruntime.Request{NamespacedName: types.NamespacedName{Namespace: "dummy", Name: "dummy"}}

// common.GcOnce do nothing
var once sync.Once
pat := gomonkey.ApplyMethod(reflect.TypeOf(&once), "Do", func(_ *sync.Once, _ func()) {})
defer pat.Reset()

// not found
errNotFound := errors.New("not found")
k8sClient.EXPECT().Get(ctx, gomock.Any(), gomock.Any()).Return(errNotFound)
Expand Down
5 changes: 0 additions & 5 deletions pkg/controllers/networkinfo/networkinfo_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package networkinfo

import (
"context"
"sync"

corev1 "k8s.io/api/core/v1"
apimachineryruntime "k8s.io/apimachinery/pkg/runtime"
Expand All @@ -29,7 +28,6 @@ import (
var (
log = &logger.Log
MetricResType = common.MetricResTypeNetworkInfo
once sync.Once
)

// NetworkInfoReconciler NetworkInfoReconcile reconciles a NetworkInfo object
Expand All @@ -42,9 +40,6 @@ type NetworkInfoReconciler struct {
}

func (r *NetworkInfoReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// Use once.Do to ensure gc is called only once
common.GcOnce(r, &once)

obj := &v1alpha1.NetworkInfo{}
log.Info("reconciling NetworkInfo CR", "NetworkInfo", req.NamespacedName)
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerSyncTotal, common.MetricResTypeNetworkInfo)
Expand Down
5 changes: 1 addition & 4 deletions pkg/controllers/networkpolicy/networkpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"errors"
"fmt"
"os"
"sync"

v1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
Expand Down Expand Up @@ -36,7 +35,6 @@ var (
ResultRequeue = common.ResultRequeue
ResultRequeueAfter5mins = common.ResultRequeueAfter5mins
MetricResType = common.MetricResTypeNetworkPolicy
once sync.Once
)

// NetworkPolicyReconciler reconciles a NetworkPolicy object
Expand Down Expand Up @@ -68,8 +66,6 @@ func deleteSuccess(r *NetworkPolicyReconciler, _ *context.Context, o *networking
}

func (r *NetworkPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// Use once.Do to ensure gc is called only once
common.GcOnce(r, &once)
networkPolicy := &networkingv1.NetworkPolicy{}
log.Info("reconciling networkpolicy", "networkpolicy", req.NamespacedName)
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerSyncTotal, MetricResType)
Expand Down Expand Up @@ -191,4 +187,5 @@ func StartNetworkPolicyController(mgr ctrl.Manager, commonService servicecommon.
log.Error(err, "failed to create controller", "controller", "NetworkPolicy")
os.Exit(1)
}
go common.GenericGarbageCollector(make(chan bool), servicecommon.GCInterval, networkPolicyReconcile.CollectGarbage)
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"errors"
"fmt"
"sync"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -40,7 +39,6 @@ var (
MetricResType = common.MetricResTypeNSXServiceAccount
count = uint16(0)
ca []byte
once sync.Once
)

// NSXServiceAccountReconciler reconciles a NSXServiceAccount object.
Expand Down Expand Up @@ -68,9 +66,6 @@ type NSXServiceAccountReconciler struct {
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.13.0/pkg/reconcile
func (r *NSXServiceAccountReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// Use once.Do to ensure gc is called only once
common.GcOnce(r, &once)

obj := &nsxvmwarecomv1alpha1.NSXServiceAccount{}
log.Info("reconciling CR", "nsxserviceaccount", req.NamespacedName)
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerSyncTotal, MetricResType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"fmt"
"reflect"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -513,9 +512,6 @@ func TestNSXServiceAccountReconciler_Reconcile(t *testing.T) {
patches := tt.prepareFunc(t, r, ctx)
defer patches.Reset()
}
var once sync.Once
patches2 := gomonkey.ApplyMethod(reflect.TypeOf(&once), "Do", func(_ *sync.Once, _ func()) {})
defer patches2.Reset()

got, err := r.Reconcile(ctx, tt.args.req)
if (err != nil) != tt.wantErr {
Expand Down
6 changes: 1 addition & 5 deletions pkg/controllers/pod/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"fmt"
"os"
"strings"
"sync"

"github.com/vmware/vsphere-automation-sdk-go/services/nsxt/model"
v1 "k8s.io/api/core/v1"
Expand All @@ -34,7 +33,6 @@ import (
var (
log = &logger.Log
MetricResTypePod = common.MetricResTypePod
once sync.Once
)

// PodReconciler reconciles a Pod object
Expand All @@ -50,9 +48,6 @@ type PodReconciler struct {
}

func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// Use once.Do to ensure gc is called only once
common.GcOnce(r, &once)

pod := &v1.Pod{}
log.Info("reconciling pod", "pod", req.NamespacedName)

Expand Down Expand Up @@ -187,6 +182,7 @@ func StartPodController(mgr ctrl.Manager, subnetPortService *subnetport.SubnetPo
log.Error(err, "failed to create controller", "controller", "Pod")
os.Exit(1)
}
go common.GenericGarbageCollector(make(chan bool), servicecommon.GCInterval, podPortReconciler.CollectGarbage)
}

// Start setup manager and launch GC
Expand Down
6 changes: 1 addition & 5 deletions pkg/controllers/securitypolicy/securitypolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"fmt"
"os"
"reflect"
"sync"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -46,7 +45,6 @@ var (
ResultRequeue = common.ResultRequeue
ResultRequeueAfter5mins = common.ResultRequeueAfter5mins
MetricResType = common.MetricResTypeSecurityPolicy
once sync.Once
)

// SecurityPolicyReconciler SecurityPolicyReconcile reconciles a SecurityPolicy object
Expand Down Expand Up @@ -89,9 +87,6 @@ func deleteSuccess(r *SecurityPolicyReconciler, _ *context.Context, o *v1alpha1.
}

func (r *SecurityPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// Use once.Do to ensure gc is called only once
common.GcOnce(r, &once)

var obj client.Object
if r.Service.NSXConfig.EnableVPCNetwork {
obj = &crdv1alpha1.SecurityPolicy{}
Expand Down Expand Up @@ -445,4 +440,5 @@ func StartSecurityPolicyController(mgr ctrl.Manager, commonService servicecommon
log.Error(err, "failed to create controller", "controller", "SecurityPolicy")
os.Exit(1)
}
go common.GenericGarbageCollector(make(chan bool), servicecommon.GCInterval, securityPolicyReconcile.CollectGarbage)
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"errors"
"fmt"
"reflect"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -191,11 +190,6 @@ func TestSecurityPolicyReconciler_Reconcile(t *testing.T) {
ctx := context.Background()
req := controllerruntime.Request{NamespacedName: types.NamespacedName{Namespace: "dummy", Name: "dummy"}}

// common.GcOnce do nothing
var once sync.Once
pat := gomonkey.ApplyMethod(reflect.TypeOf(&once), "Do", func(_ *sync.Once, _ func()) {})
defer pat.Reset()

// not found
errNotFound := errors.New("not found")
k8sClient.EXPECT().Get(ctx, gomock.Any(), gomock.Any()).Return(errNotFound)
Expand Down
Loading

0 comments on commit 7e0f497

Please sign in to comment.