diff --git a/controller/k8s/test_helper.go b/controller/k8s/test_helper.go index 7345f20c576ca..b34da0c2f963e 100644 --- a/controller/k8s/test_helper.go +++ b/controller/k8s/test_helper.go @@ -6,6 +6,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" clientsetscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/metadata/fake" @@ -14,40 +15,40 @@ import ( // NewFakeAPI provides a mock Kubernetes API for testing. func NewFakeAPI(configs ...string) (*API, error) { - clientSet, _, _, spClientSet, err := k8s.NewFakeClientSets(configs...) + clientSet, _, _, spClientSet, dynamicClient, err := k8s.NewFakeClientSets(configs...) if err != nil { return nil, err } - return NewFakeClusterScopedAPI(clientSet, spClientSet), nil + return NewFakeClusterScopedAPI(clientSet, spClientSet, dynamicClient), nil } // NewFakeAPI provides a mock Kubernetes API for testing. func NewFakeAPIWithActions(configs ...string) (*API, func() []testing.Action, error) { - clientSet, _, _, spClientSet, err := k8s.NewFakeClientSets(configs...) + clientSet, _, _, spClientSet, dynamicClient, err := k8s.NewFakeClientSets(configs...) if err != nil { return nil, nil, err } - return NewFakeClusterScopedAPI(clientSet, spClientSet), clientSet.Actions, nil + return NewFakeClusterScopedAPI(clientSet, spClientSet, dynamicClient), clientSet.Actions, nil } // NewFakeAPIWithL5dClient provides a mock Kubernetes API for testing like // NewFakeAPI, but it also returns the mock client for linkerd CRDs func NewFakeAPIWithL5dClient(configs ...string) (*API, l5dcrdclient.Interface, error) { - clientSet, _, _, l5dClientSet, err := k8s.NewFakeClientSets(configs...) + clientSet, _, _, l5dClientSet, dynamicClient, err := k8s.NewFakeClientSets(configs...) if err != nil { return nil, nil, err } - return NewFakeClusterScopedAPI(clientSet, l5dClientSet), l5dClientSet, nil + return NewFakeClusterScopedAPI(clientSet, l5dClientSet, dynamicClient), l5dClientSet, nil } // NewFakeClusterScopedAPI provides a mock Kubernetes API for testing. -func NewFakeClusterScopedAPI(clientSet kubernetes.Interface, l5dClientSet l5dcrdclient.Interface) *API { +func NewFakeClusterScopedAPI(clientSet kubernetes.Interface, l5dClientSet l5dcrdclient.Interface, dynamicClient dynamic.Interface) *API { return NewClusterScopedAPI( clientSet, - nil, + dynamicClient, l5dClientSet, "fake", CJ, diff --git a/multicluster/charts/linkerd-multicluster-link/templates/service-mirror.yaml b/multicluster/charts/linkerd-multicluster-link/templates/service-mirror.yaml index 5800c59b25c1e..fe6b301b7d4c0 100644 --- a/multicluster/charts/linkerd-multicluster-link/templates/service-mirror.yaml +++ b/multicluster/charts/linkerd-multicluster-link/templates/service-mirror.yaml @@ -56,6 +56,9 @@ rules: - apiGroups: ["multicluster.linkerd.io"] resources: ["links"] verbs: ["list", "get", "watch"] + - apiGroups: ["multicluster.linkerd.io"] + resources: ["links/status"] + verbs: ["update"] - apiGroups: ["coordination.k8s.io"] resources: ["leases"] verbs: ["create", "get", "update", "patch"] diff --git a/multicluster/charts/linkerd-multicluster/README.md b/multicluster/charts/linkerd-multicluster/README.md index 79d9e05d6b02b..afcd5b66a5f3f 100644 --- a/multicluster/charts/linkerd-multicluster/README.md +++ b/multicluster/charts/linkerd-multicluster/README.md @@ -94,12 +94,25 @@ Kubernetes: `>=1.22.0-0` | imagePullSecrets | list | `[]` | For Private docker registries, authentication is needed. Registry secrets are applied to the respective service accounts | | linkerdNamespace | string | `"linkerd"` | Namespace of linkerd installation | | linkerdVersion | string | `"linkerdVersionValue"` | Control plane version | +| localServiceMirror.GID | int | `2103` | Group id under which the Service Mirror shall be ran | +| localServiceMirror.UID | int | `2103` | User id under which the Service Mirror shall be ran | +| localServiceMirror.enablePprof | bool | `false` | enables the use of pprof endpoints on control plane component's admin servers | +| localServiceMirror.federatedServiceSelector | string | `"mirror.linkerd.io/federated=member"` | Label selector for federated service members in the local cluster. | +| localServiceMirror.image.name | string | `"cr.l5d.io/linkerd/controller"` | Docker image for the Service mirror component (uses the Linkerd controller image) | +| localServiceMirror.image.pullPolicy | string | imagePullPolicy | Pull policy for the Service mirror container image | +| localServiceMirror.image.version | string | linkerdVersion | Tag for the Service mirror container image | +| localServiceMirror.logFormat | string | `"plain"` | Log format (`plain` or `json`) | +| localServiceMirror.logLevel | string | `"info"` | Log level for the Multicluster components | +| localServiceMirror.replicas | int | `1` | Number of local service mirror replicas to run | +| localServiceMirror.resources | object | `{}` | Resources for the Service mirror container | +| localServiceMirror.serviceMirrorRetryLimit | int | `3` | Number of times local service mirror updates are allowed to be requeued (retried) | | namespaceMetadata.image.name | string | `"extension-init"` | Docker image name for the namespace-metadata instance | | namespaceMetadata.image.pullPolicy | string | imagePullPolicy | Pull policy for the namespace-metadata instance | | namespaceMetadata.image.registry | string | `"cr.l5d.io/linkerd"` | Docker registry for the namespace-metadata instance | | namespaceMetadata.image.tag | string | `"v0.1.1"` | Docker image tag for the namespace-metadata instance | | namespaceMetadata.nodeSelector | object | `{}` | Node selectors for the namespace-metadata instance | | namespaceMetadata.tolerations | list | `[]` | Tolerations for the namespace-metadata instance | +| podAnnotations | object | `{}` | Additional annotations to add to all pods | | podLabels | object | `{}` | Additional labels to add to all pods | | proxyOutboundPort | int | `4140` | The port on which the proxy accepts outbound traffic | | remoteMirrorServiceAccount | bool | `true` | If the remote mirror service account should be installed | diff --git a/multicluster/charts/linkerd-multicluster/templates/local-service-mirror.yaml b/multicluster/charts/linkerd-multicluster/templates/local-service-mirror.yaml new file mode 100644 index 0000000000000..9723c0e79f62f --- /dev/null +++ b/multicluster/charts/linkerd-multicluster/templates/local-service-mirror.yaml @@ -0,0 +1,165 @@ +--- +kind: ClusterRole +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: linkerd-local-service-mirror-access-local-resources + labels: + linkerd.io/extension: multicluster + component: local-service-mirror + {{- with .Values.commonLabels }}{{ toYaml . | trim | nindent 4 }}{{- end }} +rules: +- apiGroups: [""] + resources: ["endpoints", "services"] + verbs: ["list", "get", "watch", "create", "delete", "update"] +- apiGroups: [""] + resources: ["namespaces"] + verbs: ["list", "get", "watch"] +- apiGroups: ["coordination.k8s.io"] + resources: ["leases"] + verbs: ["create", "get", "update", "patch"] +- apiGroups: ["multicluster.linkerd.io"] + resources: ["links"] + verbs: ["list", "get", "watch"] +- apiGroups: ["multicluster.linkerd.io"] + resources: ["links/status"] + verbs: ["update"] +--- +kind: ClusterRoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: linkerd-local-service-mirror-access-local-resources + labels: + linkerd.io/extension: multicluster + component: local-service-mirror + {{- with .Values.commonLabels }}{{ toYaml . | trim | nindent 4 }}{{- end }} +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: linkerd-local-service-mirror-access-local-resources +subjects: +- kind: ServiceAccount + name: linkerd-local-service-mirror + namespace: {{.Release.Namespace}} +--- +kind: ServiceAccount +apiVersion: v1 +metadata: + name: linkerd-local-service-mirror + namespace: {{ .Release.Namespace }} + labels: + linkerd.io/extension: multicluster + component: local-service-mirror + {{- with .Values.commonLabels }}{{ toYaml . | trim | nindent 4 }}{{- end }} +{{- include "partials.image-pull-secrets" .Values.imagePullSecrets }} +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + linkerd.io/extension: multicluster + component: local-service-mirror + {{- with .Values.commonLabels }}{{ toYaml . | trim | nindent 4 }}{{- end }} + name: linkerd-local-service-mirror + namespace: {{ .Release.Namespace }} +spec: + replicas: {{ .Values.localServiceMirror.replicas }} + revisionHistoryLimit: {{.Values.revisionHistoryLimit}} + selector: + matchLabels: + component: local-service-mirror + {{- if .Values.enablePodAntiAffinity }} + strategy: + rollingUpdate: + maxUnavailable: 1 + {{- end }} + template: + metadata: + annotations: + linkerd.io/inject: enabled + cluster-autoscaler.kubernetes.io/safe-to-evict: "true" + config.alpha.linkerd.io/proxy-wait-before-exit-seconds: "0" + {{- with .Values.podAnnotations }}{{ toYaml . | trim | nindent 8 }}{{- end }} + labels: + linkerd.io/extension: multicluster + component: local-service-mirror + {{- with .Values.podLabels }}{{ toYaml . | trim | nindent 8 }}{{- end }} + spec: + {{- if .Values.enablePodAntiAffinity}} + {{- with $tree := deepCopy . }} + {{- $_ := set $tree "component" "local-service-mirror" -}} + {{- $_ := set $tree "label" "component" -}} + {{- include "linkerd.affinity" $tree | nindent 6 }} + {{- end }} + {{- end }} + automountServiceAccountToken: false + containers: + - args: + - service-mirror + - -log-level={{.Values.localServiceMirror.logLevel}} + - -log-format={{.Values.localServiceMirror.logFormat}} + - -event-requeue-limit={{.Values.localServiceMirror.serviceMirrorRetryLimit}} + - -namespace={{.Release.Namespace}} + - -enable-pprof={{.Values.localServiceMirror.enablePprof | default false}} + - -local-mirror + - -federated-service-selector={{.Values.localServiceMirror.federatedServiceSelector}} + {{- if or .Values.localServiceMirror.additionalEnv .Values.localServiceMirror.experimentalEnv }} + env: + {{- with .Values.localServiceMirror.additionalEnv }} + {{- toYaml . | nindent 8 -}} + {{- end }} + {{- with .Values.localServiceMirror.experimentalEnv }} + {{- toYaml . | nindent 8 -}} + {{- end }} + {{- end }} + image: {{.Values.localServiceMirror.image.name}}:{{.Values.localServiceMirror.image.version}} + name: service-mirror + securityContext: + allowPrivilegeEscalation: false + capabilities: + drop: + - ALL + readOnlyRootFilesystem: true + runAsNonRoot: true + runAsUser: {{.Values.localServiceMirror.UID}} + runAsGroup: {{.Values.localServiceMirror.GID}} + seccompProfile: + type: RuntimeDefault + volumeMounts: + - mountPath: /var/run/secrets/kubernetes.io/serviceaccount + name: kube-api-access + readOnly: true + ports: + - containerPort: 9999 + name: admin-http + {{- with .Values.localServiceMirror.resources }} + resources: {{ toYaml . | nindent 10 }} + {{- end }} + securityContext: + seccompProfile: + type: RuntimeDefault + serviceAccountName: linkerd-local-service-mirror + volumes: + - {{- include "partials.volumes.manual-mount-service-account-token" . | indent 8 | trimPrefix (repeat 7 " ") }} + {{- with .Values.nodeSelector }} + nodeSelector: {{ toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.tolerations }} + tolerations: {{ toYaml . | nindent 6 }} + {{- end }} +{{- if .Values.enablePodAntiAffinity }} +--- +kind: PodDisruptionBudget +apiVersion: policy/v1 +metadata: + name: linkerd-local-service-mirror + namespace: {{ .Release.Namespace }} + labels: + component: local-service-mirror + annotations: + {{ include "partials.annotations.created-by" . }} +spec: + maxUnavailable: 1 + selector: + matchLabels: + component: local-service-mirror +{{- end}} diff --git a/multicluster/charts/linkerd-multicluster/values.yaml b/multicluster/charts/linkerd-multicluster/values.yaml index 1433166692257..64c184acb8d3c 100644 --- a/multicluster/charts/linkerd-multicluster/values.yaml +++ b/multicluster/charts/linkerd-multicluster/values.yaml @@ -51,6 +51,8 @@ gateway: # -- Control plane version linkerdVersion: linkerdVersionValue +# -- Additional annotations to add to all pods +podAnnotations: {} # -- Additional labels to add to all pods podLabels: {} # -- Labels to apply to all resources @@ -112,3 +114,43 @@ createNamespaceMetadataJob: true # -- Specifies the number of old ReplicaSets to retain to allow rollback. revisionHistoryLimit: 10 + +localServiceMirror: + # -- Number of times local service mirror updates are allowed to be requeued + # (retried) + serviceMirrorRetryLimit: 3 + + # -- Label selector for federated service members in the local cluster. + federatedServiceSelector: "mirror.linkerd.io/federated=member" + + # -- Number of local service mirror replicas to run + replicas: 1 + + image: + # -- Docker image for the Service mirror component (uses the Linkerd controller + # image) + name: cr.l5d.io/linkerd/controller + # -- Pull policy for the Service mirror container image + # @default -- imagePullPolicy + pullPolicy: "" + # -- Tag for the Service mirror container image + # @default -- linkerdVersion + version: linkerdVersionValue + + # -- Log level for the Multicluster components + logLevel: info + + # -- Log format (`plain` or `json`) + logFormat: plain + + # -- enables the use of pprof endpoints on control plane component's admin + # servers + enablePprof: false + + # -- User id under which the Service Mirror shall be ran + UID: 2103 + # -- Group id under which the Service Mirror shall be ran + GID: 2103 + + # -- Resources for the Service mirror container + resources: {} diff --git a/multicluster/cmd/install.go b/multicluster/cmd/install.go index 1aca89294e610..3bb48cdd8b420 100644 --- a/multicluster/cmd/install.go +++ b/multicluster/cmd/install.go @@ -45,6 +45,7 @@ var TemplatesMulticluster = []string{ "templates/remote-access-service-mirror-rbac.yaml", "templates/link-crd.yaml", "templates/service-mirror-policy.yaml", + "templates/local-service-mirror.yaml", } func newMulticlusterInstallCommand() *cobra.Command { @@ -232,6 +233,7 @@ func buildMulticlusterInstallValues(ctx context.Context, opts *multiclusterInsta return nil, err } + defaults.LocalServiceMirror.Image.Version = version.Version defaults.Gateway.Enabled = opts.gateway.Enabled defaults.Gateway.Port = opts.gateway.Port defaults.Gateway.Probe.Seconds = opts.gateway.Probe.Seconds diff --git a/multicluster/cmd/link.go b/multicluster/cmd/link.go index 0fd8e55895aee..ae9e79f357866 100644 --- a/multicluster/cmd/link.go +++ b/multicluster/cmd/link.go @@ -40,24 +40,25 @@ const ( type ( linkOptions struct { - namespace string - clusterName string - apiServerAddress string - serviceAccountName string - gatewayName string - gatewayNamespace string - serviceMirrorRetryLimit uint32 - logLevel string - logFormat string - controlPlaneVersion string - dockerRegistry string - selector string - remoteDiscoverySelector string - gatewayAddresses string - gatewayPort uint32 - ha bool - enableGateway bool - output string + namespace string + clusterName string + apiServerAddress string + serviceAccountName string + gatewayName string + gatewayNamespace string + serviceMirrorRetryLimit uint32 + logLevel string + logFormat string + controlPlaneVersion string + dockerRegistry string + selector string + remoteDiscoverySelector string + federatedServiceSelector string + gatewayAddresses string + gatewayPort uint32 + ha bool + enableGateway bool + output string } ) @@ -237,6 +238,11 @@ A full list of configurable values can be found at https://github.com/linkerd/li return err } + federatedServiceSelector, err := metav1.ParseToLabelSelector(opts.federatedServiceSelector) + if err != nil { + return err + } + link := mc.Link{ Name: opts.clusterName, Namespace: opts.namespace, @@ -244,7 +250,8 @@ A full list of configurable values can be found at https://github.com/linkerd/li TargetClusterDomain: configMap.ClusterDomain, TargetClusterLinkerdNamespace: controlPlaneNamespace, ClusterCredentialsSecret: fmt.Sprintf("cluster-credentials-%s", opts.clusterName), - RemoteDiscoverySelector: *remoteDiscoverySelector, + RemoteDiscoverySelector: remoteDiscoverySelector, + FederatedServiceSelector: federatedServiceSelector, } // If there is a gateway in the exporting cluster, populate Link @@ -298,12 +305,10 @@ A full list of configurable values can be found at https://github.com/linkerd/li } link.GatewayPort = gatewayPort - selector, err := metav1.ParseToLabelSelector(opts.selector) + link.Selector, err = metav1.ParseToLabelSelector(opts.selector) if err != nil { return err } - - link.Selector = *selector } obj, err := link.ToUnstructured() @@ -384,6 +389,7 @@ A full list of configurable values can be found at https://github.com/linkerd/li fmt.Sprintf("Docker registry to pull service mirror controller image from ($%s)", flags.EnvOverrideDockerRegistry)) cmd.Flags().StringVarP(&opts.selector, "selector", "l", opts.selector, "Selector (label query) to filter which services in the target cluster to mirror") cmd.Flags().StringVar(&opts.remoteDiscoverySelector, "remote-discovery-selector", opts.remoteDiscoverySelector, "Selector (label query) to filter which services in the target cluster to mirror in remote discovery mode") + cmd.Flags().StringVar(&opts.federatedServiceSelector, "federated-service-selector", opts.federatedServiceSelector, "Selector (label query) for federated service members in the target cluster") cmd.Flags().StringVar(&opts.gatewayAddresses, "gateway-addresses", opts.gatewayAddresses, "If specified, overwrites gateway addresses when gateway service is not type LoadBalancer (comma separated list)") cmd.Flags().Uint32Var(&opts.gatewayPort, "gateway-port", opts.gatewayPort, "If specified, overwrites gateway port when gateway service is not type LoadBalancer") cmd.Flags().BoolVar(&opts.ha, "ha", opts.ha, "Enable HA configuration for the service-mirror deployment (default false)") @@ -482,19 +488,20 @@ func newLinkOptionsWithDefault() (*linkOptions, error) { } return &linkOptions{ - controlPlaneVersion: version.Version, - namespace: defaultMulticlusterNamespace, - dockerRegistry: pkgcmd.DefaultDockerRegistry, - serviceMirrorRetryLimit: defaults.ServiceMirrorRetryLimit, - logLevel: defaults.LogLevel, - logFormat: defaults.LogFormat, - selector: fmt.Sprintf("%s=%s", k8s.DefaultExportedServiceSelector, "true"), - remoteDiscoverySelector: fmt.Sprintf("%s=%s", k8s.DefaultExportedServiceSelector, "remote-discovery"), - gatewayAddresses: "", - gatewayPort: 0, - ha: false, - enableGateway: true, - output: "yaml", + controlPlaneVersion: version.Version, + namespace: defaultMulticlusterNamespace, + dockerRegistry: pkgcmd.DefaultDockerRegistry, + serviceMirrorRetryLimit: defaults.ServiceMirrorRetryLimit, + logLevel: defaults.LogLevel, + logFormat: defaults.LogFormat, + selector: fmt.Sprintf("%s=%s", k8s.DefaultExportedServiceSelector, "true"), + remoteDiscoverySelector: fmt.Sprintf("%s=%s", k8s.DefaultExportedServiceSelector, "remote-discovery"), + federatedServiceSelector: fmt.Sprintf("%s=%s", k8s.DefaultFederatedServiceSelector, "member"), + gatewayAddresses: "", + gatewayPort: 0, + ha: false, + enableGateway: true, + output: "yaml", }, nil } diff --git a/multicluster/cmd/service-mirror/main.go b/multicluster/cmd/service-mirror/main.go index 3067d483222ad..a20558885bb1e 100644 --- a/multicluster/cmd/service-mirror/main.go +++ b/multicluster/cmd/service-mirror/main.go @@ -54,6 +54,8 @@ func Main(args []string) { enableHeadlessSvc := cmd.Bool("enable-headless-services", false, "toggle support for headless service mirroring") enableNamespaceCreation := cmd.Bool("enable-namespace-creation", false, "toggle support for namespace creation") enablePprof := cmd.Bool("enable-pprof", false, "Enable pprof endpoints on the admin server") + localMirror := cmd.Bool("local-mirror", false, "watch the local cluster for federated service members") + federatedServiceSelector := cmd.String("federated-service-selector", k8s.DefaultFederatedServiceSelector, "Selector (label query) for federated service members in the local cluster") flags.ConfigureAndParse(cmd, args) linkName := cmd.Arg(0) @@ -86,11 +88,7 @@ func Main(args []string) { // // controllerK8sAPI is used by the cluster watcher to manage // mirror resources such as services, namespaces, and endpoints. - k8sAPI, err := k8s.NewAPI(*kubeConfigPath, "", "", []string{}, 0) - //TODO: Use can-i to check for required permissions - if err != nil { - log.Fatalf("Failed to initialize K8s API: %s", err) - } + controllerK8sAPI, err := controllerK8s.InitializeAPI( rootCtx, *kubeConfigPath, @@ -104,72 +102,99 @@ func Main(args []string) { log.Fatalf("Failed to initialize K8s API: %s", err) } - linkClient := k8sAPI.DynamicClient.Resource(multicluster.LinkGVR).Namespace(*namespace) metrics := servicemirror.NewProbeMetricVecs() controllerK8sAPI.Sync(nil) - ready = true - run := func(ctx context.Context) { - main: - for { - // Start link watch - linkWatch, err := linkClient.Watch(ctx, metav1.ListOptions{}) + + var run func(context.Context) + + if *localMirror { + run = func(ctx context.Context) { + err = startLocalClusterWatcher(ctx, *namespace, controllerK8sAPI, *requeueLimit, *repairPeriod, *enableHeadlessSvc, *enableNamespaceCreation, *federatedServiceSelector) if err != nil { - log.Fatalf("Failed to watch Link %s: %s", linkName, err) + log.Fatalf("Failed to start local cluster watcher: %s", err) } - results := linkWatch.ResultChan() - // Each time the link resource is updated, reload the config and restart the - // cluster watcher. + // ctx.Done() is a one-shot channel that will be closed once + // the context has been cancelled. Receiving from a closed + // channel yields the value immediately. + <-ctx.Done() + // The channel will be closed by the leader elector when a + // lease is lost, or by a background task handling SIGTERM. + // Before terminating the loop, stop the workers and set + // them to nil to release memory. + cleanupWorkers() + } + } else { + k8sAPI, err := k8s.NewAPI(*kubeConfigPath, "", "", []string{}, 0) + //TODO: Use can-i to check for required permissions + if err != nil { + log.Fatalf("Failed to initialize K8s API: %s", err) + } + linkClient := k8sAPI.DynamicClient.Resource(multicluster.LinkGVR).Namespace(*namespace) + + run = func(ctx context.Context) { + main: for { - select { - // ctx.Done() is a one-shot channel that will be closed once - // the context has been cancelled. Receiving from a closed - // channel yields the value immediately. - case <-ctx.Done(): - // The channel will be closed by the leader elector when a - // lease is lost, or by a background task handling SIGTERM. - // Before terminating the loop, stop the workers and set - // them to nil to release memory. - cleanupWorkers() - return - case event, ok := <-results: - if !ok { - log.Info("Link watch terminated; restarting watch") - continue main - } - switch obj := event.Object.(type) { - case *dynamic.Unstructured: - if obj.GetName() == linkName { - switch event.Type { - case watch.Added, watch.Modified: - link, err := multicluster.NewLink(*obj) - if err != nil { - log.Errorf("Failed to parse link %s: %s", linkName, err) - continue - } - log.Infof("Got updated link %s: %+v", linkName, link) - creds, err := loadCredentials(ctx, link, *namespace, k8sAPI) - if err != nil { - log.Errorf("Failed to load remote cluster credentials: %s", err) - } - err = restartClusterWatcher(ctx, link, *namespace, creds, controllerK8sAPI, *requeueLimit, *repairPeriod, metrics, *enableHeadlessSvc, *enableNamespaceCreation) - if err != nil { - // failed to restart cluster watcher; give a bit of slack - // and restart the link watch to give it another try - log.Error(err) - time.Sleep(linkWatchRestartAfter) - linkWatch.Stop() + // Start link watch + linkWatch, err := linkClient.Watch(ctx, metav1.ListOptions{}) + if err != nil { + log.Fatalf("Failed to watch Link %s: %s", linkName, err) + } + results := linkWatch.ResultChan() + + // Each time the link resource is updated, reload the config and restart the + // cluster watcher. + for { + select { + // ctx.Done() is a one-shot channel that will be closed once + // the context has been cancelled. Receiving from a closed + // channel yields the value immediately. + case <-ctx.Done(): + // The channel will be closed by the leader elector when a + // lease is lost, or by a background task handling SIGTERM. + // Before terminating the loop, stop the workers and set + // them to nil to release memory. + cleanupWorkers() + return + case event, ok := <-results: + if !ok { + log.Info("Link watch terminated; restarting watch") + continue main + } + switch obj := event.Object.(type) { + case *dynamic.Unstructured: + if obj.GetName() == linkName { + switch event.Type { + case watch.Added, watch.Modified: + link, err := multicluster.NewLink(*obj) + if err != nil { + log.Errorf("Failed to parse link %s: %s", linkName, err) + continue + } + log.Infof("Got updated link %s: %+v", linkName, link) + creds, err := loadCredentials(ctx, link, *namespace, k8sAPI) + if err != nil { + log.Errorf("Failed to load remote cluster credentials: %s", err) + } + err = restartClusterWatcher(ctx, link, *namespace, creds, controllerK8sAPI, *requeueLimit, *repairPeriod, metrics, *enableHeadlessSvc, *enableNamespaceCreation) + if err != nil { + // failed to restart cluster watcher; give a bit of slack + // and restart the link watch to give it another try + log.Error(err) + time.Sleep(linkWatchRestartAfter) + linkWatch.Stop() + } + case watch.Deleted: + log.Infof("Link %s deleted", linkName) + cleanupWorkers() + default: + log.Infof("Ignoring event type %s", event.Type) } - case watch.Deleted: - log.Infof("Link %s deleted", linkName) - cleanupWorkers() - default: - log.Infof("Ignoring event type %s", event.Type) } + default: + log.Errorf("Unknown object type detected: %+v", obj) } - default: - log.Errorf("Unknown object type detected: %+v", obj) } } } @@ -181,12 +206,16 @@ func Main(args []string) { log.Fatal("Failed to fetch 'HOSTNAME' environment variable") } + leaseName := fmt.Sprintf("service-mirror-write-%s", linkName) + if *localMirror { + leaseName = "local-service-mirror-write" + } lock := &resourcelock.LeaseLock{ LeaseMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("service-mirror-write-%s", linkName), + Name: leaseName, Namespace: *namespace, }, - Client: k8sAPI.CoordinationV1(), + Client: controllerK8sAPI.Client.CoordinationV1(), LockConfig: resourcelock.ResourceLockConfig{ Identity: hostname, }, @@ -305,11 +334,15 @@ func restartClusterWatcher( if err != nil { return fmt.Errorf("unable to parse kube config: %w", err) } + remoteAPI, err := controllerK8s.InitializeAPIForConfig(ctx, cfg, false, link.TargetClusterName, controllerK8s.Svc, controllerK8s.Endpoint) + if err != nil { + return fmt.Errorf("cannot initialize api for target cluster %s: %w", link.TargetClusterName, err) + } cw, err := servicemirror.NewRemoteClusterServiceWatcher( ctx, namespace, controllerK8sAPI, - cfg, + remoteAPI, &link, requeueLimit, repairPeriod, @@ -328,3 +361,50 @@ func restartClusterWatcher( return nil } + +func startLocalClusterWatcher( + ctx context.Context, + namespace string, + controllerK8sAPI *controllerK8s.API, + requeueLimit int, + repairPeriod time.Duration, + enableHeadlessSvc bool, + enableNamespaceCreation bool, + federatedServiceSelector string, +) error { + federatedLabelSelector, err := metav1.ParseToLabelSelector(federatedServiceSelector) + if err != nil { + return fmt.Errorf("failed to parse federated service selector: %w", err) + } + + link := multicluster.Link{ + Name: "local", + Namespace: namespace, + TargetClusterName: "", + Selector: nil, + RemoteDiscoverySelector: nil, + FederatedServiceSelector: federatedLabelSelector, + } + cw, err := servicemirror.NewRemoteClusterServiceWatcher( + ctx, + namespace, + controllerK8sAPI, + controllerK8sAPI, + &link, + requeueLimit, + repairPeriod, + make(chan bool), + enableHeadlessSvc, + enableNamespaceCreation, + ) + if err != nil { + return fmt.Errorf("unable to create cluster watcher: %w", err) + } + clusterWatcher = cw + err = clusterWatcher.Start(ctx) + if err != nil { + return fmt.Errorf("failed to start cluster watcher: %w", err) + } + + return nil +} diff --git a/multicluster/cmd/testdata/install_default.golden b/multicluster/cmd/testdata/install_default.golden index 4ffa5d6442bed..32b4e1aea736e 100644 --- a/multicluster/cmd/testdata/install_default.golden +++ b/multicluster/cmd/testdata/install_default.golden @@ -832,3 +832,131 @@ spec: - kind: ServiceAccount name: prometheus namespace: linkerd-viz +--- +kind: ClusterRole +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: linkerd-local-service-mirror-access-local-resources + labels: + linkerd.io/extension: multicluster + component: local-service-mirror +rules: +- apiGroups: [""] + resources: ["endpoints", "services"] + verbs: ["list", "get", "watch", "create", "delete", "update"] +- apiGroups: [""] + resources: ["namespaces"] + verbs: ["list", "get", "watch"] +- apiGroups: ["coordination.k8s.io"] + resources: ["leases"] + verbs: ["create", "get", "update", "patch"] +- apiGroups: ["multicluster.linkerd.io"] + resources: ["links"] + verbs: ["list", "get", "watch"] +- apiGroups: ["multicluster.linkerd.io"] + resources: ["links/status"] + verbs: ["update"] +--- +kind: ClusterRoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: linkerd-local-service-mirror-access-local-resources + labels: + linkerd.io/extension: multicluster + component: local-service-mirror +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: linkerd-local-service-mirror-access-local-resources +subjects: +- kind: ServiceAccount + name: linkerd-local-service-mirror + namespace: linkerd-multicluster +--- +kind: ServiceAccount +apiVersion: v1 +metadata: + name: linkerd-local-service-mirror + namespace: linkerd-multicluster + labels: + linkerd.io/extension: multicluster + component: local-service-mirror +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + linkerd.io/extension: multicluster + component: local-service-mirror + name: linkerd-local-service-mirror + namespace: linkerd-multicluster +spec: + replicas: 1 + revisionHistoryLimit: 10 + selector: + matchLabels: + component: local-service-mirror + template: + metadata: + annotations: + linkerd.io/inject: enabled + cluster-autoscaler.kubernetes.io/safe-to-evict: "true" + config.alpha.linkerd.io/proxy-wait-before-exit-seconds: "0" + labels: + linkerd.io/extension: multicluster + component: local-service-mirror + spec: + automountServiceAccountToken: false + containers: + - args: + - service-mirror + - -log-level=info + - -log-format=plain + - -event-requeue-limit=3 + - -namespace=linkerd-multicluster + - -enable-pprof=false + - -local-mirror + - -federated-service-selector=mirror.linkerd.io/federated=member + image: cr.l5d.io/linkerd/controller:linkerdVersionValue + name: service-mirror + securityContext: + allowPrivilegeEscalation: false + capabilities: + drop: + - ALL + readOnlyRootFilesystem: true + runAsNonRoot: true + runAsUser: 2103 + runAsGroup: 2103 + seccompProfile: + type: RuntimeDefault + volumeMounts: + - mountPath: /var/run/secrets/kubernetes.io/serviceaccount + name: kube-api-access + readOnly: true + ports: + - containerPort: 9999 + name: admin-http + securityContext: + seccompProfile: + type: RuntimeDefault + serviceAccountName: linkerd-local-service-mirror + volumes: + - name: kube-api-access + projected: + defaultMode: 420 + sources: + - serviceAccountToken: + expirationSeconds: 3607 + path: token + - configMap: + items: + - key: ca.crt + path: ca.crt + name: kube-root-ca.crt + - downwardAPI: + items: + - fieldRef: + apiVersion: v1 + fieldPath: metadata.namespace + path: namespace diff --git a/multicluster/cmd/testdata/install_ha.golden b/multicluster/cmd/testdata/install_ha.golden index a4a84d67e2ff1..88488f7879cd5 100644 --- a/multicluster/cmd/testdata/install_ha.golden +++ b/multicluster/cmd/testdata/install_ha.golden @@ -904,3 +904,169 @@ spec: - kind: ServiceAccount name: prometheus namespace: linkerd-viz +--- +kind: ClusterRole +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: linkerd-local-service-mirror-access-local-resources + labels: + linkerd.io/extension: multicluster + component: local-service-mirror +rules: +- apiGroups: [""] + resources: ["endpoints", "services"] + verbs: ["list", "get", "watch", "create", "delete", "update"] +- apiGroups: [""] + resources: ["namespaces"] + verbs: ["list", "get", "watch"] +- apiGroups: ["coordination.k8s.io"] + resources: ["leases"] + verbs: ["create", "get", "update", "patch"] +- apiGroups: ["multicluster.linkerd.io"] + resources: ["links"] + verbs: ["list", "get", "watch"] +- apiGroups: ["multicluster.linkerd.io"] + resources: ["links/status"] + verbs: ["update"] +--- +kind: ClusterRoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: linkerd-local-service-mirror-access-local-resources + labels: + linkerd.io/extension: multicluster + component: local-service-mirror +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: linkerd-local-service-mirror-access-local-resources +subjects: +- kind: ServiceAccount + name: linkerd-local-service-mirror + namespace: linkerd-multicluster +--- +kind: ServiceAccount +apiVersion: v1 +metadata: + name: linkerd-local-service-mirror + namespace: linkerd-multicluster + labels: + linkerd.io/extension: multicluster + component: local-service-mirror +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + linkerd.io/extension: multicluster + component: local-service-mirror + name: linkerd-local-service-mirror + namespace: linkerd-multicluster +spec: + replicas: 1 + revisionHistoryLimit: 10 + selector: + matchLabels: + component: local-service-mirror + strategy: + rollingUpdate: + maxUnavailable: 1 + template: + metadata: + annotations: + linkerd.io/inject: enabled + cluster-autoscaler.kubernetes.io/safe-to-evict: "true" + config.alpha.linkerd.io/proxy-wait-before-exit-seconds: "0" + labels: + linkerd.io/extension: multicluster + component: local-service-mirror + spec: + affinity: + podAntiAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - podAffinityTerm: + labelSelector: + matchExpressions: + - key: component + operator: In + values: + - local-service-mirror + topologyKey: topology.kubernetes.io/zone + weight: 100 + requiredDuringSchedulingIgnoredDuringExecution: + - labelSelector: + matchExpressions: + - key: component + operator: In + values: + - local-service-mirror + topologyKey: kubernetes.io/hostname + automountServiceAccountToken: false + containers: + - args: + - service-mirror + - -log-level=info + - -log-format=plain + - -event-requeue-limit=3 + - -namespace=linkerd-multicluster + - -enable-pprof=false + - -local-mirror + - -federated-service-selector=mirror.linkerd.io/federated=member + image: cr.l5d.io/linkerd/controller:linkerdVersionValue + name: service-mirror + securityContext: + allowPrivilegeEscalation: false + capabilities: + drop: + - ALL + readOnlyRootFilesystem: true + runAsNonRoot: true + runAsUser: 2103 + runAsGroup: 2103 + seccompProfile: + type: RuntimeDefault + volumeMounts: + - mountPath: /var/run/secrets/kubernetes.io/serviceaccount + name: kube-api-access + readOnly: true + ports: + - containerPort: 9999 + name: admin-http + securityContext: + seccompProfile: + type: RuntimeDefault + serviceAccountName: linkerd-local-service-mirror + volumes: + - name: kube-api-access + projected: + defaultMode: 420 + sources: + - serviceAccountToken: + expirationSeconds: 3607 + path: token + - configMap: + items: + - key: ca.crt + path: ca.crt + name: kube-root-ca.crt + - downwardAPI: + items: + - fieldRef: + apiVersion: v1 + fieldPath: metadata.namespace + path: namespace +--- +kind: PodDisruptionBudget +apiVersion: policy/v1 +metadata: + name: linkerd-local-service-mirror + namespace: linkerd-multicluster + labels: + component: local-service-mirror + annotations: + linkerd.io/created-by: linkerd/helm linkerdVersionValue +spec: + maxUnavailable: 1 + selector: + matchLabels: + component: local-service-mirror diff --git a/multicluster/cmd/testdata/install_psp.golden b/multicluster/cmd/testdata/install_psp.golden index 2b13a66c64584..7a91e393f025b 100644 --- a/multicluster/cmd/testdata/install_psp.golden +++ b/multicluster/cmd/testdata/install_psp.golden @@ -866,3 +866,131 @@ spec: - kind: ServiceAccount name: prometheus namespace: linkerd-viz +--- +kind: ClusterRole +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: linkerd-local-service-mirror-access-local-resources + labels: + linkerd.io/extension: multicluster + component: local-service-mirror +rules: +- apiGroups: [""] + resources: ["endpoints", "services"] + verbs: ["list", "get", "watch", "create", "delete", "update"] +- apiGroups: [""] + resources: ["namespaces"] + verbs: ["list", "get", "watch"] +- apiGroups: ["coordination.k8s.io"] + resources: ["leases"] + verbs: ["create", "get", "update", "patch"] +- apiGroups: ["multicluster.linkerd.io"] + resources: ["links"] + verbs: ["list", "get", "watch"] +- apiGroups: ["multicluster.linkerd.io"] + resources: ["links/status"] + verbs: ["update"] +--- +kind: ClusterRoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: linkerd-local-service-mirror-access-local-resources + labels: + linkerd.io/extension: multicluster + component: local-service-mirror +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: linkerd-local-service-mirror-access-local-resources +subjects: +- kind: ServiceAccount + name: linkerd-local-service-mirror + namespace: linkerd-multicluster +--- +kind: ServiceAccount +apiVersion: v1 +metadata: + name: linkerd-local-service-mirror + namespace: linkerd-multicluster + labels: + linkerd.io/extension: multicluster + component: local-service-mirror +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + linkerd.io/extension: multicluster + component: local-service-mirror + name: linkerd-local-service-mirror + namespace: linkerd-multicluster +spec: + replicas: 1 + revisionHistoryLimit: 10 + selector: + matchLabels: + component: local-service-mirror + template: + metadata: + annotations: + linkerd.io/inject: enabled + cluster-autoscaler.kubernetes.io/safe-to-evict: "true" + config.alpha.linkerd.io/proxy-wait-before-exit-seconds: "0" + labels: + linkerd.io/extension: multicluster + component: local-service-mirror + spec: + automountServiceAccountToken: false + containers: + - args: + - service-mirror + - -log-level=info + - -log-format=plain + - -event-requeue-limit=3 + - -namespace=linkerd-multicluster + - -enable-pprof=false + - -local-mirror + - -federated-service-selector=mirror.linkerd.io/federated=member + image: cr.l5d.io/linkerd/controller:linkerdVersionValue + name: service-mirror + securityContext: + allowPrivilegeEscalation: false + capabilities: + drop: + - ALL + readOnlyRootFilesystem: true + runAsNonRoot: true + runAsUser: 2103 + runAsGroup: 2103 + seccompProfile: + type: RuntimeDefault + volumeMounts: + - mountPath: /var/run/secrets/kubernetes.io/serviceaccount + name: kube-api-access + readOnly: true + ports: + - containerPort: 9999 + name: admin-http + securityContext: + seccompProfile: + type: RuntimeDefault + serviceAccountName: linkerd-local-service-mirror + volumes: + - name: kube-api-access + projected: + defaultMode: 420 + sources: + - serviceAccountToken: + expirationSeconds: 3607 + path: token + - configMap: + items: + - key: ca.crt + path: ca.crt + name: kube-root-ca.crt + - downwardAPI: + items: + - fieldRef: + apiVersion: v1 + fieldPath: metadata.namespace + path: namespace diff --git a/multicluster/cmd/testdata/service_mirror_default.golden b/multicluster/cmd/testdata/service_mirror_default.golden index 2c6261bc819c5..7fe4138740971 100644 --- a/multicluster/cmd/testdata/service_mirror_default.golden +++ b/multicluster/cmd/testdata/service_mirror_default.golden @@ -48,6 +48,9 @@ rules: - apiGroups: ["multicluster.linkerd.io"] resources: ["links"] verbs: ["list", "get", "watch"] + - apiGroups: ["multicluster.linkerd.io"] + resources: ["links/status"] + verbs: ["update"] - apiGroups: ["coordination.k8s.io"] resources: ["leases"] verbs: ["create", "get", "update", "patch"] diff --git a/multicluster/cmd/testdata/service_mirror_ha.golden b/multicluster/cmd/testdata/service_mirror_ha.golden index 5530bd946e3a7..09d26266a3cdd 100644 --- a/multicluster/cmd/testdata/service_mirror_ha.golden +++ b/multicluster/cmd/testdata/service_mirror_ha.golden @@ -48,6 +48,9 @@ rules: - apiGroups: ["multicluster.linkerd.io"] resources: ["links"] verbs: ["list", "get", "watch"] + - apiGroups: ["multicluster.linkerd.io"] + resources: ["links/status"] + verbs: ["update"] - apiGroups: ["coordination.k8s.io"] resources: ["leases"] verbs: ["create", "get", "update", "patch"] diff --git a/multicluster/service-mirror/cluster_watcher.go b/multicluster/service-mirror/cluster_watcher.go index def4795be49e9..ec8179b4812ae 100644 --- a/multicluster/service-mirror/cluster_watcher.go +++ b/multicluster/service-mirror/cluster_watcher.go @@ -3,6 +3,7 @@ package servicemirror import ( "context" "errors" + "flag" "fmt" "net" "sort" @@ -17,10 +18,10 @@ import ( corev1 "k8s.io/api/core/v1" kerrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes/scheme" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" @@ -28,6 +29,11 @@ import ( const ( eventTypeSkipped = "ServiceMirroringSkipped" + + reasonMirrored = "Mirrored" + reasonInvalidService = "InvalidService" + reasonError = "Error" + reasonMissingNamespace = "MissingNamespace" ) type ( @@ -64,25 +70,45 @@ type ( nsHandler cache.ResourceEventHandlerRegistration } - // RemoteServiceCreated is generated whenever a remote service is created Observing + // RemoteServiceExported is generated whenever a remote service is created Observing // this event means that the service in question is not mirrored atm - RemoteServiceCreated struct { + RemoteServiceExported struct { service *corev1.Service } - // RemoteServiceUpdated is generated when we see something about an already + // CreateFederatedService is generated whenever a remote service joins a + // federated service and the local federated service does not exist yet. + CreateFederatedService struct { + service *corev1.Service + } + + // RemoteExportedServiceUpdated is generated when we see something about an already // mirrored service change on the remote cluster. In that case we need to // reconcile. Most importantly we need to keep track of exposed ports // and gateway association changes. - RemoteServiceUpdated struct { + RemoteExportedServiceUpdated struct { localService *corev1.Service localEndpoints *corev1.Endpoints remoteUpdate *corev1.Service } - // RemoteServiceDeleted when a remote service is going away or it is not + // RemoteServiceJoinedFederatedService is generated when a remote server + // joins a federated service and the local federated service already exists. + RemoteServiceJoinsFederatedService struct { + localService *corev1.Service + remoteUpdate *corev1.Service + } + + // RemoteServiceUnexported when a remote service is going away or it is not // considered mirrored anymore - RemoteServiceDeleted struct { + RemoteServiceUnexported struct { + Name string + Namespace string + } + + // RemoteServiceLeavesFederatedService when a remote service is going away or + // it is no longer part of the federated service + RemoteServiceLeavesFederatedService struct { Name string Namespace string } @@ -163,7 +189,7 @@ func NewRemoteClusterServiceWatcher( ctx context.Context, serviceMirrorNamespace string, localAPI *k8s.API, - cfg *rest.Config, + remoteAPI *k8s.API, link *multicluster.Link, requeueLimit int, repairPeriod time.Duration, @@ -171,14 +197,10 @@ func NewRemoteClusterServiceWatcher( enableHeadlessSvc bool, enableNamespaceCreation bool, ) (*RemoteClusterServiceWatcher, error) { - remoteAPI, err := k8s.InitializeAPIForConfig(ctx, cfg, false, clusterName, k8s.Svc, k8s.Endpoint) - if err != nil { - return nil, fmt.Errorf("cannot initialize api for target cluster %s: %w", clusterName, err) - } - _, err = remoteAPI.Client.Discovery().ServerVersion() + _, err := remoteAPI.Client.Discovery().ServerVersion() if err != nil { remoteAPI.UnregisterGauges() - return nil, fmt.Errorf("cannot connect to api for target cluster %s: %w", clusterName, err) + return nil, fmt.Errorf("cannot connect to api for target cluster %s: %w", link.TargetClusterName, err) } // Create k8s event recorder @@ -187,7 +209,7 @@ func NewRemoteClusterServiceWatcher( Interface: remoteAPI.Client.CoreV1().Events(""), }) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{ - Component: fmt.Sprintf("linkerd-service-mirror-%s", clusterName), + Component: fmt.Sprintf("linkerd-service-mirror-%s", link.TargetClusterName), }) stopper := make(chan struct{}) @@ -200,8 +222,7 @@ func NewRemoteClusterServiceWatcher( eventBroadcaster: eventBroadcaster, recorder: recorder, log: logging.WithFields(logging.Fields{ - "cluster": clusterName, - "apiAddress": cfg.Host, + "cluster": link.TargetClusterName, }), eventsQueue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any]()), requeueLimit: requeueLimit, @@ -214,10 +235,14 @@ func NewRemoteClusterServiceWatcher( }, nil } -func (rcsw *RemoteClusterServiceWatcher) mirroredResourceName(remoteName string) string { +func (rcsw *RemoteClusterServiceWatcher) mirrorServiceName(remoteName string) string { return fmt.Sprintf("%s-%s", remoteName, rcsw.link.TargetClusterName) } +func (rcsw *RemoteClusterServiceWatcher) federatedServiceName(remoteName string) string { + return fmt.Sprintf("%s-federated", remoteName) +} + func (rcsw *RemoteClusterServiceWatcher) targetResourceName(mirrorName string) string { return strings.TrimSuffix(mirrorName, "-"+rcsw.link.TargetClusterName) } @@ -226,23 +251,12 @@ func (rcsw *RemoteClusterServiceWatcher) originalResourceName(mirroredName strin return strings.TrimSuffix(mirroredName, fmt.Sprintf("-%s", rcsw.link.TargetClusterName)) } -// Provides labels for mirrored service. -// "remoteService" is an optional parameter. If provided, copies all labels -// from the remote service to mirrored service (except labels with the -// "SvcMirrorPrefix"). -func (rcsw *RemoteClusterServiceWatcher) getMirroredServiceLabels(remoteService *corev1.Service) map[string]string { +// Provides labels for mirrored or federatedservice. +// Copies all labels from the remote service to local service (except labels +// with the "SvcMirrorPrefix"). +func (rcsw *RemoteClusterServiceWatcher) getCommonServiceLabels(remoteService *corev1.Service) map[string]string { labels := map[string]string{ - consts.MirroredResourceLabel: "true", - consts.RemoteClusterNameLabel: rcsw.link.TargetClusterName, - } - - if remoteService == nil { - return labels - } - - if rcsw.isRemoteDiscovery(remoteService.Labels) { - labels[consts.RemoteDiscoveryLabel] = rcsw.link.TargetClusterName - labels[consts.RemoteServiceLabel] = remoteService.GetName() + consts.MirroredResourceLabel: "true", } for key, value := range remoteService.ObjectMeta.Labels { @@ -255,13 +269,33 @@ func (rcsw *RemoteClusterServiceWatcher) getMirroredServiceLabels(remoteService return labels } -// Provides annotations for mirrored service -func (rcsw *RemoteClusterServiceWatcher) getMirroredServiceAnnotations(remoteService *corev1.Service) map[string]string { - annotations := map[string]string{ - consts.RemoteResourceVersionAnnotation: remoteService.ResourceVersion, // needed to detect real changes - consts.RemoteServiceFqName: fmt.Sprintf("%s.%s.svc.%s", remoteService.Name, remoteService.Namespace, rcsw.link.TargetClusterDomain), +// Provides labels for mirror service. +// Copies all labels from the remote service to mirrored service (except labels +// with the "SvcMirrorPrefix"). +func (rcsw *RemoteClusterServiceWatcher) getMirrorServiceLabels(remoteService *corev1.Service) map[string]string { + labels := rcsw.getCommonServiceLabels(remoteService) + labels[consts.RemoteClusterNameLabel] = rcsw.link.TargetClusterName + + if rcsw.isRemoteDiscovery(remoteService.Labels) { + labels[consts.RemoteDiscoveryLabel] = rcsw.link.TargetClusterName + labels[consts.RemoteServiceLabel] = remoteService.GetName() } + return labels +} + +// Provides labels for federated services. Copies all labels from the remote +// service to the federated service (except labels with the "SvcMirrorPrefix"). +func (rcsw *RemoteClusterServiceWatcher) getFederatedServiceLabels(remoteService *corev1.Service) map[string]string { + labels := rcsw.getCommonServiceLabels(remoteService) + + return labels +} + +// Provides annotations for mirror or federated services +func (rcsw *RemoteClusterServiceWatcher) getCommonServiceAnnotations(remoteService *corev1.Service) map[string]string { + annotations := map[string]string{} + for key, value := range remoteService.ObjectMeta.Annotations { // Topology aware hints are not multicluster aware. if key == "service.kubernetes.io/topology-aware-hints" || key == "service.kubernetes.io/topology-mode" { @@ -278,6 +312,31 @@ func (rcsw *RemoteClusterServiceWatcher) getMirroredServiceAnnotations(remoteSer return annotations } +// Provides annotations for mirror services +func (rcsw *RemoteClusterServiceWatcher) getMirrorServiceAnnotations(remoteService *corev1.Service) map[string]string { + annotations := rcsw.getCommonServiceAnnotations(remoteService) + + annotations[consts.RemoteServiceFqName] = fmt.Sprintf("%s.%s.svc.%s", remoteService.Name, remoteService.Namespace, rcsw.link.TargetClusterDomain) + annotations[consts.RemoteResourceVersionAnnotation] = remoteService.ResourceVersion // needed to detect real changes + + return annotations +} + +// Provides annotations for federated service +func (rcsw *RemoteClusterServiceWatcher) getFederatedServiceAnnotations(remoteService *corev1.Service) map[string]string { + annotations := rcsw.getCommonServiceAnnotations(remoteService) + + if rcsw.link.TargetClusterName == "" { + // Local discovery + annotations[consts.LocalDiscoveryAnnotation] = remoteService.Name + } else { + // Remote discovery + annotations[consts.RemoteDiscoveryAnnotation] = fmt.Sprintf("%s@%s", remoteService.Name, rcsw.link.TargetClusterName) + } + + return annotations +} + func (rcsw *RemoteClusterServiceWatcher) mirrorNamespaceIfNecessary(ctx context.Context, namespace string) error { // if the namespace is already present we do not need to change it. // if we are creating it we want to put a label indicating this is a @@ -375,7 +434,10 @@ func (rcsw *RemoteClusterServiceWatcher) cleanupOrphanedServices(ctx context.Con // created. This piece of code is responsible for doing just that. It takes care of // services, endpoints and namespaces (if needed) func (rcsw *RemoteClusterServiceWatcher) cleanupMirroredResources(ctx context.Context) error { - matchLabels := rcsw.getMirroredServiceLabels(nil) + matchLabels := map[string]string{ + consts.MirroredResourceLabel: "true", + consts.RemoteClusterNameLabel: rcsw.link.TargetClusterName, + } services, err := rcsw.localAPIClient.Svc().Lister().List(labels.Set(matchLabels).AsSelector()) if err != nil { @@ -426,8 +488,12 @@ func (rcsw *RemoteClusterServiceWatcher) cleanupMirroredResources(ctx context.Co } // Deletes a locally mirrored service as it is not present on the remote cluster anymore -func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceDeleted(ctx context.Context, ev *RemoteServiceDeleted) error { - localServiceName := rcsw.mirroredResourceName(ev.Name) +func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceUnexported(ctx context.Context, ev *RemoteServiceUnexported) error { + rcsw.deleteLinkMirrorStatus( + ev.Name, ev.Namespace, + ) + + localServiceName := rcsw.mirrorServiceName(ev.Name) localService, err := rcsw.localAPIClient.Svc().Lister().Services(ev.Namespace).Get(localServiceName) var errors []error if err != nil { @@ -476,9 +542,66 @@ func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceDeleted(ctx context. return nil } +// Removes a remote service from a local federated service. +func (rcsw *RemoteClusterServiceWatcher) handleFederatedServiceLeave(ctx context.Context, ev *RemoteServiceLeavesFederatedService) error { + rcsw.deleteLinkFederatedStatus( + ev.Name, ev.Namespace, + ) + + localServiceName := rcsw.federatedServiceName(ev.Name) + localService, err := rcsw.localAPIClient.Svc().Lister().Services(ev.Namespace).Get(localServiceName) + + if err != nil { + if kerrors.IsNotFound(err) { + rcsw.log.Debugf("Failed to update federated service %s/%s: %v", ev.Namespace, ev.Name, err) + return nil + } + return RetryableError{[]error{fmt.Errorf("could not fetch service %s/%s: %w", ev.Namespace, localServiceName, err)}} + } + + if rcsw.link.TargetClusterName == "" { + // Local discovery + delete(localService.Annotations, consts.LocalDiscoveryAnnotation) + } else { + remoteTarget := fmt.Sprintf("%s@%s", ev.Name, rcsw.link.TargetClusterName) + if !remoteDiscoveryContains(localService.Annotations[consts.RemoteDiscoveryAnnotation], remoteTarget) { + return nil + } + + remoteDiscoveryList := strings.Split(localService.Annotations[consts.RemoteDiscoveryAnnotation], ",") + newRemoteDiscoveryList := []string{} + for _, member := range remoteDiscoveryList { + if member == remoteTarget { + continue + } + newRemoteDiscoveryList = append(newRemoteDiscoveryList, member) + } + localService.Annotations[consts.RemoteDiscoveryAnnotation] = strings.Join(newRemoteDiscoveryList, ",") + } + + if len(localService.Annotations[consts.RemoteDiscoveryAnnotation]) == 0 && len(localService.Annotations[consts.LocalDiscoveryAnnotation]) == 0 { + rcsw.log.Infof("Deleting federated service %s/%s", ev.Namespace, localServiceName) + if err := rcsw.localAPIClient.Client.CoreV1().Services(ev.Namespace).Delete(ctx, localServiceName, metav1.DeleteOptions{}); err != nil { + if !kerrors.IsNotFound(err) { + return RetryableError{[]error{fmt.Errorf("could not delete service: %s/%s: %w", ev.Namespace, localServiceName, err)}} + } + } + rcsw.log.Infof("Successfully deleted service: %s/%s", ev.Namespace, localServiceName) + rcsw.deleteLinkFederatedStatus( + ev.Name, ev.Namespace, + ) + return nil + } + + if _, err := rcsw.localAPIClient.Client.CoreV1().Services(ev.Namespace).Update(ctx, localService, metav1.UpdateOptions{}); err != nil { + return RetryableError{[]error{err}} + } + return nil +} + // Updates a locally mirrored service. There might have been some pretty fundamental changes such as // new gateway being assigned or additional ports exposed. This method takes care of that. -func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceUpdated(ctx context.Context, ev *RemoteServiceUpdated) error { +func (rcsw *RemoteClusterServiceWatcher) handleRemoteExportedServiceUpdated(ctx context.Context, ev *RemoteExportedServiceUpdated) error { rcsw.log.Infof("Updating mirror service %s/%s", ev.localService.Namespace, ev.localService.Name) if rcsw.isRemoteDiscovery(ev.remoteUpdate.Labels) { @@ -487,6 +610,10 @@ func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceUpdated(ctx context. if ev.localEndpoints != nil { err := rcsw.localAPIClient.Client.CoreV1().Endpoints(ev.localService.Namespace).Delete(ctx, ev.localService.Name, metav1.DeleteOptions{}) if err != nil { + rcsw.updateLinkMirrorStatus( + ev.remoteUpdate.GetName(), ev.remoteUpdate.GetNamespace(), + mirrorStatusCondition(false, reasonError, fmt.Sprintf("Failed to delete mirror endpoints: %s", err), nil), + ) return RetryableError{[]error{ fmt.Errorf("failed to delete mirror endpoints for %s/%s: %w", ev.localService.Namespace, ev.localService.Name, err), }} @@ -497,6 +624,10 @@ func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceUpdated(ctx context. // be created for it. err := rcsw.createGatewayEndpoints(ctx, ev.remoteUpdate) if err != nil { + rcsw.updateLinkMirrorStatus( + ev.remoteUpdate.GetName(), ev.remoteUpdate.GetNamespace(), + mirrorStatusCondition(false, reasonError, fmt.Sprintf("Failed to create mirror endpoints: %s", err), nil), + ) return err } } else { @@ -504,6 +635,10 @@ func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceUpdated(ctx context. // exist for it but may need to be updated. gatewayAddresses, err := rcsw.resolveGatewayAddress() if err != nil { + rcsw.updateLinkMirrorStatus( + ev.remoteUpdate.GetName(), ev.remoteUpdate.GetNamespace(), + mirrorStatusCondition(false, reasonError, fmt.Sprintf("Failed to get gateway address: %s", err), nil), + ) return err } @@ -522,20 +657,87 @@ func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceUpdated(ctx context. err = rcsw.updateMirrorEndpoints(ctx, copiedEndpoints) if err != nil { + rcsw.updateLinkMirrorStatus( + ev.remoteUpdate.GetName(), ev.remoteUpdate.GetNamespace(), + mirrorStatusCondition(false, reasonError, fmt.Sprintf("Failed to update mirror endpoints: %s", err), nil), + ) return RetryableError{[]error{err}} } } - ev.localService.Labels = rcsw.getMirroredServiceLabels(ev.remoteUpdate) - ev.localService.Annotations = rcsw.getMirroredServiceAnnotations(ev.remoteUpdate) + ev.localService.Labels = rcsw.getMirrorServiceLabels(ev.remoteUpdate) + ev.localService.Annotations = rcsw.getMirrorServiceAnnotations(ev.remoteUpdate) ev.localService.Spec.Ports = remapRemoteServicePorts(ev.remoteUpdate.Spec.Ports) if _, err := rcsw.localAPIClient.Client.CoreV1().Services(ev.localService.Namespace).Update(ctx, ev.localService, metav1.UpdateOptions{}); err != nil { + rcsw.updateLinkMirrorStatus( + ev.remoteUpdate.GetName(), ev.remoteUpdate.GetNamespace(), + mirrorStatusCondition(false, reasonError, fmt.Sprintf("Failed to update mirror service: %s", err), nil), + ) + return RetryableError{[]error{err}} + } + rcsw.updateLinkMirrorStatus( + ev.remoteUpdate.GetName(), ev.remoteUpdate.GetNamespace(), + mirrorStatusCondition(true, reasonMirrored, "", ev.localService), + ) + return nil +} + +// Updates a federated service to include the remote service as a member. +func (rcsw *RemoteClusterServiceWatcher) handleFederatedServiceJoin(ctx context.Context, ev *RemoteServiceJoinsFederatedService) error { + rcsw.log.Infof("Updating federated service %s/%s", ev.localService.Namespace, ev.localService.Name) + + if ev.remoteUpdate.Spec.ClusterIP == corev1.ClusterIPNone { + rcsw.updateLinkFederatedStatus( + ev.remoteUpdate.GetName(), ev.remoteUpdate.GetNamespace(), + mirrorStatusCondition(false, reasonInvalidService, "Headless service cannot join federated service", nil), + ) + return fmt.Errorf("headless service %s/%s cannot join federated service", ev.remoteUpdate.GetNamespace(), ev.remoteUpdate.GetName()) + } + + if rcsw.link.TargetClusterName == "" { + // Local discovery + ev.localService.Annotations[consts.LocalDiscoveryAnnotation] = ev.remoteUpdate.Name + } else { + // Remote discovery + remoteTarget := fmt.Sprintf("%s@%s", ev.remoteUpdate.Name, rcsw.link.TargetClusterName) + if remoteDiscoveryContains(ev.localService.Annotations[consts.RemoteDiscoveryAnnotation], remoteTarget) { + return nil + } + if ev.localService.Annotations[consts.RemoteDiscoveryAnnotation] == "" { + ev.localService.Annotations[consts.RemoteDiscoveryAnnotation] = remoteTarget + } else { + ev.localService.Annotations[consts.RemoteDiscoveryAnnotation] = fmt.Sprintf( + "%s,%s", + ev.localService.Annotations[consts.RemoteDiscoveryAnnotation], + remoteTarget, + ) + } + } + + if _, err := rcsw.localAPIClient.Client.CoreV1().Services(ev.localService.Namespace).Update(ctx, ev.localService, metav1.UpdateOptions{}); err != nil { + rcsw.updateLinkFederatedStatus( + ev.remoteUpdate.GetName(), ev.remoteUpdate.GetNamespace(), + mirrorStatusCondition(false, reasonError, fmt.Sprintf("Failed to update federated service: %s", err), nil), + ) return RetryableError{[]error{err}} } + rcsw.updateLinkFederatedStatus( + ev.remoteUpdate.GetName(), ev.remoteUpdate.GetNamespace(), + mirrorStatusCondition(true, reasonMirrored, "", ev.localService), + ) return nil } +func remoteDiscoveryContains(remoteDiscoveryList string, remoteTarget string) bool { + for _, member := range strings.Split(remoteDiscoveryList, ",") { + if member == remoteTarget { + return true + } + } + return false +} + func remapRemoteServicePorts(ports []corev1.ServicePort) []corev1.ServicePort { // We ignore the NodePort here as its not relevant // to the local cluster @@ -551,17 +753,25 @@ func remapRemoteServicePorts(ports []corev1.ServicePort) []corev1.ServicePort { return newPorts } -func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceCreated(ctx context.Context, ev *RemoteServiceCreated) error { +func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceExported(ctx context.Context, ev *RemoteServiceExported) error { remoteService := ev.service.DeepCopy() if rcsw.headlessServicesEnabled && remoteService.Spec.ClusterIP == corev1.ClusterIPNone { + rcsw.updateLinkMirrorStatus( + ev.service.GetName(), ev.service.GetNamespace(), + mirrorStatusCondition(false, reasonInvalidService, "Headless mirror services are disabled", nil), + ) return nil } serviceInfo := fmt.Sprintf("%s/%s", remoteService.Namespace, remoteService.Name) - localServiceName := rcsw.mirroredResourceName(remoteService.Name) + localServiceName := rcsw.mirrorServiceName(remoteService.Name) if rcsw.namespaceCreationEnabled { if err := rcsw.mirrorNamespaceIfNecessary(ctx, remoteService.Namespace); err != nil { + rcsw.updateLinkMirrorStatus( + ev.service.GetName(), ev.service.GetNamespace(), + mirrorStatusCondition(false, reasonError, fmt.Sprintf("Failed to create namespace: %s", err), nil), + ) return err } } else { @@ -570,8 +780,16 @@ func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceCreated(ctx context. if kerrors.IsNotFound(err) { rcsw.recorder.Event(remoteService, corev1.EventTypeNormal, eventTypeSkipped, "Skipped mirroring service: namespace does not exist") rcsw.log.Warnf("Skipping mirroring of service %s: namespace %s does not exist", serviceInfo, remoteService.Namespace) + rcsw.updateLinkMirrorStatus( + ev.service.GetName(), ev.service.GetNamespace(), + mirrorStatusCondition(false, reasonMissingNamespace, "Namespace does not exist", nil), + ) return nil } + rcsw.updateLinkMirrorStatus( + ev.service.GetName(), ev.service.GetNamespace(), + mirrorStatusCondition(false, reasonError, fmt.Sprintf("Failed get namespace: %s", err), nil), + ) // something else went wrong, so we can just retry return RetryableError{[]error{err}} } @@ -581,8 +799,8 @@ func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceCreated(ctx context. ObjectMeta: metav1.ObjectMeta{ Name: localServiceName, Namespace: remoteService.Namespace, - Annotations: rcsw.getMirroredServiceAnnotations(remoteService), - Labels: rcsw.getMirroredServiceLabels(remoteService), + Annotations: rcsw.getMirrorServiceAnnotations(remoteService), + Labels: rcsw.getMirrorServiceLabels(remoteService), }, Spec: corev1.ServiceSpec{ Ports: remapRemoteServicePorts(remoteService.Spec.Ports), @@ -592,6 +810,10 @@ func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceCreated(ctx context. rcsw.log.Infof("Creating a new service mirror for %s", serviceInfo) if _, err := rcsw.localAPIClient.Client.CoreV1().Services(remoteService.Namespace).Create(ctx, serviceToCreate, metav1.CreateOptions{}); err != nil { if !kerrors.IsAlreadyExists(err) { + rcsw.updateLinkMirrorStatus( + ev.service.GetName(), ev.service.GetNamespace(), + mirrorStatusCondition(false, reasonError, fmt.Sprintf("Failed to create mirror service: %s", err), nil), + ) // we might have created it during earlier attempt, if that is not the case, we retry return RetryableError{[]error{err}} } @@ -599,9 +821,101 @@ func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceCreated(ctx context. if rcsw.isRemoteDiscovery(remoteService.Labels) { // For remote discovery services, skip creating gateway endpoints. + rcsw.updateLinkMirrorStatus( + ev.service.GetName(), ev.service.GetNamespace(), + mirrorStatusCondition(true, reasonMirrored, "", serviceToCreate), + ) return nil } - return rcsw.createGatewayEndpoints(ctx, remoteService) + + err := rcsw.createGatewayEndpoints(ctx, remoteService) + if err != nil { + rcsw.updateLinkMirrorStatus( + ev.service.GetName(), ev.service.GetNamespace(), + mirrorStatusCondition(false, reasonError, fmt.Sprintf("Failed to create mirror endpoints: %s", err), nil), + ) + return err + } + + rcsw.updateLinkMirrorStatus( + ev.service.GetName(), ev.service.GetNamespace(), + mirrorStatusCondition(true, reasonMirrored, "", serviceToCreate), + ) + return nil +} + +func (rcsw *RemoteClusterServiceWatcher) handleCreateFederatedService(ctx context.Context, ev *CreateFederatedService) error { + remoteService := ev.service.DeepCopy() + serviceInfo := fmt.Sprintf("%s/%s", remoteService.Namespace, remoteService.Name) + + if remoteService.Spec.ClusterIP == corev1.ClusterIPNone { + rcsw.updateLinkFederatedStatus( + remoteService.GetName(), remoteService.GetNamespace(), + mirrorStatusCondition(false, reasonInvalidService, "Headless service cannot join federated service", nil), + ) + return fmt.Errorf("headless service %s cannot join federated service", serviceInfo) + } + + localServiceName := rcsw.federatedServiceName(remoteService.Name) + + if rcsw.namespaceCreationEnabled { + if err := rcsw.mirrorNamespaceIfNecessary(ctx, remoteService.Namespace); err != nil { + rcsw.updateLinkFederatedStatus( + remoteService.GetName(), remoteService.GetNamespace(), + mirrorStatusCondition(false, reasonError, fmt.Sprintf("Failed to create namespace: %s", err), nil), + ) + return err + } + } else { + // Ensure the namespace exists, and skip mirroring if it doesn't + if _, err := rcsw.localAPIClient.Client.CoreV1().Namespaces().Get(ctx, remoteService.Namespace, metav1.GetOptions{}); err != nil { + if kerrors.IsNotFound(err) { + rcsw.recorder.Event(remoteService, corev1.EventTypeNormal, eventTypeSkipped, "Skipped mirroring service: namespace does not exist") + rcsw.log.Warnf("Skipping mirroring of service %s: namespace %s does not exist", serviceInfo, remoteService.Namespace) + rcsw.updateLinkFederatedStatus( + remoteService.GetName(), remoteService.GetNamespace(), + mirrorStatusCondition(false, reasonMissingNamespace, "Namespace does not exist", nil), + ) + return nil + } + // something else went wrong, so we can just retry + rcsw.updateLinkFederatedStatus( + remoteService.GetName(), remoteService.GetNamespace(), + mirrorStatusCondition(false, reasonError, fmt.Sprintf("Failed get namespace: %s", err), nil), + ) + return RetryableError{[]error{err}} + } + } + + serviceToCreate := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: localServiceName, + Namespace: remoteService.Namespace, + Annotations: rcsw.getFederatedServiceAnnotations(remoteService), + Labels: rcsw.getFederatedServiceLabels(remoteService), + }, + Spec: corev1.ServiceSpec{ + Ports: remapRemoteServicePorts(remoteService.Spec.Ports), + }, + } + + rcsw.log.Infof("Creating a new federated service for %s", serviceInfo) + if _, err := rcsw.localAPIClient.Client.CoreV1().Services(remoteService.Namespace).Create(ctx, serviceToCreate, metav1.CreateOptions{}); err != nil { + if !kerrors.IsAlreadyExists(err) { + // we might have created it during earlier attempt, if that is not the case, we retry + rcsw.updateLinkFederatedStatus( + remoteService.GetName(), remoteService.GetNamespace(), + mirrorStatusCondition(false, reasonError, fmt.Sprintf("Failed to create federated service: %s", err), nil), + ) + return RetryableError{[]error{err}} + } + } + + rcsw.updateLinkFederatedStatus( + remoteService.GetName(), remoteService.GetNamespace(), + mirrorStatusCondition(true, reasonMirrored, "", serviceToCreate), + ) + return nil } func (rcsw *RemoteClusterServiceWatcher) handleLocalNamespaceAdded(ns *corev1.Namespace) error { @@ -667,7 +981,7 @@ func (rcsw *RemoteClusterServiceWatcher) createGatewayEndpoints(ctx context.Cont return err } - localServiceName := rcsw.mirroredResourceName(exportedService.Name) + localServiceName := rcsw.mirrorServiceName(exportedService.Name) serviceInfo := fmt.Sprintf("%s/%s", exportedService.Namespace, exportedService.Name) endpointsToCreate := &corev1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ @@ -723,13 +1037,14 @@ func (rcsw *RemoteClusterServiceWatcher) createGatewayEndpoints(ctx context.Cont // offline for some time due to a crash a CREATE for a service that we have // observed before is simply a case of UPDATE func (rcsw *RemoteClusterServiceWatcher) createOrUpdateService(service *corev1.Service) error { - localName := rcsw.mirroredResourceName(service.Name) + mirrorName := rcsw.mirrorServiceName(service.Name) if rcsw.isExported(service.Labels) || rcsw.isRemoteDiscovery(service.Labels) { - localService, err := rcsw.localAPIClient.Svc().Lister().Services(service.Namespace).Get(localName) + // The desired state is that the local mirror service should exist. + localService, err := rcsw.localAPIClient.Svc().Lister().Services(service.Namespace).Get(mirrorName) if err != nil { if kerrors.IsNotFound(err) { - rcsw.eventsQueue.Add(&RemoteServiceCreated{ + rcsw.eventsQueue.Add(&RemoteServiceExported{ service: service, }) return nil @@ -739,7 +1054,7 @@ func (rcsw *RemoteClusterServiceWatcher) createOrUpdateService(service *corev1.S // if we have the local service present, we need to issue an update lastMirroredRemoteVersion, ok := localService.Annotations[consts.RemoteResourceVersionAnnotation] if ok && lastMirroredRemoteVersion != service.ResourceVersion { - endpoints, err := rcsw.localAPIClient.Endpoint().Lister().Endpoints(service.Namespace).Get(localName) + endpoints, err := rcsw.localAPIClient.Endpoint().Lister().Endpoints(service.Namespace).Get(mirrorName) if err != nil { if kerrors.IsNotFound(err) { endpoints = nil @@ -747,29 +1062,69 @@ func (rcsw *RemoteClusterServiceWatcher) createOrUpdateService(service *corev1.S return RetryableError{[]error{err}} } } - rcsw.eventsQueue.Add(&RemoteServiceUpdated{ + rcsw.eventsQueue.Add(&RemoteExportedServiceUpdated{ localService: localService, localEndpoints: endpoints, remoteUpdate: service, }) - return nil } - - return nil + } else { + // The desired state is that the local mirror service should not exist. + localSvc, err := rcsw.localAPIClient.Svc().Lister().Services(service.Namespace).Get(mirrorName) + if err == nil { + if localSvc.Labels != nil { + _, isMirroredRes := localSvc.Labels[consts.MirroredResourceLabel] + clusterName := localSvc.Labels[consts.RemoteClusterNameLabel] + if isMirroredRes && (clusterName == rcsw.link.TargetClusterName) { + rcsw.eventsQueue.Add(&RemoteServiceUnexported{ + Name: service.Name, + Namespace: service.Namespace, + }) + } + } + } } - localSvc, err := rcsw.localAPIClient.Svc().Lister().Services(service.Namespace).Get(localName) - if err == nil { - if localSvc.Labels != nil { - _, isMirroredRes := localSvc.Labels[consts.MirroredResourceLabel] - clusterName := localSvc.Labels[consts.RemoteClusterNameLabel] - if isMirroredRes && (clusterName == rcsw.link.TargetClusterName) { - rcsw.eventsQueue.Add(&RemoteServiceDeleted{ - Name: service.Name, - Namespace: service.Namespace, + + federatedName := rcsw.federatedServiceName(service.Name) + + if rcsw.isFederatedServiceMember(service.Labels) { + // The desired state is that the local federated service should exist. + localService, err := rcsw.localAPIClient.Svc().Lister().Services(service.Namespace).Get(federatedName) + if err != nil { + if kerrors.IsNotFound(err) { + rcsw.eventsQueue.Add(&CreateFederatedService{ + service: service, }) + return nil + } + rcsw.updateLinkFederatedStatus( + service.Name, service.Namespace, + mirrorStatusCondition(false, reasonError, fmt.Sprintf("Failed to get federated service: %s", err), nil), + ) + return RetryableError{[]error{err}} + } + // if we have the local service present, we need to issue an update + rcsw.eventsQueue.Add(&RemoteServiceJoinsFederatedService{ + localService: localService, + remoteUpdate: service, + }) + } else { + // The desired state is that the local federated service should not + // include the remote service. + localSvc, err := rcsw.localAPIClient.Svc().Lister().Services(service.Namespace).Get(federatedName) + if err == nil { + if localSvc.Labels != nil { + _, isMirroredRes := localSvc.Labels[consts.MirroredResourceLabel] + if isMirroredRes { + rcsw.eventsQueue.Add(&RemoteServiceLeavesFederatedService{ + Name: service.Name, + Namespace: service.Namespace, + }) + } } } } + return nil } @@ -787,12 +1142,16 @@ func (rcsw *RemoteClusterServiceWatcher) getMirrorServices() (*corev1.ServiceLis func (rcsw *RemoteClusterServiceWatcher) handleOnDelete(service *corev1.Service) { if rcsw.isExported(service.Labels) || rcsw.isRemoteDiscovery(service.Labels) { - rcsw.eventsQueue.Add(&RemoteServiceDeleted{ + rcsw.eventsQueue.Add(&RemoteServiceUnexported{ + Name: service.Name, + Namespace: service.Namespace, + }) + } + if rcsw.isFederatedServiceMember(service.Labels) { + rcsw.eventsQueue.Add(&RemoteServiceLeavesFederatedService{ Name: service.Name, Namespace: service.Namespace, }) - } else { - rcsw.log.Infof("Skipping OnDelete for service %s", service) } } @@ -816,12 +1175,18 @@ func (rcsw *RemoteClusterServiceWatcher) processNextEvent(ctx context.Context) ( err = rcsw.handleCreateOrUpdateEndpoints(ctx, ev.ep) case *OnDeleteCalled: rcsw.handleOnDelete(ev.svc) - case *RemoteServiceCreated: - err = rcsw.handleRemoteServiceCreated(ctx, ev) - case *RemoteServiceUpdated: - err = rcsw.handleRemoteServiceUpdated(ctx, ev) - case *RemoteServiceDeleted: - err = rcsw.handleRemoteServiceDeleted(ctx, ev) + case *RemoteServiceExported: + err = rcsw.handleRemoteServiceExported(ctx, ev) + case *RemoteExportedServiceUpdated: + err = rcsw.handleRemoteExportedServiceUpdated(ctx, ev) + case *RemoteServiceUnexported: + err = rcsw.handleRemoteServiceUnexported(ctx, ev) + case *CreateFederatedService: + err = rcsw.handleCreateFederatedService(ctx, ev) + case *RemoteServiceJoinsFederatedService: + err = rcsw.handleFederatedServiceJoin(ctx, ev) + case *RemoteServiceLeavesFederatedService: + err = rcsw.handleFederatedServiceLeave(ctx, ev) case *ClusterUnregistered: err = rcsw.cleanupMirroredResources(ctx) case *OrphanedServicesGcTriggered: @@ -1216,7 +1581,7 @@ func (rcsw *RemoteClusterServiceWatcher) handleCreateOrUpdateEndpoints( return nil } - localServiceName := rcsw.mirroredResourceName(exportedEndpoints.Name) + localServiceName := rcsw.mirrorServiceName(exportedEndpoints.Name) ep, err := rcsw.localAPIClient.Endpoint().Lister().Endpoints(exportedEndpoints.Namespace).Get(localServiceName) if err != nil { return RetryableError{[]error{err}} @@ -1289,10 +1654,13 @@ func (rcsw *RemoteClusterServiceWatcher) updateReadiness(endpoints *corev1.Endpo func (rcsw *RemoteClusterServiceWatcher) isExported(l map[string]string) bool { // Treat an empty selector as "Nothing" instead of "Everything" so that // when the selector field is unset, we don't export all Services. + if rcsw.link.Selector == nil { + return false + } if len(rcsw.link.Selector.MatchExpressions)+len(rcsw.link.Selector.MatchLabels) == 0 { return false } - selector, err := metav1.LabelSelectorAsSelector(&rcsw.link.Selector) + selector, err := metav1.LabelSelectorAsSelector(rcsw.link.Selector) if err != nil { rcsw.log.Errorf("Invalid selector: %s", err) return false @@ -1304,10 +1672,13 @@ func (rcsw *RemoteClusterServiceWatcher) isRemoteDiscovery(l map[string]string) // Treat an empty remoteDiscoverySelector as "Nothing" instead of // "Everything" so that when the remoteDiscoverySelector field is unset, we // don't export all Services. + if rcsw.link.RemoteDiscoverySelector == nil { + return false + } if len(rcsw.link.RemoteDiscoverySelector.MatchExpressions)+len(rcsw.link.RemoteDiscoverySelector.MatchLabels) == 0 { return false } - remoteDiscoverySelector, err := metav1.LabelSelectorAsSelector(&rcsw.link.RemoteDiscoverySelector) + remoteDiscoverySelector, err := metav1.LabelSelectorAsSelector(rcsw.link.RemoteDiscoverySelector) if err != nil { rcsw.log.Errorf("Invalid selector: %s", err) return false @@ -1315,3 +1686,177 @@ func (rcsw *RemoteClusterServiceWatcher) isRemoteDiscovery(l map[string]string) return remoteDiscoverySelector.Matches(labels.Set(l)) } + +func (rcsw *RemoteClusterServiceWatcher) isFederatedServiceMember(l map[string]string) bool { + // Treat an empty federatedServiceSelector as "Nothing" instead of + // "Everything" so that when the federatedServiceSelector field is unset, we + // don't export all Services. + if rcsw.link.FederatedServiceSelector == nil { + return false + } + if len(rcsw.link.FederatedServiceSelector.MatchExpressions)+len(rcsw.link.FederatedServiceSelector.MatchLabels) == 0 { + return false + } + federatedServiceSelector, err := metav1.LabelSelectorAsSelector(rcsw.link.FederatedServiceSelector) + if err != nil { + rcsw.log.Errorf("Invalid selector: %s", err) + return false + } + + return federatedServiceSelector.Matches(labels.Set(l)) +} + +func (rcsw *RemoteClusterServiceWatcher) updateLinkMirrorStatus(remoteName, namespace string, condition map[string]interface{}) { + err := rcsw.updateLinkStatus("mirrorServices", remoteName, namespace, condition) + if err != nil { + rcsw.log.Errorf("Failed to update link status: %s", err) + } +} + +func (rcsw *RemoteClusterServiceWatcher) updateLinkFederatedStatus(remoteName, namespace string, condition map[string]interface{}) { + err := rcsw.updateLinkStatus("federatedServices", remoteName, namespace, condition) + if err != nil { + rcsw.log.Errorf("Failed to update link status: %s", err) + } +} + +func (rcsw *RemoteClusterServiceWatcher) updateLinkStatus(statusSection, remoteName, namespace string, condition map[string]interface{}) error { + if rcsw.link.TargetClusterName == "" { + // The local cluster has no Link resource. + return nil + } + rcsw.log.Errorf("fetching link status %s/%s", rcsw.link.Namespace, rcsw.link.Name) + link, err := rcsw.localAPIClient.DynamicClient.Resource(multicluster.LinkGVR).Namespace(rcsw.link.Namespace).Get(context.Background(), rcsw.link.Name, metav1.GetOptions{}) + if err != nil { + return err + } + rcsw.log.Errorf("got link status: %s", link.Object) + statuses, found, err := unstructured.NestedSlice(link.Object, "status", statusSection) + if err != nil { + return err + } + if !found { + statuses = make([]interface{}, 0) + } + foundStatus := false + for i, status := range statuses { + status, ok := status.(map[string]interface{}) + if !ok { + return fmt.Errorf("mirrorServices status must be an object") + } + statusRemoteName, _, err := unstructured.NestedString(status, "remoteRef", "name") + if err != nil { + return err + } + statusRemoteNamespace, _, err := unstructured.NestedString(status, "remoteRef", "namespace") + if err != nil { + return err + } + if statusRemoteName == remoteName && statusRemoteNamespace == namespace { + foundStatus = true + status["conditions"] = []interface{}{condition} + statuses[i] = status + } + } + if !foundStatus { + statuses = append(statuses, map[string]interface{}{ + "controllerName": "linkerd.io/service-mirror", + "remoteRef": map[string]interface{}{ + "name": remoteName, + "namespace": namespace, + "kind": "Service", + "group": corev1.GroupName, + }, + "conditions": []interface{}{condition}, + }) + } + err = unstructured.SetNestedSlice(link.Object, statuses, "status", statusSection) + if err != nil { + return err + } + rcsw.log.Errorf("new link status: %s", link.Object) + flag.Set("v", "10") + _, err = rcsw.localAPIClient.DynamicClient.Resource(multicluster.LinkGVR).Namespace(rcsw.link.Namespace).UpdateStatus(context.Background(), link, metav1.UpdateOptions{}) + flag.Set("v", "2") + return err +} + +func (rcsw *RemoteClusterServiceWatcher) deleteLinkMirrorStatus(remoteName, namespace string) { + err := rcsw.deleteLinkStatus("mirrorServices", remoteName, namespace) + if err != nil { + rcsw.log.Errorf("Failed to update link status: %s", err) + } +} + +func (rcsw *RemoteClusterServiceWatcher) deleteLinkFederatedStatus(remoteName, namespace string) { + err := rcsw.deleteLinkStatus("federatedServices", remoteName, namespace) + if err != nil { + rcsw.log.Errorf("Failed to update link status: %s", err) + } +} + +func (rcsw *RemoteClusterServiceWatcher) deleteLinkStatus(statusSection, remoteName, namespace string) error { + if rcsw.link.TargetClusterName == "" { + // The local cluster has no Link resource. + return nil + } + link, err := rcsw.localAPIClient.DynamicClient.Resource(multicluster.LinkGVR).Namespace(rcsw.link.Namespace).Get(context.Background(), rcsw.link.Name, metav1.GetOptions{}) + if err != nil { + return err + } + statuses, found, err := unstructured.NestedSlice(link.Object, "status", statusSection) + if err != nil { + return err + } + if !found { + statuses = make([]interface{}, 0) + } + newStatuses := make([]interface{}, 0) + for _, status := range statuses { + status, ok := status.(map[string]interface{}) + if !ok { + return fmt.Errorf("mirrorServices status must be an object") + } + statusRemoteName, _, err := unstructured.NestedString(status, "remoteRef", "name") + if err != nil { + return err + } + statusRemoteNamespace, _, err := unstructured.NestedString(status, "remoteRef", "namespace") + if err != nil { + return err + } + if statusRemoteName == remoteName && statusRemoteNamespace == namespace { + continue + } + newStatuses = append(newStatuses, status) + } + err = unstructured.SetNestedSlice(link.Object, newStatuses, "status", statusSection) + if err != nil { + return err + } + _, err = rcsw.localAPIClient.DynamicClient.Resource(multicluster.LinkGVR).Namespace(rcsw.link.Namespace).UpdateStatus(context.Background(), link, metav1.UpdateOptions{}) + return err +} + +func mirrorStatusCondition(success bool, reason string, message string, localRef *corev1.Service) map[string]interface{} { + status := "True" + if !success { + status = "False" + } + condition := map[string]interface{}{ + "lastTransitionTime": time.Now().Format(time.RFC3339), + "message": message, + "reason": reason, + "status": status, + "type": "Mirrored", + } + if localRef != nil { + condition["localRef"] = map[string]interface{}{ + "name": localRef.Name, + "namespace": localRef.Namespace, + "kind": "Service", + "group": corev1.GroupName, + } + } + return condition +} diff --git a/multicluster/service-mirror/cluster_watcher_headless.go b/multicluster/service-mirror/cluster_watcher_headless.go index 60ef3bb24189a..0c952090ba77a 100644 --- a/multicluster/service-mirror/cluster_watcher_headless.go +++ b/multicluster/service-mirror/cluster_watcher_headless.go @@ -51,7 +51,7 @@ func (rcsw *RemoteClusterServiceWatcher) createOrUpdateHeadlessEndpoints(ctx con return nil } - mirrorServiceName := rcsw.mirroredResourceName(exportedService.Name) + mirrorServiceName := rcsw.mirrorServiceName(exportedService.Name) mirrorService, err := rcsw.localAPIClient.Svc().Lister().Services(exportedService.Namespace).Get(mirrorServiceName) if err != nil { if !kerrors.IsNotFound(err) { @@ -66,7 +66,7 @@ func (rcsw *RemoteClusterServiceWatcher) createOrUpdateHeadlessEndpoints(ctx con } } - headlessMirrorEpName := rcsw.mirroredResourceName(exportedEndpoints.Name) + headlessMirrorEpName := rcsw.mirrorServiceName(exportedEndpoints.Name) headlessMirrorEndpoints, err := rcsw.localAPIClient.Endpoint().Lister().Endpoints(exportedEndpoints.Namespace).Get(headlessMirrorEpName) if err != nil { if !kerrors.IsNotFound(err) { @@ -104,7 +104,7 @@ func (rcsw *RemoteClusterServiceWatcher) createOrUpdateHeadlessEndpoints(ctx con continue } - endpointMirrorName := rcsw.mirroredResourceName(address.Hostname) + endpointMirrorName := rcsw.mirrorServiceName(address.Hostname) endpointMirrorService, err := rcsw.localAPIClient.Svc().Lister().Services(exportedEndpoints.Namespace).Get(endpointMirrorName) if err != nil { if !kerrors.IsNotFound(err) { @@ -136,7 +136,7 @@ func (rcsw *RemoteClusterServiceWatcher) createOrUpdateHeadlessEndpoints(ctx con }) } - headlessMirrorName := rcsw.mirroredResourceName(exportedService.Name) + headlessMirrorName := rcsw.mirrorServiceName(exportedService.Name) matchLabels := map[string]string{ consts.MirroredHeadlessSvcNameLabel: headlessMirrorName, } @@ -197,7 +197,7 @@ func (rcsw *RemoteClusterServiceWatcher) createRemoteHeadlessService(ctx context remoteService := exportedService.DeepCopy() serviceInfo := fmt.Sprintf("%s/%s", remoteService.Namespace, remoteService.Name) - localServiceName := rcsw.mirroredResourceName(remoteService.Name) + localServiceName := rcsw.mirrorServiceName(remoteService.Name) if rcsw.namespaceCreationEnabled { if err := rcsw.mirrorNamespaceIfNecessary(ctx, remoteService.Namespace); err != nil { @@ -219,8 +219,8 @@ func (rcsw *RemoteClusterServiceWatcher) createRemoteHeadlessService(ctx context ObjectMeta: metav1.ObjectMeta{ Name: localServiceName, Namespace: remoteService.Namespace, - Annotations: rcsw.getMirroredServiceAnnotations(remoteService), - Labels: rcsw.getMirroredServiceLabels(remoteService), + Annotations: rcsw.getMirrorServiceAnnotations(remoteService), + Labels: rcsw.getMirrorServiceLabels(remoteService), }, Spec: corev1.ServiceSpec{ Ports: remapRemoteServicePorts(remoteService.Spec.Ports), @@ -260,7 +260,7 @@ func (rcsw *RemoteClusterServiceWatcher) createHeadlessMirrorEndpoints(ctx conte continue } - endpointMirrorName := rcsw.mirroredResourceName(addr.Hostname) + endpointMirrorName := rcsw.mirrorServiceName(addr.Hostname) createdService, err := rcsw.createEndpointMirrorService(ctx, addr.Hostname, exportedEndpoints.ResourceVersion, endpointMirrorName, exportedService) if err != nil { rcsw.log.Errorf("error creating endpoint mirror service %s/%s for exported headless service %s: %v", endpointMirrorName, exportedService.Namespace, exportedServiceInfo, err) @@ -285,7 +285,7 @@ func (rcsw *RemoteClusterServiceWatcher) createHeadlessMirrorEndpoints(ctx conte }) } - headlessMirrorServiceName := rcsw.mirroredResourceName(exportedService.Name) + headlessMirrorServiceName := rcsw.mirrorServiceName(exportedService.Name) headlessMirrorEndpoints := &corev1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: headlessMirrorServiceName, @@ -337,8 +337,8 @@ func (rcsw *RemoteClusterServiceWatcher) createEndpointMirrorService(ctx context consts.RemoteServiceFqName: fmt.Sprintf("%s.%s.%s.svc.%s", endpointHostname, exportedService.Name, exportedService.Namespace, rcsw.link.TargetClusterDomain), } - endpointMirrorLabels := rcsw.getMirroredServiceLabels(nil) - mirrorServiceName := rcsw.mirroredResourceName(exportedService.Name) + endpointMirrorLabels := rcsw.getMirrorServiceLabels(exportedService) + mirrorServiceName := rcsw.mirrorServiceName(exportedService.Name) endpointMirrorLabels[consts.MirroredHeadlessSvcNameLabel] = mirrorServiceName // Create service spec, clusterIP diff --git a/multicluster/service-mirror/cluster_watcher_mirroring_test.go b/multicluster/service-mirror/cluster_watcher_mirroring_test.go index 792481ca7abbf..eec9e770a827e 100644 --- a/multicluster/service-mirror/cluster_watcher_mirroring_test.go +++ b/multicluster/service-mirror/cluster_watcher_mirroring_test.go @@ -278,8 +278,8 @@ func TestLocalNamespaceCreatedAfterServiceExport(t *testing.T) { GatewayAddress: "192.0.2.127", GatewayPort: 888, ProbeSpec: defaultProbeSpec, - Selector: *defaultSelector, - RemoteDiscoverySelector: *defaultRemoteDiscoverySelector, + Selector: defaultSelector, + RemoteDiscoverySelector: defaultRemoteDiscoverySelector, }, remoteAPIClient: remoteAPI, localAPIClient: localAPI, @@ -292,7 +292,7 @@ func TestLocalNamespaceCreatedAfterServiceExport(t *testing.T) { headlessServicesEnabled: true, } - q.Add(&RemoteServiceCreated{ + q.Add(&RemoteServiceExported{ service: remoteService("service-one", "ns1", "111", map[string]string{ consts.DefaultExportedServiceSelector: "true", }, []corev1.ServicePort{ @@ -367,8 +367,8 @@ func TestServiceCreatedGatewayAlive(t *testing.T) { GatewayAddress: "192.0.0.1", GatewayPort: 888, ProbeSpec: defaultProbeSpec, - Selector: *defaultSelector, - RemoteDiscoverySelector: *defaultRemoteDiscoverySelector, + Selector: defaultSelector, + RemoteDiscoverySelector: defaultRemoteDiscoverySelector, }, remoteAPIClient: remoteAPI, localAPIClient: localAPI, @@ -378,7 +378,7 @@ func TestServiceCreatedGatewayAlive(t *testing.T) { gatewayAlive: true, } - events.Add(&RemoteServiceCreated{ + events.Add(&RemoteServiceExported{ service: remoteService("svc", "ns", "1", map[string]string{ consts.DefaultExportedServiceSelector: "true", }, []corev1.ServicePort{ @@ -446,7 +446,7 @@ func TestServiceCreatedGatewayAlive(t *testing.T) { // 'new-label'. This should exercise RemoteServiceUpdated which should // update svc-remote; the gateway is still not alive though so we expect // the Endpoints of svc-remote to still have no ready addresses. - events.Add(&RemoteServiceUpdated{ + events.Add(&RemoteExportedServiceUpdated{ localService: remoteService("svc-remote", "ns", "2", nil, nil), localEndpoints: endpoints, remoteUpdate: remoteService("svc", "ns", "2", map[string]string{ @@ -515,8 +515,8 @@ func TestServiceCreatedGatewayDown(t *testing.T) { GatewayAddress: "192.0.0.1", GatewayPort: 888, ProbeSpec: defaultProbeSpec, - Selector: *defaultSelector, - RemoteDiscoverySelector: *defaultRemoteDiscoverySelector, + Selector: defaultSelector, + RemoteDiscoverySelector: defaultRemoteDiscoverySelector, }, remoteAPIClient: remoteAPI, localAPIClient: localAPI, @@ -526,7 +526,7 @@ func TestServiceCreatedGatewayDown(t *testing.T) { gatewayAlive: false, } - events.Add(&RemoteServiceCreated{ + events.Add(&RemoteServiceExported{ service: remoteService("svc", "ns", "1", map[string]string{ consts.DefaultExportedServiceSelector: "true", }, []corev1.ServicePort{ @@ -651,7 +651,7 @@ func TestEmptyRemoteSelectors(t *testing.T) { { description: "empty remote discovery selector does not result in exports", environment: createEnvWithSelector(defaultSelector, &metav1.LabelSelector{}), - expectedEventsInQueue: []interface{}{&RemoteServiceCreated{ + expectedEventsInQueue: []interface{}{&RemoteServiceExported{ service: remoteService("service-one", "ns1", "111", map[string]string{ consts.DefaultExportedServiceSelector: "true", }, []corev1.ServicePort{ @@ -672,7 +672,7 @@ func TestEmptyRemoteSelectors(t *testing.T) { { description: "empty default selector does not result in exports", environment: createEnvWithSelector(&metav1.LabelSelector{}, defaultRemoteDiscoverySelector), - expectedEventsInQueue: []interface{}{&RemoteServiceCreated{ + expectedEventsInQueue: []interface{}{&RemoteServiceExported{ service: remoteService("service-two", "ns1", "111", map[string]string{ consts.DefaultExportedServiceSelector: "remote-discovery", }, []corev1.ServicePort{ @@ -851,7 +851,7 @@ func onAddOrUpdateTestCases(isAdd bool) []mirroringTestCase { { description: fmt.Sprintf("enqueue a RemoteServiceCreated event when this is not a gateway and we have the needed annotations (%s)", testType), environment: onAddOrUpdateExportedSvc(isAdd), - expectedEventsInQueue: []interface{}{&RemoteServiceCreated{ + expectedEventsInQueue: []interface{}{&RemoteServiceExported{ service: remoteService("test-service", "test-namespace", "resVersion", map[string]string{ consts.DefaultExportedServiceSelector: "true", }, nil), @@ -860,7 +860,7 @@ func onAddOrUpdateTestCases(isAdd bool) []mirroringTestCase { { description: fmt.Sprintf("enqueue a RemoteServiceUpdated event if this is a service that we have already mirrored and its res version is different (%s)", testType), environment: onAddOrUpdateRemoteServiceUpdated(isAdd), - expectedEventsInQueue: []interface{}{&RemoteServiceUpdated{ + expectedEventsInQueue: []interface{}{&RemoteExportedServiceUpdated{ localService: mirrorService("test-service-remote", "test-namespace", "pastResourceVersion", nil), localEndpoints: endpoints("test-service-remote", "test-namespace", "0.0.0.0", "", nil), remoteUpdate: remoteService("test-service", "test-namespace", "currentResVersion", map[string]string{ @@ -887,7 +887,7 @@ func onAddOrUpdateTestCases(isAdd bool) []mirroringTestCase { { description: fmt.Sprintf("enqueue RemoteServiceDeleted event as this service is not mirrorable anymore (%s)", testType), environment: serviceNotExportedAnymore(isAdd), - expectedEventsInQueue: []interface{}{&RemoteServiceDeleted{ + expectedEventsInQueue: []interface{}{&RemoteServiceUnexported{ Name: "test-service", Namespace: "test-namespace", }}, @@ -922,7 +922,7 @@ func TestOnDelete(t *testing.T) { description: "enqueues a RemoteServiceDeleted because there is gateway metadata present on the service", environment: onDeleteExportedService, expectedEventsInQueue: []interface{}{ - &RemoteServiceDeleted{ + &RemoteServiceUnexported{ Name: "test-service", Namespace: "test-namespace", }, diff --git a/multicluster/service-mirror/cluster_watcher_test_util.go b/multicluster/service-mirror/cluster_watcher_test_util.go index b0e083ea16ac6..ad9104c8894a3 100644 --- a/multicluster/service-mirror/cluster_watcher_test_util.go +++ b/multicluster/service-mirror/cluster_watcher_test_util.go @@ -83,7 +83,7 @@ func (te *testEnvironment) runEnvironment(watcherQueue workqueue.TypedRateLimiti var createExportedService = &testEnvironment{ events: []interface{}{ - &RemoteServiceCreated{ + &RemoteServiceExported{ service: remoteService("service-one", "ns1", "111", map[string]string{ consts.DefaultExportedServiceSelector: "true", }, []corev1.ServicePort{ @@ -114,14 +114,14 @@ var createExportedService = &testEnvironment{ GatewayAddress: "192.0.2.127", GatewayPort: 888, ProbeSpec: defaultProbeSpec, - Selector: *defaultSelector, - RemoteDiscoverySelector: *defaultRemoteDiscoverySelector, + Selector: defaultSelector, + RemoteDiscoverySelector: defaultRemoteDiscoverySelector, }, } var createRemoteDiscoveryService = &testEnvironment{ events: []interface{}{ - &RemoteServiceCreated{ + &RemoteServiceExported{ service: remoteService("service-one", "ns1", "111", map[string]string{ consts.DefaultExportedServiceSelector: "remote-discovery", }, []corev1.ServicePort{ @@ -151,14 +151,14 @@ var createRemoteDiscoveryService = &testEnvironment{ GatewayAddress: "192.0.2.127", GatewayPort: 888, ProbeSpec: defaultProbeSpec, - Selector: *defaultSelector, - RemoteDiscoverySelector: *defaultRemoteDiscoverySelector, + Selector: defaultSelector, + RemoteDiscoverySelector: defaultRemoteDiscoverySelector, }, } var createExportedHeadlessService = &testEnvironment{ events: []interface{}{ - &RemoteServiceCreated{ + &RemoteServiceExported{ service: remoteHeadlessService("service-one", "ns2", "111", map[string]string{ consts.DefaultExportedServiceSelector: "true", }, []corev1.ServicePort{ @@ -231,14 +231,14 @@ var createExportedHeadlessService = &testEnvironment{ Path: "/probe1", Period: 120, }, - Selector: *defaultSelector, - RemoteDiscoverySelector: *defaultRemoteDiscoverySelector, + Selector: defaultSelector, + RemoteDiscoverySelector: defaultRemoteDiscoverySelector, }, } var deleteMirrorService = &testEnvironment{ events: []interface{}{ - &RemoteServiceDeleted{ + &RemoteServiceUnexported{ Name: "test-service-remote-to-delete", Namespace: "test-namespace-to-delete", }, @@ -254,14 +254,14 @@ var deleteMirrorService = &testEnvironment{ GatewayAddress: "192.0.2.127", GatewayPort: 888, ProbeSpec: defaultProbeSpec, - Selector: *defaultSelector, - RemoteDiscoverySelector: *defaultRemoteDiscoverySelector, + Selector: defaultSelector, + RemoteDiscoverySelector: defaultRemoteDiscoverySelector, }, } var updateServiceWithChangedPorts = &testEnvironment{ events: []interface{}{ - &RemoteServiceUpdated{ + &RemoteExportedServiceUpdated{ remoteUpdate: remoteService("test-service", "test-namespace", "currentServiceResVersion", map[string]string{ consts.DefaultExportedServiceSelector: "true", }, []corev1.ServicePort{ @@ -348,8 +348,8 @@ var updateServiceWithChangedPorts = &testEnvironment{ GatewayAddress: "192.0.2.127", GatewayPort: 888, ProbeSpec: defaultProbeSpec, - Selector: *defaultSelector, - RemoteDiscoverySelector: *defaultRemoteDiscoverySelector, + Selector: defaultSelector, + RemoteDiscoverySelector: defaultRemoteDiscoverySelector, }, } @@ -457,8 +457,8 @@ var updateEndpointsWithChangedHosts = &testEnvironment{ GatewayAddress: "192.0.2.127", GatewayPort: 888, ProbeSpec: defaultProbeSpec, - Selector: *defaultSelector, - RemoteDiscoverySelector: *defaultRemoteDiscoverySelector, + Selector: defaultSelector, + RemoteDiscoverySelector: defaultRemoteDiscoverySelector, }, } var clusterUnregistered = &testEnvironment{ @@ -501,7 +501,7 @@ var gcTriggered = &testEnvironment{ var noGatewayLink = &testEnvironment{ events: []interface{}{ - &RemoteServiceCreated{ + &RemoteServiceExported{ service: remoteService("service-one", "ns1", "111", map[string]string{ consts.DefaultExportedServiceSelector: "remote-discovery", }, []corev1.ServicePort{ @@ -517,7 +517,7 @@ var noGatewayLink = &testEnvironment{ }, }), }, - &RemoteServiceCreated{ + &RemoteServiceExported{ service: remoteService("service-two", "ns1", "111", map[string]string{ consts.DefaultExportedServiceSelector: "true", }, []corev1.ServicePort{ @@ -552,8 +552,8 @@ var noGatewayLink = &testEnvironment{ Port: 0, Period: time.Duration(0) * time.Second, }, - Selector: metav1.LabelSelector{}, - RemoteDiscoverySelector: *defaultRemoteDiscoverySelector, + Selector: &metav1.LabelSelector{}, + RemoteDiscoverySelector: defaultRemoteDiscoverySelector, }, } @@ -571,8 +571,8 @@ func onAddOrUpdateExportedSvc(isAdd bool) *testEnvironment { GatewayAddress: "192.0.2.127", GatewayPort: 888, ProbeSpec: defaultProbeSpec, - Selector: *defaultSelector, - RemoteDiscoverySelector: *defaultRemoteDiscoverySelector, + Selector: defaultSelector, + RemoteDiscoverySelector: defaultRemoteDiscoverySelector, }, } @@ -596,8 +596,8 @@ func onAddOrUpdateRemoteServiceUpdated(isAdd bool) *testEnvironment { GatewayAddress: "192.0.2.127", GatewayPort: 888, ProbeSpec: defaultProbeSpec, - Selector: *defaultSelector, - RemoteDiscoverySelector: *defaultRemoteDiscoverySelector, + Selector: defaultSelector, + RemoteDiscoverySelector: defaultRemoteDiscoverySelector, }, } } @@ -620,8 +620,8 @@ func onAddOrUpdateSameResVersion(isAdd bool) *testEnvironment { GatewayAddress: "192.0.2.127", GatewayPort: 888, ProbeSpec: defaultProbeSpec, - Selector: *defaultSelector, - RemoteDiscoverySelector: *defaultRemoteDiscoverySelector, + Selector: defaultSelector, + RemoteDiscoverySelector: defaultRemoteDiscoverySelector, }, } } @@ -642,8 +642,8 @@ func serviceNotExportedAnymore(isAdd bool) *testEnvironment { GatewayAddress: "192.0.2.127", GatewayPort: 888, ProbeSpec: defaultProbeSpec, - Selector: *defaultSelector, - RemoteDiscoverySelector: *defaultRemoteDiscoverySelector, + Selector: defaultSelector, + RemoteDiscoverySelector: defaultRemoteDiscoverySelector, }, } } @@ -663,8 +663,8 @@ var onDeleteExportedService = &testEnvironment{ GatewayAddress: "192.0.2.127", GatewayPort: 888, ProbeSpec: defaultProbeSpec, - Selector: *defaultSelector, - RemoteDiscoverySelector: *defaultRemoteDiscoverySelector, + Selector: defaultSelector, + RemoteDiscoverySelector: defaultRemoteDiscoverySelector, }, } @@ -681,8 +681,8 @@ var onDeleteNonExportedService = &testEnvironment{ GatewayAddress: "192.0.2.127", GatewayPort: 888, ProbeSpec: defaultProbeSpec, - Selector: *defaultSelector, - RemoteDiscoverySelector: *defaultRemoteDiscoverySelector, + Selector: defaultSelector, + RemoteDiscoverySelector: defaultRemoteDiscoverySelector, }, } @@ -1286,8 +1286,8 @@ func createEnvWithSelector(defaultSelector, remoteSelector *metav1.LabelSelector Port: 0, Period: time.Duration(0) * time.Second, }, - Selector: *defaultSelector, - RemoteDiscoverySelector: *remoteSelector, + Selector: defaultSelector, + RemoteDiscoverySelector: remoteSelector, }, } } diff --git a/multicluster/service-mirror/events_formatting.go b/multicluster/service-mirror/events_formatting.go index 9bbd40922df96..bba411b9c4bc7 100644 --- a/multicluster/service-mirror/events_formatting.go +++ b/multicluster/service-mirror/events_formatting.go @@ -52,16 +52,28 @@ func formatEndpoints(endpoints *corev1.Endpoints) string { } // Events for cluster watcher -func (rsc RemoteServiceCreated) String() string { - return fmt.Sprintf("RemoteServiceCreated: {service: %s}", formatService(rsc.service)) +func (rsc RemoteServiceExported) String() string { + return fmt.Sprintf("RemoteServiceExported: {service: %s}", formatService(rsc.service)) } -func (rsu RemoteServiceUpdated) String() string { - return fmt.Sprintf("RemoteServiceUpdated: {localService: %s, localEndpoints: %s, remoteUpdate: %s}", formatService(rsu.localService), formatEndpoints(rsu.localEndpoints), formatService(rsu.remoteUpdate)) +func (rsu RemoteExportedServiceUpdated) String() string { + return fmt.Sprintf("RemoteExportedServiceUpdated: {localService: %s, localEndpoints: %s, remoteUpdate: %s}", formatService(rsu.localService), formatEndpoints(rsu.localEndpoints), formatService(rsu.remoteUpdate)) } -func (rsd RemoteServiceDeleted) String() string { - return fmt.Sprintf("RemoteServiceDeleted: {name: %s, namespace: %s }", rsd.Name, rsd.Namespace) +func (rsd RemoteServiceUnexported) String() string { + return fmt.Sprintf("RemoteServiceUnexported: {name: %s, namespace: %s }", rsd.Name, rsd.Namespace) +} + +func (cfs CreateFederatedService) String() string { + return fmt.Sprintf("CreateFederatedService: {service: %s}", formatService(cfs.service)) +} + +func (jfs RemoteServiceJoinsFederatedService) String() string { + return fmt.Sprintf("RemoteServiceJoinsFederatedService: {localService: %s, remoteUpdate: %s}", formatService(jfs.localService), formatService(jfs.remoteUpdate)) +} + +func (lfs RemoteServiceLeavesFederatedService) String() string { + return fmt.Sprintf("RemoteServiceLeavesFederatedService: {name: %s, namespace: %s }", lfs.Name, lfs.Namespace) } func (cgu ClusterUnregistered) String() string { diff --git a/multicluster/values/values.go b/multicluster/values/values.go index b7eb95504822d..d14de9ad3561b 100644 --- a/multicluster/values/values.go +++ b/multicluster/values/values.go @@ -5,6 +5,7 @@ import ( "github.com/linkerd/linkerd2/multicluster/static" "github.com/linkerd/linkerd2/pkg/charts" + "github.com/linkerd/linkerd2/pkg/charts/linkerd2" "github.com/linkerd/linkerd2/pkg/k8s" "helm.sh/helm/v3/pkg/chart/loader" "helm.sh/helm/v3/pkg/chartutil" @@ -42,6 +43,8 @@ type Values struct { ServiceMirrorAdditionalEnv []corev1.EnvVar `json:"serviceMirrorAdditionalEnv"` ServiceMirrorExperimentalEnv []corev1.EnvVar `json:"serviceMirrorExperimentalEnv"` + + LocalServiceMirror *LocalServiceMirror `json:"localServiceMirror"` } // Gateway contains all options related to the Gateway Service @@ -70,6 +73,18 @@ type Probe struct { Timeout string `json:"timeout"` } +type LocalServiceMirror struct { + ServiceMirrorRetryLimit uint32 `json:"serviceMirrorRetryLimit"` + FederatedServiceSelector string `json:"federatedServiceSelector"` + Replias uint32 `json:"replicas"` + Image *linkerd2.Image `json:"image"` + LogLevel string `json:"logLevel"` + LogFormat string `json:"logFormat"` + EnablePprof bool `json:"enablePprof"` + UID int64 `json:"UID"` + GID int64 `json:"GID"` +} + // NewInstallValues returns a new instance of the Values type. func NewInstallValues() (*Values, error) { chartDir := fmt.Sprintf("%s/", helmDefaultChartDir) diff --git a/pkg/k8s/fake.go b/pkg/k8s/fake.go index d5d800d8df534..32798122321bb 100644 --- a/pkg/k8s/fake.go +++ b/pkg/k8s/fake.go @@ -23,6 +23,8 @@ import ( "k8s.io/apimachinery/pkg/util/rand" yamlDecoder "k8s.io/apimachinery/pkg/util/yaml" discoveryfake "k8s.io/client-go/discovery/fake" + "k8s.io/client-go/dynamic" + dynamicFake "k8s.io/client-go/dynamic/fake" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/scheme" @@ -44,7 +46,7 @@ func init() { // NewFakeAPI provides a mock KubernetesAPI backed by hard-coded resources func NewFakeAPI(configs ...string) (*KubernetesAPI, error) { - client, apiextClient, apiregClient, _, err := NewFakeClientSets(configs...) + client, apiextClient, apiregClient, _, _, err := NewFakeClientSets(configs...) if err != nil { return nil, err } @@ -60,7 +62,7 @@ func NewFakeAPI(configs ...string) (*KubernetesAPI, error) { // NewFakeAPIFromManifests reads from a slice of readers, each representing a // manifest or collection of manifests, and returns a mock KubernetesAPI. func NewFakeAPIFromManifests(readers []io.Reader) (*KubernetesAPI, error) { - client, apiextClient, apiregClient, _, err := newFakeClientSetsFromManifests(readers) + client, apiextClient, apiregClient, _, _, err := newFakeClientSetsFromManifests(readers) if err != nil { return nil, err } @@ -79,6 +81,7 @@ func NewFakeClientSets(configs ...string) ( apiextensionsclient.Interface, apiregistrationclient.Interface, spclient.Interface, + dynamic.Interface, error, ) { objs := []runtime.Object{} @@ -89,7 +92,7 @@ func NewFakeClientSets(configs ...string) ( for _, config := range configs { obj, err := ToRuntimeObject(config) if err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, err } switch strings.ToLower(obj.GetObjectKind().GroupVersionKind().Kind) { case "customresourcedefinition": @@ -115,7 +118,7 @@ metadata: name: kubernetes namespace: default`) if err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, err } objs = append(objs, endpointslice) @@ -167,6 +170,7 @@ metadata: apiextensionsfake.NewSimpleClientset(apiextObjs...), apiregistrationfake.NewSimpleClientset(apiRegObjs...), spfake.NewSimpleClientset(spObjs...), + dynamicFake.NewSimpleDynamicClient(scheme.Scheme, objs...), nil } @@ -180,6 +184,7 @@ func newFakeClientSetsFromManifests(readers []io.Reader) ( apiextensionsclient.Interface, apiregistrationclient.Interface, spclient.Interface, + dynamic.Interface, error, ) { configs := []string{} @@ -195,13 +200,13 @@ func newFakeClientSetsFromManifests(readers []io.Reader) ( if errors.Is(err, io.EOF) { break } - return nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, err } // check for kind var typeMeta metav1.TypeMeta if err := yaml.Unmarshal(bytes, &typeMeta); err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, err } switch typeMeta.Kind { @@ -211,7 +216,7 @@ func newFakeClientSetsFromManifests(readers []io.Reader) ( case "List": var sourceList corev1.List if err := yaml.Unmarshal(bytes, &sourceList); err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, err } for _, item := range sourceList.Items { configs = append(configs, string(item.Raw)) diff --git a/pkg/k8s/fake_test.go b/pkg/k8s/fake_test.go index 35351f5c7566a..e7102e4f42716 100644 --- a/pkg/k8s/fake_test.go +++ b/pkg/k8s/fake_test.go @@ -231,7 +231,7 @@ spec: tc := tc // pin t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { - _, _, _, _, err := NewFakeClientSets(tc.k8sConfigs...) + _, _, _, _, _, err := NewFakeClientSets(tc.k8sConfigs...) if diff := deep.Equal(err, tc.err); diff != nil { t.Errorf("%+v", diff) } @@ -330,7 +330,7 @@ spec: readers = append(readers, strings.NewReader(m)) } - _, _, _, _, err := newFakeClientSetsFromManifests(readers) + _, _, _, _, _, err := newFakeClientSetsFromManifests(readers) if diff := deep.Equal(err, tc.err); diff != nil { t.Errorf("%+v", diff) } diff --git a/pkg/k8s/k8s.go b/pkg/k8s/k8s.go index 13ac1dfbfbe54..8f4dafb68a5ca 100644 --- a/pkg/k8s/k8s.go +++ b/pkg/k8s/k8s.go @@ -45,8 +45,8 @@ const ( ServiceProfileKind = "ServiceProfile" LinkAPIGroup = "multicluster.linkerd.io" - LinkAPIVersion = "v1alpha1" - LinkAPIGroupVersion = "multicluster.linkerd.io/v1alpha1" + LinkAPIVersion = "v1alpha2" + LinkAPIGroupVersion = "multicluster.linkerd.io/v1alpha2" LinkKind = "Link" K8sCoreAPIGroup = "core" diff --git a/pkg/k8s/labels.go b/pkg/k8s/labels.go index d4c0465aebdff..89cd9dc3b042d 100644 --- a/pkg/k8s/labels.go +++ b/pkg/k8s/labels.go @@ -431,6 +431,10 @@ const ( // services. DefaultExportedServiceSelector = SvcMirrorPrefix + "/exported" + // DefaultFederatedServiceSelector is the default label selector for + // federated services. + DefaultFederatedServiceSelector = SvcMirrorPrefix + "/federated" + // MirroredResourceLabel indicates that this resource is the result // of a mirroring operation (can be a namespace or a service) MirroredResourceLabel = SvcMirrorPrefix + "/mirrored-service" @@ -461,6 +465,15 @@ const ( // on the remote cluster RemoteServiceFqName = SvcMirrorPrefix + "/remote-svc-fq-name" + // RemoteDiscoveryAnnotation is like the RemoteDiscoveryLabel but it allows + // a list of remote discovery targets to be specified in the format: + // @,@,... + RemoteDiscoveryAnnotation = MulticlusterPrefix + "/remote-discovery" + + // LocalDiscoveryAnnotation is like the RemoteDiscoveryAnnotation but it + // specifies a single service in the local cluster. + LocalDiscoveryAnnotation = MulticlusterPrefix + "/local-discovery" + // RemoteGatewayIdentity follows the same kind of logic as RemoteGatewayNameLabel RemoteGatewayIdentity = SvcMirrorPrefix + "/remote-gateway-identity" diff --git a/pkg/multicluster/link.go b/pkg/multicluster/link.go index b7cf4c2484c8d..a54be7024eb9e 100644 --- a/pkg/multicluster/link.go +++ b/pkg/multicluster/link.go @@ -47,8 +47,9 @@ type ( GatewayPort uint32 GatewayIdentity string ProbeSpec ProbeSpec - Selector metav1.LabelSelector - RemoteDiscoverySelector metav1.LabelSelector + Selector *metav1.LabelSelector + RemoteDiscoverySelector *metav1.LabelSelector + FederatedServiceSelector *metav1.LabelSelector } ErrFieldMissing struct { @@ -161,6 +162,18 @@ func NewLink(u unstructured.Unstructured) (Link, error) { } } + federatedServiceSelector := metav1.LabelSelector{} + if selectorObj, ok := specObj["federatedServiceSelector"]; ok { + bytes, err := json.Marshal(selectorObj) + if err != nil { + return Link{}, err + } + err = json.Unmarshal(bytes, &federatedServiceSelector) + if err != nil { + return Link{}, err + } + } + return Link{ Name: u.GetName(), Namespace: u.GetNamespace(), @@ -172,8 +185,9 @@ func NewLink(u unstructured.Unstructured) (Link, error) { GatewayPort: uint32(gatewayPort), GatewayIdentity: gatewayIdentity, ProbeSpec: probeSpec, - Selector: selector, - RemoteDiscoverySelector: remoteDiscoverySelector, + Selector: &selector, + RemoteDiscoverySelector: &remoteDiscoverySelector, + FederatedServiceSelector: &federatedServiceSelector, }, nil } @@ -228,6 +242,17 @@ func (l Link) ToUnstructured() (unstructured.Unstructured, error) { } spec["remoteDiscoverySelector"] = remoteDiscoverySelector + data, err = json.Marshal(l.FederatedServiceSelector) + if err != nil { + return unstructured.Unstructured{}, err + } + federatedServiceSelector := make(map[string]interface{}) + err = json.Unmarshal(data, &federatedServiceSelector) + if err != nil { + return unstructured.Unstructured{}, err + } + spec["federatedServiceSelector"] = federatedServiceSelector + return unstructured.Unstructured{ Object: map[string]interface{}{ "apiVersion": k8s.LinkAPIGroupVersion, @@ -236,7 +261,8 @@ func (l Link) ToUnstructured() (unstructured.Unstructured, error) { "name": l.Name, "namespace": l.Namespace, }, - "spec": spec, + "spec": spec, + "status": map[string]interface{}{}, }, }, nil }