Manual Backport of #2669 into 1.2.x (#2829)
net-1776, add job lifecycle test and changes to connhelper (#2669)

* changes to connhelper, add job lifecycle test

* yaml fixes

* move around job yaml files, update grace period times

* yaml change

* timer change

* wait for job to start when deploying

* fix file paths

* Skip Lifecycle Test on t-proxy

---------

Co-authored-by: trevorLeonHC <137422945+trevorLeonHC@users.noreply.github.com>
Thomas Eckert and trevorLeonHC authored Aug 24, 2023
1 parent 440ff9c commit e312f36
Showing 13 changed files with 406 additions and 10 deletions.
108 changes: 100 additions & 8 deletions acceptance/framework/connhelper/connect_helper.go
@@ -26,6 +26,7 @@ import (
const (
StaticClientName = "static-client"
StaticServerName = "static-server"
JobName = "job-client"
)

// ConnectHelper configures a Consul cluster for connect injection tests.
@@ -170,6 +171,82 @@ func (c *ConnectHelper) DeployClientAndServer(t *testing.T) {
})
}

// DeployJob deploys a job-client Job from the given fixture path to the
// Kubernetes cluster; it is used to test service mesh connectivity. If the
// Secure flag is true, a cleanup is registered that verifies the ACL tokens
// created for the test are deleted once the test finishes. After the
// deployment completes, the Job's pod is checked to confirm that sidecar
// injection succeeded (two containers).
func (c *ConnectHelper) DeployJob(t *testing.T, path string) {
// Check that the ACL token is deleted.
if c.Secure {
// We need to register the cleanup function before we create the
// deployments because golang will execute them in reverse order
// (i.e. the last registered cleanup function will be executed first).
t.Cleanup(func() {
retrier := &retry.Timer{Timeout: 30 * time.Second, Wait: 100 * time.Millisecond}
retry.RunWith(retrier, t, func(r *retry.R) {
tokens, _, err := c.ConsulClient.ACL().TokenList(nil)
require.NoError(r, err)
for _, token := range tokens {
require.NotContains(r, token.Description, JobName)
}
})
})
}

logger.Log(t, "creating job-client Job")
k8s.DeployJob(t, c.Ctx.KubectlOptions(t), c.Cfg.NoCleanupOnFailure, c.Cfg.NoCleanup, c.Cfg.DebugDirectory, path)

// Check that job-client has been injected and
// now has 2 containers.
for _, labelSelector := range []string{"app=job-client"} {
podList, err := c.Ctx.KubernetesClient(t).CoreV1().Pods(c.Ctx.KubectlOptions(t).Namespace).List(context.Background(), metav1.ListOptions{
LabelSelector: labelSelector,
})
require.NoError(t, err)
require.Len(t, podList.Items, 1)
require.Len(t, podList.Items[0].Spec.Containers, 2)
}
}
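
// A rough usage sketch from a test's perspective (illustrative only; it assumes
// a configured ConnectHelper named connHelper and the Job kustomize fixture path
// shown):
//
//	connHelper.DeployServer(t)
//	connHelper.DeployJob(t, "../fixtures/cases/jobs/job-client-inject")
//	connHelper.TestConnectionFailureWithoutIntention(t, connhelper.JobName)
//	connHelper.CreateIntention(t, connhelper.JobName)
//	connHelper.TestConnectionSuccess(t, connhelper.JobName)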

// DeployServer deploys a static-server Deployment to the Kubernetes cluster;
// it is used to test service mesh connectivity. If the Secure flag is true, a
// cleanup is registered that verifies the ACL tokens created for the test are
// deleted once the test finishes. After the deployment completes, the pod is
// checked to confirm that sidecar injection succeeded (two containers).
func (c *ConnectHelper) DeployServer(t *testing.T) {
// Check that the ACL token is deleted.
if c.Secure {
// We need to register the cleanup function before we create the
// deployments because golang will execute them in reverse order
// (i.e. the last registered cleanup function will be executed first).
t.Cleanup(func() {
retrier := &retry.Timer{Timeout: 30 * time.Second, Wait: 100 * time.Millisecond}
retry.RunWith(retrier, t, func(r *retry.R) {
tokens, _, err := c.ConsulClient.ACL().TokenList(nil)
require.NoError(r, err)
for _, token := range tokens {
require.NotContains(r, token.Description, StaticServerName)
}
})
})
}

logger.Log(t, "creating static-server deployment")
k8s.DeployKustomize(t, c.Ctx.KubectlOptions(t), c.Cfg.NoCleanupOnFailure, c.Cfg.NoCleanup, c.Cfg.DebugDirectory, "../fixtures/cases/static-server-inject")

// Check that static-server has been injected and
// now has 2 containers.
for _, labelSelector := range []string{"app=static-server"} {
podList, err := c.Ctx.KubernetesClient(t).CoreV1().Pods(c.Ctx.KubectlOptions(t).Namespace).List(context.Background(), metav1.ListOptions{
LabelSelector: labelSelector,
})
require.NoError(t, err)
require.Len(t, podList.Items, 1)
require.Len(t, podList.Items[0].Spec.Containers, 2)
}
}

// SetupAppNamespace creates a namespace where applications are deployed. This
// does nothing if UseAppNamespace is not set. The app namespace is relevant
// when testing with restricted PSA enforcement enabled.
@@ -215,26 +292,36 @@ func (c *ConnectHelper) CreateResolverRedirect(t *testing.T) {

// TestConnectionFailureWithoutIntention ensures the connection to the
// static-server fails when no intentions are configured.
func (c *ConnectHelper) TestConnectionFailureWithoutIntention(t *testing.T) {
func (c *ConnectHelper) TestConnectionFailureWithoutIntention(t *testing.T, clientType ...string) {
logger.Log(t, "checking that the connection is not successful because there's no intention")
opts := c.KubectlOptsForApp(t)
// Default to static-client. If a client type is passed in (e.g. job-client), use that instead.
client := StaticClientName
if len(clientType) > 0 {
client = clientType[0]
}
if c.Cfg.EnableTransparentProxy {
k8s.CheckStaticServerConnectionFailing(t, opts, StaticClientName, "http://static-server")
k8s.CheckStaticServerConnectionFailing(t, opts, client, "http://static-server")
} else {
k8s.CheckStaticServerConnectionFailing(t, opts, StaticClientName, "http://localhost:1234")
k8s.CheckStaticServerConnectionFailing(t, opts, client, "http://localhost:1234")
}
}

// CreateIntention creates an intention that allows the client (static-client
// by default, or the given clientType) to connect to the static-server.
func (c *ConnectHelper) CreateIntention(t *testing.T) {
func (c *ConnectHelper) CreateIntention(t *testing.T, clientType ...string) {
logger.Log(t, "creating intention")
// Default to static-client. If a client type is passed in (e.g. job-client), use that instead.
client := StaticClientName
if len(clientType) > 0 {
client = clientType[0]
}
_, _, err := c.ConsulClient.ConfigEntries().Set(&api.ServiceIntentionsConfigEntry{
Kind: api.ServiceIntentions,
Name: StaticServerName,
Sources: []*api.SourceIntention{
{
Name: StaticClientName,
Name: client,
Action: api.IntentionActionAllow,
},
},
@@ -244,14 +331,19 @@ func (c *ConnectHelper) CreateIntention(t *testing.T) {

// TestConnectionSuccess ensures the client pod can connect to the
// static-server pod once the intention is set.
func (c *ConnectHelper) TestConnectionSuccess(t *testing.T) {
func (c *ConnectHelper) TestConnectionSuccess(t *testing.T, clientType ...string) {
logger.Log(t, "checking that connection is successful")
opts := c.KubectlOptsForApp(t)
// Default to static-client. If a client type is passed in (e.g. job-client), use that instead.
client := StaticClientName
if len(clientType) > 0 {
client = clientType[0]
}
if c.Cfg.EnableTransparentProxy {
// todo: add an assertion that the traffic is going through the proxy
k8s.CheckStaticServerConnectionSuccessful(t, opts, StaticClientName, "http://static-server")
k8s.CheckStaticServerConnectionSuccessful(t, opts, client, "http://static-server")
} else {
k8s.CheckStaticServerConnectionSuccessful(t, opts, StaticClientName, "http://localhost:1234")
k8s.CheckStaticServerConnectionSuccessful(t, opts, client, "http://localhost:1234")
}
}

34 changes: 32 additions & 2 deletions acceptance/framework/k8s/deploy.go
@@ -16,6 +16,7 @@ import (
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
"k8s.io/apimachinery/pkg/util/yaml"
)

@@ -72,6 +73,32 @@ func DeployKustomize(t *testing.T, options *k8s.KubectlOptions, noCleanupOnFailu
RunKubectl(t, options, "wait", "--for=condition=available", "--timeout=5m", fmt.Sprintf("deploy/%s", deployment.Name))
}

// DeployJob applies a Job from the given kustomize directory, registers a
// cleanup that captures debug info for its pods and deletes the Job, and waits
// for the Job's pods to become Ready.
func DeployJob(t *testing.T, options *k8s.KubectlOptions, noCleanupOnFailure bool, noCleanup bool, debugDirectory, kustomizeDir string) {
t.Helper()

KubectlApplyK(t, options, kustomizeDir)

output, err := RunKubectlAndGetOutputE(t, options, "kustomize", kustomizeDir)
require.NoError(t, err)

job := batchv1.Job{}
err = yaml.NewYAMLOrJSONDecoder(strings.NewReader(output), 1024).Decode(&job)
require.NoError(t, err)

helpers.Cleanup(t, noCleanupOnFailure, noCleanup, func() {
// Note: this delete command won't wait for pods to be fully terminated.
// This shouldn't cause any test pollution because the underlying
// object is a Job, and so when other tests create Jobs they should
// have different pod names.
WritePodsDebugInfoIfFailed(t, options, debugDirectory, labelMapToString(job.GetLabels()))
KubectlDeleteK(t, options, kustomizeDir)
})
logger.Log(t, "job deployed")

// Because Jobs don't have a "started" condition, we have to check the status of the Pods they create.
RunKubectl(t, options, "wait", "--for=condition=Ready", "--timeout=5m", "pods", "--selector", fmt.Sprintf("job-name=%s", job.Name))
}
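
// For reference, a rough client-go equivalent of the kubectl wait above (an
// illustrative sketch only, assuming a kubernetes.Interface named client and the
// usual context, fmt, corev1 (k8s.io/api/core/v1), and metav1 imports):
//
//	pods, err := client.CoreV1().Pods(options.Namespace).List(context.Background(), metav1.ListOptions{
//	    LabelSelector: fmt.Sprintf("job-name=%s", job.Name),
//	})
//	require.NoError(t, err)
//	// A Job exposes no "started" condition of its own, so look at its pods' Ready condition.
//	ready := false
//	for _, p := range pods.Items {
//	    for _, cond := range p.Status.Conditions {
//	        if cond.Type == corev1.PodReady && cond.Status == corev1.ConditionTrue {
//	            ready = true
//	        }
//	    }
//	}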

// CheckStaticServerConnection execs into a pod of sourceApp
// and runs a curl command with the provided curlArgs.
// This function assumes that the connection is made to the static-server and expects the output
@@ -93,15 +120,18 @@ func CheckStaticServerConnection(t *testing.T, options *k8s.KubectlOptions, sour
// on the existence of any of them.
func CheckStaticServerConnectionMultipleFailureMessages(t *testing.T, options *k8s.KubectlOptions, sourceApp string, expectSuccess bool, failureMessages []string, expectedSuccessOutput string, curlArgs ...string) {
t.Helper()

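// kubectl exec accepts a workload reference such as deploy/<name> or jobs/<name>
// and resolves it to one of that workload's pods, so the job-client source (which
// runs as a Job rather than a Deployment) needs the jobs/ prefix.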
resourceType := "deploy/"
if sourceApp == "job-client" {
resourceType = "jobs/"
}
expectedOutput := "hello world"
if expectedSuccessOutput != "" {
expectedOutput = expectedSuccessOutput
}

retrier := &retry.Timer{Timeout: 320 * time.Second, Wait: 2 * time.Second}

args := []string{"exec", "deploy/" + sourceApp, "-c", sourceApp, "--", "curl", "-vvvsSf"}
args := []string{"exec", resourceType + sourceApp, "-c", sourceApp, "--", "curl", "-vvvsSf"}
args = append(args, curlArgs...)

retry.RunWith(retrier, t, func(r *retry.R) {
169 changes: 169 additions & 0 deletions acceptance/tests/connect/connect_proxy_lifecycle_test.go
@@ -205,3 +205,172 @@ func TestConnectInject_ProxyLifecycleShutdown(t *testing.T) {
})
}
}

func TestConnectInject_ProxyLifecycleShutdownJob(t *testing.T) {
cfg := suite.Config()

if cfg.EnableTransparentProxy {
// TODO t-eckert: Come back and review this with wise counsel.
t.Skip("Skipping test because transparent proxy is enabled")
}

defaultGracePeriod := 5

cases := map[string]int{
"../fixtures/cases/jobs/job-client-inject": defaultGracePeriod,
"../fixtures/cases/jobs/job-client-inject-grace-period-0s": 0,
"../fixtures/cases/jobs/job-client-inject-grace-period-10s": 10,
}

// Set up the installation and static-server once.
ctx := suite.Environment().DefaultContext(t)
releaseName := helpers.RandomName()

connHelper := connhelper.ConnectHelper{
ClusterKind: consul.Helm,
ReleaseName: releaseName,
Ctx: ctx,
Cfg: cfg,
HelmValues: map[string]string{
"connectInject.sidecarProxy.lifecycle.defaultShutdownGracePeriodSeconds": strconv.FormatInt(int64(defaultGracePeriod), 10),
"connectInject.sidecarProxy.lifecycle.defaultEnabled": strconv.FormatBool(true),
},
}
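
// Note: the defaultShutdownGracePeriodSeconds Helm value above supplies the
// default (5s) grace period; the 0s and 10s cases presumably override it in
// their job fixtures. After the proxy receives a graceful_shutdown request,
// requests are expected to keep succeeding until the applicable grace period
// elapses.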

connHelper.Setup(t)
connHelper.Install(t)
connHelper.DeployServer(t)

logger.Log(t, "waiting for static-server to be registered with Consul")
retry.RunWith(&retry.Timer{Timeout: 3 * time.Minute, Wait: 5 * time.Second}, t, func(r *retry.R) {
for _, name := range []string{
"static-server",
"static-server-sidecar-proxy",
} {
logger.Logf(t, "checking for %s service in Consul catalog", name)
instances, _, err := connHelper.ConsulClient.Catalog().Service(name, "", nil)
r.Check(err)

if len(instances) != 1 {
r.Errorf("expected 1 instance of %s", name)
}
}
})

// Iterate over the Job cases and test connection.
for path, gracePeriod := range cases {
connHelper.DeployJob(t, path) // Deploy the job-client for this case.

logger.Log(t, "waiting for job-client to be registered with Consul")
retry.RunWith(&retry.Timer{Timeout: 300 * time.Second, Wait: 5 * time.Second}, t, func(r *retry.R) {
for _, name := range []string{
"job-client",
"job-client-sidecar-proxy",
} {
logger.Logf(t, "checking for %s service in Consul catalog", name)
instances, _, err := connHelper.ConsulClient.Catalog().Service(name, "", nil)
r.Check(err)

if len(instances) != 1 {
r.Errorf("expected 1 instance of %s", name)
}
}
})

connHelper.TestConnectionSuccess(t, connhelper.JobName)

// Get job-client pod name
ns := ctx.KubectlOptions(t).Namespace
pods, err := ctx.KubernetesClient(t).CoreV1().Pods(ns).List(
context.Background(),
metav1.ListOptions{
LabelSelector: "app=job-client",
},
)
require.NoError(t, err)
require.Len(t, pods.Items, 1)
jobName := pods.Items[0].Name

// Exec into job and send shutdown request to running proxy.
// curl --max-time 2 -s -f -XPOST http://127.0.0.1:20600/graceful_shutdown
sendProxyShutdownArgs := []string{"exec", jobName, "-c", connhelper.JobName, "--", "curl", "--max-time", "2", "-s", "-f", "-XPOST", "http://127.0.0.1:20600/graceful_shutdown"}
_, err = k8s.RunKubectlAndGetOutputE(t, ctx.KubectlOptions(t), sendProxyShutdownArgs...)
require.NoError(t, err)

logger.Log(t, "Proxy killed...")

args := []string{"exec", jobName, "-c", connhelper.JobName, "--", "curl", "-vvvsSf"}
if cfg.EnableTransparentProxy {
args = append(args, "http://static-server")
} else {
args = append(args, "http://localhost:1234")
}

if gracePeriod > 0 {
logger.Log(t, "Checking if connection successful within grace period...")
retry.RunWith(&retry.Timer{Timeout: time.Duration(gracePeriod) * time.Second, Wait: 2 * time.Second}, t, func(r *retry.R) {
output, err := k8s.RunKubectlAndGetOutputE(t, ctx.KubectlOptions(t), args...)
require.NoError(r, err)
require.True(r, !strings.Contains(output, "curl: (7) Failed to connect"))
})
// Wait for the grace period to end after the successful request.
time.Sleep(time.Duration(gracePeriod) * time.Second)
}

// Test that requests fail once grace period has ended, or there was no grace period set.
logger.Log(t, "Checking that requests fail now that proxy is killed...")
retry.RunWith(&retry.Timer{Timeout: 2 * time.Minute, Wait: 2 * time.Second}, t, func(r *retry.R) {
output, err := k8s.RunKubectlAndGetOutputE(t, ctx.KubectlOptions(t), args...)
require.Error(r, err)
require.True(r, strings.Contains(output, "curl: (7) Failed to connect"))
})

// Wait for the job to complete.
retry.RunWith(&retry.Timer{Timeout: 4 * time.Minute, Wait: 30 * time.Second}, t, func(r *retry.R) {
logger.Log(t, "Checking if job completed...")
jobs, err := ctx.KubernetesClient(t).BatchV1().Jobs(ns).List(
context.Background(),
metav1.ListOptions{
LabelSelector: "app=job-client",
},
)
require.NoError(r, err)
require.True(r, jobs.Items[0].Status.Succeeded == 1)
})

// Delete the job and its associated Pod.
pods, err = ctx.KubernetesClient(t).CoreV1().Pods(ns).List(
context.Background(),
metav1.ListOptions{
LabelSelector: "app=job-client",
},
)
require.NoError(t, err)
podName := pods.Items[0].Name

err = ctx.KubernetesClient(t).BatchV1().Jobs(ns).Delete(context.Background(), "job-client", metav1.DeleteOptions{})
require.NoError(t, err)

err = ctx.KubernetesClient(t).CoreV1().Pods(ns).Delete(context.Background(), podName, metav1.DeleteOptions{})
require.NoError(t, err)

logger.Log(t, "ensuring job is deregistered after termination")
retry.RunWith(&retry.Timer{Timeout: 4 * time.Minute, Wait: 30 * time.Second}, t, func(r *retry.R) {
for _, name := range []string{
"job-client",
"job-client-sidecar-proxy",
} {
logger.Logf(t, "checking for %s service in Consul catalog", name)
instances, _, err := connHelper.ConsulClient.Catalog().Service(name, "", nil)
r.Check(err)

for _, instance := range instances {
if strings.Contains(instance.ServiceID, jobName) {
r.Errorf("%s is still registered", instance.ServiceID)
}
}
}
})
}
}