Skip to content
This repository has been archived by the owner on Jan 29, 2025. It is now read-only.

Lazy Replication of SFService and SFPlan #1261

Merged
merged 1 commit into from
Mar 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/controllers/multiclusterdeploy/sfclusterreplicator"
"github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/controllers/multiclusterdeploy/sfservicebindingreplicator"
"github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/controllers/multiclusterdeploy/sfserviceinstancereplicator"
"github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/controllers/multiclusterdeploy/sfservicesreplicator"
"github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/controllers/multiclusterdeploy/watchmanager"

ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -70,14 +69,6 @@ func SetupWithManager(mgr ctrl.Manager) error {
return err
}

if err = (&sfservicesreplicator.ReconcileSFServices{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("mcd").WithName("replicator").WithName("service"),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create service replicator", "controller", "ReconcileSFServices")
return err
}

if err = (&sfclusterreplicator.SFClusterReplicator{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("mcd").WithName("replicator").WithName("cluster"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/internal/config"
"github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/pkg/cluster/registry"
"github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/pkg/constants"
"github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/pkg/utils"
"github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/pkg/watches"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -143,6 +144,8 @@ func (r *InstanceReplicator) Reconcile(req ctrl.Request) (ctrl.Result, error) {
return ctrl.Result{}, err
}

lastErr := r.reconcileServicePlan(targetClient, instance, clusterID)

if !instance.GetDeletionTimestamp().IsZero() && state == "delete" {
replica.SetName(instance.GetName())
replica.SetNamespace(instance.GetNamespace())
Expand Down Expand Up @@ -251,6 +254,11 @@ func (r *InstanceReplicator) Reconcile(req ctrl.Request) (ctrl.Result, error) {
"replicaState", replicaState, "replicaLastOperation", replicaLastOperation)
}

if lastErr != nil {
// re que if service/plan replication failed
return ctrl.Result{}, err
}

return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -322,6 +330,101 @@ func (r *InstanceReplicator) reconcileNamespace(targetClient client.Client, name
return nil
}

func (r *InstanceReplicator) reconcileServicePlan(targetClient client.Client, instance *osbv1alpha1.SFServiceInstance, clusterID string) error {
ctx := context.Background()
serviceID := instance.Spec.ServiceID
planID := instance.Spec.PlanID
log := r.Log.WithValues("clusterID", clusterID, "serviceID", serviceID, "planID", planID)

var lastErr error

service, serviceReplica := &osbv1alpha1.SFService{}, &osbv1alpha1.SFService{}
serviceKey := types.NamespacedName{
Name: serviceID,
Namespace: constants.InteroperatorNamespace,
}

plan, planReplica := &osbv1alpha1.SFPlan{}, &osbv1alpha1.SFPlan{}
planKey := types.NamespacedName{
Name: planID,
Namespace: constants.InteroperatorNamespace,
}

err := r.Get(ctx, serviceKey, service)
if err != nil {
log.Error(err, "Failed to get SFService from leader")
lastErr = err
}

err = targetClient.Get(ctx, serviceKey, serviceReplica)
if err != nil {
if apiErrors.IsNotFound(err) {
replicateSFServiceResourceData(service, serviceReplica)
err = targetClient.Create(ctx, serviceReplica)
if err != nil {
log.Error(err, "Error occurred while replicating SFService to cluster ")
lastErr = err
} else {
log.Info("SFService not found in target cluster. created as copy from leader")
}
} else if !apiErrors.IsNotFound(err) {
log.Error(err, "Failed to fetch SFService from target cluster")
lastErr = err
}
} else {
replicateSFServiceResourceData(service, serviceReplica)
err = targetClient.Update(ctx, serviceReplica)
if err != nil {
log.Error(err, "Error occurred while replicating SFService to cluster")
lastErr = err
} else {
log.Info("updated SFService in target cluster")
}
}

err = r.Get(ctx, planKey, plan)
if err != nil {
log.Error(err, "Failed to get SFPlan from leader")
lastErr = err
}

err = targetClient.Get(ctx, planKey, planReplica)
if err != nil {
if apiErrors.IsNotFound(err) {
replicateSFPlanResourceData(plan, planReplica)
err = utils.SetOwnerReference(serviceReplica, planReplica, r.scheme)
if err != nil {
lastErr = err
}
err = targetClient.Create(ctx, planReplica)
if err != nil {
log.Error(err, "Error occurred while replicating SFPlan to cluster ")
lastErr = err
} else {
log.Info("SFPlan not found in target cluster. created as copy from leader")
}
} else if !apiErrors.IsNotFound(err) {
log.Error(err, "Failed to fetch SFPlan from target cluster")
lastErr = err
}
} else {
replicateSFPlanResourceData(plan, planReplica)
err = utils.SetOwnerReference(serviceReplica, planReplica, r.scheme)
if err != nil {
return err
}
err = targetClient.Update(ctx, planReplica)
if err != nil {
log.Error(err, "Error occurred while replicating SFPlan to cluster")
lastErr = err
} else {
log.Info("updated SFPlan in target cluster")
}
}

return lastErr
}

func (r *InstanceReplicator) setInProgress(instance *osbv1alpha1.SFServiceInstance, state string) error {
instanceID := instance.GetName()
clusterID, _ := instance.GetClusterID()
Expand Down Expand Up @@ -390,6 +493,20 @@ func copyObject(source, destination *osbv1alpha1.SFServiceInstance, preserveReso
}
}

func replicateSFServiceResourceData(source *osbv1alpha1.SFService, dest *osbv1alpha1.SFService) {
source.Spec.DeepCopyInto(&dest.Spec)
dest.SetName(source.GetName())
dest.SetNamespace(source.GetNamespace())
dest.SetLabels(source.GetLabels())
}

func replicateSFPlanResourceData(source *osbv1alpha1.SFPlan, dest *osbv1alpha1.SFPlan) {
source.Spec.DeepCopyInto(&dest.Spec)
dest.SetName(source.GetName())
dest.SetNamespace(source.GetNamespace())
dest.SetLabels(source.GetLabels())
}

// SetupWithManager registers the MCD Instance replicator with manager
// and setups the watches.
func (r *InstanceReplicator) SetupWithManager(mgr ctrl.Manager) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ var c, c2 client.Client

var expectedRequest = reconcile.Request{NamespacedName: types.NamespacedName{Name: "foo", Namespace: constants.InteroperatorNamespace}}
var instanceKey = types.NamespacedName{Name: "instance-id", Namespace: "sf-instance-id"}
var serviceKey = types.NamespacedName{Name: "service-id", Namespace: constants.InteroperatorNamespace}
var planKey = types.NamespacedName{Name: "plan-id", Namespace: constants.InteroperatorNamespace}

const timeout = time.Second * 5

Expand All @@ -70,6 +72,60 @@ var instance = &osbv1alpha1.SFServiceInstance{
},
}

var service = &osbv1alpha1.SFService{
ObjectMeta: metav1.ObjectMeta{
Name: "service-id",
Namespace: constants.InteroperatorNamespace,
},
Spec: osbv1alpha1.SFServiceSpec{
ID: "service-id",
},
}
var templateSpec = []osbv1alpha1.TemplateSpec{
{
Action: "provision",
Type: "gotemplate",
Content: "provisioncontent",
},
{
Action: "bind",
Type: "gotemplate",
Content: "bindcontent",
},
{
Action: "status",
Type: "gotemplate",
Content: "statuscontent",
},
{
Action: "sources",
Type: "gotemplate",
Content: "sourcescontent",
},
}
var plan = &osbv1alpha1.SFPlan{
ObjectMeta: metav1.ObjectMeta{
Name: "plan-id",
Namespace: constants.InteroperatorNamespace,
Labels: map[string]string{
"serviceId": "service-id",
},
},
Spec: osbv1alpha1.SFPlanSpec{
Name: "plan-name",
ID: "plan-id",
Description: "description",
Metadata: nil,
Free: false,
Bindable: true,
PlanUpdatable: true,
Schemas: nil,
Templates: templateSpec,
ServiceID: "service-id",
RawContext: nil,
Manager: nil,
}}

func TestReconcile(t *testing.T) {
instance2 := &osbv1alpha1.SFServiceInstance{}
watchChannel := make(chan event.GenericEvent)
Expand Down Expand Up @@ -121,6 +177,9 @@ func TestReconcile(t *testing.T) {
mgrStopped.Wait()
}()

g.Expect(c.Create(context.TODO(), service)).NotTo(gomega.HaveOccurred())
g.Expect(c.Create(context.TODO(), plan)).NotTo(gomega.HaveOccurred())

// Create a new namespace
ns := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -157,6 +216,20 @@ func TestReconcile(t *testing.T) {
})
g.Expect(err).NotTo(gomega.HaveOccurred())

// Plan and servive should be replicated in follower cluster
g.Eventually(func() error {
err = c2.Get(context.TODO(), serviceKey, service)
if err != nil {
return err
}
err = c2.Get(context.TODO(), planKey, plan)
if err != nil {
return err
}

return nil
}, timeout).Should(gomega.Succeed())

// State should be updated in master cluster
g.Eventually(func() error {
err := c.Get(context.TODO(), instanceKey, instance)
Expand Down
Loading