From 689c7db794902954dcc7575a494b8f66a153d364 Mon Sep 17 00:00:00 2001 From: Iryna Shustava Date: Mon, 30 Mar 2020 15:57:39 -0700 Subject: [PATCH] server-acl-init: Add -server-address and -server-port * Require -server-address to be provided instead of discovering the server IPs from Kubernetes pods. This allows us to eventually to run this command against external servers or servers deployed on Kube in the same way. On Kubernetes, instead of discovering Pod IPs, we can use server's stateful set DNS names. * [Breaking change] Remove -expected-replicas, -release-name, and -server-label-selector flags because we no longer need them. --- subcommand/server-acl-init/command.go | 82 +-- subcommand/server-acl-init/command_test.go | 620 +++------------------ subcommand/server-acl-init/servers.go | 119 +--- 3 files changed, 121 insertions(+), 700 deletions(-) diff --git a/subcommand/server-acl-init/command.go b/subcommand/server-acl-init/command.go index d7f669b236..72dad6a99f 100644 --- a/subcommand/server-acl-init/command.go +++ b/subcommand/server-acl-init/command.go @@ -25,10 +25,7 @@ type Command struct { flags *flag.FlagSet k8s *k8sflags.K8SFlags - flagReleaseName string - flagServerLabelSelector string flagResourcePrefix string - flagReplicas int flagK8sNamespace string flagAllowDNS bool flagCreateClientToken bool @@ -42,6 +39,8 @@ type Command struct { flagConsulCACert string flagConsulTLSServerName string flagUseHTTPS bool + flagServerHosts []string + flagServerPort uint // Flags to support namespaces flagEnableNamespaces bool // Use namespacing on all components @@ -53,7 +52,7 @@ type Command struct { flagInjectK8SNSMirroringPrefix string // Prefix added to Consul namespaces created when mirroring injected services flagLogLevel string - flagTimeout string + flagTimeout time.Duration clientset kubernetes.Interface // cmdTimeout is cancelled when the command timeout is reached. @@ -69,14 +68,11 @@ type Command struct { func (c *Command) init() { c.flags = flag.NewFlagSet("", flag.ContinueOnError) - c.flags.StringVar(&c.flagReleaseName, "release-name", "", - "Name of Consul Helm release. Deprecated: Use -server-label-selector=component=server,app=consul,release= instead") - c.flags.StringVar(&c.flagServerLabelSelector, "server-label-selector", "", - "Selector (label query) to select Consul server statefulset pods, supports '=', '==', and '!='. (e.g. -l key1=value1,key2=value2)") c.flags.StringVar(&c.flagResourcePrefix, "resource-prefix", "", "Prefix to use for Kubernetes resources. If not set, the \"-consul\" prefix is used, where is the value set by the -release-name flag.") - c.flags.IntVar(&c.flagReplicas, "expected-replicas", 1, - "Number of expected Consul server replicas") + c.flags.Var((*flags.AppendSliceValue)(&c.flagServerHosts), "server-address", + "The address of the Consul server(s), may be provided multiple times. At least one value is required.") + c.flags.UintVar(&c.flagServerPort, "server-port", 8500, "The HTTP or HTTPS port of the Consul server.") c.flags.StringVar(&c.flagK8sNamespace, "k8s-namespace", "", "Name of Kubernetes namespace where the servers are deployed") c.flags.BoolVar(&c.flagAllowDNS, "allow-dns", false, @@ -123,7 +119,7 @@ func (c *Command) init() { c.flags.StringVar(&c.flagInjectK8SNSMirroringPrefix, "inject-k8s-namespace-mirroring-prefix", "", "[Enterprise Only] Prefix that will be added to all k8s namespaces mirrored into Consul by Connect inject "+ "if mirroring is enabled.") - c.flags.StringVar(&c.flagTimeout, "timeout", "10m", + c.flags.DurationVar(&c.flagTimeout, "timeout", 10*time.Minute, "How long we'll try to bootstrap ACLs for before timing out, e.g. 1ms, 2s, 3m") c.flags.StringVar(&c.flagLogLevel, "log-level", "info", "Log verbosity level. Supported values (in order of detail) are \"trace\", "+ @@ -156,33 +152,19 @@ func (c *Command) Run(args []string) int { return 1 } if len(c.flags.Args()) > 0 { - c.UI.Error(fmt.Sprintf("Should have no non-flag arguments.")) + c.UI.Error("Should have no non-flag arguments.") return 1 } - timeout, err := time.ParseDuration(c.flagTimeout) - if err != nil { - c.UI.Error(fmt.Sprintf("%q is not a valid timeout: %s", c.flagTimeout, err)) + if len(c.flagServerHosts) == 0 { + c.UI.Error("-server-address must be set at least once") return 1 } - if c.flagReleaseName != "" && c.flagServerLabelSelector != "" { - c.UI.Error("-release-name and -server-label-selector cannot both be set") - return 1 - } - if c.flagServerLabelSelector != "" && c.flagResourcePrefix == "" { - c.UI.Error("if -server-label-selector is set -resource-prefix must also be set") - return 1 - } - if c.flagReleaseName == "" && c.flagServerLabelSelector == "" { - c.UI.Error("-release-name or -server-label-selector must be set") - return 1 - } - // If only the -release-name is set, we use it as the label selector. - if c.flagReleaseName != "" { - c.flagServerLabelSelector = fmt.Sprintf("app=consul,component=server,release=%s", c.flagReleaseName) + if c.flagResourcePrefix == "" { + c.UI.Error("-resource-prefix must be set") } var cancel context.CancelFunc - c.cmdTimeout, cancel = context.WithTimeout(context.Background(), timeout) + c.cmdTimeout, cancel = context.WithTimeout(context.Background(), c.flagTimeout) // The context will only ever be intentionally ended by the timeout. defer cancel() @@ -209,27 +191,6 @@ func (c *Command) Run(args []string) int { if c.flagUseHTTPS { scheme = "https" } - // Wait if there's a rollout of servers. - ssName := c.withPrefix("server") - err = c.untilSucceeds(fmt.Sprintf("waiting for rollout of statefulset %s", ssName), func() error { - // Note: We can't use the -server-label-selector flag to find the statefulset - // because in older versions of consul-helm it wasn't labeled with - // component: server. We also can't drop that label because it's required - // for targeting the right server Pods. - statefulset, err := c.clientset.AppsV1().StatefulSets(c.flagK8sNamespace).Get(ssName, metav1.GetOptions{}) - if err != nil { - return err - } - if statefulset.Status.CurrentRevision == statefulset.Status.UpdateRevision { - return nil - } - return fmt.Errorf("rollout is in progress (CurrentRevision=%s UpdateRevision=%s)", - statefulset.Status.CurrentRevision, statefulset.Status.UpdateRevision) - }) - if err != nil { - c.Log.Error(err.Error()) - return 1 - } // Check if we've already been bootstrapped. bootTokenSecretName := c.withPrefix("bootstrap-acl-token") @@ -258,12 +219,7 @@ func (c *Command) Run(args []string) int { } // For all of the next operations we'll need a Consul client. - serverPods, err := c.getConsulServers(1, scheme) - if err != nil { - c.Log.Error(err.Error()) - return 1 - } - serverAddr := serverPods[0].Addr + serverAddr := fmt.Sprintf("%s:%d", c.flagServerHosts[0], c.flagServerPort) consulClient, err := api.NewClient(&api.Config{ Address: serverAddr, Scheme: scheme, @@ -474,15 +430,9 @@ func (c *Command) untilSucceeds(opName string, op func() error) error { } // withPrefix returns the name of resource with the correct prefix based -// on the -release-name or -resource-prefix flags. +// on the -resource-prefix flags. func (c *Command) withPrefix(resource string) string { - if c.flagResourcePrefix != "" { - return fmt.Sprintf("%s-%s", c.flagResourcePrefix, resource) - } - // This is to support an older version of the Helm chart that only specified - // the -release-name flag. We ensure that this is set if -resource-prefix - // is not set when parsing the flags. - return fmt.Sprintf("%s-consul-%s", c.flagReleaseName, resource) + return fmt.Sprintf("%s-%s", c.flagResourcePrefix, resource) } const synopsis = "Initialize ACLs on Consul servers and other components." diff --git a/subcommand/server-acl-init/command_test.go b/subcommand/server-acl-init/command_test.go index 12f4322d79..329a35bba2 100644 --- a/subcommand/server-acl-init/command_test.go +++ b/subcommand/server-acl-init/command_test.go @@ -12,16 +12,16 @@ import ( "net/url" "os" "strconv" + "strings" "testing" "time" - "github.com/hashicorp/consul/agent" "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/sdk/freeport" "github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/tlsutil" "github.com/mitchellh/cli" "github.com/stretchr/testify/require" - appv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/fake" @@ -38,15 +38,11 @@ func TestRun_FlagValidation(t *testing.T) { }{ { Flags: []string{}, - ExpErr: "-release-name or -server-label-selector must be set", + ExpErr: "-server-address must be set at least once", }, { - Flags: []string{"-release-name=name", "-server-label-selector=hi"}, - ExpErr: "-release-name and -server-label-selector cannot both be set", - }, - { - Flags: []string{"-server-label-selector=hi"}, - ExpErr: "if -server-label-selector is set -resource-prefix must also be set", + Flags: []string{"-server-address=localhost"}, + ExpErr: "-resource-prefix must be set", }, } @@ -69,9 +65,7 @@ func TestRun_FlagValidation(t *testing.T) { func TestRun_Defaults(t *testing.T) { t.Parallel() for _, flags := range [][]string{ - {"-release-name=" + releaseName}, { - "-server-label-selector=component=server,app=consul,release=" + releaseName, "-resource-prefix=" + resourcePrefix, }, } { @@ -88,7 +82,8 @@ func TestRun_Defaults(t *testing.T) { } args := append([]string{ "-k8s-namespace=" + ns, - "-expected-replicas=1", + "-server-address", strings.Split(testSvr.HTTPAddr, ":")[0], + "-server-port", strings.Split(testSvr.HTTPAddr, ":")[1], }, flags...) responseCode := cmd.Run(args) require.Equal(0, responseCode, ui.ErrorWriter.String()) @@ -139,65 +134,30 @@ func TestRun_Tokens(t *testing.T) { TokenName string SecretName string }{ - "client token -release-name": { - TokenFlag: "-create-client-token", - ResourcePrefixFlag: "", - ReleaseNameFlag: "release-name", - TokenName: "client", - SecretName: "release-name-consul-client-acl-token", - }, "client token -resource-prefix": { TokenFlag: "-create-client-token", ResourcePrefixFlag: "my-prefix", TokenName: "client", SecretName: "my-prefix-client-acl-token", }, - "catalog-sync token -release-name": { - TokenFlag: "-create-sync-token", - ResourcePrefixFlag: "", - ReleaseNameFlag: "release-name", - TokenName: "catalog-sync", - SecretName: "release-name-consul-catalog-sync-acl-token", - }, "catalog-sync token -resource-prefix": { TokenFlag: "-create-sync-token", ResourcePrefixFlag: "my-prefix", TokenName: "catalog-sync", SecretName: "my-prefix-catalog-sync-acl-token", }, - "connect-inject-namespace token -release-name": { - TokenFlag: "-create-inject-namespace-token", - ResourcePrefixFlag: "", - ReleaseNameFlag: "release-name", - TokenName: "connect-inject", - SecretName: "release-name-consul-connect-inject-acl-token", - }, "connect-inject-namespace token -resource-prefix": { TokenFlag: "-create-inject-namespace-token", ResourcePrefixFlag: "my-prefix", TokenName: "connect-inject", SecretName: "my-prefix-connect-inject-acl-token", }, - "enterprise-license token -release-name": { - TokenFlag: "-create-enterprise-license-token", - ResourcePrefixFlag: "", - ReleaseNameFlag: "release-name", - TokenName: "enterprise-license", - SecretName: "release-name-consul-enterprise-license-acl-token", - }, "enterprise-license token -resource-prefix": { TokenFlag: "-create-enterprise-license-token", ResourcePrefixFlag: "my-prefix", TokenName: "enterprise-license", SecretName: "my-prefix-enterprise-license-acl-token", }, - "mesh-gateway token -release-name": { - TokenFlag: "-create-mesh-gateway-token", - ResourcePrefixFlag: "", - ReleaseNameFlag: "release-name", - TokenName: "mesh-gateway", - SecretName: "release-name-consul-mesh-gateway-acl-token", - }, "mesh-gateway token -resource-prefix": { TokenFlag: "-create-mesh-gateway-token", ResourcePrefixFlag: "my-prefix", @@ -225,17 +185,11 @@ func TestRun_Tokens(t *testing.T) { cmd.init() cmdArgs := []string{ "-k8s-namespace=" + ns, - "-expected-replicas=1", + "-server-address", strings.Split(testSvr.HTTPAddr, ":")[0], + "-server-port", strings.Split(testSvr.HTTPAddr, ":")[1], + "-resource-prefix=" + c.ResourcePrefixFlag, c.TokenFlag, } - if c.ResourcePrefixFlag != "" { - // If using the -resource-prefix flag, we expect the -server-label-selector - // flag to also be set. - labelSelector := fmt.Sprintf("release=%s,component=server,app=consul", releaseName) - cmdArgs = append(cmdArgs, "-resource-prefix="+c.ResourcePrefixFlag, "-server-label-selector="+labelSelector) - } else { - cmdArgs = append(cmdArgs, "-release-name="+c.ReleaseNameFlag) - } responseCode := cmd.Run(cmdArgs) require.Equal(0, responseCode, ui.ErrorWriter.String()) @@ -297,10 +251,10 @@ func TestRun_AllowDNS(t *testing.T) { } cmd.init() cmdArgs := []string{ - "-server-label-selector=component=server,app=consul,release=" + releaseName, "-resource-prefix=" + resourcePrefix, "-k8s-namespace=" + ns, - "-expected-replicas=1", + "-server-address", strings.Split(testSvr.HTTPAddr, ":")[0], + "-server-port", strings.Split(testSvr.HTTPAddr, ":")[1], "-allow-dns", } responseCode := cmd.Run(cmdArgs) @@ -370,10 +324,10 @@ func TestRun_ConnectInjectAuthMethod(t *testing.T) { cmd.init() bindingRuleSelector := "serviceaccount.name!=default" cmdArgs := []string{ - "-server-label-selector=component=server,app=consul,release=" + releaseName, "-resource-prefix=" + resourcePrefix, "-k8s-namespace=" + ns, - "-expected-replicas=1", + "-server-address", strings.Split(testSvr.HTTPAddr, ":")[0], + "-server-port", strings.Split(testSvr.HTTPAddr, ":")[1], "-acl-binding-rule-selector=" + bindingRuleSelector, } cmdArgs = append(cmdArgs, c.AuthMethodFlag) @@ -435,10 +389,10 @@ func TestRun_BindingRuleUpdates(t *testing.T) { ui := cli.NewMockUi() commonArgs := []string{ - "-server-label-selector=component=server,app=consul,release=" + releaseName, "-resource-prefix=" + resourcePrefix, "-k8s-namespace=" + ns, - "-expected-replicas=1", + "-server-address", strings.Split(testSvr.HTTPAddr, ":")[0], + "-server-port", strings.Split(testSvr.HTTPAddr, ":")[1], "-create-inject-auth-method", } firstRunArgs := append(commonArgs, @@ -500,40 +454,20 @@ func TestRun_BindingRuleUpdates(t *testing.T) { } } -// Test that if the server pods aren't available at first that bootstrap +// Test that if the servers aren't available at first that bootstrap // still succeeds. -func TestRun_DelayedServerPods(t *testing.T) { +func TestRun_DelayedServers(t *testing.T) { t.Parallel() require := require.New(t) k8s := fake.NewSimpleClientset() - type APICall struct { - Method string - Path string - } - var consulAPICalls []APICall - consulServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // Record all the API calls made. - consulAPICalls = append(consulAPICalls, APICall{ - Method: r.Method, - Path: r.URL.Path, - }) - - // Send an empty JSON response with code 200 to all calls. - fmt.Fprintln(w, "{}") - })) - defer consulServer.Close() - serverURL, err := url.Parse(consulServer.URL) - require.NoError(err) - port, err := strconv.Atoi(serverURL.Port()) - require.NoError(err) + randomPorts := freeport.MustTake(6) ui := cli.NewMockUi() cmd := Command{ UI: ui, clientset: k8s, } - cmd.init() // Start the command before the Pod exist. // Run in a goroutine so we can create the Pods asynchronously @@ -541,66 +475,48 @@ func TestRun_DelayedServerPods(t *testing.T) { var responseCode int go func() { responseCode = cmd.Run([]string{ - "-server-label-selector=component=server,app=consul,release=" + releaseName, "-resource-prefix=" + resourcePrefix, "-k8s-namespace=" + ns, - "-expected-replicas=1", + "-server-address=127.0.0.1", + "-server-port=" + strconv.Itoa(randomPorts[1]), }) close(done) }() - // Asynchronously create the server Pod after a delay. + // Asynchronously start the test server after a delay. + testServerReady := make(chan bool) + var srv *testutil.TestServer go func() { // Create the Pods after a delay between 100 and 500ms. // It's randomized to ensure we're not relying on specific timing. delay := 100 + rand.Intn(400) time.Sleep(time.Duration(delay) * time.Millisecond) - pods := k8s.CoreV1().Pods(ns) - _, err = pods.Create(&v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: resourcePrefix + "-server-0", - Labels: map[string]string{ - "component": "server", - "app": "consul", - "release": releaseName, - }, - }, - Status: v1.PodStatus{ - PodIP: serverURL.Hostname(), - }, - Spec: v1.PodSpec{ - Containers: []v1.Container{ - { - Name: "consul", - Ports: []v1.ContainerPort{ - { - Name: "http", - ContainerPort: int32(port), - }, - }, - }, - }, - }, - }) - require.NoError(err) - _, err = k8s.AppsV1().StatefulSets(ns).Create(&appv1.StatefulSet{ - ObjectMeta: metav1.ObjectMeta{ - Name: resourcePrefix + "-server", - Labels: map[string]string{ - "component": "server", - "app": "consul", - "release": releaseName, - }, - }, - Status: appv1.StatefulSetStatus{ - UpdateRevision: "current", - CurrentRevision: "current", - }, + var err error + srv, err = testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) { + c.ACL.Enabled = true + + c.Ports = &testutil.TestPortConfig{ + DNS: randomPorts[0], + HTTP: randomPorts[1], + HTTPS: randomPorts[2], + SerfLan: randomPorts[3], + SerfWan: randomPorts[4], + Server: randomPorts[5], + } }) require.NoError(err) + close(testServerReady) }() + // Wait for server to come up + select { + case <-testServerReady: + defer srv.Stop() + case <-time.After(5 * time.Second): + require.FailNow("test server took longer than 5s to come up") + } + // Wait for the command to exit. select { case <-done: @@ -610,191 +526,28 @@ func TestRun_DelayedServerPods(t *testing.T) { } // Test that the bootstrap kube secret is created. - getBootToken(t, k8s, resourcePrefix, ns) - - // Test that the expected API calls were made. - require.Equal([]APICall{ - { - "PUT", - "/v1/acl/bootstrap", - }, - { - "PUT", - "/v1/acl/policy", - }, - { - "PUT", - "/v1/acl/token", - }, - { - "PUT", - "/v1/agent/token/agent", - }, - { - "PUT", - "/v1/acl/policy", - }, - { - "PUT", - "/v1/acl/token", - }, - }, consulAPICalls) -} - -// Test that if a deployment of the statefulset is in progress we wait. -func TestRun_InProgressDeployment(t *testing.T) { - t.Parallel() - require := require.New(t) - k8s := fake.NewSimpleClientset() - - type APICall struct { - Method string - Path string - } - var consulAPICalls []APICall - consulServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // Record all the API calls made. - consulAPICalls = append(consulAPICalls, APICall{ - Method: r.Method, - Path: r.URL.Path, - }) - - // Send an empty JSON response with code 200 to all calls. - fmt.Fprintln(w, "{}") - })) - defer consulServer.Close() - serverURL, err := url.Parse(consulServer.URL) - require.NoError(err) - port, err := strconv.Atoi(serverURL.Port()) - require.NoError(err) + bootToken := getBootToken(t, k8s, resourcePrefix, ns) - // The pods and statefulset are created but as an in-progress deployment - pods := k8s.CoreV1().Pods(ns) - _, err = pods.Create(&v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: resourcePrefix + "-server-0", - Labels: map[string]string{ - "component": "server", - "app": "consul", - "release": releaseName, - }, - }, - Status: v1.PodStatus{ - PodIP: serverURL.Hostname(), - }, - Spec: v1.PodSpec{ - Containers: []v1.Container{ - { - Name: "consul", - Ports: []v1.ContainerPort{ - { - Name: "http", - ContainerPort: int32(port), - }, - }, - }, - }, - }, + // Check that it has the right policies. + consul, err := api.NewClient(&api.Config{ + Address: srv.HTTPAddr, }) require.NoError(err) - _, err = k8s.AppsV1().StatefulSets(ns).Create(&appv1.StatefulSet{ - ObjectMeta: metav1.ObjectMeta{ - Name: resourcePrefix + "-server", - Labels: map[string]string{ - "component": "server", - "app": "consul", - "release": releaseName, - }, - }, - Status: appv1.StatefulSetStatus{ - UpdateRevision: "updated", - CurrentRevision: "current", - }, - }) + tokenData, _, err := consul.ACL().TokenReadSelf(&api.QueryOptions{Token: bootToken}) require.NoError(err) + require.Equal("global-management", tokenData.Policies[0].Name) - ui := cli.NewMockUi() - cmd := Command{ - UI: ui, - clientset: k8s, - } - cmd.init() - - // Start the command before the Pod exist. - // Run in a goroutine so we can create the Pods asynchronously - done := make(chan bool) - var responseCode int - go func() { - responseCode = cmd.Run([]string{ - "-server-label-selector=component=server,app=consul,release=" + releaseName, - "-resource-prefix=" + resourcePrefix, - "-k8s-namespace=" + ns, - "-expected-replicas=1", - }) - close(done) - }() - - // Asynchronously update the deployment status after a delay. - go func() { - // Update after a delay between 100 and 500ms. - // It's randomized to ensure we're not relying on specific timing. - delay := 100 + rand.Intn(400) - time.Sleep(time.Duration(delay) * time.Millisecond) - _, err = k8s.AppsV1().StatefulSets(ns).Update(&appv1.StatefulSet{ - ObjectMeta: metav1.ObjectMeta{ - Name: resourcePrefix + "-server", - Labels: map[string]string{ - "component": "server", - "app": "consul", - "release": releaseName, - }, - }, - Status: appv1.StatefulSetStatus{ - UpdateRevision: "updated", - CurrentRevision: "updated", - }, - }) - require.NoError(err) - }() - - // Wait for the command to exit. - select { - case <-done: - require.Equal(0, responseCode, ui.ErrorWriter.String()) - case <-time.After(2 * time.Second): - require.FailNow("command did not exit after 2s") + // Check that the agent policy was created. + policies, _, err := consul.ACL().PolicyList(&api.QueryOptions{Token: bootToken}) + require.NoError(err) + found := false + for _, p := range policies { + if p.Name == "agent-token" { + found = true + break + } } - - // Test that the bootstrap kube secret is created. - getBootToken(t, k8s, resourcePrefix, ns) - - // Test that the expected API calls were made. - require.Equal([]APICall{ - { - "PUT", - "/v1/acl/bootstrap", - }, - { - "PUT", - "/v1/acl/policy", - }, - { - "PUT", - "/v1/acl/token", - }, - { - "PUT", - "/v1/agent/token/agent", - }, - { - "PUT", - "/v1/acl/policy", - }, - { - "PUT", - "/v1/acl/token", - }, - }, consulAPICalls) + require.True(found, "agent-token policy was not found") } // Test that if there's no leader, we retry until one is elected. @@ -838,51 +591,6 @@ func TestRun_NoLeader(t *testing.T) { // Create the Server Pods. serverURL, err := url.Parse(consulServer.URL) require.NoError(err) - port, err := strconv.Atoi(serverURL.Port()) - require.NoError(err) - pods := k8s.CoreV1().Pods(ns) - _, err = pods.Create(&v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: resourcePrefix + "-server-0", - Labels: map[string]string{ - "component": "server", - "app": "consul", - "release": releaseName, - }, - }, - Status: v1.PodStatus{ - PodIP: serverURL.Hostname(), - }, - Spec: v1.PodSpec{ - Containers: []v1.Container{ - { - Name: "consul", - Ports: []v1.ContainerPort{ - { - Name: "http", - ContainerPort: int32(port), - }, - }, - }, - }, - }, - }) - require.NoError(err) - // Create Consul server Statefulset. - _, err = k8s.AppsV1().StatefulSets(ns).Create(&appv1.StatefulSet{ - ObjectMeta: metav1.ObjectMeta{ - Name: resourcePrefix + "-server", - Labels: map[string]string{ - "component": "server", - "app": "consul", - "release": releaseName, - }, - }, - Status: appv1.StatefulSetStatus{ - UpdateRevision: "current", - CurrentRevision: "current", - }, - }) // Run the command. ui := cli.NewMockUi() @@ -890,16 +598,15 @@ func TestRun_NoLeader(t *testing.T) { UI: ui, clientset: k8s, } - cmd.init() done := make(chan bool) var responseCode int go func() { responseCode = cmd.Run([]string{ - "-server-label-selector=component=server,app=consul,release=" + releaseName, "-resource-prefix=" + resourcePrefix, "-k8s-namespace=" + ns, - "-expected-replicas=1", + "-server-address=" + serverURL.Hostname(), + "-server-port=" + serverURL.Port(), }) close(done) }() @@ -993,52 +700,6 @@ func TestRun_ClientTokensRetry(t *testing.T) { // Create the Server Pods. serverURL, err := url.Parse(consulServer.URL) require.NoError(err) - port, err := strconv.Atoi(serverURL.Port()) - require.NoError(err) - pods := k8s.CoreV1().Pods(ns) - _, err = pods.Create(&v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: resourcePrefix + "-server-0", - Labels: map[string]string{ - "component": "server", - "app": "consul", - "release": releaseName, - }, - }, - Status: v1.PodStatus{ - PodIP: serverURL.Hostname(), - }, - Spec: v1.PodSpec{ - Containers: []v1.Container{ - { - Name: "consul", - Ports: []v1.ContainerPort{ - { - Name: "http", - ContainerPort: int32(port), - }, - }, - }, - }, - }, - }) - require.NoError(err) - // Create the server statefulset. - _, err = k8s.AppsV1().StatefulSets(ns).Create(&appv1.StatefulSet{ - ObjectMeta: metav1.ObjectMeta{ - Name: resourcePrefix + "-server", - Labels: map[string]string{ - "component": "server", - "app": "consul", - "release": releaseName, - }, - }, - Status: appv1.StatefulSetStatus{ - UpdateRevision: "current", - CurrentRevision: "current", - }, - }) - require.NoError(err) // Run the command. ui := cli.NewMockUi() @@ -1046,12 +707,11 @@ func TestRun_ClientTokensRetry(t *testing.T) { UI: ui, clientset: k8s, } - cmd.init() responseCode := cmd.Run([]string{ - "-server-label-selector=component=server,app=consul,release=" + releaseName, "-resource-prefix=" + resourcePrefix, "-k8s-namespace=" + ns, - "-expected-replicas=1", + "-server-address=" + serverURL.Hostname(), + "-server-port=" + serverURL.Port(), }) require.Equal(0, responseCode, ui.ErrorWriter.String()) @@ -1120,52 +780,6 @@ func TestRun_AlreadyBootstrapped(t *testing.T) { // Create the Server Pods. serverURL, err := url.Parse(consulServer.URL) require.NoError(err) - port, err := strconv.Atoi(serverURL.Port()) - require.NoError(err) - pods := k8s.CoreV1().Pods(ns) - _, err = pods.Create(&v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: resourcePrefix + "-server-0", - Labels: map[string]string{ - "component": "server", - "app": "consul", - "release": releaseName, - }, - }, - Status: v1.PodStatus{ - PodIP: serverURL.Hostname(), - }, - Spec: v1.PodSpec{ - Containers: []v1.Container{ - { - Name: "consul", - Ports: []v1.ContainerPort{ - { - Name: "http", - ContainerPort: int32(port), - }, - }, - }, - }, - }, - }) - require.NoError(err) - // Create the server statefulset. - _, err = k8s.AppsV1().StatefulSets(ns).Create(&appv1.StatefulSet{ - ObjectMeta: metav1.ObjectMeta{ - Name: resourcePrefix + "-server", - Labels: map[string]string{ - "component": "server", - "app": "consul", - "release": releaseName, - }, - }, - Status: appv1.StatefulSetStatus{ - UpdateRevision: "current", - CurrentRevision: "current", - }, - }) - require.NoError(err) // Create the bootstrap secret. _, err = k8s.CoreV1().Secrets(ns).Create(&v1.Secret{ @@ -1184,12 +798,12 @@ func TestRun_AlreadyBootstrapped(t *testing.T) { UI: ui, clientset: k8s, } - cmd.init() + responseCode := cmd.Run([]string{ - "-server-label-selector=component=server,app=consul,release=" + releaseName, "-resource-prefix=" + resourcePrefix, "-k8s-namespace=" + ns, - "-expected-replicas=1", + "-server-address=" + serverURL.Hostname(), + "-server-port=" + serverURL.Port(), }) require.Equal(0, responseCode, ui.ErrorWriter.String()) @@ -1228,19 +842,18 @@ func TestRun_Timeout(t *testing.T) { UI: ui, clientset: k8s, } - cmd.init() + responseCode := cmd.Run([]string{ - "-server-label-selector=component=server,app=consul,release=" + releaseName, "-resource-prefix=" + resourcePrefix, "-k8s-namespace=" + ns, - "-expected-replicas=1", + "-server-address=foo", "-timeout=500ms", }) require.Equal(1, responseCode, ui.ErrorWriter.String()) } // Test that the bootstrapping process can make calls to Consul API over HTTPS -// when the consul agent is configured with HTTPS only (HTTP disabled). +// when the consul agent is configured with HTTPS. func TestRun_HTTPS(t *testing.T) { t.Parallel() require := require.New(t) @@ -1249,27 +862,15 @@ func TestRun_HTTPS(t *testing.T) { caFile, certFile, keyFile, cleanup := generateServerCerts(t) defer cleanup() - agentConfig := fmt.Sprintf(` - primary_datacenter = "dc1" - acl { - enabled = true - } - ca_file = "%s" - cert_file = "%s" - key_file = "%s"`, caFile, certFile, keyFile) - - // NOTE: We can't use testutil.TestServer for this test because the HTTP - // port can't be disabled (causes a seg fault). - a := &agent.TestAgent{ - Name: t.Name(), - HCL: agentConfig, - UseTLS: true, // this also disables HTTP port - } - - a.Start() - defer a.Shutdown() + srv, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) { + c.ACL.Enabled = true - createTestK8SResources(t, k8s, a.HTTPAddr(), resourcePrefix, "https", ns) + c.CAFile = caFile + c.CertFile = certFile + c.KeyFile = keyFile + }) + require.NoError(err) + defer srv.Stop() // Run the command. ui := cli.NewMockUi() @@ -1277,15 +878,15 @@ func TestRun_HTTPS(t *testing.T) { UI: ui, clientset: k8s, } - cmd.init() + responseCode := cmd.Run([]string{ - "-server-label-selector=component=server,app=consul,release=" + releaseName, "-resource-prefix=" + resourcePrefix, "-k8s-namespace=" + ns, "-use-https", "-consul-tls-server-name", "server.dc1.consul", "-consul-ca-cert", caFile, - "-expected-replicas=1", + "-server-address=127.0.0.1", + "-server-port=" + strings.Split(srv.HTTPSAddr, ":")[1], }) require.Equal(0, responseCode, ui.ErrorWriter.String()) @@ -1307,66 +908,9 @@ func completeSetup(t *testing.T, prefix string) (*fake.Clientset, *testutil.Test }) require.NoError(t, err) - createTestK8SResources(t, k8s, svr.HTTPAddr, prefix, "http", ns) - return k8s, svr } -// Create test k8s resources (server pods and server stateful set) -func createTestK8SResources(t *testing.T, k8s *fake.Clientset, consulHTTPAddr, prefix, scheme, k8sNamespace string) { - require := require.New(t) - consulURL, err := url.Parse("http://" + consulHTTPAddr) - require.NoError(err) - port, err := strconv.Atoi(consulURL.Port()) - require.NoError(err) - - // Create Consul server Pod. - _, err = k8s.CoreV1().Pods(k8sNamespace).Create(&v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: prefix + "-server-0", - Labels: map[string]string{ - "component": "server", - "app": "consul", - "release": releaseName, - }, - }, - Status: v1.PodStatus{ - PodIP: consulURL.Hostname(), - }, - Spec: v1.PodSpec{ - Containers: []v1.Container{ - { - Name: "consul", - Ports: []v1.ContainerPort{ - { - Name: scheme, - ContainerPort: int32(port), - }, - }, - }, - }, - }, - }) - require.NoError(err) - - // Create Consul server Statefulset. - _, err = k8s.AppsV1().StatefulSets(k8sNamespace).Create(&appv1.StatefulSet{ - ObjectMeta: metav1.ObjectMeta{ - Name: prefix + "-server", - Labels: map[string]string{ - "component": "server", - "app": "consul", - "release": releaseName, - }, - }, - Status: appv1.StatefulSetStatus{ - UpdateRevision: "current", - CurrentRevision: "current", - }, - }) - require.NoError(err) -} - // getBootToken gets the bootstrap token from the Kubernetes secret. It will // cause a test failure if the Secret doesn't exist or is malformed. func getBootToken(t *testing.T, k8s *fake.Clientset, prefix string, k8sNamespace string) string { diff --git a/subcommand/server-acl-init/servers.go b/subcommand/server-acl-init/servers.go index 3ae1d9fdf7..6b9d252d94 100644 --- a/subcommand/server-acl-init/servers.go +++ b/subcommand/server-acl-init/servers.go @@ -10,76 +10,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -// podAddr is a convenience struct for passing around pod names and -// addresses for Consul servers. -type podAddr struct { - // Name is the name of the pod. - Name string - // Addr is in the form ":". - Addr string -} - -// getConsulServers returns n Consul server pods with their http addresses. -// If there are less server pods than 'n' then the function will wait. -func (c *Command) getConsulServers(n int, scheme string) ([]podAddr, error) { - var serverPods *apiv1.PodList - err := c.untilSucceeds("discovering Consul server pods", - func() error { - var err error - serverPods, err = c.clientset.CoreV1().Pods(c.flagK8sNamespace).List(metav1.ListOptions{LabelSelector: c.flagServerLabelSelector}) - if err != nil { - return err - } - - if len(serverPods.Items) == 0 { - return fmt.Errorf("no server pods with labels %q found", c.flagServerLabelSelector) - } - - if len(serverPods.Items) < n { - return fmt.Errorf("found %d servers, require %d", len(serverPods.Items), n) - } - - for _, pod := range serverPods.Items { - if pod.Status.PodIP == "" { - return fmt.Errorf("pod %s has no IP", pod.Name) - } - } - return nil - }) - if err != nil { - return nil, err - } - - var podAddrs []podAddr - for _, pod := range serverPods.Items { - var httpPort int32 - for _, p := range pod.Spec.Containers[0].Ports { - if p.Name == scheme { - httpPort = p.ContainerPort - } - } - if httpPort == 0 { - return nil, fmt.Errorf("pod %s has no port labeled '%s'", pod.Name, scheme) - } - addr := fmt.Sprintf("%s:%d", pod.Status.PodIP, httpPort) - podAddrs = append(podAddrs, podAddr{ - Name: pod.Name, - Addr: addr, - }) - } - return podAddrs, nil -} - // bootstrapServers bootstraps ACLs and ensures each server has an ACL token. func (c *Command) bootstrapServers(bootTokenSecretName, scheme string) (string, error) { - serverPods, err := c.getConsulServers(c.flagReplicas, scheme) - if err != nil { - return "", err - } - c.Log.Info(fmt.Sprintf("Found %d Consul server Pods", len(serverPods))) - - // Pick the first pod to connect to for bootstrapping and set up connection. - firstServerAddr := serverPods[0].Addr + // Pick the first server address to connect to for bootstrapping and set up connection. + firstServerAddr := fmt.Sprintf("%s:%d", c.flagServerHosts[0], c.flagServerPort) consulClient, err := api.NewClient(&api.Config{ Address: firstServerAddr, Scheme: scheme, @@ -159,7 +93,7 @@ func (c *Command) bootstrapServers(bootTokenSecretName, scheme string) (string, } // Create new tokens for each server and apply them. - if err := c.setServerTokens(consulClient, serverPods, string(bootstrapToken), scheme); err != nil { + if err := c.setServerTokens(consulClient, string(bootstrapToken), scheme); err != nil { return "", err } return string(bootstrapToken), nil @@ -167,40 +101,20 @@ func (c *Command) bootstrapServers(bootTokenSecretName, scheme string) (string, // setServerTokens creates policies and associated ACL token for each server // and then provides the token to the server. -func (c *Command) setServerTokens(consulClient *api.Client, - serverPods []podAddr, bootstrapToken, scheme string) error { - +func (c *Command) setServerTokens(consulClient *api.Client, bootstrapToken, scheme string) error { agentPolicy, err := c.setServerPolicy(consulClient) if err != nil { return err } // Create agent token for each server agent. - var serverTokens []api.ACLToken - for _, pod := range serverPods { + for _, host := range c.flagServerHosts { var token *api.ACLToken - err := c.untilSucceeds(fmt.Sprintf("creating server token for %s - PUT /v1/acl/token", pod.Name), - func() error { - tokenReq := api.ACLToken{ - Description: fmt.Sprintf("Server Token for %s", pod.Name), - Policies: []*api.ACLTokenPolicyLink{{Name: agentPolicy.Name}}, - } - var err error - token, _, err = consulClient.ACL().TokenCreate(&tokenReq, nil) - return err - }) - if err != nil { - return err - } - serverTokens = append(serverTokens, *token) - } - // Pass out agent tokens to servers. - for i, pod := range serverPods { // We create a new client for each server because we need to call each // server specifically. serverClient, err := api.NewClient(&api.Config{ - Address: pod.Addr, + Address: fmt.Sprintf("%s:%d", host, c.flagServerPort), Scheme: scheme, Token: bootstrapToken, TLSConfig: api.TLSConfig{ @@ -208,21 +122,34 @@ func (c *Command) setServerTokens(consulClient *api.Client, CAFile: c.flagConsulCACert, }, }) + + // Create token for the server + err = c.untilSucceeds(fmt.Sprintf("creating server token for %s - PUT /v1/acl/token", host), + func() error { + tokenReq := api.ACLToken{ + Description: fmt.Sprintf("Server Token for %s", host), + Policies: []*api.ACLTokenPolicyLink{{Name: agentPolicy.Name}}, + } + var err error + token, _, err = serverClient.ACL().TokenCreate(&tokenReq, nil) + return err + }) if err != nil { - return fmt.Errorf(" creating Consul client for address %q: %s", pod.Addr, err) + return err } - podName := pod.Name + // Pass out agent tokens to servers. // Update token. - err = c.untilSucceeds(fmt.Sprintf("updating server token for %s - PUT /v1/agent/token/agent", podName), + err = c.untilSucceeds(fmt.Sprintf("updating server token for %s - PUT /v1/agent/token/agent", host), func() error { - _, err := serverClient.Agent().UpdateAgentACLToken(serverTokens[i].SecretID, nil) + _, err := serverClient.Agent().UpdateAgentACLToken(token.SecretID, nil) return err }) if err != nil { return err } } + return nil }