diff --git a/functional_tests/functional_test.go b/functional_tests/functional_test.go
index 2cdde4d140..294343f55e 100644
--- a/functional_tests/functional_test.go
+++ b/functional_tests/functional_test.go
@@ -11,6 +11,7 @@ import (
 	"path/filepath"
 	"regexp"
 	"runtime"
+	"strconv"
 	"strings"
 	"sync"
 	"testing"
@@ -79,7 +80,8 @@ const (
 
 var globalSinks *sinks
 
-var setupRun = sync.Once{}
+var setupRunConsumers = sync.Once{}
+var setupRunChart = sync.Once{}
 
 type sinks struct {
 	logsConsumer *consumertest.LogsSink
@@ -90,8 +92,8 @@ type sinks struct {
 	tracesConsumer *consumertest.TracesSink
 }
 
-func setupOnce(t *testing.T) *sinks {
-	setupRun.Do(func() {
+func setupOnceConsumers(t *testing.T) *sinks {
+	setupRunConsumers.Do(func() {
 		// create an API server
 		internal.CreateApiServer(t, apiPort)
 		// set ingest pipelines
@@ -104,6 +106,12 @@ func setupOnce(t *testing.T) *sinks {
 			k8sclusterReceiverMetricsConsumer: setupSignalfxReceiver(t, signalFxReceiverK8sClusterReceiverPort),
 			tracesConsumer: setupTraces(t),
 		}
+	})
+	return globalSinks
+}
+
+func setupOnceChart(t *testing.T) {
+	setupRunChart.Do(func() {
 		// deploy the chart and applications.
 		if os.Getenv("SKIP_SETUP") == "true" {
 			t.Log("Skipping setup as SKIP_SETUP is set to true")
@@ -111,10 +119,9 @@ func setupOnce(t *testing.T) *sinks {
 		}
 		deployChartsAndApps(t)
 	})
-
-	return globalSinks
 }
-func deployChartsAndApps(t *testing.T) {
+
+func deployHelm(t *testing.T) (*kubernetes.Clientset, func()) {
 	kubeConfig, err := clientcmd.BuildConfigFromFlags("", testKubeConfig)
 	require.NoError(t, err)
 	clientset, err := kubernetes.NewForConfig(kubeConfig)
 	require.NoError(t, err)
@@ -172,6 +179,18 @@ func deployChartsAndApps(t *testing.T) {
 
 	waitForAllDeploymentsToStart(t, clientset)
 
+	return clientset, func() {
+		// perform cleanup
+		uninstall := action.NewUninstall(actionConfig)
+		uninstall.IgnoreNotFound = true
+		uninstall.Wait = true
+		_, _ = uninstall.Run("sock")
+	}
+}
+
+func deployChartsAndApps(t *testing.T) {
+	clientset, cleanup := deployHelm(t)
+
 	deployments := clientset.AppsV1().Deployments("default")
 
 	decode := scheme.Codecs.UniversalDeserializer().Decode
@@ -249,15 +268,98 @@ func deployChartsAndApps(t *testing.T) {
 			GracePeriodSeconds: &waitTime,
 		})
 	}
+
+		cleanup()
 	})
+}
+
+func deployChartsAndAppsMigrationSCK(t *testing.T) func() {
+	kubeConfig, err := clientcmd.BuildConfigFromFlags("", testKubeConfig)
+	require.NoError(t, err)
+	clientset, err := kubernetes.NewForConfig(kubeConfig)
+	require.NoError(t, err)
+
+	chartPath := "/tmp/splunk-connect-for-kubernetes/helm-chart/splunk-connect-for-kubernetes/"
+	chart, err := loader.Load(chartPath)
+	require.NoError(t, err)
+	valuesBytes, err := os.ReadFile(filepath.Join("testdata", "test_values_sck.yaml.tmpl"))
+	require.NoError(t, err)
+	replacements := struct {
+		LogHecHost string
+		LogHecPort string
+	}{
+		hostEndpoint(t),
+		fmt.Sprintf("%d", hecReceiverPort),
+	}
+	tmpl, err := template.New("").Parse(string(valuesBytes))
+	require.NoError(t, err)
+	var buf bytes.Buffer
+	err = tmpl.Execute(&buf, replacements)
+	require.NoError(t, err)
+	var values map[string]interface{}
+	err = yaml.Unmarshal(buf.Bytes(), &values)
+	require.NoError(t, err)
+
+	actionConfig := new(action.Configuration)
+	if err := actionConfig.Init(kube.GetConfig(testKubeConfig, "", "default"), "default", os.Getenv("HELM_DRIVER"), func(format string, v ...interface{}) {
+		t.Logf(format+"\n", v)
+	}); err != nil {
+		require.NoError(t, err)
+	}
+	install := action.NewInstall(actionConfig)
+	install.Namespace = "default"
+	install.ReleaseName = "sck"
+	_, err = install.Run(chart, values)
+	if err != nil {
+		t.Logf("error reported during helm install: %v\n", err)
+		retryUpgrade := action.NewUpgrade(actionConfig)
+		retryUpgrade.Namespace = "default"
+		retryUpgrade.Install = true
+		_, err = retryUpgrade.Run("sck", chart, values)
+		require.NoError(t, err)
+	}
+
+	waitForAllDeploymentsToStart(t, clientset)
+
+	sckJob, err := os.ReadFile(filepath.Join("testdata", "sck_log_job.yaml"))
+	require.NoError(t, err)
+	resourceYAML := string(sckJob)
+	decode := scheme.Codecs.UniversalDeserializer().Decode
+	obj, _, err := decode(
+		[]byte(resourceYAML),
+		nil,
+		nil,
+	)
+	require.NoError(t, err)
+	job := obj.(*batchv1.Job)
+	jobClient := clientset.BatchV1().Jobs(job.Namespace)
+	_, err = jobClient.Create(context.Background(), job, metav1.CreateOptions{})
+	require.NoError(t, err)
+	t.Logf("Deployed job %s", job.Name)
+
+	waitForAllDeploymentsToStart(t, clientset)
+
+	return func() {
+		// cleanup
+		if os.Getenv("SKIP_TEARDOWN") == "true" {
+			t.Log("Skipping teardown as SKIP_TEARDOWN is set to true")
+			return
+		}
+		waitTime := int64(0)
+		jobClient := clientset.BatchV1().Jobs(job.Namespace)
+		_ = jobClient.Delete(context.Background(), job.Name, metav1.DeleteOptions{
+			GracePeriodSeconds: &waitTime,
+		})
 		uninstall := action.NewUninstall(actionConfig)
 		uninstall.IgnoreNotFound = true
 		uninstall.Wait = true
-		_, _ = uninstall.Run("sock")
-	})
+		_, _ = uninstall.Run("sck")
+	}
 }
 
 func Test_Functions(t *testing.T) {
-	_ = setupOnce(t)
+	_ = setupOnceConsumers(t)
+	setupOnceChart(t)
 	t.Run("node.js traces captured", testNodeJSTraces)
 	t.Run("kubernetes cluster metrics", testK8sClusterReceiverMetrics)
 	t.Run("agent logs", testAgentLogs)
@@ -266,8 +368,29 @@ func Test_Functions(t *testing.T) {
 	t.Run("test agent metrics", testAgentMetrics)
 }
 
+func Test_Migration(t *testing.T) {
+	cleanup := deployChartsAndAppsMigrationSCK(t)
+
+	sckLogs := getMigrationLogs(t)
+	assert.True(t, len(sckLogs) >= 5)
+	cleanup()
+
+	// deploy the sck-otel chart again
+	_, cleanupSock := deployHelm(t)
+	sockLogs := getMigrationLogs(t)
+	assert.True(t, len(sockLogs) >= 5)
+	cleanupSock()
+
+	pr := sockLogs[0]
+
+	for i := 1; i < len(sockLogs); i++ {
+		assert.True(t, sockLogs[i] >= pr) // current entry should be greater than or equal to the previous one.
+		pr = sockLogs[i]
+	}
+}
+
 func testNodeJSTraces(t *testing.T) {
-	tracesConsumer := setupOnce(t).tracesConsumer
+	tracesConsumer := setupOnceConsumers(t).tracesConsumer
 
 	var expectedTraces ptrace.Traces
 	expectedTracesFile := filepath.Join("testdata", "expected_traces.yaml")
@@ -418,7 +541,7 @@ func shortenNames(value string) string {
 }
 
 func testK8sClusterReceiverMetrics(t *testing.T) {
-	metricsConsumer := setupOnce(t).k8sclusterReceiverMetricsConsumer
+	metricsConsumer := setupOnceConsumers(t).k8sclusterReceiverMetricsConsumer
 	expectedMetricsFile := filepath.Join("testdata", "expected_cluster_receiver.yaml")
 	expectedMetrics, err := golden.ReadMetrics(expectedMetricsFile)
 	require.NoError(t, err)
@@ -496,7 +619,7 @@ func testK8sClusterReceiverMetrics(t *testing.T) {
 }
 
 func testAgentLogs(t *testing.T) {
-	logsConsumer := setupOnce(t).logsConsumer
+	logsConsumer := setupOnceConsumers(t).logsConsumer
 	waitForLogs(t, 5, logsConsumer)
 
 	var helloWorldResource pcommon.Resource
@@ -567,7 +690,7 @@ func testAgentLogs(t *testing.T) {
 }
 
 func testK8sObjects(t *testing.T) {
-	logsObjectsConsumer := setupOnce(t).logsObjectsConsumer
+	logsObjectsConsumer := setupOnceConsumers(t).logsObjectsConsumer
 	waitForLogs(t, 5, logsObjectsConsumer)
 
 	var kinds []string
@@ -604,7 +727,7 @@ func testK8sObjects(t *testing.T) {
 }
 
 func testAgentMetrics(t *testing.T) {
-	agentMetricsConsumer := setupOnce(t).agentMetricsConsumer
+	agentMetricsConsumer := setupOnceConsumers(t).agentMetricsConsumer
 
 	metricNames := []string{
 		"container.filesystem.available",
@@ -655,7 +778,7 @@ func testAgentMetrics(t *testing.T) {
 }
 
 func testHECMetrics(t *testing.T) {
-	hecMetricsConsumer := setupOnce(t).hecMetricsConsumer
+	hecMetricsConsumer := setupOnceConsumers(t).hecMetricsConsumer
 
 	metricNames := []string{
 		"container.cpu.time",
@@ -740,6 +863,31 @@ func testHECMetrics(t *testing.T) {
 	checkMetricsAreEmitted(t, hecMetricsConsumer, metricNames)
 }
 
+func getMigrationLogs(t *testing.T) []int {
+	logsConsumer := setupOnceConsumers(t).logsConsumer
+	waitForLogs(t, 5, logsConsumer)
+
+	migrationLogs := make([]int, 0)
+	for i := 0; i < len(logsConsumer.AllLogs()); i++ {
+		l := logsConsumer.AllLogs()[i]
+		for j := 0; j < l.ResourceLogs().Len(); j++ {
+			rl := l.ResourceLogs().At(j)
+			if value, _ := rl.Resource().Attributes().Get("com.splunk.sourcetype"); value.AsString() == "sourcetype-sck-m-anno" {
+				for k := 0; k < rl.ScopeLogs().Len(); k++ {
+					sl := rl.ScopeLogs().At(k)
+					for m := 0; m < sl.LogRecords().Len(); m++ {
+						logRecord := sl.LogRecords().At(m)
+						val, err := strconv.Atoi(logRecord.Body().AsString())
+						require.NoError(t, err)
+						migrationLogs = append(migrationLogs, val)
+					}
+				}
+			}
+		}
+	}
+	return migrationLogs
+}
+
 func waitForAllDeploymentsToStart(t *testing.T, clientset *kubernetes.Clientset) {
 	require.Eventually(t, func() bool {
 		di, err := clientset.AppsV1().Deployments("default").List(context.Background(), metav1.ListOptions{})
diff --git a/functional_tests/testdata/sck_log_job.yaml b/functional_tests/testdata/sck_log_job.yaml
new file mode 100644
index 0000000000..ccfbb4d845
--- /dev/null
+++ b/functional_tests/testdata/sck_log_job.yaml
@@ -0,0 +1,23 @@
+apiVersion: batch/v1
+kind: Job
+metadata:
+  name: pod-sck-index-sck-migration
+  namespace: default
+spec:
+  parallelism: 1
+  template:
+    metadata:
+      labels:
+        app: pod-sck-index-sck-migration
+      annotations:
+        splunk.com/index: "pod-sck-migration"
+        splunk.com/sourcetype: "sourcetype-sck-m-anno"
+    spec:
+      restartPolicy: Never
+      containers:
+        - name: pod-sck-index-sck-migration
+          image: registry.access.redhat.com/ubi9/ubi
+          imagePullPolicy: IfNotPresent
+          securityContext:
+            runAsUser: 0
+          command: ['sh','-c', 'for i in {1..1000}; do echo "${i}"; sleep 5; done']
\ No newline at end of file
diff --git a/functional_tests/testdata/test_values_sck.yaml.tmpl b/functional_tests/testdata/test_values_sck.yaml.tmpl
new file mode 100644
index 0000000000..beeb0e7185
--- /dev/null
+++ b/functional_tests/testdata/test_values_sck.yaml.tmpl
@@ -0,0 +1,23 @@
+---
+global:
+  logLevel: info
+  splunk:
+    hec:
+      protocol: http
+      host: {{ .LogHecHost }}
+      token: foobar
+      port: {{ .LogHecPort }}
+
+  kubernetes:
+    clusterName: "dev_sck"
+
+splunk-kubernetes-logging:
+  enabled: true
+  containers:
+    logFormatType: cri
+
+splunk-kubernetes-metrics:
+  enabled: false
+
+splunk-kubernetes-objects:
+  enabled: false