diff --git a/cluster/kubernetes/kubernetes.go b/cluster/kubernetes/kubernetes.go index a4bc8c429..353c88b5e 100644 --- a/cluster/kubernetes/kubernetes.go +++ b/cluster/kubernetes/kubernetes.go @@ -3,6 +3,7 @@ package kubernetes import ( "bytes" "fmt" + "sync" k8syaml "github.com/ghodss/yaml" "github.com/go-kit/kit/log" @@ -42,7 +43,6 @@ type extendedClient struct { type apiObject struct { bytes []byte - Version string `yaml:"apiVersion"` Kind string `yaml:"kind"` Metadata struct { Name string `yaml:"name"` @@ -50,11 +50,8 @@ type apiObject struct { } `yaml:"metadata"` } -func (obj *apiObject) namespaceOrDefault() string { - if obj.Metadata.Namespace == "" { - return "default" - } - return obj.Metadata.Namespace +func (o *apiObject) hasNamespace() bool { + return o.Metadata.Namespace != "" } // --- add-ons @@ -88,9 +85,33 @@ func isAddon(obj namespacedLabeled) bool { // --- /add ons +type changeSet struct { + nsObjs map[string][]obj + noNsObjs map[string][]obj +} + +func makeChangeSet() changeSet { + return changeSet{ + nsObjs: make(map[string][]obj), + noNsObjs: make(map[string][]obj), + } +} + +func (c *changeSet) stage(cmd, id string, o *apiObject) { + if o.hasNamespace() { + c.nsObjs[cmd] = append(c.nsObjs[cmd], obj{id, o}) + } else { + c.noNsObjs[cmd] = append(c.noNsObjs[cmd], obj{id, o}) + } +} + +type obj struct { + id string + *apiObject +} + type Applier interface { - Delete(logger log.Logger, def *apiObject) error - Apply(logger log.Logger, def *apiObject) error + apply(log.Logger, changeSet, cluster.SyncError) } // Cluster is a handle to a Kubernetes API server. @@ -98,18 +119,18 @@ type Applier interface { type Cluster struct { client extendedClient applier Applier - actionc chan func() version string // string response for the version command. logger log.Logger sshKeyRing ssh.KeyRing + + mu sync.Mutex } -// NewCluster returns a usable cluster. Host should be of the form -// "http://hostname:8080". +// NewCluster returns a usable cluster. func NewCluster(clientset k8sclient.Interface, applier Applier, sshKeyRing ssh.KeyRing, - logger log.Logger) (*Cluster, error) { + logger log.Logger) *Cluster { c := &Cluster{ client: extendedClient{ @@ -117,27 +138,14 @@ func NewCluster(clientset k8sclient.Interface, clientset.Core(), clientset.Extensions(), clientset.AppsV1beta1(), - clientset.BatchV2alpha1()}, + clientset.BatchV2alpha1(), + }, applier: applier, - actionc: make(chan func()), logger: logger, sshKeyRing: sshKeyRing, } - go c.loop() - return c, nil -} - -// Stop terminates the goroutine that serializes and executes requests against -// the cluster. A stopped cluster cannot be restarted. -func (c *Cluster) Stop() { - close(c.actionc) -} - -func (c *Cluster) loop() { - for f := range c.actionc { - f() - } + return c } // --- cluster.Cluster @@ -207,40 +215,40 @@ func (c *Cluster) AllControllers(namespace string) (res []cluster.Controller, er // Sync performs the given actions on resources. Operations are // asynchronous, but serialised. func (c *Cluster) Sync(spec cluster.SyncDef) error { - errc := make(chan error) logger := log.With(c.logger, "method", "Sync") - c.actionc <- func() { - errs := cluster.SyncError{} - for _, action := range spec.Actions { - logger := log.With(logger, "resource", action.ResourceID) - if len(action.Delete) > 0 { - obj, err := definitionObj(action.Delete) - if err == nil { - err = c.applier.Delete(logger, obj) - } - if err != nil { - errs[action.ResourceID] = err - continue - } + + cs := makeChangeSet() + errs := cluster.SyncError{} + for _, action := range spec.Actions { + stages := []struct { + b []byte + cmd string + }{ + {action.Delete, "delete"}, + {action.Apply, "apply"}, + } + for _, stage := range stages { + if len(stage.b) == 0 { + continue } - if len(action.Apply) > 0 { - obj, err := definitionObj(action.Apply) - if err == nil { - err = c.applier.Apply(logger, obj) - } - if err != nil { - errs[action.ResourceID] = err - continue - } + obj, err := definitionObj(stage.b) + id := action.ResourceID + if err == nil { + cs.stage(stage.cmd, id, obj) + } else { + errs[id] = err + break } } - if len(errs) > 0 { - errc <- errs - } else { - errc <- nil - } } - return <-errc + + c.mu.Lock() + defer c.mu.Unlock() + c.applier.apply(logger, cs, errs) + if len(errs) != 0 { + return errs + } + return nil } func (c *Cluster) Ping() error { diff --git a/cluster/kubernetes/kubernetes_test.go b/cluster/kubernetes/kubernetes_test.go index b1aebb482..a1a4b2ff0 100644 --- a/cluster/kubernetes/kubernetes_test.go +++ b/cluster/kubernetes/kubernetes_test.go @@ -4,218 +4,37 @@ package kubernetes // adequate. Starting with Sync. import ( - "errors" - "reflect" "testing" "github.com/go-kit/kit/log" - discovery "k8s.io/client-go/discovery" - admissionregistrationv1alpha1 "k8s.io/client-go/kubernetes/typed/admissionregistration/v1alpha1" - appsv1beta1 "k8s.io/client-go/kubernetes/typed/apps/v1beta1" - authenticationv1 "k8s.io/client-go/kubernetes/typed/authentication/v1" - authenticationv1beta1 "k8s.io/client-go/kubernetes/typed/authentication/v1beta1" - authorizationv1 "k8s.io/client-go/kubernetes/typed/authorization/v1" - authorizationv1beta1 "k8s.io/client-go/kubernetes/typed/authorization/v1beta1" - autoscalingv1 "k8s.io/client-go/kubernetes/typed/autoscaling/v1" - autoscalingv2alpha1 "k8s.io/client-go/kubernetes/typed/autoscaling/v2alpha1" - batchv1 "k8s.io/client-go/kubernetes/typed/batch/v1" - batchv2alpha1 "k8s.io/client-go/kubernetes/typed/batch/v2alpha1" - certificatesv1beta1 "k8s.io/client-go/kubernetes/typed/certificates/v1beta1" - corev1 "k8s.io/client-go/kubernetes/typed/core/v1" - extensionsv1beta1 "k8s.io/client-go/kubernetes/typed/extensions/v1beta1" - networkingv1 "k8s.io/client-go/kubernetes/typed/networking/v1" - policyv1beta1 "k8s.io/client-go/kubernetes/typed/policy/v1beta1" - rbacv1alpha1 "k8s.io/client-go/kubernetes/typed/rbac/v1alpha1" - rbacv1beta1 "k8s.io/client-go/kubernetes/typed/rbac/v1beta1" - settingsv1alpha1 "k8s.io/client-go/kubernetes/typed/settings/v1alpha1" - storagev1 "k8s.io/client-go/kubernetes/typed/storage/v1" - storagev1beta1 "k8s.io/client-go/kubernetes/typed/storage/v1beta1" "github.com/weaveworks/flux/cluster" ) -type command struct { - action string - def string -} - -type mockClientset struct { -} - -func (m *mockClientset) Discovery() discovery.DiscoveryInterface { - return nil -} - -func (m *mockClientset) AdmissionregistrationV1alpha1() admissionregistrationv1alpha1.AdmissionregistrationV1alpha1Interface { - return nil -} - -func (m *mockClientset) Admissionregistration() admissionregistrationv1alpha1.AdmissionregistrationV1alpha1Interface { - return nil -} - -func (m *mockClientset) CoreV1() corev1.CoreV1Interface { - return nil -} - -func (m *mockClientset) Core() corev1.CoreV1Interface { - return nil -} - -func (m *mockClientset) AppsV1beta1() appsv1beta1.AppsV1beta1Interface { - return nil -} - -func (m *mockClientset) Apps() appsv1beta1.AppsV1beta1Interface { - return nil -} - -func (m *mockClientset) AuthenticationV1() authenticationv1.AuthenticationV1Interface { - return nil -} - -func (m *mockClientset) Authentication() authenticationv1.AuthenticationV1Interface { - return nil -} - -func (m *mockClientset) AuthenticationV1beta1() authenticationv1beta1.AuthenticationV1beta1Interface { - return nil -} - -func (m *mockClientset) AuthorizationV1() authorizationv1.AuthorizationV1Interface { - return nil -} - -func (m *mockClientset) Authorization() authorizationv1.AuthorizationV1Interface { - return nil -} - -func (m *mockClientset) AuthorizationV1beta1() authorizationv1beta1.AuthorizationV1beta1Interface { - return nil -} - -func (m *mockClientset) AutoscalingV1() autoscalingv1.AutoscalingV1Interface { - return nil -} - -func (m *mockClientset) Autoscaling() autoscalingv1.AutoscalingV1Interface { - return nil -} - -func (m *mockClientset) AutoscalingV2alpha1() autoscalingv2alpha1.AutoscalingV2alpha1Interface { - return nil -} - -func (m *mockClientset) BatchV1() batchv1.BatchV1Interface { - return nil -} - -func (m *mockClientset) Batch() batchv1.BatchV1Interface { - return nil -} - -func (m *mockClientset) BatchV2alpha1() batchv2alpha1.BatchV2alpha1Interface { - return nil -} - -func (m *mockClientset) CertificatesV1beta1() certificatesv1beta1.CertificatesV1beta1Interface { - return nil -} - -func (m *mockClientset) Certificates() certificatesv1beta1.CertificatesV1beta1Interface { - return nil -} - -func (m *mockClientset) ExtensionsV1beta1() extensionsv1beta1.ExtensionsV1beta1Interface { - return nil -} - -func (m *mockClientset) Extensions() extensionsv1beta1.ExtensionsV1beta1Interface { - return nil -} - -func (m *mockClientset) NetworkingV1() networkingv1.NetworkingV1Interface { - return nil -} - -func (m *mockClientset) Networking() networkingv1.NetworkingV1Interface { - return nil -} - -func (m *mockClientset) PolicyV1beta1() policyv1beta1.PolicyV1beta1Interface { - return nil -} - -func (m *mockClientset) Policy() policyv1beta1.PolicyV1beta1Interface { - return nil -} - -func (m *mockClientset) RbacV1beta1() rbacv1beta1.RbacV1beta1Interface { - return nil -} - -func (m *mockClientset) Rbac() rbacv1beta1.RbacV1beta1Interface { - return nil -} - -func (m *mockClientset) RbacV1alpha1() rbacv1alpha1.RbacV1alpha1Interface { - return nil -} - -func (m *mockClientset) SettingsV1alpha1() settingsv1alpha1.SettingsV1alpha1Interface { - return nil -} - -func (m *mockClientset) Settings() settingsv1alpha1.SettingsV1alpha1Interface { - return nil -} - -func (m *mockClientset) StorageV1beta1() storagev1beta1.StorageV1beta1Interface { - return nil -} - -func (m *mockClientset) StorageV1() storagev1.StorageV1Interface { - return nil -} - -func (m *mockClientset) Storage() storagev1.StorageV1Interface { - return nil -} - type mockApplier struct { - commands []command - applyErr error - createErr error - deleteErr error + commandRun bool } -func (m *mockApplier) Apply(logger log.Logger, obj *apiObject) error { - m.commands = append(m.commands, command{"apply", string(obj.Metadata.Name)}) - return m.applyErr -} - -func (m *mockApplier) Delete(logger log.Logger, obj *apiObject) error { - m.commands = append(m.commands, command{"delete", string(obj.Metadata.Name)}) - return m.deleteErr +func (m *mockApplier) apply(_ log.Logger, c changeSet, _ cluster.SyncError) { + if len(c.nsObjs) != 0 || len(c.noNsObjs) != 0 { + m.commandRun = true + } } func deploymentDef(name string) []byte { return []byte(`--- kind: Deployment metadata: - name: ` + name + ` - namespace: test-ns -`) + name: ` + name) } // --- func setup(t *testing.T) (*Cluster, *mockApplier) { - clientset := &mockClientset{} applier := &mockApplier{} - kube, err := NewCluster(clientset, applier, nil, log.NewNopLogger()) - if err != nil { - t.Fatal(err) + kube := &Cluster{ + applier: applier, + logger: log.NewNopLogger(), } return kube, applier } @@ -225,8 +44,8 @@ func TestSyncNop(t *testing.T) { if err := kube.Sync(cluster.SyncDef{}); err != nil { t.Error(err) } - if len(mock.commands) > 0 { - t.Errorf("expected no commands run, but got %#v", mock.commands) + if mock.commandRun { + t.Error("expected no commands run") } } @@ -243,70 +62,7 @@ func TestSyncMalformed(t *testing.T) { if err == nil { t.Error("expected error because malformed resource def, but got nil") } - if len(mock.commands) > 0 { - t.Errorf("expected no commands run, but got %#v", mock.commands) - } -} - -func TestSyncOrder(t *testing.T) { - kube, mock := setup(t) - if err := kube.Sync(cluster.SyncDef{ - Actions: []cluster.SyncAction{ - cluster.SyncAction{ - ResourceID: "foobar", - Delete: deploymentDef("delete first"), - Apply: deploymentDef("apply last"), - }, - }, - }); err != nil { - t.Error(err) - } - - expected := []command{ - command{"delete", "delete first"}, - command{"apply", "apply last"}, - } - if !reflect.DeepEqual(expected, mock.commands) { - t.Errorf("expected commands:\n%#v\ngot:\n%#v", expected, mock.commands) - } -} - -// Test that getting an error in the middle of an action records the -// error, and skips to the next action. -func TestSkipOnError(t *testing.T) { - kube, mock := setup(t) - mock.deleteErr = errors.New("create failed") - - def := cluster.SyncDef{ - Actions: []cluster.SyncAction{ - cluster.SyncAction{ - ResourceID: "fail in middle", - Delete: deploymentDef("should fail"), - Apply: deploymentDef("skipped"), - }, - cluster.SyncAction{ - ResourceID: "proceed", - Apply: deploymentDef("apply works"), - }, - }, - } - - err := kube.Sync(def) - switch err := err.(type) { - case cluster.SyncError: - if _, ok := err["fail in middle"]; !ok { - t.Errorf("expected error for failing resource %q, but got %#v", "fail in middle", err) - } - default: - t.Errorf("expected sync error, got %#v", err) - } - - expected := []command{ - command{"delete", "should fail"}, - // skip to next resource after failure - command{"apply", "apply works"}, - } - if !reflect.DeepEqual(expected, mock.commands) { - t.Errorf("expected commands:\n%#v\ngot:\n%#v", expected, mock.commands) + if mock.commandRun { + t.Error("expected no commands run") } } diff --git a/cluster/kubernetes/release.go b/cluster/kubernetes/release.go index ed25cba13..98551d39f 100644 --- a/cluster/kubernetes/release.go +++ b/cluster/kubernetes/release.go @@ -10,17 +10,20 @@ import ( "github.com/go-kit/kit/log" "github.com/pkg/errors" + "github.com/weaveworks/flux/cluster" rest "k8s.io/client-go/rest" ) -func NewKubectl(exe string, config *rest.Config, stdout, stderr io.Writer) *Kubectl { - return &Kubectl{exe, config, stdout, stderr} +type Kubectl struct { + exe string + config *rest.Config } -type Kubectl struct { - exe string - config *rest.Config - stdout, stderr io.Writer +func NewKubectl(exe string, config *rest.Config) *Kubectl { + return &Kubectl{ + exe: exe, + config: config, + } } func (c *Kubectl) connectArgs() []string { @@ -49,16 +52,39 @@ func (c *Kubectl) connectArgs() []string { return args } -func (c *Kubectl) kubectlCommand(args ...string) *exec.Cmd { - cmd := exec.Command(c.exe, append(c.connectArgs(), args...)...) - cmd.Stdout = c.stdout - cmd.Stderr = c.stderr - return cmd +func (c *Kubectl) apply(logger log.Logger, cs changeSet, errs cluster.SyncError) { + f := func(m map[string][]obj, cmd string, args ...string) { + objs := m[cmd] + if len(objs) == 0 { + return + } + args = append(args, cmd) + if err := c.doCommand(logger, makeMultidoc(objs), args...); err != nil { + for _, obj := range objs { + r := bytes.NewReader(obj.bytes) + if err := c.doCommand(logger, r, args...); err != nil { + errs[obj.id] = err + } + } + } + } + + // When deleting resources we must ensure any resource in a non-default + // namespace is deleted before the namespace that it is in. Since namespace + // resources don't specify a namespace, this ordering guarantees that. + f(cs.nsObjs, "delete") + f(cs.noNsObjs, "delete", "--namespace", "default") + // Likewise, when applying resources we must ensure the namespace is applied + // first, so we run the commands the other way round. + f(cs.noNsObjs, "apply", "--namespace", "default") + f(cs.nsObjs, "apply") + } -func (c *Kubectl) doCommand(logger log.Logger, newDefinition []byte, args ...string) error { +func (c *Kubectl) doCommand(logger log.Logger, r io.Reader, args ...string) error { + args = append(args, "-f", "-") cmd := c.kubectlCommand(args...) - cmd.Stdin = bytes.NewReader(newDefinition) + cmd.Stdin = r stderr := &bytes.Buffer{} cmd.Stderr = stderr stdout := &bytes.Buffer{} @@ -74,10 +100,14 @@ func (c *Kubectl) doCommand(logger log.Logger, newDefinition []byte, args ...str return err } -func (c *Kubectl) Delete(logger log.Logger, obj *apiObject) error { - return c.doCommand(logger, obj.bytes, "--namespace", obj.namespaceOrDefault(), "delete", "-f", "-") +func makeMultidoc(objs []obj) *bytes.Buffer { + buf := &bytes.Buffer{} + for _, obj := range objs { + buf.WriteString("---\n" + string(obj.bytes)) + } + return buf } -func (c *Kubectl) Apply(logger log.Logger, obj *apiObject) error { - return c.doCommand(logger, obj.bytes, "--namespace", obj.namespaceOrDefault(), "apply", "-f", "-") +func (c *Kubectl) kubectlCommand(args ...string) *exec.Cmd { + return exec.Command(c.exe, append(c.connectArgs(), args...)...) } diff --git a/cmd/fluxd/main.go b/cmd/fluxd/main.go index df543d2b9..a7a8f0b47 100644 --- a/cmd/fluxd/main.go +++ b/cmd/fluxd/main.go @@ -146,7 +146,7 @@ func main() { var clusterVersion string var sshKeyRing ssh.KeyRing var k8s cluster.Cluster - var image_creds func() registry.ImageCreds + var imageCreds func() registry.ImageCreds var k8sManifests cluster.Manifests { restClientConfig, err := rest.InClusterConfig() @@ -209,21 +209,17 @@ func main() { } logger.Log("kubectl", kubectl) - kubectlApplier := kubernetes.NewKubectl(kubectl, restClientConfig, os.Stdout, os.Stderr) - k8s_inst, err := kubernetes.NewCluster(clientset, kubectlApplier, sshKeyRing, logger) - if err != nil { - logger.Log("err", err) - os.Exit(1) - } + kubectlApplier := kubernetes.NewKubectl(kubectl, restClientConfig) + k8sInst := kubernetes.NewCluster(clientset, kubectlApplier, sshKeyRing, logger) - if err := k8s_inst.Ping(); err != nil { + if err := k8sInst.Ping(); err != nil { logger.Log("ping", err) } else { logger.Log("ping", true) } - image_creds = k8s_inst.ImagesToFetch - k8s = k8s_inst + imageCreds = k8sInst.ImagesToFetch + k8s = k8sInst // There is only one way we currently interpret a repo of // files as manifests, and that's as Kubernetes yamels. k8sManifests = &kubernetes.Manifests{} @@ -443,7 +439,7 @@ func main() { cacheWarmer.Notify = daemon.AskForImagePoll cacheWarmer.Priority = daemon.ImageRefresh shutdownWg.Add(1) - go cacheWarmer.Loop(log.With(logger, "component", "warmer"), shutdown, shutdownWg, image_creds) + go cacheWarmer.Loop(log.With(logger, "component", "warmer"), shutdown, shutdownWg, imageCreds) // Update daemonRef so that upstream and handlers point to fully working daemon daemonRef.UpdatePlatform(daemon) diff --git a/sync/sync.go b/sync/sync.go index c7dba38d8..1435d6a71 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -29,53 +29,22 @@ func Sync(m cluster.Manifests, repoResources map[string]resource.Resource, clus // no-op. sync := cluster.SyncDef{} - nsClusterResources, otherClusterResources := separateResourcesByType(clusterResources) - nsRepoResources, otherRepoResources := separateResourcesByType(repoResources) - - // First tackle resources that are not Namespace kind, in case we are deleting the Namespace as well - // Deleting a Namespace first, then a resource in this namespace causes an error - // DANGER ZONE (tamara) This works and is dangerous. At the moment will delete Flux and // other pods unless the relevant manifests are part of the user repo. Needs a lot of thought // before this cleanup cluster feature can be unleashed on the world. if deletes { - for id, res := range otherClusterResources { - prepareSyncDelete(logger, repoResources, id, res, &sync) - } - for id, res := range nsClusterResources { + for id, res := range clusterResources { prepareSyncDelete(logger, repoResources, id, res, &sync) } } - // To avoid errors due to a non existent namespace if a resource in that namespace is created first, - // create Namespace objects first - for id, res := range nsRepoResources { - prepareSyncApply(logger, clusterResources, id, res, &sync) - } - for id, res := range otherRepoResources { + for id, res := range repoResources { prepareSyncApply(logger, clusterResources, id, res, &sync) } return clus.Sync(sync) } -func separateResourcesByType(resources map[string]resource.Resource) (map[string]resource.Resource, map[string]resource.Resource) { - if len(resources) == 0 { - return nil, nil - } - nsResources := make(map[string]resource.Resource) - otherResources := make(map[string]resource.Resource) - for id, res := range resources { - _, kind, _ := res.ResourceID().Components() - if kind == "namespace" { - nsResources[id] = res - } else { - otherResources[id] = res - } - } - return nsResources, otherResources -} - func prepareSyncDelete(logger log.Logger, repoResources map[string]resource.Resource, id string, res resource.Resource, sync *cluster.SyncDef) { if len(repoResources) == 0 { return diff --git a/sync/sync_test.go b/sync/sync_test.go index f15c5d0d5..364c7a3c3 100644 --- a/sync/sync_test.go +++ b/sync/sync_test.go @@ -66,77 +66,6 @@ func TestSync(t *testing.T) { checkClusterMatchesFiles(t, manifests, clus, checkout.ManifestDir()) } -func TestSeparateByType(t *testing.T) { - var tests = []struct { - msg string - resMap map[string]resource.Resource - expectedNS map[string]resource.Resource - expectedOthers map[string]resource.Resource - }{ - { - msg: "No resources", - resMap: make(map[string]resource.Resource), - expectedNS: nil, - expectedOthers: nil, - }, { - msg: "Only namespace resources", - resMap: map[string]resource.Resource{ - "res1": mockResourceWithoutIgnorePolicy("namespace", "ns1", "ns1"), - "res2": mockResourceWithoutIgnorePolicy("namespace", "ns2", "ns2"), - "res3": mockResourceWithoutIgnorePolicy("namespace", "ns3", "ns3"), - }, - expectedNS: map[string]resource.Resource{ - "res1": mockResourceWithoutIgnorePolicy("namespace", "ns1", "ns1"), - "res2": mockResourceWithoutIgnorePolicy("namespace", "ns2", "ns2"), - "res3": mockResourceWithoutIgnorePolicy("namespace", "ns3", "ns3"), - }, - expectedOthers: make(map[string]resource.Resource), - }, { - msg: "Only non-namespace resources", - resMap: map[string]resource.Resource{ - "res1": mockResourceWithoutIgnorePolicy("deployment", "default", "ns1"), - "res2": mockResourceWithoutIgnorePolicy("deployment", "ns1", "ns2"), - "res3": mockResourceWithoutIgnorePolicy("deployment", "ns2", "ns3"), - }, - expectedNS: make(map[string]resource.Resource), - expectedOthers: map[string]resource.Resource{ - "res1": mockResourceWithoutIgnorePolicy("deployment", "default", "ns1"), - "res2": mockResourceWithoutIgnorePolicy("deployment", "ns1", "ns2"), - "res3": mockResourceWithoutIgnorePolicy("deployment", "ns2", "ns3"), - }, - }, { - msg: "Mixture of resources", - resMap: map[string]resource.Resource{ - "res1": mockResourceWithoutIgnorePolicy("namespace", "ns1", "ns1"), - "res2": mockResourceWithoutIgnorePolicy("namespace", "ns2", "ns2"), - "res3": mockResourceWithoutIgnorePolicy("deployment", "default", "ns1"), - "res4": mockResourceWithoutIgnorePolicy("secret", "ns1", "ns2"), - "res5": mockResourceWithoutIgnorePolicy("service", "ns2", "ns2"), - }, - expectedNS: map[string]resource.Resource{ - "res1": mockResourceWithoutIgnorePolicy("namespace", "ns1", "ns1"), - "res2": mockResourceWithoutIgnorePolicy("namespace", "ns2", "ns2"), - }, - expectedOthers: map[string]resource.Resource{ - "res3": mockResourceWithoutIgnorePolicy("deployment", "default", "ns1"), - "res4": mockResourceWithoutIgnorePolicy("secret", "ns1", "ns2"), - "res5": mockResourceWithoutIgnorePolicy("service", "ns2", "ns2"), - }, - }, - } - - for _, sc := range tests { - r1, r2 := separateResourcesByType(sc.resMap) - - if !reflect.DeepEqual(sc.expectedNS, r1) { - t.Errorf("%s: expected %+v, got %+v\n", sc.msg, sc.expectedNS, r1) - } - if !reflect.DeepEqual(sc.expectedOthers, r2) { - t.Errorf("%s: expected %+v, got %+v\n", sc.msg, sc.expectedOthers, r2) - } - } -} - func TestPrepareSyncDelete(t *testing.T) { var tests = []struct { msg string