Skip to content

Commit

Permalink
fix(kuma-cp) deleted default policy is created on Kuma CP restart (#2507
Browse files Browse the repository at this point in the history
)

Signed-off-by: Ilya Lobkov <ilya.lobkov@konghq.com>
  • Loading branch information
lobkovilya authored Aug 11, 2021
1 parent 71862d8 commit fbd0831
Show file tree
Hide file tree
Showing 9 changed files with 240 additions and 37 deletions.
14 changes: 0 additions & 14 deletions pkg/core/resources/store/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ type CreateOptions struct {
Mesh string
CreationTime time.Time
Owner core_model.Resource
Synced bool
}

type CreateOptionsFunc func(*CreateOptions)
Expand Down Expand Up @@ -48,15 +47,8 @@ func CreateWithOwner(owner core_model.Resource) CreateOptionsFunc {
}
}

func CreateSynced() CreateOptionsFunc {
return func(opts *CreateOptions) {
opts.Synced = true
}
}

type UpdateOptions struct {
ModificationTime time.Time
Synced bool
}

func ModifiedAt(modificationTime time.Time) UpdateOptionsFunc {
Expand All @@ -65,12 +57,6 @@ func ModifiedAt(modificationTime time.Time) UpdateOptionsFunc {
}
}

func UpdateSynced() UpdateOptionsFunc {
return func(opts *UpdateOptions) {
opts.Synced = true
}
}

type UpdateOptionsFunc func(*UpdateOptions)

func NewUpdateOptions(fs ...UpdateOptionsFunc) *UpdateOptions {
Expand Down
3 changes: 1 addition & 2 deletions pkg/kds/store/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ func (s *syncResourceStore) Sync(upstream model.ResourceList, fs ...SyncOptionFu
createOpts := []store.CreateOptionsFunc{
store.CreateBy(rk),
store.CreatedAt(creationTime),
store.CreateSynced(),
}
if opts.Zone != "" {
createOpts = append(createOpts, store.CreateWithOwner(zone))
Expand All @@ -157,7 +156,7 @@ func (s *syncResourceStore) Sync(upstream model.ResourceList, fs ...SyncOptionFu
// some stores manage ModificationTime time on they own (Kubernetes), in order to be consistent
// we set ModificationTime when we add to downstream store. This time is almost the same with ModificationTime
// from upstream store, because we update downstream only when resource have changed in upstream
if err := s.resourceStore.Update(ctx, r, store.ModifiedAt(now), store.UpdateSynced()); err != nil {
if err := s.resourceStore.Update(ctx, r, store.ModifiedAt(now)); err != nil {
return err
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/plugins/common/k8s/names.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ const (
// The value has a format of a Kubernetes label name.
k8sNameComponent = "k8s.kuma.io/name"

// k8sSynced identifies that resource was synced
K8sSynced = "k8s.kuma.io/synced"
// K8sMeshDefaultsGenerated identifies that default resources for mesh were successfully generated
K8sMeshDefaultsGenerated = "k8s.kuma.io/mesh-defaults-generated"

// Kubernetes secret type to differentiate Kuma System secrets. Secret is bound to a mesh
MeshSecretType = "system.kuma.io/secret"
Expand Down
19 changes: 0 additions & 19 deletions pkg/plugins/resources/k8s/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,6 @@ func (s *KubernetesStore) Create(ctx context.Context, r core_model.Resource, fs
}
}

if opts.Synced {
markAsSynced(obj)
}

if err := s.Client.Create(ctx, obj); err != nil {
if kube_apierrs.IsAlreadyExists(err) {
return store.ErrorResourceAlreadyExists(r.Descriptor().Name, opts.Name, opts.Mesh)
Expand All @@ -77,27 +73,12 @@ func (s *KubernetesStore) Create(ctx context.Context, r core_model.Resource, fs
return nil
}

func markAsSynced(obj k8s_model.KubernetesObject) {
annotations := obj.GetAnnotations()
if annotations == nil {
annotations = map[string]string{}
}
annotations[k8s_common.K8sSynced] = "true"
obj.SetAnnotations(annotations)
}

func (s *KubernetesStore) Update(ctx context.Context, r core_model.Resource, fs ...store.UpdateOptionsFunc) error {
opts := store.NewUpdateOptions(fs...)

obj, err := s.Converter.ToKubernetesObject(r)
if err != nil {
return errors.Wrapf(err, "failed to convert core model of type %s into k8s counterpart", r.Descriptor().Name)
}

if opts.Synced {
markAsSynced(obj)
}

if err := s.Client.Update(ctx, obj); err != nil {
if kube_apierrs.IsConflict(err) {
return store.ErrorResourceConflict(r.Descriptor().Name, r.GetMeta().GetName(), r.GetMeta().GetMesh())
Expand Down
32 changes: 32 additions & 0 deletions pkg/plugins/runtime/k8s/controllers/mesh_defaults_controller.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
package controllers

import (
"context"

"github.com/pkg/errors"
kube_core "k8s.io/api/core/v1"
kube_apierrs "k8s.io/apimachinery/pkg/api/errors"
kube_ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"

"github.com/kumahq/kuma/pkg/core"
core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
"github.com/kumahq/kuma/pkg/core/resources/manager"
core_model "github.com/kumahq/kuma/pkg/core/resources/model"
"github.com/kumahq/kuma/pkg/core/resources/store"
defaults_mesh "github.com/kumahq/kuma/pkg/defaults/mesh"
common_k8s "github.com/kumahq/kuma/pkg/plugins/common/k8s"
"github.com/kumahq/kuma/pkg/plugins/resources/k8s"
mesh_k8s "github.com/kumahq/kuma/pkg/plugins/resources/k8s/native/api/v1alpha1"
)

Expand All @@ -19,9 +28,32 @@ type MeshDefaultsReconciler struct {
}

func (r *MeshDefaultsReconciler) Reconcile(req kube_ctrl.Request) (kube_ctrl.Result, error) {
mesh := core_mesh.NewMeshResource()
if err := r.ResourceManager.Get(context.Background(), mesh, store.GetByKey(req.Name, core_model.NoMesh)); err != nil {
if kube_apierrs.IsNotFound(err) {
return kube_ctrl.Result{}, nil
}
return kube_ctrl.Result{}, errors.Wrap(err, "could not get default mesh resources")
}

// Before creating default policies for the mesh we want to ensure that this mesh wasn't processed before.
// We can't rely on filtering by CreateFunc, because apparently it sends the create event every time resource
// is added to the underlying Informer. That's why on the Kuma CP restart Mesh will be processed the second time
if processed := mesh.GetMeta().(*k8s.KubernetesMetaAdapter).GetAnnotations()[common_k8s.K8sMeshDefaultsGenerated]; processed == "true" {
return kube_ctrl.Result{}, nil
}

if err := defaults_mesh.EnsureDefaultMeshResources(r.ResourceManager, req.Name); err != nil {
return kube_ctrl.Result{}, errors.Wrap(err, "could not create default mesh resources")
}

if mesh.GetMeta().(*k8s.KubernetesMetaAdapter).GetAnnotations() == nil {
mesh.GetMeta().(*k8s.KubernetesMetaAdapter).Annotations = map[string]string{}
}
mesh.GetMeta().(*k8s.KubernetesMetaAdapter).GetAnnotations()[common_k8s.K8sMeshDefaultsGenerated] = "true"
if err := r.ResourceManager.Update(context.Background(), mesh, store.ModifiedAt(core.Now())); err != nil {
return kube_ctrl.Result{}, errors.Wrap(err, "could not update default mesh resources")
}
return kube_ctrl.Result{}, nil
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package controllers_test

import (
"context"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
kube_types "k8s.io/apimachinery/pkg/types"
kube_ctrl "sigs.k8s.io/controller-runtime"
kube_client "sigs.k8s.io/controller-runtime/pkg/client"
kube_client_fake "sigs.k8s.io/controller-runtime/pkg/client/fake"
kube_reconcile "sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
"github.com/kumahq/kuma/pkg/core/resources/apis/system"
resources_manager "github.com/kumahq/kuma/pkg/core/resources/manager"
core_model "github.com/kumahq/kuma/pkg/core/resources/model"
core_store "github.com/kumahq/kuma/pkg/core/resources/store"
secret_cipher "github.com/kumahq/kuma/pkg/core/secrets/cipher"
secret_manager "github.com/kumahq/kuma/pkg/core/secrets/manager"
"github.com/kumahq/kuma/pkg/plugins/resources/k8s"
"github.com/kumahq/kuma/pkg/plugins/runtime/k8s/controllers"
secrets_k8s "github.com/kumahq/kuma/pkg/plugins/secrets/k8s"
)

var _ = Describe("MeshDefaultsReconciler", func() {

var kubeClient kube_client.Client
var resourceManager resources_manager.ResourceManager
var reconciler kube_reconcile.Reconciler

BeforeEach(func() {
kubeClient = kube_client_fake.NewFakeClientWithScheme(k8sClientScheme)
store, err := k8s.NewStore(kubeClient, k8sClientScheme, k8s.NewSimpleConverter())
Expect(err).ToNot(HaveOccurred())
secretStore, err := secrets_k8s.NewStore(kubeClient, kubeClient, "default")
Expect(err).ToNot(HaveOccurred())

resourceManager = resources_manager.NewResourceManager(store)
customizableManager := resources_manager.NewCustomizableResourceManager(resourceManager, nil)
customizableManager.Customize(
system.SecretType,
secret_manager.NewSecretManager(
secretStore,
secret_cipher.None(),
secret_manager.ValidateDelete(func(ctx context.Context, secretName string, secretMesh string) error { return nil })),
)

reconciler = &controllers.MeshDefaultsReconciler{
ResourceManager: customizableManager,
}
})

createMesh := func() {
Expect(
resourceManager.Create(context.Background(), mesh.NewMeshResource(), core_store.CreateByKey("default", core_model.NoMesh)),
).To(Succeed())
}

hasTrafficPermissions := func() bool {
trafficPermissions := &mesh.TrafficPermissionResourceList{}
Expect(
resourceManager.List(context.Background(), trafficPermissions, core_store.ListByMesh("default")),
).To(Succeed())
return len(trafficPermissions.Items) == 1
}

reconcile := func() {
_, err := reconciler.Reconcile(kube_ctrl.Request{
NamespacedName: kube_types.NamespacedName{
Name: "default",
},
})
Expect(err).ToNot(HaveOccurred())
}

deleteTrafficPermission := func() {
Expect(
resourceManager.Delete(context.Background(), mesh.NewTrafficPermissionResource(),
core_store.DeleteByKey("allow-all-default", "default")),
).To(Succeed())
}

It("should not create a new default policy if it was deleted", func() {
createMesh()
Expect(hasTrafficPermissions()).To(BeFalse())

reconcile()
Expect(hasTrafficPermissions()).To(BeTrue())

deleteTrafficPermission()
Expect(hasTrafficPermissions()).To(BeFalse())

reconcile()
Expect(hasTrafficPermissions()).To(BeFalse())
})
})
16 changes: 16 additions & 0 deletions test/e2e/trafficpermission/kubernetes/e2e_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package kubernetes_test

import (
"testing"

"github.com/kumahq/kuma/pkg/test"
"github.com/kumahq/kuma/test/framework"
)

func TestE2ETrafficPermissionKubernetes(t *testing.T) {
if framework.IsK8sClustersStarted() {
test.RunSpecs(t, "Traffic Permission Kubernetes Suite")
} else {
t.SkipNow()
}
}
83 changes: 83 additions & 0 deletions test/e2e/trafficpermission/kubernetes/traffic_permission_k8s.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package kubernetes

import (
"strings"
"time"

"github.com/gruntwork-io/terratest/modules/k8s"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

config_core "github.com/kumahq/kuma/pkg/config/core"
. "github.com/kumahq/kuma/test/framework"
)

func TrafficPermission() {
var k8sCluster Cluster
var optsKubernetes = KumaK8sDeployOpts

E2EBeforeSuite(func() {
k8sClusters, err := NewK8sClusters([]string{Kuma1}, Silent)
Expect(err).ToNot(HaveOccurred())

k8sCluster = k8sClusters.GetCluster(Kuma1)

Expect(Kuma(config_core.Standalone, optsKubernetes...)(k8sCluster)).To(Succeed())
Expect(k8sCluster.VerifyKuma()).To(Succeed())
})

E2EAfterSuite(func() {
Expect(k8sCluster.DeleteKuma(optsKubernetes...)).To(Succeed())
Expect(k8sCluster.DismissCluster()).To(Succeed())
})

removeDefaultTrafficPermission := func() {
err := k8s.RunKubectlE(k8sCluster.GetTesting(), k8sCluster.GetKubectlOptions(), "delete", "trafficpermission", "allow-all-default")
Expect(err).ToNot(HaveOccurred())
}

noDefaultTrafficPermission := func() {
Eventually(func() bool {
out, err := k8s.RunKubectlAndGetOutputE(k8sCluster.GetTesting(), k8sCluster.GetKubectlOptions(), "get", "trafficpermissions")
if err != nil {
return false
}
return !strings.Contains(out, "allow-all-default")
}, "30s", "1s").Should(BeTrue())
}

defaultPoliciesCreated := func() {
Eventually(func() bool {
out, err := k8s.RunKubectlAndGetOutputE(k8sCluster.GetTesting(), k8sCluster.GetKubectlOptions(), "get", "meshes", "-o", "yaml")
if err != nil {
return false
}
return strings.Contains(out, "k8s.kuma.io/mesh-defaults-generated")
}, "30s", "1s").Should(BeTrue())
}

restartKumaCP := func() {
pods := k8sCluster.GetKuma().(*K8sControlPlane).GetKumaCPPods()
Expect(pods).To(HaveLen(1))
err := k8s.RunKubectlE(k8sCluster.GetTesting(), k8sCluster.GetKubectlOptions(), "delete", "pod", pods[0].GetName(), "-n", pods[0].GetNamespace())
Expect(err).ToNot(HaveOccurred())
Expect(k8sCluster.(*K8sCluster).WaitApp(KumaServiceName, KumaNamespace, 1)).To(Succeed())
}

It("should not create deleted default traffic permission after Kuma CP restart", func() {
// given
defaultPoliciesCreated()

// when
removeDefaultTrafficPermission()
// then
noDefaultTrafficPermission()

// when
restartKumaCP()
// and when
time.Sleep(10 * time.Second)
// then
noDefaultTrafficPermission()
})
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package kubernetes_test

import (
. "github.com/onsi/ginkgo"

"github.com/kumahq/kuma/test/e2e/trafficpermission/kubernetes"
)

var _ = Describe("Traffic Permission on Kubernetes", kubernetes.TrafficPermission)

0 comments on commit fbd0831

Please sign in to comment.