diff --git a/CHANGELOG.md b/CHANGELOG.md index 2390da03225..5379b63e499 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -53,6 +53,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Log a suggested view that fixes instrument conflicts in `go.opentelemetry.io/otel/sdk/metric`. (#4349) - Fix possible panic, deadlock and race condition in batch span processor in `go.opentelemetry.io/otel/sdk/trace`. (#4353) - Improve context cancelation handling in batch span processor's `ForceFlush` in `go.opentelemetry.io/otel/sdk/trace`. (#4369) +- Do not block the metric SDK when OTLP metric exports are blocked in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc` and `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#3925, #4395) ## [1.16.0/0.39.0] 2023-05-18 diff --git a/exporters/otlp/otlpmetric/internal/exporter.go b/exporters/otlp/otlpmetric/internal/exporter.go index fe4b4a7868b..f4428ac556f 100644 --- a/exporters/otlp/otlpmetric/internal/exporter.go +++ b/exporters/otlp/otlpmetric/internal/exporter.go @@ -27,6 +27,11 @@ import ( ) // Exporter exports metrics data as OTLP. +// +// Deprecated: Exporter exists for historical compatibility, it should not be +// used. Do not remove Exporter unless the whole +// "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal" module is +// removed. type Exporter struct { // Ensure synchronous access to the client across all functionality. clientMu sync.Mutex @@ -96,6 +101,11 @@ func (e *Exporter) Shutdown(ctx context.Context) error { // New return an Exporter that uses client to transmits the OTLP data it // produces. The client is assumed to be fully started and able to communicate // with its OTLP receiving endpoint. +// +// Deprecated: New exists for historical compatibility, it should not be used. +// Do not remove New unless the whole +// "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal" module is +// removed. func New(client Client) *Exporter { return &Exporter{client: client} } diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go index 34522669f47..e4e97acb750 100644 --- a/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go @@ -27,11 +27,7 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/internal" "go.opentelemetry.io/otel/exporters/otlp/internal/retry" - ominternal "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf" - "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/metric/aggregation" - "go.opentelemetry.io/otel/sdk/metric/metricdata" colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" ) @@ -41,9 +37,6 @@ type client struct { exportTimeout time.Duration requestFunc retry.RequestFunc - temporalitySelector metric.TemporalitySelector - aggregationSelector metric.AggregationSelector - // ourConn keeps track of where conn was created: true if created here in // NewClient, or false if passed with an option. This is important on // Shutdown as the conn should only be closed if we created it. Otherwise, @@ -54,16 +47,11 @@ type client struct { } // newClient creates a new gRPC metric client. -func newClient(ctx context.Context, options ...Option) (ominternal.Client, error) { - cfg := oconf.NewGRPCConfig(asGRPCOptions(options)...) - +func newClient(ctx context.Context, cfg oconf.Config) (*client, error) { c := &client{ exportTimeout: cfg.Metrics.Timeout, requestFunc: cfg.RetryConfig.RequestFunc(retryable), conn: cfg.GRPCConn, - - temporalitySelector: cfg.Metrics.TemporalitySelector, - aggregationSelector: cfg.Metrics.AggregationSelector, } if len(cfg.Metrics.Headers) > 0 { @@ -88,19 +76,6 @@ func newClient(ctx context.Context, options ...Option) (ominternal.Client, error return c, nil } -// Temporality returns the Temporality to use for an instrument kind. -func (c *client) Temporality(k metric.InstrumentKind) metricdata.Temporality { - return c.temporalitySelector(k) -} - -// Aggregation returns the Aggregation to use for an instrument kind. -func (c *client) Aggregation(k metric.InstrumentKind) aggregation.Aggregation { - return c.aggregationSelector(k) -} - -// ForceFlush does nothing, the client holds no state. -func (c *client) ForceFlush(ctx context.Context) error { return ctx.Err() } - // Shutdown shuts down the client, freeing all resource. // // Any active connections to a remote endpoint are closed if they were created diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/client_test.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/client_test.go index 06e5fee15e9..02f87fb088a 100644 --- a/exporters/otlp/otlpmetric/otlpmetricgrpc/client_test.go +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/client_test.go @@ -28,8 +28,10 @@ import ( "google.golang.org/protobuf/types/known/durationpb" ominternal "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/otest" "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) @@ -129,6 +131,20 @@ func TestRetryable(t *testing.T) { } } +type clientShim struct { + *client +} + +func (clientShim) Temporality(metric.InstrumentKind) metricdata.Temporality { + return metricdata.CumulativeTemporality +} +func (clientShim) Aggregation(metric.InstrumentKind) aggregation.Aggregation { + return nil +} +func (clientShim) ForceFlush(ctx context.Context) error { + return ctx.Err() +} + func TestClient(t *testing.T) { factory := func(rCh <-chan otest.ExportResult) (ominternal.Client, otest.Collector) { coll, err := otest.NewGRPCCollector("", rCh) @@ -136,9 +152,11 @@ func TestClient(t *testing.T) { ctx := context.Background() addr := coll.Addr().String() - client, err := newClient(ctx, WithEndpoint(addr), WithInsecure()) + opts := []Option{WithEndpoint(addr), WithInsecure()} + cfg := oconf.NewGRPCConfig(asGRPCOptions(opts)...) + client, err := newClient(ctx, cfg) require.NoError(t, err) - return client, coll + return clientShim{client}, coll } t.Run("Integration", otest.RunClientTests(factory)) diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/exporter.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/exporter.go index 1f28bfd6a59..8b228cc5a90 100644 --- a/exporters/otlp/otlpmetric/otlpmetricgrpc/exporter.go +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/exporter.go @@ -16,27 +16,62 @@ package otlpmetricgrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlpme import ( "context" + "fmt" + "sync" - ominternal "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/transform" "go.opentelemetry.io/otel/internal/global" "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" + metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" ) // Exporter is a OpenTelemetry metric Exporter using gRPC. type Exporter struct { - wrapped *ominternal.Exporter + // Ensure synchronous access to the client across all functionality. + clientMu sync.Mutex + client interface { + UploadMetrics(context.Context, *metricpb.ResourceMetrics) error + Shutdown(context.Context) error + } + + temporalitySelector metric.TemporalitySelector + aggregationSelector metric.AggregationSelector + + shutdownOnce sync.Once +} + +func newExporter(c *client, cfg oconf.Config) (*Exporter, error) { + ts := cfg.Metrics.TemporalitySelector + if ts == nil { + ts = func(metric.InstrumentKind) metricdata.Temporality { + return metricdata.CumulativeTemporality + } + } + + as := cfg.Metrics.AggregationSelector + if as == nil { + as = metric.DefaultAggregationSelector + } + + return &Exporter{ + client: c, + + temporalitySelector: ts, + aggregationSelector: as, + }, nil } // Temporality returns the Temporality to use for an instrument kind. func (e *Exporter) Temporality(k metric.InstrumentKind) metricdata.Temporality { - return e.wrapped.Temporality(k) + return e.temporalitySelector(k) } // Aggregation returns the Aggregation to use for an instrument kind. func (e *Exporter) Aggregation(k metric.InstrumentKind) aggregation.Aggregation { - return e.wrapped.Aggregation(k) + return e.aggregationSelector(k) } // Export transforms and transmits metric data to an OTLP receiver. @@ -44,8 +79,20 @@ func (e *Exporter) Aggregation(k metric.InstrumentKind) aggregation.Aggregation // This method returns an error if called after Shutdown. // This method returns an error if the method is canceled by the passed context. func (e *Exporter) Export(ctx context.Context, rm *metricdata.ResourceMetrics) error { - err := e.wrapped.Export(ctx, rm) - global.Debug("OTLP/gRPC exporter export", "Data", rm) + defer global.Debug("OTLP/gRPC exporter export", "Data", rm) + + otlpRm, err := transform.ResourceMetrics(rm) + // Best effort upload of transformable metrics. + e.clientMu.Lock() + upErr := e.client.UploadMetrics(ctx, otlpRm) + e.clientMu.Unlock() + if upErr != nil { + if err == nil { + return fmt.Errorf("failed to upload metrics: %w", upErr) + } + // Merge the two errors. + return fmt.Errorf("failed to upload incomplete metrics (%s): %w", err, upErr) + } return err } @@ -56,7 +103,8 @@ func (e *Exporter) Export(ctx context.Context, rm *metricdata.ResourceMetrics) e // // This method is safe to call concurrently. func (e *Exporter) ForceFlush(ctx context.Context) error { - return e.wrapped.ForceFlush(ctx) + // The exporter and client hold no state, nothing to flush. + return ctx.Err() } // Shutdown flushes all metric data held by an exporter and releases any held @@ -67,7 +115,34 @@ func (e *Exporter) ForceFlush(ctx context.Context) error { // // This method is safe to call concurrently. func (e *Exporter) Shutdown(ctx context.Context) error { - return e.wrapped.Shutdown(ctx) + err := errShutdown + e.shutdownOnce.Do(func() { + e.clientMu.Lock() + client := e.client + e.client = shutdownClient{} + e.clientMu.Unlock() + err = client.Shutdown(ctx) + }) + return err +} + +var errShutdown = fmt.Errorf("gRPC exporter is shutdown") + +type shutdownClient struct{} + +func (c shutdownClient) err(ctx context.Context) error { + if err := ctx.Err(); err != nil { + return err + } + return errShutdown +} + +func (c shutdownClient) UploadMetrics(ctx context.Context, _ *metricpb.ResourceMetrics) error { + return c.err(ctx) +} + +func (c shutdownClient) Shutdown(ctx context.Context) error { + return c.err(ctx) } // MarshalLog returns logging data about the Exporter. @@ -84,10 +159,10 @@ func (e *Exporter) MarshalLog() interface{} { // on options. If a connection cannot be establishes in the lifetime of ctx, // an error will be returned. func New(ctx context.Context, options ...Option) (*Exporter, error) { - c, err := newClient(ctx, options...) + cfg := oconf.NewGRPCConfig(asGRPCOptions(options)...) + c, err := newClient(ctx, cfg) if err != nil { return nil, err } - exp := ominternal.New(c) - return &Exporter{exp}, nil + return newExporter(c, cfg) } diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/exporter_test.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/exporter_test.go new file mode 100644 index 00000000000..299b5e33c8b --- /dev/null +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/exporter_test.go @@ -0,0 +1,124 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otlpmetricgrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/otest" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" +) + +func TestExporterClientConcurrentSafe(t *testing.T) { + const goroutines = 5 + + coll, err := otest.NewGRPCCollector("", nil) + require.NoError(t, err) + + ctx := context.Background() + addr := coll.Addr().String() + opts := []Option{WithEndpoint(addr), WithInsecure()} + cfg := oconf.NewGRPCConfig(asGRPCOptions(opts)...) + client, err := newClient(ctx, cfg) + require.NoError(t, err) + + exp, err := newExporter(client, oconf.Config{}) + require.NoError(t, err) + rm := new(metricdata.ResourceMetrics) + + done := make(chan struct{}) + first := make(chan struct{}, goroutines) + var wg sync.WaitGroup + for i := 0; i < goroutines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + assert.NoError(t, exp.Export(ctx, rm)) + assert.NoError(t, exp.ForceFlush(ctx)) + // Ensure some work is done before shutting down. + first <- struct{}{} + + for { + _ = exp.Export(ctx, rm) + _ = exp.ForceFlush(ctx) + + select { + case <-done: + return + default: + } + } + }() + } + + for i := 0; i < goroutines; i++ { + <-first + } + close(first) + assert.NoError(t, exp.Shutdown(ctx)) + assert.ErrorIs(t, exp.Shutdown(ctx), errShutdown) + + close(done) + wg.Wait() +} + +func TestExporterDoesNotBlockTemporalityAndAggregation(t *testing.T) { + rCh := make(chan otest.ExportResult, 1) + coll, err := otest.NewGRPCCollector("", rCh) + require.NoError(t, err) + + ctx := context.Background() + addr := coll.Addr().String() + opts := []Option{WithEndpoint(addr), WithInsecure()} + cfg := oconf.NewGRPCConfig(asGRPCOptions(opts)...) + client, err := newClient(ctx, cfg) + require.NoError(t, err) + + exp, err := newExporter(client, oconf.Config{}) + require.NoError(t, err) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + rm := new(metricdata.ResourceMetrics) + t.Log("starting export") + require.NoError(t, exp.Export(ctx, rm)) + t.Log("export complete") + }() + + assert.Eventually(t, func() bool { + const inst = metric.InstrumentKindCounter + // These should not be blocked. + t.Log("getting temporality") + _ = exp.Temporality(inst) + t.Log("getting aggregation") + _ = exp.Aggregation(inst) + return true + }, time.Second, 10*time.Millisecond) + + // Clear the export. + rCh <- otest.ExportResult{} + close(rCh) + wg.Wait() +} diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/client.go b/exporters/otlp/otlpmetric/otlpmetrichttp/client.go index 81d84995aeb..01b7575eeba 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/client.go +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/client.go @@ -34,9 +34,6 @@ import ( "go.opentelemetry.io/otel/exporters/otlp/internal/retry" ominternal "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf" - "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/metric/aggregation" - "go.opentelemetry.io/otel/sdk/metric/metricdata" colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" ) @@ -47,9 +44,6 @@ type client struct { compression Compression requestFunc retry.RequestFunc httpClient *http.Client - - temporalitySelector metric.TemporalitySelector - aggregationSelector metric.AggregationSelector } // Keep it in sync with golang's DefaultTransport from net/http! We @@ -70,9 +64,7 @@ var ourTransport = &http.Transport{ } // newClient creates a new HTTP metric client. -func newClient(opts ...Option) (ominternal.Client, error) { - cfg := oconf.NewHTTPConfig(asHTTPOptions(opts)...) - +func newClient(cfg oconf.Config) (*client, error) { httpClient := &http.Client{ Transport: ourTransport, Timeout: cfg.Metrics.Timeout, @@ -111,25 +103,9 @@ func newClient(opts ...Option) (ominternal.Client, error) { req: req, requestFunc: cfg.RetryConfig.RequestFunc(evaluate), httpClient: httpClient, - - temporalitySelector: cfg.Metrics.TemporalitySelector, - aggregationSelector: cfg.Metrics.AggregationSelector, }, nil } -// Temporality returns the Temporality to use for an instrument kind. -func (c *client) Temporality(k metric.InstrumentKind) metricdata.Temporality { - return c.temporalitySelector(k) -} - -// Aggregation returns the Aggregation to use for an instrument kind. -func (c *client) Aggregation(k metric.InstrumentKind) aggregation.Aggregation { - return c.aggregationSelector(k) -} - -// ForceFlush does nothing, the client holds no state. -func (c *client) ForceFlush(ctx context.Context) error { return ctx.Err() } - // Shutdown shuts down the client, freeing all resources. func (c *client) Shutdown(ctx context.Context) error { // The otlpmetric.Exporter synchronizes access to client methods and diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go b/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go index ec6bdd75916..ab89bd27a9a 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go @@ -28,20 +28,38 @@ import ( "github.com/stretchr/testify/require" ominternal "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/otest" "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) +type clientShim struct { + *client +} + +func (clientShim) Temporality(metric.InstrumentKind) metricdata.Temporality { + return metricdata.CumulativeTemporality +} +func (clientShim) Aggregation(metric.InstrumentKind) aggregation.Aggregation { + return nil +} +func (clientShim) ForceFlush(ctx context.Context) error { + return ctx.Err() +} + func TestClient(t *testing.T) { factory := func(rCh <-chan otest.ExportResult) (ominternal.Client, otest.Collector) { coll, err := otest.NewHTTPCollector("", rCh) require.NoError(t, err) addr := coll.Addr().String() - client, err := newClient(WithEndpoint(addr), WithInsecure()) + opts := []Option{WithEndpoint(addr), WithInsecure()} + cfg := oconf.NewHTTPConfig(asHTTPOptions(opts)...) + client, err := newClient(cfg) require.NoError(t, err) - return client, coll + return clientShim{client}, coll } t.Run("Integration", otest.RunClientTests(factory)) diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/exporter.go b/exporters/otlp/otlpmetric/otlpmetrichttp/exporter.go index 47d3c469825..be21d6b004e 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/exporter.go +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/exporter.go @@ -16,27 +16,62 @@ package otlpmetrichttp // import "go.opentelemetry.io/otel/exporters/otlp/otlpme import ( "context" + "fmt" + "sync" - ominternal "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/transform" "go.opentelemetry.io/otel/internal/global" "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" + metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" ) // Exporter is a OpenTelemetry metric Exporter using protobufs over HTTP. type Exporter struct { - wrapped *ominternal.Exporter + // Ensure synchronous access to the client across all functionality. + clientMu sync.Mutex + client interface { + UploadMetrics(context.Context, *metricpb.ResourceMetrics) error + Shutdown(context.Context) error + } + + temporalitySelector metric.TemporalitySelector + aggregationSelector metric.AggregationSelector + + shutdownOnce sync.Once +} + +func newExporter(c *client, cfg oconf.Config) (*Exporter, error) { + ts := cfg.Metrics.TemporalitySelector + if ts == nil { + ts = func(metric.InstrumentKind) metricdata.Temporality { + return metricdata.CumulativeTemporality + } + } + + as := cfg.Metrics.AggregationSelector + if as == nil { + as = metric.DefaultAggregationSelector + } + + return &Exporter{ + client: c, + + temporalitySelector: ts, + aggregationSelector: as, + }, nil } // Temporality returns the Temporality to use for an instrument kind. func (e *Exporter) Temporality(k metric.InstrumentKind) metricdata.Temporality { - return e.wrapped.Temporality(k) + return e.temporalitySelector(k) } // Aggregation returns the Aggregation to use for an instrument kind. func (e *Exporter) Aggregation(k metric.InstrumentKind) aggregation.Aggregation { - return e.wrapped.Aggregation(k) + return e.aggregationSelector(k) } // Export transforms and transmits metric data to an OTLP receiver. @@ -44,8 +79,20 @@ func (e *Exporter) Aggregation(k metric.InstrumentKind) aggregation.Aggregation // This method returns an error if called after Shutdown. // This method returns an error if the method is canceled by the passed context. func (e *Exporter) Export(ctx context.Context, rm *metricdata.ResourceMetrics) error { - err := e.wrapped.Export(ctx, rm) - global.Debug("OTLP/HTTP exporter export", "Data", rm) + defer global.Debug("OTLP/HTTP exporter export", "Data", rm) + + otlpRm, err := transform.ResourceMetrics(rm) + // Best effort upload of transformable metrics. + e.clientMu.Lock() + upErr := e.client.UploadMetrics(ctx, otlpRm) + e.clientMu.Unlock() + if upErr != nil { + if err == nil { + return fmt.Errorf("failed to upload metrics: %w", upErr) + } + // Merge the two errors. + return fmt.Errorf("failed to upload incomplete metrics (%s): %w", err, upErr) + } return err } @@ -56,7 +103,8 @@ func (e *Exporter) Export(ctx context.Context, rm *metricdata.ResourceMetrics) e // // This method is safe to call concurrently. func (e *Exporter) ForceFlush(ctx context.Context) error { - return e.wrapped.ForceFlush(ctx) + // The exporter and client hold no state, nothing to flush. + return ctx.Err() } // Shutdown flushes all metric data held by an exporter and releases any held @@ -67,7 +115,34 @@ func (e *Exporter) ForceFlush(ctx context.Context) error { // // This method is safe to call concurrently. func (e *Exporter) Shutdown(ctx context.Context) error { - return e.wrapped.Shutdown(ctx) + err := errShutdown + e.shutdownOnce.Do(func() { + e.clientMu.Lock() + client := e.client + e.client = shutdownClient{} + e.clientMu.Unlock() + err = client.Shutdown(ctx) + }) + return err +} + +var errShutdown = fmt.Errorf("HTTP exporter is shutdown") + +type shutdownClient struct{} + +func (c shutdownClient) err(ctx context.Context) error { + if err := ctx.Err(); err != nil { + return err + } + return errShutdown +} + +func (c shutdownClient) UploadMetrics(ctx context.Context, _ *metricpb.ResourceMetrics) error { + return c.err(ctx) +} + +func (c shutdownClient) Shutdown(ctx context.Context) error { + return c.err(ctx) } // MarshalLog returns logging data about the Exporter. @@ -79,10 +154,10 @@ func (e *Exporter) MarshalLog() interface{} { // a PeriodicReader to export OpenTelemetry metric data to an OTLP receiving // endpoint using protobufs over HTTP. func New(_ context.Context, opts ...Option) (*Exporter, error) { - c, err := newClient(opts...) + cfg := oconf.NewHTTPConfig(asHTTPOptions(opts)...) + c, err := newClient(cfg) if err != nil { return nil, err } - exp := ominternal.New(c) - return &Exporter{exp}, nil + return newExporter(c, cfg) } diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/exporter_test.go b/exporters/otlp/otlpmetric/otlpmetrichttp/exporter_test.go new file mode 100644 index 00000000000..faf34ba1d8c --- /dev/null +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/exporter_test.go @@ -0,0 +1,124 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otlpmetrichttp // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/otest" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" +) + +func TestExporterClientConcurrentSafe(t *testing.T) { + const goroutines = 5 + + coll, err := otest.NewHTTPCollector("", nil) + require.NoError(t, err) + + ctx := context.Background() + addr := coll.Addr().String() + opts := []Option{WithEndpoint(addr), WithInsecure()} + cfg := oconf.NewHTTPConfig(asHTTPOptions(opts)...) + client, err := newClient(cfg) + require.NoError(t, err) + + exp, err := newExporter(client, oconf.Config{}) + require.NoError(t, err) + rm := new(metricdata.ResourceMetrics) + + done := make(chan struct{}) + first := make(chan struct{}, goroutines) + var wg sync.WaitGroup + for i := 0; i < goroutines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + assert.NoError(t, exp.Export(ctx, rm)) + assert.NoError(t, exp.ForceFlush(ctx)) + // Ensure some work is done before shutting down. + first <- struct{}{} + + for { + _ = exp.Export(ctx, rm) + _ = exp.ForceFlush(ctx) + + select { + case <-done: + return + default: + } + } + }() + } + + for i := 0; i < goroutines; i++ { + <-first + } + close(first) + assert.NoError(t, exp.Shutdown(ctx)) + assert.ErrorIs(t, exp.Shutdown(ctx), errShutdown) + + close(done) + wg.Wait() +} + +func TestExporterDoesNotBlockTemporalityAndAggregation(t *testing.T) { + rCh := make(chan otest.ExportResult, 1) + coll, err := otest.NewHTTPCollector("", rCh) + require.NoError(t, err) + + ctx := context.Background() + addr := coll.Addr().String() + opts := []Option{WithEndpoint(addr), WithInsecure()} + cfg := oconf.NewHTTPConfig(asHTTPOptions(opts)...) + client, err := newClient(cfg) + require.NoError(t, err) + + exp, err := newExporter(client, oconf.Config{}) + require.NoError(t, err) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + rm := new(metricdata.ResourceMetrics) + t.Log("starting export") + require.NoError(t, exp.Export(ctx, rm)) + t.Log("export complete") + }() + + assert.Eventually(t, func() bool { + const inst = metric.InstrumentKindCounter + // These should not be blocked. + t.Log("getting temporality") + _ = exp.Temporality(inst) + t.Log("getting aggregation") + _ = exp.Aggregation(inst) + return true + }, time.Second, 10*time.Millisecond) + + // Clear the export. + rCh <- otest.ExportResult{} + close(rCh) + wg.Wait() +}