diff --git a/systemtest/apikeycmd_test.go b/systemtest/apikeycmd_test.go index 4c59762ee33..1d08f5681da 100644 --- a/systemtest/apikeycmd_test.go +++ b/systemtest/apikeycmd_test.go @@ -37,6 +37,10 @@ import ( func apiKeyCommand(subcommand string, args ...string) *apmservertest.ServerCmd { cfg := apmservertest.DefaultConfig() + return apiKeyCommandConfig(cfg, subcommand, args...) +} + +func apiKeyCommandConfig(cfg apmservertest.Config, subcommand string, args ...string) *apmservertest.ServerCmd { cfgargs, err := cfg.Args() if err != nil { panic(err) @@ -86,6 +90,22 @@ func TestAPIKeyCreateExpiration(t *testing.T) { assert.Contains(t, attrs, "expiration") } +func TestAPIKeyCreateInvalidUser(t *testing.T) { + // heartbeat_user lacks cluster privileges, and cannot create keys + // beats_user has cluster privileges, but not APM application privileges + for _, username := range []string{"heartbeat_user", "beats_user"} { + cfg := apmservertest.DefaultConfig() + cfg.Output.Elasticsearch.Username = username + cfg.Output.Elasticsearch.Password = "changeme" + + cmd := apiKeyCommandConfig(cfg, "create", "--name", t.Name(), "--json") + out, err := cmd.CombinedOutput() + require.Error(t, err) + attrs := decodeJSONMap(t, bytes.NewReader(out)) + assert.Regexp(t, username+` is missing the following requested privilege\(s\): .*`, attrs["error"]) + } +} + func TestAPIKeyInvalidateName(t *testing.T) { systemtest.InvalidateAPIKeys(t) defer systemtest.InvalidateAPIKeys(t) diff --git a/systemtest/apmservertest/config.go b/systemtest/apmservertest/config.go index 25c3fc4a284..9f5e63f8bd7 100644 --- a/systemtest/apmservertest/config.go +++ b/systemtest/apmservertest/config.go @@ -49,6 +49,7 @@ type Config struct { Sampling *SamplingConfig `json:"apm-server.sampling,omitempty"` RUM *RUMConfig `json:"apm-server.rum,omitempty"` DataStreams *DataStreamsConfig `json:"apm-server.data_streams,omitempty"` + APIKey *APIKeyConfig `json:"apm-server.api_key,omitempty"` // ResponseHeaders holds headers to add to all APM Server HTTP responses. ResponseHeaders http.Header `json:"apm-server.response_headers,omitempty"` @@ -125,12 +126,12 @@ func (t *TailSamplingConfig) MarshalJSON() ([]byte, error) { // Convert time.Durations to durations, to encode as duration strings. type config struct { Enabled bool `json:"enabled"` - Interval duration `json:"interval"` + Interval string `json:"interval"` Policies []TailSamplingPolicy `json:"policies,omitempty"` } return json.Marshal(config{ Enabled: t.Enabled, - Interval: duration(t.Interval), + Interval: durationString(t.Interval), Policies: t.Policies, }) } @@ -157,9 +158,66 @@ type DataStreamsConfig struct { Enabled bool `json:"enabled"` } +// APIKeyConfig holds APM Server API Key auth configuration. +type APIKeyConfig struct { + Enabled bool `json:"enabled"` +} + // InstrumentationConfig holds APM Server instrumentation configuration. type InstrumentationConfig struct { - Enabled bool `json:"enabled"` + Enabled bool `json:"enabled"` + Profiling *ProfilingConfig `json:"profiling,omitempty"` + + Hosts []string `json:"hosts,omitempty"` + APIKey string `json:"api_key,omitempty"` + SecretToken string `json:"secret_token,omitempty"` +} + +// ProfilingConfig holds APM Server profiling configuration. +type ProfilingConfig struct { + CPU *CPUProfilingConfig `json:"cpu,omitempty"` + Heap *HeapProfilingConfig `json:"heap,omitempty"` +} + +// CPUProfilingConfig holds APM Server profiling configuration. +type CPUProfilingConfig struct { + Enabled bool `json:"enabled"` + Interval time.Duration `json:"interval,omitempty"` + Duration time.Duration `json:"duration,omitempty"` +} + +func (c *CPUProfilingConfig) MarshalJSON() ([]byte, error) { + // time.Duration is encoded as int64. + // Convert time.Durations to durations, to encode as duration strings. + type config struct { + Enabled bool `json:"enabled"` + Interval string `json:"interval,omitempty"` + Duration string `json:"duration,omitempty"` + } + return json.Marshal(config{ + Enabled: c.Enabled, + Interval: durationString(c.Interval), + Duration: durationString(c.Duration), + }) +} + +// HeapProfilingConfig holds APM Server profiling configuration. +type HeapProfilingConfig struct { + Enabled bool `json:"enabled"` + Interval time.Duration `json:"interval,omitempty"` +} + +func (c *HeapProfilingConfig) MarshalJSON() ([]byte, error) { + // time.Duration is encoded as int64. + // Convert time.Durations to durations, to encode as duration strings. + type config struct { + Enabled bool `json:"enabled"` + Interval string `json:"interval,omitempty"` + } + return json.Marshal(config{ + Enabled: c.Enabled, + Interval: durationString(c.Interval), + }) } // OutputConfig holds APM Server libbeat output configuration. @@ -203,14 +261,14 @@ func (m *MemoryQueueConfig) MarshalJSON() ([]byte, error) { // time.Duration is encoded as int64. // Convert time.Durations to durations, to encode as duration strings. type config struct { - Events int `json:"events"` - FlushMinEvents int `json:"flush.min_events"` - FlushTimeout duration `json:"flush.timeout"` + Events int `json:"events"` + FlushMinEvents int `json:"flush.min_events"` + FlushTimeout string `json:"flush.timeout,omitempty"` } return json.Marshal(config{ Events: m.Events, FlushMinEvents: m.FlushMinEvents, - FlushTimeout: duration(m.FlushTimeout), + FlushTimeout: durationString(m.FlushTimeout), }) } @@ -228,14 +286,14 @@ func (m *MonitoringConfig) MarshalJSON() ([]byte, error) { type config struct { Enabled bool `json:"enabled"` Elasticsearch *ElasticsearchOutputConfig `json:"elasticsearch,omitempty"` - MetricsPeriod duration `json:"elasticsearch.metrics.period,omitempty"` - StatePeriod duration `json:"elasticsearch.state.period,omitempty"` + MetricsPeriod string `json:"elasticsearch.metrics.period,omitempty"` + StatePeriod string `json:"elasticsearch.state.period,omitempty"` } return json.Marshal(config{ Enabled: m.Enabled, Elasticsearch: m.Elasticsearch, - MetricsPeriod: duration(m.MetricsPeriod), - StatePeriod: duration(m.StatePeriod), + MetricsPeriod: durationString(m.MetricsPeriod), + StatePeriod: durationString(m.StatePeriod), }) } @@ -255,12 +313,12 @@ func (m *TransactionAggregationConfig) MarshalJSON() ([]byte, error) { // time.Duration is encoded as int64. // Convert time.Durations to durations, to encode as duration strings. type config struct { - Enabled bool `json:"enabled"` - Interval duration `json:"interval,omitempty"` + Enabled bool `json:"enabled"` + Interval string `json:"interval,omitempty"` } return json.Marshal(config{ Enabled: m.Enabled, - Interval: duration(m.Interval), + Interval: durationString(m.Interval), }) } @@ -274,19 +332,20 @@ func (s *ServiceDestinationAggregationConfig) MarshalJSON() ([]byte, error) { // time.Duration is encoded as int64. // Convert time.Durations to durations, to encode as duration strings. type config struct { - Enabled bool `json:"enabled"` - Interval duration `json:"interval,omitempty"` + Enabled bool `json:"enabled"` + Interval string `json:"interval,omitempty"` } return json.Marshal(config{ Enabled: s.Enabled, - Interval: duration(s.Interval), + Interval: durationString(s.Interval), }) } -type duration time.Duration - -func (d duration) MarshalText() (text []byte, err error) { - return []byte(time.Duration(d).String()), nil +func durationString(d time.Duration) string { + if d == 0 { + return "" + } + return d.String() } func configArgs(cfg Config, extra map[string]interface{}) ([]string, error) { diff --git a/systemtest/instrumentation_test.go b/systemtest/instrumentation_test.go index d7c6d51e4b3..52d4a65a90a 100644 --- a/systemtest/instrumentation_test.go +++ b/systemtest/instrumentation_test.go @@ -18,8 +18,16 @@ package systemtest_test import ( + "bytes" "encoding/json" + "net/http" + "net/http/httptest" + "net/http/httputil" + "net/url" + "sort" + "sync" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -83,3 +91,161 @@ func TestAPMServerInstrumentation(t *testing.T) { } t.Fatal("failed to identify log message with matching trace IDs") } + +func TestAPMServerInstrumentationAuth(t *testing.T) { + test := func(t *testing.T, external, useSecretToken, useAPIKey bool) { + systemtest.CleanupElasticsearch(t) + srv := apmservertest.NewUnstartedServer(t) + srv.Config.SecretToken = "hunter2" + srv.Config.APIKey = &apmservertest.APIKeyConfig{Enabled: true} + srv.Config.Instrumentation = &apmservertest.InstrumentationConfig{Enabled: true} + + serverURLChan := make(chan string, 1) + if external { + // The server URL is not known ahead of time, so we run + // a reverse proxy which waits for the server URL. + var serverURL string + var serverURLOnce sync.Once + proxy := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + serverURLOnce.Do(func() { + select { + case <-r.Context().Done(): + case serverURL = <-serverURLChan: + } + }) + u, err := url.Parse(serverURL) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + rp := httputil.NewSingleHostReverseProxy(u) + rp.ServeHTTP(w, r) + })) + defer proxy.Close() + srv.Config.Instrumentation.Hosts = []string{proxy.URL} + } + if useSecretToken { + srv.Config.Instrumentation.SecretToken = srv.Config.SecretToken + } + if useAPIKey { + systemtest.InvalidateAPIKeys(t) + defer systemtest.InvalidateAPIKeys(t) + + cmd := apiKeyCommand("create", "--name", t.Name(), "--json") + out, err := cmd.CombinedOutput() + require.NoError(t, err) + attrs := decodeJSONMap(t, bytes.NewReader(out)) + srv.Config.Instrumentation.APIKey = attrs["credentials"].(string) + } + + err := srv.Start() + require.NoError(t, err) + serverURLChan <- srv.URL + + // Send a transaction to the server, causing the server to + // trace the request from the agent. + tracer := srv.Tracer() + tracer.StartTransaction("name", "type").End() + tracer.Flush(nil) + + systemtest.Elasticsearch.ExpectDocs(t, "apm-*", estest.BoolQuery{ + Filter: []interface{}{ + estest.TermQuery{ + Field: "processor.event", + Value: "transaction", + }, + estest.TermQuery{ + Field: "service.name", + Value: "apm-server", + }, + estest.TermQuery{ + Field: "transaction.type", + Value: "request", + }, + }, + }) + } + t.Run("self_no_auth", func(t *testing.T) { + // sending data to self, no auth specified + test(t, false, false, false) + }) + t.Run("external_secret_token", func(t *testing.T) { + // sending data to external server, secret token specified + test(t, true, true, false) + }) + t.Run("external_api_key", func(t *testing.T) { + // sending data to external server, API Key specified + test(t, true, false, true) + }) +} + +func TestAPMServerProfiling(t *testing.T) { + test := func(t *testing.T, profilingConfig *apmservertest.ProfilingConfig, expectedMetrics []string) { + systemtest.CleanupElasticsearch(t) + srv := apmservertest.NewUnstartedServer(t) + srv.Config.Instrumentation = &apmservertest.InstrumentationConfig{ + Enabled: true, + Profiling: profilingConfig, + } + err := srv.Start() + require.NoError(t, err) + + // Generate some load to cause the server to consume resources. + tracer := srv.Tracer() + for i := 0; i < 1000; i++ { + tracer.StartTransaction("name", "type").End() + } + tracer.Flush(nil) + + result := systemtest.Elasticsearch.ExpectDocs(t, "apm-*", estest.TermQuery{ + Field: "processor.event", + Value: "profile", + }) + assert.Equal(t, expectedMetrics, profileMetricNames(result)) + } + t.Run("cpu", func(t *testing.T) { + test(t, &apmservertest.ProfilingConfig{ + CPU: &apmservertest.CPUProfilingConfig{ + Enabled: true, + Interval: time.Second, + Duration: time.Second, + }, + }, []string{"cpu.ns", "duration", "samples.count"}) + }) + t.Run("heap", func(t *testing.T) { + test(t, &apmservertest.ProfilingConfig{ + Heap: &apmservertest.HeapProfilingConfig{ + Enabled: true, + Interval: time.Second, + }, + }, []string{ + "alloc_objects.count", + "alloc_space.bytes", + "inuse_objects.count", + "inuse_space.bytes", + }) + }) +} + +func profileMetricNames(result estest.SearchResult) []string { + unique := make(map[string]struct{}) + var metricNames []string + for _, hit := range result.Hits.Hits { + profileField, ok := hit.Source["profile"].(map[string]interface{}) + if !ok { + continue + } + for k, v := range profileField { + if _, ok := v.(float64); !ok { + continue + } + if _, ok := unique[k]; ok { + continue + } + unique[k] = struct{}{} + metricNames = append(metricNames, k) + } + } + sort.Strings(metricNames) + return metricNames +} diff --git a/systemtest/sampling_test.go b/systemtest/sampling_test.go index f407a2d5629..401c2f9047e 100644 --- a/systemtest/sampling_test.go +++ b/systemtest/sampling_test.go @@ -68,6 +68,23 @@ func TestKeepUnsampled(t *testing.T) { } } +func TestKeepUnsampledWarning(t *testing.T) { + systemtest.CleanupElasticsearch(t) + srv := apmservertest.NewUnstartedServer(t) + srv.Config.Sampling = &apmservertest.SamplingConfig{KeepUnsampled: false} + require.NoError(t, srv.Start()) + require.NoError(t, srv.Close()) + + var messages []string + for _, log := range srv.Logs.All() { + messages = append(messages, log.Message) + } + assert.Contains(t, messages, ""+ + "apm-server.sampling.keep_unsampled and apm-server.aggregation.transactions.enabled are both false, "+ + "which will lead to incorrect metrics being reported in the APM UI", + ) +} + func TestTailSampling(t *testing.T) { systemtest.CleanupElasticsearch(t) diff --git a/tests/system/config/apm-server.yml.j2 b/tests/system/config/apm-server.yml.j2 index a431e033bc7..1723202193e 100644 --- a/tests/system/config/apm-server.yml.j2 +++ b/tests/system/config/apm-server.yml.j2 @@ -109,34 +109,6 @@ apm-server: register.ingest.pipeline.overwrite: {{ register_pipeline_overwrite }} {% endif %} - {% if instrumentation_enabled %} - instrumentation.enabled: {{ instrumentation_enabled }} - {% endif %} - {% if instrumentation_host %} - instrumentation.hosts: [{{ instrumentation_host }}] - {% endif %} - {% if instrumentation_api_key %} - instrumentation.api_key: {{ instrumentation_api_key }} - {% endif %} - {% if instrumentation_secret_token %} - instrumentation.secret_token: {{ instrumentation_secret_token }} - {% endif %} - {% if profiling_cpu_enabled %} - instrumentation.profiling.cpu.enabled: {{ profiling_cpu_enabled }} - {% endif %} - {% if profiling_cpu_interval %} - instrumentation.profiling.cpu.interval: {{ profiling_cpu_interval }} - {% endif %} - {% if profiling_cpu_duration %} - instrumentation.profiling.cpu.duration: {{ profiling_cpu_duration }} - {% endif %} - {% if profiling_heap_enabled %} - instrumentation.profiling.heap.enabled: {{ profiling_heap_enabled }} - {% endif %} - {% if profiling_heap_interval %} - instrumentation.profiling.heap.interval: {{ profiling_heap_interval }} - {% endif %} - {% if aggregation_enabled %} aggregation.transactions.enabled: {{ aggregation_enabled }} {% endif %} @@ -207,37 +179,6 @@ apm-server: {% endif %} {% if acm_cache_expiration is not none %} agent.config.cache.expiration: {{ acm_cache_expiration }}{% endif %} - -################### Libbeat instrumentation ############################### -{% if libbeat_instrumentation_enabled %} -instrumentation.enabled: {{ libbeat_instrumentation_enabled }} -{% endif %} -{% if libbeat_instrumentation_host %} -instrumentation.hosts: [{{ libbeat_instrumentation_host }}] -{% endif %} -{% if libbeat_instrumentation_api_key %} -instrumentation.api_key: {{ libbeat_instrumentation_api_key }} -{% endif %} -{% if libbeat_instrumentation_secret_token %} -instrumentation.secret_token: {{ libbeat_instrumentation_secret_token }} -{% endif %} -{% if libbeat_profiling_cpu_enabled %} -instrumentation.profiling.cpu.enabled: {{ libbeat_profiling_cpu_enabled }} -{% endif %} -{% if libbeat_profiling_cpu_interval %} -instrumentation.profiling.cpu.interval: {{ libbeat_profiling_cpu_interval }} -{% endif %} -{% if libbeat_profiling_cpu_duration %} -instrumentation.profiling.cpu.duration: {{ libbeat_profiling_cpu_duration }} -{% endif %} -{% if libbeat_profiling_heap_enabled %} -instrumentation.profiling.heap.enabled: {{ libbeat_profiling_heap_enabled }} -{% endif %} -{% if libbeat_profiling_heap_interval %} -instrumentation.profiling.heap.interval: {{ libbeat_profiling_heap_interval }} -{% endif %} - - ############################# Setup ########################################## {% if override_template %} diff --git a/tests/system/test_apikey_cmd.py b/tests/system/test_apikey_cmd.py index edcac357b82..ae8530a6daa 100644 --- a/tests/system/test_apikey_cmd.py +++ b/tests/system/test_apikey_cmd.py @@ -186,38 +186,3 @@ def test_verify_each(self): apikey = self.create("--agent-config") result = self.subcommand_output("verify", "--credentials={}".format(apikey["credentials"])) assert result == {'event:write': False, 'config_agent:read': True, 'sourcemap:write': False}, result - - -@integration_test -class APIKeyCommandBadUserTest(APIKeyCommandBaseTest): - - def config(self): - return { - "elasticsearch_host": self.get_elasticsearch_url(user="heartbeat_user", password="changeme"), - "file_enabled": "false", - "kibana_enabled": "false", - } - - def test_create_bad_user(self): - """heartbeat_user doesn't have required cluster privileges, so it can't create keys""" - result = self.subcommand_output("create", "--name", self.apikey_name, exit_code=1) - assert result.get("error") is not None - - -@integration_test -class APIKeyCommandBadUser2Test(APIKeyCommandBaseTest): - - def config(self): - return { - "elasticsearch_host": self.get_elasticsearch_url(user="beats_user", password="changeme"), - "file_enabled": "false", - "kibana_enabled": "false", - } - - def test_create_bad_user(self): - """beats_user does have required cluster privileges, but not APM application privileges, - so it can't create keys - """ - result = self.subcommand_output("create", "--name", self.apikey_name, exit_code=1) - assert result.get("error") is not None, result - assert "beats_user is missing the following requested privilege(s):" in result.get("error"), result diff --git a/tests/system/test_instrumentation.py b/tests/system/test_instrumentation.py deleted file mode 100644 index 42bd821ffdc..00000000000 --- a/tests/system/test_instrumentation.py +++ /dev/null @@ -1,177 +0,0 @@ -from datetime import datetime, timedelta -import os -import time -import requests - -from apmserver import integration_test -from apmserver import ElasticTest -from test_auth import APIKeyBaseTest -from helper import wait_until -from es_helper import index_profile, index_transaction - -# Set ELASTIC_APM_API_REQUEST_TIME to a short duration -# to speed up the time taken for self-tracing events -# to be ingested. -os.environ["ELASTIC_APM_API_REQUEST_TIME"] = "1s" - - -# Exercises the DEPRECATED apm-server.instrumentation.* config -# When updating this file, consider test_libbeat_instrumentation.py -# Remove in 8.0 - -def get_instrumentation_event(es, index): - query = {"term": {"service.name": "apm-server"}} - return es.count(index=index, body={"query": query})['count'] > 0 - - -@integration_test -class TestInMemoryTracingAPIKey(APIKeyBaseTest): - def config(self): - cfg = super(TestInMemoryTracingAPIKey, self).config() - cfg.update({ - "api_key_enabled": True, - "instrumentation_enabled": "true", - }) - return cfg - - def test_api_key_auth(self): - """Self-instrumentation using in-memory listener without configuring an APIKey""" - - # Send a POST request to the intake API URL. Doesn't matter what the - # request body contents are, as the request will fail due to lack of - # authorization. We just want to trigger the server's in-memory tracing, - # and test that the in-memory tracer works without having an api_key configured - r = requests.post(self.intake_url, data="invalid") - self.assertEqual(401, r.status_code) - - wait_until(lambda: get_instrumentation_event(self.es, index_transaction), - name='have in-memory instrumentation documents without api_key') - - -@integration_test -class TestExternalTracingAPIKey(APIKeyBaseTest): - def config(self): - cfg = super(TestExternalTracingAPIKey, self).config() - api_key = self.create_apm_api_key([self.privilege_event], self.resource_any) - cfg.update({ - "api_key_enabled": True, - "instrumentation_enabled": "true", - "instrumentation_api_key": api_key, - # Set instrumentation.hosts to the same APM Server. - # - # Explicitly specifying hosts configures the tracer to - # behave as if it's sending to an external server, rather - # than using the in-memory transport that bypasses auth. - "instrumentation_host": APIKeyBaseTest.host, - }) - return cfg - - def test_api_key_auth(self): - # Send a POST request to the intake API URL. Doesn't matter what the - # request body contents are, as the request will fail due to lack of - # authorization. We just want to trigger the server's tracing. - r = requests.post(self.intake_url, data="invalid") - self.assertEqual(401, r.status_code) - - wait_until(lambda: get_instrumentation_event(self.es, index_transaction), - name='have external server instrumentation documents with api_key') - - -@integration_test -class TestExternalTracingSecretToken(ElasticTest): - def config(self): - cfg = super(TestExternalTracingSecretToken, self).config() - secret_token = "abc123" - cfg.update({ - "secret_token": secret_token, - "instrumentation_enabled": "true", - "instrumentation_secret_token": secret_token, - # Set instrumentation.hosts to the same APM Server. - # - # Explicitly specifying hosts configures the tracer to - # behave as if it's sending to an external server, rather - # than using the in-memory transport that bypasses auth. - "instrumentation_host": ElasticTest.host, - }) - return cfg - - def test_secret_token_auth(self): - # Send a POST request to the intake API URL. Doesn't matter what the - # request body contents are, as the request will fail due to lack of - # authorization. We just want to trigger the server's tracing. - r = requests.post(self.intake_url, data="invalid") - self.assertEqual(401, r.status_code) - - wait_until(lambda: get_instrumentation_event(self.es, index_transaction), - name='have external server instrumentation documents with secret_token') - - -class ProfilingTest(ElasticTest): - def metric_fields(self): - metric_fields = set() - rs = self.es.search(index=index_profile) - for hit in rs["hits"]["hits"]: - profile = hit["_source"]["profile"] - metric_fields.update((k for (k, v) in profile.items() if type(v) is int)) - return metric_fields - - def wait_for_profile(self): - def cond(): - response = self.es.count(index=index_profile, body={"query": {"term": {"processor.name": "profile"}}}) - return response['count'] != 0 - wait_until(cond, max_timeout=10, name="waiting for profile") - - -@integration_test -class TestCPUProfiling(ProfilingTest): - config_overrides = { - "instrumentation_enabled": "true", - "profiling_cpu_enabled": "true", - "profiling_cpu_interval": "1s", - "profiling_cpu_duration": "5s", - } - - def test_self_profiling(self): - """CPU profiling enabled""" - - def create_load(): - payload_path = self.get_payload_path("transactions_spans.ndjson") - with open(payload_path) as f: - requests.post(self.intake_url, data=f, headers={'content-type': 'application/x-ndjson'}) - - # Wait for profiling to begin, and then start sending data - # to the server to create some CPU load. - - time.sleep(1) - start = datetime.now() - while datetime.now()-start < timedelta(seconds=5): - create_load() - self.wait_for_profile() - - expected_metric_fields = set([u"cpu.ns", u"samples.count", u"duration"]) - metric_fields = self.metric_fields() - self.assertEqual(metric_fields, expected_metric_fields) - - -@integration_test -class TestHeapProfiling(ProfilingTest): - config_overrides = { - "instrumentation_enabled": "true", - "profiling_heap_enabled": "true", - "profiling_heap_interval": "1s", - } - - def test_self_profiling(self): - """Heap profiling enabled""" - - time.sleep(1) - self.wait_for_profile() - - expected_metric_fields = set([ - u"alloc_objects.count", - u"inuse_objects.count", - u"alloc_space.bytes", - u"inuse_space.bytes", - ]) - metric_fields = self.metric_fields() - self.assertEqual(metric_fields, expected_metric_fields) diff --git a/tests/system/test_libbeat_instrumentation.py b/tests/system/test_libbeat_instrumentation.py deleted file mode 100644 index d813f6d8545..00000000000 --- a/tests/system/test_libbeat_instrumentation.py +++ /dev/null @@ -1,176 +0,0 @@ -from datetime import datetime, timedelta -import os -import time -import requests - -from apmserver import integration_test -from apmserver import ElasticTest -from test_auth import APIKeyBaseTest -from helper import wait_until -from es_helper import index_profile, index_transaction - -# Set ELASTIC_APM_API_REQUEST_TIME to a short duration -# to speed up the time taken for self-tracing events -# to be ingested. -os.environ["ELASTIC_APM_API_REQUEST_TIME"] = "1s" - -# This exercises the instrumentation.* config -# When updating this file, consider test_instrumentation.py - - -def get_instrumentation_event(es, index): - query = {"term": {"service.name": "apm-server"}} - return es.count(index=index, body={"query": query})['count'] > 0 - - -@integration_test -class TestInMemoryTracingAPIKey(APIKeyBaseTest): - def config(self): - cfg = super(TestInMemoryTracingAPIKey, self).config() - cfg.update({ - "api_key_enabled": True, - "libbeat_instrumentation_enabled": "true", - }) - return cfg - - def test_api_key_auth(self): - """Self-instrumentation using in-memory listener without configuring an APIKey""" - - # Send a POST request to the intake API URL. Doesn't matter what the - # request body contents are, as the request will fail due to lack of - # authorization. We just want to trigger the server's in-memory tracing, - # and test that the in-memory tracer works without having an api_key configured - r = requests.post(self.intake_url, data="invalid") - self.assertEqual(401, r.status_code) - - wait_until(lambda: get_instrumentation_event(self.es, index_transaction), - name='have in-memory instrumentation documents without api_key') - - -@integration_test -class TestExternalTracingAPIKey(APIKeyBaseTest): - def config(self): - cfg = super(TestExternalTracingAPIKey, self).config() - api_key = self.create_apm_api_key([self.privilege_event], self.resource_any) - cfg.update({ - "api_key_enabled": True, - "libbeat_instrumentation_enabled": "true", - "libbeat_instrumentation_api_key": api_key, - # Set instrumentation.hosts to the same APM Server. - # - # Explicitly specifying hosts configures the tracer to - # behave as if it's sending to an external server, rather - # than using the in-memory transport that bypasses auth. - "libbeat_instrumentation_host": APIKeyBaseTest.host, - }) - return cfg - - def test_api_key_auth(self): - # Send a POST request to the intake API URL. Doesn't matter what the - # request body contents are, as the request will fail due to lack of - # authorization. We just want to trigger the server's tracing. - r = requests.post(self.intake_url, data="invalid") - self.assertEqual(401, r.status_code) - - wait_until(lambda: get_instrumentation_event(self.es, index_transaction), - name='have external server instrumentation documents with api_key') - - -@integration_test -class TestExternalTracingSecretToken(ElasticTest): - def config(self): - cfg = super(TestExternalTracingSecretToken, self).config() - secret_token = "abc123" - cfg.update({ - "secret_token": secret_token, - "libbeat_instrumentation_enabled": "true", - "libbeat_instrumentation_secret_token": secret_token, - # Set instrumentation.hosts to the same APM Server. - # - # Explicitly specifying hosts configures the tracer to - # behave as if it's sending to an external server, rather - # than using the in-memory transport that bypasses auth. - "libbeat_instrumentation_host": ElasticTest.host, - }) - return cfg - - def test_secret_token_auth(self): - # Send a POST request to the intake API URL. Doesn't matter what the - # request body contents are, as the request will fail due to lack of - # authorization. We just want to trigger the server's tracing. - r = requests.post(self.intake_url, data="invalid") - self.assertEqual(401, r.status_code) - - wait_until(lambda: get_instrumentation_event(self.es, index_transaction), - name='have external server instrumentation documents with secret_token') - - -class ProfilingTest(ElasticTest): - def metric_fields(self): - metric_fields = set() - rs = self.es.search(index=index_profile) - for hit in rs["hits"]["hits"]: - profile = hit["_source"]["profile"] - metric_fields.update((k for (k, v) in profile.items() if type(v) is int)) - return metric_fields - - def wait_for_profile(self): - def cond(): - response = self.es.count(index=index_profile, body={"query": {"term": {"processor.name": "profile"}}}) - return response['count'] != 0 - wait_until(cond, max_timeout=10, name="waiting for profile") - - -@integration_test -class TestCPUProfiling(ProfilingTest): - config_overrides = { - "libbeat_instrumentation_enabled": "true", - "libbeat_profiling_cpu_enabled": "true", - "libbeat_profiling_cpu_interval": "1s", - "libbeat_profiling_cpu_duration": "5s", - } - - def test_self_profiling(self): - """CPU profiling enabled""" - - def create_load(): - payload_path = self.get_payload_path("transactions_spans.ndjson") - with open(payload_path, 'rb') as f: - requests.post(self.intake_url, data=f, headers={'content-type': 'application/x-ndjson'}) - - # Wait for profiling to begin, and then start sending data - # to the server to create some CPU load. - - time.sleep(1) - start = datetime.now() - while datetime.now()-start < timedelta(seconds=5): - create_load() - self.wait_for_profile() - - expected_metric_fields = set([u"cpu.ns", u"samples.count", u"duration"]) - metric_fields = self.metric_fields() - self.assertEqual(metric_fields, expected_metric_fields) - - -@integration_test -class TestHeapProfiling(ProfilingTest): - config_overrides = { - "libbeat_instrumentation_enabled": "true", - "libbeat_profiling_heap_enabled": "true", - "libbeat_profiling_heap_interval": "1s", - } - - def test_self_profiling(self): - """Heap profiling enabled""" - - time.sleep(1) - self.wait_for_profile() - - expected_metric_fields = set([ - u"alloc_objects.count", - u"inuse_objects.count", - u"alloc_space.bytes", - u"inuse_space.bytes", - ]) - metric_fields = self.metric_fields() - self.assertEqual(metric_fields, expected_metric_fields) diff --git a/tests/system/test_sampling.py b/tests/system/test_sampling.py deleted file mode 100644 index 52c6255b4a8..00000000000 --- a/tests/system/test_sampling.py +++ /dev/null @@ -1,56 +0,0 @@ -import time - -from apmserver import integration_test -from apmserver import ClientSideElasticTest, ElasticTest, ExpvarBaseTest, ProcStartupFailureTest -from helper import wait_until -from es_helper import index_smap, index_metric, index_transaction - - -@integration_test -class TestKeepUnsampled(ElasticTest): - def config(self): - cfg = super(TestKeepUnsampled, self).config() - cfg.update({"sampling_keep_unsampled": True}) - return cfg - - def test(self): - self.load_docs_with_template(self.get_payload_path("transactions_spans.ndjson"), - self.intake_url, 'transaction', 9) - self.assert_no_logged_warnings() - docs = self.wait_for_events('transaction', 4, index=index_transaction) - self.approve_docs('keep_unsampled_transactions', docs) - - -@integration_test -class TestDropUnsampled(ElasticTest): - def config(self): - cfg = super(TestDropUnsampled, self).config() - cfg.update({ - "sampling_keep_unsampled": False, - # Enable aggregation to avoid a warning. - "aggregation_enabled": True, - }) - return cfg - - def test(self): - self.load_docs_with_template(self.get_payload_path("transactions_spans.ndjson"), - self.intake_url, 'transaction', 8) - self.assert_no_logged_warnings() - docs = self.wait_for_events('transaction', 3, index=index_transaction) - self.approve_docs('drop_unsampled_transactions', docs) - - -@integration_test -class TestConfigWarning(ElasticTest): - def config(self): - cfg = super(TestConfigWarning, self).config() - cfg.update({ - "sampling_keep_unsampled": False, - # Disable aggregation to force a warning. - "aggregation_enabled": False, - }) - return cfg - - def test(self): - expected = "apm-server.sampling.keep_unsampled and apm-server.aggregation.transactions.enabled are both false, which will lead to incorrect metrics being reported in the APM UI" - self.assertIn(expected, self.get_log())