diff --git a/.chloggen/handle-half-cleaned-up-servicegraph-metric.yaml b/.chloggen/handle-half-cleaned-up-servicegraph-metric.yaml new file mode 100644 index 000000000000..431fb8d9740e --- /dev/null +++ b/.chloggen/handle-half-cleaned-up-servicegraph-metric.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: servicegraphprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix 'failed to find dimensions for key' error from race condition in metrics cleanup. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [ 31701 ] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/connector/servicegraphconnector/connector.go b/connector/servicegraphconnector/connector.go index a8b2818eb565..bd8a22a431be 100644 --- a/connector/servicegraphconnector/connector.go +++ b/connector/servicegraphconnector/connector.go @@ -666,12 +666,6 @@ func (p *serviceGraphConnector) cleanCache() { } p.metricMutex.RUnlock() - p.metricMutex.Lock() - for _, key := range staleSeries { - delete(p.keyToMetric, key) - } - p.metricMutex.Unlock() - p.seriesMutex.Lock() for _, key := range staleSeries { delete(p.reqTotal, key) @@ -684,6 +678,12 @@ func (p *serviceGraphConnector) cleanCache() { delete(p.reqServerDurationSecondsBucketCounts, key) } p.seriesMutex.Unlock() + + p.metricMutex.Lock() + for _, key := range staleSeries { + delete(p.keyToMetric, key) + } + p.metricMutex.Unlock() } // spanDuration returns the duration of the given span in seconds (legacy ms). diff --git a/connector/servicegraphconnector/connector_test.go b/connector/servicegraphconnector/connector_test.go index f395ac87062d..e2e7ac49e1fe 100644 --- a/connector/servicegraphconnector/connector_test.go +++ b/connector/servicegraphconnector/connector_test.go @@ -339,6 +339,73 @@ func TestStaleSeriesCleanup(t *testing.T) { assert.NoError(t, p.Shutdown(context.Background())) } +func TestMapsAreConsistentDuringCleanup(t *testing.T) { + // Prepare + cfg := &Config{ + MetricsExporter: "mock", + Dimensions: []string{"some-attribute", "non-existing-attribute"}, + Store: StoreConfig{ + MaxItems: 10, + TTL: time.Second, + }, + } + + mockMetricsExporter := newMockMetricsExporter() + + set := componenttest.NewNopTelemetrySettings() + set.Logger = zaptest.NewLogger(t) + p := newConnector(set, cfg) + + mHost := newMockHost(map[component.DataType]map[component.ID]component.Component{ + component.DataTypeMetrics: { + component.MustNewID("mock"): mockMetricsExporter, + }, + }) + + assert.NoError(t, p.Start(context.Background(), mHost)) + + // ConsumeTraces + td := buildSampleTrace(t, "first") + assert.NoError(t, p.ConsumeTraces(context.Background(), td)) + + // Make series stale and force a cache cleanup + for key, metric := range p.keyToMetric { + metric.lastUpdated = 0 + p.keyToMetric[key] = metric + } + + // Start cleanup, but use locks to pretend that we are: + // - currently collecting metrics (so seriesMutex is locked) + // - currently getting dimensions for that series (so metricMutex is locked) + p.seriesMutex.Lock() + p.metricMutex.RLock() + go p.cleanCache() + + // Since everything is locked, nothing has happened, so both should still have length 1 + assert.Equal(t, 1, len(p.reqTotal)) + assert.Equal(t, 1, len(p.keyToMetric)) + + // Now we pretend that we have stopped collecting metrics, by unlocking seriesMutex + p.seriesMutex.Unlock() + + // Make sure cleanupCache has continued to the next mutex + time.Sleep(time.Millisecond) + p.seriesMutex.Lock() + + // The expired series should have been removed. The metrics collector now won't look + // for dimensions from that series. It's important that it happens this way around, + // instead of deleting it from `keyToMetric`, otherwise the metrics collector will try + // and fail to find dimensions for a series that is about to be removed. + assert.Equal(t, 0, len(p.reqTotal)) + assert.Equal(t, 1, len(p.keyToMetric)) + + p.metricMutex.RUnlock() + p.seriesMutex.Unlock() + + // Shutdown the connector + assert.NoError(t, p.Shutdown(context.Background())) +} + func setupTelemetry(reader *sdkmetric.ManualReader) component.TelemetrySettings { settings := componenttest.NewNopTelemetrySettings() settings.MetricsLevel = configtelemetry.LevelNormal