From 3a2702934f7623695f968e8b3e2cbc4155b62127 Mon Sep 17 00:00:00 2001
From: Mario
Date: Tue, 24 Oct 2023 14:04:31 +0200
Subject: [PATCH] [servicegraphconnector] Add flush interval config (#27879)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

**Description:** Add a config option to periodically flush metrics, instead of flushing on every push.

**Link to tracking Issue:** #27679

**Testing:** Added tests that verify metrics are flushed asynchronously

**Documentation:** Documentation added to `config.go`

---------

Co-authored-by: Juraci Paixão Kröhling
---
 .chloggen/servicegraph-flush-interval.yaml    | 27 ++++++++
 processor/servicegraphprocessor/config.go     |  4 ++
 processor/servicegraphprocessor/processor.go  | 63 +++++++++++++------
 .../servicegraphprocessor/processor_test.go   | 61 ++++++++++++++++--
 4 files changed, 131 insertions(+), 24 deletions(-)
 create mode 100755 .chloggen/servicegraph-flush-interval.yaml

diff --git a/.chloggen/servicegraph-flush-interval.yaml b/.chloggen/servicegraph-flush-interval.yaml
new file mode 100755
index 000000000000..7fe79f8e03f2
--- /dev/null
+++ b/.chloggen/servicegraph-flush-interval.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: servicegraphprocessor, servicegraphconnector
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add a config option to periodically flush metrics, instead of flushing on every push.
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [27679]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: []
diff --git a/processor/servicegraphprocessor/config.go b/processor/servicegraphprocessor/config.go
index 4357799bad88..ab2172bff0c1 100644
--- a/processor/servicegraphprocessor/config.go
+++ b/processor/servicegraphprocessor/config.go
@@ -34,6 +34,10 @@ type Config struct {
 	StoreExpirationLoop time.Duration `mapstructure:"store_expiration_loop"`
 	// VirtualNodePeerAttributes the list of attributes need to match, the higher the front, the higher the priority.
 	VirtualNodePeerAttributes []string `mapstructure:"virtual_node_peer_attributes"`
+
+	// MetricsFlushInterval is the interval at which metrics are flushed to the exporter.
+	// If set to 0, metrics are flushed on every received batch of traces.
+	MetricsFlushInterval time.Duration `mapstructure:"metrics_flush_interval"`
 }
 
 type StoreConfig struct {
diff --git a/processor/servicegraphprocessor/processor.go b/processor/servicegraphprocessor/processor.go
index 07f01f93606d..a68314ea0233 100644
--- a/processor/servicegraphprocessor/processor.go
+++ b/processor/servicegraphprocessor/processor.go
@@ -134,6 +134,8 @@ func (p *serviceGraphProcessor) Start(_ context.Context, host component.Host) er
 		}
 	}
 
+	go p.metricFlushLoop(p.config.MetricsFlushInterval)
+
 	go p.cacheLoop(p.config.CacheLoop)
 
 	go p.storeExpirationLoop(p.config.StoreExpirationLoop)
@@ -146,6 +148,41 @@ func (p *serviceGraphProcessor) Start(_ context.Context, host component.Host) er
 	return nil
 }
 
+func (p *serviceGraphProcessor) metricFlushLoop(flushInterval time.Duration) {
+	if flushInterval <= 0 {
+		return
+	}
+
+	ticker := time.NewTicker(flushInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			if err := p.flushMetrics(context.Background()); err != nil {
+				p.logger.Error("failed to flush metrics", zap.Error(err))
+			}
+		case <-p.shutdownCh:
+			return
+		}
+	}
+}
+
+func (p *serviceGraphProcessor) flushMetrics(ctx context.Context) error {
+	md, err := p.buildMetrics()
+	if err != nil {
+		return fmt.Errorf("failed to build metrics: %w", err)
+	}
+
+	// Skip empty metrics.
+	if md.MetricCount() == 0 {
+		return nil
+	}
+
+	// Firstly, export md to avoid being impacted by downstream trace serviceGraphProcessor errors/latency.
+	return p.metricsConsumer.ConsumeMetrics(ctx, md)
+}
+
 func (p *serviceGraphProcessor) Shutdown(_ context.Context) error {
 	if p.tracesConsumer == nil {
 		p.logger.Info("Shutting down servicegraphconnector")
@@ -165,29 +202,17 @@ func (p *serviceGraphProcessor) ConsumeTraces(ctx context.Context, td ptrace.Tra
 		return fmt.Errorf("failed to aggregate metrics: %w", err)
 	}
 
-	md, err := p.buildMetrics()
-	if err != nil {
-		return fmt.Errorf("failed to build metrics: %w", err)
-	}
-
-	// Skip empty metrics.
-	if md.MetricCount() == 0 {
-		if p.tracesConsumer != nil {
-			return p.tracesConsumer.ConsumeTraces(ctx, td)
+	// If metricsFlushInterval is not set, flush metrics immediately.
+	if p.config.MetricsFlushInterval <= 0 {
+		if err := p.flushMetrics(ctx); err != nil {
+			// Not return error here to avoid impacting traces.
+			p.logger.Error("failed to flush metrics", zap.Error(err))
 		}
-		return nil
-	}
-
-	// true when p is a connector
-	if p.tracesConsumer == nil {
-		return p.metricsConsumer.ConsumeMetrics(ctx, md)
 	}
 
-	// Firstly, export md to avoid being impacted by downstream trace serviceGraphProcessor errors/latency.
-	if err := p.metricsConsumer.ConsumeMetrics(ctx, md); err != nil {
-		return err
+	if p.tracesConsumer == nil { // True if p is a connector
+		return nil
 	}
-
 	return p.tracesConsumer.ConsumeTraces(ctx, td)
 }
 
diff --git a/processor/servicegraphprocessor/processor_test.go b/processor/servicegraphprocessor/processor_test.go
index 3f5cb7c71550..159fc5916f87 100644
--- a/processor/servicegraphprocessor/processor_test.go
+++ b/processor/servicegraphprocessor/processor_test.go
@@ -6,6 +6,7 @@ package servicegraphprocessor
 import (
 	"context"
 	"crypto/rand"
+	"sync"
 	"testing"
 	"time"
 
@@ -140,7 +141,8 @@ func TestProcessorConsume(t *testing.T) {
 					MaxItems: 10,
 					TTL:      time.Nanosecond,
 				},
-			}, sampleTraces: buildSampleTrace(t, "val"),
+			},
+			sampleTraces:  buildSampleTrace(t, "val"),
 			verifyMetrics: verifyHappyCaseMetrics,
 		},
 		{
@@ -252,10 +254,47 @@ func TestConnectorConsume(t *testing.T) {
 	assert.NoError(t, err)
 	verifyHappyCaseMetrics(t, md)
 
-	// Shutdown the conn
+	// Shutdown the connector
 	assert.NoError(t, conn.Shutdown(context.Background()))
 }
 
+func TestProcessor_MetricsFlushInterval(t *testing.T) {
+	// Prepare
+	p := newProcessor(zaptest.NewLogger(t), &Config{
+		MetricsExporter: "mock",
+		Dimensions:      []string{"some-attribute", "non-existing-attribute"},
+		Store: StoreConfig{
+			MaxItems: 10,
+			TTL:      time.Nanosecond,
+		},
+		MetricsFlushInterval: 2 * time.Second,
+	})
+	p.tracesConsumer = consumertest.NewNop()
+
+	metricsExporter := newMockMetricsExporter()
+	mHost := newMockHost(map[component.DataType]map[component.ID]component.Component{
+		component.DataTypeMetrics: {
+			component.NewID("mock"): metricsExporter,
+		},
+	})
+
+	// Start processor
+	assert.NoError(t, p.Start(context.Background(), mHost))
+
+	// Push traces
+	assert.NoError(t, p.ConsumeTraces(context.Background(), buildSampleTrace(t, "val")))
+
+	// Metrics are not immediately flushed
+	assert.Len(t, metricsExporter.getMetrics(), 0)
+
+	// Metrics are flushed after 2 seconds
+	assert.Eventuallyf(t, func() bool { return len(metricsExporter.getMetrics()) == 1 }, 5*time.Second, 100*time.Millisecond, "metrics are not flushed")
+	verifyHappyCaseMetrics(t, metricsExporter.getMetrics()[0])
+
+	// Shutdown the processor
+	assert.NoError(t, p.Shutdown(context.Background()))
+}
+
 func verifyHappyCaseMetrics(t *testing.T, md pmetric.Metrics) {
 	assert.Equal(t, 3, md.MetricCount())
 
@@ -449,11 +488,14 @@ func (m *mockHost) GetExporters() map[component.DataType]map[component.ID]compon
 
 var _ exporter.Metrics = (*mockMetricsExporter)(nil)
 
-func newMockMetricsExporter() exporter.Metrics {
+func newMockMetricsExporter() *mockMetricsExporter {
 	return &mockMetricsExporter{}
 }
 
-type mockMetricsExporter struct{}
+type mockMetricsExporter struct {
+	mtx sync.Mutex
+	md  []pmetric.Metrics
+}
 
 func (m *mockMetricsExporter) Start(context.Context, component.Host) error { return nil }
 
@@ -461,10 +503,19 @@ func (m *mockMetricsExporter) Shutdown(context.Context) error { return nil }
 
 func (m *mockMetricsExporter) Capabilities() consumer.Capabilities { return consumer.Capabilities{} }
 
-func (m *mockMetricsExporter) ConsumeMetrics(context.Context, pmetric.Metrics) error {
+func (m *mockMetricsExporter) ConsumeMetrics(_ context.Context, md pmetric.Metrics) error {
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+	m.md = append(m.md, md)
 	return nil
 }
 
+func (m *mockMetricsExporter) getMetrics() []pmetric.Metrics {
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+	return m.md
+}
+
 func TestUpdateDurationMetrics(t *testing.T) {
 	p := serviceGraphProcessor{
 		reqTotal: make(map[string]int64),
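
For anyone reviewing or trying this change, the sketch below shows how the new option could be wired into a collector configuration using the servicegraph connector. Only the `metrics_flush_interval` key (and its `0` default of flushing on every received batch of traces) comes from this patch; the `otlp` receiver, `prometheus` exporter, its endpoint, and the `60s` value are illustrative assumptions, not part of the change.

```yaml
connectors:
  servicegraph:
    # Option added by this patch. The 60s value is only an example;
    # leaving it unset (0) keeps the previous behaviour of flushing
    # on every received batch of traces.
    metrics_flush_interval: 60s

receivers:
  otlp:
    protocols:
      grpc:

exporters:
  prometheus:
    endpoint: "0.0.0.0:8889"

service:
  pipelines:
    traces:
      receivers: [otlp]
      exporters: [servicegraph]
    metrics:
      receivers: [servicegraph]
      exporters: [prometheus]
```

With a positive interval, flushing happens in the background goroutine started in `Start` (see `metricFlushLoop` above) and stops when `shutdownCh` signals shutdown, so `ConsumeTraces` no longer builds and exports metrics on every push.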