From 828f63bd96653d1809e0ee2478e41cddd60e6d05 Mon Sep 17 00:00:00 2001 From: "kristina.pathak" Date: Tue, 23 Jul 2024 14:50:43 -0700 Subject: [PATCH 01/17] add metadata wrapper around exporter --- exporter/otelarrowexporter/README.md | 11 ++ exporter/otelarrowexporter/factory.go | 26 +-- exporter/otelarrowexporter/metadata.go | 237 ++++++++++++++++++++++++ exporter/otelarrowexporter/otelarrow.go | 25 ++- 4 files changed, 285 insertions(+), 14 deletions(-) create mode 100644 exporter/otelarrowexporter/metadata.go diff --git a/exporter/otelarrowexporter/README.md b/exporter/otelarrowexporter/README.md index 4b3bcb0e4fd7..868e5b28d1e6 100644 --- a/exporter/otelarrowexporter/README.md +++ b/exporter/otelarrowexporter/README.md @@ -115,6 +115,17 @@ streams. - `prioritizer` (default: "leastloaded"): policy for distributing load across multiple streams. +### Matching Metadata Per Stream + +The following configuration values allow for separate streams per unique +metadata combinations: +- `metadata_keys` (default = empty): When set, this exporter will create one + arrow exporter instance per distinct combination of values in the + client.Metadata. +- `metadata_cardinality_limit` (default = 1000): When metadata_keys is not empty, + this setting limits the number of unique combinations of metadata key values + that will be processed over the lifetime of the exporter. + ### Network Configuration This component uses `round_robin` by default as the gRPC load diff --git a/exporter/otelarrowexporter/factory.go b/exporter/otelarrowexporter/factory.go index 0d3a078388e3..6f976e5793e0 100644 --- a/exporter/otelarrowexporter/factory.go +++ b/exporter/otelarrowexporter/factory.go @@ -72,15 +72,15 @@ func createDefaultConfig() component.Config { } } -func (exp *baseExporter) helperOptions() []exporterhelper.Option { +func (e *baseExporter) helperOptions() []exporterhelper.Option { return []exporterhelper.Option{ exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), - exporterhelper.WithTimeout(exp.config.TimeoutSettings), - exporterhelper.WithRetry(exp.config.RetryConfig), - exporterhelper.WithQueue(exp.config.QueueSettings), - exporterhelper.WithStart(exp.start), - exporterhelper.WithBatcher(exp.config.BatcherConfig), - exporterhelper.WithShutdown(exp.shutdown), + exporterhelper.WithTimeout(e.config.TimeoutSettings), + exporterhelper.WithRetry(e.config.RetryConfig), + exporterhelper.WithQueue(e.config.QueueSettings), + exporterhelper.WithStart(e.start), + exporterhelper.WithBatcher(e.config.BatcherConfig), + exporterhelper.WithShutdown(e.shutdown), } } @@ -103,11 +103,11 @@ func createTracesExporter( set exporter.Settings, cfg component.Config, ) (exporter.Traces, error) { - exp, err := newExporter(cfg, set, createArrowTracesStream) + exp, err := newMetadataExporter(cfg, set, createArrowTracesStream) if err != nil { return nil, err } - return exporterhelper.NewTracesExporter(ctx, exp.settings, exp.config, + return exporterhelper.NewTracesExporter(ctx, exp.getSettings(), exp.getConfig(), exp.pushTraces, exp.helperOptions()..., ) @@ -122,11 +122,11 @@ func createMetricsExporter( set exporter.Settings, cfg component.Config, ) (exporter.Metrics, error) { - exp, err := newExporter(cfg, set, createArrowMetricsStream) + exp, err := newMetadataExporter(cfg, set, createArrowMetricsStream) if err != nil { return nil, err } - return exporterhelper.NewMetricsExporter(ctx, exp.settings, exp.config, + return exporterhelper.NewMetricsExporter(ctx, exp.getSettings(), exp.getConfig(), exp.pushMetrics, exp.helperOptions()..., ) @@ -141,11 +141,11 @@ func createLogsExporter( set exporter.Settings, cfg component.Config, ) (exporter.Logs, error) { - exp, err := newExporter(cfg, set, createArrowLogsStream) + exp, err := newMetadataExporter(cfg, set, createArrowLogsStream) if err != nil { return nil, err } - return exporterhelper.NewLogsExporter(ctx, exp.settings, exp.config, + return exporterhelper.NewLogsExporter(ctx, exp.getSettings(), exp.getConfig(), exp.pushLogs, exp.helperOptions()..., ) diff --git a/exporter/otelarrowexporter/metadata.go b/exporter/otelarrowexporter/metadata.go new file mode 100644 index 000000000000..b0d1dc65086d --- /dev/null +++ b/exporter/otelarrowexporter/metadata.go @@ -0,0 +1,237 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otelarrowexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/otelarrowexporter" + +import ( + "context" + "errors" + "fmt" + "sort" + "strings" + "sync" + + "go.opentelemetry.io/collector/client" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterhelper" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/otel/attribute" + "go.uber.org/multierr" +) + +var ( + // errTooManyBatchers is returned when the MetadataCardinalityLimit has been reached. + errTooManyBatchers = consumererror.NewPermanent(errors.New("too many batcher metadata-value combinations")) + // errUnexpectedType is returned when the object in the map isn't the expected type + errUnexpectedType = errors.New("unexpected type in map") +) + +type MetadataConfig struct { + *Config + + // MetadataKeys is a list of client.Metadata keys that will be + // used to form distinct batchers. If this setting is empty, + // a single batcher instance will be used. When this setting + // is not empty, one batcher will be used per distinct + // combination of values for the listed metadata keys. + // + // Empty value and unset metadata are treated as distinct cases. + // + // Entries are case-insensitive. Duplicated entries will + // trigger a validation error. + MetadataKeys []string `mapstructure:"metadata_keys"` + + // MetadataCardinalityLimit indicates the maximum number of + // batcher instances that will be created through a distinct + // combination of MetadataKeys. + MetadataCardinalityLimit uint32 `mapstructure:"metadata_cardinality_limit"` +} + +var _ component.Config = (*MetadataConfig)(nil) + +// Validate checks if the exporter configuration is valid +func (cfg *MetadataConfig) Validate() error { + uniq := map[string]bool{} + for _, k := range cfg.MetadataKeys { + l := strings.ToLower(k) + if _, has := uniq[l]; has { + return fmt.Errorf("duplicate entry in metadata_keys: %q (case-insensitive)", l) + } + uniq[l] = true + } + return nil +} + +func createMetadataDefaultConfig() component.Config { + return &MetadataConfig{ + Config: createDefaultConfig().(*Config), + } +} + +type metadataExporter struct { + config *MetadataConfig + settings exporter.Settings + scf streamClientFactory + host component.Host + + metadataKeys []string + exporters sync.Map + + // Guards the size and the storing logic to ensure no more than limit items are stored. + // If we are willing to allow "some" extra items than the limit this can be removed and size can be made atomic. + lock sync.Mutex + size int +} + +var _ exp = (*metadataExporter)(nil) + +func newMetadataExporter(cfg component.Config, set exporter.Settings, streamClientFactory streamClientFactory) (exp, error) { + oCfg := cfg.(*MetadataConfig) + // use lower-case, to be consistent with http/2 headers. + mks := make([]string, len(oCfg.MetadataKeys)) + for i, k := range oCfg.MetadataKeys { + mks[i] = strings.ToLower(k) + } + sort.Strings(mks) + if len(mks) == 0 { + return newExporter(cfg, set, streamClientFactory) + } + return &metadataExporter{ + config: oCfg, + settings: set, + scf: streamClientFactory, + metadataKeys: mks, + }, nil +} + +func (e *metadataExporter) getSettings() exporter.Settings { + return e.settings +} + +func (e *metadataExporter) getConfig() component.Config { + return e.config +} + +func (e *metadataExporter) helperOptions() []exporterhelper.Option { + return []exporterhelper.Option{ + exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), + exporterhelper.WithTimeout(e.config.Config.TimeoutSettings), + exporterhelper.WithRetry(e.config.Config.RetryConfig), + exporterhelper.WithQueue(e.config.Config.QueueSettings), + exporterhelper.WithStart(e.start), + exporterhelper.WithShutdown(e.shutdown), + } +} + +func (e *metadataExporter) start(ctx context.Context, host component.Host) (err error) { + e.host = host + return nil +} + +func (e *metadataExporter) shutdown(ctx context.Context) error { + var err error + e.exporters.Range(func(key any, value any) bool { + be, ok := value.(exp) + if !ok { + err = multierr.Append(err, fmt.Errorf("%w: %T", errUnexpectedType, value)) + return true + } + err = multierr.Append(err, be.shutdown(ctx)) + return true + }) + return err +} + +func (e *metadataExporter) pushTraces(ctx context.Context, td ptrace.Traces) error { + s := getSet(ctx, e.metadataKeys) + + be, err := e.getOrCreateExporter(ctx, s) + if err != nil { + return err + } + return be.pushTraces(ctx, td) +} + +func (e *metadataExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) error { + s := getSet(ctx, e.metadataKeys) + + be, err := e.getOrCreateExporter(ctx, s) + if err != nil { + return err + } + + return be.(exp).pushMetrics(ctx, md) +} + +func (e *metadataExporter) pushLogs(ctx context.Context, ld plog.Logs) error { + s := getSet(ctx, e.metadataKeys) + + be, err := e.getOrCreateExporter(ctx, s) + if err != nil { + return err + } + + return be.(exp).pushLogs(ctx, ld) +} + +func (e *metadataExporter) getOrCreateExporter(ctx context.Context, s attribute.Set) (exp, error) { + v, ok := e.exporters.Load(s) + if !ok { + e.lock.Lock() + if e.config.MetadataCardinalityLimit != 0 && e.size >= int(e.config.MetadataCardinalityLimit) { + e.lock.Unlock() + return nil, errTooManyBatchers + } + + newExp, err := newExporter(e.config, e.settings, e.scf) + if err != nil { + return nil, fmt.Errorf("failed to create exporter: %w", err) + } + + // aset.ToSlice() returns the sorted, deduplicated, + // and name-downcased list of attributes. + var loaded bool + v, loaded = e.exporters.LoadOrStore(s, newExp) + if !loaded { + // Start the goroutine only if we added the object to the map, otherwise is already started. + err = newExp.start(ctx, e.host) + if err != nil { + e.exporters.Delete(s) + return nil, fmt.Errorf("failed to start exporter: %w", err) + } + e.size++ + } + e.lock.Unlock() + } + val, ok := v.(exp) + if !ok { + return nil, fmt.Errorf("%w: %T", errUnexpectedType, v) + } + return val, nil +} + +func getSet(ctx context.Context, keys []string) attribute.Set { + // Get each metadata key value, form the corresponding + // attribute set for use as a map lookup key. + info := client.FromContext(ctx) + md := map[string][]string{} + var attrs []attribute.KeyValue + for _, k := range keys { + // Lookup the value in the incoming metadata, copy it + // into the outgoing metadata, and create a unique + // value for the attributeSet. + vs := info.Metadata.Get(k) + md[k] = vs + if len(vs) == 1 { + attrs = append(attrs, attribute.String(k, vs[0])) + } else { + attrs = append(attrs, attribute.StringSlice(k, vs)) + } + } + return attribute.NewSet(attrs...) +} diff --git a/exporter/otelarrowexporter/otelarrow.go b/exporter/otelarrowexporter/otelarrow.go index a4a94496717e..69bdcb509d1a 100644 --- a/exporter/otelarrowexporter/otelarrow.go +++ b/exporter/otelarrowexporter/otelarrow.go @@ -37,6 +37,19 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/netstats" ) +type exp interface { + helperOptions() []exporterhelper.Option + getSettings() exporter.Settings + getConfig() component.Config + + start(context.Context, component.Host) error + shutdown(context.Context) error + + pushTraces(context.Context, ptrace.Traces) error + pushMetrics(context.Context, pmetric.Metrics) error + pushLogs(context.Context, plog.Logs) error +} + type baseExporter struct { // Input configuration. config *Config @@ -60,11 +73,13 @@ type baseExporter struct { streamClientFactory streamClientFactory } +var _ exp = (*baseExporter)(nil) + type streamClientFactory func(conn *grpc.ClientConn) arrow.StreamClientFunc // Crete new exporter and start it. The exporter will begin connecting but // this function may return before the connection is established. -func newExporter(cfg component.Config, set exporter.Settings, streamClientFactory streamClientFactory) (*baseExporter, error) { +func newExporter(cfg component.Config, set exporter.Settings, streamClientFactory streamClientFactory) (exp, error) { oCfg := cfg.(*Config) if oCfg.Endpoint == "" { @@ -94,6 +109,14 @@ func newExporter(cfg component.Config, set exporter.Settings, streamClientFactor }, nil } +func (e *baseExporter) getSettings() exporter.Settings { + return e.settings +} + +func (e *baseExporter) getConfig() component.Config { + return e.config +} + // start actually creates the gRPC connection. The client construction is deferred till this point as this // is the only place we get hold of Extensions which are required to construct auth round tripper. func (e *baseExporter) start(ctx context.Context, host component.Host) (err error) { From 3061d330e796d25253a47b14e93fb886620a2b4b Mon Sep 17 00:00:00 2001 From: Kristina Pathak Date: Tue, 23 Jul 2024 21:39:31 -0700 Subject: [PATCH 02/17] Update exporter/otelarrowexporter/metadata.go Co-authored-by: Joshua MacDonald --- exporter/otelarrowexporter/metadata.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/exporter/otelarrowexporter/metadata.go b/exporter/otelarrowexporter/metadata.go index b0d1dc65086d..200fb68e7c4d 100644 --- a/exporter/otelarrowexporter/metadata.go +++ b/exporter/otelarrowexporter/metadata.go @@ -193,8 +193,6 @@ func (e *metadataExporter) getOrCreateExporter(ctx context.Context, s attribute. return nil, fmt.Errorf("failed to create exporter: %w", err) } - // aset.ToSlice() returns the sorted, deduplicated, - // and name-downcased list of attributes. var loaded bool v, loaded = e.exporters.LoadOrStore(s, newExp) if !loaded { From 7571c46677cd427a39eee353d42f07694a27703e Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Fri, 23 Aug 2024 04:29:31 -0400 Subject: [PATCH 03/17] unit tests --- exporter/otelarrowexporter/metadata.go | 43 ++-- exporter/otelarrowexporter/metadata_test.go | 206 +++++++++++++++++++ exporter/otelarrowexporter/otelarrow.go | 12 +- exporter/otelarrowexporter/otelarrow_test.go | 10 +- 4 files changed, 250 insertions(+), 21 deletions(-) create mode 100644 exporter/otelarrowexporter/metadata_test.go diff --git a/exporter/otelarrowexporter/metadata.go b/exporter/otelarrowexporter/metadata.go index 200fb68e7c4d..31293c0094bf 100644 --- a/exporter/otelarrowexporter/metadata.go +++ b/exporter/otelarrowexporter/metadata.go @@ -21,12 +21,13 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/otel/attribute" + "google.golang.org/grpc/metadata" "go.uber.org/multierr" ) var ( - // errTooManyBatchers is returned when the MetadataCardinalityLimit has been reached. - errTooManyBatchers = consumererror.NewPermanent(errors.New("too many batcher metadata-value combinations")) + // errTooManyExporters is returned when the MetadataCardinalityLimit has been reached. + errTooManyExporters = consumererror.NewPermanent(errors.New("too many exporter metadata-value combinations")) // errUnexpectedType is returned when the object in the map isn't the expected type errUnexpectedType = errors.New("unexpected type in map") ) @@ -35,9 +36,9 @@ type MetadataConfig struct { *Config // MetadataKeys is a list of client.Metadata keys that will be - // used to form distinct batchers. If this setting is empty, - // a single batcher instance will be used. When this setting - // is not empty, one batcher will be used per distinct + // used to form distinct exporters. If this setting is empty, + // a single exporter instance will be used. When this setting + // is not empty, one exporter will be used per distinct // combination of values for the listed metadata keys. // // Empty value and unset metadata are treated as distinct cases. @@ -47,9 +48,10 @@ type MetadataConfig struct { MetadataKeys []string `mapstructure:"metadata_keys"` // MetadataCardinalityLimit indicates the maximum number of - // batcher instances that will be created through a distinct + // exporter instances that will be created through a distinct // combination of MetadataKeys. MetadataCardinalityLimit uint32 `mapstructure:"metadata_cardinality_limit"` + } var _ component.Config = (*MetadataConfig)(nil) @@ -81,6 +83,7 @@ type metadataExporter struct { metadataKeys []string exporters sync.Map + metadata metadata.MD // Guards the size and the storing logic to ensure no more than limit items are stored. // If we are willing to allow "some" extra items than the limit this can be removed and size can be made atomic. @@ -133,6 +136,10 @@ func (e *metadataExporter) start(ctx context.Context, host component.Host) (err return nil } +func (e *metadataExporter) setMetadata(md metadata.MD) { + return +} + func (e *metadataExporter) shutdown(ctx context.Context) error { var err error e.exporters.Range(func(key any, value any) bool { @@ -148,19 +155,19 @@ func (e *metadataExporter) shutdown(ctx context.Context) error { } func (e *metadataExporter) pushTraces(ctx context.Context, td ptrace.Traces) error { - s := getSet(ctx, e.metadataKeys) + s, mdata := e.getAttrSet(ctx, e.metadataKeys) - be, err := e.getOrCreateExporter(ctx, s) + be, err := e.getOrCreateExporter(ctx, s, mdata) if err != nil { return err } - return be.pushTraces(ctx, td) + return be.(exp).pushTraces(ctx, td) } func (e *metadataExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) error { - s := getSet(ctx, e.metadataKeys) + s, mdata := e.getAttrSet(ctx, e.metadataKeys) - be, err := e.getOrCreateExporter(ctx, s) + be, err := e.getOrCreateExporter(ctx, s, mdata) if err != nil { return err } @@ -169,9 +176,9 @@ func (e *metadataExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) } func (e *metadataExporter) pushLogs(ctx context.Context, ld plog.Logs) error { - s := getSet(ctx, e.metadataKeys) + s, mdata := e.getAttrSet(ctx, e.metadataKeys) - be, err := e.getOrCreateExporter(ctx, s) + be, err := e.getOrCreateExporter(ctx, s, mdata) if err != nil { return err } @@ -179,13 +186,13 @@ func (e *metadataExporter) pushLogs(ctx context.Context, ld plog.Logs) error { return be.(exp).pushLogs(ctx, ld) } -func (e *metadataExporter) getOrCreateExporter(ctx context.Context, s attribute.Set) (exp, error) { +func (e *metadataExporter) getOrCreateExporter(ctx context.Context, s attribute.Set, md metadata.MD) (exp, error) { v, ok := e.exporters.Load(s) if !ok { e.lock.Lock() if e.config.MetadataCardinalityLimit != 0 && e.size >= int(e.config.MetadataCardinalityLimit) { e.lock.Unlock() - return nil, errTooManyBatchers + return nil, errTooManyExporters } newExp, err := newExporter(e.config, e.settings, e.scf) @@ -197,6 +204,7 @@ func (e *metadataExporter) getOrCreateExporter(ctx context.Context, s attribute. v, loaded = e.exporters.LoadOrStore(s, newExp) if !loaded { // Start the goroutine only if we added the object to the map, otherwise is already started. + newExp.setMetadata(md) err = newExp.start(ctx, e.host) if err != nil { e.exporters.Delete(s) @@ -213,7 +221,7 @@ func (e *metadataExporter) getOrCreateExporter(ctx context.Context, s attribute. return val, nil } -func getSet(ctx context.Context, keys []string) attribute.Set { +func (e *metadataExporter) getAttrSet(ctx context.Context, keys []string) (attribute.Set, metadata.MD) { // Get each metadata key value, form the corresponding // attribute set for use as a map lookup key. info := client.FromContext(ctx) @@ -231,5 +239,6 @@ func getSet(ctx context.Context, keys []string) attribute.Set { attrs = append(attrs, attribute.StringSlice(k, vs)) } } - return attribute.NewSet(attrs...) + // ctx = metadata.NewOutgoingContext(ctx, metadata.MD(md)) + return attribute.NewSet(attrs...), metadata.MD(md) } diff --git a/exporter/otelarrowexporter/metadata_test.go b/exporter/otelarrowexporter/metadata_test.go new file mode 100644 index 000000000000..ff856c98a566 --- /dev/null +++ b/exporter/otelarrowexporter/metadata_test.go @@ -0,0 +1,206 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 +package otelarrowexporter +import ( + "context" + "fmt" + "net" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/client" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/configtls" + "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/exporter/exportertest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/testdata" +) + +func TestSendTracesWithMetadata(t *testing.T) { + // Start an OTel-Arrow receiver. + ln, err := net.Listen("tcp", "localhost:") + require.NoError(t, err, "Failed to find an available address to run the gRPC server: %v", err) + rcv, err := otelArrowTracesReceiverOnGRPCServer(ln, false) + rcv.hasMetadata = true + rcv.spanCountByMetadata = make(map[string]int) + + rcv.start() + require.NoError(t, err, "Failed to start mock OTLP receiver") + // Also closes the connection. + defer rcv.srv.GracefulStop() + + // Start an OTLP exporter and point to the receiver. + factory := NewFactory() + cfg := createMetadataDefaultConfig().(*MetadataConfig) + cfg.ClientConfig = configgrpc.ClientConfig{ + Endpoint: ln.Addr().String(), + TLSSetting: configtls.ClientConfig{ + Insecure: true, + }, + } + cfg.Arrow.MaxStreamLifetime = 100 * time.Second + + cfg.MetadataCardinalityLimit = 10 + cfg.MetadataKeys = []string{"key1", "key2"} + set := exportertest.NewNopSettings() + bg := context.Background() + exp, err := factory.CreateTracesExporter(bg, set, cfg) + require.NoError(t, err) + require.NotNil(t, exp) + defer func() { + assert.NoError(t, exp.Shutdown(context.Background())) + }() + + host := componenttest.NewNopHost() + assert.NoError(t, exp.Start(context.Background(), host)) + time.Sleep(1 * time.Second) + + // Ensure that initially there is no data in the receiver. + assert.EqualValues(t, 0, rcv.requestCount.Load()) + + callCtxs := []context.Context{ + client.NewContext(context.Background(), client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "key1": {"first"}, + "key2": {"second"}, + }), + }), + client.NewContext(context.Background(), client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "key1": {"third"}, + "key2": {"fourth"}, + }), + }), + } + + expectByContext := make([]int, len(callCtxs)) + + requestCount := 3 + spansPerRequest := 33 + // sentResourceSpans := ptrace.NewTraces().ResourceSpans() + for requestNum := 0; requestNum < requestCount; requestNum++ { + td := testdata.GenerateTraces(spansPerRequest) + spans := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans() + for spanIndex := 0; spanIndex < spansPerRequest; spanIndex++ { + spans.At(spanIndex).SetName(fmt.Sprintf("%d-%d",requestNum, spanIndex)) + } + // td.ResourceSpans().At(0).CopyTo(sentResourceSpans.AppendEmpty()) + + num := requestNum % len(callCtxs) + expectByContext[num] += spansPerRequest + go func(n int) { + assert.NoError(t, exp.ConsumeTraces(callCtxs[n], td)) + + }(num) + } + + assert.Eventually(t, func() bool { + return rcv.requestCount.Load() == int32(requestCount) + }, 1*time.Second, 5*time.Millisecond) + assert.Eventually(t, func() bool { + return rcv.totalItems.Load() == int32(requestCount*spansPerRequest) + }, 1*time.Second, 5*time.Millisecond) + + require.Equal(t, len(callCtxs), len(rcv.spanCountByMetadata)) + for idx, ctx := range callCtxs { + md := client.FromContext(ctx).Metadata + key := fmt.Sprintf("%s|%s", md.Get("key1"), md.Get("key2")) + require.Equal(t, expectByContext[idx], rcv.spanCountByMetadata[key]) + } +} + +func TestDuplicateMetadataKeys(t *testing.T) { + cfg := createMetadataDefaultConfig().(*MetadataConfig) + cfg.MetadataKeys = []string{"myTOKEN", "mytoken"} + err := cfg.Validate() + require.Error(t, err) + require.Contains(t, err.Error(), "duplicate") + require.Contains(t, err.Error(), "mytoken") +} + +func TestMetadataExporterCardinalityLimit(t *testing.T) { + const cardLimit = 10 + // Start an OTel-Arrow receiver. + ln, err := net.Listen("tcp", "localhost:") + require.NoError(t, err, "Failed to find an available address to run the gRPC server: %v", err) + rcv, err := otelArrowTracesReceiverOnGRPCServer(ln, false) + rcv.hasMetadata = true + rcv.spanCountByMetadata = make(map[string]int) + + rcv.start() + require.NoError(t, err, "Failed to start mock OTLP receiver") + // Also closes the connection. + defer rcv.srv.GracefulStop() + + // Start an OTLP exporter and point to the receiver. + factory := NewFactory() + cfg := createMetadataDefaultConfig().(*MetadataConfig) + cfg.ClientConfig = configgrpc.ClientConfig{ + Endpoint: ln.Addr().String(), + TLSSetting: configtls.ClientConfig{ + Insecure: true, + }, + } + cfg.Arrow.MaxStreamLifetime = 100 * time.Second + + // disable queue settings to allow for error backpropagation. + cfg.QueueSettings.Enabled = false + + cfg.MetadataCardinalityLimit = cardLimit + cfg.MetadataKeys = []string{"key1", "key2"} + set := exportertest.NewNopSettings() + bg := context.Background() + exp, err := factory.CreateTracesExporter(bg, set, cfg) + require.NoError(t, err) + require.NotNil(t, exp) + defer func() { + assert.NoError(t, exp.Shutdown(context.Background())) + }() + + host := componenttest.NewNopHost() + assert.NoError(t, exp.Start(context.Background(), host)) + time.Sleep(1 * time.Second) + + // Ensure that initially there is no data in the receiver. + assert.EqualValues(t, 0, rcv.requestCount.Load()) + + + for requestNum := 0; requestNum < cardLimit; requestNum++ { + td := testdata.GenerateTraces(1) + ctx := client.NewContext(bg, client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "key1": {fmt.Sprint(requestNum)}, + "key2": {fmt.Sprint(requestNum)}, + }), + }) + + assert.NoError(t, exp.ConsumeTraces(ctx, td)) + } + + td := testdata.GenerateTraces(1) + ctx := client.NewContext(bg, client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "key1": {"limit_exceeded"}, + "key2": {"limit_exceeded"}, + }), + }) + + // above the metadata cardinality limit. + err = exp.ConsumeTraces(ctx, td) + require.Error(t, err) + assert.True(t, consumererror.IsPermanent(err)) + assert.Contains(t, err.Error(), "too many") + + assert.Eventually(t, func() bool { + return rcv.requestCount.Load() == int32(cardLimit) + }, 1*time.Second, 5*time.Millisecond) + assert.Eventually(t, func() bool { + return rcv.totalItems.Load() == int32(cardLimit) + }, 1*time.Second, 5*time.Millisecond) + + require.Equal(t, cardLimit, len(rcv.spanCountByMetadata)) +} diff --git a/exporter/otelarrowexporter/otelarrow.go b/exporter/otelarrowexporter/otelarrow.go index 69bdcb509d1a..8f5a288f7813 100644 --- a/exporter/otelarrowexporter/otelarrow.go +++ b/exporter/otelarrowexporter/otelarrow.go @@ -12,6 +12,7 @@ import ( arrowPkg "github.com/apache/arrow/go/v16/arrow" arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record" + // "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configcompression" "go.opentelemetry.io/collector/consumer/consumererror" @@ -41,6 +42,7 @@ type exp interface { helperOptions() []exporterhelper.Option getSettings() exporter.Settings getConfig() component.Config + setMetadata(metadata.MD) start(context.Context, component.Host) error shutdown(context.Context) error @@ -80,7 +82,7 @@ type streamClientFactory func(conn *grpc.ClientConn) arrow.StreamClientFunc // Crete new exporter and start it. The exporter will begin connecting but // this function may return before the connection is established. func newExporter(cfg component.Config, set exporter.Settings, streamClientFactory streamClientFactory) (exp, error) { - oCfg := cfg.(*Config) + oCfg := cfg.(*MetadataConfig) if oCfg.Endpoint == "" { return nil, errors.New("OTLP exporter config requires an Endpoint") @@ -101,7 +103,7 @@ func newExporter(cfg component.Config, set exporter.Settings, streamClientFactor } return &baseExporter{ - config: oCfg, + config: oCfg.Config, settings: set, userAgent: userAgent, netReporter: netReporter, @@ -117,6 +119,9 @@ func (e *baseExporter) getConfig() component.Config { return e.config } +func (e *baseExporter) setMetadata(md metadata.MD) { + e.metadata = metadata.Join(e.metadata, md) +} // start actually creates the gRPC connection. The client construction is deferred till this point as this // is the only place we get hold of Extensions which are required to construct auth round tripper. func (e *baseExporter) start(ctx context.Context, host component.Host) (err error) { @@ -137,7 +142,8 @@ func (e *baseExporter) start(ctx context.Context, host component.Host) (err erro for k, v := range e.config.ClientConfig.Headers { headers[k] = string(v) } - e.metadata = metadata.New(headers) + headerMetadata := metadata.New(headers) + e.metadata = metadata.Join(e.metadata, headerMetadata) e.callOptions = []grpc.CallOption{ grpc.WaitForReady(e.config.ClientConfig.WaitForReady), } diff --git a/exporter/otelarrowexporter/otelarrow_test.go b/exporter/otelarrowexporter/otelarrow_test.go index dfa73f7417cc..256242b8312f 100644 --- a/exporter/otelarrowexporter/otelarrow_test.go +++ b/exporter/otelarrowexporter/otelarrow_test.go @@ -80,16 +80,24 @@ type mockTracesReceiver struct { mockReceiver exportResponse func() ptraceotlp.ExportResponse lastRequest ptrace.Traces + hasMetadata bool + spanCountByMetadata map[string]int } func (r *mockTracesReceiver) Export(ctx context.Context, req ptraceotlp.ExportRequest) (ptraceotlp.ExportResponse, error) { r.requestCount.Add(int32(1)) td := req.Traces() r.totalItems.Add(int32(td.SpanCount())) + r.metadata, _ = metadata.FromIncomingContext(ctx) r.mux.Lock() defer r.mux.Unlock() + if r.hasMetadata { + v1 := r.metadata.Get("key1") + v2 := r.metadata.Get("key2") + hashKey := fmt.Sprintf("%s|%s", v1, v2) + r.spanCountByMetadata[hashKey] += (td.SpanCount()) + } r.lastRequest = td - r.metadata, _ = metadata.FromIncomingContext(ctx) return r.exportResponse(), r.exportError } From 03b6d675533026c1988839ce796463bb9035ecc5 Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Fri, 23 Aug 2024 14:14:09 -0400 Subject: [PATCH 04/17] fix tests --- exporter/otelarrowexporter/factory_test.go | 137 +++++++++++-------- exporter/otelarrowexporter/metadata_test.go | 4 +- exporter/otelarrowexporter/otelarrow_test.go | 20 +-- 3 files changed, 92 insertions(+), 69 deletions(-) diff --git a/exporter/otelarrowexporter/factory_test.go b/exporter/otelarrowexporter/factory_test.go index 0ca198b1ddbe..910c95a4e512 100644 --- a/exporter/otelarrowexporter/factory_test.go +++ b/exporter/otelarrowexporter/factory_test.go @@ -31,8 +31,7 @@ func TestCreateDefaultConfig(t *testing.T) { cfg := factory.CreateDefaultConfig() assert.NotNil(t, cfg, "failed to create default config") assert.NoError(t, componenttest.CheckConfigStruct(cfg)) - ocfg, ok := factory.CreateDefaultConfig().(*Config) - assert.True(t, ok) + ocfg := createMetadataDefaultConfig().(*MetadataConfig) assert.Equal(t, ocfg.RetryConfig, configretry.NewDefaultBackOffConfig()) assert.Equal(t, ocfg.QueueSettings, exporterhelper.NewDefaultQueueConfig()) assert.Equal(t, ocfg.TimeoutSettings, exporterhelper.NewDefaultTimeoutConfig()) @@ -49,7 +48,7 @@ func TestCreateDefaultConfig(t *testing.T) { func TestCreateMetricsExporter(t *testing.T) { factory := NewFactory() - cfg := factory.CreateDefaultConfig().(*Config) + cfg := createMetadataDefaultConfig().(*MetadataConfig) cfg.ClientConfig.Endpoint = testutil.GetAvailableLocalAddress(t) set := exportertest.NewNopSettings() @@ -62,107 +61,127 @@ func TestCreateTracesExporter(t *testing.T) { endpoint := testutil.GetAvailableLocalAddress(t) tests := []struct { name string - config Config + config MetadataConfig mustFailOnCreate bool mustFailOnStart bool }{ { name: "NoEndpoint", - config: Config{ - ClientConfig: configgrpc.ClientConfig{ - Endpoint: "", + config: MetadataConfig{ + Config: &Config{ + ClientConfig: configgrpc.ClientConfig{ + Endpoint: "", + }, }, }, mustFailOnCreate: true, }, { name: "UseSecure", - config: Config{ - ClientConfig: configgrpc.ClientConfig{ - Endpoint: endpoint, - TLSSetting: configtls.ClientConfig{ - Insecure: false, + config: MetadataConfig{ + Config: &Config{ + ClientConfig: configgrpc.ClientConfig{ + Endpoint: endpoint, + TLSSetting: configtls.ClientConfig{ + Insecure: false, + }, }, }, }, }, { name: "Keepalive", - config: Config{ - ClientConfig: configgrpc.ClientConfig{ - Endpoint: endpoint, - Keepalive: &configgrpc.KeepaliveClientConfig{ - Time: 30 * time.Second, - Timeout: 25 * time.Second, - PermitWithoutStream: true, + config: MetadataConfig{ + Config: &Config{ + ClientConfig: configgrpc.ClientConfig{ + Endpoint: endpoint, + Keepalive: &configgrpc.KeepaliveClientConfig{ + Time: 30 * time.Second, + Timeout: 25 * time.Second, + PermitWithoutStream: true, + }, }, }, }, }, { name: "NoneCompression", - config: Config{ - ClientConfig: configgrpc.ClientConfig{ - Endpoint: endpoint, - Compression: "none", + config: MetadataConfig{ + Config: &Config{ + ClientConfig: configgrpc.ClientConfig{ + Endpoint: endpoint, + Compression: "none", + }, }, }, }, { name: "GzipCompression", - config: Config{ - ClientConfig: configgrpc.ClientConfig{ - Endpoint: endpoint, - Compression: configcompression.TypeGzip, + config: MetadataConfig{ + Config: &Config{ + ClientConfig: configgrpc.ClientConfig{ + Endpoint: endpoint, + Compression: configcompression.TypeGzip, + }, }, }, }, { name: "SnappyCompression", - config: Config{ - ClientConfig: configgrpc.ClientConfig{ - Endpoint: endpoint, - Compression: configcompression.TypeSnappy, + config: MetadataConfig{ + Config: &Config{ + ClientConfig: configgrpc.ClientConfig{ + Endpoint: endpoint, + Compression: configcompression.TypeSnappy, + }, }, }, }, { name: "ZstdCompression", - config: Config{ - ClientConfig: configgrpc.ClientConfig{ - Endpoint: endpoint, - Compression: configcompression.TypeZstd, + config: MetadataConfig{ + Config: &Config{ + ClientConfig: configgrpc.ClientConfig{ + Endpoint: endpoint, + Compression: configcompression.TypeZstd, + }, }, }, }, { name: "Headers", - config: Config{ - ClientConfig: configgrpc.ClientConfig{ - Endpoint: endpoint, - Headers: map[string]configopaque.String{ - "hdr1": "val1", - "hdr2": "val2", + config: MetadataConfig{ + Config: &Config{ + ClientConfig: configgrpc.ClientConfig{ + Endpoint: endpoint, + Headers: map[string]configopaque.String{ + "hdr1": "val1", + "hdr2": "val2", + }, }, }, }, }, { name: "NumConsumers", - config: Config{ - ClientConfig: configgrpc.ClientConfig{ - Endpoint: endpoint, + config: MetadataConfig{ + Config: &Config{ + ClientConfig: configgrpc.ClientConfig{ + Endpoint: endpoint, + }, }, }, }, { name: "CaCert", - config: Config{ - ClientConfig: configgrpc.ClientConfig{ - Endpoint: endpoint, - TLSSetting: configtls.ClientConfig{ - Config: configtls.Config{ - CAFile: filepath.Join("testdata", "test_cert.pem"), + config: MetadataConfig{ + Config: &Config{ + ClientConfig: configgrpc.ClientConfig{ + Endpoint: endpoint, + TLSSetting: configtls.ClientConfig{ + Config: configtls.Config{ + CAFile: filepath.Join("testdata", "test_cert.pem"), + }, }, }, }, @@ -170,12 +189,14 @@ func TestCreateTracesExporter(t *testing.T) { }, { name: "CertPemFileError", - config: Config{ - ClientConfig: configgrpc.ClientConfig{ - Endpoint: endpoint, - TLSSetting: configtls.ClientConfig{ - Config: configtls.Config{ - CAFile: "nosuchfile", + config: MetadataConfig{ + Config: &Config{ + ClientConfig: configgrpc.ClientConfig{ + Endpoint: endpoint, + TLSSetting: configtls.ClientConfig{ + Config: configtls.Config{ + CAFile: "nosuchfile", + }, }, }, }, @@ -215,7 +236,7 @@ func TestCreateTracesExporter(t *testing.T) { func TestCreateLogsExporter(t *testing.T) { factory := NewFactory() - cfg := factory.CreateDefaultConfig().(*Config) + cfg := createMetadataDefaultConfig().(*MetadataConfig) cfg.ClientConfig.Endpoint = testutil.GetAvailableLocalAddress(t) set := exportertest.NewNopSettings() @@ -226,7 +247,7 @@ func TestCreateLogsExporter(t *testing.T) { func TestCreateArrowTracesExporter(t *testing.T) { factory := NewFactory() - cfg := factory.CreateDefaultConfig().(*Config) + cfg := createMetadataDefaultConfig().(*MetadataConfig) cfg.ClientConfig.Endpoint = testutil.GetAvailableLocalAddress(t) cfg.Arrow = ArrowConfig{ NumStreams: 1, diff --git a/exporter/otelarrowexporter/metadata_test.go b/exporter/otelarrowexporter/metadata_test.go index ff856c98a566..a32879b64ce2 100644 --- a/exporter/otelarrowexporter/metadata_test.go +++ b/exporter/otelarrowexporter/metadata_test.go @@ -104,8 +104,10 @@ func TestSendTracesWithMetadata(t *testing.T) { assert.Eventually(t, func() bool { return rcv.totalItems.Load() == int32(requestCount*spansPerRequest) }, 1*time.Second, 5*time.Millisecond) + assert.Eventually(t, func() bool { + return len(callCtxs) == len(rcv.spanCountByMetadata) + }, 1*time.Second, 5*time.Millisecond) - require.Equal(t, len(callCtxs), len(rcv.spanCountByMetadata)) for idx, ctx := range callCtxs { md := client.FromContext(ctx).Metadata key := fmt.Sprintf("%s|%s", md.Get("key1"), md.Get("key2")) diff --git a/exporter/otelarrowexporter/otelarrow_test.go b/exporter/otelarrowexporter/otelarrow_test.go index 256242b8312f..6fbb1113130b 100644 --- a/exporter/otelarrowexporter/otelarrow_test.go +++ b/exporter/otelarrowexporter/otelarrow_test.go @@ -306,7 +306,7 @@ func TestSendTraces(t *testing.T) { authID := component.NewID(component.MustNewType("testauth")) expectedHeader := []string{"header-value"} - cfg := factory.CreateDefaultConfig().(*Config) + cfg := createMetadataDefaultConfig().(*MetadataConfig) // Disable queuing to ensure that we execute the request when calling ConsumeTraces // otherwise we will not see any errors. cfg.QueueSettings.Enabled = false @@ -464,7 +464,7 @@ func TestSendTracesWhenEndpointHasHttpScheme(t *testing.T) { // Start an OTLP exporter and point to the receiver. factory := NewFactory() - cfg := factory.CreateDefaultConfig().(*Config) + cfg := createMetadataDefaultConfig().(*MetadataConfig) cfg.ClientConfig = test.gRPCClientSettings cfg.ClientConfig.Endpoint = test.scheme + ln.Addr().String() cfg.Arrow.MaxStreamLifetime = 100 * time.Second @@ -511,7 +511,7 @@ func TestSendMetrics(t *testing.T) { // Start an OTLP exporter and point to the receiver. factory := NewFactory() - cfg := factory.CreateDefaultConfig().(*Config) + cfg := createMetadataDefaultConfig().(*MetadataConfig) // Disable queuing to ensure that we execute the request when calling ConsumeMetrics // otherwise we will not see any errors. cfg.QueueSettings.Enabled = false @@ -611,7 +611,7 @@ func TestSendTraceDataServerDownAndUp(t *testing.T) { // Start an OTel-Arrow exporter and point to the receiver. factory := NewFactory() - cfg := factory.CreateDefaultConfig().(*Config) + cfg := createMetadataDefaultConfig().(*MetadataConfig) // Disable queuing to ensure that we execute the request when calling ConsumeTraces // otherwise we will not see the error. cfg.QueueSettings.Enabled = false @@ -675,7 +675,7 @@ func TestSendTraceDataServerStartWhileRequest(t *testing.T) { // Start an OTel-Arrow exporter and point to the receiver. factory := NewFactory() - cfg := factory.CreateDefaultConfig().(*Config) + cfg := createMetadataDefaultConfig().(*MetadataConfig) cfg.ClientConfig = configgrpc.ClientConfig{ Endpoint: ln.Addr().String(), TLSSetting: configtls.ClientConfig{ @@ -728,7 +728,7 @@ func TestSendTracesOnResourceExhaustion(t *testing.T) { defer rcv.srv.GracefulStop() factory := NewFactory() - cfg := factory.CreateDefaultConfig().(*Config) + cfg := createMetadataDefaultConfig().(*MetadataConfig) cfg.RetryConfig.InitialInterval = 0 cfg.ClientConfig = configgrpc.ClientConfig{ Endpoint: ln.Addr().String(), @@ -809,7 +809,7 @@ func TestSendLogData(t *testing.T) { // Start an OTel-Arrow exporter and point to the receiver. factory := NewFactory() - cfg := factory.CreateDefaultConfig().(*Config) + cfg := createMetadataDefaultConfig().(*MetadataConfig) // Disable queuing to ensure that we execute the request when calling ConsumeLogs // otherwise we will not see any errors. cfg.QueueSettings.Enabled = false @@ -917,7 +917,7 @@ func testSendArrowTraces(t *testing.T, clientWaitForReady, streamServiceAvailabl factory := NewFactory() authID := component.NewID(component.MustNewType("testauth")) expectedHeader := []string{"arrow-ftw"} - cfg := factory.CreateDefaultConfig().(*Config) + cfg := createMetadataDefaultConfig().(*MetadataConfig) cfg.ClientConfig = configgrpc.ClientConfig{ Endpoint: ln.Addr().String(), TLSSetting: configtls.ClientConfig{ @@ -1092,7 +1092,7 @@ func TestSendArrowFailedTraces(t *testing.T) { // Start an OTel-Arrow exporter and point to the receiver. factory := NewFactory() - cfg := factory.CreateDefaultConfig().(*Config) + cfg := createMetadataDefaultConfig().(*MetadataConfig) cfg.ClientConfig = configgrpc.ClientConfig{ Endpoint: ln.Addr().String(), TLSSetting: configtls.ClientConfig{ @@ -1154,7 +1154,7 @@ func TestUserDialOptions(t *testing.T) { // Start an OTel-Arrow exporter and point to the receiver. factory := NewFactory() - cfg := factory.CreateDefaultConfig().(*Config) + cfg := createMetadataDefaultConfig().(*MetadataConfig) cfg.ClientConfig = configgrpc.ClientConfig{ Endpoint: ln.Addr().String(), TLSSetting: configtls.ClientConfig{ From 120f24e9c5ed6690e2f2c290ff8505a074e987ff Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Mon, 26 Aug 2024 02:49:13 -0400 Subject: [PATCH 05/17] remove MetadataConfig --- exporter/otelarrowexporter/README.md | 2 +- exporter/otelarrowexporter/config.go | 36 +++++ exporter/otelarrowexporter/factory_test.go | 136 ++++++++----------- exporter/otelarrowexporter/metadata.go | 53 +------- exporter/otelarrowexporter/metadata_test.go | 6 +- exporter/otelarrowexporter/otelarrow.go | 4 +- exporter/otelarrowexporter/otelarrow_test.go | 20 +-- 7 files changed, 114 insertions(+), 143 deletions(-) diff --git a/exporter/otelarrowexporter/README.md b/exporter/otelarrowexporter/README.md index 868e5b28d1e6..3538bbc4fe73 100644 --- a/exporter/otelarrowexporter/README.md +++ b/exporter/otelarrowexporter/README.md @@ -123,7 +123,7 @@ metadata combinations: arrow exporter instance per distinct combination of values in the client.Metadata. - `metadata_cardinality_limit` (default = 1000): When metadata_keys is not empty, - this setting limits the number of unique combinations of metadata key values + this setting limits the number of unique combinations of metadata key values that will be processed over the lifetime of the exporter. ### Network Configuration diff --git a/exporter/otelarrowexporter/config.go b/exporter/otelarrowexporter/config.go index c512ee30119d..9786e203a3c5 100644 --- a/exporter/otelarrowexporter/config.go +++ b/exporter/otelarrowexporter/config.go @@ -5,6 +5,7 @@ package otelarrowexporter // import "github.com/open-telemetry/opentelemetry-col import ( "fmt" + "strings" "time" "github.com/open-telemetry/otel-arrow/pkg/config" @@ -45,6 +46,23 @@ type Config struct { // exporter is built and configured via code instead of yaml. // Uses include custom dialer, custom user-agent, etc. UserDialOptions []grpc.DialOption `mapstructure:"-"` + + // MetadataKeys is a list of client.Metadata keys that will be + // used to form distinct exporters. If this setting is empty, + // a single exporter instance will be used. When this setting + // is not empty, one exporter will be used per distinct + // combination of values for the listed metadata keys. + // + // Empty value and unset metadata are treated as distinct cases. + // + // Entries are case-insensitive. Duplicated entries will + // trigger a validation error. + MetadataKeys []string `mapstructure:"metadata_keys"` + + // MetadataCardinalityLimit indicates the maximum number of + // exporter instances that will be created through a distinct + // combination of MetadataKeys. + MetadataCardinalityLimit uint32 `mapstructure:"metadata_cardinality_limit"` } // ArrowConfig includes whether Arrow is enabled and the number of @@ -90,6 +108,24 @@ var _ component.Config = (*Config)(nil) var _ component.ConfigValidator = (*ArrowConfig)(nil) +func (cfg *Config) Validate() error { + err := cfg.Arrow.Validate() + if err != nil { + return err + } + + uniq := map[string]bool{} + for _, k := range cfg.MetadataKeys { + l := strings.ToLower(k) + if _, has := uniq[l]; has { + return fmt.Errorf("duplicate entry in metadata_keys: %q (case-insensitive)", l) + } + uniq[l] = true + } + + return nil +} + // Validate returns an error when the number of streams is less than 1. func (cfg *ArrowConfig) Validate() error { if cfg.NumStreams < 1 { diff --git a/exporter/otelarrowexporter/factory_test.go b/exporter/otelarrowexporter/factory_test.go index 910c95a4e512..daa92595d9ec 100644 --- a/exporter/otelarrowexporter/factory_test.go +++ b/exporter/otelarrowexporter/factory_test.go @@ -31,7 +31,7 @@ func TestCreateDefaultConfig(t *testing.T) { cfg := factory.CreateDefaultConfig() assert.NotNil(t, cfg, "failed to create default config") assert.NoError(t, componenttest.CheckConfigStruct(cfg)) - ocfg := createMetadataDefaultConfig().(*MetadataConfig) + ocfg := createDefaultConfig().(*Config) assert.Equal(t, ocfg.RetryConfig, configretry.NewDefaultBackOffConfig()) assert.Equal(t, ocfg.QueueSettings, exporterhelper.NewDefaultQueueConfig()) assert.Equal(t, ocfg.TimeoutSettings, exporterhelper.NewDefaultTimeoutConfig()) @@ -48,7 +48,7 @@ func TestCreateDefaultConfig(t *testing.T) { func TestCreateMetricsExporter(t *testing.T) { factory := NewFactory() - cfg := createMetadataDefaultConfig().(*MetadataConfig) + cfg := createDefaultConfig().(*Config) cfg.ClientConfig.Endpoint = testutil.GetAvailableLocalAddress(t) set := exportertest.NewNopSettings() @@ -61,127 +61,107 @@ func TestCreateTracesExporter(t *testing.T) { endpoint := testutil.GetAvailableLocalAddress(t) tests := []struct { name string - config MetadataConfig + config Config mustFailOnCreate bool mustFailOnStart bool }{ { name: "NoEndpoint", - config: MetadataConfig{ - Config: &Config{ - ClientConfig: configgrpc.ClientConfig{ - Endpoint: "", - }, + config: Config{ + ClientConfig: configgrpc.ClientConfig{ + Endpoint: "", }, }, mustFailOnCreate: true, }, { name: "UseSecure", - config: MetadataConfig{ - Config: &Config{ - ClientConfig: configgrpc.ClientConfig{ - Endpoint: endpoint, - TLSSetting: configtls.ClientConfig{ - Insecure: false, - }, + config: Config{ + ClientConfig: configgrpc.ClientConfig{ + Endpoint: endpoint, + TLSSetting: configtls.ClientConfig{ + Insecure: false, }, }, }, }, { name: "Keepalive", - config: MetadataConfig{ - Config: &Config{ - ClientConfig: configgrpc.ClientConfig{ - Endpoint: endpoint, - Keepalive: &configgrpc.KeepaliveClientConfig{ - Time: 30 * time.Second, - Timeout: 25 * time.Second, - PermitWithoutStream: true, - }, + config: Config{ + ClientConfig: configgrpc.ClientConfig{ + Endpoint: endpoint, + Keepalive: &configgrpc.KeepaliveClientConfig{ + Time: 30 * time.Second, + Timeout: 25 * time.Second, + PermitWithoutStream: true, }, }, }, }, { name: "NoneCompression", - config: MetadataConfig{ - Config: &Config{ - ClientConfig: configgrpc.ClientConfig{ - Endpoint: endpoint, - Compression: "none", - }, + config: Config{ + ClientConfig: configgrpc.ClientConfig{ + Endpoint: endpoint, + Compression: "none", }, }, }, { name: "GzipCompression", - config: MetadataConfig{ - Config: &Config{ - ClientConfig: configgrpc.ClientConfig{ - Endpoint: endpoint, - Compression: configcompression.TypeGzip, - }, + config: Config{ + ClientConfig: configgrpc.ClientConfig{ + Endpoint: endpoint, + Compression: configcompression.TypeGzip, }, }, }, { name: "SnappyCompression", - config: MetadataConfig{ - Config: &Config{ - ClientConfig: configgrpc.ClientConfig{ - Endpoint: endpoint, - Compression: configcompression.TypeSnappy, - }, + config: Config{ + ClientConfig: configgrpc.ClientConfig{ + Endpoint: endpoint, + Compression: configcompression.TypeSnappy, }, }, }, { name: "ZstdCompression", - config: MetadataConfig{ - Config: &Config{ - ClientConfig: configgrpc.ClientConfig{ - Endpoint: endpoint, - Compression: configcompression.TypeZstd, - }, + config: Config{ + ClientConfig: configgrpc.ClientConfig{ + Endpoint: endpoint, + Compression: configcompression.TypeZstd, }, }, }, { name: "Headers", - config: MetadataConfig{ - Config: &Config{ - ClientConfig: configgrpc.ClientConfig{ - Endpoint: endpoint, - Headers: map[string]configopaque.String{ - "hdr1": "val1", - "hdr2": "val2", - }, + config: Config{ + ClientConfig: configgrpc.ClientConfig{ + Endpoint: endpoint, + Headers: map[string]configopaque.String{ + "hdr1": "val1", + "hdr2": "val2", }, }, }, }, { name: "NumConsumers", - config: MetadataConfig{ - Config: &Config{ - ClientConfig: configgrpc.ClientConfig{ - Endpoint: endpoint, - }, + config: Config{ + ClientConfig: configgrpc.ClientConfig{ + Endpoint: endpoint, }, }, }, { name: "CaCert", - config: MetadataConfig{ - Config: &Config{ - ClientConfig: configgrpc.ClientConfig{ - Endpoint: endpoint, - TLSSetting: configtls.ClientConfig{ - Config: configtls.Config{ - CAFile: filepath.Join("testdata", "test_cert.pem"), - }, + config: Config{ + ClientConfig: configgrpc.ClientConfig{ + Endpoint: endpoint, + TLSSetting: configtls.ClientConfig{ + Config: configtls.Config{ + CAFile: filepath.Join("testdata", "test_cert.pem"), }, }, }, @@ -189,14 +169,12 @@ func TestCreateTracesExporter(t *testing.T) { }, { name: "CertPemFileError", - config: MetadataConfig{ - Config: &Config{ - ClientConfig: configgrpc.ClientConfig{ - Endpoint: endpoint, - TLSSetting: configtls.ClientConfig{ - Config: configtls.Config{ - CAFile: "nosuchfile", - }, + config: Config{ + ClientConfig: configgrpc.ClientConfig{ + Endpoint: endpoint, + TLSSetting: configtls.ClientConfig{ + Config: configtls.Config{ + CAFile: "nosuchfile", }, }, }, @@ -236,7 +214,7 @@ func TestCreateTracesExporter(t *testing.T) { func TestCreateLogsExporter(t *testing.T) { factory := NewFactory() - cfg := createMetadataDefaultConfig().(*MetadataConfig) + cfg := createDefaultConfig().(*Config) cfg.ClientConfig.Endpoint = testutil.GetAvailableLocalAddress(t) set := exportertest.NewNopSettings() @@ -247,7 +225,7 @@ func TestCreateLogsExporter(t *testing.T) { func TestCreateArrowTracesExporter(t *testing.T) { factory := NewFactory() - cfg := createMetadataDefaultConfig().(*MetadataConfig) + cfg := createDefaultConfig().(*Config) cfg.ClientConfig.Endpoint = testutil.GetAvailableLocalAddress(t) cfg.Arrow = ArrowConfig{ NumStreams: 1, diff --git a/exporter/otelarrowexporter/metadata.go b/exporter/otelarrowexporter/metadata.go index 31293c0094bf..34226462b70b 100644 --- a/exporter/otelarrowexporter/metadata.go +++ b/exporter/otelarrowexporter/metadata.go @@ -32,51 +32,8 @@ var ( errUnexpectedType = errors.New("unexpected type in map") ) -type MetadataConfig struct { - *Config - - // MetadataKeys is a list of client.Metadata keys that will be - // used to form distinct exporters. If this setting is empty, - // a single exporter instance will be used. When this setting - // is not empty, one exporter will be used per distinct - // combination of values for the listed metadata keys. - // - // Empty value and unset metadata are treated as distinct cases. - // - // Entries are case-insensitive. Duplicated entries will - // trigger a validation error. - MetadataKeys []string `mapstructure:"metadata_keys"` - - // MetadataCardinalityLimit indicates the maximum number of - // exporter instances that will be created through a distinct - // combination of MetadataKeys. - MetadataCardinalityLimit uint32 `mapstructure:"metadata_cardinality_limit"` - -} - -var _ component.Config = (*MetadataConfig)(nil) - -// Validate checks if the exporter configuration is valid -func (cfg *MetadataConfig) Validate() error { - uniq := map[string]bool{} - for _, k := range cfg.MetadataKeys { - l := strings.ToLower(k) - if _, has := uniq[l]; has { - return fmt.Errorf("duplicate entry in metadata_keys: %q (case-insensitive)", l) - } - uniq[l] = true - } - return nil -} - -func createMetadataDefaultConfig() component.Config { - return &MetadataConfig{ - Config: createDefaultConfig().(*Config), - } -} - type metadataExporter struct { - config *MetadataConfig + config *Config settings exporter.Settings scf streamClientFactory host component.Host @@ -94,7 +51,7 @@ type metadataExporter struct { var _ exp = (*metadataExporter)(nil) func newMetadataExporter(cfg component.Config, set exporter.Settings, streamClientFactory streamClientFactory) (exp, error) { - oCfg := cfg.(*MetadataConfig) + oCfg := cfg.(*Config) // use lower-case, to be consistent with http/2 headers. mks := make([]string, len(oCfg.MetadataKeys)) for i, k := range oCfg.MetadataKeys { @@ -123,9 +80,9 @@ func (e *metadataExporter) getConfig() component.Config { func (e *metadataExporter) helperOptions() []exporterhelper.Option { return []exporterhelper.Option{ exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), - exporterhelper.WithTimeout(e.config.Config.TimeoutSettings), - exporterhelper.WithRetry(e.config.Config.RetryConfig), - exporterhelper.WithQueue(e.config.Config.QueueSettings), + exporterhelper.WithTimeout(e.config.TimeoutSettings), + exporterhelper.WithRetry(e.config.RetryConfig), + exporterhelper.WithQueue(e.config.QueueSettings), exporterhelper.WithStart(e.start), exporterhelper.WithShutdown(e.shutdown), } diff --git a/exporter/otelarrowexporter/metadata_test.go b/exporter/otelarrowexporter/metadata_test.go index a32879b64ce2..e5e941b98d9c 100644 --- a/exporter/otelarrowexporter/metadata_test.go +++ b/exporter/otelarrowexporter/metadata_test.go @@ -35,7 +35,7 @@ func TestSendTracesWithMetadata(t *testing.T) { // Start an OTLP exporter and point to the receiver. factory := NewFactory() - cfg := createMetadataDefaultConfig().(*MetadataConfig) + cfg := createDefaultConfig().(*Config) cfg.ClientConfig = configgrpc.ClientConfig{ Endpoint: ln.Addr().String(), TLSSetting: configtls.ClientConfig{ @@ -116,7 +116,7 @@ func TestSendTracesWithMetadata(t *testing.T) { } func TestDuplicateMetadataKeys(t *testing.T) { - cfg := createMetadataDefaultConfig().(*MetadataConfig) + cfg := createDefaultConfig().(*Config) cfg.MetadataKeys = []string{"myTOKEN", "mytoken"} err := cfg.Validate() require.Error(t, err) @@ -140,7 +140,7 @@ func TestMetadataExporterCardinalityLimit(t *testing.T) { // Start an OTLP exporter and point to the receiver. factory := NewFactory() - cfg := createMetadataDefaultConfig().(*MetadataConfig) + cfg := createDefaultConfig().(*Config) cfg.ClientConfig = configgrpc.ClientConfig{ Endpoint: ln.Addr().String(), TLSSetting: configtls.ClientConfig{ diff --git a/exporter/otelarrowexporter/otelarrow.go b/exporter/otelarrowexporter/otelarrow.go index 8f5a288f7813..8dcf4e0a5cad 100644 --- a/exporter/otelarrowexporter/otelarrow.go +++ b/exporter/otelarrowexporter/otelarrow.go @@ -82,7 +82,7 @@ type streamClientFactory func(conn *grpc.ClientConn) arrow.StreamClientFunc // Crete new exporter and start it. The exporter will begin connecting but // this function may return before the connection is established. func newExporter(cfg component.Config, set exporter.Settings, streamClientFactory streamClientFactory) (exp, error) { - oCfg := cfg.(*MetadataConfig) + oCfg := cfg.(*Config) if oCfg.Endpoint == "" { return nil, errors.New("OTLP exporter config requires an Endpoint") @@ -103,7 +103,7 @@ func newExporter(cfg component.Config, set exporter.Settings, streamClientFactor } return &baseExporter{ - config: oCfg.Config, + config: oCfg, settings: set, userAgent: userAgent, netReporter: netReporter, diff --git a/exporter/otelarrowexporter/otelarrow_test.go b/exporter/otelarrowexporter/otelarrow_test.go index 6fbb1113130b..e025d97ac98c 100644 --- a/exporter/otelarrowexporter/otelarrow_test.go +++ b/exporter/otelarrowexporter/otelarrow_test.go @@ -306,7 +306,7 @@ func TestSendTraces(t *testing.T) { authID := component.NewID(component.MustNewType("testauth")) expectedHeader := []string{"header-value"} - cfg := createMetadataDefaultConfig().(*MetadataConfig) + cfg := createDefaultConfig().(*Config) // Disable queuing to ensure that we execute the request when calling ConsumeTraces // otherwise we will not see any errors. cfg.QueueSettings.Enabled = false @@ -464,7 +464,7 @@ func TestSendTracesWhenEndpointHasHttpScheme(t *testing.T) { // Start an OTLP exporter and point to the receiver. factory := NewFactory() - cfg := createMetadataDefaultConfig().(*MetadataConfig) + cfg := createDefaultConfig().(*Config) cfg.ClientConfig = test.gRPCClientSettings cfg.ClientConfig.Endpoint = test.scheme + ln.Addr().String() cfg.Arrow.MaxStreamLifetime = 100 * time.Second @@ -511,7 +511,7 @@ func TestSendMetrics(t *testing.T) { // Start an OTLP exporter and point to the receiver. factory := NewFactory() - cfg := createMetadataDefaultConfig().(*MetadataConfig) + cfg := createDefaultConfig().(*Config) // Disable queuing to ensure that we execute the request when calling ConsumeMetrics // otherwise we will not see any errors. cfg.QueueSettings.Enabled = false @@ -611,7 +611,7 @@ func TestSendTraceDataServerDownAndUp(t *testing.T) { // Start an OTel-Arrow exporter and point to the receiver. factory := NewFactory() - cfg := createMetadataDefaultConfig().(*MetadataConfig) + cfg := createDefaultConfig().(*Config) // Disable queuing to ensure that we execute the request when calling ConsumeTraces // otherwise we will not see the error. cfg.QueueSettings.Enabled = false @@ -675,7 +675,7 @@ func TestSendTraceDataServerStartWhileRequest(t *testing.T) { // Start an OTel-Arrow exporter and point to the receiver. factory := NewFactory() - cfg := createMetadataDefaultConfig().(*MetadataConfig) + cfg := createDefaultConfig().(*Config) cfg.ClientConfig = configgrpc.ClientConfig{ Endpoint: ln.Addr().String(), TLSSetting: configtls.ClientConfig{ @@ -728,7 +728,7 @@ func TestSendTracesOnResourceExhaustion(t *testing.T) { defer rcv.srv.GracefulStop() factory := NewFactory() - cfg := createMetadataDefaultConfig().(*MetadataConfig) + cfg := createDefaultConfig().(*Config) cfg.RetryConfig.InitialInterval = 0 cfg.ClientConfig = configgrpc.ClientConfig{ Endpoint: ln.Addr().String(), @@ -809,7 +809,7 @@ func TestSendLogData(t *testing.T) { // Start an OTel-Arrow exporter and point to the receiver. factory := NewFactory() - cfg := createMetadataDefaultConfig().(*MetadataConfig) + cfg := createDefaultConfig().(*Config) // Disable queuing to ensure that we execute the request when calling ConsumeLogs // otherwise we will not see any errors. cfg.QueueSettings.Enabled = false @@ -917,7 +917,7 @@ func testSendArrowTraces(t *testing.T, clientWaitForReady, streamServiceAvailabl factory := NewFactory() authID := component.NewID(component.MustNewType("testauth")) expectedHeader := []string{"arrow-ftw"} - cfg := createMetadataDefaultConfig().(*MetadataConfig) + cfg := createDefaultConfig().(*Config) cfg.ClientConfig = configgrpc.ClientConfig{ Endpoint: ln.Addr().String(), TLSSetting: configtls.ClientConfig{ @@ -1092,7 +1092,7 @@ func TestSendArrowFailedTraces(t *testing.T) { // Start an OTel-Arrow exporter and point to the receiver. factory := NewFactory() - cfg := createMetadataDefaultConfig().(*MetadataConfig) + cfg := createDefaultConfig().(*Config) cfg.ClientConfig = configgrpc.ClientConfig{ Endpoint: ln.Addr().String(), TLSSetting: configtls.ClientConfig{ @@ -1154,7 +1154,7 @@ func TestUserDialOptions(t *testing.T) { // Start an OTel-Arrow exporter and point to the receiver. factory := NewFactory() - cfg := createMetadataDefaultConfig().(*MetadataConfig) + cfg := createDefaultConfig().(*Config) cfg.ClientConfig = configgrpc.ClientConfig{ Endpoint: ln.Addr().String(), TLSSetting: configtls.ClientConfig{ From 3079c8efa0edcde906b950a96459b395c62d1a58 Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Mon, 26 Aug 2024 03:02:01 -0400 Subject: [PATCH 06/17] lint --- exporter/otelarrowexporter/metadata.go | 18 ++++++++---------- exporter/otelarrowexporter/metadata_test.go | 4 ++-- exporter/otelarrowexporter/otelarrow.go | 1 + exporter/otelarrowexporter/otelarrow_test.go | 6 +++--- 4 files changed, 14 insertions(+), 15 deletions(-) diff --git a/exporter/otelarrowexporter/metadata.go b/exporter/otelarrowexporter/metadata.go index 34226462b70b..c7f05a02dfe3 100644 --- a/exporter/otelarrowexporter/metadata.go +++ b/exporter/otelarrowexporter/metadata.go @@ -21,8 +21,8 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/otel/attribute" - "google.golang.org/grpc/metadata" "go.uber.org/multierr" + "google.golang.org/grpc/metadata" ) var ( @@ -40,7 +40,7 @@ type metadataExporter struct { metadataKeys []string exporters sync.Map - metadata metadata.MD + metadata metadata.MD // Guards the size and the storing logic to ensure no more than limit items are stored. // If we are willing to allow "some" extra items than the limit this can be removed and size can be made atomic. @@ -88,18 +88,16 @@ func (e *metadataExporter) helperOptions() []exporterhelper.Option { } } -func (e *metadataExporter) start(ctx context.Context, host component.Host) (err error) { +func (e *metadataExporter) start(_ context.Context, host component.Host) (err error) { e.host = host return nil } -func (e *metadataExporter) setMetadata(md metadata.MD) { - return -} +func (e *metadataExporter) setMetadata(_ metadata.MD) {} func (e *metadataExporter) shutdown(ctx context.Context) error { var err error - e.exporters.Range(func(key any, value any) bool { + e.exporters.Range(func(_ any, value any) bool { be, ok := value.(exp) if !ok { err = multierr.Append(err, fmt.Errorf("%w: %T", errUnexpectedType, value)) @@ -118,7 +116,7 @@ func (e *metadataExporter) pushTraces(ctx context.Context, td ptrace.Traces) err if err != nil { return err } - return be.(exp).pushTraces(ctx, td) + return be.pushTraces(ctx, td) } func (e *metadataExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) error { @@ -129,7 +127,7 @@ func (e *metadataExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) return err } - return be.(exp).pushMetrics(ctx, md) + return be.pushMetrics(ctx, md) } func (e *metadataExporter) pushLogs(ctx context.Context, ld plog.Logs) error { @@ -140,7 +138,7 @@ func (e *metadataExporter) pushLogs(ctx context.Context, ld plog.Logs) error { return err } - return be.(exp).pushLogs(ctx, ld) + return be.pushLogs(ctx, ld) } func (e *metadataExporter) getOrCreateExporter(ctx context.Context, s attribute.Set, md metadata.MD) (exp, error) { diff --git a/exporter/otelarrowexporter/metadata_test.go b/exporter/otelarrowexporter/metadata_test.go index e5e941b98d9c..b5e919a408a7 100644 --- a/exporter/otelarrowexporter/metadata_test.go +++ b/exporter/otelarrowexporter/metadata_test.go @@ -1,6 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package otelarrowexporter + import ( "context" "fmt" @@ -86,7 +87,7 @@ func TestSendTracesWithMetadata(t *testing.T) { td := testdata.GenerateTraces(spansPerRequest) spans := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans() for spanIndex := 0; spanIndex < spansPerRequest; spanIndex++ { - spans.At(spanIndex).SetName(fmt.Sprintf("%d-%d",requestNum, spanIndex)) + spans.At(spanIndex).SetName(fmt.Sprintf("%d-%d", requestNum, spanIndex)) } // td.ResourceSpans().At(0).CopyTo(sentResourceSpans.AppendEmpty()) @@ -170,7 +171,6 @@ func TestMetadataExporterCardinalityLimit(t *testing.T) { // Ensure that initially there is no data in the receiver. assert.EqualValues(t, 0, rcv.requestCount.Load()) - for requestNum := 0; requestNum < cardLimit; requestNum++ { td := testdata.GenerateTraces(1) ctx := client.NewContext(bg, client.Info{ diff --git a/exporter/otelarrowexporter/otelarrow.go b/exporter/otelarrowexporter/otelarrow.go index 8dcf4e0a5cad..b69d2f57edeb 100644 --- a/exporter/otelarrowexporter/otelarrow.go +++ b/exporter/otelarrowexporter/otelarrow.go @@ -122,6 +122,7 @@ func (e *baseExporter) getConfig() component.Config { func (e *baseExporter) setMetadata(md metadata.MD) { e.metadata = metadata.Join(e.metadata, md) } + // start actually creates the gRPC connection. The client construction is deferred till this point as this // is the only place we get hold of Extensions which are required to construct auth round tripper. func (e *baseExporter) start(ctx context.Context, host component.Host) (err error) { diff --git a/exporter/otelarrowexporter/otelarrow_test.go b/exporter/otelarrowexporter/otelarrow_test.go index e025d97ac98c..9deaffe19e4b 100644 --- a/exporter/otelarrowexporter/otelarrow_test.go +++ b/exporter/otelarrowexporter/otelarrow_test.go @@ -78,9 +78,9 @@ func (r *mockReceiver) setExportError(err error) { type mockTracesReceiver struct { ptraceotlp.UnimplementedGRPCServer mockReceiver - exportResponse func() ptraceotlp.ExportResponse - lastRequest ptrace.Traces - hasMetadata bool + exportResponse func() ptraceotlp.ExportResponse + lastRequest ptrace.Traces + hasMetadata bool spanCountByMetadata map[string]int } From 714af4c4eeab7becfb27ceb966b1037d37592185 Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Mon, 26 Aug 2024 03:09:02 -0400 Subject: [PATCH 07/17] changelog --- .chloggen/arrow_exporter_by_metadata.yaml | 27 +++++++++++++++++++++++ exporter/otelarrowexporter/metadata.go | 1 - 2 files changed, 27 insertions(+), 1 deletion(-) create mode 100644 .chloggen/arrow_exporter_by_metadata.yaml diff --git a/.chloggen/arrow_exporter_by_metadata.yaml b/.chloggen/arrow_exporter_by_metadata.yaml new file mode 100644 index 000000000000..1411e5a13ab6 --- /dev/null +++ b/.chloggen/arrow_exporter_by_metadata.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: otelarrowexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Allow separate arrow exporter per unique value of configured metadataKeys. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [34178] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] \ No newline at end of file diff --git a/exporter/otelarrowexporter/metadata.go b/exporter/otelarrowexporter/metadata.go index c7f05a02dfe3..7e3d5096890f 100644 --- a/exporter/otelarrowexporter/metadata.go +++ b/exporter/otelarrowexporter/metadata.go @@ -40,7 +40,6 @@ type metadataExporter struct { metadataKeys []string exporters sync.Map - metadata metadata.MD // Guards the size and the storing logic to ensure no more than limit items are stored. // If we are willing to allow "some" extra items than the limit this can be removed and size can be made atomic. From d4b068b36b0e354975143912c009bd9246656db8 Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Mon, 26 Aug 2024 03:34:35 -0400 Subject: [PATCH 08/17] fix race and rm from interface --- exporter/otelarrowexporter/factory_test.go | 8 +++---- exporter/otelarrowexporter/metadata.go | 10 ++++++--- exporter/otelarrowexporter/metadata_test.go | 7 ++++++- exporter/otelarrowexporter/otelarrow.go | 1 - exporter/otelarrowexporter/otelarrow_test.go | 22 ++++++++++---------- 5 files changed, 28 insertions(+), 20 deletions(-) diff --git a/exporter/otelarrowexporter/factory_test.go b/exporter/otelarrowexporter/factory_test.go index daa92595d9ec..7498a0403dd8 100644 --- a/exporter/otelarrowexporter/factory_test.go +++ b/exporter/otelarrowexporter/factory_test.go @@ -31,7 +31,7 @@ func TestCreateDefaultConfig(t *testing.T) { cfg := factory.CreateDefaultConfig() assert.NotNil(t, cfg, "failed to create default config") assert.NoError(t, componenttest.CheckConfigStruct(cfg)) - ocfg := createDefaultConfig().(*Config) + ocfg := factory.CreateDefaultConfig().(*Config) assert.Equal(t, ocfg.RetryConfig, configretry.NewDefaultBackOffConfig()) assert.Equal(t, ocfg.QueueSettings, exporterhelper.NewDefaultQueueConfig()) assert.Equal(t, ocfg.TimeoutSettings, exporterhelper.NewDefaultTimeoutConfig()) @@ -48,7 +48,7 @@ func TestCreateDefaultConfig(t *testing.T) { func TestCreateMetricsExporter(t *testing.T) { factory := NewFactory() - cfg := createDefaultConfig().(*Config) + cfg := factory.CreateDefaultConfig().(*Config) cfg.ClientConfig.Endpoint = testutil.GetAvailableLocalAddress(t) set := exportertest.NewNopSettings() @@ -214,7 +214,7 @@ func TestCreateTracesExporter(t *testing.T) { func TestCreateLogsExporter(t *testing.T) { factory := NewFactory() - cfg := createDefaultConfig().(*Config) + cfg := factory.CreateDefaultConfig().(*Config) cfg.ClientConfig.Endpoint = testutil.GetAvailableLocalAddress(t) set := exportertest.NewNopSettings() @@ -225,7 +225,7 @@ func TestCreateLogsExporter(t *testing.T) { func TestCreateArrowTracesExporter(t *testing.T) { factory := NewFactory() - cfg := createDefaultConfig().(*Config) + cfg := factory.CreateDefaultConfig().(*Config) cfg.ClientConfig.Endpoint = testutil.GetAvailableLocalAddress(t) cfg.Arrow = ArrowConfig{ NumStreams: 1, diff --git a/exporter/otelarrowexporter/metadata.go b/exporter/otelarrowexporter/metadata.go index 7e3d5096890f..b44d9b5a9547 100644 --- a/exporter/otelarrowexporter/metadata.go +++ b/exporter/otelarrowexporter/metadata.go @@ -92,8 +92,6 @@ func (e *metadataExporter) start(_ context.Context, host component.Host) (err er return nil } -func (e *metadataExporter) setMetadata(_ metadata.MD) {} - func (e *metadataExporter) shutdown(ctx context.Context) error { var err error e.exporters.Range(func(_ any, value any) bool { @@ -158,7 +156,13 @@ func (e *metadataExporter) getOrCreateExporter(ctx context.Context, s attribute. v, loaded = e.exporters.LoadOrStore(s, newExp) if !loaded { // Start the goroutine only if we added the object to the map, otherwise is already started. - newExp.setMetadata(md) + be, ok := newExp.(*baseExporter) + if !ok { + return nil, fmt.Errorf("%w: %T", errUnexpectedType, newExp) + } + // set metadata keys for base exporter to add them to the outgoing context. + be.setMetadata(md) + err = newExp.start(ctx, e.host) if err != nil { e.exporters.Delete(s) diff --git a/exporter/otelarrowexporter/metadata_test.go b/exporter/otelarrowexporter/metadata_test.go index b5e919a408a7..8dbfc548feb7 100644 --- a/exporter/otelarrowexporter/metadata_test.go +++ b/exporter/otelarrowexporter/metadata_test.go @@ -95,17 +95,22 @@ func TestSendTracesWithMetadata(t *testing.T) { expectByContext[num] += spansPerRequest go func(n int) { assert.NoError(t, exp.ConsumeTraces(callCtxs[n], td)) - }(num) } assert.Eventually(t, func() bool { + // rcv.mux.Lock() + // defer rcv.mux.Unlock() return rcv.requestCount.Load() == int32(requestCount) }, 1*time.Second, 5*time.Millisecond) assert.Eventually(t, func() bool { + // rcv.mux.Lock() + // defer rcv.mux.Unlock() return rcv.totalItems.Load() == int32(requestCount*spansPerRequest) }, 1*time.Second, 5*time.Millisecond) assert.Eventually(t, func() bool { + rcv.mux.Lock() + defer rcv.mux.Unlock() return len(callCtxs) == len(rcv.spanCountByMetadata) }, 1*time.Second, 5*time.Millisecond) diff --git a/exporter/otelarrowexporter/otelarrow.go b/exporter/otelarrowexporter/otelarrow.go index b69d2f57edeb..4be3756ae815 100644 --- a/exporter/otelarrowexporter/otelarrow.go +++ b/exporter/otelarrowexporter/otelarrow.go @@ -42,7 +42,6 @@ type exp interface { helperOptions() []exporterhelper.Option getSettings() exporter.Settings getConfig() component.Config - setMetadata(metadata.MD) start(context.Context, component.Host) error shutdown(context.Context) error diff --git a/exporter/otelarrowexporter/otelarrow_test.go b/exporter/otelarrowexporter/otelarrow_test.go index 9deaffe19e4b..4c73c153f342 100644 --- a/exporter/otelarrowexporter/otelarrow_test.go +++ b/exporter/otelarrowexporter/otelarrow_test.go @@ -88,9 +88,9 @@ func (r *mockTracesReceiver) Export(ctx context.Context, req ptraceotlp.ExportRe r.requestCount.Add(int32(1)) td := req.Traces() r.totalItems.Add(int32(td.SpanCount())) - r.metadata, _ = metadata.FromIncomingContext(ctx) r.mux.Lock() defer r.mux.Unlock() + r.metadata, _ = metadata.FromIncomingContext(ctx) if r.hasMetadata { v1 := r.metadata.Get("key1") v2 := r.metadata.Get("key2") @@ -306,7 +306,7 @@ func TestSendTraces(t *testing.T) { authID := component.NewID(component.MustNewType("testauth")) expectedHeader := []string{"header-value"} - cfg := createDefaultConfig().(*Config) + cfg := factory.CreateDefaultConfig().(*Config) // Disable queuing to ensure that we execute the request when calling ConsumeTraces // otherwise we will not see any errors. cfg.QueueSettings.Enabled = false @@ -464,7 +464,7 @@ func TestSendTracesWhenEndpointHasHttpScheme(t *testing.T) { // Start an OTLP exporter and point to the receiver. factory := NewFactory() - cfg := createDefaultConfig().(*Config) + cfg := factory.CreateDefaultConfig().(*Config) cfg.ClientConfig = test.gRPCClientSettings cfg.ClientConfig.Endpoint = test.scheme + ln.Addr().String() cfg.Arrow.MaxStreamLifetime = 100 * time.Second @@ -511,7 +511,7 @@ func TestSendMetrics(t *testing.T) { // Start an OTLP exporter and point to the receiver. factory := NewFactory() - cfg := createDefaultConfig().(*Config) + cfg := factory.CreateDefaultConfig().(*Config) // Disable queuing to ensure that we execute the request when calling ConsumeMetrics // otherwise we will not see any errors. cfg.QueueSettings.Enabled = false @@ -611,7 +611,7 @@ func TestSendTraceDataServerDownAndUp(t *testing.T) { // Start an OTel-Arrow exporter and point to the receiver. factory := NewFactory() - cfg := createDefaultConfig().(*Config) + cfg := factory.CreateDefaultConfig().(*Config) // Disable queuing to ensure that we execute the request when calling ConsumeTraces // otherwise we will not see the error. cfg.QueueSettings.Enabled = false @@ -675,7 +675,7 @@ func TestSendTraceDataServerStartWhileRequest(t *testing.T) { // Start an OTel-Arrow exporter and point to the receiver. factory := NewFactory() - cfg := createDefaultConfig().(*Config) + cfg := factory.CreateDefaultConfig().(*Config) cfg.ClientConfig = configgrpc.ClientConfig{ Endpoint: ln.Addr().String(), TLSSetting: configtls.ClientConfig{ @@ -728,7 +728,7 @@ func TestSendTracesOnResourceExhaustion(t *testing.T) { defer rcv.srv.GracefulStop() factory := NewFactory() - cfg := createDefaultConfig().(*Config) + cfg := factory.CreateDefaultConfig().(*Config) cfg.RetryConfig.InitialInterval = 0 cfg.ClientConfig = configgrpc.ClientConfig{ Endpoint: ln.Addr().String(), @@ -809,7 +809,7 @@ func TestSendLogData(t *testing.T) { // Start an OTel-Arrow exporter and point to the receiver. factory := NewFactory() - cfg := createDefaultConfig().(*Config) + cfg := factory.CreateDefaultConfig().(*Config) // Disable queuing to ensure that we execute the request when calling ConsumeLogs // otherwise we will not see any errors. cfg.QueueSettings.Enabled = false @@ -917,7 +917,7 @@ func testSendArrowTraces(t *testing.T, clientWaitForReady, streamServiceAvailabl factory := NewFactory() authID := component.NewID(component.MustNewType("testauth")) expectedHeader := []string{"arrow-ftw"} - cfg := createDefaultConfig().(*Config) + cfg := factory.CreateDefaultConfig().(*Config) cfg.ClientConfig = configgrpc.ClientConfig{ Endpoint: ln.Addr().String(), TLSSetting: configtls.ClientConfig{ @@ -1092,7 +1092,7 @@ func TestSendArrowFailedTraces(t *testing.T) { // Start an OTel-Arrow exporter and point to the receiver. factory := NewFactory() - cfg := createDefaultConfig().(*Config) + cfg := factory.CreateDefaultConfig().(*Config) cfg.ClientConfig = configgrpc.ClientConfig{ Endpoint: ln.Addr().String(), TLSSetting: configtls.ClientConfig{ @@ -1154,7 +1154,7 @@ func TestUserDialOptions(t *testing.T) { // Start an OTel-Arrow exporter and point to the receiver. factory := NewFactory() - cfg := createDefaultConfig().(*Config) + cfg := factory.CreateDefaultConfig().(*Config) cfg.ClientConfig = configgrpc.ClientConfig{ Endpoint: ln.Addr().String(), TLSSetting: configtls.ClientConfig{ From 178994a8014914fbcdd9427577543f343cd6e6af Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Mon, 26 Aug 2024 03:57:59 -0400 Subject: [PATCH 09/17] more lint --- exporter/otelarrowexporter/metadata.go | 4 ++-- exporter/otelarrowexporter/otelarrow.go | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/exporter/otelarrowexporter/metadata.go b/exporter/otelarrowexporter/metadata.go index b44d9b5a9547..1c358450d658 100644 --- a/exporter/otelarrowexporter/metadata.go +++ b/exporter/otelarrowexporter/metadata.go @@ -156,8 +156,8 @@ func (e *metadataExporter) getOrCreateExporter(ctx context.Context, s attribute. v, loaded = e.exporters.LoadOrStore(s, newExp) if !loaded { // Start the goroutine only if we added the object to the map, otherwise is already started. - be, ok := newExp.(*baseExporter) - if !ok { + be, valid := newExp.(*baseExporter) + if !valid { return nil, fmt.Errorf("%w: %T", errUnexpectedType, newExp) } // set metadata keys for base exporter to add them to the outgoing context. diff --git a/exporter/otelarrowexporter/otelarrow.go b/exporter/otelarrowexporter/otelarrow.go index 4be3756ae815..bb22cdca3d6a 100644 --- a/exporter/otelarrowexporter/otelarrow.go +++ b/exporter/otelarrowexporter/otelarrow.go @@ -12,7 +12,6 @@ import ( arrowPkg "github.com/apache/arrow/go/v16/arrow" arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record" - // "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configcompression" "go.opentelemetry.io/collector/consumer/consumererror" From e1aa2a41c18a12d74728c3ca643a632d4b4bdaf5 Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Mon, 26 Aug 2024 03:58:32 -0400 Subject: [PATCH 10/17] go generate? --- exporter/otelarrowexporter/generated_package_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/exporter/otelarrowexporter/generated_package_test.go b/exporter/otelarrowexporter/generated_package_test.go index c19cf02cbd7f..8314a4b32ce5 100644 --- a/exporter/otelarrowexporter/generated_package_test.go +++ b/exporter/otelarrowexporter/generated_package_test.go @@ -3,9 +3,8 @@ package otelarrowexporter import ( - "testing" - "go.uber.org/goleak" + "testing" ) func TestMain(m *testing.M) { From 5f585be618c8bd9914b66dfa0f5a78069aa5749f Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Mon, 26 Aug 2024 04:50:09 -0400 Subject: [PATCH 11/17] make generate --- exporter/otelarrowexporter/generated_package_test.go | 3 ++- exporter/otelarrowexporter/metadata.go | 1 - 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/exporter/otelarrowexporter/generated_package_test.go b/exporter/otelarrowexporter/generated_package_test.go index 8314a4b32ce5..c19cf02cbd7f 100644 --- a/exporter/otelarrowexporter/generated_package_test.go +++ b/exporter/otelarrowexporter/generated_package_test.go @@ -3,8 +3,9 @@ package otelarrowexporter import ( - "go.uber.org/goleak" "testing" + + "go.uber.org/goleak" ) func TestMain(m *testing.M) { diff --git a/exporter/otelarrowexporter/metadata.go b/exporter/otelarrowexporter/metadata.go index 1c358450d658..f2df361d51d9 100644 --- a/exporter/otelarrowexporter/metadata.go +++ b/exporter/otelarrowexporter/metadata.go @@ -197,6 +197,5 @@ func (e *metadataExporter) getAttrSet(ctx context.Context, keys []string) (attri attrs = append(attrs, attribute.StringSlice(k, vs)) } } - // ctx = metadata.NewOutgoingContext(ctx, metadata.MD(md)) return attribute.NewSet(attrs...), metadata.MD(md) } From 16c8657f5a187d7ed1b9379deeea5f8d56d4cc09 Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Mon, 26 Aug 2024 22:24:15 -0400 Subject: [PATCH 12/17] review feedback --- exporter/otelarrowexporter/metadata.go | 71 +++++++++------------ exporter/otelarrowexporter/metadata_test.go | 6 -- 2 files changed, 31 insertions(+), 46 deletions(-) diff --git a/exporter/otelarrowexporter/metadata.go b/exporter/otelarrowexporter/metadata.go index f2df361d51d9..27a3f29a2ffc 100644 --- a/exporter/otelarrowexporter/metadata.go +++ b/exporter/otelarrowexporter/metadata.go @@ -28,8 +28,6 @@ import ( var ( // errTooManyExporters is returned when the MetadataCardinalityLimit has been reached. errTooManyExporters = consumererror.NewPermanent(errors.New("too many exporter metadata-value combinations")) - // errUnexpectedType is returned when the object in the map isn't the expected type - errUnexpectedType = errors.New("unexpected type in map") ) type metadataExporter struct { @@ -95,11 +93,7 @@ func (e *metadataExporter) start(_ context.Context, host component.Host) (err er func (e *metadataExporter) shutdown(ctx context.Context) error { var err error e.exporters.Range(func(_ any, value any) bool { - be, ok := value.(exp) - if !ok { - err = multierr.Append(err, fmt.Errorf("%w: %T", errUnexpectedType, value)) - return true - } + be := value.(exp) err = multierr.Append(err, be.shutdown(ctx)) return true }) @@ -140,45 +134,42 @@ func (e *metadataExporter) pushLogs(ctx context.Context, ld plog.Logs) error { func (e *metadataExporter) getOrCreateExporter(ctx context.Context, s attribute.Set, md metadata.MD) (exp, error) { v, ok := e.exporters.Load(s) - if !ok { - e.lock.Lock() - if e.config.MetadataCardinalityLimit != 0 && e.size >= int(e.config.MetadataCardinalityLimit) { - e.lock.Unlock() - return nil, errTooManyExporters - } + if ok { + return v.(exp), nil + } - newExp, err := newExporter(e.config, e.settings, e.scf) - if err != nil { - return nil, fmt.Errorf("failed to create exporter: %w", err) - } + e.lock.Lock() + defer e.lock.Unlock() - var loaded bool - v, loaded = e.exporters.LoadOrStore(s, newExp) - if !loaded { - // Start the goroutine only if we added the object to the map, otherwise is already started. - be, valid := newExp.(*baseExporter) - if !valid { - return nil, fmt.Errorf("%w: %T", errUnexpectedType, newExp) - } - // set metadata keys for base exporter to add them to the outgoing context. - be.setMetadata(md) - - err = newExp.start(ctx, e.host) - if err != nil { - e.exporters.Delete(s) - return nil, fmt.Errorf("failed to start exporter: %w", err) - } - e.size++ - } - e.lock.Unlock() + if e.config.MetadataCardinalityLimit != 0 && e.size >= int(e.config.MetadataCardinalityLimit) { + return nil, errTooManyExporters + } + + newExp, err := newExporter(e.config, e.settings, e.scf) + if err != nil { + return nil, fmt.Errorf("failed to create exporter: %w", err) } - val, ok := v.(exp) - if !ok { - return nil, fmt.Errorf("%w: %T", errUnexpectedType, v) + + var loaded bool + v, loaded = e.exporters.LoadOrStore(s, newExp) + if !loaded { + // set metadata keys for base exporter to add them to the outgoing context. + newExp.(*baseExporter).setMetadata(md) + + // Start the goroutine only if we added the object to the map, otherwise is already started. + err = newExp.start(ctx, e.host) + if err != nil { + e.exporters.Delete(s) + return nil, fmt.Errorf("failed to start exporter: %w", err) + } + e.size++ } - return val, nil + + return v.(exp), nil } +// getAttrSet is code taken from the core collector's batchprocessor multibatch logic. +// https://github.com/open-telemetry/opentelemetry-collector/blob/v0.107.0/processor/batchprocessor/batch_processor.go#L298 func (e *metadataExporter) getAttrSet(ctx context.Context, keys []string) (attribute.Set, metadata.MD) { // Get each metadata key value, form the corresponding // attribute set for use as a map lookup key. diff --git a/exporter/otelarrowexporter/metadata_test.go b/exporter/otelarrowexporter/metadata_test.go index 8dbfc548feb7..ab1ee90d689f 100644 --- a/exporter/otelarrowexporter/metadata_test.go +++ b/exporter/otelarrowexporter/metadata_test.go @@ -82,14 +82,12 @@ func TestSendTracesWithMetadata(t *testing.T) { requestCount := 3 spansPerRequest := 33 - // sentResourceSpans := ptrace.NewTraces().ResourceSpans() for requestNum := 0; requestNum < requestCount; requestNum++ { td := testdata.GenerateTraces(spansPerRequest) spans := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans() for spanIndex := 0; spanIndex < spansPerRequest; spanIndex++ { spans.At(spanIndex).SetName(fmt.Sprintf("%d-%d", requestNum, spanIndex)) } - // td.ResourceSpans().At(0).CopyTo(sentResourceSpans.AppendEmpty()) num := requestNum % len(callCtxs) expectByContext[num] += spansPerRequest @@ -99,13 +97,9 @@ func TestSendTracesWithMetadata(t *testing.T) { } assert.Eventually(t, func() bool { - // rcv.mux.Lock() - // defer rcv.mux.Unlock() return rcv.requestCount.Load() == int32(requestCount) }, 1*time.Second, 5*time.Millisecond) assert.Eventually(t, func() bool { - // rcv.mux.Lock() - // defer rcv.mux.Unlock() return rcv.totalItems.Load() == int32(requestCount*spansPerRequest) }, 1*time.Second, 5*time.Millisecond) assert.Eventually(t, func() bool { From 00f640e55de82af6dbfe6f3436dcd0e09e423da1 Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Tue, 10 Sep 2024 03:33:58 -0600 Subject: [PATCH 13/17] fix test --- exporter/otelarrowexporter/metadata_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/otelarrowexporter/metadata_test.go b/exporter/otelarrowexporter/metadata_test.go index ab1ee90d689f..6d453ff7434e 100644 --- a/exporter/otelarrowexporter/metadata_test.go +++ b/exporter/otelarrowexporter/metadata_test.go @@ -203,5 +203,5 @@ func TestMetadataExporterCardinalityLimit(t *testing.T) { return rcv.totalItems.Load() == int32(cardLimit) }, 1*time.Second, 5*time.Millisecond) - require.Equal(t, cardLimit, len(rcv.spanCountByMetadata)) + require.Len(t, rcv.spanCountByMetadata, cardLimit) } From 3f13edf61e3e2a9c5716a90db4144100a7a78a58 Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Fri, 13 Sep 2024 14:42:53 -0600 Subject: [PATCH 14/17] partial review feedback to isolate race --- exporter/otelarrowexporter/factory.go | 35 +++++++++++++------------ exporter/otelarrowexporter/metadata.go | 13 --------- exporter/otelarrowexporter/otelarrow.go | 1 - 3 files changed, 18 insertions(+), 31 deletions(-) diff --git a/exporter/otelarrowexporter/factory.go b/exporter/otelarrowexporter/factory.go index 6f976e5793e0..a868fa286d65 100644 --- a/exporter/otelarrowexporter/factory.go +++ b/exporter/otelarrowexporter/factory.go @@ -72,14 +72,15 @@ func createDefaultConfig() component.Config { } } -func (e *baseExporter) helperOptions() []exporterhelper.Option { +func helperOptions(e exp) []exporterhelper.Option { + cfg := e.getConfig().(*Config) return []exporterhelper.Option{ exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), - exporterhelper.WithTimeout(e.config.TimeoutSettings), - exporterhelper.WithRetry(e.config.RetryConfig), - exporterhelper.WithQueue(e.config.QueueSettings), + exporterhelper.WithTimeout(cfg.TimeoutSettings), + exporterhelper.WithRetry(cfg.RetryConfig), + exporterhelper.WithQueue(cfg.QueueSettings), exporterhelper.WithStart(e.start), - exporterhelper.WithBatcher(e.config.BatcherConfig), + exporterhelper.WithBatcher(cfg.BatcherConfig), exporterhelper.WithShutdown(e.shutdown), } } @@ -103,13 +104,13 @@ func createTracesExporter( set exporter.Settings, cfg component.Config, ) (exporter.Traces, error) { - exp, err := newMetadataExporter(cfg, set, createArrowTracesStream) + e, err := newMetadataExporter(cfg, set, createArrowTracesStream) if err != nil { return nil, err } - return exporterhelper.NewTracesExporter(ctx, exp.getSettings(), exp.getConfig(), - exp.pushTraces, - exp.helperOptions()..., + return exporterhelper.NewTracesExporter(ctx, e.getSettings(), e.getConfig(), + e.pushTraces, + helperOptions(e)..., ) } @@ -122,13 +123,13 @@ func createMetricsExporter( set exporter.Settings, cfg component.Config, ) (exporter.Metrics, error) { - exp, err := newMetadataExporter(cfg, set, createArrowMetricsStream) + e, err := newMetadataExporter(cfg, set, createArrowMetricsStream) if err != nil { return nil, err } - return exporterhelper.NewMetricsExporter(ctx, exp.getSettings(), exp.getConfig(), - exp.pushMetrics, - exp.helperOptions()..., + return exporterhelper.NewMetricsExporter(ctx, e.getSettings(), e.getConfig(), + e.pushMetrics, + helperOptions(e)..., ) } @@ -141,12 +142,12 @@ func createLogsExporter( set exporter.Settings, cfg component.Config, ) (exporter.Logs, error) { - exp, err := newMetadataExporter(cfg, set, createArrowLogsStream) + e, err := newMetadataExporter(cfg, set, createArrowLogsStream) if err != nil { return nil, err } - return exporterhelper.NewLogsExporter(ctx, exp.getSettings(), exp.getConfig(), - exp.pushLogs, - exp.helperOptions()..., + return exporterhelper.NewLogsExporter(ctx, e.getSettings(), e.getConfig(), + e.pushLogs, + helperOptions(e)..., ) } diff --git a/exporter/otelarrowexporter/metadata.go b/exporter/otelarrowexporter/metadata.go index 27a3f29a2ffc..52480db0139d 100644 --- a/exporter/otelarrowexporter/metadata.go +++ b/exporter/otelarrowexporter/metadata.go @@ -13,10 +13,8 @@ import ( "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" @@ -74,17 +72,6 @@ func (e *metadataExporter) getConfig() component.Config { return e.config } -func (e *metadataExporter) helperOptions() []exporterhelper.Option { - return []exporterhelper.Option{ - exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), - exporterhelper.WithTimeout(e.config.TimeoutSettings), - exporterhelper.WithRetry(e.config.RetryConfig), - exporterhelper.WithQueue(e.config.QueueSettings), - exporterhelper.WithStart(e.start), - exporterhelper.WithShutdown(e.shutdown), - } -} - func (e *metadataExporter) start(_ context.Context, host component.Host) (err error) { e.host = host return nil diff --git a/exporter/otelarrowexporter/otelarrow.go b/exporter/otelarrowexporter/otelarrow.go index bb22cdca3d6a..2b57f8803778 100644 --- a/exporter/otelarrowexporter/otelarrow.go +++ b/exporter/otelarrowexporter/otelarrow.go @@ -38,7 +38,6 @@ import ( ) type exp interface { - helperOptions() []exporterhelper.Option getSettings() exporter.Settings getConfig() component.Config From fa6b431ff8af28e4c47c5abe32c23a5c4e740b6d Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Fri, 13 Sep 2024 14:49:28 -0600 Subject: [PATCH 15/17] userAgent once --- exporter/otelarrowexporter/metadata.go | 19 +++++++++++++++++-- exporter/otelarrowexporter/otelarrow.go | 14 +------------- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/exporter/otelarrowexporter/metadata.go b/exporter/otelarrowexporter/metadata.go index 52480db0139d..b9a643784238 100644 --- a/exporter/otelarrowexporter/metadata.go +++ b/exporter/otelarrowexporter/metadata.go @@ -7,10 +7,12 @@ import ( "context" "errors" "fmt" + "runtime" "sort" "strings" "sync" + arrowPkg "github.com/apache/arrow/go/v16/arrow" "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer/consumererror" @@ -21,6 +23,8 @@ import ( "go.opentelemetry.io/otel/attribute" "go.uber.org/multierr" "google.golang.org/grpc/metadata" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/compression/zstd" ) var ( @@ -36,6 +40,8 @@ type metadataExporter struct { metadataKeys []string exporters sync.Map + + userAgent string // Guards the size and the storing logic to ensure no more than limit items are stored. // If we are willing to allow "some" extra items than the limit this can be removed and size can be made atomic. @@ -47,6 +53,15 @@ var _ exp = (*metadataExporter)(nil) func newMetadataExporter(cfg component.Config, set exporter.Settings, streamClientFactory streamClientFactory) (exp, error) { oCfg := cfg.(*Config) + userAgent := fmt.Sprintf("%s/%s (%s/%s)", + set.BuildInfo.Description, set.BuildInfo.Version, runtime.GOOS, runtime.GOARCH) + + if !oCfg.Arrow.Disabled { + // Ignoring an error because Validate() was called. + _ = zstd.SetEncoderConfig(oCfg.Arrow.Zstd) + + userAgent += fmt.Sprintf(" ApacheArrow/%s (NumStreams/%d)", arrowPkg.PkgVersion, oCfg.Arrow.NumStreams) + } // use lower-case, to be consistent with http/2 headers. mks := make([]string, len(oCfg.MetadataKeys)) for i, k := range oCfg.MetadataKeys { @@ -54,7 +69,7 @@ func newMetadataExporter(cfg component.Config, set exporter.Settings, streamClie } sort.Strings(mks) if len(mks) == 0 { - return newExporter(cfg, set, streamClientFactory) + return newExporter(cfg, set, streamClientFactory, userAgent) } return &metadataExporter{ config: oCfg, @@ -132,7 +147,7 @@ func (e *metadataExporter) getOrCreateExporter(ctx context.Context, s attribute. return nil, errTooManyExporters } - newExp, err := newExporter(e.config, e.settings, e.scf) + newExp, err := newExporter(e.config, e.settings, e.scf, e.userAgent) if err != nil { return nil, fmt.Errorf("failed to create exporter: %w", err) } diff --git a/exporter/otelarrowexporter/otelarrow.go b/exporter/otelarrowexporter/otelarrow.go index 2b57f8803778..db72f72cf21b 100644 --- a/exporter/otelarrowexporter/otelarrow.go +++ b/exporter/otelarrowexporter/otelarrow.go @@ -6,11 +6,8 @@ package otelarrowexporter // import "github.com/open-telemetry/opentelemetry-col import ( "context" "errors" - "fmt" - "runtime" "time" - arrowPkg "github.com/apache/arrow/go/v16/arrow" arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configcompression" @@ -78,7 +75,7 @@ type streamClientFactory func(conn *grpc.ClientConn) arrow.StreamClientFunc // Crete new exporter and start it. The exporter will begin connecting but // this function may return before the connection is established. -func newExporter(cfg component.Config, set exporter.Settings, streamClientFactory streamClientFactory) (exp, error) { +func newExporter(cfg component.Config, set exporter.Settings, streamClientFactory streamClientFactory, userAgent string) (exp, error) { oCfg := cfg.(*Config) if oCfg.Endpoint == "" { @@ -89,15 +86,6 @@ func newExporter(cfg component.Config, set exporter.Settings, streamClientFactor if err != nil { return nil, err } - userAgent := fmt.Sprintf("%s/%s (%s/%s)", - set.BuildInfo.Description, set.BuildInfo.Version, runtime.GOOS, runtime.GOARCH) - - if !oCfg.Arrow.Disabled { - // Ignoring an error because Validate() was called. - _ = zstd.SetEncoderConfig(oCfg.Arrow.Zstd) - - userAgent += fmt.Sprintf(" ApacheArrow/%s (NumStreams/%d)", arrowPkg.PkgVersion, oCfg.Arrow.NumStreams) - } return &baseExporter{ config: oCfg, From 38f3188ddb67467401f6e9ee2430bcb5791e354c Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Tue, 17 Sep 2024 02:47:28 -0700 Subject: [PATCH 16/17] move netreporter and fix race? --- exporter/otelarrowexporter/metadata.go | 23 ++++++++++++++------- exporter/otelarrowexporter/metadata_test.go | 6 ++++-- exporter/otelarrowexporter/otelarrow.go | 7 +------ 3 files changed, 21 insertions(+), 15 deletions(-) diff --git a/exporter/otelarrowexporter/metadata.go b/exporter/otelarrowexporter/metadata.go index b9a643784238..a2d267d966c0 100644 --- a/exporter/otelarrowexporter/metadata.go +++ b/exporter/otelarrowexporter/metadata.go @@ -25,6 +25,7 @@ import ( "google.golang.org/grpc/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/compression/zstd" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/netstats" ) var ( @@ -40,6 +41,7 @@ type metadataExporter struct { metadataKeys []string exporters sync.Map + netReporter *netstats.NetworkReporter userAgent string @@ -53,6 +55,10 @@ var _ exp = (*metadataExporter)(nil) func newMetadataExporter(cfg component.Config, set exporter.Settings, streamClientFactory streamClientFactory) (exp, error) { oCfg := cfg.(*Config) + netReporter, err := netstats.NewExporterNetworkReporter(set) + if err != nil { + return nil, err + } userAgent := fmt.Sprintf("%s/%s (%s/%s)", set.BuildInfo.Description, set.BuildInfo.Version, runtime.GOOS, runtime.GOARCH) @@ -69,13 +75,15 @@ func newMetadataExporter(cfg component.Config, set exporter.Settings, streamClie } sort.Strings(mks) if len(mks) == 0 { - return newExporter(cfg, set, streamClientFactory, userAgent) + return newExporter(cfg, set, streamClientFactory, userAgent, netReporter) } return &metadataExporter{ config: oCfg, settings: set, scf: streamClientFactory, metadataKeys: mks, + userAgent: userAgent, + netReporter: netReporter, }, nil } @@ -135,11 +143,6 @@ func (e *metadataExporter) pushLogs(ctx context.Context, ld plog.Logs) error { } func (e *metadataExporter) getOrCreateExporter(ctx context.Context, s attribute.Set, md metadata.MD) (exp, error) { - v, ok := e.exporters.Load(s) - if ok { - return v.(exp), nil - } - e.lock.Lock() defer e.lock.Unlock() @@ -147,7 +150,12 @@ func (e *metadataExporter) getOrCreateExporter(ctx context.Context, s attribute. return nil, errTooManyExporters } - newExp, err := newExporter(e.config, e.settings, e.scf, e.userAgent) + v, ok := e.exporters.Load(s) + if ok { + return v.(exp), nil + } + + newExp, err := newExporter(e.config, e.settings, e.scf, e.userAgent, e.netReporter) if err != nil { return nil, fmt.Errorf("failed to create exporter: %w", err) } @@ -164,6 +172,7 @@ func (e *metadataExporter) getOrCreateExporter(ctx context.Context, s attribute. e.exporters.Delete(s) return nil, fmt.Errorf("failed to start exporter: %w", err) } + e.size++ } diff --git a/exporter/otelarrowexporter/metadata_test.go b/exporter/otelarrowexporter/metadata_test.go index 6d453ff7434e..ce18b5f1dee2 100644 --- a/exporter/otelarrowexporter/metadata_test.go +++ b/exporter/otelarrowexporter/metadata_test.go @@ -48,6 +48,8 @@ func TestSendTracesWithMetadata(t *testing.T) { cfg.MetadataCardinalityLimit = 10 cfg.MetadataKeys = []string{"key1", "key2"} set := exportertest.NewNopSettings() + set.BuildInfo.Description = "Collector" + set.BuildInfo.Version = "1.2.3test" bg := context.Background() exp, err := factory.CreateTracesExporter(bg, set, cfg) require.NoError(t, err) @@ -57,8 +59,8 @@ func TestSendTracesWithMetadata(t *testing.T) { }() host := componenttest.NewNopHost() + assert.NoError(t, exp.Start(context.Background(), host)) - time.Sleep(1 * time.Second) // Ensure that initially there is no data in the receiver. assert.EqualValues(t, 0, rcv.requestCount.Load()) @@ -164,8 +166,8 @@ func TestMetadataExporterCardinalityLimit(t *testing.T) { }() host := componenttest.NewNopHost() + assert.NoError(t, exp.Start(context.Background(), host)) - time.Sleep(1 * time.Second) // Ensure that initially there is no data in the receiver. assert.EqualValues(t, 0, rcv.requestCount.Load()) diff --git a/exporter/otelarrowexporter/otelarrow.go b/exporter/otelarrowexporter/otelarrow.go index db72f72cf21b..07ccb25ea710 100644 --- a/exporter/otelarrowexporter/otelarrow.go +++ b/exporter/otelarrowexporter/otelarrow.go @@ -75,18 +75,13 @@ type streamClientFactory func(conn *grpc.ClientConn) arrow.StreamClientFunc // Crete new exporter and start it. The exporter will begin connecting but // this function may return before the connection is established. -func newExporter(cfg component.Config, set exporter.Settings, streamClientFactory streamClientFactory, userAgent string) (exp, error) { +func newExporter(cfg component.Config, set exporter.Settings, streamClientFactory streamClientFactory, userAgent string, netReporter *netstats.NetworkReporter) (exp, error) { oCfg := cfg.(*Config) if oCfg.Endpoint == "" { return nil, errors.New("OTLP exporter config requires an Endpoint") } - netReporter, err := netstats.NewExporterNetworkReporter(set) - if err != nil { - return nil, err - } - return &baseExporter{ config: oCfg, settings: set, From 32b9cb60e9e9c4a1ae293ff67ac379de5835e8c6 Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Tue, 17 Sep 2024 02:51:40 -0700 Subject: [PATCH 17/17] gofmt --- exporter/otelarrowexporter/metadata.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/exporter/otelarrowexporter/metadata.go b/exporter/otelarrowexporter/metadata.go index a2d267d966c0..1d24bfc6cb81 100644 --- a/exporter/otelarrowexporter/metadata.go +++ b/exporter/otelarrowexporter/metadata.go @@ -41,8 +41,8 @@ type metadataExporter struct { metadataKeys []string exporters sync.Map - netReporter *netstats.NetworkReporter - + netReporter *netstats.NetworkReporter + userAgent string // Guards the size and the storing logic to ensure no more than limit items are stored. @@ -82,8 +82,8 @@ func newMetadataExporter(cfg component.Config, set exporter.Settings, streamClie settings: set, scf: streamClientFactory, metadataKeys: mks, - userAgent: userAgent, - netReporter: netReporter, + userAgent: userAgent, + netReporter: netReporter, }, nil }