From 0a2ac0761517b769e99a72101a940e83b9de4e2a Mon Sep 17 00:00:00 2001 From: Arnav Dugar <87779081+arnavdugar-stripe@users.noreply.github.com> Date: Mon, 29 Aug 2022 15:09:38 -0700 Subject: [PATCH] Add a metric for counting metrics received. --- config.go | 1 + server.go | 42 +++++++++++++++++++++++++++++++++--------- 2 files changed, 34 insertions(+), 9 deletions(-) diff --git a/config.go b/config.go index b0668b508..cd08cb21a 100644 --- a/config.go +++ b/config.go @@ -34,6 +34,7 @@ type Config struct { Features struct { DiagnosticsMetricsEnabled bool `yaml:"diagnostics_metrics_enabled"` EnableMetricSinkRouting bool `yaml:"enable_metric_sink_routing"` + EnableSourceMetricCount bool `yaml:"enable_source_metric_count"` MigrateMetricSinks bool `yaml:"migrate_metric_sinks"` MigrateSpanSinks bool `yaml:"migrate_span_sinks"` ProxyProtocol string `yaml:"proxy_protocol"` diff --git a/server.go b/server.go index 8cceeacb1..d1590fcb4 100644 --- a/server.go +++ b/server.go @@ -333,14 +333,23 @@ func scopesFromConfig(conf Config) (scopedstatsd.MetricScopes, error) { } type ingest struct { - server *Server - tags []string + sourceName string + server *Server + tags []string } var _ sources.Ingest = &ingest{} func (ingest *ingest) IngestMetric(metric *samplers.UDPMetric) { metric.Tags = append(metric.Tags, ingest.tags...) + + if ingest.server.Config.Features.EnableSourceMetricCount { + ingest.server.Statsd.Count( + "source.metrics_count", int64(1), []string{ + "protocol:udp", + "source:" + ingest.sourceName, + }, 1.0) + } ingest.server.ingestMetric(metric) } @@ -356,8 +365,15 @@ func (ingest *ingest) IngestMetricProto(metric *metricpb.Metric) { for _, tag := range metric.Tags { h = fnv1a.AddString32(h, tag) } - workerIndex := h % uint32(len(ingest.server.Workers)) + + if ingest.server.Config.Features.EnableSourceMetricCount { + ingest.server.Statsd.Count( + "source.metrics_count", int64(1), []string{ + "protocol:proto", + "source:" + ingest.sourceName, + }, 1.0) + } ingest.server.Workers[workerIndex].ImportMetricChan <- metric } @@ -514,20 +530,27 @@ func NewFromConfig(config ServerConfig) (*Server, error) { } ret.HistogramAggregates.Count = len(conf.Aggregates) - stats, err := statsd.New(conf.StatsAddress, statsd.WithoutTelemetry(), statsd.WithMaxMessagesPerPayload(4096)) + statsClient, err := statsd.New( + conf.StatsAddress, + statsd.WithAggregationInterval(conf.Interval), + statsd.WithChannelMode(), + statsd.WithChannelModeBufferSize(64), + statsd.WithClientSideAggregation(), + statsd.WithMaxMessagesPerPayload(4096), + statsd.WithoutTelemetry()) if err != nil { return ret, err } - stats.Namespace = "veneur." + statsClient.Namespace = "veneur." scopes, err := scopesFromConfig(conf) if err != nil { return ret, err } - ret.Statsd = scopedstatsd.NewClient(stats, conf.VeneurMetricsAdditionalTags, scopes) + ret.Statsd = scopedstatsd.NewClient(statsClient, conf.VeneurMetricsAdditionalTags, scopes) ret.TraceClient, err = trace.NewChannelClient(ret.SpanChan, - trace.ReportStatistics(stats, 1*time.Second, []string{"ssf_format:internal"}), + trace.ReportStatistics(statsClient, 1*time.Second, []string{"ssf_format:internal"}), normalizeSpans(conf), ) if err != nil { @@ -1374,8 +1397,9 @@ func (s *Server) Serve() { for _, source := range s.sources { go func(source internalSource) { source.source.Start(&ingest{ - server: s, - tags: source.tags, + sourceName: source.source.Name(), + server: s, + tags: source.tags, }) done <- struct{}{} }(source)