Skip to content

Commit

Permalink
Migrate more system tests to Go (#4469)
Browse files Browse the repository at this point in the history
* systemtest: migrate profiling tests to Go

* systemtest: migrate remaining instrumentation

* systemtest: migrate another sampling-related test

* systemtest: migrate some API Key command tests

* tests/system/config: drop instrumentation bits
  • Loading branch information
axw authored Dec 15, 2020
1 parent 750ea89 commit 1fd28d8
Show file tree
Hide file tree
Showing 9 changed files with 283 additions and 524 deletions.
20 changes: 20 additions & 0 deletions systemtest/apikeycmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ import (

func apiKeyCommand(subcommand string, args ...string) *apmservertest.ServerCmd {
cfg := apmservertest.DefaultConfig()
return apiKeyCommandConfig(cfg, subcommand, args...)
}

func apiKeyCommandConfig(cfg apmservertest.Config, subcommand string, args ...string) *apmservertest.ServerCmd {
cfgargs, err := cfg.Args()
if err != nil {
panic(err)
Expand Down Expand Up @@ -86,6 +90,22 @@ func TestAPIKeyCreateExpiration(t *testing.T) {
assert.Contains(t, attrs, "expiration")
}

func TestAPIKeyCreateInvalidUser(t *testing.T) {
// heartbeat_user lacks cluster privileges, and cannot create keys
// beats_user has cluster privileges, but not APM application privileges
for _, username := range []string{"heartbeat_user", "beats_user"} {
cfg := apmservertest.DefaultConfig()
cfg.Output.Elasticsearch.Username = username
cfg.Output.Elasticsearch.Password = "changeme"

cmd := apiKeyCommandConfig(cfg, "create", "--name", t.Name(), "--json")
out, err := cmd.CombinedOutput()
require.Error(t, err)
attrs := decodeJSONMap(t, bytes.NewReader(out))
assert.Regexp(t, username+` is missing the following requested privilege\(s\): .*`, attrs["error"])
}
}

func TestAPIKeyInvalidateName(t *testing.T) {
systemtest.InvalidateAPIKeys(t)
defer systemtest.InvalidateAPIKeys(t)
Expand Down
101 changes: 80 additions & 21 deletions systemtest/apmservertest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Config struct {
Sampling *SamplingConfig `json:"apm-server.sampling,omitempty"`
RUM *RUMConfig `json:"apm-server.rum,omitempty"`
DataStreams *DataStreamsConfig `json:"apm-server.data_streams,omitempty"`
APIKey *APIKeyConfig `json:"apm-server.api_key,omitempty"`

// ResponseHeaders holds headers to add to all APM Server HTTP responses.
ResponseHeaders http.Header `json:"apm-server.response_headers,omitempty"`
Expand Down Expand Up @@ -125,12 +126,12 @@ func (t *TailSamplingConfig) MarshalJSON() ([]byte, error) {
// Convert time.Durations to durations, to encode as duration strings.
type config struct {
Enabled bool `json:"enabled"`
Interval duration `json:"interval"`
Interval string `json:"interval"`
Policies []TailSamplingPolicy `json:"policies,omitempty"`
}
return json.Marshal(config{
Enabled: t.Enabled,
Interval: duration(t.Interval),
Interval: durationString(t.Interval),
Policies: t.Policies,
})
}
Expand All @@ -157,9 +158,66 @@ type DataStreamsConfig struct {
Enabled bool `json:"enabled"`
}

// APIKeyConfig holds APM Server API Key auth configuration.
type APIKeyConfig struct {
Enabled bool `json:"enabled"`
}

// InstrumentationConfig holds APM Server instrumentation configuration.
type InstrumentationConfig struct {
Enabled bool `json:"enabled"`
Enabled bool `json:"enabled"`
Profiling *ProfilingConfig `json:"profiling,omitempty"`

Hosts []string `json:"hosts,omitempty"`
APIKey string `json:"api_key,omitempty"`
SecretToken string `json:"secret_token,omitempty"`
}

// ProfilingConfig holds APM Server profiling configuration.
type ProfilingConfig struct {
CPU *CPUProfilingConfig `json:"cpu,omitempty"`
Heap *HeapProfilingConfig `json:"heap,omitempty"`
}

// CPUProfilingConfig holds APM Server profiling configuration.
type CPUProfilingConfig struct {
Enabled bool `json:"enabled"`
Interval time.Duration `json:"interval,omitempty"`
Duration time.Duration `json:"duration,omitempty"`
}

func (c *CPUProfilingConfig) MarshalJSON() ([]byte, error) {
// time.Duration is encoded as int64.
// Convert time.Durations to durations, to encode as duration strings.
type config struct {
Enabled bool `json:"enabled"`
Interval string `json:"interval,omitempty"`
Duration string `json:"duration,omitempty"`
}
return json.Marshal(config{
Enabled: c.Enabled,
Interval: durationString(c.Interval),
Duration: durationString(c.Duration),
})
}

// HeapProfilingConfig holds APM Server profiling configuration.
type HeapProfilingConfig struct {
Enabled bool `json:"enabled"`
Interval time.Duration `json:"interval,omitempty"`
}

func (c *HeapProfilingConfig) MarshalJSON() ([]byte, error) {
// time.Duration is encoded as int64.
// Convert time.Durations to durations, to encode as duration strings.
type config struct {
Enabled bool `json:"enabled"`
Interval string `json:"interval,omitempty"`
}
return json.Marshal(config{
Enabled: c.Enabled,
Interval: durationString(c.Interval),
})
}

// OutputConfig holds APM Server libbeat output configuration.
Expand Down Expand Up @@ -203,14 +261,14 @@ func (m *MemoryQueueConfig) MarshalJSON() ([]byte, error) {
// time.Duration is encoded as int64.
// Convert time.Durations to durations, to encode as duration strings.
type config struct {
Events int `json:"events"`
FlushMinEvents int `json:"flush.min_events"`
FlushTimeout duration `json:"flush.timeout"`
Events int `json:"events"`
FlushMinEvents int `json:"flush.min_events"`
FlushTimeout string `json:"flush.timeout,omitempty"`
}
return json.Marshal(config{
Events: m.Events,
FlushMinEvents: m.FlushMinEvents,
FlushTimeout: duration(m.FlushTimeout),
FlushTimeout: durationString(m.FlushTimeout),
})
}

Expand All @@ -228,14 +286,14 @@ func (m *MonitoringConfig) MarshalJSON() ([]byte, error) {
type config struct {
Enabled bool `json:"enabled"`
Elasticsearch *ElasticsearchOutputConfig `json:"elasticsearch,omitempty"`
MetricsPeriod duration `json:"elasticsearch.metrics.period,omitempty"`
StatePeriod duration `json:"elasticsearch.state.period,omitempty"`
MetricsPeriod string `json:"elasticsearch.metrics.period,omitempty"`
StatePeriod string `json:"elasticsearch.state.period,omitempty"`
}
return json.Marshal(config{
Enabled: m.Enabled,
Elasticsearch: m.Elasticsearch,
MetricsPeriod: duration(m.MetricsPeriod),
StatePeriod: duration(m.StatePeriod),
MetricsPeriod: durationString(m.MetricsPeriod),
StatePeriod: durationString(m.StatePeriod),
})
}

Expand All @@ -255,12 +313,12 @@ func (m *TransactionAggregationConfig) MarshalJSON() ([]byte, error) {
// time.Duration is encoded as int64.
// Convert time.Durations to durations, to encode as duration strings.
type config struct {
Enabled bool `json:"enabled"`
Interval duration `json:"interval,omitempty"`
Enabled bool `json:"enabled"`
Interval string `json:"interval,omitempty"`
}
return json.Marshal(config{
Enabled: m.Enabled,
Interval: duration(m.Interval),
Interval: durationString(m.Interval),
})
}

Expand All @@ -274,19 +332,20 @@ func (s *ServiceDestinationAggregationConfig) MarshalJSON() ([]byte, error) {
// time.Duration is encoded as int64.
// Convert time.Durations to durations, to encode as duration strings.
type config struct {
Enabled bool `json:"enabled"`
Interval duration `json:"interval,omitempty"`
Enabled bool `json:"enabled"`
Interval string `json:"interval,omitempty"`
}
return json.Marshal(config{
Enabled: s.Enabled,
Interval: duration(s.Interval),
Interval: durationString(s.Interval),
})
}

type duration time.Duration

func (d duration) MarshalText() (text []byte, err error) {
return []byte(time.Duration(d).String()), nil
func durationString(d time.Duration) string {
if d == 0 {
return ""
}
return d.String()
}

func configArgs(cfg Config, extra map[string]interface{}) ([]string, error) {
Expand Down
166 changes: 166 additions & 0 deletions systemtest/instrumentation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,16 @@
package systemtest_test

import (
"bytes"
"encoding/json"
"net/http"
"net/http/httptest"
"net/http/httputil"
"net/url"
"sort"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -83,3 +91,161 @@ func TestAPMServerInstrumentation(t *testing.T) {
}
t.Fatal("failed to identify log message with matching trace IDs")
}

func TestAPMServerInstrumentationAuth(t *testing.T) {
test := func(t *testing.T, external, useSecretToken, useAPIKey bool) {
systemtest.CleanupElasticsearch(t)
srv := apmservertest.NewUnstartedServer(t)
srv.Config.SecretToken = "hunter2"
srv.Config.APIKey = &apmservertest.APIKeyConfig{Enabled: true}
srv.Config.Instrumentation = &apmservertest.InstrumentationConfig{Enabled: true}

serverURLChan := make(chan string, 1)
if external {
// The server URL is not known ahead of time, so we run
// a reverse proxy which waits for the server URL.
var serverURL string
var serverURLOnce sync.Once
proxy := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
serverURLOnce.Do(func() {
select {
case <-r.Context().Done():
case serverURL = <-serverURLChan:
}
})
u, err := url.Parse(serverURL)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
rp := httputil.NewSingleHostReverseProxy(u)
rp.ServeHTTP(w, r)
}))
defer proxy.Close()
srv.Config.Instrumentation.Hosts = []string{proxy.URL}
}
if useSecretToken {
srv.Config.Instrumentation.SecretToken = srv.Config.SecretToken
}
if useAPIKey {
systemtest.InvalidateAPIKeys(t)
defer systemtest.InvalidateAPIKeys(t)

cmd := apiKeyCommand("create", "--name", t.Name(), "--json")
out, err := cmd.CombinedOutput()
require.NoError(t, err)
attrs := decodeJSONMap(t, bytes.NewReader(out))
srv.Config.Instrumentation.APIKey = attrs["credentials"].(string)
}

err := srv.Start()
require.NoError(t, err)
serverURLChan <- srv.URL

// Send a transaction to the server, causing the server to
// trace the request from the agent.
tracer := srv.Tracer()
tracer.StartTransaction("name", "type").End()
tracer.Flush(nil)

systemtest.Elasticsearch.ExpectDocs(t, "apm-*", estest.BoolQuery{
Filter: []interface{}{
estest.TermQuery{
Field: "processor.event",
Value: "transaction",
},
estest.TermQuery{
Field: "service.name",
Value: "apm-server",
},
estest.TermQuery{
Field: "transaction.type",
Value: "request",
},
},
})
}
t.Run("self_no_auth", func(t *testing.T) {
// sending data to self, no auth specified
test(t, false, false, false)
})
t.Run("external_secret_token", func(t *testing.T) {
// sending data to external server, secret token specified
test(t, true, true, false)
})
t.Run("external_api_key", func(t *testing.T) {
// sending data to external server, API Key specified
test(t, true, false, true)
})
}

func TestAPMServerProfiling(t *testing.T) {
test := func(t *testing.T, profilingConfig *apmservertest.ProfilingConfig, expectedMetrics []string) {
systemtest.CleanupElasticsearch(t)
srv := apmservertest.NewUnstartedServer(t)
srv.Config.Instrumentation = &apmservertest.InstrumentationConfig{
Enabled: true,
Profiling: profilingConfig,
}
err := srv.Start()
require.NoError(t, err)

// Generate some load to cause the server to consume resources.
tracer := srv.Tracer()
for i := 0; i < 1000; i++ {
tracer.StartTransaction("name", "type").End()
}
tracer.Flush(nil)

result := systemtest.Elasticsearch.ExpectDocs(t, "apm-*", estest.TermQuery{
Field: "processor.event",
Value: "profile",
})
assert.Equal(t, expectedMetrics, profileMetricNames(result))
}
t.Run("cpu", func(t *testing.T) {
test(t, &apmservertest.ProfilingConfig{
CPU: &apmservertest.CPUProfilingConfig{
Enabled: true,
Interval: time.Second,
Duration: time.Second,
},
}, []string{"cpu.ns", "duration", "samples.count"})
})
t.Run("heap", func(t *testing.T) {
test(t, &apmservertest.ProfilingConfig{
Heap: &apmservertest.HeapProfilingConfig{
Enabled: true,
Interval: time.Second,
},
}, []string{
"alloc_objects.count",
"alloc_space.bytes",
"inuse_objects.count",
"inuse_space.bytes",
})
})
}

func profileMetricNames(result estest.SearchResult) []string {
unique := make(map[string]struct{})
var metricNames []string
for _, hit := range result.Hits.Hits {
profileField, ok := hit.Source["profile"].(map[string]interface{})
if !ok {
continue
}
for k, v := range profileField {
if _, ok := v.(float64); !ok {
continue
}
if _, ok := unique[k]; ok {
continue
}
unique[k] = struct{}{}
metricNames = append(metricNames, k)
}
}
sort.Strings(metricNames)
return metricNames
}
Loading

0 comments on commit 1fd28d8

Please sign in to comment.