Skip to content
This repository has been archived by the owner on Jan 29, 2025. It is now read-only.

Commit

Permalink
Merge pull request #1261 from vivekzhere/lazy
Browse files Browse the repository at this point in the history
Lazy Replication of SFService and SFPlan
  • Loading branch information
Vivek Anand T Kallampally authored Mar 12, 2021
2 parents b55c27a + e1121ae commit 54a23db
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 648 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/controllers/multiclusterdeploy/sfclusterreplicator"
"github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/controllers/multiclusterdeploy/sfservicebindingreplicator"
"github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/controllers/multiclusterdeploy/sfserviceinstancereplicator"
"github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/controllers/multiclusterdeploy/sfservicesreplicator"
"github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/controllers/multiclusterdeploy/watchmanager"

ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -70,14 +69,6 @@ func SetupWithManager(mgr ctrl.Manager) error {
return err
}

if err = (&sfservicesreplicator.ReconcileSFServices{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("mcd").WithName("replicator").WithName("service"),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create service replicator", "controller", "ReconcileSFServices")
return err
}

if err = (&sfclusterreplicator.SFClusterReplicator{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("mcd").WithName("replicator").WithName("cluster"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/internal/config"
"github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/pkg/cluster/registry"
"github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/pkg/constants"
"github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/pkg/utils"
"github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/pkg/watches"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -143,6 +144,8 @@ func (r *InstanceReplicator) Reconcile(req ctrl.Request) (ctrl.Result, error) {
return ctrl.Result{}, err
}

lastErr := r.reconcileServicePlan(targetClient, instance, clusterID)

if !instance.GetDeletionTimestamp().IsZero() && state == "delete" {
replica.SetName(instance.GetName())
replica.SetNamespace(instance.GetNamespace())
Expand Down Expand Up @@ -251,6 +254,11 @@ func (r *InstanceReplicator) Reconcile(req ctrl.Request) (ctrl.Result, error) {
"replicaState", replicaState, "replicaLastOperation", replicaLastOperation)
}

if lastErr != nil {
// re que if service/plan replication failed
return ctrl.Result{}, err
}

return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -322,6 +330,101 @@ func (r *InstanceReplicator) reconcileNamespace(targetClient client.Client, name
return nil
}

func (r *InstanceReplicator) reconcileServicePlan(targetClient client.Client, instance *osbv1alpha1.SFServiceInstance, clusterID string) error {
ctx := context.Background()
serviceID := instance.Spec.ServiceID
planID := instance.Spec.PlanID
log := r.Log.WithValues("clusterID", clusterID, "serviceID", serviceID, "planID", planID)

var lastErr error

service, serviceReplica := &osbv1alpha1.SFService{}, &osbv1alpha1.SFService{}
serviceKey := types.NamespacedName{
Name: serviceID,
Namespace: constants.InteroperatorNamespace,
}

plan, planReplica := &osbv1alpha1.SFPlan{}, &osbv1alpha1.SFPlan{}
planKey := types.NamespacedName{
Name: planID,
Namespace: constants.InteroperatorNamespace,
}

err := r.Get(ctx, serviceKey, service)
if err != nil {
log.Error(err, "Failed to get SFService from leader")
lastErr = err
}

err = targetClient.Get(ctx, serviceKey, serviceReplica)
if err != nil {
if apiErrors.IsNotFound(err) {
replicateSFServiceResourceData(service, serviceReplica)
err = targetClient.Create(ctx, serviceReplica)
if err != nil {
log.Error(err, "Error occurred while replicating SFService to cluster ")
lastErr = err
} else {
log.Info("SFService not found in target cluster. created as copy from leader")
}
} else if !apiErrors.IsNotFound(err) {
log.Error(err, "Failed to fetch SFService from target cluster")
lastErr = err
}
} else {
replicateSFServiceResourceData(service, serviceReplica)
err = targetClient.Update(ctx, serviceReplica)
if err != nil {
log.Error(err, "Error occurred while replicating SFService to cluster")
lastErr = err
} else {
log.Info("updated SFService in target cluster")
}
}

err = r.Get(ctx, planKey, plan)
if err != nil {
log.Error(err, "Failed to get SFPlan from leader")
lastErr = err
}

err = targetClient.Get(ctx, planKey, planReplica)
if err != nil {
if apiErrors.IsNotFound(err) {
replicateSFPlanResourceData(plan, planReplica)
err = utils.SetOwnerReference(serviceReplica, planReplica, r.scheme)
if err != nil {
lastErr = err
}
err = targetClient.Create(ctx, planReplica)
if err != nil {
log.Error(err, "Error occurred while replicating SFPlan to cluster ")
lastErr = err
} else {
log.Info("SFPlan not found in target cluster. created as copy from leader")
}
} else if !apiErrors.IsNotFound(err) {
log.Error(err, "Failed to fetch SFPlan from target cluster")
lastErr = err
}
} else {
replicateSFPlanResourceData(plan, planReplica)
err = utils.SetOwnerReference(serviceReplica, planReplica, r.scheme)
if err != nil {
return err
}
err = targetClient.Update(ctx, planReplica)
if err != nil {
log.Error(err, "Error occurred while replicating SFPlan to cluster")
lastErr = err
} else {
log.Info("updated SFPlan in target cluster")
}
}

return lastErr
}

func (r *InstanceReplicator) setInProgress(instance *osbv1alpha1.SFServiceInstance, state string) error {
instanceID := instance.GetName()
clusterID, _ := instance.GetClusterID()
Expand Down Expand Up @@ -390,6 +493,20 @@ func copyObject(source, destination *osbv1alpha1.SFServiceInstance, preserveReso
}
}

func replicateSFServiceResourceData(source *osbv1alpha1.SFService, dest *osbv1alpha1.SFService) {
source.Spec.DeepCopyInto(&dest.Spec)
dest.SetName(source.GetName())
dest.SetNamespace(source.GetNamespace())
dest.SetLabels(source.GetLabels())
}

func replicateSFPlanResourceData(source *osbv1alpha1.SFPlan, dest *osbv1alpha1.SFPlan) {
source.Spec.DeepCopyInto(&dest.Spec)
dest.SetName(source.GetName())
dest.SetNamespace(source.GetNamespace())
dest.SetLabels(source.GetLabels())
}

// SetupWithManager registers the MCD Instance replicator with manager
// and setups the watches.
func (r *InstanceReplicator) SetupWithManager(mgr ctrl.Manager) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ var c, c2 client.Client

var expectedRequest = reconcile.Request{NamespacedName: types.NamespacedName{Name: "foo", Namespace: constants.InteroperatorNamespace}}
var instanceKey = types.NamespacedName{Name: "instance-id", Namespace: "sf-instance-id"}
var serviceKey = types.NamespacedName{Name: "service-id", Namespace: constants.InteroperatorNamespace}
var planKey = types.NamespacedName{Name: "plan-id", Namespace: constants.InteroperatorNamespace}

const timeout = time.Second * 5

Expand All @@ -70,6 +72,60 @@ var instance = &osbv1alpha1.SFServiceInstance{
},
}

var service = &osbv1alpha1.SFService{
ObjectMeta: metav1.ObjectMeta{
Name: "service-id",
Namespace: constants.InteroperatorNamespace,
},
Spec: osbv1alpha1.SFServiceSpec{
ID: "service-id",
},
}
var templateSpec = []osbv1alpha1.TemplateSpec{
{
Action: "provision",
Type: "gotemplate",
Content: "provisioncontent",
},
{
Action: "bind",
Type: "gotemplate",
Content: "bindcontent",
},
{
Action: "status",
Type: "gotemplate",
Content: "statuscontent",
},
{
Action: "sources",
Type: "gotemplate",
Content: "sourcescontent",
},
}
var plan = &osbv1alpha1.SFPlan{
ObjectMeta: metav1.ObjectMeta{
Name: "plan-id",
Namespace: constants.InteroperatorNamespace,
Labels: map[string]string{
"serviceId": "service-id",
},
},
Spec: osbv1alpha1.SFPlanSpec{
Name: "plan-name",
ID: "plan-id",
Description: "description",
Metadata: nil,
Free: false,
Bindable: true,
PlanUpdatable: true,
Schemas: nil,
Templates: templateSpec,
ServiceID: "service-id",
RawContext: nil,
Manager: nil,
}}

func TestReconcile(t *testing.T) {
instance2 := &osbv1alpha1.SFServiceInstance{}
watchChannel := make(chan event.GenericEvent)
Expand Down Expand Up @@ -121,6 +177,9 @@ func TestReconcile(t *testing.T) {
mgrStopped.Wait()
}()

g.Expect(c.Create(context.TODO(), service)).NotTo(gomega.HaveOccurred())
g.Expect(c.Create(context.TODO(), plan)).NotTo(gomega.HaveOccurred())

// Create a new namespace
ns := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -157,6 +216,20 @@ func TestReconcile(t *testing.T) {
})
g.Expect(err).NotTo(gomega.HaveOccurred())

// Plan and servive should be replicated in follower cluster
g.Eventually(func() error {
err = c2.Get(context.TODO(), serviceKey, service)
if err != nil {
return err
}
err = c2.Get(context.TODO(), planKey, plan)
if err != nil {
return err
}

return nil
}, timeout).Should(gomega.Succeed())

// State should be updated in master cluster
g.Eventually(func() error {
err := c.Get(context.TODO(), instanceKey, instance)
Expand Down
Loading

0 comments on commit 54a23db

Please sign in to comment.