Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[tests/functional] - Add functional test case covering SCK-SOCK migration #1024

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 163 additions & 15 deletions functional_tests/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"path/filepath"
"regexp"
"runtime"
"strconv"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -79,7 +80,8 @@ const (

var globalSinks *sinks

var setupRun = sync.Once{}
var setupRunConsumers = sync.Once{}
var setupRunChart = sync.Once{}

type sinks struct {
logsConsumer *consumertest.LogsSink
Expand All @@ -90,8 +92,8 @@ type sinks struct {
tracesConsumer *consumertest.TracesSink
}

func setupOnce(t *testing.T) *sinks {
setupRun.Do(func() {
func setupOnceConsumers(t *testing.T) *sinks {
setupRunConsumers.Do(func() {
// create an API server
internal.CreateApiServer(t, apiPort)
// set ingest pipelines
Expand All @@ -104,17 +106,22 @@ func setupOnce(t *testing.T) *sinks {
k8sclusterReceiverMetricsConsumer: setupSignalfxReceiver(t, signalFxReceiverK8sClusterReceiverPort),
tracesConsumer: setupTraces(t),
}
})
return globalSinks
}

// setupOnceChart deploys the helm chart and test applications exactly once
// per test binary. Set SKIP_SETUP=true to skip deployment (e.g. when the
// chart is already installed from a previous run).
func setupOnceChart(t *testing.T) {
	setupRunChart.Do(func() {
		// Deploy the chart and applications.
		if os.Getenv("SKIP_SETUP") == "true" {
			t.Log("Skipping setup as SKIP_SETUP is set to true")
			return
		}
		deployChartsAndApps(t)
	})
}
func deployChartsAndApps(t *testing.T) {

func deployHelm(t *testing.T) (*kubernetes.Clientset, func()) {
kubeConfig, err := clientcmd.BuildConfigFromFlags("", testKubeConfig)
require.NoError(t, err)
clientset, err := kubernetes.NewForConfig(kubeConfig)
Expand Down Expand Up @@ -172,6 +179,18 @@ func deployChartsAndApps(t *testing.T) {

waitForAllDeploymentsToStart(t, clientset)

return clientset, func() {
// perform cleanup
uninstall := action.NewUninstall(actionConfig)
uninstall.IgnoreNotFound = true
uninstall.Wait = true
_, _ = uninstall.Run("sock")
}
}

func deployChartsAndApps(t *testing.T) {
clientset, cleanup := deployHelm(t)

deployments := clientset.AppsV1().Deployments("default")

decode := scheme.Codecs.UniversalDeserializer().Decode
Expand Down Expand Up @@ -249,15 +268,98 @@ func deployChartsAndApps(t *testing.T) {
GracePeriodSeconds: &waitTime,
})
}

cleanup()
})
}

func deployChartsAndAppsMigrationSCK(t *testing.T) func() {
kubeConfig, err := clientcmd.BuildConfigFromFlags("", testKubeConfig)
require.NoError(t, err)
clientset, err := kubernetes.NewForConfig(kubeConfig)
require.NoError(t, err)

chartPath := "/tmp/splunk-connect-for-kubernetes/helm-chart/splunk-connect-for-kubernetes/"
chart, err := loader.Load(chartPath)
require.NoError(t, err)
valuesBytes, err := os.ReadFile(filepath.Join("testdata", "test_values_sck.yaml.tmpl"))
require.NoError(t, err)
replacements := struct {
LogHecHost string
LogHecPort string
}{
hostEndpoint(t),
fmt.Sprintf("%d", hecReceiverPort),
}
tmpl, err := template.New("").Parse(string(valuesBytes))
require.NoError(t, err)
var buf bytes.Buffer
err = tmpl.Execute(&buf, replacements)
require.NoError(t, err)
var values map[string]interface{}
err = yaml.Unmarshal(buf.Bytes(), &values)
require.NoError(t, err)

actionConfig := new(action.Configuration)
if err := actionConfig.Init(kube.GetConfig(testKubeConfig, "", "default"), "default", os.Getenv("HELM_DRIVER"), func(format string, v ...interface{}) {
t.Logf(format+"\n", v)
}); err != nil {
require.NoError(t, err)
}
install := action.NewInstall(actionConfig)
install.Namespace = "default"
install.ReleaseName = "sck"
_, err = install.Run(chart, values)
if err != nil {
t.Logf("error reported during helm install: %v\n", err)
retryUpgrade := action.NewUpgrade(actionConfig)
retryUpgrade.Namespace = "default"
retryUpgrade.Install = true
_, err = retryUpgrade.Run("sck", chart, values)
require.NoError(t, err)
}

waitForAllDeploymentsToStart(t, clientset)

sckJob, err := os.ReadFile(filepath.Join("testdata", "sck_log_job.yaml"))
require.NoError(t, err)
resourceYAML := string(sckJob)
decode := scheme.Codecs.UniversalDeserializer().Decode
obj, _, err := decode(
[]byte(resourceYAML),
nil,
nil,
)
require.NoError(t, err)
job := obj.(*batchv1.Job)
jobClient := clientset.BatchV1().Jobs(job.Namespace)
_, err = jobClient.Create(context.Background(), job, metav1.CreateOptions{})
require.NoError(t, err)
t.Logf("Deployed job %s", job.Name)

waitForAllDeploymentsToStart(t, clientset)

return func() {
// cleanup
if os.Getenv("SKIP_TEARDOWN") == "true" {
t.Log("Skipping teardown as SKIP_TEARDOWN is set to true")
return
}
waitTime := int64(0)
jobClient := clientset.BatchV1().Jobs(job.Namespace)
_ = jobClient.Delete(context.Background(), job.Name, metav1.DeleteOptions{
GracePeriodSeconds: &waitTime,
})
uninstall := action.NewUninstall(actionConfig)
uninstall.IgnoreNotFound = true
uninstall.Wait = true
_, _ = uninstall.Run("sock")
})
_, _ = uninstall.Run("sck")
}
}

func Test_Functions(t *testing.T) {
_ = setupOnce(t)
_ = setupOnceConsumers(t)
setupOnceChart(t)
t.Run("node.js traces captured", testNodeJSTraces)
t.Run("kubernetes cluster metrics", testK8sClusterReceiverMetrics)
t.Run("agent logs", testAgentLogs)
Expand All @@ -266,8 +368,29 @@ func Test_Functions(t *testing.T) {
t.Run("test agent metrics", testAgentMetrics)
}

// Test_Migration validates the SCK -> SOCK migration path: install the
// legacy SCK chart, verify logs arrive, replace it with the SOCK chart, and
// verify logs keep arriving in order.
func Test_Migration(t *testing.T) {
	cleanup := deployChartsAndAppsMigrationSCK(t)

	sckLogs := getMigrationLogs(t)
	// require (not assert): an empty slice would panic on the index below.
	require.GreaterOrEqual(t, len(sckLogs), 5, "expected logs from the SCK chart")
	cleanup()

	// Deploy the sck-otel (SOCK) chart in place of SCK.
	_, cleanupSock := deployHelm(t)
	sockLogs := getMigrationLogs(t)
	require.GreaterOrEqual(t, len(sockLogs), 5, "expected logs from the SOCK chart")
	cleanupSock()

	// The job emits a monotonically increasing counter, so each collected
	// entry must be greater than or equal to the previous one.
	prev := sockLogs[0]
	for _, cur := range sockLogs[1:] {
		assert.GreaterOrEqual(t, cur, prev)
		prev = cur
	}
}

func testNodeJSTraces(t *testing.T) {
tracesConsumer := setupOnce(t).tracesConsumer
tracesConsumer := setupOnceConsumers(t).tracesConsumer

var expectedTraces ptrace.Traces
expectedTracesFile := filepath.Join("testdata", "expected_traces.yaml")
Expand Down Expand Up @@ -418,7 +541,7 @@ func shortenNames(value string) string {
}

func testK8sClusterReceiverMetrics(t *testing.T) {
metricsConsumer := setupOnce(t).k8sclusterReceiverMetricsConsumer
metricsConsumer := setupOnceConsumers(t).k8sclusterReceiverMetricsConsumer
expectedMetricsFile := filepath.Join("testdata", "expected_cluster_receiver.yaml")
expectedMetrics, err := golden.ReadMetrics(expectedMetricsFile)
require.NoError(t, err)
Expand Down Expand Up @@ -496,7 +619,7 @@ func testK8sClusterReceiverMetrics(t *testing.T) {
}

func testAgentLogs(t *testing.T) {
logsConsumer := setupOnce(t).logsConsumer
logsConsumer := setupOnceConsumers(t).logsConsumer
waitForLogs(t, 5, logsConsumer)

var helloWorldResource pcommon.Resource
Expand Down Expand Up @@ -567,7 +690,7 @@ func testAgentLogs(t *testing.T) {
}

func testK8sObjects(t *testing.T) {
logsObjectsConsumer := setupOnce(t).logsObjectsConsumer
logsObjectsConsumer := setupOnceConsumers(t).logsObjectsConsumer
waitForLogs(t, 5, logsObjectsConsumer)

var kinds []string
Expand Down Expand Up @@ -604,7 +727,7 @@ func testK8sObjects(t *testing.T) {
}

func testAgentMetrics(t *testing.T) {
agentMetricsConsumer := setupOnce(t).agentMetricsConsumer
agentMetricsConsumer := setupOnceConsumers(t).agentMetricsConsumer

metricNames := []string{
"container.filesystem.available",
Expand Down Expand Up @@ -655,7 +778,7 @@ func testAgentMetrics(t *testing.T) {
}

func testHECMetrics(t *testing.T) {
hecMetricsConsumer := setupOnce(t).hecMetricsConsumer
hecMetricsConsumer := setupOnceConsumers(t).hecMetricsConsumer

metricNames := []string{
"container.cpu.time",
Expand Down Expand Up @@ -740,6 +863,31 @@ func testHECMetrics(t *testing.T) {
checkMetricsAreEmitted(t, hecMetricsConsumer, metricNames)
}

// getMigrationLogs waits for logs to arrive and returns, in arrival order,
// the integer bodies of all log records whose resource carries the
// migration job's sourcetype annotation ("sourcetype-sck-m-anno"). The test
// fails if a matching record body does not parse as an integer.
func getMigrationLogs(t *testing.T) []int {
	logsConsumer := setupOnceConsumers(t).logsConsumer
	waitForLogs(t, 5, logsConsumer)

	var migrationLogs []int
	// Fetch the collected batches once instead of calling AllLogs() on
	// every loop iteration.
	for _, l := range logsConsumer.AllLogs() {
		for j := 0; j < l.ResourceLogs().Len(); j++ {
			rl := l.ResourceLogs().At(j)
			value, ok := rl.Resource().Attributes().Get("com.splunk.sourcetype")
			if !ok || value.AsString() != "sourcetype-sck-m-anno" {
				continue
			}
			for k := 0; k < rl.ScopeLogs().Len(); k++ {
				sl := rl.ScopeLogs().At(k)
				for m := 0; m < sl.LogRecords().Len(); m++ {
					val, err := strconv.Atoi(sl.LogRecords().At(m).Body().AsString())
					require.NoError(t, err)
					migrationLogs = append(migrationLogs, val)
				}
			}
		}
	}
	return migrationLogs
}

func waitForAllDeploymentsToStart(t *testing.T, clientset *kubernetes.Clientset) {
require.Eventually(t, func() bool {
di, err := clientset.AppsV1().Deployments("default").List(context.Background(), metav1.ListOptions{})
Expand Down
23 changes: 23 additions & 0 deletions functional_tests/testdata/sck_log_job.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Kubernetes Job used by the SCK -> SOCK migration functional test.
# It emits sequentially numbered log lines (1..1000, one every 5 seconds)
# annotated with a dedicated Splunk index/sourcetype so the test can filter
# and verify log ordering across the chart migration.
apiVersion: batch/v1
kind: Job
metadata:
  name: pod-sck-index-sck-migration
  namespace: default
spec:
  parallelism: 1
  template:
    metadata:
      labels:
        app: pod-sck-index-sck-migration
      annotations:
        # Routed by the logging chart into this index/sourcetype; the test
        # selects migration records by this sourcetype value.
        splunk.com/index: "pod-sck-migration"
        splunk.com/sourcetype: "sourcetype-sck-m-anno"
    spec:
      restartPolicy: Never
      containers:
        - name: pod-sck-index-sck-migration
          image: registry.access.redhat.com/ubi9/ubi
          imagePullPolicy: IfNotPresent
          securityContext:
            runAsUser: 0
          command: ['sh','-c', 'for i in {1..1000}; do echo "${i}"; sleep 5; done']
23 changes: 23 additions & 0 deletions functional_tests/testdata/test_values_sck.yaml.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
---
# Helm values template for the legacy Splunk Connect for Kubernetes (SCK)
# chart, used by the SCK -> SOCK migration test. LogHecHost/LogHecPort are
# substituted at test time to point log ingestion at the test HEC receiver.
global:
  logLevel: info
  splunk:
    hec:
      protocol: http
      host: {{ .LogHecHost }}
      token: foobar
      port: {{ .LogHecPort }}

  kubernetes:
    clusterName: "dev_sck"

# Only the logging sub-chart is exercised; metrics and objects collection
# are disabled for this test.
splunk-kubernetes-logging:
  enabled: true
  containers:
    logFormatType: cri

splunk-kubernetes-metrics:
  enabled: false

splunk-kubernetes-objects:
  enabled: false
Loading