Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[apm] peer.service aggregation for trace stats, option to compute stats based on span.kind #16103

Merged
merged 53 commits into from
Apr 6, 2023
Merged
Show file tree
Hide file tree
Changes from 47 commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
df3e63f
[apm] initial commit; add peer.service to stats
jdgumz Mar 7, 2023
8cda026
[apm] re-generate msgpack to account for peer.service
jdgumz Mar 7, 2023
d0e1321
[apm] add tests for peer.service aggregation
jdgumz Mar 14, 2023
f940919
[apm] add comment to explain peer_service in proto
jdgumz Mar 14, 2023
fb95f5c
[apm] comment revision
jdgumz Mar 14, 2023
2ad214f
[apm] add release notes
jdgumz Mar 14, 2023
af0f5f6
Update pkg/trace/stats/aggregation.go
jdgumz Mar 15, 2023
bfaa1e2
Update pkg/trace/stats/aggregation.go
jdgumz Mar 15, 2023
1593a31
Merge branch 'apm/peer-service-aggregation-for-trace-stats' of github…
jdgumz Mar 21, 2023
27c1a02
[apm] add consideration of span.kind == INTERNAL for calculating stats
jdgumz Mar 21, 2023
1e74f07
[apm] consider CLIENT/PRODUCER specifically rather than INTERNAL for …
jdgumz Mar 21, 2023
2e80bb4
[apm] remove incorrect comment
jdgumz Mar 21, 2023
0353e82
[apm] correct tests, update release notes
jdgumz Mar 21, 2023
4eb1a0a
[apm] RemoteOutbound -> RemoteOutgoing
jdgumz Mar 21, 2023
dcfe044
[apm] remove unnecessary newlines
jdgumz Mar 21, 2023
a776a46
[apm] update peer.service test to use a more realistic example for it…
jdgumz Mar 21, 2023
b0a39d6
Update pkg/trace/stats/concentrator.go
jdgumz Mar 21, 2023
59129de
Update releasenotes/notes/add-peer-service-for-trace-stats-225f1b90c9…
jdgumz Mar 21, 2023
66f690d
Update releasenotes/notes/add-peer-service-for-trace-stats-225f1b90c9…
jdgumz Mar 21, 2023
a405615
[apm] fix bug from remote commit
jdgumz Mar 21, 2023
ba3ad76
[apm] remove consideration of span.kind for trace stats computation
jdgumz Mar 24, 2023
588a0ec
[apm] add normalization for peer.service
jdgumz Mar 24, 2023
2586730
[apm] add concentrator configuration to enable/disable peer.service s…
jdgumz Mar 24, 2023
e7222b9
[apm] ensure peer.service is exported from client stats aggregator as…
jdgumz Mar 24, 2023
11cfffb
[apm] update config_template.yaml
jdgumz Mar 24, 2023
90c0b09
[apm] further clarify effect of disabling peer.service stats aggregation
jdgumz Mar 24, 2023
45758c6
[apm] fix test failures
jdgumz Mar 25, 2023
c7c7e51
[apm] rework configuration of peer.service, add back extra aggregator…
jdgumz Mar 29, 2023
394c33b
[apm] set peer service aggregation to false by default, ensure config…
jdgumz Mar 29, 2023
4fa25f1
[apm] revise variable name to be consistent, add check for peer servi…
jdgumz Mar 29, 2023
0986d4f
[apm] add logic to compute stats based on specific span.kind values
jdgumz Mar 29, 2023
2e68665
[apm] add config for option to enable stats computation by span.kind
jdgumz Mar 30, 2023
2c1f694
[apm] update documentation and release notes
jdgumz Mar 30, 2023
0cf26cc
Update cmd/trace-agent/config/config.go
jdgumz Mar 30, 2023
63c572a
Update pkg/trace/traceutil/span.go
jdgumz Mar 30, 2023
6dc94ad
[apm] rename peer service aggregation config field
jdgumz Mar 30, 2023
ffd0975
[apm] add more test cases to ensure case insensitivity for span.kind …
jdgumz Mar 30, 2023
cee986d
[apm] move span.kind check func implementation to concentrator.go
jdgumz Mar 30, 2023
c4ea367
Update pkg/trace/stats/concentrator.go
jdgumz Mar 30, 2023
d6a545c
[apm] finish move of span.kind check func
jdgumz Mar 30, 2023
1a07963
[apm] go back to setting a bool flag for peer.service in concentrator
jdgumz Mar 30, 2023
1454017
[apm] add back ExtraAggregators for info test case
jdgumz Mar 30, 2023
76957f4
[apm] fix testutil fixture BucketWithSpans
jdgumz Mar 30, 2023
1f8c1e3
[apm] move peer.service aggregation check back into NewAggregationFro…
jdgumz Mar 30, 2023
01156fc
[apm] change CSA to use bool for peer svc aggregation as well
jdgumz Mar 30, 2023
98136a2
[apm] remove unused const
jdgumz Mar 30, 2023
ecdeaa4
[apm] update guidance on new config options
jdgumz Mar 31, 2023
c65f007
[apm] add small test for bucket aggregation key creation and peer.ser…
jdgumz Mar 31, 2023
b4bdba1
[apm] remove unused field
jdgumz Apr 3, 2023
4450bb5
[apm] fix fuzz test
jdgumz Apr 3, 2023
29f9320
[apm] fix stats info tests
jdgumz Apr 3, 2023
94886e0
Merge branch 'main' into apm/peer-service-aggregation-for-trace-stats
jdgumz Apr 4, 2023
8ad14ae
Merge branch 'main' into apm/peer-service-aggregation-for-trace-stats
jdgumz Apr 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/trace-agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ func applyDatadogConfig(c *config.AgentConfig) error {
if coreconfig.Datadog.IsSet("apm_config.connection_limit") {
c.ConnectionLimit = coreconfig.Datadog.GetInt("apm_config.connection_limit")
}
c.PeerServiceAggregation = coreconfig.Datadog.GetBool("apm_config.peer_service_aggregation")
c.ComputeStatsBySpanKind = coreconfig.Datadog.GetBool("apm_config.compute_stats_by_span_kind")
if coreconfig.Datadog.IsSet("apm_config.extra_sample_rate") {
c.ExtraSampleRate = coreconfig.Datadog.GetFloat64("apm_config.extra_sample_rate")
}
Expand Down
44 changes: 44 additions & 0 deletions cmd/trace-agent/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1094,3 +1094,47 @@ func TestSetMaxMemCPU(t *testing.T) {
assert.Equal(t, 300.0, c.MaxMemory)
})
}

func TestPeerServiceAggregation(t *testing.T) {
t.Run("disabled", func(t *testing.T) {
defer cleanConfig()
cfg := config.New()
err := applyDatadogConfig(cfg)

assert := assert.New(t)
assert.NoError(err)
assert.False(cfg.PeerServiceAggregation)
})
t.Run("enabled", func(t *testing.T) {
defer cleanConfig()
coreconfig.Datadog.Set("apm_config.peer_service_aggregation", true)
cfg := config.New()
err := applyDatadogConfig(cfg)

assert := assert.New(t)
assert.NoError(err)
assert.True(cfg.PeerServiceAggregation)
})
}

func TestComputeStatsBySpanKind(t *testing.T) {
t.Run("disabled", func(t *testing.T) {
defer cleanConfig()
cfg := config.New()
err := applyDatadogConfig(cfg)

assert := assert.New(t)
assert.NoError(err)
assert.False(cfg.ComputeStatsBySpanKind)
})
t.Run("enabled", func(t *testing.T) {
defer cleanConfig()
coreconfig.Datadog.Set("apm_config.compute_stats_by_span_kind", true)
cfg := config.New()
err := applyDatadogConfig(cfg)

assert := assert.New(t)
assert.NoError(err)
assert.True(cfg.ComputeStatsBySpanKind)
})
}
2 changes: 2 additions & 0 deletions pkg/config/apm.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ func setupAPM(config Config) {
config.BindEnvAndSetDefault("apm_config.windows_pipe_buffer_size", 1_000_000, "DD_APM_WINDOWS_PIPE_BUFFER_SIZE") //nolint:errcheck
config.BindEnvAndSetDefault("apm_config.windows_pipe_security_descriptor", "D:AI(A;;GA;;;WD)", "DD_APM_WINDOWS_PIPE_SECURITY_DESCRIPTOR") //nolint:errcheck
config.BindEnvAndSetDefault("apm_config.remote_tagger", true, "DD_APM_REMOTE_TAGGER") //nolint:errcheck
config.BindEnvAndSetDefault("apm_config.peer_service_stats_aggregation", false, "DD_APM_PEER_SERVICE_STATS_AGGREGATION") //nolint:errcheck
config.BindEnvAndSetDefault("apm_config.compute_stats_by_span_kind", false, "DD_APM_COMPUTE_STATS_BY_SPAN_KIND") //nolint:errcheck

config.BindEnv("apm_config.max_catalog_services", "DD_APM_MAX_CATALOG_SERVICES")
config.BindEnv("apm_config.receiver_timeout", "DD_APM_RECEIVER_TIMEOUT")
Expand Down
20 changes: 19 additions & 1 deletion pkg/config/config_template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1222,10 +1222,28 @@ api_key:
#
# connection_limit: 2000

## @param compute_stats_by_span_kind - bool - default: false
## @env DD_APM_COMPUTE_STATS_BY_SPAN_KIND - bool - default: false
## Enables an additional stats computation check on spans to see they have an eligible `span.kind` (server, consumer, client, producer).
## If enabled, a span with an eligible `span.kind` will have stats computed. If disabled, only top-level and measured spans will have stats computed.
## NOTE: For stats computed from OTel traces, only top-level spans are considered when this option is off.
## If you are sending OTel traces and want stats on non-top-level spans, this flag will need to be enabled.
# compute_stats_by_span_kind: false

## @param peer_service_aggregation - bool - default: false
## @env DD_APM_PEER_SERVICE_AGGREGATION - bool - default: false
## Enables `peer.service` aggregation in the agent. If disabled, aggregated trace stats will not include `peer.service` as a dimension.
## For the best experience with `peer.service`, it is recommended to also enable `compute_stats_by_span_kind`.
## If enabling both causes the Agent to consume too many resources, try disabling `compute_stats_by_span_kind` first.
## If the overhead remains high, it will be due to a high cardinality of `peer.service` values from the traces. You may need to check your instrumentation.
## NOTE: If you are using an OTel tracer it's best to have both enabled because client/producer spans with a `peer.service` value
## may not be marked by the Agent as top-level spans.
# peer_service_aggregation: false

## @param features - list of strings - optional
## @env DD_APM_FEATURES - comma separated list of strings - optional
## Configure additional beta APM features.
## The list of items available under apm_config.features is not guaranteed to persist across versions;
## The list of items available under apm_config.features is not guaranteed to persist across versions;
## a feature may eventually be promoted to its own configuration option on the agent, or dropped entirely.
#
# features: ["error_rare_sample_tracer_drop","table_names","component2name","sql_cache"]
Expand Down
59 changes: 59 additions & 0 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1287,3 +1287,62 @@ fips:
err := setupFipsEndpoints(testConfig)
require.Error(t, err)
}

func TestEnablePeerServiceStatsAggregationYAML(t *testing.T) {
datadogYaml := `
apm_config:
peer_service_stats_aggregation: true
`
testConfig := setupConfFromYAML(datadogYaml)
err := setupFipsEndpoints(testConfig)
require.NoError(t, err)
require.True(t, testConfig.GetBool("apm_config.peer_service_stats_aggregation"))

datadogYaml = `
apm_config:
peer_service_stats_aggregation: false
`
testConfig = setupConfFromYAML(datadogYaml)
err = setupFipsEndpoints(testConfig)
require.NoError(t, err)
require.False(t, testConfig.GetBool("apm_config.peer_service_stats_aggregation"))
}

func TestEnablePeerServiceStatsAggregationEnv(t *testing.T) {
t.Setenv("DD_APM_PEER_SERVICE_STATS_AGGREGATION", "true")
testConfig := setupConfFromYAML("")
require.True(t, testConfig.GetBool("apm_config.peer_service_stats_aggregation"))
t.Setenv("DD_APM_PEER_SERVICE_STATS_AGGREGATION", "false")
testConfig = setupConfFromYAML("")
require.False(t, testConfig.GetBool("apm_config.peer_service_stats_aggregation"))
}

func TestEnableStatsComputationBySpanKindYAML(t *testing.T) {
datadogYaml := `
apm_config:
compute_stats_by_span_kind: false
`
testConfig := setupConfFromYAML(datadogYaml)
err := setupFipsEndpoints(testConfig)
require.NoError(t, err)
require.False(t, testConfig.GetBool("apm_config.compute_stats_by_span_kind"))

datadogYaml = `
apm_config:
compute_stats_by_span_kind: true
`
testConfig = setupConfFromYAML(datadogYaml)
err = setupFipsEndpoints(testConfig)
require.NoError(t, err)
require.True(t, testConfig.GetBool("apm_config.compute_stats_by_span_kind"))

}

func TestComputeStatsBySpanKindEnv(t *testing.T) {
t.Setenv("DD_APM_COMPUTE_STATS_BY_SPAN_KIND", "false")
testConfig := setupConfFromYAML("")
require.False(t, testConfig.GetBool("apm_config.compute_stats_by_span_kind"))
t.Setenv("DD_APM_COMPUTE_STATS_BY_SPAN_KIND", "true")
testConfig = setupConfFromYAML("")
require.True(t, testConfig.GetBool("apm_config.compute_stats_by_span_kind"))
}
20 changes: 20 additions & 0 deletions pkg/trace/agent/normalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const (
// tagSamplingPriority specifies the sampling priority of the trace.
// DEPRECATED: Priority is now specified as a TraceChunk field.
tagSamplingPriority = "_sampling_priority_v1"
// peerServiceKey is the key for the peer.service meta field.
peerServiceKey = "peer.service"
)

var (
Expand Down Expand Up @@ -60,6 +62,24 @@ func (a *Agent) normalize(ts *info.TagStats, s *pb.Span) error {
}
s.Service = svc

pSvc, ok := s.Meta[peerServiceKey]
if ok {
ps, err := traceutil.NormalizePeerService(pSvc)
switch err {
case traceutil.ErrTooLong:
ts.SpansMalformed.PeerServiceTruncate.Inc()
log.Debugf("Fixing malformed trace. peer.service is too long (reason:peer_service_truncate), truncating peer.service to length=%d: %s", traceutil.MaxServiceLen, ps)
case traceutil.ErrInvalid:
ts.SpansMalformed.PeerServiceInvalid.Inc()
log.Debugf("Fixing malformed trace. peer.service is invalid (reason:peer_service_invalid), replacing invalid peer.service=%s with empty string", pSvc)
default:
if err != nil {
log.Debugf("Unexpected error in peer.service normalization from original value (%s) to new value (%s): %s", pSvc, ps, err)
}
}
s.Meta[peerServiceKey] = ps
}

if a.conf.HasFeature("component2name") {
// This feature flag determines the component tag to become the span name.
//
Expand Down
23 changes: 12 additions & 11 deletions pkg/trace/api/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,17 +243,18 @@ func TestInfoHandler(t *testing.T) {
Host: "https://target-intake.datadoghq.com",
NoProxy: true,
}},
BucketInterval: time.Second,
ExtraAggregators: []string{"agg:val"},
ExtraSampleRate: 2.4,
TargetTPS: 11,
MaxEPS: 12,
ReceiverHost: "localhost",
ReceiverPort: 8111,
ReceiverSocket: "/sock/path",
ConnectionLimit: 12,
ReceiverTimeout: 100,
MaxRequestBytes: 123,
BucketInterval: time.Second,
ExtraAggregators: []string{"agg:val"},
PeerServiceAggregation: true,
ExtraSampleRate: 2.4,
TargetTPS: 11,
MaxEPS: 12,
ReceiverHost: "localhost",
ReceiverPort: 8111,
ReceiverSocket: "/sock/path",
ConnectionLimit: 12,
ReceiverTimeout: 100,
MaxRequestBytes: 123,
StatsWriter: &config.WriterConfig{
ConnectionLimit: 20,
QueueSize: 12,
Expand Down
6 changes: 4 additions & 2 deletions pkg/trace/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,10 @@ type AgentConfig struct {
Endpoints []*Endpoint

// Concentrator
BucketInterval time.Duration // the size of our pre-aggregation per bucket
ExtraAggregators []string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should ExtraAggregators get marked as deprecated as opposed to deleted? My worry here is that it might still live in a customer configuration somewhere, and if we were to re-introduce it in the future we could run into problems. I think this has to be considered like a "reserved" field of sorts.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, we could start using is again with peer.service as a possible value that we set if enabled.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added this back with a comment about it being deprecated. It's not actually used in the current agent code because we don't create aggregation keys with strings anymore. We have the new struct key format, so we can't arbitrarily aggregate over other tag keys.

I think this only works for much older versions of the agent (probably any agent that doesn't use the new ClientStatsPayloads).

BucketInterval time.Duration // the size of our pre-aggregation per bucket
ExtraAggregators []string // DEPRECATED
PeerServiceAggregation bool // enables/disables stats aggregation for peer.service, used by Concentrator and ClientStatsAggregator
ComputeStatsBySpanKind bool // enables/disables the computing of stats based on a span's `span.kind` field

// Sampler configuration
ExtraSampleRate float64
Expand Down
20 changes: 12 additions & 8 deletions pkg/trace/info/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,8 @@ func TestPublishReceiverStats(t *testing.T) {
atom(10),
atom(11),
atom(12),
atom(13),
atom(14),
},
TracesFiltered: atom(4),
TracesPriorityNone: atom(5),
Expand Down Expand Up @@ -487,14 +489,16 @@ func TestPublishReceiverStats(t *testing.T) {
"ServiceEmpty": 2.0,
"ServiceTruncate": 3.0,
"ServiceInvalid": 4.0,
"SpanNameEmpty": 5.0,
"SpanNameTruncate": 6.0,
"SpanNameInvalid": 7.0,
"ResourceEmpty": 8.0,
"TypeTruncate": 9.0,
"InvalidStartDate": 10.0,
"InvalidDuration": 11.0,
"InvalidHTTPStatusCode": 12.0,
"PeerServiceTruncate": 5.0,
"PeerServiceInvalid": 6.0,
"SpanNameEmpty": 7.0,
"SpanNameTruncate": 8.0,
"SpanNameInvalid": 9.0,
"ResourceEmpty": 10.0,
"TypeTruncate": 11.0,
"InvalidStartDate": 12.0,
"InvalidDuration": 13.0,
"InvalidHTTPStatusCode": 14.0,
},
"SpansReceived": 10.0,
"TracerVersion": "",
Expand Down
8 changes: 8 additions & 0 deletions pkg/trace/info/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,10 @@ type SpansMalformed struct {
ServiceTruncate atomic.Int64
// ServiceInvalid is when a span's Service doesn't conform to Datadog tag naming standards
ServiceInvalid atomic.Int64
// PeerServiceTruncate is when a span's peer.service is truncated for exceeding the max length
PeerServiceTruncate atomic.Int64
// PeerServiceInvalid is when a span's peer.service doesn't conform to Datadog tag naming standards
PeerServiceInvalid atomic.Int64
// SpanNameEmpty is when a span's Name is empty
SpanNameEmpty atomic.Int64
// SpanNameTruncate is when a span's Name is truncated for exceeding the max length
Expand All @@ -280,6 +284,8 @@ func (s *SpansMalformed) tagCounters() map[string]*atomic.Int64 {
"service_empty": &s.ServiceEmpty,
"service_truncate": &s.ServiceTruncate,
"service_invalid": &s.ServiceInvalid,
"peer_service_truncate": &s.PeerServiceTruncate,
"peer_service_invalid": &s.PeerServiceInvalid,
"span_name_empty": &s.SpanNameEmpty,
"span_name_truncate": &s.SpanNameTruncate,
"span_name_invalid": &s.SpanNameInvalid,
Expand Down Expand Up @@ -420,6 +426,8 @@ func (s *Stats) update(recent *Stats) {
s.SpansMalformed.ServiceEmpty.Add(recent.SpansMalformed.ServiceEmpty.Load())
s.SpansMalformed.ServiceTruncate.Add(recent.SpansMalformed.ServiceTruncate.Load())
s.SpansMalformed.ServiceInvalid.Add(recent.SpansMalformed.ServiceInvalid.Load())
s.SpansMalformed.PeerServiceTruncate.Add(recent.SpansMalformed.PeerServiceTruncate.Load())
s.SpansMalformed.PeerServiceInvalid.Add(recent.SpansMalformed.PeerServiceInvalid.Load())
s.SpansMalformed.SpanNameEmpty.Add(recent.SpansMalformed.SpanNameEmpty.Load())
s.SpansMalformed.SpanNameTruncate.Add(recent.SpansMalformed.SpanNameTruncate.Load())
s.SpansMalformed.SpanNameInvalid.Add(recent.SpansMalformed.SpanNameInvalid.Load())
Expand Down
19 changes: 12 additions & 7 deletions pkg/trace/info/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ package info
import (
"bytes"
"fmt"
"github.com/DataDog/datadog-agent/pkg/trace/log"
"reflect"
"strings"
"testing"
"time"

"github.com/DataDog/datadog-agent/pkg/trace/log"

"go.uber.org/atomic"

"github.com/DataDog/datadog-agent/pkg/trace/metrics"
Expand Down Expand Up @@ -60,6 +61,8 @@ func TestSpansMalformed(t *testing.T) {
"span_name_invalid": 0,
"span_name_empty": 0,
"service_truncate": 0,
"peer_service_truncate": 0,
"peer_service_invalid": 0,
"invalid_start_date": 0,
"invalid_http_status_code": 0,
"invalid_duration": 0,
Expand Down Expand Up @@ -241,12 +244,14 @@ func TestReceiverStats(t *testing.T) {
stats.SpansMalformed.ServiceInvalid.Store(4)
stats.SpansMalformed.SpanNameEmpty.Store(5)
stats.SpansMalformed.SpanNameTruncate.Store(6)
stats.SpansMalformed.SpanNameInvalid.Store(7)
stats.SpansMalformed.ResourceEmpty.Store(8)
stats.SpansMalformed.TypeTruncate.Store(9)
stats.SpansMalformed.InvalidStartDate.Store(10)
stats.SpansMalformed.InvalidDuration.Store(11)
stats.SpansMalformed.InvalidHTTPStatusCode.Store(12)
stats.SpansMalformed.PeerServiceTruncate.Store(7)
stats.SpansMalformed.PeerServiceInvalid.Store(8)
stats.SpansMalformed.SpanNameInvalid.Store(9)
stats.SpansMalformed.ResourceEmpty.Store(10)
stats.SpansMalformed.TypeTruncate.Store(11)
stats.SpansMalformed.InvalidStartDate.Store(12)
stats.SpansMalformed.InvalidDuration.Store(13)
stats.SpansMalformed.InvalidHTTPStatusCode.Store(14)
return &ReceiverStats{
Stats: map[Tags]*TagStats{
tags: {
Expand Down
Loading