Skip to content

Commit

Permalink
[servicegraphconnector] Add flush interval config (#27879)
Browse files Browse the repository at this point in the history
**Description:** Add a config option to periodically flush metrics,
instead of flushing on every push.

**Link to tracking Issue:** <Issue number if applicable> #27679

**Testing:** <Describe what testing was performed and which tests were
added.> Added tests that verify metrics are flushed asynchronously

**Documentation:** <Describe the documentation added.> Documentation
added to `config.go`

---------

Co-authored-by: Juraci Paixão Kröhling <juraci@kroehling.de>
  • Loading branch information
mapno and jpkrohling authored Oct 24, 2023
1 parent 3edf244 commit 3a27029
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 24 deletions.
27 changes: 27 additions & 0 deletions .chloggen/servicegraph-flush-interval.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: servicegraphprocessor, servicegraphconnector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add a config option to periodically flush metrics, instead of flushing on every push.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [27679]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
4 changes: 4 additions & 0 deletions processor/servicegraphprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ type Config struct {
StoreExpirationLoop time.Duration `mapstructure:"store_expiration_loop"`
// VirtualNodePeerAttributes the list of attributes need to match, the higher the front, the higher the priority.
VirtualNodePeerAttributes []string `mapstructure:"virtual_node_peer_attributes"`

// MetricsFlushInterval is the interval at which metrics are flushed to the exporter.
// If set to 0, metrics are flushed on every received batch of traces.
MetricsFlushInterval time.Duration `mapstructure:"metrics_flush_interval"`
}

type StoreConfig struct {
Expand Down
63 changes: 44 additions & 19 deletions processor/servicegraphprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ func (p *serviceGraphProcessor) Start(_ context.Context, host component.Host) er
}
}

go p.metricFlushLoop(p.config.MetricsFlushInterval)

go p.cacheLoop(p.config.CacheLoop)

go p.storeExpirationLoop(p.config.StoreExpirationLoop)
Expand All @@ -146,6 +148,41 @@ func (p *serviceGraphProcessor) Start(_ context.Context, host component.Host) er
return nil
}

func (p *serviceGraphProcessor) metricFlushLoop(flushInterval time.Duration) {
if flushInterval <= 0 {
return
}

ticker := time.NewTicker(flushInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
if err := p.flushMetrics(context.Background()); err != nil {
p.logger.Error("failed to flush metrics", zap.Error(err))
}
case <-p.shutdownCh:
return
}
}
}

func (p *serviceGraphProcessor) flushMetrics(ctx context.Context) error {
md, err := p.buildMetrics()
if err != nil {
return fmt.Errorf("failed to build metrics: %w", err)
}

// Skip empty metrics.
if md.MetricCount() == 0 {
return nil
}

// Firstly, export md to avoid being impacted by downstream trace serviceGraphProcessor errors/latency.
return p.metricsConsumer.ConsumeMetrics(ctx, md)
}

func (p *serviceGraphProcessor) Shutdown(_ context.Context) error {
if p.tracesConsumer == nil {
p.logger.Info("Shutting down servicegraphconnector")
Expand All @@ -165,29 +202,17 @@ func (p *serviceGraphProcessor) ConsumeTraces(ctx context.Context, td ptrace.Tra
return fmt.Errorf("failed to aggregate metrics: %w", err)
}

md, err := p.buildMetrics()
if err != nil {
return fmt.Errorf("failed to build metrics: %w", err)
}

// Skip empty metrics.
if md.MetricCount() == 0 {
if p.tracesConsumer != nil {
return p.tracesConsumer.ConsumeTraces(ctx, td)
// If metricsFlushInterval is not set, flush metrics immediately.
if p.config.MetricsFlushInterval <= 0 {
if err := p.flushMetrics(ctx); err != nil {
// Not return error here to avoid impacting traces.
p.logger.Error("failed to flush metrics", zap.Error(err))
}
return nil
}

// true when p is a connector
if p.tracesConsumer == nil {
return p.metricsConsumer.ConsumeMetrics(ctx, md)
}

// Firstly, export md to avoid being impacted by downstream trace serviceGraphProcessor errors/latency.
if err := p.metricsConsumer.ConsumeMetrics(ctx, md); err != nil {
return err
if p.tracesConsumer == nil { // True if p is a connector
return nil
}

return p.tracesConsumer.ConsumeTraces(ctx, td)
}

Expand Down
61 changes: 56 additions & 5 deletions processor/servicegraphprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package servicegraphprocessor
import (
"context"
"crypto/rand"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -140,7 +141,8 @@ func TestProcessorConsume(t *testing.T) {
MaxItems: 10,
TTL: time.Nanosecond,
},
}, sampleTraces: buildSampleTrace(t, "val"),
},
sampleTraces: buildSampleTrace(t, "val"),
verifyMetrics: verifyHappyCaseMetrics,
},
{
Expand Down Expand Up @@ -252,10 +254,47 @@ func TestConnectorConsume(t *testing.T) {
assert.NoError(t, err)
verifyHappyCaseMetrics(t, md)

// Shutdown the conn
// Shutdown the connector
assert.NoError(t, conn.Shutdown(context.Background()))
}

func TestProcessor_MetricsFlushInterval(t *testing.T) {
// Prepare
p := newProcessor(zaptest.NewLogger(t), &Config{
MetricsExporter: "mock",
Dimensions: []string{"some-attribute", "non-existing-attribute"},
Store: StoreConfig{
MaxItems: 10,
TTL: time.Nanosecond,
},
MetricsFlushInterval: 2 * time.Second,
})
p.tracesConsumer = consumertest.NewNop()

metricsExporter := newMockMetricsExporter()
mHost := newMockHost(map[component.DataType]map[component.ID]component.Component{
component.DataTypeMetrics: {
component.NewID("mock"): metricsExporter,
},
})

// Start processor
assert.NoError(t, p.Start(context.Background(), mHost))

// Push traces
assert.NoError(t, p.ConsumeTraces(context.Background(), buildSampleTrace(t, "val")))

// Metrics are not immediately flushed
assert.Len(t, metricsExporter.getMetrics(), 0)

// Metrics are flushed after 2 seconds
assert.Eventuallyf(t, func() bool { return len(metricsExporter.getMetrics()) == 1 }, 5*time.Second, 100*time.Millisecond, "metrics are not flushed")
verifyHappyCaseMetrics(t, metricsExporter.getMetrics()[0])

// Shutdown the processor
assert.NoError(t, p.Shutdown(context.Background()))
}

func verifyHappyCaseMetrics(t *testing.T, md pmetric.Metrics) {
assert.Equal(t, 3, md.MetricCount())

Expand Down Expand Up @@ -449,22 +488,34 @@ func (m *mockHost) GetExporters() map[component.DataType]map[component.ID]compon

var _ exporter.Metrics = (*mockMetricsExporter)(nil)

func newMockMetricsExporter() exporter.Metrics {
func newMockMetricsExporter() *mockMetricsExporter {
return &mockMetricsExporter{}
}

type mockMetricsExporter struct{}
type mockMetricsExporter struct {
mtx sync.Mutex
md []pmetric.Metrics
}

func (m *mockMetricsExporter) Start(context.Context, component.Host) error { return nil }

func (m *mockMetricsExporter) Shutdown(context.Context) error { return nil }

func (m *mockMetricsExporter) Capabilities() consumer.Capabilities { return consumer.Capabilities{} }

func (m *mockMetricsExporter) ConsumeMetrics(context.Context, pmetric.Metrics) error {
func (m *mockMetricsExporter) ConsumeMetrics(_ context.Context, md pmetric.Metrics) error {
m.mtx.Lock()
defer m.mtx.Unlock()
m.md = append(m.md, md)
return nil
}

func (m *mockMetricsExporter) getMetrics() []pmetric.Metrics {
m.mtx.Lock()
defer m.mtx.Unlock()
return m.md
}

func TestUpdateDurationMetrics(t *testing.T) {
p := serviceGraphProcessor{
reqTotal: make(map[string]int64),
Expand Down

0 comments on commit 3a27029

Please sign in to comment.