From e1121aeaafc46ba0a1642244482418c16706acd5 Mon Sep 17 00:00:00 2001 From: Vivek Anand T Kallampally Date: Mon, 22 Feb 2021 12:47:02 +0530 Subject: [PATCH] Lazy Replication of SFService and SFPlan - Replicate these as part of Instance Replicator - Remove SFService Replicator Fetaure: HCPCFS-2505 --- .../setup_multiclusterdeploy.go | 9 - .../sfserviceinstancereplicator_controller.go | 117 ++++++++ ...rviceinstancereplicator_controller_test.go | 73 +++++ .../sfservices_mcd_controller.go | 271 ------------------ .../sfservices_mcd_controller_suite_test.go | 103 ------- .../sfservices_mcd_controller_test.go | 265 ----------------- 6 files changed, 190 insertions(+), 648 deletions(-) delete mode 100644 interoperator/controllers/multiclusterdeploy/sfservicesreplicator/sfservices_mcd_controller.go delete mode 100644 interoperator/controllers/multiclusterdeploy/sfservicesreplicator/sfservices_mcd_controller_suite_test.go delete mode 100644 interoperator/controllers/multiclusterdeploy/sfservicesreplicator/sfservices_mcd_controller_test.go diff --git a/interoperator/controllers/multiclusterdeploy/setup_multiclusterdeploy.go b/interoperator/controllers/multiclusterdeploy/setup_multiclusterdeploy.go index e42eedb07..e1c809486 100644 --- a/interoperator/controllers/multiclusterdeploy/setup_multiclusterdeploy.go +++ b/interoperator/controllers/multiclusterdeploy/setup_multiclusterdeploy.go @@ -23,7 +23,6 @@ import ( "github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/controllers/multiclusterdeploy/sfclusterreplicator" "github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/controllers/multiclusterdeploy/sfservicebindingreplicator" "github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/controllers/multiclusterdeploy/sfserviceinstancereplicator" - "github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/controllers/multiclusterdeploy/sfservicesreplicator" "github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/controllers/multiclusterdeploy/watchmanager" ctrl "sigs.k8s.io/controller-runtime" @@ -70,14 +69,6 @@ func SetupWithManager(mgr ctrl.Manager) error { return err } - if err = (&sfservicesreplicator.ReconcileSFServices{ - Client: mgr.GetClient(), - Log: ctrl.Log.WithName("mcd").WithName("replicator").WithName("service"), - }).SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create service replicator", "controller", "ReconcileSFServices") - return err - } - if err = (&sfclusterreplicator.SFClusterReplicator{ Client: mgr.GetClient(), Log: ctrl.Log.WithName("mcd").WithName("replicator").WithName("cluster"), diff --git a/interoperator/controllers/multiclusterdeploy/sfserviceinstancereplicator/sfserviceinstancereplicator_controller.go b/interoperator/controllers/multiclusterdeploy/sfserviceinstancereplicator/sfserviceinstancereplicator_controller.go index 90a3853c8..d16855b00 100644 --- a/interoperator/controllers/multiclusterdeploy/sfserviceinstancereplicator/sfserviceinstancereplicator_controller.go +++ b/interoperator/controllers/multiclusterdeploy/sfserviceinstancereplicator/sfserviceinstancereplicator_controller.go @@ -24,6 +24,7 @@ import ( "github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/internal/config" "github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/pkg/cluster/registry" "github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/pkg/constants" + "github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/pkg/utils" "github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/pkg/watches" "github.com/go-logr/logr" @@ -143,6 +144,8 @@ func (r *InstanceReplicator) Reconcile(req ctrl.Request) (ctrl.Result, error) { return ctrl.Result{}, err } + lastErr := r.reconcileServicePlan(targetClient, instance, clusterID) + if !instance.GetDeletionTimestamp().IsZero() && state == "delete" { replica.SetName(instance.GetName()) replica.SetNamespace(instance.GetNamespace()) @@ -251,6 +254,11 @@ func (r *InstanceReplicator) Reconcile(req ctrl.Request) (ctrl.Result, error) { "replicaState", replicaState, "replicaLastOperation", replicaLastOperation) } + if lastErr != nil { + // re que if service/plan replication failed + return ctrl.Result{}, err + } + return ctrl.Result{}, nil } @@ -322,6 +330,101 @@ func (r *InstanceReplicator) reconcileNamespace(targetClient client.Client, name return nil } +func (r *InstanceReplicator) reconcileServicePlan(targetClient client.Client, instance *osbv1alpha1.SFServiceInstance, clusterID string) error { + ctx := context.Background() + serviceID := instance.Spec.ServiceID + planID := instance.Spec.PlanID + log := r.Log.WithValues("clusterID", clusterID, "serviceID", serviceID, "planID", planID) + + var lastErr error + + service, serviceReplica := &osbv1alpha1.SFService{}, &osbv1alpha1.SFService{} + serviceKey := types.NamespacedName{ + Name: serviceID, + Namespace: constants.InteroperatorNamespace, + } + + plan, planReplica := &osbv1alpha1.SFPlan{}, &osbv1alpha1.SFPlan{} + planKey := types.NamespacedName{ + Name: planID, + Namespace: constants.InteroperatorNamespace, + } + + err := r.Get(ctx, serviceKey, service) + if err != nil { + log.Error(err, "Failed to get SFService from leader") + lastErr = err + } + + err = targetClient.Get(ctx, serviceKey, serviceReplica) + if err != nil { + if apiErrors.IsNotFound(err) { + replicateSFServiceResourceData(service, serviceReplica) + err = targetClient.Create(ctx, serviceReplica) + if err != nil { + log.Error(err, "Error occurred while replicating SFService to cluster ") + lastErr = err + } else { + log.Info("SFService not found in target cluster. created as copy from leader") + } + } else if !apiErrors.IsNotFound(err) { + log.Error(err, "Failed to fetch SFService from target cluster") + lastErr = err + } + } else { + replicateSFServiceResourceData(service, serviceReplica) + err = targetClient.Update(ctx, serviceReplica) + if err != nil { + log.Error(err, "Error occurred while replicating SFService to cluster") + lastErr = err + } else { + log.Info("updated SFService in target cluster") + } + } + + err = r.Get(ctx, planKey, plan) + if err != nil { + log.Error(err, "Failed to get SFPlan from leader") + lastErr = err + } + + err = targetClient.Get(ctx, planKey, planReplica) + if err != nil { + if apiErrors.IsNotFound(err) { + replicateSFPlanResourceData(plan, planReplica) + err = utils.SetOwnerReference(serviceReplica, planReplica, r.scheme) + if err != nil { + lastErr = err + } + err = targetClient.Create(ctx, planReplica) + if err != nil { + log.Error(err, "Error occurred while replicating SFPlan to cluster ") + lastErr = err + } else { + log.Info("SFPlan not found in target cluster. created as copy from leader") + } + } else if !apiErrors.IsNotFound(err) { + log.Error(err, "Failed to fetch SFPlan from target cluster") + lastErr = err + } + } else { + replicateSFPlanResourceData(plan, planReplica) + err = utils.SetOwnerReference(serviceReplica, planReplica, r.scheme) + if err != nil { + return err + } + err = targetClient.Update(ctx, planReplica) + if err != nil { + log.Error(err, "Error occurred while replicating SFPlan to cluster") + lastErr = err + } else { + log.Info("updated SFPlan in target cluster") + } + } + + return lastErr +} + func (r *InstanceReplicator) setInProgress(instance *osbv1alpha1.SFServiceInstance, state string) error { instanceID := instance.GetName() clusterID, _ := instance.GetClusterID() @@ -390,6 +493,20 @@ func copyObject(source, destination *osbv1alpha1.SFServiceInstance, preserveReso } } +func replicateSFServiceResourceData(source *osbv1alpha1.SFService, dest *osbv1alpha1.SFService) { + source.Spec.DeepCopyInto(&dest.Spec) + dest.SetName(source.GetName()) + dest.SetNamespace(source.GetNamespace()) + dest.SetLabels(source.GetLabels()) +} + +func replicateSFPlanResourceData(source *osbv1alpha1.SFPlan, dest *osbv1alpha1.SFPlan) { + source.Spec.DeepCopyInto(&dest.Spec) + dest.SetName(source.GetName()) + dest.SetNamespace(source.GetNamespace()) + dest.SetLabels(source.GetLabels()) +} + // SetupWithManager registers the MCD Instance replicator with manager // and setups the watches. func (r *InstanceReplicator) SetupWithManager(mgr ctrl.Manager) error { diff --git a/interoperator/controllers/multiclusterdeploy/sfserviceinstancereplicator/sfserviceinstancereplicator_controller_test.go b/interoperator/controllers/multiclusterdeploy/sfserviceinstancereplicator/sfserviceinstancereplicator_controller_test.go index 15d682bcd..66f188483 100644 --- a/interoperator/controllers/multiclusterdeploy/sfserviceinstancereplicator/sfserviceinstancereplicator_controller_test.go +++ b/interoperator/controllers/multiclusterdeploy/sfserviceinstancereplicator/sfserviceinstancereplicator_controller_test.go @@ -44,6 +44,8 @@ var c, c2 client.Client var expectedRequest = reconcile.Request{NamespacedName: types.NamespacedName{Name: "foo", Namespace: constants.InteroperatorNamespace}} var instanceKey = types.NamespacedName{Name: "instance-id", Namespace: "sf-instance-id"} +var serviceKey = types.NamespacedName{Name: "service-id", Namespace: constants.InteroperatorNamespace} +var planKey = types.NamespacedName{Name: "plan-id", Namespace: constants.InteroperatorNamespace} const timeout = time.Second * 5 @@ -70,6 +72,60 @@ var instance = &osbv1alpha1.SFServiceInstance{ }, } +var service = &osbv1alpha1.SFService{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-id", + Namespace: constants.InteroperatorNamespace, + }, + Spec: osbv1alpha1.SFServiceSpec{ + ID: "service-id", + }, +} +var templateSpec = []osbv1alpha1.TemplateSpec{ + { + Action: "provision", + Type: "gotemplate", + Content: "provisioncontent", + }, + { + Action: "bind", + Type: "gotemplate", + Content: "bindcontent", + }, + { + Action: "status", + Type: "gotemplate", + Content: "statuscontent", + }, + { + Action: "sources", + Type: "gotemplate", + Content: "sourcescontent", + }, +} +var plan = &osbv1alpha1.SFPlan{ + ObjectMeta: metav1.ObjectMeta{ + Name: "plan-id", + Namespace: constants.InteroperatorNamespace, + Labels: map[string]string{ + "serviceId": "service-id", + }, + }, + Spec: osbv1alpha1.SFPlanSpec{ + Name: "plan-name", + ID: "plan-id", + Description: "description", + Metadata: nil, + Free: false, + Bindable: true, + PlanUpdatable: true, + Schemas: nil, + Templates: templateSpec, + ServiceID: "service-id", + RawContext: nil, + Manager: nil, + }} + func TestReconcile(t *testing.T) { instance2 := &osbv1alpha1.SFServiceInstance{} watchChannel := make(chan event.GenericEvent) @@ -121,6 +177,9 @@ func TestReconcile(t *testing.T) { mgrStopped.Wait() }() + g.Expect(c.Create(context.TODO(), service)).NotTo(gomega.HaveOccurred()) + g.Expect(c.Create(context.TODO(), plan)).NotTo(gomega.HaveOccurred()) + // Create a new namespace ns := &corev1.Namespace{ ObjectMeta: metav1.ObjectMeta{ @@ -157,6 +216,20 @@ func TestReconcile(t *testing.T) { }) g.Expect(err).NotTo(gomega.HaveOccurred()) + // Plan and servive should be replicated in follower cluster + g.Eventually(func() error { + err = c2.Get(context.TODO(), serviceKey, service) + if err != nil { + return err + } + err = c2.Get(context.TODO(), planKey, plan) + if err != nil { + return err + } + + return nil + }, timeout).Should(gomega.Succeed()) + // State should be updated in master cluster g.Eventually(func() error { err := c.Get(context.TODO(), instanceKey, instance) diff --git a/interoperator/controllers/multiclusterdeploy/sfservicesreplicator/sfservices_mcd_controller.go b/interoperator/controllers/multiclusterdeploy/sfservicesreplicator/sfservices_mcd_controller.go deleted file mode 100644 index dd19a24cb..000000000 --- a/interoperator/controllers/multiclusterdeploy/sfservicesreplicator/sfservices_mcd_controller.go +++ /dev/null @@ -1,271 +0,0 @@ -/* -Copyright 2018 The Service Fabrik Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package sfservicesreplicator - -import ( - "context" - - osbv1alpha1 "github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/api/osb/v1alpha1" - resourcev1alpha1 "github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/api/resource/v1alpha1" - "github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/internal/config" - "github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/pkg/cluster/registry" - "github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/pkg/utils" - "github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/pkg/watches" - - "github.com/go-logr/logr" - apiErrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" - ctrl "sigs.k8s.io/controller-runtime" - kubernetes "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/source" -) - -// ReconcileSFServices reconciles SFServices state across clusters -type ReconcileSFServices struct { - kubernetes.Client - Log logr.Logger - scheme *runtime.Scheme - clusterRegistry registry.ClusterRegistry - cfgManager config.Config -} - -// Reconcile is called for a SFCluster. It replicates all SFServices and all SFPlans to -// the SFCluster -func (r *ReconcileSFServices) Reconcile(req ctrl.Request) (ctrl.Result, error) { - ctx := context.Background() - log := r.Log.WithValues("sfcluster", req.NamespacedName) - - // Fetch the SFCluster - clusterInstance := &resourcev1alpha1.SFCluster{} - err := r.Get(ctx, req.NamespacedName, clusterInstance) - if err != nil { - if apiErrors.IsNotFound(err) { - // Object not found, return. - return ctrl.Result{}, nil - } - log.Error(err, "Failed to get SFCluster") - // Error reading the object - requeue the request. - return ctrl.Result{}, err - } - - clusterID := clusterInstance.GetName() - // Fetch current primary cluster id from configmap - interoperatorCfg := r.cfgManager.GetConfig() - currPrimaryClusterID := interoperatorCfg.PrimaryClusterID - - if clusterID == currPrimaryClusterID { - // Target cluster is mastercluster itself - // Replication not needed - return ctrl.Result{}, nil - } - - log.Info("Reconcile started for cluster", "clusterID", clusterID) - targetClient, err := r.clusterRegistry.GetClient(clusterID) - if err != nil { - log.Error(err, "Following error occurred while getting client for cluster ", "clusterID", clusterID) - return ctrl.Result{}, err - } - - log.Info("Trying to list all the services", "namespace", req.NamespacedName.Namespace) - options := &kubernetes.ListOptions{ - Namespace: req.NamespacedName.Namespace, - } - services := &osbv1alpha1.SFServiceList{} - err = r.List(ctx, services, options) - if err != nil { - log.Error(err, "error while fetching services while processing cluster id ", "clusterID", clusterID) - return ctrl.Result{}, err - } - log.Info("services fetched ", "count", len(services.Items), "clusterID", clusterID) - for _, obj := range services.Items { - log.Info("Service is fetched from master cluster", "serviceID", obj.Spec.ID) - service := &osbv1alpha1.SFService{} - serviceKey := types.NamespacedName{ - Name: obj.GetName(), - Namespace: obj.GetNamespace(), - } - log.Info("Checking if service already exists on target cluster", "serviceID", obj.Spec.ID, "clusterID", clusterID) - err = targetClient.Get(ctx, serviceKey, service) - if err != nil { - if apiErrors.IsNotFound(err) { - replicateSFServiceResourceData(&obj, service) - err = targetClient.Create(ctx, service) - if err != nil { - log.Error(err, "Creating new service on sister cluster failed due to following error: ") - return ctrl.Result{}, err - } - log.Info("Created service on cluster", "serviceName", service.Spec.Name, "clusterID", clusterID) - err := r.handleServicePlans(service, clusterID, &targetClient) - if err != nil { - log.Error(err, "Error while replicating plans for service ", "serviceName", service.Spec.Name) - return ctrl.Result{}, err - } - } else { - log.Error(err, "Getting the service from sister cluster ", "clusterID", clusterID) - return ctrl.Result{}, err - } - } else { - replicateSFServiceResourceData(&obj, service) - err = targetClient.Update(ctx, service) - if err != nil { - log.Error(err, "Updating service on sister cluster failed due to following error: ") - return ctrl.Result{}, err - } - log.Info("Updated service on cluster", "serviceName", service.Spec.Name, "clusterID", clusterID) - err = r.handleServicePlans(service, clusterID, &targetClient) - if err != nil { - log.Error(err, "Error while replicating plans for service ", "serviceName", service.Spec.Name) - return ctrl.Result{}, err - } - } - } - return ctrl.Result{}, nil -} - -func (r *ReconcileSFServices) handleServicePlans(service *osbv1alpha1.SFService, clusterID string, targetClient *kubernetes.Client) error { - ctx := context.Background() - log := r.Log.WithValues("clusterID", clusterID) - - log.Info("Trying to list all the plans for service in the master cluster", "serviceName", service.Spec.Name) - plans := &osbv1alpha1.SFPlanList{} - searchLabels := make(kubernetes.MatchingLabels) - searchLabels["serviceId"] = service.Spec.ID - options := &kubernetes.ListOptions{ - Namespace: service.GetNamespace(), - } - searchLabels.ApplyToList(options) - err := r.List(ctx, plans, options) - if err != nil { - log.Error(err, "error while fetching plans while processing cluster id ", "clusterID", clusterID) - return err - } - log.Info("plans fetched for cluster", "count", len(plans.Items), "clusterID", clusterID) - for _, obj := range plans.Items { - log.Info("Plan is fetched from master cluster", "planID", obj.Spec.ID) - plan := &osbv1alpha1.SFPlan{} - planKey := types.NamespacedName{ - Name: obj.GetName(), - Namespace: obj.GetNamespace(), - } - log.Info("Checking if plan already exists on target cluster", "clusterID", clusterID, "planID", obj.Spec.ID) - err = (*targetClient).Get(ctx, planKey, plan) - if err != nil { - if apiErrors.IsNotFound(err) { - replicateSFPlanResourceData(&obj, plan) - err = utils.SetOwnerReference(service, plan, r.scheme) - if err != nil { - return err - } - err = (*targetClient).Create(ctx, plan) - if err != nil { - log.Error(err, "Creating new plan on sister cluster failed") - return err - } - log.Info("Created plan on cluster", "clusterID", clusterID, "planName", plan.Spec.Name) - } - } else { - replicateSFPlanResourceData(&obj, plan) - err = utils.SetOwnerReference(service, plan, r.scheme) - if err != nil { - return err - } - err = (*targetClient).Update(ctx, plan) - if err != nil { - log.Error(err, "Updating plan on sister cluster failed") - return err - } - log.Info("Updated plan on cluster ", "clusterID", clusterID, "planName", plan.Spec.Name) - } - } - return nil -} - -func enqueueRequestForAllClusters(clusterRegistry registry.ClusterRegistry) []ctrl.Request { - clusterList, err := clusterRegistry.ListClusters(nil) - if err != nil { - return nil - } - reconcileRequests := make([]ctrl.Request, len(clusterList.Items)) - for i, cluster := range clusterList.Items { - reconcileRequests[i] = ctrl.Request{ - NamespacedName: types.NamespacedName{ - Name: cluster.GetName(), - Namespace: cluster.GetNamespace(), - }, - } - } - return reconcileRequests -} - -func replicateSFServiceResourceData(source *osbv1alpha1.SFService, dest *osbv1alpha1.SFService) { - source.Spec.DeepCopyInto(&dest.Spec) - dest.SetName(source.GetName()) - dest.SetNamespace(source.GetNamespace()) - dest.SetLabels(source.GetLabels()) -} - -func replicateSFPlanResourceData(source *osbv1alpha1.SFPlan, dest *osbv1alpha1.SFPlan) { - source.Spec.DeepCopyInto(&dest.Spec) - dest.SetName(source.GetName()) - dest.SetNamespace(source.GetNamespace()) - dest.SetLabels(source.GetLabels()) -} - -// SetupWithManager registers the MCD Services Controller with manager -// and setups the watches. -func (r *ReconcileSFServices) SetupWithManager(mgr ctrl.Manager) error { - r.scheme = mgr.GetScheme() - - if r.Log == nil { - r.Log = ctrl.Log.WithName("mcd").WithName("replicator").WithName("service") - } - if r.clusterRegistry == nil { - clusterRegistry, err := registry.New(mgr.GetConfig(), mgr.GetScheme(), mgr.GetRESTMapper()) - if err != nil { - return err - } - r.clusterRegistry = clusterRegistry - } - - cfgManager, err := config.New(mgr.GetConfig(), mgr.GetScheme(), mgr.GetRESTMapper()) - if err != nil { - return err - } - r.cfgManager = cfgManager - - // Define a mapping from the object in the event(sfservice/sfplan) to - // list of sfclusters to reconcile - mapFn := handler.ToRequestsFunc( - func(a handler.MapObject) []ctrl.Request { - return enqueueRequestForAllClusters(r.clusterRegistry) - }) - - builder := ctrl.NewControllerManagedBy(mgr). - Named("mcd_replicator_service"). - For(&resourcev1alpha1.SFCluster{}). - Watches(&source.Kind{Type: &osbv1alpha1.SFService{}}, &handler.EnqueueRequestsFromMapFunc{ - ToRequests: mapFn, - }). - Watches(&source.Kind{Type: &osbv1alpha1.SFPlan{}}, &handler.EnqueueRequestsFromMapFunc{ - ToRequests: mapFn, - }). - WithEventFilter(watches.NamespaceFilter()) - - return builder.Complete(r) -} diff --git a/interoperator/controllers/multiclusterdeploy/sfservicesreplicator/sfservices_mcd_controller_suite_test.go b/interoperator/controllers/multiclusterdeploy/sfservicesreplicator/sfservices_mcd_controller_suite_test.go deleted file mode 100644 index e520182f5..000000000 --- a/interoperator/controllers/multiclusterdeploy/sfservicesreplicator/sfservices_mcd_controller_suite_test.go +++ /dev/null @@ -1,103 +0,0 @@ -/* -Copyright 2018 The Service Fabrik Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package sfservicesreplicator - -import ( - stdlog "log" - "os" - "path/filepath" - "sync" - "testing" - - osbv1alpha1 "github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/api/osb/v1alpha1" - resourcev1alpha1 "github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/api/resource/v1alpha1" - "github.com/go-logr/logr" - "github.com/onsi/ginkgo" - "github.com/onsi/gomega" - "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/rest" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/envtest" - logf "sigs.k8s.io/controller-runtime/pkg/log" - "sigs.k8s.io/controller-runtime/pkg/log/zap" - "sigs.k8s.io/controller-runtime/pkg/manager" -) - -var cfg, cfg2 *rest.Config -var k8sClient, k8sClient2 client.Client -var testEnv, testEnv2 *envtest.Environment -var testLog logr.Logger - -func TestMain(m *testing.M) { - var err error - logf.SetLogger(zap.New(zap.UseDevMode(true), zap.WriteTo(ginkgo.GinkgoWriter))) - testLog = ctrl.Log.WithName("test").WithName("mcd_services") - - testEnv = &envtest.Environment{ - CRDDirectoryPaths: []string{filepath.Join("..", "..", "..", "config", "crd", "bases")}, - } - - testEnv2 = &envtest.Environment{ - CRDDirectoryPaths: []string{filepath.Join("..", "..", "..", "config", "crd", "bases")}, - } - - err = osbv1alpha1.AddToScheme(scheme.Scheme) - if err != nil { - stdlog.Fatal(err) - } - - err = resourcev1alpha1.AddToScheme(scheme.Scheme) - if err != nil { - stdlog.Fatal(err) - } - - if cfg, err = testEnv.Start(); err != nil { - stdlog.Fatal(err) - } - - if cfg2, err = testEnv2.Start(); err != nil { - stdlog.Fatal(err) - } - - k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme}) - if err != nil { - stdlog.Fatal(err) - } - - k8sClient2, err = client.New(cfg2, client.Options{Scheme: scheme.Scheme}) - if err != nil { - stdlog.Fatal(err) - } - - code := m.Run() - testEnv.Stop() - testEnv2.Stop() - os.Exit(code) -} - -// StartTestManager adds recFn -func StartTestManager(mgr manager.Manager, g *gomega.GomegaWithT) (chan struct{}, *sync.WaitGroup) { - stop := make(chan struct{}) - wg := &sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - g.Expect(mgr.Start(stop)).NotTo(gomega.HaveOccurred()) - }() - return stop, wg -} diff --git a/interoperator/controllers/multiclusterdeploy/sfservicesreplicator/sfservices_mcd_controller_test.go b/interoperator/controllers/multiclusterdeploy/sfservicesreplicator/sfservices_mcd_controller_test.go deleted file mode 100644 index f3cb5db7c..000000000 --- a/interoperator/controllers/multiclusterdeploy/sfservicesreplicator/sfservices_mcd_controller_test.go +++ /dev/null @@ -1,265 +0,0 @@ -/* -Copyright 2018 The Service Fabrik Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package sfservicesreplicator - -import ( - "context" - "fmt" - "testing" - "time" - - osbv1alpha1 "github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/api/osb/v1alpha1" - resourcev1alpha1 "github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/api/resource/v1alpha1" - mock_clusterRegistry "github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/pkg/cluster/registry/mock_registry" - "github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/pkg/constants" - - "github.com/golang/mock/gomock" - "github.com/onsi/gomega" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - ctrlrun "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/reconcile" -) - -var c, c2 client.Client - -var expectedRequest = reconcile.Request{NamespacedName: types.NamespacedName{Name: "foo", Namespace: constants.InteroperatorNamespace}} - -const timeout = time.Second * 5 - -func createAndTestSFServiceAndPlans(serviceName string, planName string, service *osbv1alpha1.SFService, plan *osbv1alpha1.SFPlan, t *testing.T, g *gomega.GomegaWithT) { - err := c.Create(context.TODO(), service) - if apierrors.IsInvalid(err) { - t.Logf("failed to create object, got an invalid object error: %v", err) - return - } - g.Expect(err).NotTo(gomega.HaveOccurred()) - - err = c.Create(context.TODO(), plan) - if apierrors.IsInvalid(err) { - t.Logf("failed to create object, got an invalid object error: %v", err) - return - } - g.Expect(err).NotTo(gomega.HaveOccurred()) - - g.Eventually(func() error { - err := c2.Get(context.TODO(), types.NamespacedName{ - Name: serviceName, - Namespace: constants.InteroperatorNamespace, - }, service) - if err != nil { - return err - } - return nil - }, timeout).Should(gomega.Succeed()) - g.Expect(service.GetName()).To(gomega.Equal(serviceName)) - - err = c.Get(context.TODO(), types.NamespacedName{Name: planName, Namespace: constants.InteroperatorNamespace}, plan) - g.Expect(err).NotTo(gomega.HaveOccurred()) - - g.Eventually(func() error { - err := c2.Get(context.TODO(), types.NamespacedName{ - Name: planName, - Namespace: constants.InteroperatorNamespace, - }, plan) - if err != nil { - return err - } - return nil - }, timeout).Should(gomega.Succeed()) - g.Expect(plan.GetName()).To(gomega.Equal(planName)) - - g.Expect(c.Delete(context.TODO(), plan)).NotTo(gomega.HaveOccurred()) - g.Expect(c.Delete(context.TODO(), service)).NotTo(gomega.HaveOccurred()) - - g.Eventually(func() error { - err := c.Get(context.TODO(), types.NamespacedName{ - Name: plan.GetName(), - Namespace: plan.GetNamespace(), - }, plan) - if err != nil { - if apierrors.IsNotFound(err) { - return nil - } - return err - } - return fmt.Errorf("not deleted") - }, 2*timeout).Should(gomega.Succeed()) - g.Eventually(func() error { - err := c.Get(context.TODO(), types.NamespacedName{ - Name: service.GetName(), - Namespace: service.GetNamespace(), - }, service) - if err != nil { - if apierrors.IsNotFound(err) { - return nil - } - return err - } - return fmt.Errorf("not deleted") - }, timeout).Should(gomega.Succeed()) -} - -func TestReconcile(t *testing.T) { - g := gomega.NewGomegaWithT(t) - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - service1 := &osbv1alpha1.SFService{ - ObjectMeta: metav1.ObjectMeta{ - Name: "foo", - Namespace: constants.InteroperatorNamespace, - }, - Spec: osbv1alpha1.SFServiceSpec{ - ID: "foo", - }, - } - var templateSpec = []osbv1alpha1.TemplateSpec{ - osbv1alpha1.TemplateSpec{ - Action: "provision", - Type: "gotemplate", - Content: "provisioncontent", - }, - osbv1alpha1.TemplateSpec{ - Action: "bind", - Type: "gotemplate", - Content: "bindcontent", - }, - osbv1alpha1.TemplateSpec{ - Action: "status", - Type: "gotemplate", - Content: "statuscontent", - }, - osbv1alpha1.TemplateSpec{ - Action: "sources", - Type: "gotemplate", - Content: "sourcescontent", - }, - } - plan1 := &osbv1alpha1.SFPlan{ - ObjectMeta: metav1.ObjectMeta{ - Name: "bar", - Namespace: constants.InteroperatorNamespace, - }, - Spec: osbv1alpha1.SFPlanSpec{ - Name: "plan-name", - ID: "plan-id", - Description: "description", - Metadata: nil, - Free: false, - Bindable: true, - PlanUpdatable: true, - Schemas: nil, - Templates: templateSpec, - ServiceID: "service-id", - RawContext: nil, - Manager: nil, - }} - labels := make(map[string]string) - labels["serviceId"] = "foo" - plan1.SetLabels(labels) - // Setup the Manager and Controller. Wrap the Controller Reconcile function so it writes each request to a - // channel when it is finished. - mgr, err := manager.New(cfg, manager.Options{ - MetricsBindAddress: "0", - }) - g.Expect(err).NotTo(gomega.HaveOccurred()) - - c, err = client.New(cfg, client.Options{ - Scheme: mgr.GetScheme(), - Mapper: mgr.GetRESTMapper(), - }) - g.Expect(err).NotTo(gomega.HaveOccurred()) - - c2, err = client.New(cfg2, client.Options{ - Scheme: mgr.GetScheme(), - Mapper: mgr.GetRESTMapper(), - }) - g.Expect(err).NotTo(gomega.HaveOccurred()) - - sfcluster1 := &resourcev1alpha1.SFCluster{ - ObjectMeta: metav1.ObjectMeta{ - Name: "1", - Namespace: constants.InteroperatorNamespace, - }, - } - - sfcluster2 := &resourcev1alpha1.SFCluster{ - ObjectMeta: metav1.ObjectMeta{ - Name: "2", - Namespace: constants.InteroperatorNamespace, - }, - } - - mockClusterRegistry := mock_clusterRegistry.NewMockClusterRegistry(ctrl) - mockClusterRegistry.EXPECT().GetClient("1").Return(c, nil).AnyTimes() - mockClusterRegistry.EXPECT().GetClient("2").Return(c2, nil).AnyTimes() - mockClusterRegistry.EXPECT().ListClusters(nil).Return(&resourcev1alpha1.SFClusterList{ - Items: []resourcev1alpha1.SFCluster{*sfcluster1, *sfcluster2}, - }, nil).AnyTimes() - - controller := &ReconcileSFServices{ - Client: mgr.GetClient(), - Log: ctrlrun.Log.WithName("mcd").WithName("replicator").WithName("service"), - clusterRegistry: mockClusterRegistry, - } - g.Expect(controller.SetupWithManager(mgr)).NotTo(gomega.HaveOccurred()) - stopMgr, mgrStopped := StartTestManager(mgr, g) - - defer func() { - close(stopMgr) - mgrStopped.Wait() - }() - - g.Expect(c.Create(context.TODO(), sfcluster1)).NotTo(gomega.HaveOccurred()) - <-time.After(time.Second) - g.Expect(c.Create(context.TODO(), sfcluster2)).NotTo(gomega.HaveOccurred()) - - createAndTestSFServiceAndPlans("foo", "bar", service1, plan1, t, g) - - g.Expect(c.Delete(context.TODO(), sfcluster1)).NotTo(gomega.HaveOccurred()) - g.Expect(c.Delete(context.TODO(), sfcluster2)).NotTo(gomega.HaveOccurred()) - g.Eventually(func() error { - err := c.Get(context.TODO(), types.NamespacedName{ - Name: sfcluster1.GetName(), - Namespace: sfcluster1.GetNamespace(), - }, sfcluster1) - if err != nil { - if apierrors.IsNotFound(err) { - return nil - } - return err - } - return fmt.Errorf("not deleted") - }, 2*timeout).Should(gomega.Succeed()) - g.Eventually(func() error { - err := c.Get(context.TODO(), types.NamespacedName{ - Name: sfcluster2.GetName(), - Namespace: sfcluster2.GetNamespace(), - }, sfcluster2) - if err != nil { - if apierrors.IsNotFound(err) { - return nil - } - return err - } - return fmt.Errorf("not deleted") - }, timeout).Should(gomega.Succeed()) -}