From 70820e3c98d80af6e99c476f6755afdaae075c79 Mon Sep 17 00:00:00 2001 From: Jesse Gumz Date: Thu, 6 Apr 2023 09:30:32 -0700 Subject: [PATCH] [apm] peer.service aggregation for trace stats, option to compute stats based on span.kind (#16103) * [apm] initial commit; add peer.service to stats * [apm] re-generate msgpack to account for peer.service * [apm] add tests for peer.service aggregation * [apm] add comment to explain peer_service in proto * [apm] comment revision * [apm] add release notes * Update pkg/trace/stats/aggregation.go Co-authored-by: Diana Shevchenko <40775148+dianashevchenko@users.noreply.github.com> * Update pkg/trace/stats/aggregation.go Co-authored-by: Diana Shevchenko <40775148+dianashevchenko@users.noreply.github.com> * [apm] add consideration of span.kind == INTERNAL for calculating stats * [apm] consider CLIENT/PRODUCER specifically rather than INTERNAL for span.kind; add more tests * [apm] remove incorrect comment * [apm] correct tests, update release notes * [apm] RemoteOutbound -> RemoteOutgoing * [apm] remove unnecessary newlines * [apm] update peer.service test to use a more realistic example for its test case * Update pkg/trace/stats/concentrator.go Co-authored-by: Peter Kalmakis * Update releasenotes/notes/add-peer-service-for-trace-stats-225f1b90c9627c18.yaml Co-authored-by: Austin Lai <76412946+alai97@users.noreply.github.com> * Update releasenotes/notes/add-peer-service-for-trace-stats-225f1b90c9627c18.yaml Co-authored-by: Austin Lai <76412946+alai97@users.noreply.github.com> * [apm] fix bug from remote commit * [apm] remove consideration of span.kind for trace stats computation * [apm] add normalization for peer.service * [apm] add concentrator configuration to enable/disable peer.service stats aggregation * [apm] ensure peer.service is exported from client stats aggregator as well * [apm] update config_template.yaml * [apm] further clarify effect of disabling peer.service stats aggregation * [apm] fix test failures * [apm] rework configuration of peer.service, add back extra aggregators for concentrator * [apm] set peer service aggregation to false by default, ensure config is loaded properly * [apm] revise variable name to be consistent, add check for peer service aggregation in client stats aggregator * [apm] add logic to compute stats based on specific span.kind values * [apm] add config for option to enable stats computation by span.kind * [apm] update documentation and release notes * Update cmd/trace-agent/config/config.go Co-authored-by: Ahmed Mezghani <38987709+ahmed-mez@users.noreply.github.com> * Update pkg/trace/traceutil/span.go Co-authored-by: Ahmed Mezghani <38987709+ahmed-mez@users.noreply.github.com> * [apm] rename peer service aggregation config field * [apm] add more test cases to ensure case insensitivity for span.kind check * [apm] move span.kind check func implementation to concentrator.go * Update pkg/trace/stats/concentrator.go Co-authored-by: Peter Kalmakis * [apm] finish move of span.kind check func * [apm] go back to setting a bool flag for peer.service in concentrator * [apm] add back ExtraAggregators for info test case * [apm] fix testutil fixture BucketWithSpans * [apm] move peer.service aggregation check back into NewAggregationFromSpan * [apm] change CSA to use bool for peer svc aggregation as well * [apm] remove unused const * [apm] update guidance on new config options * [apm] add small test for bucket aggregation key creation and peer.service in CSA * [apm] remove unused field * [apm] fix fuzz test * [apm] fix stats info tests --------- Co-authored-by: Diana Shevchenko <40775148+dianashevchenko@users.noreply.github.com> Co-authored-by: Peter Kalmakis Co-authored-by: Austin Lai <76412946+alai97@users.noreply.github.com> Co-authored-by: Ahmed Mezghani <38987709+ahmed-mez@users.noreply.github.com> --- cmd/trace-agent/config/config.go | 2 + cmd/trace-agent/config/config_test.go | 44 +++ pkg/config/apm.go | 2 + pkg/config/config_template.yaml | 18 + pkg/config/config_test.go | 59 +++ pkg/trace/agent/normalizer.go | 20 + pkg/trace/api/info_test.go | 23 +- pkg/trace/config/config.go | 6 +- pkg/trace/info/info_test.go | 20 +- pkg/trace/info/stats.go | 8 + pkg/trace/info/stats_test.go | 23 +- pkg/trace/pb/stats.pb.go | 354 ++++++++++++------ pkg/trace/pb/stats.proto | 1 + pkg/trace/pb/stats_gen.go | 189 +++++++++- pkg/trace/pb/stats_gen_test.go | 17 +- pkg/trace/stats/aggregation.go | 37 +- pkg/trace/stats/aggregation_test.go | 38 ++ pkg/trace/stats/client_stats_aggregator.go | 58 +-- .../stats/client_stats_aggregator_test.go | 85 +++++ pkg/trace/stats/concentrator.go | 54 ++- pkg/trace/stats/concentrator_test.go | 220 +++++++++++ pkg/trace/stats/statsraw.go | 5 +- pkg/trace/stats/statsraw_test.go | 53 ++- pkg/trace/testutil/stats.go | 2 +- pkg/trace/traceutil/normalize.go | 18 + pkg/trace/traceutil/normalize_test.go | 34 ++ pkg/trace/traceutil/span.go | 1 - ...vice-for-trace-stats-225f1b90c9627c18.yaml | 6 + 28 files changed, 1170 insertions(+), 227 deletions(-) create mode 100644 releasenotes/notes/add-peer-service-for-trace-stats-225f1b90c9627c18.yaml diff --git a/cmd/trace-agent/config/config.go b/cmd/trace-agent/config/config.go index 1889561ba7e5f..9f1d47f39aab3 100644 --- a/cmd/trace-agent/config/config.go +++ b/cmd/trace-agent/config/config.go @@ -194,6 +194,8 @@ func applyDatadogConfig(c *config.AgentConfig) error { if coreconfig.Datadog.IsSet("apm_config.connection_limit") { c.ConnectionLimit = coreconfig.Datadog.GetInt("apm_config.connection_limit") } + c.PeerServiceAggregation = coreconfig.Datadog.GetBool("apm_config.peer_service_aggregation") + c.ComputeStatsBySpanKind = coreconfig.Datadog.GetBool("apm_config.compute_stats_by_span_kind") if coreconfig.Datadog.IsSet("apm_config.extra_sample_rate") { c.ExtraSampleRate = coreconfig.Datadog.GetFloat64("apm_config.extra_sample_rate") } diff --git a/cmd/trace-agent/config/config_test.go b/cmd/trace-agent/config/config_test.go index b0a316b47be12..3737851036fa5 100644 --- a/cmd/trace-agent/config/config_test.go +++ b/cmd/trace-agent/config/config_test.go @@ -1094,3 +1094,47 @@ func TestSetMaxMemCPU(t *testing.T) { assert.Equal(t, 300.0, c.MaxMemory) }) } + +func TestPeerServiceAggregation(t *testing.T) { + t.Run("disabled", func(t *testing.T) { + defer cleanConfig() + cfg := config.New() + err := applyDatadogConfig(cfg) + + assert := assert.New(t) + assert.NoError(err) + assert.False(cfg.PeerServiceAggregation) + }) + t.Run("enabled", func(t *testing.T) { + defer cleanConfig() + coreconfig.Datadog.Set("apm_config.peer_service_aggregation", true) + cfg := config.New() + err := applyDatadogConfig(cfg) + + assert := assert.New(t) + assert.NoError(err) + assert.True(cfg.PeerServiceAggregation) + }) +} + +func TestComputeStatsBySpanKind(t *testing.T) { + t.Run("disabled", func(t *testing.T) { + defer cleanConfig() + cfg := config.New() + err := applyDatadogConfig(cfg) + + assert := assert.New(t) + assert.NoError(err) + assert.False(cfg.ComputeStatsBySpanKind) + }) + t.Run("enabled", func(t *testing.T) { + defer cleanConfig() + coreconfig.Datadog.Set("apm_config.compute_stats_by_span_kind", true) + cfg := config.New() + err := applyDatadogConfig(cfg) + + assert := assert.New(t) + assert.NoError(err) + assert.True(cfg.ComputeStatsBySpanKind) + }) +} diff --git a/pkg/config/apm.go b/pkg/config/apm.go index 63c6260a3b262..e9fa08f0de7d9 100644 --- a/pkg/config/apm.go +++ b/pkg/config/apm.go @@ -68,6 +68,8 @@ func setupAPM(config Config) { config.BindEnvAndSetDefault("apm_config.windows_pipe_buffer_size", 1_000_000, "DD_APM_WINDOWS_PIPE_BUFFER_SIZE") //nolint:errcheck config.BindEnvAndSetDefault("apm_config.windows_pipe_security_descriptor", "D:AI(A;;GA;;;WD)", "DD_APM_WINDOWS_PIPE_SECURITY_DESCRIPTOR") //nolint:errcheck config.BindEnvAndSetDefault("apm_config.remote_tagger", true, "DD_APM_REMOTE_TAGGER") //nolint:errcheck + config.BindEnvAndSetDefault("apm_config.peer_service_stats_aggregation", false, "DD_APM_PEER_SERVICE_STATS_AGGREGATION") //nolint:errcheck + config.BindEnvAndSetDefault("apm_config.compute_stats_by_span_kind", false, "DD_APM_COMPUTE_STATS_BY_SPAN_KIND") //nolint:errcheck config.BindEnv("apm_config.max_catalog_services", "DD_APM_MAX_CATALOG_SERVICES") config.BindEnv("apm_config.receiver_timeout", "DD_APM_RECEIVER_TIMEOUT") diff --git a/pkg/config/config_template.yaml b/pkg/config/config_template.yaml index 2202e42135402..e3fd63cb7a907 100644 --- a/pkg/config/config_template.yaml +++ b/pkg/config/config_template.yaml @@ -1355,6 +1355,24 @@ api_key: # # connection_limit: 2000 + ## @param compute_stats_by_span_kind - bool - default: false + ## @env DD_APM_COMPUTE_STATS_BY_SPAN_KIND - bool - default: false + ## Enables an additional stats computation check on spans to see they have an eligible `span.kind` (server, consumer, client, producer). + ## If enabled, a span with an eligible `span.kind` will have stats computed. If disabled, only top-level and measured spans will have stats computed. + ## NOTE: For stats computed from OTel traces, only top-level spans are considered when this option is off. + ## If you are sending OTel traces and want stats on non-top-level spans, this flag will need to be enabled. + # compute_stats_by_span_kind: false + + ## @param peer_service_aggregation - bool - default: false + ## @env DD_APM_PEER_SERVICE_AGGREGATION - bool - default: false + ## Enables `peer.service` aggregation in the agent. If disabled, aggregated trace stats will not include `peer.service` as a dimension. + ## For the best experience with `peer.service`, it is recommended to also enable `compute_stats_by_span_kind`. + ## If enabling both causes the Agent to consume too many resources, try disabling `compute_stats_by_span_kind` first. + ## If the overhead remains high, it will be due to a high cardinality of `peer.service` values from the traces. You may need to check your instrumentation. + ## NOTE: If you are using an OTel tracer it's best to have both enabled because client/producer spans with a `peer.service` value + ## may not be marked by the Agent as top-level spans. + # peer_service_aggregation: false + ## @param features - list of strings - optional ## @env DD_APM_FEATURES - comma separated list of strings - optional ## Configure additional beta APM features. diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 2effc362cec8f..3adfac5b1b454 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -1298,3 +1298,62 @@ fips: err := setupFipsEndpoints(testConfig) require.Error(t, err) } + +func TestEnablePeerServiceStatsAggregationYAML(t *testing.T) { + datadogYaml := ` +apm_config: + peer_service_stats_aggregation: true +` + testConfig := setupConfFromYAML(datadogYaml) + err := setupFipsEndpoints(testConfig) + require.NoError(t, err) + require.True(t, testConfig.GetBool("apm_config.peer_service_stats_aggregation")) + + datadogYaml = ` +apm_config: + peer_service_stats_aggregation: false +` + testConfig = setupConfFromYAML(datadogYaml) + err = setupFipsEndpoints(testConfig) + require.NoError(t, err) + require.False(t, testConfig.GetBool("apm_config.peer_service_stats_aggregation")) +} + +func TestEnablePeerServiceStatsAggregationEnv(t *testing.T) { + t.Setenv("DD_APM_PEER_SERVICE_STATS_AGGREGATION", "true") + testConfig := setupConfFromYAML("") + require.True(t, testConfig.GetBool("apm_config.peer_service_stats_aggregation")) + t.Setenv("DD_APM_PEER_SERVICE_STATS_AGGREGATION", "false") + testConfig = setupConfFromYAML("") + require.False(t, testConfig.GetBool("apm_config.peer_service_stats_aggregation")) +} + +func TestEnableStatsComputationBySpanKindYAML(t *testing.T) { + datadogYaml := ` +apm_config: + compute_stats_by_span_kind: false +` + testConfig := setupConfFromYAML(datadogYaml) + err := setupFipsEndpoints(testConfig) + require.NoError(t, err) + require.False(t, testConfig.GetBool("apm_config.compute_stats_by_span_kind")) + + datadogYaml = ` +apm_config: + compute_stats_by_span_kind: true +` + testConfig = setupConfFromYAML(datadogYaml) + err = setupFipsEndpoints(testConfig) + require.NoError(t, err) + require.True(t, testConfig.GetBool("apm_config.compute_stats_by_span_kind")) + +} + +func TestComputeStatsBySpanKindEnv(t *testing.T) { + t.Setenv("DD_APM_COMPUTE_STATS_BY_SPAN_KIND", "false") + testConfig := setupConfFromYAML("") + require.False(t, testConfig.GetBool("apm_config.compute_stats_by_span_kind")) + t.Setenv("DD_APM_COMPUTE_STATS_BY_SPAN_KIND", "true") + testConfig = setupConfFromYAML("") + require.True(t, testConfig.GetBool("apm_config.compute_stats_by_span_kind")) +} diff --git a/pkg/trace/agent/normalizer.go b/pkg/trace/agent/normalizer.go index 3d002b9b89248..23a068bbae611 100644 --- a/pkg/trace/agent/normalizer.go +++ b/pkg/trace/agent/normalizer.go @@ -28,6 +28,8 @@ const ( // tagSamplingPriority specifies the sampling priority of the trace. // DEPRECATED: Priority is now specified as a TraceChunk field. tagSamplingPriority = "_sampling_priority_v1" + // peerServiceKey is the key for the peer.service meta field. + peerServiceKey = "peer.service" ) var ( @@ -60,6 +62,24 @@ func (a *Agent) normalize(ts *info.TagStats, s *pb.Span) error { } s.Service = svc + pSvc, ok := s.Meta[peerServiceKey] + if ok { + ps, err := traceutil.NormalizePeerService(pSvc) + switch err { + case traceutil.ErrTooLong: + ts.SpansMalformed.PeerServiceTruncate.Inc() + log.Debugf("Fixing malformed trace. peer.service is too long (reason:peer_service_truncate), truncating peer.service to length=%d: %s", traceutil.MaxServiceLen, ps) + case traceutil.ErrInvalid: + ts.SpansMalformed.PeerServiceInvalid.Inc() + log.Debugf("Fixing malformed trace. peer.service is invalid (reason:peer_service_invalid), replacing invalid peer.service=%s with empty string", pSvc) + default: + if err != nil { + log.Debugf("Unexpected error in peer.service normalization from original value (%s) to new value (%s): %s", pSvc, ps, err) + } + } + s.Meta[peerServiceKey] = ps + } + if a.conf.HasFeature("component2name") { // This feature flag determines the component tag to become the span name. // diff --git a/pkg/trace/api/info_test.go b/pkg/trace/api/info_test.go index 141697d1152de..d9d9aa8616a17 100644 --- a/pkg/trace/api/info_test.go +++ b/pkg/trace/api/info_test.go @@ -243,17 +243,18 @@ func TestInfoHandler(t *testing.T) { Host: "https://target-intake.datadoghq.com", NoProxy: true, }}, - BucketInterval: time.Second, - ExtraAggregators: []string{"agg:val"}, - ExtraSampleRate: 2.4, - TargetTPS: 11, - MaxEPS: 12, - ReceiverHost: "localhost", - ReceiverPort: 8111, - ReceiverSocket: "/sock/path", - ConnectionLimit: 12, - ReceiverTimeout: 100, - MaxRequestBytes: 123, + BucketInterval: time.Second, + ExtraAggregators: []string{"agg:val"}, + PeerServiceAggregation: true, + ExtraSampleRate: 2.4, + TargetTPS: 11, + MaxEPS: 12, + ReceiverHost: "localhost", + ReceiverPort: 8111, + ReceiverSocket: "/sock/path", + ConnectionLimit: 12, + ReceiverTimeout: 100, + MaxRequestBytes: 123, StatsWriter: &config.WriterConfig{ ConnectionLimit: 20, QueueSize: 12, diff --git a/pkg/trace/config/config.go b/pkg/trace/config/config.go index 5f6d04e16227e..57b34f11722d5 100644 --- a/pkg/trace/config/config.go +++ b/pkg/trace/config/config.go @@ -313,8 +313,10 @@ type AgentConfig struct { Endpoints []*Endpoint // Concentrator - BucketInterval time.Duration // the size of our pre-aggregation per bucket - ExtraAggregators []string + BucketInterval time.Duration // the size of our pre-aggregation per bucket + ExtraAggregators []string // DEPRECATED + PeerServiceAggregation bool // enables/disables stats aggregation for peer.service, used by Concentrator and ClientStatsAggregator + ComputeStatsBySpanKind bool // enables/disables the computing of stats based on a span's `span.kind` field // Sampler configuration ExtraSampleRate float64 diff --git a/pkg/trace/info/info_test.go b/pkg/trace/info/info_test.go index dac350b2c585c..3181a04229d76 100644 --- a/pkg/trace/info/info_test.go +++ b/pkg/trace/info/info_test.go @@ -442,6 +442,8 @@ func TestPublishReceiverStats(t *testing.T) { atom(10), atom(11), atom(12), + atom(13), + atom(14), }, TracesFiltered: atom(4), TracesPriorityNone: atom(5), @@ -487,14 +489,16 @@ func TestPublishReceiverStats(t *testing.T) { "ServiceEmpty": 2.0, "ServiceTruncate": 3.0, "ServiceInvalid": 4.0, - "SpanNameEmpty": 5.0, - "SpanNameTruncate": 6.0, - "SpanNameInvalid": 7.0, - "ResourceEmpty": 8.0, - "TypeTruncate": 9.0, - "InvalidStartDate": 10.0, - "InvalidDuration": 11.0, - "InvalidHTTPStatusCode": 12.0, + "PeerServiceTruncate": 5.0, + "PeerServiceInvalid": 6.0, + "SpanNameEmpty": 7.0, + "SpanNameTruncate": 8.0, + "SpanNameInvalid": 9.0, + "ResourceEmpty": 10.0, + "TypeTruncate": 11.0, + "InvalidStartDate": 12.0, + "InvalidDuration": 13.0, + "InvalidHTTPStatusCode": 14.0, }, "SpansReceived": 10.0, "TracerVersion": "", diff --git a/pkg/trace/info/stats.go b/pkg/trace/info/stats.go index 2d4a50df95a1c..3c0005fcbe1cc 100644 --- a/pkg/trace/info/stats.go +++ b/pkg/trace/info/stats.go @@ -256,6 +256,10 @@ type SpansMalformed struct { ServiceTruncate atomic.Int64 // ServiceInvalid is when a span's Service doesn't conform to Datadog tag naming standards ServiceInvalid atomic.Int64 + // PeerServiceTruncate is when a span's peer.service is truncated for exceeding the max length + PeerServiceTruncate atomic.Int64 + // PeerServiceInvalid is when a span's peer.service doesn't conform to Datadog tag naming standards + PeerServiceInvalid atomic.Int64 // SpanNameEmpty is when a span's Name is empty SpanNameEmpty atomic.Int64 // SpanNameTruncate is when a span's Name is truncated for exceeding the max length @@ -280,6 +284,8 @@ func (s *SpansMalformed) tagCounters() map[string]*atomic.Int64 { "service_empty": &s.ServiceEmpty, "service_truncate": &s.ServiceTruncate, "service_invalid": &s.ServiceInvalid, + "peer_service_truncate": &s.PeerServiceTruncate, + "peer_service_invalid": &s.PeerServiceInvalid, "span_name_empty": &s.SpanNameEmpty, "span_name_truncate": &s.SpanNameTruncate, "span_name_invalid": &s.SpanNameInvalid, @@ -420,6 +426,8 @@ func (s *Stats) update(recent *Stats) { s.SpansMalformed.ServiceEmpty.Add(recent.SpansMalformed.ServiceEmpty.Load()) s.SpansMalformed.ServiceTruncate.Add(recent.SpansMalformed.ServiceTruncate.Load()) s.SpansMalformed.ServiceInvalid.Add(recent.SpansMalformed.ServiceInvalid.Load()) + s.SpansMalformed.PeerServiceTruncate.Add(recent.SpansMalformed.PeerServiceTruncate.Load()) + s.SpansMalformed.PeerServiceInvalid.Add(recent.SpansMalformed.PeerServiceInvalid.Load()) s.SpansMalformed.SpanNameEmpty.Add(recent.SpansMalformed.SpanNameEmpty.Load()) s.SpansMalformed.SpanNameTruncate.Add(recent.SpansMalformed.SpanNameTruncate.Load()) s.SpansMalformed.SpanNameInvalid.Add(recent.SpansMalformed.SpanNameInvalid.Load()) diff --git a/pkg/trace/info/stats_test.go b/pkg/trace/info/stats_test.go index 521739d7199ca..14b4e32002c3e 100644 --- a/pkg/trace/info/stats_test.go +++ b/pkg/trace/info/stats_test.go @@ -8,12 +8,13 @@ package info import ( "bytes" "fmt" - "github.com/DataDog/datadog-agent/pkg/trace/log" "reflect" "strings" "testing" "time" + "github.com/DataDog/datadog-agent/pkg/trace/log" + "go.uber.org/atomic" "github.com/DataDog/datadog-agent/pkg/trace/metrics" @@ -60,6 +61,8 @@ func TestSpansMalformed(t *testing.T) { "span_name_invalid": 0, "span_name_empty": 0, "service_truncate": 0, + "peer_service_truncate": 0, + "peer_service_invalid": 0, "invalid_start_date": 0, "invalid_http_status_code": 0, "invalid_duration": 0, @@ -241,12 +244,14 @@ func TestReceiverStats(t *testing.T) { stats.SpansMalformed.ServiceInvalid.Store(4) stats.SpansMalformed.SpanNameEmpty.Store(5) stats.SpansMalformed.SpanNameTruncate.Store(6) - stats.SpansMalformed.SpanNameInvalid.Store(7) - stats.SpansMalformed.ResourceEmpty.Store(8) - stats.SpansMalformed.TypeTruncate.Store(9) - stats.SpansMalformed.InvalidStartDate.Store(10) - stats.SpansMalformed.InvalidDuration.Store(11) - stats.SpansMalformed.InvalidHTTPStatusCode.Store(12) + stats.SpansMalformed.PeerServiceTruncate.Store(7) + stats.SpansMalformed.PeerServiceInvalid.Store(8) + stats.SpansMalformed.SpanNameInvalid.Store(9) + stats.SpansMalformed.ResourceEmpty.Store(10) + stats.SpansMalformed.TypeTruncate.Store(11) + stats.SpansMalformed.InvalidStartDate.Store(12) + stats.SpansMalformed.InvalidDuration.Store(13) + stats.SpansMalformed.InvalidHTTPStatusCode.Store(14) return &ReceiverStats{ Stats: map[Tags]*TagStats{ tags: { @@ -260,7 +265,7 @@ func TestReceiverStats(t *testing.T) { t.Run("PublishAndReset", func(t *testing.T) { rs := testStats() rs.PublishAndReset() - assert.EqualValues(t, 39, statsclient.counts.Load()) + assert.EqualValues(t, 41, statsclient.counts.Load()) assertStatsAreReset(t, rs) }) @@ -282,7 +287,7 @@ func TestReceiverStats(t *testing.T) { logs := strings.Split(b.String(), "\n") assert.Equal(t, "[INFO] [lang:go lang_version:1.12 lang_vendor:gov interpreter:gcc tracer_version:1.33 endpoint_version:v0.4] -> traces received: 1, traces filtered: 4, traces amount: 9 bytes, events extracted: 13, events sampled: 14", logs[0]) - assert.Equal(t, "[WARN] [lang:go lang_version:1.12 lang_vendor:gov interpreter:gcc tracer_version:1.33 endpoint_version:v0.4] -> traces_dropped(decoding_error:1, empty_trace:3, foreign_span:6, payload_too_large:2, span_id_zero:5, timeout:7, trace_id_zero:4, unexpected_eof:8), spans_malformed(duplicate_span_id:1, invalid_duration:11, invalid_http_status_code:12, invalid_start_date:10, resource_empty:8, service_empty:2, service_invalid:4, service_truncate:3, span_name_empty:5, span_name_invalid:7, span_name_truncate:6, type_truncate:9). Enable debug logging for more details.", + assert.Equal(t, "[WARN] [lang:go lang_version:1.12 lang_vendor:gov interpreter:gcc tracer_version:1.33 endpoint_version:v0.4] -> traces_dropped(decoding_error:1, empty_trace:3, foreign_span:6, payload_too_large:2, span_id_zero:5, timeout:7, trace_id_zero:4, unexpected_eof:8), spans_malformed(duplicate_span_id:1, invalid_duration:13, invalid_http_status_code:14, invalid_start_date:12, peer_service_invalid:8, peer_service_truncate:7, resource_empty:10, service_empty:2, service_invalid:4, service_truncate:3, span_name_empty:5, span_name_invalid:9, span_name_truncate:6, type_truncate:11). Enable debug logging for more details.", logs[1]) assertStatsAreReset(t, rs) diff --git a/pkg/trace/pb/stats.pb.go b/pkg/trace/pb/stats.pb.go index 47bbc8f7fce2b..568397dbf57c9 100644 --- a/pkg/trace/pb/stats.pb.go +++ b/pkg/trace/pb/stats.pb.go @@ -3,12 +3,13 @@ package pb -import proto "github.com/gogo/protobuf/proto" -import fmt "fmt" -import math "math" -import _ "github.com/gogo/protobuf/gogoproto" - -import io "io" +import ( + fmt "fmt" + _ "github.com/gogo/protobuf/gogoproto" + proto "github.com/gogo/protobuf/proto" + io "io" + math "math" +) // Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal @@ -25,7 +26,7 @@ const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package type StatsPayload struct { AgentHostname string `protobuf:"bytes,1,opt,name=agentHostname,proto3" json:"agentHostname,omitempty"` AgentEnv string `protobuf:"bytes,2,opt,name=agentEnv,proto3" json:"agentEnv,omitempty"` - Stats []ClientStatsPayload `protobuf:"bytes,3,rep,name=stats" json:"stats"` + Stats []ClientStatsPayload `protobuf:"bytes,3,rep,name=stats,proto3" json:"stats"` AgentVersion string `protobuf:"bytes,4,opt,name=agentVersion,proto3" json:"agentVersion,omitempty"` ClientComputed bool `protobuf:"varint,5,opt,name=clientComputed,proto3" json:"clientComputed,omitempty"` } @@ -34,7 +35,7 @@ func (m *StatsPayload) Reset() { *m = StatsPayload{} } func (m *StatsPayload) String() string { return proto.CompactTextString(m) } func (*StatsPayload) ProtoMessage() {} func (*StatsPayload) Descriptor() ([]byte, []int) { - return fileDescriptor_stats_51aff86f1ecec260, []int{0} + return fileDescriptor_b4756a0aec8b9d44, []int{0} } func (m *StatsPayload) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -51,8 +52,8 @@ func (m *StatsPayload) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) return b[:n], nil } } -func (dst *StatsPayload) XXX_Merge(src proto.Message) { - xxx_messageInfo_StatsPayload.Merge(dst, src) +func (m *StatsPayload) XXX_Merge(src proto.Message) { + xxx_messageInfo_StatsPayload.Merge(m, src) } func (m *StatsPayload) XXX_Size() int { return m.Size() @@ -106,7 +107,7 @@ type ClientStatsPayload struct { Hostname string `protobuf:"bytes,1,opt,name=hostname,proto3" json:"hostname,omitempty"` Env string `protobuf:"bytes,2,opt,name=env,proto3" json:"env,omitempty"` Version string `protobuf:"bytes,3,opt,name=version,proto3" json:"version,omitempty"` - Stats []ClientStatsBucket `protobuf:"bytes,4,rep,name=stats" json:"stats"` + Stats []ClientStatsBucket `protobuf:"bytes,4,rep,name=stats,proto3" json:"stats"` Lang string `protobuf:"bytes,5,opt,name=lang,proto3" json:"lang,omitempty"` TracerVersion string `protobuf:"bytes,6,opt,name=tracerVersion,proto3" json:"tracerVersion,omitempty"` RuntimeID string `protobuf:"bytes,7,opt,name=runtimeID,proto3" json:"runtimeID,omitempty"` @@ -122,14 +123,14 @@ type ClientStatsPayload struct { ContainerID string `protobuf:"bytes,11,opt,name=containerID,proto3" json:"containerID,omitempty"` // Tags specifies a set of tags obtained from the orchestrator (where applicable) using the specified containerID. // This field should be left empty by the client. It only applies to some specific environment. - Tags []string `protobuf:"bytes,12,rep,name=tags" json:"tags,omitempty"` + Tags []string `protobuf:"bytes,12,rep,name=tags,proto3" json:"tags,omitempty"` } func (m *ClientStatsPayload) Reset() { *m = ClientStatsPayload{} } func (m *ClientStatsPayload) String() string { return proto.CompactTextString(m) } func (*ClientStatsPayload) ProtoMessage() {} func (*ClientStatsPayload) Descriptor() ([]byte, []int) { - return fileDescriptor_stats_51aff86f1ecec260, []int{1} + return fileDescriptor_b4756a0aec8b9d44, []int{1} } func (m *ClientStatsPayload) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -146,8 +147,8 @@ func (m *ClientStatsPayload) XXX_Marshal(b []byte, deterministic bool) ([]byte, return b[:n], nil } } -func (dst *ClientStatsPayload) XXX_Merge(src proto.Message) { - xxx_messageInfo_ClientStatsPayload.Merge(dst, src) +func (m *ClientStatsPayload) XXX_Merge(src proto.Message) { + xxx_messageInfo_ClientStatsPayload.Merge(m, src) } func (m *ClientStatsPayload) XXX_Size() int { return m.Size() @@ -246,7 +247,7 @@ func (m *ClientStatsPayload) GetTags() []string { type ClientStatsBucket struct { Start uint64 `protobuf:"varint,1,opt,name=start,proto3" json:"start,omitempty"` Duration uint64 `protobuf:"varint,2,opt,name=duration,proto3" json:"duration,omitempty"` - Stats []ClientGroupedStats `protobuf:"bytes,3,rep,name=stats" json:"stats"` + Stats []ClientGroupedStats `protobuf:"bytes,3,rep,name=stats,proto3" json:"stats"` // AgentTimeShift is the shift applied by the agent stats aggregator on bucket start // when the received bucket start is outside of the agent aggregation window AgentTimeShift int64 `protobuf:"varint,4,opt,name=agentTimeShift,proto3" json:"agentTimeShift,omitempty"` @@ -256,7 +257,7 @@ func (m *ClientStatsBucket) Reset() { *m = ClientStatsBucket{} } func (m *ClientStatsBucket) String() string { return proto.CompactTextString(m) } func (*ClientStatsBucket) ProtoMessage() {} func (*ClientStatsBucket) Descriptor() ([]byte, []int) { - return fileDescriptor_stats_51aff86f1ecec260, []int{2} + return fileDescriptor_b4756a0aec8b9d44, []int{2} } func (m *ClientStatsBucket) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -273,8 +274,8 @@ func (m *ClientStatsBucket) XXX_Marshal(b []byte, deterministic bool) ([]byte, e return b[:n], nil } } -func (dst *ClientStatsBucket) XXX_Merge(src proto.Message) { - xxx_messageInfo_ClientStatsBucket.Merge(dst, src) +func (m *ClientStatsBucket) XXX_Merge(src proto.Message) { + xxx_messageInfo_ClientStatsBucket.Merge(m, src) } func (m *ClientStatsBucket) XXX_Size() int { return m.Size() @@ -328,13 +329,14 @@ type ClientGroupedStats struct { ErrorSummary []byte `protobuf:"bytes,11,opt,name=errorSummary,proto3" json:"errorSummary,omitempty"` Synthetics bool `protobuf:"varint,12,opt,name=synthetics,proto3" json:"synthetics,omitempty"` TopLevelHits uint64 `protobuf:"varint,13,opt,name=topLevelHits,proto3" json:"topLevelHits,omitempty"` + PeerService string `protobuf:"bytes,14,opt,name=peer_service,json=peerService,proto3" json:"peer_service,omitempty"` } func (m *ClientGroupedStats) Reset() { *m = ClientGroupedStats{} } func (m *ClientGroupedStats) String() string { return proto.CompactTextString(m) } func (*ClientGroupedStats) ProtoMessage() {} func (*ClientGroupedStats) Descriptor() ([]byte, []int) { - return fileDescriptor_stats_51aff86f1ecec260, []int{3} + return fileDescriptor_b4756a0aec8b9d44, []int{3} } func (m *ClientGroupedStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -351,8 +353,8 @@ func (m *ClientGroupedStats) XXX_Marshal(b []byte, deterministic bool) ([]byte, return b[:n], nil } } -func (dst *ClientGroupedStats) XXX_Merge(src proto.Message) { - xxx_messageInfo_ClientGroupedStats.Merge(dst, src) +func (m *ClientGroupedStats) XXX_Merge(src proto.Message) { + xxx_messageInfo_ClientGroupedStats.Merge(m, src) } func (m *ClientGroupedStats) XXX_Size() int { return m.Size() @@ -454,12 +456,67 @@ func (m *ClientGroupedStats) GetTopLevelHits() uint64 { return 0 } +func (m *ClientGroupedStats) GetPeerService() string { + if m != nil { + return m.PeerService + } + return "" +} + func init() { proto.RegisterType((*StatsPayload)(nil), "pb.StatsPayload") proto.RegisterType((*ClientStatsPayload)(nil), "pb.ClientStatsPayload") proto.RegisterType((*ClientStatsBucket)(nil), "pb.ClientStatsBucket") proto.RegisterType((*ClientGroupedStats)(nil), "pb.ClientGroupedStats") } + +func init() { proto.RegisterFile("stats.proto", fileDescriptor_b4756a0aec8b9d44) } + +var fileDescriptor_b4756a0aec8b9d44 = []byte{ + // 645 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x54, 0xc1, 0x6e, 0xda, 0x40, + 0x10, 0xc5, 0xd8, 0x21, 0x61, 0x20, 0x28, 0x5d, 0xb5, 0xa9, 0x15, 0x45, 0x2e, 0x45, 0x55, 0x84, + 0x2a, 0x95, 0xa8, 0xe9, 0x17, 0x94, 0x50, 0x35, 0x91, 0x7a, 0x88, 0x0c, 0xea, 0x15, 0x19, 0x33, + 0x31, 0x56, 0xb0, 0xd7, 0xdd, 0x5d, 0x23, 0xf1, 0x17, 0xfd, 0x85, 0x5e, 0xfa, 0x2d, 0x39, 0xe6, + 0xd8, 0x53, 0x55, 0x25, 0x1f, 0xd2, 0x6a, 0x67, 0x31, 0xc1, 0x44, 0xea, 0x6d, 0xde, 0xdb, 0x61, + 0x76, 0xde, 0xf3, 0x5b, 0xa0, 0x21, 0x55, 0xa0, 0x64, 0x2f, 0x13, 0x5c, 0x71, 0x56, 0xcd, 0x26, + 0x47, 0xef, 0xa2, 0x58, 0xcd, 0xf2, 0x49, 0x2f, 0xe4, 0xc9, 0x69, 0xc4, 0x23, 0x7e, 0x4a, 0x47, + 0x93, 0xfc, 0x9a, 0x10, 0x01, 0xaa, 0xcc, 0x4f, 0x3a, 0x77, 0x16, 0x34, 0x87, 0x7a, 0xc4, 0x55, + 0xb0, 0x9c, 0xf3, 0x60, 0xca, 0xde, 0xc0, 0x7e, 0x10, 0x61, 0xaa, 0x2e, 0xb8, 0x54, 0x69, 0x90, + 0xa0, 0x6b, 0xb5, 0xad, 0x6e, 0xdd, 0x2f, 0x93, 0xec, 0x08, 0xf6, 0x88, 0xf8, 0x94, 0x2e, 0xdc, + 0x2a, 0x35, 0xac, 0x31, 0x3b, 0x83, 0x1d, 0x5a, 0xca, 0xb5, 0xdb, 0x76, 0xb7, 0x71, 0x76, 0xd8, + 0xcb, 0x26, 0xbd, 0xf3, 0x79, 0x8c, 0xa9, 0xda, 0xbc, 0xa8, 0xef, 0xdc, 0xfe, 0x7e, 0x55, 0xf1, + 0x4d, 0x2b, 0xeb, 0x40, 0x93, 0x7e, 0xff, 0x15, 0x85, 0x8c, 0x79, 0xea, 0x3a, 0x34, 0xb3, 0xc4, + 0xb1, 0x13, 0x68, 0x85, 0x34, 0xe6, 0x9c, 0x27, 0x59, 0xae, 0x70, 0xea, 0xee, 0xb4, 0xad, 0xee, + 0x9e, 0xbf, 0xc5, 0x76, 0xfe, 0x56, 0x81, 0x3d, 0xbd, 0x4f, 0xaf, 0x3c, 0x2b, 0x6b, 0x5a, 0x63, + 0x76, 0x00, 0x36, 0xae, 0x95, 0xe8, 0x92, 0xb9, 0xb0, 0xbb, 0x58, 0xed, 0x62, 0x13, 0x5b, 0x40, + 0xf6, 0xbe, 0x90, 0xe7, 0x90, 0xbc, 0x17, 0x5b, 0xf2, 0xfa, 0x79, 0x78, 0x83, 0xaa, 0xac, 0x8e, + 0x81, 0x33, 0x0f, 0xd2, 0x88, 0xf6, 0xad, 0xfb, 0x54, 0x6b, 0x9f, 0x95, 0x08, 0x42, 0x14, 0x85, + 0xe4, 0x9a, 0xf1, 0xb9, 0x44, 0xb2, 0x63, 0xa8, 0x8b, 0x3c, 0x55, 0x71, 0x82, 0x97, 0x03, 0x77, + 0x97, 0x3a, 0x1e, 0x09, 0x2d, 0x49, 0xe2, 0xb7, 0x1c, 0xd3, 0x10, 0xdd, 0xbd, 0xb6, 0xd5, 0x75, + 0xfc, 0x35, 0x66, 0x6f, 0xe1, 0x80, 0xdc, 0xfb, 0x18, 0x45, 0x02, 0xa3, 0x40, 0xe9, 0x2b, 0xea, + 0x34, 0xe0, 0x09, 0xaf, 0xc5, 0x4a, 0x14, 0x8b, 0x38, 0x44, 0x17, 0x8c, 0xd8, 0x15, 0x64, 0x6d, + 0x68, 0x84, 0x3c, 0x55, 0x41, 0x9c, 0xa2, 0xb8, 0x1c, 0xb8, 0x0d, 0x3a, 0xdd, 0xa4, 0xb4, 0x36, + 0x15, 0x44, 0xd2, 0x6d, 0xb6, 0x6d, 0xad, 0x4d, 0xd7, 0x9d, 0x1f, 0x16, 0x3c, 0x7b, 0x62, 0x09, + 0x7b, 0x4e, 0xc6, 0x09, 0x45, 0xee, 0x3b, 0xbe, 0x01, 0x5a, 0xc3, 0x34, 0x17, 0x66, 0xbf, 0xaa, + 0xd1, 0x50, 0xe0, 0xff, 0x24, 0xe9, 0xb3, 0xe0, 0x79, 0x86, 0x53, 0x33, 0xbe, 0xe4, 0xf5, 0x09, + 0xb4, 0x48, 0xdf, 0x28, 0x4e, 0x70, 0x38, 0x8b, 0xaf, 0x15, 0x65, 0xc9, 0xf6, 0xb7, 0xd8, 0xce, + 0x4f, 0xbb, 0x48, 0xc9, 0xe6, 0xac, 0x4d, 0x2b, 0xac, 0xb2, 0x15, 0x0c, 0x1c, 0xca, 0x8e, 0x09, + 0x89, 0x53, 0x3c, 0x03, 0x81, 0x92, 0xe7, 0x22, 0xc4, 0x55, 0x4c, 0xd6, 0x98, 0x75, 0xe1, 0xe0, + 0x62, 0x34, 0xba, 0x1a, 0xeb, 0xb5, 0x72, 0x39, 0x0e, 0xf9, 0x14, 0x69, 0x95, 0x7d, 0xbf, 0xa5, + 0xf9, 0x21, 0xd1, 0xe7, 0x7c, 0x4a, 0x93, 0xd5, 0x32, 0xc3, 0x22, 0x1e, 0xba, 0x66, 0x2f, 0x61, + 0x77, 0xd0, 0x1f, 0x13, 0x6d, 0x82, 0x51, 0x1b, 0xf4, 0x47, 0xfa, 0x80, 0x81, 0x33, 0x8b, 0x95, + 0xa4, 0x30, 0x38, 0x3e, 0xd5, 0xec, 0x10, 0x6a, 0x28, 0x04, 0x17, 0x72, 0x95, 0x82, 0x15, 0x2a, + 0x79, 0x5b, 0xdf, 0xf2, 0xf6, 0x18, 0xea, 0xfc, 0x66, 0x98, 0x27, 0x49, 0x20, 0x96, 0xf4, 0xd5, + 0x9b, 0xfe, 0x23, 0xa1, 0xdf, 0x23, 0xcd, 0x28, 0x1a, 0x1a, 0xd4, 0x50, 0xe2, 0x98, 0x07, 0x20, + 0x97, 0xa9, 0x9a, 0xa1, 0x8a, 0x43, 0xfd, 0xfd, 0xf5, 0x5b, 0xdc, 0x60, 0xf4, 0x0c, 0xc5, 0xb3, + 0x2f, 0xb8, 0xc0, 0xf9, 0x85, 0xde, 0x78, 0x9f, 0x36, 0x28, 0x71, 0xec, 0x35, 0x34, 0x33, 0x44, + 0x31, 0x2e, 0x3c, 0x6f, 0x99, 0x80, 0x69, 0x6e, 0x68, 0xa8, 0xbe, 0x7b, 0x7b, 0xef, 0x59, 0x77, + 0xf7, 0x9e, 0xf5, 0xe7, 0xde, 0xb3, 0xbe, 0x3f, 0x78, 0x95, 0xbb, 0x07, 0xaf, 0xf2, 0xeb, 0xc1, + 0xab, 0x4c, 0x6a, 0xf4, 0x17, 0xf6, 0xe1, 0x5f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x63, 0x08, 0xa2, + 0x86, 0x04, 0x05, 0x00, 0x00, +} + func (m *StatsPayload) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -759,6 +816,12 @@ func (m *ClientGroupedStats) MarshalTo(dAtA []byte) (int, error) { i++ i = encodeVarintStats(dAtA, i, uint64(m.TopLevelHits)) } + if len(m.PeerService) > 0 { + dAtA[i] = 0x72 + i++ + i = encodeVarintStats(dAtA, i, uint64(len(m.PeerService))) + i += copy(dAtA[i:], m.PeerService) + } return i, nil } @@ -937,6 +1000,10 @@ func (m *ClientGroupedStats) Size() (n int) { if m.TopLevelHits != 0 { n += 1 + sovStats(uint64(m.TopLevelHits)) } + l = len(m.PeerService) + if l > 0 { + n += 1 + l + sovStats(uint64(l)) + } return n } @@ -968,7 +1035,7 @@ func (m *StatsPayload) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - wire |= (uint64(b) & 0x7F) << shift + wire |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -996,7 +1063,7 @@ func (m *StatsPayload) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - stringLen |= (uint64(b) & 0x7F) << shift + stringLen |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -1006,6 +1073,9 @@ func (m *StatsPayload) Unmarshal(dAtA []byte) error { return ErrInvalidLengthStats } postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthStats + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -1025,7 +1095,7 @@ func (m *StatsPayload) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - stringLen |= (uint64(b) & 0x7F) << shift + stringLen |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -1035,6 +1105,9 @@ func (m *StatsPayload) Unmarshal(dAtA []byte) error { return ErrInvalidLengthStats } postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthStats + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -1054,7 +1127,7 @@ func (m *StatsPayload) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - msglen |= (int(b) & 0x7F) << shift + msglen |= int(b&0x7F) << shift if b < 0x80 { break } @@ -1063,6 +1136,9 @@ func (m *StatsPayload) Unmarshal(dAtA []byte) error { return ErrInvalidLengthStats } postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthStats + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -1085,7 +1161,7 @@ func (m *StatsPayload) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - stringLen |= (uint64(b) & 0x7F) << shift + stringLen |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -1095,6 +1171,9 @@ func (m *StatsPayload) Unmarshal(dAtA []byte) error { return ErrInvalidLengthStats } postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthStats + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -1114,7 +1193,7 @@ func (m *StatsPayload) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - v |= (int(b) & 0x7F) << shift + v |= int(b&0x7F) << shift if b < 0x80 { break } @@ -1129,6 +1208,9 @@ func (m *StatsPayload) Unmarshal(dAtA []byte) error { if skippy < 0 { return ErrInvalidLengthStats } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthStats + } if (iNdEx + skippy) > l { return io.ErrUnexpectedEOF } @@ -1156,7 +1238,7 @@ func (m *ClientStatsPayload) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - wire |= (uint64(b) & 0x7F) << shift + wire |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -1184,7 +1266,7 @@ func (m *ClientStatsPayload) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - stringLen |= (uint64(b) & 0x7F) << shift + stringLen |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -1194,6 +1276,9 @@ func (m *ClientStatsPayload) Unmarshal(dAtA []byte) error { return ErrInvalidLengthStats } postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthStats + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -1213,7 +1298,7 @@ func (m *ClientStatsPayload) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - stringLen |= (uint64(b) & 0x7F) << shift + stringLen |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -1223,6 +1308,9 @@ func (m *ClientStatsPayload) Unmarshal(dAtA []byte) error { return ErrInvalidLengthStats } postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthStats + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -1242,7 +1330,7 @@ func (m *ClientStatsPayload) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - stringLen |= (uint64(b) & 0x7F) << shift + stringLen |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -1252,6 +1340,9 @@ func (m *ClientStatsPayload) Unmarshal(dAtA []byte) error { return ErrInvalidLengthStats } postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthStats + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -1271,7 +1362,7 @@ func (m *ClientStatsPayload) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - msglen |= (int(b) & 0x7F) << shift + msglen |= int(b&0x7F) << shift if b < 0x80 { break } @@ -1280,6 +1371,9 @@ func (m *ClientStatsPayload) Unmarshal(dAtA []byte) error { return ErrInvalidLengthStats } postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthStats + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -1302,7 +1396,7 @@ func (m *ClientStatsPayload) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - stringLen |= (uint64(b) & 0x7F) << shift + stringLen |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -1312,6 +1406,9 @@ func (m *ClientStatsPayload) Unmarshal(dAtA []byte) error { return ErrInvalidLengthStats } postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthStats + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -1331,7 +1428,7 @@ func (m *ClientStatsPayload) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - stringLen |= (uint64(b) & 0x7F) << shift + stringLen |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -1341,6 +1438,9 @@ func (m *ClientStatsPayload) Unmarshal(dAtA []byte) error { return ErrInvalidLengthStats } postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthStats + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -1360,7 +1460,7 @@ func (m *ClientStatsPayload) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - stringLen |= (uint64(b) & 0x7F) << shift + stringLen |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -1370,6 +1470,9 @@ func (m *ClientStatsPayload) Unmarshal(dAtA []byte) error { return ErrInvalidLengthStats } postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthStats + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -1389,7 +1492,7 @@ func (m *ClientStatsPayload) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.Sequence |= (uint64(b) & 0x7F) << shift + m.Sequence |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -1408,7 +1511,7 @@ func (m *ClientStatsPayload) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - stringLen |= (uint64(b) & 0x7F) << shift + stringLen |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -1418,6 +1521,9 @@ func (m *ClientStatsPayload) Unmarshal(dAtA []byte) error { return ErrInvalidLengthStats } postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthStats + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -1437,7 +1543,7 @@ func (m *ClientStatsPayload) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - stringLen |= (uint64(b) & 0x7F) << shift + stringLen |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -1447,6 +1553,9 @@ func (m *ClientStatsPayload) Unmarshal(dAtA []byte) error { return ErrInvalidLengthStats } postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthStats + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -1466,7 +1575,7 @@ func (m *ClientStatsPayload) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - stringLen |= (uint64(b) & 0x7F) << shift + stringLen |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -1476,6 +1585,9 @@ func (m *ClientStatsPayload) Unmarshal(dAtA []byte) error { return ErrInvalidLengthStats } postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthStats + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -1495,7 +1607,7 @@ func (m *ClientStatsPayload) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - stringLen |= (uint64(b) & 0x7F) << shift + stringLen |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -1505,6 +1617,9 @@ func (m *ClientStatsPayload) Unmarshal(dAtA []byte) error { return ErrInvalidLengthStats } postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthStats + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -1519,6 +1634,9 @@ func (m *ClientStatsPayload) Unmarshal(dAtA []byte) error { if skippy < 0 { return ErrInvalidLengthStats } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthStats + } if (iNdEx + skippy) > l { return io.ErrUnexpectedEOF } @@ -1546,7 +1664,7 @@ func (m *ClientStatsBucket) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - wire |= (uint64(b) & 0x7F) << shift + wire |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -1574,7 +1692,7 @@ func (m *ClientStatsBucket) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.Start |= (uint64(b) & 0x7F) << shift + m.Start |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -1593,7 +1711,7 @@ func (m *ClientStatsBucket) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.Duration |= (uint64(b) & 0x7F) << shift + m.Duration |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -1612,7 +1730,7 @@ func (m *ClientStatsBucket) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - msglen |= (int(b) & 0x7F) << shift + msglen |= int(b&0x7F) << shift if b < 0x80 { break } @@ -1621,6 +1739,9 @@ func (m *ClientStatsBucket) Unmarshal(dAtA []byte) error { return ErrInvalidLengthStats } postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthStats + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -1643,7 +1764,7 @@ func (m *ClientStatsBucket) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.AgentTimeShift |= (int64(b) & 0x7F) << shift + m.AgentTimeShift |= int64(b&0x7F) << shift if b < 0x80 { break } @@ -1657,6 +1778,9 @@ func (m *ClientStatsBucket) Unmarshal(dAtA []byte) error { if skippy < 0 { return ErrInvalidLengthStats } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthStats + } if (iNdEx + skippy) > l { return io.ErrUnexpectedEOF } @@ -1684,7 +1808,7 @@ func (m *ClientGroupedStats) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - wire |= (uint64(b) & 0x7F) << shift + wire |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -1712,7 +1836,7 @@ func (m *ClientGroupedStats) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - stringLen |= (uint64(b) & 0x7F) << shift + stringLen |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -1722,6 +1846,9 @@ func (m *ClientGroupedStats) Unmarshal(dAtA []byte) error { return ErrInvalidLengthStats } postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthStats + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -1741,7 +1868,7 @@ func (m *ClientGroupedStats) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - stringLen |= (uint64(b) & 0x7F) << shift + stringLen |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -1751,6 +1878,9 @@ func (m *ClientGroupedStats) Unmarshal(dAtA []byte) error { return ErrInvalidLengthStats } postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthStats + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -1770,7 +1900,7 @@ func (m *ClientGroupedStats) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - stringLen |= (uint64(b) & 0x7F) << shift + stringLen |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -1780,6 +1910,9 @@ func (m *ClientGroupedStats) Unmarshal(dAtA []byte) error { return ErrInvalidLengthStats } postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthStats + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -1799,7 +1932,7 @@ func (m *ClientGroupedStats) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.HTTPStatusCode |= (uint32(b) & 0x7F) << shift + m.HTTPStatusCode |= uint32(b&0x7F) << shift if b < 0x80 { break } @@ -1818,7 +1951,7 @@ func (m *ClientGroupedStats) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - stringLen |= (uint64(b) & 0x7F) << shift + stringLen |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -1828,6 +1961,9 @@ func (m *ClientGroupedStats) Unmarshal(dAtA []byte) error { return ErrInvalidLengthStats } postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthStats + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -1847,7 +1983,7 @@ func (m *ClientGroupedStats) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - stringLen |= (uint64(b) & 0x7F) << shift + stringLen |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -1857,6 +1993,9 @@ func (m *ClientGroupedStats) Unmarshal(dAtA []byte) error { return ErrInvalidLengthStats } postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthStats + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -1876,7 +2015,7 @@ func (m *ClientGroupedStats) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.Hits |= (uint64(b) & 0x7F) << shift + m.Hits |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -1895,7 +2034,7 @@ func (m *ClientGroupedStats) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.Errors |= (uint64(b) & 0x7F) << shift + m.Errors |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -1914,7 +2053,7 @@ func (m *ClientGroupedStats) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.Duration |= (uint64(b) & 0x7F) << shift + m.Duration |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -1933,7 +2072,7 @@ func (m *ClientGroupedStats) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - byteLen |= (int(b) & 0x7F) << shift + byteLen |= int(b&0x7F) << shift if b < 0x80 { break } @@ -1942,6 +2081,9 @@ func (m *ClientGroupedStats) Unmarshal(dAtA []byte) error { return ErrInvalidLengthStats } postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthStats + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -1964,7 +2106,7 @@ func (m *ClientGroupedStats) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - byteLen |= (int(b) & 0x7F) << shift + byteLen |= int(b&0x7F) << shift if b < 0x80 { break } @@ -1973,6 +2115,9 @@ func (m *ClientGroupedStats) Unmarshal(dAtA []byte) error { return ErrInvalidLengthStats } postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthStats + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -1995,7 +2140,7 @@ func (m *ClientGroupedStats) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - v |= (int(b) & 0x7F) << shift + v |= int(b&0x7F) << shift if b < 0x80 { break } @@ -2015,11 +2160,43 @@ func (m *ClientGroupedStats) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.TopLevelHits |= (uint64(b) & 0x7F) << shift + m.TopLevelHits |= uint64(b&0x7F) << shift if b < 0x80 { break } } + case 14: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field PeerService", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStats + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthStats + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthStats + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.PeerService = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipStats(dAtA[iNdEx:]) @@ -2029,6 +2206,9 @@ func (m *ClientGroupedStats) Unmarshal(dAtA []byte) error { if skippy < 0 { return ErrInvalidLengthStats } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthStats + } if (iNdEx + skippy) > l { return io.ErrUnexpectedEOF } @@ -2095,10 +2275,13 @@ func skipStats(dAtA []byte) (n int, err error) { break } } - iNdEx += length if length < 0 { return 0, ErrInvalidLengthStats } + iNdEx += length + if iNdEx < 0 { + return 0, ErrInvalidLengthStats + } return iNdEx, nil case 3: for { @@ -2127,6 +2310,9 @@ func skipStats(dAtA []byte) (n int, err error) { return 0, err } iNdEx = start + next + if iNdEx < 0 { + return 0, ErrInvalidLengthStats + } } return iNdEx, nil case 4: @@ -2145,49 +2331,3 @@ var ( ErrInvalidLengthStats = fmt.Errorf("proto: negative length found during unmarshaling") ErrIntOverflowStats = fmt.Errorf("proto: integer overflow") ) - -func init() { proto.RegisterFile("stats.proto", fileDescriptor_stats_51aff86f1ecec260) } - -var fileDescriptor_stats_51aff86f1ecec260 = []byte{ - // 631 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x54, 0xc1, 0x6a, 0xdb, 0x4c, - 0x10, 0xb6, 0x2c, 0xc5, 0x89, 0xc7, 0x4e, 0xc8, 0xbf, 0xfc, 0x4d, 0x97, 0x10, 0x54, 0x63, 0x4a, - 0x30, 0x85, 0x3a, 0x34, 0x7d, 0x82, 0x3a, 0x2e, 0x4d, 0xa0, 0x87, 0x20, 0x9b, 0x5e, 0x83, 0x2c, - 0x4d, 0x64, 0x11, 0x4b, 0xab, 0xee, 0xae, 0x0c, 0x3e, 0xf7, 0x05, 0xfa, 0x0a, 0x7d, 0x9b, 0x1c, - 0x73, 0xec, 0xa9, 0x94, 0xe4, 0x41, 0x5a, 0x76, 0x64, 0x39, 0x96, 0x0d, 0xbd, 0xcd, 0xf7, 0xed, - 0x78, 0x76, 0xbe, 0x4f, 0xdf, 0x1a, 0x5a, 0x4a, 0xfb, 0x5a, 0xf5, 0x33, 0x29, 0xb4, 0x60, 0xf5, - 0x6c, 0x72, 0xfc, 0x36, 0x8a, 0xf5, 0x34, 0x9f, 0xf4, 0x03, 0x91, 0x9c, 0x45, 0x22, 0x12, 0x67, - 0x74, 0x34, 0xc9, 0x6f, 0x09, 0x11, 0xa0, 0xaa, 0xf8, 0x49, 0xf7, 0xc1, 0x82, 0xf6, 0xc8, 0x8c, - 0xb8, 0xf6, 0x17, 0x33, 0xe1, 0x87, 0xec, 0x35, 0xec, 0xfb, 0x11, 0xa6, 0xfa, 0x52, 0x28, 0x9d, - 0xfa, 0x09, 0x72, 0xab, 0x63, 0xf5, 0x9a, 0x5e, 0x95, 0x64, 0xc7, 0xb0, 0x47, 0xc4, 0xc7, 0x74, - 0xce, 0xeb, 0xd4, 0xb0, 0xc2, 0xec, 0x1c, 0x76, 0x68, 0x29, 0x6e, 0x77, 0xec, 0x5e, 0xeb, 0xfc, - 0xa8, 0x9f, 0x4d, 0xfa, 0x17, 0xb3, 0x18, 0x53, 0xbd, 0x7e, 0xd1, 0xc0, 0xb9, 0xff, 0xf5, 0xaa, - 0xe6, 0x15, 0xad, 0xac, 0x0b, 0x6d, 0xfa, 0xfd, 0x17, 0x94, 0x2a, 0x16, 0x29, 0x77, 0x68, 0x66, - 0x85, 0x63, 0xa7, 0x70, 0x10, 0xd0, 0x98, 0x0b, 0x91, 0x64, 0xb9, 0xc6, 0x90, 0xef, 0x74, 0xac, - 0xde, 0x9e, 0xb7, 0xc1, 0x76, 0xff, 0xd4, 0x81, 0x6d, 0xdf, 0x67, 0x56, 0x9e, 0x56, 0x35, 0xad, - 0x30, 0x3b, 0x04, 0x1b, 0x57, 0x4a, 0x4c, 0xc9, 0x38, 0xec, 0xce, 0x97, 0xbb, 0xd8, 0xc4, 0x96, - 0x90, 0xbd, 0x2b, 0xe5, 0x39, 0x24, 0xef, 0xc5, 0x86, 0xbc, 0x41, 0x1e, 0xdc, 0xa1, 0xae, 0xaa, - 0x63, 0xe0, 0xcc, 0xfc, 0x34, 0xa2, 0x7d, 0x9b, 0x1e, 0xd5, 0xc6, 0x67, 0x2d, 0xfd, 0x00, 0x65, - 0x29, 0xb9, 0x51, 0xf8, 0x5c, 0x21, 0xd9, 0x09, 0x34, 0x65, 0x9e, 0xea, 0x38, 0xc1, 0xab, 0x21, - 0xdf, 0xa5, 0x8e, 0x67, 0xc2, 0x48, 0x52, 0xf8, 0x35, 0xc7, 0x34, 0x40, 0xbe, 0xd7, 0xb1, 0x7a, - 0x8e, 0xb7, 0xc2, 0xec, 0x0d, 0x1c, 0x92, 0x7b, 0x1f, 0xa2, 0x48, 0x62, 0xe4, 0x6b, 0x73, 0x45, - 0x93, 0x06, 0x6c, 0xf1, 0x46, 0xac, 0x42, 0x39, 0x8f, 0x03, 0xe4, 0x50, 0x88, 0x5d, 0x42, 0xd6, - 0x81, 0x56, 0x20, 0x52, 0xed, 0xc7, 0x29, 0xca, 0xab, 0x21, 0x6f, 0xd1, 0xe9, 0x3a, 0x65, 0xb4, - 0x69, 0x3f, 0x52, 0xbc, 0xdd, 0xb1, 0x8d, 0x36, 0x53, 0x77, 0x7f, 0x58, 0xf0, 0xdf, 0x96, 0x25, - 0xec, 0x7f, 0x32, 0x4e, 0x6a, 0x72, 0xdf, 0xf1, 0x0a, 0x60, 0x34, 0x84, 0xb9, 0x2c, 0xf6, 0xab, - 0x17, 0x1a, 0x4a, 0xfc, 0x8f, 0x24, 0x7d, 0x92, 0x22, 0xcf, 0x30, 0x2c, 0xc6, 0x57, 0xbc, 0x3e, - 0x85, 0x03, 0xd2, 0x37, 0x8e, 0x13, 0x1c, 0x4d, 0xe3, 0x5b, 0x4d, 0x59, 0xb2, 0xbd, 0x0d, 0xb6, - 0xfb, 0xcd, 0x2e, 0x53, 0xb2, 0x3e, 0x6b, 0xdd, 0x0a, 0xab, 0x6a, 0x05, 0x03, 0x87, 0xb2, 0x53, - 0x84, 0xc4, 0x29, 0x9f, 0x81, 0x44, 0x25, 0x72, 0x19, 0xe0, 0x32, 0x26, 0x2b, 0xcc, 0x7a, 0x70, - 0x78, 0x39, 0x1e, 0x5f, 0xdf, 0x98, 0xb5, 0x72, 0x75, 0x13, 0x88, 0x10, 0x69, 0x95, 0x7d, 0xef, - 0xc0, 0xf0, 0x23, 0xa2, 0x2f, 0x44, 0x48, 0x93, 0xf5, 0x22, 0xc3, 0x32, 0x1e, 0xa6, 0x66, 0x2f, - 0x61, 0x77, 0x38, 0xb8, 0x21, 0xba, 0x08, 0x46, 0x63, 0x38, 0x18, 0x9b, 0x03, 0x06, 0xce, 0x34, - 0xd6, 0x8a, 0xc2, 0xe0, 0x78, 0x54, 0xb3, 0x23, 0x68, 0xa0, 0x94, 0x42, 0xaa, 0x65, 0x0a, 0x96, - 0xa8, 0xe2, 0x6d, 0x73, 0xc3, 0xdb, 0x13, 0x68, 0x8a, 0xbb, 0x51, 0x9e, 0x24, 0xbe, 0x5c, 0xd0, - 0x57, 0x6f, 0x7b, 0xcf, 0x84, 0x79, 0x8f, 0x34, 0xa3, 0x6c, 0x68, 0x51, 0x43, 0x85, 0x63, 0x2e, - 0x80, 0x5a, 0xa4, 0x7a, 0x8a, 0x3a, 0x0e, 0xcc, 0xf7, 0x37, 0x6f, 0x71, 0x8d, 0x31, 0x33, 0xb4, - 0xc8, 0x3e, 0xe3, 0x1c, 0x67, 0x97, 0x66, 0xe3, 0x7d, 0xda, 0xa0, 0xc2, 0x0d, 0xf8, 0xfd, 0xa3, - 0x6b, 0x3d, 0x3c, 0xba, 0xd6, 0xef, 0x47, 0xd7, 0xfa, 0xfe, 0xe4, 0xd6, 0x1e, 0x9e, 0xdc, 0xda, - 0xcf, 0x27, 0xb7, 0x36, 0x69, 0xd0, 0xff, 0xd3, 0xfb, 0xbf, 0x01, 0x00, 0x00, 0xff, 0xff, 0x2c, - 0xa9, 0x29, 0x39, 0xe1, 0x04, 0x00, 0x00, -} diff --git a/pkg/trace/pb/stats.proto b/pkg/trace/pb/stats.proto index 64d7b2d7fd6bc..21294e24be8f2 100644 --- a/pkg/trace/pb/stats.proto +++ b/pkg/trace/pb/stats.proto @@ -67,4 +67,5 @@ message ClientGroupedStats { bytes errorSummary = 11; // ddsketch summary of error spans latencies encoded in protobuf bool synthetics = 12; // set to true on spans generated by synthetics traffic uint64 topLevelHits = 13; // count of top level spans aggregated in the groupedstats + string peer_service = 14; // name of the remote service that the `service` communicated with } diff --git a/pkg/trace/pb/stats_gen.go b/pkg/trace/pb/stats_gen.go index dd6b7c1257a9e..99d62b970e281 100644 --- a/pkg/trace/pb/stats_gen.go +++ b/pkg/trace/pb/stats_gen.go @@ -1,16 +1,11 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2016-present Datadog, Inc. - package pb -// NOTE: THIS FILE WAS PRODUCED BY THE -// MSGP CODE GENERATION TOOL (github.com/tinylib/msgp) -// DO NOT EDIT +// Code generated by github.com/tinylib/msgp DO NOT EDIT. +// Command to generate: msgp -file pkg/trace/pb/stats.pb.go -o pkg/trace/pb/stats_gen.go +// Please remember to add this comment back after re-generation! import ( - _ "github.com/gogo/protobuf/gogoproto" // comment justifying it + _ "github.com/gogo/protobuf/gogoproto" "github.com/tinylib/msgp/msgp" ) @@ -21,83 +16,105 @@ func (z *ClientGroupedStats) DecodeMsg(dc *msgp.Reader) (err error) { var zb0001 uint32 zb0001, err = dc.ReadMapHeader() if err != nil { + err = msgp.WrapError(err) return } for zb0001 > 0 { zb0001-- field, err = dc.ReadMapKeyPtr() if err != nil { + err = msgp.WrapError(err) return } switch msgp.UnsafeString(field) { case "Service": z.Service, err = dc.ReadString() if err != nil { + err = msgp.WrapError(err, "Service") return } case "Name": z.Name, err = dc.ReadString() if err != nil { + err = msgp.WrapError(err, "Name") return } case "Resource": z.Resource, err = dc.ReadString() if err != nil { + err = msgp.WrapError(err, "Resource") return } case "HTTPStatusCode": z.HTTPStatusCode, err = dc.ReadUint32() if err != nil { + err = msgp.WrapError(err, "HTTPStatusCode") return } case "Type": z.Type, err = dc.ReadString() if err != nil { + err = msgp.WrapError(err, "Type") return } case "DBType": z.DBType, err = dc.ReadString() if err != nil { + err = msgp.WrapError(err, "DBType") return } case "Hits": z.Hits, err = dc.ReadUint64() if err != nil { + err = msgp.WrapError(err, "Hits") return } case "Errors": z.Errors, err = dc.ReadUint64() if err != nil { + err = msgp.WrapError(err, "Errors") return } case "Duration": z.Duration, err = dc.ReadUint64() if err != nil { + err = msgp.WrapError(err, "Duration") return } case "OkSummary": z.OkSummary, err = dc.ReadBytes(z.OkSummary) if err != nil { + err = msgp.WrapError(err, "OkSummary") return } case "ErrorSummary": z.ErrorSummary, err = dc.ReadBytes(z.ErrorSummary) if err != nil { + err = msgp.WrapError(err, "ErrorSummary") return } case "Synthetics": z.Synthetics, err = dc.ReadBool() if err != nil { + err = msgp.WrapError(err, "Synthetics") return } case "TopLevelHits": z.TopLevelHits, err = dc.ReadUint64() if err != nil { + err = msgp.WrapError(err, "TopLevelHits") + return + } + case "PeerService": + z.PeerService, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "PeerService") return } default: err = dc.Skip() if err != nil { + err = msgp.WrapError(err) return } } @@ -107,14 +124,15 @@ func (z *ClientGroupedStats) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *ClientGroupedStats) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 13 + // map header, size 14 // write "Service" - err = en.Append(0x8d, 0xa7, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65) + err = en.Append(0x8e, 0xa7, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65) if err != nil { return } err = en.WriteString(z.Service) if err != nil { + err = msgp.WrapError(err, "Service") return } // write "Name" @@ -124,6 +142,7 @@ func (z *ClientGroupedStats) EncodeMsg(en *msgp.Writer) (err error) { } err = en.WriteString(z.Name) if err != nil { + err = msgp.WrapError(err, "Name") return } // write "Resource" @@ -133,6 +152,7 @@ func (z *ClientGroupedStats) EncodeMsg(en *msgp.Writer) (err error) { } err = en.WriteString(z.Resource) if err != nil { + err = msgp.WrapError(err, "Resource") return } // write "HTTPStatusCode" @@ -142,6 +162,7 @@ func (z *ClientGroupedStats) EncodeMsg(en *msgp.Writer) (err error) { } err = en.WriteUint32(z.HTTPStatusCode) if err != nil { + err = msgp.WrapError(err, "HTTPStatusCode") return } // write "Type" @@ -151,6 +172,7 @@ func (z *ClientGroupedStats) EncodeMsg(en *msgp.Writer) (err error) { } err = en.WriteString(z.Type) if err != nil { + err = msgp.WrapError(err, "Type") return } // write "DBType" @@ -160,6 +182,7 @@ func (z *ClientGroupedStats) EncodeMsg(en *msgp.Writer) (err error) { } err = en.WriteString(z.DBType) if err != nil { + err = msgp.WrapError(err, "DBType") return } // write "Hits" @@ -169,6 +192,7 @@ func (z *ClientGroupedStats) EncodeMsg(en *msgp.Writer) (err error) { } err = en.WriteUint64(z.Hits) if err != nil { + err = msgp.WrapError(err, "Hits") return } // write "Errors" @@ -178,6 +202,7 @@ func (z *ClientGroupedStats) EncodeMsg(en *msgp.Writer) (err error) { } err = en.WriteUint64(z.Errors) if err != nil { + err = msgp.WrapError(err, "Errors") return } // write "Duration" @@ -187,6 +212,7 @@ func (z *ClientGroupedStats) EncodeMsg(en *msgp.Writer) (err error) { } err = en.WriteUint64(z.Duration) if err != nil { + err = msgp.WrapError(err, "Duration") return } // write "OkSummary" @@ -196,6 +222,7 @@ func (z *ClientGroupedStats) EncodeMsg(en *msgp.Writer) (err error) { } err = en.WriteBytes(z.OkSummary) if err != nil { + err = msgp.WrapError(err, "OkSummary") return } // write "ErrorSummary" @@ -205,6 +232,7 @@ func (z *ClientGroupedStats) EncodeMsg(en *msgp.Writer) (err error) { } err = en.WriteBytes(z.ErrorSummary) if err != nil { + err = msgp.WrapError(err, "ErrorSummary") return } // write "Synthetics" @@ -214,6 +242,7 @@ func (z *ClientGroupedStats) EncodeMsg(en *msgp.Writer) (err error) { } err = en.WriteBool(z.Synthetics) if err != nil { + err = msgp.WrapError(err, "Synthetics") return } // write "TopLevelHits" @@ -222,18 +251,29 @@ func (z *ClientGroupedStats) EncodeMsg(en *msgp.Writer) (err error) { return } err = en.WriteUint64(z.TopLevelHits) + if err != nil { + err = msgp.WrapError(err, "TopLevelHits") + return + } + // write "PeerService" + err = en.Append(0xab, 0x50, 0x65, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65) if err != nil { return } + err = en.WriteString(z.PeerService) + if err != nil { + err = msgp.WrapError(err, "PeerService") + return + } return } // MarshalMsg implements msgp.Marshaler func (z *ClientGroupedStats) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 13 + // map header, size 14 // string "Service" - o = append(o, 0x8d, 0xa7, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65) + o = append(o, 0x8e, 0xa7, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65) o = msgp.AppendString(o, z.Service) // string "Name" o = append(o, 0xa4, 0x4e, 0x61, 0x6d, 0x65) @@ -271,6 +311,9 @@ func (z *ClientGroupedStats) MarshalMsg(b []byte) (o []byte, err error) { // string "TopLevelHits" o = append(o, 0xac, 0x54, 0x6f, 0x70, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x48, 0x69, 0x74, 0x73) o = msgp.AppendUint64(o, z.TopLevelHits) + // string "PeerService" + o = append(o, 0xab, 0x50, 0x65, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65) + o = msgp.AppendString(o, z.PeerService) return } @@ -281,83 +324,105 @@ func (z *ClientGroupedStats) UnmarshalMsg(bts []byte) (o []byte, err error) { var zb0001 uint32 zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) if err != nil { + err = msgp.WrapError(err) return } for zb0001 > 0 { zb0001-- field, bts, err = msgp.ReadMapKeyZC(bts) if err != nil { + err = msgp.WrapError(err) return } switch msgp.UnsafeString(field) { case "Service": z.Service, bts, err = msgp.ReadStringBytes(bts) if err != nil { + err = msgp.WrapError(err, "Service") return } case "Name": z.Name, bts, err = msgp.ReadStringBytes(bts) if err != nil { + err = msgp.WrapError(err, "Name") return } case "Resource": z.Resource, bts, err = msgp.ReadStringBytes(bts) if err != nil { + err = msgp.WrapError(err, "Resource") return } case "HTTPStatusCode": z.HTTPStatusCode, bts, err = msgp.ReadUint32Bytes(bts) if err != nil { + err = msgp.WrapError(err, "HTTPStatusCode") return } case "Type": z.Type, bts, err = msgp.ReadStringBytes(bts) if err != nil { + err = msgp.WrapError(err, "Type") return } case "DBType": z.DBType, bts, err = msgp.ReadStringBytes(bts) if err != nil { + err = msgp.WrapError(err, "DBType") return } case "Hits": z.Hits, bts, err = msgp.ReadUint64Bytes(bts) if err != nil { + err = msgp.WrapError(err, "Hits") return } case "Errors": z.Errors, bts, err = msgp.ReadUint64Bytes(bts) if err != nil { + err = msgp.WrapError(err, "Errors") return } case "Duration": z.Duration, bts, err = msgp.ReadUint64Bytes(bts) if err != nil { + err = msgp.WrapError(err, "Duration") return } case "OkSummary": z.OkSummary, bts, err = msgp.ReadBytesBytes(bts, z.OkSummary) if err != nil { + err = msgp.WrapError(err, "OkSummary") return } case "ErrorSummary": z.ErrorSummary, bts, err = msgp.ReadBytesBytes(bts, z.ErrorSummary) if err != nil { + err = msgp.WrapError(err, "ErrorSummary") return } case "Synthetics": z.Synthetics, bts, err = msgp.ReadBoolBytes(bts) if err != nil { + err = msgp.WrapError(err, "Synthetics") return } case "TopLevelHits": z.TopLevelHits, bts, err = msgp.ReadUint64Bytes(bts) if err != nil { + err = msgp.WrapError(err, "TopLevelHits") + return + } + case "PeerService": + z.PeerService, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "PeerService") return } default: bts, err = msgp.Skip(bts) if err != nil { + err = msgp.WrapError(err) return } } @@ -368,7 +433,7 @@ func (z *ClientGroupedStats) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *ClientGroupedStats) Msgsize() (s int) { - s = 1 + 8 + msgp.StringPrefixSize + len(z.Service) + 5 + msgp.StringPrefixSize + len(z.Name) + 9 + msgp.StringPrefixSize + len(z.Resource) + 15 + msgp.Uint32Size + 5 + msgp.StringPrefixSize + len(z.Type) + 7 + msgp.StringPrefixSize + len(z.DBType) + 5 + msgp.Uint64Size + 7 + msgp.Uint64Size + 9 + msgp.Uint64Size + 10 + msgp.BytesPrefixSize + len(z.OkSummary) + 13 + msgp.BytesPrefixSize + len(z.ErrorSummary) + 11 + msgp.BoolSize + 13 + msgp.Uint64Size + s = 1 + 8 + msgp.StringPrefixSize + len(z.Service) + 5 + msgp.StringPrefixSize + len(z.Name) + 9 + msgp.StringPrefixSize + len(z.Resource) + 15 + msgp.Uint32Size + 5 + msgp.StringPrefixSize + len(z.Type) + 7 + msgp.StringPrefixSize + len(z.DBType) + 5 + msgp.Uint64Size + 7 + msgp.Uint64Size + 9 + msgp.Uint64Size + 10 + msgp.BytesPrefixSize + len(z.OkSummary) + 13 + msgp.BytesPrefixSize + len(z.ErrorSummary) + 11 + msgp.BoolSize + 13 + msgp.Uint64Size + 12 + msgp.StringPrefixSize + len(z.PeerService) return } @@ -379,29 +444,34 @@ func (z *ClientStatsBucket) DecodeMsg(dc *msgp.Reader) (err error) { var zb0001 uint32 zb0001, err = dc.ReadMapHeader() if err != nil { + err = msgp.WrapError(err) return } for zb0001 > 0 { zb0001-- field, err = dc.ReadMapKeyPtr() if err != nil { + err = msgp.WrapError(err) return } switch msgp.UnsafeString(field) { case "Start": z.Start, err = dc.ReadUint64() if err != nil { + err = msgp.WrapError(err, "Start") return } case "Duration": z.Duration, err = dc.ReadUint64() if err != nil { + err = msgp.WrapError(err, "Duration") return } case "Stats": var zb0002 uint32 zb0002, err = dc.ReadArrayHeader() if err != nil { + err = msgp.WrapError(err, "Stats") return } if cap(z.Stats) >= int(zb0002) { @@ -412,17 +482,20 @@ func (z *ClientStatsBucket) DecodeMsg(dc *msgp.Reader) (err error) { for za0001 := range z.Stats { err = z.Stats[za0001].DecodeMsg(dc) if err != nil { + err = msgp.WrapError(err, "Stats", za0001) return } } case "AgentTimeShift": z.AgentTimeShift, err = dc.ReadInt64() if err != nil { + err = msgp.WrapError(err, "AgentTimeShift") return } default: err = dc.Skip() if err != nil { + err = msgp.WrapError(err) return } } @@ -440,6 +513,7 @@ func (z *ClientStatsBucket) EncodeMsg(en *msgp.Writer) (err error) { } err = en.WriteUint64(z.Start) if err != nil { + err = msgp.WrapError(err, "Start") return } // write "Duration" @@ -449,6 +523,7 @@ func (z *ClientStatsBucket) EncodeMsg(en *msgp.Writer) (err error) { } err = en.WriteUint64(z.Duration) if err != nil { + err = msgp.WrapError(err, "Duration") return } // write "Stats" @@ -458,11 +533,13 @@ func (z *ClientStatsBucket) EncodeMsg(en *msgp.Writer) (err error) { } err = en.WriteArrayHeader(uint32(len(z.Stats))) if err != nil { + err = msgp.WrapError(err, "Stats") return } for za0001 := range z.Stats { err = z.Stats[za0001].EncodeMsg(en) if err != nil { + err = msgp.WrapError(err, "Stats", za0001) return } } @@ -473,6 +550,7 @@ func (z *ClientStatsBucket) EncodeMsg(en *msgp.Writer) (err error) { } err = en.WriteInt64(z.AgentTimeShift) if err != nil { + err = msgp.WrapError(err, "AgentTimeShift") return } return @@ -494,6 +572,7 @@ func (z *ClientStatsBucket) MarshalMsg(b []byte) (o []byte, err error) { for za0001 := range z.Stats { o, err = z.Stats[za0001].MarshalMsg(o) if err != nil { + err = msgp.WrapError(err, "Stats", za0001) return } } @@ -510,29 +589,34 @@ func (z *ClientStatsBucket) UnmarshalMsg(bts []byte) (o []byte, err error) { var zb0001 uint32 zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) if err != nil { + err = msgp.WrapError(err) return } for zb0001 > 0 { zb0001-- field, bts, err = msgp.ReadMapKeyZC(bts) if err != nil { + err = msgp.WrapError(err) return } switch msgp.UnsafeString(field) { case "Start": z.Start, bts, err = msgp.ReadUint64Bytes(bts) if err != nil { + err = msgp.WrapError(err, "Start") return } case "Duration": z.Duration, bts, err = msgp.ReadUint64Bytes(bts) if err != nil { + err = msgp.WrapError(err, "Duration") return } case "Stats": var zb0002 uint32 zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts) if err != nil { + err = msgp.WrapError(err, "Stats") return } if cap(z.Stats) >= int(zb0002) { @@ -543,17 +627,20 @@ func (z *ClientStatsBucket) UnmarshalMsg(bts []byte) (o []byte, err error) { for za0001 := range z.Stats { bts, err = z.Stats[za0001].UnmarshalMsg(bts) if err != nil { + err = msgp.WrapError(err, "Stats", za0001) return } } case "AgentTimeShift": z.AgentTimeShift, bts, err = msgp.ReadInt64Bytes(bts) if err != nil { + err = msgp.WrapError(err, "AgentTimeShift") return } default: bts, err = msgp.Skip(bts) if err != nil { + err = msgp.WrapError(err) return } } @@ -579,34 +666,40 @@ func (z *ClientStatsPayload) DecodeMsg(dc *msgp.Reader) (err error) { var zb0001 uint32 zb0001, err = dc.ReadMapHeader() if err != nil { + err = msgp.WrapError(err) return } for zb0001 > 0 { zb0001-- field, err = dc.ReadMapKeyPtr() if err != nil { + err = msgp.WrapError(err) return } switch msgp.UnsafeString(field) { case "Hostname": z.Hostname, err = dc.ReadString() if err != nil { + err = msgp.WrapError(err, "Hostname") return } case "Env": z.Env, err = dc.ReadString() if err != nil { + err = msgp.WrapError(err, "Env") return } case "Version": z.Version, err = dc.ReadString() if err != nil { + err = msgp.WrapError(err, "Version") return } case "Stats": var zb0002 uint32 zb0002, err = dc.ReadArrayHeader() if err != nil { + err = msgp.WrapError(err, "Stats") return } if cap(z.Stats) >= int(zb0002) { @@ -617,48 +710,57 @@ func (z *ClientStatsPayload) DecodeMsg(dc *msgp.Reader) (err error) { for za0001 := range z.Stats { err = z.Stats[za0001].DecodeMsg(dc) if err != nil { + err = msgp.WrapError(err, "Stats", za0001) return } } case "Lang": z.Lang, err = dc.ReadString() if err != nil { + err = msgp.WrapError(err, "Lang") return } case "TracerVersion": z.TracerVersion, err = dc.ReadString() if err != nil { + err = msgp.WrapError(err, "TracerVersion") return } case "RuntimeID": z.RuntimeID, err = dc.ReadString() if err != nil { + err = msgp.WrapError(err, "RuntimeID") return } case "Sequence": z.Sequence, err = dc.ReadUint64() if err != nil { + err = msgp.WrapError(err, "Sequence") return } case "AgentAggregation": z.AgentAggregation, err = dc.ReadString() if err != nil { + err = msgp.WrapError(err, "AgentAggregation") return } case "Service": z.Service, err = dc.ReadString() if err != nil { + err = msgp.WrapError(err, "Service") return } case "ContainerID": z.ContainerID, err = dc.ReadString() if err != nil { + err = msgp.WrapError(err, "ContainerID") return } case "Tags": var zb0003 uint32 zb0003, err = dc.ReadArrayHeader() if err != nil { + err = msgp.WrapError(err, "Tags") return } if cap(z.Tags) >= int(zb0003) { @@ -669,12 +771,14 @@ func (z *ClientStatsPayload) DecodeMsg(dc *msgp.Reader) (err error) { for za0002 := range z.Tags { z.Tags[za0002], err = dc.ReadString() if err != nil { + err = msgp.WrapError(err, "Tags", za0002) return } } default: err = dc.Skip() if err != nil { + err = msgp.WrapError(err) return } } @@ -692,6 +796,7 @@ func (z *ClientStatsPayload) EncodeMsg(en *msgp.Writer) (err error) { } err = en.WriteString(z.Hostname) if err != nil { + err = msgp.WrapError(err, "Hostname") return } // write "Env" @@ -701,6 +806,7 @@ func (z *ClientStatsPayload) EncodeMsg(en *msgp.Writer) (err error) { } err = en.WriteString(z.Env) if err != nil { + err = msgp.WrapError(err, "Env") return } // write "Version" @@ -710,6 +816,7 @@ func (z *ClientStatsPayload) EncodeMsg(en *msgp.Writer) (err error) { } err = en.WriteString(z.Version) if err != nil { + err = msgp.WrapError(err, "Version") return } // write "Stats" @@ -719,11 +826,13 @@ func (z *ClientStatsPayload) EncodeMsg(en *msgp.Writer) (err error) { } err = en.WriteArrayHeader(uint32(len(z.Stats))) if err != nil { + err = msgp.WrapError(err, "Stats") return } for za0001 := range z.Stats { err = z.Stats[za0001].EncodeMsg(en) if err != nil { + err = msgp.WrapError(err, "Stats", za0001) return } } @@ -734,6 +843,7 @@ func (z *ClientStatsPayload) EncodeMsg(en *msgp.Writer) (err error) { } err = en.WriteString(z.Lang) if err != nil { + err = msgp.WrapError(err, "Lang") return } // write "TracerVersion" @@ -743,6 +853,7 @@ func (z *ClientStatsPayload) EncodeMsg(en *msgp.Writer) (err error) { } err = en.WriteString(z.TracerVersion) if err != nil { + err = msgp.WrapError(err, "TracerVersion") return } // write "RuntimeID" @@ -752,6 +863,7 @@ func (z *ClientStatsPayload) EncodeMsg(en *msgp.Writer) (err error) { } err = en.WriteString(z.RuntimeID) if err != nil { + err = msgp.WrapError(err, "RuntimeID") return } // write "Sequence" @@ -761,6 +873,7 @@ func (z *ClientStatsPayload) EncodeMsg(en *msgp.Writer) (err error) { } err = en.WriteUint64(z.Sequence) if err != nil { + err = msgp.WrapError(err, "Sequence") return } // write "AgentAggregation" @@ -770,6 +883,7 @@ func (z *ClientStatsPayload) EncodeMsg(en *msgp.Writer) (err error) { } err = en.WriteString(z.AgentAggregation) if err != nil { + err = msgp.WrapError(err, "AgentAggregation") return } // write "Service" @@ -779,6 +893,7 @@ func (z *ClientStatsPayload) EncodeMsg(en *msgp.Writer) (err error) { } err = en.WriteString(z.Service) if err != nil { + err = msgp.WrapError(err, "Service") return } // write "ContainerID" @@ -788,6 +903,7 @@ func (z *ClientStatsPayload) EncodeMsg(en *msgp.Writer) (err error) { } err = en.WriteString(z.ContainerID) if err != nil { + err = msgp.WrapError(err, "ContainerID") return } // write "Tags" @@ -797,11 +913,13 @@ func (z *ClientStatsPayload) EncodeMsg(en *msgp.Writer) (err error) { } err = en.WriteArrayHeader(uint32(len(z.Tags))) if err != nil { + err = msgp.WrapError(err, "Tags") return } for za0002 := range z.Tags { err = en.WriteString(z.Tags[za0002]) if err != nil { + err = msgp.WrapError(err, "Tags", za0002) return } } @@ -827,6 +945,7 @@ func (z *ClientStatsPayload) MarshalMsg(b []byte) (o []byte, err error) { for za0001 := range z.Stats { o, err = z.Stats[za0001].MarshalMsg(o) if err != nil { + err = msgp.WrapError(err, "Stats", za0001) return } } @@ -867,34 +986,40 @@ func (z *ClientStatsPayload) UnmarshalMsg(bts []byte) (o []byte, err error) { var zb0001 uint32 zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) if err != nil { + err = msgp.WrapError(err) return } for zb0001 > 0 { zb0001-- field, bts, err = msgp.ReadMapKeyZC(bts) if err != nil { + err = msgp.WrapError(err) return } switch msgp.UnsafeString(field) { case "Hostname": z.Hostname, bts, err = msgp.ReadStringBytes(bts) if err != nil { + err = msgp.WrapError(err, "Hostname") return } case "Env": z.Env, bts, err = msgp.ReadStringBytes(bts) if err != nil { + err = msgp.WrapError(err, "Env") return } case "Version": z.Version, bts, err = msgp.ReadStringBytes(bts) if err != nil { + err = msgp.WrapError(err, "Version") return } case "Stats": var zb0002 uint32 zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts) if err != nil { + err = msgp.WrapError(err, "Stats") return } if cap(z.Stats) >= int(zb0002) { @@ -905,48 +1030,57 @@ func (z *ClientStatsPayload) UnmarshalMsg(bts []byte) (o []byte, err error) { for za0001 := range z.Stats { bts, err = z.Stats[za0001].UnmarshalMsg(bts) if err != nil { + err = msgp.WrapError(err, "Stats", za0001) return } } case "Lang": z.Lang, bts, err = msgp.ReadStringBytes(bts) if err != nil { + err = msgp.WrapError(err, "Lang") return } case "TracerVersion": z.TracerVersion, bts, err = msgp.ReadStringBytes(bts) if err != nil { + err = msgp.WrapError(err, "TracerVersion") return } case "RuntimeID": z.RuntimeID, bts, err = msgp.ReadStringBytes(bts) if err != nil { + err = msgp.WrapError(err, "RuntimeID") return } case "Sequence": z.Sequence, bts, err = msgp.ReadUint64Bytes(bts) if err != nil { + err = msgp.WrapError(err, "Sequence") return } case "AgentAggregation": z.AgentAggregation, bts, err = msgp.ReadStringBytes(bts) if err != nil { + err = msgp.WrapError(err, "AgentAggregation") return } case "Service": z.Service, bts, err = msgp.ReadStringBytes(bts) if err != nil { + err = msgp.WrapError(err, "Service") return } case "ContainerID": z.ContainerID, bts, err = msgp.ReadStringBytes(bts) if err != nil { + err = msgp.WrapError(err, "ContainerID") return } case "Tags": var zb0003 uint32 zb0003, bts, err = msgp.ReadArrayHeaderBytes(bts) if err != nil { + err = msgp.WrapError(err, "Tags") return } if cap(z.Tags) >= int(zb0003) { @@ -957,12 +1091,14 @@ func (z *ClientStatsPayload) UnmarshalMsg(bts []byte) (o []byte, err error) { for za0002 := range z.Tags { z.Tags[za0002], bts, err = msgp.ReadStringBytes(bts) if err != nil { + err = msgp.WrapError(err, "Tags", za0002) return } } default: bts, err = msgp.Skip(bts) if err != nil { + err = msgp.WrapError(err) return } } @@ -991,29 +1127,34 @@ func (z *StatsPayload) DecodeMsg(dc *msgp.Reader) (err error) { var zb0001 uint32 zb0001, err = dc.ReadMapHeader() if err != nil { + err = msgp.WrapError(err) return } for zb0001 > 0 { zb0001-- field, err = dc.ReadMapKeyPtr() if err != nil { + err = msgp.WrapError(err) return } switch msgp.UnsafeString(field) { case "AgentHostname": z.AgentHostname, err = dc.ReadString() if err != nil { + err = msgp.WrapError(err, "AgentHostname") return } case "AgentEnv": z.AgentEnv, err = dc.ReadString() if err != nil { + err = msgp.WrapError(err, "AgentEnv") return } case "Stats": var zb0002 uint32 zb0002, err = dc.ReadArrayHeader() if err != nil { + err = msgp.WrapError(err, "Stats") return } if cap(z.Stats) >= int(zb0002) { @@ -1024,22 +1165,26 @@ func (z *StatsPayload) DecodeMsg(dc *msgp.Reader) (err error) { for za0001 := range z.Stats { err = z.Stats[za0001].DecodeMsg(dc) if err != nil { + err = msgp.WrapError(err, "Stats", za0001) return } } case "AgentVersion": z.AgentVersion, err = dc.ReadString() if err != nil { + err = msgp.WrapError(err, "AgentVersion") return } case "ClientComputed": z.ClientComputed, err = dc.ReadBool() if err != nil { + err = msgp.WrapError(err, "ClientComputed") return } default: err = dc.Skip() if err != nil { + err = msgp.WrapError(err) return } } @@ -1057,6 +1202,7 @@ func (z *StatsPayload) EncodeMsg(en *msgp.Writer) (err error) { } err = en.WriteString(z.AgentHostname) if err != nil { + err = msgp.WrapError(err, "AgentHostname") return } // write "AgentEnv" @@ -1066,6 +1212,7 @@ func (z *StatsPayload) EncodeMsg(en *msgp.Writer) (err error) { } err = en.WriteString(z.AgentEnv) if err != nil { + err = msgp.WrapError(err, "AgentEnv") return } // write "Stats" @@ -1075,11 +1222,13 @@ func (z *StatsPayload) EncodeMsg(en *msgp.Writer) (err error) { } err = en.WriteArrayHeader(uint32(len(z.Stats))) if err != nil { + err = msgp.WrapError(err, "Stats") return } for za0001 := range z.Stats { err = z.Stats[za0001].EncodeMsg(en) if err != nil { + err = msgp.WrapError(err, "Stats", za0001) return } } @@ -1090,6 +1239,7 @@ func (z *StatsPayload) EncodeMsg(en *msgp.Writer) (err error) { } err = en.WriteString(z.AgentVersion) if err != nil { + err = msgp.WrapError(err, "AgentVersion") return } // write "ClientComputed" @@ -1099,6 +1249,7 @@ func (z *StatsPayload) EncodeMsg(en *msgp.Writer) (err error) { } err = en.WriteBool(z.ClientComputed) if err != nil { + err = msgp.WrapError(err, "ClientComputed") return } return @@ -1120,6 +1271,7 @@ func (z *StatsPayload) MarshalMsg(b []byte) (o []byte, err error) { for za0001 := range z.Stats { o, err = z.Stats[za0001].MarshalMsg(o) if err != nil { + err = msgp.WrapError(err, "Stats", za0001) return } } @@ -1139,29 +1291,34 @@ func (z *StatsPayload) UnmarshalMsg(bts []byte) (o []byte, err error) { var zb0001 uint32 zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) if err != nil { + err = msgp.WrapError(err) return } for zb0001 > 0 { zb0001-- field, bts, err = msgp.ReadMapKeyZC(bts) if err != nil { + err = msgp.WrapError(err) return } switch msgp.UnsafeString(field) { case "AgentHostname": z.AgentHostname, bts, err = msgp.ReadStringBytes(bts) if err != nil { + err = msgp.WrapError(err, "AgentHostname") return } case "AgentEnv": z.AgentEnv, bts, err = msgp.ReadStringBytes(bts) if err != nil { + err = msgp.WrapError(err, "AgentEnv") return } case "Stats": var zb0002 uint32 zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts) if err != nil { + err = msgp.WrapError(err, "Stats") return } if cap(z.Stats) >= int(zb0002) { @@ -1172,22 +1329,26 @@ func (z *StatsPayload) UnmarshalMsg(bts []byte) (o []byte, err error) { for za0001 := range z.Stats { bts, err = z.Stats[za0001].UnmarshalMsg(bts) if err != nil { + err = msgp.WrapError(err, "Stats", za0001) return } } case "AgentVersion": z.AgentVersion, bts, err = msgp.ReadStringBytes(bts) if err != nil { + err = msgp.WrapError(err, "AgentVersion") return } case "ClientComputed": z.ClientComputed, bts, err = msgp.ReadBoolBytes(bts) if err != nil { + err = msgp.WrapError(err, "ClientComputed") return } default: bts, err = msgp.Skip(bts) if err != nil { + err = msgp.WrapError(err) return } } diff --git a/pkg/trace/pb/stats_gen_test.go b/pkg/trace/pb/stats_gen_test.go index f0dff0bb0b4ac..b52383b3bb278 100644 --- a/pkg/trace/pb/stats_gen_test.go +++ b/pkg/trace/pb/stats_gen_test.go @@ -1,13 +1,6 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2016-present Datadog, Inc. - package pb -// NOTE: THIS FILE WAS PRODUCED BY THE -// MSGP CODE GENERATION TOOL (github.com/tinylib/msgp) -// DO NOT EDIT +// Code generated by github.com/tinylib/msgp DO NOT EDIT. import ( "bytes" @@ -81,7 +74,7 @@ func TestEncodeDecodeClientGroupedStats(t *testing.T) { m := v.Msgsize() if buf.Len() > m { - t.Logf("WARNING: Msgsize() for %v is inaccurate", v) + t.Log("WARNING: TestEncodeDecodeClientGroupedStats Msgsize() is inaccurate") } vn := ClientGroupedStats{} @@ -194,7 +187,7 @@ func TestEncodeDecodeClientStatsBucket(t *testing.T) { m := v.Msgsize() if buf.Len() > m { - t.Logf("WARNING: Msgsize() for %v is inaccurate", v) + t.Log("WARNING: TestEncodeDecodeClientStatsBucket Msgsize() is inaccurate") } vn := ClientStatsBucket{} @@ -307,7 +300,7 @@ func TestEncodeDecodeClientStatsPayload(t *testing.T) { m := v.Msgsize() if buf.Len() > m { - t.Logf("WARNING: Msgsize() for %v is inaccurate", v) + t.Log("WARNING: TestEncodeDecodeClientStatsPayload Msgsize() is inaccurate") } vn := ClientStatsPayload{} @@ -420,7 +413,7 @@ func TestEncodeDecodeStatsPayload(t *testing.T) { m := v.Msgsize() if buf.Len() > m { - t.Logf("WARNING: Msgsize() for %v is inaccurate", v) + t.Log("WARNING: TestEncodeDecodeStatsPayload Msgsize() is inaccurate") } vn := StatsPayload{} diff --git a/pkg/trace/stats/aggregation.go b/pkg/trace/stats/aggregation.go index fac0e052dc8be..a48aa73a93410 100644 --- a/pkg/trace/stats/aggregation.go +++ b/pkg/trace/stats/aggregation.go @@ -15,8 +15,9 @@ import ( ) const ( - tagStatusCode = "http.status_code" - tagSynthetics = "synthetics" + tagStatusCode = "http.status_code" + tagSynthetics = "synthetics" + tagPeerService = "peer.service" ) // Aggregation contains all the dimension on which we aggregate statistics. @@ -27,12 +28,13 @@ type Aggregation struct { // BucketsAggregationKey specifies the key by which a bucket is aggregated. type BucketsAggregationKey struct { - Service string - Name string - Resource string - Type string - StatusCode uint32 - Synthetics bool + Service string + Name string + PeerService string + Resource string + Type string + StatusCode uint32 + Synthetics bool } // PayloadAggregationKey specifies the key by which a payload is aggregated. @@ -62,9 +64,9 @@ func getStatusCode(s *pb.Span) uint32 { } // NewAggregationFromSpan creates a new aggregation from the provided span and env -func NewAggregationFromSpan(s *pb.Span, origin string, aggKey PayloadAggregationKey) Aggregation { +func NewAggregationFromSpan(s *pb.Span, origin string, aggKey PayloadAggregationKey, enablePeerSvcAgg bool) Aggregation { synthetics := strings.HasPrefix(origin, tagSynthetics) - return Aggregation{ + agg := Aggregation{ PayloadAggregationKey: aggKey, BucketsAggregationKey: BucketsAggregationKey{ Resource: s.Resource, @@ -75,17 +77,22 @@ func NewAggregationFromSpan(s *pb.Span, origin string, aggKey PayloadAggregation Synthetics: synthetics, }, } + if enablePeerSvcAgg { + agg.PeerService = s.Meta[tagPeerService] + } + return agg } // NewAggregationFromGroup gets the Aggregation key of grouped stats. func NewAggregationFromGroup(g pb.ClientGroupedStats) Aggregation { return Aggregation{ BucketsAggregationKey: BucketsAggregationKey{ - Resource: g.Resource, - Service: g.Service, - Name: g.Name, - StatusCode: g.HTTPStatusCode, - Synthetics: g.Synthetics, + Resource: g.Resource, + Service: g.Service, + PeerService: g.PeerService, + Name: g.Name, + StatusCode: g.HTTPStatusCode, + Synthetics: g.Synthetics, }, } } diff --git a/pkg/trace/stats/aggregation_test.go b/pkg/trace/stats/aggregation_test.go index af67669f648ef..5d3c116d79bd4 100644 --- a/pkg/trace/stats/aggregation_test.go +++ b/pkg/trace/stats/aggregation_test.go @@ -9,6 +9,7 @@ import ( "testing" "github.com/DataDog/datadog-agent/pkg/trace/pb" + "github.com/stretchr/testify/assert" ) func TestGetStatusCode(t *testing.T) { @@ -51,3 +52,40 @@ func TestGetStatusCode(t *testing.T) { } } } + +func TestNewAggregationPeerService(t *testing.T) { + for _, tt := range []struct { + in *pb.Span + enablePeerSvcAgg bool + res Aggregation + }{ + { + &pb.Span{}, + false, + Aggregation{}, + }, + { + &pb.Span{}, + true, + Aggregation{}, + }, + { + &pb.Span{ + Service: "a", + Meta: map[string]string{"peer.service": "remote-service"}, + }, + false, + Aggregation{BucketsAggregationKey: BucketsAggregationKey{Service: "a"}}, + }, + { + &pb.Span{ + Service: "a", + Meta: map[string]string{"peer.service": "remote-service"}, + }, + true, + Aggregation{BucketsAggregationKey: BucketsAggregationKey{Service: "a", PeerService: "remote-service"}}, + }, + } { + assert.Equal(t, tt.res, NewAggregationFromSpan(tt.in, "", PayloadAggregationKey{}, tt.enablePeerSvcAgg)) + } +} diff --git a/pkg/trace/stats/client_stats_aggregator.go b/pkg/trace/stats/client_stats_aggregator.go index 28d56f034cabc..dc2614e08a6ac 100644 --- a/pkg/trace/stats/client_stats_aggregator.go +++ b/pkg/trace/stats/client_stats_aggregator.go @@ -39,11 +39,12 @@ type ClientStatsAggregator struct { out chan pb.StatsPayload buckets map[int64]*bucket // buckets used to aggregate client stats - flushTicker *time.Ticker - oldestTs time.Time - agentEnv string - agentHostname string - agentVersion string + flushTicker *time.Ticker + oldestTs time.Time + agentEnv string + agentHostname string + agentVersion string + peerSvcAggregation bool // flag to enable peer.service aggregation exit chan struct{} done chan struct{} @@ -51,18 +52,20 @@ type ClientStatsAggregator struct { // NewClientStatsAggregator initializes a new aggregator ready to be started func NewClientStatsAggregator(conf *config.AgentConfig, out chan pb.StatsPayload) *ClientStatsAggregator { - return &ClientStatsAggregator{ - flushTicker: time.NewTicker(time.Second), - In: make(chan pb.ClientStatsPayload, 10), - buckets: make(map[int64]*bucket, 20), - out: out, - agentEnv: conf.DefaultEnv, - agentHostname: conf.Hostname, - agentVersion: conf.AgentVersion, - oldestTs: alignAggTs(time.Now().Add(bucketDuration - oldestBucketStart)), - exit: make(chan struct{}), - done: make(chan struct{}), + c := &ClientStatsAggregator{ + flushTicker: time.NewTicker(time.Second), + In: make(chan pb.ClientStatsPayload, 10), + buckets: make(map[int64]*bucket, 20), + out: out, + agentEnv: conf.DefaultEnv, + agentHostname: conf.Hostname, + agentVersion: conf.AgentVersion, + peerSvcAggregation: conf.PeerServiceAggregation, + oldestTs: alignAggTs(time.Now().Add(bucketDuration - oldestBucketStart)), + exit: make(chan struct{}), + done: make(chan struct{}), } + return c } // Start starts the aggregator. @@ -135,7 +138,7 @@ func (a *ClientStatsAggregator) add(now time.Time, p pb.ClientStatsPayload) { a.buckets[ts.Unix()] = b } p.Stats = []pb.ClientStatsBucket{clientBucket} - a.flush(b.add(p)) + a.flush(b.add(p, a.peerSvcAggregation)) } } @@ -173,7 +176,7 @@ type bucket struct { agg map[PayloadAggregationKey]map[BucketsAggregationKey]*aggregatedCounts } -func (b *bucket) add(p pb.ClientStatsPayload) []pb.ClientStatsPayload { +func (b *bucket) add(p pb.ClientStatsPayload, enablePeerSvcAgg bool) []pb.ClientStatsPayload { b.n++ if b.n == 1 { b.first = p @@ -184,15 +187,15 @@ func (b *bucket) add(p pb.ClientStatsPayload) []pb.ClientStatsPayload { first := b.first b.first = pb.ClientStatsPayload{} b.agg = make(map[PayloadAggregationKey]map[BucketsAggregationKey]*aggregatedCounts, 2) - b.aggregateCounts(first) - b.aggregateCounts(p) + b.aggregateCounts(first, enablePeerSvcAgg) + b.aggregateCounts(p, enablePeerSvcAgg) return []pb.ClientStatsPayload{trimCounts(first), trimCounts(p)} } - b.aggregateCounts(p) + b.aggregateCounts(p, enablePeerSvcAgg) return []pb.ClientStatsPayload{trimCounts(p)} } -func (b *bucket) aggregateCounts(p pb.ClientStatsPayload) { +func (b *bucket) aggregateCounts(p pb.ClientStatsPayload, enablePeerSvcAgg bool) { payloadAggKey := newPayloadAggregationKey(p.Env, p.Hostname, p.Version, p.ContainerID) payloadAgg, ok := b.agg[payloadAggKey] if !ok { @@ -205,7 +208,7 @@ func (b *bucket) aggregateCounts(p pb.ClientStatsPayload) { } for _, s := range p.Stats { for _, sb := range s.Stats { - aggKey := newBucketAggregationKey(sb) + aggKey := newBucketAggregationKey(sb, enablePeerSvcAgg) agg, ok := payloadAgg[aggKey] if !ok { agg = &aggregatedCounts{} @@ -232,6 +235,7 @@ func (b *bucket) aggregationToPayloads() []pb.ClientStatsPayload { for aggrKey, counts := range aggrCounts { stats = append(stats, pb.ClientGroupedStats{ Service: aggrKey.Service, + PeerService: aggrKey.PeerService, Name: aggrKey.Name, Resource: aggrKey.Resource, HTTPStatusCode: aggrKey.StatusCode, @@ -263,8 +267,8 @@ func newPayloadAggregationKey(env, hostname, version, cid string) PayloadAggrega return PayloadAggregationKey{Env: env, Hostname: hostname, Version: version, ContainerID: cid} } -func newBucketAggregationKey(b pb.ClientGroupedStats) BucketsAggregationKey { - return BucketsAggregationKey{ +func newBucketAggregationKey(b pb.ClientGroupedStats, enablePeerSvcAgg bool) BucketsAggregationKey { + k := BucketsAggregationKey{ Service: b.Service, Name: b.Name, Resource: b.Resource, @@ -272,6 +276,10 @@ func newBucketAggregationKey(b pb.ClientGroupedStats) BucketsAggregationKey { Synthetics: b.Synthetics, StatusCode: b.HTTPStatusCode, } + if enablePeerSvcAgg { + k.PeerService = b.PeerService + } + return k } func trimCounts(p pb.ClientStatsPayload) pb.ClientStatsPayload { diff --git a/pkg/trace/stats/client_stats_aggregator_test.go b/pkg/trace/stats/client_stats_aggregator_test.go index 68051bd73bf8e..deda8a0441de5 100644 --- a/pkg/trace/stats/client_stats_aggregator_test.go +++ b/pkg/trace/stats/client_stats_aggregator_test.go @@ -52,6 +52,7 @@ func payloadWithCounts(ts time.Time, k BucketsAggregationKey, hits, errors, dura Stats: []pb.ClientGroupedStats{ { Service: k.Service, + PeerService: k.PeerService, Name: k.Name, Resource: k.Resource, HTTPStatusCode: k.StatusCode, @@ -231,6 +232,8 @@ func TestFuzzCountFields(t *testing.T) { assert := assert.New(t) for i := 0; i < 30; i++ { a := newTestAggregator() + // Ensure that peer.service aggregation is on. Some tests may expect non-empty values for peer.service. + a.peerSvcAggregation = true payloadTime := time.Now().Truncate(bucketDuration) merge1 := getTestStatsWithStart(payloadTime) @@ -322,6 +325,8 @@ func TestCountAggregation(t *testing.T) { tc.res.Duration = 403 assert.ElementsMatch(aggCounts.Stats[0].Stats[0].Stats, []pb.ClientGroupedStats{ tc.res, + // Additional grouped stat object that corresponds to the keyDefault/cDefault. + // We do not expect this to be aggregated with the non-default key in the test. { Hits: 0, Errors: 2, @@ -333,6 +338,86 @@ func TestCountAggregation(t *testing.T) { } } +func TestCountAggregationPeerService(t *testing.T) { + assert := assert.New(t) + type tt struct { + k BucketsAggregationKey + res pb.ClientGroupedStats + name string + enablePeerSvcAgg bool + } + tts := []tt{ + { + BucketsAggregationKey{Service: "s", PeerService: "remote-service"}, + pb.ClientGroupedStats{Service: "s", PeerService: ""}, + "peer.service", + false, + }, + { + BucketsAggregationKey{Service: "s", PeerService: "remote-service"}, + pb.ClientGroupedStats{Service: "s", PeerService: "remote-service"}, + "peer.service", + true, + }, + } + for _, tc := range tts { + t.Run(tc.name, func(t *testing.T) { + a := newTestAggregator() + a.peerSvcAggregation = tc.enablePeerSvcAgg + testTime := time.Unix(time.Now().Unix(), 0) + + c1 := payloadWithCounts(testTime, tc.k, 11, 7, 100) + c2 := payloadWithCounts(testTime, tc.k, 27, 2, 300) + c3 := payloadWithCounts(testTime, tc.k, 5, 10, 3) + keyDefault := BucketsAggregationKey{} + cDefault := payloadWithCounts(testTime, keyDefault, 0, 2, 4) + + assert.Len(a.out, 0) + a.add(testTime, deepCopy(c1)) + a.add(testTime, deepCopy(c2)) + a.add(testTime, deepCopy(c3)) + a.add(testTime, deepCopy(cDefault)) + assert.Len(a.out, 3) + a.flushOnTime(testTime.Add(oldestBucketStart + time.Nanosecond)) + assert.Len(a.out, 4) + + assertDistribPayload(t, wrapPayloads([]pb.ClientStatsPayload{c1, c2}), <-a.out) + assertDistribPayload(t, wrapPayload(c3), <-a.out) + assertDistribPayload(t, wrapPayload(cDefault), <-a.out) + aggCounts := <-a.out + assertAggCountsPayload(t, aggCounts) + + tc.res.Hits = 43 + tc.res.Errors = 19 + tc.res.Duration = 403 + assert.ElementsMatch(aggCounts.Stats[0].Stats[0].Stats, []pb.ClientGroupedStats{ + tc.res, + // Additional grouped stat object that corresponds to the keyDefault/cDefault. + // We do not expect this to be aggregated with the non-default key in the test. + { + Hits: 0, + Errors: 2, + Duration: 4, + }, + }) + assert.Len(a.buckets, 0) + }) + } +} + +func TestNewBucketAggregationKeyPeerService(t *testing.T) { + t.Run("disabled", func(t *testing.T) { + assert := assert.New(t) + r := newBucketAggregationKey(pb.ClientGroupedStats{Service: "a", PeerService: "remote-test"}, false) + assert.Equal(BucketsAggregationKey{Service: "a"}, r) + }) + t.Run("enabled", func(t *testing.T) { + assert := assert.New(t) + r := newBucketAggregationKey(pb.ClientGroupedStats{Service: "a", PeerService: "remote-test"}, true) + assert.Equal(BucketsAggregationKey{Service: "a", PeerService: "remote-test"}, r) + }) +} + func deepCopy(p pb.ClientStatsPayload) pb.ClientStatsPayload { new := p new.Stats = deepCopyStatsBucket(p.Stats) diff --git a/pkg/trace/stats/concentrator.go b/pkg/trace/stats/concentrator.go index f4289c680a42c..5de3c578086de 100644 --- a/pkg/trace/stats/concentrator.go +++ b/pkg/trace/stats/concentrator.go @@ -6,6 +6,7 @@ package stats import ( + "strings" "sync" "time" @@ -37,14 +38,16 @@ type Concentrator struct { // It means that we can compute stats only for the last `bufferLen * bsize` and that we // wait such time before flushing the stats. // This only applies to past buckets. Stats buckets in the future are allowed with no restriction. - bufferLen int - exit chan struct{} - exitWG sync.WaitGroup - buckets map[int64]*RawBucket // buckets used to aggregate stats per timestamp - mu sync.Mutex - agentEnv string - agentHostname string - agentVersion string + bufferLen int + exit chan struct{} + exitWG sync.WaitGroup + buckets map[int64]*RawBucket // buckets used to aggregate stats per timestamp + mu sync.Mutex + agentEnv string + agentHostname string + agentVersion string + peerSvcAggregation bool // flag to enable peer.service aggregation + computeStatsBySpanKind bool // flag to enable computation of stats through checking the span.kind field } // NewConcentrator initializes a new concentrator ready to be started @@ -57,13 +60,15 @@ func NewConcentrator(conf *config.AgentConfig, out chan pb.StatsPayload, now tim // override buckets which could have been sent before an Agent restart. oldestTs: alignTs(now.UnixNano(), bsize), // TODO: Move to configuration. - bufferLen: defaultBufferLen, - In: make(chan Input, 100), - Out: out, - exit: make(chan struct{}), - agentEnv: conf.DefaultEnv, - agentHostname: conf.Hostname, - agentVersion: conf.AgentVersion, + bufferLen: defaultBufferLen, + In: make(chan Input, 100), + Out: out, + exit: make(chan struct{}), + agentEnv: conf.DefaultEnv, + agentHostname: conf.Hostname, + agentVersion: conf.AgentVersion, + peerSvcAggregation: conf.PeerServiceAggregation, + computeStatsBySpanKind: conf.ComputeStatsBySpanKind, } return &c } @@ -113,6 +118,17 @@ func (c *Concentrator) Stop() { c.exitWG.Wait() } +// computeStatsForSpanKind returns true if the span.kind value makes the span eligible for stats computation. +func computeStatsForSpanKind(s *pb.Span) bool { + k := strings.ToLower(s.Meta["span.kind"]) + switch k { + case "server", "consumer", "client", "producer": + return true + default: + return false + } +} + // Input specifies a set of traces originating from a certain payload. type Input struct { Traces []traceutil.ProcessedTrace @@ -165,7 +181,11 @@ func (c *Concentrator) addNow(pt *traceutil.ProcessedTrace, containerID string) } for _, s := range pt.TraceChunk.Spans { isTop := traceutil.HasTopLevel(s) - if !(isTop || traceutil.IsMeasured(s)) || traceutil.IsPartialSnapshot(s) { + eligibleSpanKind := c.computeStatsBySpanKind && computeStatsForSpanKind(s) + if !(isTop || traceutil.IsMeasured(s) || eligibleSpanKind) { + continue + } + if traceutil.IsPartialSnapshot(s) { continue } end := s.Start + s.Duration @@ -181,7 +201,7 @@ func (c *Concentrator) addNow(pt *traceutil.ProcessedTrace, containerID string) b = NewRawBucket(uint64(btime), uint64(c.bsize)) c.buckets[btime] = b } - b.HandleSpan(s, weight, isTop, pt.TraceChunk.Origin, aggKey) + b.HandleSpan(s, weight, isTop, pt.TraceChunk.Origin, aggKey, c.peerSvcAggregation) } } diff --git a/pkg/trace/stats/concentrator_test.go b/pkg/trace/stats/concentrator_test.go index a602f23072370..f0b7cf16a65d0 100644 --- a/pkg/trace/stats/concentrator_test.go +++ b/pkg/trace/stats/concentrator_test.go @@ -526,3 +526,223 @@ func TestIgnoresPartialSpans(t *testing.T) { stats := c.flushNow(now.UnixNano() + int64(c.bufferLen)*testBucketInterval) assert.Empty(stats.GetStats()) } + +// TestPeerServiceStats tests that if peer.service is present in the span's meta, we will generate stats with it as an additional field. +func TestPeerServiceStats(t *testing.T) { + assert := assert.New(t) + now := time.Now() + sp := &pb.Span{ + ParentID: 0, + SpanID: 1, + Service: "myservice", + Name: "http.server.request", + Resource: "GET /users", + Duration: 100, + } + peerSvcSp := &pb.Span{ + ParentID: sp.SpanID, + SpanID: 2, + Service: "myservice", + Name: "postgres.query", + Resource: "SELECT user_id from users WHERE user_name = ?", + Duration: 75, + Metrics: map[string]float64{"_dd.measured": 1.0}, + Meta: map[string]string{"peer.service": "users-db"}, + } + t.Run("enabled", func(t *testing.T) { + spans := []*pb.Span{sp, peerSvcSp} + traceutil.ComputeTopLevel(spans) + testTrace := toProcessedTrace(spans, "none", "") + c := NewTestConcentrator(now) + c.peerSvcAggregation = true + c.addNow(testTrace, "") + stats := c.flushNow(now.UnixNano() + int64(c.bufferLen)*testBucketInterval) + assert.Len(stats.Stats[0].Stats[0].Stats, 2) + for _, st := range stats.Stats[0].Stats[0].Stats { + if st.Name == "postgres.query" { + assert.Equal("users-db", st.PeerService) + } else { + assert.Equal("", st.PeerService) + } + } + }) + t.Run("disabled", func(t *testing.T) { + spans := []*pb.Span{sp, peerSvcSp} + traceutil.ComputeTopLevel(spans) + testTrace := toProcessedTrace(spans, "none", "") + c := NewTestConcentrator(now) + c.peerSvcAggregation = false + c.addNow(testTrace, "") + stats := c.flushNow(now.UnixNano() + int64(c.bufferLen)*testBucketInterval) + assert.Len(stats.Stats[0].Stats[0].Stats, 2) + for _, st := range stats.Stats[0].Stats[0].Stats { + assert.Equal("", st.PeerService) + } + }) +} + +// TestComputeStatsThroughSpanKindCheck ensures that we generate stats for spans that have an eligible span.kind. +func TestComputeStatsThroughSpanKindCheck(t *testing.T) { + assert := assert.New(t) + now := time.Now() + sp := &pb.Span{ + ParentID: 0, + SpanID: 1, + Service: "myservice", + Name: "http.server.request", + Resource: "GET /users", + Duration: 500, + } + // Even though span.kind = internal is an ineligible case, we should still compute stats based on the top_level flag. + // This is a case that should rarely (if ever) come up in practice though. + topLevelInternalSpan := &pb.Span{ + ParentID: sp.SpanID, + SpanID: 2, + Service: "myservice", + Name: "internal.op1", + Resource: "compute_1", + Duration: 25, + Metrics: map[string]float64{"_top_level": 1.0}, + Meta: map[string]string{"span.kind": "internal"}, + } + // Even though span.kind = internal is an ineligible case, we should still compute stats based on the measured flag. + measuredInternalSpan := &pb.Span{ + ParentID: sp.SpanID, + SpanID: 3, + Service: "myservice", + Name: "internal.op2", + Resource: "compute_2", + Duration: 25, + Metrics: map[string]float64{"_dd.measured": 1.0}, + Meta: map[string]string{"span.kind": "internal"}, + } + // client is an eligible span.kind for stats computation. + clientSpan := &pb.Span{ + ParentID: sp.SpanID, + SpanID: 4, + Service: "myservice", + Name: "postgres.query", + Resource: "SELECT user_id from users WHERE user_name = ?", + Duration: 75, + Meta: map[string]string{"span.kind": "client"}, + } + t.Run("disabled", func(t *testing.T) { + spans := []*pb.Span{sp, topLevelInternalSpan, measuredInternalSpan, clientSpan} + traceutil.ComputeTopLevel(spans) + testTrace := toProcessedTrace(spans, "none", "") + c := NewTestConcentrator(now) + c.addNow(testTrace, "") + stats := c.flushNow(now.UnixNano() + int64(c.bufferLen)*testBucketInterval) + assert.Len(stats.Stats[0].Stats[0].Stats, 3) + opNames := make(map[string]struct{}, 3) + for _, s := range stats.Stats { + for _, b := range s.Stats { + for _, g := range b.Stats { + opNames[g.Name] = struct{}{} + } + } + } + assert.Equal(map[string]struct{}{"http.server.request": {}, "internal.op1": {}, "internal.op2": {}}, opNames) + }) + t.Run("enabled", func(t *testing.T) { + spans := []*pb.Span{sp, topLevelInternalSpan, measuredInternalSpan, clientSpan} + traceutil.ComputeTopLevel(spans) + testTrace := toProcessedTrace(spans, "none", "") + c := NewTestConcentrator(now) + c.computeStatsBySpanKind = true + c.addNow(testTrace, "") + stats := c.flushNow(now.UnixNano() + int64(c.bufferLen)*testBucketInterval) + assert.Len(stats.Stats[0].Stats[0].Stats, 4) + opNames := make(map[string]struct{}, 4) + for _, s := range stats.Stats { + for _, b := range s.Stats { + for _, g := range b.Stats { + opNames[g.Name] = struct{}{} + } + } + } + assert.Equal(map[string]struct{}{"http.server.request": {}, "internal.op1": {}, "internal.op2": {}, "postgres.query": {}}, opNames) + }) +} + +func TestComputeStatsForSpanKind(t *testing.T) { + assert := assert.New(t) + + type testCase struct { + s *pb.Span + res bool + } + + for _, tc := range []testCase{ + { + &pb.Span{Meta: map[string]string{"span.kind": "server"}}, + true, + }, + { + &pb.Span{Meta: map[string]string{"span.kind": "consumer"}}, + true, + }, + { + &pb.Span{Meta: map[string]string{"span.kind": "client"}}, + true, + }, + { + &pb.Span{Meta: map[string]string{"span.kind": "producer"}}, + true, + }, + { + &pb.Span{Meta: map[string]string{"span.kind": "internal"}}, + false, + }, + { + &pb.Span{Meta: map[string]string{"span.kind": "SERVER"}}, + true, + }, + { + &pb.Span{Meta: map[string]string{"span.kind": "CONSUMER"}}, + true, + }, + { + &pb.Span{Meta: map[string]string{"span.kind": "CLIENT"}}, + true, + }, + { + &pb.Span{Meta: map[string]string{"span.kind": "PRODUCER"}}, + true, + }, + { + &pb.Span{Meta: map[string]string{"span.kind": "INTERNAL"}}, + false, + }, + { + &pb.Span{Meta: map[string]string{"span.kind": "SErVER"}}, + true, + }, + { + &pb.Span{Meta: map[string]string{"span.kind": "COnSUMER"}}, + true, + }, + { + &pb.Span{Meta: map[string]string{"span.kind": "CLiENT"}}, + true, + }, + { + &pb.Span{Meta: map[string]string{"span.kind": "PRoDUCER"}}, + true, + }, + { + &pb.Span{Meta: map[string]string{"span.kind": "INtERNAL"}}, + false, + }, + { + &pb.Span{Meta: map[string]string{"span.kind": ""}}, + false, + }, + { + &pb.Span{Meta: map[string]string{}}, + false, + }, + } { + assert.Equal(tc.res, computeStatsForSpanKind(tc.s)) + } +} diff --git a/pkg/trace/stats/statsraw.go b/pkg/trace/stats/statsraw.go index 666b39c13a36d..b01051536ac5b 100644 --- a/pkg/trace/stats/statsraw.go +++ b/pkg/trace/stats/statsraw.go @@ -72,6 +72,7 @@ func (s *groupedStats) export(a Aggregation) (pb.ClientGroupedStats, error) { OkSummary: okSummary, ErrorSummary: errSummary, Synthetics: a.Synthetics, + PeerService: a.PeerService, }, nil } @@ -144,11 +145,11 @@ func (sb *RawBucket) Export() map[PayloadAggregationKey]pb.ClientStatsBucket { } // HandleSpan adds the span to this bucket stats, aggregated with the finest grain matching given aggregators -func (sb *RawBucket) HandleSpan(s *pb.Span, weight float64, isTop bool, origin string, aggKey PayloadAggregationKey) { +func (sb *RawBucket) HandleSpan(s *pb.Span, weight float64, isTop bool, origin string, aggKey PayloadAggregationKey, enablePeerSvcAgg bool) { if aggKey.Env == "" { panic("env should never be empty") } - aggr := NewAggregationFromSpan(s, origin, aggKey) + aggr := NewAggregationFromSpan(s, origin, aggKey, enablePeerSvcAgg) sb.add(s, weight, isTop, aggr) } diff --git a/pkg/trace/stats/statsraw_test.go b/pkg/trace/stats/statsraw_test.go index 5c87f4f864b11..ad5db46be6d46 100644 --- a/pkg/trace/stats/statsraw_test.go +++ b/pkg/trace/stats/statsraw_test.go @@ -20,7 +20,7 @@ func TestGrain(t *testing.T) { Env: "default", Hostname: "default", ContainerID: "cid", - }) + }, false) assert.Equal(Aggregation{ PayloadAggregationKey: PayloadAggregationKey{ Env: "default", @@ -35,6 +35,53 @@ func TestGrain(t *testing.T) { }, aggr) } +func TestGrainWithPeerService(t *testing.T) { + t.Run("disabled", func(t *testing.T) { + assert := assert.New(t) + s := pb.Span{Service: "thing", Name: "other", Resource: "yo", Meta: map[string]string{"peer.service": "remote-service"}} + aggr := NewAggregationFromSpan(&s, "", PayloadAggregationKey{ + Env: "default", + Hostname: "default", + ContainerID: "cid", + }, false) + assert.Equal(Aggregation{ + PayloadAggregationKey: PayloadAggregationKey{ + Env: "default", + Hostname: "default", + ContainerID: "cid", + }, + BucketsAggregationKey: BucketsAggregationKey{ + Service: "thing", + Name: "other", + Resource: "yo", + PeerService: "", + }, + }, aggr) + }) + t.Run("enabled", func(t *testing.T) { + assert := assert.New(t) + s := pb.Span{Service: "thing", Name: "other", Resource: "yo", Meta: map[string]string{"peer.service": "remote-service"}} + aggr := NewAggregationFromSpan(&s, "", PayloadAggregationKey{ + Env: "default", + Hostname: "default", + ContainerID: "cid", + }, true) + assert.Equal(Aggregation{ + PayloadAggregationKey: PayloadAggregationKey{ + Env: "default", + Hostname: "default", + ContainerID: "cid", + }, + BucketsAggregationKey: BucketsAggregationKey{ + Service: "thing", + Name: "other", + Resource: "yo", + PeerService: "remote-service", + }, + }, aggr) + }) +} + func TestGrainWithExtraTags(t *testing.T) { assert := assert.New(t) s := pb.Span{Service: "thing", Name: "other", Resource: "yo", Meta: map[string]string{tagStatusCode: "418"}} @@ -43,7 +90,7 @@ func TestGrainWithExtraTags(t *testing.T) { Version: "v0", Env: "default", ContainerID: "cid", - }) + }, false) assert.Equal(Aggregation{ PayloadAggregationKey: PayloadAggregationKey{ Hostname: "host-id", @@ -67,7 +114,7 @@ func BenchmarkHandleSpanRandom(b *testing.B) { b.ReportAllocs() for i := 0; i < b.N; i++ { for _, span := range benchSpans { - sb.HandleSpan(span, 1, true, "", PayloadAggregationKey{"a", "b", "c", "d"}) + sb.HandleSpan(span, 1, true, "", PayloadAggregationKey{"a", "b", "c", "d"}, true) } } } diff --git a/pkg/trace/testutil/stats.go b/pkg/trace/testutil/stats.go index 67a2372c14860..658988dd3cb99 100644 --- a/pkg/trace/testutil/stats.go +++ b/pkg/trace/testutil/stats.go @@ -30,7 +30,7 @@ func BucketWithSpans(spans []*pb.Span) pb.ClientStatsBucket { for _, s := range spans { // override version to ensure all buckets will have the same payload key. s.Meta["version"] = "" - srb.HandleSpan(s, 0, true, "", aggKey) + srb.HandleSpan(s, 0, true, "", aggKey, true) } buckets := srb.Export() if len(buckets) != 1 { diff --git a/pkg/trace/traceutil/normalize.go b/pkg/trace/traceutil/normalize.go index 7859d4f6df1ef..6aaa4edc38c8e 100644 --- a/pkg/trace/traceutil/normalize.go +++ b/pkg/trace/traceutil/normalize.go @@ -72,6 +72,24 @@ func NormalizeService(svc string, lang string) (string, error) { return s, err } +// NormalizePeerService normalizes a span's peer.service and returns an error describing the reason +// (if any) why the name was modified. +func NormalizePeerService(svc string) (string, error) { + if svc == "" { + return "", nil + } + var err error + if len(svc) > MaxServiceLen { + svc = TruncateUTF8(svc, MaxServiceLen) + err = ErrTooLong + } + s := NormalizeTag(svc) + if s == "" { + return "", ErrInvalid + } + return s, err +} + // fallbackServiceNames is a cache of default service names to use // when the span's service is unset or invalid. var fallbackServiceNames sync.Map diff --git a/pkg/trace/traceutil/normalize_test.go b/pkg/trace/traceutil/normalize_test.go index 55b1786df6726..21917df09d6d7 100644 --- a/pkg/trace/traceutil/normalize_test.go +++ b/pkg/trace/traceutil/normalize_test.go @@ -162,3 +162,37 @@ func TestNormalizeService(t *testing.T) { assert.Equal(t, testCase.err, err) } } + +func TestNormalizePeerService(t *testing.T) { + testCases := []struct { + peerService string + normalized string + err error + }{ + { + peerService: "", + normalized: "", + err: nil, + }, + { + peerService: "remote-service", + normalized: "remote-service", + err: nil, + }, + { + peerService: "Too$Long$.Too$Long$.Too$Long$.Too$Long$.Too$Long$.Too$Long$.Too$Long$.Too$Long$.Too$Long$.Too$Long$.Too$Long$.", + normalized: "too_long_.too_long_.too_long_.too_long_.too_long_.too_long_.too_long_.too_long_.too_long_.too_long_.", + err: ErrTooLong, + }, + { + peerService: "bad$remote**service", + normalized: "bad_remote_service", + err: nil, + }, + } + for _, testCase := range testCases { + out, err := NormalizePeerService(testCase.peerService) + assert.Equal(t, testCase.normalized, out) + assert.Equal(t, testCase.err, err) + } +} diff --git a/pkg/trace/traceutil/span.go b/pkg/trace/traceutil/span.go index e37f2c680624d..8c7636baa635d 100644 --- a/pkg/trace/traceutil/span.go +++ b/pkg/trace/traceutil/span.go @@ -16,7 +16,6 @@ import ( const ( // This is a special metric, it's 1 if the span is top-level, 0 if not. topLevelKey = "_top_level" - // measuredKey is a special metric flag that marks a span for trace metrics calculation. measuredKey = "_dd.measured" // tracerTopLevelKey is a metric flag set by tracers on top_level spans diff --git a/releasenotes/notes/add-peer-service-for-trace-stats-225f1b90c9627c18.yaml b/releasenotes/notes/add-peer-service-for-trace-stats-225f1b90c9627c18.yaml new file mode 100644 index 0000000000000..6393b4e425abe --- /dev/null +++ b/releasenotes/notes/add-peer-service-for-trace-stats-225f1b90c9627c18.yaml @@ -0,0 +1,6 @@ +--- +features: + - | + APM: Addition of configuration to add ``peer.service`` to trace stats exported by the Agent. + - | + APM: Addition of configuration to compute trace stats on spans based on their ``span.kind`` value.