Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/azuremonitor] Fix memory leaks on shutdown #30801

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion exporter/azuremonitorexporter/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,13 @@

package azuremonitorexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/azuremonitorexporter"

import "github.com/microsoft/ApplicationInsights-Go/appinsights/contracts"
import (
"time"

"github.com/microsoft/ApplicationInsights-Go/appinsights/contracts"
)

type transportChannel interface {
Send(*contracts.Envelope)
Close(retryTimeout ...time.Duration) <-chan struct{}
}
81 changes: 81 additions & 0 deletions exporter/azuremonitorexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func TestCreateTracesExporterUsingDefaultTransportChannel(t *testing.T) {
assert.NotNil(t, exporter)
assert.NoError(t, err)
assert.NotNil(t, f.tChannel)
assert.NoError(t, exporter.Shutdown(ctx))
}

func TestCreateTracesExporterUsingBadConfig(t *testing.T) {
Expand All @@ -53,3 +54,83 @@ func TestCreateTracesExporterUsingBadConfig(t *testing.T) {
assert.Nil(t, exporter)
assert.Error(t, err)
}

func TestCreateLogsExporterUsingSpecificTransportChannel(t *testing.T) {
// mock transport channel creation
f := factory{tChannel: &mockTransportChannel{}}
ctx := context.Background()
params := exportertest.NewNopCreateSettings()
config := createDefaultConfig().(*Config)
config.ConnectionString = "InstrumentationKey=test-key;IngestionEndpoint=https://test-endpoint/"
exporter, err := f.createLogsExporter(ctx, params, config)
assert.NotNil(t, exporter)
assert.NoError(t, err)
}

func TestCreateLogsExporterUsingDefaultTransportChannel(t *testing.T) {
// We get the default transport channel creation, if we don't specify one during f creation
f := factory{}
assert.Nil(t, f.tChannel)
ctx := context.Background()
config := createDefaultConfig().(*Config)
config.ConnectionString = "InstrumentationKey=test-key;IngestionEndpoint=https://test-endpoint/"
exporter, err := f.createLogsExporter(ctx, exportertest.NewNopCreateSettings(), config)
assert.NotNil(t, exporter)
assert.NoError(t, err)
assert.NotNil(t, f.tChannel)
assert.NoError(t, exporter.Shutdown(ctx))
}

func TestCreateLogsExporterUsingBadConfig(t *testing.T) {
// We get the default transport channel creation, if we don't specify one during factory creation
f := factory{}
assert.Nil(t, f.tChannel)
ctx := context.Background()
params := exportertest.NewNopCreateSettings()

badConfig := &badConfig{}

exporter, err := f.createLogsExporter(ctx, params, badConfig)
assert.Nil(t, exporter)
assert.Error(t, err)
}

func TestCreateMetricsExporterUsingSpecificTransportChannel(t *testing.T) {
// mock transport channel creation
f := factory{tChannel: &mockTransportChannel{}}
ctx := context.Background()
params := exportertest.NewNopCreateSettings()
config := createDefaultConfig().(*Config)
config.ConnectionString = "InstrumentationKey=test-key;IngestionEndpoint=https://test-endpoint/"
exporter, err := f.createMetricsExporter(ctx, params, config)
assert.NotNil(t, exporter)
assert.NoError(t, err)
}

func TestCreateMetricsExporterUsingDefaultTransportChannel(t *testing.T) {
// We get the default transport channel creation, if we don't specify one during f creation
f := factory{}
assert.Nil(t, f.tChannel)
ctx := context.Background()
config := createDefaultConfig().(*Config)
config.ConnectionString = "InstrumentationKey=test-key;IngestionEndpoint=https://test-endpoint/"
exporter, err := f.createMetricsExporter(ctx, exportertest.NewNopCreateSettings(), config)
assert.NotNil(t, exporter)
assert.NoError(t, err)
assert.NotNil(t, f.tChannel)
assert.NoError(t, exporter.Shutdown(ctx))
}

func TestCreateMetricsExporterUsingBadConfig(t *testing.T) {
// We get the default transport channel creation, if we don't specify one during factory creation
f := factory{}
assert.Nil(t, f.tChannel)
ctx := context.Background()
params := exportertest.NewNopCreateSettings()

badConfig := &badConfig{}

exporter, err := f.createMetricsExporter(ctx, params, badConfig)
assert.Nil(t, exporter)
assert.Error(t, err)
}
1 change: 1 addition & 0 deletions exporter/azuremonitorexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
go.opentelemetry.io/collector/semconv v0.93.0
go.opentelemetry.io/otel/metric v1.22.0
go.opentelemetry.io/otel/trace v1.22.0
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.26.0
golang.org/x/net v0.20.0
)
Expand Down
1 change: 1 addition & 0 deletions exporter/azuremonitorexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions exporter/azuremonitorexporter/logexporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package azuremonitorexporter // import "github.com/open-telemetry/opentelemetry-

import (
"context"
"time"

"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
Expand Down Expand Up @@ -39,6 +40,21 @@ func (exporter *logExporter) onLogData(_ context.Context, logData plog.Logs) err
return nil
}

func (exporter *logExporter) Shutdown(_ context.Context) error {
shutdownTimeout := 1 * time.Second

select {
case <-exporter.transportChannel.Close():
return nil
case <-time.After(shutdownTimeout):
// Currently, due to a dependency's bug (https://github.com/microsoft/ApplicationInsights-Go/issues/70),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me know if you disagree with my reasoning here, happy to discuss further.

This is not an error might be a bit misleading as technically we want the channel Close returns to send a message before the timeout is hit, but I think it captures the idea.

// the timeout will always be hit before Close is complete. This is not an error, but it will leak a goroutine.
// There's nothing that we can do about this for now, so there's no reason to log or return an error
// here.
return nil
}
}

// Returns a new instance of the log exporter
func newLogsExporter(config *Config, transportChannel transportChannel, set exporter.CreateSettings) (exporter.Logs, error) {
exporter := &logExporter{
Expand All @@ -53,5 +69,6 @@ func newLogsExporter(config *Config, transportChannel transportChannel, set expo
config,
exporter.onLogData,
exporterhelper.WithQueue(config.QueueSettings),
exporterhelper.WithShutdown(exporter.Shutdown),
)
}
19 changes: 18 additions & 1 deletion exporter/azuremonitorexporter/metricexporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package azuremonitorexporter // import "github.com/open-telemetry/opentelemetry-

import (
"context"
"time"

"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
Expand Down Expand Up @@ -40,6 +41,21 @@ func (exporter *metricExporter) onMetricData(_ context.Context, metricData pmetr
return nil
}

func (exporter *metricExporter) Shutdown(_ context.Context) error {
shutdownTimeout := 1 * time.Second

select {
case <-exporter.transportChannel.Close():
return nil
case <-time.After(shutdownTimeout):
// Currently, due to a dependency's bug (https://github.com/microsoft/ApplicationInsights-Go/issues/70),
// the timeout will always be hit before Close is complete. This is not an error, but it will leak a goroutine.
// There's nothing that we can do about this for now, so there's no reason to log or return an error
// here.
return nil
}
}

// Returns a new instance of the metric exporter
func newMetricsExporter(config *Config, transportChannel transportChannel, set exporter.CreateSettings) (exporter.Metrics, error) {
exporter := &metricExporter{
Expand All @@ -54,5 +70,6 @@ func newMetricsExporter(config *Config, transportChannel transportChannel, set e
set,
config,
exporter.onMetricData,
exporterhelper.WithQueue(config.QueueSettings))
exporterhelper.WithQueue(config.QueueSettings),
exporterhelper.WithShutdown(exporter.Shutdown))
}
44 changes: 43 additions & 1 deletion exporter/azuremonitorexporter/mock_transportChannel.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions exporter/azuremonitorexporter/package_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package azuremonitorexporter

import (
"testing"

"go.uber.org/goleak"
)

// Context for ignore opencensus leak: https://github.com/census-instrumentation/opencensus-go/issues/1191
// Context for ignore ApplicationInsights-Go leak: https://github.com/microsoft/ApplicationInsights-Go/issues/70
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m,
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
goleak.IgnoreTopFunction("github.com/microsoft/ApplicationInsights-Go/appinsights.(*throttleManager).Stop"),
)
}
19 changes: 18 additions & 1 deletion exporter/azuremonitorexporter/traceexporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package azuremonitorexporter // import "github.com/open-telemetry/opentelemetry-

import (
"context"
"time"

"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
Expand Down Expand Up @@ -62,6 +63,21 @@ func (exporter *traceExporter) onTraceData(_ context.Context, traceData ptrace.T
return visitor.err
}

func (exporter *traceExporter) Shutdown(_ context.Context) error {
shutdownTimeout := 1 * time.Second

select {
case <-exporter.transportChannel.Close():
return nil
case <-time.After(shutdownTimeout):
// Currently, due to a dependency's bug (https://github.com/microsoft/ApplicationInsights-Go/issues/70),
// the timeout will always be hit before Close is complete. This is not an error, but it will leak a goroutine.
// There's nothing that we can do about this for now, so there's no reason to log or return an error
// here.
return nil
}
}

// Returns a new instance of the trace exporter
func newTracesExporter(config *Config, transportChannel transportChannel, set exporter.CreateSettings) (exporter.Traces, error) {
exporter := &traceExporter{
Expand All @@ -75,5 +91,6 @@ func newTracesExporter(config *Config, transportChannel transportChannel, set ex
set,
config,
exporter.onTraceData,
exporterhelper.WithQueue(config.QueueSettings))
exporterhelper.WithQueue(config.QueueSettings),
exporterhelper.WithShutdown(exporter.Shutdown))
}
1 change: 1 addition & 0 deletions exporter/azuremonitorexporter/traceexporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func TestExporterTraceDataCallbackSingleSpanNoEnvelope(t *testing.T) {
func getMockTransportChannel() *mockTransportChannel {
transportChannelMock := mockTransportChannel{}
transportChannelMock.On("Send", mock.Anything)
transportChannelMock.On("Close", mock.Anything)
return &transportChannelMock
}

Expand Down
Loading